diff --git a/clientv3/client.go b/clientv3/client.go index a62d6cb17..4ec061db3 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -45,6 +45,9 @@ type Client struct { creds *credentials.TransportAuthenticator mu sync.RWMutex // protects connection selection and error list errors []error // errors passed to retryConnection + + ctx context.Context + cancel context.CancelFunc } // EndpointDialer is a policy for choosing which endpoint to dial next @@ -83,11 +86,23 @@ func NewFromURL(url string) (*Client, error) { // Close shuts down the client's etcd connections. func (c *Client) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + if c.cancel == nil { + return nil + } + c.cancel() + c.cancel = nil c.Watcher.Close() c.Lease.Close() return c.conn.Close() } +// Ctx is a context for "out of band" messages (e.g., for sending +// "clean up" message when another context is canceled). It is +// canceled on client Close(). +func (c *Client) Ctx() context.Context { return c.ctx } + // Endpoints lists the registered endpoints for the client. func (c *Client) Endpoints() []string { return c.cfg.Endpoints } @@ -145,10 +160,13 @@ func newClient(cfg *Config) (*Client, error) { if err != nil { return nil, err } + ctx, cancel := context.WithCancel(context.TODO()) client := &Client{ - conn: conn, - cfg: *cfg, - creds: creds, + conn: conn, + cfg: *cfg, + creds: creds, + ctx: ctx, + cancel: cancel, } client.Cluster = NewCluster(client) client.KV = NewKV(client) @@ -173,6 +191,9 @@ func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.Cli if err != nil { c.errors = append(c.errors, err) } + if c.cancel == nil { + return nil, c.ctx.Err() + } if oldConn != c.conn { // conn has already been updated return c.conn, nil diff --git a/clientv3/concurrency/election.go b/clientv3/concurrency/election.go index 03c541837..bb75cb312 100644 --- a/clientv3/concurrency/election.go +++ b/clientv3/concurrency/election.go @@ -29,7 +29,6 @@ var ( type Election struct { client *v3.Client - ctx context.Context keyPrefix string @@ -39,8 +38,8 @@ type Election struct { } // NewElection returns a new election on a given key prefix. -func NewElection(ctx context.Context, client *v3.Client, pfx string) *Election { - return &Election{client: client, ctx: ctx, keyPrefix: pfx} +func NewElection(client *v3.Client, pfx string) *Election { + return &Election{client: client, keyPrefix: pfx} } // Campaign puts a value as eligible for the election. It blocks until @@ -60,7 +59,7 @@ func (e *Election) Campaign(ctx context.Context, val string) error { // clean up in case of context cancel select { case <-ctx.Done(): - e.client.Delete(e.ctx, k) + e.client.Delete(e.client.Ctx(), k) default: } return err @@ -94,7 +93,7 @@ func (e *Election) Resign() (err error) { if e.leaderSession == nil { return nil } - _, err = e.client.Delete(e.ctx, e.leaderKey) + _, err = e.client.Delete(e.client.Ctx(), e.leaderKey) e.leaderKey = "" e.leaderSession = nil return err @@ -102,7 +101,7 @@ func (e *Election) Resign() (err error) { // Leader returns the leader value for the current election. func (e *Election) Leader() (string, error) { - resp, err := e.client.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...) + resp, err := e.client.Get(e.client.Ctx(), e.keyPrefix, v3.WithFirstCreate()...) if err != nil { return "", err } else if len(resp.Kvs) == 0 { diff --git a/clientv3/concurrency/mutex.go b/clientv3/concurrency/mutex.go index bf748cd7a..701aa6e25 100644 --- a/clientv3/concurrency/mutex.go +++ b/clientv3/concurrency/mutex.go @@ -24,15 +24,14 @@ import ( // Mutex implements the sync Locker interface with etcd type Mutex struct { client *v3.Client - ctx context.Context pfx string myKey string myRev int64 } -func NewMutex(ctx context.Context, client *v3.Client, pfx string) *Mutex { - return &Mutex{client, ctx, pfx, "", -1} +func NewMutex(client *v3.Client, pfx string) *Mutex { + return &Mutex{client, pfx, "", -1} } // Lock locks the mutex with a cancellable context. If the context is cancelled @@ -56,7 +55,7 @@ func (m *Mutex) Lock(ctx context.Context) error { } func (m *Mutex) Unlock() error { - if _, err := m.client.Delete(m.ctx, m.myKey); err != nil { + if _, err := m.client.Delete(m.client.Ctx(), m.myKey); err != nil { return err } m.myKey = "\x00" @@ -73,7 +72,7 @@ func (m *Mutex) Key() string { return m.myKey } type lockerMutex struct{ *Mutex } func (lm *lockerMutex) Lock() { - if err := lm.Mutex.Lock(lm.ctx); err != nil { + if err := lm.Mutex.Lock(lm.client.Ctx()); err != nil { panic(err) } } @@ -84,6 +83,6 @@ func (lm *lockerMutex) Unlock() { } // NewLocker creates a sync.Locker backed by an etcd mutex. -func NewLocker(ctx context.Context, client *v3.Client, pfx string) sync.Locker { - return &lockerMutex{NewMutex(ctx, client, pfx)} +func NewLocker(client *v3.Client, pfx string) sync.Locker { + return &lockerMutex{NewMutex(client, pfx)} } diff --git a/clientv3/concurrency/session.go b/clientv3/concurrency/session.go index b1c2d2159..205c14e28 100644 --- a/clientv3/concurrency/session.go +++ b/clientv3/concurrency/session.go @@ -49,13 +49,13 @@ func NewSession(client *v3.Client) (*Session, error) { return s, nil } - resp, err := client.Create(context.TODO(), sessionTTL) + resp, err := client.Create(client.Ctx(), sessionTTL) if err != nil { return nil, err } id := lease.LeaseID(resp.ID) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(client.Ctx()) keepAlive, err := client.KeepAlive(ctx, id) if err != nil || keepAlive == nil { return nil, err @@ -99,6 +99,6 @@ func (s *Session) Orphan() { // Close orphans the session and revokes the session lease. func (s *Session) Close() error { s.Orphan() - _, err := s.client.Revoke(context.TODO(), s.id) + _, err := s.client.Revoke(s.client.Ctx(), s.id) return err } diff --git a/etcdctlv3/command/elect_command.go b/etcdctlv3/command/elect_command.go index f7d0242ea..3596493b5 100644 --- a/etcdctlv3/command/elect_command.go +++ b/etcdctlv3/command/elect_command.go @@ -64,7 +64,7 @@ func electCommandFunc(cmd *cobra.Command, args []string) { } func observe(c *clientv3.Client, election string) error { - e := concurrency.NewElection(context.TODO(), c, election) + e := concurrency.NewElection(c, election) ctx, cancel := context.WithCancel(context.TODO()) donec := make(chan struct{}) @@ -94,7 +94,7 @@ func observe(c *clientv3.Client, election string) error { } func campaign(c *clientv3.Client, election string, prop string) error { - e := concurrency.NewElection(context.TODO(), c, election) + e := concurrency.NewElection(c, election) ctx, cancel := context.WithCancel(context.TODO()) donec := make(chan struct{}) diff --git a/etcdctlv3/command/lock_command.go b/etcdctlv3/command/lock_command.go index a07f2a773..ed07accd2 100644 --- a/etcdctlv3/command/lock_command.go +++ b/etcdctlv3/command/lock_command.go @@ -46,7 +46,7 @@ func lockCommandFunc(cmd *cobra.Command, args []string) { } func lockUntilSignal(c *clientv3.Client, lockname string) error { - m := concurrency.NewMutex(context.TODO(), c, lockname) + m := concurrency.NewMutex(c, lockname) ctx, cancel := context.WithCancel(context.TODO()) // unlock in case of ordinary shutdown diff --git a/integration/v3_election_test.go b/integration/v3_election_test.go index 4f7ad7a7b..8a43e342a 100644 --- a/integration/v3_election_test.go +++ b/integration/v3_election_test.go @@ -40,7 +40,7 @@ func TestElectionWait(t *testing.T) { nextc = append(nextc, make(chan struct{})) go func(ch chan struct{}) { for j := 0; j < leaders; j++ { - b := concurrency.NewElection(context.TODO(), clus.RandClient(), "test-election") + b := concurrency.NewElection(clus.RandClient(), "test-election") cctx, cancel := context.WithCancel(context.TODO()) defer cancel() s, ok := <-b.Observe(cctx) @@ -58,7 +58,7 @@ func TestElectionWait(t *testing.T) { // elect some leaders for i := 0; i < leaders; i++ { go func() { - e := concurrency.NewElection(context.TODO(), clus.RandClient(), "test-election") + e := concurrency.NewElection(clus.RandClient(), "test-election") ev := fmt.Sprintf("electval-%v", time.Now().UnixNano()) if err := e.Campaign(context.TODO(), ev); err != nil { t.Fatalf("failed volunteer (%v)", err) @@ -97,7 +97,7 @@ func TestElectionFailover(t *testing.T) { defer cancel() // first leader (elected) - e := concurrency.NewElection(context.TODO(), clus.clients[0], "test-election") + e := concurrency.NewElection(clus.clients[0], "test-election") if err := e.Campaign(context.TODO(), "foo"); err != nil { t.Fatalf("failed volunteer (%v)", err) } @@ -115,7 +115,7 @@ func TestElectionFailover(t *testing.T) { // next leader electedc := make(chan struct{}) go func() { - ee := concurrency.NewElection(context.TODO(), clus.clients[1], "test-election") + ee := concurrency.NewElection(clus.clients[1], "test-election") if eer := ee.Campaign(context.TODO(), "bar"); eer != nil { t.Fatal(eer) } @@ -132,7 +132,7 @@ func TestElectionFailover(t *testing.T) { } // check new leader - e = concurrency.NewElection(context.TODO(), clus.clients[2], "test-election") + e = concurrency.NewElection(clus.clients[2], "test-election") resp, ok = <-e.Observe(cctx) if !ok { t.Fatalf("could not wait for second election; channel closed") diff --git a/integration/v3_lock_test.go b/integration/v3_lock_test.go index 97c0ae4f9..c96e97c4d 100644 --- a/integration/v3_lock_test.go +++ b/integration/v3_lock_test.go @@ -41,7 +41,7 @@ func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) lockedC := make(chan *concurrency.Mutex, 1) for i := 0; i < waiters; i++ { go func() { - m := concurrency.NewMutex(context.TODO(), chooseClient(), "test-mutex") + m := concurrency.NewMutex(chooseClient(), "test-mutex") if err := m.Lock(context.TODO()); err != nil { t.Fatalf("could not wait on lock (%v)", err) }