From 5e651a0d0d23785a68503a337059ba00d967e2a1 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 9 Aug 2016 00:08:28 -0700 Subject: [PATCH] clientv3: close watcher stream once all watchers detach Fixes #6134 --- clientv3/watch.go | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/clientv3/watch.go b/clientv3/watch.go index 4b7344615..ef4aa5304 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -311,7 +311,12 @@ func (w *watcher) Close() (err error) { } func (w *watchGrpcStream) Close() (err error) { - close(w.stopc) + w.mu.Lock() + if w.stopc != nil { + close(w.stopc) + w.stopc = nil + } + w.mu.Unlock() <-w.donec select { case err = <-w.errc: @@ -374,11 +379,17 @@ func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchReq // closeStream closes the watcher resources and removes it func (w *watchGrpcStream) closeStream(ws *watcherStream) { + w.mu.Lock() // cancels request stream; subscriber receives nil channel close(ws.initReq.retc) // close subscriber's channel close(ws.outc) delete(w.streams, ws.id) + if len(w.streams) == 0 && w.stopc != nil { + close(w.stopc) + w.stopc = nil + } + w.mu.Unlock() } // run is the root of the goroutines for managing a watcher client @@ -404,6 +415,7 @@ func (w *watchGrpcStream) run() { var pendingReq, failedReq *watchRequest curReqC := w.reqc + stopc := w.stopc cancelSet := make(map[int64]struct{}) for { @@ -473,7 +485,7 @@ func (w *watchGrpcStream) run() { failedReq = pendingReq } cancelSet = make(map[int64]struct{}) - case <-w.stopc: + case <-stopc: return } @@ -625,9 +637,7 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) { } } - w.mu.Lock() w.closeStream(ws) - w.mu.Unlock() // lazily send cancel message if events on missing id } @@ -655,13 +665,14 @@ func (w *watchGrpcStream) resume() (ws pb.Watch_WatchClient, err error) { // openWatchClient retries opening a watchclient until retryConnection fails func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) { for { - select { - case <-w.stopc: + w.mu.Lock() + stopc := w.stopc + w.mu.Unlock() + if stopc == nil { if err == nil { err = context.Canceled } return nil, err - default: } if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil { break