From 714b48a4b4b5df3a54a6d8e39eaa832ed4695c37 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 17 Apr 2017 16:32:36 -0700 Subject: [PATCH] etcdserver: initialize raftNode with constructor raftNode was being initialized in start(), which was causing hangs when trying to stop the etcd server since the stop channel would not be initialized in time for the stop call. Instead, setup non-configurable bits in a constructor. Fixes #7668 --- etcdserver/raft.go | 71 ++++++++------ etcdserver/raft_test.go | 14 +-- etcdserver/server.go | 43 +++++---- etcdserver/server_test.go | 193 +++++++++++++++++++------------------- 4 files changed, 166 insertions(+), 155 deletions(-) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index a9825d0a5..b603c459c 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -94,14 +94,7 @@ type raftNode struct { term uint64 lead uint64 - mu sync.Mutex - // last lead elected time - lt time.Time - - // to check if msg receiver is removed from cluster - isIDRemoved func(id uint64) bool - - raft.Node + raftNodeConfig // a chan to send/receive snapshot msgSnapC chan raftpb.Message @@ -115,26 +108,49 @@ type raftNode struct { // utility ticker *time.Ticker // contention detectors for raft heartbeat message - td *contention.TimeoutDetector - heartbeat time.Duration // for logging - raftStorage *raft.MemoryStorage - storage Storage - // transport specifies the transport to send and receive msgs to members. - // Sending messages MUST NOT block. It is okay to drop messages, since - // clients should timeout and reissue their messages. - // If transport is nil, server will panic. - transport rafthttp.Transporter + td *contention.TimeoutDetector stopped chan struct{} done chan struct{} } +type raftNodeConfig struct { + // to check if msg receiver is removed from cluster + isIDRemoved func(id uint64) bool + raft.Node + raftStorage *raft.MemoryStorage + storage Storage + heartbeat time.Duration // for logging + // transport specifies the transport to send and receive msgs to members. + // Sending messages MUST NOT block. It is okay to drop messages, since + // clients should timeout and reissue their messages. + // If transport is nil, server will panic. + transport rafthttp.Transporter +} + +func newRaftNode(cfg raftNodeConfig) *raftNode { + r := &raftNode{ + raftNodeConfig: cfg, + // set up contention detectors for raft heartbeat message. + // expect to send a heartbeat within 2 heartbeat intervals. + td: contention.NewTimeoutDetector(2 * cfg.heartbeat), + readStateC: make(chan raft.ReadState, 1), + msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), + applyc: make(chan apply), + stopped: make(chan struct{}), + done: make(chan struct{}), + } + if r.heartbeat == 0 { + r.ticker = &time.Ticker{} + } else { + r.ticker = time.NewTicker(r.heartbeat) + } + return r +} + // start prepares and starts raftNode in a new goroutine. It is no longer safe // to modify the fields after it has been started. func (r *raftNode) start(rh *raftReadyHandler) { - r.applyc = make(chan apply) - r.stopped = make(chan struct{}) - r.done = make(chan struct{}) internalTimeout := time.Second go func() { @@ -147,10 +163,8 @@ func (r *raftNode) start(rh *raftReadyHandler) { r.Tick() case rd := <-r.Ready(): if rd.SoftState != nil { - if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead { - r.mu.Lock() - r.lt = time.Now() - r.mu.Unlock() + newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead + if newLeader { leaderChanges.Inc() } @@ -162,7 +176,8 @@ func (r *raftNode) start(rh *raftReadyHandler) { atomic.StoreUint64(&r.lead, rd.SoftState.Lead) islead = rd.RaftState == raft.StateLeader - rh.updateLeadership() + rh.updateLeadership(newLeader) + r.td.Reset() } if len(rd.ReadStates) != 0 { @@ -316,12 +331,6 @@ func (r *raftNode) apply() chan apply { return r.applyc } -func (r *raftNode) leadElectedTime() time.Time { - r.mu.Lock() - defer r.mu.Unlock() - return r.lt -} - func (r *raftNode) stop() { r.stopped <- struct{}{} <-r.done diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index 945f63ce2..fa5c0174b 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -153,13 +153,13 @@ func TestCreateConfigChangeEnts(t *testing.T) { func TestStopRaftWhenWaitingForApplyDone(t *testing.T) { n := newNopReadyNode() - srv := &EtcdServer{r: raftNode{ + r := newRaftNode(raftNodeConfig{ Node: n, storage: mockstorage.NewStorageRecorder(""), raftStorage: raft.NewMemoryStorage(), transport: rafthttp.NewNopTransporter(), - ticker: &time.Ticker{}, - }} + }) + srv := &EtcdServer{r: *r} srv.r.start(nil) n.readyc <- raft.Ready{} select { @@ -182,16 +182,16 @@ func TestConfgChangeBlocksApply(t *testing.T) { waitApplyc := make(chan struct{}) - srv := &EtcdServer{r: raftNode{ + r := newRaftNode(raftNodeConfig{ Node: n, storage: mockstorage.NewStorageRecorder(""), raftStorage: raft.NewMemoryStorage(), transport: rafthttp.NewNopTransporter(), - ticker: &time.Ticker{}, - }} + }) + srv := &EtcdServer{r: *r} rh := &raftReadyHandler{ - updateLeadership: func() {}, + updateLeadership: func(bool) {}, waitForApply: func() { <-waitApplyc }, diff --git a/etcdserver/server.go b/etcdserver/server.go index cd9533945..aa4a17359 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -41,7 +41,6 @@ import ( "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc" "github.com/coreos/etcd/mvcc/backend" - "github.com/coreos/etcd/pkg/contention" "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/idutil" "github.com/coreos/etcd/pkg/pbutil" @@ -243,6 +242,9 @@ type EtcdServer struct { // on etcd server shutdown. ctx context.Context cancel context.CancelFunc + + leadTimeMu sync.RWMutex + leadElectedTime time.Time } // NewServer creates a new EtcdServer from the supplied configuration. The @@ -419,19 +421,15 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { snapCount: cfg.SnapCount, errorc: make(chan error, 1), store: st, - r: raftNode{ - isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, - Node: n, - ticker: time.NewTicker(heartbeat), - // set up contention detectors for raft heartbeat message. - // expect to send a heartbeat within 2 heartbeat intervals. - td: contention.NewTimeoutDetector(2 * heartbeat), - heartbeat: heartbeat, - raftStorage: s, - storage: NewStorage(w, ss), - msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), - readStateC: make(chan raft.ReadState, 1), - }, + r: *newRaftNode( + raftNodeConfig{ + isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, + Node: n, + heartbeat: heartbeat, + raftStorage: s, + storage: NewStorage(w, ss), + }, + ), id: id, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, cluster: cl, @@ -614,7 +612,7 @@ type etcdProgress struct { // and helps decouple state machine logic from Raft algorithms. // TODO: add a state machine interface to apply the commit entries and do snapshot/recover type raftReadyHandler struct { - updateLeadership func() + updateLeadership func(newLeader bool) updateCommittedIndex func(uint64) waitForApply func() } @@ -644,7 +642,7 @@ func (s *EtcdServer) run() { return } rh := &raftReadyHandler{ - updateLeadership: func() { + updateLeadership: func(newLeader bool) { if !s.isLeader() { if s.lessor != nil { s.lessor.Demote() @@ -654,6 +652,12 @@ func (s *EtcdServer) run() { } setSyncC(nil) } else { + if newLeader { + t := time.Now() + s.leadTimeMu.Lock() + s.leadElectedTime = t + s.leadTimeMu.Unlock() + } setSyncC(s.SyncTicker.C) if s.compactor != nil { s.compactor.Resume() @@ -665,9 +669,6 @@ func (s *EtcdServer) run() { if s.stats != nil { s.stats.BecomeLeader() } - if s.r.td != nil { - s.r.td.Reset() - } }, updateCommittedIndex: func(ci uint64) { cci := s.getCommittedIndex() @@ -1580,7 +1581,9 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error { case context.Canceled: return ErrCanceled case context.DeadlineExceeded: - curLeadElected := s.r.leadElectedTime() + s.leadTimeMu.RLock() + curLeadElected := s.leadElectedTime + s.leadTimeMu.RUnlock() prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond) if start.After(prevLeadLost) && start.Before(curLeadElected) { return ErrTimeoutDueToLeaderFail diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 60aabc00e..f8bbb0dc4 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -167,14 +167,14 @@ func TestApplyRepeat(t *testing.T) { st := store.New() cl.SetStore(store.New()) cl.AddMember(&membership.Member{ID: 1234}) + r := newRaftNode(raftNodeConfig{ + Node: n, + raftStorage: raft.NewMemoryStorage(), + storage: mockstorage.NewStorageRecorder(""), + transport: rafthttp.NewNopTransporter(), + }) s := &EtcdServer{ - r: raftNode{ - Node: n, - raftStorage: raft.NewMemoryStorage(), - storage: mockstorage.NewStorageRecorder(""), - transport: rafthttp.NewNopTransporter(), - ticker: &time.Ticker{}, - }, + r: *r, Cfg: &ServerConfig{}, store: st, cluster: cl, @@ -525,7 +525,7 @@ func TestApplyConfChangeError(t *testing.T) { for i, tt := range tests { n := newNodeRecorder() srv := &EtcdServer{ - r: raftNode{Node: n}, + r: *newRaftNode(raftNodeConfig{Node: n}), cluster: cl, Cfg: &ServerConfig{}, } @@ -552,12 +552,13 @@ func TestApplyConfChangeShouldStop(t *testing.T) { for i := 1; i <= 3; i++ { cl.AddMember(&membership.Member{ID: types.ID(i)}) } + r := newRaftNode(raftNodeConfig{ + Node: newNodeNop(), + transport: rafthttp.NewNopTransporter(), + }) srv := &EtcdServer{ - id: 1, - r: raftNode{ - Node: newNodeNop(), - transport: rafthttp.NewNopTransporter(), - }, + id: 1, + r: *r, cluster: cl, } cc := raftpb.ConfChange{ @@ -592,12 +593,13 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) { for i := 1; i <= 5; i++ { cl.AddMember(&membership.Member{ID: types.ID(i)}) } + r := newRaftNode(raftNodeConfig{ + Node: newNodeNop(), + transport: rafthttp.NewNopTransporter(), + }) srv := &EtcdServer{ - id: 2, - r: raftNode{ - Node: newNodeNop(), - transport: rafthttp.NewNopTransporter(), - }, + id: 2, + r: *r, cluster: cl, w: wait.New(), } @@ -630,15 +632,15 @@ func TestDoProposal(t *testing.T) { } for i, tt := range tests { st := mockstore.NewRecorder() + r := newRaftNode(raftNodeConfig{ + Node: newNodeCommitter(), + storage: mockstorage.NewStorageRecorder(""), + raftStorage: raft.NewMemoryStorage(), + transport: rafthttp.NewNopTransporter(), + }) srv := &EtcdServer{ - Cfg: &ServerConfig{TickMs: 1}, - r: raftNode{ - Node: newNodeCommitter(), - storage: mockstorage.NewStorageRecorder(""), - raftStorage: raft.NewMemoryStorage(), - transport: rafthttp.NewNopTransporter(), - ticker: &time.Ticker{}, - }, + Cfg: &ServerConfig{TickMs: 1}, + r: *r, store: st, reqIDGen: idutil.NewGenerator(0, time.Time{}), SyncTicker: &time.Ticker{}, @@ -666,7 +668,7 @@ func TestDoProposalCancelled(t *testing.T) { wt := mockwait.NewRecorder() srv := &EtcdServer{ Cfg: &ServerConfig{TickMs: 1}, - r: raftNode{Node: newNodeNop()}, + r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}), w: wt, reqIDGen: idutil.NewGenerator(0, time.Time{}), } @@ -688,7 +690,7 @@ func TestDoProposalCancelled(t *testing.T) { func TestDoProposalTimeout(t *testing.T) { srv := &EtcdServer{ Cfg: &ServerConfig{TickMs: 1}, - r: raftNode{Node: newNodeNop()}, + r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}), w: mockwait.NewNop(), reqIDGen: idutil.NewGenerator(0, time.Time{}), } @@ -704,7 +706,7 @@ func TestDoProposalTimeout(t *testing.T) { func TestDoProposalStopped(t *testing.T) { srv := &EtcdServer{ Cfg: &ServerConfig{TickMs: 1}, - r: raftNode{Node: newNodeNop()}, + r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}), w: mockwait.NewNop(), reqIDGen: idutil.NewGenerator(0, time.Time{}), } @@ -723,7 +725,7 @@ func TestSync(t *testing.T) { n := newNodeRecorder() ctx, cancel := context.WithCancel(context.TODO()) srv := &EtcdServer{ - r: raftNode{Node: n}, + r: *newRaftNode(raftNodeConfig{Node: n}), reqIDGen: idutil.NewGenerator(0, time.Time{}), ctx: ctx, cancel: cancel, @@ -766,7 +768,7 @@ func TestSyncTimeout(t *testing.T) { n := newProposalBlockerRecorder() ctx, cancel := context.WithCancel(context.TODO()) srv := &EtcdServer{ - r: raftNode{Node: n}, + r: *newRaftNode(raftNodeConfig{Node: n}), reqIDGen: idutil.NewGenerator(0, time.Time{}), ctx: ctx, cancel: cancel, @@ -799,15 +801,16 @@ func TestSyncTrigger(t *testing.T) { n := newReadyNode() st := make(chan time.Time, 1) tk := &time.Ticker{C: st} + r := newRaftNode(raftNodeConfig{ + Node: n, + raftStorage: raft.NewMemoryStorage(), + transport: rafthttp.NewNopTransporter(), + storage: mockstorage.NewStorageRecorder(""), + }) + srv := &EtcdServer{ - Cfg: &ServerConfig{TickMs: 1}, - r: raftNode{ - Node: n, - raftStorage: raft.NewMemoryStorage(), - transport: rafthttp.NewNopTransporter(), - storage: mockstorage.NewStorageRecorder(""), - ticker: &time.Ticker{}, - }, + Cfg: &ServerConfig{TickMs: 1}, + r: *r, store: mockstore.NewNop(), SyncTicker: tk, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -858,13 +861,14 @@ func TestSnapshot(t *testing.T) { s.Append([]raftpb.Entry{{Index: 1}}) st := mockstore.NewRecorderStream() p := mockstorage.NewStorageRecorderStream("") + r := newRaftNode(raftNodeConfig{ + Node: newNodeNop(), + raftStorage: s, + storage: p, + }) srv := &EtcdServer{ - Cfg: &ServerConfig{}, - r: raftNode{ - Node: newNodeNop(), - raftStorage: s, - storage: p, - }, + Cfg: &ServerConfig{}, + r: *r, store: st, } srv.kv = mvcc.New(be, &lease.FakeLessor{}, &srv.consistIndex) @@ -914,16 +918,16 @@ func TestTriggerSnap(t *testing.T) { snapc := 10 st := mockstore.NewRecorder() p := mockstorage.NewStorageRecorderStream("") + r := newRaftNode(raftNodeConfig{ + Node: newNodeCommitter(), + raftStorage: raft.NewMemoryStorage(), + storage: p, + transport: rafthttp.NewNopTransporter(), + }) srv := &EtcdServer{ - Cfg: &ServerConfig{TickMs: 1}, - snapCount: uint64(snapc), - r: raftNode{ - Node: newNodeCommitter(), - raftStorage: raft.NewMemoryStorage(), - storage: p, - transport: rafthttp.NewNopTransporter(), - ticker: &time.Ticker{}, - }, + Cfg: &ServerConfig{TickMs: 1}, + snapCount: uint64(snapc), + r: *r, store: st, reqIDGen: idutil.NewGenerator(0, time.Time{}), SyncTicker: &time.Ticker{}, @@ -962,10 +966,6 @@ func TestTriggerSnap(t *testing.T) { // TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with // proposals. func TestConcurrentApplyAndSnapshotV3(t *testing.T) { - const ( - // snapshots that may queue up at once without dropping - maxInFlightMsgSnap = 16 - ) n := newNopReadyNode() st := store.New() cl := membership.NewCluster("abc") @@ -982,19 +982,18 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { rs := raft.NewMemoryStorage() tr, snapDoneC := rafthttp.NewSnapTransporter(testdir) + r := newRaftNode(raftNodeConfig{ + isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, + Node: n, + transport: tr, + storage: mockstorage.NewStorageRecorder(testdir), + raftStorage: rs, + }) s := &EtcdServer{ Cfg: &ServerConfig{ DataDir: testdir, }, - r: raftNode{ - isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, - Node: n, - transport: tr, - storage: mockstorage.NewStorageRecorder(testdir), - raftStorage: rs, - msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), - ticker: &time.Ticker{}, - }, + r: *r, store: st, cluster: cl, SyncTicker: &time.Ticker{}, @@ -1069,14 +1068,14 @@ func TestAddMember(t *testing.T) { cl := newTestCluster(nil) st := store.New() cl.SetStore(st) + r := newRaftNode(raftNodeConfig{ + Node: n, + raftStorage: raft.NewMemoryStorage(), + storage: mockstorage.NewStorageRecorder(""), + transport: rafthttp.NewNopTransporter(), + }) s := &EtcdServer{ - r: raftNode{ - Node: n, - raftStorage: raft.NewMemoryStorage(), - storage: mockstorage.NewStorageRecorder(""), - transport: rafthttp.NewNopTransporter(), - ticker: &time.Ticker{}, - }, + r: *r, Cfg: &ServerConfig{}, store: st, cluster: cl, @@ -1111,14 +1110,14 @@ func TestRemoveMember(t *testing.T) { st := store.New() cl.SetStore(store.New()) cl.AddMember(&membership.Member{ID: 1234}) + r := newRaftNode(raftNodeConfig{ + Node: n, + raftStorage: raft.NewMemoryStorage(), + storage: mockstorage.NewStorageRecorder(""), + transport: rafthttp.NewNopTransporter(), + }) s := &EtcdServer{ - r: raftNode{ - Node: n, - raftStorage: raft.NewMemoryStorage(), - storage: mockstorage.NewStorageRecorder(""), - transport: rafthttp.NewNopTransporter(), - ticker: &time.Ticker{}, - }, + r: *r, Cfg: &ServerConfig{}, store: st, cluster: cl, @@ -1152,14 +1151,14 @@ func TestUpdateMember(t *testing.T) { st := store.New() cl.SetStore(st) cl.AddMember(&membership.Member{ID: 1234}) + r := newRaftNode(raftNodeConfig{ + Node: n, + raftStorage: raft.NewMemoryStorage(), + storage: mockstorage.NewStorageRecorder(""), + transport: rafthttp.NewNopTransporter(), + }) s := &EtcdServer{ - r: raftNode{ - Node: n, - raftStorage: raft.NewMemoryStorage(), - storage: mockstorage.NewStorageRecorder(""), - transport: rafthttp.NewNopTransporter(), - ticker: &time.Ticker{}, - }, + r: *r, store: st, cluster: cl, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -1196,7 +1195,7 @@ func TestPublish(t *testing.T) { readych: make(chan struct{}), Cfg: &ServerConfig{TickMs: 1}, id: 1, - r: raftNode{Node: n, ticker: &time.Ticker{}}, + r: *newRaftNode(raftNodeConfig{Node: n}), attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}, cluster: &membership.RaftCluster{}, w: w, @@ -1239,13 +1238,13 @@ func TestPublish(t *testing.T) { // TestPublishStopped tests that publish will be stopped if server is stopped. func TestPublishStopped(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) + r := newRaftNode(raftNodeConfig{ + Node: newNodeNop(), + transport: rafthttp.NewNopTransporter(), + }) srv := &EtcdServer{ - Cfg: &ServerConfig{TickMs: 1}, - r: raftNode{ - Node: newNodeNop(), - transport: rafthttp.NewNopTransporter(), - ticker: &time.Ticker{}, - }, + Cfg: &ServerConfig{TickMs: 1}, + r: *r, cluster: &membership.RaftCluster{}, w: mockwait.NewNop(), done: make(chan struct{}), @@ -1267,7 +1266,7 @@ func TestPublishRetry(t *testing.T) { n := newNodeRecorderStream() srv := &EtcdServer{ Cfg: &ServerConfig{TickMs: 1}, - r: raftNode{Node: n, ticker: &time.Ticker{}}, + r: *newRaftNode(raftNodeConfig{Node: n}), w: mockwait.NewNop(), stopping: make(chan struct{}), reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -1308,7 +1307,7 @@ func TestUpdateVersion(t *testing.T) { srv := &EtcdServer{ id: 1, Cfg: &ServerConfig{TickMs: 1}, - r: raftNode{Node: n, ticker: &time.Ticker{}}, + r: *newRaftNode(raftNodeConfig{Node: n}), attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}}, cluster: &membership.RaftCluster{}, w: w,