diff --git a/tests/robustness/client.go b/tests/robustness/client.go index 8f236ea7f..b8486d1f9 100644 --- a/tests/robustness/client.go +++ b/tests/robustness/client.go @@ -96,8 +96,23 @@ func (c *recordingClient) Delete(ctx context.Context, key string) error { return nil } -func (c *recordingClient) CompareAndSet(ctx context.Context, key, value string, expectedRevision int64) error { +func (c *recordingClient) CompareRevisionAndDelete(ctx context.Context, key string, expectedRevision int64) error { callTime := time.Since(c.baseTime) + resp, err := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpDelete(key)).Commit() + returnTime := time.Since(c.baseTime) + c.history.AppendCompareRevisionAndDelete(key, expectedRevision, callTime, returnTime, resp, err) + return err +} + +func (c *recordingClient) CompareRevisionAndPut(ctx context.Context, key, value string, expectedRevision int64) error { + callTime := time.Since(c.baseTime) + resp, err := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpPut(key, value)).Commit() + returnTime := time.Since(c.baseTime) + c.history.AppendCompareRevisionAndPut(key, expectedRevision, value, callTime, returnTime, resp, err) + return err +} + +func (c *recordingClient) compareRevisionTxn(ctx context.Context, key string, expectedRevision int64, op clientv3.Op) clientv3.Txn { txn := c.client.Txn(ctx) var cmp clientv3.Cmp if expectedRevision == 0 { @@ -105,14 +120,11 @@ func (c *recordingClient) CompareAndSet(ctx context.Context, key, value string, } else { cmp = clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision) } - resp, err := txn.If( + return txn.If( cmp, ).Then( - clientv3.OpPut(key, value), - ).Commit() - returnTime := time.Since(c.baseTime) - c.history.AppendCompareAndSet(key, expectedRevision, value, callTime, returnTime, resp, err) - return err + op, + ) } func (c *recordingClient) Txn(ctx context.Context, cmp []clientv3.Cmp, ops []clientv3.Op) error { diff --git a/tests/robustness/linearizability_test.go b/tests/robustness/linearizability_test.go index c2b965786..5a441cdf4 100644 --- a/tests/robustness/linearizability_test.go +++ b/tests/robustness/linearizability_test.go @@ -41,18 +41,18 @@ var ( maximalQPS: 200, clientCount: 8, requestProgress: false, - traffic: traffic{ + traffic: etcdTraffic{ keyCount: 10, leaseTTL: DefaultLeaseTTL, largePutSize: 32769, - writes: []requestChance{ - {operation: Put, chance: 45}, - {operation: LargePut, chance: 5}, - {operation: Delete, chance: 10}, - {operation: MultiOpTxn, chance: 10}, - {operation: PutWithLease, chance: 10}, - {operation: LeaseRevoke, chance: 10}, - {operation: CompareAndSet, chance: 10}, + writeChoices: []choiceWeight{ + {choice: string(Put), weight: 45}, + {choice: string(LargePut), weight: 5}, + {choice: string(Delete), weight: 10}, + {choice: string(MultiOpTxn), weight: 10}, + {choice: string(PutWithLease), weight: 10}, + {choice: string(LeaseRevoke), weight: 10}, + {choice: string(CompareAndSet), weight: 10}, }, }, } @@ -62,14 +62,14 @@ var ( maximalQPS: 1000, clientCount: 12, requestProgress: false, - traffic: traffic{ + traffic: etcdTraffic{ keyCount: 10, largePutSize: 32769, leaseTTL: DefaultLeaseTTL, - writes: []requestChance{ - {operation: Put, chance: 85}, - {operation: MultiOpTxn, chance: 10}, - {operation: LargePut, chance: 5}, + writeChoices: []choiceWeight{ + {choice: string(Put), weight: 85}, + {choice: string(MultiOpTxn), weight: 10}, + {choice: string(LargePut), weight: 5}, }, }, } @@ -79,7 +79,14 @@ var ( maximalQPS: 1000, clientCount: 12, traffic: kubernetesTraffic{ - keyCount: 5, + averageKeyCount: 5, + resource: "pods", + namespace: "default", + writeChoices: []choiceWeight{ + {choice: string(KubernetesUpdate), weight: 75}, + {choice: string(KubernetesDelete), weight: 15}, + {choice: string(KubernetesCreate), weight: 10}, + }, }, } ReqProgTraffic = trafficConfig{ @@ -88,13 +95,13 @@ var ( maximalQPS: 1000, clientCount: 12, requestProgress: true, - traffic: traffic{ + traffic: etcdTraffic{ keyCount: 10, largePutSize: 8196, leaseTTL: DefaultLeaseTTL, - writes: []requestChance{ - {operation: Put, chance: 95}, - {operation: LargePut, chance: 5}, + writeChoices: []choiceWeight{ + {choice: string(Put), weight: 95}, + {choice: string(LargePut), weight: 5}, }, }, } diff --git a/tests/robustness/model/describe_test.go b/tests/robustness/model/describe_test.go index 88928f30c..7482e2167 100644 --- a/tests/robustness/model/describe_test.go +++ b/tests/robustness/model/describe_test.go @@ -80,17 +80,17 @@ func TestModelDescribe(t *testing.T) { expectDescribe: `delete("key6") -> err: "failed"`, }, { - req: compareAndSetRequest("key7", 7, "77"), - resp: compareAndSetResponse(false, 7), + req: compareRevisionAndPutRequest("key7", 7, "77"), + resp: compareRevisionAndPutResponse(false, 7), expectDescribe: `if(mod_rev(key7)==7).then(put("key7", "77")) -> txn failed, rev: 7`, }, { - req: compareAndSetRequest("key8", 8, "88"), - resp: compareAndSetResponse(true, 8), + req: compareRevisionAndPutRequest("key8", 8, "88"), + resp: compareRevisionAndPutResponse(true, 8), expectDescribe: `if(mod_rev(key8)==8).then(put("key8", "88")) -> ok, rev: 8`, }, { - req: compareAndSetRequest("key9", 9, "99"), + req: compareRevisionAndPutRequest("key9", 9, "99"), resp: failedResponse(errors.New("failed")), expectDescribe: `if(mod_rev(key9)==9).then(put("key9", "99")) -> err: "failed"`, }, diff --git a/tests/robustness/model/deterministic_test.go b/tests/robustness/model/deterministic_test.go index 23ca7e799..72c6aa04c 100644 --- a/tests/robustness/model/deterministic_test.go +++ b/tests/robustness/model/deterministic_test.go @@ -66,7 +66,7 @@ func TestModelBase(t *testing.T) { { name: "First Txn can start from non-zero revision", operations: []testOperation{ - {req: compareAndSetRequest("key", 0, "42"), resp: compareAndSetResponse(false, 42).EtcdResponse}, + {req: compareRevisionAndPutRequest("key", 0, "42"), resp: compareRevisionAndPutResponse(false, 42).EtcdResponse}, }, }, { @@ -157,10 +157,10 @@ func TestModelBase(t *testing.T) { name: "Txn sets new value if value matches expected", operations: []testOperation{ {req: getRequest("key"), resp: getResponse("key", "1", 1, 1).EtcdResponse}, - {req: compareAndSetRequest("key", 1, "2"), resp: compareAndSetResponse(true, 1).EtcdResponse, failure: true}, - {req: compareAndSetRequest("key", 1, "2"), resp: compareAndSetResponse(false, 2).EtcdResponse, failure: true}, - {req: compareAndSetRequest("key", 1, "2"), resp: compareAndSetResponse(false, 1).EtcdResponse, failure: true}, - {req: compareAndSetRequest("key", 1, "2"), resp: compareAndSetResponse(true, 2).EtcdResponse}, + {req: compareRevisionAndPutRequest("key", 1, "2"), resp: compareRevisionAndPutResponse(true, 1).EtcdResponse, failure: true}, + {req: compareRevisionAndPutRequest("key", 1, "2"), resp: compareRevisionAndPutResponse(false, 2).EtcdResponse, failure: true}, + {req: compareRevisionAndPutRequest("key", 1, "2"), resp: compareRevisionAndPutResponse(false, 1).EtcdResponse, failure: true}, + {req: compareRevisionAndPutRequest("key", 1, "2"), resp: compareRevisionAndPutResponse(true, 2).EtcdResponse}, {req: getRequest("key"), resp: getResponse("key", "1", 1, 1).EtcdResponse, failure: true}, {req: getRequest("key"), resp: getResponse("key", "1", 1, 2).EtcdResponse, failure: true}, {req: getRequest("key"), resp: getResponse("key", "1", 2, 2).EtcdResponse, failure: true}, @@ -172,19 +172,19 @@ func TestModelBase(t *testing.T) { name: "Txn can expect on empty key", operations: []testOperation{ {req: getRequest("key1"), resp: emptyGetResponse(1).EtcdResponse}, - {req: compareAndSetRequest("key1", 0, "2"), resp: compareAndSetResponse(true, 2).EtcdResponse}, - {req: compareAndSetRequest("key2", 0, "3"), resp: compareAndSetResponse(true, 3).EtcdResponse}, - {req: compareAndSetRequest("key3", 4, "4"), resp: compareAndSetResponse(false, 4).EtcdResponse, failure: true}, + {req: compareRevisionAndPutRequest("key1", 0, "2"), resp: compareRevisionAndPutResponse(true, 2).EtcdResponse}, + {req: compareRevisionAndPutRequest("key2", 0, "3"), resp: compareRevisionAndPutResponse(true, 3).EtcdResponse}, + {req: compareRevisionAndPutRequest("key3", 4, "4"), resp: compareRevisionAndPutResponse(false, 4).EtcdResponse, failure: true}, }, }, { name: "Txn doesn't do anything if value doesn't match expected", operations: []testOperation{ {req: getRequest("key"), resp: getResponse("key", "1", 1, 1).EtcdResponse}, - {req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(true, 2).EtcdResponse, failure: true}, - {req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(true, 1).EtcdResponse, failure: true}, - {req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(false, 2).EtcdResponse, failure: true}, - {req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(false, 1).EtcdResponse}, + {req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(true, 2).EtcdResponse, failure: true}, + {req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(true, 1).EtcdResponse, failure: true}, + {req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(false, 2).EtcdResponse, failure: true}, + {req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(false, 1).EtcdResponse}, {req: getRequest("key"), resp: getResponse("key", "2", 1, 1).EtcdResponse, failure: true}, {req: getRequest("key"), resp: getResponse("key", "2", 2, 2).EtcdResponse, failure: true}, {req: getRequest("key"), resp: getResponse("key", "3", 1, 1).EtcdResponse, failure: true}, @@ -317,7 +317,7 @@ func TestModelBase(t *testing.T) { {req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3).EtcdResponse}, {req: putRequest("key", "4"), resp: putResponse(4).EtcdResponse}, {req: getRequest("key"), resp: getResponse("key", "4", 4, 4).EtcdResponse}, - {req: compareAndSetRequest("key", 4, "5"), resp: compareAndSetResponse(true, 5).EtcdResponse}, + {req: compareRevisionAndPutRequest("key", 4, "5"), resp: compareRevisionAndPutResponse(true, 5).EtcdResponse}, {req: deleteRequest("key"), resp: deleteResponse(1, 6).EtcdResponse}, {req: defragmentRequest(), resp: defragmentResponse(6).EtcdResponse}, }, @@ -336,7 +336,7 @@ func TestModelBase(t *testing.T) { {req: defragmentRequest(), resp: defragmentResponse(4).EtcdResponse}, {req: getRequest("key"), resp: getResponse("key", "4", 4, 4).EtcdResponse}, {req: defragmentRequest(), resp: defragmentResponse(4).EtcdResponse}, - {req: compareAndSetRequest("key", 4, "5"), resp: compareAndSetResponse(true, 5).EtcdResponse}, + {req: compareRevisionAndPutRequest("key", 4, "5"), resp: compareRevisionAndPutResponse(true, 5).EtcdResponse}, {req: defragmentRequest(), resp: defragmentResponse(5).EtcdResponse}, {req: deleteRequest("key"), resp: deleteResponse(1, 6).EtcdResponse}, {req: defragmentRequest(), resp: defragmentResponse(6).EtcdResponse}, diff --git a/tests/robustness/model/history.go b/tests/robustness/model/history.go index cda2c1306..01e2dbf4e 100644 --- a/tests/robustness/model/history.go +++ b/tests/robustness/model/history.go @@ -180,8 +180,31 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, r }) } -func (h *AppendableHistory) AppendCompareAndSet(key string, expectedRevision int64, value string, start, end time.Duration, resp *clientv3.TxnResponse, err error) { - request := compareAndSetRequest(key, expectedRevision, value) +func (h *AppendableHistory) AppendCompareRevisionAndDelete(key string, expectedRevision int64, start, end time.Duration, resp *clientv3.TxnResponse, err error) { + request := compareRevisionAndDeleteRequest(key, expectedRevision) + if err != nil { + h.appendFailed(request, start, err) + return + } + var revision int64 + if resp != nil && resp.Header != nil { + revision = resp.Header.Revision + } + var deleted int64 + if resp != nil && len(resp.Responses) > 0 { + deleted = resp.Responses[0].GetResponseDeleteRange().Deleted + } + h.successful = append(h.successful, porcupine.Operation{ + ClientId: h.id, + Input: request, + Call: start.Nanoseconds(), + Output: compareRevisionAndDeleteResponse(resp.Succeeded, deleted, revision), + Return: end.Nanoseconds(), + }) + +} +func (h *AppendableHistory) AppendCompareRevisionAndPut(key string, expectedRevision int64, value string, start, end time.Duration, resp *clientv3.TxnResponse, err error) { + request := compareRevisionAndPutRequest(key, expectedRevision, value) if err != nil { h.appendFailed(request, start, err) return @@ -194,7 +217,7 @@ func (h *AppendableHistory) AppendCompareAndSet(key string, expectedRevision int ClientId: h.id, Input: request, Call: start.Nanoseconds(), - Output: compareAndSetResponse(resp.Succeeded, revision), + Output: compareRevisionAndPutResponse(resp.Succeeded, revision), Return: end.Nanoseconds(), }) } @@ -376,11 +399,15 @@ func deleteResponse(deleted int64, revision int64) EtcdNonDeterministicResponse return EtcdNonDeterministicResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{OpsResult: []EtcdOperationResult{{Deleted: deleted}}}, Revision: revision}} } -func compareAndSetRequest(key string, expectedRevision int64, value string) EtcdRequest { +func compareRevisionAndDeleteRequest(key string, expectedRevision int64) EtcdRequest { + return txnRequest([]EtcdCondition{{Key: key, ExpectedRevision: expectedRevision}}, []EtcdOperation{{Type: Delete, Key: key}}) +} + +func compareRevisionAndPutRequest(key string, expectedRevision int64, value string) EtcdRequest { return txnRequest([]EtcdCondition{{Key: key, ExpectedRevision: expectedRevision}}, []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(value)}}) } -func compareAndSetResponse(succeeded bool, revision int64) EtcdNonDeterministicResponse { +func compareRevisionAndPutResponse(succeeded bool, revision int64) EtcdNonDeterministicResponse { var result []EtcdOperationResult if succeeded { result = []EtcdOperationResult{{}} @@ -388,6 +415,14 @@ func compareAndSetResponse(succeeded bool, revision int64) EtcdNonDeterministicR return txnResponse(result, succeeded, revision) } +func compareRevisionAndDeleteResponse(succeeded bool, deleted, revision int64) EtcdNonDeterministicResponse { + var result []EtcdOperationResult + if succeeded { + result = []EtcdOperationResult{{Deleted: deleted}} + } + return txnResponse(result, succeeded, revision) +} + func txnRequest(conds []EtcdCondition, onSuccess []EtcdOperation) EtcdRequest { return EtcdRequest{Type: Txn, Txn: &TxnRequest{Conds: conds, Ops: onSuccess}} } diff --git a/tests/robustness/model/non_deterministic_test.go b/tests/robustness/model/non_deterministic_test.go index ec26671a1..4a7183169 100644 --- a/tests/robustness/model/non_deterministic_test.go +++ b/tests/robustness/model/non_deterministic_test.go @@ -69,7 +69,7 @@ func TestModelNonDeterministic(t *testing.T) { { name: "First Txn can start from non-zero revision", operations: []testOperation{ - {req: compareAndSetRequest("key", 0, "42"), resp: compareAndSetResponse(false, 42)}, + {req: compareRevisionAndPutRequest("key", 0, "42"), resp: compareRevisionAndPutResponse(false, 42)}, }, }, { @@ -159,11 +159,11 @@ func TestModelNonDeterministic(t *testing.T) { // Txn failure {req: getRequest("key"), resp: emptyGetResponse(1)}, {req: putRequest("key", "1"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(false, 1)}, + {req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(false, 1)}, // Txn success {req: putRequest("key", "2"), resp: putResponse(2)}, {req: putRequest("key", "4"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", 2, "5"), resp: compareAndSetResponse(true, 3)}, + {req: compareRevisionAndPutRequest("key", 2, "5"), resp: compareRevisionAndPutResponse(true, 3)}, }, }, { @@ -213,11 +213,11 @@ func TestModelNonDeterministic(t *testing.T) { // Txn success {req: getRequest("key"), resp: emptyGetResponse(1)}, {req: putRequest("key", "2"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", 2, ""), resp: compareAndSetResponse(true, 2), failure: true}, - {req: compareAndSetRequest("key", 2, ""), resp: compareAndSetResponse(true, 3)}, + {req: compareRevisionAndPutRequest("key", 2, ""), resp: compareRevisionAndPutResponse(true, 2), failure: true}, + {req: compareRevisionAndPutRequest("key", 2, ""), resp: compareRevisionAndPutResponse(true, 3)}, // Txn failure {req: putRequest("key", "4"), resp: putResponse(4)}, - {req: compareAndSetRequest("key", 5, ""), resp: compareAndSetResponse(false, 4)}, + {req: compareRevisionAndPutRequest("key", 5, ""), resp: compareRevisionAndPutResponse(false, 4)}, {req: putRequest("key", "5"), resp: failedResponse(errors.New("failed"))}, {req: getRequest("key"), resp: getResponse("key", "5", 5, 5)}, }, @@ -327,21 +327,21 @@ func TestModelNonDeterministic(t *testing.T) { // Txn success {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, {req: deleteRequest("key"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", 0, "3"), resp: compareAndSetResponse(true, 3)}, + {req: compareRevisionAndPutRequest("key", 0, "3"), resp: compareRevisionAndPutResponse(true, 3)}, // Txn failure {req: putRequest("key", "4"), resp: putResponse(4)}, {req: deleteRequest("key"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", 4, "5"), resp: compareAndSetResponse(false, 5)}, + {req: compareRevisionAndPutRequest("key", 4, "5"), resp: compareRevisionAndPutResponse(false, 5)}, }, }, { name: "Txn sets new value if value matches expected", operations: []testOperation{ {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, - {req: compareAndSetRequest("key", 1, "2"), resp: compareAndSetResponse(true, 1), failure: true}, - {req: compareAndSetRequest("key", 1, "2"), resp: compareAndSetResponse(false, 2), failure: true}, - {req: compareAndSetRequest("key", 1, "2"), resp: compareAndSetResponse(false, 1), failure: true}, - {req: compareAndSetRequest("key", 1, "2"), resp: compareAndSetResponse(true, 2)}, + {req: compareRevisionAndPutRequest("key", 1, "2"), resp: compareRevisionAndPutResponse(true, 1), failure: true}, + {req: compareRevisionAndPutRequest("key", 1, "2"), resp: compareRevisionAndPutResponse(false, 2), failure: true}, + {req: compareRevisionAndPutRequest("key", 1, "2"), resp: compareRevisionAndPutResponse(false, 1), failure: true}, + {req: compareRevisionAndPutRequest("key", 1, "2"), resp: compareRevisionAndPutResponse(true, 2)}, {req: getRequest("key"), resp: getResponse("key", "1", 1, 1), failure: true}, {req: getRequest("key"), resp: getResponse("key", "1", 1, 2), failure: true}, {req: getRequest("key"), resp: getResponse("key", "1", 2, 2), failure: true}, @@ -353,19 +353,19 @@ func TestModelNonDeterministic(t *testing.T) { name: "Txn can expect on empty key", operations: []testOperation{ {req: getRequest("key1"), resp: emptyGetResponse(1)}, - {req: compareAndSetRequest("key1", 0, "2"), resp: compareAndSetResponse(true, 2)}, - {req: compareAndSetRequest("key2", 0, "3"), resp: compareAndSetResponse(true, 3)}, - {req: compareAndSetRequest("key3", 4, "4"), resp: compareAndSetResponse(false, 4), failure: true}, + {req: compareRevisionAndPutRequest("key1", 0, "2"), resp: compareRevisionAndPutResponse(true, 2)}, + {req: compareRevisionAndPutRequest("key2", 0, "3"), resp: compareRevisionAndPutResponse(true, 3)}, + {req: compareRevisionAndPutRequest("key3", 4, "4"), resp: compareRevisionAndPutResponse(false, 4), failure: true}, }, }, { name: "Txn doesn't do anything if value doesn't match expected", operations: []testOperation{ {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, - {req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(true, 2), failure: true}, - {req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(true, 1), failure: true}, - {req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(false, 2), failure: true}, - {req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(false, 1)}, + {req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(true, 2), failure: true}, + {req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(true, 1), failure: true}, + {req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(false, 2), failure: true}, + {req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(false, 1)}, {req: getRequest("key"), resp: getResponse("key", "2", 1, 1), failure: true}, {req: getRequest("key"), resp: getResponse("key", "2", 2, 2), failure: true}, {req: getRequest("key"), resp: getResponse("key", "3", 1, 1), failure: true}, @@ -378,7 +378,7 @@ func TestModelNonDeterministic(t *testing.T) { name: "Txn can fail and be lost before get", operations: []testOperation{ {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, - {req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, + {req: compareRevisionAndPutRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, {req: getRequest("key"), resp: getResponse("key", "2", 2, 2), failure: true}, }, @@ -387,7 +387,7 @@ func TestModelNonDeterministic(t *testing.T) { name: "Txn can fail and be lost before delete", operations: []testOperation{ {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, - {req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, + {req: compareRevisionAndPutRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, {req: deleteRequest("key"), resp: deleteResponse(1, 2)}, }, }, @@ -395,7 +395,7 @@ func TestModelNonDeterministic(t *testing.T) { name: "Txn can fail and be lost before put", operations: []testOperation{ {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, - {req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, + {req: compareRevisionAndPutRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, {req: putRequest("key", "3"), resp: putResponse(2)}, }, }, @@ -404,13 +404,13 @@ func TestModelNonDeterministic(t *testing.T) { operations: []testOperation{ // One failed request, one persisted. {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, - {req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, + {req: compareRevisionAndPutRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, {req: getRequest("key"), resp: getResponse("key", "2", 1, 1), failure: true}, {req: getRequest("key"), resp: getResponse("key", "2", 2, 2)}, // Two failed request, two persisted. {req: putRequest("key", "3"), resp: putResponse(3)}, - {req: compareAndSetRequest("key", 3, "4"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", 4, "5"), resp: failedResponse(errors.New("failed"))}, + {req: compareRevisionAndPutRequest("key", 3, "4"), resp: failedResponse(errors.New("failed"))}, + {req: compareRevisionAndPutRequest("key", 4, "5"), resp: failedResponse(errors.New("failed"))}, {req: getRequest("key"), resp: getResponse("key", "5", 5, 5)}, }, }, @@ -419,12 +419,12 @@ func TestModelNonDeterministic(t *testing.T) { operations: []testOperation{ // One failed request, one persisted. {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, - {req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, + {req: compareRevisionAndPutRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, {req: putRequest("key", "3"), resp: putResponse(3)}, // Two failed request, two persisted. {req: putRequest("key", "4"), resp: putResponse(4)}, - {req: compareAndSetRequest("key", 4, "5"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", 5, "6"), resp: failedResponse(errors.New("failed"))}, + {req: compareRevisionAndPutRequest("key", 4, "5"), resp: failedResponse(errors.New("failed"))}, + {req: compareRevisionAndPutRequest("key", 5, "6"), resp: failedResponse(errors.New("failed"))}, {req: putRequest("key", "7"), resp: putResponse(7)}, }, }, @@ -433,12 +433,12 @@ func TestModelNonDeterministic(t *testing.T) { operations: []testOperation{ // One failed request, one persisted. {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, - {req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, + {req: compareRevisionAndPutRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, {req: deleteRequest("key"), resp: deleteResponse(1, 3)}, // Two failed request, two persisted. {req: putRequest("key", "4"), resp: putResponse(4)}, - {req: compareAndSetRequest("key", 4, "5"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", 5, "6"), resp: failedResponse(errors.New("failed"))}, + {req: compareRevisionAndPutRequest("key", 4, "5"), resp: failedResponse(errors.New("failed"))}, + {req: compareRevisionAndPutRequest("key", 5, "6"), resp: failedResponse(errors.New("failed"))}, {req: deleteRequest("key"), resp: deleteResponse(1, 7)}, }, }, @@ -447,17 +447,17 @@ func TestModelNonDeterministic(t *testing.T) { operations: []testOperation{ // One failed request, one persisted with success. {req: getRequest("key"), resp: getResponse("key", "1", 1, 1)}, - {req: compareAndSetRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", 2, "3"), resp: compareAndSetResponse(true, 3)}, + {req: compareRevisionAndPutRequest("key", 1, "2"), resp: failedResponse(errors.New("failed"))}, + {req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(true, 3)}, // Two failed request, two persisted with success. {req: putRequest("key", "4"), resp: putResponse(4)}, - {req: compareAndSetRequest("key", 4, "5"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", 5, "6"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", 6, "7"), resp: compareAndSetResponse(true, 7)}, + {req: compareRevisionAndPutRequest("key", 4, "5"), resp: failedResponse(errors.New("failed"))}, + {req: compareRevisionAndPutRequest("key", 5, "6"), resp: failedResponse(errors.New("failed"))}, + {req: compareRevisionAndPutRequest("key", 6, "7"), resp: compareRevisionAndPutResponse(true, 7)}, // One failed request, one persisted with failure. {req: putRequest("key", "8"), resp: putResponse(8)}, - {req: compareAndSetRequest("key", 8, "9"), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", 8, "10"), resp: compareAndSetResponse(false, 9)}, + {req: compareRevisionAndPutRequest("key", 8, "9"), resp: failedResponse(errors.New("failed"))}, + {req: compareRevisionAndPutRequest("key", 8, "10"), resp: compareRevisionAndPutResponse(false, 9)}, }, }, { @@ -584,7 +584,7 @@ func TestModelNonDeterministic(t *testing.T) { {req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)}, {req: putRequest("key", "4"), resp: putResponse(4)}, {req: getRequest("key"), resp: getResponse("key", "4", 4, 4)}, - {req: compareAndSetRequest("key", 4, "5"), resp: compareAndSetResponse(true, 5)}, + {req: compareRevisionAndPutRequest("key", 4, "5"), resp: compareRevisionAndPutResponse(true, 5)}, {req: deleteRequest("key"), resp: deleteResponse(1, 6)}, {req: defragmentRequest(), resp: defragmentResponse(6)}, }, @@ -603,7 +603,7 @@ func TestModelNonDeterministic(t *testing.T) { {req: defragmentRequest(), resp: defragmentResponse(4)}, {req: getRequest("key"), resp: getResponse("key", "4", 4, 4)}, {req: defragmentRequest(), resp: defragmentResponse(4)}, - {req: compareAndSetRequest("key", 4, "5"), resp: compareAndSetResponse(true, 5)}, + {req: compareRevisionAndPutRequest("key", 4, "5"), resp: compareRevisionAndPutResponse(true, 5)}, {req: defragmentRequest(), resp: defragmentResponse(5)}, {req: deleteRequest("key"), resp: deleteResponse(1, 6)}, {req: defragmentRequest(), resp: defragmentResponse(6)}, @@ -623,7 +623,7 @@ func TestModelNonDeterministic(t *testing.T) { {req: defragmentRequest(), resp: failedResponse(errors.New("failed"))}, {req: getRequest("key"), resp: getResponse("key", "4", 4, 4)}, {req: defragmentRequest(), resp: failedResponse(errors.New("failed"))}, - {req: compareAndSetRequest("key", 4, "5"), resp: compareAndSetResponse(true, 5)}, + {req: compareRevisionAndPutRequest("key", 4, "5"), resp: compareRevisionAndPutResponse(true, 5)}, {req: defragmentRequest(), resp: failedResponse(errors.New("failed"))}, {req: deleteRequest("key"), resp: deleteResponse(1, 6)}, {req: defragmentRequest(), resp: failedResponse(errors.New("failed"))}, @@ -755,42 +755,42 @@ func TestModelResponseMatch(t *testing.T) { expectMatch: false, }, { - resp1: compareAndSetResponse(false, 7), - resp2: compareAndSetResponse(false, 7), + resp1: compareRevisionAndPutResponse(false, 7), + resp2: compareRevisionAndPutResponse(false, 7), expectMatch: true, }, { - resp1: compareAndSetResponse(true, 7), - resp2: compareAndSetResponse(false, 7), + resp1: compareRevisionAndPutResponse(true, 7), + resp2: compareRevisionAndPutResponse(false, 7), expectMatch: false, }, { - resp1: compareAndSetResponse(false, 7), - resp2: compareAndSetResponse(false, 8), + resp1: compareRevisionAndPutResponse(false, 7), + resp2: compareRevisionAndPutResponse(false, 8), expectMatch: false, }, { - resp1: compareAndSetResponse(false, 7), + resp1: compareRevisionAndPutResponse(false, 7), resp2: failedResponse(errors.New("failed request")), expectMatch: false, }, { - resp1: compareAndSetResponse(true, 7), + resp1: compareRevisionAndPutResponse(true, 7), resp2: unknownResponse(7), expectMatch: true, }, { - resp1: compareAndSetResponse(false, 7), + resp1: compareRevisionAndPutResponse(false, 7), resp2: unknownResponse(7), expectMatch: true, }, { - resp1: compareAndSetResponse(true, 7), + resp1: compareRevisionAndPutResponse(true, 7), resp2: unknownResponse(0), expectMatch: false, }, { - resp1: compareAndSetResponse(false, 7), + resp1: compareRevisionAndPutResponse(false, 7), resp2: unknownResponse(0), expectMatch: false, }, diff --git a/tests/robustness/traffic.go b/tests/robustness/traffic.go index 3bb433c8f..a3b61133e 100644 --- a/tests/robustness/traffic.go +++ b/tests/robustness/traffic.go @@ -41,20 +41,6 @@ var ( MultiOpTxnOpCount = 4 ) -type TrafficRequestType string - -const ( - Get TrafficRequestType = "get" - Put TrafficRequestType = "put" - LargePut TrafficRequestType = "largePut" - Delete TrafficRequestType = "delete" - MultiOpTxn TrafficRequestType = "multiOpTxn" - PutWithLease TrafficRequestType = "putWithLease" - LeaseRevoke TrafficRequestType = "leaseRevoke" - CompareAndSet TrafficRequestType = "compareAndSet" - Defragment TrafficRequestType = "defragment" -) - func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config trafficConfig, finish <-chan struct{}) []porcupine.Operation { mux := sync.Mutex{} endpoints := clus.EndpointsGRPC() @@ -122,22 +108,41 @@ type Traffic interface { Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) } -type traffic struct { +type etcdTraffic struct { keyCount int - writes []requestChance + writeChoices []choiceWeight leaseTTL int64 largePutSize int } -type requestChance struct { - operation TrafficRequestType - chance int -} +type etcdRequestType string + +const ( + Put etcdRequestType = "put" + LargePut etcdRequestType = "largePut" + Delete etcdRequestType = "delete" + MultiOpTxn etcdRequestType = "multiOpTxn" + PutWithLease etcdRequestType = "putWithLease" + LeaseRevoke etcdRequestType = "leaseRevoke" + CompareAndSet etcdRequestType = "compareAndSet" + Defragment etcdRequestType = "defragment" +) type kubernetesTraffic struct { - keyCount int + averageKeyCount int + resource string + namespace string + writeChoices []choiceWeight } +type KubernetesRequestType string + +const ( + KubernetesUpdate KubernetesRequestType = "update" + KubernetesCreate KubernetesRequestType = "create" + KubernetesDelete KubernetesRequestType = "delete" +) + func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { for { select { @@ -147,20 +152,12 @@ func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *recordingCl return default: } - resource := "pods" - - pods, err := t.Range(ctx, c, "/registry/"+resource+"/", true) + objects, err := t.Range(ctx, c, "/registry/"+t.resource+"/", true) if err != nil { continue } limiter.Wait(ctx) - if len(pods) < t.keyCount { - err = t.Create(ctx, c, fmt.Sprintf("/registry/%s/default/%s", resource, stringutil.RandString(5)), fmt.Sprintf("%d", ids.RequestId())) - continue - } else { - randomPod := pods[rand.Intn(len(pods))] - err = t.Update(ctx, c, string(randomPod.Key), fmt.Sprintf("%d", ids.RequestId()), randomPod.ModRevision) - } + err = t.Write(ctx, c, ids, objects) if err != nil { continue } @@ -168,6 +165,36 @@ func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *recordingCl } } +func (t kubernetesTraffic) Write(ctx context.Context, c *recordingClient, ids identity.Provider, objects []*mvccpb.KeyValue) (err error) { + writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout) + if len(objects) < t.averageKeyCount/2 { + err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.RequestId())) + } else { + randomPod := objects[rand.Intn(len(objects))] + if len(objects) > t.averageKeyCount*3/2 { + err = t.Delete(writeCtx, c, string(randomPod.Key), randomPod.ModRevision) + } else { + op := KubernetesRequestType(pickRandom(t.writeChoices)) + switch op { + case KubernetesDelete: + err = t.Delete(writeCtx, c, string(randomPod.Key), randomPod.ModRevision) + case KubernetesUpdate: + err = t.Update(writeCtx, c, string(randomPod.Key), fmt.Sprintf("%d", ids.RequestId()), randomPod.ModRevision) + case KubernetesCreate: + err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.RequestId())) + default: + panic(fmt.Sprintf("invalid choice: %q", op)) + } + } + } + cancel() + return err +} + +func (t kubernetesTraffic) generateKey() string { + return fmt.Sprintf("/registry/%s/%s/%s", t.resource, t.namespace, stringutil.RandString(5)) +} + func (t kubernetesTraffic) Range(ctx context.Context, c *recordingClient, key string, withPrefix bool) ([]*mvccpb.KeyValue, error) { ctx, cancel := context.WithTimeout(ctx, RequestTimeout) resp, err := c.Range(ctx, key, withPrefix) @@ -181,12 +208,19 @@ func (t kubernetesTraffic) Create(ctx context.Context, c *recordingClient, key, func (t kubernetesTraffic) Update(ctx context.Context, c *recordingClient, key, value string, expectedRevision int64) error { ctx, cancel := context.WithTimeout(ctx, RequestTimeout) - err := c.CompareAndSet(ctx, key, value, expectedRevision) + err := c.CompareRevisionAndPut(ctx, key, value, expectedRevision) cancel() return err } -func (t traffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { +func (t kubernetesTraffic) Delete(ctx context.Context, c *recordingClient, key string, expectedRevision int64) error { + ctx, cancel := context.WithTimeout(ctx, RequestTimeout) + err := c.CompareRevisionAndDelete(ctx, key, expectedRevision) + cancel() + return err +} + +func (t etcdTraffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { for { select { @@ -211,18 +245,18 @@ func (t traffic) Run(ctx context.Context, clientId int, c *recordingClient, limi } } -func (t traffic) Read(ctx context.Context, c *recordingClient, key string) (*mvccpb.KeyValue, error) { +func (t etcdTraffic) Read(ctx context.Context, c *recordingClient, key string) (*mvccpb.KeyValue, error) { getCtx, cancel := context.WithTimeout(ctx, RequestTimeout) resp, err := c.Get(getCtx, key) cancel() return resp, err } -func (t traffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, cid int, lastValues *mvccpb.KeyValue) error { +func (t etcdTraffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, cid int, lastValues *mvccpb.KeyValue) error { writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout) var err error - switch t.pickWriteRequest() { + switch etcdRequestType(pickRandom(t.writeChoices)) { case Put: err = c.Put(writeCtx, key, fmt.Sprintf("%d", id.RequestId())) case LargePut: @@ -236,7 +270,7 @@ func (t traffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Li if lastValues != nil { expectRevision = lastValues.ModRevision } - err = c.CompareAndSet(writeCtx, key, fmt.Sprintf("%d", id.RequestId()), expectRevision) + err = c.CompareRevisionAndPut(writeCtx, key, fmt.Sprintf("%d", id.RequestId()), expectRevision) case PutWithLease: leaseId := lm.LeaseId(cid) if leaseId == 0 { @@ -263,28 +297,13 @@ func (t traffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Li case Defragment: err = c.Defragment(writeCtx) default: - panic("invalid operation") + panic("invalid choice") } cancel() return err } -func (t traffic) pickWriteRequest() TrafficRequestType { - sum := 0 - for _, op := range t.writes { - sum += op.chance - } - roll := rand.Int() % sum - for _, op := range t.writes { - if roll < op.chance { - return op.operation - } - roll -= op.chance - } - panic("unexpected") -} - -func (t traffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) { +func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) { keys := rand.Perm(t.keyCount) opTypes := make([]model.OperationType, 4) @@ -311,13 +330,13 @@ func (t traffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) { case model.Delete: ops = append(ops, clientv3.OpDelete(key)) default: - panic("unsuported operation type") + panic("unsuported choice type") } } return ops } -func (t traffic) pickOperationType() model.OperationType { +func (t etcdTraffic) pickOperationType() model.OperationType { roll := rand.Int() % 100 if roll < 10 { return model.Delete @@ -336,3 +355,23 @@ func randString(size int) string { } return data.String() } + +type choiceWeight struct { + choice string + weight int +} + +func pickRandom(choices []choiceWeight) string { + sum := 0 + for _, op := range choices { + sum += op.weight + } + roll := rand.Int() % sum + for _, op := range choices { + if roll < op.weight { + return op.choice + } + roll -= op.weight + } + panic("unexpected") +}