clientv3: close watcher stream once all watchers detach

Fixes #6134
release-3.1
Anthony Romano 2016-08-09 00:08:28 -07:00
parent c3c41234f1
commit 5e651a0d0d
1 changed files with 18 additions and 7 deletions

View File

@ -311,7 +311,12 @@ func (w *watcher) Close() (err error) {
} }
func (w *watchGrpcStream) Close() (err error) { func (w *watchGrpcStream) Close() (err error) {
close(w.stopc) w.mu.Lock()
if w.stopc != nil {
close(w.stopc)
w.stopc = nil
}
w.mu.Unlock()
<-w.donec <-w.donec
select { select {
case err = <-w.errc: case err = <-w.errc:
@ -374,11 +379,17 @@ func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchReq
// closeStream closes the watcher resources and removes it // closeStream closes the watcher resources and removes it
func (w *watchGrpcStream) closeStream(ws *watcherStream) { func (w *watchGrpcStream) closeStream(ws *watcherStream) {
w.mu.Lock()
// cancels request stream; subscriber receives nil channel // cancels request stream; subscriber receives nil channel
close(ws.initReq.retc) close(ws.initReq.retc)
// close subscriber's channel // close subscriber's channel
close(ws.outc) close(ws.outc)
delete(w.streams, ws.id) delete(w.streams, ws.id)
if len(w.streams) == 0 && w.stopc != nil {
close(w.stopc)
w.stopc = nil
}
w.mu.Unlock()
} }
// run is the root of the goroutines for managing a watcher client // run is the root of the goroutines for managing a watcher client
@ -404,6 +415,7 @@ func (w *watchGrpcStream) run() {
var pendingReq, failedReq *watchRequest var pendingReq, failedReq *watchRequest
curReqC := w.reqc curReqC := w.reqc
stopc := w.stopc
cancelSet := make(map[int64]struct{}) cancelSet := make(map[int64]struct{})
for { for {
@ -473,7 +485,7 @@ func (w *watchGrpcStream) run() {
failedReq = pendingReq failedReq = pendingReq
} }
cancelSet = make(map[int64]struct{}) cancelSet = make(map[int64]struct{})
case <-w.stopc: case <-stopc:
return return
} }
@ -625,9 +637,7 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) {
} }
} }
w.mu.Lock()
w.closeStream(ws) w.closeStream(ws)
w.mu.Unlock()
// lazily send cancel message if events on missing id // lazily send cancel message if events on missing id
} }
@ -655,13 +665,14 @@ func (w *watchGrpcStream) resume() (ws pb.Watch_WatchClient, err error) {
// openWatchClient retries opening a watchclient until retryConnection fails // openWatchClient retries opening a watchclient until retryConnection fails
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) { func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
for { for {
select { w.mu.Lock()
case <-w.stopc: stopc := w.stopc
w.mu.Unlock()
if stopc == nil {
if err == nil { if err == nil {
err = context.Canceled err = context.Canceled
} }
return nil, err return nil, err
default:
} }
if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil { if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil {
break break