Merge pull request #1837 from xiang90/fix_restore

raft: do not restore snapshot if local raft has longer matching history
release-2.0
Xiang Li 2014-12-02 21:48:43 -08:00
commit 7e01c02abb
2 changed files with 41 additions and 0 deletions

View File

@ -581,6 +581,13 @@ func (r *raft) restore(s pb.Snapshot) bool {
if s.Metadata.Index <= r.raftLog.committed {
return false
}
if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
log.Printf("raft: %x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
r.raftLog.commitTo(s.Metadata.Index)
return false
}
log.Printf("raft: %x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)

View File

@ -1121,6 +1121,40 @@ func TestRestore(t *testing.T) {
}
}
func TestRestoreIgnoreSnapshot(t *testing.T) {
previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
commit := uint64(1)
storage := NewMemoryStorage()
sm := newRaft(1, []uint64{1, 2}, 10, 1, storage)
sm.raftLog.append(previousEnts...)
sm.raftLog.commitTo(commit)
s := pb.Snapshot{
Metadata: pb.SnapshotMetadata{
Index: commit,
Term: 1,
ConfState: pb.ConfState{Nodes: []uint64{1, 2}},
},
}
// ignore snapshot
if ok := sm.restore(s); ok {
t.Errorf("restore = %t, want %t", ok, false)
}
if sm.raftLog.committed != commit {
t.Errorf("commit = %d, want %d", sm.raftLog.committed, commit)
}
// ignore snapshot and fast forward commit
s.Metadata.Index = commit + 1
if ok := sm.restore(s); ok {
t.Errorf("restore = %t, want %t", ok, false)
}
if sm.raftLog.committed != commit+1 {
t.Errorf("commit = %d, want %d", sm.raftLog.committed, commit+1)
}
}
func TestProvideSnap(t *testing.T) {
// restore the statemachin from a snapshot
// so it has a compacted log and a snapshot