From ac059eb8cb2c5a9acff3924f762454e77c10bc7e Mon Sep 17 00:00:00 2001 From: es-chow Date: Thu, 24 Mar 2016 19:59:41 +0800 Subject: [PATCH] raft: transfer leader feature --- raft/node.go | 4 +- raft/node_test.go | 2 +- raft/raft.go | 75 +++++++++++- raft/raft_test.go | 255 +++++++++++++++++++++++++++++++++++++++++ raft/raftpb/raft.pb.go | 58 +++++----- raft/raftpb/raft.proto | 2 + raft/rawnode.go | 9 +- raft/rawnode_test.go | 2 +- raft/util.go | 9 +- 9 files changed, 378 insertions(+), 38 deletions(-) diff --git a/raft/node.go b/raft/node.go index 5811af23a..727dec016 100644 --- a/raft/node.go +++ b/raft/node.go @@ -306,7 +306,7 @@ func (n *node) run(r *raft) { r.Step(m) case m := <-n.recvc: // filter out response message from unknown From. - if _, ok := r.prs[m.From]; ok || !IsResponseMsg(m) { + if _, ok := r.prs[m.From]; ok || !IsResponseMsg(m.Type) { r.Step(m) // raft never returns an error } case cc := <-n.confc: @@ -392,7 +392,7 @@ func (n *node) Propose(ctx context.Context, data []byte) error { func (n *node) Step(ctx context.Context, m pb.Message) error { // ignore unexpected local messages receiving over network - if IsLocalMsg(m) { + if IsLocalMsg(m.Type) { // TODO: return an error? return nil } diff --git a/raft/node_test.go b/raft/node_test.go index 8e24b4764..c4be7d82c 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -42,7 +42,7 @@ func TestNodeStep(t *testing.T) { t.Errorf("%d: cannot receive %s on propc chan", msgt, msgn) } } else { - if msgt == raftpb.MsgBeat || msgt == raftpb.MsgHup || msgt == raftpb.MsgUnreachable || msgt == raftpb.MsgSnapStatus || msgt == raftpb.MsgCheckQuorum { + if IsLocalMsg(msgt) { select { case <-n.recvc: t.Errorf("%d: step should ignore %s", msgt, msgn) diff --git a/raft/raft.go b/raft/raft.go index e7da600b7..0ac40315b 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -156,7 +156,9 @@ type raft struct { // the leader id lead uint64 - + // leadTransferee is id of the leader transfer target when its value is not zero. + // Follow the procedure defined in raft thesis 3.10. + leadTransferee uint64 // New configuration is ignored if there exists unapplied configuration. pendingConf bool @@ -397,6 +399,8 @@ func (r *raft) reset(term uint64) { r.heartbeatElapsed = 0 r.resetRandomizedElectionTimeout() + r.leadTransferee = None + r.votes = make(map[uint64]bool) for id := range r.prs { r.prs[id] = &Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight)} @@ -442,6 +446,10 @@ func (r *raft) tickHeartbeat() { if r.checkQuorum { r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum}) } + // If current leader cannot transfer leadership in electionTimeout, it becomes leader again. + if r.state == StateLeader && r.leadTransferee != None { + r.abortLeaderTransfer() + } } if r.state != StateLeader { @@ -547,6 +555,11 @@ func (r *raft) Step(m pb.Message) error { } return nil } + if m.Type == pb.MsgTransferLeader { + if r.state != StateLeader { + r.logger.Debugf("%x [term %d state %v] ignoring MsgTransferLeader to %x", r.id, r.Term, r.state, m.From) + } + } switch { case m.Term == 0: @@ -594,6 +607,11 @@ func stepLeader(r *raft, m pb.Message) { // drop any new proposals. return } + if r.leadTransferee != None { + r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee) + return + } + for i, e := range m.Entries { if e.Type == pb.EntryConfChange { if r.pendingConf { @@ -615,7 +633,7 @@ func stepLeader(r *raft, m pb.Message) { // All other message types require a progress for m.From (pr). pr, prOk := r.prs[m.From] if !prOk { - r.logger.Debugf("no progress available for %x", m.From) + r.logger.Debugf("%x no progress available for %x", r.id, m.From) return } switch m.Type { @@ -652,6 +670,11 @@ func stepLeader(r *raft, m pb.Message) { // an update before, send it now. r.sendAppend(m.From) } + // Transfer leadership is in progress. + if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() { + r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From) + r.sendTimeoutNow(m.From) + } } } case pb.MsgHeartbeatResp: @@ -687,6 +710,37 @@ func stepLeader(r *raft, m pb.Message) { pr.becomeProbe() } r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr) + case pb.MsgTransferLeader: + leadTransferee := m.From + lastLeadTransferee := r.leadTransferee + if lastLeadTransferee != None { + if lastLeadTransferee == leadTransferee { + r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x", + r.id, r.Term, leadTransferee, leadTransferee) + return + } + r.abortLeaderTransfer() + r.logger.Infof("%x [term %d] abort transfer leadership to %x", r.id, r.Term, lastLeadTransferee) + } + if leadTransferee == r.id { + if lastLeadTransferee == None { + r.logger.Debugf("%x is already leader. Ignored transfer leadership to %x", r.id, r.id) + } else { + r.logger.Debugf("%x abort transfer leadership to %x, transfer to current leader %x.", r.id, lastLeadTransferee, r.id) + } + return + } + // Transfer leadership to third party. + r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee) + // Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed. + r.electionElapsed = 0 + r.leadTransferee = leadTransferee + if pr.Match == r.raftLog.lastIndex() { + r.sendTimeoutNow(leadTransferee) + r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee) + } else { + r.sendAppend(leadTransferee) + } } } @@ -718,6 +772,8 @@ func stepCandidate(r *raft, m pb.Message) { case len(r.votes) - gr: r.becomeFollower(r.Term, None) } + case pb.MsgTimeoutNow: + r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From) } } @@ -753,6 +809,9 @@ func stepFollower(r *raft, m pb.Message) { r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term) r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true}) } + case pb.MsgTimeoutNow: + r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From) + r.campaign() } } @@ -846,6 +905,10 @@ func (r *raft) removeNode(id uint64) { if r.maybeCommit() { r.bcastAppend() } + // If the removed node is the leadTransferee, then abort the leadership transfering. + if r.state == StateLeader && r.leadTransferee == id { + r.abortLeaderTransfer() + } } func (r *raft) resetPendingConf() { r.pendingConf = false } @@ -900,3 +963,11 @@ func (r *raft) checkQuorumActive() bool { return act >= r.quorum() } + +func (r *raft) sendTimeoutNow(to uint64) { + r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow}) +} + +func (r *raft) abortLeaderTransfer() { + r.leadTransferee = None +} diff --git a/raft/raft_test.go b/raft/raft_test.go index 5187480c6..ab8ec2446 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1911,6 +1911,261 @@ func TestCommitAfterRemoveNode(t *testing.T) { } } +// TestLeaderTransferToUpToDateNode verifies transfering should succeed +// if the transferee has the most up-to-date log entires when transfer starts. +func TestLeaderTransferToUpToDateNode(t *testing.T) { + nt := newNetwork(nil, nil, nil) + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + + lead := nt.peers[1].(*raft) + + if lead.lead != 1 { + t.Fatalf("after election leader is %x, want 1", lead.lead) + } + + // Transfer leadership to 2. + nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader}) + + checkLeaderTransferState(t, lead, StateFollower, 2) + + // After some log replication, transfer leadership back to 1. + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) + + nt.send(pb.Message{From: 1, To: 2, Type: pb.MsgTransferLeader}) + + checkLeaderTransferState(t, lead, StateLeader, 1) +} + +func TestLeaderTransferToSlowFollower(t *testing.T) { + defaultLogger.EnableDebug() + nt := newNetwork(nil, nil, nil) + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + + nt.isolate(3) + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) + + nt.recover() + lead := nt.peers[1].(*raft) + if lead.prs[3].Match != 1 { + t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs[3].Match, 1) + } + + // Transfer leadership to 3 when node 3 is lack of log. + nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) + + checkLeaderTransferState(t, lead, StateFollower, 3) +} + +func TestLeaderTransferAfterSnapshot(t *testing.T) { + nt := newNetwork(nil, nil, nil) + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + + nt.isolate(3) + + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) + lead := nt.peers[1].(*raft) + nextEnts(lead, nt.storage[1]) + nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.nodes()}, nil) + nt.storage[1].Compact(lead.raftLog.applied) + + nt.recover() + if lead.prs[3].Match != 1 { + t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs[3].Match, 1) + } + + // Transfer leadership to 3 when node 3 is lack of snapshot. + nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) + // Send pb.MsgHeartbeatResp to leader to trigger a snapshot for node 3. + nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgHeartbeatResp}) + + checkLeaderTransferState(t, lead, StateFollower, 3) +} + +func TestLeaderTransferToSelf(t *testing.T) { + nt := newNetwork(nil, nil, nil) + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + + lead := nt.peers[1].(*raft) + + // Transfer leadership to self, there will be noop. + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgTransferLeader}) + checkLeaderTransferState(t, lead, StateLeader, 1) +} + +func TestLeaderTransferToNonExistingNode(t *testing.T) { + nt := newNetwork(nil, nil, nil) + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + + lead := nt.peers[1].(*raft) + // Transfer leadership to non-existing node, there will be noop. + nt.send(pb.Message{From: 4, To: 1, Type: pb.MsgTransferLeader}) + checkLeaderTransferState(t, lead, StateLeader, 1) +} + +func TestLeaderTransferTimeout(t *testing.T) { + nt := newNetwork(nil, nil, nil) + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + + nt.isolate(3) + + lead := nt.peers[1].(*raft) + + // Transfer leadership to isolated node, wait for timeout. + nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) + if lead.leadTransferee != 3 { + t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) + } + for i := 0; i < lead.heartbeatTimeout; i++ { + lead.tick() + } + if lead.leadTransferee != 3 { + t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) + } + + for i := 0; i < lead.electionTimeout-lead.heartbeatTimeout; i++ { + lead.tick() + } + + checkLeaderTransferState(t, lead, StateLeader, 1) +} + +func TestLeaderTransferIgnoreProposal(t *testing.T) { + nt := newNetwork(nil, nil, nil) + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + + nt.isolate(3) + + lead := nt.peers[1].(*raft) + + // Transfer leadership to isolated node to let transfer pending, then send proposal. + nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) + if lead.leadTransferee != 3 { + t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) + } + + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) + + if lead.prs[1].Match != 1 { + t.Fatalf("node 1 has match %x, want %x", lead.prs[1].Match, 1) + } +} + +func TestLeaderTransferReceiveHigherTermVote(t *testing.T) { + nt := newNetwork(nil, nil, nil) + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + + nt.isolate(3) + + lead := nt.peers[1].(*raft) + + // Transfer leadership to isolated node to let transfer pending. + nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) + if lead.leadTransferee != 3 { + t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) + } + + nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup, Index: 1, Term: 2}) + + checkLeaderTransferState(t, lead, StateFollower, 2) +} + +func TestLeaderTransferRemoveNode(t *testing.T) { + nt := newNetwork(nil, nil, nil) + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + + nt.ignore(pb.MsgTimeoutNow) + + lead := nt.peers[1].(*raft) + + // The leadTransferee is removed when leadship transfering. + nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) + if lead.leadTransferee != 3 { + t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) + } + + lead.removeNode(3) + + checkLeaderTransferState(t, lead, StateLeader, 1) +} + +// TestLeaderTransferBack verifies leadership can transfer back to self when last transfer is pending. +func TestLeaderTransferBack(t *testing.T) { + nt := newNetwork(nil, nil, nil) + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + + nt.isolate(3) + + lead := nt.peers[1].(*raft) + + nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) + if lead.leadTransferee != 3 { + t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) + } + + // Transfer leadership back to self. + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgTransferLeader}) + + checkLeaderTransferState(t, lead, StateLeader, 1) +} + +// TestLeaderTransferSecondTransferToAnotherNode verifies leader can transfer to another node +// when last transfer is pending. +func TestLeaderTransferSecondTransferToAnotherNode(t *testing.T) { + nt := newNetwork(nil, nil, nil) + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + + nt.isolate(3) + + lead := nt.peers[1].(*raft) + + nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) + if lead.leadTransferee != 3 { + t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) + } + + // Transfer leadership to another node. + nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader}) + + checkLeaderTransferState(t, lead, StateFollower, 2) +} + +// TestLeaderTransferSecondTransferToSameNode verifies second transfer leader request +// to the same node should not extend the timeout while the first one is pending. +func TestLeaderTransferSecondTransferToSameNode(t *testing.T) { + nt := newNetwork(nil, nil, nil) + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + + nt.isolate(3) + + lead := nt.peers[1].(*raft) + + nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) + if lead.leadTransferee != 3 { + t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) + } + + for i := 0; i < lead.heartbeatTimeout; i++ { + lead.tick() + } + // Second transfer leadership request to the same node. + nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) + + for i := 0; i < lead.electionTimeout-lead.heartbeatTimeout; i++ { + lead.tick() + } + + checkLeaderTransferState(t, lead, StateLeader, 1) +} + +func checkLeaderTransferState(t *testing.T, r *raft, state StateType, lead uint64) { + if r.state != state || r.lead != lead { + t.Fatalf("after transfering, node has state %v lead %v, want state %v lead %v", r.state, r.lead, state, lead) + } + if r.leadTransferee != None { + t.Fatalf("after transfering, node has leadTransferee %v, want leadTransferee %v", r.leadTransferee, None) + } +} + func ents(terms ...uint64) *raft { storage := NewMemoryStorage() for i, term := range terms { diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index 41e619d65..d5abecb48 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -70,19 +70,21 @@ func (x *EntryType) UnmarshalJSON(data []byte) error { type MessageType int32 const ( - MsgHup MessageType = 0 - MsgBeat MessageType = 1 - MsgProp MessageType = 2 - MsgApp MessageType = 3 - MsgAppResp MessageType = 4 - MsgVote MessageType = 5 - MsgVoteResp MessageType = 6 - MsgSnap MessageType = 7 - MsgHeartbeat MessageType = 8 - MsgHeartbeatResp MessageType = 9 - MsgUnreachable MessageType = 10 - MsgSnapStatus MessageType = 11 - MsgCheckQuorum MessageType = 12 + MsgHup MessageType = 0 + MsgBeat MessageType = 1 + MsgProp MessageType = 2 + MsgApp MessageType = 3 + MsgAppResp MessageType = 4 + MsgVote MessageType = 5 + MsgVoteResp MessageType = 6 + MsgSnap MessageType = 7 + MsgHeartbeat MessageType = 8 + MsgHeartbeatResp MessageType = 9 + MsgUnreachable MessageType = 10 + MsgSnapStatus MessageType = 11 + MsgCheckQuorum MessageType = 12 + MsgTransferLeader MessageType = 13 + MsgTimeoutNow MessageType = 14 ) var MessageType_name = map[int32]string{ @@ -99,21 +101,25 @@ var MessageType_name = map[int32]string{ 10: "MsgUnreachable", 11: "MsgSnapStatus", 12: "MsgCheckQuorum", + 13: "MsgTransferLeader", + 14: "MsgTimeoutNow", } var MessageType_value = map[string]int32{ - "MsgHup": 0, - "MsgBeat": 1, - "MsgProp": 2, - "MsgApp": 3, - "MsgAppResp": 4, - "MsgVote": 5, - "MsgVoteResp": 6, - "MsgSnap": 7, - "MsgHeartbeat": 8, - "MsgHeartbeatResp": 9, - "MsgUnreachable": 10, - "MsgSnapStatus": 11, - "MsgCheckQuorum": 12, + "MsgHup": 0, + "MsgBeat": 1, + "MsgProp": 2, + "MsgApp": 3, + "MsgAppResp": 4, + "MsgVote": 5, + "MsgVoteResp": 6, + "MsgSnap": 7, + "MsgHeartbeat": 8, + "MsgHeartbeatResp": 9, + "MsgUnreachable": 10, + "MsgSnapStatus": 11, + "MsgCheckQuorum": 12, + "MsgTransferLeader": 13, + "MsgTimeoutNow": 14, } func (x MessageType) Enum() *MessageType { diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index 0a98b8cfa..42f10d269 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -46,6 +46,8 @@ enum MessageType { MsgUnreachable = 10; MsgSnapStatus = 11; MsgCheckQuorum = 12; + MsgTransferLeader = 13; + MsgTimeoutNow = 14; } message Message { diff --git a/raft/rawnode.go b/raft/rawnode.go index 8cf085891..47e76c515 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -168,10 +168,10 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { // Step advances the state machine using the given message. func (rn *RawNode) Step(m pb.Message) error { // ignore unexpected local messages receiving over network - if IsLocalMsg(m) { + if IsLocalMsg(m.Type) { return ErrStepLocalMsg } - if _, ok := rn.raft.prs[m.From]; ok || !IsResponseMsg(m) { + if _, ok := rn.raft.prs[m.From]; ok || !IsResponseMsg(m.Type) { return rn.raft.Step(m) } return ErrStepPeerNotFound @@ -226,3 +226,8 @@ func (rn *RawNode) ReportSnapshot(id uint64, status SnapshotStatus) { _ = rn.raft.Step(pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}) } + +// TransferLeader tries to transfer leadership to the given transferee. +func (rn *RawNode) TransferLeader(transferee uint64) { + _ = rn.raft.Step(pb.Message{Type: pb.MsgTransferLeader, From: transferee}) +} diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 56692fc16..2f1262f46 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -33,7 +33,7 @@ func TestRawNodeStep(t *testing.T) { msgt := raftpb.MessageType(i) err = rawNode.Step(raftpb.Message{Type: msgt}) // LocalMsg should be ignored. - if msgt == raftpb.MsgBeat || msgt == raftpb.MsgHup || msgt == raftpb.MsgUnreachable || msgt == raftpb.MsgSnapStatus { + if IsLocalMsg(msgt) { if err != ErrStepLocalMsg { t.Errorf("%d: step should ignore %s", msgt, msgn) } diff --git a/raft/util.go b/raft/util.go index 8d4c41900..c8c2615d8 100644 --- a/raft/util.go +++ b/raft/util.go @@ -46,12 +46,13 @@ func max(a, b uint64) uint64 { return b } -func IsLocalMsg(m pb.Message) bool { - return m.Type == pb.MsgHup || m.Type == pb.MsgBeat || m.Type == pb.MsgUnreachable || m.Type == pb.MsgSnapStatus || m.Type == pb.MsgCheckQuorum +func IsLocalMsg(msgt pb.MessageType) bool { + return msgt == pb.MsgHup || msgt == pb.MsgBeat || msgt == pb.MsgUnreachable || + msgt == pb.MsgSnapStatus || msgt == pb.MsgCheckQuorum || msgt == pb.MsgTransferLeader } -func IsResponseMsg(m pb.Message) bool { - return m.Type == pb.MsgAppResp || m.Type == pb.MsgVoteResp || m.Type == pb.MsgHeartbeatResp || m.Type == pb.MsgUnreachable +func IsResponseMsg(msgt pb.MessageType) bool { + return msgt == pb.MsgAppResp || msgt == pb.MsgVoteResp || msgt == pb.MsgHeartbeatResp || msgt == pb.MsgUnreachable } // EntryFormatter can be implemented by the application to provide human-readable formatting