clientv3: remove redundant retries in Lease, set FailFast=true

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
release-3.3
Gyu-Ho Lee 2017-10-18 13:11:16 -07:00
parent a2c61cf04f
commit 29aa4ce2a1
2 changed files with 60 additions and 55 deletions

View File

@ -22,7 +22,6 @@ import (
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
) )
@ -183,7 +182,6 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Durati
} }
func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) { func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
for {
r := &pb.LeaseGrantRequest{TTL: ttl} r := &pb.LeaseGrantRequest{TTL: ttl}
resp, err := l.remote.LeaseGrant(ctx, r) resp, err := l.remote.LeaseGrant(ctx, r)
if err == nil { if err == nil {
@ -195,30 +193,21 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err
} }
return gresp, nil return gresp, nil
} }
if isHaltErr(ctx, err) {
return nil, toErr(ctx, err) return nil, toErr(ctx, err)
} }
}
}
func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) { func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
for {
r := &pb.LeaseRevokeRequest{ID: int64(id)} r := &pb.LeaseRevokeRequest{ID: int64(id)}
resp, err := l.remote.LeaseRevoke(ctx, r) resp, err := l.remote.LeaseRevoke(ctx, r)
if err == nil { if err == nil {
return (*LeaseRevokeResponse)(resp), nil return (*LeaseRevokeResponse)(resp), nil
} }
if isHaltErr(ctx, err) {
return nil, toErr(ctx, err) return nil, toErr(ctx, err)
} }
}
}
func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) { func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
for {
r := toLeaseTimeToLiveRequest(id, opts...) r := toLeaseTimeToLiveRequest(id, opts...)
resp, err := l.remote.LeaseTimeToLive(ctx, r, grpc.FailFast(false)) resp, err := l.remote.LeaseTimeToLive(ctx, r)
if err == nil { if err == nil {
gresp := &LeaseTimeToLiveResponse{ gresp := &LeaseTimeToLiveResponse{
ResponseHeader: resp.GetHeader(), ResponseHeader: resp.GetHeader(),
@ -229,15 +218,11 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption
} }
return gresp, nil return gresp, nil
} }
if isHaltErr(ctx, err) {
return nil, toErr(ctx, err) return nil, toErr(ctx, err)
} }
}
}
func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) { func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
for { resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{})
resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, grpc.FailFast(false))
if err == nil { if err == nil {
leases := make([]LeaseStatus, len(resp.Leases)) leases := make([]LeaseStatus, len(resp.Leases))
for i := range resp.Leases { for i := range resp.Leases {
@ -245,11 +230,8 @@ func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
} }
return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil
} }
if isHaltErr(ctx, err) {
return nil, toErr(ctx, err) return nil, toErr(ctx, err)
} }
}
}
func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) { func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize) ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)
@ -389,7 +371,7 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
cctx, cancel := context.WithCancel(ctx) cctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
stream, err := l.remote.LeaseKeepAlive(cctx, grpc.FailFast(false)) stream, err := l.remote.LeaseKeepAlive(cctx)
if err != nil { if err != nil {
return nil, toErr(ctx, err) return nil, toErr(ctx, err)
} }
@ -433,7 +415,6 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
} else { } else {
for { for {
resp, err := stream.Recv() resp, err := stream.Recv()
if err != nil { if err != nil {
if canceledByCaller(l.stopCtx, err) { if canceledByCaller(l.stopCtx, err) {
return err return err
@ -461,7 +442,7 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
// resetRecv opens a new lease stream and starts sending keep alive requests. // resetRecv opens a new lease stream and starts sending keep alive requests.
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) { func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
sctx, cancel := context.WithCancel(l.stopCtx) sctx, cancel := context.WithCancel(l.stopCtx)
stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false)) stream, err := l.remote.LeaseKeepAlive(sctx)
if err != nil { if err != nil {
cancel() cancel()
return nil, err return nil, err

View File

@ -168,7 +168,7 @@ type retryLeaseClient struct {
readRetry retryRPCFunc readRetry retryRPCFunc
} }
// RetryLeaseClient implements a LeaseClient that uses the client's FailFast retry policy. // RetryLeaseClient implements a LeaseClient.
func RetryLeaseClient(c *Client) pb.LeaseClient { func RetryLeaseClient(c *Client) pb.LeaseClient {
retry := &retryLeaseClient{ retry := &retryLeaseClient{
pb.NewLeaseClient(c.conn), pb.NewLeaseClient(c.conn),
@ -177,6 +177,22 @@ func RetryLeaseClient(c *Client) pb.LeaseClient {
return &retryLeaseClient{retry, c.newAuthRetryWrapper()} return &retryLeaseClient{retry, c.newAuthRetryWrapper()}
} }
func (rlc *retryLeaseClient) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTimeToLiveRequest, opts ...grpc.CallOption) (resp *pb.LeaseTimeToLiveResponse, err error) {
err = rlc.readRetry(ctx, func(rctx context.Context) error {
resp, err = rlc.LeaseClient.LeaseTimeToLive(rctx, in, opts...)
return err
})
return resp, err
}
func (rlc *retryLeaseClient) LeaseLeases(ctx context.Context, in *pb.LeaseLeasesRequest, opts ...grpc.CallOption) (resp *pb.LeaseLeasesResponse, err error) {
err = rlc.readRetry(ctx, func(rctx context.Context) error {
resp, err = rlc.LeaseClient.LeaseLeases(rctx, in, opts...)
return err
})
return resp, err
}
func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) { func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) {
err = rlc.readRetry(ctx, func(rctx context.Context) error { err = rlc.readRetry(ctx, func(rctx context.Context) error {
resp, err = rlc.LeaseClient.LeaseGrant(rctx, in, opts...) resp, err = rlc.LeaseClient.LeaseGrant(rctx, in, opts...)
@ -194,6 +210,14 @@ func (rlc *retryLeaseClient) LeaseRevoke(ctx context.Context, in *pb.LeaseRevoke
return resp, err return resp, err
} }
func (rlc *retryLeaseClient) LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (stream pb.Lease_LeaseKeepAliveClient, err error) {
err = rlc.readRetry(ctx, func(rctx context.Context) error {
stream, err = rlc.LeaseClient.LeaseKeepAlive(rctx, opts...)
return err
})
return stream, err
}
type retryClusterClient struct { type retryClusterClient struct {
pb.ClusterClient pb.ClusterClient
writeRetry retryRPCFunc writeRetry retryRPCFunc