raft: use RawNode for node's event loop

It has always bugged me that any new feature essentially needed to be
tested twice due to the two ways in which apps can use raft (`*node` and
`*RawNode`). Due to upcoming testing work for joint consensus, now is a
good time to rectify this somewhat.

This commit removes most logic from `(*node).run` and uses `*RawNode`
internally. This simplifies the logic and also lead (via debugging) to
some insight on how the semantics of the approaches differ, which is now
documented in the comments.
release-3.4
Tobias Schottdorf 2019-07-15 17:18:56 +02:00
parent 233be58056
commit c62b7048b5
5 changed files with 173 additions and 167 deletions

View File

@ -198,51 +198,15 @@ type Peer struct {
// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
func StartNode(c *Config, peers []Peer) Node {
r := newRaft(c)
// become the follower at term 1 and apply initial configuration
// entries of term 1
r.becomeFollower(1, None)
for _, peer := range peers {
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
d, err := cc.Marshal()
if err != nil {
panic("unexpected marshal error")
}
// TODO(tbg): this should append the ConfChange for the own node first
// and also call applyConfChange below for that node first. Otherwise
// we have a Raft group (for a little while) that doesn't have itself
// in its config, which is bad.
// This whole way of setting things up is rickety. The app should just
// populate the initial ConfState appropriately and then all of this
// goes away.
e := pb.Entry{
Type: pb.EntryConfChange,
Term: 1,
Index: r.raftLog.lastIndex() + 1,
Data: d,
}
r.raftLog.append(e)
}
// Mark these initial entries as committed.
// TODO(bdarnell): These entries are still unstable; do we need to preserve
// the invariant that committed < unstable?
r.raftLog.committed = r.raftLog.lastIndex()
// Now apply them, mainly so that the application can call Campaign
// immediately after StartNode in tests. Note that these nodes will
// be added to raft twice: here and when the application's Ready
// loop calls ApplyConfChange. The calls to addNode must come after
// all calls to raftLog.append so progress.next is set after these
// bootstrapping entries (it is an error if we try to append these
// entries since they have already been committed).
// We do not set raftLog.applied so the application will be able
// to observe all conf changes via Ready.CommittedEntries.
for _, peer := range peers {
r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
rn, err := NewRawNode(c, peers)
if err != nil {
panic(err)
}
n := newNode()
n.logger = c.Logger
go n.run(r)
go n.run(rn)
return &n
}
@ -251,12 +215,7 @@ func StartNode(c *Config, peers []Peer) Node {
// If the caller has an existing state machine, pass in the last log index that
// has been applied to it; otherwise use zero.
func RestartNode(c *Config) Node {
r := newRaft(c)
n := newNode()
n.logger = c.Logger
go n.run(r)
return &n
return StartNode(c, nil)
}
type msgWithResult struct {
@ -310,30 +269,30 @@ func (n *node) Stop() {
<-n.done
}
func (n *node) run(r *raft) {
func (n *node) run(rn *RawNode) {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var prevLastUnstablei, prevLastUnstablet uint64
var havePrevLastUnstablei bool
var prevSnapi uint64
var applyingToI uint64
var rd Ready
r := rn.raft
lead := None
prevSoftSt := r.softState()
prevHardSt := emptyState
for {
if advancec != nil {
readyc = nil
} else {
rd = newReady(r, prevSoftSt, prevHardSt)
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
}
} else if rn.HasReady() {
// Populate a Ready. Note that this Ready is not guaranteed to
// actually be handled. We will arm readyc, but there's no guarantee
// that we will actually send on it. It's possible that we will
// service another channel instead, loop around, and then populate
// the Ready again. We could instead force the previous Ready to be
// handled first, but it's generally good to emit larger Readys plus
// it simplifies testing (by emitting less frequently and more
// predictably).
rd = rn.Ready()
readyc = n.readyc
}
if lead != r.lead {
@ -382,40 +341,13 @@ func (n *node) run(r *raft) {
case <-n.done:
}
case <-n.tickc:
r.tick()
rn.Tick()
case readyc <- rd:
if rd.SoftState != nil {
prevSoftSt = rd.SoftState
}
if len(rd.Entries) > 0 {
prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
havePrevLastUnstablei = true
}
if !IsEmptyHardState(rd.HardState) {
prevHardSt = rd.HardState
}
if !IsEmptySnap(rd.Snapshot) {
prevSnapi = rd.Snapshot.Metadata.Index
}
if index := rd.appliedCursor(); index != 0 {
applyingToI = index
}
r.msgs = nil
r.readStates = nil
r.reduceUncommittedSize(rd.CommittedEntries)
rn.acceptReady(rd)
advancec = n.advancec
case <-advancec:
if applyingToI != 0 {
r.raftLog.appliedTo(applyingToI)
applyingToI = 0
}
if havePrevLastUnstablei {
r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
havePrevLastUnstablei = false
}
r.raftLog.stableSnapTo(prevSnapi)
rn.commitReady(rd)
rd = Ready{}
advancec = nil
case c := <-n.status:
c <- getStatus(r)

View File

@ -26,8 +26,8 @@ func BenchmarkOneNode(b *testing.B) {
n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
go n.run(rn)
defer n.Stop()

View File

@ -132,9 +132,12 @@ func TestNodePropose(t *testing.T) {
n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
n.Campaign(context.TODO())
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
r := rn.raft
go n.run(rn)
if err := n.Campaign(context.TODO()); err != nil {
t.Fatal(err)
}
for {
rd := <-n.Ready()
s.Append(rd.Entries)
@ -172,10 +175,11 @@ func TestNodeReadIndex(t *testing.T) {
n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
r := rn.raft
r.readStates = wrs
go n.run(r)
go n.run(rn)
n.Campaign(context.TODO())
for {
rd := <-n.Ready()
@ -309,8 +313,9 @@ func TestNodeProposeConfig(t *testing.T) {
n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
r := rn.raft
go n.run(rn)
n.Campaign(context.TODO())
for {
rd := <-n.Ready()
@ -347,8 +352,8 @@ func TestNodeProposeConfig(t *testing.T) {
func TestNodeProposeAddDuplicateNode(t *testing.T) {
n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
go n.run(rn)
n.Campaign(context.TODO())
rdyEntries := make([]raftpb.Entry, 0)
ticker := time.NewTicker(time.Millisecond * 100)
@ -422,8 +427,8 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) {
// who is the current leader.
func TestBlockProposal(t *testing.T) {
n := newNode()
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
go n.run(r)
rn := newTestRawNode(1, []uint64{1}, 10, 1, NewMemoryStorage())
go n.run(rn)
defer n.Stop()
errc := make(chan error, 1)
@ -463,8 +468,9 @@ func TestNodeProposeWaitDropped(t *testing.T) {
n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
r := rn.raft
go n.run(rn)
n.Campaign(context.TODO())
for {
rd := <-n.Ready()
@ -497,8 +503,9 @@ func TestNodeProposeWaitDropped(t *testing.T) {
func TestNodeTick(t *testing.T) {
n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
r := rn.raft
go n.run(rn)
elapsed := r.electionElapsed
n.Tick()
@ -517,11 +524,11 @@ func TestNodeTick(t *testing.T) {
func TestNodeStop(t *testing.T) {
n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
donec := make(chan struct{})
go func() {
n.run(r)
n.run(rn)
close(donec)
}()
@ -618,7 +625,9 @@ func TestNodeStart(t *testing.T) {
n.Advance()
}
n.Campaign(ctx)
if err := n.Campaign(ctx); err != nil {
t.Fatal(err)
}
rd := <-n.Ready()
storage.Append(rd.Entries)
n.Advance()
@ -646,10 +655,12 @@ func TestNodeRestart(t *testing.T) {
st := raftpb.HardState{Term: 1, Commit: 1}
want := Ready{
HardState: st,
// No HardState is emitted because there was no change.
HardState: raftpb.HardState{},
// commit up to index commit index in st
CommittedEntries: entries[:st.Commit],
MustSync: true,
// MustSync is false because no HardState or new entries are provided.
MustSync: false,
}
storage := NewMemoryStorage()
@ -691,10 +702,14 @@ func TestNodeRestartFromSnapshot(t *testing.T) {
st := raftpb.HardState{Term: 1, Commit: 3}
want := Ready{
HardState: st,
// No HardState is emitted because nothing changed relative to what is
// already persisted.
HardState: raftpb.HardState{},
// commit up to index commit index in st
CommittedEntries: entries,
MustSync: true,
// MustSync is only true when there is a new HardState or new entries;
// neither is the case here.
MustSync: false,
}
s := NewMemoryStorage()
@ -800,8 +815,8 @@ func TestNodeProposeAddLearnerNode(t *testing.T) {
defer ticker.Stop()
n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
go n.run(rn)
n.Campaign(context.TODO())
stop := make(chan struct{})
done := make(chan struct{})
@ -895,9 +910,12 @@ func TestCommitPagination(t *testing.T) {
s := NewMemoryStorage()
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
cfg.MaxCommittedSizePerReady = 2048
r := newRaft(cfg)
rn, err := NewRawNode(cfg, nil)
if err != nil {
t.Fatal(err)
}
n := newNode()
go n.run(r)
go n.run(rn)
n.Campaign(context.TODO())
rd := readyWithTimeout(&n)
@ -984,9 +1002,12 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) {
// this and *will* return it (which is how the Commit index ended up being 10 initially).
cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1
r := newRaft(cfg)
rn, err := NewRawNode(cfg, nil)
if err != nil {
t.Fatal(err)
}
n := newNode()
go n.run(r)
go n.run(rn)
defer n.Stop()
rd := readyWithTimeout(&n)
@ -1011,9 +1032,12 @@ func TestNodeBoundedLogGrowthWithPartition(t *testing.T) {
s := NewMemoryStorage()
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
cfg.MaxUncommittedEntriesSize = maxEntrySize
r := newRaft(cfg)
rn, err := NewRawNode(cfg, nil)
if err != nil {
t.Fatal(err)
}
n := newNode()
go n.run(r)
go n.run(rn)
defer n.Stop()
n.Campaign(context.TODO())
@ -1028,14 +1052,14 @@ func TestNodeBoundedLogGrowthWithPartition(t *testing.T) {
// committing anything. These proposals should not cause the leader's
// log to grow indefinitely.
for i := 0; i < 1024; i++ {
n.Propose(context.TODO(), data)
_ = n.Propose(context.TODO(), data)
}
// Check the size of leader's uncommitted log tail. It should not exceed the
// MaxUncommittedEntriesSize limit.
checkUncommitted := func(exp uint64) {
t.Helper()
if a := r.uncommittedSize; exp != a {
if a := rn.raft.uncommittedSize; exp != a {
t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a)
}
}

View File

@ -4310,3 +4310,12 @@ func newTestLearnerRaft(id uint64, peers []uint64, learners []uint64, election,
cfg.learners = learners
return newRaft(cfg)
}
func newTestRawNode(id uint64, peers []uint64, election, heartbeat int, storage Storage) *RawNode {
cfg := newTestConfig(id, peers, election, heartbeat, storage)
rn, err := NewRawNode(cfg, nil)
if err != nil {
panic(err)
}
return rn
}

View File

@ -37,38 +37,6 @@ type RawNode struct {
prevHardSt pb.HardState
}
func (rn *RawNode) newReady() Ready {
return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
}
func (rn *RawNode) commitReady(rd Ready) {
if rd.SoftState != nil {
rn.prevSoftSt = rd.SoftState
}
if !IsEmptyHardState(rd.HardState) {
rn.prevHardSt = rd.HardState
}
// If entries were applied (or a snapshot), update our cursor for
// the next Ready. Note that if the current HardState contains a
// new Commit index, this does not mean that we're also applying
// all of the new entries due to commit pagination by size.
if index := rd.appliedCursor(); index > 0 {
rn.raft.raftLog.appliedTo(index)
}
if len(rd.Entries) > 0 {
e := rd.Entries[len(rd.Entries)-1]
rn.raft.raftLog.stableTo(e.Index, e.Term)
}
if !IsEmptySnap(rd.Snapshot) {
rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
}
if len(rd.ReadStates) != 0 {
rn.raft.readStates = nil
}
}
// NewRawNode returns a new RawNode given configuration and a list of raft peers.
func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
if config.ID == 0 {
@ -78,27 +46,48 @@ func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
rn := &RawNode{
raft: r,
}
lastIndex, err := config.Storage.LastIndex()
if err := rn.init(peers); err != nil {
return nil, err
}
return rn, nil
}
func (rn *RawNode) init(peers []Peer) error {
r := rn.raft
lastIndex, err := rn.raft.raftLog.storage.LastIndex()
if err != nil {
panic(err) // TODO(bdarnell)
return err
}
// If the log is empty, this is a new RawNode (like StartNode); otherwise it's
// restoring an existing RawNode (like RestartNode).
// TODO(bdarnell): rethink RawNode initialization and whether the application needs
// to be able to tell us when it expects the RawNode to exist.
if lastIndex == 0 {
r.becomeFollower(1, None)
rn.raft.becomeFollower(1, None)
ents := make([]pb.Entry, len(peers))
for i, peer := range peers {
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
data, err := cc.Marshal()
if err != nil {
panic("unexpected marshal error")
return err
}
ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
}
r.raftLog.append(ents...)
rn.raft.raftLog.append(ents...)
// Now apply them, mainly so that the application can call Campaign
// immediately after StartNode in tests. Note that these nodes will
// be added to raft twice: here and when the application's Ready
// loop calls ApplyConfChange. The calls to addNode must come after
// all calls to raftLog.append so progress.next is set after these
// bootstrapping entries (it is an error if we try to append these
// entries since they have already been committed).
// We do not set raftLog.applied so the application will be able
// to observe all conf changes via Ready.CommittedEntries.
//
// TODO(bdarnell): These entries are still unstable; do we need to preserve
// the invariant that committed < unstable?
r.raftLog.committed = uint64(len(ents))
for _, peer := range peers {
r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
@ -113,7 +102,7 @@ func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
rn.prevHardSt = r.hardState()
}
return rn, nil
return nil
}
// Tick advances the internal logical clock by a single tick.
@ -182,14 +171,61 @@ func (rn *RawNode) Step(m pb.Message) error {
return ErrStepPeerNotFound
}
// Ready returns the current point-in-time state of this RawNode.
// Ready returns the outstanding work that the application needs to handle. This
// includes appending and applying entries or a snapshot, updating the HardState,
// and sending messages. Ready() is a read-only operation, that is, it does not
// require the caller to actually handle the result. Typically, a caller will
// want to handle the Ready and must pass the Ready to Advance *after* having
// done so. While a Ready is being handled, the RawNode must not be used for
// operations that may alter its state. For example, it is illegal to call
// Ready, followed by Step, followed by Advance.
func (rn *RawNode) Ready() Ready {
rd := rn.newReady()
rn.raft.msgs = nil
rn.raft.reduceUncommittedSize(rd.CommittedEntries)
return rd
}
func (rn *RawNode) newReady() Ready {
return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
}
// acceptReady is called when the consumer of the RawNode has decided to go
// ahead and handle a Ready. Nothing must alter the state of the RawNode between
// this call and the prior call to Ready().
func (rn *RawNode) acceptReady(rd Ready) {
if rd.SoftState != nil {
rn.prevSoftSt = rd.SoftState
}
if len(rd.ReadStates) != 0 {
rn.raft.readStates = nil
}
rn.raft.msgs = nil
}
// commitReady is called when the consumer of the RawNode has successfully
// handled a Ready (having previously called acceptReady).
func (rn *RawNode) commitReady(rd Ready) {
if !IsEmptyHardState(rd.HardState) {
rn.prevHardSt = rd.HardState
}
// If entries were applied (or a snapshot), update our cursor for
// the next Ready. Note that if the current HardState contains a
// new Commit index, this does not mean that we're also applying
// all of the new entries due to commit pagination by size.
if index := rd.appliedCursor(); index > 0 {
rn.raft.raftLog.appliedTo(index)
}
rn.raft.reduceUncommittedSize(rd.CommittedEntries)
if len(rd.Entries) > 0 {
e := rd.Entries[len(rd.Entries)-1]
rn.raft.raftLog.stableTo(e.Index, e.Term)
}
if !IsEmptySnap(rd.Snapshot) {
rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
}
}
// HasReady called when RawNode user need to check if any Ready pending.
// Checking logic in this method should be consistent with Ready.containsUpdates().
func (rn *RawNode) HasReady() bool {
@ -215,6 +251,11 @@ func (rn *RawNode) HasReady() bool {
// Advance notifies the RawNode that the application has applied and saved progress in the
// last Ready results.
func (rn *RawNode) Advance(rd Ready) {
// Advance combines accept and commit. Callers can't mutate the RawNode
// between the call to Ready and the matching call to Advance, or the work
// done in acceptReady will clobber potentially newer data that has not been
// emitted in a Ready yet.
rn.acceptReady(rd)
rn.commitReady(rd)
}