diff --git a/tests/robustness/model/describe.go b/tests/robustness/model/describe.go index 03da88644..6cf370941 100644 --- a/tests/robustness/model/describe.go +++ b/tests/robustness/model/describe.go @@ -44,7 +44,7 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin func describeEtcdRequest(request EtcdRequest) string { switch request.Type { case Range: - return describeRangeRequest(request.Range.Key, request.Range.RangeOptions) + return describeRangeRequest(request.Range.Key, request.Range.Revision, request.Range.RangeOptions) case Txn: onSuccess := describeEtcdOperations(request.Txn.OperationsOnSuccess) if len(request.Txn.Conditions) != 0 { @@ -105,7 +105,7 @@ func describeTxnResponse(request *TxnRequest, response *TxnResponse) string { func describeEtcdOperation(op EtcdOperation) string { switch op.Type { case RangeOperation: - return describeRangeRequest(op.Key, op.RangeOptions) + return describeRangeRequest(op.Key, 0, op.RangeOptions) case PutOperation: if op.LeaseID != 0 { return fmt.Sprintf("put(%q, %s, %d)", op.Key, describeValueOrHash(op.Value), op.LeaseID) @@ -118,8 +118,11 @@ func describeEtcdOperation(op EtcdOperation) string { } } -func describeRangeRequest(key string, opts RangeOptions) string { +func describeRangeRequest(key string, revision int64, opts RangeOptions) string { kwargs := []string{} + if revision != 0 { + kwargs = append(kwargs, fmt.Sprintf("rev=%d", revision)) + } if opts.Limit != 0 { kwargs = append(kwargs, fmt.Sprintf("limit=%d", opts.Limit)) } diff --git a/tests/robustness/model/describe_test.go b/tests/robustness/model/describe_test.go index fdc380b83..66ce0be6b 100644 --- a/tests/robustness/model/describe_test.go +++ b/tests/robustness/model/describe_test.go @@ -134,6 +134,11 @@ func TestModelDescribe(t *testing.T) { resp: rangeResponse(nil, 0, 14), expectDescribe: `range("key14", limit=14) -> [], count: 0, rev: 14`, }, + { + req: staleRangeRequest("key15", true, 0, 15), + resp: rangeResponse(nil, 0, 15), + expectDescribe: `range("key15", rev=15) -> [], count: 0, rev: 15`, + }, } for _, tc := range tcs { assert.Equal(t, tc.expectDescribe, NonDeterministicModel.DescribeOperation(tc.req, tc.resp)) diff --git a/tests/robustness/model/deterministic.go b/tests/robustness/model/deterministic.go index f466cae8b..3a88ab4a5 100644 --- a/tests/robustness/model/deterministic.go +++ b/tests/robustness/model/deterministic.go @@ -16,6 +16,7 @@ package model import ( "encoding/json" + "errors" "fmt" "hash/fnv" "reflect" @@ -93,8 +94,15 @@ func (s etcdState) step(request EtcdRequest) (etcdState, MaybeEtcdResponse) { s.KeyValues = newKVs switch request.Type { case Range: - resp := s.getRange(request.Range.Key, request.Range.RangeOptions) - return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Range: &resp, Revision: s.Revision}} + if request.Range.Revision == 0 || request.Range.Revision == s.Revision { + resp := s.getRange(request.Range.Key, request.Range.RangeOptions) + return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Range: &resp, Revision: s.Revision}} + } else { + if request.Range.Revision > s.Revision { + return s, MaybeEtcdResponse{Err: EtcdFutureRevErr} + } + return s, MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: s.Revision}} + } case Txn: failure := false for _, cond := range request.Txn.Conditions { @@ -245,8 +253,7 @@ type EtcdRequest struct { type RangeRequest struct { Key string RangeOptions - // TODO: Implement stale read using revision - revision int64 + Revision int64 } type RangeOptions struct { @@ -304,6 +311,8 @@ type MaybeEtcdResponse struct { Err error } +var EtcdFutureRevErr = errors.New("future rev") + type EtcdResponse struct { Txn *TxnResponse Range *RangeResponse diff --git a/tests/robustness/model/deterministic_test.go b/tests/robustness/model/deterministic_test.go index ecdc00fe0..7a9ff0ab2 100644 --- a/tests/robustness/model/deterministic_test.go +++ b/tests/robustness/model/deterministic_test.go @@ -151,6 +151,34 @@ var commonTestScenarios = []modelTestCase{ {req: getRequest("key"), resp: getResponse("key", "012345678901234567890", 3, 3), expectFailure: true}, }, }, + { + name: "Stale Get doesn't need to match put if asking about old revision", + operations: []testOperation{ + {req: putRequest("key", "1"), resp: putResponse(2)}, + {req: staleGetRequest("key", 1), resp: getResponse("key", "2", 2, 2)}, + {req: staleGetRequest("key", 1), resp: getResponse("key", "1", 2, 2)}, + }, + }, + { + name: "Stale Get need to match put if asking about matching revision", + operations: []testOperation{ + {req: putRequest("key", "1"), resp: putResponse(2)}, + {req: staleGetRequest("key", 2), resp: getResponse("key", "1", 3, 2), expectFailure: true}, + {req: staleGetRequest("key", 2), resp: getResponse("key", "1", 2, 3), expectFailure: true}, + {req: staleGetRequest("key", 2), resp: getResponse("key", "2", 2, 2), expectFailure: true}, + {req: staleGetRequest("key", 2), resp: getResponse("key", "1", 2, 2)}, + }, + }, + { + name: "Stale Get need to have a proper response revision", + operations: []testOperation{ + {req: putRequest("key", "1"), resp: putResponse(2)}, + {req: staleGetRequest("key", 2), resp: getResponse("key", "1", 2, 3), expectFailure: true}, + {req: staleGetRequest("key", 2), resp: getResponse("key", "1", 2, 2)}, + {req: putRequest("key", "2"), resp: putResponse(3)}, + {req: staleGetRequest("key", 2), resp: getResponse("key", "1", 2, 3)}, + }, + }, { name: "Put must increase revision by 1", operations: []testOperation{ diff --git a/tests/robustness/model/history.go b/tests/robustness/model/history.go index bc5e7a040..5d5d40e1b 100644 --- a/tests/robustness/model/history.go +++ b/tests/robustness/model/history.go @@ -54,16 +54,16 @@ func NewAppendableHistory(ids identity.Provider) *AppendableHistory { } } -func (h *AppendableHistory) AppendRange(key string, withPrefix bool, start, end time.Duration, resp *clientv3.GetResponse) { - var revision int64 +func (h *AppendableHistory) AppendRange(key string, withPrefix bool, revision int64, start, end time.Duration, resp *clientv3.GetResponse) { + var respRevision int64 if resp != nil && resp.Header != nil { - revision = resp.Header.Revision + respRevision = resp.Header.Revision } h.appendSuccessful(porcupine.Operation{ ClientId: h.streamId, - Input: rangeRequest(key, withPrefix, 0), + Input: staleRangeRequest(key, withPrefix, 0, revision), Call: start.Nanoseconds(), - Output: rangeResponse(resp.Kvs, resp.Count, revision), + Output: rangeResponse(resp.Kvs, resp.Count, respRevision), Return: end.Nanoseconds(), }) } @@ -340,8 +340,16 @@ func getRequest(key string) EtcdRequest { return rangeRequest(key, false, 0) } +func staleGetRequest(key string, revision int64) EtcdRequest { + return staleRangeRequest(key, false, 0, revision) +} + func rangeRequest(key string, withPrefix bool, limit int64) EtcdRequest { - return EtcdRequest{Type: Range, Range: &RangeRequest{Key: key, RangeOptions: RangeOptions{WithPrefix: withPrefix, Limit: limit}}} + return staleRangeRequest(key, withPrefix, limit, 0) +} + +func staleRangeRequest(key string, withPrefix bool, limit, revision int64) EtcdRequest { + return EtcdRequest{Type: Range, Range: &RangeRequest{Key: key, RangeOptions: RangeOptions{WithPrefix: withPrefix, Limit: limit}, Revision: revision}} } func emptyGetResponse(revision int64) MaybeEtcdResponse { diff --git a/tests/robustness/traffic/client.go b/tests/robustness/traffic/client.go index 5462f749e..98bc83f51 100644 --- a/tests/robustness/traffic/client.go +++ b/tests/robustness/traffic/client.go @@ -106,22 +106,25 @@ func (r ClientReport) WatchEventCount() int { return count } -func (c *RecordingClient) Get(ctx context.Context, key string) (*mvccpb.KeyValue, error) { - resp, err := c.Range(ctx, key, false) - if err != nil || len(resp.Kvs) == 0 { - return nil, err +func (c *RecordingClient) Get(ctx context.Context, key string, revision int64) (kv *mvccpb.KeyValue, rev int64, err error) { + resp, err := c.Range(ctx, key, false, revision) + if err != nil { + return nil, 0, err } if len(resp.Kvs) == 1 { - return resp.Kvs[0], err + kv = resp.Kvs[0] } - panic(fmt.Sprintf("Unexpected response size: %d", len(resp.Kvs))) + return kv, resp.Header.Revision, nil } -func (c *RecordingClient) Range(ctx context.Context, key string, withPrefix bool) (*clientv3.GetResponse, error) { +func (c *RecordingClient) Range(ctx context.Context, key string, withPrefix bool, revision int64) (*clientv3.GetResponse, error) { ops := []clientv3.OpOption{} if withPrefix { ops = append(ops, clientv3.WithPrefix()) } + if revision != 0 { + ops = append(ops, clientv3.WithRev(revision)) + } c.opMux.Lock() defer c.opMux.Unlock() callTime := time.Since(c.baseTime) @@ -130,28 +133,28 @@ func (c *RecordingClient) Range(ctx context.Context, key string, withPrefix bool return nil, err } returnTime := time.Since(c.baseTime) - c.operations.AppendRange(key, withPrefix, callTime, returnTime, resp) + c.operations.AppendRange(key, withPrefix, revision, callTime, returnTime, resp) return resp, nil } -func (c *RecordingClient) Put(ctx context.Context, key, value string) error { +func (c *RecordingClient) Put(ctx context.Context, key, value string) (*clientv3.PutResponse, error) { c.opMux.Lock() defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Put(ctx, key, value) returnTime := time.Since(c.baseTime) c.operations.AppendPut(key, value, callTime, returnTime, resp, err) - return err + return resp, err } -func (c *RecordingClient) Delete(ctx context.Context, key string) error { +func (c *RecordingClient) Delete(ctx context.Context, key string) (*clientv3.DeleteResponse, error) { c.opMux.Lock() defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Delete(ctx, key) returnTime := time.Since(c.baseTime) c.operations.AppendDelete(key, callTime, returnTime, resp, err) - return nil + return resp, err } func (c *RecordingClient) Txn(ctx context.Context, conditions []clientv3.Cmp, onSuccess []clientv3.Op, onFailure []clientv3.Op) (*clientv3.TxnResponse, error) { @@ -171,31 +174,27 @@ func (c *RecordingClient) Txn(ctx context.Context, conditions []clientv3.Cmp, on return resp, err } -func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, error) { +func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error) { c.opMux.Lock() defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Lease.Grant(ctx, ttl) returnTime := time.Since(c.baseTime) c.operations.AppendLeaseGrant(callTime, returnTime, resp, err) - var leaseId int64 - if resp != nil { - leaseId = int64(resp.ID) - } - return leaseId, err + return resp, err } -func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseId int64) error { +func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseId int64) (*clientv3.LeaseRevokeResponse, error) { c.opMux.Lock() defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Lease.Revoke(ctx, clientv3.LeaseID(leaseId)) returnTime := time.Since(c.baseTime) c.operations.AppendLeaseRevoke(leaseId, callTime, returnTime, resp, err) - return err + return resp, err } -func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseId int64) error { +func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseId int64) (*clientv3.PutResponse, error) { opts := clientv3.WithLease(clientv3.LeaseID(leaseId)) c.opMux.Lock() defer c.opMux.Unlock() @@ -203,17 +202,17 @@ func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value st resp, err := c.client.Put(ctx, key, value, opts) returnTime := time.Since(c.baseTime) c.operations.AppendPutWithLease(key, value, leaseId, callTime, returnTime, resp, err) - return err + return resp, err } -func (c *RecordingClient) Defragment(ctx context.Context) error { +func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentResponse, error) { c.opMux.Lock() defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Defragment(ctx, c.client.Endpoints()[0]) returnTime := time.Since(c.baseTime) c.operations.AppendDefragment(callTime, returnTime, resp, err) - return err + return resp, err } func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool) clientv3.WatchChan { diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go index f9361a2ae..cfce3845c 100644 --- a/tests/robustness/traffic/etcd.go +++ b/tests/robustness/traffic/etcd.go @@ -37,8 +37,9 @@ var ( keyCount: 10, leaseTTL: DefaultLeaseTTL, largePutSize: 32769, - operations: []choiceWeight[etcdRequestType]{ - {choice: Get, weight: 50}, + requests: []choiceWeight[etcdRequestType]{ + {choice: Get, weight: 25}, + {choice: StaleGet, weight: 25}, {choice: Put, weight: 23}, {choice: LargePut, weight: 2}, {choice: Delete, weight: 5}, @@ -58,8 +59,9 @@ var ( keyCount: 10, largePutSize: 32769, leaseTTL: DefaultLeaseTTL, - operations: []choiceWeight[etcdRequestType]{ - {choice: Get, weight: 50}, + requests: []choiceWeight[etcdRequestType]{ + {choice: Get, weight: 25}, + {choice: StaleGet, weight: 25}, {choice: Put, weight: 40}, {choice: MultiOpTxn, weight: 5}, {choice: LargePut, weight: 5}, @@ -70,7 +72,7 @@ var ( type etcdTraffic struct { keyCount int - operations []choiceWeight[etcdRequestType] + requests []choiceWeight[etcdRequestType] leaseTTL int64 largePutSize int } @@ -83,6 +85,7 @@ type etcdRequestType string const ( Get etcdRequestType = "get" + StaleGet etcdRequestType = "staleGet" Put etcdRequestType = "put" LargePut etcdRequestType = "largePut" Delete etcdRequestType = "delete" @@ -95,6 +98,8 @@ const ( func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { lastOperationSucceeded := true + var lastRev int64 + var requestType etcdRequestType for { select { case <-ctx.Done(): @@ -105,47 +110,59 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate. } key := fmt.Sprintf("%d", rand.Int()%t.keyCount) // Avoid multiple failed writes in a row - if !lastOperationSucceeded { - _, err := t.Read(ctx, c, key) - if err != nil { - continue - } - limiter.Wait(ctx) + if lastOperationSucceeded { + requestType = pickRandom(t.requests) + } else { + requestType = Get } - err := t.RandomOperation(ctx, c, limiter, key, ids, lm) + rev, err := t.Request(ctx, c, requestType, limiter, key, ids, lm, lastRev) lastOperationSucceeded = err == nil if err != nil { continue } + if rev != 0 { + lastRev = rev + } limiter.Wait(ctx) } } -func (t etcdTraffic) Read(ctx context.Context, c *RecordingClient, key string) (*mvccpb.KeyValue, error) { - getCtx, cancel := context.WithTimeout(ctx, RequestTimeout) - resp, err := c.Get(getCtx, key) - cancel() - return resp, err -} - -func (t etcdTraffic) RandomOperation(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage) error { +func (t etcdTraffic) Request(ctx context.Context, c *RecordingClient, request etcdRequestType, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, lastRev int64) (rev int64, err error) { opCtx, cancel := context.WithTimeout(ctx, RequestTimeout) - var err error - switch pickRandom(t.operations) { + switch request { + case StaleGet: + _, rev, err = c.Get(opCtx, key, lastRev) case Get: - _, err = c.Get(opCtx, key) + _, rev, err = c.Get(opCtx, key, 0) case Put: - err = c.Put(opCtx, key, fmt.Sprintf("%d", id.NewRequestId())) + var resp *clientv3.PutResponse + resp, err = c.Put(opCtx, key, fmt.Sprintf("%d", id.NewRequestId())) + if resp != nil { + rev = resp.Header.Revision + } case LargePut: - err = c.Put(opCtx, key, randString(t.largePutSize)) + var resp *clientv3.PutResponse + resp, err = c.Put(opCtx, key, randString(t.largePutSize)) + if resp != nil { + rev = resp.Header.Revision + } case Delete: - err = c.Delete(opCtx, key) + var resp *clientv3.DeleteResponse + resp, err = c.Delete(opCtx, key) + if resp != nil { + rev = resp.Header.Revision + } case MultiOpTxn: - _, err = c.Txn(opCtx, nil, t.pickMultiTxnOps(id), nil) + var resp *clientv3.TxnResponse + resp, err = c.Txn(opCtx, nil, t.pickMultiTxnOps(id), nil) + if resp != nil { + rev = resp.Header.Revision + } + case CompareAndSet: var kv *mvccpb.KeyValue - kv, err = c.Get(opCtx, key) + kv, rev, err = c.Get(opCtx, key, 0) if err == nil { limiter.Wait(ctx) var expectedRevision int64 @@ -153,13 +170,22 @@ func (t etcdTraffic) RandomOperation(ctx context.Context, c *RecordingClient, li expectedRevision = kv.ModRevision } txnCtx, txnCancel := context.WithTimeout(ctx, RequestTimeout) - _, err = c.Txn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", id.NewRequestId()))}, nil) + var resp *clientv3.TxnResponse + resp, err = c.Txn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", id.NewRequestId()))}, nil) txnCancel() + if resp != nil { + rev = resp.Header.Revision + } } case PutWithLease: leaseId := lm.LeaseId(c.id) if leaseId == 0 { - leaseId, err = c.LeaseGrant(opCtx, t.leaseTTL) + var resp *clientv3.LeaseGrantResponse + resp, err = c.LeaseGrant(opCtx, t.leaseTTL) + if resp != nil { + leaseId = int64(resp.ID) + rev = resp.ResponseHeader.Revision + } if err == nil { lm.AddLeaseId(c.id, leaseId) limiter.Wait(ctx) @@ -167,25 +193,37 @@ func (t etcdTraffic) RandomOperation(ctx context.Context, c *RecordingClient, li } if leaseId != 0 { putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout) - err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.NewRequestId()), leaseId) + var resp *clientv3.PutResponse + resp, err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.NewRequestId()), leaseId) putCancel() + if resp != nil { + rev = resp.Header.Revision + } } case LeaseRevoke: leaseId := lm.LeaseId(c.id) if leaseId != 0 { - err = c.LeaseRevoke(opCtx, leaseId) + var resp *clientv3.LeaseRevokeResponse + resp, err = c.LeaseRevoke(opCtx, leaseId) //if LeaseRevoke has failed, do not remove the mapping. if err == nil { lm.RemoveLeaseId(c.id) } + if resp != nil { + rev = resp.Header.Revision + } } case Defragment: - err = c.Defragment(opCtx) + var resp *clientv3.DefragmentResponse + resp, err = c.Defragment(opCtx) + if resp != nil { + rev = resp.Header.Revision + } default: panic("invalid choice") } cancel() - return err + return rev, err } func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) { diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go index d3d26d91d..b4f3f9f5e 100644 --- a/tests/robustness/traffic/kubernetes.go +++ b/tests/robustness/traffic/kubernetes.go @@ -169,7 +169,7 @@ type kubernetesClient struct { } func (k kubernetesClient) List(ctx context.Context, key string) (*clientv3.GetResponse, error) { - resp, err := k.client.Range(ctx, key, true) + resp, err := k.client.Range(ctx, key, true, 0) if err != nil { return nil, err } diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go index 02dade953..f2f6cbdeb 100644 --- a/tests/robustness/traffic/traffic.go +++ b/tests/robustness/traffic/traffic.go @@ -70,7 +70,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 // Ensure that last operation is succeeds time.Sleep(time.Second) - err = cc.Put(ctx, "tombstone", "true") + _, err = cc.Put(ctx, "tombstone", "true") if err != nil { t.Error(err) } diff --git a/tests/robustness/validate/validate.go b/tests/robustness/validate/validate.go index 7096dcde2..d7a9a806d 100644 --- a/tests/robustness/validate/validate.go +++ b/tests/robustness/validate/validate.go @@ -27,6 +27,7 @@ import ( // ValidateAndReturnVisualize return visualize as porcupine.linearizationInfo used to generate visualization is private. func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []traffic.ClientReport) (visualize func(basepath string)) { validateWatch(t, cfg, reports) + // TODO: Validate stale reads responses. allOperations := operations(reports) watchEvents := uniqueWatchEvents(reports) newOperations := patchOperationsWithWatchEvents(allOperations, watchEvents)