From 0af1679b6103d4746011e7a67f0c717b7e66ab83 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Fri, 30 Dec 2016 16:26:27 -0800 Subject: [PATCH] raftexample: load snapshot when opening WAL Fix https://github.com/coreos/etcd/issues/7056. Previously we don't load snapshot when replaying WAL. --- contrib/raftexample/kvstore.go | 1 + contrib/raftexample/raft.go | 29 ++++++++++++++++++++++++----- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/contrib/raftexample/kvstore.go b/contrib/raftexample/kvstore.go index 5651cf0f5..b378ec336 100644 --- a/contrib/raftexample/kvstore.go +++ b/contrib/raftexample/kvstore.go @@ -77,6 +77,7 @@ func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) { if err := s.recoverFromSnapshot(snapshot.Data); err != nil { log.Panic(err) } + continue } var dataKv kv diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index 67fa47ae9..3d6878eb5 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -94,7 +94,6 @@ func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, waldir: fmt.Sprintf("raftexample-%d", id), snapdir: fmt.Sprintf("raftexample-%d-snap", id), getSnapshot: getSnapshot, - raftStorage: raft.NewMemoryStorage(), snapCount: defaultSnapCount, stopc: make(chan struct{}), httpstopc: make(chan struct{}), @@ -185,8 +184,16 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool { return true } +func (rc *raftNode) loadSnapshot() *raftpb.Snapshot { + snapshot, err := rc.snapshotter.Load() + if err != nil && err != snap.ErrNoSnapshot { + log.Fatalf("raftexample: error loading snapshot (%v)", err) + } + return snapshot +} + // openWAL returns a WAL ready for reading. -func (rc *raftNode) openWAL() *wal.WAL { +func (rc *raftNode) openWAL(snapshot *raftpb.Snapshot) *wal.WAL { if !wal.Exist(rc.waldir) { if err := os.Mkdir(rc.waldir, 0750); err != nil { log.Fatalf("raftexample: cannot create dir for wal (%v)", err) @@ -199,7 +206,12 @@ func (rc *raftNode) openWAL() *wal.WAL { w.Close() } - w, err := wal.Open(rc.waldir, walpb.Snapshot{}) + walsnap := walpb.Snapshot{} + if snapshot != nil { + walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term + } + log.Printf("loading WAL at term %d and index %d", walsnap.Term, walsnap.Index) + w, err := wal.Open(rc.waldir, walsnap) if err != nil { log.Fatalf("raftexample: error loading wal (%v)", err) } @@ -209,11 +221,19 @@ func (rc *raftNode) openWAL() *wal.WAL { // replayWAL replays WAL entries into the raft instance. func (rc *raftNode) replayWAL() *wal.WAL { - w := rc.openWAL() + log.Printf("replaying WAL of member %d", rc.id) + snapshot := rc.loadSnapshot() + w := rc.openWAL(snapshot) _, st, ents, err := w.ReadAll() if err != nil { log.Fatalf("raftexample: failed to read WAL (%v)", err) } + rc.raftStorage = raft.NewMemoryStorage() + if snapshot != nil { + rc.raftStorage.ApplySnapshot(*snapshot) + } + rc.raftStorage.SetHardState(st) + // append to storage so raft starts at the right place in log rc.raftStorage.Append(ents) // send nil once lastIndex is published so client knows commit channel is current @@ -222,7 +242,6 @@ func (rc *raftNode) replayWAL() *wal.WAL { } else { rc.commitC <- nil } - rc.raftStorage.SetHardState(st) return w }