diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index 2d99249aa..f3863494a 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -804,21 +804,39 @@ func TestWatchCancelOnServer(t *testing.T) { defer cluster.Terminate(t) client := cluster.RandClient() + numWatches := 10 - for i := 0; i < 10; i++ { + cancels := make([]context.CancelFunc, numWatches) + for i := 0; i < numWatches; i++ { + // use WithTimeout to force separate streams in client ctx, cancel := context.WithTimeout(context.Background(), time.Second) - client.Watch(ctx, "a", clientv3.WithCreatedNotify()) - cancel() + cancels[i] = cancel + w := client.Watch(ctx, fmt.Sprintf("%d", i), clientv3.WithCreatedNotify()) + <-w + } + + // get max watches; proxy tests have leadership watches, so total may be >numWatches + maxWatches, _ := cluster.Members[0].Metric("etcd_debugging_mvcc_watcher_total") + + // cancel all and wait for cancels to propagate to etcd server + for i := 0; i < numWatches; i++ { + cancels[i]() } - // wait for cancels to propagate time.Sleep(time.Second) - watchers, err := cluster.Members[0].Metric("etcd_debugging_mvcc_watcher_total") + minWatches, err := cluster.Members[0].Metric("etcd_debugging_mvcc_watcher_total") if err != nil { t.Fatal(err) } - if watchers != "0" { - t.Fatalf("expected 0 watchers, got %q", watchers) + + maxWatchV, minWatchV := 0, 0 + n, serr := fmt.Sscanf(maxWatches+" "+minWatches, "%d %d", &maxWatchV, &minWatchV) + if n != 2 || serr != nil { + t.Fatalf("expected n=2 and err=nil, got n=%d and err=%v", n, serr) + } + + if maxWatchV-minWatchV != numWatches { + t.Fatalf("expected %d canceled watchers, got %d", numWatches, maxWatchV-minWatchV) } }