From 6a68141db573c08d37a2f5478e7b48d86a66c986 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 1 Dec 2022 20:22:50 +0100 Subject: [PATCH] tests: Allow dynamic number of clients Signed-off-by: Marek Siarkowicz --- tests/linearizability/client.go | 4 +- tests/linearizability/id.go | 40 +++++++++++++++++++ tests/linearizability/linearizability_test.go | 5 ++- tests/linearizability/traffic.go | 13 ++---- 4 files changed, 49 insertions(+), 13 deletions(-) create mode 100644 tests/linearizability/id.go diff --git a/tests/linearizability/client.go b/tests/linearizability/client.go index ca2da56df..b8cf2dd03 100644 --- a/tests/linearizability/client.go +++ b/tests/linearizability/client.go @@ -30,7 +30,7 @@ type recordingClient struct { operations []porcupine.Operation } -func NewClient(endpoints []string, id int) (*recordingClient, error) { +func NewClient(endpoints []string, ids idProvider) (*recordingClient, error) { cc, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, Logger: zap.NewNop(), @@ -42,7 +42,7 @@ func NewClient(endpoints []string, id int) (*recordingClient, error) { } return &recordingClient{ client: *cc, - id: id, + id: ids.ClientId(), operations: []porcupine.Operation{}, }, nil } diff --git a/tests/linearizability/id.go b/tests/linearizability/id.go new file mode 100644 index 000000000..4e8fa3817 --- /dev/null +++ b/tests/linearizability/id.go @@ -0,0 +1,40 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linearizability + +import "sync/atomic" + +type idProvider interface { + ClientId() int + RequestId() int +} + +func newIdProvider() idProvider { + return &atomicProvider{} +} + +type atomicProvider struct { + clientId atomic.Int64 + requestId atomic.Int64 +} + +func (id *atomicProvider) ClientId() int { + // Substract one as ClientId should start from zero. + return int(id.clientId.Add(1) - 1) +} + +func (id *atomicProvider) RequestId() int { + return int(id.requestId.Add(1)) +} diff --git a/tests/linearizability/linearizability_test.go b/tests/linearizability/linearizability_test.go index 70ac4cb4a..ae5539e9d 100644 --- a/tests/linearizability/linearizability_test.go +++ b/tests/linearizability/linearizability_test.go @@ -144,6 +144,7 @@ func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessClu mux := sync.Mutex{} endpoints := clus.EndpointsV3() + ids := newIdProvider() limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200) startTime := time.Now() @@ -151,7 +152,7 @@ func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessClu for i := 0; i < config.clientCount; i++ { wg.Add(1) endpoints := []string{endpoints[i%len(endpoints)]} - c, err := NewClient(endpoints, i) + c, err := NewClient(endpoints, ids) if err != nil { t.Fatal(err) } @@ -159,7 +160,7 @@ func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessClu defer wg.Done() defer c.Close() - config.traffic.Run(ctx, c, limiter) + config.traffic.Run(ctx, c, limiter, ids) mux.Lock() operations = append(operations, c.operations...) mux.Unlock() diff --git a/tests/linearizability/traffic.go b/tests/linearizability/traffic.go index f1507466e..413f3b8e2 100644 --- a/tests/linearizability/traffic.go +++ b/tests/linearizability/traffic.go @@ -28,7 +28,7 @@ var ( ) type Traffic interface { - Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter) + Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, ids idProvider) } type readWriteSingleKey struct { @@ -41,12 +41,9 @@ type opChance struct { chance int } -func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter) { - maxOperationsPerClient := 1000000 - minId := maxOperationsPerClient * c.id - maxId := maxOperationsPerClient * (c.id + 1) +func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, ids idProvider) { - for writeId := minId; writeId < maxId; { + for { select { case <-ctx.Done(): return @@ -58,10 +55,8 @@ func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter continue } // Provide each write with unique id to make it easier to validate operation history. - t.Write(ctx, c, limiter, writeId) - writeId++ + t.Write(ctx, c, limiter, ids.RequestId()) } - return } func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter) error {