snapshot: enforce single endpoint in client, change interface

Enforce a single endpoint in the client configuration, and pass the client
configuration only to the "Save" method.

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
release-3.4
Gyuho Lee 2018-04-12 10:26:17 -07:00
parent 70341b1614
commit a2b1449431
2 changed files with 137 additions and 109 deletions
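
This commit changes the snapshot package's public interface: "NewV3" no longer takes a "*clientv3.Client", "Save" receives a "clientv3.Config" that must list exactly one endpoint, and "Restore" takes all of its inputs, including the snapshot path, through "RestoreConfig" with plain-string fields. A minimal caller sketch against the new interface follows; the import path, endpoints, file paths, and cluster token are assumptions for illustration, not part of this commit:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/snapshot" // assumed import path for this package
	"go.uber.org/zap"
)

func main() {
	sp := snapshot.NewV3(zap.NewExample())

	// Save: exactly one endpoint may be configured; the saved snapshot is
	// the point-in-time state of that single selected node.
	ccfg := clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"}, // illustrative endpoint
		DialTimeout: 5 * time.Second,
	}
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := sp.Save(ctx, ccfg, "/tmp/backup.db"); err != nil {
		log.Fatal(err)
	}

	// Restore: every input, including the snapshot path, now travels through
	// RestoreConfig, and cluster settings are plain strings.
	if err := sp.Restore(snapshot.RestoreConfig{
		SnapshotPath:        "/tmp/backup.db",
		Name:                "m1",
		OutputDataDir:       "m1.etcd",
		PeerURLs:            []string{"http://127.0.0.1:2380"},
		InitialCluster:      "m1=http://127.0.0.1:2380",
		InitialClusterToken: "etcd-cluster", // illustrative token
	}); err != nil {
		log.Fatal(err)
	}
}
```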


@@ -25,6 +25,7 @@ import (
"os"
"path/filepath"
"reflect"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver"
@@ -48,65 +49,34 @@ import (
// Manager defines snapshot methods.
type Manager interface {
// Save fetches snapshot from remote etcd server and saves data to target path.
// If the context "ctx" is canceled or timed out, snapshot save stream will error out
// (e.g. context.Canceled, context.DeadlineExceeded).
Save(ctx context.Context, dbPath string) error
// Save fetches snapshot from remote etcd server and saves data
// to target path. If the context "ctx" is canceled or timed out,
// snapshot save stream will error out (e.g. context.Canceled,
// context.DeadlineExceeded). Make sure to specify only one endpoint
// in client configuration. Snapshot API must be requested to a
// selected node, and saved snapshot is the point-in-time state of
// the selected node.
Save(ctx context.Context, cfg clientv3.Config, dbPath string) error
// Status returns the snapshot file information.
Status(dbPath string) (Status, error)
// Restore restores a new etcd data directory from given snapshot file.
// It returns an error if specified data directory already exists, to
// prevent unintended data directory overwrites.
Restore(dbPath string, cfg RestoreConfig) error
}
// Status is the snapshot file status.
type Status struct {
Hash uint32 `json:"hash"`
Revision int64 `json:"revision"`
TotalKey int `json:"totalKey"`
TotalSize int64 `json:"totalSize"`
}
// RestoreConfig configures snapshot restore operation.
type RestoreConfig struct {
// Name is the human-readable name of this member.
Name string
// OutputDataDir is the target data directory to save restored data.
// OutputDataDir should not conflict with existing etcd data directory.
// If OutputDataDir already exists, it will return an error to prevent
// unintended data directory overwrites.
// Defaults to "[Name].etcd" if not given.
OutputDataDir string
// OutputWALDir is the target WAL data directory.
// Defaults to "[OutputDataDir]/member/wal" if not given.
OutputWALDir string
// InitialCluster is the initial cluster configuration for restore bootstrap.
InitialCluster types.URLsMap
// InitialClusterToken is the initial cluster token for etcd cluster during restore bootstrap.
InitialClusterToken string
// PeerURLs is a list of member's peer URLs to advertise to the rest of the cluster.
PeerURLs types.URLs
// SkipHashCheck is "true" to ignore snapshot integrity hash value
// (required if copied from data directory).
SkipHashCheck bool
// Restore restores a new etcd data directory from given snapshot
// file. It returns an error if specified data directory already
// exists, to prevent unintended data directory overwrites.
Restore(cfg RestoreConfig) error
}
// NewV3 returns a new snapshot Manager for v3.x snapshot.
// "*clientv3.Client" is only used for "Save" method.
// Otherwise, pass "nil".
func NewV3(cli *clientv3.Client, lg *zap.Logger) Manager {
func NewV3(lg *zap.Logger) Manager {
if lg == nil {
lg = zap.NewExample()
}
return &v3Manager{cli: cli, lg: lg}
return &v3Manager{lg: lg}
}
type v3Manager struct {
cli *clientv3.Client
lg *zap.Logger
lg *zap.Logger
name string
dbPath string
@@ -117,11 +87,23 @@ type v3Manager struct {
skipHashCheck bool
}
func (s *v3Manager) Save(ctx context.Context, dbPath string) error {
partpath := dbPath + ".part"
f, err := os.Create(partpath)
// Save fetches snapshot from remote etcd server and saves data to target path.
func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string) error {
if len(cfg.Endpoints) != 1 {
return fmt.Errorf("snapshot must be requested to one selected node, not multiple %v", cfg.Endpoints)
}
cli, err := clientv3.New(cfg)
if err != nil {
return err
}
defer cli.Close()
partpath := dbPath + ".part"
defer os.RemoveAll(partpath)
var f *os.File
f, err = os.Create(partpath)
if err != nil {
os.RemoveAll(partpath)
return fmt.Errorf("could not open %s (%v)", partpath, err)
}
s.lg.Info(
@@ -129,34 +111,47 @@ func (s *v3Manager) Save(ctx context.Context, dbPath string) error {
zap.String("path", partpath),
)
now := time.Now()
var rd io.ReadCloser
rd, err = s.cli.Snapshot(ctx)
rd, err = cli.Snapshot(ctx)
if err != nil {
os.RemoveAll(partpath)
return err
}
s.lg.Info("copying from snapshot stream")
s.lg.Info(
"fetching snapshot",
zap.String("endpoint", cfg.Endpoints[0]),
)
if _, err = io.Copy(f, rd); err != nil {
os.RemoveAll(partpath)
return err
}
if err = fileutil.Fsync(f); err != nil {
os.RemoveAll(partpath)
return err
}
if err = f.Close(); err != nil {
os.RemoveAll(partpath)
return err
}
s.lg.Info(
"fetched snapshot",
zap.String("endpoint", cfg.Endpoints[0]),
zap.Duration("took", time.Since(now)),
)
s.lg.Info("rename", zap.String("from", partpath), zap.String("to", dbPath))
if err = os.Rename(partpath, dbPath); err != nil {
os.RemoveAll(partpath)
return fmt.Errorf("could not rename %s to %s (%v)", partpath, dbPath, err)
}
s.lg.Info("saved", zap.String("path", dbPath))
return nil
}
// Status is the snapshot file status.
type Status struct {
Hash uint32 `json:"hash"`
Revision int64 `json:"revision"`
TotalKey int `json:"totalKey"`
TotalSize int64 `json:"totalSize"`
}
// Status returns the snapshot file information.
func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
if _, err = os.Stat(dbPath); err != nil {
return ds, err
@@ -200,19 +195,60 @@ func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
return ds, nil
}
func (s *v3Manager) Restore(dbPath string, cfg RestoreConfig) error {
srv := etcdserver.ServerConfig{
Name: cfg.Name,
InitialClusterToken: cfg.InitialClusterToken,
InitialPeerURLsMap: cfg.InitialCluster,
PeerURLs: cfg.PeerURLs,
// RestoreConfig configures snapshot restore operation.
type RestoreConfig struct {
// SnapshotPath is the path of snapshot file to restore from.
SnapshotPath string
// Name is the human-readable name of this member.
Name string
// OutputDataDir is the target data directory to save restored data.
// OutputDataDir should not conflict with existing etcd data directory.
// If OutputDataDir already exists, it will return an error to prevent
// unintended data directory overwrites.
// If empty, defaults to "[Name].etcd" if not given.
OutputDataDir string
// OutputWALDir is the target WAL data directory.
// If empty, defaults to "[OutputDataDir]/member/wal" if not given.
OutputWALDir string
// PeerURLs is a list of member's peer URLs to advertise to the rest of the cluster.
PeerURLs []string
// InitialCluster is the initial cluster configuration for restore bootstrap.
InitialCluster string
// InitialClusterToken is the initial cluster token for etcd cluster during restore bootstrap.
InitialClusterToken string
// SkipHashCheck is "true" to ignore snapshot integrity hash value
// (required if copied from data directory).
SkipHashCheck bool
}
// Restore restores a new etcd data directory from given snapshot file.
func (s *v3Manager) Restore(cfg RestoreConfig) error {
pURLs, err := types.NewURLs(cfg.PeerURLs)
if err != nil {
return err
}
if err := srv.VerifyBootstrap(); err != nil {
var ics types.URLsMap
ics, err = types.NewURLsMap(cfg.InitialCluster)
if err != nil {
return err
}
var err error
s.cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialCluster)
srv := etcdserver.ServerConfig{
Name: cfg.Name,
PeerURLs: pURLs,
InitialPeerURLsMap: ics,
InitialClusterToken: cfg.InitialClusterToken,
}
if err = srv.VerifyBootstrap(); err != nil {
return err
}
s.cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, ics)
if err != nil {
return err
}
@@ -221,44 +257,44 @@ func (s *v3Manager) Restore(dbPath string, cfg RestoreConfig) error {
if dataDir == "" {
dataDir = cfg.Name + ".etcd"
}
if _, err = os.Stat(dataDir); err == nil {
if fileutil.Exist(dataDir) {
return fmt.Errorf("data-dir %q exists", dataDir)
}
walDir := cfg.OutputWALDir
if walDir == "" {
walDir = filepath.Join(dataDir, "member", "wal")
} else if _, err = os.Stat(walDir); err == nil {
} else if fileutil.Exist(walDir) {
return fmt.Errorf("wal-dir %q exists", walDir)
}
s.lg.Info(
"restoring snapshot file",
zap.String("path", dbPath),
zap.String("wal-dir", walDir),
zap.String("data-dir", dataDir),
zap.String("snap-dir", s.snapDir),
)
s.name = cfg.Name
s.dbPath = dbPath
s.dbPath = cfg.SnapshotPath
s.walDir = walDir
s.snapDir = filepath.Join(dataDir, "member", "snap")
s.skipHashCheck = cfg.SkipHashCheck
s.lg.Info(
"restoring snapshot",
zap.String("path", s.dbPath),
zap.String("wal-dir", s.walDir),
zap.String("data-dir", dataDir),
zap.String("snap-dir", s.snapDir),
)
if err = s.saveDB(); err != nil {
return err
}
if err = s.saveWALAndSnap(); err != nil {
return err
}
s.lg.Info(
"finished restoring snapshot file",
zap.String("path", dbPath),
zap.String("wal-dir", walDir),
"restored snapshot",
zap.String("path", s.dbPath),
zap.String("wal-dir", s.walDir),
zap.String("data-dir", dataDir),
zap.String("snap-dir", s.snapDir),
)
return nil
}
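
The new guard at the top of "Save" is worth calling out: it rejects any configuration that does not name exactly one endpoint, before a client is created or any data is fetched, because the snapshot must come from one selected node. A short sketch of the failure mode, with hypothetical endpoints and an assumed import path:

```go
package main

import (
	"context"
	"fmt"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/snapshot" // assumed import path for this package
	"go.uber.org/zap"
)

func main() {
	sp := snapshot.NewV3(zap.NewExample())
	cfg := clientv3.Config{Endpoints: []string{
		"http://10.0.0.1:2379", // hypothetical endpoints
		"http://10.0.0.2:2379",
	}}
	// Fails immediately with "snapshot must be requested to one selected
	// node, not multiple ..." before any client is created.
	err := sp.Save(context.Background(), cfg, "/tmp/backup.db")
	fmt.Println(err)
}
```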


@@ -27,7 +27,6 @@ import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/embed"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/types"
"go.uber.org/zap"
)
@@ -52,29 +51,23 @@ func TestSnapshotV3RestoreSingle(t *testing.T) {
cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String())
cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()))
sp := NewV3(nil, zap.NewExample())
err := sp.Restore(dbPath, RestoreConfig{})
if err.Error() != `couldn't find local name "" in the initial cluster configuration` {
t.Fatalf("expected restore error, got %v", err)
sp := NewV3(zap.NewExample())
pss := make([]string, 0, len(pURLs))
for _, p := range pURLs {
pss = append(pss, p.String())
}
var iURLs types.URLsMap
iURLs, err = types.NewURLsMap(cfg.InitialCluster)
if err != nil {
t.Fatal(err)
}
if err = sp.Restore(dbPath, RestoreConfig{
if err := sp.Restore(RestoreConfig{
SnapshotPath: dbPath,
Name: cfg.Name,
OutputDataDir: cfg.Dir,
InitialCluster: iURLs,
InitialCluster: cfg.InitialCluster,
InitialClusterToken: cfg.InitialClusterToken,
PeerURLs: pURLs,
PeerURLs: pss,
}); err != nil {
t.Fatal(err)
}
var srv *embed.Etcd
srv, err = embed.StartEtcd(cfg)
srv, err := embed.StartEtcd(cfg)
if err != nil {
t.Fatal(err)
}
@@ -176,10 +169,12 @@ func createSnapshotFile(t *testing.T, kvs []kv) string {
t.Fatalf("failed to start embed.Etcd for creating snapshots")
}
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}})
ccfg := clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}}
cli, err := clientv3.New(ccfg)
if err != nil {
t.Fatal(err)
}
defer cli.Close()
for i := range kvs {
ctx, cancel := context.WithTimeout(context.Background(), testutil.RequestTimeout)
_, err = cli.Put(ctx, kvs[i].k, kvs[i].v)
@@ -189,9 +184,9 @@ func createSnapshotFile(t *testing.T, kvs []kv) string {
}
}
sp := NewV3(cli, zap.NewExample())
sp := NewV3(zap.NewExample())
dpPath := filepath.Join(os.TempDir(), fmt.Sprintf("snapshot%d.db", time.Now().Nanosecond()))
if err = sp.Save(context.Background(), dpPath); err != nil {
if err = sp.Save(context.Background(), ccfg, dpPath); err != nil {
t.Fatal(err)
}
@@ -214,10 +209,6 @@ func restoreCluster(t *testing.T, clusterN int, dbPath string) (
ics += fmt.Sprintf(",%d=%s", i, pURLs[i].String())
}
ics = ics[1:]
iURLs, err := types.NewURLsMap(ics)
if err != nil {
t.Fatal(err)
}
cfgs := make([]*embed.Config, clusterN)
for i := 0; i < clusterN; i++ {
@@ -230,13 +221,14 @@ func restoreCluster(t *testing.T, clusterN int, dbPath string) {
cfg.InitialCluster = ics
cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()+i))
sp := NewV3(nil, zap.NewExample())
if err := sp.Restore(dbPath, RestoreConfig{
sp := NewV3(zap.NewExample())
if err := sp.Restore(RestoreConfig{
SnapshotPath: dbPath,
Name: cfg.Name,
OutputDataDir: cfg.Dir,
InitialCluster: iURLs,
PeerURLs: []string{pURLs[i].String()},
InitialCluster: ics,
InitialClusterToken: cfg.InitialClusterToken,
PeerURLs: types.URLs{pURLs[i]},
}); err != nil {
t.Fatal(err)
}
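
"Status" is untouched by this commit beyond moving within the file, but it completes the Manager interface shown above. A minimal sketch of reading a saved snapshot's metadata, with an assumed import path and an illustrative file path:

```go
package main

import (
	"fmt"
	"log"

	"github.com/coreos/etcd/snapshot" // assumed import path for this package
	"go.uber.org/zap"
)

func main() {
	sp := snapshot.NewV3(zap.NewExample())
	st, err := sp.Status("/tmp/backup.db") // illustrative snapshot path
	if err != nil {
		log.Fatal(err)
	}
	// Hash is the integrity hash, Revision the last revision, TotalKey the
	// key count, and TotalSize the snapshot file size in bytes.
	fmt.Printf("hash=%d revision=%d keys=%d size=%dB\n",
		st.Hash, st.Revision, st.TotalKey, st.TotalSize)
}
```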