From b171e1c78b0411c8a2648de0eee2102735bec6b4 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 28 Jun 2019 14:38:05 +0200 Subject: [PATCH 1/2] raft: centralize configuration change application Put all the logic related to applying a configuration change in one place in preparation for adding joint consensus. This inspired various TODOs. I had to rewrite TestSnapshotSucceedViaAppResp since it was relying on a snapshot applied to the leader, which is now prevented. --- raft/node.go | 41 ++++----- raft/raft.go | 200 +++++++++++++++++++++++++++-------------- raft/raft_snap_test.go | 59 +++++++----- raft/raft_test.go | 26 +++--- raft/rawnode.go | 19 +--- 5 files changed, 201 insertions(+), 144 deletions(-) diff --git a/raft/node.go b/raft/node.go index 745553c4f..4a3b2f1df 100644 --- a/raft/node.go +++ b/raft/node.go @@ -208,7 +208,19 @@ func StartNode(c *Config, peers []Peer) Node { if err != nil { panic("unexpected marshal error") } - e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d} + // TODO(tbg): this should append the ConfChange for the own node first + // and also call applyConfChange below for that node first. Otherwise + // we have a Raft group (for a little while) that doesn't have itself + // in its config, which is bad. + // This whole way of setting things up is rickety. The app should just + // populate the initial ConfState appropriately and then all of this + // goes away. + e := pb.Entry{ + Type: pb.EntryConfChange, + Term: 1, + Index: r.raftLog.lastIndex() + 1, + Data: d, + } r.raftLog.append(e) } // Mark these initial entries as committed. @@ -225,7 +237,7 @@ func StartNode(c *Config, peers []Peer) Node { // We do not set raftLog.applied so the application will be able // to observe all conf changes via Ready.CommittedEntries. for _, peer := range peers { - r.addNode(peer.ID) + r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}) } n := newNode() @@ -357,35 +369,16 @@ func (n *node) run(r *raft) { r.Step(m) } case cc := <-n.confc: - if cc.NodeID == None { - select { - case n.confstatec <- pb.ConfState{ - Nodes: r.prs.VoterNodes(), - Learners: r.prs.LearnerNodes()}: - case <-n.done: - } - break - } - switch cc.Type { - case pb.ConfChangeAddNode: - r.addNode(cc.NodeID) - case pb.ConfChangeAddLearnerNode: - r.addLearner(cc.NodeID) - case pb.ConfChangeRemoveNode: + cs := r.applyConfChange(cc) + if _, ok := r.prs.Progress[r.id]; !ok { // block incoming proposal when local node is // removed if cc.NodeID == r.id { propc = nil } - r.removeNode(cc.NodeID) - case pb.ConfChangeUpdateNode: - default: - panic("unexpected conf type") } select { - case n.confstatec <- pb.ConfState{ - Nodes: r.prs.VoterNodes(), - Learners: r.prs.LearnerNodes()}: + case n.confstatec <- cs: case <-n.done: } case <-n.tickc: diff --git a/raft/raft.go b/raft/raft.go index 3cdc1f0ac..4edf9616c 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -1322,11 +1322,51 @@ func (r *raft) handleSnapshot(m pb.Message) { } // restore recovers the state machine from a snapshot. It restores the log and the -// configuration of state machine. +// configuration of state machine. If this method returns false, the snapshot was +// ignored, either because it was obsolete or because of an error. 
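// Aside (not part of the patch): the application-facing contract around conf changes
// is unchanged by this refactor. As a rough sketch of the usual Ready consumption
// loop (assuming the raft package as "go.etcd.io/etcd/raft" and raftpb as pb;
// handleReady and apply are placeholder names, not helpers from this change), every
// committed ConfChange is still handed to Node.ApplyConfChange, which now funnels
// into the single raft.applyConfChange introduced below:
//
//	func handleReady(n raft.Node, rd raft.Ready, apply func(pb.Entry)) {
//		// (persist rd.HardState, rd.Entries and rd.Snapshot, send rd.Messages first)
//		for _, ent := range rd.CommittedEntries {
//			switch ent.Type {
//			case pb.EntryNormal:
//				apply(ent)
//			case pb.EntryConfChange:
//				var cc pb.ConfChange
//				if err := cc.Unmarshal(ent.Data); err != nil {
//					panic(err)
//				}
//				// Add, remove, and learner changes all take the same path now.
//				n.ApplyConfChange(cc)
//			}
//		}
//		n.Advance()
//	}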
 func (r *raft) restore(s pb.Snapshot) bool {
 	if s.Metadata.Index <= r.raftLog.committed {
 		return false
 	}
+	if r.state != StateFollower {
+		// This is defense-in-depth: if the leader somehow ended up applying a
+		// snapshot, it could move into a new term without moving into a
+		// follower state. This should never fire, but if it did, we'd have
+		// prevented damage by returning early, so log only a loud warning.
+		//
+		// At the time of writing, the instance is guaranteed to be in follower
+		// state when this method is called.
+		r.logger.Warningf("%x attempted to restore snapshot as leader; should never happen", r.id)
+		r.becomeFollower(r.Term+1, None)
+		return false
+	}
+
+	// More defense-in-depth: throw away snapshot if recipient is not in the
+	// config. This shouldn't ever happen (at the time of writing) but lots of
+	// code here and there assumes that r.id is in the progress tracker.
+	found := false
+	cs := s.Metadata.ConfState
+	for _, set := range [][]uint64{
+		cs.Nodes,
+		cs.Learners,
+	} {
+		for _, id := range set {
+			if id == r.id {
+				found = true
+				break
+			}
+		}
+	}
+	if !found {
+		r.logger.Warningf(
+			"%x attempted to restore snapshot but it is not in the ConfState %v; should never happen",
+			r.id, cs,
+		)
+		return false
+	}
+
+	// Now go ahead and actually restore.
+
 	if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
 		r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
 			r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
@@ -1344,26 +1384,23 @@ func (r *raft) restore(s pb.Snapshot) bool {
 		}
 	}
 
-	r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
-		r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
-
 	r.raftLog.restore(s)
-	r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
-	r.restoreNode(s.Metadata.ConfState.Nodes, false)
-	r.restoreNode(s.Metadata.ConfState.Learners, true)
-	return true
-}
-
-func (r *raft) restoreNode(nodes []uint64, isLearner bool) {
-	for _, n := range nodes {
-		match, next := uint64(0), r.raftLog.lastIndex()+1
-		if n == r.id {
-			match = next - 1
-			r.isLearner = isLearner
-		}
-		r.prs.InitProgress(n, match, next, isLearner)
-		r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs.Progress[n])
+	// Reset the configuration and add the (potentially updated) peers anew.
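// Aside (not part of the patch): the new early returns in restore above are cheap
// to pin down in raft_test.go with the package's existing helpers. A rough sketch,
// where the test name and the exact expectations are assumptions rather than part
// of this change:
//
//	func TestRestoreIgnoresSnapshotWithoutSelf(t *testing.T) {
//		// Node 1 is handed a snapshot whose ConfState only contains peers 2 and 3.
//		r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
//		s := pb.Snapshot{Metadata: pb.SnapshotMetadata{
//			Index:     11,
//			Term:      11,
//			ConfState: pb.ConfState{Nodes: []uint64{2, 3}}, // r.id deliberately missing
//		}}
//		if r.restore(s) {
//			t.Fatal("restore succeeded although the local node is not in the ConfState")
//		}
//		if r.raftLog.committed != 0 {
//			t.Fatalf("committed moved to %d, want 0", r.raftLog.committed)
//		}
//	}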
+ r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight) + for _, id := range s.Metadata.ConfState.Nodes { + r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddNode}) } + for _, id := range s.Metadata.ConfState.Learners { + r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddLearnerNode}) + } + + pr := r.prs.Progress[r.id] + pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded + + r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]", + r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) + return true } // promotable indicates whether state machine can be promoted to leader, @@ -1373,68 +1410,97 @@ func (r *raft) promotable() bool { return pr != nil && !pr.IsLearner } -func (r *raft) addNode(id uint64) { - r.addNodeOrLearnerNode(id, false) -} +func (r *raft) applyConfChange(cc pb.ConfChange) pb.ConfState { + addNodeOrLearnerNode := func(id uint64, isLearner bool) { + // NB: this method is intentionally hidden from view. All mutations of + // the conf state must call applyConfChange directly. + pr := r.prs.Progress[id] + if pr == nil { + r.prs.InitProgress(id, 0, r.raftLog.lastIndex()+1, isLearner) + } else { + if isLearner && !pr.IsLearner { + // Can only change Learner to Voter. + // + // TODO(tbg): why? + r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id) + return + } -func (r *raft) addLearner(id uint64) { - r.addNodeOrLearnerNode(id, true) -} + if isLearner == pr.IsLearner { + // Ignore any redundant addNode calls (which can happen because the + // initial bootstrapping entries are applied twice). + return + } -func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { - pr := r.prs.Progress[id] - if pr == nil { - r.prs.InitProgress(id, 0, r.raftLog.lastIndex()+1, isLearner) - } else { - if isLearner && !pr.IsLearner { - // Can only change Learner to Voter. - r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id) - return + // Change Learner to Voter, use origin Learner progress. + r.prs.RemoveAny(id) + r.prs.InitProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */) + pr.IsLearner = false + *r.prs.Progress[id] = *pr } - if isLearner == pr.IsLearner { - // Ignore any redundant addNode calls (which can happen because the - // initial bootstrapping entries are applied twice). - return + // When a node is first added, we should mark it as recently active. + // Otherwise, CheckQuorum may cause us to step down if it is invoked + // before the added node has had a chance to communicate with us. + r.prs.Progress[id].RecentActive = true + } + + var removed int + if cc.NodeID != None { + switch cc.Type { + case pb.ConfChangeAddNode: + addNodeOrLearnerNode(cc.NodeID, false /* isLearner */) + case pb.ConfChangeAddLearnerNode: + addNodeOrLearnerNode(cc.NodeID, true /* isLearner */) + case pb.ConfChangeRemoveNode: + removed++ + r.prs.RemoveAny(cc.NodeID) + case pb.ConfChangeUpdateNode: + default: + panic("unexpected conf type") } - - // Change Learner to Voter, use origin Learner progress. - r.prs.RemoveAny(id) - r.prs.InitProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */) - pr.IsLearner = false - *r.prs.Progress[id] = *pr } - if r.id == id { - r.isLearner = isLearner + // Now that the configuration is updated, handle any side effects. 
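// Aside (not part of the patch): the removed helpers translate mechanically into
// the single entry point, which is the rewrite applied throughout the tests below:
//
//	r.addNode(id)    -> r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddNode})
//	r.addLearner(id) -> r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddLearnerNode})
//	r.removeNode(id) -> r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeRemoveNode})
//
// Callers that previously rebuilt the ConfState by hand can use the return value
// instead, for example:
//
//	cs := r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode})
//	_ = cs.Nodes    // voters after the change
//	_ = cs.Learners // learners after the change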
+
+	cs := pb.ConfState{Nodes: r.prs.VoterNodes(), Learners: r.prs.LearnerNodes()}
+	pr, ok := r.prs.Progress[r.id]
+
+	// Update whether the node itself is a learner, resetting to false when the
+	// node is removed.
+	r.isLearner = ok && pr.IsLearner
+
+	if (!ok || r.isLearner) && r.state == StateLeader {
+		// This node is leader and was removed or demoted. We prevent demotions
+		// at the time of writing but hypothetically we'd handle them the same
+		// way as removing the leader: stepping down into the next Term.
+		//
+		// TODO(tbg): step down (for sanity) and ask follower with largest Match
+		// to TimeoutNow (to avoid interruption). This might still drop some
+		// proposals but it's better than nothing.
+		//
+		// TODO(tbg): test this branch. It is untested at the time of writing.
+		return cs
 	}
-	// When a node is first added, we should mark it as recently active.
-	// Otherwise, CheckQuorum may cause us to step down if it is invoked
-	// before the added node has a chance to communicate with us.
-	r.prs.Progress[id].RecentActive = true
-}
-
-func (r *raft) removeNode(id uint64) {
-	r.prs.RemoveAny(id)
-
-	// Do not try to commit or abort transferring if the cluster is now empty.
-	if len(r.prs.Voters[0]) == 0 && len(r.prs.Learners) == 0 {
-		return
+	// The remaining steps only make sense if this node is the leader and there
+	// are other nodes.
+	if r.state != StateLeader || len(cs.Nodes) == 0 {
+		return cs
 	}
-
-	// TODO(tbg): won't bad (or at least unfortunate) things happen if the
-	// leader just removed itself?
-
-	// The quorum size is now smaller, so see if any pending entries can
-	// be committed.
-	if r.maybeCommit() {
-		r.bcastAppend()
+	if removed > 0 {
+		// The quorum size may have been reduced (but not to zero), so see if
+		// any pending entries can be committed.
+		if r.maybeCommit() {
+			r.bcastAppend()
+		}
 	}
-	// If the removed node is the leadTransferee, then abort the leadership transferring.
-	if r.state == StateLeader && r.leadTransferee == id {
+	// If the leadTransferee was removed, abort the leadership transfer.
+	if _, tOK := r.prs.Progress[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
 		r.abortLeaderTransfer()
 	}
+
+	return cs
 }
 
 func (r *raft) loadState(state pb.HardState) {
diff --git a/raft/raft_snap_test.go b/raft/raft_snap_test.go
index de72c4c4d..187aef697 100644
--- a/raft/raft_snap_test.go
+++ b/raft/raft_snap_test.go
@@ -118,30 +118,38 @@ func TestSnapshotSucceed(t *testing.T) {
 // in the past left the follower in probing status until the next log entry was
 // committed.
 func TestSnapshotSucceedViaAppResp(t *testing.T) {
-	snap := pb.Snapshot{
-		Metadata: pb.SnapshotMetadata{
-			Index:     11, // magic number
-			Term:      11, // magic number
-			ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}},
-		},
-	}
 	s1 := NewMemoryStorage()
-	n1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s1)
-
-	// Become follower because otherwise the way this test sets things up the
-	// leadership term will be 1 (which is stale). We want it to match the snap-
-	// shot term in this test.
-	n1.becomeFollower(snap.Metadata.Term-1, 2)
+	// Create a single-node leader.
+	n1 := newTestRaft(1, []uint64{1}, 10, 1, s1)
 	n1.becomeCandidate()
 	n1.becomeLeader()
+	// We need to add a second empty entry so that we can truncate the first
+	// one away.
+	n1.Step(pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{}}})
 
-	// Apply a snapshot on the leader.
This is a workaround against the fact that - // the leader will always append an empty entry, but that empty entry works - // against what we're trying to assert in this test, namely that a snapshot - // at the latest committed index leaves the follower in probing state. - // With the snapshot, the empty entry is fully committed. - n1.restore(snap) + rd := newReady(n1, &SoftState{}, pb.HardState{}) + s1.Append(rd.Entries) + s1.SetHardState(rd.HardState) + + if exp, ci := s1.lastIndex(), n1.raftLog.committed; ci != exp { + t.Fatalf("unexpected committed index %d, wanted %d: %+v", ci, exp, s1) + } + + // Force a log truncation. + if err := s1.Compact(1); err != nil { + t.Fatal(err) + } + + // Add a follower to the group. Do this in a clandestine way for simplicity. + // Also set up a snapshot that will be sent to the follower. + n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) + s1.snapshot = pb.Snapshot{ + Metadata: pb.SnapshotMetadata{ + ConfState: pb.ConfState{Nodes: []uint64{1, 2}}, + Index: s1.lastIndex(), + Term: s1.ents[len(s1.ents)-1].Term, + }, + } noMessage := pb.MessageType(-1) mustSend := func(from, to *raft, typ pb.MessageType) pb.Message { @@ -151,6 +159,9 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) { continue } t.Log(DescribeMessage(msg, func([]byte) string { return "" })) + if len(msg.Entries) > 0 { + t.Log(DescribeEntries(msg.Entries, func(b []byte) string { return string(b) })) + } if err := to.Step(msg); err != nil { t.Fatalf("%v: %s", msg, err) } @@ -169,7 +180,7 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) { // Create the follower that will receive the snapshot. s2 := NewMemoryStorage() - n2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, s2) + n2 := newTestRaft(2, []uint64{1, 2}, 10, 1, s2) // Let the leader probe the follower. if !n1.maybeSendAppend(2, true /* sendIfEmpty */) { @@ -186,9 +197,9 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) { t.Fatalf("expected a rejection with zero hint, got reject=%t hint=%d", msg.Reject, msg.RejectHint) } - expIdx := snap.Metadata.Index - // Leader sends snapshot due to RejectHint of zero (the storage we use here - // has index zero compacted). + const expIdx = 2 + // Leader sends snapshot due to RejectHint of zero (we set up the raft log + // to start at index 2). if msg := mustSend(n1, n2, pb.MsgSnap); msg.Snapshot.Metadata.Index != expIdx { t.Fatalf("expected snapshot at index %d, got %d", expIdx, msg.Snapshot.Metadata.Index) } diff --git a/raft/raft_test.go b/raft/raft_test.go index 805e6071f..8fcc2f5c7 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -356,8 +356,8 @@ func TestLearnerPromotion(t *testing.T) { nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - n1.addNode(2) - n2.addNode(2) + n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) + n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) if n2.isLearner { t.Error("peer 2 is learner, want not") } @@ -3076,7 +3076,7 @@ func TestNewLeaderPendingConfig(t *testing.T) { // TestAddNode tests that addNode could update nodes correctly. func TestAddNode(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - r.addNode(2) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) nodes := r.prs.VoterNodes() wnodes := []uint64{1, 2} if !reflect.DeepEqual(nodes, wnodes) { @@ -3087,7 +3087,7 @@ func TestAddNode(t *testing.T) { // TestAddLearner tests that addLearner could update nodes correctly. 
func TestAddLearner(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - r.addLearner(2) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}) nodes := r.prs.LearnerNodes() wnodes := []uint64{2} if !reflect.DeepEqual(nodes, wnodes) { @@ -3111,7 +3111,7 @@ func TestAddNodeCheckQuorum(t *testing.T) { r.tick() } - r.addNode(2) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) // This tick will reach electionTimeout, which triggers a quorum check. r.tick() @@ -3136,14 +3136,14 @@ func TestAddNodeCheckQuorum(t *testing.T) { // and removed list correctly. func TestRemoveNode(t *testing.T) { r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) - r.removeNode(2) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}) w := []uint64{1} if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } // remove all nodes from cluster - r.removeNode(1) + r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}) w = []uint64{} if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) @@ -3154,7 +3154,7 @@ func TestRemoveNode(t *testing.T) { // and removed list correctly. func TestRemoveLearner(t *testing.T) { r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage()) - r.removeNode(2) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}) w := []uint64{1} if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) @@ -3166,7 +3166,7 @@ func TestRemoveLearner(t *testing.T) { } // remove all nodes from cluster - r.removeNode(1) + r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}) if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } @@ -3300,7 +3300,7 @@ func TestCommitAfterRemoveNode(t *testing.T) { // Apply the config change. This reduces quorum requirements so the // pending command can now commit. - r.removeNode(2) + r.applyConfChange(cc) ents = nextEnts(r, s) if len(ents) != 1 || ents[0].Type != pb.EntryNormal || string(ents[0].Data) != "hello" { @@ -3549,7 +3549,7 @@ func TestLeaderTransferRemoveNode(t *testing.T) { t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) } - lead.removeNode(3) + lead.applyConfChange(pb.ConfChange{NodeID: 3, Type: pb.ConfChangeRemoveNode}) checkLeaderTransferState(t, lead, StateLeader, 1) } @@ -3875,9 +3875,9 @@ func TestPreVoteWithCheckQuorum(t *testing.T) { // a MsgHup or MsgTimeoutNow. func TestLearnerCampaign(t *testing.T) { n1 := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - n1.addLearner(2) + n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}) n2 := newTestRaft(2, []uint64{1}, 10, 1, NewMemoryStorage()) - n2.addLearner(2) + n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}) nt := newNetwork(n1, n2) nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) diff --git a/raft/rawnode.go b/raft/rawnode.go index d7e54eeea..77183b793 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -101,7 +101,7 @@ func NewRawNode(config *Config, peers []Peer) (*RawNode, error) { r.raftLog.append(ents...) 
r.raftLog.committed = uint64(len(ents)) for _, peer := range peers { - r.addNode(peer.ID) + r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}) } } @@ -166,21 +166,8 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error { // ApplyConfChange applies a config change to the local node. func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { - if cc.NodeID == None { - return &pb.ConfState{Nodes: rn.raft.prs.VoterNodes(), Learners: rn.raft.prs.LearnerNodes()} - } - switch cc.Type { - case pb.ConfChangeAddNode: - rn.raft.addNode(cc.NodeID) - case pb.ConfChangeAddLearnerNode: - rn.raft.addLearner(cc.NodeID) - case pb.ConfChangeRemoveNode: - rn.raft.removeNode(cc.NodeID) - case pb.ConfChangeUpdateNode: - default: - panic("unexpected conf type") - } - return &pb.ConfState{Nodes: rn.raft.prs.VoterNodes(), Learners: rn.raft.prs.LearnerNodes()} + cs := rn.raft.applyConfChange(cc) + return &cs } // Step advances the state machine using the given message. From 6697adfff88fdc473ac98c9c33814d5df3af511b Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Wed, 3 Jul 2019 21:26:37 +0200 Subject: [PATCH 2/2] raft/tracker: pull Voters and Learners into Config struct This is helpful to quickly print the configuration log messages without having to specify Voters and Learners separately. It will also come in handy for joint quorums because it allows holding on to voters and learners as a unit, which is useful for unit testing. --- raft/quorum/joint.go | 7 ++++ raft/quorum/majority.go | 18 ++++++++++ raft/quorum/quorum.go | 1 + raft/raft.go | 1 + raft/tracker/tracker.go | 76 +++++++++++++++++++++++++++++++++++++---- 5 files changed, 97 insertions(+), 6 deletions(-) diff --git a/raft/quorum/joint.go b/raft/quorum/joint.go index 9f8f484dc..e3741e0b0 100644 --- a/raft/quorum/joint.go +++ b/raft/quorum/joint.go @@ -18,6 +18,13 @@ package quorum // majority configurations. Decisions require the support of both majorities. type JointConfig [2]MajorityConfig +func (c JointConfig) String() string { + if len(c[1]) > 0 { + return c[0].String() + "&&" + c[1].String() + } + return c[0].String() +} + // IDs returns a newly initialized map representing the set of voters present // in the joint configuration. func (c JointConfig) IDs() map[uint64]struct{} { diff --git a/raft/quorum/majority.go b/raft/quorum/majority.go index 3d7bf8233..5eba50344 100644 --- a/raft/quorum/majority.go +++ b/raft/quorum/majority.go @@ -24,6 +24,24 @@ import ( // MajorityConfig is a set of IDs that uses majority quorums to make decisions. type MajorityConfig map[uint64]struct{} +func (c MajorityConfig) String() string { + sl := make([]uint64, 0, len(c)) + for id := range c { + sl = append(sl, id) + } + sort.Slice(sl, func(i, j int) bool { return sl[i] < sl[j] }) + var buf strings.Builder + buf.WriteByte('(') + for i := range sl { + if i > 0 { + buf.WriteByte(' ') + } + fmt.Fprint(&buf, sl[i]) + } + buf.WriteByte(')') + return buf.String() +} + // Describe returns a (multi-line) representation of the commit indexes for the // given lookuper. func (c MajorityConfig) Describe(l AckedIndexer) string { diff --git a/raft/quorum/quorum.go b/raft/quorum/quorum.go index ff9c6f48d..2899e46c9 100644 --- a/raft/quorum/quorum.go +++ b/raft/quorum/quorum.go @@ -19,6 +19,7 @@ import ( "strconv" ) +// Index is a Raft log position. 
 type Index uint64
 
 func (i Index) String() string {
diff --git a/raft/raft.go b/raft/raft.go
index 4edf9616c..a42bb4e63 100644
--- a/raft/raft.go
+++ b/raft/raft.go
@@ -1461,6 +1461,7 @@ func (r *raft) applyConfChange(cc pb.ConfChange) pb.ConfState {
 		}
 	}
 
+	r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config)
 	// Now that the configuration is updated, handle any side effects.
 
 	cs := pb.ConfState{Nodes: r.prs.VoterNodes(), Learners: r.prs.LearnerNodes()}
diff --git a/raft/tracker/tracker.go b/raft/tracker/tracker.go
index 2d162c6de..4b3396fbe 100644
--- a/raft/tracker/tracker.go
+++ b/raft/tracker/tracker.go
@@ -21,12 +21,72 @@ import (
 	"go.etcd.io/etcd/raft/quorum"
 )
 
+// Config reflects the configuration tracked in a ProgressTracker.
+type Config struct {
+	Voters quorum.JointConfig
+	// Learners is a set of IDs corresponding to the learners active in the
+	// current configuration.
+	//
+	// Invariant: Learners and Voters do not intersect, i.e. if a peer is in
+	// either half of the joint config, it can't be a learner; if it is a
+	// learner it can't be in either half of the joint config. This invariant
+	// simplifies the implementation since it allows peers to have clarity about
+	// their current role without taking into account joint consensus.
+	Learners map[uint64]struct{}
+	// TODO(tbg): when we actually carry out joint consensus changes and turn a
+	// voter into a learner, we cannot add the learner when entering the joint
+	// state. This is because it would violate the invariant that the inter-
+	// section of voters and learners is empty. For example, assume a Voter is
+	// removed and immediately re-added as a learner (or in other words, it is
+	// demoted).
+	//
+	// Initially, the configuration will be
+	//
+	//   voters:   {1 2 3}
+	//   learners: {}
+	//
+	// and we want to demote 3. Entering the joint configuration, we naively get
+	//
+	//   voters:   {1 2} & {1 2 3}
+	//   learners: {3}
+	//
+	// but this violates the invariant (3 is both voter and learner). Instead,
+	// we have
+	//
+	//   voters:        {1 2} & {1 2 3}
+	//   learners:      {}
+	//   next_learners: {3}
+	//
+	// Where 3 is now still purely a voter, but we are remembering the intention
+	// to make it a learner upon transitioning into the final configuration:
+	//
+	//   voters:        {1 2}
+	//   learners:      {3}
+	//   next_learners: {}
+	//
+	// Note that next_learners is not used while adding a learner that is not
+	// also a voter in the joint config. In this case, the learner is added
+	// to Learners right away when entering the joint configuration, so that it
+	// is caught up as soon as possible.
+	//
+	// NextLearners map[uint64]struct{}
+}
+
+func (c *Config) String() string {
+	if len(c.Learners) == 0 {
+		return fmt.Sprintf("voters=%s", c.Voters)
+	}
+	return fmt.Sprintf(
+		"voters=%s learners=%s",
+		c.Voters, quorum.MajorityConfig(c.Learners).String(),
+	)
+}
+
 // ProgressTracker tracks the currently active configuration and the information
 // known about the nodes and learners in it. In particular, it tracks the match
 // index for each peer which in turn allows reasoning about the committed index.
type ProgressTracker struct { - Voters quorum.JointConfig - Learners map[uint64]struct{} + Config Progress map[uint64]*Progress @@ -39,11 +99,15 @@ type ProgressTracker struct { func MakeProgressTracker(maxInflight int) ProgressTracker { p := ProgressTracker{ MaxInflight: maxInflight, - Voters: quorum.JointConfig{ - quorum.MajorityConfig{}, - quorum.MajorityConfig{}, + Config: Config{ + Voters: quorum.JointConfig{ + quorum.MajorityConfig{}, + // TODO(tbg): this will be mostly empty, so make it a nil pointer + // in the common case. + quorum.MajorityConfig{}, + }, + Learners: map[uint64]struct{}{}, }, - Learners: map[uint64]struct{}{}, Votes: map[uint64]bool{}, Progress: map[uint64]*Progress{}, }
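For a sense of what the new Stringers produce, here is a small self-contained sketch. It is not part of the patches; the import path for the tracker package is assumed from the file layout above, and the expected outputs are inferred from the String implementations added in this patch:

	package main

	import (
		"fmt"

		"go.etcd.io/etcd/raft/quorum"
		"go.etcd.io/etcd/raft/tracker"
	)

	func main() {
		cfg := tracker.Config{
			Voters: quorum.JointConfig{
				quorum.MajorityConfig{1: {}, 2: {}, 3: {}},
				quorum.MajorityConfig{}, // second half empty: not currently a joint config
			},
			Learners: map[uint64]struct{}{4: {}},
		}
		fmt.Println(cfg.String()) // voters=(1 2 3) learners=(4)

		// With both halves of the joint config populated, they are joined by "&&".
		cfg.Voters[1] = quorum.MajorityConfig{1: {}, 2: {}}
		cfg.Learners = nil
		fmt.Println(cfg.String()) // voters=(1 2 3)&&(1 2)
	}

The same formatting is what the new "switched to configuration" log message in applyConfChange is intended to surface via r.prs.Config.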