From 149389cbfa087c0a95a36abb0f51ddfb50d6c61f Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 3 Dec 2014 21:32:12 -0800 Subject: [PATCH] raft: add msgHeartbeat type --- raft/raft.go | 18 +++++++++--------- raft/raft_paper_test.go | 4 ++-- raft/raft_test.go | 12 ++++++------ raft/raftpb/raft.pb.go | 35 +++++++++++++++++++---------------- raft/raftpb/raft.proto | 17 +++++++++-------- 5 files changed, 45 insertions(+), 41 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 9fb7998c9..a6e93ece8 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -262,7 +262,7 @@ func (r *raft) sendHeartbeat(to uint64) { commit := min(r.prs[to].match, r.raftLog.committed) m := pb.Message{ To: to, - Type: pb.MsgApp, + Type: pb.MsgHeartbeat, Commit: commit, } r.send(m) @@ -501,9 +501,6 @@ func stepLeader(r *raft, m pb.Message) { r.appendEntry(e) r.bcastAppend() case pb.MsgAppResp: - if m.Index == 0 { - return - } if m.Reject { log.Printf("raft: %x received msgApp rejection from %x for index %d", r.id, m.From, m.Index) @@ -530,6 +527,9 @@ func stepCandidate(r *raft, m pb.Message) { case pb.MsgApp: r.becomeFollower(r.Term, m.From) r.handleAppendEntries(m) + case pb.MsgHeartbeat: + r.becomeFollower(r.Term, m.From) + r.handleHeartbeat(m) case pb.MsgSnap: r.becomeFollower(m.Term, m.From) r.handleSnapshot(m) @@ -561,11 +561,11 @@ func stepFollower(r *raft, m pb.Message) { case pb.MsgApp: r.elapsed = 0 r.lead = m.From - if m.LogTerm == 0 && m.Index == 0 && len(m.Entries) == 0 { - r.handleHeartbeat(m) - } else { - r.handleAppendEntries(m) - } + r.handleAppendEntries(m) + case pb.MsgHeartbeat: + r.elapsed = 0 + r.lead = m.From + r.handleHeartbeat(m) case pb.MsgSnap: r.elapsed = 0 r.handleSnapshot(m) diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index 8f8421c10..d91426db5 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -127,8 +127,8 @@ func TestLeaderBcastBeat(t *testing.T) { msgs := r.readMessages() sort.Sort(messageSlice(msgs)) wmsgs := []pb.Message{ - {From: 1, To: 2, Term: 1, Type: pb.MsgApp}, - {From: 1, To: 3, Term: 1, Type: pb.MsgApp}, + {From: 1, To: 2, Term: 1, Type: pb.MsgHeartbeat}, + {From: 1, To: 3, Term: 1, Type: pb.MsgHeartbeat}, } if !reflect.DeepEqual(msgs, wmsgs) { t.Errorf("msgs = %v, want %v", msgs, wmsgs) diff --git a/raft/raft_test.go b/raft/raft_test.go index 5cc51e25c..3fa855598 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -997,8 +997,8 @@ func TestBcastBeat(t *testing.T) { 3: min(sm.raftLog.committed, sm.prs[3].match), } for i, m := range msgs { - if m.Type != pb.MsgApp { - t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgApp) + if m.Type != pb.MsgHeartbeat { + t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgHeartbeat) } if m.Index != 0 { t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, 0) @@ -1052,8 +1052,8 @@ func TestRecvMsgBeat(t *testing.T) { t.Errorf("%d: len(msgs) = %d, want %d", i, len(msgs), tt.wMsg) } for _, m := range msgs { - if m.Type != pb.MsgApp { - t.Errorf("%d: msg.type = %v, want %v", i, m.Type, pb.MsgApp) + if m.Type != pb.MsgHeartbeat { + t.Errorf("%d: msg.type = %v, want %v", i, m.Type, pb.MsgHeartbeat) } } } @@ -1377,9 +1377,9 @@ func TestRaftNodes(t *testing.T) { } func ents(terms ...uint64) *raft { - ents := []pb.Entry{} + ents := []pb.Entry{{}} for i, term := range terms { - ents = append(ents, pb.Entry{Index: uint64(i), Term: term}) + ents = append(ents, pb.Entry{Index: uint64(i + 1), Term: term}) } sm := &raft{ diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index 4c3eaafda..2c18034b6 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -69,14 +69,15 @@ func (x *EntryType) UnmarshalJSON(data []byte) error { type MessageType int32 const ( - MsgHup MessageType = 0 - MsgBeat MessageType = 1 - MsgProp MessageType = 2 - MsgApp MessageType = 3 - MsgAppResp MessageType = 4 - MsgVote MessageType = 5 - MsgVoteResp MessageType = 6 - MsgSnap MessageType = 7 + MsgHup MessageType = 0 + MsgBeat MessageType = 1 + MsgProp MessageType = 2 + MsgApp MessageType = 3 + MsgAppResp MessageType = 4 + MsgVote MessageType = 5 + MsgVoteResp MessageType = 6 + MsgSnap MessageType = 7 + MsgHeartbeat MessageType = 8 ) var MessageType_name = map[int32]string{ @@ -88,16 +89,18 @@ var MessageType_name = map[int32]string{ 5: "MsgVote", 6: "MsgVoteResp", 7: "MsgSnap", + 8: "MsgHeartbeat", } var MessageType_value = map[string]int32{ - "MsgHup": 0, - "MsgBeat": 1, - "MsgProp": 2, - "MsgApp": 3, - "MsgAppResp": 4, - "MsgVote": 5, - "MsgVoteResp": 6, - "MsgSnap": 7, + "MsgHup": 0, + "MsgBeat": 1, + "MsgProp": 2, + "MsgApp": 3, + "MsgAppResp": 4, + "MsgVote": 5, + "MsgVoteResp": 6, + "MsgSnap": 7, + "MsgHeartbeat": 8, } func (x MessageType) Enum() *MessageType { diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index 247b5a0db..2806bd5ac 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -32,14 +32,15 @@ message Snapshot { } enum MessageType { - MsgHup = 0; - MsgBeat = 1; - MsgProp = 2; - MsgApp = 3; - MsgAppResp = 4; - MsgVote = 5; - MsgVoteResp = 6; - MsgSnap = 7; + MsgHup = 0; + MsgBeat = 1; + MsgProp = 2; + MsgApp = 3; + MsgAppResp = 4; + MsgVote = 5; + MsgVoteResp = 6; + MsgSnap = 7; + MsgHeartbeat = 8; } message Message {