raft: move "RawNode", clarify tick miss

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
release-3.4
Gyuho Lee 2019-07-24 23:35:36 -07:00
parent 8f000c755b
commit c7c9428f6b
3 changed files with 40 additions and 42 deletions

View File

@ -221,10 +221,9 @@ func StartNode(c *Config, peers []Peer) Node {
}
rn.Bootstrap(peers)
n := newNode()
n.logger = c.Logger
n := newNode(rn)
go n.run(rn)
go n.run()
return &n
}
@ -237,9 +236,8 @@ func RestartNode(c *Config) Node {
if err != nil {
panic(err)
}
n := newNode()
n.logger = c.Logger
go n.run(rn)
n := newNode(rn)
go n.run()
return &n
}
@ -261,10 +259,10 @@ type node struct {
stop chan struct{}
status chan chan Status
logger Logger
rn *RawNode
}
func newNode() node {
func newNode(rn *RawNode) node {
return node{
propc: make(chan msgWithResult),
recvc: make(chan pb.Message),
@ -279,6 +277,7 @@ func newNode() node {
done: make(chan struct{}),
stop: make(chan struct{}),
status: make(chan chan Status),
rn: rn,
}
}
@ -294,20 +293,20 @@ func (n *node) Stop() {
<-n.done
}
func (n *node) run(rn *RawNode) {
func (n *node) run() {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var rd Ready
r := rn.raft
r := n.rn.raft
lead := None
for {
if advancec != nil {
readyc = nil
} else if rn.HasReady() {
} else if n.rn.HasReady() {
// Populate a Ready. Note that this Ready is not guaranteed to
// actually be handled. We will arm readyc, but there's no guarantee
// that we will actually send on it. It's possible that we will
@ -316,7 +315,7 @@ func (n *node) run(rn *RawNode) {
// handled first, but it's generally good to emit larger Readys plus
// it simplifies testing (by emitting less frequently and more
// predictably).
rd = rn.readyWithoutAccept()
rd = n.rn.readyWithoutAccept()
readyc = n.readyc
}
@ -382,12 +381,12 @@ func (n *node) run(rn *RawNode) {
case <-n.done:
}
case <-n.tickc:
rn.Tick()
n.rn.Tick()
case readyc <- rd:
rn.acceptReady(rd)
n.rn.acceptReady(rd)
advancec = n.advancec
case <-advancec:
rn.Advance(rd)
n.rn.Advance(rd)
rd = Ready{}
advancec = nil
case c := <-n.status:
@ -406,7 +405,7 @@ func (n *node) Tick() {
case n.tickc <- struct{}{}:
case <-n.done:
default:
n.logger.Warningf("A tick missed to fire. Node blocks too long!")
n.rn.raft.logger.Warningf("%x (leader %v) A tick missed to fire. Node blocks too long!", n.rn.raft.id, n.rn.raft.id == n.rn.raft.lead)
}
}

View File

@ -24,10 +24,10 @@ func BenchmarkOneNode(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
n := newNode()
s := NewMemoryStorage()
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
go n.run(rn)
n := newNode(rn)
go n.run()
defer n.Stop()

View File

@ -130,11 +130,11 @@ func TestNodePropose(t *testing.T) {
return nil
}
n := newNode()
s := NewMemoryStorage()
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
n := newNode(rn)
r := rn.raft
go n.run(rn)
go n.run()
if err := n.Campaign(context.TODO()); err != nil {
t.Fatal(err)
}
@ -173,13 +173,13 @@ func TestNodeReadIndex(t *testing.T) {
}
wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
n := newNode()
s := NewMemoryStorage()
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
n := newNode(rn)
r := rn.raft
r.readStates = wrs
go n.run(rn)
go n.run()
n.Campaign(context.TODO())
for {
rd := <-n.Ready()
@ -311,11 +311,11 @@ func TestNodeProposeConfig(t *testing.T) {
return nil
}
n := newNode()
s := NewMemoryStorage()
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
n := newNode(rn)
r := rn.raft
go n.run(rn)
go n.run()
n.Campaign(context.TODO())
for {
rd := <-n.Ready()
@ -350,10 +350,10 @@ func TestNodeProposeConfig(t *testing.T) {
// TestNodeProposeAddDuplicateNode ensures that two proposes to add the same node should
// not affect the later propose to add new node.
func TestNodeProposeAddDuplicateNode(t *testing.T) {
n := newNode()
s := NewMemoryStorage()
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
go n.run(rn)
n := newNode(rn)
go n.run()
n.Campaign(context.TODO())
rdyEntries := make([]raftpb.Entry, 0)
ticker := time.NewTicker(time.Millisecond * 100)
@ -426,9 +426,9 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) {
// know who is the current leader; node will accept proposal when it knows
// who is the current leader.
func TestBlockProposal(t *testing.T) {
n := newNode()
rn := newTestRawNode(1, []uint64{1}, 10, 1, NewMemoryStorage())
go n.run(rn)
n := newNode(rn)
go n.run()
defer n.Stop()
errc := make(chan error, 1)
@ -466,11 +466,11 @@ func TestNodeProposeWaitDropped(t *testing.T) {
return nil
}
n := newNode()
s := NewMemoryStorage()
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
n := newNode(rn)
r := rn.raft
go n.run(rn)
go n.run()
n.Campaign(context.TODO())
for {
rd := <-n.Ready()
@ -501,11 +501,11 @@ func TestNodeProposeWaitDropped(t *testing.T) {
// TestNodeTick ensures that node.Tick() will increase the
// elapsed of the underlying raft state machine.
func TestNodeTick(t *testing.T) {
n := newNode()
s := NewMemoryStorage()
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
n := newNode(rn)
r := rn.raft
go n.run(rn)
go n.run()
elapsed := r.electionElapsed
n.Tick()
@ -522,13 +522,12 @@ func TestNodeTick(t *testing.T) {
// TestNodeStop ensures that node.Stop() blocks until the node has stopped
// processing, and that it is idempotent
func TestNodeStop(t *testing.T) {
n := newNode()
s := NewMemoryStorage()
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
rn := newTestRawNode(1, []uint64{1}, 10, 1, NewMemoryStorage())
n := newNode(rn)
donec := make(chan struct{})
go func() {
n.run(rn)
n.run()
close(donec)
}()
@ -813,10 +812,10 @@ func TestIsHardStateEqual(t *testing.T) {
func TestNodeProposeAddLearnerNode(t *testing.T) {
ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()
n := newNode()
s := NewMemoryStorage()
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
go n.run(rn)
n := newNode(rn)
go n.run()
n.Campaign(context.TODO())
stop := make(chan struct{})
done := make(chan struct{})
@ -914,8 +913,8 @@ func TestCommitPagination(t *testing.T) {
if err != nil {
t.Fatal(err)
}
n := newNode()
go n.run(rn)
n := newNode(rn)
go n.run()
n.Campaign(context.TODO())
rd := readyWithTimeout(&n)
@ -1006,8 +1005,8 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) {
if err != nil {
t.Fatal(err)
}
n := newNode()
go n.run(rn)
n := newNode(rn)
go n.run()
defer n.Stop()
rd := readyWithTimeout(&n)