From ab61a8aa9a881e1d1a8f4cabcece4106257c94e0 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 16 Sep 2014 18:18:45 -0700 Subject: [PATCH] *: init for on disk snap support --- etcdserver/server.go | 64 +++++++++++-- etcdserver/server_test.go | 147 +++++++++++++++++++++++------ functional/http_functional_test.go | 14 ++- main.go | 91 +++++++++++------- raft/log.go | 13 ++- raft/node.go | 57 ++++++++--- raft/node_test.go | 53 ++++++++++- raft/raft.go | 3 - snap/snapshotter.go | 10 +- snap/snapshotter_test.go | 10 +- wal/wal.go | 1 + 11 files changed, 359 insertions(+), 104 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index d60ce9933..dee3910f9 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -17,7 +17,10 @@ import ( "github.com/coreos/etcd/wait" ) -const defaultSyncTimeout = time.Second +const ( + defaultSyncTimeout = time.Second + DefaultSnapCount = 10000 +) var ( ErrUnknownMethod = errors.New("etcdserver: unknown method") @@ -33,6 +36,19 @@ type Response struct { err error } +type Storage interface { + // Save function saves ents and state to the underlying stable storage. + // Save MUST block until st and ents are on stable storage. + Save(st raftpb.HardState, ents []raftpb.Entry) + // SaveSnap function saves snapshot to the underlying stable storage. + SaveSnap(snap raftpb.Snapshot) + + // TODO: WAL should be able to control cut itself. After implement self-controled cut, + // remove it in this interface. + // Cut cuts out a new wal file for saving new state and entries. + Cut() error +} + type Server interface { // Start performs any initialization of the Server necessary for it to // begin serving requests. It must be called before Do or Process. @@ -63,18 +79,21 @@ type EtcdServer struct { // panic. Send SendFunc - // Save specifies the save function for saving ents to stable storage. - // Save MUST block until st and ents are on stable storage. If Send is - // nil, server will panic. - Save func(st raftpb.HardState, ents []raftpb.Entry) + Storage Storage Ticker <-chan time.Time SyncTicker <-chan time.Time + + SnapCount int64 // number of entries to trigger a snapshot } // Start prepares and starts server in a new goroutine. It is no longer safe to // modify a server's fields after it has been sent to Start. func (s *EtcdServer) Start() { + if s.SnapCount == 0 { + log.Printf("etcdserver: set snapshot count to default %d", DefaultSnapCount) + s.SnapCount = DefaultSnapCount + } s.w = wait.New() s.done = make(chan struct{}) go s.run() @@ -86,12 +105,15 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { func (s *EtcdServer) run() { var syncC <-chan time.Time + // snapi indicates the index of the last submitted snapshot request + var snapi, appliedi int64 for { select { case <-s.Ticker: s.Node.Tick() case rd := <-s.Node.Ready(): - s.Save(rd.HardState, rd.Entries) + s.Storage.Save(rd.HardState, rd.Entries) + s.Storage.SaveSnap(rd.Snapshot) s.Send(rd.Messages) // TODO(bmizerany): do this in the background, but take @@ -103,6 +125,24 @@ func (s *EtcdServer) run() { panic("TODO: this is bad, what do we do about it?") } s.w.Trigger(r.Id, s.apply(r)) + appliedi = e.Index + } + + if rd.Snapshot.Index > snapi { + snapi = rd.Snapshot.Index + } + + // recover from snapshot if it is more updated than current applied + if rd.Snapshot.Index > appliedi { + if err := s.Store.Recovery(rd.Snapshot.Data); err != nil { + panic("TODO: this is bad, what do we do about it?") + } + appliedi = rd.Snapshot.Index + } + + if appliedi-snapi > s.SnapCount { + s.snapshot() + snapi = appliedi } if rd.SoftState != nil { @@ -241,6 +281,18 @@ func (s *EtcdServer) apply(r pb.Request) Response { } } +// TODO: non-blocking snapshot +func (s *EtcdServer) snapshot() { + d, err := s.Store.Save() + // TODO: current store will never fail to do a snapshot + // what should we do if the store might fail? + if err != nil { + panic("TODO: this is bad, what do we do about it?") + } + s.Node.Compact(d) + s.Storage.Cut() +} + // TODO: move the function to /id pkg maybe? // GenID generates a random id that is not equal to 0. func GenID() int64 { diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 74563121a..534f06780 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -162,11 +162,11 @@ func testServer(t *testing.T, ns int64) { tk := time.NewTicker(10 * time.Millisecond) defer tk.Stop() srv := &EtcdServer{ - Node: n, - Store: store.New(), - Send: send, - Save: func(_ raftpb.HardState, _ []raftpb.Entry) {}, - Ticker: tk.C, + Node: n, + Store: store.New(), + Send: send, + Storage: &storageRecorder{}, + Ticker: tk.C, } srv.Start() // TODO(xiangli): randomize election timeout @@ -231,11 +231,11 @@ func TestDoProposal(t *testing.T) { // this makes <-tk always successful, which accelerates internal clock close(tk) srv := &EtcdServer{ - Node: n, - Store: st, - Send: func(_ []raftpb.Message) {}, - Save: func(_ raftpb.HardState, _ []raftpb.Entry) {}, - Ticker: tk, + Node: n, + Store: st, + Send: func(_ []raftpb.Message) {}, + Storage: &storageRecorder{}, + Ticker: tk, } srv.Start() resp, err := srv.Do(ctx, tt) @@ -299,11 +299,11 @@ func TestDoProposalStopped(t *testing.T) { close(tk) srv := &EtcdServer{ // TODO: use fake node for better testability - Node: n, - Store: st, - Send: func(_ []raftpb.Message) {}, - Save: func(_ raftpb.HardState, _ []raftpb.Entry) {}, - Ticker: tk, + Node: n, + Store: st, + Send: func(_ []raftpb.Message) {}, + Storage: &storageRecorder{}, + Ticker: tk, } srv.Start() @@ -417,7 +417,7 @@ func TestSyncTriggerDeleteExpriedKeys(t *testing.T) { Node: n, Store: st, Send: func(_ []raftpb.Message) {}, - Save: func(_ raftpb.HardState, _ []raftpb.Entry) {}, + Storage: &storageRecorder{}, SyncTicker: syncTicker.C, } srv.Start() @@ -435,6 +435,73 @@ func TestSyncTriggerDeleteExpriedKeys(t *testing.T) { } } +// snapshot should snapshot the store and cut the persistent +// TODO: node.Compact is called... we need to make the node an interface +func TestSnapshot(t *testing.T) { + n := raft.Start(0xBAD0, []int64{0xBAD0}, 10, 1) + defer n.Stop() + st := &storeRecorder{} + p := &storageRecorder{} + s := &EtcdServer{ + Store: st, + Storage: p, + Node: n, + } + + s.snapshot() + action := st.Action() + if len(action) != 1 { + t.Fatalf("len(action) = %d, want 1", len(action)) + } + if action[0] != "Save" { + t.Errorf("action = %s, want Save", action[0]) + } + + action = p.Action() + if len(action) != 1 { + t.Fatalf("len(action) = %d, want 1", len(action)) + } + if action[0] != "Cut" { + t.Errorf("action = %s, want Cut", action[0]) + } +} + +// Applied > SnapCount should trigger a SaveSnap event +// TODO: receive a snapshot from raft leader should also be able +// to trigger snapSave and also trigger a store.Recover. +// We need fake node! +func TestTriggerSnap(t *testing.T) { + ctx := context.Background() + n := raft.Start(0xBAD0, []int64{0xBAD0}, 10, 1) + n.Campaign(ctx) + st := &storeRecorder{} + p := &storageRecorder{} + s := &EtcdServer{ + Store: st, + Send: func(_ []raftpb.Message) {}, + Storage: p, + Node: n, + SnapCount: 10, + } + + s.Start() + for i := 0; int64(i) < s.SnapCount; i++ { + s.Do(ctx, pb.Request{Method: "PUT", Id: 1}) + } + time.Sleep(time.Millisecond) + s.Stop() + + action := p.Action() + // each operation is recorded as a Save + // Nop + SnapCount * Puts + Cut + SaveSnap = Save + SnapCount * Save + Cut + SaveSnap + if len(action) != 3+int(s.SnapCount) { + t.Fatalf("len(action) = %d, want %d", len(action), 3+int(s.SnapCount)) + } + if action[12] != "SaveSnap" { + t.Errorf("action = %s, want SaveSnap", action[12]) + } +} + // TODO: test wait trigger correctness in multi-server case func TestGetBool(t *testing.T) { @@ -458,23 +525,28 @@ func TestGetBool(t *testing.T) { } } -type storeRecorder struct { +type recorder struct { sync.Mutex action []string } -func (s *storeRecorder) record(action string) { - s.Lock() - s.action = append(s.action, action) - s.Unlock() +func (r *recorder) record(action string) { + r.Lock() + r.action = append(r.action, action) + r.Unlock() } -func (s *storeRecorder) Action() []string { - s.Lock() - cpy := make([]string, len(s.action)) - copy(cpy, s.action) - s.Unlock() +func (r *recorder) Action() []string { + r.Lock() + cpy := make([]string, len(r.action)) + copy(cpy, r.action) + r.Unlock() return cpy } + +type storeRecorder struct { + recorder +} + func (s *storeRecorder) Version() int { return 0 } func (s *storeRecorder) Index() uint64 { return 0 } func (s *storeRecorder) Get(_ string, _, _ bool) (*store.Event, error) { @@ -509,7 +581,10 @@ func (s *storeRecorder) Watch(_ string, _, _ bool, _ uint64) (store.Watcher, err s.record("Watch") return &stubWatcher{}, nil } -func (s *storeRecorder) Save() ([]byte, error) { return nil, nil } +func (s *storeRecorder) Save() ([]byte, error) { + s.record("Save") + return nil, nil +} func (s *storeRecorder) Recovery(b []byte) error { return nil } func (s *storeRecorder) TotalTransactions() uint64 { return 0 } func (s *storeRecorder) JsonStats() []byte { return nil } @@ -537,3 +612,21 @@ func (w *waitRecorder) Trigger(id int64, x interface{}) { func boolp(b bool) *bool { return &b } func stringp(s string) *string { return &s } + +type storageRecorder struct { + recorder +} + +func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) { + p.record("Save") +} +func (p *storageRecorder) Cut() error { + p.record("Cut") + return nil +} +func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) { + if raft.IsEmptySnap(st) { + return + } + p.record("SaveSnap") +} diff --git a/functional/http_functional_test.go b/functional/http_functional_test.go index d5dab7d9f..c1a777f8b 100644 --- a/functional/http_functional_test.go +++ b/functional/http_functional_test.go @@ -28,10 +28,10 @@ func TestSet(t *testing.T) { n.Campaign(ctx) srv := &etcdserver.EtcdServer{ - Store: store.New(), - Node: n, - Save: func(st raftpb.HardState, ents []raftpb.Entry) {}, - Send: etcdserver.SendFunc(nopSend), + Store: store.New(), + Node: n, + Storage: nopStorage{}, + Send: etcdserver.SendFunc(nopSend), } srv.Start() defer srv.Stop() @@ -66,3 +66,9 @@ func TestSet(t *testing.T) { } func stringp(s string) *string { return &s } + +type nopStorage struct{} + +func (np nopStorage) Save(st raftpb.HardState, ents []raftpb.Entry) {} +func (np nopStorage) Cut() error { return nil } +func (np nopStorage) SaveSnap(st raftpb.Snapshot) {} diff --git a/main.go b/main.go index 8c5c83393..1574f6feb 100644 --- a/main.go +++ b/main.go @@ -16,6 +16,7 @@ import ( "github.com/coreos/etcd/etcdserver/etcdhttp" "github.com/coreos/etcd/proxy" "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/snap" "github.com/coreos/etcd/store" "github.com/coreos/etcd/wal" ) @@ -31,6 +32,7 @@ var ( paddr = flag.String("peer-bind-addr", ":7001", "Peer service address (e.g., ':7001')") dir = flag.String("data-dir", "", "Path to the data directory") proxyMode = flag.Bool("proxy-mode", false, "Forward HTTP requests to peers, do not participate in raft.") + snapCount = flag.Int64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot") peers = &etcdhttp.Peers{} addrs = &Addrs{} @@ -70,6 +72,10 @@ func startEtcd() { log.Fatalf("%#x= must be specified in peers", id) } + if *snapCount <= 0 { + log.Fatalf("etcd: snapshot-count must be greater than 0: snapshot-count=%d", *snapCount) + } + if *dir == "" { *dir = fmt.Sprintf("%v_etcd_data", *fid) log.Printf("main: no data-dir is given, using default data-dir ./%s", *dir) @@ -77,16 +83,61 @@ func startEtcd() { if err := os.MkdirAll(*dir, privateDirMode); err != nil { log.Fatalf("main: cannot create data directory: %v", err) } + snapdir := path.Join(*dir, "snap") + if err := os.MkdirAll(snapdir, privateDirMode); err != nil { + log.Fatalf("etcd: cannot create snapshot directory: %v", err) + } + snapshotter := snap.New(snapdir) - n, w := startRaft(id, peers.IDs(), path.Join(*dir, "wal")) + waldir := path.Join(*dir, "wal") + var w *wal.WAL + var n raft.Node + st := store.New() + + if !wal.Exist(waldir) { + w, err = wal.Create(waldir) + if err != nil { + log.Fatal(err) + } + n = raft.Start(id, peers.IDs(), 10, 1) + } else { + var index int64 + snapshot, err := snapshotter.Load() + if err != nil && err != snap.ErrNoSnapshot { + log.Fatal(err) + } + if snapshot != nil { + log.Printf("etcd: restart from snapshot at index %d", snapshot.Index) + st.Recovery(snapshot.Data) + index = snapshot.Index + } + + // restart a node from previous wal + if w, err = wal.OpenAtIndex(waldir, index); err != nil { + log.Fatal(err) + } + wid, st, ents, err := w.ReadAll() + if err != nil { + log.Fatal(err) + } + // TODO(xiangli): save/recovery nodeID? + if wid != 0 { + log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid) + } + n = raft.Restart(id, peers.IDs(), 10, 1, snapshot, st, ents) + } s := &etcdserver.EtcdServer{ - Store: store.New(), - Node: n, - Save: w.Save, + Store: st, + Node: n, + Storage: struct { + *wal.WAL + *snap.Snapshotter + }{w, snapshotter}, Send: etcdhttp.Sender(*peers), Ticker: time.Tick(100 * time.Millisecond), SyncTicker: time.Tick(500 * time.Millisecond), + SnapCount: *snapCount, } s.Start() @@ -109,38 +160,6 @@ func startEtcd() { } } -// startRaft starts a raft node from the given wal dir. -// If the wal dir does not exist, startRaft will start a new raft node. -// If the wal dir exists, startRaft will restart the previous raft node. -// startRaft returns the started raft node and the opened wal. -func startRaft(id int64, peerIDs []int64, waldir string) (raft.Node, *wal.WAL) { - if !wal.Exist(waldir) { - w, err := wal.Create(waldir) - if err != nil { - log.Fatal(err) - } - n := raft.Start(id, peerIDs, 10, 1) - return n, w - } - - // restart a node from previous wal - // TODO(xiangli): check snapshot; not open from one - w, err := wal.OpenAtIndex(waldir, 0) - if err != nil { - log.Fatal(err) - } - wid, st, ents, err := w.ReadAll() - // TODO(xiangli): save/recovery nodeID? - if wid != 0 { - log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid) - } - if err != nil { - log.Fatal(err) - } - n := raft.Restart(id, peerIDs, 10, 1, st, ents) - return n, w -} - // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes. func startProxy() { ph, err := proxy.NewHandler((*peers).Endpoints()) diff --git a/raft/log.go b/raft/log.go index cbfa90ee4..562afae9f 100644 --- a/raft/log.go +++ b/raft/log.go @@ -11,13 +11,12 @@ const ( ) type raftLog struct { - ents []pb.Entry - unstable int64 - committed int64 - applied int64 - offset int64 - snapshot pb.Snapshot - unstableSnapshot pb.Snapshot + ents []pb.Entry + unstable int64 + committed int64 + applied int64 + offset int64 + snapshot pb.Snapshot // want a compact after the number of entries exceeds the threshold // TODO(xiangli) size might be a better criteria diff --git a/raft/node.go b/raft/node.go index 986c4e7f7..98cfbbfdc 100644 --- a/raft/node.go +++ b/raft/node.go @@ -42,6 +42,9 @@ type Ready struct { // Messages are sent. Entries []pb.Entry + // Snapshot specifies the snapshot to be saved to stable storage. + Snapshot pb.Snapshot + // CommittedEntries specifies entries to be committed to a // store/state-machine. These have previously been committed to stable // store. @@ -60,16 +63,22 @@ func IsEmptyHardState(st pb.HardState) bool { return isHardStateEqual(st, emptyState) } +func IsEmptySnap(sp pb.Snapshot) bool { + return sp.Index == 0 +} + func (rd Ready) containsUpdates() bool { - return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) || len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 + return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) || !IsEmptySnap(rd.Snapshot) || + len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 } type Node struct { - propc chan pb.Message - recvc chan pb.Message - readyc chan Ready - tickc chan struct{} - done chan struct{} + propc chan pb.Message + recvc chan pb.Message + compactc chan []byte + readyc chan Ready + tickc chan struct{} + done chan struct{} } // Start returns a new Node given a unique raft id, a list of raft peers, and @@ -84,9 +93,12 @@ func Start(id int64, peers []int64, election, heartbeat int) Node { // Restart is identical to Start but takes an initial State and a slice of // entries. Generally this is used when restarting from a stable storage // log. -func Restart(id int64, peers []int64, election, heartbeat int, st pb.HardState, ents []pb.Entry) Node { +func Restart(id int64, peers []int64, election, heartbeat int, snapshot *pb.Snapshot, st pb.HardState, ents []pb.Entry) Node { n := newNode() r := newRaft(id, peers, election, heartbeat) + if snapshot != nil { + r.restore(*snapshot) + } r.loadState(st) r.loadEnts(ents) go n.run(r) @@ -95,11 +107,12 @@ func Restart(id int64, peers []int64, election, heartbeat int, st pb.HardState, func newNode() Node { return Node{ - propc: make(chan pb.Message), - recvc: make(chan pb.Message), - readyc: make(chan Ready), - tickc: make(chan struct{}), - done: make(chan struct{}), + propc: make(chan pb.Message), + recvc: make(chan pb.Message), + compactc: make(chan []byte), + readyc: make(chan Ready), + tickc: make(chan struct{}), + done: make(chan struct{}), } } @@ -114,9 +127,10 @@ func (n *Node) run(r *raft) { lead := None prevSoftSt := r.softState() prevHardSt := r.HardState + prevSnapi := r.raftLog.snapshot.Index for { - rd := newReady(r, prevSoftSt, prevHardSt) + rd := newReady(r, prevSoftSt, prevHardSt, prevSnapi) if rd.containsUpdates() { readyc = n.readyc } else { @@ -139,6 +153,8 @@ func (n *Node) run(r *raft) { r.Step(m) case m := <-n.recvc: r.Step(m) // raft never returns an error + case d := <-n.compactc: + r.compact(d) case <-n.tickc: r.tick() case readyc <- rd: @@ -148,6 +164,9 @@ func (n *Node) run(r *raft) { if !IsEmptyHardState(rd.HardState) { prevHardSt = rd.HardState } + if !IsEmptySnap(rd.Snapshot) { + prevSnapi = rd.Snapshot.Index + } r.raftLog.resetNextEnts() r.raftLog.resetUnstable() r.msgs = nil @@ -198,7 +217,14 @@ func (n *Node) Ready() <-chan Ready { return n.readyc } -func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { +func (n *Node) Compact(d []byte) { + select { + case n.compactc <- d: + case <-n.done: + } +} + +func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState, prevSnapi int64) Ready { rd := Ready{ Entries: r.raftLog.unstableEnts(), CommittedEntries: r.raftLog.nextEnts(), @@ -210,5 +236,8 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { if !isHardStateEqual(r.HardState, prevHardSt) { rd.HardState = r.HardState } + if prevSnapi != r.raftLog.snapshot.Index { + rd.Snapshot = r.raftLog.snapshot + } return rd } diff --git a/raft/node_test.go b/raft/node_test.go index bf668a1db..d00a7a405 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -126,6 +126,7 @@ func TestReadyContainUpdates(t *testing.T) { {Ready{Entries: make([]raftpb.Entry, 1, 1)}, true}, {Ready{CommittedEntries: make([]raftpb.Entry, 1, 1)}, true}, {Ready{Messages: make([]raftpb.Message, 1, 1)}, true}, + {Ready{Snapshot: raftpb.Snapshot{Index: 1}}, true}, } for i, tt := range tests { @@ -185,7 +186,7 @@ func TestNodeRestart(t *testing.T) { CommittedEntries: entries[1 : st.Commit+1], } - n := Restart(1, []int64{1}, 0, 0, st, entries) + n := Restart(1, []int64{1}, 0, 0, nil, st, entries) if g := <-n.Ready(); !reflect.DeepEqual(g, want) { t.Errorf("g = %+v,\n w %+v", g, want) } @@ -197,6 +198,56 @@ func TestNodeRestart(t *testing.T) { } } +// TestCompacts ensures Node.Compact creates a correct raft snapshot and compacts +// the raft log (call raft.compact) +func TestCompact(t *testing.T) { + ctx := context.Background() + n := newNode() + r := newRaft(1, []int64{1}, 0, 0) + go n.run(r) + + n.Campaign(ctx) + n.Propose(ctx, []byte("foo")) + + w := raftpb.Snapshot{ + Term: 1, + Index: 2, // one nop + one proposal + Data: []byte("a snapshot"), + Nodes: []int64{1}, + } + + forceGosched() + select { + case <-n.Ready(): + default: + t.Fatalf("unexpected proposal failure: unable to commit entry") + } + + n.Compact(w.Data) + forceGosched() + select { + case rd := <-n.Ready(): + if !reflect.DeepEqual(rd.Snapshot, w) { + t.Errorf("snap = %+v, want %+v", rd.Snapshot, w) + } + default: + t.Fatalf("unexpected compact failure: unable to create a snapshot") + } + forceGosched() + // TODO: this test the run updates the snapi correctly... should be tested + // separately with other kinds of updates + select { + case <-n.Ready(): + t.Fatalf("unexpected more ready") + default: + } + n.Stop() + + if r.raftLog.offset != w.Index { + t.Errorf("log.offset = %d, want %d", r.raftLog.offset, w.Index) + } +} + func TestIsStateEqual(t *testing.T) { tests := []struct { st raftpb.HardState diff --git a/raft/raft.go b/raft/raft.go index 249198094..1097bb36f 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -502,9 +502,6 @@ func (r *raft) delProgress(id int64) { } func (r *raft) loadEnts(ents []pb.Entry) { - if !r.raftLog.isEmpty() { - panic("cannot load entries when log is not empty") - } r.raftLog.load(ents) } diff --git a/snap/snapshotter.go b/snap/snapshotter.go index d813d3ef4..f2f38c672 100644 --- a/snap/snapshotter.go +++ b/snap/snapshotter.go @@ -11,6 +11,7 @@ import ( "sort" "strings" + "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap/snappb" ) @@ -35,7 +36,14 @@ func New(dir string) *Snapshotter { } } -func (s *Snapshotter) Save(snapshot *raftpb.Snapshot) error { +func (s *Snapshotter) SaveSnap(snapshot raftpb.Snapshot) { + if raft.IsEmptySnap(snapshot) { + return + } + s.save(&snapshot) +} + +func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error { fname := fmt.Sprintf("%016x-%016x%s", snapshot.Term, snapshot.Index, snapSuffix) b, err := snapshot.Marshal() if err != nil { diff --git a/snap/snapshotter_test.go b/snap/snapshotter_test.go index 56a95b2d5..0b487855c 100644 --- a/snap/snapshotter_test.go +++ b/snap/snapshotter_test.go @@ -27,7 +27,7 @@ func TestSaveAndLoad(t *testing.T) { } defer os.RemoveAll(dir) ss := New(dir) - err = ss.Save(testSnap) + err = ss.save(testSnap) if err != nil { t.Fatal(err) } @@ -49,7 +49,7 @@ func TestBadCRC(t *testing.T) { } defer os.RemoveAll(dir) ss := New(dir) - err = ss.Save(testSnap) + err = ss.save(testSnap) if err != nil { t.Fatal(err) } @@ -79,7 +79,7 @@ func TestFailback(t *testing.T) { } ss := New(dir) - err = ss.Save(testSnap) + err = ss.save(testSnap) if err != nil { t.Fatal(err) } @@ -134,14 +134,14 @@ func TestLoadNewestSnap(t *testing.T) { } defer os.RemoveAll(dir) ss := New(dir) - err = ss.Save(testSnap) + err = ss.save(testSnap) if err != nil { t.Fatal(err) } newSnap := *testSnap newSnap.Index = 5 - err = ss.Save(&newSnap) + err = ss.save(&newSnap) if err != nil { t.Fatal(err) } diff --git a/wal/wal.go b/wal/wal.go index 295370afd..955b0d9b7 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -142,6 +142,7 @@ func OpenAtIndex(dirpath string, index int64) (*WAL, error) { // create a WAL ready for reading w := &WAL{ + dir: dirpath, ri: index, decoder: newDecoder(rc),