raft: implemented leader lease when quorum check is on

release-3.0
swingbach@gmail.com 2016-05-27 17:23:18 +08:00
parent 77775e8e92
commit 337ef64ed5
2 changed files with 205 additions and 8 deletions

View File

@ -423,12 +423,9 @@ func (r *raft) appendEntry(es ...pb.Entry) {
// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
if !r.promotable() {
r.electionElapsed = 0
return
}
r.electionElapsed++
if r.pastElectionTimeout() {
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
@ -565,15 +562,35 @@ func (r *raft) Step(m pb.Message) error {
case m.Term > r.Term:
lead := m.From
if m.Type == pb.MsgVote {
if r.checkQuorum && r.state != StateCandidate && r.electionElapsed < r.electionTimeout {
// If a server receives a RequestVote request within the minimum election timeout
// of hearing from a current leader, it does not update its term or grant its vote
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored vote from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
return nil
}
lead = None
}
r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
r.becomeFollower(m.Term, lead)
case m.Term < r.Term:
// ignore
r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
if r.checkQuorum && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
// We have received messages from a leader at a lower term. It is possible that these messages were
// simply delayed in the network, but this could also mean that this node has advanced its term number
// during a network partition, and it is now unable to either win an election or to rejoin the majority
// on the old term. If checkQuorum is false, this will be handled by incrementing term numbers in response
// to MsgVote with a higher term, but if checkQuorum is true we may not advance the term on MsgVote and
// must generate other messages to advance the term. The net result of these two features is to minimize
// the disruption caused by nodes that have been removed from the cluster's configuration: a removed node
// will send MsgVotes which will be ignored, but it will not receive MsgApp or MsgHeartbeat, so it will not
// create disruptive term increases
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
} else {
// ignore other cases
r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
}
return nil
}
r.step(r, m)

View File

@ -1209,6 +1209,186 @@ func TestLeaderStepdownWhenQuorumLost(t *testing.T) {
}
}
func TestLeaderSupersedingWithCheckQuorum(t *testing.T) {
a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
a.checkQuorum = true
b.checkQuorum = true
c.checkQuorum = true
nt := newNetwork(a, b, c)
// Prevent campaigning from b
b.randomizedElectionTimeout = b.electionTimeout + 1
for i := 0; i < b.electionTimeout; i++ {
b.tick()
}
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
if a.state != StateLeader {
t.Errorf("state = %s, want %s", a.state, StateLeader)
}
if c.state != StateFollower {
t.Errorf("state = %s, want %s", c.state, StateFollower)
}
nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
// Peer b rejected c's vote since its electionElapsed had not reached to electionTimeout
if c.state != StateCandidate {
t.Errorf("state = %s, want %s", c.state, StateCandidate)
}
// Letting b's electionElapsed reach to electionTimeout
for i := 0; i < b.electionTimeout; i++ {
b.tick()
}
nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
if c.state != StateLeader {
t.Errorf("state = %s, want %s", c.state, StateLeader)
}
}
func TestLeaderElectionWithCheckQuorum(t *testing.T) {
a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
a.checkQuorum = true
b.checkQuorum = true
c.checkQuorum = true
nt := newNetwork(a, b, c)
// Letting b's electionElapsed reach to timeout so that it can vote for a
for i := 0; i < b.electionTimeout; i++ {
b.tick()
}
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
if a.state != StateLeader {
t.Errorf("state = %s, want %s", a.state, StateLeader)
}
if c.state != StateFollower {
t.Errorf("state = %s, want %s", c.state, StateFollower)
}
for i := 0; i < a.electionTimeout; i++ {
a.tick()
}
for i := 0; i < b.electionTimeout; i++ {
b.tick()
}
nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
if a.state != StateFollower {
t.Errorf("state = %s, want %s", a.state, StateFollower)
}
if c.state != StateLeader {
t.Errorf("state = %s, want %s", c.state, StateLeader)
}
}
// TestFreeStuckCandidateWithCheckQuorum ensures that a candidate with a higher term
// can disrupt the leader even if the leader still "officially" holds the lease, The
// leader is expected to step down and adopt the candidate's term
func TestFreeStuckCandidateWithCheckQuorum(t *testing.T) {
a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
a.checkQuorum = true
b.checkQuorum = true
c.checkQuorum = true
nt := newNetwork(a, b, c)
for i := 0; i < b.electionTimeout; i++ {
b.tick()
}
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
nt.isolate(1)
nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
if b.state != StateFollower {
t.Errorf("state = %s, want %s", b.state, StateFollower)
}
if c.state != StateCandidate {
t.Errorf("state = %s, want %s", c.state, StateCandidate)
}
if c.Term != b.Term+1 {
t.Errorf("term = %d, want %d", c.Term, b.Term+1)
}
// Vote again for safety
nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
if b.state != StateFollower {
t.Errorf("state = %s, want %s", b.state, StateFollower)
}
if c.state != StateCandidate {
t.Errorf("state = %s, want %s", c.state, StateCandidate)
}
if c.Term != b.Term+2 {
t.Errorf("term = %d, want %d", c.Term, b.Term+2)
}
nt.recover()
nt.send(pb.Message{From: 1, To: 3, Type: pb.MsgHeartbeat, Term: a.Term})
// Disrupt the leader so that the stuck peer is freed
if a.state != StateFollower {
t.Errorf("state = %s, want %s", a.state, StateFollower)
}
if c.Term != a.Term {
t.Errorf("term = %d, want %d", c.Term, a.Term)
}
}
func TestNonPromotableVoterWithCheckQuorum(t *testing.T) {
a := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
b := newTestRaft(2, []uint64{1}, 10, 1, NewMemoryStorage())
a.checkQuorum = true
b.checkQuorum = true
nt := newNetwork(a, b)
// Need to remove 2 again to make it a non-promotable node since newNetwork overwritten some internal states
b.delProgress(2)
if b.promotable() {
t.Fatalf("promotable = %v, want false", b.promotable())
}
for i := 0; i < b.electionTimeout; i++ {
b.tick()
}
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
if a.state != StateLeader {
t.Errorf("state = %s, want %s", a.state, StateLeader)
}
if b.state != StateFollower {
t.Errorf("state = %s, want %s", b.state, StateFollower)
}
if b.lead != 1 {
t.Errorf("lead = %d, want 1", b.lead)
}
}
func TestLeaderAppResp(t *testing.T) {
// initial progress: match = 0; next = 3
tests := []struct {