clientv3: only retry mutable KV RPCs if no endpoints found
Was retrying when it shouldn't, causing multiple putsrelease-3.3
parent
8385c6682a
commit
4669aaa9a2
|
@ -52,11 +52,9 @@ type Client struct {
|
||||||
conn *grpc.ClientConn
|
conn *grpc.ClientConn
|
||||||
dialerrc chan error
|
dialerrc chan error
|
||||||
|
|
||||||
cfg Config
|
cfg Config
|
||||||
creds *credentials.TransportCredentials
|
creds *credentials.TransportCredentials
|
||||||
balancer *simpleBalancer
|
balancer *simpleBalancer
|
||||||
retryWrapper retryRpcFunc
|
|
||||||
retryAuthWrapper retryRpcFunc
|
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
|
@ -387,8 +385,6 @@ func newClient(cfg *Config) (*Client, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
client.conn = conn
|
client.conn = conn
|
||||||
client.retryWrapper = client.newRetryWrapper()
|
|
||||||
client.retryAuthWrapper = client.newAuthRetryWrapper()
|
|
||||||
|
|
||||||
// wait for a connection
|
// wait for a connection
|
||||||
if cfg.DialTimeout > 0 {
|
if cfg.DialTimeout > 0 {
|
||||||
|
@ -510,7 +506,6 @@ func toErr(ctx context.Context, err error) error {
|
||||||
err = ctx.Err()
|
err = ctx.Err()
|
||||||
}
|
}
|
||||||
case codes.Unavailable:
|
case codes.Unavailable:
|
||||||
err = ErrNoAvailableEndpoints
|
|
||||||
case codes.FailedPrecondition:
|
case codes.FailedPrecondition:
|
||||||
err = grpc.ErrClientConnClosing
|
err = grpc.ErrClientConnClosing
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,26 +24,29 @@ import (
|
||||||
|
|
||||||
type rpcFunc func(ctx context.Context) error
|
type rpcFunc func(ctx context.Context) error
|
||||||
type retryRpcFunc func(context.Context, rpcFunc) error
|
type retryRpcFunc func(context.Context, rpcFunc) error
|
||||||
|
type retryStopErrFunc func(error) bool
|
||||||
|
|
||||||
func (c *Client) newRetryWrapper() retryRpcFunc {
|
func isReadStopError(err error) bool {
|
||||||
|
eErr := rpctypes.Error(err)
|
||||||
|
// always stop retry on etcd errors
|
||||||
|
if _, ok := eErr.(rpctypes.EtcdError); ok {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
// only retry if unavailable
|
||||||
|
return grpc.Code(err) != codes.Unavailable
|
||||||
|
}
|
||||||
|
|
||||||
|
func isWriteStopError(err error) bool {
|
||||||
|
return grpc.Code(err) != codes.Unavailable ||
|
||||||
|
grpc.ErrorDesc(err) != "there is no address available"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc {
|
||||||
return func(rpcCtx context.Context, f rpcFunc) error {
|
return func(rpcCtx context.Context, f rpcFunc) error {
|
||||||
for {
|
for {
|
||||||
err := f(rpcCtx)
|
if err := f(rpcCtx); err == nil || isStop(err) {
|
||||||
if err == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
eErr := rpctypes.Error(err)
|
|
||||||
// always stop retry on etcd errors
|
|
||||||
if _, ok := eErr.(rpctypes.EtcdError); ok {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// only retry if unavailable
|
|
||||||
if grpc.Code(err) != codes.Unavailable {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-c.balancer.ConnectNotify():
|
case <-c.balancer.ConnectNotify():
|
||||||
case <-rpcCtx.Done():
|
case <-rpcCtx.Done():
|
||||||
|
@ -79,17 +82,24 @@ func (c *Client) newAuthRetryWrapper() retryRpcFunc {
|
||||||
|
|
||||||
// RetryKVClient implements a KVClient that uses the client's FailFast retry policy.
|
// RetryKVClient implements a KVClient that uses the client's FailFast retry policy.
|
||||||
func RetryKVClient(c *Client) pb.KVClient {
|
func RetryKVClient(c *Client) pb.KVClient {
|
||||||
retryWrite := &retryWriteKVClient{pb.NewKVClient(c.conn), c.retryWrapper}
|
readRetry := c.newRetryWrapper(isReadStopError)
|
||||||
return &retryKVClient{&retryWriteKVClient{retryWrite, c.retryAuthWrapper}}
|
writeRetry := c.newRetryWrapper(isWriteStopError)
|
||||||
|
conn := pb.NewKVClient(c.conn)
|
||||||
|
retryBasic := &retryKVClient{&retryWriteKVClient{conn, writeRetry}, readRetry}
|
||||||
|
retryAuthWrapper := c.newAuthRetryWrapper()
|
||||||
|
return &retryKVClient{
|
||||||
|
&retryWriteKVClient{retryBasic, retryAuthWrapper},
|
||||||
|
retryAuthWrapper}
|
||||||
}
|
}
|
||||||
|
|
||||||
type retryKVClient struct {
|
type retryKVClient struct {
|
||||||
*retryWriteKVClient
|
*retryWriteKVClient
|
||||||
|
readRetry retryRpcFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) {
|
func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) {
|
||||||
err = rkv.retryf(ctx, func(rctx context.Context) error {
|
err = rkv.readRetry(ctx, func(rctx context.Context) error {
|
||||||
resp, err = rkv.retryWriteKVClient.Range(rctx, in, opts...)
|
resp, err = rkv.KVClient.Range(rctx, in, opts...)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
return resp, err
|
return resp, err
|
||||||
|
@ -139,8 +149,11 @@ type retryLeaseClient struct {
|
||||||
|
|
||||||
// RetryLeaseClient implements a LeaseClient that uses the client's FailFast retry policy.
|
// RetryLeaseClient implements a LeaseClient that uses the client's FailFast retry policy.
|
||||||
func RetryLeaseClient(c *Client) pb.LeaseClient {
|
func RetryLeaseClient(c *Client) pb.LeaseClient {
|
||||||
retry := &retryLeaseClient{pb.NewLeaseClient(c.conn), c.retryWrapper}
|
retry := &retryLeaseClient{
|
||||||
return &retryLeaseClient{retry, c.retryAuthWrapper}
|
pb.NewLeaseClient(c.conn),
|
||||||
|
c.newRetryWrapper(isReadStopError),
|
||||||
|
}
|
||||||
|
return &retryLeaseClient{retry, c.newAuthRetryWrapper()}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) {
|
func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) {
|
||||||
|
@ -167,7 +180,7 @@ type retryClusterClient struct {
|
||||||
|
|
||||||
// RetryClusterClient implements a ClusterClient that uses the client's FailFast retry policy.
|
// RetryClusterClient implements a ClusterClient that uses the client's FailFast retry policy.
|
||||||
func RetryClusterClient(c *Client) pb.ClusterClient {
|
func RetryClusterClient(c *Client) pb.ClusterClient {
|
||||||
return &retryClusterClient{pb.NewClusterClient(c.conn), c.retryWrapper}
|
return &retryClusterClient{pb.NewClusterClient(c.conn), c.newRetryWrapper(isWriteStopError)}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) {
|
func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) {
|
||||||
|
@ -201,7 +214,7 @@ type retryAuthClient struct {
|
||||||
|
|
||||||
// RetryAuthClient implements a AuthClient that uses the client's FailFast retry policy.
|
// RetryAuthClient implements a AuthClient that uses the client's FailFast retry policy.
|
||||||
func RetryAuthClient(c *Client) pb.AuthClient {
|
func RetryAuthClient(c *Client) pb.AuthClient {
|
||||||
return &retryAuthClient{pb.NewAuthClient(c.conn), c.retryWrapper}
|
return &retryAuthClient{pb.NewAuthClient(c.conn), c.newRetryWrapper(isWriteStopError)}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) {
|
func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) {
|
||||||
|
|
Loading…
Reference in New Issue