diff --git a/raft/node.go b/raft/node.go index 745553c4f..4a3b2f1df 100644 --- a/raft/node.go +++ b/raft/node.go @@ -208,7 +208,19 @@ func StartNode(c *Config, peers []Peer) Node { if err != nil { panic("unexpected marshal error") } - e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d} + // TODO(tbg): this should append the ConfChange for the own node first + // and also call applyConfChange below for that node first. Otherwise + // we have a Raft group (for a little while) that doesn't have itself + // in its config, which is bad. + // This whole way of setting things up is rickety. The app should just + // populate the initial ConfState appropriately and then all of this + // goes away. + e := pb.Entry{ + Type: pb.EntryConfChange, + Term: 1, + Index: r.raftLog.lastIndex() + 1, + Data: d, + } r.raftLog.append(e) } // Mark these initial entries as committed. @@ -225,7 +237,7 @@ func StartNode(c *Config, peers []Peer) Node { // We do not set raftLog.applied so the application will be able // to observe all conf changes via Ready.CommittedEntries. for _, peer := range peers { - r.addNode(peer.ID) + r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}) } n := newNode() @@ -357,35 +369,16 @@ func (n *node) run(r *raft) { r.Step(m) } case cc := <-n.confc: - if cc.NodeID == None { - select { - case n.confstatec <- pb.ConfState{ - Nodes: r.prs.VoterNodes(), - Learners: r.prs.LearnerNodes()}: - case <-n.done: - } - break - } - switch cc.Type { - case pb.ConfChangeAddNode: - r.addNode(cc.NodeID) - case pb.ConfChangeAddLearnerNode: - r.addLearner(cc.NodeID) - case pb.ConfChangeRemoveNode: + cs := r.applyConfChange(cc) + if _, ok := r.prs.Progress[r.id]; !ok { // block incoming proposal when local node is // removed if cc.NodeID == r.id { propc = nil } - r.removeNode(cc.NodeID) - case pb.ConfChangeUpdateNode: - default: - panic("unexpected conf type") } select { - case n.confstatec <- pb.ConfState{ - Nodes: r.prs.VoterNodes(), - Learners: r.prs.LearnerNodes()}: + case n.confstatec <- cs: case <-n.done: } case <-n.tickc: diff --git a/raft/quorum/joint.go b/raft/quorum/joint.go index 9f8f484dc..e3741e0b0 100644 --- a/raft/quorum/joint.go +++ b/raft/quorum/joint.go @@ -18,6 +18,13 @@ package quorum // majority configurations. Decisions require the support of both majorities. type JointConfig [2]MajorityConfig +func (c JointConfig) String() string { + if len(c[1]) > 0 { + return c[0].String() + "&&" + c[1].String() + } + return c[0].String() +} + // IDs returns a newly initialized map representing the set of voters present // in the joint configuration. func (c JointConfig) IDs() map[uint64]struct{} { diff --git a/raft/quorum/majority.go b/raft/quorum/majority.go index 3d7bf8233..5eba50344 100644 --- a/raft/quorum/majority.go +++ b/raft/quorum/majority.go @@ -24,6 +24,24 @@ import ( // MajorityConfig is a set of IDs that uses majority quorums to make decisions. type MajorityConfig map[uint64]struct{} +func (c MajorityConfig) String() string { + sl := make([]uint64, 0, len(c)) + for id := range c { + sl = append(sl, id) + } + sort.Slice(sl, func(i, j int) bool { return sl[i] < sl[j] }) + var buf strings.Builder + buf.WriteByte('(') + for i := range sl { + if i > 0 { + buf.WriteByte(' ') + } + fmt.Fprint(&buf, sl[i]) + } + buf.WriteByte(')') + return buf.String() +} + // Describe returns a (multi-line) representation of the commit indexes for the // given lookuper. func (c MajorityConfig) Describe(l AckedIndexer) string { diff --git a/raft/quorum/quorum.go b/raft/quorum/quorum.go index ff9c6f48d..2899e46c9 100644 --- a/raft/quorum/quorum.go +++ b/raft/quorum/quorum.go @@ -19,6 +19,7 @@ import ( "strconv" ) +// Index is a Raft log position. type Index uint64 func (i Index) String() string { diff --git a/raft/raft.go b/raft/raft.go index 3cdc1f0ac..a42bb4e63 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -1322,11 +1322,51 @@ func (r *raft) handleSnapshot(m pb.Message) { } // restore recovers the state machine from a snapshot. It restores the log and the -// configuration of state machine. +// configuration of state machine. If this method returns false, the snapshot was +// ignored, either because it was obsolete or because of an error. func (r *raft) restore(s pb.Snapshot) bool { if s.Metadata.Index <= r.raftLog.committed { return false } + if r.state != StateFollower { + // This is defense-in-depth: if the leader somehow ended up applying a + // snapshot, it could move into a new term without moving into a + // follower state. This should never fire, but if it did, we'd have + // prevented damage by returning early, so log only a loud warning. + // + // At the time of writing, the instance is guaranteed to be in follower + // state when this method is called. + r.logger.Warningf("%x attempted to restore snapshot as leader; should never happen", r.id) + r.becomeFollower(r.Term+1, None) + return false + } + + // More defense-in-depth: throw away snapshot if recipient is not in the + // config. This shouuldn't ever happen (at the time of writing) but lots of + // code here and there assumes that r.id is in the progress tracker. + found := false + cs := s.Metadata.ConfState + for _, set := range [][]uint64{ + cs.Nodes, + cs.Learners, + } { + for _, id := range set { + if id == r.id { + found = true + break + } + } + } + if !found { + r.logger.Warningf( + "%x attempted to restore snapshot but it is not in the ConfState %v; should never happen", + r.id, cs, + ) + return false + } + + // Now go ahead and actually restore. + if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) { r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]", r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) @@ -1344,26 +1384,23 @@ func (r *raft) restore(s pb.Snapshot) bool { } } - r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]", - r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) - r.raftLog.restore(s) - r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight) - r.restoreNode(s.Metadata.ConfState.Nodes, false) - r.restoreNode(s.Metadata.ConfState.Learners, true) - return true -} -func (r *raft) restoreNode(nodes []uint64, isLearner bool) { - for _, n := range nodes { - match, next := uint64(0), r.raftLog.lastIndex()+1 - if n == r.id { - match = next - 1 - r.isLearner = isLearner - } - r.prs.InitProgress(n, match, next, isLearner) - r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs.Progress[n]) + // Reset the configuration and add the (potentially updated) peers in anew. + r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight) + for _, id := range s.Metadata.ConfState.Nodes { + r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddNode}) } + for _, id := range s.Metadata.ConfState.Learners { + r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddLearnerNode}) + } + + pr := r.prs.Progress[r.id] + pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded + + r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]", + r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) + return true } // promotable indicates whether state machine can be promoted to leader, @@ -1373,68 +1410,98 @@ func (r *raft) promotable() bool { return pr != nil && !pr.IsLearner } -func (r *raft) addNode(id uint64) { - r.addNodeOrLearnerNode(id, false) -} +func (r *raft) applyConfChange(cc pb.ConfChange) pb.ConfState { + addNodeOrLearnerNode := func(id uint64, isLearner bool) { + // NB: this method is intentionally hidden from view. All mutations of + // the conf state must call applyConfChange directly. + pr := r.prs.Progress[id] + if pr == nil { + r.prs.InitProgress(id, 0, r.raftLog.lastIndex()+1, isLearner) + } else { + if isLearner && !pr.IsLearner { + // Can only change Learner to Voter. + // + // TODO(tbg): why? + r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id) + return + } -func (r *raft) addLearner(id uint64) { - r.addNodeOrLearnerNode(id, true) -} + if isLearner == pr.IsLearner { + // Ignore any redundant addNode calls (which can happen because the + // initial bootstrapping entries are applied twice). + return + } -func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { - pr := r.prs.Progress[id] - if pr == nil { - r.prs.InitProgress(id, 0, r.raftLog.lastIndex()+1, isLearner) - } else { - if isLearner && !pr.IsLearner { - // Can only change Learner to Voter. - r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id) - return + // Change Learner to Voter, use origin Learner progress. + r.prs.RemoveAny(id) + r.prs.InitProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */) + pr.IsLearner = false + *r.prs.Progress[id] = *pr } - if isLearner == pr.IsLearner { - // Ignore any redundant addNode calls (which can happen because the - // initial bootstrapping entries are applied twice). - return + // When a node is first added, we should mark it as recently active. + // Otherwise, CheckQuorum may cause us to step down if it is invoked + // before the added node has had a chance to communicate with us. + r.prs.Progress[id].RecentActive = true + } + + var removed int + if cc.NodeID != None { + switch cc.Type { + case pb.ConfChangeAddNode: + addNodeOrLearnerNode(cc.NodeID, false /* isLearner */) + case pb.ConfChangeAddLearnerNode: + addNodeOrLearnerNode(cc.NodeID, true /* isLearner */) + case pb.ConfChangeRemoveNode: + removed++ + r.prs.RemoveAny(cc.NodeID) + case pb.ConfChangeUpdateNode: + default: + panic("unexpected conf type") } - - // Change Learner to Voter, use origin Learner progress. - r.prs.RemoveAny(id) - r.prs.InitProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */) - pr.IsLearner = false - *r.prs.Progress[id] = *pr } - if r.id == id { - r.isLearner = isLearner + r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config) + // Now that the configuration is updated, handle any side effects. + + cs := pb.ConfState{Nodes: r.prs.VoterNodes(), Learners: r.prs.LearnerNodes()} + pr, ok := r.prs.Progress[r.id] + + // Update whether the node itself is a learner, resetting to false when the + // node is removed. + r.isLearner = ok && pr.IsLearner + + if (!ok || r.isLearner) && r.state == StateLeader { + // This node is leader and was removed or demoted. We prevent demotions + // at the time writing but hypothetically we handle them the same way as + // removing the leader: stepping down into the next Term. + // + // TODO(tbg): step down (for sanity) and ask follower with largest Match + // to TimeoutNow (to avoid interruption). This might still drop some + // proposals but it's better than nothing. + // + // TODO(tbg): test this branch. It is untested at the time of writing. + return cs } - // When a node is first added, we should mark it as recently active. - // Otherwise, CheckQuorum may cause us to step down if it is invoked - // before the added node has a chance to communicate with us. - r.prs.Progress[id].RecentActive = true -} - -func (r *raft) removeNode(id uint64) { - r.prs.RemoveAny(id) - - // Do not try to commit or abort transferring if the cluster is now empty. - if len(r.prs.Voters[0]) == 0 && len(r.prs.Learners) == 0 { - return + // The remaining steps only make sense if this node is the leader and there + // are other nodes. + if r.state != StateLeader || len(cs.Nodes) == 0 { + return cs } - - // TODO(tbg): won't bad (or at least unfortunate) things happen if the - // leader just removed itself? - - // The quorum size is now smaller, so see if any pending entries can - // be committed. - if r.maybeCommit() { - r.bcastAppend() + if removed > 0 { + // The quorum size may have been reduced (but not to zero), so see if + // any pending entries can be committed. + if r.maybeCommit() { + r.bcastAppend() + } } - // If the removed node is the leadTransferee, then abort the leadership transferring. - if r.state == StateLeader && r.leadTransferee == id { + // If the the leadTransferee was removed, abort the leadership transfer. + if _, tOK := r.prs.Progress[r.leadTransferee]; !tOK && r.leadTransferee != 0 { r.abortLeaderTransfer() } + + return cs } func (r *raft) loadState(state pb.HardState) { diff --git a/raft/raft_snap_test.go b/raft/raft_snap_test.go index de72c4c4d..187aef697 100644 --- a/raft/raft_snap_test.go +++ b/raft/raft_snap_test.go @@ -118,30 +118,38 @@ func TestSnapshotSucceed(t *testing.T) { // in the past left the follower in probing status until the next log entry was // committed. func TestSnapshotSucceedViaAppResp(t *testing.T) { - snap := pb.Snapshot{ - Metadata: pb.SnapshotMetadata{ - Index: 11, // magic number - Term: 11, // magic number - ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}}, - }, - } - s1 := NewMemoryStorage() - n1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s1) - - // Become follower because otherwise the way this test sets things up the - // leadership term will be 1 (which is stale). We want it to match the snap- - // shot term in this test. - n1.becomeFollower(snap.Metadata.Term-1, 2) + // Create a single-node leader. + n1 := newTestRaft(1, []uint64{1}, 10, 1, s1) n1.becomeCandidate() n1.becomeLeader() + // We need to add a second empty entry so that we can truncate the first + // one away. + n1.Step(pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{}}}) - // Apply a snapshot on the leader. This is a workaround against the fact that - // the leader will always append an empty entry, but that empty entry works - // against what we're trying to assert in this test, namely that a snapshot - // at the latest committed index leaves the follower in probing state. - // With the snapshot, the empty entry is fully committed. - n1.restore(snap) + rd := newReady(n1, &SoftState{}, pb.HardState{}) + s1.Append(rd.Entries) + s1.SetHardState(rd.HardState) + + if exp, ci := s1.lastIndex(), n1.raftLog.committed; ci != exp { + t.Fatalf("unexpected committed index %d, wanted %d: %+v", ci, exp, s1) + } + + // Force a log truncation. + if err := s1.Compact(1); err != nil { + t.Fatal(err) + } + + // Add a follower to the group. Do this in a clandestine way for simplicity. + // Also set up a snapshot that will be sent to the follower. + n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) + s1.snapshot = pb.Snapshot{ + Metadata: pb.SnapshotMetadata{ + ConfState: pb.ConfState{Nodes: []uint64{1, 2}}, + Index: s1.lastIndex(), + Term: s1.ents[len(s1.ents)-1].Term, + }, + } noMessage := pb.MessageType(-1) mustSend := func(from, to *raft, typ pb.MessageType) pb.Message { @@ -151,6 +159,9 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) { continue } t.Log(DescribeMessage(msg, func([]byte) string { return "" })) + if len(msg.Entries) > 0 { + t.Log(DescribeEntries(msg.Entries, func(b []byte) string { return string(b) })) + } if err := to.Step(msg); err != nil { t.Fatalf("%v: %s", msg, err) } @@ -169,7 +180,7 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) { // Create the follower that will receive the snapshot. s2 := NewMemoryStorage() - n2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, s2) + n2 := newTestRaft(2, []uint64{1, 2}, 10, 1, s2) // Let the leader probe the follower. if !n1.maybeSendAppend(2, true /* sendIfEmpty */) { @@ -186,9 +197,9 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) { t.Fatalf("expected a rejection with zero hint, got reject=%t hint=%d", msg.Reject, msg.RejectHint) } - expIdx := snap.Metadata.Index - // Leader sends snapshot due to RejectHint of zero (the storage we use here - // has index zero compacted). + const expIdx = 2 + // Leader sends snapshot due to RejectHint of zero (we set up the raft log + // to start at index 2). if msg := mustSend(n1, n2, pb.MsgSnap); msg.Snapshot.Metadata.Index != expIdx { t.Fatalf("expected snapshot at index %d, got %d", expIdx, msg.Snapshot.Metadata.Index) } diff --git a/raft/raft_test.go b/raft/raft_test.go index 805e6071f..8fcc2f5c7 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -356,8 +356,8 @@ func TestLearnerPromotion(t *testing.T) { nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - n1.addNode(2) - n2.addNode(2) + n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) + n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) if n2.isLearner { t.Error("peer 2 is learner, want not") } @@ -3076,7 +3076,7 @@ func TestNewLeaderPendingConfig(t *testing.T) { // TestAddNode tests that addNode could update nodes correctly. func TestAddNode(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - r.addNode(2) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) nodes := r.prs.VoterNodes() wnodes := []uint64{1, 2} if !reflect.DeepEqual(nodes, wnodes) { @@ -3087,7 +3087,7 @@ func TestAddNode(t *testing.T) { // TestAddLearner tests that addLearner could update nodes correctly. func TestAddLearner(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - r.addLearner(2) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}) nodes := r.prs.LearnerNodes() wnodes := []uint64{2} if !reflect.DeepEqual(nodes, wnodes) { @@ -3111,7 +3111,7 @@ func TestAddNodeCheckQuorum(t *testing.T) { r.tick() } - r.addNode(2) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) // This tick will reach electionTimeout, which triggers a quorum check. r.tick() @@ -3136,14 +3136,14 @@ func TestAddNodeCheckQuorum(t *testing.T) { // and removed list correctly. func TestRemoveNode(t *testing.T) { r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) - r.removeNode(2) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}) w := []uint64{1} if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } // remove all nodes from cluster - r.removeNode(1) + r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}) w = []uint64{} if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) @@ -3154,7 +3154,7 @@ func TestRemoveNode(t *testing.T) { // and removed list correctly. func TestRemoveLearner(t *testing.T) { r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage()) - r.removeNode(2) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}) w := []uint64{1} if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) @@ -3166,7 +3166,7 @@ func TestRemoveLearner(t *testing.T) { } // remove all nodes from cluster - r.removeNode(1) + r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}) if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } @@ -3300,7 +3300,7 @@ func TestCommitAfterRemoveNode(t *testing.T) { // Apply the config change. This reduces quorum requirements so the // pending command can now commit. - r.removeNode(2) + r.applyConfChange(cc) ents = nextEnts(r, s) if len(ents) != 1 || ents[0].Type != pb.EntryNormal || string(ents[0].Data) != "hello" { @@ -3549,7 +3549,7 @@ func TestLeaderTransferRemoveNode(t *testing.T) { t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) } - lead.removeNode(3) + lead.applyConfChange(pb.ConfChange{NodeID: 3, Type: pb.ConfChangeRemoveNode}) checkLeaderTransferState(t, lead, StateLeader, 1) } @@ -3875,9 +3875,9 @@ func TestPreVoteWithCheckQuorum(t *testing.T) { // a MsgHup or MsgTimeoutNow. func TestLearnerCampaign(t *testing.T) { n1 := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - n1.addLearner(2) + n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}) n2 := newTestRaft(2, []uint64{1}, 10, 1, NewMemoryStorage()) - n2.addLearner(2) + n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}) nt := newNetwork(n1, n2) nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) diff --git a/raft/rawnode.go b/raft/rawnode.go index d7e54eeea..77183b793 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -101,7 +101,7 @@ func NewRawNode(config *Config, peers []Peer) (*RawNode, error) { r.raftLog.append(ents...) r.raftLog.committed = uint64(len(ents)) for _, peer := range peers { - r.addNode(peer.ID) + r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}) } } @@ -166,21 +166,8 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error { // ApplyConfChange applies a config change to the local node. func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { - if cc.NodeID == None { - return &pb.ConfState{Nodes: rn.raft.prs.VoterNodes(), Learners: rn.raft.prs.LearnerNodes()} - } - switch cc.Type { - case pb.ConfChangeAddNode: - rn.raft.addNode(cc.NodeID) - case pb.ConfChangeAddLearnerNode: - rn.raft.addLearner(cc.NodeID) - case pb.ConfChangeRemoveNode: - rn.raft.removeNode(cc.NodeID) - case pb.ConfChangeUpdateNode: - default: - panic("unexpected conf type") - } - return &pb.ConfState{Nodes: rn.raft.prs.VoterNodes(), Learners: rn.raft.prs.LearnerNodes()} + cs := rn.raft.applyConfChange(cc) + return &cs } // Step advances the state machine using the given message. diff --git a/raft/tracker/tracker.go b/raft/tracker/tracker.go index 2d162c6de..4b3396fbe 100644 --- a/raft/tracker/tracker.go +++ b/raft/tracker/tracker.go @@ -21,12 +21,72 @@ import ( "go.etcd.io/etcd/raft/quorum" ) +// Config reflects the configuration tracked in a ProgressTracker. +type Config struct { + Voters quorum.JointConfig + // Learners is a set of IDs corresponding to the learners active in the + // current configuration. + // + // Invariant: Learners and Voters does not intersect, i.e. if a peer is in + // either half of the joint config, it can't be a learner; if it is a + // learner it can't be in either half of the joint config. This invariant + // simplifies the implementation since it allows peers to have clarity about + // its current role without taking into account joint consensus. + Learners map[uint64]struct{} + // TODO(tbg): when we actually carry out joint consensus changes and turn a + // voter into a learner, we cannot add the learner when entering the joint + // state. This is because this would violate the invariant that the inter- + // section of voters and learners is empty. For example, assume a Voter is + // removed and immediately re-added as a learner (or in other words, it is + // demoted). + // + // Initially, the configuration will be + // + // voters: {1 2 3} + // learners: {} + // + // and we want to demote 3. Entering the joint configuration, we naively get + // + // voters: {1 2} & {1 2 3} + // learners: {3} + // + // but this violates the invariant (3 is both voter and learner). Instead, + // we have + // + // voters: {1 2} & {1 2 3} + // learners: {} + // next_learners: {3} + // + // Where 3 is now still purely a voter, but we are remembering the intention + // to make it a learner upon transitioning into the final configuration: + // + // voters: {1 2} + // learners: {3} + // next_learners: {} + // + // Note that next_learners is not used while adding a learner that is not + // also a voter in the joint config. In this case, the learner is added + // to Learners right away when entering the joint configuration, so that it + // is caught up as soon as possible. + // + // NextLearners map[uint64]struct{} +} + +func (c *Config) String() string { + if len(c.Learners) == 0 { + return fmt.Sprintf("voters=%s", c.Voters) + } + return fmt.Sprintf( + "voters=%s learners=%s", + c.Voters, quorum.MajorityConfig(c.Learners).String(), + ) +} + // ProgressTracker tracks the currently active configuration and the information // known about the nodes and learners in it. In particular, it tracks the match // index for each peer which in turn allows reasoning about the committed index. type ProgressTracker struct { - Voters quorum.JointConfig - Learners map[uint64]struct{} + Config Progress map[uint64]*Progress @@ -39,11 +99,15 @@ type ProgressTracker struct { func MakeProgressTracker(maxInflight int) ProgressTracker { p := ProgressTracker{ MaxInflight: maxInflight, - Voters: quorum.JointConfig{ - quorum.MajorityConfig{}, - quorum.MajorityConfig{}, + Config: Config{ + Voters: quorum.JointConfig{ + quorum.MajorityConfig{}, + // TODO(tbg): this will be mostly empty, so make it a nil pointer + // in the common case. + quorum.MajorityConfig{}, + }, + Learners: map[uint64]struct{}{}, }, - Learners: map[uint64]struct{}{}, Votes: map[uint64]bool{}, Progress: map[uint64]*Progress{}, }