From 8dbc6cfd430f6e372fb1fe4b0af6b387ab6f3b41 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 24 Feb 2016 23:46:32 -0800 Subject: [PATCH] etcdserver: ranges in watcher rpc protocol protocol change so watch requests are ranges; server rejects non-prefix ranges --- clientv3/op.go | 21 --------- clientv3/watch.go | 28 ++++++------ etcdserver/api/v3rpc/watch.go | 73 +++++++++++++++++++------------ etcdserver/etcdserverpb/rpc.pb.go | 25 ++++++----- etcdserver/etcdserverpb/rpc.proto | 6 +-- integration/v3_watch_test.go | 22 +++++++--- 6 files changed, 91 insertions(+), 84 deletions(-) diff --git a/clientv3/op.go b/clientv3/op.go index a7b545049..99830eb46 100644 --- a/clientv3/op.go +++ b/clientv3/op.go @@ -69,27 +69,6 @@ func (op Op) toRequestUnion() *pb.RequestUnion { } } -func (op Op) toWatchRequest() *watchRequest { - switch op.t { - case tRange: - key := string(op.key) - prefix := "" - if op.end != nil { - prefix = key - key = "" - } - wr := &watchRequest{ - key: key, - prefix: prefix, - rev: op.rev, - } - return wr - - default: - panic("Only for tRange") - } -} - func (op Op) isWrite() bool { return op.t != tRange } diff --git a/clientv3/watch.go b/clientv3/watch.go index 2a1119de5..0fd45b17e 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -78,10 +78,10 @@ type watcher struct { // watchRequest is issued by the subscriber to start a new watcher type watchRequest struct { - ctx context.Context - key string - prefix string - rev int64 + ctx context.Context + key string + end string + rev int64 // retc receives a chan WatchResponse once the watcher is established retc chan chan WatchResponse } @@ -129,11 +129,14 @@ func NewWatcher(c *Client) Watcher { func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan { ow := opWatch(key, opts...) - wr := ow.toWatchRequest() - wr.ctx = ctx - retc := make(chan chan WatchResponse, 1) - wr.retc = retc + wr := &watchRequest{ + ctx: ctx, + key: string(ow.key), + end: string(ow.end), + rev: ow.rev, + retc: retc, + } ok := false @@ -502,11 +505,10 @@ func (w *watcher) resumeWatchers(wc pb.Watch_WatchClient) error { // toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest) func (wr *watchRequest) toPB() *pb.WatchRequest { - req := &pb.WatchCreateRequest{StartRevision: wr.rev} - if wr.key != "" { - req.Key = []byte(wr.key) - } else { - req.Prefix = []byte(wr.prefix) + req := &pb.WatchCreateRequest{ + StartRevision: wr.rev, + Key: []byte(wr.key), + RangeEnd: []byte(wr.end), } cr := &pb.WatchRequest_CreateRequest{CreateRequest: req} return &pb.WatchRequest{RequestUnion: cr} diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 3a0a9b991..1660732c9 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -16,6 +16,7 @@ package v3rpc import ( "io" + "reflect" "github.com/coreos/etcd/etcdserver" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" @@ -94,35 +95,33 @@ func (sws *serverWatchStream) recvLoop() error { switch uv := req.RequestUnion.(type) { case *pb.WatchRequest_CreateRequest: - if uv.CreateRequest != nil { - creq := uv.CreateRequest - var prefix bool - toWatch := creq.Key - if len(creq.Key) == 0 { - toWatch = creq.Prefix - prefix = true - } + if uv.CreateRequest == nil { + break + } - rev := creq.StartRevision - wsrev := sws.watchStream.Rev() - if rev == 0 { - // rev 0 watches past the current revision - rev = wsrev + 1 - } else if rev > wsrev { // do not allow watching future revision. - sws.ctrlStream <- &pb.WatchResponse{ - Header: sws.newResponseHeader(wsrev), - WatchId: -1, - Created: true, - Canceled: true, - } - continue - } - id := sws.watchStream.Watch(toWatch, prefix, rev) - sws.ctrlStream <- &pb.WatchResponse{ - Header: sws.newResponseHeader(wsrev), - WatchId: int64(id), - Created: true, - } + creq := uv.CreateRequest + toWatch := creq.Key + isPrefix := len(creq.RangeEnd) != 0 + badPrefix := isPrefix && !reflect.DeepEqual(getPrefix(toWatch), creq.RangeEnd) + + rev := creq.StartRevision + wsrev := sws.watchStream.Rev() + futureRev := rev > wsrev + if rev == 0 { + // rev 0 watches past the current revision + rev = wsrev + 1 + } + // do not allow future watch revision + // do not allow range that is not a prefix + id := storage.WatchID(-1) + if !futureRev && !badPrefix { + id = sws.watchStream.Watch(toWatch, isPrefix, rev) + } + sws.ctrlStream <- &pb.WatchResponse{ + Header: sws.newResponseHeader(wsrev), + WatchId: int64(id), + Created: true, + Canceled: futureRev || badPrefix, } case *pb.WatchRequest_CancelRequest: if uv.CancelRequest != nil { @@ -238,3 +237,21 @@ func (sws *serverWatchStream) newResponseHeader(rev int64) *pb.ResponseHeader { RaftTerm: sws.raftTimer.Term(), } } + +// TODO: remove getPrefix when storage supports full range watchers + +func getPrefix(key []byte) []byte { + end := make([]byte, len(key)) + copy(end, key) + for i := len(end) - 1; i >= 0; i-- { + if end[i] < 0xff { + end[i] = end[i] + 1 + end = end[:i+1] + return end + } + } + // next prefix does not exist (e.g., 0xffff); + // default to WithFromKey policy + end = []byte{0} + return end +} diff --git a/etcdserver/etcdserverpb/rpc.pb.go b/etcdserver/etcdserverpb/rpc.pb.go index 05ac13097..b3fdeb174 100644 --- a/etcdserver/etcdserverpb/rpc.pb.go +++ b/etcdserver/etcdserverpb/rpc.pb.go @@ -870,8 +870,9 @@ func _WatchRequest_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.B type WatchCreateRequest struct { // the key to be watched Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` - // the prefix to be watched. - Prefix []byte `protobuf:"bytes,2,opt,name=prefix,proto3" json:"prefix,omitempty"` + // if the range_end is given, keys in [key, range_end) are watched + // NOTE: only range_end == prefixEnd(key) is accepted now + RangeEnd []byte `protobuf:"bytes,2,opt,name=range_end,proto3" json:"range_end,omitempty"` // start_revision is an optional revision (including) to watch from. No start_revision is "now". StartRevision int64 `protobuf:"varint,3,opt,name=start_revision,proto3" json:"start_revision,omitempty"` } @@ -2588,12 +2589,12 @@ func (m *WatchCreateRequest) MarshalTo(data []byte) (int, error) { i += copy(data[i:], m.Key) } } - if m.Prefix != nil { - if len(m.Prefix) > 0 { + if m.RangeEnd != nil { + if len(m.RangeEnd) > 0 { data[i] = 0x12 i++ - i = encodeVarintRpc(data, i, uint64(len(m.Prefix))) - i += copy(data[i:], m.Prefix) + i = encodeVarintRpc(data, i, uint64(len(m.RangeEnd))) + i += copy(data[i:], m.RangeEnd) } } if m.StartRevision != 0 { @@ -3592,8 +3593,8 @@ func (m *WatchCreateRequest) Size() (n int) { n += 1 + l + sovRpc(uint64(l)) } } - if m.Prefix != nil { - l = len(m.Prefix) + if m.RangeEnd != nil { + l = len(m.RangeEnd) if l > 0 { n += 1 + l + sovRpc(uint64(l)) } @@ -6004,7 +6005,7 @@ func (m *WatchCreateRequest) Unmarshal(data []byte) error { iNdEx = postIndex case 2: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Prefix", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field RangeEnd", wireType) } var byteLen int for shift := uint(0); ; shift += 7 { @@ -6028,9 +6029,9 @@ func (m *WatchCreateRequest) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Prefix = append(m.Prefix[:0], data[iNdEx:postIndex]...) - if m.Prefix == nil { - m.Prefix = []byte{} + m.RangeEnd = append(m.RangeEnd[:0], data[iNdEx:postIndex]...) + if m.RangeEnd == nil { + m.RangeEnd = []byte{} } iNdEx = postIndex case 3: diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index 20fd26b43..f70152bba 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -262,11 +262,11 @@ message WatchRequest { message WatchCreateRequest { // the key to be watched bytes key = 1; - // the prefix to be watched. - bytes prefix = 2; + // if the range_end is given, keys in [key, range_end) are watched + // NOTE: only range_end == prefixEnd(key) is accepted now + bytes range_end = 2; // start_revision is an optional revision (including) to watch from. No start_revision is "now". int64 start_revision = 3; - // TODO: support Range watch? } message WatchCancelRequest { diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index 7ca7b830b..e479cc84f 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -71,7 +71,8 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { []string{"fooLong"}, &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ CreateRequest: &pb.WatchCreateRequest{ - Prefix: []byte("foo")}}}, + Key: []byte("foo"), + RangeEnd: []byte("fop")}}}, []*pb.WatchResponse{ { @@ -91,7 +92,8 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { []string{"foo"}, &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ CreateRequest: &pb.WatchCreateRequest{ - Prefix: []byte("helloworld")}}}, + Key: []byte("helloworld"), + RangeEnd: []byte("helloworle")}}}, []*pb.WatchResponse{}, }, @@ -140,7 +142,8 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { []string{"foo", "foo", "foo"}, &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ CreateRequest: &pb.WatchCreateRequest{ - Prefix: []byte("foo")}}}, + Key: []byte("foo"), + RangeEnd: []byte("fop")}}}, []*pb.WatchResponse{ { @@ -203,6 +206,11 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { t.Errorf("#%d: did not create watchid, got +%v", i, cresp) continue } + if cresp.Canceled { + t.Errorf("#%d: canceled watcher on create", i, cresp) + continue + } + createdWatchId := cresp.WatchId if cresp.Header == nil || cresp.Header.Revision != 1 { t.Errorf("#%d: header revision got +%v, wanted revison 1", i, cresp) @@ -353,7 +361,7 @@ func TestV3WatchCurrentPutOverlap(t *testing.T) { progress := make(map[int64]int64) wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ - CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("foo")}}} + CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo"), RangeEnd: []byte("fop")}}} if err := wStream.Send(wreq); err != nil { t.Fatalf("first watch request failed (%v)", err) } @@ -437,7 +445,7 @@ func testV3WatchMultipleWatchers(t *testing.T, startRev int64) { } else { wreq = &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ CreateRequest: &pb.WatchCreateRequest{ - Prefix: []byte("fo"), StartRevision: startRev}}} + Key: []byte("fo"), RangeEnd: []byte("fp"), StartRevision: startRev}}} } if err := wStream.Send(wreq); err != nil { t.Fatalf("wStream.Send error: %v", err) @@ -530,7 +538,7 @@ func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) { wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ CreateRequest: &pb.WatchCreateRequest{ - Prefix: []byte("foo"), StartRevision: startRev}}} + Key: []byte("foo"), RangeEnd: []byte("fop"), StartRevision: startRev}}} if err := wStream.Send(wreq); err != nil { t.Fatalf("wStream.Send error: %v", err) } @@ -623,7 +631,7 @@ func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) { wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ CreateRequest: &pb.WatchCreateRequest{ - Prefix: []byte("foo"), StartRevision: 1}}} + Key: []byte("foo"), RangeEnd: []byte("fop"), StartRevision: 1}}} if err := wStream.Send(wreq); err != nil { t.Fatalf("wStream.Send error: %v", err) }