diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index 22c71019f..62d915a85 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -442,8 +442,8 @@ func TestKVGetErrConnClosed(t *testing.T) { go func() { defer close(donec) _, err := cli.Get(context.TODO(), "foo") - if err != nil && err != grpc.ErrClientConnClosing { - t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err) + if err != nil && err != context.Canceled && err != grpc.ErrClientConnClosing { + t.Fatalf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err) } }() @@ -473,8 +473,8 @@ func TestKVNewAfterClose(t *testing.T) { donec := make(chan struct{}) go func() { - if _, err := cli.Get(context.TODO(), "foo"); err != grpc.ErrClientConnClosing { - t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err) + if _, err := cli.Get(context.TODO(), "foo"); err != context.Canceled { + t.Fatalf("expected %v, got %v", context.Canceled, err) } close(donec) }() @@ -791,7 +791,7 @@ func TestKVGetStoppedServerAndClose(t *testing.T) { // this Get fails and triggers an asynchronous connection retry _, err := cli.Get(ctx, "abc") cancel() - if !strings.Contains(err.Error(), "context deadline") { + if err != nil && err != context.DeadlineExceeded { t.Fatal(err) } } @@ -813,14 +813,14 @@ func TestKVPutStoppedServerAndClose(t *testing.T) { // grpc finds out the original connection is down due to the member shutdown. _, err := cli.Get(ctx, "abc") cancel() - if !strings.Contains(err.Error(), "context deadline") { + if err != nil && err != context.DeadlineExceeded { t.Fatal(err) } // this Put fails and triggers an asynchronous connection retry _, err = cli.Put(ctx, "abc", "123") cancel() - if !strings.Contains(err.Error(), "context deadline") { + if err != nil && err != context.DeadlineExceeded { t.Fatal(err) } } diff --git a/clientv3/integration/lease_test.go b/clientv3/integration/lease_test.go index 360806ef8..dc001c7a3 100644 --- a/clientv3/integration/lease_test.go +++ b/clientv3/integration/lease_test.go @@ -319,8 +319,8 @@ func TestLeaseGrantNewAfterClose(t *testing.T) { donec := make(chan struct{}) go func() { - if _, err := cli.Grant(context.TODO(), 5); err != grpc.ErrClientConnClosing { - t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err) + if _, err := cli.Grant(context.TODO(), 5); err != context.Canceled && err != grpc.ErrClientConnClosing { + t.Fatalf("expected %v or %v, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err) } close(donec) }() @@ -351,8 +351,8 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) { donec := make(chan struct{}) go func() { - if _, err := cli.Revoke(context.TODO(), leaseID); err != grpc.ErrClientConnClosing { - t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err) + if _, err := cli.Revoke(context.TODO(), leaseID); err != context.Canceled && err != grpc.ErrClientConnClosing { + t.Fatalf("expected %v or %v, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err) } close(donec) }() diff --git a/clientv3/retry.go b/clientv3/retry.go index aab2c9235..e4d1206f7 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -51,6 +51,13 @@ func isWriteStopError(err error) bool { func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc { return func(rpcCtx context.Context, f rpcFunc) error { for { + select { + case <-c.balancer.ConnectNotify(): + case <-rpcCtx.Done(): + return rpcCtx.Err() + case <-c.ctx.Done(): + return c.ctx.Err() + } err := f(rpcCtx) if err == nil { return nil