raft: allow use of joint quorums

This change introduces joint quorums by changing the Node and RawNode
API to accept pb.ConfChangeV2 (on top of pb.ConfChange).

pb.ConfChange continues to work as today: it allows carrying out a
single configuration change. A pb.ConfChange proposal gets added to
the Raft log as such and is thus also observed by the app during Ready
handling, and fed back to ApplyConfChange.

ConfChangeV2 allows joint configuration changes but will continue to
carry out configuration changes in "one phase" (i.e. without ever
entering a joint config) when this is possible.
release-3.4
Tobias Schottdorf 2019-07-22 22:30:47 +02:00
parent 88f5561733
commit b67303c6a2
17 changed files with 616 additions and 153 deletions

View File

@ -178,8 +178,8 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
}
}
// TestConfgChangeBlocksApply ensures apply blocks if committed entries contain config-change.
func TestConfgChangeBlocksApply(t *testing.T) {
// TestConfigChangeBlocksApply ensures apply blocks if committed entries contain config-change.
func TestConfigChangeBlocksApply(t *testing.T) {
n := newNopReadyNode()
r := newRaftNode(raftNodeConfig{

View File

@ -28,8 +28,6 @@ import (
"testing"
"time"
"go.uber.org/zap"
"go.etcd.io/etcd/etcdserver/api/membership"
"go.etcd.io/etcd/etcdserver/api/rafthttp"
"go.etcd.io/etcd/etcdserver/api/snap"
@ -49,6 +47,7 @@ import (
"go.etcd.io/etcd/pkg/wait"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
"go.uber.org/zap"
)
// TestDoLocalAction tests requests which do not need to go through raft to be applied,
@ -1632,7 +1631,7 @@ func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
n.Record(testutil.Action{Name: "Propose", Params: []interface{}{data}})
return nil
}
func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChangeI) error {
n.Record(testutil.Action{Name: "ProposeConfChange"})
return nil
}
@ -1645,7 +1644,7 @@ func (n *nodeRecorder) Ready() <-chan raft.Ready
func (n *nodeRecorder) TransferLeadership(ctx context.Context, lead, transferee uint64) {}
func (n *nodeRecorder) ReadIndex(ctx context.Context, rctx []byte) error { return nil }
func (n *nodeRecorder) Advance() {}
func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChangeI) *raftpb.ConfState {
n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}})
return &raftpb.ConfState{}
}
@ -1706,21 +1705,37 @@ func newNodeConfChangeCommitterStream() *nodeConfChangeCommitterRecorder {
return &nodeConfChangeCommitterRecorder{*newReadyNode(), 0}
}
func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
data, err := conf.Marshal()
func confChangeActionName(conf raftpb.ConfChangeI) string {
var s string
if confV1, ok := conf.AsV1(); ok {
s = confV1.Type.String()
} else {
for i, chg := range conf.AsV2().Changes {
if i > 0 {
s += "/"
}
s += chg.Type.String()
}
}
return s
}
func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChangeI) error {
typ, data, err := raftpb.MarshalConfChange(conf)
if err != nil {
return err
}
n.index++
n.Record(testutil.Action{Name: "ProposeConfChange:" + conf.Type.String()})
n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: raftpb.EntryConfChange, Data: data}}}
n.Record(testutil.Action{Name: "ProposeConfChange:" + confChangeActionName(conf)})
n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: typ, Data: data}}}
return nil
}
func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready {
return n.readyc
}
func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
n.Record(testutil.Action{Name: "ApplyConfChange:" + conf.Type.String()})
func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChangeI) *raftpb.ConfState {
n.Record(testutil.Action{Name: "ApplyConfChange:" + confChangeActionName(conf)})
return &raftpb.ConfState{}
}

View File

@ -74,7 +74,7 @@ func (rn *RawNode) Bootstrap(peers []Peer) error {
// the invariant that committed < unstable?
rn.raft.raftLog.committed = uint64(len(ents))
for _, peer := range peers {
rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}.AsV2())
}
return nil
}

View File

@ -46,7 +46,7 @@ type Changer struct {
// (Section 4.3) corresponds to `C_{new,old}`.
//
// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
func (c Changer) EnterJoint(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) {
func (c Changer) EnterJoint(autoLeave bool, ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) {
cfg, prs, err := c.checkAndCopy()
if err != nil {
return c.err(err)
@ -74,7 +74,7 @@ func (c Changer) EnterJoint(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker
if err := c.apply(&cfg, prs, ccs...); err != nil {
return c.err(err)
}
cfg.AutoLeave = autoLeave
return checkAndReturn(cfg, prs)
}
@ -120,6 +120,7 @@ func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, error) {
}
}
*outgoingPtr(&cfg.Voters) = nil
cfg.AutoLeave = false
return checkAndReturn(cfg, prs)
}
@ -142,7 +143,7 @@ func (c Changer) Simple(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.Pro
return c.err(err)
}
if n := symdiff(incoming(c.Tracker.Voters), incoming(cfg.Voters)); n > 1 {
return tracker.Config{}, nil, errors.New("more than voter changed without entering joint config")
return tracker.Config{}, nil, errors.New("more than one voter changed without entering joint config")
}
if err := checkInvariants(cfg, prs); err != nil {
return tracker.Config{}, tracker.ProgressMap{}, nil
@ -327,6 +328,9 @@ func checkInvariants(cfg tracker.Config, prs tracker.ProgressMap) error {
if cfg.LearnersNext != nil {
return fmt.Errorf("LearnersNext must be nil when not joint")
}
if cfg.AutoLeave {
return fmt.Errorf("AutoLeave must be false when not joint")
}
}
return nil

View File

@ -85,7 +85,11 @@ func TestConfChangeDataDriven(t *testing.T) {
case "simple":
cfg, prs, err = c.Simple(ccs...)
case "enter-joint":
cfg, prs, err = c.EnterJoint(ccs...)
var autoLeave bool
if len(d.CmdArgs) > 0 {
d.ScanArgs(t, "autoleave", &autoLeave)
}
cfg, prs, err = c.EnterJoint(autoLeave, ccs...)
case "leave-joint":
if len(ccs) > 0 {
err = errors.New("this command takes no input")

View File

@ -15,6 +15,7 @@
package confchange
import (
"fmt"
"math/rand"
"reflect"
"testing"
@ -36,16 +37,38 @@ func TestConfChangeQuick(t *testing.T) {
const infoCount = 5
runWithJoint := func(c *Changer, ccs []pb.ConfChangeSingle) error {
cfg, prs, err := c.EnterJoint(ccs...)
cfg, prs, err := c.EnterJoint(false /* autoLeave */, ccs...)
if err != nil {
return err
}
// Also do this with autoLeave on, just to check that we'd get the same
// result.
cfg2a, prs2a, err := c.EnterJoint(true /* autoLeave */, ccs...)
if err != nil {
return err
}
cfg2a.AutoLeave = false
if !reflect.DeepEqual(cfg, cfg2a) || !reflect.DeepEqual(prs, prs2a) {
return fmt.Errorf("cfg: %+v\ncfg2a: %+v\nprs: %+v\nprs2a: %+v",
cfg, cfg2a, prs, prs2a)
}
c.Tracker.Config = cfg
c.Tracker.Progress = prs
cfg2b, prs2b, err := c.LeaveJoint()
if err != nil {
return err
}
// Reset back to the main branch with autoLeave=false.
c.Tracker.Config = cfg
c.Tracker.Progress = prs
cfg, prs, err = c.LeaveJoint()
if err != nil {
return err
}
if !reflect.DeepEqual(cfg, cfg2b) || !reflect.DeepEqual(prs, prs2b) {
return fmt.Errorf("cfg: %+v\ncfg2b: %+v\nprs: %+v\nprs2b: %+v",
cfg, cfg2b, prs, prs2b)
}
c.Tracker.Config = cfg
c.Tracker.Progress = prs
return nil

View File

@ -0,0 +1,29 @@
# Test the autoleave argument to EnterJoint. It defaults to false in the
# datadriven tests. The flag has no associated semantics in this package,
# it is simply passed through.
simple
v1
----
voters=(1)
1: StateProbe match=0 next=1
# Autoleave is reflected in the config.
enter-joint autoleave=true
v2 v3
----
voters=(1 2 3)&&(1) autoleave
1: StateProbe match=0 next=1
2: StateProbe match=0 next=2
3: StateProbe match=0 next=2
# Can't enter-joint twice, even if autoleave changes.
enter-joint autoleave=false
----
config is already joint
leave-joint
----
voters=(1 2 3)
1: StateProbe match=0 next=1
2: StateProbe match=0 next=2
3: StateProbe match=0 next=2

View File

@ -20,7 +20,7 @@ voters=(1 2) learners=(3)
simple
r1 v5
----
more than voter changed without entering joint config
more than one voter changed without entering joint config
simple
r1 r2
@ -30,12 +30,12 @@ removed all voters
simple
v3 v4
----
more than voter changed without entering joint config
more than one voter changed without entering joint config
simple
l1 v5
----
more than voter changed without entering joint config
more than one voter changed without entering joint config
simple
l1 l2

View File

@ -132,10 +132,20 @@ type Node interface {
// Propose proposes that data be appended to the log. Note that proposals can be lost without
// notice, therefore it is user's job to ensure proposal retries.
Propose(ctx context.Context, data []byte) error
// ProposeConfChange proposes config change.
// At most one ConfChange can be in the process of going through consensus.
// Application needs to call ApplyConfChange when applying EntryConfChange type entry.
ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
// ProposeConfChange proposes a configuration change. Like any proposal, the
// configuration change may be dropped with or without an error being
// returned. In particular, configuration changes are dropped unless the
// leader has certainty that there is no prior unapplied configuration
// change in its log.
//
// The method accepts either a pb.ConfChange (deprecated) or pb.ConfChangeV2
// message. The latter allows arbitrary configuration changes via joint
// consensus, notably including replacing a voter. Passing a ConfChangeV2
// message is only allowed if all Nodes participating in the cluster run a
// version of this library aware of the V2 API. See pb.ConfChangeV2 for
// usage details and semantics.
ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error
// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
Step(ctx context.Context, msg pb.Message) error
@ -156,11 +166,13 @@ type Node interface {
// a long time to apply the snapshot data. To continue receiving Ready without blocking raft
// progress, it can call Advance before finishing applying the last ready.
Advance()
// ApplyConfChange applies config change to the local node.
// Returns an opaque ConfState protobuf which must be recorded
// in snapshots. Will never return nil; it returns a pointer only
// to match MemoryStorage.Compact.
ApplyConfChange(cc pb.ConfChange) *pb.ConfState
// ApplyConfChange applies a config change (previously passed to
// ProposeConfChange) to the node. This must be called whenever a config
// change is observed in Ready.CommittedEntries.
//
// Returns an opaque non-nil ConfState protobuf which must be recorded in
// snapshots.
ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState
// TransferLeadership attempts to transfer leadership to the given transferee.
TransferLeadership(ctx context.Context, lead, transferee uint64)
@ -240,7 +252,7 @@ type msgWithResult struct {
type node struct {
propc chan msgWithResult
recvc chan pb.Message
confc chan pb.ConfChange
confc chan pb.ConfChangeV2
confstatec chan pb.ConfState
readyc chan Ready
advancec chan struct{}
@ -256,7 +268,7 @@ func newNode() node {
return node{
propc: make(chan msgWithResult),
recvc: make(chan pb.Message),
confc: make(chan pb.ConfChange),
confc: make(chan pb.ConfChangeV2),
confstatec: make(chan pb.ConfState),
readyc: make(chan Ready),
advancec: make(chan struct{}),
@ -341,11 +353,27 @@ func (n *node) run(rn *RawNode) {
r.Step(m)
}
case cc := <-n.confc:
_, okBefore := r.prs.Progress[r.id]
cs := r.applyConfChange(cc)
if _, ok := r.prs.Progress[r.id]; !ok {
// block incoming proposal when local node is
// removed
if cc.NodeID == r.id {
// If the node was removed, block incoming proposals. Note that we
// only do this if the node was in the config before. Nodes may be
// a member of the group without knowing this (when they're catching
// up on the log and don't have the latest config) and we don't want
// to block the proposal channel in that case.
//
// NB: propc is reset when the leader changes, which, if we learn
// about it, sort of implies that we got readded, maybe? This isn't
// very sound and likely has bugs.
if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter {
var found bool
for _, sl := range [][]uint64{cs.Nodes, cs.NodesJoint} {
for _, id := range sl {
if id == r.id {
found = true
}
}
}
if !found {
propc = nil
}
}
@ -397,12 +425,20 @@ func (n *node) Step(ctx context.Context, m pb.Message) error {
return n.step(ctx, m)
}
func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error {
data, err := cc.Marshal()
func confChangeToMsg(c pb.ConfChangeI) (pb.Message, error) {
typ, data, err := pb.MarshalConfChange(c)
if err != nil {
return pb.Message{}, err
}
return pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: typ, Data: data}}}, nil
}
func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error {
msg, err := confChangeToMsg(cc)
if err != nil {
return err
}
return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}})
return n.Step(ctx, msg)
}
func (n *node) step(ctx context.Context, m pb.Message) error {
@ -463,10 +499,10 @@ func (n *node) Advance() {
}
}
func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
func (n *node) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState {
var cs pb.ConfState
select {
case n.confc <- cc:
case n.confc <- cc.AsV2():
case <-n.done:
}
select {

View File

@ -102,6 +102,16 @@ func (c MajorityConfig) Describe(l AckedIndexer) string {
return buf.String()
}
// Slice returns the MajorityConfig as a sorted slice.
func (c MajorityConfig) Slice() []uint64 {
var sl []uint64
for id := range c {
sl = append(sl, id)
}
sort.Slice(sl, func(i, j int) bool { return sl[i] < sl[j] })
return sl
}
type uint64Slice []uint64
func insertionSort(sl uint64Slice) {

View File

@ -357,11 +357,11 @@ func newRaft(c *Config) *raft {
}
for _, p := range peers {
// Add node to active config.
r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: p})
r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: p}.AsV2())
}
for _, p := range learners {
// Add learner to active config.
r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddLearnerNode, NodeID: p})
r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddLearnerNode, NodeID: p}.AsV2())
}
if !isHardStateEqual(hs, emptyState) {
@ -551,6 +551,46 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
})
}
func (r *raft) advance(rd Ready) {
// If entries were applied (or a snapshot), update our cursor for
// the next Ready. Note that if the current HardState contains a
// new Commit index, this does not mean that we're also applying
// all of the new entries due to commit pagination by size.
if index := rd.appliedCursor(); index > 0 {
r.raftLog.appliedTo(index)
if r.prs.Config.AutoLeave && index >= r.pendingConfIndex && r.state == StateLeader {
// If the current (and most recent, at least for this leader's term)
// configuration should be auto-left, initiate that now.
ccdata, err := (&pb.ConfChangeV2{}).Marshal()
if err != nil {
panic(err)
}
ent := pb.Entry{
Type: pb.EntryConfChangeV2,
Data: ccdata,
}
if !r.appendEntry(ent) {
// If we could not append the entry, bump the pending conf index
// so that we'll try again later.
//
// TODO(tbg): test this case.
r.pendingConfIndex = r.raftLog.lastIndex()
} else {
r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config)
}
}
}
r.reduceUncommittedSize(rd.CommittedEntries)
if len(rd.Entries) > 0 {
e := rd.Entries[len(rd.Entries)-1]
r.raftLog.stableTo(e.Index, e.Term)
}
if !IsEmptySnap(rd.Snapshot) {
r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
}
}
// maybeCommit attempts to advance the commit index. Returns true if
// the commit index changed (in which case the caller should call
// r.bcastAppend).
@ -973,10 +1013,10 @@ func stepLeader(r *raft, m pb.Message) error {
for i := range m.Entries {
e := &m.Entries[i]
if e.Type == pb.EntryConfChange {
if e.Type == pb.EntryConfChange || e.Type == pb.EntryConfChangeV2 {
if r.pendingConfIndex > r.raftLog.applied {
r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
e, r.pendingConfIndex, r.raftLog.applied)
r.logger.Infof("%x propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
r.id, e, r.pendingConfIndex, r.raftLog.applied)
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
@ -1376,10 +1416,10 @@ func (r *raft) restore(s pb.Snapshot) bool {
// Reset the configuration and add the (potentially updated) peers in anew.
r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
for _, id := range s.Metadata.ConfState.Nodes {
r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddNode})
r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddNode}.AsV2())
}
for _, id := range s.Metadata.ConfState.Learners {
r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddLearnerNode})
r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddLearnerNode}.AsV2())
}
pr := r.prs.Progress[r.id]
@ -1397,24 +1437,38 @@ func (r *raft) promotable() bool {
return pr != nil && !pr.IsLearner
}
func (r *raft) applyConfChange(cc pb.ConfChange) pb.ConfState {
cfg, prs, err := confchange.Changer{
Tracker: r.prs,
LastIndex: r.raftLog.lastIndex(),
}.Simple(pb.ConfChangeSingle{
Type: cc.Type,
NodeID: cc.NodeID,
})
func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
cfg, prs, err := func() (tracker.Config, tracker.ProgressMap, error) {
changer := confchange.Changer{
Tracker: r.prs,
LastIndex: r.raftLog.lastIndex(),
}
if cc.LeaveJoint() {
return changer.LeaveJoint()
} else if autoLeave, ok := cc.EnterJoint(); ok {
return changer.EnterJoint(autoLeave, cc.Changes...)
}
return changer.Simple(cc.Changes...)
}()
if err != nil {
// TODO(tbg): return the error to the caller.
panic(err)
}
r.prs.Config = cfg
r.prs.Progress = prs
r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config)
// Now that the configuration is updated, handle any side effects.
cs := pb.ConfState{Nodes: r.prs.VoterNodes(), Learners: r.prs.LearnerNodes()}
cs := pb.ConfState{
Nodes: r.prs.Voters[0].Slice(),
NodesJoint: r.prs.Voters[1].Slice(),
Learners: quorum.MajorityConfig(r.prs.Learners).Slice(),
LearnersNext: quorum.MajorityConfig(r.prs.LearnersNext).Slice(),
AutoLeave: r.prs.AutoLeave,
}
pr, ok := r.prs.Progress[r.id]
// Update whether the node itself is a learner, resetting to false when the

View File

@ -142,7 +142,7 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) {
// Add a follower to the group. Do this in a clandestine way for simplicity.
// Also set up a snapshot that will be sent to the follower.
n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode})
n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2())
s1.snapshot = pb.Snapshot{
Metadata: pb.SnapshotMetadata{
ConfState: pb.ConfState{Nodes: []uint64{1, 2}},

View File

@ -356,8 +356,8 @@ func TestLearnerPromotion(t *testing.T) {
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode})
n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode})
n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2())
n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2())
if n2.isLearner {
t.Error("peer 2 is learner, want not")
}
@ -1143,7 +1143,7 @@ func TestCommit(t *testing.T) {
for j := 0; j < len(tt.matches); j++ {
id := uint64(j) + 1
if id > 1 {
sm.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: id})
sm.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: id}.AsV2())
}
pr := sm.prs.Progress[id]
pr.Match, pr.Next = tt.matches[j], tt.matches[j]+1
@ -1931,7 +1931,7 @@ func TestNonPromotableVoterWithCheckQuorum(t *testing.T) {
nt := newNetwork(a, b)
setRandomizedElectionTimeout(b, b.electionTimeout+1)
// Need to remove 2 again to make it a non-promotable node since newNetwork overwritten some internal states
b.applyConfChange(pb.ConfChange{Type: pb.ConfChangeRemoveNode, NodeID: 2})
b.applyConfChange(pb.ConfChange{Type: pb.ConfChangeRemoveNode, NodeID: 2}.AsV2())
if b.promotable() {
t.Fatalf("promotable = %v, want false", b.promotable())
@ -3086,7 +3086,7 @@ func TestNewLeaderPendingConfig(t *testing.T) {
// TestAddNode tests that addNode could update nodes correctly.
func TestAddNode(t *testing.T) {
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode})
r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2())
nodes := r.prs.VoterNodes()
wnodes := []uint64{1, 2}
if !reflect.DeepEqual(nodes, wnodes) {
@ -3098,7 +3098,7 @@ func TestAddNode(t *testing.T) {
func TestAddLearner(t *testing.T) {
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
// Add new learner peer.
r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode})
r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}.AsV2())
if r.isLearner {
t.Fatal("expected 1 to be voter")
}
@ -3112,13 +3112,13 @@ func TestAddLearner(t *testing.T) {
}
// Promote peer to voter.
r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode})
r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2())
if r.prs.Progress[2].IsLearner {
t.Fatal("expected 2 to be voter")
}
// Demote r.
r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddLearnerNode})
r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddLearnerNode}.AsV2())
if !r.prs.Progress[1].IsLearner {
t.Fatal("expected 1 to be learner")
}
@ -3127,7 +3127,7 @@ func TestAddLearner(t *testing.T) {
}
// Promote r again.
r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddNode})
r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddNode}.AsV2())
if r.prs.Progress[1].IsLearner {
t.Fatal("expected 1 to be voter")
}
@ -3149,7 +3149,7 @@ func TestAddNodeCheckQuorum(t *testing.T) {
r.tick()
}
r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode})
r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2())
// This tick will reach electionTimeout, which triggers a quorum check.
r.tick()
@ -3174,7 +3174,7 @@ func TestAddNodeCheckQuorum(t *testing.T) {
// and removed list correctly.
func TestRemoveNode(t *testing.T) {
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode})
r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}.AsV2())
w := []uint64{1}
if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
@ -3186,20 +3186,20 @@ func TestRemoveNode(t *testing.T) {
t.Error("did not panic")
}
}()
r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode})
r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}.AsV2())
}
// TestRemoveLearner tests that removeNode could update nodes and
// and removed list correctly.
func TestRemoveLearner(t *testing.T) {
r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode})
r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}.AsV2())
w := []uint64{1}
if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
}
w = []uint64{}
w = nil
if g := r.prs.LearnerNodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
}
@ -3210,7 +3210,7 @@ func TestRemoveLearner(t *testing.T) {
t.Error("did not panic")
}
}()
r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode})
r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}.AsV2())
}
func TestPromotable(t *testing.T) {
@ -3342,7 +3342,7 @@ func TestCommitAfterRemoveNode(t *testing.T) {
// Apply the config change. This reduces quorum requirements so the
// pending command can now commit.
r.applyConfChange(cc)
r.applyConfChange(cc.AsV2())
ents = nextEnts(r, s)
if len(ents) != 1 || ents[0].Type != pb.EntryNormal ||
string(ents[0].Data) != "hello" {
@ -3591,7 +3591,7 @@ func TestLeaderTransferRemoveNode(t *testing.T) {
t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
}
lead.applyConfChange(pb.ConfChange{NodeID: 3, Type: pb.ConfChangeRemoveNode})
lead.applyConfChange(pb.ConfChange{NodeID: 3, Type: pb.ConfChangeRemoveNode}.AsV2())
checkLeaderTransferState(t, lead, StateLeader, 1)
}
@ -3917,9 +3917,9 @@ func TestPreVoteWithCheckQuorum(t *testing.T) {
// a MsgHup or MsgTimeoutNow.
func TestLearnerCampaign(t *testing.T) {
n1 := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode})
n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}.AsV2())
n2 := newTestRaft(2, []uint64{1}, 10, 1, NewMemoryStorage())
n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode})
n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}.AsV2())
nt := newNetwork(n1, n2)
nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})

105
raft/raftpb/confchange.go Normal file
View File

@ -0,0 +1,105 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package raftpb
import (
"fmt"
"github.com/gogo/protobuf/proto"
)
// ConfChangeI abstracts over ConfChangeV2 and (legacy) ConfChange to allow
// treating them in a unified manner.
type ConfChangeI interface {
AsV2() ConfChangeV2
AsV1() (ConfChange, bool)
}
// MarshalConfChange calls Marshal on the underlying ConfChange or ConfChangeV2
// and returns the result along with the corresponding EntryType.
func MarshalConfChange(c ConfChangeI) (EntryType, []byte, error) {
var typ EntryType
var ccdata []byte
var err error
if ccv1, ok := c.AsV1(); ok {
typ = EntryConfChange
ccdata, err = ccv1.Marshal()
} else {
ccv2 := c.AsV2()
typ = EntryConfChangeV2
ccdata, err = ccv2.Marshal()
}
return typ, ccdata, err
}
// AsV2 returns a V2 configuration change carrying out the same operation.
func (c ConfChange) AsV2() ConfChangeV2 {
return ConfChangeV2{
Changes: []ConfChangeSingle{{
Type: c.Type,
NodeID: c.NodeID,
}},
Context: c.Context,
}
}
// AsV1 returns the ConfChange and true.
func (c ConfChange) AsV1() (ConfChange, bool) {
return c, true
}
// AsV2 is the identity.
func (c ConfChangeV2) AsV2() ConfChangeV2 { return c }
// AsV1 returns ConfChange{} and false.
func (c ConfChangeV2) AsV1() (ConfChange, bool) { return ConfChange{}, false }
// EnterJoint returns two bools. The second bool is true if and only if this
// config change will use Joint Consensus, which is the case if it contains more
// than one change or if the use of Joint Consensus was requested explicitly.
// The first bool can only be true if second one is, and indicates whether the
// Joint State will be left automatically.
func (c *ConfChangeV2) EnterJoint() (autoLeave bool, ok bool) {
// NB: in theory, more config changes could qualify for the "simple"
// protocol but it depends on the config on top of which the changes apply.
// For example, adding two learners is not OK if both nodes are part of the
// base config (i.e. two voters are turned into learners in the process of
// applying the conf change). In practice, these distinctions should not
// matter, so we keep it simple and use Joint Consensus liberally.
if c.Transition != ConfChangeTransitionAuto || len(c.Changes) > 1 {
// Use Joint Consensus.
var autoLeave bool
switch c.Transition {
case ConfChangeTransitionAuto:
autoLeave = true
case ConfChangeTransitionJointImplicit:
autoLeave = true
case ConfChangeTransitionJointExplicit:
default:
panic(fmt.Sprintf("unknown transition: %+v", c))
}
return autoLeave, true
}
return false, false
}
// LeaveJoint is true if the configuration change leaves a joint configuration.
// This is the case if the ConfChangeV2 is zero, with the possible exception of
// the Context field.
func (c *ConfChangeV2) LeaveJoint() bool {
cpy := *c
cpy.Context = nil
return proto.Equal(&cpy, &ConfChangeV2{})
}

View File

@ -91,23 +91,19 @@ func (rn *RawNode) Propose(data []byte) error {
}})
}
// ProposeConfChange proposes a config change.
func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error {
data, err := cc.Marshal()
// ProposeConfChange proposes a config change. See (Node).ProposeConfChange for
// details.
func (rn *RawNode) ProposeConfChange(cc pb.ConfChangeI) error {
m, err := confChangeToMsg(cc)
if err != nil {
return err
}
return rn.raft.Step(pb.Message{
Type: pb.MsgProp,
Entries: []pb.Entry{
{Type: pb.EntryConfChange, Data: data},
},
})
return rn.raft.Step(m)
}
// ApplyConfChange applies a config change to the local node.
func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
cs := rn.raft.applyConfChange(cc)
func (rn *RawNode) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState {
cs := rn.raft.applyConfChange(cc.AsV2())
return &cs
}
@ -159,23 +155,7 @@ func (rn *RawNode) commitReady(rd Ready) {
if !IsEmptyHardState(rd.HardState) {
rn.prevHardSt = rd.HardState
}
// If entries were applied (or a snapshot), update our cursor for
// the next Ready. Note that if the current HardState contains a
// new Commit index, this does not mean that we're also applying
// all of the new entries due to commit pagination by size.
if index := rd.appliedCursor(); index > 0 {
rn.raft.raftLog.appliedTo(index)
}
rn.raft.reduceUncommittedSize(rd.CommittedEntries)
if len(rd.Entries) > 0 {
e := rd.Entries[len(rd.Entries)-1]
rn.raft.raftLog.stableTo(e.Index, e.Term)
}
if !IsEmptySnap(rd.Snapshot) {
rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
}
rn.raft.advance(rd)
}
// HasReady called when RawNode user need to check if any Ready pending.

View File

@ -64,7 +64,7 @@ func (a *rawNodeAdapter) ReadIndex(_ context.Context, rctx []byte) error {
}
func (a *rawNodeAdapter) Step(_ context.Context, m pb.Message) error { return a.RawNode.Step(m) }
func (a *rawNodeAdapter) Propose(_ context.Context, data []byte) error { return a.RawNode.Propose(data) }
func (a *rawNodeAdapter) ProposeConfChange(_ context.Context, cc pb.ConfChange) error {
func (a *rawNodeAdapter) ProposeConfChange(_ context.Context, cc pb.ConfChangeI) error {
return a.RawNode.ProposeConfChange(cc)
}
@ -105,65 +105,257 @@ func TestRawNodeStep(t *testing.T) {
// TestNodeStepUnblock from node_test.go has no equivalent in rawNode because there is
// no goroutine in RawNode.
// TestRawNodeProposeAndConfChange ensures that RawNode.Propose and RawNode.ProposeConfChange
// send the given proposal and ConfChange to the underlying raft.
// TestRawNodeProposeAndConfChange tests the configuration change mechanism. Each
// test case sends a configuration change which is either simple or joint, verifies
// that it applies and that the resulting ConfState matches expectations, and for
// joint configurations makes sure that they are exited successfully.
func TestRawNodeProposeAndConfChange(t *testing.T) {
s := NewMemoryStorage()
var err error
rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s))
if err != nil {
t.Fatal(err)
testCases := []struct {
cc pb.ConfChangeI
exp pb.ConfState
exp2 *pb.ConfState
}{
// V1 config change.
{
pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 2},
pb.ConfState{Nodes: []uint64{1, 2}},
nil,
},
// Proposing the same as a V2 change works just the same, without entering
// a joint config.
{
pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
{Type: pb.ConfChangeAddNode, NodeID: 2},
},
},
pb.ConfState{Nodes: []uint64{1, 2}},
nil,
},
// Ditto if we add it as a learner instead.
{
pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
{Type: pb.ConfChangeAddLearnerNode, NodeID: 2},
},
},
pb.ConfState{Nodes: []uint64{1}, Learners: []uint64{2}},
nil,
},
// We can ask explicitly for joint consensus if we want it.
{
pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
{Type: pb.ConfChangeAddLearnerNode, NodeID: 2},
},
Transition: pb.ConfChangeTransitionJointExplicit,
},
pb.ConfState{Nodes: []uint64{1}, NodesJoint: []uint64{1}, Learners: []uint64{2}},
&pb.ConfState{Nodes: []uint64{1}, Learners: []uint64{2}},
},
// Ditto, but with implicit transition (the harness checks this).
{
pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
{Type: pb.ConfChangeAddLearnerNode, NodeID: 2},
},
Transition: pb.ConfChangeTransitionJointImplicit,
},
pb.ConfState{
Nodes: []uint64{1}, NodesJoint: []uint64{1}, Learners: []uint64{2},
AutoLeave: true,
},
&pb.ConfState{Nodes: []uint64{1}, Learners: []uint64{2}},
},
// Add a new node and demote n1. This exercises the interesting case in
// which we really need joint config changes and also need LearnersNext.
{
pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
{NodeID: 2, Type: pb.ConfChangeAddNode},
{NodeID: 1, Type: pb.ConfChangeAddLearnerNode},
{NodeID: 3, Type: pb.ConfChangeAddLearnerNode},
},
},
pb.ConfState{
Nodes: []uint64{2},
NodesJoint: []uint64{1},
Learners: []uint64{3},
LearnersNext: []uint64{1},
AutoLeave: true,
},
&pb.ConfState{Nodes: []uint64{2}, Learners: []uint64{1, 3}},
},
// Ditto explicit.
{
pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
{NodeID: 2, Type: pb.ConfChangeAddNode},
{NodeID: 1, Type: pb.ConfChangeAddLearnerNode},
{NodeID: 3, Type: pb.ConfChangeAddLearnerNode},
},
Transition: pb.ConfChangeTransitionJointExplicit,
},
pb.ConfState{
Nodes: []uint64{2},
NodesJoint: []uint64{1},
Learners: []uint64{3},
LearnersNext: []uint64{1},
},
&pb.ConfState{Nodes: []uint64{2}, Learners: []uint64{1, 3}},
},
// Ditto implicit.
{
pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
{NodeID: 2, Type: pb.ConfChangeAddNode},
{NodeID: 1, Type: pb.ConfChangeAddLearnerNode},
{NodeID: 3, Type: pb.ConfChangeAddLearnerNode},
},
Transition: pb.ConfChangeTransitionJointImplicit,
},
pb.ConfState{
Nodes: []uint64{2},
NodesJoint: []uint64{1},
Learners: []uint64{3},
LearnersNext: []uint64{1},
AutoLeave: true,
},
&pb.ConfState{Nodes: []uint64{2}, Learners: []uint64{1, 3}},
},
}
rawNode.Campaign()
proposed := false
var (
lastIndex uint64
ccdata []byte
)
for {
rd := rawNode.Ready()
s.Append(rd.Entries)
rawNode.Advance(rd)
// Once we are the leader, propose a command and a ConfChange.
if !proposed && rd.SoftState.Lead == rawNode.raft.id {
if err = rawNode.Propose([]byte("somedata")); err != nil {
t.Fatal(err)
}
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 1}
ccdata, err = cc.Marshal()
for _, tc := range testCases {
t.Run("", func(t *testing.T) {
s := NewMemoryStorage()
rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s))
if err != nil {
t.Fatal(err)
}
rawNode.ProposeConfChange(cc)
proposed = true
} else if proposed {
// We proposed last cycle, which means we appended the conf change
// in this cycle.
rawNode.Campaign()
proposed := false
var (
lastIndex uint64
ccdata []byte
)
// Propose the ConfChange, wait until it applies, save the resulting
// ConfState.
var cs *pb.ConfState
for cs == nil {
rd := rawNode.Ready()
s.Append(rd.Entries)
for _, ent := range rd.CommittedEntries {
var cc pb.ConfChangeI
if ent.Type == pb.EntryConfChange {
var ccc pb.ConfChange
if err = ccc.Unmarshal(ent.Data); err != nil {
t.Fatal(err)
}
cc = ccc
} else if ent.Type == pb.EntryConfChangeV2 {
var ccc pb.ConfChangeV2
if err = ccc.Unmarshal(ent.Data); err != nil {
t.Fatal(err)
}
cc = ccc
}
if cc != nil {
cs = rawNode.ApplyConfChange(cc)
}
}
rawNode.Advance(rd)
// Once we are the leader, propose a command and a ConfChange.
if !proposed && rd.SoftState.Lead == rawNode.raft.id {
if err = rawNode.Propose([]byte("somedata")); err != nil {
t.Fatal(err)
}
if ccv1, ok := tc.cc.AsV1(); ok {
ccdata, err = ccv1.Marshal()
if err != nil {
t.Fatal(err)
}
rawNode.ProposeConfChange(ccv1)
} else {
ccv2 := tc.cc.AsV2()
ccdata, err = ccv2.Marshal()
if err != nil {
t.Fatal(err)
}
rawNode.ProposeConfChange(ccv2)
}
proposed = true
}
}
// Check that the last index is exactly the conf change we put in,
// down to the bits.
lastIndex, err = s.LastIndex()
if err != nil {
t.Fatal(err)
}
break
}
}
entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit)
if err != nil {
t.Fatal(err)
}
if len(entries) != 2 {
t.Fatalf("len(entries) = %d, want %d", len(entries), 2)
}
if !bytes.Equal(entries[0].Data, []byte("somedata")) {
t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata"))
}
if entries[1].Type != pb.EntryConfChange {
t.Fatalf("type = %v, want %v", entries[1].Type, pb.EntryConfChange)
}
if !bytes.Equal(entries[1].Data, ccdata) {
t.Errorf("data = %v, want %v", entries[1].Data, ccdata)
entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit)
if err != nil {
t.Fatal(err)
}
if len(entries) != 2 {
t.Fatalf("len(entries) = %d, want %d", len(entries), 2)
}
if !bytes.Equal(entries[0].Data, []byte("somedata")) {
t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata"))
}
typ := pb.EntryConfChange
if _, ok := tc.cc.AsV1(); !ok {
typ = pb.EntryConfChangeV2
}
if entries[1].Type != typ {
t.Fatalf("type = %v, want %v", entries[1].Type, typ)
}
if !bytes.Equal(entries[1].Data, ccdata) {
t.Errorf("data = %v, want %v", entries[1].Data, ccdata)
}
if exp := &tc.exp; !reflect.DeepEqual(exp, cs) {
t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs)
}
if exp, act := lastIndex, rawNode.raft.pendingConfIndex; exp != act {
t.Fatalf("pendingConfIndex: expected %d, got %d", exp, act)
}
// Move the RawNode along. If the ConfChange was simple, nothing else
// should happen. Otherwise, we're in a joint state, which is either
// left automatically or not. If not, we add the proposal that leaves
// it manually.
rd := rawNode.Ready()
var context []byte
if !tc.exp.AutoLeave {
if len(rd.Entries) > 0 {
t.Fatal("expected no more entries")
}
if tc.exp2 == nil {
return
}
context = []byte("manual")
t.Log("leaving joint state manually")
if err := rawNode.ProposeConfChange(pb.ConfChangeV2{Context: context}); err != nil {
t.Fatal(err)
}
rd = rawNode.Ready()
}
// Check that the right ConfChange comes out.
if len(rd.Entries) != 1 || rd.Entries[0].Type != pb.EntryConfChangeV2 {
t.Fatalf("expected exactly one more entry, got %+v", rd)
}
var cc pb.ConfChangeV2
if err := cc.Unmarshal(rd.Entries[0].Data); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(cc, pb.ConfChangeV2{Context: context}) {
t.Fatalf("expected zero ConfChangeV2, got %+v", cc)
}
// Lie and pretend the ConfChange applied. It won't do so because now
// we require the joint quorum and we're only running one node.
cs = rawNode.ApplyConfChange(cc)
if exp := tc.exp2; !reflect.DeepEqual(exp, cs) {
t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs)
}
})
}
}

View File

@ -25,6 +25,11 @@ import (
// Config reflects the configuration tracked in a ProgressTracker.
type Config struct {
Voters quorum.JointConfig
// AutoLeave is true if the configuration is joint and a transition to the
// incoming configuration should be carried out automatically by Raft when
// this is possible. If false, the configuration will be joint until the
// application initiates the transition manually.
AutoLeave bool
// Learners is a set of IDs corresponding to the learners active in the
// current configuration.
//
@ -80,6 +85,9 @@ func (c Config) String() string {
if c.LearnersNext != nil {
fmt.Fprintf(&buf, " learners_next=%s", quorum.MajorityConfig(c.LearnersNext).String())
}
if c.AutoLeave {
fmt.Fprintf(&buf, " autoleave")
}
return buf.String()
}
@ -192,6 +200,9 @@ func (p *ProgressTracker) VoterNodes() []uint64 {
// LearnerNodes returns a sorted slice of learners.
func (p *ProgressTracker) LearnerNodes() []uint64 {
if len(p.Learners) == 0 {
return nil
}
nodes := make([]uint64, 0, len(p.Learners))
for id := range p.Learners {
nodes = append(nodes, id)