Merge pull request #8601 from gyuho/notify
clientv3: wait for ConnectNotify before sending RPCsrelease-3.3
commit
398e6ba2a6
|
@ -442,8 +442,8 @@ func TestKVGetErrConnClosed(t *testing.T) {
|
||||||
go func() {
|
go func() {
|
||||||
defer close(donec)
|
defer close(donec)
|
||||||
_, err := cli.Get(context.TODO(), "foo")
|
_, err := cli.Get(context.TODO(), "foo")
|
||||||
if err != nil && err != grpc.ErrClientConnClosing {
|
if err != nil && err != context.Canceled && err != grpc.ErrClientConnClosing {
|
||||||
t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
|
t.Fatalf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -473,8 +473,8 @@ func TestKVNewAfterClose(t *testing.T) {
|
||||||
|
|
||||||
donec := make(chan struct{})
|
donec := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
if _, err := cli.Get(context.TODO(), "foo"); err != grpc.ErrClientConnClosing {
|
if _, err := cli.Get(context.TODO(), "foo"); err != context.Canceled {
|
||||||
t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
|
t.Fatalf("expected %v, got %v", context.Canceled, err)
|
||||||
}
|
}
|
||||||
close(donec)
|
close(donec)
|
||||||
}()
|
}()
|
||||||
|
@ -791,7 +791,7 @@ func TestKVGetStoppedServerAndClose(t *testing.T) {
|
||||||
// this Get fails and triggers an asynchronous connection retry
|
// this Get fails and triggers an asynchronous connection retry
|
||||||
_, err := cli.Get(ctx, "abc")
|
_, err := cli.Get(ctx, "abc")
|
||||||
cancel()
|
cancel()
|
||||||
if !strings.Contains(err.Error(), "context deadline") {
|
if err != nil && err != context.DeadlineExceeded {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -813,14 +813,14 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
|
||||||
// grpc finds out the original connection is down due to the member shutdown.
|
// grpc finds out the original connection is down due to the member shutdown.
|
||||||
_, err := cli.Get(ctx, "abc")
|
_, err := cli.Get(ctx, "abc")
|
||||||
cancel()
|
cancel()
|
||||||
if !strings.Contains(err.Error(), "context deadline") {
|
if err != nil && err != context.DeadlineExceeded {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// this Put fails and triggers an asynchronous connection retry
|
// this Put fails and triggers an asynchronous connection retry
|
||||||
_, err = cli.Put(ctx, "abc", "123")
|
_, err = cli.Put(ctx, "abc", "123")
|
||||||
cancel()
|
cancel()
|
||||||
if !strings.Contains(err.Error(), "context deadline") {
|
if err != nil && err != context.DeadlineExceeded {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -319,8 +319,8 @@ func TestLeaseGrantNewAfterClose(t *testing.T) {
|
||||||
|
|
||||||
donec := make(chan struct{})
|
donec := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
if _, err := cli.Grant(context.TODO(), 5); err != grpc.ErrClientConnClosing {
|
if _, err := cli.Grant(context.TODO(), 5); err != context.Canceled && err != grpc.ErrClientConnClosing {
|
||||||
t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
|
t.Fatalf("expected %v or %v, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err)
|
||||||
}
|
}
|
||||||
close(donec)
|
close(donec)
|
||||||
}()
|
}()
|
||||||
|
@ -351,8 +351,8 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {
|
||||||
|
|
||||||
donec := make(chan struct{})
|
donec := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
if _, err := cli.Revoke(context.TODO(), leaseID); err != grpc.ErrClientConnClosing {
|
if _, err := cli.Revoke(context.TODO(), leaseID); err != context.Canceled && err != grpc.ErrClientConnClosing {
|
||||||
t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
|
t.Fatalf("expected %v or %v, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err)
|
||||||
}
|
}
|
||||||
close(donec)
|
close(donec)
|
||||||
}()
|
}()
|
||||||
|
|
|
@ -51,6 +51,13 @@ func isWriteStopError(err error) bool {
|
||||||
func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc {
|
func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc {
|
||||||
return func(rpcCtx context.Context, f rpcFunc) error {
|
return func(rpcCtx context.Context, f rpcFunc) error {
|
||||||
for {
|
for {
|
||||||
|
select {
|
||||||
|
case <-c.balancer.ConnectNotify():
|
||||||
|
case <-rpcCtx.Done():
|
||||||
|
return rpcCtx.Err()
|
||||||
|
case <-c.ctx.Done():
|
||||||
|
return c.ctx.Err()
|
||||||
|
}
|
||||||
err := f(rpcCtx)
|
err := f(rpcCtx)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in New Issue