raft: block ReadState until state updates
parent
10eb997621
commit
61a413c219
|
@ -9,6 +9,10 @@ type stateResp struct {
|
|||
msgs []Message
|
||||
}
|
||||
|
||||
func (sr stateResp) containsUpdates(prev stateResp) bool {
|
||||
return !prev.state.Equal(sr.state) || len(sr.ents) > 0 || len(sr.cents) > 0 || len(sr.msgs) > 0
|
||||
}
|
||||
|
||||
type Node struct {
|
||||
ctx context.Context
|
||||
propc chan []byte
|
||||
|
@ -36,7 +40,9 @@ func Start(ctx context.Context, name string, election, heartbeat int) *Node {
|
|||
|
||||
func (n *Node) run(r *raft) {
|
||||
propc := n.propc
|
||||
statec := n.statec
|
||||
|
||||
var prev stateResp
|
||||
for {
|
||||
if r.hasLeader() {
|
||||
propc = n.propc
|
||||
|
@ -54,6 +60,12 @@ func (n *Node) run(r *raft) {
|
|||
r.msgs,
|
||||
}
|
||||
|
||||
if sr.containsUpdates(prev) {
|
||||
statec = n.statec
|
||||
} else {
|
||||
statec = nil
|
||||
}
|
||||
|
||||
select {
|
||||
case p := <-propc:
|
||||
r.propose(p)
|
||||
|
@ -61,7 +73,7 @@ func (n *Node) run(r *raft) {
|
|||
r.Step(m) // raft never returns an error
|
||||
case <-n.tickc:
|
||||
// r.tick()
|
||||
case n.statec <- sr:
|
||||
case statec <- sr:
|
||||
r.raftLog.resetNextEnts()
|
||||
r.raftLog.resetUnstable()
|
||||
r.msgs = nil
|
||||
|
|
Loading…
Reference in New Issue