diff --git a/clientv3/client.go b/clientv3/client.go
index 71492b422..8ad7f0cf9 100644
--- a/clientv3/client.go
+++ b/clientv3/client.go
@@ -101,25 +101,39 @@ func NewFromConfigFile(path string) (*Client, error) {
 }
 
 // Close shuts down the client's etcd connections.
-func (c *Client) Close() error {
+func (c *Client) Close() (err error) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
+
+	// acquire the cancel
 	if c.cancel == nil {
-		return nil
+		// already canceled
+		if c.lastConnErr != c.ctx.Err() {
+			err = c.lastConnErr
+		}
+		return
 	}
-	c.cancel()
+	cancel := c.cancel
 	c.cancel = nil
-	connc := c.newconnc
 	c.mu.Unlock()
-	c.connStartRetry(nil)
+
+	// close watcher and lease before terminating connection
+	// so they don't retry on a closed client
 	c.Watcher.Close()
 	c.Lease.Close()
+
+	// cancel reconnection loop
+	cancel()
+	c.mu.Lock()
+	connc := c.newconnc
+	c.mu.Unlock()
+	// connc on cancel() is left closed
 	<-connc
 	c.mu.Lock()
 	if c.lastConnErr != c.ctx.Err() {
-		return c.lastConnErr
+		err = c.lastConnErr
 	}
-	return nil
+	return
 }
 
 // Ctx is a context for "out of band" messages (e.g., for sending
@@ -282,30 +296,41 @@ func (c *Client) ActiveConnection() *grpc.ClientConn {
 }
 
 // retryConnection establishes a new connection
-func (c *Client) retryConnection(err error) (newConn *grpc.ClientConn, dialErr error) {
+func (c *Client) retryConnection(err error) {
+	oldconn := c.conn
+
+	// return holding lock so old connection can be cleaned up in this defer
+	defer func() {
+		if oldconn != nil {
+			oldconn.Close()
+			if st, _ := oldconn.State(); st != grpc.Shutdown {
+				// wait so grpc doesn't leak sleeping goroutines
+				oldconn.WaitForStateChange(context.Background(), st)
+			}
+		}
+		c.mu.Unlock()
+	}()
+
 	c.mu.Lock()
-	defer c.mu.Unlock()
 	if err != nil {
 		c.errors = append(c.errors, err)
 	}
-	if c.conn != nil {
-		c.conn.Close()
-		if st, _ := c.conn.State(); st != grpc.Shutdown {
-			// wait so grpc doesn't leak sleeping goroutines
-			c.conn.WaitForStateChange(context.Background(), st)
-		}
-		c.conn = nil
-	}
 	if c.cancel == nil {
 		// client has called Close() so don't try to dial out
-		return nil, c.ctx.Err()
+		return
 	}
+	c.mu.Unlock()
 
-	c.conn, dialErr = c.cfg.retryDialer(c)
+	nc, dialErr := c.cfg.retryDialer(c)
+
+	c.mu.Lock()
+	if nc != nil {
+		c.conn = nc
+	}
 	if dialErr != nil {
 		c.errors = append(c.errors, dialErr)
 	}
-	return c.conn, dialErr
+	c.lastConnErr = dialErr
 }
 
 // connStartRetry schedules a reconnect if one is not already running
@@ -321,17 +346,20 @@ func (c *Client) connStartRetry(err error) {
 }
 
 // connWait waits for a reconnect to be processed
 func (c *Client) connWait(ctx context.Context, err error) (*grpc.ClientConn, error) {
-	c.mu.Lock()
+	c.mu.RLock()
 	ch := c.newconnc
-	c.mu.Unlock()
+	c.mu.RUnlock()
 	c.connStartRetry(err)
 	select {
 	case <-ctx.Done():
 		return nil, ctx.Err()
 	case <-ch:
 	}
-	c.mu.Lock()
-	defer c.mu.Unlock()
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	if c.cancel == nil {
+		return c.conn, rpctypes.ErrConnClosed
+	}
 	return c.conn, c.lastConnErr
 }
 
@@ -340,11 +368,8 @@ func (c *Client) connMonitor() {
 	var err error
 
 	defer func() {
-		_, err = c.retryConnection(c.ctx.Err())
-		c.mu.Lock()
-		c.lastConnErr = err
+		c.retryConnection(c.ctx.Err())
 		close(c.newconnc)
-		c.mu.Unlock()
 	}()
 
 	limiter := rate.NewLimiter(rate.Every(minConnRetryWait), 1)
@@ -354,10 +379,8 @@ func (c *Client) connMonitor() {
 		case <-c.ctx.Done():
 			return
 		}
-		conn, connErr := c.retryConnection(err)
+		c.retryConnection(err)
 		c.mu.Lock()
-		c.lastConnErr = connErr
-		c.conn = conn
 		close(c.newconnc)
 		c.newconnc = make(chan struct{})
 		c.reconnc = make(chan error, 1)
diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go
index 24252c428..e9b320756 100644
--- a/clientv3/integration/kv_test.go
+++ b/clientv3/integration/kv_test.go
@@ -131,6 +131,13 @@ func TestKVPutWithRequireLeader(t *testing.T) {
 	if err != rpctypes.ErrNoLeader {
 		t.Fatal(err)
 	}
+
+	// clients may give timeout errors since the members are stopped; take
+	// the clients so that terminating the cluster won't complain
+	clus.Client(1).Close()
+	clus.Client(2).Close()
+	clus.TakeClient(1)
+	clus.TakeClient(2)
 }
 
 func TestKVRange(t *testing.T) {
@@ -633,13 +640,22 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
 	defer testutil.AfterTest(t)
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
+	cli := clus.Client(0)
 	clus.Members[0].Stop(t)
 	// this Put fails and triggers an asynchronous connection retry
-	_, err := clus.Client(0).Put(context.TODO(), "abc", "123")
+	_, err := cli.Put(context.TODO(), "abc", "123")
 	if err == nil ||
 		(!strings.Contains(err.Error(), "connection is closing") &&
 			!strings.Contains(err.Error(), "transport is closing")) {
 		t.Fatal(err)
 	}
-	// cluster will terminate and close the client with the retry in-flight
+
+	// wait some so the client closes with the retry in-flight
+	time.Sleep(time.Second)
+
+	// get the timeout
+	clus.TakeClient(0)
+	if err := cli.Close(); err == nil || !strings.Contains(err.Error(), "timed out") {
+		t.Fatal(err)
+	}
 }
diff --git a/clientv3/integration/txn_test.go b/clientv3/integration/txn_test.go
index f381ccfad..68633e6fa 100644
--- a/clientv3/integration/txn_test.go
+++ b/clientv3/integration/txn_test.go
@@ -74,7 +74,7 @@ func TestTxnWriteFail(t *testing.T) {
 
 	dialTimeout := 5 * time.Second
 	select {
-	case <-time.After(2*dialTimeout + time.Second):
+	case <-time.After(dialTimeout + time.Second):
 		t.Fatalf("timed out waiting for txn to fail")
 	case <-donec:
 		// don't restart cluster until txn errors out
diff --git a/clientv3/remote_client.go b/clientv3/remote_client.go
index b51116305..3bc3484c2 100644
--- a/clientv3/remote_client.go
+++ b/clientv3/remote_client.go
@@ -88,9 +88,11 @@ func (r *remoteClient) acquire(ctx context.Context) error {
 		r.client.mu.RLock()
 		closed := r.client.cancel == nil
 		c := r.client.conn
+		lastConnErr := r.client.lastConnErr
 		match := r.conn == c
 		r.mu.Unlock()
-		if c != nil && match {
+		if lastConnErr == nil && match {
+			// new connection already
 			return nil
 		}
 		r.client.mu.RUnlock()