From 09b9f889e77884a0d0e08a374265d9ef25b6ad60 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 16 Jun 2023 11:22:55 +0200 Subject: [PATCH] tests/robustness: Refactor etcd traffic client Signed-off-by: Marek Siarkowicz --- tests/robustness/traffic/etcd.go | 63 ++++++++++++++++++++------------ 1 file changed, 39 insertions(+), 24 deletions(-) diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go index cfce3845c..21506ad78 100644 --- a/tests/robustness/traffic/etcd.go +++ b/tests/robustness/traffic/etcd.go @@ -100,6 +100,13 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate. lastOperationSucceeded := true var lastRev int64 var requestType etcdRequestType + client := etcdTrafficClient{ + etcdTraffic: t, + client: c, + limiter: limiter, + idProvider: ids, + leaseStorage: lm, + } for { select { case <-ctx.Done(): @@ -115,7 +122,7 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate. } else { requestType = Get } - rev, err := t.Request(ctx, c, requestType, limiter, key, ids, lm, lastRev) + rev, err := client.Request(ctx, requestType, key, lastRev) lastOperationSucceeded = err == nil if err != nil { continue @@ -127,87 +134,95 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate. } } -func (t etcdTraffic) Request(ctx context.Context, c *RecordingClient, request etcdRequestType, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, lastRev int64) (rev int64, err error) { +type etcdTrafficClient struct { + etcdTraffic + client *RecordingClient + limiter *rate.Limiter + idProvider identity.Provider + leaseStorage identity.LeaseIdStorage +} + +func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType, key string, lastRev int64) (rev int64, err error) { opCtx, cancel := context.WithTimeout(ctx, RequestTimeout) switch request { case StaleGet: - _, rev, err = c.Get(opCtx, key, lastRev) + _, rev, err = c.client.Get(opCtx, key, lastRev) case Get: - _, rev, err = c.Get(opCtx, key, 0) + _, rev, err = c.client.Get(opCtx, key, 0) case Put: var resp *clientv3.PutResponse - resp, err = c.Put(opCtx, key, fmt.Sprintf("%d", id.NewRequestId())) + resp, err = c.client.Put(opCtx, key, fmt.Sprintf("%d", c.idProvider.NewRequestId())) if resp != nil { rev = resp.Header.Revision } case LargePut: var resp *clientv3.PutResponse - resp, err = c.Put(opCtx, key, randString(t.largePutSize)) + resp, err = c.client.Put(opCtx, key, randString(c.largePutSize)) if resp != nil { rev = resp.Header.Revision } case Delete: var resp *clientv3.DeleteResponse - resp, err = c.Delete(opCtx, key) + resp, err = c.client.Delete(opCtx, key) if resp != nil { rev = resp.Header.Revision } case MultiOpTxn: var resp *clientv3.TxnResponse - resp, err = c.Txn(opCtx, nil, t.pickMultiTxnOps(id), nil) + resp, err = c.client.Txn(opCtx, nil, c.pickMultiTxnOps(), nil) if resp != nil { rev = resp.Header.Revision } case CompareAndSet: var kv *mvccpb.KeyValue - kv, rev, err = c.Get(opCtx, key, 0) + kv, rev, err = c.client.Get(opCtx, key, 0) if err == nil { - limiter.Wait(ctx) + c.limiter.Wait(ctx) var expectedRevision int64 if kv != nil { expectedRevision = kv.ModRevision } txnCtx, txnCancel := context.WithTimeout(ctx, RequestTimeout) var resp *clientv3.TxnResponse - resp, err = c.Txn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", id.NewRequestId()))}, nil) + resp, err = c.client.Txn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", c.idProvider.NewRequestId()))}, nil) txnCancel() if resp != nil { rev = resp.Header.Revision } } case PutWithLease: - leaseId := lm.LeaseId(c.id) + leaseId := c.leaseStorage.LeaseId(c.client.id) if leaseId == 0 { var resp *clientv3.LeaseGrantResponse - resp, err = c.LeaseGrant(opCtx, t.leaseTTL) + resp, err = c.client.LeaseGrant(opCtx, c.leaseTTL) if resp != nil { leaseId = int64(resp.ID) rev = resp.ResponseHeader.Revision } if err == nil { - lm.AddLeaseId(c.id, leaseId) - limiter.Wait(ctx) + c.leaseStorage.AddLeaseId(c.client.id, leaseId) + c.limiter.Wait(ctx) } } if leaseId != 0 { putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout) var resp *clientv3.PutResponse - resp, err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.NewRequestId()), leaseId) + resp, err = c.client.PutWithLease(putCtx, key, fmt.Sprintf("%d", c.idProvider.NewRequestId()), leaseId) putCancel() if resp != nil { rev = resp.Header.Revision } } case LeaseRevoke: - leaseId := lm.LeaseId(c.id) + leaseId := c.leaseStorage.LeaseId(c.client.id) if leaseId != 0 { var resp *clientv3.LeaseRevokeResponse - resp, err = c.LeaseRevoke(opCtx, leaseId) + resp, err = c.client.LeaseRevoke(opCtx, leaseId) //if LeaseRevoke has failed, do not remove the mapping. if err == nil { - lm.RemoveLeaseId(c.id) + c.leaseStorage.RemoveLeaseId(c.client.id) } if resp != nil { rev = resp.Header.Revision @@ -215,7 +230,7 @@ func (t etcdTraffic) Request(ctx context.Context, c *RecordingClient, request et } case Defragment: var resp *clientv3.DefragmentResponse - resp, err = c.Defragment(opCtx) + resp, err = c.client.Defragment(opCtx) if resp != nil { rev = resp.Header.Revision } @@ -226,13 +241,13 @@ func (t etcdTraffic) Request(ctx context.Context, c *RecordingClient, request et return rev, err } -func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) { - keys := rand.Perm(t.keyCount) +func (c etcdTrafficClient) pickMultiTxnOps() (ops []clientv3.Op) { + keys := rand.Perm(c.keyCount) opTypes := make([]model.OperationType, 4) atLeastOnePut := false for i := 0; i < MultiOpTxnOpCount; i++ { - opTypes[i] = t.pickOperationType() + opTypes[i] = c.pickOperationType() if opTypes[i] == model.PutOperation { atLeastOnePut = true } @@ -248,7 +263,7 @@ func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) case model.RangeOperation: ops = append(ops, clientv3.OpGet(key)) case model.PutOperation: - value := fmt.Sprintf("%d", ids.NewRequestId()) + value := fmt.Sprintf("%d", c.idProvider.NewRequestId()) ops = append(ops, clientv3.OpPut(key, value)) case model.DeleteOperation: ops = append(ops, clientv3.OpDelete(key))