diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index fa5830ef4..3ec76f4d3 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -178,8 +178,8 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) { } } -// TestConfgChangeBlocksApply ensures apply blocks if committed entries contain config-change. -func TestConfgChangeBlocksApply(t *testing.T) { +// TestConfigChangeBlocksApply ensures apply blocks if committed entries contain config-change. +func TestConfigChangeBlocksApply(t *testing.T) { n := newNopReadyNode() r := newRaftNode(raftNodeConfig{ diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index e7cde53b4..9d8b4c240 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -28,8 +28,6 @@ import ( "testing" "time" - "go.uber.org/zap" - "go.etcd.io/etcd/etcdserver/api/membership" "go.etcd.io/etcd/etcdserver/api/rafthttp" "go.etcd.io/etcd/etcdserver/api/snap" @@ -49,6 +47,7 @@ import ( "go.etcd.io/etcd/pkg/wait" "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/raftpb" + "go.uber.org/zap" ) // TestDoLocalAction tests requests which do not need to go through raft to be applied, @@ -1632,7 +1631,7 @@ func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error { n.Record(testutil.Action{Name: "Propose", Params: []interface{}{data}}) return nil } -func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error { +func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChangeI) error { n.Record(testutil.Action{Name: "ProposeConfChange"}) return nil } @@ -1645,7 +1644,7 @@ func (n *nodeRecorder) Ready() <-chan raft.Ready func (n *nodeRecorder) TransferLeadership(ctx context.Context, lead, transferee uint64) {} func (n *nodeRecorder) ReadIndex(ctx context.Context, rctx []byte) error { return nil } func (n *nodeRecorder) Advance() {} -func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState { +func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChangeI) *raftpb.ConfState { n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}}) return &raftpb.ConfState{} } @@ -1706,21 +1705,37 @@ func newNodeConfChangeCommitterStream() *nodeConfChangeCommitterRecorder { return &nodeConfChangeCommitterRecorder{*newReadyNode(), 0} } -func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error { - data, err := conf.Marshal() +func confChangeActionName(conf raftpb.ConfChangeI) string { + var s string + if confV1, ok := conf.AsV1(); ok { + s = confV1.Type.String() + } else { + for i, chg := range conf.AsV2().Changes { + if i > 0 { + s += "/" + } + s += chg.Type.String() + } + } + return s +} + +func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChangeI) error { + typ, data, err := raftpb.MarshalConfChange(conf) if err != nil { return err } + n.index++ - n.Record(testutil.Action{Name: "ProposeConfChange:" + conf.Type.String()}) - n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: raftpb.EntryConfChange, Data: data}}} + n.Record(testutil.Action{Name: "ProposeConfChange:" + confChangeActionName(conf)}) + n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: typ, Data: data}}} return nil } func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready { return n.readyc } -func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState { - n.Record(testutil.Action{Name: "ApplyConfChange:" + conf.Type.String()}) +func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChangeI) *raftpb.ConfState { + n.Record(testutil.Action{Name: "ApplyConfChange:" + confChangeActionName(conf)}) return &raftpb.ConfState{} } diff --git a/raft/bootstrap.go b/raft/bootstrap.go index fdd098756..bd82b2041 100644 --- a/raft/bootstrap.go +++ b/raft/bootstrap.go @@ -74,7 +74,7 @@ func (rn *RawNode) Bootstrap(peers []Peer) error { // the invariant that committed < unstable? rn.raft.raftLog.committed = uint64(len(ents)) for _, peer := range peers { - rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}) + rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}.AsV2()) } return nil } diff --git a/raft/confchange/confchange.go b/raft/confchange/confchange.go index 8f1ed0bee..bfb2033c9 100644 --- a/raft/confchange/confchange.go +++ b/raft/confchange/confchange.go @@ -46,7 +46,7 @@ type Changer struct { // (Section 4.3) corresponds to `C_{new,old}`. // // [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf -func (c Changer) EnterJoint(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) { +func (c Changer) EnterJoint(autoLeave bool, ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) { cfg, prs, err := c.checkAndCopy() if err != nil { return c.err(err) @@ -74,7 +74,7 @@ func (c Changer) EnterJoint(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker if err := c.apply(&cfg, prs, ccs...); err != nil { return c.err(err) } - + cfg.AutoLeave = autoLeave return checkAndReturn(cfg, prs) } @@ -120,6 +120,7 @@ func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, error) { } } *outgoingPtr(&cfg.Voters) = nil + cfg.AutoLeave = false return checkAndReturn(cfg, prs) } @@ -142,7 +143,7 @@ func (c Changer) Simple(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.Pro return c.err(err) } if n := symdiff(incoming(c.Tracker.Voters), incoming(cfg.Voters)); n > 1 { - return tracker.Config{}, nil, errors.New("more than voter changed without entering joint config") + return tracker.Config{}, nil, errors.New("more than one voter changed without entering joint config") } if err := checkInvariants(cfg, prs); err != nil { return tracker.Config{}, tracker.ProgressMap{}, nil @@ -327,6 +328,9 @@ func checkInvariants(cfg tracker.Config, prs tracker.ProgressMap) error { if cfg.LearnersNext != nil { return fmt.Errorf("LearnersNext must be nil when not joint") } + if cfg.AutoLeave { + return fmt.Errorf("AutoLeave must be false when not joint") + } } return nil diff --git a/raft/confchange/datadriven_test.go b/raft/confchange/datadriven_test.go index dbbb4d95b..063d927da 100644 --- a/raft/confchange/datadriven_test.go +++ b/raft/confchange/datadriven_test.go @@ -85,7 +85,11 @@ func TestConfChangeDataDriven(t *testing.T) { case "simple": cfg, prs, err = c.Simple(ccs...) case "enter-joint": - cfg, prs, err = c.EnterJoint(ccs...) + var autoLeave bool + if len(d.CmdArgs) > 0 { + d.ScanArgs(t, "autoleave", &autoLeave) + } + cfg, prs, err = c.EnterJoint(autoLeave, ccs...) case "leave-joint": if len(ccs) > 0 { err = errors.New("this command takes no input") diff --git a/raft/confchange/quick_test.go b/raft/confchange/quick_test.go index f2dc9a635..04a77b3ce 100644 --- a/raft/confchange/quick_test.go +++ b/raft/confchange/quick_test.go @@ -15,6 +15,7 @@ package confchange import ( + "fmt" "math/rand" "reflect" "testing" @@ -36,16 +37,38 @@ func TestConfChangeQuick(t *testing.T) { const infoCount = 5 runWithJoint := func(c *Changer, ccs []pb.ConfChangeSingle) error { - cfg, prs, err := c.EnterJoint(ccs...) + cfg, prs, err := c.EnterJoint(false /* autoLeave */, ccs...) if err != nil { return err } + // Also do this with autoLeave on, just to check that we'd get the same + // result. + cfg2a, prs2a, err := c.EnterJoint(true /* autoLeave */, ccs...) + if err != nil { + return err + } + cfg2a.AutoLeave = false + if !reflect.DeepEqual(cfg, cfg2a) || !reflect.DeepEqual(prs, prs2a) { + return fmt.Errorf("cfg: %+v\ncfg2a: %+v\nprs: %+v\nprs2a: %+v", + cfg, cfg2a, prs, prs2a) + } + c.Tracker.Config = cfg + c.Tracker.Progress = prs + cfg2b, prs2b, err := c.LeaveJoint() + if err != nil { + return err + } + // Reset back to the main branch with autoLeave=false. c.Tracker.Config = cfg c.Tracker.Progress = prs cfg, prs, err = c.LeaveJoint() if err != nil { return err } + if !reflect.DeepEqual(cfg, cfg2b) || !reflect.DeepEqual(prs, prs2b) { + return fmt.Errorf("cfg: %+v\ncfg2b: %+v\nprs: %+v\nprs2b: %+v", + cfg, cfg2b, prs, prs2b) + } c.Tracker.Config = cfg c.Tracker.Progress = prs return nil diff --git a/raft/confchange/testdata/joint_autoleave.txt b/raft/confchange/testdata/joint_autoleave.txt new file mode 100644 index 000000000..be855f781 --- /dev/null +++ b/raft/confchange/testdata/joint_autoleave.txt @@ -0,0 +1,29 @@ +# Test the autoleave argument to EnterJoint. It defaults to false in the +# datadriven tests. The flag has no associated semantics in this package, +# it is simply passed through. +simple +v1 +---- +voters=(1) +1: StateProbe match=0 next=1 + +# Autoleave is reflected in the config. +enter-joint autoleave=true +v2 v3 +---- +voters=(1 2 3)&&(1) autoleave +1: StateProbe match=0 next=1 +2: StateProbe match=0 next=2 +3: StateProbe match=0 next=2 + +# Can't enter-joint twice, even if autoleave changes. +enter-joint autoleave=false +---- +config is already joint + +leave-joint +---- +voters=(1 2 3) +1: StateProbe match=0 next=1 +2: StateProbe match=0 next=2 +3: StateProbe match=0 next=2 diff --git a/raft/confchange/testdata/simple_safety.txt b/raft/confchange/testdata/simple_safety.txt index 4bf420fc1..b360737fb 100644 --- a/raft/confchange/testdata/simple_safety.txt +++ b/raft/confchange/testdata/simple_safety.txt @@ -20,7 +20,7 @@ voters=(1 2) learners=(3) simple r1 v5 ---- -more than voter changed without entering joint config +more than one voter changed without entering joint config simple r1 r2 @@ -30,12 +30,12 @@ removed all voters simple v3 v4 ---- -more than voter changed without entering joint config +more than one voter changed without entering joint config simple l1 v5 ---- -more than voter changed without entering joint config +more than one voter changed without entering joint config simple l1 l2 diff --git a/raft/node.go b/raft/node.go index 6b730c0d4..ddfce0407 100644 --- a/raft/node.go +++ b/raft/node.go @@ -132,10 +132,20 @@ type Node interface { // Propose proposes that data be appended to the log. Note that proposals can be lost without // notice, therefore it is user's job to ensure proposal retries. Propose(ctx context.Context, data []byte) error - // ProposeConfChange proposes config change. - // At most one ConfChange can be in the process of going through consensus. - // Application needs to call ApplyConfChange when applying EntryConfChange type entry. - ProposeConfChange(ctx context.Context, cc pb.ConfChange) error + // ProposeConfChange proposes a configuration change. Like any proposal, the + // configuration change may be dropped with or without an error being + // returned. In particular, configuration changes are dropped unless the + // leader has certainty that there is no prior unapplied configuration + // change in its log. + // + // The method accepts either a pb.ConfChange (deprecated) or pb.ConfChangeV2 + // message. The latter allows arbitrary configuration changes via joint + // consensus, notably including replacing a voter. Passing a ConfChangeV2 + // message is only allowed if all Nodes participating in the cluster run a + // version of this library aware of the V2 API. See pb.ConfChangeV2 for + // usage details and semantics. + ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error + // Step advances the state machine using the given message. ctx.Err() will be returned, if any. Step(ctx context.Context, msg pb.Message) error @@ -156,11 +166,13 @@ type Node interface { // a long time to apply the snapshot data. To continue receiving Ready without blocking raft // progress, it can call Advance before finishing applying the last ready. Advance() - // ApplyConfChange applies config change to the local node. - // Returns an opaque ConfState protobuf which must be recorded - // in snapshots. Will never return nil; it returns a pointer only - // to match MemoryStorage.Compact. - ApplyConfChange(cc pb.ConfChange) *pb.ConfState + // ApplyConfChange applies a config change (previously passed to + // ProposeConfChange) to the node. This must be called whenever a config + // change is observed in Ready.CommittedEntries. + // + // Returns an opaque non-nil ConfState protobuf which must be recorded in + // snapshots. + ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState // TransferLeadership attempts to transfer leadership to the given transferee. TransferLeadership(ctx context.Context, lead, transferee uint64) @@ -240,7 +252,7 @@ type msgWithResult struct { type node struct { propc chan msgWithResult recvc chan pb.Message - confc chan pb.ConfChange + confc chan pb.ConfChangeV2 confstatec chan pb.ConfState readyc chan Ready advancec chan struct{} @@ -256,7 +268,7 @@ func newNode() node { return node{ propc: make(chan msgWithResult), recvc: make(chan pb.Message), - confc: make(chan pb.ConfChange), + confc: make(chan pb.ConfChangeV2), confstatec: make(chan pb.ConfState), readyc: make(chan Ready), advancec: make(chan struct{}), @@ -341,11 +353,27 @@ func (n *node) run(rn *RawNode) { r.Step(m) } case cc := <-n.confc: + _, okBefore := r.prs.Progress[r.id] cs := r.applyConfChange(cc) - if _, ok := r.prs.Progress[r.id]; !ok { - // block incoming proposal when local node is - // removed - if cc.NodeID == r.id { + // If the node was removed, block incoming proposals. Note that we + // only do this if the node was in the config before. Nodes may be + // a member of the group without knowing this (when they're catching + // up on the log and don't have the latest config) and we don't want + // to block the proposal channel in that case. + // + // NB: propc is reset when the leader changes, which, if we learn + // about it, sort of implies that we got readded, maybe? This isn't + // very sound and likely has bugs. + if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter { + var found bool + for _, sl := range [][]uint64{cs.Nodes, cs.NodesJoint} { + for _, id := range sl { + if id == r.id { + found = true + } + } + } + if !found { propc = nil } } @@ -397,12 +425,20 @@ func (n *node) Step(ctx context.Context, m pb.Message) error { return n.step(ctx, m) } -func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error { - data, err := cc.Marshal() +func confChangeToMsg(c pb.ConfChangeI) (pb.Message, error) { + typ, data, err := pb.MarshalConfChange(c) + if err != nil { + return pb.Message{}, err + } + return pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: typ, Data: data}}}, nil +} + +func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error { + msg, err := confChangeToMsg(cc) if err != nil { return err } - return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}}) + return n.Step(ctx, msg) } func (n *node) step(ctx context.Context, m pb.Message) error { @@ -463,10 +499,10 @@ func (n *node) Advance() { } } -func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { +func (n *node) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState { var cs pb.ConfState select { - case n.confc <- cc: + case n.confc <- cc.AsV2(): case <-n.done: } select { diff --git a/raft/quorum/majority.go b/raft/quorum/majority.go index 5eba50344..9b10e9583 100644 --- a/raft/quorum/majority.go +++ b/raft/quorum/majority.go @@ -102,6 +102,16 @@ func (c MajorityConfig) Describe(l AckedIndexer) string { return buf.String() } +// Slice returns the MajorityConfig as a sorted slice. +func (c MajorityConfig) Slice() []uint64 { + var sl []uint64 + for id := range c { + sl = append(sl, id) + } + sort.Slice(sl, func(i, j int) bool { return sl[i] < sl[j] }) + return sl +} + type uint64Slice []uint64 func insertionSort(sl uint64Slice) { diff --git a/raft/raft.go b/raft/raft.go index 61ea892eb..16174f210 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -357,11 +357,11 @@ func newRaft(c *Config) *raft { } for _, p := range peers { // Add node to active config. - r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: p}) + r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: p}.AsV2()) } for _, p := range learners { // Add learner to active config. - r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddLearnerNode, NodeID: p}) + r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddLearnerNode, NodeID: p}.AsV2()) } if !isHardStateEqual(hs, emptyState) { @@ -551,6 +551,46 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { }) } +func (r *raft) advance(rd Ready) { + // If entries were applied (or a snapshot), update our cursor for + // the next Ready. Note that if the current HardState contains a + // new Commit index, this does not mean that we're also applying + // all of the new entries due to commit pagination by size. + if index := rd.appliedCursor(); index > 0 { + r.raftLog.appliedTo(index) + if r.prs.Config.AutoLeave && index >= r.pendingConfIndex && r.state == StateLeader { + // If the current (and most recent, at least for this leader's term) + // configuration should be auto-left, initiate that now. + ccdata, err := (&pb.ConfChangeV2{}).Marshal() + if err != nil { + panic(err) + } + ent := pb.Entry{ + Type: pb.EntryConfChangeV2, + Data: ccdata, + } + if !r.appendEntry(ent) { + // If we could not append the entry, bump the pending conf index + // so that we'll try again later. + // + // TODO(tbg): test this case. + r.pendingConfIndex = r.raftLog.lastIndex() + } else { + r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config) + } + } + } + r.reduceUncommittedSize(rd.CommittedEntries) + + if len(rd.Entries) > 0 { + e := rd.Entries[len(rd.Entries)-1] + r.raftLog.stableTo(e.Index, e.Term) + } + if !IsEmptySnap(rd.Snapshot) { + r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index) + } +} + // maybeCommit attempts to advance the commit index. Returns true if // the commit index changed (in which case the caller should call // r.bcastAppend). @@ -973,10 +1013,10 @@ func stepLeader(r *raft, m pb.Message) error { for i := range m.Entries { e := &m.Entries[i] - if e.Type == pb.EntryConfChange { + if e.Type == pb.EntryConfChange || e.Type == pb.EntryConfChangeV2 { if r.pendingConfIndex > r.raftLog.applied { - r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]", - e, r.pendingConfIndex, r.raftLog.applied) + r.logger.Infof("%x propose conf %s ignored since pending unapplied configuration [index %d, applied %d]", + r.id, e, r.pendingConfIndex, r.raftLog.applied) m.Entries[i] = pb.Entry{Type: pb.EntryNormal} } else { r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1 @@ -1376,10 +1416,10 @@ func (r *raft) restore(s pb.Snapshot) bool { // Reset the configuration and add the (potentially updated) peers in anew. r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight) for _, id := range s.Metadata.ConfState.Nodes { - r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddNode}) + r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddNode}.AsV2()) } for _, id := range s.Metadata.ConfState.Learners { - r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddLearnerNode}) + r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddLearnerNode}.AsV2()) } pr := r.prs.Progress[r.id] @@ -1397,24 +1437,38 @@ func (r *raft) promotable() bool { return pr != nil && !pr.IsLearner } -func (r *raft) applyConfChange(cc pb.ConfChange) pb.ConfState { - cfg, prs, err := confchange.Changer{ - Tracker: r.prs, - LastIndex: r.raftLog.lastIndex(), - }.Simple(pb.ConfChangeSingle{ - Type: cc.Type, - NodeID: cc.NodeID, - }) +func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState { + cfg, prs, err := func() (tracker.Config, tracker.ProgressMap, error) { + changer := confchange.Changer{ + Tracker: r.prs, + LastIndex: r.raftLog.lastIndex(), + } + if cc.LeaveJoint() { + return changer.LeaveJoint() + } else if autoLeave, ok := cc.EnterJoint(); ok { + return changer.EnterJoint(autoLeave, cc.Changes...) + } + return changer.Simple(cc.Changes...) + }() + if err != nil { + // TODO(tbg): return the error to the caller. panic(err) } + r.prs.Config = cfg r.prs.Progress = prs r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config) // Now that the configuration is updated, handle any side effects. - cs := pb.ConfState{Nodes: r.prs.VoterNodes(), Learners: r.prs.LearnerNodes()} + cs := pb.ConfState{ + Nodes: r.prs.Voters[0].Slice(), + NodesJoint: r.prs.Voters[1].Slice(), + Learners: quorum.MajorityConfig(r.prs.Learners).Slice(), + LearnersNext: quorum.MajorityConfig(r.prs.LearnersNext).Slice(), + AutoLeave: r.prs.AutoLeave, + } pr, ok := r.prs.Progress[r.id] // Update whether the node itself is a learner, resetting to false when the diff --git a/raft/raft_snap_test.go b/raft/raft_snap_test.go index 187aef697..2294382da 100644 --- a/raft/raft_snap_test.go +++ b/raft/raft_snap_test.go @@ -142,7 +142,7 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) { // Add a follower to the group. Do this in a clandestine way for simplicity. // Also set up a snapshot that will be sent to the follower. - n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) + n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) s1.snapshot = pb.Snapshot{ Metadata: pb.SnapshotMetadata{ ConfState: pb.ConfState{Nodes: []uint64{1, 2}}, diff --git a/raft/raft_test.go b/raft/raft_test.go index 1e2d0e2af..1a5820bf2 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -356,8 +356,8 @@ func TestLearnerPromotion(t *testing.T) { nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) - n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) + n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) + n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) if n2.isLearner { t.Error("peer 2 is learner, want not") } @@ -1143,7 +1143,7 @@ func TestCommit(t *testing.T) { for j := 0; j < len(tt.matches); j++ { id := uint64(j) + 1 if id > 1 { - sm.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: id}) + sm.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: id}.AsV2()) } pr := sm.prs.Progress[id] pr.Match, pr.Next = tt.matches[j], tt.matches[j]+1 @@ -1931,7 +1931,7 @@ func TestNonPromotableVoterWithCheckQuorum(t *testing.T) { nt := newNetwork(a, b) setRandomizedElectionTimeout(b, b.electionTimeout+1) // Need to remove 2 again to make it a non-promotable node since newNetwork overwritten some internal states - b.applyConfChange(pb.ConfChange{Type: pb.ConfChangeRemoveNode, NodeID: 2}) + b.applyConfChange(pb.ConfChange{Type: pb.ConfChangeRemoveNode, NodeID: 2}.AsV2()) if b.promotable() { t.Fatalf("promotable = %v, want false", b.promotable()) @@ -3086,7 +3086,7 @@ func TestNewLeaderPendingConfig(t *testing.T) { // TestAddNode tests that addNode could update nodes correctly. func TestAddNode(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) nodes := r.prs.VoterNodes() wnodes := []uint64{1, 2} if !reflect.DeepEqual(nodes, wnodes) { @@ -3098,7 +3098,7 @@ func TestAddNode(t *testing.T) { func TestAddLearner(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) // Add new learner peer. - r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}.AsV2()) if r.isLearner { t.Fatal("expected 1 to be voter") } @@ -3112,13 +3112,13 @@ func TestAddLearner(t *testing.T) { } // Promote peer to voter. - r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) if r.prs.Progress[2].IsLearner { t.Fatal("expected 2 to be voter") } // Demote r. - r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddLearnerNode}) + r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddLearnerNode}.AsV2()) if !r.prs.Progress[1].IsLearner { t.Fatal("expected 1 to be learner") } @@ -3127,7 +3127,7 @@ func TestAddLearner(t *testing.T) { } // Promote r again. - r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddNode}) + r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddNode}.AsV2()) if r.prs.Progress[1].IsLearner { t.Fatal("expected 1 to be voter") } @@ -3149,7 +3149,7 @@ func TestAddNodeCheckQuorum(t *testing.T) { r.tick() } - r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) // This tick will reach electionTimeout, which triggers a quorum check. r.tick() @@ -3174,7 +3174,7 @@ func TestAddNodeCheckQuorum(t *testing.T) { // and removed list correctly. func TestRemoveNode(t *testing.T) { r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) - r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}.AsV2()) w := []uint64{1} if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) @@ -3186,20 +3186,20 @@ func TestRemoveNode(t *testing.T) { t.Error("did not panic") } }() - r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}) + r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}.AsV2()) } // TestRemoveLearner tests that removeNode could update nodes and // and removed list correctly. func TestRemoveLearner(t *testing.T) { r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage()) - r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}.AsV2()) w := []uint64{1} if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } - w = []uint64{} + w = nil if g := r.prs.LearnerNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } @@ -3210,7 +3210,7 @@ func TestRemoveLearner(t *testing.T) { t.Error("did not panic") } }() - r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}) + r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}.AsV2()) } func TestPromotable(t *testing.T) { @@ -3342,7 +3342,7 @@ func TestCommitAfterRemoveNode(t *testing.T) { // Apply the config change. This reduces quorum requirements so the // pending command can now commit. - r.applyConfChange(cc) + r.applyConfChange(cc.AsV2()) ents = nextEnts(r, s) if len(ents) != 1 || ents[0].Type != pb.EntryNormal || string(ents[0].Data) != "hello" { @@ -3591,7 +3591,7 @@ func TestLeaderTransferRemoveNode(t *testing.T) { t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) } - lead.applyConfChange(pb.ConfChange{NodeID: 3, Type: pb.ConfChangeRemoveNode}) + lead.applyConfChange(pb.ConfChange{NodeID: 3, Type: pb.ConfChangeRemoveNode}.AsV2()) checkLeaderTransferState(t, lead, StateLeader, 1) } @@ -3917,9 +3917,9 @@ func TestPreVoteWithCheckQuorum(t *testing.T) { // a MsgHup or MsgTimeoutNow. func TestLearnerCampaign(t *testing.T) { n1 := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}) + n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}.AsV2()) n2 := newTestRaft(2, []uint64{1}, 10, 1, NewMemoryStorage()) - n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}) + n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}.AsV2()) nt := newNetwork(n1, n2) nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) diff --git a/raft/raftpb/confchange.go b/raft/raftpb/confchange.go new file mode 100644 index 000000000..a91c18dc1 --- /dev/null +++ b/raft/raftpb/confchange.go @@ -0,0 +1,105 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package raftpb + +import ( + "fmt" + + "github.com/gogo/protobuf/proto" +) + +// ConfChangeI abstracts over ConfChangeV2 and (legacy) ConfChange to allow +// treating them in a unified manner. +type ConfChangeI interface { + AsV2() ConfChangeV2 + AsV1() (ConfChange, bool) +} + +// MarshalConfChange calls Marshal on the underlying ConfChange or ConfChangeV2 +// and returns the result along with the corresponding EntryType. +func MarshalConfChange(c ConfChangeI) (EntryType, []byte, error) { + var typ EntryType + var ccdata []byte + var err error + if ccv1, ok := c.AsV1(); ok { + typ = EntryConfChange + ccdata, err = ccv1.Marshal() + } else { + ccv2 := c.AsV2() + typ = EntryConfChangeV2 + ccdata, err = ccv2.Marshal() + } + return typ, ccdata, err +} + +// AsV2 returns a V2 configuration change carrying out the same operation. +func (c ConfChange) AsV2() ConfChangeV2 { + return ConfChangeV2{ + Changes: []ConfChangeSingle{{ + Type: c.Type, + NodeID: c.NodeID, + }}, + Context: c.Context, + } +} + +// AsV1 returns the ConfChange and true. +func (c ConfChange) AsV1() (ConfChange, bool) { + return c, true +} + +// AsV2 is the identity. +func (c ConfChangeV2) AsV2() ConfChangeV2 { return c } + +// AsV1 returns ConfChange{} and false. +func (c ConfChangeV2) AsV1() (ConfChange, bool) { return ConfChange{}, false } + +// EnterJoint returns two bools. The second bool is true if and only if this +// config change will use Joint Consensus, which is the case if it contains more +// than one change or if the use of Joint Consensus was requested explicitly. +// The first bool can only be true if second one is, and indicates whether the +// Joint State will be left automatically. +func (c *ConfChangeV2) EnterJoint() (autoLeave bool, ok bool) { + // NB: in theory, more config changes could qualify for the "simple" + // protocol but it depends on the config on top of which the changes apply. + // For example, adding two learners is not OK if both nodes are part of the + // base config (i.e. two voters are turned into learners in the process of + // applying the conf change). In practice, these distinctions should not + // matter, so we keep it simple and use Joint Consensus liberally. + if c.Transition != ConfChangeTransitionAuto || len(c.Changes) > 1 { + // Use Joint Consensus. + var autoLeave bool + switch c.Transition { + case ConfChangeTransitionAuto: + autoLeave = true + case ConfChangeTransitionJointImplicit: + autoLeave = true + case ConfChangeTransitionJointExplicit: + default: + panic(fmt.Sprintf("unknown transition: %+v", c)) + } + return autoLeave, true + } + return false, false +} + +// LeaveJoint is true if the configuration change leaves a joint configuration. +// This is the case if the ConfChangeV2 is zero, with the possible exception of +// the Context field. +func (c *ConfChangeV2) LeaveJoint() bool { + cpy := *c + cpy.Context = nil + return proto.Equal(&cpy, &ConfChangeV2{}) +} diff --git a/raft/rawnode.go b/raft/rawnode.go index b7e534346..9c192fdd0 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -91,23 +91,19 @@ func (rn *RawNode) Propose(data []byte) error { }}) } -// ProposeConfChange proposes a config change. -func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error { - data, err := cc.Marshal() +// ProposeConfChange proposes a config change. See (Node).ProposeConfChange for +// details. +func (rn *RawNode) ProposeConfChange(cc pb.ConfChangeI) error { + m, err := confChangeToMsg(cc) if err != nil { return err } - return rn.raft.Step(pb.Message{ - Type: pb.MsgProp, - Entries: []pb.Entry{ - {Type: pb.EntryConfChange, Data: data}, - }, - }) + return rn.raft.Step(m) } // ApplyConfChange applies a config change to the local node. -func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { - cs := rn.raft.applyConfChange(cc) +func (rn *RawNode) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState { + cs := rn.raft.applyConfChange(cc.AsV2()) return &cs } @@ -159,23 +155,7 @@ func (rn *RawNode) commitReady(rd Ready) { if !IsEmptyHardState(rd.HardState) { rn.prevHardSt = rd.HardState } - - // If entries were applied (or a snapshot), update our cursor for - // the next Ready. Note that if the current HardState contains a - // new Commit index, this does not mean that we're also applying - // all of the new entries due to commit pagination by size. - if index := rd.appliedCursor(); index > 0 { - rn.raft.raftLog.appliedTo(index) - } - rn.raft.reduceUncommittedSize(rd.CommittedEntries) - - if len(rd.Entries) > 0 { - e := rd.Entries[len(rd.Entries)-1] - rn.raft.raftLog.stableTo(e.Index, e.Term) - } - if !IsEmptySnap(rd.Snapshot) { - rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index) - } + rn.raft.advance(rd) } // HasReady called when RawNode user need to check if any Ready pending. diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 373863876..80fb075c8 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -64,7 +64,7 @@ func (a *rawNodeAdapter) ReadIndex(_ context.Context, rctx []byte) error { } func (a *rawNodeAdapter) Step(_ context.Context, m pb.Message) error { return a.RawNode.Step(m) } func (a *rawNodeAdapter) Propose(_ context.Context, data []byte) error { return a.RawNode.Propose(data) } -func (a *rawNodeAdapter) ProposeConfChange(_ context.Context, cc pb.ConfChange) error { +func (a *rawNodeAdapter) ProposeConfChange(_ context.Context, cc pb.ConfChangeI) error { return a.RawNode.ProposeConfChange(cc) } @@ -105,65 +105,257 @@ func TestRawNodeStep(t *testing.T) { // TestNodeStepUnblock from node_test.go has no equivalent in rawNode because there is // no goroutine in RawNode. -// TestRawNodeProposeAndConfChange ensures that RawNode.Propose and RawNode.ProposeConfChange -// send the given proposal and ConfChange to the underlying raft. +// TestRawNodeProposeAndConfChange tests the configuration change mechanism. Each +// test case sends a configuration change which is either simple or joint, verifies +// that it applies and that the resulting ConfState matches expectations, and for +// joint configurations makes sure that they are exited successfully. func TestRawNodeProposeAndConfChange(t *testing.T) { - s := NewMemoryStorage() - var err error - rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s)) - if err != nil { - t.Fatal(err) + testCases := []struct { + cc pb.ConfChangeI + exp pb.ConfState + exp2 *pb.ConfState + }{ + // V1 config change. + { + pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 2}, + pb.ConfState{Nodes: []uint64{1, 2}}, + nil, + }, + // Proposing the same as a V2 change works just the same, without entering + // a joint config. + { + pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{ + {Type: pb.ConfChangeAddNode, NodeID: 2}, + }, + }, + pb.ConfState{Nodes: []uint64{1, 2}}, + nil, + }, + // Ditto if we add it as a learner instead. + { + pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{ + {Type: pb.ConfChangeAddLearnerNode, NodeID: 2}, + }, + }, + pb.ConfState{Nodes: []uint64{1}, Learners: []uint64{2}}, + nil, + }, + // We can ask explicitly for joint consensus if we want it. + { + pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{ + {Type: pb.ConfChangeAddLearnerNode, NodeID: 2}, + }, + Transition: pb.ConfChangeTransitionJointExplicit, + }, + pb.ConfState{Nodes: []uint64{1}, NodesJoint: []uint64{1}, Learners: []uint64{2}}, + &pb.ConfState{Nodes: []uint64{1}, Learners: []uint64{2}}, + }, + // Ditto, but with implicit transition (the harness checks this). + { + pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{ + {Type: pb.ConfChangeAddLearnerNode, NodeID: 2}, + }, + Transition: pb.ConfChangeTransitionJointImplicit, + }, + pb.ConfState{ + Nodes: []uint64{1}, NodesJoint: []uint64{1}, Learners: []uint64{2}, + AutoLeave: true, + }, + &pb.ConfState{Nodes: []uint64{1}, Learners: []uint64{2}}, + }, + // Add a new node and demote n1. This exercises the interesting case in + // which we really need joint config changes and also need LearnersNext. + { + pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{ + {NodeID: 2, Type: pb.ConfChangeAddNode}, + {NodeID: 1, Type: pb.ConfChangeAddLearnerNode}, + {NodeID: 3, Type: pb.ConfChangeAddLearnerNode}, + }, + }, + pb.ConfState{ + Nodes: []uint64{2}, + NodesJoint: []uint64{1}, + Learners: []uint64{3}, + LearnersNext: []uint64{1}, + AutoLeave: true, + }, + &pb.ConfState{Nodes: []uint64{2}, Learners: []uint64{1, 3}}, + }, + // Ditto explicit. + { + pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{ + {NodeID: 2, Type: pb.ConfChangeAddNode}, + {NodeID: 1, Type: pb.ConfChangeAddLearnerNode}, + {NodeID: 3, Type: pb.ConfChangeAddLearnerNode}, + }, + Transition: pb.ConfChangeTransitionJointExplicit, + }, + pb.ConfState{ + Nodes: []uint64{2}, + NodesJoint: []uint64{1}, + Learners: []uint64{3}, + LearnersNext: []uint64{1}, + }, + &pb.ConfState{Nodes: []uint64{2}, Learners: []uint64{1, 3}}, + }, + // Ditto implicit. + { + pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{ + {NodeID: 2, Type: pb.ConfChangeAddNode}, + {NodeID: 1, Type: pb.ConfChangeAddLearnerNode}, + {NodeID: 3, Type: pb.ConfChangeAddLearnerNode}, + }, + Transition: pb.ConfChangeTransitionJointImplicit, + }, + pb.ConfState{ + Nodes: []uint64{2}, + NodesJoint: []uint64{1}, + Learners: []uint64{3}, + LearnersNext: []uint64{1}, + AutoLeave: true, + }, + &pb.ConfState{Nodes: []uint64{2}, Learners: []uint64{1, 3}}, + }, } - rawNode.Campaign() - proposed := false - var ( - lastIndex uint64 - ccdata []byte - ) - for { - rd := rawNode.Ready() - s.Append(rd.Entries) - rawNode.Advance(rd) - // Once we are the leader, propose a command and a ConfChange. - if !proposed && rd.SoftState.Lead == rawNode.raft.id { - if err = rawNode.Propose([]byte("somedata")); err != nil { - t.Fatal(err) - } - cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 1} - ccdata, err = cc.Marshal() + for _, tc := range testCases { + t.Run("", func(t *testing.T) { + s := NewMemoryStorage() + rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s)) if err != nil { t.Fatal(err) } - rawNode.ProposeConfChange(cc) - proposed = true - } else if proposed { - // We proposed last cycle, which means we appended the conf change - // in this cycle. + rawNode.Campaign() + proposed := false + var ( + lastIndex uint64 + ccdata []byte + ) + // Propose the ConfChange, wait until it applies, save the resulting + // ConfState. + var cs *pb.ConfState + for cs == nil { + rd := rawNode.Ready() + s.Append(rd.Entries) + for _, ent := range rd.CommittedEntries { + var cc pb.ConfChangeI + if ent.Type == pb.EntryConfChange { + var ccc pb.ConfChange + if err = ccc.Unmarshal(ent.Data); err != nil { + t.Fatal(err) + } + cc = ccc + } else if ent.Type == pb.EntryConfChangeV2 { + var ccc pb.ConfChangeV2 + if err = ccc.Unmarshal(ent.Data); err != nil { + t.Fatal(err) + } + cc = ccc + } + if cc != nil { + cs = rawNode.ApplyConfChange(cc) + } + } + rawNode.Advance(rd) + // Once we are the leader, propose a command and a ConfChange. + if !proposed && rd.SoftState.Lead == rawNode.raft.id { + if err = rawNode.Propose([]byte("somedata")); err != nil { + t.Fatal(err) + } + if ccv1, ok := tc.cc.AsV1(); ok { + ccdata, err = ccv1.Marshal() + if err != nil { + t.Fatal(err) + } + rawNode.ProposeConfChange(ccv1) + } else { + ccv2 := tc.cc.AsV2() + ccdata, err = ccv2.Marshal() + if err != nil { + t.Fatal(err) + } + rawNode.ProposeConfChange(ccv2) + } + proposed = true + } + } + + // Check that the last index is exactly the conf change we put in, + // down to the bits. lastIndex, err = s.LastIndex() if err != nil { t.Fatal(err) } - break - } - } - entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit) - if err != nil { - t.Fatal(err) - } - if len(entries) != 2 { - t.Fatalf("len(entries) = %d, want %d", len(entries), 2) - } - if !bytes.Equal(entries[0].Data, []byte("somedata")) { - t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata")) - } - if entries[1].Type != pb.EntryConfChange { - t.Fatalf("type = %v, want %v", entries[1].Type, pb.EntryConfChange) - } - if !bytes.Equal(entries[1].Data, ccdata) { - t.Errorf("data = %v, want %v", entries[1].Data, ccdata) + entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit) + if err != nil { + t.Fatal(err) + } + if len(entries) != 2 { + t.Fatalf("len(entries) = %d, want %d", len(entries), 2) + } + if !bytes.Equal(entries[0].Data, []byte("somedata")) { + t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata")) + } + typ := pb.EntryConfChange + if _, ok := tc.cc.AsV1(); !ok { + typ = pb.EntryConfChangeV2 + } + if entries[1].Type != typ { + t.Fatalf("type = %v, want %v", entries[1].Type, typ) + } + if !bytes.Equal(entries[1].Data, ccdata) { + t.Errorf("data = %v, want %v", entries[1].Data, ccdata) + } + + if exp := &tc.exp; !reflect.DeepEqual(exp, cs) { + t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs) + } + + if exp, act := lastIndex, rawNode.raft.pendingConfIndex; exp != act { + t.Fatalf("pendingConfIndex: expected %d, got %d", exp, act) + } + + // Move the RawNode along. If the ConfChange was simple, nothing else + // should happen. Otherwise, we're in a joint state, which is either + // left automatically or not. If not, we add the proposal that leaves + // it manually. + rd := rawNode.Ready() + var context []byte + if !tc.exp.AutoLeave { + if len(rd.Entries) > 0 { + t.Fatal("expected no more entries") + } + if tc.exp2 == nil { + return + } + context = []byte("manual") + t.Log("leaving joint state manually") + if err := rawNode.ProposeConfChange(pb.ConfChangeV2{Context: context}); err != nil { + t.Fatal(err) + } + rd = rawNode.Ready() + } + + // Check that the right ConfChange comes out. + if len(rd.Entries) != 1 || rd.Entries[0].Type != pb.EntryConfChangeV2 { + t.Fatalf("expected exactly one more entry, got %+v", rd) + } + var cc pb.ConfChangeV2 + if err := cc.Unmarshal(rd.Entries[0].Data); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(cc, pb.ConfChangeV2{Context: context}) { + t.Fatalf("expected zero ConfChangeV2, got %+v", cc) + } + // Lie and pretend the ConfChange applied. It won't do so because now + // we require the joint quorum and we're only running one node. + cs = rawNode.ApplyConfChange(cc) + if exp := tc.exp2; !reflect.DeepEqual(exp, cs) { + t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs) + } + }) } } diff --git a/raft/tracker/tracker.go b/raft/tracker/tracker.go index a2638f5f0..f67f3aa53 100644 --- a/raft/tracker/tracker.go +++ b/raft/tracker/tracker.go @@ -25,6 +25,11 @@ import ( // Config reflects the configuration tracked in a ProgressTracker. type Config struct { Voters quorum.JointConfig + // AutoLeave is true if the configuration is joint and a transition to the + // incoming configuration should be carried out automatically by Raft when + // this is possible. If false, the configuration will be joint until the + // application initiates the transition manually. + AutoLeave bool // Learners is a set of IDs corresponding to the learners active in the // current configuration. // @@ -80,6 +85,9 @@ func (c Config) String() string { if c.LearnersNext != nil { fmt.Fprintf(&buf, " learners_next=%s", quorum.MajorityConfig(c.LearnersNext).String()) } + if c.AutoLeave { + fmt.Fprintf(&buf, " autoleave") + } return buf.String() } @@ -192,6 +200,9 @@ func (p *ProgressTracker) VoterNodes() []uint64 { // LearnerNodes returns a sorted slice of learners. func (p *ProgressTracker) LearnerNodes() []uint64 { + if len(p.Learners) == 0 { + return nil + } nodes := make([]uint64, 0, len(p.Learners)) for id := range p.Learners { nodes = append(nodes, id)