From c160085f44b15f92dd1bfaadfdbe5c5fb15279c0 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 3 Nov 2015 14:21:24 -0800 Subject: [PATCH] *: add v3 watch service --- Documentation/rfc/v3api.proto | 32 +- etcdserver/etcdserverpb/rpc.pb.go | 468 ++++++++++++++++++++++++++++++ etcdserver/etcdserverpb/rpc.proto | 26 ++ 3 files changed, 512 insertions(+), 14 deletions(-) diff --git a/Documentation/rfc/v3api.proto b/Documentation/rfc/v3api.proto index f6dfe874e..310eb9d2c 100644 --- a/Documentation/rfc/v3api.proto +++ b/Documentation/rfc/v3api.proto @@ -20,11 +20,6 @@ service etcd { // and generates events with the same revision in the event history. rpc Txn(TxnRequest) returns (TxnResponse) {} - // Watch watches the events happening or happened in etcd. Both input and output - // are stream. One watch rpc can watch for multiple ranges and get a stream of - // events. The whole events history can be watched unless compacted. - rpc WatchRange(stream WatchRangeRequest) returns (stream WatchRangeResponse) {} - // Compact compacts the event history in etcd. User should compact the // event history periodically, or it will grow infinitely. rpc Compact(CompactionRequest) returns (CompactionResponse) {} @@ -50,6 +45,14 @@ service etcd { rpc LeaseKeepAlive(stream LeaseKeepAliveRequest) returns (stream LeaseKeepAliveResponse) {} } +service watch { + // Watch watches the events happening or happened. Both input and output + // are stream. One watch rpc can watch for multiple keys or prefixs and + // get a stream of events. The whole events history can be watched unless + // compacted. + rpc Watch(stream WatchRequest) returns (stream WatchResponse) {} +} + message ResponseHeader { // an error type message? string error = 1; @@ -190,21 +193,22 @@ message KeyValue { bytes value = 5; } -message WatchRangeRequest { - // if the range_end is not given, the request returns the key. +message WatchRequest { + // the key to be watched bytes key = 1; - // if the range_end is given, it gets the keys in range [key, range_end). - bytes range_end = 2; + // the prefix to be watched. + bytes prefix = 2; // start_revision is an optional revision (including) to watch from. No start_revision is "now". int64 start_revision = 3; - // end_revision is an optional revision (excluding) to end watch. No end_revision is "forever". - int64 end_revision = 4; - bool progress_notification = 5; + // TODO: support Range watch? + // TODO: support notification every time interval or revision increase? + // TODO: support cancel watch if the server cannot reach with majority? } -message WatchRangeResponse { +message WatchResponse { ResponseHeader header = 1; - repeated Event events = 2; + // TODO: support batched events response? + storagepb.Event event = 2; } message Event { diff --git a/etcdserver/etcdserverpb/rpc.pb.go b/etcdserver/etcdserverpb/rpc.pb.go index 2055a2a97..4e37bd12c 100644 --- a/etcdserver/etcdserverpb/rpc.pb.go +++ b/etcdserver/etcdserverpb/rpc.pb.go @@ -353,6 +353,43 @@ func (m *CompactionResponse) GetHeader() *ResponseHeader { return nil } +type WatchRequest struct { + // the key to be watched + Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + // the prefix to be watched. + Prefix []byte `protobuf:"bytes,2,opt,name=prefix,proto3" json:"prefix,omitempty"` + // start_revision is an optional revision (including) to watch from. No start_revision is "now". + StartRevision int64 `protobuf:"varint,3,opt,name=start_revision,proto3" json:"start_revision,omitempty"` +} + +func (m *WatchRequest) Reset() { *m = WatchRequest{} } +func (m *WatchRequest) String() string { return proto.CompactTextString(m) } +func (*WatchRequest) ProtoMessage() {} + +type WatchResponse struct { + Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` + // TODO: support batched events response? + Event *storagepb.Event `protobuf:"bytes,2,opt,name=event" json:"event,omitempty"` +} + +func (m *WatchResponse) Reset() { *m = WatchResponse{} } +func (m *WatchResponse) String() string { return proto.CompactTextString(m) } +func (*WatchResponse) ProtoMessage() {} + +func (m *WatchResponse) GetHeader() *ResponseHeader { + if m != nil { + return m.Header + } + return nil +} + +func (m *WatchResponse) GetEvent() *storagepb.Event { + if m != nil { + return m.Event + } + return nil +} + func init() { proto.RegisterEnum("etcdserverpb.Compare_CompareResult", Compare_CompareResult_name, Compare_CompareResult_value) proto.RegisterEnum("etcdserverpb.Compare_CompareTarget", Compare_CompareTarget_name, Compare_CompareTarget_value) @@ -551,6 +588,109 @@ var _Etcd_serviceDesc = grpc.ServiceDesc{ Streams: []grpc.StreamDesc{}, } +// Client API for Watch service + +type WatchClient interface { + // Watch watches the events happening or happened. Both input and output + // are stream. One watch rpc can watch for multiple keys or prefixs and + // get a stream of events. The whole events history can be watched unless + // compacted. + Watch(ctx context.Context, opts ...grpc.CallOption) (Watch_WatchClient, error) +} + +type watchClient struct { + cc *grpc.ClientConn +} + +func NewWatchClient(cc *grpc.ClientConn) WatchClient { + return &watchClient{cc} +} + +func (c *watchClient) Watch(ctx context.Context, opts ...grpc.CallOption) (Watch_WatchClient, error) { + stream, err := grpc.NewClientStream(ctx, &_Watch_serviceDesc.Streams[0], c.cc, "/etcdserverpb.watch/Watch", opts...) + if err != nil { + return nil, err + } + x := &watchWatchClient{stream} + return x, nil +} + +type Watch_WatchClient interface { + Send(*WatchRequest) error + Recv() (*WatchResponse, error) + grpc.ClientStream +} + +type watchWatchClient struct { + grpc.ClientStream +} + +func (x *watchWatchClient) Send(m *WatchRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *watchWatchClient) Recv() (*WatchResponse, error) { + m := new(WatchResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Server API for Watch service + +type WatchServer interface { + // Watch watches the events happening or happened. Both input and output + // are stream. One watch rpc can watch for multiple keys or prefixs and + // get a stream of events. The whole events history can be watched unless + // compacted. + Watch(Watch_WatchServer) error +} + +func RegisterWatchServer(s *grpc.Server, srv WatchServer) { + s.RegisterService(&_Watch_serviceDesc, srv) +} + +func _Watch_Watch_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(WatchServer).Watch(&watchWatchServer{stream}) +} + +type Watch_WatchServer interface { + Send(*WatchResponse) error + Recv() (*WatchRequest, error) + grpc.ServerStream +} + +type watchWatchServer struct { + grpc.ServerStream +} + +func (x *watchWatchServer) Send(m *WatchResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *watchWatchServer) Recv() (*WatchRequest, error) { + m := new(WatchRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _Watch_serviceDesc = grpc.ServiceDesc{ + ServiceName: "etcdserverpb.watch", + HandlerType: (*WatchServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Watch", + Handler: _Watch_Watch_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, +} + func (m *ResponseHeader) Marshal() (data []byte, err error) { size := m.Size() data = make([]byte, size) @@ -1117,6 +1257,83 @@ func (m *CompactionResponse) MarshalTo(data []byte) (int, error) { return i, nil } +func (m *WatchRequest) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *WatchRequest) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Key != nil { + if len(m.Key) > 0 { + data[i] = 0xa + i++ + i = encodeVarintRpc(data, i, uint64(len(m.Key))) + i += copy(data[i:], m.Key) + } + } + if m.Prefix != nil { + if len(m.Prefix) > 0 { + data[i] = 0x12 + i++ + i = encodeVarintRpc(data, i, uint64(len(m.Prefix))) + i += copy(data[i:], m.Prefix) + } + } + if m.StartRevision != 0 { + data[i] = 0x18 + i++ + i = encodeVarintRpc(data, i, uint64(m.StartRevision)) + } + return i, nil +} + +func (m *WatchResponse) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *WatchResponse) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Header != nil { + data[i] = 0xa + i++ + i = encodeVarintRpc(data, i, uint64(m.Header.Size())) + n12, err := m.Header.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n12 + } + if m.Event != nil { + data[i] = 0x12 + i++ + i = encodeVarintRpc(data, i, uint64(m.Event.Size())) + n13, err := m.Event.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n13 + } + return i, nil +} + func encodeFixed64Rpc(data []byte, offset int, v uint64) int { data[offset] = uint8(v) data[offset+1] = uint8(v >> 8) @@ -1392,6 +1609,41 @@ func (m *CompactionResponse) Size() (n int) { return n } +func (m *WatchRequest) Size() (n int) { + var l int + _ = l + if m.Key != nil { + l = len(m.Key) + if l > 0 { + n += 1 + l + sovRpc(uint64(l)) + } + } + if m.Prefix != nil { + l = len(m.Prefix) + if l > 0 { + n += 1 + l + sovRpc(uint64(l)) + } + } + if m.StartRevision != 0 { + n += 1 + sovRpc(uint64(m.StartRevision)) + } + return n +} + +func (m *WatchResponse) Size() (n int) { + var l int + _ = l + if m.Header != nil { + l = m.Header.Size() + n += 1 + l + sovRpc(uint64(l)) + } + if m.Event != nil { + l = m.Event.Size() + n += 1 + l + sovRpc(uint64(l)) + } + return n +} + func sovRpc(x uint64) (n int) { for { n++ @@ -2931,6 +3183,222 @@ func (m *CompactionResponse) Unmarshal(data []byte) error { return nil } +func (m *WatchRequest) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Key = append([]byte{}, data[iNdEx:postIndex]...) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Prefix", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Prefix = append([]byte{}, data[iNdEx:postIndex]...) + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StartRevision", wireType) + } + m.StartRevision = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.StartRevision |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + return nil +} +func (m *WatchResponse) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Header", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Header == nil { + m.Header = &ResponseHeader{} + } + if err := m.Header.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Event", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Event == nil { + m.Event = &storagepb.Event{} + } + if err := m.Event.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + return nil +} func skipRpc(data []byte) (n int, err error) { l := len(data) iNdEx := 0 diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index c91d1299d..fcf9771eb 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -32,6 +32,14 @@ service etcd { rpc Compact(CompactionRequest) returns (CompactionResponse) {} } +service watch { + // Watch watches the events happening or happened. Both input and output + // are stream. One watch rpc can watch for multiple keys or prefixs and + // get a stream of events. The whole events history can be watched unless + // compacted. + rpc Watch(stream WatchRequest) returns (stream WatchResponse) {} +} + message ResponseHeader { uint64 cluster_id = 1; uint64 member_id = 2; @@ -169,3 +177,21 @@ message CompactionRequest { message CompactionResponse { ResponseHeader header = 1; } + +message WatchRequest { + // the key to be watched + bytes key = 1; + // the prefix to be watched. + bytes prefix = 2; + // start_revision is an optional revision (including) to watch from. No start_revision is "now". + int64 start_revision = 3; + // TODO: support Range watch? + // TODO: support notification every time interval or revision increase? + // TODO: support cancel watch if the server cannot reach with majority? +} + +message WatchResponse { + ResponseHeader header = 1; + // TODO: support batched events response? + storagepb.Event event = 2; +}