diff --git a/clientv3/maintenance.go b/clientv3/maintenance.go index f200cc3b5..75c0df705 100644 --- a/clientv3/maintenance.go +++ b/clientv3/maintenance.go @@ -15,6 +15,7 @@ package clientv3 import ( + "io" "sync" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" @@ -47,6 +48,9 @@ type Maintenance interface { // Status gets the status of the member. Status(ctx context.Context, endpoint string) (*StatusResponse, error) + + // Snapshot provides a reader for a snapshot of a backend. + Snapshot(ctx context.Context) (io.ReadCloser, error) } type maintenance struct { @@ -145,6 +149,33 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo return (*StatusResponse)(resp), nil } +func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) { + ss, err := m.getRemote().Snapshot(ctx, &pb.SnapshotRequest{}) + if err != nil { + return nil, err + } + + pr, pw := io.Pipe() + go func() { + for { + resp, err := ss.Recv() + if err != nil { + pw.CloseWithError(err) + return + } + if resp == nil && err == nil { + break + } + if _, werr := pw.Write(resp.Blob); werr != nil { + pw.CloseWithError(werr) + return + } + } + pw.Close() + }() + return pr, nil +} + func (m *maintenance) getRemote() pb.MaintenanceClient { m.mu.Lock() defer m.mu.Unlock() diff --git a/etcdctl/ctlv3/command/snapshot_command.go b/etcdctl/ctlv3/command/snapshot_command.go index b6edd5f5e..4ef4a1f84 100644 --- a/etcdctl/ctlv3/command/snapshot_command.go +++ b/etcdctl/ctlv3/command/snapshot_command.go @@ -15,56 +15,81 @@ package command import ( + "encoding/json" "fmt" "io" "os" + "path" + "strings" - "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/clientv3/mirror" - "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/etcdserver/membership" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/storage" + "github.com/coreos/etcd/storage/backend" + "github.com/coreos/etcd/wal" "github.com/spf13/cobra" "golang.org/x/net/context" ) +const ( + defaultName = "default" + defaultInitialAdvertisePeerURLs = "http://localhost:2380,http://localhost:7001" +) + +var ( + restoreCluster string + restoreClusterToken string + restoreDataDir string + restorePeerURLs string + restoreName string +) + // NewSnapshotCommand returns the cobra command for "snapshot". func NewSnapshotCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "snapshot", + Short: "snapshot manages etcd node snapshots.", + } + cmd.AddCommand(NewSnapshotSaveCommand()) + cmd.AddCommand(NewSnapshotRestoreCommand()) + return cmd +} + +func NewSnapshotSaveCommand() *cobra.Command { return &cobra.Command{ - Use: "snapshot [filename]", - Short: "Snapshot streams a point-in-time snapshot of the store", - Run: snapshotCommandFunc, + Use: "save ", + Short: "save stores an etcd node backend snapshot to a given file.", + Run: snapshotSaveCommandFunc, } } -// snapshotCommandFunc watches for the length of the entire store and records -// to a file. -func snapshotCommandFunc(cmd *cobra.Command, args []string) { - switch { - case len(args) == 0: - snapshotToStdout(mustClientFromCmd(cmd)) - case len(args) == 1: - snapshotToFile(mustClientFromCmd(cmd), args[0]) - default: - err := fmt.Errorf("snapshot takes at most one argument") +func NewSnapshotRestoreCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "restore ", + Short: "restore an etcd node snapshot to an etcd directory", + Run: snapshotRestoreCommandFunc, + } + cmd.Flags().StringVar(&restoreDataDir, "data-dir", "", "Path to the data directory.") + cmd.Flags().StringVar(&restoreCluster, "initial-cluster", initialClusterFromName(defaultName), "Initial cluster configuration for restore bootstrap.") + cmd.Flags().StringVar(&restoreClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during restore bootstrap.") + cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster.") + cmd.Flags().StringVar(&restoreName, "name", defaultName, "Human-readable name for this member.") + + return cmd +} + +func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) { + if len(args) != 1 { + err := fmt.Errorf("snapshot save expects one argument") ExitWithError(ExitBadArgs, err) } -} -// snapshotToStdout streams a snapshot over stdout -func snapshotToStdout(c *clientv3.Client) { - // must explicitly fetch first revision since no retry on stdout - wr := <-c.Watch(context.TODO(), "", clientv3.WithPrefix(), clientv3.WithRev(1)) - if wr.Err() == nil { - wr.CompactRevision = 1 - } - if rev := snapshot(os.Stdout, c, wr.CompactRevision+1); rev != 0 { - err := fmt.Errorf("snapshot interrupted by compaction %v", rev) - ExitWithError(ExitInterrupted, err) - } - os.Stdout.Sync() -} + path := args[0] -// snapshotToFile atomically writes a snapshot to a file -func snapshotToFile(c *clientv3.Client, path string) { partpath := path + ".part" f, err := os.Create(partpath) defer f.Close() @@ -72,56 +97,172 @@ func snapshotToFile(c *clientv3.Client, path string) { exiterr := fmt.Errorf("could not open %s (%v)", partpath, err) ExitWithError(ExitBadArgs, exiterr) } - rev := int64(1) - for rev != 0 { - f.Seek(0, 0) - f.Truncate(0) - rev = snapshot(f, c, rev) + + c := mustClientFromCmd(cmd) + r, serr := c.Snapshot(context.TODO()) + if serr != nil { + os.RemoveAll(partpath) + ExitWithError(ExitInterrupted, serr) } + if _, rerr := io.Copy(f, r); rerr != nil { + os.RemoveAll(partpath) + ExitWithError(ExitInterrupted, rerr) + } + f.Sync() - if err := os.Rename(partpath, path); err != nil { - exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, err) + + if rerr := os.Rename(partpath, path); rerr != nil { + exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, rerr) ExitWithError(ExitIO, exiterr) } } -// snapshot reads all of a watcher; returns compaction revision if incomplete -// TODO: stabilize snapshot format -func snapshot(w io.Writer, c *clientv3.Client, rev int64) int64 { - s := mirror.NewSyncer(c, "", rev) - - rc, errc := s.SyncBase(context.TODO()) - - for r := range rc { - for _, kv := range r.Kvs { - fmt.Fprintln(w, kv) - } +func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) { + if len(args) != 1 { + err := fmt.Errorf("snapshot restore exactly one argument") + ExitWithError(ExitBadArgs, err) } - err := <-errc - if err != nil { - if err == rpctypes.ErrCompacted { - // will get correct compact revision on retry - return rev + 1 - } - // failed for some unknown reason, retry on same revision - return rev + urlmap, uerr := types.NewURLsMap(restoreCluster) + if uerr != nil { + ExitWithError(ExitBadArgs, uerr) } - wc := s.SyncUpdates(context.TODO()) - - for wr := range wc { - if wr.Err() != nil { - return wr.CompactRevision - } - for _, ev := range wr.Events { - fmt.Fprintln(w, ev) - } - rev := wr.Events[len(wr.Events)-1].Kv.ModRevision - if rev >= wr.Header.Revision { - break - } + cfg := etcdserver.ServerConfig{ + InitialClusterToken: restoreClusterToken, + InitialPeerURLsMap: urlmap, + PeerURLs: types.MustNewURLs(strings.Split(restorePeerURLs, ",")), + Name: restoreName, + } + if err := cfg.VerifyBootstrap(); err != nil { + ExitWithError(ExitBadArgs, err) } - return 0 + cl, cerr := membership.NewClusterFromURLsMap(restoreClusterToken, urlmap) + if cerr != nil { + ExitWithError(ExitBadArgs, cerr) + } + + basedir := restoreDataDir + if basedir == "" { + basedir = restoreName + ".etcd" + } + + waldir := path.Join(basedir, "member", "wal") + snapdir := path.Join(basedir, "member", "snap") + + if _, err := os.Stat(basedir); err == nil { + ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir)) + } + + makeDB(snapdir, args[0]) + makeWAL(waldir, cl) +} + +func initialClusterFromName(name string) string { + n := name + if name == "" { + n = defaultName + } + return fmt.Sprintf("%s=http://localhost:2380,%s=http://localhost:7001", n, n) +} + +// makeWAL creates a WAL for the initial cluster +func makeWAL(waldir string, cl *membership.RaftCluster) { + if err := os.MkdirAll(waldir, 0755); err != nil { + ExitWithError(ExitIO, err) + } + + m := cl.MemberByName(restoreName) + md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())} + metadata, merr := md.Marshal() + if merr != nil { + ExitWithError(ExitInvalidInput, merr) + } + + w, walerr := wal.Create(waldir, metadata) + if walerr != nil { + ExitWithError(ExitIO, walerr) + } + defer w.Close() + + peers := make([]raft.Peer, len(cl.MemberIDs())) + for i, id := range cl.MemberIDs() { + ctx, err := json.Marshal((*cl).Member(id)) + if err != nil { + ExitWithError(ExitInvalidInput, err) + } + peers[i] = raft.Peer{ID: uint64(id), Context: ctx} + } + + ents := make([]raftpb.Entry, len(peers)) + for i, p := range peers { + cc := raftpb.ConfChange{ + Type: raftpb.ConfChangeAddNode, + NodeID: p.ID, + Context: p.Context} + d, err := cc.Marshal() + if err != nil { + ExitWithError(ExitInvalidInput, err) + } + e := raftpb.Entry{ + Type: raftpb.EntryConfChange, + Term: 1, + Index: uint64(i + 1), + Data: d, + } + ents[i] = e + } + + w.Save(raftpb.HardState{ + Term: 1, + Vote: peers[0].ID, + Commit: uint64(len(ents))}, ents) +} + +// initIndex implements ConsistentIndexGetter so the snapshot won't block +// the new raft instance by waiting for a future raft index. +type initIndex struct{} + +func (*initIndex) ConsistentIndex() uint64 { return 1 } + +// makeDB copies the database snapshot to the snapshot directory +func makeDB(snapdir, dbfile string) { + f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600) + if ferr != nil { + ExitWithError(ExitInvalidInput, ferr) + } + defer f.Close() + + if err := os.MkdirAll(snapdir, 0755); err != nil { + ExitWithError(ExitIO, err) + } + + dbpath := path.Join(snapdir, "db") + db, dberr := os.OpenFile(dbpath, os.O_WRONLY|os.O_CREATE, 0600) + if dberr != nil { + ExitWithError(ExitIO, dberr) + } + if _, err := io.Copy(db, f); err != nil { + ExitWithError(ExitIO, err) + } + db.Close() + + // update consistentIndex so applies go through on etcdserver despite + // having a new raft instance + be := backend.NewDefaultBackend(dbpath) + s := storage.NewStore(be, nil, &initIndex{}) + id := s.TxnBegin() + btx := be.BatchTx() + del := func(k, v []byte) error { + _, _, err := s.TxnDeleteRange(id, k, nil) + return err + } + // delete stored members from old cluster since using new members + btx.UnsafeForEach([]byte("members"), del) + btx.UnsafeForEach([]byte("members_removed"), del) + // trigger write-out of new consistent index + s.TxnEnd(id) + s.Commit() + s.Close() } diff --git a/etcdserver/api/v3rpc/maintenance.go b/etcdserver/api/v3rpc/maintenance.go index 5dae0c452..b455fe98a 100644 --- a/etcdserver/api/v3rpc/maintenance.go +++ b/etcdserver/api/v3rpc/maintenance.go @@ -15,6 +15,8 @@ package v3rpc import ( + "io" + "github.com/coreos/etcd/etcdserver" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/storage/backend" @@ -51,6 +53,41 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe return &pb.DefragmentResponse{}, nil } +func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error { + snap := ms.bg.Backend().Snapshot() + pr, pw := io.Pipe() + + defer pr.Close() + + go func() { + snap.WriteTo(pw) + if err := snap.Close(); err != nil { + plog.Errorf("error closing snapshot (%v)", err) + } + pw.Close() + }() + + br := int64(0) + buf := make([]byte, 32*1024) + sz := snap.Size() + for br < sz { + n, err := io.ReadFull(pr, buf) + if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { + return togRPCError(err) + } + br += int64(n) + resp := &pb.SnapshotResponse{ + RemainingBytes: uint64(sz - br), + Blob: buf[:n], + } + if err = srv.Send(resp); err != nil { + return togRPCError(err) + } + } + + return nil +} + func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) { h, err := ms.bg.Backend().Hash() if err != nil { diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index 6ab9804c6..832168929 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -31,6 +31,8 @@ CompactionResponse HashRequest HashResponse + SnapshotRequest + SnapshotResponse WatchRequest WatchCreateRequest WatchCancelRequest @@ -92,10 +94,10 @@ import ( "fmt" proto "github.com/gogo/protobuf/proto" - - math "math" ) +import math "math" + import io "io" // Reference imports to suppress errors if they are not otherwise used. diff --git a/etcdserver/etcdserverpb/raft_internal.pb.go b/etcdserver/etcdserverpb/raft_internal.pb.go index 0b3a5fed6..3cefe8f3d 100644 --- a/etcdserver/etcdserverpb/raft_internal.pb.go +++ b/etcdserver/etcdserverpb/raft_internal.pb.go @@ -8,10 +8,10 @@ import ( "fmt" proto "github.com/gogo/protobuf/proto" - - math "math" ) +import math "math" + import io "io" // Reference imports to suppress errors if they are not otherwise used. diff --git a/etcdserver/etcdserverpb/rpc.pb.go b/etcdserver/etcdserverpb/rpc.pb.go index c07034146..cd95b9ca7 100644 --- a/etcdserver/etcdserverpb/rpc.pb.go +++ b/etcdserver/etcdserverpb/rpc.pb.go @@ -8,21 +8,20 @@ import ( "fmt" proto "github.com/gogo/protobuf/proto" - - math "math" - - authpb "github.com/coreos/etcd/auth/authpb" - - io "io" ) +import math "math" + import storagepb "github.com/coreos/etcd/storage/storagepb" +import authpb "github.com/coreos/etcd/auth/authpb" import ( context "golang.org/x/net/context" grpc "google.golang.org/grpc" ) +import io "io" + // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal var _ = fmt.Errorf @@ -815,6 +814,34 @@ func (m *HashResponse) GetHeader() *ResponseHeader { return nil } +type SnapshotRequest struct { +} + +func (m *SnapshotRequest) Reset() { *m = SnapshotRequest{} } +func (m *SnapshotRequest) String() string { return proto.CompactTextString(m) } +func (*SnapshotRequest) ProtoMessage() {} + +type SnapshotResponse struct { + // header has the current store information. The first header in the snapshot + // stream indicates the point in time of the snapshot. + Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` + // remaining_bytes is the number of blob bytes to be sent after this message + RemainingBytes uint64 `protobuf:"varint,2,opt,name=remaining_bytes,proto3" json:"remaining_bytes,omitempty"` + // blob has the next chunk of the snapshot in the snapshot stream. + Blob []byte `protobuf:"bytes,3,opt,name=blob,proto3" json:"blob,omitempty"` +} + +func (m *SnapshotResponse) Reset() { *m = SnapshotResponse{} } +func (m *SnapshotResponse) String() string { return proto.CompactTextString(m) } +func (*SnapshotResponse) ProtoMessage() {} + +func (m *SnapshotResponse) GetHeader() *ResponseHeader { + if m != nil { + return m.Header + } + return nil +} + type WatchRequest struct { // Types that are valid to be assigned to RequestUnion: // *WatchRequest_CreateRequest @@ -1616,6 +1643,8 @@ func init() { proto.RegisterType((*CompactionResponse)(nil), "etcdserverpb.CompactionResponse") proto.RegisterType((*HashRequest)(nil), "etcdserverpb.HashRequest") proto.RegisterType((*HashResponse)(nil), "etcdserverpb.HashResponse") + proto.RegisterType((*SnapshotRequest)(nil), "etcdserverpb.SnapshotRequest") + proto.RegisterType((*SnapshotResponse)(nil), "etcdserverpb.SnapshotResponse") proto.RegisterType((*WatchRequest)(nil), "etcdserverpb.WatchRequest") proto.RegisterType((*WatchCreateRequest)(nil), "etcdserverpb.WatchCreateRequest") proto.RegisterType((*WatchCancelRequest)(nil), "etcdserverpb.WatchCancelRequest") @@ -2296,6 +2325,8 @@ type MaintenanceClient interface { // This is designed for testing; do not use this in production when there // are ongoing transactions. Hash(ctx context.Context, in *HashRequest, opts ...grpc.CallOption) (*HashResponse, error) + // Snapshot sends a snapshot of the entire backend + Snapshot(ctx context.Context, in *SnapshotRequest, opts ...grpc.CallOption) (Maintenance_SnapshotClient, error) } type maintenanceClient struct { @@ -2342,6 +2373,38 @@ func (c *maintenanceClient) Hash(ctx context.Context, in *HashRequest, opts ...g return out, nil } +func (c *maintenanceClient) Snapshot(ctx context.Context, in *SnapshotRequest, opts ...grpc.CallOption) (Maintenance_SnapshotClient, error) { + stream, err := grpc.NewClientStream(ctx, &_Maintenance_serviceDesc.Streams[0], c.cc, "/etcdserverpb.Maintenance/Snapshot", opts...) + if err != nil { + return nil, err + } + x := &maintenanceSnapshotClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Maintenance_SnapshotClient interface { + Recv() (*SnapshotResponse, error) + grpc.ClientStream +} + +type maintenanceSnapshotClient struct { + grpc.ClientStream +} + +func (x *maintenanceSnapshotClient) Recv() (*SnapshotResponse, error) { + m := new(SnapshotResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // Server API for Maintenance service type MaintenanceServer interface { @@ -2354,6 +2417,8 @@ type MaintenanceServer interface { // This is designed for testing; do not use this in production when there // are ongoing transactions. Hash(context.Context, *HashRequest) (*HashResponse, error) + // Snapshot sends a snapshot of the entire backend + Snapshot(*SnapshotRequest, Maintenance_SnapshotServer) error } func RegisterMaintenanceServer(s *grpc.Server, srv MaintenanceServer) { @@ -2408,6 +2473,27 @@ func _Maintenance_Hash_Handler(srv interface{}, ctx context.Context, dec func(in return out, nil } +func _Maintenance_Snapshot_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(SnapshotRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(MaintenanceServer).Snapshot(m, &maintenanceSnapshotServer{stream}) +} + +type Maintenance_SnapshotServer interface { + Send(*SnapshotResponse) error + grpc.ServerStream +} + +type maintenanceSnapshotServer struct { + grpc.ServerStream +} + +func (x *maintenanceSnapshotServer) Send(m *SnapshotResponse) error { + return x.ServerStream.SendMsg(m) +} + var _Maintenance_serviceDesc = grpc.ServiceDesc{ ServiceName: "etcdserverpb.Maintenance", HandlerType: (*MaintenanceServer)(nil), @@ -2429,7 +2515,13 @@ var _Maintenance_serviceDesc = grpc.ServiceDesc{ Handler: _Maintenance_Hash_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Snapshot", + Handler: _Maintenance_Snapshot_Handler, + ServerStreams: true, + }, + }, } // Client API for Auth service @@ -3578,6 +3670,65 @@ func (m *HashResponse) MarshalTo(data []byte) (int, error) { return i, nil } +func (m *SnapshotRequest) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *SnapshotRequest) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + +func (m *SnapshotResponse) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *SnapshotResponse) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Header != nil { + data[i] = 0xa + i++ + i = encodeVarintRpc(data, i, uint64(m.Header.Size())) + n16, err := m.Header.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n16 + } + if m.RemainingBytes != 0 { + data[i] = 0x10 + i++ + i = encodeVarintRpc(data, i, uint64(m.RemainingBytes)) + } + if m.Blob != nil { + if len(m.Blob) > 0 { + data[i] = 0x1a + i++ + i = encodeVarintRpc(data, i, uint64(len(m.Blob))) + i += copy(data[i:], m.Blob) + } + } + return i, nil +} + func (m *WatchRequest) Marshal() (data []byte, err error) { size := m.Size() data = make([]byte, size) @@ -3594,11 +3745,11 @@ func (m *WatchRequest) MarshalTo(data []byte) (int, error) { var l int _ = l if m.RequestUnion != nil { - nn16, err := m.RequestUnion.MarshalTo(data[i:]) + nn17, err := m.RequestUnion.MarshalTo(data[i:]) if err != nil { return 0, err } - i += nn16 + i += nn17 } return i, nil } @@ -3609,11 +3760,11 @@ func (m *WatchRequest_CreateRequest) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.CreateRequest.Size())) - n17, err := m.CreateRequest.MarshalTo(data[i:]) + n18, err := m.CreateRequest.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n17 + i += n18 } return i, nil } @@ -3623,11 +3774,11 @@ func (m *WatchRequest_CancelRequest) MarshalTo(data []byte) (int, error) { data[i] = 0x12 i++ i = encodeVarintRpc(data, i, uint64(m.CancelRequest.Size())) - n18, err := m.CancelRequest.MarshalTo(data[i:]) + n19, err := m.CancelRequest.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n18 + i += n19 } return i, nil } @@ -3722,11 +3873,11 @@ func (m *WatchResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n19, err := m.Header.MarshalTo(data[i:]) + n20, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n19 + i += n20 } if m.WatchId != 0 { data[i] = 0x10 @@ -3820,11 +3971,11 @@ func (m *LeaseGrantResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n20, err := m.Header.MarshalTo(data[i:]) + n21, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n20 + i += n21 } if m.ID != 0 { data[i] = 0x10 @@ -3887,11 +4038,11 @@ func (m *LeaseRevokeResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n21, err := m.Header.MarshalTo(data[i:]) + n22, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n21 + i += n22 } return i, nil } @@ -3938,11 +4089,11 @@ func (m *LeaseKeepAliveResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n22, err := m.Header.MarshalTo(data[i:]) + n23, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n22 + i += n23 } if m.ID != 0 { data[i] = 0x10 @@ -4078,21 +4229,21 @@ func (m *MemberAddResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n23, err := m.Header.MarshalTo(data[i:]) + n24, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n23 + i += n24 } if m.Member != nil { data[i] = 0x12 i++ i = encodeVarintRpc(data, i, uint64(m.Member.Size())) - n24, err := m.Member.MarshalTo(data[i:]) + n25, err := m.Member.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n24 + i += n25 } return i, nil } @@ -4139,11 +4290,11 @@ func (m *MemberRemoveResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n25, err := m.Header.MarshalTo(data[i:]) + n26, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n25 + i += n26 } return i, nil } @@ -4205,11 +4356,11 @@ func (m *MemberUpdateResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n26, err := m.Header.MarshalTo(data[i:]) + n27, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n26 + i += n27 } return i, nil } @@ -4251,11 +4402,11 @@ func (m *MemberListResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n27, err := m.Header.MarshalTo(data[i:]) + n28, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n27 + i += n28 } if len(m.Members) > 0 { for _, msg := range m.Members { @@ -4309,11 +4460,11 @@ func (m *DefragmentResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n28, err := m.Header.MarshalTo(data[i:]) + n29, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n28 + i += n29 } return i, nil } @@ -4398,11 +4549,11 @@ func (m *AlarmResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n29, err := m.Header.MarshalTo(data[i:]) + n30, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n29 + i += n30 } if len(m.Alarms) > 0 { for _, msg := range m.Alarms { @@ -4456,11 +4607,11 @@ func (m *StatusResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n30, err := m.Header.MarshalTo(data[i:]) + n31, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n30 + i += n31 } if len(m.Version) > 0 { data[i] = 0x12 @@ -4760,11 +4911,11 @@ func (m *AuthRoleGrantRequest) MarshalTo(data []byte) (int, error) { data[i] = 0x12 i++ i = encodeVarintRpc(data, i, uint64(m.Perm.Size())) - n31, err := m.Perm.MarshalTo(data[i:]) + n32, err := m.Perm.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n31 + i += n32 } return i, nil } @@ -4806,11 +4957,11 @@ func (m *AuthEnableResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n32, err := m.Header.MarshalTo(data[i:]) + n33, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n32 + i += n33 } return i, nil } @@ -4834,11 +4985,11 @@ func (m *AuthDisableResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n33, err := m.Header.MarshalTo(data[i:]) + n34, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n33 + i += n34 } return i, nil } @@ -4862,11 +5013,11 @@ func (m *AuthenticateResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n34, err := m.Header.MarshalTo(data[i:]) + n35, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n34 + i += n35 } return i, nil } @@ -4890,11 +5041,11 @@ func (m *AuthUserAddResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n35, err := m.Header.MarshalTo(data[i:]) + n36, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n35 + i += n36 } return i, nil } @@ -4918,11 +5069,11 @@ func (m *AuthUserGetResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n36, err := m.Header.MarshalTo(data[i:]) + n37, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n36 + i += n37 } return i, nil } @@ -4946,11 +5097,11 @@ func (m *AuthUserDeleteResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n37, err := m.Header.MarshalTo(data[i:]) + n38, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n37 + i += n38 } return i, nil } @@ -4974,11 +5125,11 @@ func (m *AuthUserChangePasswordResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n38, err := m.Header.MarshalTo(data[i:]) + n39, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n38 + i += n39 } return i, nil } @@ -5002,11 +5153,11 @@ func (m *AuthUserGrantResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n39, err := m.Header.MarshalTo(data[i:]) + n40, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n39 + i += n40 } return i, nil } @@ -5030,11 +5181,11 @@ func (m *AuthUserRevokeResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n40, err := m.Header.MarshalTo(data[i:]) + n41, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n40 + i += n41 } return i, nil } @@ -5058,11 +5209,11 @@ func (m *AuthRoleAddResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n41, err := m.Header.MarshalTo(data[i:]) + n42, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n41 + i += n42 } return i, nil } @@ -5086,11 +5237,11 @@ func (m *AuthRoleGetResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n42, err := m.Header.MarshalTo(data[i:]) + n43, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n42 + i += n43 } return i, nil } @@ -5114,11 +5265,11 @@ func (m *AuthRoleDeleteResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n43, err := m.Header.MarshalTo(data[i:]) + n44, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n43 + i += n44 } return i, nil } @@ -5142,11 +5293,11 @@ func (m *AuthRoleGrantResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n44, err := m.Header.MarshalTo(data[i:]) + n45, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n44 + i += n45 } return i, nil } @@ -5170,11 +5321,11 @@ func (m *AuthRoleRevokeResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n45, err := m.Header.MarshalTo(data[i:]) + n46, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n45 + i += n46 } return i, nil } @@ -5542,6 +5693,31 @@ func (m *HashResponse) Size() (n int) { return n } +func (m *SnapshotRequest) Size() (n int) { + var l int + _ = l + return n +} + +func (m *SnapshotResponse) Size() (n int) { + var l int + _ = l + if m.Header != nil { + l = m.Header.Size() + n += 1 + l + sovRpc(uint64(l)) + } + if m.RemainingBytes != 0 { + n += 1 + sovRpc(uint64(m.RemainingBytes)) + } + if m.Blob != nil { + l = len(m.Blob) + if l > 0 { + n += 1 + l + sovRpc(uint64(l)) + } + } + return n +} + func (m *WatchRequest) Size() (n int) { var l int _ = l @@ -8180,6 +8356,189 @@ func (m *HashResponse) Unmarshal(data []byte) error { } return nil } +func (m *SnapshotRequest) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SnapshotRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SnapshotRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SnapshotResponse) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SnapshotResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SnapshotResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Header", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Header == nil { + m.Header = &ResponseHeader{} + } + if err := m.Header.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field RemainingBytes", wireType) + } + m.RemainingBytes = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.RemainingBytes |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Blob", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Blob = append(m.Blob[:0], data[iNdEx:postIndex]...) + if m.Blob == nil { + m.Blob = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *WatchRequest) Unmarshal(data []byte) error { l := len(data) iNdEx := 0 diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index a36cd19b7..c9737ba7a 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -85,6 +85,9 @@ service Maintenance { // This is designed for testing; do not use this in production when there // are ongoing transactions. rpc Hash(HashRequest) returns (HashResponse) {} + + // Snapshot sends a snapshot of the entire backend + rpc Snapshot(SnapshotRequest) returns (stream SnapshotResponse) {} } service Auth { @@ -311,6 +314,21 @@ message HashResponse { uint32 hash = 2; } +message SnapshotRequest { +} + +message SnapshotResponse { + // header has the current store information. The first header in the snapshot + // stream indicates the point in time of the snapshot. + ResponseHeader header = 1; + + // remaining_bytes is the number of blob bytes to be sent after this message + uint64 remaining_bytes = 2; + + // blob has the next chunk of the snapshot in the snapshot stream. + bytes blob = 3; +} + message WatchRequest { oneof request_union { WatchCreateRequest create_request = 1; diff --git a/storage/backend/batch_tx.go b/storage/backend/batch_tx.go index d59833cd1..75d4b03b6 100644 --- a/storage/backend/batch_tx.go +++ b/storage/backend/batch_tx.go @@ -125,7 +125,12 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { // UnsafeForEach must be called holding the lock on the tx. func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error { - return t.tx.Bucket(bucketName).ForEach(visitor) + b := t.tx.Bucket(bucketName) + if b == nil { + // bucket does not exist + return nil + } + return b.ForEach(visitor) } // Commit commits a previous tx and begins a new writable one.