Merge pull request #4678 from gyuho/watch_notify_clientv3

clientv3: add WithProgressNotify
release-2.3
Gyu-Ho Lee 2016-03-04 10:08:25 -08:00
commit 4097a72c0b
3 changed files with 85 additions and 9 deletions

View File

@ -396,3 +396,59 @@ func TestWatchCompactRevision(t *testing.T) {
t.Fatalf("expected closed channel, but got %v", wresp)
}
}
func TestWatchWithProgressNotify(t *testing.T) { testWatchWithProgressNotify(t, true) }
func TestWatchWithProgressNotifyNoEvent(t *testing.T) { testWatchWithProgressNotify(t, false) }
func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
wc := clientv3.NewWatcher(clus.RandClient())
defer wc.Close()
testInterval := 3 * time.Second
pi := v3rpc.ProgressReportInterval
v3rpc.ProgressReportInterval = testInterval
defer func() { v3rpc.ProgressReportInterval = pi }()
opts := []clientv3.OpOption{clientv3.WithProgressNotify()}
if watchOnPut {
opts = append(opts, clientv3.WithPrefix())
}
rch := wc.Watch(context.Background(), "foo", opts...)
select {
case resp := <-rch: // wait for notification
if len(resp.Events) != 0 {
t.Fatalf("resp.Events expected none, got %+v", resp.Events)
}
case <-time.After(2 * pi):
t.Fatalf("watch response expected in %v, but timed out", pi)
}
kvc := clientv3.NewKV(clus.RandClient())
if _, err := kvc.Put(context.TODO(), "foox", "bar"); err != nil {
t.Fatal(err)
}
select {
case resp := <-rch:
if resp.Header.Revision != 2 {
t.Fatalf("resp.Header.Revision expected 2, got %d", resp.Header.Revision)
}
if watchOnPut { // wait for put if watch on the put key
ev := []*storagepb.Event{{Type: storagepb.PUT,
Kv: &storagepb.KeyValue{Key: []byte("foox"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}}}
if !reflect.DeepEqual(ev, resp.Events) {
t.Fatalf("expected %+v, got %+v", ev, resp.Events)
}
} else if len(resp.Events) != 0 { // wait for notification otherwise
t.Fatalf("expected no events, but got %+v", resp.Events)
}
case <-time.After(2 * pi):
t.Fatalf("watch response expected in %v, but timed out", pi)
}
}

View File

@ -42,6 +42,9 @@ type Op struct {
// for range, watch
rev int64
// progressNotify is for progress updates.
progressNotify bool
// for put
val []byte
leaseID lease.LeaseID
@ -225,3 +228,11 @@ func WithLastRev() []OpOption { return withTop(SortByModifiedRev, SortDescend) }
func withTop(target SortTarget, order SortOrder) []OpOption {
return []OpOption{WithPrefix(), WithSort(target, order), WithLimit(1)}
}
// WithProgressNotify makes watch server send periodic progress updates.
// Progress updates have zero events in WatchResponse.
func WithProgressNotify() OpOption {
return func(op *Op) {
op.progressNotify = true
}
}

View File

@ -94,6 +94,8 @@ type watchRequest struct {
key string
end string
rev int64
// progressNotify is for progress updates.
progressNotify bool
// retc receives a chan WatchResponse once the watcher is established
retc chan chan WatchResponse
}
@ -143,11 +145,12 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
retc := make(chan chan WatchResponse, 1)
wr := &watchRequest{
ctx: ctx,
key: string(ow.key),
end: string(ow.end),
rev: ow.rev,
retc: retc,
ctx: ctx,
key: string(ow.key),
end: string(ow.end),
rev: ow.rev,
progressNotify: ow.progressNotify,
retc: retc,
}
ok := false
@ -392,7 +395,12 @@ func (w *watcher) serveStream(ws *watcherStream) {
closing = true
break
}
newRev := wrs[0].Events[len(wrs[0].Events)-1].Kv.ModRevision
var newRev int64
if len(wrs[0].Events) > 0 {
newRev = wrs[0].Events[len(wrs[0].Events)-1].Kv.ModRevision
} else {
newRev = wrs[0].Header.Revision
}
if newRev != ws.lastRev {
ws.lastRev = newRev
}
@ -518,9 +526,10 @@ func (w *watcher) resumeWatchers(wc pb.Watch_WatchClient) error {
// toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
func (wr *watchRequest) toPB() *pb.WatchRequest {
req := &pb.WatchCreateRequest{
StartRevision: wr.rev,
Key: []byte(wr.key),
RangeEnd: []byte(wr.end),
StartRevision: wr.rev,
Key: []byte(wr.key),
RangeEnd: []byte(wr.end),
ProgressNotify: wr.progressNotify,
}
cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
return &pb.WatchRequest{RequestUnion: cr}