diff --git a/etcdserver/api/v3rpc/grpc.go b/etcdserver/api/v3rpc/grpc.go index 4c79bb009..25d7b384a 100644 --- a/etcdserver/api/v3rpc/grpc.go +++ b/etcdserver/api/v3rpc/grpc.go @@ -37,5 +37,6 @@ func Server(s *etcdserver.EtcdServer, tls *transport.TLSInfo) (*grpc.Server, err pb.RegisterLeaseServer(grpcServer, NewLeaseServer(s)) pb.RegisterClusterServer(grpcServer, NewClusterServer(s)) pb.RegisterAuthServer(grpcServer, NewAuthServer(s)) + pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s)) return grpcServer, nil } diff --git a/etcdserver/api/v3rpc/maintenance.go b/etcdserver/api/v3rpc/maintenance.go new file mode 100644 index 000000000..27cf103ad --- /dev/null +++ b/etcdserver/api/v3rpc/maintenance.go @@ -0,0 +1,45 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v3rpc + +import ( + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/etcdserver" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/storage/backend" +) + +type BackendGetter interface { + Backend() backend.Backend +} + +type maintenanceServer struct { + bg BackendGetter +} + +func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer { + return &maintenanceServer{bg: s} +} + +func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) { + plog.Noticef("starting to defragment the storage backend...") + err := ms.bg.Backend().Defrag() + if err != nil { + plog.Errorf("failed to deframent the storage backend (%v)", err) + return nil, err + } + plog.Noticef("finished defragmenting the storage backend") + return &pb.DefragmentResponse{}, nil +} diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index a560dd0ec..4e5ebb9f6 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -50,6 +50,8 @@ MemberUpdateResponse MemberListRequest MemberListResponse + DefragmentRequest + DefragmentResponse AuthEnableRequest AuthDisableRequest AuthenticateRequest diff --git a/etcdserver/etcdserverpb/rpc.pb.go b/etcdserver/etcdserverpb/rpc.pb.go index b461a8755..9d749a9d0 100644 --- a/etcdserver/etcdserverpb/rpc.pb.go +++ b/etcdserver/etcdserverpb/rpc.pb.go @@ -1137,6 +1137,28 @@ func (m *MemberListResponse) GetMembers() []*Member { return nil } +type DefragmentRequest struct { +} + +func (m *DefragmentRequest) Reset() { *m = DefragmentRequest{} } +func (m *DefragmentRequest) String() string { return proto.CompactTextString(m) } +func (*DefragmentRequest) ProtoMessage() {} + +type DefragmentResponse struct { + Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` +} + +func (m *DefragmentResponse) Reset() { *m = DefragmentResponse{} } +func (m *DefragmentResponse) String() string { return proto.CompactTextString(m) } +func (*DefragmentResponse) ProtoMessage() {} + +func (m *DefragmentResponse) GetHeader() *ResponseHeader { + if m != nil { + return m.Header + } + return nil +} + type AuthEnableRequest struct { } @@ -1481,6 +1503,8 @@ func init() { proto.RegisterType((*MemberUpdateResponse)(nil), "etcdserverpb.MemberUpdateResponse") proto.RegisterType((*MemberListRequest)(nil), "etcdserverpb.MemberListRequest") proto.RegisterType((*MemberListResponse)(nil), "etcdserverpb.MemberListResponse") + proto.RegisterType((*DefragmentRequest)(nil), "etcdserverpb.DefragmentRequest") + proto.RegisterType((*DefragmentResponse)(nil), "etcdserverpb.DefragmentResponse") proto.RegisterType((*AuthEnableRequest)(nil), "etcdserverpb.AuthEnableRequest") proto.RegisterType((*AuthDisableRequest)(nil), "etcdserverpb.AuthDisableRequest") proto.RegisterType((*AuthenticateRequest)(nil), "etcdserverpb.AuthenticateRequest") @@ -2154,6 +2178,65 @@ var _Cluster_serviceDesc = grpc.ServiceDesc{ Streams: []grpc.StreamDesc{}, } +// Client API for Maintenance service + +type MaintenanceClient interface { + // TODO: move Hash from kv to Maintenance + Defragment(ctx context.Context, in *DefragmentRequest, opts ...grpc.CallOption) (*DefragmentResponse, error) +} + +type maintenanceClient struct { + cc *grpc.ClientConn +} + +func NewMaintenanceClient(cc *grpc.ClientConn) MaintenanceClient { + return &maintenanceClient{cc} +} + +func (c *maintenanceClient) Defragment(ctx context.Context, in *DefragmentRequest, opts ...grpc.CallOption) (*DefragmentResponse, error) { + out := new(DefragmentResponse) + err := grpc.Invoke(ctx, "/etcdserverpb.Maintenance/Defragment", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Maintenance service + +type MaintenanceServer interface { + // TODO: move Hash from kv to Maintenance + Defragment(context.Context, *DefragmentRequest) (*DefragmentResponse, error) +} + +func RegisterMaintenanceServer(s *grpc.Server, srv MaintenanceServer) { + s.RegisterService(&_Maintenance_serviceDesc, srv) +} + +func _Maintenance_Defragment_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { + in := new(DefragmentRequest) + if err := dec(in); err != nil { + return nil, err + } + out, err := srv.(MaintenanceServer).Defragment(ctx, in) + if err != nil { + return nil, err + } + return out, nil +} + +var _Maintenance_serviceDesc = grpc.ServiceDesc{ + ServiceName: "etcdserverpb.Maintenance", + HandlerType: (*MaintenanceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Defragment", + Handler: _Maintenance_Defragment_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, +} + // Client API for Auth service type AuthClient interface { @@ -3984,6 +4067,52 @@ func (m *MemberListResponse) MarshalTo(data []byte) (int, error) { return i, nil } +func (m *DefragmentRequest) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *DefragmentRequest) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + +func (m *DefragmentResponse) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *DefragmentResponse) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Header != nil { + data[i] = 0xa + i++ + i = encodeVarintRpc(data, i, uint64(m.Header.Size())) + n28, err := m.Header.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n28 + } + return i, nil +} + func (m *AuthEnableRequest) Marshal() (data []byte, err error) { size := m.Size() data = make([]byte, size) @@ -4255,11 +4384,11 @@ func (m *AuthEnableResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n28, err := m.Header.MarshalTo(data[i:]) + n29, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n28 + i += n29 } return i, nil } @@ -4283,11 +4412,11 @@ func (m *AuthDisableResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n29, err := m.Header.MarshalTo(data[i:]) + n30, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n29 + i += n30 } return i, nil } @@ -4311,11 +4440,11 @@ func (m *AuthenticateResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n30, err := m.Header.MarshalTo(data[i:]) + n31, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n30 + i += n31 } return i, nil } @@ -4339,11 +4468,11 @@ func (m *UserAddResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n31, err := m.Header.MarshalTo(data[i:]) + n32, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n31 + i += n32 } return i, nil } @@ -4367,11 +4496,11 @@ func (m *UserGetResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n32, err := m.Header.MarshalTo(data[i:]) + n33, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n32 + i += n33 } return i, nil } @@ -4395,11 +4524,11 @@ func (m *UserDeleteResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n33, err := m.Header.MarshalTo(data[i:]) + n34, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n33 + i += n34 } return i, nil } @@ -4423,11 +4552,11 @@ func (m *UserChangePasswordResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n34, err := m.Header.MarshalTo(data[i:]) + n35, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n34 + i += n35 } return i, nil } @@ -4451,11 +4580,11 @@ func (m *UserGrantResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n35, err := m.Header.MarshalTo(data[i:]) + n36, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n35 + i += n36 } return i, nil } @@ -4479,11 +4608,11 @@ func (m *UserRevokeResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n36, err := m.Header.MarshalTo(data[i:]) + n37, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n36 + i += n37 } return i, nil } @@ -4507,11 +4636,11 @@ func (m *RoleAddResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n37, err := m.Header.MarshalTo(data[i:]) + n38, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n37 + i += n38 } return i, nil } @@ -4535,11 +4664,11 @@ func (m *RoleGetResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n38, err := m.Header.MarshalTo(data[i:]) + n39, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n38 + i += n39 } return i, nil } @@ -4563,11 +4692,11 @@ func (m *RoleDeleteResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n39, err := m.Header.MarshalTo(data[i:]) + n40, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n39 + i += n40 } return i, nil } @@ -4591,11 +4720,11 @@ func (m *RoleGrantResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n40, err := m.Header.MarshalTo(data[i:]) + n41, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n40 + i += n41 } return i, nil } @@ -4619,11 +4748,11 @@ func (m *RoleRevokeResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n41, err := m.Header.MarshalTo(data[i:]) + n42, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n41 + i += n42 } return i, nil } @@ -5272,6 +5401,22 @@ func (m *MemberListResponse) Size() (n int) { return n } +func (m *DefragmentRequest) Size() (n int) { + var l int + _ = l + return n +} + +func (m *DefragmentResponse) Size() (n int) { + var l int + _ = l + if m.Header != nil { + l = m.Header.Size() + n += 1 + l + sovRpc(uint64(l)) + } + return n +} + func (m *AuthEnableRequest) Size() (n int) { var l int _ = l @@ -9461,6 +9606,139 @@ func (m *MemberListResponse) Unmarshal(data []byte) error { } return nil } +func (m *DefragmentRequest) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DefragmentRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DefragmentRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *DefragmentResponse) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DefragmentResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DefragmentResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Header", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Header == nil { + m.Header = &ResponseHeader{} + } + if err := m.Header.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *AuthEnableRequest) Unmarshal(data []byte) error { l := len(data) iNdEx := 0 diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index 5e22218ba..f0e7f2a62 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -76,6 +76,11 @@ service Cluster { rpc MemberList(MemberListRequest) returns (MemberListResponse) {} } +service Maintenance { + // TODO: move Hash from kv to Maintenance + rpc Defragment(DefragmentRequest) returns (DefragmentResponse) {} +} + service Auth { // AuthEnable enables authentication. rpc AuthEnable(AuthEnableRequest) returns (AuthEnableResponse) {} @@ -425,6 +430,14 @@ message MemberListResponse { repeated Member members = 2; } +message DefragmentRequest { + +} + +message DefragmentResponse { + ResponseHeader header = 1; +} + message AuthEnableRequest { } diff --git a/etcdserver/server.go b/etcdserver/server.go index 418e08715..f57930bdb 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -24,6 +24,7 @@ import ( "os" "path" "regexp" + "sync" "sync/atomic" "time" @@ -173,6 +174,7 @@ type EtcdServer struct { kv dstorage.ConsistentWatchableKV lessor lease.Lessor + bemu sync.Mutex be backend.Backend stats *stats.ServerStats @@ -604,6 +606,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { // Closing old backend might block until all the txns // on the backend are finished. // We do not want to wait on closing the old backend. + s.bemu.Lock() oldbe := s.be go func() { if err := oldbe.Close(); err != nil { @@ -612,6 +615,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { }() s.be = newbe + s.bemu.Unlock() } if err := s.store.Recovery(apply.snapshot.Data); err != nil { plog.Panicf("recovery store error: %v", err) @@ -1313,3 +1317,8 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error { } func (s *EtcdServer) getKV() dstorage.ConsistentWatchableKV { return s.kv } +func (s *EtcdServer) Backend() backend.Backend { + s.bemu.Lock() + defer s.bemu.Unlock() + return s.be +}