raft: add msgHeartbeat type
parent
2c0d323318
commit
149389cbfa
18
raft/raft.go
18
raft/raft.go
|
@ -262,7 +262,7 @@ func (r *raft) sendHeartbeat(to uint64) {
|
||||||
commit := min(r.prs[to].match, r.raftLog.committed)
|
commit := min(r.prs[to].match, r.raftLog.committed)
|
||||||
m := pb.Message{
|
m := pb.Message{
|
||||||
To: to,
|
To: to,
|
||||||
Type: pb.MsgApp,
|
Type: pb.MsgHeartbeat,
|
||||||
Commit: commit,
|
Commit: commit,
|
||||||
}
|
}
|
||||||
r.send(m)
|
r.send(m)
|
||||||
|
@ -501,9 +501,6 @@ func stepLeader(r *raft, m pb.Message) {
|
||||||
r.appendEntry(e)
|
r.appendEntry(e)
|
||||||
r.bcastAppend()
|
r.bcastAppend()
|
||||||
case pb.MsgAppResp:
|
case pb.MsgAppResp:
|
||||||
if m.Index == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if m.Reject {
|
if m.Reject {
|
||||||
log.Printf("raft: %x received msgApp rejection from %x for index %d",
|
log.Printf("raft: %x received msgApp rejection from %x for index %d",
|
||||||
r.id, m.From, m.Index)
|
r.id, m.From, m.Index)
|
||||||
|
@ -530,6 +527,9 @@ func stepCandidate(r *raft, m pb.Message) {
|
||||||
case pb.MsgApp:
|
case pb.MsgApp:
|
||||||
r.becomeFollower(r.Term, m.From)
|
r.becomeFollower(r.Term, m.From)
|
||||||
r.handleAppendEntries(m)
|
r.handleAppendEntries(m)
|
||||||
|
case pb.MsgHeartbeat:
|
||||||
|
r.becomeFollower(r.Term, m.From)
|
||||||
|
r.handleHeartbeat(m)
|
||||||
case pb.MsgSnap:
|
case pb.MsgSnap:
|
||||||
r.becomeFollower(m.Term, m.From)
|
r.becomeFollower(m.Term, m.From)
|
||||||
r.handleSnapshot(m)
|
r.handleSnapshot(m)
|
||||||
|
@ -561,11 +561,11 @@ func stepFollower(r *raft, m pb.Message) {
|
||||||
case pb.MsgApp:
|
case pb.MsgApp:
|
||||||
r.elapsed = 0
|
r.elapsed = 0
|
||||||
r.lead = m.From
|
r.lead = m.From
|
||||||
if m.LogTerm == 0 && m.Index == 0 && len(m.Entries) == 0 {
|
r.handleAppendEntries(m)
|
||||||
r.handleHeartbeat(m)
|
case pb.MsgHeartbeat:
|
||||||
} else {
|
r.elapsed = 0
|
||||||
r.handleAppendEntries(m)
|
r.lead = m.From
|
||||||
}
|
r.handleHeartbeat(m)
|
||||||
case pb.MsgSnap:
|
case pb.MsgSnap:
|
||||||
r.elapsed = 0
|
r.elapsed = 0
|
||||||
r.handleSnapshot(m)
|
r.handleSnapshot(m)
|
||||||
|
|
|
@ -127,8 +127,8 @@ func TestLeaderBcastBeat(t *testing.T) {
|
||||||
msgs := r.readMessages()
|
msgs := r.readMessages()
|
||||||
sort.Sort(messageSlice(msgs))
|
sort.Sort(messageSlice(msgs))
|
||||||
wmsgs := []pb.Message{
|
wmsgs := []pb.Message{
|
||||||
{From: 1, To: 2, Term: 1, Type: pb.MsgApp},
|
{From: 1, To: 2, Term: 1, Type: pb.MsgHeartbeat},
|
||||||
{From: 1, To: 3, Term: 1, Type: pb.MsgApp},
|
{From: 1, To: 3, Term: 1, Type: pb.MsgHeartbeat},
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(msgs, wmsgs) {
|
if !reflect.DeepEqual(msgs, wmsgs) {
|
||||||
t.Errorf("msgs = %v, want %v", msgs, wmsgs)
|
t.Errorf("msgs = %v, want %v", msgs, wmsgs)
|
||||||
|
|
|
@ -997,8 +997,8 @@ func TestBcastBeat(t *testing.T) {
|
||||||
3: min(sm.raftLog.committed, sm.prs[3].match),
|
3: min(sm.raftLog.committed, sm.prs[3].match),
|
||||||
}
|
}
|
||||||
for i, m := range msgs {
|
for i, m := range msgs {
|
||||||
if m.Type != pb.MsgApp {
|
if m.Type != pb.MsgHeartbeat {
|
||||||
t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgApp)
|
t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgHeartbeat)
|
||||||
}
|
}
|
||||||
if m.Index != 0 {
|
if m.Index != 0 {
|
||||||
t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, 0)
|
t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, 0)
|
||||||
|
@ -1052,8 +1052,8 @@ func TestRecvMsgBeat(t *testing.T) {
|
||||||
t.Errorf("%d: len(msgs) = %d, want %d", i, len(msgs), tt.wMsg)
|
t.Errorf("%d: len(msgs) = %d, want %d", i, len(msgs), tt.wMsg)
|
||||||
}
|
}
|
||||||
for _, m := range msgs {
|
for _, m := range msgs {
|
||||||
if m.Type != pb.MsgApp {
|
if m.Type != pb.MsgHeartbeat {
|
||||||
t.Errorf("%d: msg.type = %v, want %v", i, m.Type, pb.MsgApp)
|
t.Errorf("%d: msg.type = %v, want %v", i, m.Type, pb.MsgHeartbeat)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1377,9 +1377,9 @@ func TestRaftNodes(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func ents(terms ...uint64) *raft {
|
func ents(terms ...uint64) *raft {
|
||||||
ents := []pb.Entry{}
|
ents := []pb.Entry{{}}
|
||||||
for i, term := range terms {
|
for i, term := range terms {
|
||||||
ents = append(ents, pb.Entry{Index: uint64(i), Term: term})
|
ents = append(ents, pb.Entry{Index: uint64(i + 1), Term: term})
|
||||||
}
|
}
|
||||||
|
|
||||||
sm := &raft{
|
sm := &raft{
|
||||||
|
|
|
@ -69,14 +69,15 @@ func (x *EntryType) UnmarshalJSON(data []byte) error {
|
||||||
type MessageType int32
|
type MessageType int32
|
||||||
|
|
||||||
const (
|
const (
|
||||||
MsgHup MessageType = 0
|
MsgHup MessageType = 0
|
||||||
MsgBeat MessageType = 1
|
MsgBeat MessageType = 1
|
||||||
MsgProp MessageType = 2
|
MsgProp MessageType = 2
|
||||||
MsgApp MessageType = 3
|
MsgApp MessageType = 3
|
||||||
MsgAppResp MessageType = 4
|
MsgAppResp MessageType = 4
|
||||||
MsgVote MessageType = 5
|
MsgVote MessageType = 5
|
||||||
MsgVoteResp MessageType = 6
|
MsgVoteResp MessageType = 6
|
||||||
MsgSnap MessageType = 7
|
MsgSnap MessageType = 7
|
||||||
|
MsgHeartbeat MessageType = 8
|
||||||
)
|
)
|
||||||
|
|
||||||
var MessageType_name = map[int32]string{
|
var MessageType_name = map[int32]string{
|
||||||
|
@ -88,16 +89,18 @@ var MessageType_name = map[int32]string{
|
||||||
5: "MsgVote",
|
5: "MsgVote",
|
||||||
6: "MsgVoteResp",
|
6: "MsgVoteResp",
|
||||||
7: "MsgSnap",
|
7: "MsgSnap",
|
||||||
|
8: "MsgHeartbeat",
|
||||||
}
|
}
|
||||||
var MessageType_value = map[string]int32{
|
var MessageType_value = map[string]int32{
|
||||||
"MsgHup": 0,
|
"MsgHup": 0,
|
||||||
"MsgBeat": 1,
|
"MsgBeat": 1,
|
||||||
"MsgProp": 2,
|
"MsgProp": 2,
|
||||||
"MsgApp": 3,
|
"MsgApp": 3,
|
||||||
"MsgAppResp": 4,
|
"MsgAppResp": 4,
|
||||||
"MsgVote": 5,
|
"MsgVote": 5,
|
||||||
"MsgVoteResp": 6,
|
"MsgVoteResp": 6,
|
||||||
"MsgSnap": 7,
|
"MsgSnap": 7,
|
||||||
|
"MsgHeartbeat": 8,
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x MessageType) Enum() *MessageType {
|
func (x MessageType) Enum() *MessageType {
|
||||||
|
|
|
@ -32,14 +32,15 @@ message Snapshot {
|
||||||
}
|
}
|
||||||
|
|
||||||
enum MessageType {
|
enum MessageType {
|
||||||
MsgHup = 0;
|
MsgHup = 0;
|
||||||
MsgBeat = 1;
|
MsgBeat = 1;
|
||||||
MsgProp = 2;
|
MsgProp = 2;
|
||||||
MsgApp = 3;
|
MsgApp = 3;
|
||||||
MsgAppResp = 4;
|
MsgAppResp = 4;
|
||||||
MsgVote = 5;
|
MsgVote = 5;
|
||||||
MsgVoteResp = 6;
|
MsgVoteResp = 6;
|
||||||
MsgSnap = 7;
|
MsgSnap = 7;
|
||||||
|
MsgHeartbeat = 8;
|
||||||
}
|
}
|
||||||
|
|
||||||
message Message {
|
message Message {
|
||||||
|
|
Loading…
Reference in New Issue