Move config (ServerConfig) out of etcdserver package.

Motivation:
  - ServerConfig is part of 'embed' public API, while etcdserver is more 'internal'
  - EtcdServer is already too big and config is pretty wide-spread leaf
if we were to split etcdserver (e.g. into pre & post-apply part).
release-3.5
Piotr Tabor 2021-03-11 19:55:20 +01:00
parent 3ead91ca3e
commit fd7fed1511
9 changed files with 49 additions and 42 deletions

View File

@ -36,6 +36,7 @@ import (
"go.etcd.io/etcd/pkg/v3/types"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
@ -213,7 +214,7 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
return err
}
srv := etcdserver.ServerConfig{
srv := config.ServerConfig{
Logger: s.lg,
Name: cfg.Name,
PeerURLs: pURLs,

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
package config
import (
"context"
@ -188,7 +188,7 @@ func (c *ServerConfig) VerifyBootstrap() error {
if err := c.advertiseMatchesCluster(); err != nil {
return err
}
if checkDuplicateURL(c.InitialPeerURLsMap) {
if CheckDuplicateURL(c.InitialPeerURLsMap) {
return fmt.Errorf("initial cluster %s has duplicate url", c.InitialPeerURLsMap)
}
if c.InitialPeerURLsMap.String() == "" && c.DiscoveryURL == "" {
@ -205,7 +205,7 @@ func (c *ServerConfig) VerifyJoinExisting() error {
if err := c.hasLocalMember(); err != nil {
return err
}
if checkDuplicateURL(c.InitialPeerURLsMap) {
if CheckDuplicateURL(c.InitialPeerURLsMap) {
return fmt.Errorf("initial cluster %s has duplicate url", c.InitialPeerURLsMap)
}
if c.DiscoveryURL != "" {
@ -294,16 +294,16 @@ func (c *ServerConfig) ReqTimeout() time.Duration {
return 5*time.Second + 2*time.Duration(c.ElectionTicks*int(c.TickMs))*time.Millisecond
}
func (c *ServerConfig) electionTimeout() time.Duration {
func (c *ServerConfig) ElectionTimeout() time.Duration {
return time.Duration(c.ElectionTicks*int(c.TickMs)) * time.Millisecond
}
func (c *ServerConfig) peerDialTimeout() time.Duration {
func (c *ServerConfig) PeerDialTimeout() time.Duration {
// 1s for queue wait and election timeout
return time.Second + time.Duration(c.ElectionTicks*int(c.TickMs))*time.Millisecond
}
func checkDuplicateURL(urlsmap types.URLsMap) bool {
func CheckDuplicateURL(urlsmap types.URLsMap) bool {
um := make(map[string]bool)
for _, urls := range urlsmap {
for _, url := range urls {
@ -317,11 +317,11 @@ func checkDuplicateURL(urlsmap types.URLsMap) bool {
return false
}
func (c *ServerConfig) bootstrapTimeout() time.Duration {
func (c *ServerConfig) BootstrapTimeoutEffective() time.Duration {
if c.BootstrapTimeout != 0 {
return c.BootstrapTimeout
}
return time.Second
}
func (c *ServerConfig) backendPath() string { return filepath.Join(c.SnapDir(), "db") }
func (c *ServerConfig) BackendPath() string { return filepath.Join(c.SnapDir(), "db") }

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
package config
import (
"net/url"

View File

@ -34,6 +34,7 @@ import (
runtimeutil "go.etcd.io/etcd/pkg/v3/runtime"
"go.etcd.io/etcd/pkg/v3/transport"
"go.etcd.io/etcd/pkg/v3/types"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
@ -161,7 +162,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType)
srvcfg := etcdserver.ServerConfig{
srvcfg := config.ServerConfig{
Name: cfg.Name,
ClientURLs: cfg.ACUrls,
PeerURLs: cfg.APUrls,
@ -256,7 +257,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
return e, nil
}
func print(lg *zap.Logger, ec Config, sc etcdserver.ServerConfig, memberInitialized bool) {
func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized bool) {
cors := make([]string, 0, len(ec.CORS))
for v := range ec.CORS {
cors = append(cors, v)

View File

@ -20,6 +20,7 @@ import (
"time"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/mvcc/backend"
@ -27,9 +28,9 @@ import (
"go.uber.org/zap"
)
func newBackend(cfg ServerConfig) backend.Backend {
func newBackend(cfg config.ServerConfig) backend.Backend {
bcfg := backend.DefaultBackendConfig()
bcfg.Path = cfg.backendPath()
bcfg.Path = cfg.BackendPath()
bcfg.UnsafeNoFsync = cfg.UnsafeNoFsync
if cfg.BackendBatchLimit != 0 {
bcfg.BatchLimit = cfg.BackendBatchLimit
@ -53,20 +54,20 @@ func newBackend(cfg ServerConfig) backend.Backend {
}
// openSnapshotBackend renames a snapshot db to the current etcd db and opens it.
func openSnapshotBackend(cfg ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) {
func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) {
snapPath, err := ss.DBFilePath(snapshot.Metadata.Index)
if err != nil {
return nil, fmt.Errorf("failed to find database snapshot file (%v)", err)
}
if err := os.Rename(snapPath, cfg.backendPath()); err != nil {
if err := os.Rename(snapPath, cfg.BackendPath()); err != nil {
return nil, fmt.Errorf("failed to rename database snapshot file (%v)", err)
}
return openBackend(cfg), nil
}
// openBackend returns a backend using the current etcd db.
func openBackend(cfg ServerConfig) backend.Backend {
fn := cfg.backendPath()
func openBackend(cfg config.ServerConfig) backend.Backend {
fn := cfg.BackendPath()
now, beOpened := time.Now(), make(chan backend.Backend)
go func() {
@ -93,7 +94,7 @@ func openBackend(cfg ServerConfig) backend.Backend {
// before updating the backend db after persisting raft snapshot to disk,
// violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this
// case, replace the db with the snapshot db sent by the leader.
func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool) (backend.Backend, error) {
func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool) (backend.Backend, error) {
consistentIndex := uint64(0)
if beExist {
ci := cindex.NewConsistentIndex(oldbe.BatchTx())

View File

@ -30,6 +30,7 @@ import (
"go.etcd.io/etcd/pkg/v3/types"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
"go.etcd.io/etcd/server/v3/wal"
@ -419,7 +420,7 @@ func (r *raftNode) advanceTicks(ticks int) {
}
}
func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
var err error
member := cl.MemberByName(cfg.Name)
metadata := pbutil.MustMarshal(
@ -483,7 +484,7 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id
return id, n, s, w
}
func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
@ -534,7 +535,7 @@ func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *member
return id, cl, n, s, w
}
func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term

View File

@ -33,6 +33,7 @@ import (
"github.com/coreos/go-semver/semver"
humanize "github.com/dustin/go-humanize"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/server/v3/config"
"go.uber.org/zap"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
@ -210,7 +211,7 @@ type EtcdServer struct {
r raftNode // uses 64-bit atomics; keep 64-bit aligned.
readych chan struct{}
Cfg ServerConfig
Cfg config.ServerConfig
lgMu *sync.RWMutex
lg *zap.Logger
@ -295,7 +296,7 @@ type EtcdServer struct {
// NewServer creates a new EtcdServer from the supplied configuration. The
// configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
var (
@ -339,7 +340,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
}
ss := snap.New(cfg.Logger, cfg.SnapDir())
bepath := cfg.backendPath()
bepath := cfg.BackendPath()
beExist := fileutil.Exist(bepath)
be := openBackend(cfg)
@ -349,7 +350,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
}
}()
prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())
prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout())
if err != nil {
return nil, err
}
@ -394,7 +395,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
return nil, err
}
m := cl.MemberByName(cfg.Name)
if isMemberBootstrapped(cfg.Logger, cl, cfg.Name, prt, cfg.bootstrapTimeout()) {
if isMemberBootstrapped(cfg.Logger, cl, cfg.Name, prt, cfg.BootstrapTimeoutEffective()) {
return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
}
if cfg.ShouldDiscover() {
@ -408,7 +409,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
if err != nil {
return nil, err
}
if checkDuplicateURL(urlsmap) {
if config.CheckDuplicateURL(urlsmap) {
return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
}
if cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, urlsmap); err != nil {
@ -611,7 +612,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
tr := &rafthttp.Transport{
Logger: cfg.Logger,
TLSInfo: cfg.PeerTLSInfo,
DialTimeout: cfg.peerDialTimeout(),
DialTimeout: cfg.PeerDialTimeout(),
ID: id,
URLs: cfg.PeerURLs,
ClusterID: cl.ID(),
@ -2073,7 +2074,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
// promote lessor when the local member is leader and finished
// applying all entries from the last term.
if s.isLeader() {
s.lessor.Promote(s.Cfg.electionTimeout())
s.lessor.Promote(s.Cfg.ElectionTimeout())
}
return
}

View File

@ -36,6 +36,7 @@ import (
"go.etcd.io/etcd/pkg/v3/wait"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
@ -738,7 +739,7 @@ func TestDoProposal(t *testing.T) {
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *r,
v2store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -770,7 +771,7 @@ func TestDoProposalCancelled(t *testing.T) {
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
w: wt,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -794,7 +795,7 @@ func TestDoProposalTimeout(t *testing.T) {
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
w: mockwait.NewNop(),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -813,7 +814,7 @@ func TestDoProposalStopped(t *testing.T) {
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: newNodeNop()}),
w: mockwait.NewNop(),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -924,7 +925,7 @@ func TestSyncTrigger(t *testing.T) {
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *r,
v2store: mockstore.NewNop(),
SyncTicker: tk,
@ -1066,7 +1067,7 @@ func TestSnapshotOrdering(t *testing.T) {
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
Cfg: config.ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *r,
v2store: st,
snapshotter: snap.New(zap.NewExample(), snapdir),
@ -1141,7 +1142,7 @@ func TestTriggerSnap(t *testing.T) {
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCount: uint64(snapc), SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCount: uint64(snapc), SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *r,
v2store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -1219,7 +1220,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
Cfg: config.ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *r,
v2store: st,
snapshotter: snap.New(zap.NewExample(), testdir),
@ -1430,7 +1431,7 @@ func TestPublish(t *testing.T) {
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
readych: make(chan struct{}),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
id: 1,
r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
@ -1483,7 +1484,7 @@ func TestPublishStopped(t *testing.T) {
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *r,
cluster: &membership.RaftCluster{},
w: mockwait.NewNop(),
@ -1507,7 +1508,7 @@ func TestPublishRetry(t *testing.T) {
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
w: mockwait.NewNop(),
stopping: make(chan struct{}),

View File

@ -40,6 +40,7 @@ import (
"go.etcd.io/etcd/pkg/v3/transport"
"go.etcd.io/etcd/pkg/v3/types"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp"
@ -556,7 +557,7 @@ func NewListenerWithAddr(t testutil.TB, addr string) net.Listener {
}
type member struct {
etcdserver.ServerConfig
config.ServerConfig
PeerListeners, ClientListeners []net.Listener
grpcListener net.Listener
// PeerTLSInfo enables peer TLS when set