From a3b7876a3c15a7c3e49c8f5757fd505c06c19e1e Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 27 Jan 2016 21:13:23 -0800 Subject: [PATCH] clientv3: use retryConnection --- clientv3/client.go | 12 ++++++--- clientv3/kv.go | 48 +++++++++++++++++++++++----------- etcdserver/api/v3rpc/key.go | 2 +- etcdserver/api/v3rpc/member.go | 6 ++--- 4 files changed, 46 insertions(+), 22 deletions(-) diff --git a/clientv3/client.go b/clientv3/client.go index a9d63a7b3..f66eae5d8 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -18,6 +18,7 @@ import ( "sync" "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/codes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" ) @@ -120,7 +121,7 @@ func (c *Client) activeConnection() *grpc.ClientConn { } // refreshConnection establishes a new connection -func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) *grpc.ClientConn { +func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.ClientConn, error) { c.mu.Lock() defer c.mu.Unlock() if err != nil { @@ -128,14 +129,15 @@ func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) *grpc.Clie } if oldConn != c.conn { // conn has already been updated - return c.conn + return c.conn, nil } conn, dialErr := c.cfg.RetryDialer(c) if dialErr != nil { c.errors = append(c.errors, dialErr) + return nil, dialErr } c.conn = conn - return c.conn + return c.conn, nil } // dialEndpoints attempts to connect to each endpoint in order until a @@ -152,3 +154,7 @@ func dialEndpointList(c *Client) (*grpc.ClientConn, error) { } return nil, err } + +func isRPCError(err error) bool { + return grpc.Code(err) != codes.Unknown +} diff --git a/clientv3/kv.go b/clientv3/kv.go index 61e3e42a4..39f87888a 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -16,6 +16,7 @@ package clientv3 import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" ) @@ -87,30 +88,47 @@ type Txn interface { } type kv struct { + conn *grpc.ClientConn // conn in-use remote pb.KVClient c *Client } func (kv *kv) Range(key, end string, limit, rev int64, sort *SortOption) (*pb.RangeResponse, error) { - r := kv.do(OpRange(key, end, limit, rev, sort)) + r, err := kv.do(OpRange(key, end, limit, rev, sort)) + if err != nil { + return nil, err + } return r.GetResponseRange(), nil } -func (kv *kv) do(op Op) *pb.ResponseUnion { - switch op.t { - // TODO: handle other ops - case tRange: - // TODO: setup sorting - r := &pb.RangeRequest{Key: op.key, RangeEnd: op.end, Limit: op.limit, Revision: op.rev} - resp, err := kv.remote.Range(context.TODO(), r) - if err != nil { - // do something +func (kv *kv) do(op Op) (*pb.ResponseUnion, error) { + for { + var err error + switch op.t { + // TODO: handle other ops + case tRange: + var resp *pb.RangeResponse + // TODO: setup sorting + r := &pb.RangeRequest{Key: op.key, RangeEnd: op.end, Limit: op.limit, Revision: op.rev} + resp, err = kv.remote.Range(context.TODO(), r) + if err == nil { + return &pb.ResponseUnion{Response: &pb.ResponseUnion_ResponseRange{resp}}, nil + } + default: + panic("Unknown op") } - return &pb.ResponseUnion{Response: &pb.ResponseUnion_ResponseRange{resp}} - default: - panic("Unknown op") - } - return nil + if isRPCError(err) { + return nil, err + } + + newConn, cerr := kv.c.retryConnection(kv.conn, err) + if cerr != nil { + // TODO: return client lib defined connection error + return nil, cerr + } + kv.conn = newConn + kv.remote = pb.NewKVClient(kv.conn) + } } diff --git a/etcdserver/api/v3rpc/key.go b/etcdserver/api/v3rpc/key.go index 36e1c96de..afe0a446e 100644 --- a/etcdserver/api/v3rpc/key.go +++ b/etcdserver/api/v3rpc/key.go @@ -215,6 +215,6 @@ func togRPCError(err error) error { return ErrFutureRev // TODO: handle error from raft and timeout default: - return grpc.Errorf(codes.Unknown, err.Error()) + return grpc.Errorf(codes.Internal, err.Error()) } } diff --git a/etcdserver/api/v3rpc/member.go b/etcdserver/api/v3rpc/member.go index b25b6d4ee..4ea64edd5 100644 --- a/etcdserver/api/v3rpc/member.go +++ b/etcdserver/api/v3rpc/member.go @@ -54,7 +54,7 @@ func (cs *ClusterServer) MemberAdd(ctx context.Context, r *pb.MemberAddRequest) case err == etcdserver.ErrPeerURLexists: return nil, ErrPeerURLExist case err != nil: - return nil, grpc.Errorf(codes.Unknown, err.Error()) + return nil, grpc.Errorf(codes.Internal, err.Error()) } return &pb.MemberAddResponse{ @@ -71,7 +71,7 @@ func (cs *ClusterServer) MemberRemove(ctx context.Context, r *pb.MemberRemoveReq case err == etcdserver.ErrIDNotFound: return nil, ErrMemberNotFound case err != nil: - return nil, grpc.Errorf(codes.Unknown, err.Error()) + return nil, grpc.Errorf(codes.Internal, err.Error()) } return &pb.MemberRemoveResponse{Header: cs.header()}, nil @@ -89,7 +89,7 @@ func (cs *ClusterServer) MemberUpdate(ctx context.Context, r *pb.MemberUpdateReq case err == etcdserver.ErrIDNotFound: return nil, ErrMemberNotFound case err != nil: - return nil, grpc.Errorf(codes.Unknown, err.Error()) + return nil, grpc.Errorf(codes.Internal, err.Error()) } return &pb.MemberUpdateResponse{Header: cs.header()}, nil