clientv3: use waitgroup to wait for substream goroutine teardown

When a grpc watch stream is torn down, it will join on its logical substream
goroutines by waiting for each to close a channel. This doesn't guarantee
the substream is fully exited, though, but only about to exit and can be
waiting to resume even after Watch.Close finishes. Instead, use a
waitgroup.Done at the very end of the substream defer.

Fixes #7573
release-3.2
Anthony Romano 2017-03-22 22:14:39 -07:00
parent 1a75165ed8
commit a39107a3b8
1 changed files with 6 additions and 1 deletions

View File

@ -132,6 +132,8 @@ type watchGrpcStream struct {
errc chan error errc chan error
// closingc gets the watcherStream of closing watchers // closingc gets the watcherStream of closing watchers
closingc chan *watcherStream closingc chan *watcherStream
// wg is Done when all substream goroutines have exited
wg sync.WaitGroup
// resumec closes to signal that all substreams should begin resuming // resumec closes to signal that all substreams should begin resuming
resumec chan struct{} resumec chan struct{}
@ -406,7 +408,7 @@ func (w *watchGrpcStream) run() {
for range closing { for range closing {
w.closeSubstream(<-w.closingc) w.closeSubstream(<-w.closingc)
} }
w.wg.Wait()
w.owner.closeStream(w) w.owner.closeStream(w)
}() }()
@ -431,6 +433,7 @@ func (w *watchGrpcStream) run() {
} }
ws.donec = make(chan struct{}) ws.donec = make(chan struct{})
w.wg.Add(1)
go w.serveSubstream(ws, w.resumec) go w.serveSubstream(ws, w.resumec)
// queue up for watcher creation/resume // queue up for watcher creation/resume
@ -576,6 +579,7 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
if !resuming { if !resuming {
w.closingc <- ws w.closingc <- ws
} }
w.wg.Done()
}() }()
emptyWr := &WatchResponse{} emptyWr := &WatchResponse{}
@ -674,6 +678,7 @@ func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
continue continue
} }
ws.donec = make(chan struct{}) ws.donec = make(chan struct{})
w.wg.Add(1)
go w.serveSubstream(ws, w.resumec) go w.serveSubstream(ws, w.resumec)
} }