From c62b7048b5229aa6f7aa84c98b44703bdf5e1b1a Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Mon, 15 Jul 2019 17:18:56 +0200 Subject: [PATCH 1/4] raft: use RawNode for node's event loop It has always bugged me that any new feature essentially needed to be tested twice due to the two ways in which apps can use raft (`*node` and `*RawNode`). Due to upcoming testing work for joint consensus, now is a good time to rectify this somewhat. This commit removes most logic from `(*node).run` and uses `*RawNode` internally. This simplifies the logic and also lead (via debugging) to some insight on how the semantics of the approaches differ, which is now documented in the comments. --- raft/node.go | 116 ++++++++----------------------------- raft/node_bench_test.go | 4 +- raft/node_test.go | 88 +++++++++++++++++----------- raft/raft_test.go | 9 +++ raft/rawnode.go | 123 ++++++++++++++++++++++++++-------------- 5 files changed, 173 insertions(+), 167 deletions(-) diff --git a/raft/node.go b/raft/node.go index 4a3b2f1df..e01a7167a 100644 --- a/raft/node.go +++ b/raft/node.go @@ -198,51 +198,15 @@ type Peer struct { // StartNode returns a new Node given configuration and a list of raft peers. // It appends a ConfChangeAddNode entry for each given peer to the initial log. func StartNode(c *Config, peers []Peer) Node { - r := newRaft(c) - // become the follower at term 1 and apply initial configuration - // entries of term 1 - r.becomeFollower(1, None) - for _, peer := range peers { - cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context} - d, err := cc.Marshal() - if err != nil { - panic("unexpected marshal error") - } - // TODO(tbg): this should append the ConfChange for the own node first - // and also call applyConfChange below for that node first. Otherwise - // we have a Raft group (for a little while) that doesn't have itself - // in its config, which is bad. - // This whole way of setting things up is rickety. The app should just - // populate the initial ConfState appropriately and then all of this - // goes away. - e := pb.Entry{ - Type: pb.EntryConfChange, - Term: 1, - Index: r.raftLog.lastIndex() + 1, - Data: d, - } - r.raftLog.append(e) - } - // Mark these initial entries as committed. - // TODO(bdarnell): These entries are still unstable; do we need to preserve - // the invariant that committed < unstable? - r.raftLog.committed = r.raftLog.lastIndex() - // Now apply them, mainly so that the application can call Campaign - // immediately after StartNode in tests. Note that these nodes will - // be added to raft twice: here and when the application's Ready - // loop calls ApplyConfChange. The calls to addNode must come after - // all calls to raftLog.append so progress.next is set after these - // bootstrapping entries (it is an error if we try to append these - // entries since they have already been committed). - // We do not set raftLog.applied so the application will be able - // to observe all conf changes via Ready.CommittedEntries. - for _, peer := range peers { - r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}) + rn, err := NewRawNode(c, peers) + if err != nil { + panic(err) } n := newNode() n.logger = c.Logger - go n.run(r) + + go n.run(rn) return &n } @@ -251,12 +215,7 @@ func StartNode(c *Config, peers []Peer) Node { // If the caller has an existing state machine, pass in the last log index that // has been applied to it; otherwise use zero. func RestartNode(c *Config) Node { - r := newRaft(c) - - n := newNode() - n.logger = c.Logger - go n.run(r) - return &n + return StartNode(c, nil) } type msgWithResult struct { @@ -310,30 +269,30 @@ func (n *node) Stop() { <-n.done } -func (n *node) run(r *raft) { +func (n *node) run(rn *RawNode) { var propc chan msgWithResult var readyc chan Ready var advancec chan struct{} - var prevLastUnstablei, prevLastUnstablet uint64 - var havePrevLastUnstablei bool - var prevSnapi uint64 - var applyingToI uint64 var rd Ready + r := rn.raft + lead := None - prevSoftSt := r.softState() - prevHardSt := emptyState for { if advancec != nil { readyc = nil - } else { - rd = newReady(r, prevSoftSt, prevHardSt) - if rd.containsUpdates() { - readyc = n.readyc - } else { - readyc = nil - } + } else if rn.HasReady() { + // Populate a Ready. Note that this Ready is not guaranteed to + // actually be handled. We will arm readyc, but there's no guarantee + // that we will actually send on it. It's possible that we will + // service another channel instead, loop around, and then populate + // the Ready again. We could instead force the previous Ready to be + // handled first, but it's generally good to emit larger Readys plus + // it simplifies testing (by emitting less frequently and more + // predictably). + rd = rn.Ready() + readyc = n.readyc } if lead != r.lead { @@ -382,40 +341,13 @@ func (n *node) run(r *raft) { case <-n.done: } case <-n.tickc: - r.tick() + rn.Tick() case readyc <- rd: - if rd.SoftState != nil { - prevSoftSt = rd.SoftState - } - if len(rd.Entries) > 0 { - prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index - prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term - havePrevLastUnstablei = true - } - if !IsEmptyHardState(rd.HardState) { - prevHardSt = rd.HardState - } - if !IsEmptySnap(rd.Snapshot) { - prevSnapi = rd.Snapshot.Metadata.Index - } - if index := rd.appliedCursor(); index != 0 { - applyingToI = index - } - - r.msgs = nil - r.readStates = nil - r.reduceUncommittedSize(rd.CommittedEntries) + rn.acceptReady(rd) advancec = n.advancec case <-advancec: - if applyingToI != 0 { - r.raftLog.appliedTo(applyingToI) - applyingToI = 0 - } - if havePrevLastUnstablei { - r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet) - havePrevLastUnstablei = false - } - r.raftLog.stableSnapTo(prevSnapi) + rn.commitReady(rd) + rd = Ready{} advancec = nil case c := <-n.status: c <- getStatus(r) diff --git a/raft/node_bench_test.go b/raft/node_bench_test.go index c3fc89f3f..d0ecdc519 100644 --- a/raft/node_bench_test.go +++ b/raft/node_bench_test.go @@ -26,8 +26,8 @@ func BenchmarkOneNode(b *testing.B) { n := newNode() s := NewMemoryStorage() - r := newTestRaft(1, []uint64{1}, 10, 1, s) - go n.run(r) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + go n.run(rn) defer n.Stop() diff --git a/raft/node_test.go b/raft/node_test.go index 641a5ca2f..29e35b2ed 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -132,9 +132,12 @@ func TestNodePropose(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newTestRaft(1, []uint64{1}, 10, 1, s) - go n.run(r) - n.Campaign(context.TODO()) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + r := rn.raft + go n.run(rn) + if err := n.Campaign(context.TODO()); err != nil { + t.Fatal(err) + } for { rd := <-n.Ready() s.Append(rd.Entries) @@ -172,10 +175,11 @@ func TestNodeReadIndex(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newTestRaft(1, []uint64{1}, 10, 1, s) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + r := rn.raft r.readStates = wrs - go n.run(r) + go n.run(rn) n.Campaign(context.TODO()) for { rd := <-n.Ready() @@ -309,8 +313,9 @@ func TestNodeProposeConfig(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newTestRaft(1, []uint64{1}, 10, 1, s) - go n.run(r) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + r := rn.raft + go n.run(rn) n.Campaign(context.TODO()) for { rd := <-n.Ready() @@ -347,8 +352,8 @@ func TestNodeProposeConfig(t *testing.T) { func TestNodeProposeAddDuplicateNode(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newTestRaft(1, []uint64{1}, 10, 1, s) - go n.run(r) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + go n.run(rn) n.Campaign(context.TODO()) rdyEntries := make([]raftpb.Entry, 0) ticker := time.NewTicker(time.Millisecond * 100) @@ -422,8 +427,8 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) { // who is the current leader. func TestBlockProposal(t *testing.T) { n := newNode() - r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - go n.run(r) + rn := newTestRawNode(1, []uint64{1}, 10, 1, NewMemoryStorage()) + go n.run(rn) defer n.Stop() errc := make(chan error, 1) @@ -463,8 +468,9 @@ func TestNodeProposeWaitDropped(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newTestRaft(1, []uint64{1}, 10, 1, s) - go n.run(r) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + r := rn.raft + go n.run(rn) n.Campaign(context.TODO()) for { rd := <-n.Ready() @@ -497,8 +503,9 @@ func TestNodeProposeWaitDropped(t *testing.T) { func TestNodeTick(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newTestRaft(1, []uint64{1}, 10, 1, s) - go n.run(r) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + r := rn.raft + go n.run(rn) elapsed := r.electionElapsed n.Tick() @@ -517,11 +524,11 @@ func TestNodeTick(t *testing.T) { func TestNodeStop(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newTestRaft(1, []uint64{1}, 10, 1, s) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) donec := make(chan struct{}) go func() { - n.run(r) + n.run(rn) close(donec) }() @@ -618,7 +625,9 @@ func TestNodeStart(t *testing.T) { n.Advance() } - n.Campaign(ctx) + if err := n.Campaign(ctx); err != nil { + t.Fatal(err) + } rd := <-n.Ready() storage.Append(rd.Entries) n.Advance() @@ -646,10 +655,12 @@ func TestNodeRestart(t *testing.T) { st := raftpb.HardState{Term: 1, Commit: 1} want := Ready{ - HardState: st, + // No HardState is emitted because there was no change. + HardState: raftpb.HardState{}, // commit up to index commit index in st CommittedEntries: entries[:st.Commit], - MustSync: true, + // MustSync is false because no HardState or new entries are provided. + MustSync: false, } storage := NewMemoryStorage() @@ -691,10 +702,14 @@ func TestNodeRestartFromSnapshot(t *testing.T) { st := raftpb.HardState{Term: 1, Commit: 3} want := Ready{ - HardState: st, + // No HardState is emitted because nothing changed relative to what is + // already persisted. + HardState: raftpb.HardState{}, // commit up to index commit index in st CommittedEntries: entries, - MustSync: true, + // MustSync is only true when there is a new HardState or new entries; + // neither is the case here. + MustSync: false, } s := NewMemoryStorage() @@ -800,8 +815,8 @@ func TestNodeProposeAddLearnerNode(t *testing.T) { defer ticker.Stop() n := newNode() s := NewMemoryStorage() - r := newTestRaft(1, []uint64{1}, 10, 1, s) - go n.run(r) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + go n.run(rn) n.Campaign(context.TODO()) stop := make(chan struct{}) done := make(chan struct{}) @@ -895,9 +910,12 @@ func TestCommitPagination(t *testing.T) { s := NewMemoryStorage() cfg := newTestConfig(1, []uint64{1}, 10, 1, s) cfg.MaxCommittedSizePerReady = 2048 - r := newRaft(cfg) + rn, err := NewRawNode(cfg, nil) + if err != nil { + t.Fatal(err) + } n := newNode() - go n.run(r) + go n.run(rn) n.Campaign(context.TODO()) rd := readyWithTimeout(&n) @@ -984,9 +1002,12 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) { // this and *will* return it (which is how the Commit index ended up being 10 initially). cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1 - r := newRaft(cfg) + rn, err := NewRawNode(cfg, nil) + if err != nil { + t.Fatal(err) + } n := newNode() - go n.run(r) + go n.run(rn) defer n.Stop() rd := readyWithTimeout(&n) @@ -1011,9 +1032,12 @@ func TestNodeBoundedLogGrowthWithPartition(t *testing.T) { s := NewMemoryStorage() cfg := newTestConfig(1, []uint64{1}, 10, 1, s) cfg.MaxUncommittedEntriesSize = maxEntrySize - r := newRaft(cfg) + rn, err := NewRawNode(cfg, nil) + if err != nil { + t.Fatal(err) + } n := newNode() - go n.run(r) + go n.run(rn) defer n.Stop() n.Campaign(context.TODO()) @@ -1028,14 +1052,14 @@ func TestNodeBoundedLogGrowthWithPartition(t *testing.T) { // committing anything. These proposals should not cause the leader's // log to grow indefinitely. for i := 0; i < 1024; i++ { - n.Propose(context.TODO(), data) + _ = n.Propose(context.TODO(), data) } // Check the size of leader's uncommitted log tail. It should not exceed the // MaxUncommittedEntriesSize limit. checkUncommitted := func(exp uint64) { t.Helper() - if a := r.uncommittedSize; exp != a { + if a := rn.raft.uncommittedSize; exp != a { t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a) } } diff --git a/raft/raft_test.go b/raft/raft_test.go index fc27fee11..b42db1522 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -4310,3 +4310,12 @@ func newTestLearnerRaft(id uint64, peers []uint64, learners []uint64, election, cfg.learners = learners return newRaft(cfg) } + +func newTestRawNode(id uint64, peers []uint64, election, heartbeat int, storage Storage) *RawNode { + cfg := newTestConfig(id, peers, election, heartbeat, storage) + rn, err := NewRawNode(cfg, nil) + if err != nil { + panic(err) + } + return rn +} diff --git a/raft/rawnode.go b/raft/rawnode.go index c2bc7c132..378a931ab 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -37,38 +37,6 @@ type RawNode struct { prevHardSt pb.HardState } -func (rn *RawNode) newReady() Ready { - return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt) -} - -func (rn *RawNode) commitReady(rd Ready) { - if rd.SoftState != nil { - rn.prevSoftSt = rd.SoftState - } - if !IsEmptyHardState(rd.HardState) { - rn.prevHardSt = rd.HardState - } - - // If entries were applied (or a snapshot), update our cursor for - // the next Ready. Note that if the current HardState contains a - // new Commit index, this does not mean that we're also applying - // all of the new entries due to commit pagination by size. - if index := rd.appliedCursor(); index > 0 { - rn.raft.raftLog.appliedTo(index) - } - - if len(rd.Entries) > 0 { - e := rd.Entries[len(rd.Entries)-1] - rn.raft.raftLog.stableTo(e.Index, e.Term) - } - if !IsEmptySnap(rd.Snapshot) { - rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index) - } - if len(rd.ReadStates) != 0 { - rn.raft.readStates = nil - } -} - // NewRawNode returns a new RawNode given configuration and a list of raft peers. func NewRawNode(config *Config, peers []Peer) (*RawNode, error) { if config.ID == 0 { @@ -78,27 +46,48 @@ func NewRawNode(config *Config, peers []Peer) (*RawNode, error) { rn := &RawNode{ raft: r, } - lastIndex, err := config.Storage.LastIndex() + if err := rn.init(peers); err != nil { + return nil, err + } + return rn, nil +} + +func (rn *RawNode) init(peers []Peer) error { + r := rn.raft + lastIndex, err := rn.raft.raftLog.storage.LastIndex() if err != nil { - panic(err) // TODO(bdarnell) + return err } // If the log is empty, this is a new RawNode (like StartNode); otherwise it's // restoring an existing RawNode (like RestartNode). // TODO(bdarnell): rethink RawNode initialization and whether the application needs // to be able to tell us when it expects the RawNode to exist. if lastIndex == 0 { - r.becomeFollower(1, None) + rn.raft.becomeFollower(1, None) ents := make([]pb.Entry, len(peers)) for i, peer := range peers { cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context} data, err := cc.Marshal() if err != nil { - panic("unexpected marshal error") + return err } ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data} } - r.raftLog.append(ents...) + rn.raft.raftLog.append(ents...) + + // Now apply them, mainly so that the application can call Campaign + // immediately after StartNode in tests. Note that these nodes will + // be added to raft twice: here and when the application's Ready + // loop calls ApplyConfChange. The calls to addNode must come after + // all calls to raftLog.append so progress.next is set after these + // bootstrapping entries (it is an error if we try to append these + // entries since they have already been committed). + // We do not set raftLog.applied so the application will be able + // to observe all conf changes via Ready.CommittedEntries. + // + // TODO(bdarnell): These entries are still unstable; do we need to preserve + // the invariant that committed < unstable? r.raftLog.committed = uint64(len(ents)) for _, peer := range peers { r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}) @@ -113,7 +102,7 @@ func NewRawNode(config *Config, peers []Peer) (*RawNode, error) { rn.prevHardSt = r.hardState() } - return rn, nil + return nil } // Tick advances the internal logical clock by a single tick. @@ -182,14 +171,61 @@ func (rn *RawNode) Step(m pb.Message) error { return ErrStepPeerNotFound } -// Ready returns the current point-in-time state of this RawNode. +// Ready returns the outstanding work that the application needs to handle. This +// includes appending and applying entries or a snapshot, updating the HardState, +// and sending messages. Ready() is a read-only operation, that is, it does not +// require the caller to actually handle the result. Typically, a caller will +// want to handle the Ready and must pass the Ready to Advance *after* having +// done so. While a Ready is being handled, the RawNode must not be used for +// operations that may alter its state. For example, it is illegal to call +// Ready, followed by Step, followed by Advance. func (rn *RawNode) Ready() Ready { rd := rn.newReady() - rn.raft.msgs = nil - rn.raft.reduceUncommittedSize(rd.CommittedEntries) return rd } +func (rn *RawNode) newReady() Ready { + return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt) +} + +// acceptReady is called when the consumer of the RawNode has decided to go +// ahead and handle a Ready. Nothing must alter the state of the RawNode between +// this call and the prior call to Ready(). +func (rn *RawNode) acceptReady(rd Ready) { + if rd.SoftState != nil { + rn.prevSoftSt = rd.SoftState + } + if len(rd.ReadStates) != 0 { + rn.raft.readStates = nil + } + rn.raft.msgs = nil +} + +// commitReady is called when the consumer of the RawNode has successfully +// handled a Ready (having previously called acceptReady). +func (rn *RawNode) commitReady(rd Ready) { + if !IsEmptyHardState(rd.HardState) { + rn.prevHardSt = rd.HardState + } + + // If entries were applied (or a snapshot), update our cursor for + // the next Ready. Note that if the current HardState contains a + // new Commit index, this does not mean that we're also applying + // all of the new entries due to commit pagination by size. + if index := rd.appliedCursor(); index > 0 { + rn.raft.raftLog.appliedTo(index) + } + rn.raft.reduceUncommittedSize(rd.CommittedEntries) + + if len(rd.Entries) > 0 { + e := rd.Entries[len(rd.Entries)-1] + rn.raft.raftLog.stableTo(e.Index, e.Term) + } + if !IsEmptySnap(rd.Snapshot) { + rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index) + } +} + // HasReady called when RawNode user need to check if any Ready pending. // Checking logic in this method should be consistent with Ready.containsUpdates(). func (rn *RawNode) HasReady() bool { @@ -215,6 +251,11 @@ func (rn *RawNode) HasReady() bool { // Advance notifies the RawNode that the application has applied and saved progress in the // last Ready results. func (rn *RawNode) Advance(rd Ready) { + // Advance combines accept and commit. Callers can't mutate the RawNode + // between the call to Ready and the matching call to Advance, or the work + // done in acceptReady will clobber potentially newer data that has not been + // emitted in a Ready yet. + rn.acceptReady(rd) rn.commitReady(rd) } From c9491d7861b40c271f634d1d6b57804755cd7e79 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Thu, 18 Jul 2019 13:41:52 +0200 Subject: [PATCH 2/4] raft: clean up bootstrap This is the first (maybe not last) step in cleaning up the bootstrap code around StartNode. Initializing a Raft group for the first time is awkward, since a configuration has to be pulled from thin air. The way this is solved today is unclean: The app is supposed to pass peers to StartNode(), we add configuration changes for them to the log, immediately pretend that they are applied, but actually leave them unapplied (to give the app a chance to observe them, though if the app did decide to not apply them things would really go off the rails), and then return control to the app. The app will then process the initial Readys and as a result the configuration will be persisted to disk; restarts of the node then use RestartNode which doesn't take any peers. The code that did this lived awkwardly in two places fairly deep down the callstack, though it was really only necessary in StartNode(). This commit refactors things to make this more obvious: only StartNode does this dance now. In particular, RawNode does not support this at all any more; it expects the app to set up its Storage correctly. Future work may provide helpers to make this "preseeding" of the Storage more user-friendly. It isn't entirely straightforward to do so since the Storage interface doesn't provide the right accessors for this purpose. Briefly speaking, we want to make sure that a non-bootstrapped node can never catch up via the log so that we can implicitly use one of the "skipped" log entries to represent the configuration change into the bootstrap configuration. This is an invasive change that affects all consumers of raft, and it is of lower urgency since the code (post this commit) already encapsulates the complexity sufficiently. --- etcdserver/raft.go | 6 +- raft/node.go | 62 +++++++++- raft/node_test.go | 6 +- raft/raft_test.go | 4 +- raft/rawnode.go | 62 +--------- raft/rawnode_test.go | 285 ++++++++++++++++++++++++++----------------- 6 files changed, 245 insertions(+), 180 deletions(-) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 6f42c3504..75845d772 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -479,7 +479,11 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id } } - n = raft.StartNode(c, peers) + if len(peers) == 0 { + n = raft.RestartNode(c) + } else { + n = raft.StartNode(c, peers) + } raftStatusMu.Lock() raftStatus = n.Status raftStatusMu.Unlock() diff --git a/raft/node.go b/raft/node.go index e01a7167a..7c0d154bf 100644 --- a/raft/node.go +++ b/raft/node.go @@ -197,12 +197,63 @@ type Peer struct { // StartNode returns a new Node given configuration and a list of raft peers. // It appends a ConfChangeAddNode entry for each given peer to the initial log. +// +// Peers must not be zero length; call RestartNode in that case. func StartNode(c *Config, peers []Peer) Node { - rn, err := NewRawNode(c, peers) + if len(peers) == 0 { + panic("no peers given; use RestartNode instead") + } + rn, err := NewRawNode(c) if err != nil { panic(err) } + lastIndex, err := rn.raft.raftLog.storage.LastIndex() + if err != nil { + panic(err) + } + + if lastIndex != 0 { + panic("can't StartNode on a nonempty Storage") + } + + // We've faked out initial entries above, but nothing has been + // persisted. Start with an empty HardState (thus the first Ready will + // emit a HardState update for the app to persist). + rn.prevHardSt = emptyState + + // TODO(tbg): remove StartNode and give the application the right tools to + // bootstrap the initial membership in a cleaner way. + rn.raft.becomeFollower(1, None) + ents := make([]pb.Entry, len(peers)) + for i, peer := range peers { + cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context} + data, err := cc.Marshal() + if err != nil { + panic(err) + } + + ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data} + } + rn.raft.raftLog.append(ents...) + + // Now apply them, mainly so that the application can call Campaign + // immediately after StartNode in tests. Note that these nodes will + // be added to raft twice: here and when the application's Ready + // loop calls ApplyConfChange. The calls to addNode must come after + // all calls to raftLog.append so progress.next is set after these + // bootstrapping entries (it is an error if we try to append these + // entries since they have already been committed). + // We do not set raftLog.applied so the application will be able + // to observe all conf changes via Ready.CommittedEntries. + // + // TODO(bdarnell): These entries are still unstable; do we need to preserve + // the invariant that committed < unstable? + rn.raft.raftLog.committed = uint64(len(ents)) + for _, peer := range peers { + rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}) + } + n := newNode() n.logger = c.Logger @@ -215,7 +266,14 @@ func StartNode(c *Config, peers []Peer) Node { // If the caller has an existing state machine, pass in the last log index that // has been applied to it; otherwise use zero. func RestartNode(c *Config) Node { - return StartNode(c, nil) + rn, err := NewRawNode(c) + if err != nil { + panic(err) + } + n := newNode() + n.logger = c.Logger + go n.run(rn) + return &n } type msgWithResult struct { diff --git a/raft/node_test.go b/raft/node_test.go index 29e35b2ed..8ee9453d1 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -910,7 +910,7 @@ func TestCommitPagination(t *testing.T) { s := NewMemoryStorage() cfg := newTestConfig(1, []uint64{1}, 10, 1, s) cfg.MaxCommittedSizePerReady = 2048 - rn, err := NewRawNode(cfg, nil) + rn, err := NewRawNode(cfg) if err != nil { t.Fatal(err) } @@ -1002,7 +1002,7 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) { // this and *will* return it (which is how the Commit index ended up being 10 initially). cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1 - rn, err := NewRawNode(cfg, nil) + rn, err := NewRawNode(cfg) if err != nil { t.Fatal(err) } @@ -1032,7 +1032,7 @@ func TestNodeBoundedLogGrowthWithPartition(t *testing.T) { s := NewMemoryStorage() cfg := newTestConfig(1, []uint64{1}, 10, 1, s) cfg.MaxUncommittedEntriesSize = maxEntrySize - rn, err := NewRawNode(cfg, nil) + rn, err := NewRawNode(cfg) if err != nil { t.Fatal(err) } diff --git a/raft/raft_test.go b/raft/raft_test.go index b42db1522..1e2d0e2af 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -4311,9 +4311,11 @@ func newTestLearnerRaft(id uint64, peers []uint64, learners []uint64, election, return newRaft(cfg) } +// newTestRawNode sets up a RawNode with the given peers. The configuration will +// not be reflected in the Storage. func newTestRawNode(id uint64, peers []uint64, election, heartbeat int, storage Storage) *RawNode { cfg := newTestConfig(id, peers, election, heartbeat, storage) - rn, err := NewRawNode(cfg, nil) + rn, err := NewRawNode(cfg) if err != nil { panic(err) } diff --git a/raft/rawnode.go b/raft/rawnode.go index 378a931ab..ff6faf2ab 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -37,8 +37,8 @@ type RawNode struct { prevHardSt pb.HardState } -// NewRawNode returns a new RawNode given configuration and a list of raft peers. -func NewRawNode(config *Config, peers []Peer) (*RawNode, error) { +// NewRawNode instantiates a RawNode from the given configuration. +func NewRawNode(config *Config) (*RawNode, error) { if config.ID == 0 { panic("config.ID must not be zero") } @@ -46,63 +46,9 @@ func NewRawNode(config *Config, peers []Peer) (*RawNode, error) { rn := &RawNode{ raft: r, } - if err := rn.init(peers); err != nil { - return nil, err - } - return rn, nil -} - -func (rn *RawNode) init(peers []Peer) error { - r := rn.raft - lastIndex, err := rn.raft.raftLog.storage.LastIndex() - if err != nil { - return err - } - // If the log is empty, this is a new RawNode (like StartNode); otherwise it's - // restoring an existing RawNode (like RestartNode). - // TODO(bdarnell): rethink RawNode initialization and whether the application needs - // to be able to tell us when it expects the RawNode to exist. - if lastIndex == 0 { - rn.raft.becomeFollower(1, None) - ents := make([]pb.Entry, len(peers)) - for i, peer := range peers { - cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context} - data, err := cc.Marshal() - if err != nil { - return err - } - - ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data} - } - rn.raft.raftLog.append(ents...) - - // Now apply them, mainly so that the application can call Campaign - // immediately after StartNode in tests. Note that these nodes will - // be added to raft twice: here and when the application's Ready - // loop calls ApplyConfChange. The calls to addNode must come after - // all calls to raftLog.append so progress.next is set after these - // bootstrapping entries (it is an error if we try to append these - // entries since they have already been committed). - // We do not set raftLog.applied so the application will be able - // to observe all conf changes via Ready.CommittedEntries. - // - // TODO(bdarnell): These entries are still unstable; do we need to preserve - // the invariant that committed < unstable? - r.raftLog.committed = uint64(len(ents)) - for _, peer := range peers { - r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}) - } - } - - // Set the initial hard and soft states after performing all initialization. rn.prevSoftSt = r.softState() - if lastIndex == 0 { - rn.prevHardSt = emptyState - } else { - rn.prevHardSt = r.hardState() - } - - return nil + rn.prevHardSt = r.hardState() + return rn, nil } // Tick advances the internal logical clock by a single tick. diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 3a6c392cc..373863876 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -18,11 +18,12 @@ import ( "bytes" "context" "fmt" + "math" "reflect" "testing" "go.etcd.io/etcd/raft/quorum" - "go.etcd.io/etcd/raft/raftpb" + pb "go.etcd.io/etcd/raft/raftpb" "go.etcd.io/etcd/raft/tracker" ) @@ -61,28 +62,43 @@ func (a *rawNodeAdapter) ReadIndex(_ context.Context, rctx []byte) error { // RawNode swallowed the error in ReadIndex, it probably should not do that. return nil } -func (a *rawNodeAdapter) Step(_ context.Context, m raftpb.Message) error { return a.RawNode.Step(m) } -func (a *rawNodeAdapter) Propose(_ context.Context, data []byte) error { return a.RawNode.Propose(data) } -func (a *rawNodeAdapter) ProposeConfChange(_ context.Context, cc raftpb.ConfChange) error { +func (a *rawNodeAdapter) Step(_ context.Context, m pb.Message) error { return a.RawNode.Step(m) } +func (a *rawNodeAdapter) Propose(_ context.Context, data []byte) error { return a.RawNode.Propose(data) } +func (a *rawNodeAdapter) ProposeConfChange(_ context.Context, cc pb.ConfChange) error { return a.RawNode.ProposeConfChange(cc) } // TestRawNodeStep ensures that RawNode.Step ignore local message. func TestRawNodeStep(t *testing.T) { - for i, msgn := range raftpb.MessageType_name { - s := NewMemoryStorage() - rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}}) - if err != nil { - t.Fatal(err) - } - msgt := raftpb.MessageType(i) - err = rawNode.Step(raftpb.Message{Type: msgt}) - // LocalMsg should be ignored. - if IsLocalMsg(msgt) { - if err != ErrStepLocalMsg { - t.Errorf("%d: step should ignore %s", msgt, msgn) + for i, msgn := range pb.MessageType_name { + t.Run(msgn, func(t *testing.T) { + s := NewMemoryStorage() + s.SetHardState(pb.HardState{Term: 1, Commit: 1}) + s.Append([]pb.Entry{{Term: 1, Index: 1}}) + if err := s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{ + ConfState: pb.ConfState{ + Nodes: []uint64{1}, + }, + Index: 1, + Term: 1, + }}); err != nil { + t.Fatal(err) } - } + // Append an empty entry to make sure the non-local messages (like + // vote requests) are ignored and don't trigger assertions. + rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s)) + if err != nil { + t.Fatal(err) + } + msgt := pb.MessageType(i) + err = rawNode.Step(pb.Message{Type: msgt}) + // LocalMsg should be ignored. + if IsLocalMsg(msgt) { + if err != ErrStepLocalMsg { + t.Errorf("%d: step should ignore %s", msgt, msgn) + } + } + }) } } @@ -94,17 +110,10 @@ func TestRawNodeStep(t *testing.T) { func TestRawNodeProposeAndConfChange(t *testing.T) { s := NewMemoryStorage() var err error - rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}}) + rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s)) if err != nil { t.Fatal(err) } - rd := rawNode.Ready() - s.Append(rd.Entries) - rawNode.Advance(rd) - - if d := rawNode.Ready(); d.MustSync || !IsEmptyHardState(d.HardState) || len(d.Entries) > 0 { - t.Fatalf("expected empty hard state with must-sync=false: %#v", d) - } rawNode.Campaign() proposed := false @@ -113,13 +122,15 @@ func TestRawNodeProposeAndConfChange(t *testing.T) { ccdata []byte ) for { - rd = rawNode.Ready() + rd := rawNode.Ready() s.Append(rd.Entries) + rawNode.Advance(rd) // Once we are the leader, propose a command and a ConfChange. if !proposed && rd.SoftState.Lead == rawNode.raft.id { - rawNode.Propose([]byte("somedata")) - - cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1} + if err = rawNode.Propose([]byte("somedata")); err != nil { + t.Fatal(err) + } + cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 1} ccdata, err = cc.Marshal() if err != nil { t.Fatal(err) @@ -127,16 +138,13 @@ func TestRawNodeProposeAndConfChange(t *testing.T) { rawNode.ProposeConfChange(cc) proposed = true - } - rawNode.Advance(rd) - - // Exit when we have four entries: one ConfChange, one no-op for the election, - // our proposed command and proposed ConfChange. - lastIndex, err = s.LastIndex() - if err != nil { - t.Fatal(err) - } - if lastIndex >= 4 { + } else if proposed { + // We proposed last cycle, which means we appended the conf change + // in this cycle. + lastIndex, err = s.LastIndex() + if err != nil { + t.Fatal(err) + } break } } @@ -151,8 +159,8 @@ func TestRawNodeProposeAndConfChange(t *testing.T) { if !bytes.Equal(entries[0].Data, []byte("somedata")) { t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata")) } - if entries[1].Type != raftpb.EntryConfChange { - t.Fatalf("type = %v, want %v", entries[1].Type, raftpb.EntryConfChange) + if entries[1].Type != pb.EntryConfChange { + t.Fatalf("type = %v, want %v", entries[1].Type, pb.EntryConfChange) } if !bytes.Equal(entries[1].Data, ccdata) { t.Errorf("data = %v, want %v", entries[1].Data, ccdata) @@ -163,7 +171,7 @@ func TestRawNodeProposeAndConfChange(t *testing.T) { // not affect the later propose to add new node. func TestRawNodeProposeAddDuplicateNode(t *testing.T) { s := NewMemoryStorage() - rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}}) + rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s)) if err != nil { t.Fatal(err) } @@ -182,13 +190,13 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) { rawNode.Advance(rd) } - proposeConfChangeAndApply := func(cc raftpb.ConfChange) { + proposeConfChangeAndApply := func(cc pb.ConfChange) { rawNode.ProposeConfChange(cc) rd = rawNode.Ready() s.Append(rd.Entries) for _, entry := range rd.CommittedEntries { - if entry.Type == raftpb.EntryConfChange { - var cc raftpb.ConfChange + if entry.Type == pb.EntryConfChange { + var cc pb.ConfChange cc.Unmarshal(entry.Data) rawNode.ApplyConfChange(cc) } @@ -196,7 +204,7 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) { rawNode.Advance(rd) } - cc1 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1} + cc1 := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 1} ccdata1, err := cc1.Marshal() if err != nil { t.Fatal(err) @@ -207,7 +215,7 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) { proposeConfChangeAndApply(cc1) // the new node join should be ok - cc2 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2} + cc2 := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 2} ccdata2, err := cc2.Marshal() if err != nil { t.Fatal(err) @@ -238,16 +246,16 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) { // TestRawNodeReadIndex ensures that Rawnode.ReadIndex sends the MsgReadIndex message // to the underlying raft. It also ensures that ReadState can be read out. func TestRawNodeReadIndex(t *testing.T) { - msgs := []raftpb.Message{} - appendStep := func(r *raft, m raftpb.Message) error { + msgs := []pb.Message{} + appendStep := func(r *raft, m pb.Message) error { msgs = append(msgs, m) return nil } wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}} s := NewMemoryStorage() - c := newTestConfig(1, nil, 10, 1, s) - rawNode, err := NewRawNode(c, []Peer{{ID: 1}}) + c := newTestConfig(1, []uint64{1}, 10, 1, s) + rawNode, err := NewRawNode(c) if err != nil { t.Fatal(err) } @@ -288,8 +296,8 @@ func TestRawNodeReadIndex(t *testing.T) { if len(msgs) != 1 { t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1) } - if msgs[0].Type != raftpb.MsgReadIndex { - t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgReadIndex) + if msgs[0].Type != pb.MsgReadIndex { + t.Errorf("msg type = %d, want %d", msgs[0].Type, pb.MsgReadIndex) } if !bytes.Equal(msgs[0].Entries[0].Data, wrequestCtx) { t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx) @@ -305,61 +313,108 @@ func TestRawNodeReadIndex(t *testing.T) { // TestNodeStop from node_test.go has no equivalent in rawNode because there is // no goroutine in RawNode. -// TestRawNodeStart ensures that a node can be started correctly. The node should -// start with correct configuration change entries, and can accept and commit -// proposals. +// TestRawNodeStart ensures that a node can be started correctly. Note that RawNode +// requires the application to bootstrap the state, i.e. it does not accept peers +// and will not create faux configuration change entries. func TestRawNodeStart(t *testing.T) { - cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1} - ccdata, err := cc.Marshal() - if err != nil { - t.Fatalf("unexpected marshal error: %v", err) - } - wants := []Ready{ - { - HardState: raftpb.HardState{Term: 1, Commit: 1, Vote: 0}, - Entries: []raftpb.Entry{ - {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata}, - }, - CommittedEntries: []raftpb.Entry{ - {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata}, - }, - MustSync: true, + want := Ready{ + SoftState: &SoftState{Lead: 1, RaftState: StateLeader}, + HardState: pb.HardState{Term: 1, Commit: 3, Vote: 1}, + Entries: []pb.Entry{ + {Term: 1, Index: 2, Data: nil}, // empty entry + {Term: 1, Index: 3, Data: []byte("foo")}, // empty entry }, - { - HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1}, - Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, - CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, - MustSync: true, + CommittedEntries: []pb.Entry{ + {Term: 1, Index: 2, Data: nil}, // empty entry + {Term: 1, Index: 3, Data: []byte("foo")}, // empty entry }, + MustSync: true, } storage := NewMemoryStorage() - rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}}) + storage.ents[0].Index = 1 + + // TODO(tbg): this is a first prototype of what bootstrapping could look + // like (without the annoying faux ConfChanges). We want to persist a + // ConfState at some index and make sure that this index can't be reached + // from log position 1, so that followers are forced to pick up the + // ConfState in order to move away from log position 1 (unless they got + // bootstrapped in the same way already). Failing to do so would mean that + // followers diverge from the bootstrapped nodes and don't learn about the + // initial config. + // + // NB: this is exactly what CockroachDB does. The Raft log really begins at + // index 10, so empty followers (at index 1) always need a snapshot first. + type appenderStorage interface { + Storage + ApplySnapshot(pb.Snapshot) error + } + bootstrap := func(storage appenderStorage, cs pb.ConfState) error { + if len(cs.Nodes) == 0 { + return fmt.Errorf("no voters specified") + } + fi, err := storage.FirstIndex() + if err != nil { + return err + } + if fi < 2 { + return fmt.Errorf("FirstIndex >= 2 is prerequisite for bootstrap") + } + if _, err = storage.Entries(fi, fi, math.MaxUint64); err == nil { + // TODO(tbg): match exact error + return fmt.Errorf("should not have been able to load first index") + } + li, err := storage.LastIndex() + if err != nil { + return err + } + if _, err = storage.Entries(li, li, math.MaxUint64); err == nil { + return fmt.Errorf("should not have been able to load last index") + } + hs, ics, err := storage.InitialState() + if err != nil { + return err + } + if !IsEmptyHardState(hs) { + return fmt.Errorf("HardState not empty") + } + if len(ics.Nodes) != 0 { + return fmt.Errorf("ConfState not empty") + } + + meta := pb.SnapshotMetadata{ + Index: 1, + Term: 0, + ConfState: cs, + } + snap := pb.Snapshot{Metadata: meta} + return storage.ApplySnapshot(snap) + } + + if err := bootstrap(storage, pb.ConfState{Nodes: []uint64{1}}); err != nil { + t.Fatal(err) + } + + rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage)) if err != nil { t.Fatal(err) } - rd := rawNode.Ready() - t.Logf("rd %v", rd) - if !reflect.DeepEqual(rd, wants[0]) { - t.Fatalf("#%d: g = %+v,\n w %+v", 1, rd, wants[0]) - } else { - storage.Append(rd.Entries) - rawNode.Advance(rd) + if rawNode.HasReady() { + t.Fatalf("unexpected ready: %+v", rawNode.Ready()) } - storage.Append(rd.Entries) - rawNode.Advance(rd) - rawNode.Campaign() - rd = rawNode.Ready() + rawNode.Propose([]byte("foo")) + if !rawNode.HasReady() { + t.Fatal("expected a Ready") + } + rd := rawNode.Ready() storage.Append(rd.Entries) rawNode.Advance(rd) - rawNode.Propose([]byte("foo")) - if rd = rawNode.Ready(); !reflect.DeepEqual(rd, wants[1]) { - t.Errorf("#%d: g = %+v,\n w %+v", 2, rd, wants[1]) - } else { - storage.Append(rd.Entries) - rawNode.Advance(rd) + rd.SoftState, want.SoftState = nil, nil + + if !reflect.DeepEqual(rd, want) { + t.Fatalf("unexpected Ready:\n%+v\nvs\n%+v", rd, want) } if rawNode.HasReady() { @@ -368,11 +423,11 @@ func TestRawNodeStart(t *testing.T) { } func TestRawNodeRestart(t *testing.T) { - entries := []raftpb.Entry{ + entries := []pb.Entry{ {Term: 1, Index: 1}, {Term: 1, Index: 2, Data: []byte("foo")}, } - st := raftpb.HardState{Term: 1, Commit: 1} + st := pb.HardState{Term: 1, Commit: 1} want := Ready{ HardState: emptyState, @@ -384,7 +439,7 @@ func TestRawNodeRestart(t *testing.T) { storage := NewMemoryStorage() storage.SetHardState(st) storage.Append(entries) - rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), nil) + rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, storage)) if err != nil { t.Fatal(err) } @@ -399,17 +454,17 @@ func TestRawNodeRestart(t *testing.T) { } func TestRawNodeRestartFromSnapshot(t *testing.T) { - snap := raftpb.Snapshot{ - Metadata: raftpb.SnapshotMetadata{ - ConfState: raftpb.ConfState{Nodes: []uint64{1, 2}}, + snap := pb.Snapshot{ + Metadata: pb.SnapshotMetadata{ + ConfState: pb.ConfState{Nodes: []uint64{1, 2}}, Index: 2, Term: 1, }, } - entries := []raftpb.Entry{ + entries := []pb.Entry{ {Term: 1, Index: 3, Data: []byte("foo")}, } - st := raftpb.HardState{Term: 1, Commit: 3} + st := pb.HardState{Term: 1, Commit: 3} want := Ready{ HardState: emptyState, @@ -422,7 +477,7 @@ func TestRawNodeRestartFromSnapshot(t *testing.T) { s.SetHardState(st) s.ApplySnapshot(snap) s.Append(entries) - rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), nil) + rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s)) if err != nil { t.Fatal(err) } @@ -441,7 +496,7 @@ func TestRawNodeRestartFromSnapshot(t *testing.T) { func TestRawNodeStatus(t *testing.T) { s := NewMemoryStorage() - rn, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s), nil) + rn, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s)) if err != nil { t.Fatal(err) } @@ -489,20 +544,20 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) { s := &ignoreSizeHintMemStorage{ MemoryStorage: NewMemoryStorage(), } - persistedHardState := raftpb.HardState{ + persistedHardState := pb.HardState{ Term: 1, Vote: 1, Commit: 10, } s.hardState = persistedHardState - s.ents = make([]raftpb.Entry, 10) + s.ents = make([]pb.Entry, 10) var size uint64 for i := range s.ents { - ent := raftpb.Entry{ + ent := pb.Entry{ Term: 1, Index: uint64(i + 1), - Type: raftpb.EntryNormal, + Type: pb.EntryNormal, Data: []byte("a"), } @@ -516,14 +571,14 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) { // this and *will* return it (which is how the Commit index ended up being 10 initially). cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1 - s.ents = append(s.ents, raftpb.Entry{ + s.ents = append(s.ents, pb.Entry{ Term: 1, Index: uint64(11), - Type: raftpb.EntryNormal, + Type: pb.EntryNormal, Data: []byte("boom"), }) - rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}}) + rawNode, err := NewRawNode(cfg) if err != nil { t.Fatal(err) } @@ -539,8 +594,8 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) { } highestApplied = rd.CommittedEntries[n-1].Index rawNode.Advance(rd) - rawNode.Step(raftpb.Message{ - Type: raftpb.MsgHeartbeat, + rawNode.Step(pb.Message{ + Type: pb.MsgHeartbeat, To: 1, From: 1, // illegal, but we get away with it Term: 1, @@ -556,13 +611,13 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) { func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) { const maxEntries = 16 data := []byte("testdata") - testEntry := raftpb.Entry{Data: data} + testEntry := pb.Entry{Data: data} maxEntrySize := uint64(maxEntries * PayloadSize(testEntry)) s := NewMemoryStorage() cfg := newTestConfig(1, []uint64{1}, 10, 1, s) cfg.MaxUncommittedEntriesSize = maxEntrySize - rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}}) + rawNode, err := NewRawNode(cfg) if err != nil { t.Fatal(err) } From 500af9165394930fdd55bfb961a6f568100360c4 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 19 Jul 2019 09:55:19 +0200 Subject: [PATCH 3/4] raft: restore ability to bootstrap RawNode We are worried about breaking backwards compatibility for any application out there that may have relied on the old behavior. Their RawNode invocation would have been broken by the removal of the peers argument so it would not have changed silently; an associated comment tells callers how to fix it. --- raft/bootstrap.go | 80 +++++++++++++++++++++++++++++++++++++++++++++++ raft/node.go | 47 +--------------------------- raft/rawnode.go | 6 ++++ 3 files changed, 87 insertions(+), 46 deletions(-) create mode 100644 raft/bootstrap.go diff --git a/raft/bootstrap.go b/raft/bootstrap.go new file mode 100644 index 000000000..fdd098756 --- /dev/null +++ b/raft/bootstrap.go @@ -0,0 +1,80 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package raft + +import ( + "errors" + + pb "go.etcd.io/etcd/raft/raftpb" +) + +// Bootstrap initializes the RawNode for first use by appending configuration +// changes for the supplied peers. This method returns an error if the Storage +// is nonempty. +// +// It is recommended that instead of calling this method, applications bootstrap +// their state manually by setting up a Storage that has a first index > 1 and +// which stores the desired ConfState as its InitialState. +func (rn *RawNode) Bootstrap(peers []Peer) error { + if len(peers) == 0 { + return errors.New("must provide at least one peer to Bootstrap") + } + lastIndex, err := rn.raft.raftLog.storage.LastIndex() + if err != nil { + return err + } + + if lastIndex != 0 { + return errors.New("can't bootstrap a nonempty Storage") + } + + // We've faked out initial entries above, but nothing has been + // persisted. Start with an empty HardState (thus the first Ready will + // emit a HardState update for the app to persist). + rn.prevHardSt = emptyState + + // TODO(tbg): remove StartNode and give the application the right tools to + // bootstrap the initial membership in a cleaner way. + rn.raft.becomeFollower(1, None) + ents := make([]pb.Entry, len(peers)) + for i, peer := range peers { + cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context} + data, err := cc.Marshal() + if err != nil { + return err + } + + ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data} + } + rn.raft.raftLog.append(ents...) + + // Now apply them, mainly so that the application can call Campaign + // immediately after StartNode in tests. Note that these nodes will + // be added to raft twice: here and when the application's Ready + // loop calls ApplyConfChange. The calls to addNode must come after + // all calls to raftLog.append so progress.next is set after these + // bootstrapping entries (it is an error if we try to append these + // entries since they have already been committed). + // We do not set raftLog.applied so the application will be able + // to observe all conf changes via Ready.CommittedEntries. + // + // TODO(bdarnell): These entries are still unstable; do we need to preserve + // the invariant that committed < unstable? + rn.raft.raftLog.committed = uint64(len(ents)) + for _, peer := range peers { + rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}) + } + return nil +} diff --git a/raft/node.go b/raft/node.go index 7c0d154bf..6b730c0d4 100644 --- a/raft/node.go +++ b/raft/node.go @@ -207,52 +207,7 @@ func StartNode(c *Config, peers []Peer) Node { if err != nil { panic(err) } - - lastIndex, err := rn.raft.raftLog.storage.LastIndex() - if err != nil { - panic(err) - } - - if lastIndex != 0 { - panic("can't StartNode on a nonempty Storage") - } - - // We've faked out initial entries above, but nothing has been - // persisted. Start with an empty HardState (thus the first Ready will - // emit a HardState update for the app to persist). - rn.prevHardSt = emptyState - - // TODO(tbg): remove StartNode and give the application the right tools to - // bootstrap the initial membership in a cleaner way. - rn.raft.becomeFollower(1, None) - ents := make([]pb.Entry, len(peers)) - for i, peer := range peers { - cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context} - data, err := cc.Marshal() - if err != nil { - panic(err) - } - - ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data} - } - rn.raft.raftLog.append(ents...) - - // Now apply them, mainly so that the application can call Campaign - // immediately after StartNode in tests. Note that these nodes will - // be added to raft twice: here and when the application's Ready - // loop calls ApplyConfChange. The calls to addNode must come after - // all calls to raftLog.append so progress.next is set after these - // bootstrapping entries (it is an error if we try to append these - // entries since they have already been committed). - // We do not set raftLog.applied so the application will be able - // to observe all conf changes via Ready.CommittedEntries. - // - // TODO(bdarnell): These entries are still unstable; do we need to preserve - // the invariant that committed < unstable? - rn.raft.raftLog.committed = uint64(len(ents)) - for _, peer := range peers { - rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}) - } + rn.Bootstrap(peers) n := newNode() n.logger = c.Logger diff --git a/raft/rawnode.go b/raft/rawnode.go index ff6faf2ab..b7e534346 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -38,6 +38,12 @@ type RawNode struct { } // NewRawNode instantiates a RawNode from the given configuration. +// +// See Bootstrap() for bootstrapping an initial state; this replaces the former +// 'peers' argument to this method (with identical behavior). However, It is +// recommended that instead of calling Bootstrap, applications bootstrap their +// state manually by setting up a Storage that has a first index > 1 and which +// stores the desired ConfState as its InitialState. func NewRawNode(config *Config) (*RawNode, error) { if config.ID == 0 { panic("config.ID must not be zero") From caa48bcc3d6146f0a55232d69c5ca76ea7756727 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 19 Jul 2019 12:35:14 +0200 Subject: [PATCH 4/4] raft: remove TestNodeBoundedLogGrowthWithPartition It has a data race between the test's call to `reduceUncommittedSize` and a corresponding call during Ready handling in `(*node).run()`. The corresponding RawNode test still verifies the functionality, so instead of fixing the test we can remove it. --- raft/node_test.go | 57 ----------------------------------------------- 1 file changed, 57 deletions(-) diff --git a/raft/node_test.go b/raft/node_test.go index 8ee9453d1..af0728522 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -1018,60 +1018,3 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) { ) } } - -// TestNodeBoundedLogGrowthWithPartition tests a scenario where a leader is -// partitioned from a quorum of nodes. It verifies that the leader's log is -// protected from unbounded growth even as new entries continue to be proposed. -// This protection is provided by the MaxUncommittedEntriesSize configuration. -func TestNodeBoundedLogGrowthWithPartition(t *testing.T) { - const maxEntries = 16 - data := []byte("testdata") - testEntry := raftpb.Entry{Data: data} - maxEntrySize := uint64(maxEntries * PayloadSize(testEntry)) - - s := NewMemoryStorage() - cfg := newTestConfig(1, []uint64{1}, 10, 1, s) - cfg.MaxUncommittedEntriesSize = maxEntrySize - rn, err := NewRawNode(cfg) - if err != nil { - t.Fatal(err) - } - n := newNode() - go n.run(rn) - defer n.Stop() - n.Campaign(context.TODO()) - - rd := readyWithTimeout(&n) - if len(rd.CommittedEntries) != 1 { - t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries)) - } - s.Append(rd.Entries) - n.Advance() - - // Simulate a network partition while we make our proposals by never - // committing anything. These proposals should not cause the leader's - // log to grow indefinitely. - for i := 0; i < 1024; i++ { - _ = n.Propose(context.TODO(), data) - } - - // Check the size of leader's uncommitted log tail. It should not exceed the - // MaxUncommittedEntriesSize limit. - checkUncommitted := func(exp uint64) { - t.Helper() - if a := rn.raft.uncommittedSize; exp != a { - t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a) - } - } - checkUncommitted(maxEntrySize) - - // Recover from the partition. The uncommitted tail of the Raft log should - // disappear as entries are committed. - rd = readyWithTimeout(&n) - if len(rd.CommittedEntries) != maxEntries { - t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries)) - } - s.Append(rd.Entries) - n.Advance() - checkUncommitted(0) -}