tests: Use watch events to patch history to speed up linearization

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
dependabot/go_modules/go.uber.org/atomic-1.10.0
Marek Siarkowicz 2023-01-10 18:11:57 +01:00
parent 3306639b76
commit 7b2dfece70
5 changed files with 285 additions and 40 deletions

View File

@ -197,19 +197,23 @@ func getRequest(key string) EtcdRequest {
}
func getResponse(value string, revision int64) EtcdResponse {
return EtcdResponse{Result: []EtcdOperationResult{{Value: value}}, Revision: revision}
return EtcdResponse{OpsResult: []EtcdOperationResult{{Value: value}}, Revision: revision}
}
func failedResponse(err error) EtcdResponse {
return EtcdResponse{Err: err}
}
func unknownResponse(revision int64) EtcdResponse {
return EtcdResponse{ResultUnknown: true, Revision: revision}
}
func putRequest(key, value string) EtcdRequest {
return EtcdRequest{Ops: []EtcdOperation{{Type: Put, Key: key, Value: value}}}
}
func putResponse(revision int64) EtcdResponse {
return EtcdResponse{Result: []EtcdOperationResult{{}}, Revision: revision}
return EtcdResponse{OpsResult: []EtcdOperationResult{{}}, Revision: revision}
}
func deleteRequest(key string) EtcdRequest {
@ -217,7 +221,7 @@ func deleteRequest(key string) EtcdRequest {
}
func deleteResponse(deleted int64, revision int64) EtcdResponse {
return EtcdResponse{Result: []EtcdOperationResult{{Deleted: deleted}}, Revision: revision}
return EtcdResponse{OpsResult: []EtcdOperationResult{{Deleted: deleted}}, Revision: revision}
}
func txnRequest(key, expectValue, newValue string) EtcdRequest {
@ -229,7 +233,7 @@ func txnResponse(succeeded bool, revision int64) EtcdResponse {
if succeeded {
result = []EtcdOperationResult{{}}
}
return EtcdResponse{Result: result, TxnFailure: !succeeded, Revision: revision}
return EtcdResponse{OpsResult: result, TxnResult: !succeeded, Revision: revision}
}
func putWithLeaseRequest(key, value string, leaseID int64) EtcdRequest {
@ -241,7 +245,7 @@ func leaseGrantRequest(leaseID int64) EtcdRequest {
}
func leaseGrantResponse(revision int64) EtcdResponse {
return EtcdResponse{Result: []EtcdOperationResult{{}}, Revision: revision}
return EtcdResponse{OpsResult: []EtcdOperationResult{{}}, Revision: revision}
}
func leaseRevokeRequest(leaseID int64) EtcdRequest {
@ -249,7 +253,7 @@ func leaseRevokeRequest(leaseID int64) EtcdRequest {
}
func leaseRevokeResponse(revision int64) EtcdResponse {
return EtcdResponse{Result: []EtcdOperationResult{{}}, Revision: revision}
return EtcdResponse{OpsResult: []EtcdOperationResult{{}}, Revision: revision}
}
type history struct {

View File

@ -18,6 +18,7 @@ import (
"context"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"testing"
@ -25,6 +26,7 @@ import (
"github.com/anishathalye/porcupine"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
@ -102,7 +104,9 @@ func TestLinearizability(t *testing.T) {
clientCount: 8,
traffic: DefaultTraffic,
})
validateEventsMatch(t, events)
longestHistory, remainingEvents := pickLongestHistory(events)
validateEventsMatch(t, longestHistory, remainingEvents)
operations = patchOperationBasedOnWatchEvents(operations, longestHistory)
checkOperationsAndPersistResults(t, operations, clus)
})
}
@ -133,6 +137,75 @@ func testLinearizability(ctx context.Context, t *testing.T, clus *e2e.EtcdProces
return operations, events
}
func patchOperationBasedOnWatchEvents(operations []porcupine.Operation, watchEvents []watchEvent) []porcupine.Operation {
newOperations := make([]porcupine.Operation, 0, len(operations))
persisted := map[EtcdOperation]watchEvent{}
for _, op := range watchEvents {
persisted[op.Op] = op
}
lastObservedEventTime := watchEvents[len(watchEvents)-1].Time
for _, op := range operations {
resp := op.Output.(EtcdResponse)
if resp.Err == nil || op.Call > lastObservedEventTime.UnixNano() {
// No need to patch successfully requests and cannot patch requests outside observed window.
newOperations = append(newOperations, op)
continue
}
event, hasUniqueWriteOperation := matchWatchEvent(op, persisted)
if event != nil {
// Set revision and time based on watchEvent.
op.Return = event.Time.UnixNano()
op.Output = EtcdResponse{
Revision: event.Revision,
ResultUnknown: true,
}
newOperations = append(newOperations, op)
continue
}
if hasWriteOperation(op) && !hasUniqueWriteOperation {
// Leave operation as it is as we cannot match non-unique operations to watch events.
newOperations = append(newOperations, op)
continue
}
// Remove non persisted operations
}
return newOperations
}
func matchWatchEvent(op porcupine.Operation, watchEvents map[EtcdOperation]watchEvent) (event *watchEvent, hasUniqueWriteOperation bool) {
request := op.Input.(EtcdRequest)
for _, etcdOp := range request.Ops {
if isWrite(etcdOp.Type) && inUnique(etcdOp.Type) {
// We expect all put to be unique as they write unique value.
hasUniqueWriteOperation = true
opType := etcdOp.Type
if opType == PutWithLease {
opType = Put
}
event, ok := watchEvents[EtcdOperation{
Type: opType,
Key: etcdOp.Key,
Value: etcdOp.Value,
}]
if ok {
return &event, hasUniqueWriteOperation
}
}
}
return nil, hasUniqueWriteOperation
}
func hasWriteOperation(op porcupine.Operation) bool {
request := op.Input.(EtcdRequest)
for _, etcdOp := range request.Ops {
if isWrite(etcdOp.Type) {
return true
}
}
return false
}
func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config FailpointConfig) {
var err error
successes := 0
@ -213,20 +286,18 @@ type trafficConfig struct {
traffic Traffic
}
func validateEventsMatch(t *testing.T, ops [][]watchEvent) {
// Move longest history to ops[0]
maxLength := len(ops[0])
for i := 1; i < len(ops); i++ {
if len(ops[i]) > maxLength {
maxLength = len(ops[i])
ops[0], ops[i] = ops[i], ops[0]
}
}
func pickLongestHistory(ops [][]watchEvent) (longest []watchEvent, rest [][]watchEvent) {
sort.Slice(ops, func(i, j int) bool {
return len(ops[i]) > len(ops[j])
})
return ops[0], ops[1:]
}
for i := 1; i < len(ops); i++ {
length := len(ops[i])
func validateEventsMatch(t *testing.T, longestHistory []watchEvent, other [][]watchEvent) {
for i := 0; i < len(other); i++ {
length := len(other[i])
// We compare prefix of watch events, as we are not guaranteed to collect all events from each node.
if diff := cmp.Diff(ops[0][:length], ops[i][:length]); diff != "" {
if diff := cmp.Diff(longestHistory[:length], other[i][:length], cmpopts.IgnoreFields(watchEvent{}, "Time")); diff != "" {
t.Errorf("Events in watches do not match, %s", diff)
}
}

View File

@ -35,6 +35,14 @@ const (
LeaseRevoke OperationType = "leaseRevoke"
)
func isWrite(t OperationType) bool {
return t == Put || t == Delete || t == PutWithLease || t == LeaseRevoke || t == LeaseGrant
}
func inUnique(t OperationType) bool {
return t == Put || t == PutWithLease
}
type EtcdRequest struct {
Conds []EtcdCondition
Ops []EtcdOperation
@ -53,10 +61,15 @@ type EtcdOperation struct {
}
type EtcdResponse struct {
Err error
Revision int64
TxnFailure bool
Result []EtcdOperationResult
Err error
Revision int64
ResultUnknown bool
TxnResult bool
OpsResult []EtcdOperationResult
}
func Match(r1, r2 EtcdResponse) bool {
return ((r1.ResultUnknown || r2.ResultUnknown) && (r1.Revision == r2.Revision)) || reflect.DeepEqual(r1, r2)
}
type EtcdOperationResult struct {
@ -70,7 +83,6 @@ type EtcdLease struct {
LeaseID int64
Keys map[string]struct{}
}
type PossibleStates []EtcdState
type EtcdState struct {
@ -131,12 +143,15 @@ func describeEtcdResponse(ops []EtcdOperation, response EtcdResponse) string {
if response.Err != nil {
return fmt.Sprintf("err: %q", response.Err)
}
if response.TxnFailure {
if response.ResultUnknown {
return fmt.Sprintf("unknown, rev: %d", response.Revision)
}
if response.TxnResult {
return fmt.Sprintf("txn failed, rev: %d", response.Revision)
}
respDescription := make([]string, len(response.Result))
for i := range response.Result {
respDescription[i] = describeEtcdOperationResponse(ops[i].Type, response.Result[i])
respDescription := make([]string, len(response.OpsResult))
for i := range response.OpsResult {
respDescription[i] = describeEtcdOperationResponse(ops[i].Type, response.OpsResult[i])
}
respDescription = append(respDescription, fmt.Sprintf("rev: %d", response.Revision))
return strings.Join(respDescription, ", ")
@ -190,7 +205,7 @@ func describeEtcdOperationResponse(op OperationType, resp EtcdOperationResult) s
func step(states PossibleStates, request EtcdRequest, response EtcdResponse) (bool, PossibleStates) {
if len(states) == 0 {
// states were not initialized
if response.Err != nil {
if response.Err != nil || response.ResultUnknown {
return true, nil
}
return true, PossibleStates{initState(request, response)}
@ -211,11 +226,11 @@ func initState(request EtcdRequest, response EtcdResponse) EtcdState {
KeyLeases: map[string]int64{},
Leases: map[int64]EtcdLease{},
}
if response.TxnFailure {
if response.TxnResult {
return state
}
for i, op := range request.Ops {
opResp := response.Result[i]
opResp := response.OpsResult[i]
switch op.Type {
case Get:
if opResp.Value != "" {
@ -263,7 +278,7 @@ func applyRequest(states PossibleStates, request EtcdRequest, response EtcdRespo
newStates := make(PossibleStates, 0, len(states))
for _, s := range states {
newState, expectResponse := applyRequestToSingleState(s, request)
if reflect.DeepEqual(expectResponse, response) {
if Match(expectResponse, response) {
newStates = append(newStates, newState)
}
}
@ -280,7 +295,7 @@ func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, Etc
}
}
if !success {
return s, EtcdResponse{Revision: s.Revision, TxnFailure: true}
return s, EtcdResponse{Revision: s.Revision, TxnResult: true}
}
newKVs := map[string]string{}
for k, v := range s.KeyValues {
@ -346,7 +361,7 @@ func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, Etc
s.Revision += 1
}
return s, EtcdResponse{Result: opResp, Revision: s.Revision}
return s, EtcdResponse{OpsResult: opResp, Revision: s.Revision}
}
func detachFromOldLease(s EtcdState, op EtcdOperation) EtcdState {

View File

@ -569,6 +569,11 @@ func TestModelDescribe(t *testing.T) {
resp: failedResponse(errors.New("failed")),
expectDescribe: `put("key4", "4") -> err: "failed"`,
},
{
req: putRequest("key4b", "4b"),
resp: unknownResponse(42),
expectDescribe: `put("key4b", "4b") -> unknown, rev: 42`,
},
{
req: deleteRequest("key5"),
resp: deleteResponse(1, 5),
@ -599,3 +604,150 @@ func TestModelDescribe(t *testing.T) {
assert.Equal(t, tc.expectDescribe, etcdModel.DescribeOperation(tc.req, tc.resp))
}
}
func TestModelResponseMatch(t *testing.T) {
tcs := []struct {
resp1 EtcdResponse
resp2 EtcdResponse
expectMatch bool
}{
{
resp1: getResponse("a", 1),
resp2: getResponse("a", 1),
expectMatch: true,
},
{
resp1: getResponse("a", 1),
resp2: getResponse("b", 1),
expectMatch: false,
},
{
resp1: getResponse("a", 1),
resp2: getResponse("a", 2),
expectMatch: false,
},
{
resp1: getResponse("a", 1),
resp2: failedResponse(errors.New("failed request")),
expectMatch: false,
},
{
resp1: getResponse("a", 1),
resp2: unknownResponse(1),
expectMatch: true,
},
{
resp1: getResponse("a", 1),
resp2: unknownResponse(0),
expectMatch: false,
},
{
resp1: putResponse(3),
resp2: putResponse(3),
expectMatch: true,
},
{
resp1: putResponse(3),
resp2: putResponse(4),
expectMatch: false,
},
{
resp1: putResponse(3),
resp2: failedResponse(errors.New("failed request")),
expectMatch: false,
},
{
resp1: putResponse(3),
resp2: unknownResponse(3),
expectMatch: true,
},
{
resp1: putResponse(3),
resp2: unknownResponse(0),
expectMatch: false,
},
{
resp1: deleteResponse(1, 5),
resp2: deleteResponse(1, 5),
expectMatch: true,
},
{
resp1: deleteResponse(1, 5),
resp2: deleteResponse(0, 5),
expectMatch: false,
},
{
resp1: deleteResponse(1, 5),
resp2: deleteResponse(1, 6),
expectMatch: false,
},
{
resp1: deleteResponse(1, 5),
resp2: failedResponse(errors.New("failed request")),
expectMatch: false,
},
{
resp1: deleteResponse(1, 5),
resp2: unknownResponse(5),
expectMatch: true,
},
{
resp1: deleteResponse(0, 5),
resp2: unknownResponse(0),
expectMatch: false,
},
{
resp1: deleteResponse(1, 5),
resp2: unknownResponse(0),
expectMatch: false,
},
{
resp1: deleteResponse(0, 5),
resp2: unknownResponse(2),
expectMatch: false,
},
{
resp1: txnResponse(false, 7),
resp2: txnResponse(false, 7),
expectMatch: true,
},
{
resp1: txnResponse(true, 7),
resp2: txnResponse(false, 7),
expectMatch: false,
},
{
resp1: txnResponse(false, 7),
resp2: txnResponse(false, 8),
expectMatch: false,
},
{
resp1: txnResponse(false, 7),
resp2: failedResponse(errors.New("failed request")),
expectMatch: false,
},
{
resp1: txnResponse(true, 7),
resp2: unknownResponse(7),
expectMatch: true,
},
{
resp1: txnResponse(false, 7),
resp2: unknownResponse(7),
expectMatch: true,
},
{
resp1: txnResponse(true, 7),
resp2: unknownResponse(0),
expectMatch: false,
},
{
resp1: txnResponse(false, 7),
resp2: unknownResponse(0),
expectMatch: false,
},
}
for i, tc := range tcs {
assert.Equal(t, tc.expectMatch, Match(tc.resp1, tc.resp2), "%d %+v %+v", i, tc.resp1, tc.resp2)
}
}

View File

@ -67,6 +67,7 @@ func collectMemberWatchEvents(ctx context.Context, t *testing.T, c *clientv3.Cli
}
for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision)) {
lastRevision = resp.Header.Revision
time := time.Now()
for _, event := range resp.Events {
var op OperationType
switch event.Type {
@ -76,10 +77,13 @@ func collectMemberWatchEvents(ctx context.Context, t *testing.T, c *clientv3.Cli
op = Delete
}
events = append(events, watchEvent{
Op: op,
Key: string(event.Kv.Key),
Value: string(event.Kv.Value),
Time: time,
Revision: event.Kv.ModRevision,
Op: EtcdOperation{
Type: op,
Key: string(event.Kv.Key),
Value: string(event.Kv.Value),
},
})
}
if resp.Err() != nil {
@ -90,8 +94,7 @@ func collectMemberWatchEvents(ctx context.Context, t *testing.T, c *clientv3.Cli
}
type watchEvent struct {
Op OperationType
Key string
Value string
Op EtcdOperation
Revision int64
Time time.Time
}