From 57c68ab1dbac2a5f998fb86ecf7d0b7729b95540 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 2 Aug 2016 20:51:30 -0700 Subject: [PATCH] grpcproxy: handle create event --- clientv3/watch.go | 2 +- proxy/grpcproxy/watch.go | 13 +------------ proxy/grpcproxy/watcher.go | 3 ++- proxy/grpcproxy/watcher_groups.go | 7 ++++++- 4 files changed, 10 insertions(+), 15 deletions(-) diff --git a/clientv3/watch.go b/clientv3/watch.go index 70aba565f..4b7344615 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -92,7 +92,7 @@ func (wr *WatchResponse) Err() error { // IsProgressNotify returns true if the WatchResponse is progress notification. func (wr *WatchResponse) IsProgressNotify() bool { - return len(wr.Events) == 0 && !wr.Canceled + return len(wr.Events) == 0 && !wr.Canceled && !wr.Created } // watcher implements the Watcher interface diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index 395017320..ceb7794a3 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -123,19 +123,7 @@ func (sws *serverWatchStream) recvLoop() error { } else { sws.addCoalescedWatcher(watcher) } - - wresp := &pb.WatchResponse{ - Header: &pb.ResponseHeader{}, // TODO: fill in header - WatchId: sws.nextWatcherID, - Created: true, - } - sws.nextWatcherID++ - select { - case sws.ctrlCh <- wresp: - default: - panic("handle this") - } case *pb.WatchRequest_CancelRequest: sws.removeWatcher(uv.CancelRequest.WatchId) @@ -185,6 +173,7 @@ func (sws *serverWatchStream) addDedicatedWatcher(w watcher, rev int64) { w.wr.key, clientv3.WithRange(w.wr.end), clientv3.WithRev(rev), clientv3.WithProgressNotify(), + clientv3.WithCreatedNotify(), ) ws := newWatcherSingle(wch, cancel, w, sws) diff --git a/proxy/grpcproxy/watcher.go b/proxy/grpcproxy/watcher.go index ca0aae65d..b018a663f 100644 --- a/proxy/grpcproxy/watcher.go +++ b/proxy/grpcproxy/watcher.go @@ -66,12 +66,13 @@ func (w *watcher) send(wr clientv3.WatchResponse) { } // all events are filtered out? - if !wr.IsProgressNotify() && len(events) == 0 { + if !wr.IsProgressNotify() && !wr.Created && len(events) == 0 { return } pbwr := &pb.WatchResponse{ Header: &wr.Header, + Created: wr.Created, WatchId: w.id, Events: events, } diff --git a/proxy/grpcproxy/watcher_groups.go b/proxy/grpcproxy/watcher_groups.go index 25d40c19e..514b5b44d 100644 --- a/proxy/grpcproxy/watcher_groups.go +++ b/proxy/grpcproxy/watcher_groups.go @@ -42,7 +42,12 @@ func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) { ctx, cancel := context.WithCancel(context.Background()) - wch := wgs.cw.Watch(ctx, w.wr.key, clientv3.WithRange(w.wr.end), clientv3.WithProgressNotify()) + wch := wgs.cw.Watch(ctx, w.wr.key, + clientv3.WithRange(w.wr.end), + clientv3.WithProgressNotify(), + clientv3.WithCreatedNotify(), + ) + watchg := newWatchergroup(wch, cancel) watchg.add(rid, w) go watchg.run()