diff --git a/clientv3/concurrency/key.go b/clientv3/concurrency/key.go new file mode 100644 index 000000000..b741f97b4 --- /dev/null +++ b/clientv3/concurrency/key.go @@ -0,0 +1,57 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package concurrency + +import ( + "fmt" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + v3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/etcdserver/api/v3rpc" +) + +// NewUniqueKey creates a new key from a given prefix. +func NewUniqueKey(ctx context.Context, kv v3.KV, pfx string, opts ...v3.OpOption) (string, int64, error) { + for { + newKey := fmt.Sprintf("%s/%v", pfx, time.Now().UnixNano()) + put := v3.OpPut(newKey, "", opts...) + cmp := v3.Compare(v3.ModifiedRevision(newKey), "=", 0) + resp, err := kv.Txn(ctx).If(cmp).Then(put).Commit() + if err != nil { + return "", 0, err + } + if !resp.Succeeded { + continue + } + return newKey, resp.Header.Revision, nil + } +} + +func waitUpdate(ctx context.Context, client *v3.Client, key string, opts ...v3.OpOption) error { + w := v3.NewWatcher(client) + defer w.Close() + wc := w.Watch(ctx, key, opts...) + if wc == nil { + return ctx.Err() + } + wresp, ok := <-wc + if !ok { + return ctx.Err() + } + if len(wresp.Events) == 0 { + return v3rpc.ErrCompacted + } + return nil +} diff --git a/clientv3/concurrency/mutex.go b/clientv3/concurrency/mutex.go new file mode 100644 index 000000000..f80f2e40f --- /dev/null +++ b/clientv3/concurrency/mutex.go @@ -0,0 +1,111 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package concurrency + +import ( + "sync" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + v3 "github.com/coreos/etcd/clientv3" +) + +// Mutex implements the sync Locker interface with etcd +type Mutex struct { + client *v3.Client + kv v3.KV + ctx context.Context + + pfx string + myKey string + myRev int64 +} + +func NewMutex(ctx context.Context, client *v3.Client, pfx string) *Mutex { + return &Mutex{client, v3.NewKV(client), ctx, pfx, "", -1} +} + +// Lock locks the mutex with a cancellable context. If the context is cancelled +// while trying to acquire the lock, the mutex tries to clean its stale lock entry. +func (m *Mutex) Lock(ctx context.Context) error { + s, err := NewSession(m.client) + if err != nil { + return err + } + // put self in lock waiters via myKey; oldest waiter holds lock + m.myKey, m.myRev, err = NewUniqueKey(ctx, m.kv, m.pfx, v3.WithLease(s.Lease())) + // wait for lock to become available + for err == nil { + // find oldest element in waiters via revision of insertion + var resp *v3.GetResponse + resp, err = m.kv.Get(ctx, m.pfx, v3.WithFirstRev()...) + if err != nil { + break + } + if m.myRev == resp.Kvs[0].CreateRevision { + // myKey is oldest in waiters; myKey holds the lock now + return nil + } + // otherwise myKey isn't lowest, so there must be a pfx prior to myKey + opts := append(v3.WithLastRev(), v3.WithRev(m.myRev-1)) + resp, err = m.kv.Get(ctx, m.pfx, opts...) + if err != nil { + break + } + lastKey := string(resp.Kvs[0].Key) + // wait for release on prior pfx + err = waitUpdate(ctx, m.client, lastKey, v3.WithRev(m.myRev)) + // try again in case lastKey left the wait list before acquiring the lock; + // myKey can only hold the lock if it's the oldest in the list + } + + // release lock key if cancelled + select { + case <-ctx.Done(): + m.Unlock() + default: + } + return err +} + +func (m *Mutex) Unlock() error { + if _, err := m.kv.Delete(m.ctx, m.myKey); err != nil { + return err + } + m.myKey = "\x00" + m.myRev = -1 + return nil +} + +func (m *Mutex) IsOwner() v3.Cmp { + return v3.Compare(v3.CreatedRevision(m.myKey), "=", m.myRev) +} + +type lockerMutex struct{ *Mutex } + +func (lm *lockerMutex) Lock() { + if err := lm.Mutex.Lock(lm.ctx); err != nil { + panic(err) + } +} +func (lm *lockerMutex) Unlock() { + if err := lm.Mutex.Unlock(); err != nil { + panic(err) + } +} + +// NewLocker creates a sync.Locker backed by an etcd mutex. +func NewLocker(ctx context.Context, client *v3.Client, pfx string) sync.Locker { + return &lockerMutex{NewMutex(ctx, client, pfx)} +} diff --git a/clientv3/op.go b/clientv3/op.go index 9f76e2d39..a7b545049 100644 --- a/clientv3/op.go +++ b/clientv3/op.go @@ -227,3 +227,26 @@ func WithFromKey() OpOption { return WithRange("\x00") } func WithSerializable() OpOption { return func(op *Op) { op.serializable = true } } + +// WithFirstCreate gets the key with the oldest creation revision in the request range. +func WithFirstCreate() []OpOption { return withTop(SortByCreatedRev, SortAscend) } + +// WithLastCreate gets the key with the latest creation revision in the request range. +func WithLastCreate() []OpOption { return withTop(SortByCreatedRev, SortDescend) } + +// WithFirstKey gets the lexically first key in the request range. +func WithFirstKey() []OpOption { return withTop(SortByKey, SortAscend) } + +// WithLastKey gets the lexically last key in the request range. +func WithLastKey() []OpOption { return withTop(SortByKey, SortDescend) } + +// WithFirstRev gets the key with the oldest modification revision in the request range. +func WithFirstRev() []OpOption { return withTop(SortByModifiedRev, SortAscend) } + +// WithLastRev gets the key with the latest modification revision in the request range. +func WithLastRev() []OpOption { return withTop(SortByModifiedRev, SortDescend) } + +// withTop gets the first key over the get's prefix given a sort order +func withTop(target SortTarget, order SortOrder) []OpOption { + return []OpOption{WithPrefix(), WithSort(target, order), WithLimit(1)} +} diff --git a/contrib/recipes/barrier.go b/contrib/recipes/barrier.go index dcfd025a6..56fde2dfd 100644 --- a/contrib/recipes/barrier.go +++ b/contrib/recipes/barrier.go @@ -49,7 +49,7 @@ func (b *Barrier) Release() error { // Wait blocks on the barrier key until it is deleted. If there is no key, Wait // assumes Release has already been called and returns immediately. func (b *Barrier) Wait() error { - resp, err := b.kv.Get(b.ctx, b.key, withFirstKey()...) + resp, err := b.kv.Get(b.ctx, b.key, v3.WithFirstKey()...) if err != nil { return err } diff --git a/contrib/recipes/election.go b/contrib/recipes/election.go index 163f49867..d091a1975 100644 --- a/contrib/recipes/election.go +++ b/contrib/recipes/election.go @@ -62,7 +62,7 @@ func (e *Election) Resign() (err error) { // Leader returns the leader value for the current election. func (e *Election) Leader() (string, error) { - resp, err := e.kv.Get(e.ctx, e.keyPrefix, withFirstCreate()...) + resp, err := e.kv.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...) if err != nil { return "", err } else if len(resp.Kvs) == 0 { @@ -74,7 +74,7 @@ func (e *Election) Leader() (string, error) { // Wait waits for a leader to be elected, returning the leader value. func (e *Election) Wait() (string, error) { - resp, err := e.kv.Get(e.ctx, e.keyPrefix, withFirstCreate()...) + resp, err := e.kv.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...) if err != nil { return "", err } else if len(resp.Kvs) != 0 { @@ -93,7 +93,7 @@ func (e *Election) Wait() (string, error) { } func (e *Election) waitLeadership(tryKey *EphemeralKV) error { - opts := append(withLastCreate(), v3.WithRev(tryKey.Revision()-1)) + opts := append(v3.WithLastCreate(), v3.WithRev(tryKey.Revision()-1)) resp, err := e.kv.Get(e.ctx, e.keyPrefix, opts...) if err != nil { return err diff --git a/contrib/recipes/key.go b/contrib/recipes/key.go index e88357611..a7011c177 100644 --- a/contrib/recipes/key.go +++ b/contrib/recipes/key.go @@ -99,7 +99,7 @@ func NewSequentialKV(kv v3.KV, prefix, val string) (*RemoteKV, error) { // newSequentialKV allocates a new sequential key /nnnnn with a given // value and lease. Note: a bookkeeping node __ is also allocated. func newSequentialKV(kv v3.KV, prefix, val string, leaseID lease.LeaseID) (*RemoteKV, error) { - resp, err := kv.Get(context.TODO(), prefix, withLastKey()...) + resp, err := kv.Get(context.TODO(), prefix, v3.WithLastKey()...) if err != nil { return nil, err } diff --git a/contrib/recipes/mutex.go b/contrib/recipes/mutex.go deleted file mode 100644 index f872e6d4d..000000000 --- a/contrib/recipes/mutex.go +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright 2016 CoreOS, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package recipe - -import ( - "sync" - - "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - v3 "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/storage/storagepb" -) - -// Mutex implements the sync Locker interface with etcd -type Mutex struct { - client *v3.Client - kv v3.KV - ctx context.Context - - key string - myKey *EphemeralKV -} - -func NewMutex(client *v3.Client, key string) *Mutex { - return &Mutex{client, v3.NewKV(client), context.TODO(), key, nil} -} - -func (m *Mutex) Lock() (err error) { - // put self in lock waiters via myKey; oldest waiter holds lock - m.myKey, err = NewUniqueEphemeralKey(m.client, m.key) - if err != nil { - return err - } - // find oldest element in waiters via revision of insertion - resp, err := m.kv.Get(m.ctx, m.key, withFirstRev()...) - if err != nil { - return err - } - // if myKey is oldest in waiters, then myKey holds the lock - if m.myKey.Revision() == resp.Kvs[0].CreateRevision { - return nil - } - // otherwise myKey isn't lowest, so there must be a key prior to myKey - opts := append(withLastRev(), v3.WithRev(m.myKey.Revision()-1)) - lastKey, err := m.kv.Get(m.ctx, m.key, opts...) - if err != nil { - return err - } - // wait for release on prior key - _, err = WaitEvents( - m.client, - string(lastKey.Kvs[0].Key), - m.myKey.Revision()-1, - []storagepb.Event_EventType{storagepb.DELETE}) - // myKey now oldest - return err -} - -func (m *Mutex) Unlock() error { - err := m.myKey.Delete() - m.myKey = nil - return err -} - -type lockerMutex struct{ *Mutex } - -func (lm *lockerMutex) Lock() { - if err := lm.Mutex.Lock(); err != nil { - panic(err) - } -} -func (lm *lockerMutex) Unlock() { - if err := lm.Mutex.Unlock(); err != nil { - panic(err) - } -} - -func NewLocker(client *v3.Client, key string) sync.Locker { - return &lockerMutex{NewMutex(client, key)} -} diff --git a/contrib/recipes/priority_queue.go b/contrib/recipes/priority_queue.go index e7cce4ec0..1a1c628a1 100644 --- a/contrib/recipes/priority_queue.go +++ b/contrib/recipes/priority_queue.go @@ -46,7 +46,7 @@ func (q *PriorityQueue) Enqueue(val string, pr uint16) error { // queue is empty, Dequeue blocks until items are available. func (q *PriorityQueue) Dequeue() (string, error) { // TODO: fewer round trips by fetching more than one key - resp, err := q.kv.Get(q.ctx, q.key, withFirstKey()...) + resp, err := q.kv.Get(q.ctx, q.key, v3.WithFirstKey()...) if err != nil { return "", err } diff --git a/contrib/recipes/queue.go b/contrib/recipes/queue.go index 396420d2c..c0a4977fb 100644 --- a/contrib/recipes/queue.go +++ b/contrib/recipes/queue.go @@ -42,7 +42,7 @@ func (q *Queue) Enqueue(val string) error { // queue is empty, Dequeue blocks until elements are available. func (q *Queue) Dequeue() (string, error) { // TODO: fewer round trips by fetching more than one key - resp, err := q.kv.Get(q.ctx, q.keyPrefix, withFirstRev()...) + resp, err := q.kv.Get(q.ctx, q.keyPrefix, v3.WithFirstRev()...) if err != nil { return "", err } diff --git a/contrib/recipes/range.go b/contrib/recipes/range.go deleted file mode 100644 index 155a644e8..000000000 --- a/contrib/recipes/range.go +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2016 CoreOS, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package recipe - -import ( - v3 "github.com/coreos/etcd/clientv3" -) - -func withFirstCreate() []v3.OpOption { return withTop(v3.SortByCreatedRev, v3.SortAscend) } -func withLastCreate() []v3.OpOption { return withTop(v3.SortByCreatedRev, v3.SortDescend) } -func withFirstKey() []v3.OpOption { return withTop(v3.SortByKey, v3.SortAscend) } -func withLastKey() []v3.OpOption { return withTop(v3.SortByKey, v3.SortDescend) } -func withFirstRev() []v3.OpOption { return withTop(v3.SortByModifiedRev, v3.SortAscend) } -func withLastRev() []v3.OpOption { return withTop(v3.SortByModifiedRev, v3.SortDescend) } - -// withTop gets the first key over the get's prefix given a sort order -func withTop(target v3.SortTarget, order v3.SortOrder) []v3.OpOption { - return []v3.OpOption{ - v3.WithPrefix(), - v3.WithSort(target, order), - v3.WithLimit(1)} -} diff --git a/contrib/recipes/rwmutex.go b/contrib/recipes/rwmutex.go index 93aff10cd..ea2425c69 100644 --- a/contrib/recipes/rwmutex.go +++ b/contrib/recipes/rwmutex.go @@ -42,7 +42,7 @@ func (rwm *RWMutex) RLock() error { // if there are nodes with "write-" and a lower // revision number than us we must wait - resp, err := rwm.kv.Get(rwm.ctx, rwm.key+"/write", withFirstRev()...) + resp, err := rwm.kv.Get(rwm.ctx, rwm.key+"/write", v3.WithFirstRev()...) if err != nil { return err } @@ -62,7 +62,7 @@ func (rwm *RWMutex) Lock() error { for { // find any key of lower rev number blocks the write lock - opts := append(withLastRev(), v3.WithRev(rk.Revision()-1)) + opts := append(v3.WithLastRev(), v3.WithRev(rk.Revision()-1)) resp, err := rwm.kv.Get(rwm.ctx, rwm.key, opts...) if err != nil { return err @@ -82,7 +82,7 @@ func (rwm *RWMutex) Lock() error { func (rwm *RWMutex) waitOnLowest() error { // must block; get key before ek for waiting - opts := append(withLastRev(), v3.WithRev(rwm.myKey.Revision()-1)) + opts := append(v3.WithLastRev(), v3.WithRev(rwm.myKey.Revision()-1)) lastKey, err := rwm.kv.Get(rwm.ctx, rwm.key, opts...) if err != nil { return err diff --git a/integration/v3_lock_test.go b/integration/v3_lock_test.go index a3dfc1fab..97c0ae4f9 100644 --- a/integration/v3_lock_test.go +++ b/integration/v3_lock_test.go @@ -18,7 +18,9 @@ import ( "testing" "time" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" "github.com/coreos/etcd/contrib/recipes" ) @@ -36,11 +38,11 @@ func TestMutexMultiNode(t *testing.T) { func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) { // stream lock acquisitions - lockedC := make(chan *recipe.Mutex, 1) + lockedC := make(chan *concurrency.Mutex, 1) for i := 0; i < waiters; i++ { go func() { - m := recipe.NewMutex(chooseClient(), "test-mutex") - if err := m.Lock(); err != nil { + m := concurrency.NewMutex(context.TODO(), chooseClient(), "test-mutex") + if err := m.Lock(context.TODO()); err != nil { t.Fatalf("could not wait on lock (%v)", err) } lockedC <- m