diff --git a/etcdserver/raft.go b/etcdserver/raft.go
index 6f42c3504..75845d772 100644
--- a/etcdserver/raft.go
+++ b/etcdserver/raft.go
@@ -479,7 +479,11 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id
 		}
 	}
-	n = raft.StartNode(c, peers)
+	if len(peers) == 0 {
+		n = raft.RestartNode(c)
+	} else {
+		n = raft.StartNode(c, peers)
+	}
 	raftStatusMu.Lock()
 	raftStatus = n.Status
 	raftStatusMu.Unlock()
diff --git a/raft/bootstrap.go b/raft/bootstrap.go
new file mode 100644
index 000000000..fdd098756
--- /dev/null
+++ b/raft/bootstrap.go
@@ -0,0 +1,80 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import (
+	"errors"
+
+	pb "go.etcd.io/etcd/raft/raftpb"
+)
+
+// Bootstrap initializes the RawNode for first use by appending configuration
+// changes for the supplied peers. This method returns an error if the Storage
+// is nonempty.
+//
+// It is recommended that instead of calling this method, applications bootstrap
+// their state manually by setting up a Storage that has a first index > 1 and
+// which stores the desired ConfState as its InitialState.
+func (rn *RawNode) Bootstrap(peers []Peer) error {
+	if len(peers) == 0 {
+		return errors.New("must provide at least one peer to Bootstrap")
+	}
+	lastIndex, err := rn.raft.raftLog.storage.LastIndex()
+	if err != nil {
+		return err
+	}
+
+	if lastIndex != 0 {
+		return errors.New("can't bootstrap a nonempty Storage")
+	}
+
+	// We will fake out initial entries below, but nothing will have been
+	// persisted. Start with an empty HardState (thus the first Ready will
+	// emit a HardState update for the app to persist).
+	rn.prevHardSt = emptyState
+
+	// TODO(tbg): remove StartNode and give the application the right tools to
+	// bootstrap the initial membership in a cleaner way.
+	rn.raft.becomeFollower(1, None)
+	ents := make([]pb.Entry, len(peers))
+	for i, peer := range peers {
+		cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
+		data, err := cc.Marshal()
+		if err != nil {
+			return err
+		}
+
+		ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
+	}
+	rn.raft.raftLog.append(ents...)
+
+	// Now apply them, mainly so that the application can call Campaign
+	// immediately after StartNode in tests. Note that these nodes will
+	// be added to raft twice: here and when the application's Ready
+	// loop calls ApplyConfChange. The calls to addNode must come after
+	// all calls to raftLog.append so progress.next is set after these
+	// bootstrapping entries (it is an error if we try to append these
+	// entries since they have already been committed).
+	// We do not set raftLog.applied so the application will be able
+	// to observe all conf changes via Ready.CommittedEntries.
+	//
+	// TODO(bdarnell): These entries are still unstable; do we need to preserve
+	// the invariant that committed < unstable?
+ rn.raft.raftLog.committed = uint64(len(ents)) + for _, peer := range peers { + rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}) + } + return nil +} diff --git a/raft/node.go b/raft/node.go index 4a3b2f1df..6b730c0d4 100644 --- a/raft/node.go +++ b/raft/node.go @@ -197,52 +197,22 @@ type Peer struct { // StartNode returns a new Node given configuration and a list of raft peers. // It appends a ConfChangeAddNode entry for each given peer to the initial log. +// +// Peers must not be zero length; call RestartNode in that case. func StartNode(c *Config, peers []Peer) Node { - r := newRaft(c) - // become the follower at term 1 and apply initial configuration - // entries of term 1 - r.becomeFollower(1, None) - for _, peer := range peers { - cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context} - d, err := cc.Marshal() - if err != nil { - panic("unexpected marshal error") - } - // TODO(tbg): this should append the ConfChange for the own node first - // and also call applyConfChange below for that node first. Otherwise - // we have a Raft group (for a little while) that doesn't have itself - // in its config, which is bad. - // This whole way of setting things up is rickety. The app should just - // populate the initial ConfState appropriately and then all of this - // goes away. - e := pb.Entry{ - Type: pb.EntryConfChange, - Term: 1, - Index: r.raftLog.lastIndex() + 1, - Data: d, - } - r.raftLog.append(e) + if len(peers) == 0 { + panic("no peers given; use RestartNode instead") } - // Mark these initial entries as committed. - // TODO(bdarnell): These entries are still unstable; do we need to preserve - // the invariant that committed < unstable? - r.raftLog.committed = r.raftLog.lastIndex() - // Now apply them, mainly so that the application can call Campaign - // immediately after StartNode in tests. Note that these nodes will - // be added to raft twice: here and when the application's Ready - // loop calls ApplyConfChange. The calls to addNode must come after - // all calls to raftLog.append so progress.next is set after these - // bootstrapping entries (it is an error if we try to append these - // entries since they have already been committed). - // We do not set raftLog.applied so the application will be able - // to observe all conf changes via Ready.CommittedEntries. - for _, peer := range peers { - r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}) + rn, err := NewRawNode(c) + if err != nil { + panic(err) } + rn.Bootstrap(peers) n := newNode() n.logger = c.Logger - go n.run(r) + + go n.run(rn) return &n } @@ -251,11 +221,13 @@ func StartNode(c *Config, peers []Peer) Node { // If the caller has an existing state machine, pass in the last log index that // has been applied to it; otherwise use zero. 
func RestartNode(c *Config) Node { - r := newRaft(c) - + rn, err := NewRawNode(c) + if err != nil { + panic(err) + } n := newNode() n.logger = c.Logger - go n.run(r) + go n.run(rn) return &n } @@ -310,30 +282,30 @@ func (n *node) Stop() { <-n.done } -func (n *node) run(r *raft) { +func (n *node) run(rn *RawNode) { var propc chan msgWithResult var readyc chan Ready var advancec chan struct{} - var prevLastUnstablei, prevLastUnstablet uint64 - var havePrevLastUnstablei bool - var prevSnapi uint64 - var applyingToI uint64 var rd Ready + r := rn.raft + lead := None - prevSoftSt := r.softState() - prevHardSt := emptyState for { if advancec != nil { readyc = nil - } else { - rd = newReady(r, prevSoftSt, prevHardSt) - if rd.containsUpdates() { - readyc = n.readyc - } else { - readyc = nil - } + } else if rn.HasReady() { + // Populate a Ready. Note that this Ready is not guaranteed to + // actually be handled. We will arm readyc, but there's no guarantee + // that we will actually send on it. It's possible that we will + // service another channel instead, loop around, and then populate + // the Ready again. We could instead force the previous Ready to be + // handled first, but it's generally good to emit larger Readys plus + // it simplifies testing (by emitting less frequently and more + // predictably). + rd = rn.Ready() + readyc = n.readyc } if lead != r.lead { @@ -382,40 +354,13 @@ func (n *node) run(r *raft) { case <-n.done: } case <-n.tickc: - r.tick() + rn.Tick() case readyc <- rd: - if rd.SoftState != nil { - prevSoftSt = rd.SoftState - } - if len(rd.Entries) > 0 { - prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index - prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term - havePrevLastUnstablei = true - } - if !IsEmptyHardState(rd.HardState) { - prevHardSt = rd.HardState - } - if !IsEmptySnap(rd.Snapshot) { - prevSnapi = rd.Snapshot.Metadata.Index - } - if index := rd.appliedCursor(); index != 0 { - applyingToI = index - } - - r.msgs = nil - r.readStates = nil - r.reduceUncommittedSize(rd.CommittedEntries) + rn.acceptReady(rd) advancec = n.advancec case <-advancec: - if applyingToI != 0 { - r.raftLog.appliedTo(applyingToI) - applyingToI = 0 - } - if havePrevLastUnstablei { - r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet) - havePrevLastUnstablei = false - } - r.raftLog.stableSnapTo(prevSnapi) + rn.commitReady(rd) + rd = Ready{} advancec = nil case c := <-n.status: c <- getStatus(r) diff --git a/raft/node_bench_test.go b/raft/node_bench_test.go index c3fc89f3f..d0ecdc519 100644 --- a/raft/node_bench_test.go +++ b/raft/node_bench_test.go @@ -26,8 +26,8 @@ func BenchmarkOneNode(b *testing.B) { n := newNode() s := NewMemoryStorage() - r := newTestRaft(1, []uint64{1}, 10, 1, s) - go n.run(r) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + go n.run(rn) defer n.Stop() diff --git a/raft/node_test.go b/raft/node_test.go index 641a5ca2f..af0728522 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -132,9 +132,12 @@ func TestNodePropose(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newTestRaft(1, []uint64{1}, 10, 1, s) - go n.run(r) - n.Campaign(context.TODO()) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + r := rn.raft + go n.run(rn) + if err := n.Campaign(context.TODO()); err != nil { + t.Fatal(err) + } for { rd := <-n.Ready() s.Append(rd.Entries) @@ -172,10 +175,11 @@ func TestNodeReadIndex(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newTestRaft(1, []uint64{1}, 10, 1, s) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + r := 
rn.raft r.readStates = wrs - go n.run(r) + go n.run(rn) n.Campaign(context.TODO()) for { rd := <-n.Ready() @@ -309,8 +313,9 @@ func TestNodeProposeConfig(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newTestRaft(1, []uint64{1}, 10, 1, s) - go n.run(r) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + r := rn.raft + go n.run(rn) n.Campaign(context.TODO()) for { rd := <-n.Ready() @@ -347,8 +352,8 @@ func TestNodeProposeConfig(t *testing.T) { func TestNodeProposeAddDuplicateNode(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newTestRaft(1, []uint64{1}, 10, 1, s) - go n.run(r) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + go n.run(rn) n.Campaign(context.TODO()) rdyEntries := make([]raftpb.Entry, 0) ticker := time.NewTicker(time.Millisecond * 100) @@ -422,8 +427,8 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) { // who is the current leader. func TestBlockProposal(t *testing.T) { n := newNode() - r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - go n.run(r) + rn := newTestRawNode(1, []uint64{1}, 10, 1, NewMemoryStorage()) + go n.run(rn) defer n.Stop() errc := make(chan error, 1) @@ -463,8 +468,9 @@ func TestNodeProposeWaitDropped(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newTestRaft(1, []uint64{1}, 10, 1, s) - go n.run(r) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + r := rn.raft + go n.run(rn) n.Campaign(context.TODO()) for { rd := <-n.Ready() @@ -497,8 +503,9 @@ func TestNodeProposeWaitDropped(t *testing.T) { func TestNodeTick(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newTestRaft(1, []uint64{1}, 10, 1, s) - go n.run(r) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + r := rn.raft + go n.run(rn) elapsed := r.electionElapsed n.Tick() @@ -517,11 +524,11 @@ func TestNodeTick(t *testing.T) { func TestNodeStop(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newTestRaft(1, []uint64{1}, 10, 1, s) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) donec := make(chan struct{}) go func() { - n.run(r) + n.run(rn) close(donec) }() @@ -618,7 +625,9 @@ func TestNodeStart(t *testing.T) { n.Advance() } - n.Campaign(ctx) + if err := n.Campaign(ctx); err != nil { + t.Fatal(err) + } rd := <-n.Ready() storage.Append(rd.Entries) n.Advance() @@ -646,10 +655,12 @@ func TestNodeRestart(t *testing.T) { st := raftpb.HardState{Term: 1, Commit: 1} want := Ready{ - HardState: st, + // No HardState is emitted because there was no change. + HardState: raftpb.HardState{}, // commit up to index commit index in st CommittedEntries: entries[:st.Commit], - MustSync: true, + // MustSync is false because no HardState or new entries are provided. + MustSync: false, } storage := NewMemoryStorage() @@ -691,10 +702,14 @@ func TestNodeRestartFromSnapshot(t *testing.T) { st := raftpb.HardState{Term: 1, Commit: 3} want := Ready{ - HardState: st, + // No HardState is emitted because nothing changed relative to what is + // already persisted. + HardState: raftpb.HardState{}, // commit up to index commit index in st CommittedEntries: entries, - MustSync: true, + // MustSync is only true when there is a new HardState or new entries; + // neither is the case here. 
+ MustSync: false, } s := NewMemoryStorage() @@ -800,8 +815,8 @@ func TestNodeProposeAddLearnerNode(t *testing.T) { defer ticker.Stop() n := newNode() s := NewMemoryStorage() - r := newTestRaft(1, []uint64{1}, 10, 1, s) - go n.run(r) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + go n.run(rn) n.Campaign(context.TODO()) stop := make(chan struct{}) done := make(chan struct{}) @@ -895,9 +910,12 @@ func TestCommitPagination(t *testing.T) { s := NewMemoryStorage() cfg := newTestConfig(1, []uint64{1}, 10, 1, s) cfg.MaxCommittedSizePerReady = 2048 - r := newRaft(cfg) + rn, err := NewRawNode(cfg) + if err != nil { + t.Fatal(err) + } n := newNode() - go n.run(r) + go n.run(rn) n.Campaign(context.TODO()) rd := readyWithTimeout(&n) @@ -984,9 +1002,12 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) { // this and *will* return it (which is how the Commit index ended up being 10 initially). cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1 - r := newRaft(cfg) + rn, err := NewRawNode(cfg) + if err != nil { + t.Fatal(err) + } n := newNode() - go n.run(r) + go n.run(rn) defer n.Stop() rd := readyWithTimeout(&n) @@ -997,57 +1018,3 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) { ) } } - -// TestNodeBoundedLogGrowthWithPartition tests a scenario where a leader is -// partitioned from a quorum of nodes. It verifies that the leader's log is -// protected from unbounded growth even as new entries continue to be proposed. -// This protection is provided by the MaxUncommittedEntriesSize configuration. -func TestNodeBoundedLogGrowthWithPartition(t *testing.T) { - const maxEntries = 16 - data := []byte("testdata") - testEntry := raftpb.Entry{Data: data} - maxEntrySize := uint64(maxEntries * PayloadSize(testEntry)) - - s := NewMemoryStorage() - cfg := newTestConfig(1, []uint64{1}, 10, 1, s) - cfg.MaxUncommittedEntriesSize = maxEntrySize - r := newRaft(cfg) - n := newNode() - go n.run(r) - defer n.Stop() - n.Campaign(context.TODO()) - - rd := readyWithTimeout(&n) - if len(rd.CommittedEntries) != 1 { - t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries)) - } - s.Append(rd.Entries) - n.Advance() - - // Simulate a network partition while we make our proposals by never - // committing anything. These proposals should not cause the leader's - // log to grow indefinitely. - for i := 0; i < 1024; i++ { - n.Propose(context.TODO(), data) - } - - // Check the size of leader's uncommitted log tail. It should not exceed the - // MaxUncommittedEntriesSize limit. - checkUncommitted := func(exp uint64) { - t.Helper() - if a := r.uncommittedSize; exp != a { - t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a) - } - } - checkUncommitted(maxEntrySize) - - // Recover from the partition. The uncommitted tail of the Raft log should - // disappear as entries are committed. - rd = readyWithTimeout(&n) - if len(rd.CommittedEntries) != maxEntries { - t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries)) - } - s.Append(rd.Entries) - n.Advance() - checkUncommitted(0) -} diff --git a/raft/raft_test.go b/raft/raft_test.go index fc27fee11..1e2d0e2af 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -4310,3 +4310,14 @@ func newTestLearnerRaft(id uint64, peers []uint64, learners []uint64, election, cfg.learners = learners return newRaft(cfg) } + +// newTestRawNode sets up a RawNode with the given peers. The configuration will +// not be reflected in the Storage. 
+func newTestRawNode(id uint64, peers []uint64, election, heartbeat int, storage Storage) *RawNode {
+	cfg := newTestConfig(id, peers, election, heartbeat, storage)
+	rn, err := NewRawNode(cfg)
+	if err != nil {
+		panic(err)
+	}
+	return rn
+}
diff --git a/raft/rawnode.go b/raft/rawnode.go
index c2bc7c132..b7e534346 100644
--- a/raft/rawnode.go
+++ b/raft/rawnode.go
@@ -37,40 +37,14 @@ type RawNode struct {
 	prevHardSt pb.HardState
 }
 
-func (rn *RawNode) newReady() Ready {
-	return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
-}
-
-func (rn *RawNode) commitReady(rd Ready) {
-	if rd.SoftState != nil {
-		rn.prevSoftSt = rd.SoftState
-	}
-	if !IsEmptyHardState(rd.HardState) {
-		rn.prevHardSt = rd.HardState
-	}
-
-	// If entries were applied (or a snapshot), update our cursor for
-	// the next Ready. Note that if the current HardState contains a
-	// new Commit index, this does not mean that we're also applying
-	// all of the new entries due to commit pagination by size.
-	if index := rd.appliedCursor(); index > 0 {
-		rn.raft.raftLog.appliedTo(index)
-	}
-
-	if len(rd.Entries) > 0 {
-		e := rd.Entries[len(rd.Entries)-1]
-		rn.raft.raftLog.stableTo(e.Index, e.Term)
-	}
-	if !IsEmptySnap(rd.Snapshot) {
-		rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
-	}
-	if len(rd.ReadStates) != 0 {
-		rn.raft.readStates = nil
-	}
-}
-
-// NewRawNode returns a new RawNode given configuration and a list of raft peers.
-func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
+// NewRawNode instantiates a RawNode from the given configuration.
+//
+// See Bootstrap() for bootstrapping an initial state; this replaces the former
+// 'peers' argument to this method (with identical behavior). However, it is
+// recommended that instead of calling Bootstrap, applications bootstrap their
+// state manually by setting up a Storage that has a first index > 1 and which
+// stores the desired ConfState as its InitialState.
+func NewRawNode(config *Config) (*RawNode, error) {
 	if config.ID == 0 {
 		panic("config.ID must not be zero")
 	}
@@ -78,41 +52,8 @@ func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
 	rn := &RawNode{
 		raft: r,
 	}
-	lastIndex, err := config.Storage.LastIndex()
-	if err != nil {
-		panic(err) // TODO(bdarnell)
-	}
-	// If the log is empty, this is a new RawNode (like StartNode); otherwise it's
-	// restoring an existing RawNode (like RestartNode).
-	// TODO(bdarnell): rethink RawNode initialization and whether the application needs
-	// to be able to tell us when it expects the RawNode to exist.
-	if lastIndex == 0 {
-		r.becomeFollower(1, None)
-		ents := make([]pb.Entry, len(peers))
-		for i, peer := range peers {
-			cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
-			data, err := cc.Marshal()
-			if err != nil {
-				panic("unexpected marshal error")
-			}
-
-			ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
-		}
-		r.raftLog.append(ents...)
-		r.raftLog.committed = uint64(len(ents))
-		for _, peer := range peers {
-			r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
-		}
-	}
-
-	// Set the initial hard and soft states after performing all initialization.
rn.prevSoftSt = r.softState() - if lastIndex == 0 { - rn.prevHardSt = emptyState - } else { - rn.prevHardSt = r.hardState() - } - + rn.prevHardSt = r.hardState() return rn, nil } @@ -182,14 +123,61 @@ func (rn *RawNode) Step(m pb.Message) error { return ErrStepPeerNotFound } -// Ready returns the current point-in-time state of this RawNode. +// Ready returns the outstanding work that the application needs to handle. This +// includes appending and applying entries or a snapshot, updating the HardState, +// and sending messages. Ready() is a read-only operation, that is, it does not +// require the caller to actually handle the result. Typically, a caller will +// want to handle the Ready and must pass the Ready to Advance *after* having +// done so. While a Ready is being handled, the RawNode must not be used for +// operations that may alter its state. For example, it is illegal to call +// Ready, followed by Step, followed by Advance. func (rn *RawNode) Ready() Ready { rd := rn.newReady() - rn.raft.msgs = nil - rn.raft.reduceUncommittedSize(rd.CommittedEntries) return rd } +func (rn *RawNode) newReady() Ready { + return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt) +} + +// acceptReady is called when the consumer of the RawNode has decided to go +// ahead and handle a Ready. Nothing must alter the state of the RawNode between +// this call and the prior call to Ready(). +func (rn *RawNode) acceptReady(rd Ready) { + if rd.SoftState != nil { + rn.prevSoftSt = rd.SoftState + } + if len(rd.ReadStates) != 0 { + rn.raft.readStates = nil + } + rn.raft.msgs = nil +} + +// commitReady is called when the consumer of the RawNode has successfully +// handled a Ready (having previously called acceptReady). +func (rn *RawNode) commitReady(rd Ready) { + if !IsEmptyHardState(rd.HardState) { + rn.prevHardSt = rd.HardState + } + + // If entries were applied (or a snapshot), update our cursor for + // the next Ready. Note that if the current HardState contains a + // new Commit index, this does not mean that we're also applying + // all of the new entries due to commit pagination by size. + if index := rd.appliedCursor(); index > 0 { + rn.raft.raftLog.appliedTo(index) + } + rn.raft.reduceUncommittedSize(rd.CommittedEntries) + + if len(rd.Entries) > 0 { + e := rd.Entries[len(rd.Entries)-1] + rn.raft.raftLog.stableTo(e.Index, e.Term) + } + if !IsEmptySnap(rd.Snapshot) { + rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index) + } +} + // HasReady called when RawNode user need to check if any Ready pending. // Checking logic in this method should be consistent with Ready.containsUpdates(). func (rn *RawNode) HasReady() bool { @@ -215,6 +203,11 @@ func (rn *RawNode) HasReady() bool { // Advance notifies the RawNode that the application has applied and saved progress in the // last Ready results. func (rn *RawNode) Advance(rd Ready) { + // Advance combines accept and commit. Callers can't mutate the RawNode + // between the call to Ready and the matching call to Advance, or the work + // done in acceptReady will clobber potentially newer data that has not been + // emitted in a Ready yet. 
+	rn.acceptReady(rd)
 	rn.commitReady(rd)
 }
diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go
index 3a6c392cc..373863876 100644
--- a/raft/rawnode_test.go
+++ b/raft/rawnode_test.go
@@ -18,11 +18,12 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"math"
 	"reflect"
 	"testing"
 
 	"go.etcd.io/etcd/raft/quorum"
-	"go.etcd.io/etcd/raft/raftpb"
+	pb "go.etcd.io/etcd/raft/raftpb"
 	"go.etcd.io/etcd/raft/tracker"
 )
 
@@ -61,28 +62,43 @@ func (a *rawNodeAdapter) ReadIndex(_ context.Context, rctx []byte) error {
 	// RawNode swallowed the error in ReadIndex, it probably should not do that.
 	return nil
 }
-func (a *rawNodeAdapter) Step(_ context.Context, m raftpb.Message) error { return a.RawNode.Step(m) }
-func (a *rawNodeAdapter) Propose(_ context.Context, data []byte) error   { return a.RawNode.Propose(data) }
-func (a *rawNodeAdapter) ProposeConfChange(_ context.Context, cc raftpb.ConfChange) error {
+func (a *rawNodeAdapter) Step(_ context.Context, m pb.Message) error { return a.RawNode.Step(m) }
+func (a *rawNodeAdapter) Propose(_ context.Context, data []byte) error { return a.RawNode.Propose(data) }
+func (a *rawNodeAdapter) ProposeConfChange(_ context.Context, cc pb.ConfChange) error {
 	return a.RawNode.ProposeConfChange(cc)
 }
 
 // TestRawNodeStep ensures that RawNode.Step ignore local message.
 func TestRawNodeStep(t *testing.T) {
-	for i, msgn := range raftpb.MessageType_name {
-		s := NewMemoryStorage()
-		rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
-		if err != nil {
-			t.Fatal(err)
-		}
-		msgt := raftpb.MessageType(i)
-		err = rawNode.Step(raftpb.Message{Type: msgt})
-		// LocalMsg should be ignored.
-		if IsLocalMsg(msgt) {
-			if err != ErrStepLocalMsg {
-				t.Errorf("%d: step should ignore %s", msgt, msgn)
+	for i, msgn := range pb.MessageType_name {
+		t.Run(msgn, func(t *testing.T) {
+			s := NewMemoryStorage()
+			s.SetHardState(pb.HardState{Term: 1, Commit: 1})
+			s.Append([]pb.Entry{{Term: 1, Index: 1}})
+			if err := s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{
+				ConfState: pb.ConfState{
+					Nodes: []uint64{1},
+				},
+				Index: 1,
+				Term:  1,
+			}}); err != nil {
+				t.Fatal(err)
 			}
-	}
+			// The Storage was seeded above (HardState, one entry, and a
+			// snapshot carrying a one-node ConfState) so that non-local
+			// messages (like vote requests) are ignored and don't trigger
+			// assertions.
+			rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s))
+			if err != nil {
+				t.Fatal(err)
+			}
+			msgt := pb.MessageType(i)
+			err = rawNode.Step(pb.Message{Type: msgt})
+			// LocalMsg should be ignored.
+			if IsLocalMsg(msgt) {
+				if err != ErrStepLocalMsg {
+					t.Errorf("%d: step should ignore %s", msgt, msgn)
+				}
+			}
+		})
 	}
 }
 
@@ -94,17 +110,10 @@ func TestRawNodeStep(t *testing.T) {
 func TestRawNodeProposeAndConfChange(t *testing.T) {
 	s := NewMemoryStorage()
 	var err error
-	rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
+	rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s))
 	if err != nil {
 		t.Fatal(err)
 	}
-	rd := rawNode.Ready()
-	s.Append(rd.Entries)
-	rawNode.Advance(rd)
-
-	if d := rawNode.Ready(); d.MustSync || !IsEmptyHardState(d.HardState) || len(d.Entries) > 0 {
-		t.Fatalf("expected empty hard state with must-sync=false: %#v", d)
-	}
 
 	rawNode.Campaign()
 	proposed := false
 	var (
 		lastIndex uint64
 		ccdata    []byte
 	)
 	for {
-		rd = rawNode.Ready()
+		rd := rawNode.Ready()
 		s.Append(rd.Entries)
+		rawNode.Advance(rd)
 
 		// Once we are the leader, propose a command and a ConfChange.
if !proposed && rd.SoftState.Lead == rawNode.raft.id { - rawNode.Propose([]byte("somedata")) - - cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1} + if err = rawNode.Propose([]byte("somedata")); err != nil { + t.Fatal(err) + } + cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 1} ccdata, err = cc.Marshal() if err != nil { t.Fatal(err) @@ -127,16 +138,13 @@ func TestRawNodeProposeAndConfChange(t *testing.T) { rawNode.ProposeConfChange(cc) proposed = true - } - rawNode.Advance(rd) - - // Exit when we have four entries: one ConfChange, one no-op for the election, - // our proposed command and proposed ConfChange. - lastIndex, err = s.LastIndex() - if err != nil { - t.Fatal(err) - } - if lastIndex >= 4 { + } else if proposed { + // We proposed last cycle, which means we appended the conf change + // in this cycle. + lastIndex, err = s.LastIndex() + if err != nil { + t.Fatal(err) + } break } } @@ -151,8 +159,8 @@ func TestRawNodeProposeAndConfChange(t *testing.T) { if !bytes.Equal(entries[0].Data, []byte("somedata")) { t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata")) } - if entries[1].Type != raftpb.EntryConfChange { - t.Fatalf("type = %v, want %v", entries[1].Type, raftpb.EntryConfChange) + if entries[1].Type != pb.EntryConfChange { + t.Fatalf("type = %v, want %v", entries[1].Type, pb.EntryConfChange) } if !bytes.Equal(entries[1].Data, ccdata) { t.Errorf("data = %v, want %v", entries[1].Data, ccdata) @@ -163,7 +171,7 @@ func TestRawNodeProposeAndConfChange(t *testing.T) { // not affect the later propose to add new node. func TestRawNodeProposeAddDuplicateNode(t *testing.T) { s := NewMemoryStorage() - rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}}) + rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s)) if err != nil { t.Fatal(err) } @@ -182,13 +190,13 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) { rawNode.Advance(rd) } - proposeConfChangeAndApply := func(cc raftpb.ConfChange) { + proposeConfChangeAndApply := func(cc pb.ConfChange) { rawNode.ProposeConfChange(cc) rd = rawNode.Ready() s.Append(rd.Entries) for _, entry := range rd.CommittedEntries { - if entry.Type == raftpb.EntryConfChange { - var cc raftpb.ConfChange + if entry.Type == pb.EntryConfChange { + var cc pb.ConfChange cc.Unmarshal(entry.Data) rawNode.ApplyConfChange(cc) } @@ -196,7 +204,7 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) { rawNode.Advance(rd) } - cc1 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1} + cc1 := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 1} ccdata1, err := cc1.Marshal() if err != nil { t.Fatal(err) @@ -207,7 +215,7 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) { proposeConfChangeAndApply(cc1) // the new node join should be ok - cc2 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2} + cc2 := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 2} ccdata2, err := cc2.Marshal() if err != nil { t.Fatal(err) @@ -238,16 +246,16 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) { // TestRawNodeReadIndex ensures that Rawnode.ReadIndex sends the MsgReadIndex message // to the underlying raft. It also ensures that ReadState can be read out. 
func TestRawNodeReadIndex(t *testing.T) {
-	msgs := []raftpb.Message{}
-	appendStep := func(r *raft, m raftpb.Message) error {
+	msgs := []pb.Message{}
+	appendStep := func(r *raft, m pb.Message) error {
 		msgs = append(msgs, m)
 		return nil
 	}
 	wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
 
 	s := NewMemoryStorage()
-	c := newTestConfig(1, nil, 10, 1, s)
-	rawNode, err := NewRawNode(c, []Peer{{ID: 1}})
+	c := newTestConfig(1, []uint64{1}, 10, 1, s)
+	rawNode, err := NewRawNode(c)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -288,8 +296,8 @@ func TestRawNodeReadIndex(t *testing.T) {
 	if len(msgs) != 1 {
 		t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
 	}
-	if msgs[0].Type != raftpb.MsgReadIndex {
-		t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgReadIndex)
+	if msgs[0].Type != pb.MsgReadIndex {
+		t.Errorf("msg type = %d, want %d", msgs[0].Type, pb.MsgReadIndex)
 	}
 	if !bytes.Equal(msgs[0].Entries[0].Data, wrequestCtx) {
 		t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx)
@@ -305,61 +313,108 @@ func TestRawNodeReadIndex(t *testing.T) {
 // TestNodeStop from node_test.go has no equivalent in rawNode because there is
 // no goroutine in RawNode.
 
-// TestRawNodeStart ensures that a node can be started correctly. The node should
-// start with correct configuration change entries, and can accept and commit
-// proposals.
+// TestRawNodeStart ensures that a node can be started correctly. Note that RawNode
+// requires the application to bootstrap the state, i.e. it does not accept peers
+// and will not create faux configuration change entries.
 func TestRawNodeStart(t *testing.T) {
-	cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
-	ccdata, err := cc.Marshal()
-	if err != nil {
-		t.Fatalf("unexpected marshal error: %v", err)
-	}
-	wants := []Ready{
-		{
-			HardState: raftpb.HardState{Term: 1, Commit: 1, Vote: 0},
-			Entries: []raftpb.Entry{
-				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
-			},
-			CommittedEntries: []raftpb.Entry{
-				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
-			},
-			MustSync: true,
+	want := Ready{
+		SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
+		HardState: pb.HardState{Term: 1, Commit: 3, Vote: 1},
+		Entries: []pb.Entry{
+			{Term: 1, Index: 2, Data: nil},           // empty entry
+			{Term: 1, Index: 3, Data: []byte("foo")}, // the proposal below
 		},
-		{
-			HardState:        raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
-			Entries:          []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
-			CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
-			MustSync:         true,
+		CommittedEntries: []pb.Entry{
+			{Term: 1, Index: 2, Data: nil},           // empty entry
+			{Term: 1, Index: 3, Data: []byte("foo")}, // the proposal below
 		},
+		MustSync: true,
 	}
 
 	storage := NewMemoryStorage()
-	rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}})
+	storage.ents[0].Index = 1
+
+	// TODO(tbg): this is a first prototype of what bootstrapping could look
+	// like (without the annoying faux ConfChanges). We want to persist a
+	// ConfState at some index and make sure that this index can't be reached
+	// from log position 1, so that followers are forced to pick up the
+	// ConfState in order to move away from log position 1 (unless they got
+	// bootstrapped in the same way already). Failing to do so would mean that
+	// followers diverge from the bootstrapped nodes and don't learn about the
+	// initial config.
+	//
+	// NB: this is exactly what CockroachDB does.
The Raft log really begins at + // index 10, so empty followers (at index 1) always need a snapshot first. + type appenderStorage interface { + Storage + ApplySnapshot(pb.Snapshot) error + } + bootstrap := func(storage appenderStorage, cs pb.ConfState) error { + if len(cs.Nodes) == 0 { + return fmt.Errorf("no voters specified") + } + fi, err := storage.FirstIndex() + if err != nil { + return err + } + if fi < 2 { + return fmt.Errorf("FirstIndex >= 2 is prerequisite for bootstrap") + } + if _, err = storage.Entries(fi, fi, math.MaxUint64); err == nil { + // TODO(tbg): match exact error + return fmt.Errorf("should not have been able to load first index") + } + li, err := storage.LastIndex() + if err != nil { + return err + } + if _, err = storage.Entries(li, li, math.MaxUint64); err == nil { + return fmt.Errorf("should not have been able to load last index") + } + hs, ics, err := storage.InitialState() + if err != nil { + return err + } + if !IsEmptyHardState(hs) { + return fmt.Errorf("HardState not empty") + } + if len(ics.Nodes) != 0 { + return fmt.Errorf("ConfState not empty") + } + + meta := pb.SnapshotMetadata{ + Index: 1, + Term: 0, + ConfState: cs, + } + snap := pb.Snapshot{Metadata: meta} + return storage.ApplySnapshot(snap) + } + + if err := bootstrap(storage, pb.ConfState{Nodes: []uint64{1}}); err != nil { + t.Fatal(err) + } + + rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage)) if err != nil { t.Fatal(err) } - rd := rawNode.Ready() - t.Logf("rd %v", rd) - if !reflect.DeepEqual(rd, wants[0]) { - t.Fatalf("#%d: g = %+v,\n w %+v", 1, rd, wants[0]) - } else { - storage.Append(rd.Entries) - rawNode.Advance(rd) + if rawNode.HasReady() { + t.Fatalf("unexpected ready: %+v", rawNode.Ready()) } - storage.Append(rd.Entries) - rawNode.Advance(rd) - rawNode.Campaign() - rd = rawNode.Ready() + rawNode.Propose([]byte("foo")) + if !rawNode.HasReady() { + t.Fatal("expected a Ready") + } + rd := rawNode.Ready() storage.Append(rd.Entries) rawNode.Advance(rd) - rawNode.Propose([]byte("foo")) - if rd = rawNode.Ready(); !reflect.DeepEqual(rd, wants[1]) { - t.Errorf("#%d: g = %+v,\n w %+v", 2, rd, wants[1]) - } else { - storage.Append(rd.Entries) - rawNode.Advance(rd) + rd.SoftState, want.SoftState = nil, nil + + if !reflect.DeepEqual(rd, want) { + t.Fatalf("unexpected Ready:\n%+v\nvs\n%+v", rd, want) } if rawNode.HasReady() { @@ -368,11 +423,11 @@ func TestRawNodeStart(t *testing.T) { } func TestRawNodeRestart(t *testing.T) { - entries := []raftpb.Entry{ + entries := []pb.Entry{ {Term: 1, Index: 1}, {Term: 1, Index: 2, Data: []byte("foo")}, } - st := raftpb.HardState{Term: 1, Commit: 1} + st := pb.HardState{Term: 1, Commit: 1} want := Ready{ HardState: emptyState, @@ -384,7 +439,7 @@ func TestRawNodeRestart(t *testing.T) { storage := NewMemoryStorage() storage.SetHardState(st) storage.Append(entries) - rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), nil) + rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, storage)) if err != nil { t.Fatal(err) } @@ -399,17 +454,17 @@ func TestRawNodeRestart(t *testing.T) { } func TestRawNodeRestartFromSnapshot(t *testing.T) { - snap := raftpb.Snapshot{ - Metadata: raftpb.SnapshotMetadata{ - ConfState: raftpb.ConfState{Nodes: []uint64{1, 2}}, + snap := pb.Snapshot{ + Metadata: pb.SnapshotMetadata{ + ConfState: pb.ConfState{Nodes: []uint64{1, 2}}, Index: 2, Term: 1, }, } - entries := []raftpb.Entry{ + entries := []pb.Entry{ {Term: 1, Index: 3, Data: []byte("foo")}, } - st := raftpb.HardState{Term: 1, Commit: 3} + st := 
pb.HardState{Term: 1, Commit: 3} want := Ready{ HardState: emptyState, @@ -422,7 +477,7 @@ func TestRawNodeRestartFromSnapshot(t *testing.T) { s.SetHardState(st) s.ApplySnapshot(snap) s.Append(entries) - rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), nil) + rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s)) if err != nil { t.Fatal(err) } @@ -441,7 +496,7 @@ func TestRawNodeRestartFromSnapshot(t *testing.T) { func TestRawNodeStatus(t *testing.T) { s := NewMemoryStorage() - rn, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s), nil) + rn, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s)) if err != nil { t.Fatal(err) } @@ -489,20 +544,20 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) { s := &ignoreSizeHintMemStorage{ MemoryStorage: NewMemoryStorage(), } - persistedHardState := raftpb.HardState{ + persistedHardState := pb.HardState{ Term: 1, Vote: 1, Commit: 10, } s.hardState = persistedHardState - s.ents = make([]raftpb.Entry, 10) + s.ents = make([]pb.Entry, 10) var size uint64 for i := range s.ents { - ent := raftpb.Entry{ + ent := pb.Entry{ Term: 1, Index: uint64(i + 1), - Type: raftpb.EntryNormal, + Type: pb.EntryNormal, Data: []byte("a"), } @@ -516,14 +571,14 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) { // this and *will* return it (which is how the Commit index ended up being 10 initially). cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1 - s.ents = append(s.ents, raftpb.Entry{ + s.ents = append(s.ents, pb.Entry{ Term: 1, Index: uint64(11), - Type: raftpb.EntryNormal, + Type: pb.EntryNormal, Data: []byte("boom"), }) - rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}}) + rawNode, err := NewRawNode(cfg) if err != nil { t.Fatal(err) } @@ -539,8 +594,8 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) { } highestApplied = rd.CommittedEntries[n-1].Index rawNode.Advance(rd) - rawNode.Step(raftpb.Message{ - Type: raftpb.MsgHeartbeat, + rawNode.Step(pb.Message{ + Type: pb.MsgHeartbeat, To: 1, From: 1, // illegal, but we get away with it Term: 1, @@ -556,13 +611,13 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) { func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) { const maxEntries = 16 data := []byte("testdata") - testEntry := raftpb.Entry{Data: data} + testEntry := pb.Entry{Data: data} maxEntrySize := uint64(maxEntries * PayloadSize(testEntry)) s := NewMemoryStorage() cfg := newTestConfig(1, []uint64{1}, 10, 1, s) cfg.MaxUncommittedEntriesSize = maxEntrySize - rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}}) + rawNode, err := NewRawNode(cfg) if err != nil { t.Fatal(err) }
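
Usage sketches (editorial notes, not part of the patch). First, migration: call sites that passed peers to NewRawNode move to the two-step NewRawNode-then-Bootstrap sequence. The helper below is hypothetical; note that Bootstrap reports failure as an error, which callers should check even though StartNode in this patch drops it.

package raftexample

import "go.etcd.io/etcd/raft"

// newBootstrappedRawNode shows the migrated call pattern for a brand-new,
// single-voter group. (Hypothetical helper; the config values are illustrative.)
func newBootstrappedRawNode() (*raft.RawNode, error) {
	c := &raft.Config{
		ID:              1,
		ElectionTick:    10,
		HeartbeatTick:   1,
		Storage:         raft.NewMemoryStorage(),
		MaxSizePerMsg:   1 << 20,
		MaxInflightMsgs: 256,
	}
	// Before this change: rn, err := raft.NewRawNode(c, []raft.Peer{{ID: 1}})
	rn, err := raft.NewRawNode(c)
	if err != nil {
		return nil, err
	}
	// Bootstrap replaces the removed peers argument; it errors out on a
	// nonempty Storage rather than silently rebootstrapping.
	if err := rn.Bootstrap([]raft.Peer{{ID: 1}}); err != nil {
		return nil, err
	}
	return rn, nil
}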
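Second, the path the new doc comments actually recommend: skip Bootstrap and seed the Storage so that its first index is > 1 and InitialState returns the desired ConfState. This mirrors the bootstrap prototype in TestRawNodeStart above; with a MemoryStorage it amounts to applying a synthetic snapshot (the helper name is ours):

package raftexample

import (
	"go.etcd.io/etcd/raft"
	"go.etcd.io/etcd/raft/raftpb"
)

// seedStorage applies a snapshot at index 1 whose metadata carries the initial
// membership. FirstIndex then becomes 2, NewRawNode recovers the ConfState via
// Storage.InitialState instead of faux ConfChange entries, and empty followers
// (still at index 1) are forced onto the snapshot path, which is what keeps
// their configs from diverging.
func seedStorage(voters []uint64) (*raft.MemoryStorage, error) {
	storage := raft.NewMemoryStorage()
	snap := raftpb.Snapshot{
		Metadata: raftpb.SnapshotMetadata{
			Index:     1,
			Term:      0,
			ConfState: raftpb.ConfState{Nodes: voters},
		},
	}
	if err := storage.ApplySnapshot(snap); err != nil {
		return nil, err
	}
	return storage, nil
}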
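Finally, the tightened Ready contract: between Ready() and Advance() the RawNode must be left alone (no Step, Propose, or Tick), since acceptReady clears the outgoing messages and ReadStates and would otherwise clobber newer, not-yet-emitted state. A schematic consumer, with the transport and state machine stubbed out:

package raftexample

import "go.etcd.io/etcd/raft"

// handleReady drains one Ready. Persistence uses MemoryStorage for brevity;
// a real application must make HardState and Entries durable before sending
// the messages.
func handleReady(rn *raft.RawNode, storage *raft.MemoryStorage) error {
	if !rn.HasReady() {
		return nil
	}
	rd := rn.Ready()
	if !raft.IsEmptyHardState(rd.HardState) {
		if err := storage.SetHardState(rd.HardState); err != nil {
			return err
		}
	}
	if err := storage.Append(rd.Entries); err != nil {
		return err
	}
	for _, m := range rd.Messages {
		_ = m // send m via the application's transport
	}
	// Apply rd.CommittedEntries to the state machine here, feeding any
	// EntryConfChange entries back through rn.ApplyConfChange.
	rn.Advance(rd)
	return nil
}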