From a6b6fcf1c438b51fc46145cb0308bf98c24bab32 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 8 Apr 2016 00:58:46 -0700 Subject: [PATCH] etcdserverpb, v3rpc: add Snapshot to Maintenance RPC service --- etcdserver/api/v3rpc/maintenance.go | 37 ++ etcdserver/etcdserverpb/etcdserver.pb.go | 6 +- etcdserver/etcdserverpb/raft_internal.pb.go | 4 +- etcdserver/etcdserverpb/rpc.pb.go | 493 +++++++++++++++++--- etcdserver/etcdserverpb/rpc.proto | 18 + 5 files changed, 487 insertions(+), 71 deletions(-) diff --git a/etcdserver/api/v3rpc/maintenance.go b/etcdserver/api/v3rpc/maintenance.go index 5dae0c452..b455fe98a 100644 --- a/etcdserver/api/v3rpc/maintenance.go +++ b/etcdserver/api/v3rpc/maintenance.go @@ -15,6 +15,8 @@ package v3rpc import ( + "io" + "github.com/coreos/etcd/etcdserver" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/storage/backend" @@ -51,6 +53,41 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe return &pb.DefragmentResponse{}, nil } +func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error { + snap := ms.bg.Backend().Snapshot() + pr, pw := io.Pipe() + + defer pr.Close() + + go func() { + snap.WriteTo(pw) + if err := snap.Close(); err != nil { + plog.Errorf("error closing snapshot (%v)", err) + } + pw.Close() + }() + + br := int64(0) + buf := make([]byte, 32*1024) + sz := snap.Size() + for br < sz { + n, err := io.ReadFull(pr, buf) + if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { + return togRPCError(err) + } + br += int64(n) + resp := &pb.SnapshotResponse{ + RemainingBytes: uint64(sz - br), + Blob: buf[:n], + } + if err = srv.Send(resp); err != nil { + return togRPCError(err) + } + } + + return nil +} + func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) { h, err := ms.bg.Backend().Hash() if err != nil { diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index 6ab9804c6..832168929 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -31,6 +31,8 @@ CompactionResponse HashRequest HashResponse + SnapshotRequest + SnapshotResponse WatchRequest WatchCreateRequest WatchCancelRequest @@ -92,10 +94,10 @@ import ( "fmt" proto "github.com/gogo/protobuf/proto" - - math "math" ) +import math "math" + import io "io" // Reference imports to suppress errors if they are not otherwise used. diff --git a/etcdserver/etcdserverpb/raft_internal.pb.go b/etcdserver/etcdserverpb/raft_internal.pb.go index 88e265aa0..66807636f 100644 --- a/etcdserver/etcdserverpb/raft_internal.pb.go +++ b/etcdserver/etcdserverpb/raft_internal.pb.go @@ -8,10 +8,10 @@ import ( "fmt" proto "github.com/gogo/protobuf/proto" - - math "math" ) +import math "math" + import io "io" // Reference imports to suppress errors if they are not otherwise used. diff --git a/etcdserver/etcdserverpb/rpc.pb.go b/etcdserver/etcdserverpb/rpc.pb.go index 7b574020a..1a4b10abd 100644 --- a/etcdserver/etcdserverpb/rpc.pb.go +++ b/etcdserver/etcdserverpb/rpc.pb.go @@ -8,21 +8,20 @@ import ( "fmt" proto "github.com/gogo/protobuf/proto" - - math "math" - - authpb "github.com/coreos/etcd/auth/authpb" - - io "io" ) +import math "math" + import storagepb "github.com/coreos/etcd/storage/storagepb" +import authpb "github.com/coreos/etcd/auth/authpb" import ( context "golang.org/x/net/context" grpc "google.golang.org/grpc" ) +import io "io" + // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal var _ = fmt.Errorf @@ -815,6 +814,34 @@ func (m *HashResponse) GetHeader() *ResponseHeader { return nil } +type SnapshotRequest struct { +} + +func (m *SnapshotRequest) Reset() { *m = SnapshotRequest{} } +func (m *SnapshotRequest) String() string { return proto.CompactTextString(m) } +func (*SnapshotRequest) ProtoMessage() {} + +type SnapshotResponse struct { + // header has the current store information. The first header in the snapshot + // stream indicates the point in time of the snapshot. + Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` + // remaining_bytes is the number of blob bytes to be sent after this message + RemainingBytes uint64 `protobuf:"varint,2,opt,name=remaining_bytes,proto3" json:"remaining_bytes,omitempty"` + // blob has the next chunk of the snapshot in the snapshot stream. + Blob []byte `protobuf:"bytes,3,opt,name=blob,proto3" json:"blob,omitempty"` +} + +func (m *SnapshotResponse) Reset() { *m = SnapshotResponse{} } +func (m *SnapshotResponse) String() string { return proto.CompactTextString(m) } +func (*SnapshotResponse) ProtoMessage() {} + +func (m *SnapshotResponse) GetHeader() *ResponseHeader { + if m != nil { + return m.Header + } + return nil +} + type WatchRequest struct { // Types that are valid to be assigned to RequestUnion: // *WatchRequest_CreateRequest @@ -1614,6 +1641,8 @@ func init() { proto.RegisterType((*CompactionResponse)(nil), "etcdserverpb.CompactionResponse") proto.RegisterType((*HashRequest)(nil), "etcdserverpb.HashRequest") proto.RegisterType((*HashResponse)(nil), "etcdserverpb.HashResponse") + proto.RegisterType((*SnapshotRequest)(nil), "etcdserverpb.SnapshotRequest") + proto.RegisterType((*SnapshotResponse)(nil), "etcdserverpb.SnapshotResponse") proto.RegisterType((*WatchRequest)(nil), "etcdserverpb.WatchRequest") proto.RegisterType((*WatchCreateRequest)(nil), "etcdserverpb.WatchCreateRequest") proto.RegisterType((*WatchCancelRequest)(nil), "etcdserverpb.WatchCancelRequest") @@ -2294,6 +2323,8 @@ type MaintenanceClient interface { // This is designed for testing; do not use this in production when there // are ongoing transactions. Hash(ctx context.Context, in *HashRequest, opts ...grpc.CallOption) (*HashResponse, error) + // Snapshot sends a snapshot of the entire backend + Snapshot(ctx context.Context, in *SnapshotRequest, opts ...grpc.CallOption) (Maintenance_SnapshotClient, error) } type maintenanceClient struct { @@ -2340,6 +2371,38 @@ func (c *maintenanceClient) Hash(ctx context.Context, in *HashRequest, opts ...g return out, nil } +func (c *maintenanceClient) Snapshot(ctx context.Context, in *SnapshotRequest, opts ...grpc.CallOption) (Maintenance_SnapshotClient, error) { + stream, err := grpc.NewClientStream(ctx, &_Maintenance_serviceDesc.Streams[0], c.cc, "/etcdserverpb.Maintenance/Snapshot", opts...) + if err != nil { + return nil, err + } + x := &maintenanceSnapshotClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Maintenance_SnapshotClient interface { + Recv() (*SnapshotResponse, error) + grpc.ClientStream +} + +type maintenanceSnapshotClient struct { + grpc.ClientStream +} + +func (x *maintenanceSnapshotClient) Recv() (*SnapshotResponse, error) { + m := new(SnapshotResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // Server API for Maintenance service type MaintenanceServer interface { @@ -2352,6 +2415,8 @@ type MaintenanceServer interface { // This is designed for testing; do not use this in production when there // are ongoing transactions. Hash(context.Context, *HashRequest) (*HashResponse, error) + // Snapshot sends a snapshot of the entire backend + Snapshot(*SnapshotRequest, Maintenance_SnapshotServer) error } func RegisterMaintenanceServer(s *grpc.Server, srv MaintenanceServer) { @@ -2406,6 +2471,27 @@ func _Maintenance_Hash_Handler(srv interface{}, ctx context.Context, dec func(in return out, nil } +func _Maintenance_Snapshot_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(SnapshotRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(MaintenanceServer).Snapshot(m, &maintenanceSnapshotServer{stream}) +} + +type Maintenance_SnapshotServer interface { + Send(*SnapshotResponse) error + grpc.ServerStream +} + +type maintenanceSnapshotServer struct { + grpc.ServerStream +} + +func (x *maintenanceSnapshotServer) Send(m *SnapshotResponse) error { + return x.ServerStream.SendMsg(m) +} + var _Maintenance_serviceDesc = grpc.ServiceDesc{ ServiceName: "etcdserverpb.Maintenance", HandlerType: (*MaintenanceServer)(nil), @@ -2427,7 +2513,13 @@ var _Maintenance_serviceDesc = grpc.ServiceDesc{ Handler: _Maintenance_Hash_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Snapshot", + Handler: _Maintenance_Snapshot_Handler, + ServerStreams: true, + }, + }, } // Client API for Auth service @@ -3576,6 +3668,65 @@ func (m *HashResponse) MarshalTo(data []byte) (int, error) { return i, nil } +func (m *SnapshotRequest) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *SnapshotRequest) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + +func (m *SnapshotResponse) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *SnapshotResponse) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Header != nil { + data[i] = 0xa + i++ + i = encodeVarintRpc(data, i, uint64(m.Header.Size())) + n16, err := m.Header.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n16 + } + if m.RemainingBytes != 0 { + data[i] = 0x10 + i++ + i = encodeVarintRpc(data, i, uint64(m.RemainingBytes)) + } + if m.Blob != nil { + if len(m.Blob) > 0 { + data[i] = 0x1a + i++ + i = encodeVarintRpc(data, i, uint64(len(m.Blob))) + i += copy(data[i:], m.Blob) + } + } + return i, nil +} + func (m *WatchRequest) Marshal() (data []byte, err error) { size := m.Size() data = make([]byte, size) @@ -3592,11 +3743,11 @@ func (m *WatchRequest) MarshalTo(data []byte) (int, error) { var l int _ = l if m.RequestUnion != nil { - nn16, err := m.RequestUnion.MarshalTo(data[i:]) + nn17, err := m.RequestUnion.MarshalTo(data[i:]) if err != nil { return 0, err } - i += nn16 + i += nn17 } return i, nil } @@ -3607,11 +3758,11 @@ func (m *WatchRequest_CreateRequest) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.CreateRequest.Size())) - n17, err := m.CreateRequest.MarshalTo(data[i:]) + n18, err := m.CreateRequest.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n17 + i += n18 } return i, nil } @@ -3621,11 +3772,11 @@ func (m *WatchRequest_CancelRequest) MarshalTo(data []byte) (int, error) { data[i] = 0x12 i++ i = encodeVarintRpc(data, i, uint64(m.CancelRequest.Size())) - n18, err := m.CancelRequest.MarshalTo(data[i:]) + n19, err := m.CancelRequest.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n18 + i += n19 } return i, nil } @@ -3720,11 +3871,11 @@ func (m *WatchResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n19, err := m.Header.MarshalTo(data[i:]) + n20, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n19 + i += n20 } if m.WatchId != 0 { data[i] = 0x10 @@ -3818,11 +3969,11 @@ func (m *LeaseGrantResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n20, err := m.Header.MarshalTo(data[i:]) + n21, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n20 + i += n21 } if m.ID != 0 { data[i] = 0x10 @@ -3885,11 +4036,11 @@ func (m *LeaseRevokeResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n21, err := m.Header.MarshalTo(data[i:]) + n22, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n21 + i += n22 } return i, nil } @@ -3936,11 +4087,11 @@ func (m *LeaseKeepAliveResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n22, err := m.Header.MarshalTo(data[i:]) + n23, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n22 + i += n23 } if m.ID != 0 { data[i] = 0x10 @@ -4076,21 +4227,21 @@ func (m *MemberAddResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n23, err := m.Header.MarshalTo(data[i:]) + n24, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n23 + i += n24 } if m.Member != nil { data[i] = 0x12 i++ i = encodeVarintRpc(data, i, uint64(m.Member.Size())) - n24, err := m.Member.MarshalTo(data[i:]) + n25, err := m.Member.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n24 + i += n25 } return i, nil } @@ -4137,11 +4288,11 @@ func (m *MemberRemoveResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n25, err := m.Header.MarshalTo(data[i:]) + n26, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n25 + i += n26 } return i, nil } @@ -4203,11 +4354,11 @@ func (m *MemberUpdateResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n26, err := m.Header.MarshalTo(data[i:]) + n27, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n26 + i += n27 } return i, nil } @@ -4249,11 +4400,11 @@ func (m *MemberListResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n27, err := m.Header.MarshalTo(data[i:]) + n28, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n27 + i += n28 } if len(m.Members) > 0 { for _, msg := range m.Members { @@ -4307,11 +4458,11 @@ func (m *DefragmentResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n28, err := m.Header.MarshalTo(data[i:]) + n29, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n28 + i += n29 } return i, nil } @@ -4396,11 +4547,11 @@ func (m *AlarmResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n29, err := m.Header.MarshalTo(data[i:]) + n30, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n29 + i += n30 } if len(m.Alarms) > 0 { for _, msg := range m.Alarms { @@ -4454,11 +4605,11 @@ func (m *StatusResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n30, err := m.Header.MarshalTo(data[i:]) + n31, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n30 + i += n31 } if len(m.Version) > 0 { data[i] = 0x12 @@ -4746,11 +4897,11 @@ func (m *AuthRoleGrantRequest) MarshalTo(data []byte) (int, error) { data[i] = 0x12 i++ i = encodeVarintRpc(data, i, uint64(m.Perm.Size())) - n31, err := m.Perm.MarshalTo(data[i:]) + n32, err := m.Perm.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n31 + i += n32 } return i, nil } @@ -4792,11 +4943,11 @@ func (m *AuthEnableResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n32, err := m.Header.MarshalTo(data[i:]) + n33, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n32 + i += n33 } return i, nil } @@ -4820,11 +4971,11 @@ func (m *AuthDisableResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n33, err := m.Header.MarshalTo(data[i:]) + n34, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n33 + i += n34 } return i, nil } @@ -4848,11 +4999,11 @@ func (m *AuthenticateResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n34, err := m.Header.MarshalTo(data[i:]) + n35, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n34 + i += n35 } return i, nil } @@ -4876,11 +5027,11 @@ func (m *AuthUserAddResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n35, err := m.Header.MarshalTo(data[i:]) + n36, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n35 + i += n36 } return i, nil } @@ -4904,11 +5055,11 @@ func (m *AuthUserGetResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n36, err := m.Header.MarshalTo(data[i:]) + n37, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n36 + i += n37 } return i, nil } @@ -4932,11 +5083,11 @@ func (m *AuthUserDeleteResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n37, err := m.Header.MarshalTo(data[i:]) + n38, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n37 + i += n38 } return i, nil } @@ -4960,11 +5111,11 @@ func (m *AuthUserChangePasswordResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n38, err := m.Header.MarshalTo(data[i:]) + n39, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n38 + i += n39 } return i, nil } @@ -4988,11 +5139,11 @@ func (m *AuthUserGrantResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n39, err := m.Header.MarshalTo(data[i:]) + n40, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n39 + i += n40 } return i, nil } @@ -5016,11 +5167,11 @@ func (m *AuthUserRevokeResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n40, err := m.Header.MarshalTo(data[i:]) + n41, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n40 + i += n41 } return i, nil } @@ -5044,11 +5195,11 @@ func (m *AuthRoleAddResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n41, err := m.Header.MarshalTo(data[i:]) + n42, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n41 + i += n42 } return i, nil } @@ -5072,11 +5223,11 @@ func (m *AuthRoleGetResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n42, err := m.Header.MarshalTo(data[i:]) + n43, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n42 + i += n43 } return i, nil } @@ -5100,11 +5251,11 @@ func (m *AuthRoleDeleteResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n43, err := m.Header.MarshalTo(data[i:]) + n44, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n43 + i += n44 } return i, nil } @@ -5128,11 +5279,11 @@ func (m *AuthRoleGrantResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n44, err := m.Header.MarshalTo(data[i:]) + n45, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n44 + i += n45 } return i, nil } @@ -5156,11 +5307,11 @@ func (m *AuthRoleRevokeResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n45, err := m.Header.MarshalTo(data[i:]) + n46, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n45 + i += n46 } return i, nil } @@ -5528,6 +5679,31 @@ func (m *HashResponse) Size() (n int) { return n } +func (m *SnapshotRequest) Size() (n int) { + var l int + _ = l + return n +} + +func (m *SnapshotResponse) Size() (n int) { + var l int + _ = l + if m.Header != nil { + l = m.Header.Size() + n += 1 + l + sovRpc(uint64(l)) + } + if m.RemainingBytes != 0 { + n += 1 + sovRpc(uint64(m.RemainingBytes)) + } + if m.Blob != nil { + l = len(m.Blob) + if l > 0 { + n += 1 + l + sovRpc(uint64(l)) + } + } + return n +} + func (m *WatchRequest) Size() (n int) { var l int _ = l @@ -8158,6 +8334,189 @@ func (m *HashResponse) Unmarshal(data []byte) error { } return nil } +func (m *SnapshotRequest) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SnapshotRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SnapshotRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SnapshotResponse) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SnapshotResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SnapshotResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Header", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Header == nil { + m.Header = &ResponseHeader{} + } + if err := m.Header.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field RemainingBytes", wireType) + } + m.RemainingBytes = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.RemainingBytes |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Blob", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Blob = append(m.Blob[:0], data[iNdEx:postIndex]...) + if m.Blob == nil { + m.Blob = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *WatchRequest) Unmarshal(data []byte) error { l := len(data) iNdEx := 0 diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index afd578480..5061db2d1 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -85,6 +85,9 @@ service Maintenance { // This is designed for testing; do not use this in production when there // are ongoing transactions. rpc Hash(HashRequest) returns (HashResponse) {} + + // Snapshot sends a snapshot of the entire backend + rpc Snapshot(SnapshotRequest) returns (stream SnapshotResponse) {} } service Auth { @@ -311,6 +314,21 @@ message HashResponse { uint32 hash = 2; } +message SnapshotRequest { +} + +message SnapshotResponse { + // header has the current store information. The first header in the snapshot + // stream indicates the point in time of the snapshot. + ResponseHeader header = 1; + + // remaining_bytes is the number of blob bytes to be sent after this message + uint64 remaining_bytes = 2; + + // blob has the next chunk of the snapshot in the snapshot stream. + bytes blob = 3; +} + message WatchRequest { oneof request_union { WatchCreateRequest create_request = 1;