Merge pull request #14604 from ahrtr/double_barrier

clientv3: fix the implementation of double barrier
dependabot/go_modules/go.uber.org/atomic-1.10.0
Benjamin Wang 2022-10-31 08:13:12 +08:00 committed by GitHub
commit a1018dbddf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 109 additions and 10 deletions

View File

@ -45,25 +45,46 @@ func NewDoubleBarrier(s *concurrency.Session, key string, count int) *DoubleBarr
// Enter waits for "count" processes to enter the barrier then returns
func (b *DoubleBarrier) Enter() error {
client := b.s.Client()
// Check the entered clients before creating the UniqueEphemeralKey,
// fail the request if there are already too many clients.
if resp1, err := b.enteredClients(client); err != nil {
return err
} else if len(resp1.Kvs) >= b.count {
return ErrTooManyClients
}
ek, err := newUniqueEphemeralKey(b.s, b.key+"/waiters")
if err != nil {
return err
}
b.myKey = ek
resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
// Check the entered clients after creating the UniqueEphemeralKey
resp2, err := b.enteredClients(client)
if err != nil {
return err
}
if len(resp2.Kvs) >= b.count {
lastWaiter := resp2.Kvs[b.count-1]
if ek.rev > lastWaiter.CreateRevision {
// delete itself now, otherwise other processes may need to wait
// until these keys are automatically deleted when the related
// lease expires.
if err = b.myKey.Delete(); err != nil {
// Nothing to do here. We have to wait for the key to be
// deleted when the lease expires.
}
return ErrTooManyClients
}
if len(resp.Kvs) > b.count {
return ErrTooManyClients
}
if len(resp.Kvs) == b.count {
// unblock waiters
_, err = client.Put(b.ctx, b.key+"/ready", "")
return err
if ek.rev == lastWaiter.CreateRevision {
// TODO(ahrtr): we might need to compare ek.key and
// string(lastWaiter.Key), they should be equal.
// unblock all other waiters
_, err = client.Put(b.ctx, b.key+"/ready", "")
return err
}
}
_, err = WaitEvents(
@ -74,6 +95,18 @@ func (b *DoubleBarrier) Enter() error {
return err
}
// enteredClients fetches every key stored under the barrier's "/waiters"
// prefix and asks the server to return them sorted by create revision in
// ascending order, so earlier waiters come first in the response.
// On failure the response is nil and the Get error is returned unchanged.
func (b *DoubleBarrier) enteredClients(cli *clientv3.Client) (*clientv3.GetResponse, error) {
	opts := []clientv3.OpOption{
		clientv3.WithPrefix(),
		clientv3.WithSort(clientv3.SortByCreateRevision, clientv3.SortAscend),
	}

	waiters, err := cli.Get(b.ctx, b.key+"/waiters", opts...)
	if err == nil {
		return waiters, nil
	}
	return nil, err
}
// Leave waits for "count" processes to leave the barrier then returns
func (b *DoubleBarrier) Leave() error {
client := b.s.Client()
@ -96,7 +129,7 @@ func (b *DoubleBarrier) Leave() error {
}
isLowest := string(lowest.Key) == b.myKey.Key()
if len(resp.Kvs) == 1 {
if len(resp.Kvs) == 1 && isLowest {
// this is the only node in the barrier; finish up
if _, err = client.Delete(b.ctx, b.key+"/ready"); err != nil {
return err

View File

@ -15,9 +15,14 @@
package recipes_test
import (
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
recipe "go.etcd.io/etcd/client/v3/experimental/recipes"
integration2 "go.etcd.io/etcd/tests/v3/framework/integration"
@ -97,6 +102,67 @@ func TestDoubleBarrier(t *testing.T) {
}
}
// TestDoubleBarrierTooManyClients verifies that once "waiters" clients have
// entered a double barrier, one additional Enter call is rejected with
// ErrTooManyClients and does not leave an extra key under the waiters prefix.
func TestDoubleBarrierTooManyClients(t *testing.T) {
	integration2.BeforeTest(t)

	clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3})
	defer clus.Terminate(t)
	waiters := 10

	session, err := concurrency.NewSession(clus.RandClient())
	if err != nil {
		// Without a session nothing below can run; stop here instead of
		// continuing with an unusable session value. No goroutine has been
		// started yet, so failing fast cannot strand any worker.
		t.Fatal(err)
	}
	defer session.Orphan()

	b := recipe.NewDoubleBarrier(session, "test-barrier", waiters)
	donec := make(chan struct{})
	var (
		wgDone    sync.WaitGroup // make sure all clients have finished the tasks
		wgEntered sync.WaitGroup // make sure all clients have entered the double barrier
	)
	wgDone.Add(waiters)
	wgEntered.Add(waiters)
	for i := 0; i < waiters; i++ {
		go func() {
			defer wgDone.Done()

			session, err := concurrency.NewSession(clus.RandClient())
			if err != nil {
				// t.Fatal must not be called outside the test goroutine, so
				// record the failure, release the main goroutine's wait and
				// bail out before touching the unusable session.
				t.Error(err)
				wgEntered.Done()
				return
			}
			defer session.Orphan()

			bb := recipe.NewDoubleBarrier(session, "test-barrier", waiters)
			if err := bb.Enter(); err != nil {
				// This client never made it into the barrier, so there is
				// nothing for it to leave; skip Leave instead of calling it
				// on a barrier in an unknown state.
				t.Errorf("could not enter on barrier (%v)", err)
				wgEntered.Done()
				return
			}
			wgEntered.Done()

			// Stay inside the barrier until the main goroutine has finished
			// its checks.
			<-donec
			if err := bb.Leave(); err != nil {
				t.Errorf("could not leave on barrier (%v)", err)
			}
		}()
	}

	// Wait until all clients have entered the double barrier, so that no
	// other client is able to enter it.
	wgEntered.Wait()
	t.Log("Try to enter into double barrier")
	if err := b.Enter(); err != recipe.ErrTooManyClients {
		t.Errorf("Unexpected error, expected: ErrTooManyClients, got: %v", err)
	}

	resp, err := clus.RandClient().Get(context.TODO(), "test-barrier/waiters", clientv3.WithPrefix())
	if err != nil {
		// Do not read resp when the request failed. Keep going so the
		// worker goroutines below are still released and awaited.
		t.Errorf("Unexpected error: %v", err)
	} else {
		// Make sure the extra `b.Enter()` did not create a new ephemeral key
		assert.Equal(t, waiters, len(resp.Kvs))
	}

	// Let every worker leave the barrier and wait for them to finish.
	close(donec)
	wgDone.Wait()
}
func TestDoubleBarrierFailover(t *testing.T) {
integration2.BeforeTest(t)