From e9900f6fff2c8ddcb173075fe2d845b312c908ad Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 11 May 2023 20:30:51 +0200 Subject: [PATCH] tests/robustness: Separate stream id from client id and improve AppendableHistory doc Signed-off-by: Marek Siarkowicz --- tests/robustness/identity/id.go | 15 +++++---- tests/robustness/model/history.go | 45 ++++++++++++++------------ tests/robustness/traffic/client.go | 2 +- tests/robustness/traffic/etcd.go | 8 ++--- tests/robustness/traffic/kubernetes.go | 6 ++-- 5 files changed, 40 insertions(+), 36 deletions(-) diff --git a/tests/robustness/identity/id.go b/tests/robustness/identity/id.go index 31f57ccc1..810a60e1b 100644 --- a/tests/robustness/identity/id.go +++ b/tests/robustness/identity/id.go @@ -17,8 +17,10 @@ package identity import "sync/atomic" type Provider interface { - ClientId() int - RequestId() int + // NewStreamId returns an integer starting from zero to make it render nicely by porcupine visualization. + NewStreamId() int + // NewRequestId returns unique identification used to make write requests unique. + NewRequestId() int } func NewIdProvider() Provider { @@ -26,15 +28,14 @@ func NewIdProvider() Provider { } type atomicProvider struct { - clientId atomic.Int64 + streamId atomic.Int64 requestId atomic.Int64 } -func (id *atomicProvider) ClientId() int { - // Substract one as ClientId should start from zero. - return int(id.clientId.Add(1) - 1) +func (id *atomicProvider) NewStreamId() int { + return int(id.streamId.Add(1) - 1) } -func (id *atomicProvider) RequestId() int { +func (id *atomicProvider) NewRequestId() int { return int(id.requestId.Add(1)) } diff --git a/tests/robustness/model/history.go b/tests/robustness/model/history.go index 4adf28656..3e04cec68 100644 --- a/tests/robustness/model/history.go +++ b/tests/robustness/model/history.go @@ -46,15 +46,18 @@ func ValidateOperationHistoryAndReturnVisualize(t *testing.T, lg *zap.Logger, op } } -// AppendableHistory allows to collect history of operations. Appending needs to -// be done in order of operation execution time (start, end time). All Operations -// time is expected to be calculated as time.Since common base time. +// AppendableHistory allows to collect history of sequential operations. // -// Appending validates that operations don't overlap and properly handles failed -// requests. +// Ensures that operation history is compatible with porcupine library, by preventing concurrent requests sharing the +// same stream id. For failed requests, we don't know their return time, so generate new stream id. +// +// Appending needs to be done in order of operation execution time (start, end time). +// Operations time should be calculated as time.Since common base time to ensure that Go monotonic time is used. +// More in https://github.com/golang/go/blob/96add980ad27faed627f26ef1ab09e8fe45d6bd1/src/time/time.go#L10. type AppendableHistory struct { - // id of the next write operation. If needed a new id might be requested from idProvider. - id int + // streamId for the next operation. Used for porcupine.Operation.ClientId as porcupine assumes no concurrent requests. + streamId int + // If needed a new streamId is requested from idProvider. idProvider identity.Provider History @@ -62,7 +65,7 @@ type AppendableHistory struct { func NewAppendableHistory(ids identity.Provider) *AppendableHistory { return &AppendableHistory{ - id: ids.ClientId(), + streamId: ids.NewStreamId(), idProvider: ids, History: History{ successful: []porcupine.Operation{}, @@ -77,7 +80,7 @@ func (h *AppendableHistory) AppendRange(key string, withPrefix bool, start, end revision = resp.Header.Revision } h.appendSuccessful(porcupine.Operation{ - ClientId: h.id, + ClientId: h.streamId, Input: rangeRequest(key, withPrefix, 0), Call: start.Nanoseconds(), Output: rangeResponse(resp.Kvs, resp.Count, revision), @@ -96,7 +99,7 @@ func (h *AppendableHistory) AppendPut(key, value string, start, end time.Duratio revision = resp.Header.Revision } h.appendSuccessful(porcupine.Operation{ - ClientId: h.id, + ClientId: h.streamId, Input: request, Call: start.Nanoseconds(), Output: putResponse(revision), @@ -115,7 +118,7 @@ func (h *AppendableHistory) AppendPutWithLease(key, value string, leaseID int64, revision = resp.Header.Revision } h.appendSuccessful(porcupine.Operation{ - ClientId: h.id, + ClientId: h.streamId, Input: request, Call: start.Nanoseconds(), Output: putResponse(revision), @@ -138,7 +141,7 @@ func (h *AppendableHistory) AppendLeaseGrant(start, end time.Duration, resp *cli revision = resp.ResponseHeader.Revision } h.appendSuccessful(porcupine.Operation{ - ClientId: h.id, + ClientId: h.streamId, Input: request, Call: start.Nanoseconds(), Output: leaseGrantResponse(revision), @@ -157,7 +160,7 @@ func (h *AppendableHistory) AppendLeaseRevoke(id int64, start, end time.Duration revision = resp.Header.Revision } h.appendSuccessful(porcupine.Operation{ - ClientId: h.id, + ClientId: h.streamId, Input: request, Call: start.Nanoseconds(), Output: leaseRevokeResponse(revision), @@ -178,7 +181,7 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, r deleted = resp.Deleted } h.appendSuccessful(porcupine.Operation{ - ClientId: h.id, + ClientId: h.streamId, Input: request, Call: start.Nanoseconds(), Output: deleteResponse(deleted, revision), @@ -201,7 +204,7 @@ func (h *AppendableHistory) AppendCompareRevisionAndDelete(key string, expectedR deleted = resp.Responses[0].GetResponseDeleteRange().Deleted } h.appendSuccessful(porcupine.Operation{ - ClientId: h.id, + ClientId: h.streamId, Input: request, Call: start.Nanoseconds(), Output: compareRevisionAndDeleteResponse(resp.Succeeded, deleted, revision), @@ -220,7 +223,7 @@ func (h *AppendableHistory) AppendCompareRevisionAndPut(key string, expectedRevi revision = resp.Header.Revision } h.appendSuccessful(porcupine.Operation{ - ClientId: h.id, + ClientId: h.streamId, Input: request, Call: start.Nanoseconds(), Output: compareRevisionAndPutResponse(resp.Succeeded, revision), @@ -251,7 +254,7 @@ func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, onSuccess []clientv3.O results = append(results, toEtcdOperationResult(resp)) } h.appendSuccessful(porcupine.Operation{ - ClientId: h.id, + ClientId: h.streamId, Input: request, Call: start.Nanoseconds(), Output: txnResponse(results, resp.Succeeded, revision), @@ -352,7 +355,7 @@ func (h *AppendableHistory) AppendDefragment(start, end time.Duration, resp *cli revision = resp.Header.Revision } h.appendSuccessful(porcupine.Operation{ - ClientId: h.id, + ClientId: h.streamId, Input: request, Call: start.Nanoseconds(), Output: defragmentResponse(revision), @@ -377,15 +380,15 @@ func (h *AppendableHistory) appendFailed(request EtcdRequest, call int64, err er } } h.failed = append(h.failed, porcupine.Operation{ - ClientId: h.id, + ClientId: h.streamId, Input: request, Call: call, Output: failedResponse(err), Return: 0, // For failed writes we don't know when request has really finished. }) // Operations of single client needs to be sequential. - // As we don't know return time of failed operations, all new writes need to be done with new client id. - h.id = h.idProvider.ClientId() + // As we don't know return time of failed operations, all new writes need to be done with new stream id. + h.streamId = h.idProvider.NewStreamId() } func getRequest(key string) EtcdRequest { diff --git a/tests/robustness/traffic/client.go b/tests/robustness/traffic/client.go index 289dbfeed..95a314c52 100644 --- a/tests/robustness/traffic/client.go +++ b/tests/robustness/traffic/client.go @@ -30,7 +30,7 @@ import ( // RecordingClient provides a semi etcd client (different interface than // clientv3.Client) that records all the requests and responses made. Doesn't -// allow for concurrent requests to ensure correct appending to history. +// allow for concurrent requests to confirm to model.AppendableHistory requirements. type RecordingClient struct { client clientv3.Client baseTime time.Time diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go index 7f21fc0af..589ed31a1 100644 --- a/tests/robustness/traffic/etcd.go +++ b/tests/robustness/traffic/etcd.go @@ -124,7 +124,7 @@ func (t etcdTraffic) Write(ctx context.Context, c *RecordingClient, limiter *rat var err error switch etcdRequestType(pickRandom(t.writeChoices)) { case Put: - err = c.Put(writeCtx, key, fmt.Sprintf("%d", id.RequestId())) + err = c.Put(writeCtx, key, fmt.Sprintf("%d", id.NewRequestId())) case LargePut: err = c.Put(writeCtx, key, randString(t.largePutSize)) case Delete: @@ -136,7 +136,7 @@ func (t etcdTraffic) Write(ctx context.Context, c *RecordingClient, limiter *rat if lastValues != nil { expectRevision = lastValues.ModRevision } - err = c.CompareRevisionAndPut(writeCtx, key, fmt.Sprintf("%d", id.RequestId()), expectRevision) + err = c.CompareRevisionAndPut(writeCtx, key, fmt.Sprintf("%d", id.NewRequestId()), expectRevision) case PutWithLease: leaseId := lm.LeaseId(cid) if leaseId == 0 { @@ -148,7 +148,7 @@ func (t etcdTraffic) Write(ctx context.Context, c *RecordingClient, limiter *rat } if leaseId != 0 { putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout) - err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.RequestId()), leaseId) + err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.NewRequestId()), leaseId) putCancel() } case LeaseRevoke: @@ -191,7 +191,7 @@ func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) case model.Range: ops = append(ops, clientv3.OpGet(key)) case model.Put: - value := fmt.Sprintf("%d", ids.RequestId()) + value := fmt.Sprintf("%d", ids.NewRequestId()) ops = append(ops, clientv3.OpPut(key, value)) case model.Delete: ops = append(ops, clientv3.OpDelete(key)) diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go index 376da6f55..76299afe1 100644 --- a/tests/robustness/traffic/kubernetes.go +++ b/tests/robustness/traffic/kubernetes.go @@ -85,7 +85,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *RecordingCl func (t kubernetesTraffic) Write(ctx context.Context, c *RecordingClient, ids identity.Provider, objects []*mvccpb.KeyValue) (err error) { writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout) if len(objects) < t.averageKeyCount/2 { - err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.RequestId())) + err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId())) } else { randomPod := objects[rand.Intn(len(objects))] if len(objects) > t.averageKeyCount*3/2 { @@ -96,9 +96,9 @@ func (t kubernetesTraffic) Write(ctx context.Context, c *RecordingClient, ids id case KubernetesDelete: err = t.Delete(writeCtx, c, string(randomPod.Key), randomPod.ModRevision) case KubernetesUpdate: - err = t.Update(writeCtx, c, string(randomPod.Key), fmt.Sprintf("%d", ids.RequestId()), randomPod.ModRevision) + err = t.Update(writeCtx, c, string(randomPod.Key), fmt.Sprintf("%d", ids.NewRequestId()), randomPod.ModRevision) case KubernetesCreate: - err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.RequestId())) + err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId())) default: panic(fmt.Sprintf("invalid choice: %q", op)) }