clientv3/concurrency: use session lease id for mutex keys
With randomized keys, if the connection goes down, but the session remains, the client would need complicated recovery logic to avoid deadlock. Instead, bind the session's lease id to the lock entry; if a session tries to reacquire the lock it will reassume its old place in the wait list.release-3.0
parent
d923b59190
commit
22797c7185
|
@ -15,6 +15,7 @@
|
||||||
package concurrency
|
package concurrency
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
v3 "github.com/coreos/etcd/clientv3"
|
v3 "github.com/coreos/etcd/clientv3"
|
||||||
|
@ -37,12 +38,26 @@ func NewMutex(client *v3.Client, pfx string) *Mutex {
|
||||||
// Lock locks the mutex with a cancellable context. If the context is cancelled
|
// Lock locks the mutex with a cancellable context. If the context is cancelled
|
||||||
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
|
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
|
||||||
func (m *Mutex) Lock(ctx context.Context) error {
|
func (m *Mutex) Lock(ctx context.Context) error {
|
||||||
s, err := NewSession(m.client)
|
s, serr := NewSession(m.client)
|
||||||
|
if serr != nil {
|
||||||
|
return serr
|
||||||
|
}
|
||||||
|
|
||||||
|
m.myKey = fmt.Sprintf("%s/%x", m.pfx, s.Lease())
|
||||||
|
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
|
||||||
|
// put self in lock waiters via myKey; oldest waiter holds lock
|
||||||
|
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
|
||||||
|
// reuse key in case this session already holds the lock
|
||||||
|
get := v3.OpGet(m.myKey)
|
||||||
|
resp, err := m.client.Txn(ctx).If(cmp).Then(put).Else(get).Commit()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// put self in lock waiters via myKey; oldest waiter holds lock
|
m.myRev = resp.Header.Revision
|
||||||
m.myKey, m.myRev, err = NewUniqueKey(ctx, m.client, m.pfx, v3.WithLease(s.Lease()))
|
if !resp.Succeeded {
|
||||||
|
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
|
||||||
|
}
|
||||||
|
|
||||||
// wait for deletion revisions prior to myKey
|
// wait for deletion revisions prior to myKey
|
||||||
err = waitDeletes(ctx, m.client, m.pfx, v3.WithPrefix(), v3.WithRev(m.myRev-1))
|
err = waitDeletes(ctx, m.client, m.pfx, v3.WithPrefix(), v3.WithRev(m.myRev-1))
|
||||||
// release lock key if cancelled
|
// release lock key if cancelled
|
||||||
|
|
|
@ -16,6 +16,7 @@ package integration
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -28,18 +29,24 @@ import (
|
||||||
func TestMutexSingleNode(t *testing.T) {
|
func TestMutexSingleNode(t *testing.T) {
|
||||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||||
defer clus.Terminate(t)
|
defer clus.Terminate(t)
|
||||||
testMutex(t, 5, func() *clientv3.Client { return clus.clients[0] })
|
|
||||||
|
var clients []*clientv3.Client
|
||||||
|
testMutex(t, 5, makeSingleNodeClients(t, clus.cluster, &clients))
|
||||||
|
closeClients(t, clients)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMutexMultiNode(t *testing.T) {
|
func TestMutexMultiNode(t *testing.T) {
|
||||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||||
defer clus.Terminate(t)
|
defer clus.Terminate(t)
|
||||||
testMutex(t, 5, func() *clientv3.Client { return clus.RandClient() })
|
|
||||||
|
var clients []*clientv3.Client
|
||||||
|
testMutex(t, 5, makeMultiNodeClients(t, clus.cluster, &clients))
|
||||||
|
closeClients(t, clients)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) {
|
func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) {
|
||||||
// stream lock acquisitions
|
// stream lock acquisitions
|
||||||
lockedC := make(chan *concurrency.Mutex, 1)
|
lockedC := make(chan *concurrency.Mutex)
|
||||||
for i := 0; i < waiters; i++ {
|
for i := 0; i < waiters; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
m := concurrency.NewMutex(chooseClient(), "test-mutex")
|
m := concurrency.NewMutex(chooseClient(), "test-mutex")
|
||||||
|
@ -69,6 +76,22 @@ func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestMutexSessionRelock ensures that acquiring the same lock with the same
|
||||||
|
// session will not result in deadlock.
|
||||||
|
func TestMutexSessionRelock(t *testing.T) {
|
||||||
|
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||||
|
defer clus.Terminate(t)
|
||||||
|
cli := clus.RandClient()
|
||||||
|
m := concurrency.NewMutex(cli, "test-mutex")
|
||||||
|
if err := m.Lock(context.TODO()); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
m2 := concurrency.NewMutex(cli, "test-mutex")
|
||||||
|
if err := m2.Lock(context.TODO()); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkMutex4Waiters(b *testing.B) {
|
func BenchmarkMutex4Waiters(b *testing.B) {
|
||||||
// XXX switch tests to use TB interface
|
// XXX switch tests to use TB interface
|
||||||
clus := NewClusterV3(nil, &ClusterConfig{Size: 3})
|
clus := NewClusterV3(nil, &ClusterConfig{Size: 3})
|
||||||
|
@ -137,3 +160,38 @@ func testRWMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func makeClients(t *testing.T, clients *[]*clientv3.Client, choose func() *member) func() *clientv3.Client {
|
||||||
|
var mu sync.Mutex
|
||||||
|
*clients = nil
|
||||||
|
return func() *clientv3.Client {
|
||||||
|
cli, err := NewClientV3(choose())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
mu.Lock()
|
||||||
|
*clients = append(*clients, cli)
|
||||||
|
mu.Unlock()
|
||||||
|
return cli
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeSingleNodeClients(t *testing.T, clus *cluster, clients *[]*clientv3.Client) func() *clientv3.Client {
|
||||||
|
return makeClients(t, clients, func() *member {
|
||||||
|
return clus.Members[0]
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeMultiNodeClients(t *testing.T, clus *cluster, clients *[]*clientv3.Client) func() *clientv3.Client {
|
||||||
|
return makeClients(t, clients, func() *member {
|
||||||
|
return clus.Members[rand.Intn(len(clus.Members))]
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func closeClients(t *testing.T, clients []*clientv3.Client) {
|
||||||
|
for _, cli := range clients {
|
||||||
|
if err := cli.Close(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue