diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index c85fd5d17..2a0488bb1 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -104,7 +104,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { } kvp := grpcproxy.NewKvProxy(client) - watchp := grpcproxy.NewWatchProxy(client) + watchp, _ := grpcproxy.NewWatchProxy(client) clusterp := grpcproxy.NewClusterProxy(client) leasep := grpcproxy.NewLeaseProxy(client) mainp := grpcproxy.NewMaintenanceProxy(client) diff --git a/integration/cluster_proxy.go b/integration/cluster_proxy.go index c5c76774f..ea4bbec03 100644 --- a/integration/cluster_proxy.go +++ b/integration/cluster_proxy.go @@ -26,25 +26,43 @@ import ( var ( pmu sync.Mutex - proxies map[*clientv3.Client]grpcAPI = make(map[*clientv3.Client]grpcAPI) + proxies map[*clientv3.Client]grpcClientProxy = make(map[*clientv3.Client]grpcClientProxy) ) +type grpcClientProxy struct { + grpc grpcAPI + wdonec <-chan struct{} +} + func toGRPC(c *clientv3.Client) grpcAPI { pmu.Lock() defer pmu.Unlock() if v, ok := proxies[c]; ok { - return v + return v.grpc } - api := grpcAPI{ + + wp, wpch := grpcproxy.NewWatchProxy(c) + grpc := grpcAPI{ pb.NewClusterClient(c.ActiveConnection()), grpcproxy.KvServerToKvClient(grpcproxy.NewKvProxy(c)), pb.NewLeaseClient(c.ActiveConnection()), - grpcproxy.WatchServerToWatchClient(grpcproxy.NewWatchProxy(c)), + grpcproxy.WatchServerToWatchClient(wp), pb.NewMaintenanceClient(c.ActiveConnection()), } - proxies[c] = api - return api + proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch} + return grpc +} + +type watchCloser struct { + clientv3.Watcher + wdonec <-chan struct{} +} + +func (wc *watchCloser) Close() error { + err := wc.Watcher.Close() + <-wc.wdonec + return err } func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) { @@ -54,6 +72,11 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) { } rpc := toGRPC(c) c.KV = clientv3.NewKVFromKVClient(rpc.KV) - c.Watcher = clientv3.NewWatchFromWatchClient(rpc.Watch) + pmu.Lock() + c.Watcher = &watchCloser{ + Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch), + wdonec: proxies[c].wdonec, + } + pmu.Unlock() return c, nil } diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index ad99fac71..c6570d0c2 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -407,6 +407,7 @@ func TestV3WatchCancelUnsynced(t *testing.T) { func testV3WatchCancel(t *testing.T, startRev int64) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -455,8 +456,6 @@ func testV3WatchCancel(t *testing.T, startRev int64) { if !rok { t.Errorf("unexpected pb.WatchResponse is received %+v", nr) } - - clus.Terminate(t) } // TestV3WatchCurrentPutOverlap ensures current watchers receive all events with @@ -541,7 +540,10 @@ func TestV3WatchCurrentPutOverlap(t *testing.T) { // TestV3WatchEmptyKey ensures synced watchers see empty key PUTs as PUT events func TestV3WatchEmptyKey(t *testing.T) { + defer testutil.AfterTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -581,8 +583,6 @@ func TestV3WatchEmptyKey(t *testing.T) { if !reflect.DeepEqual(resp.Events, wevs) { t.Fatalf("got %v, expected %v", resp.Events, wevs) } - - clus.Terminate(t) } func TestV3WatchMultipleWatchersSynced(t *testing.T) { @@ -601,6 +601,8 @@ func TestV3WatchMultipleWatchersUnsynced(t *testing.T) { // one watcher to test if it receives expected events. func testV3WatchMultipleWatchers(t *testing.T, startRev int64) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + kvc := toGRPC(clus.RandClient()).KV ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) @@ -686,8 +688,6 @@ func testV3WatchMultipleWatchers(t *testing.T, startRev int64) { if !rok { t.Errorf("unexpected pb.WatchResponse is received %+v", nr) } - - clus.Terminate(t) } func TestV3WatchMultipleEventsTxnSynced(t *testing.T) { @@ -703,6 +703,7 @@ func TestV3WatchMultipleEventsTxnUnsynced(t *testing.T) { // testV3WatchMultipleEventsTxn tests Watch APIs when it receives multiple events. func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -772,9 +773,6 @@ func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) { if !rok { t.Errorf("unexpected pb.WatchResponse is received %+v", nr) } - - // can't defer because tcp ports will be in use - clus.Terminate(t) } type eventsSortByKey []*mvccpb.Event @@ -875,6 +873,8 @@ func TestV3WatchMultipleStreamsUnsynced(t *testing.T) { // testV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams. func testV3WatchMultipleStreams(t *testing.T, startRev int64) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + wAPI := toGRPC(clus.RandClient()).Watch kvc := toGRPC(clus.RandClient()).KV @@ -939,8 +939,6 @@ func testV3WatchMultipleStreams(t *testing.T, startRev int64) { }(i) } wg.Wait() - - clus.Terminate(t) } // waitResponse waits on the given stream for given duration. diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index 88c63af7a..2c070b31c 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -49,7 +49,7 @@ const ( retryPerSecond = 10 ) -func NewWatchProxy(c *clientv3.Client) pb.WatchServer { +func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) { wp := &watchProxy{ cw: c.Watcher, ctx: clientv3.WithRequireLeader(c.Ctx()), @@ -57,7 +57,9 @@ func NewWatchProxy(c *clientv3.Client) pb.WatchServer { leaderc: make(chan struct{}), } wp.ranges = newWatchRanges(wp) + ch := make(chan struct{}) go func() { + defer close(ch) // a new streams without opening any watchers won't catch // a lost leader event, so have a special watch to monitor it rev := int64((uint64(1) << 63) - 2) @@ -77,7 +79,7 @@ func NewWatchProxy(c *clientv3.Client) pb.WatchServer { wp.wg.Wait() wp.ranges.stop() }() - return wp + return wp, ch } func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {