diff --git a/raft/node.go b/raft/node.go index 4a3b2f1df..e01a7167a 100644 --- a/raft/node.go +++ b/raft/node.go @@ -198,51 +198,15 @@ type Peer struct { // StartNode returns a new Node given configuration and a list of raft peers. // It appends a ConfChangeAddNode entry for each given peer to the initial log. func StartNode(c *Config, peers []Peer) Node { - r := newRaft(c) - // become the follower at term 1 and apply initial configuration - // entries of term 1 - r.becomeFollower(1, None) - for _, peer := range peers { - cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context} - d, err := cc.Marshal() - if err != nil { - panic("unexpected marshal error") - } - // TODO(tbg): this should append the ConfChange for the own node first - // and also call applyConfChange below for that node first. Otherwise - // we have a Raft group (for a little while) that doesn't have itself - // in its config, which is bad. - // This whole way of setting things up is rickety. The app should just - // populate the initial ConfState appropriately and then all of this - // goes away. - e := pb.Entry{ - Type: pb.EntryConfChange, - Term: 1, - Index: r.raftLog.lastIndex() + 1, - Data: d, - } - r.raftLog.append(e) - } - // Mark these initial entries as committed. - // TODO(bdarnell): These entries are still unstable; do we need to preserve - // the invariant that committed < unstable? - r.raftLog.committed = r.raftLog.lastIndex() - // Now apply them, mainly so that the application can call Campaign - // immediately after StartNode in tests. Note that these nodes will - // be added to raft twice: here and when the application's Ready - // loop calls ApplyConfChange. The calls to addNode must come after - // all calls to raftLog.append so progress.next is set after these - // bootstrapping entries (it is an error if we try to append these - // entries since they have already been committed). - // We do not set raftLog.applied so the application will be able - // to observe all conf changes via Ready.CommittedEntries. - for _, peer := range peers { - r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}) + rn, err := NewRawNode(c, peers) + if err != nil { + panic(err) } n := newNode() n.logger = c.Logger - go n.run(r) + + go n.run(rn) return &n } @@ -251,12 +215,7 @@ func StartNode(c *Config, peers []Peer) Node { // If the caller has an existing state machine, pass in the last log index that // has been applied to it; otherwise use zero. func RestartNode(c *Config) Node { - r := newRaft(c) - - n := newNode() - n.logger = c.Logger - go n.run(r) - return &n + return StartNode(c, nil) } type msgWithResult struct { @@ -310,30 +269,30 @@ func (n *node) Stop() { <-n.done } -func (n *node) run(r *raft) { +func (n *node) run(rn *RawNode) { var propc chan msgWithResult var readyc chan Ready var advancec chan struct{} - var prevLastUnstablei, prevLastUnstablet uint64 - var havePrevLastUnstablei bool - var prevSnapi uint64 - var applyingToI uint64 var rd Ready + r := rn.raft + lead := None - prevSoftSt := r.softState() - prevHardSt := emptyState for { if advancec != nil { readyc = nil - } else { - rd = newReady(r, prevSoftSt, prevHardSt) - if rd.containsUpdates() { - readyc = n.readyc - } else { - readyc = nil - } + } else if rn.HasReady() { + // Populate a Ready. Note that this Ready is not guaranteed to + // actually be handled. We will arm readyc, but there's no guarantee + // that we will actually send on it. 
+			// It's possible that we will
+			// service another channel instead, loop around, and then populate
+			// the Ready again. We could instead force the previous Ready to be
+			// handled first, but it's generally good to emit larger Readys plus
+			// it simplifies testing (by emitting less frequently and more
+			// predictably).
+			rd = rn.Ready()
+			readyc = n.readyc
 		}
 
 		if lead != r.lead {
@@ -382,40 +341,13 @@ func (n *node) run(r *raft) {
 			case <-n.done:
 			}
 		case <-n.tickc:
-			r.tick()
+			rn.Tick()
 		case readyc <- rd:
-			if rd.SoftState != nil {
-				prevSoftSt = rd.SoftState
-			}
-			if len(rd.Entries) > 0 {
-				prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
-				prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
-				havePrevLastUnstablei = true
-			}
-			if !IsEmptyHardState(rd.HardState) {
-				prevHardSt = rd.HardState
-			}
-			if !IsEmptySnap(rd.Snapshot) {
-				prevSnapi = rd.Snapshot.Metadata.Index
-			}
-			if index := rd.appliedCursor(); index != 0 {
-				applyingToI = index
-			}
-
-			r.msgs = nil
-			r.readStates = nil
-			r.reduceUncommittedSize(rd.CommittedEntries)
+			rn.acceptReady(rd)
 			advancec = n.advancec
 		case <-advancec:
-			if applyingToI != 0 {
-				r.raftLog.appliedTo(applyingToI)
-				applyingToI = 0
-			}
-			if havePrevLastUnstablei {
-				r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
-				havePrevLastUnstablei = false
-			}
-			r.raftLog.stableSnapTo(prevSnapi)
+			rn.commitReady(rd)
+			rd = Ready{}
 			advancec = nil
 		case c := <-n.status:
 			c <- getStatus(r)
diff --git a/raft/node_bench_test.go b/raft/node_bench_test.go
index c3fc89f3f..d0ecdc519 100644
--- a/raft/node_bench_test.go
+++ b/raft/node_bench_test.go
@@ -26,8 +26,8 @@ func BenchmarkOneNode(b *testing.B) {
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newTestRaft(1, []uint64{1}, 10, 1, s)
-	go n.run(r)
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	go n.run(rn)
 	defer n.Stop()
diff --git a/raft/node_test.go b/raft/node_test.go
index 641a5ca2f..29e35b2ed 100644
--- a/raft/node_test.go
+++ b/raft/node_test.go
@@ -132,9 +132,12 @@ func TestNodePropose(t *testing.T) {
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newTestRaft(1, []uint64{1}, 10, 1, s)
-	go n.run(r)
-	n.Campaign(context.TODO())
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	r := rn.raft
+	go n.run(rn)
+	if err := n.Campaign(context.TODO()); err != nil {
+		t.Fatal(err)
+	}
 	for {
 		rd := <-n.Ready()
 		s.Append(rd.Entries)
@@ -172,10 +175,11 @@ func TestNodeReadIndex(t *testing.T) {
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newTestRaft(1, []uint64{1}, 10, 1, s)
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	r := rn.raft
 	r.readStates = wrs
 
-	go n.run(r)
+	go n.run(rn)
 	n.Campaign(context.TODO())
 	for {
 		rd := <-n.Ready()
@@ -309,8 +313,9 @@ func TestNodeProposeConfig(t *testing.T) {
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newTestRaft(1, []uint64{1}, 10, 1, s)
-	go n.run(r)
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	r := rn.raft
+	go n.run(rn)
 	n.Campaign(context.TODO())
 	for {
 		rd := <-n.Ready()
@@ -347,8 +352,8 @@ func TestNodeProposeConfig(t *testing.T) {
 func TestNodeProposeAddDuplicateNode(t *testing.T) {
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newTestRaft(1, []uint64{1}, 10, 1, s)
-	go n.run(r)
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	go n.run(rn)
 	n.Campaign(context.TODO())
 	rdyEntries := make([]raftpb.Entry, 0)
 	ticker := time.NewTicker(time.Millisecond * 100)
@@ -422,8 +427,8 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) {
 // who is the current leader.
 func TestBlockProposal(t *testing.T) {
 	n := newNode()
-	r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
-	go n.run(r)
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, NewMemoryStorage())
+	go n.run(rn)
 	defer n.Stop()
 
 	errc := make(chan error, 1)
@@ -463,8 +468,9 @@ func TestNodeProposeWaitDropped(t *testing.T) {
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newTestRaft(1, []uint64{1}, 10, 1, s)
-	go n.run(r)
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	r := rn.raft
+	go n.run(rn)
 	n.Campaign(context.TODO())
 	for {
 		rd := <-n.Ready()
@@ -497,8 +503,9 @@ func TestNodeTick(t *testing.T) {
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newTestRaft(1, []uint64{1}, 10, 1, s)
-	go n.run(r)
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	r := rn.raft
+	go n.run(rn)
 	elapsed := r.electionElapsed
 	n.Tick()
 
@@ -517,11 +524,11 @@ func TestNodeStop(t *testing.T) {
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newTestRaft(1, []uint64{1}, 10, 1, s)
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
 	donec := make(chan struct{})
 
 	go func() {
-		n.run(r)
+		n.run(rn)
 		close(donec)
 	}()
 
@@ -618,7 +625,9 @@ func TestNodeStart(t *testing.T) {
 		n.Advance()
 	}
 
-	n.Campaign(ctx)
+	if err := n.Campaign(ctx); err != nil {
+		t.Fatal(err)
+	}
 	rd := <-n.Ready()
 	storage.Append(rd.Entries)
 	n.Advance()
@@ -646,10 +655,12 @@ func TestNodeRestart(t *testing.T) {
 	st := raftpb.HardState{Term: 1, Commit: 1}
 	want := Ready{
-		HardState: st,
+		// No HardState is emitted because there was no change.
+		HardState: raftpb.HardState{},
 		// commit up to the commit index in st
 		CommittedEntries: entries[:st.Commit],
-		MustSync: true,
+		// MustSync is false because no HardState or new entries are provided.
+		MustSync: false,
 	}
 
 	storage := NewMemoryStorage()
@@ -691,10 +702,14 @@ func TestNodeRestartFromSnapshot(t *testing.T) {
 	st := raftpb.HardState{Term: 1, Commit: 3}
 	want := Ready{
-		HardState: st,
+		// No HardState is emitted because nothing changed relative to what is
+		// already persisted.
+		HardState: raftpb.HardState{},
 		// commit up to the commit index in st
 		CommittedEntries: entries,
-		MustSync: true,
+		// MustSync is only true when there is a new HardState or new entries;
+		// neither is the case here.
+		MustSync: false,
 	}
 
 	s := NewMemoryStorage()
@@ -800,8 +815,8 @@ func TestNodeProposeAddLearnerNode(t *testing.T) {
 	defer ticker.Stop()
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newTestRaft(1, []uint64{1}, 10, 1, s)
-	go n.run(r)
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	go n.run(rn)
 	n.Campaign(context.TODO())
 	stop := make(chan struct{})
 	done := make(chan struct{})
@@ -895,9 +910,12 @@ func TestCommitPagination(t *testing.T) {
 	s := NewMemoryStorage()
 	cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
 	cfg.MaxCommittedSizePerReady = 2048
-	r := newRaft(cfg)
+	rn, err := NewRawNode(cfg, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
 	n := newNode()
-	go n.run(r)
+	go n.run(rn)
 	n.Campaign(context.TODO())
 
 	rd := readyWithTimeout(&n)
@@ -984,9 +1002,12 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) {
 	// this and *will* return it (which is how the Commit index ended up being 10 initially).
cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1 - r := newRaft(cfg) + rn, err := NewRawNode(cfg, nil) + if err != nil { + t.Fatal(err) + } n := newNode() - go n.run(r) + go n.run(rn) defer n.Stop() rd := readyWithTimeout(&n) @@ -1011,9 +1032,12 @@ func TestNodeBoundedLogGrowthWithPartition(t *testing.T) { s := NewMemoryStorage() cfg := newTestConfig(1, []uint64{1}, 10, 1, s) cfg.MaxUncommittedEntriesSize = maxEntrySize - r := newRaft(cfg) + rn, err := NewRawNode(cfg, nil) + if err != nil { + t.Fatal(err) + } n := newNode() - go n.run(r) + go n.run(rn) defer n.Stop() n.Campaign(context.TODO()) @@ -1028,14 +1052,14 @@ func TestNodeBoundedLogGrowthWithPartition(t *testing.T) { // committing anything. These proposals should not cause the leader's // log to grow indefinitely. for i := 0; i < 1024; i++ { - n.Propose(context.TODO(), data) + _ = n.Propose(context.TODO(), data) } // Check the size of leader's uncommitted log tail. It should not exceed the // MaxUncommittedEntriesSize limit. checkUncommitted := func(exp uint64) { t.Helper() - if a := r.uncommittedSize; exp != a { + if a := rn.raft.uncommittedSize; exp != a { t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a) } } diff --git a/raft/raft_test.go b/raft/raft_test.go index fc27fee11..b42db1522 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -4310,3 +4310,12 @@ func newTestLearnerRaft(id uint64, peers []uint64, learners []uint64, election, cfg.learners = learners return newRaft(cfg) } + +func newTestRawNode(id uint64, peers []uint64, election, heartbeat int, storage Storage) *RawNode { + cfg := newTestConfig(id, peers, election, heartbeat, storage) + rn, err := NewRawNode(cfg, nil) + if err != nil { + panic(err) + } + return rn +} diff --git a/raft/rawnode.go b/raft/rawnode.go index c2bc7c132..378a931ab 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -37,38 +37,6 @@ type RawNode struct { prevHardSt pb.HardState } -func (rn *RawNode) newReady() Ready { - return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt) -} - -func (rn *RawNode) commitReady(rd Ready) { - if rd.SoftState != nil { - rn.prevSoftSt = rd.SoftState - } - if !IsEmptyHardState(rd.HardState) { - rn.prevHardSt = rd.HardState - } - - // If entries were applied (or a snapshot), update our cursor for - // the next Ready. Note that if the current HardState contains a - // new Commit index, this does not mean that we're also applying - // all of the new entries due to commit pagination by size. - if index := rd.appliedCursor(); index > 0 { - rn.raft.raftLog.appliedTo(index) - } - - if len(rd.Entries) > 0 { - e := rd.Entries[len(rd.Entries)-1] - rn.raft.raftLog.stableTo(e.Index, e.Term) - } - if !IsEmptySnap(rd.Snapshot) { - rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index) - } - if len(rd.ReadStates) != 0 { - rn.raft.readStates = nil - } -} - // NewRawNode returns a new RawNode given configuration and a list of raft peers. 
 func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
 	if config.ID == 0 {
@@ -78,27 +46,48 @@ func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
 	rn := &RawNode{
 		raft: r,
 	}
-	lastIndex, err := config.Storage.LastIndex()
+	if err := rn.init(peers); err != nil {
+		return nil, err
+	}
+	return rn, nil
+}
+
+func (rn *RawNode) init(peers []Peer) error {
+	r := rn.raft
+	lastIndex, err := rn.raft.raftLog.storage.LastIndex()
 	if err != nil {
-		panic(err) // TODO(bdarnell)
+		return err
 	}
 	// If the log is empty, this is a new RawNode (like StartNode); otherwise it's
 	// restoring an existing RawNode (like RestartNode).
 	// TODO(bdarnell): rethink RawNode initialization and whether the application needs
 	// to be able to tell us when it expects the RawNode to exist.
 	if lastIndex == 0 {
-		r.becomeFollower(1, None)
+		rn.raft.becomeFollower(1, None)
 		ents := make([]pb.Entry, len(peers))
 		for i, peer := range peers {
 			cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
 			data, err := cc.Marshal()
 			if err != nil {
-				panic("unexpected marshal error")
+				return err
 			}
 			ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
 		}
-		r.raftLog.append(ents...)
+		rn.raft.raftLog.append(ents...)
+
+		// Now apply them, mainly so that the application can call Campaign
+		// immediately after StartNode in tests. Note that these nodes will
+		// be added to raft twice: here and when the application's Ready
+		// loop calls ApplyConfChange. The calls to addNode must come after
+		// all calls to raftLog.append so progress.next is set after these
+		// bootstrapping entries (it is an error if we try to append these
+		// entries since they have already been committed).
+		// We do not set raftLog.applied so the application will be able
+		// to observe all conf changes via Ready.CommittedEntries.
+		//
+		// TODO(bdarnell): These entries are still unstable; do we need to preserve
+		// the invariant that committed < unstable?
 		r.raftLog.committed = uint64(len(ents))
 		for _, peer := range peers {
 			r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
@@ -113,7 +102,7 @@ func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
 		rn.prevHardSt = r.hardState()
 	}
 
-	return rn, nil
+	return nil
 }
 
 // Tick advances the internal logical clock by a single tick.
@@ -182,14 +171,61 @@ func (rn *RawNode) Step(m pb.Message) error {
 	return ErrStepPeerNotFound
 }
 
-// Ready returns the current point-in-time state of this RawNode.
+// Ready returns the outstanding work that the application needs to handle. This
+// includes appending and applying entries or a snapshot, updating the HardState,
+// and sending messages. Ready() is a read-only operation, that is, it does not
+// require the caller to actually handle the result. Typically, a caller will
+// want to handle the Ready and must pass the Ready to Advance *after* having
+// done so. While a Ready is being handled, the RawNode must not be used for
+// operations that may alter its state. For example, it is illegal to call
+// Ready, followed by Step, followed by Advance.
 func (rn *RawNode) Ready() Ready {
 	rd := rn.newReady()
-	rn.raft.msgs = nil
-	rn.raft.reduceUncommittedSize(rd.CommittedEntries)
 	return rd
 }
 
+func (rn *RawNode) newReady() Ready {
+	return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
+}
+
+// acceptReady is called when the consumer of the RawNode has decided to go
+// ahead and handle a Ready.
+// Nothing must alter the state of the RawNode between
+// this call and the prior call to Ready().
+func (rn *RawNode) acceptReady(rd Ready) {
+	if rd.SoftState != nil {
+		rn.prevSoftSt = rd.SoftState
+	}
+	if len(rd.ReadStates) != 0 {
+		rn.raft.readStates = nil
+	}
+	rn.raft.msgs = nil
+}
+
+// commitReady is called when the consumer of the RawNode has successfully
+// handled a Ready (having previously called acceptReady).
+func (rn *RawNode) commitReady(rd Ready) {
+	if !IsEmptyHardState(rd.HardState) {
+		rn.prevHardSt = rd.HardState
+	}
+
+	// If entries were applied (or a snapshot), update our cursor for
+	// the next Ready. Note that if the current HardState contains a
+	// new Commit index, this does not mean that we're also applying
+	// all of the new entries due to commit pagination by size.
+	if index := rd.appliedCursor(); index > 0 {
+		rn.raft.raftLog.appliedTo(index)
+	}
+	rn.raft.reduceUncommittedSize(rd.CommittedEntries)
+
+	if len(rd.Entries) > 0 {
+		e := rd.Entries[len(rd.Entries)-1]
+		rn.raft.raftLog.stableTo(e.Index, e.Term)
+	}
+	if !IsEmptySnap(rd.Snapshot) {
+		rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
+	}
+}
+
 // HasReady is called when the RawNode user needs to check whether any Ready is pending.
 // The checking logic in this method should be consistent with Ready.containsUpdates().
 func (rn *RawNode) HasReady() bool {
@@ -215,6 +251,11 @@ func (rn *RawNode) HasReady() bool {
 // Advance notifies the RawNode that the application has applied and saved progress in the
 // last Ready results.
 func (rn *RawNode) Advance(rd Ready) {
+	// Advance combines accept and commit. Callers must not mutate the RawNode
+	// between the call to Ready and the matching call to Advance, or the work
+	// done in acceptReady will clobber potentially newer data that has not been
+	// emitted in a Ready yet.
+	rn.acceptReady(rd)
	rn.commitReady(rd)
 }
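
Supplementary note, not part of the patch: the Ready/acceptReady/commitReady
split above only moves bookkeeping from node.run into RawNode; the
application-facing contract of Node is unchanged. For orientation, here is a
minimal sketch of the consuming side, modeled on the canonical loop documented
in raft/doc.go. The saveToStorage, send, and process hooks are assumed,
application-provided stand-ins (stubbed so the sketch compiles), and the
go.etcd.io/etcd/raft import path is an assumption about the module layout at
the time of this change.

	package example

	import (
		"log"
		"time"

		"go.etcd.io/etcd/raft"
		"go.etcd.io/etcd/raft/raftpb"
	)

	// Assumed application hooks, stubbed so the sketch is self-contained.
	func saveToStorage(st raftpb.HardState, ents []raftpb.Entry, snap raftpb.Snapshot) {}
	func send(msgs []raftpb.Message)                                                   {}
	func process(e raftpb.Entry)                                                       {}

	// runLoop drives a started Node. Receiving from Ready() is the moment
	// node.run calls rn.acceptReady; calling Advance is the moment it calls
	// rn.commitReady and lets raft move its stable/applied cursors.
	func runLoop(n raft.Node, done <-chan struct{}) {
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				n.Tick()
			case rd := <-n.Ready():
				// Persist before sending; rd.MustSync reports whether the
				// write must also be fsynced.
				saveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
				send(rd.Messages)
				for _, entry := range rd.CommittedEntries {
					process(entry)
					if entry.Type == raftpb.EntryConfChange {
						var cc raftpb.ConfChange
						if err := cc.Unmarshal(entry.Data); err != nil {
							log.Fatal(err)
						}
						n.ApplyConfChange(cc)
					}
				}
				n.Advance()
			case <-done:
				return
			}
		}
	}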
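
The changes to TestNodeRestart and TestNodeRestartFromSnapshot encode the
MustSync rule: a Ready asks for an fsync only when it carries a new HardState
or new entries. Below is a sketch of how a persistence layer might use the
flag, continuing in the same package as the sketch above; the wal parameter
and its Save/Sync methods are hypothetical stand-ins, not etcd APIs.

	// persist writes a Ready's durable parts and fsyncs only when required.
	func persist(wal interface {
		Save(st raftpb.HardState, ents []raftpb.Entry) error
		Sync() error
	}, rd raft.Ready) error {
		if err := wal.Save(rd.HardState, rd.Entries); err != nil {
			return err
		}
		if rd.MustSync {
			// An unchanged HardState, as in the restart tests above, does
			// not set MustSync and therefore does not force an fsync.
			return wal.Sync()
		}
		return nil
	}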
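
For direct RawNode users, the same accept/commit discipline is spelled out in
the new Ready and acceptReady comments: nothing may mutate the RawNode between
Ready() and Advance(). A minimal sketch of a single-voter bootstrap follows,
with illustrative config values; again a sketch, not part of the patch.

	// bootstrapRawNode starts a one-member group and drains the bootstrap
	// Readys (the appended ConfChangeAddNode entries).
	func bootstrapRawNode() (*raft.RawNode, error) {
		storage := raft.NewMemoryStorage()
		rn, err := raft.NewRawNode(&raft.Config{
			ID:              1,
			ElectionTick:    10,
			HeartbeatTick:   1,
			Storage:         storage,
			MaxSizePerMsg:   4096,
			MaxInflightMsgs: 256,
		}, []raft.Peer{{ID: 1}})
		if err != nil {
			return nil, err
		}
		for rn.HasReady() {
			rd := rn.Ready()
			// Between Ready and Advance the RawNode must not be ticked or
			// stepped: Advance runs acceptReady, and interleaved mutations
			// could be clobbered.
			if err := storage.Append(rd.Entries); err != nil {
				return nil, err // persist rd.HardState and rd.Snapshot too
			}
			rn.Advance(rd)
		}
		return rn, nil
	}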