diff --git a/raft/node.go b/raft/node.go index 3f01cbaa2..ab6185b99 100644 --- a/raft/node.go +++ b/raft/node.go @@ -221,10 +221,9 @@ func StartNode(c *Config, peers []Peer) Node { } rn.Bootstrap(peers) - n := newNode() - n.logger = c.Logger + n := newNode(rn) - go n.run(rn) + go n.run() return &n } @@ -237,9 +236,8 @@ func RestartNode(c *Config) Node { if err != nil { panic(err) } - n := newNode() - n.logger = c.Logger - go n.run(rn) + n := newNode(rn) + go n.run() return &n } @@ -261,10 +259,10 @@ type node struct { stop chan struct{} status chan chan Status - logger Logger + rn *RawNode } -func newNode() node { +func newNode(rn *RawNode) node { return node{ propc: make(chan msgWithResult), recvc: make(chan pb.Message), @@ -279,6 +277,7 @@ func newNode() node { done: make(chan struct{}), stop: make(chan struct{}), status: make(chan chan Status), + rn: rn, } } @@ -294,20 +293,20 @@ func (n *node) Stop() { <-n.done } -func (n *node) run(rn *RawNode) { +func (n *node) run() { var propc chan msgWithResult var readyc chan Ready var advancec chan struct{} var rd Ready - r := rn.raft + r := n.rn.raft lead := None for { if advancec != nil { readyc = nil - } else if rn.HasReady() { + } else if n.rn.HasReady() { // Populate a Ready. Note that this Ready is not guaranteed to // actually be handled. We will arm readyc, but there's no guarantee // that we will actually send on it. It's possible that we will @@ -316,7 +315,7 @@ func (n *node) run(rn *RawNode) { // handled first, but it's generally good to emit larger Readys plus // it simplifies testing (by emitting less frequently and more // predictably). - rd = rn.readyWithoutAccept() + rd = n.rn.readyWithoutAccept() readyc = n.readyc } @@ -382,12 +381,12 @@ func (n *node) run(rn *RawNode) { case <-n.done: } case <-n.tickc: - rn.Tick() + n.rn.Tick() case readyc <- rd: - rn.acceptReady(rd) + n.rn.acceptReady(rd) advancec = n.advancec case <-advancec: - rn.Advance(rd) + n.rn.Advance(rd) rd = Ready{} advancec = nil case c := <-n.status: @@ -406,7 +405,7 @@ func (n *node) Tick() { case n.tickc <- struct{}{}: case <-n.done: default: - n.logger.Warningf("A tick missed to fire. Node blocks too long!") + n.rn.raft.logger.Warningf("%x (leader %v) A tick missed to fire. Node blocks too long!", n.rn.raft.id, n.rn.raft.id == n.rn.raft.lead) } } diff --git a/raft/node_bench_test.go b/raft/node_bench_test.go index d0ecdc519..b499305bb 100644 --- a/raft/node_bench_test.go +++ b/raft/node_bench_test.go @@ -24,10 +24,10 @@ func BenchmarkOneNode(b *testing.B) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - n := newNode() s := NewMemoryStorage() rn := newTestRawNode(1, []uint64{1}, 10, 1, s) - go n.run(rn) + n := newNode(rn) + go n.run() defer n.Stop() diff --git a/raft/node_test.go b/raft/node_test.go index 6127bf397..0a2c1cafc 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -130,11 +130,11 @@ func TestNodePropose(t *testing.T) { return nil } - n := newNode() s := NewMemoryStorage() rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + n := newNode(rn) r := rn.raft - go n.run(rn) + go n.run() if err := n.Campaign(context.TODO()); err != nil { t.Fatal(err) } @@ -173,13 +173,13 @@ func TestNodeReadIndex(t *testing.T) { } wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}} - n := newNode() s := NewMemoryStorage() rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + n := newNode(rn) r := rn.raft r.readStates = wrs - go n.run(rn) + go n.run() n.Campaign(context.TODO()) for { rd := <-n.Ready() @@ -311,11 +311,11 @@ func TestNodeProposeConfig(t *testing.T) { return nil } - n := newNode() s := NewMemoryStorage() rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + n := newNode(rn) r := rn.raft - go n.run(rn) + go n.run() n.Campaign(context.TODO()) for { rd := <-n.Ready() @@ -350,10 +350,10 @@ func TestNodeProposeConfig(t *testing.T) { // TestNodeProposeAddDuplicateNode ensures that two proposes to add the same node should // not affect the later propose to add new node. func TestNodeProposeAddDuplicateNode(t *testing.T) { - n := newNode() s := NewMemoryStorage() rn := newTestRawNode(1, []uint64{1}, 10, 1, s) - go n.run(rn) + n := newNode(rn) + go n.run() n.Campaign(context.TODO()) rdyEntries := make([]raftpb.Entry, 0) ticker := time.NewTicker(time.Millisecond * 100) @@ -426,9 +426,9 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) { // know who is the current leader; node will accept proposal when it knows // who is the current leader. func TestBlockProposal(t *testing.T) { - n := newNode() rn := newTestRawNode(1, []uint64{1}, 10, 1, NewMemoryStorage()) - go n.run(rn) + n := newNode(rn) + go n.run() defer n.Stop() errc := make(chan error, 1) @@ -466,11 +466,11 @@ func TestNodeProposeWaitDropped(t *testing.T) { return nil } - n := newNode() s := NewMemoryStorage() rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + n := newNode(rn) r := rn.raft - go n.run(rn) + go n.run() n.Campaign(context.TODO()) for { rd := <-n.Ready() @@ -501,11 +501,11 @@ func TestNodeProposeWaitDropped(t *testing.T) { // TestNodeTick ensures that node.Tick() will increase the // elapsed of the underlying raft state machine. func TestNodeTick(t *testing.T) { - n := newNode() s := NewMemoryStorage() rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + n := newNode(rn) r := rn.raft - go n.run(rn) + go n.run() elapsed := r.electionElapsed n.Tick() @@ -522,13 +522,12 @@ func TestNodeTick(t *testing.T) { // TestNodeStop ensures that node.Stop() blocks until the node has stopped // processing, and that it is idempotent func TestNodeStop(t *testing.T) { - n := newNode() - s := NewMemoryStorage() - rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + rn := newTestRawNode(1, []uint64{1}, 10, 1, NewMemoryStorage()) + n := newNode(rn) donec := make(chan struct{}) go func() { - n.run(rn) + n.run() close(donec) }() @@ -813,10 +812,10 @@ func TestIsHardStateEqual(t *testing.T) { func TestNodeProposeAddLearnerNode(t *testing.T) { ticker := time.NewTicker(time.Millisecond * 100) defer ticker.Stop() - n := newNode() s := NewMemoryStorage() rn := newTestRawNode(1, []uint64{1}, 10, 1, s) - go n.run(rn) + n := newNode(rn) + go n.run() n.Campaign(context.TODO()) stop := make(chan struct{}) done := make(chan struct{}) @@ -914,8 +913,8 @@ func TestCommitPagination(t *testing.T) { if err != nil { t.Fatal(err) } - n := newNode() - go n.run(rn) + n := newNode(rn) + go n.run() n.Campaign(context.TODO()) rd := readyWithTimeout(&n) @@ -1006,8 +1005,8 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) { if err != nil { t.Fatal(err) } - n := newNode() - go n.run(rn) + n := newNode(rn) + go n.run() defer n.Stop() rd := readyWithTimeout(&n)