From fd7fed151123df9e483d46da045f5975949bd4a2 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Thu, 11 Mar 2021 19:55:20 +0100 Subject: [PATCH] Move config (ServerConfig) out of etcdserver package. Motivation: - ServerConfig is part of 'embed' public API, while etcdserver is more 'internal' - EtcdServer is already too big and config is pretty wide-spread leaf if we were to split etcdserver (e.g. into pre & post-apply part). --- etcdctl/snapshot/v3_snapshot.go | 3 ++- server/{etcdserver => config}/config.go | 16 +++++++------- server/{etcdserver => config}/config_test.go | 2 +- server/embed/etcd.go | 5 +++-- server/etcdserver/backend.go | 15 +++++++------ server/etcdserver/raft.go | 7 +++--- server/etcdserver/server.go | 17 ++++++++------- server/etcdserver/server_test.go | 23 ++++++++++---------- tests/integration/cluster.go | 3 ++- 9 files changed, 49 insertions(+), 42 deletions(-) rename server/{etcdserver => config}/config.go (96%) rename server/{etcdserver => config}/config_test.go (99%) diff --git a/etcdctl/snapshot/v3_snapshot.go b/etcdctl/snapshot/v3_snapshot.go index 4b4d8dd92..9594bdcb6 100644 --- a/etcdctl/snapshot/v3_snapshot.go +++ b/etcdctl/snapshot/v3_snapshot.go @@ -36,6 +36,7 @@ import ( "go.etcd.io/etcd/pkg/v3/types" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" + "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" @@ -213,7 +214,7 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error { return err } - srv := etcdserver.ServerConfig{ + srv := config.ServerConfig{ Logger: s.lg, Name: cfg.Name, PeerURLs: pURLs, diff --git a/server/etcdserver/config.go b/server/config/config.go similarity index 96% rename from server/etcdserver/config.go rename to server/config/config.go index df00df5a1..7f4f6ea67 100644 --- a/server/etcdserver/config.go +++ b/server/config/config.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package etcdserver +package config import ( "context" @@ -188,7 +188,7 @@ func (c *ServerConfig) VerifyBootstrap() error { if err := c.advertiseMatchesCluster(); err != nil { return err } - if checkDuplicateURL(c.InitialPeerURLsMap) { + if CheckDuplicateURL(c.InitialPeerURLsMap) { return fmt.Errorf("initial cluster %s has duplicate url", c.InitialPeerURLsMap) } if c.InitialPeerURLsMap.String() == "" && c.DiscoveryURL == "" { @@ -205,7 +205,7 @@ func (c *ServerConfig) VerifyJoinExisting() error { if err := c.hasLocalMember(); err != nil { return err } - if checkDuplicateURL(c.InitialPeerURLsMap) { + if CheckDuplicateURL(c.InitialPeerURLsMap) { return fmt.Errorf("initial cluster %s has duplicate url", c.InitialPeerURLsMap) } if c.DiscoveryURL != "" { @@ -294,16 +294,16 @@ func (c *ServerConfig) ReqTimeout() time.Duration { return 5*time.Second + 2*time.Duration(c.ElectionTicks*int(c.TickMs))*time.Millisecond } -func (c *ServerConfig) electionTimeout() time.Duration { +func (c *ServerConfig) ElectionTimeout() time.Duration { return time.Duration(c.ElectionTicks*int(c.TickMs)) * time.Millisecond } -func (c *ServerConfig) peerDialTimeout() time.Duration { +func (c *ServerConfig) PeerDialTimeout() time.Duration { // 1s for queue wait and election timeout return time.Second + time.Duration(c.ElectionTicks*int(c.TickMs))*time.Millisecond } -func checkDuplicateURL(urlsmap types.URLsMap) bool { +func CheckDuplicateURL(urlsmap types.URLsMap) bool { um := make(map[string]bool) for _, urls := range urlsmap { for _, url := range urls { @@ -317,11 +317,11 @@ func checkDuplicateURL(urlsmap types.URLsMap) bool { return false } -func (c *ServerConfig) bootstrapTimeout() time.Duration { +func (c *ServerConfig) BootstrapTimeoutEffective() time.Duration { if c.BootstrapTimeout != 0 { return c.BootstrapTimeout } return time.Second } -func (c *ServerConfig) backendPath() string { return filepath.Join(c.SnapDir(), "db") } +func (c *ServerConfig) BackendPath() string { return filepath.Join(c.SnapDir(), "db") } diff --git a/server/etcdserver/config_test.go b/server/config/config_test.go similarity index 99% rename from server/etcdserver/config_test.go rename to server/config/config_test.go index 9e2f0b919..0846e70f2 100644 --- a/server/etcdserver/config_test.go +++ b/server/config/config_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package etcdserver +package config import ( "net/url" diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 84f46c694..73ef8bfb8 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -34,6 +34,7 @@ import ( runtimeutil "go.etcd.io/etcd/pkg/v3/runtime" "go.etcd.io/etcd/pkg/v3/transport" "go.etcd.io/etcd/pkg/v3/types" + "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" @@ -161,7 +162,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType) - srvcfg := etcdserver.ServerConfig{ + srvcfg := config.ServerConfig{ Name: cfg.Name, ClientURLs: cfg.ACUrls, PeerURLs: cfg.APUrls, @@ -256,7 +257,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { return e, nil } -func print(lg *zap.Logger, ec Config, sc etcdserver.ServerConfig, memberInitialized bool) { +func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized bool) { cors := make([]string, 0, len(ec.CORS)) for v := range ec.CORS { cors = append(cors, v) diff --git a/server/etcdserver/backend.go b/server/etcdserver/backend.go index 41747b138..346ab4303 100644 --- a/server/etcdserver/backend.go +++ b/server/etcdserver/backend.go @@ -20,6 +20,7 @@ import ( "time" "go.etcd.io/etcd/raft/v3/raftpb" + "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/mvcc/backend" @@ -27,9 +28,9 @@ import ( "go.uber.org/zap" ) -func newBackend(cfg ServerConfig) backend.Backend { +func newBackend(cfg config.ServerConfig) backend.Backend { bcfg := backend.DefaultBackendConfig() - bcfg.Path = cfg.backendPath() + bcfg.Path = cfg.BackendPath() bcfg.UnsafeNoFsync = cfg.UnsafeNoFsync if cfg.BackendBatchLimit != 0 { bcfg.BatchLimit = cfg.BackendBatchLimit @@ -53,20 +54,20 @@ func newBackend(cfg ServerConfig) backend.Backend { } // openSnapshotBackend renames a snapshot db to the current etcd db and opens it. -func openSnapshotBackend(cfg ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) { +func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) { snapPath, err := ss.DBFilePath(snapshot.Metadata.Index) if err != nil { return nil, fmt.Errorf("failed to find database snapshot file (%v)", err) } - if err := os.Rename(snapPath, cfg.backendPath()); err != nil { + if err := os.Rename(snapPath, cfg.BackendPath()); err != nil { return nil, fmt.Errorf("failed to rename database snapshot file (%v)", err) } return openBackend(cfg), nil } // openBackend returns a backend using the current etcd db. -func openBackend(cfg ServerConfig) backend.Backend { - fn := cfg.backendPath() +func openBackend(cfg config.ServerConfig) backend.Backend { + fn := cfg.BackendPath() now, beOpened := time.Now(), make(chan backend.Backend) go func() { @@ -93,7 +94,7 @@ func openBackend(cfg ServerConfig) backend.Backend { // before updating the backend db after persisting raft snapshot to disk, // violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this // case, replace the db with the snapshot db sent by the leader. -func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool) (backend.Backend, error) { +func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool) (backend.Backend, error) { consistentIndex := uint64(0) if beExist { ci := cindex.NewConsistentIndex(oldbe.BatchTx()) diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 09812ee67..7c617e725 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -30,6 +30,7 @@ import ( "go.etcd.io/etcd/pkg/v3/types" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" + "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/wal" @@ -419,7 +420,7 @@ func (r *raftNode) advanceTicks(ticks int) { } } -func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { +func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { var err error member := cl.MemberByName(cfg.Name) metadata := pbutil.MustMarshal( @@ -483,7 +484,7 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id return id, n, s, w } -func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { +func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term @@ -534,7 +535,7 @@ func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *member return id, cl, n, s, w } -func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { +func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 662458a6d..17a473867 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -33,6 +33,7 @@ import ( "github.com/coreos/go-semver/semver" humanize "github.com/dustin/go-humanize" "github.com/prometheus/client_golang/prometheus" + "go.etcd.io/etcd/server/v3/config" "go.uber.org/zap" pb "go.etcd.io/etcd/api/v3/etcdserverpb" @@ -210,7 +211,7 @@ type EtcdServer struct { r raftNode // uses 64-bit atomics; keep 64-bit aligned. readych chan struct{} - Cfg ServerConfig + Cfg config.ServerConfig lgMu *sync.RWMutex lg *zap.Logger @@ -295,7 +296,7 @@ type EtcdServer struct { // NewServer creates a new EtcdServer from the supplied configuration. The // configuration is considered static for the lifetime of the EtcdServer. -func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { +func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { st := v2store.New(StoreClusterPrefix, StoreKeysPrefix) var ( @@ -339,7 +340,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { } ss := snap.New(cfg.Logger, cfg.SnapDir()) - bepath := cfg.backendPath() + bepath := cfg.BackendPath() beExist := fileutil.Exist(bepath) be := openBackend(cfg) @@ -349,7 +350,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { } }() - prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout()) + prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout()) if err != nil { return nil, err } @@ -394,7 +395,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { return nil, err } m := cl.MemberByName(cfg.Name) - if isMemberBootstrapped(cfg.Logger, cl, cfg.Name, prt, cfg.bootstrapTimeout()) { + if isMemberBootstrapped(cfg.Logger, cl, cfg.Name, prt, cfg.BootstrapTimeoutEffective()) { return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID) } if cfg.ShouldDiscover() { @@ -408,7 +409,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { if err != nil { return nil, err } - if checkDuplicateURL(urlsmap) { + if config.CheckDuplicateURL(urlsmap) { return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap) } if cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, urlsmap); err != nil { @@ -611,7 +612,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { tr := &rafthttp.Transport{ Logger: cfg.Logger, TLSInfo: cfg.PeerTLSInfo, - DialTimeout: cfg.peerDialTimeout(), + DialTimeout: cfg.PeerDialTimeout(), ID: id, URLs: cfg.PeerURLs, ClusterID: cl.ID(), @@ -2073,7 +2074,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { // promote lessor when the local member is leader and finished // applying all entries from the last term. if s.isLeader() { - s.lessor.Promote(s.Cfg.electionTimeout()) + s.lessor.Promote(s.Cfg.ElectionTimeout()) } return } diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 5285b356b..5d4fe044e 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -36,6 +36,7 @@ import ( "go.etcd.io/etcd/pkg/v3/wait" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" + "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" @@ -738,7 +739,7 @@ func TestDoProposal(t *testing.T) { srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, + Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *r, v2store: st, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -770,7 +771,7 @@ func TestDoProposalCancelled(t *testing.T) { srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, + Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}), w: wt, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -794,7 +795,7 @@ func TestDoProposalTimeout(t *testing.T) { srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, + Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}), w: mockwait.NewNop(), reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -813,7 +814,7 @@ func TestDoProposalStopped(t *testing.T) { srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, + Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: newNodeNop()}), w: mockwait.NewNop(), reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -924,7 +925,7 @@ func TestSyncTrigger(t *testing.T) { srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, + Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *r, v2store: mockstore.NewNop(), SyncTicker: tk, @@ -1066,7 +1067,7 @@ func TestSnapshotOrdering(t *testing.T) { s := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, + Cfg: config.ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *r, v2store: st, snapshotter: snap.New(zap.NewExample(), snapdir), @@ -1141,7 +1142,7 @@ func TestTriggerSnap(t *testing.T) { srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCount: uint64(snapc), SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, + Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCount: uint64(snapc), SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *r, v2store: st, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -1219,7 +1220,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { s := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, + Cfg: config.ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *r, v2store: st, snapshotter: snap.New(zap.NewExample(), testdir), @@ -1430,7 +1431,7 @@ func TestPublish(t *testing.T) { lgMu: new(sync.RWMutex), lg: zap.NewExample(), readych: make(chan struct{}), - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, + Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, id: 1, r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}), attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}, @@ -1483,7 +1484,7 @@ func TestPublishStopped(t *testing.T) { srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, + Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *r, cluster: &membership.RaftCluster{}, w: mockwait.NewNop(), @@ -1507,7 +1508,7 @@ func TestPublishRetry(t *testing.T) { srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, + Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}), w: mockwait.NewNop(), stopping: make(chan struct{}), diff --git a/tests/integration/cluster.go b/tests/integration/cluster.go index b57312667..74e54ca8b 100644 --- a/tests/integration/cluster.go +++ b/tests/integration/cluster.go @@ -40,6 +40,7 @@ import ( "go.etcd.io/etcd/pkg/v3/transport" "go.etcd.io/etcd/pkg/v3/types" "go.etcd.io/etcd/raft/v3" + "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp" @@ -556,7 +557,7 @@ func NewListenerWithAddr(t testutil.TB, addr string) net.Listener { } type member struct { - etcdserver.ServerConfig + config.ServerConfig PeerListeners, ClientListeners []net.Listener grpcListener net.Listener // PeerTLSInfo enables peer TLS when set