diff --git a/clientv3/integration/lease_test.go b/clientv3/integration/lease_test.go index a0c411826..ef176fddf 100644 --- a/clientv3/integration/lease_test.go +++ b/clientv3/integration/lease_test.go @@ -729,6 +729,12 @@ func TestLeaseWithRequireLeader(t *testing.T) { } clus.Members[1].Stop(t) + // kaReqLeader may issue multiple requests while waiting for the first + // response from proxy server; drain any stray keepalive responses + time.Sleep(100 * time.Millisecond) + for len(kaReqLeader) > 0 { + <-kaReqLeader + } select { case resp, ok := <-kaReqLeader: diff --git a/integration/cluster_proxy.go b/integration/cluster_proxy.go index 0152a16c6..8593b5064 100644 --- a/integration/cluster_proxy.go +++ b/integration/cluster_proxy.go @@ -75,6 +75,7 @@ type proxyCloser struct { clientv3.Watcher wdonec <-chan struct{} kvdonec <-chan struct{} + lclose func() lpdonec <-chan struct{} } @@ -83,6 +84,7 @@ func (pc *proxyCloser) Close() error { <-pc.kvdonec err := pc.Watcher.Close() <-pc.wdonec + pc.lclose() <-pc.lpdonec return err } @@ -95,11 +97,13 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) { rpc := toGRPC(c) c.KV = clientv3.NewKVFromKVClient(rpc.KV) pmu.Lock() + lc := c.Lease c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, cfg.DialTimeout) c.Watcher = &proxyCloser{ Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch), wdonec: proxies[c].wdonec, kvdonec: proxies[c].kvdonec, + lclose: func() { lc.Close() }, lpdonec: proxies[c].lpdonec, } pmu.Unlock()