diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index 2e13ab9e9..d33723cf5 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -157,6 +157,33 @@ func testWatchMultiWatcher(t *testing.T, wctx *watchctx) { } } +// TestWatchReconnRequest tests the send failure path when requesting a watcher. +func TestWatchReconnRequest(t *testing.T) { + runWatchTest(t, testWatchReconnRequest) +} + +func testWatchReconnRequest(t *testing.T, wctx *watchctx) { + // take down watcher connection + donec := make(chan struct{}) + go func() { + for { + wctx.wclient.ActiveConnection().Close() + select { + case <-donec: + return + default: + } + } + }() + // should reconnect when requesting watch + if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil { + t.Fatalf("expected non-nil channel") + } + close(donec) + // ensure watcher works + putAndWatch(t, wctx, "a", "a") +} + // TestWatchReconnInit tests watcher resumes correctly if connection lost // before any data was sent. func TestWatchReconnInit(t *testing.T) { diff --git a/clientv3/watch.go b/clientv3/watch.go index 80ecca4e8..79b78a433 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -286,13 +286,13 @@ func (w *watcher) run() { // send failed; queue for retry if failedReq != nil { - go func() { + go func(wr *watchRequest) { select { - case w.reqc <- pendingReq: - case <-pendingReq.ctx.Done(): + case w.reqc <- wr: + case <-wr.ctx.Done(): case <-w.donec: } - }() + }(pendingReq) failedReq = nil pendingReq = nil }