diff --git a/snapshot/v3_snapshot.go b/snapshot/v3_snapshot.go
index a74408a3f..48847e5b7 100644
--- a/snapshot/v3_snapshot.go
+++ b/snapshot/v3_snapshot.go
@@ -25,6 +25,7 @@ import (
 	"os"
 	"path/filepath"
 	"reflect"
+	"time"
 
 	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/etcdserver"
@@ -48,65 +49,34 @@ import (
 
 // Manager defines snapshot methods.
 type Manager interface {
-	// Save fetches snapshot from remote etcd server and saves data to target path.
-	// If the context "ctx" is canceled or timed out, snapshot save stream will error out
-	// (e.g. context.Canceled, context.DeadlineExceeded).
-	Save(ctx context.Context, dbPath string) error
+	// Save fetches snapshot from remote etcd server and saves data
+	// to target path. If the context "ctx" is canceled or timed out,
+	// snapshot save stream will error out (e.g. context.Canceled,
+	// context.DeadlineExceeded). Make sure to specify only one endpoint
+	// in client configuration. Snapshot API must be requested to a
+	// selected node, and saved snapshot is the point-in-time state of
+	// the selected node.
+	Save(ctx context.Context, cfg clientv3.Config, dbPath string) error
 
 	// Status returns the snapshot file information.
 	Status(dbPath string) (Status, error)
 
-	// Restore restores a new etcd data directory from given snapshot file.
-	// It returns an error if specified data directory already exists, to
-	// prevent unintended data directory overwrites.
-	Restore(dbPath string, cfg RestoreConfig) error
-}
-
-// Status is the snapshot file status.
-type Status struct {
-	Hash      uint32 `json:"hash"`
-	Revision  int64  `json:"revision"`
-	TotalKey  int    `json:"totalKey"`
-	TotalSize int64  `json:"totalSize"`
-}
-
-// RestoreConfig configures snapshot restore operation.
-type RestoreConfig struct {
-	// Name is the human-readable name of this member.
-	Name string
-	// OutputDataDir is the target data directory to save restored data.
-	// OutputDataDir should not conflict with existing etcd data directory.
-	// If OutputDataDir already exists, it will return an error to prevent
-	// unintended data directory overwrites.
-	// Defaults to "[Name].etcd" if not given.
-	OutputDataDir string
-	// OutputWALDir is the target WAL data directory.
-	// Defaults to "[OutputDataDir]/member/wal" if not given.
-	OutputWALDir string
-	// InitialCluster is the initial cluster configuration for restore bootstrap.
-	InitialCluster types.URLsMap
-	// InitialClusterToken is the initial cluster token for etcd cluster during restore bootstrap.
-	InitialClusterToken string
-	// PeerURLs is a list of member's peer URLs to advertise to the rest of the cluster.
-	PeerURLs types.URLs
-	// SkipHashCheck is "true" to ignore snapshot integrity hash value
-	// (required if copied from data directory).
-	SkipHashCheck bool
+	// Restore restores a new etcd data directory from given snapshot
+	// file. It returns an error if specified data directory already
+	// exists, to prevent unintended data directory overwrites.
+	Restore(cfg RestoreConfig) error
 }
 
 // NewV3 returns a new snapshot Manager for v3.x snapshot.
-// "*clientv3.Client" is only used for "Save" method.
-// Otherwise, pass "nil".
-func NewV3(cli *clientv3.Client, lg *zap.Logger) Manager {
+func NewV3(lg *zap.Logger) Manager {
 	if lg == nil {
 		lg = zap.NewExample()
 	}
-	return &v3Manager{cli: cli, lg: lg}
+	return &v3Manager{lg: lg}
 }
 
 type v3Manager struct {
-	cli *clientv3.Client
-	lg  *zap.Logger
+	lg *zap.Logger
 
 	name    string
 	dbPath  string
@@ -117,11 +87,23 @@ type v3Manager struct {
 	skipHashCheck bool
 }
 
-func (s *v3Manager) Save(ctx context.Context, dbPath string) error {
-	partpath := dbPath + ".part"
-	f, err := os.Create(partpath)
+// Save fetches snapshot from remote etcd server and saves data to target path.
+func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string) error {
+	if len(cfg.Endpoints) != 1 {
+		return fmt.Errorf("snapshot must be requested to one selected node, not multiple %v", cfg.Endpoints)
+	}
+	cli, err := clientv3.New(cfg)
+	if err != nil {
+		return err
+	}
+	defer cli.Close()
+
+	partpath := dbPath + ".part"
+	defer os.RemoveAll(partpath)
+
+	var f *os.File
+	f, err = os.Create(partpath)
 	if err != nil {
-		os.RemoveAll(partpath)
 		return fmt.Errorf("could not open %s (%v)", partpath, err)
 	}
 	s.lg.Info(
@@ -129,34 +111,47 @@ func (s *v3Manager) Save(ctx context.Context, dbPath string) error {
 		zap.String("path", partpath),
 	)
 
+	now := time.Now()
 	var rd io.ReadCloser
-	rd, err = s.cli.Snapshot(ctx)
+	rd, err = cli.Snapshot(ctx)
 	if err != nil {
-		os.RemoveAll(partpath)
 		return err
 	}
-	s.lg.Info("copying from snapshot stream")
+	s.lg.Info(
+		"fetching snapshot",
+		zap.String("endpoint", cfg.Endpoints[0]),
+	)
 	if _, err = io.Copy(f, rd); err != nil {
-		os.RemoveAll(partpath)
 		return err
 	}
 	if err = fileutil.Fsync(f); err != nil {
-		os.RemoveAll(partpath)
 		return err
 	}
 	if err = f.Close(); err != nil {
-		os.RemoveAll(partpath)
 		return err
 	}
+	s.lg.Info(
+		"fetched snapshot",
+		zap.String("endpoint", cfg.Endpoints[0]),
+		zap.Duration("took", time.Since(now)),
+	)
 
-	s.lg.Info("rename", zap.String("from", partpath), zap.String("to", dbPath))
 	if err = os.Rename(partpath, dbPath); err != nil {
-		os.RemoveAll(partpath)
 		return fmt.Errorf("could not rename %s to %s (%v)", partpath, dbPath, err)
 	}
+	s.lg.Info("saved", zap.String("path", dbPath))
 	return nil
 }
 
+// Status is the snapshot file status.
+type Status struct {
+	Hash      uint32 `json:"hash"`
+	Revision  int64  `json:"revision"`
+	TotalKey  int    `json:"totalKey"`
+	TotalSize int64  `json:"totalSize"`
+}
+
+// Status returns the snapshot file information.
 func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
 	if _, err = os.Stat(dbPath); err != nil {
 		return ds, err
@@ -200,19 +195,60 @@ func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
 	return ds, nil
 }
 
-func (s *v3Manager) Restore(dbPath string, cfg RestoreConfig) error {
-	srv := etcdserver.ServerConfig{
-		Name:                cfg.Name,
-		InitialClusterToken: cfg.InitialClusterToken,
-		InitialPeerURLsMap:  cfg.InitialCluster,
-		PeerURLs:            cfg.PeerURLs,
+// RestoreConfig configures snapshot restore operation.
+type RestoreConfig struct {
+	// SnapshotPath is the path of snapshot file to restore from.
+	SnapshotPath string
+
+	// Name is the human-readable name of this member.
+	Name string
+
+	// OutputDataDir is the target data directory to save restored data.
+	// OutputDataDir should not conflict with existing etcd data directory.
+	// If OutputDataDir already exists, it will return an error to prevent
+	// unintended data directory overwrites.
+	// If empty, defaults to "[Name].etcd".
+	OutputDataDir string
+	// OutputWALDir is the target WAL data directory.
+	// If empty, defaults to "[OutputDataDir]/member/wal".
+	OutputWALDir string
+
+	// PeerURLs is a list of member's peer URLs to advertise to the rest of the cluster.
+	PeerURLs []string
+
+	// InitialCluster is the initial cluster configuration for restore bootstrap.
+	InitialCluster string
+	// InitialClusterToken is the initial cluster token for etcd cluster during restore bootstrap.
+	InitialClusterToken string
+
+	// SkipHashCheck is "true" to ignore snapshot integrity hash value
+	// (required if copied from data directory).
+	SkipHashCheck bool
+}
+
+// Restore restores a new etcd data directory from given snapshot file.
+func (s *v3Manager) Restore(cfg RestoreConfig) error {
+	pURLs, err := types.NewURLs(cfg.PeerURLs)
+	if err != nil {
+		return err
 	}
-	if err := srv.VerifyBootstrap(); err != nil {
+	var ics types.URLsMap
+	ics, err = types.NewURLsMap(cfg.InitialCluster)
+	if err != nil {
 		return err
 	}
 
-	var err error
-	s.cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialCluster)
+	srv := etcdserver.ServerConfig{
+		Name:                cfg.Name,
+		PeerURLs:            pURLs,
+		InitialPeerURLsMap:  ics,
+		InitialClusterToken: cfg.InitialClusterToken,
+	}
+	if err = srv.VerifyBootstrap(); err != nil {
+		return err
+	}
+
+	s.cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, ics)
 	if err != nil {
 		return err
 	}
@@ -221,44 +257,44 @@ func (s *v3Manager) Restore(dbPath string, cfg RestoreConfig) error {
 	if dataDir == "" {
 		dataDir = cfg.Name + ".etcd"
 	}
-	if _, err = os.Stat(dataDir); err == nil {
+	if fileutil.Exist(dataDir) {
 		return fmt.Errorf("data-dir %q exists", dataDir)
 	}
+
 	walDir := cfg.OutputWALDir
 	if walDir == "" {
 		walDir = filepath.Join(dataDir, "member", "wal")
-	} else if _, err = os.Stat(walDir); err == nil {
+	} else if fileutil.Exist(walDir) {
 		return fmt.Errorf("wal-dir %q exists", walDir)
 	}
 
-	s.lg.Info(
-		"restoring snapshot file",
-		zap.String("path", dbPath),
-		zap.String("wal-dir", walDir),
-		zap.String("data-dir", dataDir),
-		zap.String("snap-dir", s.snapDir),
-	)
-
 	s.name = cfg.Name
-	s.dbPath = dbPath
+	s.dbPath = cfg.SnapshotPath
 	s.walDir = walDir
 	s.snapDir = filepath.Join(dataDir, "member", "snap")
 	s.skipHashCheck = cfg.SkipHashCheck
 
+	s.lg.Info(
+		"restoring snapshot",
+		zap.String("path", s.dbPath),
+		zap.String("wal-dir", s.walDir),
+		zap.String("data-dir", dataDir),
+		zap.String("snap-dir", s.snapDir),
+	)
 	if err = s.saveDB(); err != nil {
 		return err
 	}
 	if err = s.saveWALAndSnap(); err != nil {
 		return err
 	}
-
 	s.lg.Info(
-		"finished restoring snapshot file",
-		zap.String("path", dbPath),
-		zap.String("wal-dir", walDir),
+		"restored snapshot",
+		zap.String("path", s.dbPath),
+		zap.String("wal-dir", s.walDir),
 		zap.String("data-dir", dataDir),
 		zap.String("snap-dir", s.snapDir),
 	)
+
 	return nil
 }
diff --git a/snapshot/v3_snapshot_test.go b/snapshot/v3_snapshot_test.go
index f1468b429..2aed7b014 100644
--- a/snapshot/v3_snapshot_test.go
+++ b/snapshot/v3_snapshot_test.go
@@ -27,7 +27,6 @@ import (
 	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/embed"
 	"github.com/coreos/etcd/pkg/testutil"
-	"github.com/coreos/etcd/pkg/types"
 
 	"go.uber.org/zap"
 )
@@ -52,29 +51,23 @@ func TestSnapshotV3RestoreSingle(t *testing.T) {
 	cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String())
 	cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()))
 
-	sp := NewV3(nil, zap.NewExample())
-
-	err := sp.Restore(dbPath, RestoreConfig{})
-	if err.Error() != `couldn't find local name "" in the initial cluster configuration` {
t.Fatalf("expected restore error, got %v", err) + sp := NewV3(zap.NewExample()) + pss := make([]string, 0, len(pURLs)) + for _, p := range pURLs { + pss = append(pss, p.String()) } - var iURLs types.URLsMap - iURLs, err = types.NewURLsMap(cfg.InitialCluster) - if err != nil { - t.Fatal(err) - } - if err = sp.Restore(dbPath, RestoreConfig{ + if err := sp.Restore(RestoreConfig{ + SnapshotPath: dbPath, Name: cfg.Name, OutputDataDir: cfg.Dir, - InitialCluster: iURLs, + InitialCluster: cfg.InitialCluster, InitialClusterToken: cfg.InitialClusterToken, - PeerURLs: pURLs, + PeerURLs: pss, }); err != nil { t.Fatal(err) } - var srv *embed.Etcd - srv, err = embed.StartEtcd(cfg) + srv, err := embed.StartEtcd(cfg) if err != nil { t.Fatal(err) } @@ -176,10 +169,12 @@ func createSnapshotFile(t *testing.T, kvs []kv) string { t.Fatalf("failed to start embed.Etcd for creating snapshots") } - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}}) + ccfg := clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}} + cli, err := clientv3.New(ccfg) if err != nil { t.Fatal(err) } + defer cli.Close() for i := range kvs { ctx, cancel := context.WithTimeout(context.Background(), testutil.RequestTimeout) _, err = cli.Put(ctx, kvs[i].k, kvs[i].v) @@ -189,9 +184,9 @@ func createSnapshotFile(t *testing.T, kvs []kv) string { } } - sp := NewV3(cli, zap.NewExample()) + sp := NewV3(zap.NewExample()) dpPath := filepath.Join(os.TempDir(), fmt.Sprintf("snapshot%d.db", time.Now().Nanosecond())) - if err = sp.Save(context.Background(), dpPath); err != nil { + if err = sp.Save(context.Background(), ccfg, dpPath); err != nil { t.Fatal(err) } @@ -214,10 +209,6 @@ func restoreCluster(t *testing.T, clusterN int, dbPath string) ( ics += fmt.Sprintf(",%d=%s", i, pURLs[i].String()) } ics = ics[1:] - iURLs, err := types.NewURLsMap(ics) - if err != nil { - t.Fatal(err) - } cfgs := make([]*embed.Config, clusterN) for i := 0; i < clusterN; i++ { @@ -230,13 +221,14 @@ func restoreCluster(t *testing.T, clusterN int, dbPath string) ( cfg.InitialCluster = ics cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()+i)) - sp := NewV3(nil, zap.NewExample()) - if err := sp.Restore(dbPath, RestoreConfig{ + sp := NewV3(zap.NewExample()) + if err := sp.Restore(RestoreConfig{ + SnapshotPath: dbPath, Name: cfg.Name, OutputDataDir: cfg.Dir, - InitialCluster: iURLs, + PeerURLs: []string{pURLs[i].String()}, + InitialCluster: ics, InitialClusterToken: cfg.InitialClusterToken, - PeerURLs: types.URLs{pURLs[i]}, }); err != nil { t.Fatal(err) }