raft: transfer leader feature

release-3.0
es-chow 2016-03-24 19:59:41 +08:00
parent 1c12b66e35
commit ac059eb8cb
9 changed files with 378 additions and 38 deletions

View File

@ -306,7 +306,7 @@ func (n *node) run(r *raft) {
r.Step(m)
case m := <-n.recvc:
// filter out response message from unknown From.
if _, ok := r.prs[m.From]; ok || !IsResponseMsg(m) {
if _, ok := r.prs[m.From]; ok || !IsResponseMsg(m.Type) {
r.Step(m) // raft never returns an error
}
case cc := <-n.confc:
@ -392,7 +392,7 @@ func (n *node) Propose(ctx context.Context, data []byte) error {
func (n *node) Step(ctx context.Context, m pb.Message) error {
// ignore unexpected local messages receiving over network
if IsLocalMsg(m) {
if IsLocalMsg(m.Type) {
// TODO: return an error?
return nil
}

View File

@ -42,7 +42,7 @@ func TestNodeStep(t *testing.T) {
t.Errorf("%d: cannot receive %s on propc chan", msgt, msgn)
}
} else {
if msgt == raftpb.MsgBeat || msgt == raftpb.MsgHup || msgt == raftpb.MsgUnreachable || msgt == raftpb.MsgSnapStatus || msgt == raftpb.MsgCheckQuorum {
if IsLocalMsg(msgt) {
select {
case <-n.recvc:
t.Errorf("%d: step should ignore %s", msgt, msgn)

View File

@ -156,7 +156,9 @@ type raft struct {
// the leader id
lead uint64
// leadTransferee is id of the leader transfer target when its value is not zero.
// Follow the procedure defined in raft thesis 3.10.
leadTransferee uint64
// New configuration is ignored if there exists unapplied configuration.
pendingConf bool
@ -397,6 +399,8 @@ func (r *raft) reset(term uint64) {
r.heartbeatElapsed = 0
r.resetRandomizedElectionTimeout()
r.leadTransferee = None
r.votes = make(map[uint64]bool)
for id := range r.prs {
r.prs[id] = &Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight)}
@ -442,6 +446,10 @@ func (r *raft) tickHeartbeat() {
if r.checkQuorum {
r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
}
// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
if r.state == StateLeader && r.leadTransferee != None {
r.abortLeaderTransfer()
}
}
if r.state != StateLeader {
@ -547,6 +555,11 @@ func (r *raft) Step(m pb.Message) error {
}
return nil
}
if m.Type == pb.MsgTransferLeader {
if r.state != StateLeader {
r.logger.Debugf("%x [term %d state %v] ignoring MsgTransferLeader to %x", r.id, r.Term, r.state, m.From)
}
}
switch {
case m.Term == 0:
@ -594,6 +607,11 @@ func stepLeader(r *raft, m pb.Message) {
// drop any new proposals.
return
}
if r.leadTransferee != None {
r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
return
}
for i, e := range m.Entries {
if e.Type == pb.EntryConfChange {
if r.pendingConf {
@ -615,7 +633,7 @@ func stepLeader(r *raft, m pb.Message) {
// All other message types require a progress for m.From (pr).
pr, prOk := r.prs[m.From]
if !prOk {
r.logger.Debugf("no progress available for %x", m.From)
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return
}
switch m.Type {
@ -652,6 +670,11 @@ func stepLeader(r *raft, m pb.Message) {
// an update before, send it now.
r.sendAppend(m.From)
}
// Transfer leadership is in progress.
if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
r.sendTimeoutNow(m.From)
}
}
}
case pb.MsgHeartbeatResp:
@ -687,6 +710,37 @@ func stepLeader(r *raft, m pb.Message) {
pr.becomeProbe()
}
r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
case pb.MsgTransferLeader:
leadTransferee := m.From
lastLeadTransferee := r.leadTransferee
if lastLeadTransferee != None {
if lastLeadTransferee == leadTransferee {
r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
r.id, r.Term, leadTransferee, leadTransferee)
return
}
r.abortLeaderTransfer()
r.logger.Infof("%x [term %d] abort transfer leadership to %x", r.id, r.Term, lastLeadTransferee)
}
if leadTransferee == r.id {
if lastLeadTransferee == None {
r.logger.Debugf("%x is already leader. Ignored transfer leadership to %x", r.id, r.id)
} else {
r.logger.Debugf("%x abort transfer leadership to %x, transfer to current leader %x.", r.id, lastLeadTransferee, r.id)
}
return
}
// Transfer leadership to third party.
r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
// Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.
r.electionElapsed = 0
r.leadTransferee = leadTransferee
if pr.Match == r.raftLog.lastIndex() {
r.sendTimeoutNow(leadTransferee)
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
} else {
r.sendAppend(leadTransferee)
}
}
}
@ -718,6 +772,8 @@ func stepCandidate(r *raft, m pb.Message) {
case len(r.votes) - gr:
r.becomeFollower(r.Term, None)
}
case pb.MsgTimeoutNow:
r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
}
}
@ -753,6 +809,9 @@ func stepFollower(r *raft, m pb.Message) {
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
}
case pb.MsgTimeoutNow:
r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
r.campaign()
}
}
@ -846,6 +905,10 @@ func (r *raft) removeNode(id uint64) {
if r.maybeCommit() {
r.bcastAppend()
}
// If the removed node is the leadTransferee, then abort the leadership transfering.
if r.state == StateLeader && r.leadTransferee == id {
r.abortLeaderTransfer()
}
}
func (r *raft) resetPendingConf() { r.pendingConf = false }
@ -900,3 +963,11 @@ func (r *raft) checkQuorumActive() bool {
return act >= r.quorum()
}
func (r *raft) sendTimeoutNow(to uint64) {
r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow})
}
func (r *raft) abortLeaderTransfer() {
r.leadTransferee = None
}

View File

@ -1911,6 +1911,261 @@ func TestCommitAfterRemoveNode(t *testing.T) {
}
}
// TestLeaderTransferToUpToDateNode verifies transfering should succeed
// if the transferee has the most up-to-date log entires when transfer starts.
func TestLeaderTransferToUpToDateNode(t *testing.T) {
nt := newNetwork(nil, nil, nil)
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
lead := nt.peers[1].(*raft)
if lead.lead != 1 {
t.Fatalf("after election leader is %x, want 1", lead.lead)
}
// Transfer leadership to 2.
nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader})
checkLeaderTransferState(t, lead, StateFollower, 2)
// After some log replication, transfer leadership back to 1.
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
nt.send(pb.Message{From: 1, To: 2, Type: pb.MsgTransferLeader})
checkLeaderTransferState(t, lead, StateLeader, 1)
}
func TestLeaderTransferToSlowFollower(t *testing.T) {
defaultLogger.EnableDebug()
nt := newNetwork(nil, nil, nil)
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
nt.isolate(3)
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
nt.recover()
lead := nt.peers[1].(*raft)
if lead.prs[3].Match != 1 {
t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs[3].Match, 1)
}
// Transfer leadership to 3 when node 3 is lack of log.
nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
checkLeaderTransferState(t, lead, StateFollower, 3)
}
func TestLeaderTransferAfterSnapshot(t *testing.T) {
nt := newNetwork(nil, nil, nil)
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
nt.isolate(3)
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
lead := nt.peers[1].(*raft)
nextEnts(lead, nt.storage[1])
nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.nodes()}, nil)
nt.storage[1].Compact(lead.raftLog.applied)
nt.recover()
if lead.prs[3].Match != 1 {
t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs[3].Match, 1)
}
// Transfer leadership to 3 when node 3 is lack of snapshot.
nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
// Send pb.MsgHeartbeatResp to leader to trigger a snapshot for node 3.
nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgHeartbeatResp})
checkLeaderTransferState(t, lead, StateFollower, 3)
}
func TestLeaderTransferToSelf(t *testing.T) {
nt := newNetwork(nil, nil, nil)
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
lead := nt.peers[1].(*raft)
// Transfer leadership to self, there will be noop.
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgTransferLeader})
checkLeaderTransferState(t, lead, StateLeader, 1)
}
func TestLeaderTransferToNonExistingNode(t *testing.T) {
nt := newNetwork(nil, nil, nil)
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
lead := nt.peers[1].(*raft)
// Transfer leadership to non-existing node, there will be noop.
nt.send(pb.Message{From: 4, To: 1, Type: pb.MsgTransferLeader})
checkLeaderTransferState(t, lead, StateLeader, 1)
}
func TestLeaderTransferTimeout(t *testing.T) {
nt := newNetwork(nil, nil, nil)
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
nt.isolate(3)
lead := nt.peers[1].(*raft)
// Transfer leadership to isolated node, wait for timeout.
nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
if lead.leadTransferee != 3 {
t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
}
for i := 0; i < lead.heartbeatTimeout; i++ {
lead.tick()
}
if lead.leadTransferee != 3 {
t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
}
for i := 0; i < lead.electionTimeout-lead.heartbeatTimeout; i++ {
lead.tick()
}
checkLeaderTransferState(t, lead, StateLeader, 1)
}
func TestLeaderTransferIgnoreProposal(t *testing.T) {
nt := newNetwork(nil, nil, nil)
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
nt.isolate(3)
lead := nt.peers[1].(*raft)
// Transfer leadership to isolated node to let transfer pending, then send proposal.
nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
if lead.leadTransferee != 3 {
t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
}
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
if lead.prs[1].Match != 1 {
t.Fatalf("node 1 has match %x, want %x", lead.prs[1].Match, 1)
}
}
func TestLeaderTransferReceiveHigherTermVote(t *testing.T) {
nt := newNetwork(nil, nil, nil)
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
nt.isolate(3)
lead := nt.peers[1].(*raft)
// Transfer leadership to isolated node to let transfer pending.
nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
if lead.leadTransferee != 3 {
t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
}
nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup, Index: 1, Term: 2})
checkLeaderTransferState(t, lead, StateFollower, 2)
}
func TestLeaderTransferRemoveNode(t *testing.T) {
nt := newNetwork(nil, nil, nil)
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
nt.ignore(pb.MsgTimeoutNow)
lead := nt.peers[1].(*raft)
// The leadTransferee is removed when leadship transfering.
nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
if lead.leadTransferee != 3 {
t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
}
lead.removeNode(3)
checkLeaderTransferState(t, lead, StateLeader, 1)
}
// TestLeaderTransferBack verifies leadership can transfer back to self when last transfer is pending.
func TestLeaderTransferBack(t *testing.T) {
nt := newNetwork(nil, nil, nil)
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
nt.isolate(3)
lead := nt.peers[1].(*raft)
nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
if lead.leadTransferee != 3 {
t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
}
// Transfer leadership back to self.
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgTransferLeader})
checkLeaderTransferState(t, lead, StateLeader, 1)
}
// TestLeaderTransferSecondTransferToAnotherNode verifies leader can transfer to another node
// when last transfer is pending.
func TestLeaderTransferSecondTransferToAnotherNode(t *testing.T) {
nt := newNetwork(nil, nil, nil)
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
nt.isolate(3)
lead := nt.peers[1].(*raft)
nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
if lead.leadTransferee != 3 {
t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
}
// Transfer leadership to another node.
nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader})
checkLeaderTransferState(t, lead, StateFollower, 2)
}
// TestLeaderTransferSecondTransferToSameNode verifies second transfer leader request
// to the same node should not extend the timeout while the first one is pending.
func TestLeaderTransferSecondTransferToSameNode(t *testing.T) {
nt := newNetwork(nil, nil, nil)
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
nt.isolate(3)
lead := nt.peers[1].(*raft)
nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
if lead.leadTransferee != 3 {
t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
}
for i := 0; i < lead.heartbeatTimeout; i++ {
lead.tick()
}
// Second transfer leadership request to the same node.
nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
for i := 0; i < lead.electionTimeout-lead.heartbeatTimeout; i++ {
lead.tick()
}
checkLeaderTransferState(t, lead, StateLeader, 1)
}
func checkLeaderTransferState(t *testing.T, r *raft, state StateType, lead uint64) {
if r.state != state || r.lead != lead {
t.Fatalf("after transfering, node has state %v lead %v, want state %v lead %v", r.state, r.lead, state, lead)
}
if r.leadTransferee != None {
t.Fatalf("after transfering, node has leadTransferee %v, want leadTransferee %v", r.leadTransferee, None)
}
}
func ents(terms ...uint64) *raft {
storage := NewMemoryStorage()
for i, term := range terms {

View File

@ -70,19 +70,21 @@ func (x *EntryType) UnmarshalJSON(data []byte) error {
type MessageType int32
const (
MsgHup MessageType = 0
MsgBeat MessageType = 1
MsgProp MessageType = 2
MsgApp MessageType = 3
MsgAppResp MessageType = 4
MsgVote MessageType = 5
MsgVoteResp MessageType = 6
MsgSnap MessageType = 7
MsgHeartbeat MessageType = 8
MsgHeartbeatResp MessageType = 9
MsgUnreachable MessageType = 10
MsgSnapStatus MessageType = 11
MsgCheckQuorum MessageType = 12
MsgHup MessageType = 0
MsgBeat MessageType = 1
MsgProp MessageType = 2
MsgApp MessageType = 3
MsgAppResp MessageType = 4
MsgVote MessageType = 5
MsgVoteResp MessageType = 6
MsgSnap MessageType = 7
MsgHeartbeat MessageType = 8
MsgHeartbeatResp MessageType = 9
MsgUnreachable MessageType = 10
MsgSnapStatus MessageType = 11
MsgCheckQuorum MessageType = 12
MsgTransferLeader MessageType = 13
MsgTimeoutNow MessageType = 14
)
var MessageType_name = map[int32]string{
@ -99,21 +101,25 @@ var MessageType_name = map[int32]string{
10: "MsgUnreachable",
11: "MsgSnapStatus",
12: "MsgCheckQuorum",
13: "MsgTransferLeader",
14: "MsgTimeoutNow",
}
var MessageType_value = map[string]int32{
"MsgHup": 0,
"MsgBeat": 1,
"MsgProp": 2,
"MsgApp": 3,
"MsgAppResp": 4,
"MsgVote": 5,
"MsgVoteResp": 6,
"MsgSnap": 7,
"MsgHeartbeat": 8,
"MsgHeartbeatResp": 9,
"MsgUnreachable": 10,
"MsgSnapStatus": 11,
"MsgCheckQuorum": 12,
"MsgHup": 0,
"MsgBeat": 1,
"MsgProp": 2,
"MsgApp": 3,
"MsgAppResp": 4,
"MsgVote": 5,
"MsgVoteResp": 6,
"MsgSnap": 7,
"MsgHeartbeat": 8,
"MsgHeartbeatResp": 9,
"MsgUnreachable": 10,
"MsgSnapStatus": 11,
"MsgCheckQuorum": 12,
"MsgTransferLeader": 13,
"MsgTimeoutNow": 14,
}
func (x MessageType) Enum() *MessageType {

View File

@ -46,6 +46,8 @@ enum MessageType {
MsgUnreachable = 10;
MsgSnapStatus = 11;
MsgCheckQuorum = 12;
MsgTransferLeader = 13;
MsgTimeoutNow = 14;
}
message Message {

View File

@ -168,10 +168,10 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
// Step advances the state machine using the given message.
func (rn *RawNode) Step(m pb.Message) error {
// ignore unexpected local messages receiving over network
if IsLocalMsg(m) {
if IsLocalMsg(m.Type) {
return ErrStepLocalMsg
}
if _, ok := rn.raft.prs[m.From]; ok || !IsResponseMsg(m) {
if _, ok := rn.raft.prs[m.From]; ok || !IsResponseMsg(m.Type) {
return rn.raft.Step(m)
}
return ErrStepPeerNotFound
@ -226,3 +226,8 @@ func (rn *RawNode) ReportSnapshot(id uint64, status SnapshotStatus) {
_ = rn.raft.Step(pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej})
}
// TransferLeader tries to transfer leadership to the given transferee.
func (rn *RawNode) TransferLeader(transferee uint64) {
_ = rn.raft.Step(pb.Message{Type: pb.MsgTransferLeader, From: transferee})
}

View File

@ -33,7 +33,7 @@ func TestRawNodeStep(t *testing.T) {
msgt := raftpb.MessageType(i)
err = rawNode.Step(raftpb.Message{Type: msgt})
// LocalMsg should be ignored.
if msgt == raftpb.MsgBeat || msgt == raftpb.MsgHup || msgt == raftpb.MsgUnreachable || msgt == raftpb.MsgSnapStatus {
if IsLocalMsg(msgt) {
if err != ErrStepLocalMsg {
t.Errorf("%d: step should ignore %s", msgt, msgn)
}

View File

@ -46,12 +46,13 @@ func max(a, b uint64) uint64 {
return b
}
func IsLocalMsg(m pb.Message) bool {
return m.Type == pb.MsgHup || m.Type == pb.MsgBeat || m.Type == pb.MsgUnreachable || m.Type == pb.MsgSnapStatus || m.Type == pb.MsgCheckQuorum
func IsLocalMsg(msgt pb.MessageType) bool {
return msgt == pb.MsgHup || msgt == pb.MsgBeat || msgt == pb.MsgUnreachable ||
msgt == pb.MsgSnapStatus || msgt == pb.MsgCheckQuorum || msgt == pb.MsgTransferLeader
}
func IsResponseMsg(m pb.Message) bool {
return m.Type == pb.MsgAppResp || m.Type == pb.MsgVoteResp || m.Type == pb.MsgHeartbeatResp || m.Type == pb.MsgUnreachable
func IsResponseMsg(msgt pb.MessageType) bool {
return msgt == pb.MsgAppResp || msgt == pb.MsgVoteResp || msgt == pb.MsgHeartbeatResp || msgt == pb.MsgUnreachable
}
// EntryFormatter can be implemented by the application to provide human-readable formatting