raft: fix heartbeat

release-2.0
Xiang Li 2014-09-15 09:58:22 -07:00
parent 072a21782e
commit 21d116d3e1
2 changed files with 63 additions and 1 deletions

View File

@ -189,7 +189,7 @@ func (r *raft) sendAppend(to int64) {
// sendHeartbeat sends RRPC, without entries to the given peer.
func (r *raft) sendHeartbeat(to int64) {
pr := r.prs[to]
index := max(pr.next-1, r.raftLog.lastIndex())
index := max(pr.next-1, r.raftLog.offset)
m := pb.Message{
To: to,
Type: msgApp,

View File

@ -731,6 +731,68 @@ func TestLeaderAppResp(t *testing.T) {
}
}
// When the leader receives a heartbeat tick, it should
// send a msgApp with m.Index = max(progress.next-1,log.offset) and empty
// entries.
func TestBcastBeat(t *testing.T) {
offset := int64(1000)
// make a state machine with log.offset = 1000
s := pb.Snapshot{
Index: offset,
Term: 1,
Nodes: []int64{1, 2},
}
sm := newRaft(1, []int64{1, 2}, 0, 0)
sm.Term = 1
sm.restore(s)
sm.becomeCandidate()
sm.becomeLeader()
for i := 0; i < 10; i++ {
sm.appendEntry(pb.Entry{})
}
tests := []struct {
pnext int64
windex int64
wterm int64
wto int64
}{
{offset + 1, offset, 1, 2},
{offset + 2, offset + 1, 2, 2},
// pr.next -1 < offset
{offset, offset, 1, 2},
{offset - 1, offset, 1, 2},
}
for i, tt := range tests {
sm.prs[2].match = 0
sm.prs[2].next = tt.pnext
sm.Step(pb.Message{Type: msgBeat})
msgs := sm.ReadMessages()
if len(msgs) != 1 {
t.Fatalf("#%d: len(msgs) = %v, want 1", i, len(msgs))
}
m := msgs[0]
if m.Type != msgApp {
t.Fatalf("#%d: type = %v, want = %v", i, m.Type, msgApp)
}
if m.Index != tt.windex {
t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, tt.windex)
}
if m.LogTerm != tt.wterm {
t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, tt.wterm)
}
if m.To != tt.wto {
t.Fatalf("#%d: to = %d, want %d", i, m.To, tt.wto)
}
if len(m.Entries) != 0 {
t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))
}
}
}
// tests the output of the statemachine when receiving msgBeat
func TestRecvMsgBeat(t *testing.T) {
tests := []struct {