diff --git a/raft/raft.go b/raft/raft.go index 330191ab3..ebc4d7501 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -491,9 +491,14 @@ func stepLeader(r *raft, m pb.Message) { r.sendAppend(m.From) } } else { + oldWait := r.prs[m.From].shouldWait() r.prs[m.From].update(m.Index) if r.maybeCommit() { r.bcastAppend() + } else if oldWait { + // update() reset the wait state for this peer. If the peer was waiting + // before, an append to it may have been delayed, so send it now. + r.sendAppend(m.From) } } case pb.MsgHeartbeatResp: diff --git a/raft/raft_test.go b/raft/raft_test.go index 5bd5e7c03..30d1bfbef 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -856,17 +856,82 @@ func TestHandleHeartbeatResp(t *testing.T) { Type: pb.MsgAppResp, Index: msgs[1].Index + uint64(len(msgs[1].Entries)), }) + // Consume the message sent in response to MsgAppResp + sm.readMessages() + sm.bcastHeartbeat() // reset wait state sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp}) msgs = sm.readMessages() if len(msgs) != 1 { - t.Fatalf("len(msgs) = %d, want 1", len(msgs)) + t.Fatalf("len(msgs) = %d, want 1: %+v", len(msgs), msgs) } if msgs[0].Type != pb.MsgHeartbeat { t.Errorf("type = %v, want MsgHeartbeat", msgs[0].Type) } } +// TestMsgAppRespWaitReset verifies that a leader handling MsgAppResp releases +// the sender's wait state and sends that node the entry it had not yet been sent. +func TestMsgAppRespWaitReset(t *testing.T) { + sm := newRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage(), 0) + sm.becomeCandidate() + sm.becomeLeader() + + // The new leader has just emitted an entry for its new term; broadcast it + // and consume the resulting messages from the outgoing queue. + sm.bcastAppend() + sm.readMessages() + + // Node 2 acks the first entry, making it committed. + sm.Step(pb.Message{ + From: 2, + Type: pb.MsgAppResp, + Index: 1, + }) + if sm.Commit != 1 { + t.Fatalf("expected Commit to be 1, got %d", sm.Commit) + } + // Also consume the MsgApp messages that update Commit on the followers. + sm.readMessages() + + // A new command is now proposed on node 1. 
+ sm.Step(pb.Message{ + From: 1, + Type: pb.MsgProp, + Entries: []pb.Entry{{}}, + }) + + // The command is broadcast to all nodes not in the wait state. + // Node 2 left the wait state due to its MsgAppResp, but node 3 is still waiting. + msgs := sm.readMessages() + if len(msgs) != 1 { + t.Fatalf("expected 1 message, got %d: %+v", len(msgs), msgs) + } + if msgs[0].Type != pb.MsgApp || msgs[0].To != 2 { + t.Errorf("expected MsgApp to node 2, got %s to %d", msgs[0].Type, msgs[0].To) + } + if len(msgs[0].Entries) != 1 || msgs[0].Entries[0].Index != 2 { + t.Errorf("expected to send entry 2, but got %v", msgs[0].Entries) + } + + // Now node 3 acks the first entry. This releases its wait state, so entry 2 is sent to it. + sm.Step(pb.Message{ + From: 3, + Type: pb.MsgAppResp, + Index: 1, + }) + msgs = sm.readMessages() + if len(msgs) != 1 { + t.Fatalf("expected 1 message, got %d: %+v", len(msgs), msgs) + } + if msgs[0].Type != pb.MsgApp || msgs[0].To != 3 { + t.Errorf("expected MsgApp to node 3, got %s to %d", msgs[0].Type, msgs[0].To) + } + if len(msgs[0].Entries) != 1 || msgs[0].Entries[0].Index != 2 { + t.Errorf("expected to send entry 2, but got %v", msgs[0].Entries) + } +} + func TestRecvMsgVote(t *testing.T) { tests := []struct { state StateType