diff --git a/etcdserver/raft.go b/etcdserver/raft.go index eec154e23..a9825d0a5 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -140,7 +140,6 @@ func (r *raftNode) start(rh *raftReadyHandler) { go func() { defer r.onStop() islead := false - isCandidate := false for { select { @@ -163,7 +162,6 @@ func (r *raftNode) start(rh *raftReadyHandler) { atomic.StoreUint64(&r.lead, rd.SoftState.Lead) islead = rd.RaftState == raft.StateLeader - isCandidate = rd.RaftState == raft.StateCandidate rh.updateLeadership() } @@ -197,7 +195,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { // For more details, check raft thesis 10.2.1 if islead { // gofail: var raftBeforeLeaderSend struct{} - r.sendMessages(rd.Messages) + r.transport.Send(r.processMessages(rd.Messages)) } // gofail: var raftBeforeSave struct{} @@ -223,21 +221,38 @@ func (r *raftNode) start(rh *raftReadyHandler) { r.raftStorage.Append(rd.Entries) if !islead { - // gofail: var raftBeforeFollowerSend struct{} - r.sendMessages(rd.Messages) - } - raftDone <- struct{}{} + // finish processing outgoing messages before we signal the raftDone chan + msgs := r.processMessages(rd.Messages) - r.Advance() + // now unblock 'applyAll', which waits on Raft log disk writes before triggering snapshots + raftDone <- struct{}{} - if isCandidate { - // candidate needs to wait for all pending configuration changes to be applied - // before continue. Or we might incorrectly count the number of votes (e.g. receive vote from - // a removed member). + // Candidate or follower needs to wait for all pending configuration + // changes to be applied before sending messages. + // Otherwise we might incorrectly count votes (e.g. votes from removed members). + // Also, a follower's raft layer on a slow machine could proceed to become the leader + // of its own single-node cluster before the apply layer applies the config change. // We simply wait for ALL pending entries to be applied for now. 
// We might improve this later on if it causes unnecessary long blocking issues. - rh.waitForApply() + waitApply := false + for _, ent := range rd.CommittedEntries { + if ent.Type == raftpb.EntryConfChange { + waitApply = true + break + } + } + if waitApply { + rh.waitForApply() + } + + // gofail: var raftBeforeFollowerSend struct{} + r.transport.Send(msgs) + } else { + // leader already processed 'MsgSnap' and signaled + raftDone <- struct{}{} } + + r.Advance() case <-r.stopped: return } @@ -258,7 +273,7 @@ func updateCommittedIndex(ap *apply, rh *raftReadyHandler) { } } -func (r *raftNode) sendMessages(ms []raftpb.Message) { +func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message { sentAppResp := false for i := len(ms) - 1; i >= 0; i-- { if r.isIDRemoved(ms[i].To) { @@ -294,8 +309,7 @@ func (r *raftNode) sendMessages(ms []raftpb.Message) { } } } - - r.transport.Send(ms) + return ms } func (r *raftNode) apply() chan apply { diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index c95823705..945f63ce2 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -176,7 +176,8 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) { } } -func TestCandidateBlocksByApply(t *testing.T) { +// TestConfgChangeBlocksApply ensures apply blocks if committed entries contain config-change. +func TestConfgChangeBlocksApply(t *testing.T) { n := newNopReadyNode() waitApplyc := make(chan struct{}) @@ -199,8 +200,10 @@ func TestCandidateBlocksByApply(t *testing.T) { srv.r.start(rh) defer srv.r.Stop() - // become candidate - n.readyc <- raft.Ready{SoftState: &raft.SoftState{RaftState: raft.StateCandidate}} + n.readyc <- raft.Ready{ + SoftState: &raft.SoftState{RaftState: raft.StateFollower}, + CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange}}, + } <-srv.r.applyc continueC := make(chan struct{})