From 88767d913d175f28719469e176f9153944500ebb Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 18 Dec 2014 12:02:15 -0800 Subject: [PATCH] raft: leader waits for the reply of previous message when follower is not in good path. It is reasonable for the leader to wait for the reply before sending out the next msgApp or msgSnap to a follower in bad path. Otherwise the leader will send out useless messages if the previous message is rejected or the previous message is a snapshot. Especially in the snapshot case, the leader is certain to send out duplicate messages including the snapshot, which is a huge waste. This commit implements a timeout-based wait mechanism. The timeout for a normal msgApp is heartbeatTimeout and the timeout for a snapshot is electionTimeout (a snapshot is larger). We can implement a piggyback mechanism (the application reports that the message was lost) in the future if necessary. --- raft/raft.go | 32 ++++++++++++++- raft/raft_test.go | 100 ++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 117 insertions(+), 15 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 2f42cfabc..15fc55786 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -56,9 +56,13 @@ func (st StateType) MarshalJSON() ([]byte, error) { return []byte(fmt.Sprintf("%q", st.String())), nil } -type progress struct{ match, next uint64 } +type progress struct { + match, next uint64 + wait int +} func (pr *progress) update(n uint64) { + pr.waitReset() if pr.match < n { pr.match = n } @@ -72,6 +76,7 @@ func (pr *progress) optimisticUpdate(n uint64) { pr.next = n + 1 } // maybeDecrTo returns false if the given to index comes from an out of order message. // Otherwise it decreases the progress next index and returns true. func (pr *progress) maybeDecrTo(to uint64) bool { + pr.waitReset() if pr.match != 0 { // the rejection must be stale if the progress has matched and "to" // is smaller than "match". 
@@ -94,7 +99,19 @@ func (pr *progress) maybeDecrTo(to uint64) bool { return true } -func (pr *progress) String() string { return fmt.Sprintf("next = %d, match = %d", pr.next, pr.match) } +func (pr *progress) waitDecr(i int) { + pr.wait -= i + if pr.wait < 0 { + pr.wait = 0 + } +} +func (pr *progress) waitSet(w int) { pr.wait = w } +func (pr *progress) waitReset() { pr.wait = 0 } +func (pr *progress) shouldWait() bool { return pr.match == 0 && pr.wait > 0 } + +func (pr *progress) String() string { + return fmt.Sprintf("next = %d, match = %d, wait = %v", pr.next, pr.match, pr.wait) +} type raft struct { pb.HardState @@ -203,6 +220,10 @@ func (r *raft) send(m pb.Message) { // sendAppend sends RRPC, with entries to the given peer. func (r *raft) sendAppend(to uint64) { pr := r.prs[to] + if pr.shouldWait() { + log.Printf("raft: %x ignored sending %s to %x [%s]", r.id, pb.MsgApp, to, pr) + return + } m := pb.Message{} m.To = to if r.needSnapshot(pr.next) { @@ -218,6 +239,7 @@ func (r *raft) sendAppend(to uint64) { sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term log.Printf("raft: %x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr) + pr.waitSet(r.electionTimeout) } else { m.Type = pb.MsgApp m.Index = pr.next - 1 @@ -228,6 +250,11 @@ func (r *raft) sendAppend(to uint64) { // has been matched. if n := len(m.Entries); pr.match != 0 && n != 0 { pr.optimisticUpdate(m.Entries[n-1].Index) + } else if pr.match == 0 { + // TODO (xiangli): better way to find out if the follower is in good path or not + // a follower might be in bad path even if match != 0, since we optimistically + // increase the next. 
+ pr.waitSet(r.heartbeatTimeout) } } r.send(m) @@ -268,6 +295,7 @@ func (r *raft) bcastHeartbeat() { continue } r.sendHeartbeat(i) + r.prs[i].waitDecr(r.heartbeatTimeout) } } diff --git a/raft/raft_test.go b/raft/raft_test.go index e63d16dab..472e2c32d 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -70,10 +70,10 @@ func TestProgressUpdate(t *testing.T) { } p.update(tt.update) if p.match != tt.wm { - t.Errorf("#%d: match=%d, want %d", i, p.match, tt.wm) + t.Errorf("#%d: match= %d, want %d", i, p.match, tt.wm) } if p.next != tt.wn { - t.Errorf("#%d: next=%d, want %d", i, p.next, tt.wn) + t.Errorf("#%d: next= %d, want %d", i, p.next, tt.wn) } } } @@ -132,17 +132,85 @@ func TestProgressMaybeDecr(t *testing.T) { next: tt.n, } if g := p.maybeDecrTo(tt.to); g != tt.w { - t.Errorf("#%d: maybeDecrTo=%t, want %t", i, g, tt.w) + t.Errorf("#%d: maybeDecrTo= %t, want %t", i, g, tt.w) } if gm := p.match; gm != tt.m { - t.Errorf("#%d: match=%d, want %d", i, gm, tt.m) + t.Errorf("#%d: match= %d, want %d", i, gm, tt.m) } if gn := p.next; gn != tt.wn { - t.Errorf("#%d: next=%d, want %d", i, gn, tt.wn) + t.Errorf("#%d: next= %d, want %d", i, gn, tt.wn) } } } +func TestProgressShouldWait(t *testing.T) { + tests := []struct { + m uint64 + wait int + + w bool + }{ + // a progress with match != 0 never waits + {1, 0, false}, + {1, 1, false}, + {0, 1, true}, + {0, 0, false}, + } + for i, tt := range tests { + p := &progress{ + match: tt.m, + wait: tt.wait, + } + if g := p.shouldWait(); g != tt.w { + t.Errorf("#%d: shouldwait = %t, want %t", i, g, tt.w) + } + } +} + +// TestProgressWaitReset ensures that progress.update and progress.maybeDecrTo +// will reset progress.wait. 
+func TestProgressWaitReset(t *testing.T) { + p := &progress{ + wait: 1, + } + p.maybeDecrTo(1) + if p.wait != 0 { + t.Errorf("wait= %d, want 0", p.wait) + } + p.wait = 1 + p.update(2) + if p.wait != 0 { + t.Errorf("wait= %d, want 0", p.wait) + } +} + +// TestProgressDecr ensures raft.heartbeat decreases progress.wait by heartbeat. +func TestProgressDecr(t *testing.T) { + r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) + r.becomeCandidate() + r.becomeLeader() + r.prs[2].wait = r.heartbeatTimeout * 2 + + r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) + if r.prs[2].wait != r.heartbeatTimeout*(2-1) { + t.Errorf("wait = %d, want %d", r.prs[2].wait, r.heartbeatTimeout*(2-1)) + } +} + +func TestProgressWait(t *testing.T) { + r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) + r.becomeCandidate() + r.becomeLeader() + r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) + r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) + r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) + + ms := r.readMessages() + if len(ms) != 1 { + t.Errorf("len(ms) = %d, want 1", len(ms)) + } +} + func TestLeaderElection(t *testing.T) { tests := []struct { *network @@ -269,7 +337,7 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) { // avoid committing ChangeTerm proposal tt.ignore(pb.MsgApp) - // elect 1 as the new leader with term 2 + // elect 2 as the new leader with term 2 tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) // no log entries from previous term should be committed @@ -279,10 +347,11 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) { } tt.recover() - - // still be able to append a entry + // send heartbeat; reset wait + tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgBeat}) + // append an entry at current term tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: 
[]byte("some data")}}}) - + // expect the committed to be advanced if sm.raftLog.committed != 5 { t.Errorf("committed = %d, want %d", sm.raftLog.committed, 5) } @@ -378,6 +447,8 @@ func TestCandidateConcede(t *testing.T) { // heal the partition tt.recover() + // send heartbeat; reset wait + tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgBeat}) data := []byte("force follower") // send a proposal to 2 to flush out a MsgApp to 0 @@ -425,18 +496,21 @@ func TestOldMessages(t *testing.T) { tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) - // pretend we're an old leader trying to make progress - tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgApp, Term: 1, Entries: []pb.Entry{{Term: 1}}}) + // pretend we're an old leader trying to make progress; this entry is expected to be ignored. + tt.send(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, Entries: []pb.Entry{{Index: 3, Term: 2}}}) + // commit a new entry + tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) l := &raftLog{ storage: &MemoryStorage{ ents: []pb.Entry{ {}, {Data: nil, Term: 1, Index: 1}, {Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3}, + {Data: []byte("somedata"), Term: 3, Index: 4}, }, }, - unstable: unstable{offset: 4}, - committed: 3, + unstable: unstable{offset: 5}, + committed: 4, } base := ltoa(l) for i, p := range tt.peers {