Merge pull request #8107 from heyitsanthony/lock-faster
concurrency: fetch current lock holder when creating waitlist keyrelease-3.3
commit
099952136a
|
@ -49,7 +49,9 @@ func (m *Mutex) Lock(ctx context.Context) error {
|
|||
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
|
||||
// reuse key in case this session already holds the lock
|
||||
get := v3.OpGet(m.myKey)
|
||||
resp, err := client.Txn(ctx).If(cmp).Then(put).Else(get).Commit()
|
||||
// fetch current holder to complete uncontended path with only one RPC
|
||||
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
|
||||
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -57,6 +59,12 @@ func (m *Mutex) Lock(ctx context.Context) error {
|
|||
if !resp.Succeeded {
|
||||
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
|
||||
}
|
||||
// if no key on prefix / the minimum rev is key, already hold the lock
|
||||
ownerKey := resp.Responses[1].GetResponseRange().Kvs
|
||||
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
|
||||
m.hdr = resp.Header
|
||||
return nil
|
||||
}
|
||||
|
||||
// wait for deletion revisions prior to myKey
|
||||
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
|
||||
|
|
Loading…
Reference in New Issue