diff --git a/etcdserver/etcdserverpb/rpc.pb.go b/etcdserver/etcdserverpb/rpc.pb.go index 6d0601022..b82d5a201 100644 --- a/etcdserver/etcdserverpb/rpc.pb.go +++ b/etcdserver/etcdserverpb/rpc.pb.go @@ -128,6 +128,7 @@ func (m *RangeResponse) GetKvs() []*storagepb.KeyValue { type PutRequest struct { Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + Lease int64 `protobuf:"varint,3,opt,name=lease,proto3" json:"lease,omitempty"` } func (m *PutRequest) Reset() { *m = PutRequest{} } @@ -390,6 +391,82 @@ func (m *WatchResponse) GetEvent() *storagepb.Event { return nil } +type LeaseCreateRequest struct { + // advisory ttl in seconds + Ttl int64 `protobuf:"varint,1,opt,name=ttl,proto3" json:"ttl,omitempty"` +} + +func (m *LeaseCreateRequest) Reset() { *m = LeaseCreateRequest{} } +func (m *LeaseCreateRequest) String() string { return proto.CompactTextString(m) } +func (*LeaseCreateRequest) ProtoMessage() {} + +type LeaseCreateResponse struct { + Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` + LeaseId int64 `protobuf:"varint,2,opt,name=lease_id,proto3" json:"lease_id,omitempty"` + // server decided ttl in second + Ttl int64 `protobuf:"varint,3,opt,name=ttl,proto3" json:"ttl,omitempty"` + Error string `protobuf:"bytes,4,opt,name=error,proto3" json:"error,omitempty"` +} + +func (m *LeaseCreateResponse) Reset() { *m = LeaseCreateResponse{} } +func (m *LeaseCreateResponse) String() string { return proto.CompactTextString(m) } +func (*LeaseCreateResponse) ProtoMessage() {} + +func (m *LeaseCreateResponse) GetHeader() *ResponseHeader { + if m != nil { + return m.Header + } + return nil +} + +type LeaseRevokeRequest struct { + LeaseId int64 `protobuf:"varint,1,opt,name=lease_id,proto3" json:"lease_id,omitempty"` +} + +func (m *LeaseRevokeRequest) Reset() { *m = LeaseRevokeRequest{} } +func (m *LeaseRevokeRequest) String() string { return proto.CompactTextString(m) } +func (*LeaseRevokeRequest) ProtoMessage() {} + +type LeaseRevokeResponse struct { + Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` +} + +func (m *LeaseRevokeResponse) Reset() { *m = LeaseRevokeResponse{} } +func (m *LeaseRevokeResponse) String() string { return proto.CompactTextString(m) } +func (*LeaseRevokeResponse) ProtoMessage() {} + +func (m *LeaseRevokeResponse) GetHeader() *ResponseHeader { + if m != nil { + return m.Header + } + return nil +} + +type LeaseKeepAliveRequest struct { + LeaseId int64 `protobuf:"varint,1,opt,name=lease_id,proto3" json:"lease_id,omitempty"` +} + +func (m *LeaseKeepAliveRequest) Reset() { *m = LeaseKeepAliveRequest{} } +func (m *LeaseKeepAliveRequest) String() string { return proto.CompactTextString(m) } +func (*LeaseKeepAliveRequest) ProtoMessage() {} + +type LeaseKeepAliveResponse struct { + Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` + LeaseId int64 `protobuf:"varint,2,opt,name=lease_id,proto3" json:"lease_id,omitempty"` + Ttl int64 `protobuf:"varint,3,opt,name=ttl,proto3" json:"ttl,omitempty"` +} + +func (m *LeaseKeepAliveResponse) Reset() { *m = LeaseKeepAliveResponse{} } +func (m *LeaseKeepAliveResponse) String() string { return proto.CompactTextString(m) } +func (*LeaseKeepAliveResponse) ProtoMessage() {} + +func (m *LeaseKeepAliveResponse) GetHeader() *ResponseHeader { + if m != nil { + return m.Header + } + return nil +} + func init() { proto.RegisterEnum("etcdserverpb.Compare_CompareResult", Compare_CompareResult_name, Compare_CompareResult_value) proto.RegisterEnum("etcdserverpb.Compare_CompareTarget", Compare_CompareTarget_name, Compare_CompareTarget_value) @@ -415,6 +492,7 @@ type KVClient interface { // Txn processes all the requests in one transaction. // A txn request increases the revision of the store, // and generates events with the same revision in the event history. + // It is not allowed to modify the same key several times within one txn. Txn(ctx context.Context, in *TxnRequest, opts ...grpc.CallOption) (*TxnResponse, error) // Compact compacts the event history in etcd. User should compact the // event history periodically, or it will grow infinitely. @@ -490,6 +568,7 @@ type KVServer interface { // Txn processes all the requests in one transaction. // A txn request increases the revision of the store, // and generates events with the same revision in the event history. + // It is not allowed to modify the same key several times within one txn. Txn(context.Context, *TxnRequest) (*TxnResponse, error) // Compact compacts the event history in etcd. User should compact the // event history periodically, or it will grow infinitely. @@ -691,6 +770,168 @@ var _Watch_serviceDesc = grpc.ServiceDesc{ }, } +// Client API for Lease service + +type LeaseClient interface { + // LeaseCreate creates a lease. A lease has a TTL. The lease will expire if the + // server does not receive a keepAlive within TTL from the lease holder. + // All keys attached to the lease will be expired and deleted if the lease expires. + // The key expiration generates an event in event history. + LeaseCreate(ctx context.Context, in *LeaseCreateRequest, opts ...grpc.CallOption) (*LeaseCreateResponse, error) + // LeaseRevoke revokes a lease. All the key attached to the lease will be expired and deleted. + LeaseRevoke(ctx context.Context, in *LeaseRevokeRequest, opts ...grpc.CallOption) (*LeaseRevokeResponse, error) + // KeepAlive keeps the lease alive. + LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (Lease_LeaseKeepAliveClient, error) +} + +type leaseClient struct { + cc *grpc.ClientConn +} + +func NewLeaseClient(cc *grpc.ClientConn) LeaseClient { + return &leaseClient{cc} +} + +func (c *leaseClient) LeaseCreate(ctx context.Context, in *LeaseCreateRequest, opts ...grpc.CallOption) (*LeaseCreateResponse, error) { + out := new(LeaseCreateResponse) + err := grpc.Invoke(ctx, "/etcdserverpb.Lease/LeaseCreate", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *leaseClient) LeaseRevoke(ctx context.Context, in *LeaseRevokeRequest, opts ...grpc.CallOption) (*LeaseRevokeResponse, error) { + out := new(LeaseRevokeResponse) + err := grpc.Invoke(ctx, "/etcdserverpb.Lease/LeaseRevoke", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *leaseClient) LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (Lease_LeaseKeepAliveClient, error) { + stream, err := grpc.NewClientStream(ctx, &_Lease_serviceDesc.Streams[0], c.cc, "/etcdserverpb.Lease/LeaseKeepAlive", opts...) + if err != nil { + return nil, err + } + x := &leaseLeaseKeepAliveClient{stream} + return x, nil +} + +type Lease_LeaseKeepAliveClient interface { + Send(*LeaseKeepAliveRequest) error + Recv() (*LeaseKeepAliveResponse, error) + grpc.ClientStream +} + +type leaseLeaseKeepAliveClient struct { + grpc.ClientStream +} + +func (x *leaseLeaseKeepAliveClient) Send(m *LeaseKeepAliveRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *leaseLeaseKeepAliveClient) Recv() (*LeaseKeepAliveResponse, error) { + m := new(LeaseKeepAliveResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Server API for Lease service + +type LeaseServer interface { + // LeaseCreate creates a lease. A lease has a TTL. The lease will expire if the + // server does not receive a keepAlive within TTL from the lease holder. + // All keys attached to the lease will be expired and deleted if the lease expires. + // The key expiration generates an event in event history. + LeaseCreate(context.Context, *LeaseCreateRequest) (*LeaseCreateResponse, error) + // LeaseRevoke revokes a lease. All the key attached to the lease will be expired and deleted. + LeaseRevoke(context.Context, *LeaseRevokeRequest) (*LeaseRevokeResponse, error) + // KeepAlive keeps the lease alive. + LeaseKeepAlive(Lease_LeaseKeepAliveServer) error +} + +func RegisterLeaseServer(s *grpc.Server, srv LeaseServer) { + s.RegisterService(&_Lease_serviceDesc, srv) +} + +func _Lease_LeaseCreate_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { + in := new(LeaseCreateRequest) + if err := codec.Unmarshal(buf, in); err != nil { + return nil, err + } + out, err := srv.(LeaseServer).LeaseCreate(ctx, in) + if err != nil { + return nil, err + } + return out, nil +} + +func _Lease_LeaseRevoke_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { + in := new(LeaseRevokeRequest) + if err := codec.Unmarshal(buf, in); err != nil { + return nil, err + } + out, err := srv.(LeaseServer).LeaseRevoke(ctx, in) + if err != nil { + return nil, err + } + return out, nil +} + +func _Lease_LeaseKeepAlive_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(LeaseServer).LeaseKeepAlive(&leaseLeaseKeepAliveServer{stream}) +} + +type Lease_LeaseKeepAliveServer interface { + Send(*LeaseKeepAliveResponse) error + Recv() (*LeaseKeepAliveRequest, error) + grpc.ServerStream +} + +type leaseLeaseKeepAliveServer struct { + grpc.ServerStream +} + +func (x *leaseLeaseKeepAliveServer) Send(m *LeaseKeepAliveResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *leaseLeaseKeepAliveServer) Recv() (*LeaseKeepAliveRequest, error) { + m := new(LeaseKeepAliveRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _Lease_serviceDesc = grpc.ServiceDesc{ + ServiceName: "etcdserverpb.Lease", + HandlerType: (*LeaseServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "LeaseCreate", + Handler: _Lease_LeaseCreate_Handler, + }, + { + MethodName: "LeaseRevoke", + Handler: _Lease_LeaseRevoke_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "LeaseKeepAlive", + Handler: _Lease_LeaseKeepAlive_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, +} + func (m *ResponseHeader) Marshal() (data []byte, err error) { size := m.Size() data = make([]byte, size) @@ -854,6 +1095,11 @@ func (m *PutRequest) MarshalTo(data []byte) (int, error) { i += copy(data[i:], m.Value) } } + if m.Lease != 0 { + data[i] = 0x18 + i++ + i = encodeVarintRpc(data, i, uint64(m.Lease)) + } return i, nil } @@ -1334,6 +1580,185 @@ func (m *WatchResponse) MarshalTo(data []byte) (int, error) { return i, nil } +func (m *LeaseCreateRequest) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *LeaseCreateRequest) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Ttl != 0 { + data[i] = 0x8 + i++ + i = encodeVarintRpc(data, i, uint64(m.Ttl)) + } + return i, nil +} + +func (m *LeaseCreateResponse) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *LeaseCreateResponse) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Header != nil { + data[i] = 0xa + i++ + i = encodeVarintRpc(data, i, uint64(m.Header.Size())) + n14, err := m.Header.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n14 + } + if m.LeaseId != 0 { + data[i] = 0x10 + i++ + i = encodeVarintRpc(data, i, uint64(m.LeaseId)) + } + if m.Ttl != 0 { + data[i] = 0x18 + i++ + i = encodeVarintRpc(data, i, uint64(m.Ttl)) + } + if len(m.Error) > 0 { + data[i] = 0x22 + i++ + i = encodeVarintRpc(data, i, uint64(len(m.Error))) + i += copy(data[i:], m.Error) + } + return i, nil +} + +func (m *LeaseRevokeRequest) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *LeaseRevokeRequest) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.LeaseId != 0 { + data[i] = 0x8 + i++ + i = encodeVarintRpc(data, i, uint64(m.LeaseId)) + } + return i, nil +} + +func (m *LeaseRevokeResponse) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *LeaseRevokeResponse) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Header != nil { + data[i] = 0xa + i++ + i = encodeVarintRpc(data, i, uint64(m.Header.Size())) + n15, err := m.Header.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n15 + } + return i, nil +} + +func (m *LeaseKeepAliveRequest) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *LeaseKeepAliveRequest) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.LeaseId != 0 { + data[i] = 0x8 + i++ + i = encodeVarintRpc(data, i, uint64(m.LeaseId)) + } + return i, nil +} + +func (m *LeaseKeepAliveResponse) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *LeaseKeepAliveResponse) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Header != nil { + data[i] = 0xa + i++ + i = encodeVarintRpc(data, i, uint64(m.Header.Size())) + n16, err := m.Header.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n16 + } + if m.LeaseId != 0 { + data[i] = 0x10 + i++ + i = encodeVarintRpc(data, i, uint64(m.LeaseId)) + } + if m.Ttl != 0 { + data[i] = 0x18 + i++ + i = encodeVarintRpc(data, i, uint64(m.Ttl)) + } + return i, nil +} + func encodeFixed64Rpc(data []byte, offset int, v uint64) int { data[offset] = uint8(v) data[offset+1] = uint8(v >> 8) @@ -1437,6 +1862,9 @@ func (m *PutRequest) Size() (n int) { n += 1 + l + sovRpc(uint64(l)) } } + if m.Lease != 0 { + n += 1 + sovRpc(uint64(m.Lease)) + } return n } @@ -1644,6 +2072,79 @@ func (m *WatchResponse) Size() (n int) { return n } +func (m *LeaseCreateRequest) Size() (n int) { + var l int + _ = l + if m.Ttl != 0 { + n += 1 + sovRpc(uint64(m.Ttl)) + } + return n +} + +func (m *LeaseCreateResponse) Size() (n int) { + var l int + _ = l + if m.Header != nil { + l = m.Header.Size() + n += 1 + l + sovRpc(uint64(l)) + } + if m.LeaseId != 0 { + n += 1 + sovRpc(uint64(m.LeaseId)) + } + if m.Ttl != 0 { + n += 1 + sovRpc(uint64(m.Ttl)) + } + l = len(m.Error) + if l > 0 { + n += 1 + l + sovRpc(uint64(l)) + } + return n +} + +func (m *LeaseRevokeRequest) Size() (n int) { + var l int + _ = l + if m.LeaseId != 0 { + n += 1 + sovRpc(uint64(m.LeaseId)) + } + return n +} + +func (m *LeaseRevokeResponse) Size() (n int) { + var l int + _ = l + if m.Header != nil { + l = m.Header.Size() + n += 1 + l + sovRpc(uint64(l)) + } + return n +} + +func (m *LeaseKeepAliveRequest) Size() (n int) { + var l int + _ = l + if m.LeaseId != 0 { + n += 1 + sovRpc(uint64(m.LeaseId)) + } + return n +} + +func (m *LeaseKeepAliveResponse) Size() (n int) { + var l int + _ = l + if m.Header != nil { + l = m.Header.Size() + n += 1 + l + sovRpc(uint64(l)) + } + if m.LeaseId != 0 { + n += 1 + sovRpc(uint64(m.LeaseId)) + } + if m.Ttl != 0 { + n += 1 + sovRpc(uint64(m.Ttl)) + } + return n +} + func sovRpc(x uint64) (n int) { for { n++ @@ -2082,6 +2583,22 @@ func (m *PutRequest) Unmarshal(data []byte) error { } m.Value = append([]byte{}, data[iNdEx:postIndex]...) iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Lease", wireType) + } + m.Lease = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Lease |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: var sizeOfWire int for { @@ -3399,6 +3916,504 @@ func (m *WatchResponse) Unmarshal(data []byte) error { return nil } +func (m *LeaseCreateRequest) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Ttl", wireType) + } + m.Ttl = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Ttl |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + return nil +} +func (m *LeaseCreateResponse) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Header", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Header == nil { + m.Header = &ResponseHeader{} + } + if err := m.Header.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field LeaseId", wireType) + } + m.LeaseId = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.LeaseId |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Ttl", wireType) + } + m.Ttl = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Ttl |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Error = string(data[iNdEx:postIndex]) + iNdEx = postIndex + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + return nil +} +func (m *LeaseRevokeRequest) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field LeaseId", wireType) + } + m.LeaseId = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.LeaseId |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + return nil +} +func (m *LeaseRevokeResponse) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Header", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Header == nil { + m.Header = &ResponseHeader{} + } + if err := m.Header.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + return nil +} +func (m *LeaseKeepAliveRequest) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field LeaseId", wireType) + } + m.LeaseId = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.LeaseId |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + return nil +} +func (m *LeaseKeepAliveResponse) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Header", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Header == nil { + m.Header = &ResponseHeader{} + } + if err := m.Header.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field LeaseId", wireType) + } + m.LeaseId = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.LeaseId |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Ttl", wireType) + } + m.Ttl = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Ttl |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + return nil +} func skipRpc(data []byte) (n int, err error) { l := len(data) iNdEx := 0 diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index e509a1318..c09eb547e 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -213,3 +213,34 @@ message WatchResponse { // TODO: support batched events response? storagepb.Event event = 2; } + +message LeaseCreateRequest { + // advisory ttl in seconds + int64 ttl = 1; +} + +message LeaseCreateResponse { + ResponseHeader header = 1; + int64 lease_id = 2; + // server decided ttl in second + int64 ttl = 3; + string error = 4; +} + +message LeaseRevokeRequest { + int64 lease_id = 1; +} + +message LeaseRevokeResponse { + ResponseHeader header = 1; +} + +message LeaseKeepAliveRequest { + int64 lease_id = 1; +} + +message LeaseKeepAliveResponse { + ResponseHeader header = 1; + int64 lease_id = 2; + int64 ttl = 3; +} diff --git a/storage/storagepb/kv.pb.go b/storage/storagepb/kv.pb.go index 0b6453ae3..bdcb565db 100644 --- a/storage/storagepb/kv.pb.go +++ b/storage/storagepb/kv.pb.go @@ -57,6 +57,9 @@ type KeyValue struct { // increases its version. Version int64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"` Value []byte `protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"` + // lease is the ID of the lease that attached to key. + // When the attached lease expires, the key will be deleted. + Lease int64 `protobuf:"varint,6,opt,name=lease,proto3" json:"lease,omitempty"` } func (m *KeyValue) Reset() { *m = KeyValue{} } @@ -126,6 +129,11 @@ func (m *KeyValue) MarshalTo(data []byte) (int, error) { i += copy(data[i:], m.Value) } } + if m.Lease != 0 { + data[i] = 0x30 + i++ + i = encodeVarintKv(data, i, uint64(m.Lease)) + } return i, nil } @@ -218,6 +226,9 @@ func (m *KeyValue) Size() (n int) { n += 1 + l + sovKv(uint64(l)) } } + if m.Lease != 0 { + n += 1 + sovKv(uint64(m.Lease)) + } return n } @@ -367,6 +378,22 @@ func (m *KeyValue) Unmarshal(data []byte) error { } m.Value = append([]byte{}, data[iNdEx:postIndex]...) iNdEx = postIndex + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Lease", wireType) + } + m.Lease = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Lease |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: var sizeOfWire int for {