From 962e15038e86c6958fadf9cc56a5398763ed844a Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 10 May 2023 15:16:52 +0200 Subject: [PATCH] tests/robustness: Add safeguards to client and history Signed-off-by: Marek Siarkowicz --- tests/robustness/model/history.go | 85 ++++++++++++++++++++++------- tests/robustness/traffic/client.go | 54 ++++++++++++++---- tests/robustness/traffic/traffic.go | 4 +- 3 files changed, 109 insertions(+), 34 deletions(-) diff --git a/tests/robustness/model/history.go b/tests/robustness/model/history.go index d34e5d08b..4adf28656 100644 --- a/tests/robustness/model/history.go +++ b/tests/robustness/model/history.go @@ -46,6 +46,12 @@ func ValidateOperationHistoryAndReturnVisualize(t *testing.T, lg *zap.Logger, op } } +// AppendableHistory allows to collect history of operations. Appending needs to +// be done in order of operation execution time (start, end time). All Operations +// time is expected to be calculated as time.Since common base time. +// +// Appending validates that operations don't overlap and properly handles failed +// requests. type AppendableHistory struct { // id of the next write operation. If needed a new id might be requested from idProvider. id int @@ -70,7 +76,7 @@ func (h *AppendableHistory) AppendRange(key string, withPrefix bool, start, end if resp != nil && resp.Header != nil { revision = resp.Header.Revision } - h.successful = append(h.successful, porcupine.Operation{ + h.appendSuccessful(porcupine.Operation{ ClientId: h.id, Input: rangeRequest(key, withPrefix, 0), Call: start.Nanoseconds(), @@ -82,14 +88,14 @@ func (h *AppendableHistory) AppendRange(key string, withPrefix bool, start, end func (h *AppendableHistory) AppendPut(key, value string, start, end time.Duration, resp *clientv3.PutResponse, err error) { request := putRequest(key, value) if err != nil { - h.appendFailed(request, start, err) + h.appendFailed(request, start.Nanoseconds(), err) return } var revision int64 if resp != nil && resp.Header != nil { revision = resp.Header.Revision } - h.successful = append(h.successful, porcupine.Operation{ + h.appendSuccessful(porcupine.Operation{ ClientId: h.id, Input: request, Call: start.Nanoseconds(), @@ -101,14 +107,14 @@ func (h *AppendableHistory) AppendPut(key, value string, start, end time.Duratio func (h *AppendableHistory) AppendPutWithLease(key, value string, leaseID int64, start, end time.Duration, resp *clientv3.PutResponse, err error) { request := putWithLeaseRequest(key, value, leaseID) if err != nil { - h.appendFailed(request, start, err) + h.appendFailed(request, start.Nanoseconds(), err) return } var revision int64 if resp != nil && resp.Header != nil { revision = resp.Header.Revision } - h.successful = append(h.successful, porcupine.Operation{ + h.appendSuccessful(porcupine.Operation{ ClientId: h.id, Input: request, Call: start.Nanoseconds(), @@ -124,14 +130,14 @@ func (h *AppendableHistory) AppendLeaseGrant(start, end time.Duration, resp *cli } request := leaseGrantRequest(leaseID) if err != nil { - h.appendFailed(request, start, err) + h.appendFailed(request, start.Nanoseconds(), err) return } var revision int64 if resp != nil && resp.ResponseHeader != nil { revision = resp.ResponseHeader.Revision } - h.successful = append(h.successful, porcupine.Operation{ + h.appendSuccessful(porcupine.Operation{ ClientId: h.id, Input: request, Call: start.Nanoseconds(), @@ -143,14 +149,14 @@ func (h *AppendableHistory) AppendLeaseGrant(start, end time.Duration, resp *cli func (h *AppendableHistory) AppendLeaseRevoke(id int64, start, end time.Duration, resp *clientv3.LeaseRevokeResponse, err error) { request := leaseRevokeRequest(id) if err != nil { - h.appendFailed(request, start, err) + h.appendFailed(request, start.Nanoseconds(), err) return } var revision int64 if resp != nil && resp.Header != nil { revision = resp.Header.Revision } - h.successful = append(h.successful, porcupine.Operation{ + h.appendSuccessful(porcupine.Operation{ ClientId: h.id, Input: request, Call: start.Nanoseconds(), @@ -162,7 +168,7 @@ func (h *AppendableHistory) AppendLeaseRevoke(id int64, start, end time.Duration func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, resp *clientv3.DeleteResponse, err error) { request := deleteRequest(key) if err != nil { - h.appendFailed(request, start, err) + h.appendFailed(request, start.Nanoseconds(), err) return } var revision int64 @@ -171,7 +177,7 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, r revision = resp.Header.Revision deleted = resp.Deleted } - h.successful = append(h.successful, porcupine.Operation{ + h.appendSuccessful(porcupine.Operation{ ClientId: h.id, Input: request, Call: start.Nanoseconds(), @@ -183,7 +189,7 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, r func (h *AppendableHistory) AppendCompareRevisionAndDelete(key string, expectedRevision int64, start, end time.Duration, resp *clientv3.TxnResponse, err error) { request := compareRevisionAndDeleteRequest(key, expectedRevision) if err != nil { - h.appendFailed(request, start, err) + h.appendFailed(request, start.Nanoseconds(), err) return } var revision int64 @@ -194,7 +200,7 @@ func (h *AppendableHistory) AppendCompareRevisionAndDelete(key string, expectedR if resp != nil && len(resp.Responses) > 0 { deleted = resp.Responses[0].GetResponseDeleteRange().Deleted } - h.successful = append(h.successful, porcupine.Operation{ + h.appendSuccessful(porcupine.Operation{ ClientId: h.id, Input: request, Call: start.Nanoseconds(), @@ -206,14 +212,14 @@ func (h *AppendableHistory) AppendCompareRevisionAndDelete(key string, expectedR func (h *AppendableHistory) AppendCompareRevisionAndPut(key string, expectedRevision int64, value string, start, end time.Duration, resp *clientv3.TxnResponse, err error) { request := compareRevisionAndPutRequest(key, expectedRevision, value) if err != nil { - h.appendFailed(request, start, err) + h.appendFailed(request, start.Nanoseconds(), err) return } var revision int64 if resp != nil && resp.Header != nil { revision = resp.Header.Revision } - h.successful = append(h.successful, porcupine.Operation{ + h.appendSuccessful(porcupine.Operation{ ClientId: h.id, Input: request, Call: start.Nanoseconds(), @@ -233,7 +239,7 @@ func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, onSuccess []clientv3.O } request := txnRequest(conds, ops) if err != nil { - h.appendFailed(request, start, err) + h.appendFailed(request, start.Nanoseconds(), err) return } var revision int64 @@ -244,7 +250,7 @@ func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, onSuccess []clientv3.O for _, resp := range resp.Responses { results = append(results, toEtcdOperationResult(resp)) } - h.successful = append(h.successful, porcupine.Operation{ + h.appendSuccessful(porcupine.Operation{ ClientId: h.id, Input: request, Call: start.Nanoseconds(), @@ -253,6 +259,28 @@ func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, onSuccess []clientv3.O }) } +func (h *AppendableHistory) appendSuccessful(op porcupine.Operation) { + if op.Call >= op.Return { + panic(fmt.Sprintf("Invalid operation, call(%d) >= return(%d)", op.Call, op.Return)) + } + if len(h.successful) > 0 { + prevSuccessful := h.successful[len(h.successful)-1] + if op.Call <= prevSuccessful.Call { + panic(fmt.Sprintf("Out of order append, new.call(%d) <= prev.call(%d)", op.Call, prevSuccessful.Call)) + } + if op.Call <= prevSuccessful.Return { + panic(fmt.Sprintf("Overlapping operations, new.call(%d) <= prev.return(%d)", op.Call, prevSuccessful.Return)) + } + } + if len(h.failed) > 0 { + prevFailed := h.failed[len(h.failed)-1] + if op.Call <= prevFailed.Call { + panic(fmt.Sprintf("Out of order append, new.call(%d) <= prev.call(%d)", op.Call, prevFailed.Call)) + } + } + h.successful = append(h.successful, op) +} + func toEtcdCondition(cmp clientv3.Cmp) (cond EtcdCondition) { switch { case cmp.Result == etcdserverpb.Compare_EQUAL && cmp.Target == etcdserverpb.Compare_MOD: @@ -316,14 +344,14 @@ func toEtcdOperationResult(resp *etcdserverpb.ResponseOp) EtcdOperationResult { func (h *AppendableHistory) AppendDefragment(start, end time.Duration, resp *clientv3.DefragmentResponse, err error) { request := defragmentRequest() if err != nil { - h.appendFailed(request, start, err) + h.appendFailed(request, start.Nanoseconds(), err) return } var revision int64 if resp != nil && resp.Header != nil { revision = resp.Header.Revision } - h.successful = append(h.successful, porcupine.Operation{ + h.appendSuccessful(porcupine.Operation{ ClientId: h.id, Input: request, Call: start.Nanoseconds(), @@ -332,11 +360,26 @@ func (h *AppendableHistory) AppendDefragment(start, end time.Duration, resp *cli }) } -func (h *AppendableHistory) appendFailed(request EtcdRequest, start time.Duration, err error) { +func (h *AppendableHistory) appendFailed(request EtcdRequest, call int64, err error) { + if len(h.successful) > 0 { + prevSuccessful := h.successful[len(h.successful)-1] + if call <= prevSuccessful.Call { + panic(fmt.Sprintf("Out of order append, new.call(%d) <= prev.call(%d)", call, prevSuccessful.Call)) + } + if call <= prevSuccessful.Return { + panic(fmt.Sprintf("Overlapping operations, new.call(%d) <= prev.return(%d)", call, prevSuccessful.Return)) + } + } + if len(h.failed) > 0 { + prevFailed := h.failed[len(h.failed)-1] + if call <= prevFailed.Call { + panic(fmt.Sprintf("Out of order append, new.call(%d) <= prev.call(%d)", call, prevFailed.Call)) + } + } h.failed = append(h.failed, porcupine.Operation{ ClientId: h.id, Input: request, - Call: start.Nanoseconds(), + Call: call, Output: failedResponse(err), Return: 0, // For failed writes we don't know when request has really finished. }) diff --git a/tests/robustness/traffic/client.go b/tests/robustness/traffic/client.go index c008e22ca..289dbfeed 100644 --- a/tests/robustness/traffic/client.go +++ b/tests/robustness/traffic/client.go @@ -17,6 +17,7 @@ package traffic import ( "context" "fmt" + "sync" "time" "go.uber.org/zap" @@ -27,10 +28,15 @@ import ( "go.etcd.io/etcd/tests/v3/robustness/model" ) +// RecordingClient provides a semi etcd client (different interface than +// clientv3.Client) that records all the requests and responses made. Doesn't +// allow for concurrent requests to ensure correct appending to history. type RecordingClient struct { client clientv3.Client - history *model.AppendableHistory baseTime time.Time + // mux ensures order of request appending. + mux sync.Mutex + history *model.AppendableHistory } func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) { @@ -54,6 +60,10 @@ func (c *RecordingClient) Close() error { return c.client.Close() } +func (c *RecordingClient) Operations() model.History { + return c.history.History +} + func (c *RecordingClient) Get(ctx context.Context, key string) (*mvccpb.KeyValue, error) { resp, err := c.Range(ctx, key, false) if err != nil || len(resp) == 0 { @@ -66,21 +76,25 @@ func (c *RecordingClient) Get(ctx context.Context, key string) (*mvccpb.KeyValue } func (c *RecordingClient) Range(ctx context.Context, key string, withPrefix bool) ([]*mvccpb.KeyValue, error) { - callTime := time.Since(c.baseTime) ops := []clientv3.OpOption{} if withPrefix { ops = append(ops, clientv3.WithPrefix()) } + c.mux.Lock() + defer c.mux.Unlock() + callTime := time.Since(c.baseTime) resp, err := c.client.Get(ctx, key, ops...) - returnTime := time.Since(c.baseTime) if err != nil { return nil, err } + returnTime := time.Since(c.baseTime) c.history.AppendRange(key, withPrefix, callTime, returnTime, resp) return resp.Kvs, nil } func (c *RecordingClient) Put(ctx context.Context, key, value string) error { + c.mux.Lock() + defer c.mux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Put(ctx, key, value) returnTime := time.Since(c.baseTime) @@ -89,6 +103,8 @@ func (c *RecordingClient) Put(ctx context.Context, key, value string) error { } func (c *RecordingClient) Delete(ctx context.Context, key string) error { + c.mux.Lock() + defer c.mux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Delete(ctx, key) returnTime := time.Since(c.baseTime) @@ -97,16 +113,22 @@ func (c *RecordingClient) Delete(ctx context.Context, key string) error { } func (c *RecordingClient) CompareRevisionAndDelete(ctx context.Context, key string, expectedRevision int64) error { + txn := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpDelete(key)) + c.mux.Lock() + defer c.mux.Unlock() callTime := time.Since(c.baseTime) - resp, err := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpDelete(key)).Commit() + resp, err := txn.Commit() returnTime := time.Since(c.baseTime) c.history.AppendCompareRevisionAndDelete(key, expectedRevision, callTime, returnTime, resp, err) return err } func (c *RecordingClient) CompareRevisionAndPut(ctx context.Context, key, value string, expectedRevision int64) error { + txn := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpPut(key, value)) + c.mux.Lock() + defer c.mux.Unlock() callTime := time.Since(c.baseTime) - resp, err := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpPut(key, value)).Commit() + resp, err := txn.Commit() returnTime := time.Since(c.baseTime) c.history.AppendCompareRevisionAndPut(key, expectedRevision, value, callTime, returnTime, resp, err) return err @@ -128,19 +150,23 @@ func (c *RecordingClient) compareRevisionTxn(ctx context.Context, key string, ex } func (c *RecordingClient) Txn(ctx context.Context, cmp []clientv3.Cmp, ops []clientv3.Op) error { - callTime := time.Since(c.baseTime) - txn := c.client.Txn(ctx) - resp, err := txn.If( + txn := c.client.Txn(ctx).If( cmp..., ).Then( ops..., - ).Commit() + ) + c.mux.Lock() + defer c.mux.Unlock() + callTime := time.Since(c.baseTime) + resp, err := txn.Commit() returnTime := time.Since(c.baseTime) c.history.AppendTxn(cmp, ops, callTime, returnTime, resp, err) return err } func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, error) { + c.mux.Lock() + defer c.mux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Lease.Grant(ctx, ttl) returnTime := time.Since(c.baseTime) @@ -153,6 +179,8 @@ func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, err } func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseId int64) error { + c.mux.Lock() + defer c.mux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Lease.Revoke(ctx, clientv3.LeaseID(leaseId)) returnTime := time.Since(c.baseTime) @@ -161,15 +189,19 @@ func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseId int64) error } func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseId int64) error { - callTime := time.Since(c.baseTime) opts := clientv3.WithLease(clientv3.LeaseID(leaseId)) + c.mux.Lock() + defer c.mux.Unlock() + callTime := time.Since(c.baseTime) resp, err := c.client.Put(ctx, key, value, opts) returnTime := time.Since(c.baseTime) - c.history.AppendPutWithLease(key, value, int64(leaseId), callTime, returnTime, resp, err) + c.history.AppendPutWithLease(key, value, leaseId, callTime, returnTime, resp, err) return err } func (c *RecordingClient) Defragment(ctx context.Context) error { + c.mux.Lock() + defer c.mux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Defragment(ctx, c.client.Endpoints()[0]) returnTime := time.Since(c.baseTime) diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go index c895ca6bb..c5fb93663 100644 --- a/tests/robustness/traffic/traffic.go +++ b/tests/robustness/traffic/traffic.go @@ -63,7 +63,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 config.traffic.Run(ctx, clientId, c, limiter, ids, lm, finish) mux.Lock() - h = h.Merge(c.history.History) + h = h.Merge(c.Operations()) mux.Unlock() }(c, i) } @@ -76,7 +76,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 if err != nil { t.Error(err) } - h = h.Merge(cc.history.History) + h = h.Merge(cc.Operations()) operations := h.Operations() lg.Info("Recorded operations", zap.Int("count", len(operations)))