From 23bd60ccce27fee2d9f65b667cd160ca4ce3b958 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 8 Dec 2015 07:52:54 -0800 Subject: [PATCH] *: rewrite snapshot sending --- etcdserver/config.go | 2 - etcdserver/raft.go | 15 +- etcdserver/raft_storage.go | 64 -------- etcdserver/raft_test.go | 2 +- etcdserver/server.go | 93 +++++------ etcdserver/server_test.go | 95 ++++------- etcdserver/snapshot_merge.go | 71 ++++++++ etcdserver/snapshot_store.go | 260 ------------------------------ etcdserver/snapshot_store_test.go | 205 ----------------------- etcdserver/storage.go | 3 + rafthttp/http.go | 21 +-- rafthttp/http_test.go | 9 +- rafthttp/peer.go | 18 ++- rafthttp/snapshot_sender.go | 24 ++- rafthttp/snapshot_store.go | 45 ------ rafthttp/transport.go | 43 ++--- snap/db.go | 67 ++++++++ snap/message.go | 34 ++++ 18 files changed, 308 insertions(+), 763 deletions(-) delete mode 100644 etcdserver/raft_storage.go create mode 100644 etcdserver/snapshot_merge.go delete mode 100644 etcdserver/snapshot_store.go delete mode 100644 etcdserver/snapshot_store_test.go delete mode 100644 rafthttp/snapshot_store.go create mode 100644 snap/db.go create mode 100644 snap/message.go diff --git a/etcdserver/config.go b/etcdserver/config.go index ff28e4cff..2afb7a807 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -121,8 +121,6 @@ func (c *ServerConfig) WALDir() string { func (c *ServerConfig) SnapDir() string { return path.Join(c.MemberDir(), "snap") } -func (c *ServerConfig) StorageDir() string { return path.Join(c.MemberDir(), "storage") } - func (c *ServerConfig) ShouldDiscover() bool { return c.DiscoveryURL != "" } // ReqTimeout returns timeout for request to finish. 
diff --git a/etcdserver/raft.go b/etcdserver/raft.go index c5aa558d0..f663cf888 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -109,7 +109,7 @@ type raftNode struct { // utility ticker <-chan time.Time - raftStorage *raftStorage + raftStorage *raft.MemoryStorage storage Storage // transport specifies the transport to send and receive msgs to members. // Sending messages MUST NOT block. It is okay to drop messages, since @@ -126,7 +126,6 @@ type raftNode struct { // TODO: Ideally raftNode should get rid of the passed in server structure. func (r *raftNode) start(s *EtcdServer) { r.s = s - r.raftStorage.raftStarted = true r.applyc = make(chan apply) r.stopped = make(chan struct{}) r.done = make(chan struct{}) @@ -245,7 +244,7 @@ func advanceTicksForElection(n raft.Node, electionTicks int) { } } -func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n raft.Node, s *raftStorage, w *wal.WAL) { +func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { var err error member := cl.MemberByName(cfg.Name) metadata := pbutil.MustMarshal( @@ -270,7 +269,7 @@ func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n r } id = member.ID plog.Infof("starting member %s in cluster %s", id, cl.ID()) - s = newRaftStorage() + s = raft.NewMemoryStorage() c := &raft.Config{ ID: uint64(id), ElectionTick: cfg.ElectionTicks, @@ -287,7 +286,7 @@ func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n r return } -func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raftStorage, *wal.WAL) { +func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term @@ -297,7 +296,7 @@ func restartNode(cfg *ServerConfig, snapshot 
*raftpb.Snapshot) (types.ID, *clust plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit) cl := newCluster("") cl.SetID(cid) - s := newRaftStorage() + s := raft.NewMemoryStorage() if snapshot != nil { s.ApplySnapshot(*snapshot) } @@ -319,7 +318,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust return id, cl, n, s, w } -func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raftStorage, *wal.WAL) { +func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term @@ -351,7 +350,7 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit) cl := newCluster("") cl.SetID(cid) - s := newRaftStorage() + s := raft.NewMemoryStorage() if snapshot != nil { s.ApplySnapshot(*snapshot) } diff --git a/etcdserver/raft_storage.go b/etcdserver/raft_storage.go deleted file mode 100644 index b546793df..000000000 --- a/etcdserver/raft_storage.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright 2015 CoreOS, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package etcdserver - -import ( - "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/raft/raftpb" -) - -type raftStorage struct { - *raft.MemoryStorage - // snapStore is the place to request snapshot when v3demo is enabled. - // If snapStore is nil, it uses the snapshot saved in MemoryStorage. - snapStore *snapshotStore - // raftStarted indicates whether raft starts to function. If not, it cannot - // request snapshot, and should get snapshot from MemoryStorage. - raftStarted bool -} - -func newRaftStorage() *raftStorage { - return &raftStorage{ - MemoryStorage: raft.NewMemoryStorage(), - } -} - -func (rs *raftStorage) reqsnap() <-chan struct{} { - if rs.snapStore == nil { - return nil - } - return rs.snapStore.reqsnapc -} - -func (rs *raftStorage) raftsnap() chan<- raftpb.Snapshot { - if rs.snapStore == nil { - return nil - } - return rs.snapStore.raftsnapc -} - -// Snapshot returns raft snapshot. If snapStore is nil or raft is not started, this method -// returns snapshot saved in MemoryStorage. Otherwise, this method -// returns snapshot from snapStore. 
-func (rs *raftStorage) Snapshot() (raftpb.Snapshot, error) { - if rs.snapStore == nil || !rs.raftStarted { - return rs.MemoryStorage.Snapshot() - } - snap, err := rs.snapStore.getSnap() - if err != nil { - return raftpb.Snapshot{}, err - } - return snap.raft(), nil -} diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index 336e85e88..7cce36870 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -153,7 +153,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) { r := raftNode{ Node: n, storage: &storageRecorder{}, - raftStorage: newRaftStorage(), + raftStorage: raft.NewMemoryStorage(), transport: &nopTransporter{}, } r.start(&EtcdServer{r: r}) diff --git a/etcdserver/server.go b/etcdserver/server.go index df33c5a89..e49da3cf9 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -19,7 +19,6 @@ import ( "errors" "expvar" "fmt" - "log" "math/rand" "net/http" "os" @@ -65,6 +64,9 @@ const ( monitorVersionInterval = 5 * time.Second databaseFilename = "db" + // max number of in-flight snapshot messages etcdserver allows to have + // This number is more than enough for most clusters with 5 machines. + maxInFlightMsgSnap = 16 ) var ( @@ -177,19 +179,23 @@ type EtcdServer struct { // forceVersionC is used to force the version monitor loop // to detect the cluster version immediately. forceVersionC chan struct{} + + msgSnapC chan raftpb.Message } // NewServer creates a new EtcdServer from the supplied configuration. The // configuration is considered static for the lifetime of the EtcdServer. 
func NewServer(cfg *ServerConfig) (*EtcdServer, error) { st := store.New(StoreClusterPrefix, StoreKeysPrefix) - var w *wal.WAL - var n raft.Node - var s *raftStorage - var id types.ID - var cl *cluster + var ( + w *wal.WAL + n raft.Node + s *raft.MemoryStorage + id types.ID + cl *cluster + ) - if !cfg.V3demo && fileutil.Exist(path.Join(cfg.StorageDir(), databaseFilename)) { + if !cfg.V3demo && fileutil.Exist(path.Join(cfg.SnapDir(), databaseFilename)) { return nil, errors.New("experimental-v3demo cannot be disabled once it is enabled") } @@ -340,18 +346,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { versionRt: prt, reqIDGen: idutil.NewGenerator(uint8(id), time.Now()), forceVersionC: make(chan struct{}), + msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), } if cfg.V3demo { - err = os.MkdirAll(cfg.StorageDir(), privateDirMode) - if err != nil && err != os.ErrExist { - return nil, err - } - srv.kv = dstorage.New(path.Join(cfg.StorageDir(), databaseFilename), &srv.consistIndex) + srv.kv = dstorage.New(path.Join(cfg.SnapDir(), databaseFilename), &srv.consistIndex) if err := srv.kv.Restore(); err != nil { plog.Fatalf("v3 storage restore error: %v", err) } - s.snapStore = newSnapshotStore(cfg.StorageDir(), srv.kv) } // TODO: move transport initialization near the definition of remote @@ -361,7 +363,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { ID: id, ClusterID: cl.ID(), Raft: srv, - SnapSaver: s.snapStore, + Snapshotter: ss, ServerStats: sstats, LeaderStats: lstats, ErrorC: srv.errorc, @@ -383,10 +385,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { } srv.r.transport = tr - if cfg.V3demo { - s.snapStore.tr = tr - } - return srv, nil } @@ -465,9 +463,6 @@ func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) } // and clears the used snapshot from the snapshot store. 
func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) { s.r.ReportSnapshot(id, status) - if s.cfg.V3demo { - s.r.raftStorage.snapStore.clearUsedSnap() - } } func (s *EtcdServer) run() { @@ -496,12 +491,12 @@ func (s *EtcdServer) run() { } if s.cfg.V3demo { - snapfn, err := s.r.raftStorage.snapStore.getSnapFilePath(apply.snapshot.Metadata.Index) + snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index) if err != nil { - plog.Panicf("get snapshot file path error: %v", err) + plog.Panicf("get database snapshot file path error: %v", err) } - fn := path.Join(s.cfg.StorageDir(), databaseFilename) + fn := path.Join(s.cfg.SnapDir(), databaseFilename) if err := os.Rename(snapfn, fn); err != nil { plog.Panicf("rename snapshot file error: %v", err) } @@ -514,7 +509,6 @@ func (s *EtcdServer) run() { oldKV := s.kv // TODO: swap the kv pointer atomically s.kv = newKV - s.r.raftStorage.snapStore.kv = newKV // Closing oldKV might block until all the txns // on the kv are finished. @@ -571,9 +565,9 @@ func (s *EtcdServer) run() { s.snapshot(appliedi, confState) snapi = appliedi } - case <-s.r.raftStorage.reqsnap(): - s.r.raftStorage.raftsnap() <- s.createRaftSnapshot(appliedi, confState) - plog.Infof("requested snapshot created at %d", appliedi) + case m := <-s.msgSnapC: + merged := s.createMergedSnapshotMessage(m, appliedi, confState) + s.r.transport.SendSnapshot(merged) case err := <-s.errorc: plog.Errorf("%s", err) plog.Infof("the data-dir used by this member must be removed.") @@ -828,7 +822,24 @@ func (s *EtcdServer) send(ms []raftpb.Message) { if s.cluster.IsIDRemoved(types.ID(ms[i].To)) { ms[i].To = 0 } + + if s.cfg.V3demo { + if ms[i].Type == raftpb.MsgSnap { + // There are two separate data stores when v3 demo is enabled: the store for v2, + // and the KV for v3. + // The msgSnap only contains the most recent snapshot of store without KV. 
+ // So we need to redirect the msgSnap to etcd server main loop for merging in the + // current store snapshot and KV snapshot. + select { + case s.msgSnapC <- ms[i]: + default: + // drop msgSnap if the inflight chan is full. + } + ms[i].To = 0 + } + } } + s.r.transport.Send(ms) } @@ -998,29 +1009,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con return false, nil } -// createRaftSnapshot creates a raft snapshot that includes the state of store for v2 api. -func (s *EtcdServer) createRaftSnapshot(snapi uint64, confState raftpb.ConfState) raftpb.Snapshot { - snapt, err := s.r.raftStorage.Term(snapi) - if err != nil { - log.Panicf("get term should never fail: %v", err) - } - - clone := s.store.Clone() - d, err := clone.SaveNoCopy() - if err != nil { - plog.Panicf("store save should never fail: %v", err) - } - - return raftpb.Snapshot{ - Metadata: raftpb.SnapshotMetadata{ - Index: snapi, - Term: snapt, - ConfState: confState, - }, - Data: d, - } -} - // TODO: non-blocking snapshot func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { clone := s.store.Clone() @@ -1068,9 +1056,6 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { plog.Panicf("unexpected compaction error %v", err) } plog.Infof("compacted raft log at %d", compacti) - if s.cfg.V3demo && s.r.raftStorage.snapStore.closeSnapBefore(compacti) { - plog.Infof("closed snapshot stored due to compaction at %d", compacti) - } }() } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index a3d2b5605..a6a954ada 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -17,7 +17,6 @@ package etcdserver import ( "encoding/json" "fmt" - "io" "net/http" "path" "reflect" @@ -33,6 +32,7 @@ import ( "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/snap" "github.com/coreos/etcd/store" ) @@ -523,7 +523,7 @@ func TestDoProposal(t 
*testing.T) { r: raftNode{ Node: newNodeCommitter(), storage: &storageRecorder{}, - raftStorage: newRaftStorage(), + raftStorage: raft.NewMemoryStorage(), transport: &nopTransporter{}, }, store: st, @@ -675,7 +675,7 @@ func TestSyncTrigger(t *testing.T) { cfg: &ServerConfig{TickMs: 1}, r: raftNode{ Node: n, - raftStorage: newRaftStorage(), + raftStorage: raft.NewMemoryStorage(), transport: &nopTransporter{}, storage: &storageRecorder{}, }, @@ -712,51 +712,9 @@ func TestSyncTrigger(t *testing.T) { } } -func TestCreateRaftSnapshot(t *testing.T) { - s := newRaftStorage() - s.Append([]raftpb.Entry{{Index: 1, Term: 1}}) - st := &storeRecorder{} - srv := &EtcdServer{ - r: raftNode{ - raftStorage: s, - }, - store: st, - } - - snap := srv.createRaftSnapshot(1, raftpb.ConfState{Nodes: []uint64{1}}) - wdata, err := st.Save() - if err != nil { - t.Fatal(err) - } - wsnap := raftpb.Snapshot{ - Metadata: raftpb.SnapshotMetadata{ - Index: 1, - Term: 1, - ConfState: raftpb.ConfState{Nodes: []uint64{1}}, - }, - Data: wdata, - } - if !reflect.DeepEqual(snap, wsnap) { - t.Errorf("snap = %+v, want %+v", snap, wsnap) - } - - gaction := st.Action() - // the third action is store.Save used in testing - if len(gaction) != 3 { - t.Fatalf("len(action) = %d, want 3", len(gaction)) - } - if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Clone"}) { - t.Errorf("action = %s, want Clone", gaction[0]) - } - if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveNoCopy"}) { - t.Errorf("action = %s, want SaveNoCopy", gaction[1]) - } - -} - // snapshot should snapshot the store and cut the persistent func TestSnapshot(t *testing.T) { - s := newRaftStorage() + s := raft.NewMemoryStorage() s.Append([]raftpb.Entry{{Index: 1}}) st := &storeRecorder{} p := &storageRecorder{} @@ -800,7 +758,7 @@ func TestTriggerSnap(t *testing.T) { snapCount: uint64(snapc), r: raftNode{ Node: newNodeCommitter(), - raftStorage: newRaftStorage(), + raftStorage: raft.NewMemoryStorage(), storage: p, transport: 
&nopTransporter{}, }, @@ -841,7 +799,7 @@ func TestRecvSnapshot(t *testing.T) { Node: n, transport: &nopTransporter{}, storage: p, - raftStorage: newRaftStorage(), + raftStorage: raft.NewMemoryStorage(), }, store: st, cluster: cl, @@ -874,7 +832,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) { st := &storeRecorder{} cl := newCluster("abc") cl.SetStore(store.New()) - storage := newRaftStorage() + storage := raft.NewMemoryStorage() s := &EtcdServer{ cfg: &ServerConfig{}, r: raftNode{ @@ -923,7 +881,7 @@ func TestAddMember(t *testing.T) { s := &EtcdServer{ r: raftNode{ Node: n, - raftStorage: newRaftStorage(), + raftStorage: raft.NewMemoryStorage(), storage: &storageRecorder{}, transport: &nopTransporter{}, }, @@ -963,7 +921,7 @@ func TestRemoveMember(t *testing.T) { s := &EtcdServer{ r: raftNode{ Node: n, - raftStorage: newRaftStorage(), + raftStorage: raft.NewMemoryStorage(), storage: &storageRecorder{}, transport: &nopTransporter{}, }, @@ -1002,7 +960,7 @@ func TestUpdateMember(t *testing.T) { s := &EtcdServer{ r: raftNode{ Node: n, - raftStorage: newRaftStorage(), + raftStorage: raft.NewMemoryStorage(), storage: &storageRecorder{}, transport: &nopTransporter{}, }, @@ -1355,12 +1313,19 @@ func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) error { p.Record(testutil.Action{Name: "Save"}) return nil } + func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error { if !raft.IsEmptySnap(st) { p.Record(testutil.Action{Name: "SaveSnap"}) } return nil } + +func (p *storageRecorder) DBFilePath(id uint64) (string, error) { + p.Record(testutil.Action{Name: "DBFilePath"}) + return fmt.Sprintf("%016x.snap.db", id), nil +} + func (p *storageRecorder) Close() error { return nil } type nodeRecorder struct{ testutil.Recorder } @@ -1477,16 +1442,16 @@ func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc } type nopTransporter struct{} -func (s *nopTransporter) Start() error { return nil } -func (s *nopTransporter) Handler() http.Handler 
{ return nil } -func (s *nopTransporter) Send(m []raftpb.Message) {} -func (s *nopTransporter) AddRemote(id types.ID, us []string) {} -func (s *nopTransporter) AddPeer(id types.ID, us []string) {} -func (s *nopTransporter) RemovePeer(id types.ID) {} -func (s *nopTransporter) RemoveAllPeers() {} -func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {} -func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} } -func (s *nopTransporter) SnapshotReady(rc io.ReadCloser, index uint64) {} -func (s *nopTransporter) Stop() {} -func (s *nopTransporter) Pause() {} -func (s *nopTransporter) Resume() {} +func (s *nopTransporter) Start() error { return nil } +func (s *nopTransporter) Handler() http.Handler { return nil } +func (s *nopTransporter) Send(m []raftpb.Message) {} +func (s *nopTransporter) SendSnapshot(m snap.Message) {} +func (s *nopTransporter) AddRemote(id types.ID, us []string) {} +func (s *nopTransporter) AddPeer(id types.ID, us []string) {} +func (s *nopTransporter) RemovePeer(id types.ID) {} +func (s *nopTransporter) RemoveAllPeers() {} +func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {} +func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} } +func (s *nopTransporter) Stop() {} +func (s *nopTransporter) Pause() {} +func (s *nopTransporter) Resume() {} diff --git a/etcdserver/snapshot_merge.go b/etcdserver/snapshot_merge.go new file mode 100644 index 000000000..429192df3 --- /dev/null +++ b/etcdserver/snapshot_merge.go @@ -0,0 +1,71 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +import ( + "io" + "log" + + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/snap" + dstorage "github.com/coreos/etcd/storage" +) + +// createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf), +// a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message +// as ReadCloser. +func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64, confState raftpb.ConfState) snap.Message { + snapt, err := s.r.raftStorage.Term(snapi) + if err != nil { + log.Panicf("get term should never fail: %v", err) + } + + // get a snapshot of v2 store as []byte + clone := s.store.Clone() + d, err := clone.SaveNoCopy() + if err != nil { + plog.Panicf("store save should never fail: %v", err) + } + + // get a snapshot of v3 KV as readCloser + rc := newSnapshotReaderCloser(s.kv.Snapshot()) + + // put the []byte snapshot of store into raft snapshot and return the merged snapshot with + // KV readCloser snapshot. 
+ snapshot := raftpb.Snapshot{ + Metadata: raftpb.SnapshotMetadata{ + Index: snapi, + Term: snapt, + ConfState: confState, + }, + Data: d, + } + m.Snapshot = snapshot + + return snap.Message{ + Message: m, + ReadCloser: rc, + } +} + +func newSnapshotReaderCloser(snapshot dstorage.Snapshot) io.ReadCloser { + pr, pw := io.Pipe() + go func() { + _, err := snapshot.WriteTo(pw) + pw.CloseWithError(err) + snapshot.Close() + }() + return pr +} diff --git a/etcdserver/snapshot_store.go b/etcdserver/snapshot_store.go deleted file mode 100644 index 573f0e7ff..000000000 --- a/etcdserver/snapshot_store.go +++ /dev/null @@ -1,260 +0,0 @@ -// Copyright 2015 CoreOS, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package etcdserver - -import ( - "fmt" - "io" - "io/ioutil" - "os" - "path" - "sync" - "time" - - "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork" - "github.com/coreos/etcd/pkg/fileutil" - "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/rafthttp" - dstorage "github.com/coreos/etcd/storage" -) - -// clearUnusedSnapshotInterval specifies the time interval to wait -// before clearing unused snapshot. -// The newly created snapshot should be retrieved within one heartbeat -// interval because raft state machine retries to send snapshot -// to slow follower when receiving MsgHeartbeatResp from the follower. -// Set it as 5s to match the upper limit of heartbeat interval. 
-const clearUnusedSnapshotInterval = 5 * time.Second - -type snapshot struct { - r raftpb.Snapshot - - io.ReadCloser // used to read out v3 snapshot - - done chan struct{} -} - -func newSnapshot(r raftpb.Snapshot, kv dstorage.Snapshot) *snapshot { - done := make(chan struct{}) - pr, pw := io.Pipe() - go func() { - _, err := kv.WriteTo(pw) - pw.CloseWithError(err) - kv.Close() - close(done) - }() - return &snapshot{ - r: r, - ReadCloser: pr, - done: done, - } -} - -func (s *snapshot) raft() raftpb.Snapshot { return s.r } - -func (s *snapshot) isClosed() bool { - select { - case <-s.done: - return true - default: - return false - } -} - -// TODO: remove snapshotStore. getSnap part could be put into memoryStorage, -// while SaveFrom could be put into another struct, or even put into dstorage package. -type snapshotStore struct { - // dir to save snapshot data - dir string - kv dstorage.KV - tr rafthttp.Transporter - - // send empty to reqsnapc to notify the channel receiver to send back latest - // snapshot to snapc - reqsnapc chan struct{} - // a chan to receive the requested raft snapshot - // snapshotStore will receive from the chan immediately after it sends empty to reqsnapc - raftsnapc chan raftpb.Snapshot - - mu sync.Mutex // protect belowing vars - // snap is nil iff there is no snapshot stored - snap *snapshot - inUse bool - createOnce sync.Once // ensure at most one snapshot is created when no snapshot stored - - clock clockwork.Clock -} - -func newSnapshotStore(dir string, kv dstorage.KV) *snapshotStore { - return &snapshotStore{ - dir: dir, - kv: kv, - reqsnapc: make(chan struct{}), - raftsnapc: make(chan raftpb.Snapshot), - clock: clockwork.NewRealClock(), - } -} - -// getSnap returns a snapshot. -// If there is no available snapshot, ErrSnapshotTemporarilyUnavaliable will be returned. -// -// If the snapshot stored is in use, it returns ErrSnapshotTemporarilyUnavailable. 
-// If there is no snapshot stored, it creates new snapshot -// asynchronously and returns ErrSnapshotTemporarilyUnavailable, so -// caller could get snapshot later when the snapshot is created. -// Otherwise, it returns the snapshot stored. -// -// The created snapshot is cleared from the snapshot store if it is -// either unused after clearUnusedSnapshotInterval, or explicitly cleared -// through clearUsedSnap after using. -// closeSnapBefore is used to close outdated snapshot, -// so the snapshot will be cleared faster when in use. -// -// snapshot store stores at most one snapshot at a time. -// If raft state machine wants to send two snapshot messages to two followers, -// the second snapshot message will keep getting snapshot and succeed only after -// the first message is sent. This increases the time used to send messages, -// but it is acceptable because this should happen seldomly. -func (ss *snapshotStore) getSnap() (*snapshot, error) { - ss.mu.Lock() - defer ss.mu.Unlock() - - if ss.inUse { - return nil, raft.ErrSnapshotTemporarilyUnavailable - } - - if ss.snap == nil { - // create snapshot asynchronously - ss.createOnce.Do(func() { go ss.createSnap() }) - return nil, raft.ErrSnapshotTemporarilyUnavailable - } - - ss.inUse = true - // give transporter the generated snapshot that is ready to send out - ss.tr.SnapshotReady(ss.snap, ss.snap.raft().Metadata.Index) - return ss.snap, nil -} - -// clearUsedSnap clears the snapshot from the snapshot store after it -// is used. -// After clear, snapshotStore could create new snapshot when getSnap. -func (ss *snapshotStore) clearUsedSnap() { - ss.mu.Lock() - defer ss.mu.Unlock() - if !ss.inUse { - plog.Panicf("unexpected clearUsedSnap when snapshot is not in use") - } - ss.clear() -} - -// closeSnapBefore closes the stored snapshot if its index is not greater -// than the given compact index. -// If it closes the snapshot, it returns true. 
-func (ss *snapshotStore) closeSnapBefore(index uint64) bool { - ss.mu.Lock() - defer ss.mu.Unlock() - if ss.snap != nil && ss.snap.raft().Metadata.Index <= index { - if err := ss.snap.Close(); err != nil { - plog.Errorf("snapshot close error (%v)", err) - } - return true - } - return false -} - -// createSnap creates a new snapshot and stores it into the snapshot store. -// It also sets a timer to clear the snapshot if it is not in use after -// some time interval. -// It should only be called in snapshotStore functions. -func (ss *snapshotStore) createSnap() { - // ask to generate v2 snapshot - ss.reqsnapc <- struct{}{} - // generate KV snapshot - kvsnap := ss.kv.Snapshot() - raftsnap := <-ss.raftsnapc - snap := newSnapshot(raftsnap, kvsnap) - - ss.mu.Lock() - ss.snap = snap - ss.mu.Unlock() - - go func() { - <-ss.clock.After(clearUnusedSnapshotInterval) - ss.mu.Lock() - defer ss.mu.Unlock() - if snap == ss.snap && !ss.inUse { - ss.clear() - } - }() -} - -// clear clears snapshot related variables in snapshotStore. It closes -// the snapshot stored and sets the variables to initial values. -// It should only be called in snapshotStore functions. -func (ss *snapshotStore) clear() { - if err := ss.snap.Close(); err != nil { - plog.Errorf("snapshot close error (%v)", err) - } - ss.snap = nil - ss.inUse = false - ss.createOnce = sync.Once{} -} - -// SaveFrom saves snapshot at the given index from the given reader. -// If the snapshot with the given index has been saved successfully, it keeps -// the original saved snapshot and returns error. -// The function guarantees that SaveFrom always saves either complete -// snapshot or no snapshot, even if the call is aborted because program -// is hard killed. 
-func (ss *snapshotStore) SaveFrom(r io.Reader, index uint64) error { - f, err := ioutil.TempFile(ss.dir, "tmp") - if err != nil { - return err - } - _, err = io.Copy(f, r) - f.Close() - if err != nil { - os.Remove(f.Name()) - return err - } - fn := path.Join(ss.dir, fmt.Sprintf("%016x.db", index)) - if fileutil.Exist(fn) { - os.Remove(f.Name()) - return fmt.Errorf("snapshot to save has existed") - } - err = os.Rename(f.Name(), fn) - if err != nil { - os.Remove(f.Name()) - return err - } - return nil -} - -// getSnapFilePath returns the file path for the snapshot with given index. -// If the snapshot does not exist, it returns error. -func (ss *snapshotStore) getSnapFilePath(index uint64) (string, error) { - fns, err := fileutil.ReadDir(ss.dir) - if err != nil { - return "", err - } - wfn := fmt.Sprintf("%016x.db", index) - for _, fn := range fns { - if fn == wfn { - return path.Join(ss.dir, fn), nil - } - } - return "", fmt.Errorf("snapshot file doesn't exist") -} diff --git a/etcdserver/snapshot_store_test.go b/etcdserver/snapshot_store_test.go deleted file mode 100644 index 49ae6f413..000000000 --- a/etcdserver/snapshot_store_test.go +++ /dev/null @@ -1,205 +0,0 @@ -// Copyright 2015 CoreOS, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package etcdserver - -import ( - "io" - "reflect" - "sync" - "testing" - - "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork" - "github.com/coreos/etcd/pkg/testutil" - "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/raft/raftpb" - dstorage "github.com/coreos/etcd/storage" - "github.com/coreos/etcd/storage/storagepb" -) - -func TestSnapshotStoreCreateSnap(t *testing.T) { - snap := raftpb.Snapshot{ - Metadata: raftpb.SnapshotMetadata{Index: 1}, - } - ss := newSnapshotStore("", &nopKV{}) - fakeClock := clockwork.NewFakeClock() - ss.clock = fakeClock - go func() { - <-ss.reqsnapc - ss.raftsnapc <- snap - }() - - // create snapshot - ss.createSnap() - if !reflect.DeepEqual(ss.snap.raft(), snap) { - t.Errorf("raftsnap = %+v, want %+v", ss.snap.raft(), snap) - } - - // unused snapshot is cleared after clearUnusedSnapshotInterval - fakeClock.BlockUntil(1) - fakeClock.Advance(clearUnusedSnapshotInterval) - testutil.WaitSchedule() - ss.mu.Lock() - if ss.snap != nil { - t.Errorf("snap = %+v, want %+v", ss.snap, nil) - } - ss.mu.Unlock() -} - -func TestSnapshotStoreGetSnap(t *testing.T) { - snap := raftpb.Snapshot{ - Metadata: raftpb.SnapshotMetadata{Index: 1}, - } - ss := newSnapshotStore("", &nopKV{}) - fakeClock := clockwork.NewFakeClock() - ss.clock = fakeClock - ss.tr = &nopTransporter{} - go func() { - <-ss.reqsnapc - ss.raftsnapc <- snap - }() - - // get snap when no snapshot stored - _, err := ss.getSnap() - if err != raft.ErrSnapshotTemporarilyUnavailable { - t.Fatalf("getSnap error = %v, want %v", err, raft.ErrSnapshotTemporarilyUnavailable) - } - - // wait for asynchronous snapshot creation to finish - testutil.WaitSchedule() - // get the created snapshot - s, err := ss.getSnap() - if err != nil { - t.Fatalf("getSnap error = %v, want nil", err) - } - if !reflect.DeepEqual(s.raft(), snap) { - t.Errorf("raftsnap = %+v, want %+v", s.raft(), snap) - } - if !ss.inUse { - t.Errorf("inUse = %v, want true", ss.inUse) - } - - // get snap 
when snapshot stored has been in use - _, err = ss.getSnap() - if err != raft.ErrSnapshotTemporarilyUnavailable { - t.Fatalf("getSnap error = %v, want %v", err, raft.ErrSnapshotTemporarilyUnavailable) - } - - // clean up - fakeClock.Advance(clearUnusedSnapshotInterval) -} - -func TestSnapshotStoreClearUsedSnap(t *testing.T) { - s := &fakeSnapshot{} - var once sync.Once - once.Do(func() {}) - ss := &snapshotStore{ - snap: newSnapshot(raftpb.Snapshot{}, s), - inUse: true, - createOnce: once, - } - - ss.clearUsedSnap() - // wait for underlying KV snapshot closed - testutil.WaitSchedule() - s.mu.Lock() - if !s.closed { - t.Errorf("snapshot closed = %v, want true", s.closed) - } - s.mu.Unlock() - if ss.snap != nil { - t.Errorf("snapshot = %v, want nil", ss.snap) - } - if ss.inUse { - t.Errorf("isUse = %v, want false", ss.inUse) - } - // test createOnce is reset - if ss.createOnce == once { - t.Errorf("createOnce fails to reset") - } -} - -func TestSnapshotStoreCloseSnapBefore(t *testing.T) { - snapIndex := uint64(5) - - tests := []struct { - index uint64 - wok bool - }{ - {snapIndex - 2, false}, - {snapIndex - 1, false}, - {snapIndex, true}, - } - for i, tt := range tests { - rs := raftpb.Snapshot{ - Metadata: raftpb.SnapshotMetadata{Index: 5}, - } - s := &fakeSnapshot{} - ss := &snapshotStore{ - snap: newSnapshot(rs, s), - } - - ok := ss.closeSnapBefore(tt.index) - if ok != tt.wok { - t.Errorf("#%d: closeSnapBefore = %v, want %v", i, ok, tt.wok) - } - if ok { - // wait for underlying KV snapshot closed - testutil.WaitSchedule() - s.mu.Lock() - if !s.closed { - t.Errorf("#%d: snapshot closed = %v, want true", i, s.closed) - } - s.mu.Unlock() - } - } -} - -type nopKV struct{} - -func (kv *nopKV) Rev() int64 { return 0 } -func (kv *nopKV) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) { - return nil, 0, nil -} -func (kv *nopKV) Put(key, value []byte) (rev int64) { return 0 } -func (kv *nopKV) DeleteRange(key, end []byte) (n, 
rev int64) { return 0, 0 } -func (kv *nopKV) TxnBegin() int64 { return 0 } -func (kv *nopKV) TxnEnd(txnID int64) error { return nil } -func (kv *nopKV) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) { - return nil, 0, nil -} -func (kv *nopKV) TxnPut(txnID int64, key, value []byte) (rev int64, err error) { return 0, nil } -func (kv *nopKV) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) { - return 0, 0, nil -} -func (kv *nopKV) Compact(rev int64) error { return nil } -func (kv *nopKV) Hash() (uint32, error) { return 0, nil } -func (kv *nopKV) Snapshot() dstorage.Snapshot { return &fakeSnapshot{} } -func (kv *nopKV) Commit() {} -func (kv *nopKV) Restore() error { return nil } -func (kv *nopKV) Close() error { return nil } - -type fakeSnapshot struct { - mu sync.Mutex - closed bool -} - -func (s *fakeSnapshot) Size() int64 { return 0 } -func (s *fakeSnapshot) WriteTo(w io.Writer) (int64, error) { return 0, nil } -func (s *fakeSnapshot) Close() error { - s.mu.Lock() - s.closed = true - s.mu.Unlock() - return nil -} diff --git a/etcdserver/storage.go b/etcdserver/storage.go index b9870eae4..30891f06b 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -35,6 +35,9 @@ type Storage interface { Save(st raftpb.HardState, ents []raftpb.Entry) error // SaveSnap function saves snapshot to the underlying stable storage. SaveSnap(snap raftpb.Snapshot) error + // DBFilePath returns the file path of database snapshot saved with given + // id. + DBFilePath(id uint64) (string, error) // Close closes the Storage and performs finalization. 
Close() error } diff --git a/rafthttp/http.go b/rafthttp/http.go index ae14d4202..ec823820f 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -25,6 +25,7 @@ import ( pioutil "github.com/coreos/etcd/pkg/ioutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/snap" "github.com/coreos/etcd/version" ) @@ -118,16 +119,16 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } type snapshotHandler struct { - r Raft - snapSaver SnapshotSaver - cid types.ID + r Raft + snapshotter *snap.Snapshotter + cid types.ID } -func newSnapshotHandler(r Raft, snapSaver SnapshotSaver, cid types.ID) http.Handler { +func newSnapshotHandler(r Raft, snapshotter *snap.Snapshotter, cid types.ID) http.Handler { return &snapshotHandler{ - r: r, - snapSaver: snapSaver, - cid: cid, + r: r, + snapshotter: snapshotter, + cid: cid, } } @@ -168,14 +169,14 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - // save snapshot - if err := h.snapSaver.SaveFrom(r.Body, m.Snapshot.Metadata.Index); err != nil { + // save incoming database snapshot. 
+ if err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index); err != nil { msg := fmt.Sprintf("failed to save KV snapshot (%v)", err) plog.Error(msg) http.Error(w, msg, http.StatusInternalServerError) return } - plog.Infof("received and saved snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From)) + plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From)) if err := h.r.Process(context.TODO(), m); err != nil { switch v := err.(type) { diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go index c4ef9672a..341acac56 100644 --- a/rafthttp/http_test.go +++ b/rafthttp/http_test.go @@ -28,6 +28,7 @@ import ( "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/snap" "github.com/coreos/etcd/version" ) @@ -340,9 +341,10 @@ type fakePeerGetter struct { func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] } type fakePeer struct { - msgs []raftpb.Message - urls types.URLs - connc chan *outgoingConn + msgs []raftpb.Message + snapMsgs []snap.Message + urls types.URLs + connc chan *outgoingConn } func newFakePeer() *fakePeer { @@ -352,6 +354,7 @@ func newFakePeer() *fakePeer { } func (pr *fakePeer) send(m raftpb.Message) { pr.msgs = append(pr.msgs, m) } +func (pr *fakePeer) sendSnap(m snap.Message) { pr.snapMsgs = append(pr.snapMsgs, m) } func (pr *fakePeer) update(urls types.URLs) { pr.urls = urls } func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn } func (pr *fakePeer) activeSince() time.Time { return time.Time{} } diff --git a/rafthttp/peer.go b/rafthttp/peer.go index a71fd54bd..f74f5d8b0 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -23,6 +23,7 @@ import ( "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/snap" ) const ( @@ -57,6 +58,11 @@ 
type Peer interface { // When it fails to send message out, it will report the status to underlying // raft. send(m raftpb.Message) + + // sendSnap sends the merged snapshot message to the remote peer. Its behavior + // is similar to send. + sendSnap(m snap.Message) + // update updates the urls of remote peer. update(urls types.URLs) // attachOutgoingConn attachs the outgoing connection to the peer for @@ -110,7 +116,7 @@ type peer struct { done chan struct{} } -func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, to, cid types.ID, snapst *snapshotStore, r Raft, fs *stats.FollowerStats, errorc chan error, v3demo bool) *peer { +func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error, v3demo bool) *peer { picker := newURLPicker(urls) status := newPeerStatus(to) p := &peer{ @@ -121,7 +127,7 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t msgAppV2Writer: startStreamWriter(to, status, fs, r), writer: startStreamWriter(to, status, fs, r), pipeline: newPipeline(pipelineRt, picker, local, to, cid, status, fs, r, errorc), - snapSender: newSnapshotSender(pipelineRt, picker, local, to, cid, status, snapst, r, errorc), + snapSender: newSnapshotSender(pipelineRt, picker, local, to, cid, status, r, errorc), sendc: make(chan raftpb.Message), recvc: make(chan raftpb.Message, recvBufSize), propc: make(chan raftpb.Message, maxPendingProposals), @@ -158,10 +164,6 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t if paused { continue } - if p.v3demo && isMsgSnap(m) { - go p.snapSender.send(m) - continue - } writec, name := p.pick(m) select { case writec <- m: @@ -209,6 +211,10 @@ func (p *peer) send(m raftpb.Message) { } } +func (p *peer) sendSnap(m snap.Message) { + go p.snapSender.send(m) +} + func (p *peer) update(urls types.URLs) { select { case p.newURLsC <- urls: diff --git
a/rafthttp/snapshot_sender.go b/rafthttp/snapshot_sender.go index bf297505d..8a9a99e93 100644 --- a/rafthttp/snapshot_sender.go +++ b/rafthttp/snapshot_sender.go @@ -24,7 +24,7 @@ import ( "github.com/coreos/etcd/pkg/httputil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/snap" ) type snapshotSender struct { @@ -34,14 +34,13 @@ type snapshotSender struct { tr http.RoundTripper picker *urlPicker status *peerStatus - snapst *snapshotStore r Raft errorc chan error stopc chan struct{} } -func newSnapshotSender(tr http.RoundTripper, picker *urlPicker, from, to, cid types.ID, status *peerStatus, snapst *snapshotStore, r Raft, errorc chan error) *snapshotSender { +func newSnapshotSender(tr http.RoundTripper, picker *urlPicker, from, to, cid types.ID, status *peerStatus, r Raft, errorc chan error) *snapshotSender { return &snapshotSender{ from: from, to: to, @@ -49,7 +48,6 @@ func newSnapshotSender(tr http.RoundTripper, picker *urlPicker, from, to, cid ty tr: tr, picker: picker, status: status, - snapst: snapst, r: r, errorc: errorc, stopc: make(chan struct{}), @@ -58,10 +56,12 @@ func newSnapshotSender(tr http.RoundTripper, picker *urlPicker, from, to, cid ty func (s *snapshotSender) stop() { close(s.stopc) } -func (s *snapshotSender) send(m raftpb.Message) { +func (s *snapshotSender) send(merged snap.Message) { + m := merged.Message + start := time.Now() - body := createSnapBody(m, s.snapst) + body := createSnapBody(merged) defer body.Close() u := s.picker.pick() @@ -142,20 +142,16 @@ type readCloser struct { io.Closer } -// createSnapBody creates the request body for the given raft snapshot message. -// Callers should close body when done reading from it. 
-func createSnapBody(m raftpb.Message, snapst *snapshotStore) io.ReadCloser { +func createSnapBody(merged snap.Message) io.ReadCloser { buf := new(bytes.Buffer) enc := &messageEncoder{w: buf} // encode raft message - if err := enc.encode(m); err != nil { + if err := enc.encode(merged.Message); err != nil { plog.Panicf("encode message error (%v)", err) } - // get snapshot - rc := snapst.get(m.Snapshot.Metadata.Index) return &readCloser{ - Reader: io.MultiReader(buf, rc), - Closer: rc, + Reader: io.MultiReader(buf, merged.ReadCloser), + Closer: merged.ReadCloser, } } diff --git a/rafthttp/snapshot_store.go b/rafthttp/snapshot_store.go deleted file mode 100644 index 239a0ed55..000000000 --- a/rafthttp/snapshot_store.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2015 CoreOS, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rafthttp - -import ( - "io" -) - -// snapshotStore is the store of snapshot. Caller could put one -// snapshot into the store, and get it later. -// snapshotStore stores at most one snapshot at a time, or it panics. -type snapshotStore struct { - rc io.ReadCloser - // index of the stored snapshot - // index is 0 if and only if there is no snapshot stored. 
- index uint64 -} - -func (s *snapshotStore) put(rc io.ReadCloser, index uint64) { - if s.index != 0 { - plog.Panicf("unexpected put when there is one snapshot stored") - } - s.rc, s.index = rc, index -} - -func (s *snapshotStore) get(index uint64) io.ReadCloser { - if s.index == index { - // set index to 0 to indicate no snapshot stored - s.index = 0 - return s.rc - } - return nil -} diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 127dab4ef..43ae95b26 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -15,7 +15,6 @@ package rafthttp import ( - "io" "net/http" "sync" "time" @@ -29,6 +28,7 @@ import ( "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/snap" ) var plog = logutil.NewMergeLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "rafthttp")) @@ -40,12 +40,6 @@ type Raft interface { ReportSnapshot(id uint64, status raft.SnapshotStatus) } -// SnapshotSaver is the interface that wraps the SaveFrom method. -type SnapshotSaver interface { - // SaveFrom saves the snapshot data at the given index from the given reader. - SaveFrom(r io.Reader, index uint64) error -} - type Transporter interface { // Start starts the given Transporter. // Start MUST be called before calling other functions in the interface. @@ -62,6 +56,9 @@ type Transporter interface { // If the id cannot be found in the transport, the message // will be ignored. Send(m []raftpb.Message) + // SendSnapshot sends out the given snapshot message to a remote peer. + // The behavior of SendSnapshot is similar to Send. + SendSnapshot(m snap.Message) // AddRemote adds a remote with given peer urls into the transport. // A remote helps newly joined member to catch up the progress of cluster, // and will not be used after that. @@ -86,14 +83,6 @@ type Transporter interface { // If the connection is active since peer was added, it returns the adding time. 
// If the connection is currently inactive, it returns zero time. ActiveSince(id types.ID) time.Time - // SnapshotReady accepts a snapshot at the given index that is ready to send out. - // It is expected that caller sends a raft snapshot message with - // the given index soon, and the accepted snapshot will be sent out - // together. After sending, snapshot sent status is reported - // through Raft.SnapshotStatus. - // SnapshotReady MUST not be called when the snapshot sent status of previous - // accepted one has not been reported. - SnapshotReady(rc io.ReadCloser, index uint64) // Stop closes the connections and stops the transporter. Stop() } @@ -108,10 +97,10 @@ type Transport struct { DialTimeout time.Duration // maximum duration before timing out dial of the request TLSInfo transport.TLSInfo // TLS information used when creating connection - ID types.ID // local member ID - ClusterID types.ID // raft cluster ID for request validation - Raft Raft // raft state machine, to which the Transport forwards received messages and reports status - SnapSaver SnapshotSaver // used to save snapshot in v3 snapshot messages + ID types.ID // local member ID + ClusterID types.ID // raft cluster ID for request validation + Raft Raft // raft state machine, to which the Transport forwards received messages and reports status + Snapshotter *snap.Snapshotter ServerStats *stats.ServerStats // used to record general transportation statistics // used to record transportation statistics with followers when // performing as leader in raft protocol @@ -130,8 +119,6 @@ type Transport struct { remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up peers map[types.ID]Peer // peers map - snapst *snapshotStore - prober probing.Prober } @@ -147,7 +134,6 @@ func (t *Transport) Start() error { } t.remotes = make(map[types.ID]*remote) t.peers = make(map[types.ID]Peer) - t.snapst = &snapshotStore{} t.prober = probing.NewProber(t.pipelineRt) return nil } @@ -155,7 
+141,7 @@ func (t *Transport) Start() error { func (t *Transport) Handler() http.Handler { pipelineHandler := newPipelineHandler(t.Raft, t.ClusterID) streamHandler := newStreamHandler(t, t.Raft, t.ID, t.ClusterID) - snapHandler := newSnapshotHandler(t.Raft, t.SnapSaver, t.ClusterID) + snapHandler := newSnapshotHandler(t.Raft, t.Snapshotter, t.ClusterID) mux := http.NewServeMux() mux.Handle(RaftPrefix, pipelineHandler) mux.Handle(RaftStreamPrefix+"/", streamHandler) @@ -240,7 +226,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) { plog.Panicf("newURLs %+v should never fail: %+v", us, err) } fs := t.LeaderStats.Follower(id.String()) - t.peers[id] = startPeer(t.streamRt, t.pipelineRt, urls, t.ID, id, t.ClusterID, t.snapst, t.Raft, fs, t.ErrorC, t.V3demo) + t.peers[id] = startPeer(t.streamRt, t.pipelineRt, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC, t.V3demo) addPeerToProber(t.prober, id.String(), us) } @@ -296,8 +282,13 @@ func (t *Transport) ActiveSince(id types.ID) time.Time { return time.Time{} } -func (t *Transport) SnapshotReady(rc io.ReadCloser, index uint64) { - t.snapst.put(rc, index) +func (t *Transport) SendSnapshot(m snap.Message) { + p := t.peers[types.ID(m.To)] + if p == nil { + m.ReadCloser.Close() + return + } + p.sendSnap(m) } type Pausable interface { diff --git a/snap/db.go b/snap/db.go new file mode 100644 index 000000000..4c0f9471c --- /dev/null +++ b/snap/db.go @@ -0,0 +1,67 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package snap + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "path" + + "github.com/coreos/etcd/pkg/fileutil" +) + +// SaveDBFrom saves snapshot of the database from the given reader. It +// guarantees the save operation is atomic. +func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) error { + f, err := ioutil.TempFile(s.dir, "tmp") + if err != nil { + return err + } + _, err = io.Copy(f, r) + f.Close() + if err != nil { + os.Remove(f.Name()) + return err + } + fn := path.Join(s.dir, fmt.Sprintf("%016x.snap.db", id)) + if fileutil.Exist(fn) { + os.Remove(f.Name()) + return nil + } + err = os.Rename(f.Name(), fn) + if err != nil { + os.Remove(f.Name()) + return err + } + return nil +} + +// DBFilePath returns the file path for the snapshot of the database with +// given id. If the snapshot does not exist, it returns error. +func (s *Snapshotter) DBFilePath(id uint64) (string, error) { + fns, err := fileutil.ReadDir(s.dir) + if err != nil { + return "", err + } + wfn := fmt.Sprintf("%016x.snap.db", id) + for _, fn := range fns { + if fn == wfn { + return path.Join(s.dir, fn), nil + } + } + return "", fmt.Errorf("snap: snapshot file doesn't exist") +} diff --git a/snap/message.go b/snap/message.go new file mode 100644 index 000000000..1b7fff192 --- /dev/null +++ b/snap/message.go @@ -0,0 +1,34 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package snap + +import ( + "io" + + "github.com/coreos/etcd/raft/raftpb" +) + +// Message is a struct that contains a raft Message and a ReadCloser. The type +// of raft message MUST be MsgSnap, which contains the raft meta-data and an +// additional data []byte field that contains the snapshot of the actual state +// machine. +// Message contains the ReadCloser field for handling large snapshots. This avoids +// copying the entire snapshot into a byte array, which consumes a lot of memory. +// +// Users of Message should close the ReadCloser after sending it. +type Message struct { + raftpb.Message + ReadCloser io.ReadCloser +}