diff --git a/clientv3/client.go b/clientv3/client.go index 5ef96f259..5a1049ede 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -50,6 +50,14 @@ type Client struct { ctx context.Context cancel context.CancelFunc + + // fields below are managed by connMonitor + + // reconnc accepts writes which signal the client should reconnect + reconnc chan error + // newconnc is closed on successful connect and set to a fresh channel + newconnc chan struct{} + lastConnErr error } // New creates a new etcdv3 client from a given configuration. @@ -87,10 +95,13 @@ func (c *Client) Close() error { } c.cancel() c.cancel = nil + err := c.conn.Close() + connc := c.newconnc c.mu.Unlock() c.Watcher.Close() c.Lease.Close() - return c.conn.Close() + <-connc + return err } // Ctx is a context for "out of band" messages (e.g., for sending @@ -161,12 +172,17 @@ func newClient(cfg *Config) (*Client, error) { return nil, err } client := &Client{ - conn: conn, - cfg: *cfg, - creds: creds, - ctx: ctx, - cancel: cancel, + conn: conn, + cfg: *cfg, + creds: creds, + ctx: ctx, + cancel: cancel, + reconnc: make(chan error), + newconnc: make(chan struct{}), } + + go client.connMonitor() + client.Cluster = NewCluster(client) client.KV = NewKV(client) client.Lease = NewLease(client) @@ -191,7 +207,7 @@ func (c *Client) ActiveConnection() *grpc.ClientConn { } // retryConnection establishes a new connection -func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.ClientConn, error) { +func (c *Client) retryConnection(err error) (newConn *grpc.ClientConn, dialErr error) { c.mu.Lock() defer c.mu.Unlock() if err != nil { @@ -200,24 +216,66 @@ func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.Cli if c.cancel == nil { return nil, c.ctx.Err() } - if oldConn != c.conn { - // conn has already been updated - return c.conn, nil + if c.conn != nil { + c.conn.Close() + if st, _ := c.conn.State(); st != grpc.Shutdown { + // wait so grpc doesn't leak sleeping goroutines + c.conn.WaitForStateChange(c.ctx, st) + } } - oldConn.Close() - if st, _ := oldConn.State(); st != grpc.Shutdown { - // wait for shutdown so grpc doesn't leak sleeping goroutines - oldConn.WaitForStateChange(c.ctx, st) - } - - conn, dialErr := c.cfg.RetryDialer(c) + c.conn, dialErr = c.cfg.RetryDialer(c) if dialErr != nil { c.errors = append(c.errors, dialErr) - return nil, dialErr } - c.conn = conn - return c.conn, nil + return c.conn, dialErr +} + +// connStartRetry schedules a reconnect if one is not already running +func (c *Client) connStartRetry(err error) { + select { + case c.reconnc <- err: + default: + } +} + +// connWait waits for a reconnect to be processed +func (c *Client) connWait(ctx context.Context, err error) (*grpc.ClientConn, error) { + c.mu.Lock() + ch := c.newconnc + c.mu.Unlock() + c.connStartRetry(err) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-ch: + } + c.mu.Lock() + defer c.mu.Unlock() + return c.conn, c.lastConnErr +} + +// connMonitor monitors the connection and handles retries +func (c *Client) connMonitor() { + var err error + for { + select { + case err = <-c.reconnc: + case <-c.ctx.Done(): + c.mu.Lock() + c.lastConnErr = c.ctx.Err() + close(c.newconnc) + c.mu.Unlock() + return + } + conn, connErr := c.retryConnection(err) + c.mu.Lock() + c.lastConnErr = connErr + c.conn = conn + close(c.newconnc) + c.newconnc = make(chan struct{}) + c.mu.Unlock() + } } // dialEndpointList attempts to connect to each endpoint in order until a diff --git a/clientv3/cluster.go b/clientv3/cluster.go index 6ffee1a98..6dcd85403 100644 --- a/clientv3/cluster.go +++ b/clientv3/cluster.go @@ -15,8 +15,6 @@ package clientv3 import ( - "sync" - "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" @@ -46,22 +44,15 @@ type Cluster interface { } type cluster struct { - c *Client - - mu sync.Mutex - conn *grpc.ClientConn // conn in-use + rc *remoteClient remote pb.ClusterClient } func NewCluster(c *Client) Cluster { - conn := c.ActiveConnection() - - return &cluster{ - c: c, - - conn: conn, - remote: pb.NewClusterClient(conn), - } + ret := &cluster{} + f := func(conn *grpc.ClientConn) { ret.remote = pb.NewClusterClient(conn) } + ret.rc = newRemoteClient(c, f) + return ret } func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) { @@ -75,7 +66,7 @@ func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAdd return nil, rpctypes.Error(err) } - go c.switchRemote(err) + c.rc.reconnect(err) return nil, rpctypes.Error(err) } @@ -90,7 +81,7 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes return nil, rpctypes.Error(err) } - go c.switchRemote(err) + c.rc.reconnect(err) return nil, rpctypes.Error(err) } @@ -107,8 +98,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin return nil, rpctypes.Error(err) } - err = c.switchRemote(err) - if err != nil { + if err = c.rc.reconnectWait(ctx, err); err != nil { return nil, rpctypes.Error(err) } } @@ -126,30 +116,14 @@ func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) { return nil, rpctypes.Error(err) } - err = c.switchRemote(err) - if err != nil { + if err = c.rc.reconnectWait(ctx, err); err != nil { return nil, rpctypes.Error(err) } } } func (c *cluster) getRemote() pb.ClusterClient { - c.mu.Lock() - defer c.mu.Unlock() - + c.rc.mu.Lock() + defer c.rc.mu.Unlock() return c.remote } - -func (c *cluster) switchRemote(prevErr error) error { - newConn, err := c.c.retryConnection(c.conn, prevErr) - if err != nil { - return err - } - - c.mu.Lock() - defer c.mu.Unlock() - - c.conn = newConn - c.remote = pb.NewClusterClient(c.conn) - return nil -} diff --git a/clientv3/kv.go b/clientv3/kv.go index 4c7449088..735be2e29 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -15,8 +15,6 @@ package clientv3 import ( - "sync" - "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" @@ -76,23 +74,15 @@ type OpResponse struct { } type kv struct { - c *Client - - mu sync.Mutex // guards all fields - conn *grpc.ClientConn // conn in-use + rc *remoteClient remote pb.KVClient } func NewKV(c *Client) KV { - conn := c.ActiveConnection() - remote := pb.NewKVClient(conn) - - return &kv{ - conn: c.ActiveConnection(), - remote: remote, - - c: c, - } + ret := &kv{} + f := func(conn *grpc.ClientConn) { ret.remote = pb.NewKVClient(conn) } + ret.rc = newRemoteClient(c, f) + return ret } func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) { @@ -111,17 +101,14 @@ func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*Delete } func (kv *kv) Compact(ctx context.Context, rev int64) error { - remote := kv.getRemote() - _, err := remote.Compact(ctx, &pb.CompactionRequest{Revision: rev}) + _, err := kv.getRemote().Compact(ctx, &pb.CompactionRequest{Revision: rev}) if err == nil { return nil } - if isHaltErr(ctx, err) { return rpctypes.Error(err) } - - go kv.switchRemote(remote, err) + kv.rc.reconnect(err) return rpctypes.Error(err) } @@ -174,36 +161,18 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) { // do not retry on modifications if op.isWrite() { - go kv.switchRemote(remote, err) + kv.rc.reconnect(err) return OpResponse{}, rpctypes.Error(err) } - if nerr := kv.switchRemote(remote, err); nerr != nil { + if nerr := kv.rc.reconnectWait(ctx, err); nerr != nil { return OpResponse{}, nerr } } } -func (kv *kv) switchRemote(remote pb.KVClient, prevErr error) error { - kv.mu.Lock() - oldRemote := kv.remote - conn := kv.conn - kv.mu.Unlock() - if remote != oldRemote { - return nil - } - newConn, err := kv.c.retryConnection(conn, prevErr) - kv.mu.Lock() - defer kv.mu.Unlock() - if err == nil { - kv.conn = newConn - kv.remote = pb.NewKVClient(kv.conn) - } - return rpctypes.Error(err) -} - func (kv *kv) getRemote() pb.KVClient { - kv.mu.Lock() - defer kv.mu.Unlock() + kv.rc.mu.Lock() + defer kv.rc.mu.Unlock() return kv.remote } diff --git a/clientv3/lease.go b/clientv3/lease.go index c5776ea92..6b490ed81 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -71,14 +71,12 @@ type Lease interface { } type lessor struct { - c *Client - - mu sync.Mutex // guards all fields - conn *grpc.ClientConn // conn in-use + mu sync.Mutex // guards all fields // donec is closed when recvKeepAliveLoop stops donec chan struct{} + rc *remoteClient remote pb.LeaseClient stream pb.Lease_LeaseKeepAliveClient @@ -102,14 +100,12 @@ type keepAlive struct { func NewLease(c *Client) Lease { l := &lessor{ - c: c, - conn: c.ActiveConnection(), - donec: make(chan struct{}), keepAlives: make(map[LeaseID]*keepAlive), } + f := func(conn *grpc.ClientConn) { l.remote = pb.NewLeaseClient(conn) } + l.rc = newRemoteClient(c, f) - l.remote = pb.NewLeaseClient(l.conn) l.stopCtx, l.stopCancel = context.WithCancel(context.Background()) go l.recvKeepAliveLoop() @@ -386,8 +382,8 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) { } func (l *lessor) getRemote() pb.LeaseClient { - l.mu.Lock() - defer l.mu.Unlock() + l.rc.mu.Lock() + defer l.rc.mu.Unlock() return l.remote } @@ -399,36 +395,15 @@ func (l *lessor) getKeepAliveStream() pb.Lease_LeaseKeepAliveClient { func (l *lessor) switchRemoteAndStream(prevErr error) error { for { - l.mu.Lock() - conn := l.conn - l.mu.Unlock() - - var ( - err error - newConn *grpc.ClientConn - ) - if prevErr != nil { - conn.Close() - newConn, err = l.c.retryConnection(conn, prevErr) + err := l.rc.reconnectWait(l.stopCtx, prevErr) if err != nil { return rpctypes.Error(err) } } - - l.mu.Lock() - if newConn != nil { - l.conn = newConn + if prevErr = l.newStream(); prevErr == nil { + return nil } - - l.remote = pb.NewLeaseClient(l.conn) - l.mu.Unlock() - - prevErr = l.newStream() - if prevErr != nil { - continue - } - return nil } } diff --git a/clientv3/maintenance.go b/clientv3/maintenance.go index 6c3e372dd..a8145d509 100644 --- a/clientv3/maintenance.go +++ b/clientv3/maintenance.go @@ -16,7 +16,6 @@ package clientv3 import ( "io" - "sync" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" @@ -57,18 +56,15 @@ type Maintenance interface { type maintenance struct { c *Client - mu sync.Mutex - conn *grpc.ClientConn // conn in-use + rc *remoteClient remote pb.MaintenanceClient } func NewMaintenance(c *Client) Maintenance { - conn := c.ActiveConnection() - return &maintenance{ - c: c, - conn: conn, - remote: pb.NewMaintenanceClient(conn), - } + ret := &maintenance{c: c} + f := func(conn *grpc.ClientConn) { ret.remote = pb.NewMaintenanceClient(conn) } + ret.rc = newRemoteClient(c, f) + return ret } func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) { @@ -85,7 +81,7 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) { if isHaltErr(ctx, err) { return nil, rpctypes.Error(err) } - if err = m.switchRemote(err); err != nil { + if err = m.rc.reconnectWait(ctx, err); err != nil { return nil, err } } @@ -118,8 +114,8 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR if err == nil { return (*AlarmResponse)(resp), nil } - if isHaltErr(ctx, err) { - go m.switchRemote(err) + if !isHaltErr(ctx, err) { + m.rc.reconnect(err) } return nil, rpctypes.Error(err) } @@ -178,19 +174,7 @@ func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) { } func (m *maintenance) getRemote() pb.MaintenanceClient { - m.mu.Lock() - defer m.mu.Unlock() + m.rc.mu.Lock() + defer m.rc.mu.Unlock() return m.remote } - -func (m *maintenance) switchRemote(prevErr error) error { - m.mu.Lock() - defer m.mu.Unlock() - newConn, err := m.c.retryConnection(m.conn, prevErr) - if err != nil { - return rpctypes.Error(err) - } - m.conn = newConn - m.remote = pb.NewMaintenanceClient(m.conn) - return nil -} diff --git a/clientv3/remote_client.go b/clientv3/remote_client.go new file mode 100644 index 000000000..98abff294 --- /dev/null +++ b/clientv3/remote_client.go @@ -0,0 +1,79 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + "sync" + + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +type remoteClient struct { + client *Client + conn *grpc.ClientConn + updateConn func(*grpc.ClientConn) + mu sync.Mutex +} + +func newRemoteClient(client *Client, update func(*grpc.ClientConn)) *remoteClient { + ret := &remoteClient{ + client: client, + conn: client.ActiveConnection(), + updateConn: update, + } + ret.mu.Lock() + defer ret.mu.Unlock() + ret.updateConn(ret.conn) + return ret +} + +// reconnectWait reconnects the client, returning when connection establishes/fails. +func (r *remoteClient) reconnectWait(ctx context.Context, prevErr error) error { + r.mu.Lock() + updated := r.tryUpdate() + r.mu.Unlock() + if updated { + return nil + } + conn, err := r.client.connWait(ctx, prevErr) + if err == nil { + r.mu.Lock() + r.conn = conn + r.updateConn(conn) + r.mu.Unlock() + } + return err +} + +// reconnect will reconnect the client without waiting +func (r *remoteClient) reconnect(err error) { + r.mu.Lock() + defer r.mu.Unlock() + if r.tryUpdate() { + return + } + r.client.connStartRetry(err) +} + +func (r *remoteClient) tryUpdate() bool { + activeConn := r.client.ActiveConnection() + if activeConn == nil || activeConn == r.conn { + return false + } + r.conn = activeConn + r.updateConn(activeConn) + return true +} diff --git a/clientv3/txn.go b/clientv3/txn.go index 875a325a0..4bd262995 100644 --- a/clientv3/txn.go +++ b/clientv3/txn.go @@ -141,9 +141,8 @@ func (txn *txn) Commit() (*TxnResponse, error) { kv := txn.kv for { - remote := kv.getRemote() r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas} - resp, err := remote.Txn(txn.ctx, r) + resp, err := kv.getRemote().Txn(txn.ctx, r) if err == nil { return (*TxnResponse)(resp), nil } @@ -153,11 +152,11 @@ func (txn *txn) Commit() (*TxnResponse, error) { } if txn.isWrite { - go kv.switchRemote(remote, err) + kv.rc.reconnect(err) return nil, rpctypes.Error(err) } - if nerr := kv.switchRemote(remote, err); nerr != nil { + if nerr := kv.rc.reconnectWait(txn.ctx, err); nerr != nil { return nil, nerr } } diff --git a/clientv3/watch.go b/clientv3/watch.go index e2cf99146..8f3aad677 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -87,8 +87,7 @@ func (wr *WatchResponse) IsProgressNotify() bool { // watcher implements the Watcher interface type watcher struct { - c *Client - conn *grpc.ClientConn + rc *remoteClient remote pb.WatchClient // ctx controls internal remote.Watch requests @@ -142,13 +141,7 @@ type watcherStream struct { func NewWatcher(c *Client) Watcher { ctx, cancel := context.WithCancel(context.Background()) - conn := c.ActiveConnection() - w := &watcher{ - c: c, - conn: conn, - remote: pb.NewWatchClient(conn), - ctx: ctx, cancel: cancel, streams: make(map[int64]*watcherStream), @@ -159,6 +152,10 @@ func NewWatcher(c *Client) Watcher { donec: make(chan struct{}), errc: make(chan error, 1), } + + f := func(conn *grpc.ClientConn) { w.remote = pb.NewWatchClient(conn) } + w.rc = newRemoteClient(c, f) + go w.run() return w } @@ -508,12 +505,9 @@ func (w *watcher) openWatchClient() (ws pb.Watch_WatchClient, err error) { } else if isHaltErr(w.ctx, err) { return nil, v3rpc.Error(err) } - newConn, nerr := w.c.retryConnection(w.conn, nil) - if nerr != nil { + if nerr := w.remoteConn.reconnectWait(w.ctx, nil); nerr != nil { return nil, nerr } - w.conn = newConn - w.remote = pb.NewWatchClient(w.conn) } return ws, nil }