*: add v3 watch service
parent
154fc8e19c
commit
c160085f44
|
@ -20,11 +20,6 @@ service etcd {
|
|||
// and generates events with the same revision in the event history.
|
||||
rpc Txn(TxnRequest) returns (TxnResponse) {}
|
||||
|
||||
// Watch watches the events happening or happened in etcd. Both input and output
|
||||
// are stream. One watch rpc can watch for multiple ranges and get a stream of
|
||||
// events. The whole events history can be watched unless compacted.
|
||||
rpc WatchRange(stream WatchRangeRequest) returns (stream WatchRangeResponse) {}
|
||||
|
||||
// Compact compacts the event history in etcd. User should compact the
|
||||
// event history periodically, or it will grow infinitely.
|
||||
rpc Compact(CompactionRequest) returns (CompactionResponse) {}
|
||||
|
@ -50,6 +45,14 @@ service etcd {
|
|||
rpc LeaseKeepAlive(stream LeaseKeepAliveRequest) returns (stream LeaseKeepAliveResponse) {}
|
||||
}
|
||||
|
||||
service watch {
|
||||
// Watch watches the events happening or happened. Both input and output
|
||||
// are stream. One watch rpc can watch for multiple keys or prefixs and
|
||||
// get a stream of events. The whole events history can be watched unless
|
||||
// compacted.
|
||||
rpc Watch(stream WatchRequest) returns (stream WatchResponse) {}
|
||||
}
|
||||
|
||||
message ResponseHeader {
|
||||
// an error type message?
|
||||
string error = 1;
|
||||
|
@ -190,21 +193,22 @@ message KeyValue {
|
|||
bytes value = 5;
|
||||
}
|
||||
|
||||
message WatchRangeRequest {
|
||||
// if the range_end is not given, the request returns the key.
|
||||
message WatchRequest {
|
||||
// the key to be watched
|
||||
bytes key = 1;
|
||||
// if the range_end is given, it gets the keys in range [key, range_end).
|
||||
bytes range_end = 2;
|
||||
// the prefix to be watched.
|
||||
bytes prefix = 2;
|
||||
// start_revision is an optional revision (including) to watch from. No start_revision is "now".
|
||||
int64 start_revision = 3;
|
||||
// end_revision is an optional revision (excluding) to end watch. No end_revision is "forever".
|
||||
int64 end_revision = 4;
|
||||
bool progress_notification = 5;
|
||||
// TODO: support Range watch?
|
||||
// TODO: support notification every time interval or revision increase?
|
||||
// TODO: support cancel watch if the server cannot reach with majority?
|
||||
}
|
||||
|
||||
message WatchRangeResponse {
|
||||
message WatchResponse {
|
||||
ResponseHeader header = 1;
|
||||
repeated Event events = 2;
|
||||
// TODO: support batched events response?
|
||||
storagepb.Event event = 2;
|
||||
}
|
||||
|
||||
message Event {
|
||||
|
|
|
@ -353,6 +353,43 @@ func (m *CompactionResponse) GetHeader() *ResponseHeader {
|
|||
return nil
|
||||
}
|
||||
|
||||
type WatchRequest struct {
|
||||
// the key to be watched
|
||||
Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
|
||||
// the prefix to be watched.
|
||||
Prefix []byte `protobuf:"bytes,2,opt,name=prefix,proto3" json:"prefix,omitempty"`
|
||||
// start_revision is an optional revision (including) to watch from. No start_revision is "now".
|
||||
StartRevision int64 `protobuf:"varint,3,opt,name=start_revision,proto3" json:"start_revision,omitempty"`
|
||||
}
|
||||
|
||||
func (m *WatchRequest) Reset() { *m = WatchRequest{} }
|
||||
func (m *WatchRequest) String() string { return proto.CompactTextString(m) }
|
||||
func (*WatchRequest) ProtoMessage() {}
|
||||
|
||||
type WatchResponse struct {
|
||||
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
|
||||
// TODO: support batched events response?
|
||||
Event *storagepb.Event `protobuf:"bytes,2,opt,name=event" json:"event,omitempty"`
|
||||
}
|
||||
|
||||
func (m *WatchResponse) Reset() { *m = WatchResponse{} }
|
||||
func (m *WatchResponse) String() string { return proto.CompactTextString(m) }
|
||||
func (*WatchResponse) ProtoMessage() {}
|
||||
|
||||
func (m *WatchResponse) GetHeader() *ResponseHeader {
|
||||
if m != nil {
|
||||
return m.Header
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *WatchResponse) GetEvent() *storagepb.Event {
|
||||
if m != nil {
|
||||
return m.Event
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterEnum("etcdserverpb.Compare_CompareResult", Compare_CompareResult_name, Compare_CompareResult_value)
|
||||
proto.RegisterEnum("etcdserverpb.Compare_CompareTarget", Compare_CompareTarget_name, Compare_CompareTarget_value)
|
||||
|
@ -551,6 +588,109 @@ var _Etcd_serviceDesc = grpc.ServiceDesc{
|
|||
Streams: []grpc.StreamDesc{},
|
||||
}
|
||||
|
||||
// Client API for Watch service
|
||||
|
||||
type WatchClient interface {
|
||||
// Watch watches the events happening or happened. Both input and output
|
||||
// are stream. One watch rpc can watch for multiple keys or prefixs and
|
||||
// get a stream of events. The whole events history can be watched unless
|
||||
// compacted.
|
||||
Watch(ctx context.Context, opts ...grpc.CallOption) (Watch_WatchClient, error)
|
||||
}
|
||||
|
||||
type watchClient struct {
|
||||
cc *grpc.ClientConn
|
||||
}
|
||||
|
||||
func NewWatchClient(cc *grpc.ClientConn) WatchClient {
|
||||
return &watchClient{cc}
|
||||
}
|
||||
|
||||
func (c *watchClient) Watch(ctx context.Context, opts ...grpc.CallOption) (Watch_WatchClient, error) {
|
||||
stream, err := grpc.NewClientStream(ctx, &_Watch_serviceDesc.Streams[0], c.cc, "/etcdserverpb.watch/Watch", opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &watchWatchClient{stream}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
type Watch_WatchClient interface {
|
||||
Send(*WatchRequest) error
|
||||
Recv() (*WatchResponse, error)
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
type watchWatchClient struct {
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
func (x *watchWatchClient) Send(m *WatchRequest) error {
|
||||
return x.ClientStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func (x *watchWatchClient) Recv() (*WatchResponse, error) {
|
||||
m := new(WatchResponse)
|
||||
if err := x.ClientStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// Server API for Watch service
|
||||
|
||||
type WatchServer interface {
|
||||
// Watch watches the events happening or happened. Both input and output
|
||||
// are stream. One watch rpc can watch for multiple keys or prefixs and
|
||||
// get a stream of events. The whole events history can be watched unless
|
||||
// compacted.
|
||||
Watch(Watch_WatchServer) error
|
||||
}
|
||||
|
||||
func RegisterWatchServer(s *grpc.Server, srv WatchServer) {
|
||||
s.RegisterService(&_Watch_serviceDesc, srv)
|
||||
}
|
||||
|
||||
func _Watch_Watch_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
return srv.(WatchServer).Watch(&watchWatchServer{stream})
|
||||
}
|
||||
|
||||
type Watch_WatchServer interface {
|
||||
Send(*WatchResponse) error
|
||||
Recv() (*WatchRequest, error)
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
type watchWatchServer struct {
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
func (x *watchWatchServer) Send(m *WatchResponse) error {
|
||||
return x.ServerStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func (x *watchWatchServer) Recv() (*WatchRequest, error) {
|
||||
m := new(WatchRequest)
|
||||
if err := x.ServerStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
var _Watch_serviceDesc = grpc.ServiceDesc{
|
||||
ServiceName: "etcdserverpb.watch",
|
||||
HandlerType: (*WatchServer)(nil),
|
||||
Methods: []grpc.MethodDesc{},
|
||||
Streams: []grpc.StreamDesc{
|
||||
{
|
||||
StreamName: "Watch",
|
||||
Handler: _Watch_Watch_Handler,
|
||||
ServerStreams: true,
|
||||
ClientStreams: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func (m *ResponseHeader) Marshal() (data []byte, err error) {
|
||||
size := m.Size()
|
||||
data = make([]byte, size)
|
||||
|
@ -1117,6 +1257,83 @@ func (m *CompactionResponse) MarshalTo(data []byte) (int, error) {
|
|||
return i, nil
|
||||
}
|
||||
|
||||
func (m *WatchRequest) Marshal() (data []byte, err error) {
|
||||
size := m.Size()
|
||||
data = make([]byte, size)
|
||||
n, err := m.MarshalTo(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return data[:n], nil
|
||||
}
|
||||
|
||||
func (m *WatchRequest) MarshalTo(data []byte) (int, error) {
|
||||
var i int
|
||||
_ = i
|
||||
var l int
|
||||
_ = l
|
||||
if m.Key != nil {
|
||||
if len(m.Key) > 0 {
|
||||
data[i] = 0xa
|
||||
i++
|
||||
i = encodeVarintRpc(data, i, uint64(len(m.Key)))
|
||||
i += copy(data[i:], m.Key)
|
||||
}
|
||||
}
|
||||
if m.Prefix != nil {
|
||||
if len(m.Prefix) > 0 {
|
||||
data[i] = 0x12
|
||||
i++
|
||||
i = encodeVarintRpc(data, i, uint64(len(m.Prefix)))
|
||||
i += copy(data[i:], m.Prefix)
|
||||
}
|
||||
}
|
||||
if m.StartRevision != 0 {
|
||||
data[i] = 0x18
|
||||
i++
|
||||
i = encodeVarintRpc(data, i, uint64(m.StartRevision))
|
||||
}
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func (m *WatchResponse) Marshal() (data []byte, err error) {
|
||||
size := m.Size()
|
||||
data = make([]byte, size)
|
||||
n, err := m.MarshalTo(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return data[:n], nil
|
||||
}
|
||||
|
||||
func (m *WatchResponse) MarshalTo(data []byte) (int, error) {
|
||||
var i int
|
||||
_ = i
|
||||
var l int
|
||||
_ = l
|
||||
if m.Header != nil {
|
||||
data[i] = 0xa
|
||||
i++
|
||||
i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
|
||||
n12, err := m.Header.MarshalTo(data[i:])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
i += n12
|
||||
}
|
||||
if m.Event != nil {
|
||||
data[i] = 0x12
|
||||
i++
|
||||
i = encodeVarintRpc(data, i, uint64(m.Event.Size()))
|
||||
n13, err := m.Event.MarshalTo(data[i:])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
i += n13
|
||||
}
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func encodeFixed64Rpc(data []byte, offset int, v uint64) int {
|
||||
data[offset] = uint8(v)
|
||||
data[offset+1] = uint8(v >> 8)
|
||||
|
@ -1392,6 +1609,41 @@ func (m *CompactionResponse) Size() (n int) {
|
|||
return n
|
||||
}
|
||||
|
||||
func (m *WatchRequest) Size() (n int) {
|
||||
var l int
|
||||
_ = l
|
||||
if m.Key != nil {
|
||||
l = len(m.Key)
|
||||
if l > 0 {
|
||||
n += 1 + l + sovRpc(uint64(l))
|
||||
}
|
||||
}
|
||||
if m.Prefix != nil {
|
||||
l = len(m.Prefix)
|
||||
if l > 0 {
|
||||
n += 1 + l + sovRpc(uint64(l))
|
||||
}
|
||||
}
|
||||
if m.StartRevision != 0 {
|
||||
n += 1 + sovRpc(uint64(m.StartRevision))
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func (m *WatchResponse) Size() (n int) {
|
||||
var l int
|
||||
_ = l
|
||||
if m.Header != nil {
|
||||
l = m.Header.Size()
|
||||
n += 1 + l + sovRpc(uint64(l))
|
||||
}
|
||||
if m.Event != nil {
|
||||
l = m.Event.Size()
|
||||
n += 1 + l + sovRpc(uint64(l))
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func sovRpc(x uint64) (n int) {
|
||||
for {
|
||||
n++
|
||||
|
@ -2931,6 +3183,222 @@ func (m *CompactionResponse) Unmarshal(data []byte) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
func (m *WatchRequest) Unmarshal(data []byte) error {
|
||||
l := len(data)
|
||||
iNdEx := 0
|
||||
for iNdEx < l {
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := data[iNdEx]
|
||||
iNdEx++
|
||||
wire |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
fieldNum := int32(wire >> 3)
|
||||
wireType := int(wire & 0x7)
|
||||
switch fieldNum {
|
||||
case 1:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType)
|
||||
}
|
||||
var byteLen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := data[iNdEx]
|
||||
iNdEx++
|
||||
byteLen |= (int(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if byteLen < 0 {
|
||||
return ErrInvalidLengthRpc
|
||||
}
|
||||
postIndex := iNdEx + byteLen
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.Key = append([]byte{}, data[iNdEx:postIndex]...)
|
||||
iNdEx = postIndex
|
||||
case 2:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Prefix", wireType)
|
||||
}
|
||||
var byteLen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := data[iNdEx]
|
||||
iNdEx++
|
||||
byteLen |= (int(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if byteLen < 0 {
|
||||
return ErrInvalidLengthRpc
|
||||
}
|
||||
postIndex := iNdEx + byteLen
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.Prefix = append([]byte{}, data[iNdEx:postIndex]...)
|
||||
iNdEx = postIndex
|
||||
case 3:
|
||||
if wireType != 0 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field StartRevision", wireType)
|
||||
}
|
||||
m.StartRevision = 0
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := data[iNdEx]
|
||||
iNdEx++
|
||||
m.StartRevision |= (int64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
default:
|
||||
var sizeOfWire int
|
||||
for {
|
||||
sizeOfWire++
|
||||
wire >>= 7
|
||||
if wire == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
iNdEx -= sizeOfWire
|
||||
skippy, err := skipRpc(data[iNdEx:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if skippy < 0 {
|
||||
return ErrInvalidLengthRpc
|
||||
}
|
||||
if (iNdEx + skippy) > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
iNdEx += skippy
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
func (m *WatchResponse) Unmarshal(data []byte) error {
|
||||
l := len(data)
|
||||
iNdEx := 0
|
||||
for iNdEx < l {
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := data[iNdEx]
|
||||
iNdEx++
|
||||
wire |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
fieldNum := int32(wire >> 3)
|
||||
wireType := int(wire & 0x7)
|
||||
switch fieldNum {
|
||||
case 1:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Header", wireType)
|
||||
}
|
||||
var msglen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := data[iNdEx]
|
||||
iNdEx++
|
||||
msglen |= (int(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if msglen < 0 {
|
||||
return ErrInvalidLengthRpc
|
||||
}
|
||||
postIndex := iNdEx + msglen
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
if m.Header == nil {
|
||||
m.Header = &ResponseHeader{}
|
||||
}
|
||||
if err := m.Header.Unmarshal(data[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
iNdEx = postIndex
|
||||
case 2:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Event", wireType)
|
||||
}
|
||||
var msglen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := data[iNdEx]
|
||||
iNdEx++
|
||||
msglen |= (int(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if msglen < 0 {
|
||||
return ErrInvalidLengthRpc
|
||||
}
|
||||
postIndex := iNdEx + msglen
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
if m.Event == nil {
|
||||
m.Event = &storagepb.Event{}
|
||||
}
|
||||
if err := m.Event.Unmarshal(data[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
iNdEx = postIndex
|
||||
default:
|
||||
var sizeOfWire int
|
||||
for {
|
||||
sizeOfWire++
|
||||
wire >>= 7
|
||||
if wire == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
iNdEx -= sizeOfWire
|
||||
skippy, err := skipRpc(data[iNdEx:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if skippy < 0 {
|
||||
return ErrInvalidLengthRpc
|
||||
}
|
||||
if (iNdEx + skippy) > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
iNdEx += skippy
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
func skipRpc(data []byte) (n int, err error) {
|
||||
l := len(data)
|
||||
iNdEx := 0
|
||||
|
|
|
@ -32,6 +32,14 @@ service etcd {
|
|||
rpc Compact(CompactionRequest) returns (CompactionResponse) {}
|
||||
}
|
||||
|
||||
service watch {
|
||||
// Watch watches the events happening or happened. Both input and output
|
||||
// are stream. One watch rpc can watch for multiple keys or prefixs and
|
||||
// get a stream of events. The whole events history can be watched unless
|
||||
// compacted.
|
||||
rpc Watch(stream WatchRequest) returns (stream WatchResponse) {}
|
||||
}
|
||||
|
||||
message ResponseHeader {
|
||||
uint64 cluster_id = 1;
|
||||
uint64 member_id = 2;
|
||||
|
@ -169,3 +177,21 @@ message CompactionRequest {
|
|||
message CompactionResponse {
|
||||
ResponseHeader header = 1;
|
||||
}
|
||||
|
||||
message WatchRequest {
|
||||
// the key to be watched
|
||||
bytes key = 1;
|
||||
// the prefix to be watched.
|
||||
bytes prefix = 2;
|
||||
// start_revision is an optional revision (including) to watch from. No start_revision is "now".
|
||||
int64 start_revision = 3;
|
||||
// TODO: support Range watch?
|
||||
// TODO: support notification every time interval or revision increase?
|
||||
// TODO: support cancel watch if the server cannot reach with majority?
|
||||
}
|
||||
|
||||
message WatchResponse {
|
||||
ResponseHeader header = 1;
|
||||
// TODO: support batched events response?
|
||||
storagepb.Event event = 2;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue