From e838c26f8acbecfe69e164da5ee26edf09e59e27 Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Fri, 8 Apr 2016 01:35:58 -0700
Subject: [PATCH] etcdctl: use snapshot RPC in snapshot command

---
 etcdctl/ctlv3/command/snapshot_command.go | 285 ++++++++++++++++------
 storage/backend/batch_tx.go               |   7 +-
 2 files changed, 219 insertions(+), 73 deletions(-)

diff --git a/etcdctl/ctlv3/command/snapshot_command.go b/etcdctl/ctlv3/command/snapshot_command.go
index b6edd5f5e..4ef4a1f84 100644
--- a/etcdctl/ctlv3/command/snapshot_command.go
+++ b/etcdctl/ctlv3/command/snapshot_command.go
@@ -15,56 +15,81 @@
 package command
 
 import (
+	"encoding/json"
 	"fmt"
 	"io"
 	"os"
+	"path"
+	"strings"
 
-	"github.com/coreos/etcd/clientv3"
-	"github.com/coreos/etcd/clientv3/mirror"
-	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/etcdserver/membership"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft"
+	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/storage"
+	"github.com/coreos/etcd/storage/backend"
+	"github.com/coreos/etcd/wal"
 	"github.com/spf13/cobra"
 	"golang.org/x/net/context"
 )
 
+const (
+	defaultName                     = "default"
+	defaultInitialAdvertisePeerURLs = "http://localhost:2380,http://localhost:7001"
+)
+
+var (
+	restoreCluster      string
+	restoreClusterToken string
+	restoreDataDir      string
+	restorePeerURLs     string
+	restoreName         string
+)
+
 // NewSnapshotCommand returns the cobra command for "snapshot".
 func NewSnapshotCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "snapshot",
+		Short: "snapshot manages etcd node snapshots.",
+	}
+	cmd.AddCommand(NewSnapshotSaveCommand())
+	cmd.AddCommand(NewSnapshotRestoreCommand())
+	return cmd
+}
+
+func NewSnapshotSaveCommand() *cobra.Command {
 	return &cobra.Command{
-		Use:   "snapshot [filename]",
-		Short: "Snapshot streams a point-in-time snapshot of the store",
-		Run:   snapshotCommandFunc,
+		Use:   "save <filename>",
+		Short: "save stores an etcd node backend snapshot to a given file.",
+		Run:   snapshotSaveCommandFunc,
 	}
 }
 
-// snapshotCommandFunc watches for the length of the entire store and records
-// to a file.
-func snapshotCommandFunc(cmd *cobra.Command, args []string) {
-	switch {
-	case len(args) == 0:
-		snapshotToStdout(mustClientFromCmd(cmd))
-	case len(args) == 1:
-		snapshotToFile(mustClientFromCmd(cmd), args[0])
-	default:
-		err := fmt.Errorf("snapshot takes at most one argument")
+func NewSnapshotRestoreCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "restore <filename>",
+		Short: "restore an etcd node snapshot to an etcd directory",
+		Run:   snapshotRestoreCommandFunc,
+	}
+	cmd.Flags().StringVar(&restoreDataDir, "data-dir", "", "Path to the data directory.")
+	cmd.Flags().StringVar(&restoreCluster, "initial-cluster", initialClusterFromName(defaultName), "Initial cluster configuration for restore bootstrap.")
+	cmd.Flags().StringVar(&restoreClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during restore bootstrap.")
+	cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster.")
+	cmd.Flags().StringVar(&restoreName, "name", defaultName, "Human-readable name for this member.")
+
+	return cmd
+}
+
+func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
+	if len(args) != 1 {
+		err := fmt.Errorf("snapshot save expects one argument")
 		ExitWithError(ExitBadArgs, err)
 	}
-}
 
-// snapshotToStdout streams a snapshot over stdout
-func snapshotToStdout(c *clientv3.Client) {
-	// must explicitly fetch first revision since no retry on stdout
-	wr := <-c.Watch(context.TODO(), "", clientv3.WithPrefix(), clientv3.WithRev(1))
-	if wr.Err() == nil {
-		wr.CompactRevision = 1
-	}
-	if rev := snapshot(os.Stdout, c, wr.CompactRevision+1); rev != 0 {
-		err := fmt.Errorf("snapshot interrupted by compaction %v", rev)
-		ExitWithError(ExitInterrupted, err)
-	}
-	os.Stdout.Sync()
-}
+	path := args[0]
 
-// snapshotToFile atomically writes a snapshot to a file
-func snapshotToFile(c *clientv3.Client, path string) {
 	partpath := path + ".part"
 	f, err := os.Create(partpath)
 	defer f.Close()
@@ -72,56 +97,172 @@ func snapshotToFile(c *clientv3.Client, path string) {
 		exiterr := fmt.Errorf("could not open %s (%v)", partpath, err)
 		ExitWithError(ExitBadArgs, exiterr)
 	}
-	rev := int64(1)
-	for rev != 0 {
-		f.Seek(0, 0)
-		f.Truncate(0)
-		rev = snapshot(f, c, rev)
+
+	c := mustClientFromCmd(cmd)
+	r, serr := c.Snapshot(context.TODO())
+	if serr != nil {
+		os.RemoveAll(partpath)
+		ExitWithError(ExitInterrupted, serr)
 	}
+	if _, rerr := io.Copy(f, r); rerr != nil {
+		os.RemoveAll(partpath)
+		ExitWithError(ExitInterrupted, rerr)
+	}
+
 	f.Sync()
-	if err := os.Rename(partpath, path); err != nil {
-		exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, err)
+
+	if rerr := os.Rename(partpath, path); rerr != nil {
+		exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, rerr)
 		ExitWithError(ExitIO, exiterr)
 	}
 }
 
-// snapshot reads all of a watcher; returns compaction revision if incomplete
-// TODO: stabilize snapshot format
-func snapshot(w io.Writer, c *clientv3.Client, rev int64) int64 {
-	s := mirror.NewSyncer(c, "", rev)
-
-	rc, errc := s.SyncBase(context.TODO())
-
-	for r := range rc {
-		for _, kv := range r.Kvs {
-			fmt.Fprintln(w, kv)
-		}
+func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
+	if len(args) != 1 {
+		err := fmt.Errorf("snapshot restore requires exactly one argument")
+		ExitWithError(ExitBadArgs, err)
 	}
-	err := <-errc
-	if err != nil {
-		if err == rpctypes.ErrCompacted {
-			// will get correct compact revision on retry
-			return rev + 1
-		}
-		// failed for some unknown reason, retry on same revision
-		return rev
+	urlmap, uerr := types.NewURLsMap(restoreCluster)
+	if uerr != nil {
+		ExitWithError(ExitBadArgs, uerr)
 	}
 
-	wc := s.SyncUpdates(context.TODO())
-
-	for wr := range wc {
-		if wr.Err() != nil {
-			return wr.CompactRevision
-		}
-		for _, ev := range wr.Events {
-			fmt.Fprintln(w, ev)
-		}
-		rev := wr.Events[len(wr.Events)-1].Kv.ModRevision
-		if rev >= wr.Header.Revision {
-			break
-		}
+	cfg := etcdserver.ServerConfig{
+		InitialClusterToken: restoreClusterToken,
+		InitialPeerURLsMap:  urlmap,
+		PeerURLs:            types.MustNewURLs(strings.Split(restorePeerURLs, ",")),
+		Name:                restoreName,
+	}
+	if err := cfg.VerifyBootstrap(); err != nil {
+		ExitWithError(ExitBadArgs, err)
 	}
 
-	return 0
+	cl, cerr := membership.NewClusterFromURLsMap(restoreClusterToken, urlmap)
+	if cerr != nil {
+		ExitWithError(ExitBadArgs, cerr)
+	}
+
+	basedir := restoreDataDir
+	if basedir == "" {
+		basedir = restoreName + ".etcd"
+	}
+
+	waldir := path.Join(basedir, "member", "wal")
+	snapdir := path.Join(basedir, "member", "snap")
+
+	if _, err := os.Stat(basedir); err == nil {
+		ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir))
+	}
+
+	makeDB(snapdir, args[0])
+	makeWAL(waldir, cl)
+}
+
+func initialClusterFromName(name string) string {
+	n := name
+	if name == "" {
+		n = defaultName
+	}
+	return fmt.Sprintf("%s=http://localhost:2380,%s=http://localhost:7001", n, n)
+}
+
+// makeWAL creates a WAL for the initial cluster
+func makeWAL(waldir string, cl *membership.RaftCluster) {
+	if err := os.MkdirAll(waldir, 0755); err != nil {
+		ExitWithError(ExitIO, err)
+	}
+
+	m := cl.MemberByName(restoreName)
+	md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())}
+	metadata, merr := md.Marshal()
+	if merr != nil {
+		ExitWithError(ExitInvalidInput, merr)
+	}
+
+	w, walerr := wal.Create(waldir, metadata)
+	if walerr != nil {
+		ExitWithError(ExitIO, walerr)
+	}
+	defer w.Close()
+
+	peers := make([]raft.Peer, len(cl.MemberIDs()))
+	for i, id := range cl.MemberIDs() {
+		ctx, err := json.Marshal((*cl).Member(id))
+		if err != nil {
+			ExitWithError(ExitInvalidInput, err)
+		}
+		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
+	}
+
+	ents := make([]raftpb.Entry, len(peers))
+	for i, p := range peers {
+		cc := raftpb.ConfChange{
+			Type:    raftpb.ConfChangeAddNode,
+			NodeID:  p.ID,
+			Context: p.Context}
+		d, err := cc.Marshal()
+		if err != nil {
+			ExitWithError(ExitInvalidInput, err)
+		}
+		e := raftpb.Entry{
+			Type:  raftpb.EntryConfChange,
+			Term:  1,
+			Index: uint64(i + 1),
+			Data:  d,
+		}
+		ents[i] = e
+	}
+
+	w.Save(raftpb.HardState{
+		Term:   1,
+		Vote:   peers[0].ID,
+		Commit: uint64(len(ents))}, ents)
+}
+
+// initIndex implements ConsistentIndexGetter so the snapshot won't block
+// the new raft instance by waiting for a future raft index.
+type initIndex struct{}
+
+func (*initIndex) ConsistentIndex() uint64 { return 1 }
+
+// makeDB copies the database snapshot to the snapshot directory
+func makeDB(snapdir, dbfile string) {
+	f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600)
+	if ferr != nil {
+		ExitWithError(ExitInvalidInput, ferr)
+	}
+	defer f.Close()
+
+	if err := os.MkdirAll(snapdir, 0755); err != nil {
+		ExitWithError(ExitIO, err)
+	}
+
+	dbpath := path.Join(snapdir, "db")
+	db, dberr := os.OpenFile(dbpath, os.O_WRONLY|os.O_CREATE, 0600)
+	if dberr != nil {
+		ExitWithError(ExitIO, dberr)
+	}
+	if _, err := io.Copy(db, f); err != nil {
+		ExitWithError(ExitIO, err)
+	}
+	db.Close()
+
+	// update consistentIndex so applies go through on etcdserver despite
+	// having a new raft instance
+	be := backend.NewDefaultBackend(dbpath)
+	s := storage.NewStore(be, nil, &initIndex{})
+	id := s.TxnBegin()
+	btx := be.BatchTx()
+	del := func(k, v []byte) error {
+		_, _, err := s.TxnDeleteRange(id, k, nil)
+		return err
+	}
+
+	// delete stored members from old cluster since using new members
+	btx.UnsafeForEach([]byte("members"), del)
+	btx.UnsafeForEach([]byte("members_removed"), del)
+
+	// trigger write-out of new consistent index
+	s.TxnEnd(id)
+	s.Commit()
+	s.Close()
+}
diff --git a/storage/backend/batch_tx.go b/storage/backend/batch_tx.go
index d59833cd1..75d4b03b6 100644
--- a/storage/backend/batch_tx.go
+++ b/storage/backend/batch_tx.go
@@ -125,7 +125,12 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
 
 // UnsafeForEach must be called holding the lock on the tx.
 func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
-	return t.tx.Bucket(bucketName).ForEach(visitor)
+	b := t.tx.Bucket(bucketName)
+	if b == nil {
+		// bucket does not exist
+		return nil
+	}
+	return b.ForEach(visitor)
 }
 
 // Commit commits a previous tx and begins a new writable one.
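
For context beyond the diff: the new save path reduces to streaming the snapshot RPC into a temporary ".part" file and atomically renaming it into place. Below is a minimal standalone sketch of that flow through the clientv3 API; the endpoint localhost:2379 and the file name backup.db are made-up placeholders, and log.Fatal stands in for the ExitWithError calls in snapshotSaveCommandFunc.

package main

import (
	"io"
	"log"
	"os"

	"github.com/coreos/etcd/clientv3"
	"golang.org/x/net/context"
)

func main() {
	// Placeholder endpoint; any reachable etcd v3 member works.
	c, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// Stage into a ".part" file, as the patch does, so a partial
	// stream is never mistaken for a complete snapshot.
	partpath := "backup.db.part"
	f, err := os.Create(partpath)
	if err != nil {
		log.Fatal(err)
	}

	// Snapshot streams the member's backend database over a single RPC.
	r, err := c.Snapshot(context.TODO())
	if err != nil {
		os.RemoveAll(partpath)
		log.Fatal(err)
	}
	if _, err := io.Copy(f, r); err != nil {
		os.RemoveAll(partpath)
		log.Fatal(err)
	}
	f.Sync()
	f.Close()

	// Atomically publish the finished snapshot.
	if err := os.Rename(partpath, "backup.db"); err != nil {
		log.Fatal(err)
	}
}

The restore side then runs entirely offline: makeDB copies the database into member/snap/db, deletes the old cluster's stored member records, and writes out a fresh consistent index through the initIndex getter, while makeWAL fabricates a single-term WAL with one ConfChangeAddNode entry per member, so the restored node boots a new raft history instead of replaying the old cluster's.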