raft: restart using last written entry also

release-2.0
Yicheng Qin 2014-09-14 16:45:45 -07:00
parent a9af70c52b
commit 140fd6d6c4
5 changed files with 9 additions and 5 deletions

View File

@ -107,7 +107,7 @@ func startRaft(id int64, peerIDs []int64, waldir string) (raft.Node, *wal.WAL) {
// restart a node from previous wal
// TODO(xiangli): check snapshot; not open from one
w, err := wal.OpenAtIndex(waldir, 1)
w, err := wal.OpenAtIndex(waldir, 0)
if err != nil {
log.Fatal(err)
}

View File

@ -38,6 +38,11 @@ func (l *raftLog) isEmpty() bool {
return l.offset == 0 && len(l.ents) == 1
}
func (l *raftLog) load(ents []pb.Entry) {
l.ents = ents
l.unstable = l.offset + int64(len(ents))
}
func (l *raftLog) String() string {
return fmt.Sprintf("offset=%d committed=%d applied=%d len(ents)=%d", l.offset, l.committed, l.applied, len(l.ents))
}

View File

@ -171,6 +171,7 @@ func TestNode(t *testing.T) {
func TestNodeRestart(t *testing.T) {
entries := []raftpb.Entry{
{},
{Term: 1, Index: 1},
{Term: 1, Index: 2, Data: []byte("foo")},
}
@ -179,7 +180,7 @@ func TestNodeRestart(t *testing.T) {
want := Ready{
State: emptyState,
// commit upto index commit index in st
CommittedEntries: entries[:st.Commit],
CommittedEntries: entries[1 : st.Commit+1],
}
n := Restart(1, []int64{1}, 0, 0, st, entries)

View File

@ -503,8 +503,7 @@ func (r *raft) loadEnts(ents []pb.Entry) {
if !r.raftLog.isEmpty() {
panic("cannot load entries when log is not empty")
}
r.raftLog.append(0, ents...)
r.raftLog.unstable = r.raftLog.lastIndex() + 1
r.raftLog.load(ents)
}
func (r *raft) loadState(state pb.State) {

View File

@ -65,7 +65,6 @@ type WAL struct {
}
// Create creates a WAL ready for appending records.
// The index of first record saved MUST be 0.
func Create(dirpath string) (*WAL, error) {
log.Printf("path=%s wal.create", dirpath)
if Exist(dirpath) {