Merge pull request #1830 from xiang90/raft_snap_log

raft: log snapshot events
release-2.0
Xiang Li 2014-12-02 12:06:15 -08:00
commit 3cadaca1a3
2 changed files with 17 additions and 3 deletions

View File

@ -67,7 +67,7 @@ func newLog(storage Storage) *raftLog {
} }
func (l *raftLog) String() string { func (l *raftLog) String() string {
return fmt.Sprintf("unstable=%d committed=%d applied=%d len(unstableEntries)=%d", l.unstable.offset, l.committed, l.applied, len(l.unstable.entries)) return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.unstable.offset, l.committed, l.applied, len(l.unstable.entries))
} }
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise, // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
@ -243,6 +243,7 @@ func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
} }
func (l *raftLog) restore(s pb.Snapshot) { func (l *raftLog) restore(s pb.Snapshot) {
log.Printf("raftlog: log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term)
l.committed = s.Metadata.Index l.committed = s.Metadata.Index
l.unstable.restore(s) l.unstable.restore(s)
} }

View File

@ -233,6 +233,9 @@ func (r *raft) sendAppend(to uint64) {
panic("need non-empty snapshot") panic("need non-empty snapshot")
} }
m.Snapshot = snapshot m.Snapshot = snapshot
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
log.Printf("raft: %x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [match: %d, next: %d]",
r.id, r.raftLog.firstIndex(), r.Commit, to, sindex, sterm, pr.match, pr.next)
} else { } else {
m.Type = pb.MsgApp m.Type = pb.MsgApp
m.Index = pr.next - 1 m.Index = pr.next - 1
@ -445,9 +448,14 @@ func (r *raft) handleHeartbeat(m pb.Message) {
} }
func (r *raft) handleSnapshot(m pb.Message) { func (r *raft) handleSnapshot(m pb.Message) {
sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
if r.restore(m.Snapshot) { if r.restore(m.Snapshot) {
log.Printf("raft: %x [commit: %d] restored snapshot [index: %d, term: %d]",
r.id, r.Commit, sindex, sterm)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()}) r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
} else { } else {
log.Printf("raft: %x [commit: %d] ignored snapshot [index: %d, term: %d]",
r.id, r.Commit, sindex, sterm)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
} }
} }
@ -573,15 +581,20 @@ func (r *raft) restore(s pb.Snapshot) bool {
if s.Metadata.Index <= r.raftLog.committed { if s.Metadata.Index <= r.raftLog.committed {
return false return false
} }
log.Printf("raft: %x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
r.raftLog.restore(s) r.raftLog.restore(s)
r.prs = make(map[uint64]*progress) r.prs = make(map[uint64]*progress)
for _, n := range s.Metadata.ConfState.Nodes { for _, n := range s.Metadata.ConfState.Nodes {
match, next := uint64(0), uint64(r.raftLog.lastIndex())+1
if n == r.id { if n == r.id {
r.setProgress(n, r.raftLog.lastIndex(), r.raftLog.lastIndex()+1) match = next - 1
} else { } else {
r.setProgress(n, 0, r.raftLog.lastIndex()+1) match = 0
} }
log.Printf("raft: %x restored progress of %x [match: %d, next: %d]", r.id, n, match, next)
r.setProgress(n, match, next)
} }
return true return true
} }