From 66252c7d6264503c2288227ce9e94c582dadbf10 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 26 Nov 2014 13:36:17 -0800 Subject: [PATCH] raft: move all unstable stuff into one struct for future cleanup --- raft/log.go | 84 +++++++++++++++++++++++++---------------------- raft/log_test.go | 8 ++--- raft/node.go | 8 ++--- raft/raft_test.go | 20 +++++------ 4 files changed, 63 insertions(+), 57 deletions(-) diff --git a/raft/log.go b/raft/log.go index fe8a2cb58..351c379ed 100644 --- a/raft/log.go +++ b/raft/log.go @@ -27,16 +27,10 @@ type raftLog struct { // storage contains all stable entries since the last snapshot. storage Storage - // the incoming unstable snapshot, if any. - unstableSnapshot *pb.Snapshot - // unstableEnts contains all entries that have not yet been written - // to storage. - unstableEnts []pb.Entry - // unstableEnts[i] has raft log position i+unstable. Note that - // unstable may be less than the highest log position in storage; - // this means that the next write to storage will truncate the log - // before persisting unstableEnts. - unstable uint64 + // unstable contains all unstable entries and snapshot. + // they will be saved into storage. + unstable unstable + // committed is the highest log position that is known to be in // stable storage on a quorum of nodes. // Invariant: committed < unstable @@ -47,6 +41,18 @@ type raftLog struct { applied uint64 } +// unstable.entris[i] has raft log position i+unstable.offset. +// Note that unstable.offset may be less than the highest log +// position in storage; this means that the next write to storage +// might need to truncate the log before persisting unstable.entries. +type unstable struct { + // the incoming unstable snapshot, if any. + snapshot *pb.Snapshot + // all entries that have not yet been written to storage. + entries []pb.Entry + offset uint64 +} + // newLog returns log using the given storage. It recovers the log to the state // that it just commits and applies the lastest snapshot. func newLog(storage Storage) *raftLog { @@ -64,7 +70,7 @@ func newLog(storage Storage) *raftLog { if err != nil { panic(err) // TODO(bdarnell) } - log.unstable = lastIndex + 1 + log.unstable.offset = lastIndex + 1 // Initialize our committed and applied pointers to the time of the last compaction. log.committed = firstIndex - 1 log.applied = firstIndex - 1 @@ -73,7 +79,7 @@ func newLog(storage Storage) *raftLog { } func (l *raftLog) String() string { - return fmt.Sprintf("unstable=%d committed=%d applied=%d len(unstableEntries)=%d", l.unstable, l.committed, l.applied, len(l.unstableEnts)) + return fmt.Sprintf("unstable=%d committed=%d applied=%d len(unstableEntries)=%d", l.unstable.offset, l.committed, l.applied, len(l.unstable.entries)) } // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise, @@ -100,15 +106,15 @@ func (l *raftLog) append(after uint64, ents ...pb.Entry) uint64 { if after < l.committed { log.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed) } - if after < l.unstable { + if after < l.unstable.offset { // The log is being truncated to before our current unstable // portion, so discard it and reset unstable. - l.unstableEnts = nil - l.unstable = after + 1 + l.unstable.entries = nil + l.unstable.offset = after + 1 } // Truncate any unstable entries that are being replaced, then // append the new ones. - l.unstableEnts = append(l.unstableEnts[:after+1-l.unstable], ents...) + l.unstable.entries = append(l.unstable.entries[:after+1-l.unstable.offset], ents...) return l.lastIndex() } @@ -134,11 +140,11 @@ func (l *raftLog) findConflict(from uint64, ents []pb.Entry) uint64 { } func (l *raftLog) unstableEntries() []pb.Entry { - if len(l.unstableEnts) == 0 { + if len(l.unstable.entries) == 0 { return nil } // copy unstable entries to an empty slice - return append([]pb.Entry{}, l.unstableEnts...) + return append([]pb.Entry{}, l.unstable.entries...) } // nextEnts returns all the available entries for execution. @@ -153,15 +159,15 @@ func (l *raftLog) nextEnts() (ents []pb.Entry) { } func (l *raftLog) snapshot() (pb.Snapshot, error) { - if l.unstableSnapshot != nil { - return *l.unstableSnapshot, nil + if l.unstable.snapshot != nil { + return *l.unstable.snapshot, nil } return l.storage.Snapshot() } func (l *raftLog) firstIndex() uint64 { - if l.unstableSnapshot != nil { - return l.unstableSnapshot.Metadata.Index + 1 + if l.unstable.snapshot != nil { + return l.unstable.snapshot.Metadata.Index + 1 } index, err := l.storage.FirstIndex() if err != nil { @@ -171,7 +177,7 @@ func (l *raftLog) firstIndex() uint64 { } func (l *raftLog) lastIndex() uint64 { - return l.unstable + uint64(len(l.unstableEnts)) - 1 + return l.unstable.offset + uint64(len(l.unstable.entries)) - 1 } func (l *raftLog) commitTo(tocommit uint64) { @@ -195,12 +201,12 @@ func (l *raftLog) appliedTo(i uint64) { } func (l *raftLog) stableTo(i uint64) { - if i < l.unstable || i+1-l.unstable > uint64(len(l.unstableEnts)) { + if i < l.unstable.offset || i+1-l.unstable.offset > uint64(len(l.unstable.entries)) { log.Panicf("stableTo(%d) is out of range [unstable(%d), len(unstableEnts)(%d)]", - i, l.unstable, len(l.unstableEnts)) + i, l.unstable.offset, len(l.unstable.entries)) } - l.unstableEnts = l.unstableEnts[i+1-l.unstable:] - l.unstable = i + 1 + l.unstable.entries = l.unstable.entries[i+1-l.unstable.offset:] + l.unstable.offset = i + 1 } func (l *raftLog) lastTerm() uint64 { @@ -211,8 +217,8 @@ func (l *raftLog) term(i uint64) uint64 { switch { case i > l.lastIndex(): return 0 - case i < l.unstable: - if snap := l.unstableSnapshot; snap != nil { + case i < l.unstable.offset: + if snap := l.unstable.snapshot; snap != nil { if i == snap.Metadata.Index { return snap.Metadata.Term } @@ -228,7 +234,7 @@ func (l *raftLog) term(i uint64) uint64 { panic(err) // TODO(bdarnell) } default: - return l.unstableEnts[i-l.unstable].Term + return l.unstable.entries[i-l.unstable.offset].Term } } @@ -265,9 +271,9 @@ func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { func (l *raftLog) restore(s pb.Snapshot) { l.committed = s.Metadata.Index - l.unstable = l.committed - l.unstableEnts = []pb.Entry{{Index: s.Metadata.Index, Term: s.Metadata.Term}} - l.unstableSnapshot = &s + l.unstable.offset = l.committed + l.unstable.entries = []pb.Entry{{Index: s.Metadata.Index, Term: s.Metadata.Term}} + l.unstable.snapshot = &s } // slice returns a slice of log entries from lo through hi-1, inclusive. @@ -279,20 +285,20 @@ func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry { return nil } var ents []pb.Entry - if lo < l.unstable { - storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable)) + if lo < l.unstable.offset { + storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset)) if err == ErrCompacted { // This should never fail because it has been checked before. - log.Panicf("entries[%d:%d) from storage is out of bound", lo, min(hi, l.unstable)) + log.Panicf("entries[%d:%d) from storage is out of bound", lo, min(hi, l.unstable.offset)) return nil } else if err != nil { panic(err) // TODO(bdarnell) } ents = append(ents, storedEnts...) } - if hi > l.unstable { - firstUnstable := max(lo, l.unstable) - ents = append(ents, l.unstableEnts[firstUnstable-l.unstable:hi-l.unstable]...) + if hi > l.unstable.offset { + firstUnstable := max(lo, l.unstable.offset) + ents = append(ents, l.unstable.entries[firstUnstable-l.unstable.offset:hi-l.unstable.offset]...) } return ents } diff --git a/raft/log_test.go b/raft/log_test.go index 80a57428a..e3a324030 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -143,7 +143,7 @@ func TestAppend(t *testing.T) { if g := raftLog.entries(1); !reflect.DeepEqual(g, tt.wents) { t.Errorf("#%d: logEnts = %+v, want %+v", i, g, tt.wents) } - if g := raftLog.unstable; g != tt.wunstable { + if g := raftLog.unstable.offset; g != tt.wunstable { t.Errorf("#%d: unstable = %d, want %d", i, g, tt.wunstable) } } @@ -398,7 +398,7 @@ func TestUnstableEnts(t *testing.T) { t.Errorf("#%d: unstableEnts = %+v, want %+v", i, ents, tt.wents) } w := previousEnts[len(previousEnts)-1].Index + 1 - if g := raftLog.unstable; g != w { + if g := raftLog.unstable.offset; g != w { t.Errorf("#%d: unstable = %d, want %d", i, g, w) } } @@ -448,7 +448,7 @@ func TestStableTo(t *testing.T) { raftLog := newLog(NewMemoryStorage()) raftLog.append(0, []pb.Entry{{}, {}}...) raftLog.stableTo(tt.stable) - if raftLog.unstable != tt.wunstable { + if raftLog.unstable.offset != tt.wunstable { t.Errorf("#%d: unstable = %d, want %d", i, raftLog.unstable, tt.wunstable) } } @@ -520,7 +520,7 @@ func TestLogRestore(t *testing.T) { if raftLog.committed != index { t.Errorf("comitted = %d, want %d", raftLog.committed, index) } - if raftLog.unstable != index+1 { + if raftLog.unstable.offset != index+1 { t.Errorf("unstable = %d, want %d", raftLog.unstable, index+1) } if raftLog.term(index) != term { diff --git a/raft/node.go b/raft/node.go index 827f9a9b4..51c2bcb18 100644 --- a/raft/node.go +++ b/raft/node.go @@ -306,8 +306,8 @@ func (n *node) run(r *raft) { r.raftLog.stableTo(prevLastUnstablei) havePrevLastUnstablei = false } - if r.raftLog.unstableSnapshot != nil && r.raftLog.unstableSnapshot.Metadata.Index == prevSnapi { - r.raftLog.unstableSnapshot = nil + if r.raftLog.unstable.snapshot != nil && r.raftLog.unstable.snapshot.Metadata.Index == prevSnapi { + r.raftLog.unstable.snapshot = nil } advancec = nil case <-n.stop: @@ -405,8 +405,8 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { if !isHardStateEqual(r.HardState, prevHardSt) { rd.HardState = r.HardState } - if r.raftLog.unstableSnapshot != nil { - rd.Snapshot = *r.raftLog.unstableSnapshot + if r.raftLog.unstable.snapshot != nil { + rd.Snapshot = *r.raftLog.unstable.snapshot } return rd } diff --git a/raft/raft_test.go b/raft/raft_test.go index 4969fbfc8..13d14f34a 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -337,7 +337,7 @@ func TestDuelingCandidates(t *testing.T) { wlog := &raftLog{ storage: &MemoryStorage{ents: []pb.Entry{{}, pb.Entry{Data: nil, Term: 1, Index: 1}}}, committed: 1, - unstable: 2, + unstable: unstable{offset: 2}, } tests := []struct { sm *raft @@ -394,7 +394,7 @@ func TestCandidateConcede(t *testing.T) { storage: &MemoryStorage{ ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, }, - unstable: 3, + unstable: unstable{offset: 3}, committed: 2, }) for i, p := range tt.peers { @@ -435,7 +435,7 @@ func TestOldMessages(t *testing.T) { {Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3}, }, }, - unstable: 4, + unstable: unstable{offset: 4}, committed: 3, } base := ltoa(l) @@ -492,7 +492,7 @@ func TestProposal(t *testing.T) { storage: &MemoryStorage{ ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, }, - unstable: 3, + unstable: unstable{offset: 3}, committed: 2} } base := ltoa(wantLog) @@ -531,7 +531,7 @@ func TestProposalByProxy(t *testing.T) { storage: &MemoryStorage{ ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}}, }, - unstable: 3, + unstable: unstable{offset: 3}, committed: 2} base := ltoa(wantLog) for i, p := range tt.peers { @@ -585,7 +585,7 @@ func TestCommit(t *testing.T) { prs[uint64(j)] = &progress{tt.matches[j], tt.matches[j] + 1} } sm := &raft{ - raftLog: &raftLog{storage: &MemoryStorage{ents: tt.logs}, unstable: uint64(len(tt.logs))}, + raftLog: &raftLog{storage: &MemoryStorage{ents: tt.logs}, unstable: unstable{offset: uint64(len(tt.logs))}}, prs: prs, HardState: pb.HardState{Term: tt.smTerm}, } @@ -681,7 +681,7 @@ func TestHandleMsgApp(t *testing.T) { raftLog: &raftLog{ committed: 0, storage: &MemoryStorage{ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}}, - unstable: 3, + unstable: unstable{offset: 3}, }, } @@ -781,7 +781,7 @@ func TestRecvMsgVote(t *testing.T) { sm.HardState = pb.HardState{Vote: tt.voteFor} sm.raftLog = &raftLog{ storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 2}, {Index: 2, Term: 2}}}, - unstable: 3, + unstable: unstable{offset: 3}, } sm.Step(pb.Message{Type: pb.MsgVote, From: 2, Index: tt.i, LogTerm: tt.term}) @@ -929,7 +929,7 @@ func TestLeaderAppResp(t *testing.T) { sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) sm.raftLog = &raftLog{ storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}, - unstable: 3, + unstable: unstable{offset: 3}, } sm.becomeCandidate() sm.becomeLeader() @@ -1351,7 +1351,7 @@ func ents(terms ...uint64) *raft { sm := &raft{ raftLog: &raftLog{ storage: &MemoryStorage{ents: ents}, - unstable: uint64(len(ents)), + unstable: unstable{offset: uint64(len(ents))}, }, } sm.reset(0)