diff --git a/etcdserver/backend.go b/etcdserver/backend.go new file mode 100644 index 000000000..c5e2dabf3 --- /dev/null +++ b/etcdserver/backend.go @@ -0,0 +1,81 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +import ( + "fmt" + "os" + "time" + + "github.com/coreos/etcd/lease" + "github.com/coreos/etcd/mvcc" + "github.com/coreos/etcd/mvcc/backend" + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/snap" +) + +func newBackend(cfg *ServerConfig) backend.Backend { + bcfg := backend.DefaultBackendConfig() + bcfg.Path = cfg.backendPath() + if cfg.QuotaBackendBytes > 0 && cfg.QuotaBackendBytes != DefaultQuotaBytes { + // permit 10% excess over quota for disarm + bcfg.MmapSize = uint64(cfg.QuotaBackendBytes + cfg.QuotaBackendBytes/10) + } + return backend.New(bcfg) +} + +// openSnapshotBackend renames a snapshot db to the current etcd db and opens it. +func openSnapshotBackend(cfg *ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) { + snapPath, err := ss.DBFilePath(snapshot.Metadata.Index) + if err != nil { + return nil, fmt.Errorf("database snapshot file path error: %v", err) + } + if err := os.Rename(snapPath, cfg.backendPath()); err != nil { + return nil, fmt.Errorf("rename snapshot file error: %v", err) + } + return openBackend(cfg), nil +} + +// openBackend returns a backend using the current etcd db. +func openBackend(cfg *ServerConfig) backend.Backend { + fn := cfg.backendPath() + beOpened := make(chan backend.Backend) + go func() { + beOpened <- newBackend(cfg) + }() + select { + case be := <-beOpened: + return be + case <-time.After(time.Second): + plog.Warningf("another etcd process is using %q and holds the file lock.", fn) + plog.Warningf("waiting for it to exit before starting...") + } + return <-beOpened +} + +// recoverBackendSnapshot recovers the DB from a snapshot in case etcd crashes +// before updating the backend db after persisting raft snapshot to disk, +// violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this +// case, replace the db with the snapshot db sent by the leader. +func recoverSnapshotBackend(cfg *ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) { + var cIndex consistentIndex + kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex) + defer kv.Close() + if snapshot.Metadata.Index <= kv.ConsistentIndex() { + return oldbe, nil + } + oldbe.Close() + return openSnapshotBackend(cfg, snap.New(cfg.SnapDir()), snapshot) +} diff --git a/etcdserver/config.go b/etcdserver/config.go index 50bc21292..9c258934a 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -200,3 +200,5 @@ func (c *ServerConfig) bootstrapTimeout() time.Duration { } return time.Second } + +func (c *ServerConfig) backendPath() string { return filepath.Join(c.SnapDir(), "db") } diff --git a/etcdserver/server.go b/etcdserver/server.go index 1c2a95c05..87602606b 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -23,7 +23,6 @@ import ( "net/http" "os" "path" - "path/filepath" "regexp" "sync" "sync/atomic" @@ -76,7 +75,6 @@ const ( // (since it will timeout). monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second - databaseFilename = "db" // max number of in-flight snapshot messages etcdserver allows to have // This number is more than enough for most clusters with 5 machines. maxInFlightMsgSnap = 16 @@ -200,7 +198,8 @@ type EtcdServer struct { cluster *membership.RaftCluster - store store.Store + store store.Store + snapshotter *snap.Snapshotter applyV2 ApplierV2 @@ -271,10 +270,9 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { } ss := snap.New(cfg.SnapDir()) - bepath := filepath.Join(cfg.SnapDir(), databaseFilename) + bepath := cfg.backendPath() beExist := fileutil.Exist(bepath) - - be := openBackend(bepath, cfg.QuotaBackendBytes) + be := openBackend(cfg) defer func() { if err != nil { @@ -372,9 +370,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { plog.Panicf("recovered store from snapshot error: %v", err) } plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index) - - be, err = checkAndRecoverDB(snapshot, be, cfg.QuotaBackendBytes, cfg.SnapDir()) - if err != nil { + if be, err = recoverSnapshotBackend(cfg, be, *snapshot); err != nil { plog.Panicf("recovering backend from snapshot error: %v", err) } } @@ -408,11 +404,12 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { heartbeat := time.Duration(cfg.TickMs) * time.Millisecond srv = &EtcdServer{ - readych: make(chan struct{}), - Cfg: cfg, - snapCount: cfg.SnapCount, - errorc: make(chan error, 1), - store: st, + readych: make(chan struct{}), + Cfg: cfg, + snapCount: cfg.SnapCount, + errorc: make(chan error, 1), + store: st, + snapshotter: ss, r: *newRaftNode( raftNodeConfig{ isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, @@ -795,21 +792,14 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { apply.snapshot.Metadata.Index, ep.appliedi) } - // wait for raftNode to persist snashot onto the disk + // wait for raftNode to persist snapshot onto the disk <-apply.notifyc - snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index) + newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot) if err != nil { - plog.Panicf("get database snapshot file path error: %v", err) + plog.Panic(err) } - fn := filepath.Join(s.Cfg.SnapDir(), databaseFilename) - if err := os.Rename(snapfn, fn); err != nil { - plog.Panicf("rename snapshot file error: %v", err) - } - - newbe := newBackend(fn, s.Cfg.QuotaBackendBytes) - // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers. if s.lessor != nil { @@ -1662,13 +1652,3 @@ func (s *EtcdServer) goAttach(f func()) { f() }() } - -func newBackend(path string, quotaBytes int64) backend.Backend { - bcfg := backend.DefaultBackendConfig() - bcfg.Path = path - if quotaBytes > 0 && quotaBytes != DefaultQuotaBytes { - // permit 10% excess over quota for disarm - bcfg.MmapSize = uint64(quotaBytes + quotaBytes/10) - } - return backend.New(bcfg) -} diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 0f2b0edf7..c7ba09519 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -20,6 +20,7 @@ import ( "io/ioutil" "os" "path" + "path/filepath" "reflect" "testing" "time" @@ -29,6 +30,7 @@ import ( "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc" "github.com/coreos/etcd/mvcc/backend" + "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/idutil" "github.com/coreos/etcd/pkg/mock/mockstorage" "github.com/coreos/etcd/pkg/mock/mockstore" @@ -40,6 +42,7 @@ import ( "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/rafthttp" + "github.com/coreos/etcd/snap" "github.com/coreos/etcd/store" "golang.org/x/net/context" ) @@ -964,13 +967,15 @@ func TestSnapshotOrdering(t *testing.T) { t.Fatalf("couldn't open tempdir (%v)", err) } defer os.RemoveAll(testdir) - if err := os.MkdirAll(testdir+"/member/snap", 0755); err != nil { + + snapdir := filepath.Join(testdir, "member", "snap") + if err := os.MkdirAll(snapdir, 0755); err != nil { t.Fatalf("couldn't make snap dir (%v)", err) } rs := raft.NewMemoryStorage() p := mockstorage.NewStorageRecorderStream(testdir) - tr, snapDoneC := rafthttp.NewSnapTransporter(testdir) + tr, snapDoneC := rafthttp.NewSnapTransporter(snapdir) r := newRaftNode(raftNodeConfig{ isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, Node: n, @@ -982,10 +987,11 @@ func TestSnapshotOrdering(t *testing.T) { Cfg: &ServerConfig{ DataDir: testdir, }, - r: *r, - store: st, - cluster: cl, - SyncTicker: &time.Ticker{}, + r: *r, + store: st, + snapshotter: snap.New(snapdir), + cluster: cl, + SyncTicker: &time.Ticker{}, } s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster} @@ -997,40 +1003,30 @@ func TestSnapshotOrdering(t *testing.T) { s.start() defer s.Stop() - actionc := p.Chan() n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}} - if ac := <-actionc; ac.Name != "Save" { - // MsgSnap triggers raftNode to call Save() - t.Fatalf("expect save() is called, but got %v", ac.Name) + go func() { + // get the snapshot sent by the transport + snapMsg := <-snapDoneC + // Snapshot first triggers raftnode to persists the snapshot onto disk + // before renaming db snapshot file to db + snapMsg.Snapshot.Metadata.Index = 1 + n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot} + }() + + if ac := <-p.Chan(); ac.Name != "Save" { + t.Fatalf("expected Save, got %+v", ac) } - - // get the snapshot sent by the transport - snapMsg := <-snapDoneC - - // Snapshot first triggers raftnode to persists the snapshot onto disk - // before renaming db snapshot file to db - snapMsg.Snapshot.Metadata.Index = 1 - n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot} - var seenSaveSnap bool - timer := time.After(5 * time.Second) - for { - select { - case ac := <-actionc: - switch ac.Name { - // DBFilePath() is called immediately before snapshot renaming. - case "DBFilePath": - if !seenSaveSnap { - t.Fatalf("DBFilePath called before SaveSnap") - } - return - case "SaveSnap": - seenSaveSnap = true - default: - continue - } - case <-timer: - t.Fatalf("timeout waiting on actions") - } + if ac := <-p.Chan(); ac.Name != "Save" { + t.Fatalf("expected Save, got %+v", ac) + } + // confirm snapshot file still present before calling SaveSnap + snapPath := filepath.Join(snapdir, fmt.Sprintf("%016x.snap.db", 1)) + if !fileutil.Exist(snapPath) { + t.Fatalf("expected file %q, got missing", snapPath) + } + // unblock SaveSnapshot, etcdserver now permitted to move snapshot file + if ac := <-p.Chan(); ac.Name != "SaveSnap" { + t.Fatalf("expected SaveSnap, got %+v", ac) } } @@ -1119,10 +1115,11 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { Cfg: &ServerConfig{ DataDir: testdir, }, - r: *r, - store: st, - cluster: cl, - SyncTicker: &time.Ticker{}, + r: *r, + store: st, + snapshotter: snap.New(testdir), + cluster: cl, + SyncTicker: &time.Ticker{}, } s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster} diff --git a/etcdserver/storage.go b/etcdserver/storage.go index 693618fbd..aa8f87569 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -32,9 +32,6 @@ type Storage interface { Save(st raftpb.HardState, ents []raftpb.Entry) error // SaveSnap function saves snapshot to the underlying stable storage. SaveSnap(snap raftpb.Snapshot) error - // DBFilePath returns the file path of database snapshot saved with given - // id. - DBFilePath(id uint64) (string, error) // Close closes the Storage and performs finalization. Close() error } diff --git a/etcdserver/util.go b/etcdserver/util.go index de7ef179a..e3896ffc2 100644 --- a/etcdserver/util.go +++ b/etcdserver/util.go @@ -15,18 +15,11 @@ package etcdserver import ( - "fmt" - "os" "time" "github.com/coreos/etcd/etcdserver/membership" - "github.com/coreos/etcd/lease" - "github.com/coreos/etcd/mvcc" - "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/pkg/types" - "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/rafthttp" - "github.com/coreos/etcd/snap" ) // isConnectedToQuorumSince checks whether the local member is connected to the @@ -102,55 +95,3 @@ func (nc *notifier) notify(err error) { nc.err = err close(nc.c) } - -// checkAndRecoverDB attempts to recover db in the scenario when -// etcd server crashes before updating its in-state db -// and after persisting snapshot to disk from syncing with leader, -// snapshot can be newer than db where -// (snapshot.Metadata.Index > db.consistentIndex ). -// -// when that happen: -// 1. find xxx.snap.db that matches snap index. -// 2. rename xxx.snap.db to db. -// 3. open the new db as the backend. -func checkAndRecoverDB(snapshot *raftpb.Snapshot, oldbe backend.Backend, quotaBackendBytes int64, snapdir string) (be backend.Backend, err error) { - var cIndex consistentIndex - kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex) - defer kv.Close() - kvindex := kv.ConsistentIndex() - if snapshot.Metadata.Index <= kvindex { - return oldbe, nil - } - - id := snapshot.Metadata.Index - snapfn, err := snap.DBFilePathFromID(snapdir, id) - if err != nil { - return nil, fmt.Errorf("finding %v error: %v", snapdir+fmt.Sprintf("%016x.snap.db", id), err) - } - - bepath := snapdir + databaseFilename - if err := os.Rename(snapfn, bepath); err != nil { - return nil, fmt.Errorf("rename snapshot file error: %v", err) - } - - oldbe.Close() - be = openBackend(bepath, quotaBackendBytes) - return be, nil -} - -func openBackend(bepath string, quotaBackendBytes int64) (be backend.Backend) { - beOpened := make(chan struct{}) - go func() { - be = newBackend(bepath, quotaBackendBytes) - beOpened <- struct{}{} - }() - - select { - case <-beOpened: - case <-time.After(time.Second): - plog.Warningf("another etcd process is running with the same data dir and holding the file lock.") - plog.Warningf("waiting for it to exit before starting...") - <-beOpened - } - return be -} diff --git a/pkg/mock/mockstorage/storage_recorder.go b/pkg/mock/mockstorage/storage_recorder.go index 4a592545e..4ecab9831 100644 --- a/pkg/mock/mockstorage/storage_recorder.go +++ b/pkg/mock/mockstorage/storage_recorder.go @@ -15,8 +15,6 @@ package mockstorage import ( - "fmt" - "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" @@ -47,13 +45,4 @@ func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error { return nil } -func (p *storageRecorder) DBFilePath(id uint64) (string, error) { - p.Record(testutil.Action{Name: "DBFilePath"}) - path := p.dbPath - if path != "" { - path = path + "/" - } - return fmt.Sprintf("%s%016x.snap.db", path, id), nil -} - func (p *storageRecorder) Close() error { return nil } diff --git a/snap/db.go b/snap/db.go index 77d109141..01d897ae8 100644 --- a/snap/db.go +++ b/snap/db.go @@ -44,7 +44,7 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) { os.Remove(f.Name()) return n, err } - fn := filepath.Join(s.dir, fmt.Sprintf("%016x.snap.db", id)) + fn := s.dbFilePath(id) if fileutil.Exist(fn) { os.Remove(f.Name()) return n, nil @@ -63,19 +63,15 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) { // DBFilePath returns the file path for the snapshot of the database with // given id. If the snapshot does not exist, it returns error. func (s *Snapshotter) DBFilePath(id uint64) (string, error) { - return DBFilePathFromID(s.dir, id) -} - -func DBFilePathFromID(dbPath string, id uint64) (string, error) { - fns, err := fileutil.ReadDir(dbPath) - if err != nil { + if _, err := fileutil.ReadDir(s.dir); err != nil { return "", err } - wfn := fmt.Sprintf("%016x.snap.db", id) - for _, fn := range fns { - if fn == wfn { - return filepath.Join(dbPath, fn), nil - } + if fn := s.dbFilePath(id); fileutil.Exist(fn) { + return fn, nil } return "", ErrNoDBSnapshot } + +func (s *Snapshotter) dbFilePath(id uint64) string { + return filepath.Join(s.dir, fmt.Sprintf("%016x.snap.db", id)) +}