raft: Send any waiting appends after receiving MsgAppResp.

This addresses a problem that comes up in the cockroach tests,
in which the order of messages may lead to deadlocks (due to
the fact that we don't have regular heartbeat timers in most
of our tests).
release-2.0
Ben Darnell 2015-01-27 17:40:14 -05:00
parent 915c22292f
commit 33d2400063
2 changed files with 71 additions and 1 deletions

View File

@ -491,9 +491,14 @@ func stepLeader(r *raft, m pb.Message) {
r.sendAppend(m.From)
}
} else {
oldWait := r.prs[m.From].shouldWait()
r.prs[m.From].update(m.Index)
if r.maybeCommit() {
r.bcastAppend()
} else if oldWait {
// update() reset the wait state on this node. If we had delayed sending
// an update before, send it now.
r.sendAppend(m.From)
}
}
case pb.MsgHeartbeatResp:

View File

@ -856,17 +856,82 @@ func TestHandleHeartbeatResp(t *testing.T) {
Type: pb.MsgAppResp,
Index: msgs[1].Index + uint64(len(msgs[1].Entries)),
})
// Consume the message sent in response to MsgAppResp
sm.readMessages()
sm.bcastHeartbeat() // reset wait state
sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
msgs = sm.readMessages()
if len(msgs) != 1 {
t.Fatalf("len(msgs) = %d, want 1", len(msgs))
t.Fatalf("len(msgs) = %d, want 1: %+v", len(msgs), msgs)
}
if msgs[0].Type != pb.MsgHeartbeat {
t.Errorf("type = %v, want MsgHeartbeat", msgs[0].Type)
}
}
// TestMsgAppRespWaitReset exercises how a leader handles MsgAppResp with
// respect to followers it has been holding appends back from: a follower
// that acknowledges an append should immediately be sent any entry that was
// withheld while the leader was waiting on it.
func TestMsgAppRespWaitReset(t *testing.T) {
	leader := newRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage(), 0)
	leader.becomeCandidate()
	leader.becomeLeader()

	// Broadcast the freshly elected leader's appends and drain the outgoing
	// queue so later assertions only see messages produced afterwards.
	leader.bcastAppend()
	leader.readMessages()

	// Follower 2 acknowledges index 1; with two of three nodes holding the
	// entry the test expects the commit index to advance to 1.
	ackFromTwo := pb.Message{From: 2, Type: pb.MsgAppResp, Index: 1}
	leader.Step(ackFromTwo)
	if committed := leader.Commit; committed != 1 {
		t.Fatalf("expected Commit to be 1, got %d", committed)
	}

	// Discard the MsgApp messages that carry the new commit index to the
	// followers.
	leader.readMessages()

	// Propose a new (empty) command locally on node 1.
	proposal := pb.Message{From: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}
	leader.Step(proposal)

	// Only followers that are not in the wait state receive the new entry.
	// Follower 2 was released by its MsgAppResp; follower 3 has not answered
	// yet, so exactly one message (to node 2) is expected.
	sent := leader.readMessages()
	if n := len(sent); n != 1 {
		t.Fatalf("expected 1 message, got %d: %+v", n, sent)
	}
	toTwo := sent[0]
	if toTwo.Type != pb.MsgApp || toTwo.To != 2 {
		t.Errorf("expected MsgApp to node 2, got %s to %d", toTwo.Type, toTwo.To)
	}
	if ents := toTwo.Entries; len(ents) != 1 || ents[0].Index != 2 {
		t.Errorf("expected to send entry 2, but got %v", ents)
	}

	// Follower 3 now acknowledges index 1 as well. That should lift its wait
	// state and cause the withheld entry 2 to be sent to it.
	ackFromThree := pb.Message{From: 3, Type: pb.MsgAppResp, Index: 1}
	leader.Step(ackFromThree)

	sent = leader.readMessages()
	if n := len(sent); n != 1 {
		t.Fatalf("expected 1 message, got %d: %+v", n, sent)
	}
	toThree := sent[0]
	if toThree.Type != pb.MsgApp || toThree.To != 3 {
		t.Errorf("expected MsgApp to node 3, got %s to %d", toThree.Type, toThree.To)
	}
	if ents := toThree.Entries; len(ents) != 1 || ents[0].Index != 2 {
		t.Errorf("expected to send entry 2, but got %v", ents)
	}
}
func TestRecvMsgVote(t *testing.T) {
tests := []struct {
state StateType