tests/robustness: Validate stale get requests by replaying etcd state

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
dependabot/go_modules/tools/mod/golang.org/x/sync-0.3.0
Marek Siarkowicz 2023-06-19 14:12:25 +02:00
parent bb155a6629
commit 1663600bec
10 changed files with 229 additions and 58 deletions

View File

@ -47,12 +47,12 @@ var DeterministicModel = porcupine.Model{
return string(data)
},
Step: func(st interface{}, in interface{}, out interface{}) (bool, interface{}) {
var s etcdState
var s EtcdState
err := json.Unmarshal([]byte(st.(string)), &s)
if err != nil {
panic(err)
}
ok, s := s.Step(in.(EtcdRequest), out.(EtcdResponse))
ok, s := s.apply(in.(EtcdRequest), out.(EtcdResponse))
data, err := json.Marshal(s)
if err != nil {
panic(err)
@ -64,20 +64,20 @@ var DeterministicModel = porcupine.Model{
},
}
type etcdState struct {
type EtcdState struct {
Revision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
}
func (s etcdState) Step(request EtcdRequest, response EtcdResponse) (bool, etcdState) {
newState, modelResponse := s.step(request)
func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) {
newState, modelResponse := s.Step(request)
return Match(MaybeEtcdResponse{EtcdResponse: response}, modelResponse), newState
}
func freshEtcdState() etcdState {
return etcdState{
func freshEtcdState() EtcdState {
return EtcdState{
Revision: 1,
KeyValues: map[string]ValueRevision{},
KeyLeases: map[string]int64{},
@ -85,8 +85,8 @@ func freshEtcdState() etcdState {
}
}
// step handles a successful request, returning updated state and response it would generate.
func (s etcdState) step(request EtcdRequest) (etcdState, MaybeEtcdResponse) {
// Step handles a successful request, returning updated state and response it would generate.
func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
newKVs := map[string]ValueRevision{}
for k, v := range s.KeyValues {
newKVs[k] = v
@ -185,7 +185,7 @@ func (s etcdState) step(request EtcdRequest) (etcdState, MaybeEtcdResponse) {
}
}
func (s etcdState) getRange(key string, options RangeOptions) RangeResponse {
func (s EtcdState) getRange(key string, options RangeOptions) RangeResponse {
response := RangeResponse{
KVs: []KeyValue{},
}
@ -217,7 +217,7 @@ func (s etcdState) getRange(key string, options RangeOptions) RangeResponse {
return response
}
func detachFromOldLease(s etcdState, key string) etcdState {
func detachFromOldLease(s EtcdState, key string) EtcdState {
if oldLeaseId, ok := s.KeyLeases[key]; ok {
delete(s.Leases[oldLeaseId].Keys, key)
delete(s.KeyLeases, key)
@ -225,7 +225,7 @@ func detachFromOldLease(s etcdState, key string) etcdState {
return s
}
func attachToNewLease(s etcdState, leaseID int64, key string) etcdState {
func attachToNewLease(s EtcdState, leaseID int64, key string) EtcdState {
s.KeyLeases[key] = leaseID
s.Leases[leaseID].Keys[key] = leased
return s

View File

@ -33,12 +33,12 @@ func TestModelDeterministic(t *testing.T) {
if op.expectFailure == ok {
t.Logf("state: %v", state)
t.Errorf("Unexpected operation result, expect: %v, got: %v, operation: %s", !op.expectFailure, ok, DeterministicModel.DescribeOperation(op.req, op.resp.EtcdResponse))
var loadedState etcdState
var loadedState EtcdState
err := json.Unmarshal([]byte(state.(string)), &loadedState)
if err != nil {
t.Fatalf("Failed to load state: %v", err)
}
_, resp := loadedState.step(op.req)
_, resp := loadedState.Step(op.req)
t.Errorf("Response diff: %s", cmp.Diff(op.resp, resp))
break
}

View File

@ -40,7 +40,7 @@ var NonDeterministicModel = porcupine.Model{
if err != nil {
panic(err)
}
ok, states := states.Step(in.(EtcdRequest), out.(MaybeEtcdResponse))
ok, states := states.apply(in.(EtcdRequest), out.(MaybeEtcdResponse))
data, err := json.Marshal(states)
if err != nil {
panic(err)
@ -52,27 +52,27 @@ var NonDeterministicModel = porcupine.Model{
},
}
type nonDeterministicState []etcdState
type nonDeterministicState []EtcdState
func (states nonDeterministicState) Step(request EtcdRequest, response MaybeEtcdResponse) (bool, nonDeterministicState) {
func (states nonDeterministicState) apply(request EtcdRequest, response MaybeEtcdResponse) (bool, nonDeterministicState) {
var newStates nonDeterministicState
switch {
case response.Err != nil:
newStates = states.stepFailedRequest(request)
newStates = states.stepFailedResponse(request)
case response.PartialResponse:
newStates = states.stepPartialRequest(request, response.EtcdResponse.Revision)
newStates = states.applyResponseRevision(request, response.EtcdResponse.Revision)
default:
newStates = states.stepSuccessfulRequest(request, response.EtcdResponse)
newStates = states.applySuccessfulResponse(request, response.EtcdResponse)
}
return len(newStates) > 0, newStates
}
// stepFailedRequest duplicates number of states by considering request persisted and lost.
func (states nonDeterministicState) stepFailedRequest(request EtcdRequest) nonDeterministicState {
// stepFailedResponse duplicates number of states by considering both cases, request was persisted and request was lost.
func (states nonDeterministicState) stepFailedResponse(request EtcdRequest) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states)*2)
for _, s := range states {
newStates = append(newStates, s)
newState, _ := s.step(request)
newState, _ := s.Step(request)
if !reflect.DeepEqual(newState, s) {
newStates = append(newStates, newState)
}
@ -80,11 +80,11 @@ func (states nonDeterministicState) stepFailedRequest(request EtcdRequest) nonDe
return newStates
}
// stepPartialRequest filters possible states by leaving ony states that would return proper revision.
func (states nonDeterministicState) stepPartialRequest(request EtcdRequest, responseRevision int64) nonDeterministicState {
// applyResponseRevision filters possible states by leaving ony states that would return proper revision.
func (states nonDeterministicState) applyResponseRevision(request EtcdRequest, responseRevision int64) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states))
for _, s := range states {
newState, modelResponse := s.step(request)
newState, modelResponse := s.Step(request)
if modelResponse.Revision == responseRevision {
newStates = append(newStates, newState)
}
@ -92,11 +92,11 @@ func (states nonDeterministicState) stepPartialRequest(request EtcdRequest, resp
return newStates
}
// stepSuccessfulRequest filters possible states by leaving ony states that would respond correctly.
func (states nonDeterministicState) stepSuccessfulRequest(request EtcdRequest, response EtcdResponse) nonDeterministicState {
// applySuccessfulResponse filters possible states by leaving ony states that would respond correctly.
func (states nonDeterministicState) applySuccessfulResponse(request EtcdRequest, response EtcdResponse) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states))
for _, s := range states {
newState, modelResponse := s.step(request)
newState, modelResponse := s.Step(request)
if Match(modelResponse, MaybeEtcdResponse{EtcdResponse: response}) {
newStates = append(newStates, newState)
}

View File

@ -339,7 +339,7 @@ func TestModelNonDeterministic(t *testing.T) {
t.Fatalf("Failed to load state: %v", err)
}
for i, s := range loadedState {
_, resp := s.step(op.req)
_, resp := s.Step(op.req)
t.Errorf("For state %d, response diff: %s", i, cmp.Diff(op.resp, resp))
}
break

View File

@ -0,0 +1,99 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
"fmt"
)
func NewReplay(eventHistory []WatchEvent) *EtcdReplay {
var lastEventRevision int64 = 1
for _, event := range eventHistory {
if event.Revision > lastEventRevision && event.Revision != lastEventRevision+1 {
panic("Replay requires a complete event history")
}
lastEventRevision = event.Revision
}
return &EtcdReplay{
eventHistory: eventHistory,
}
}
type EtcdReplay struct {
eventHistory []WatchEvent
// Cached state and event index used for it's calculation
cachedState *EtcdState
eventHistoryIndex int
}
func (r *EtcdReplay) StateForRevision(revision int64) (EtcdState, error) {
if revision < 1 {
return EtcdState{}, fmt.Errorf("invalid revision: %d", revision)
}
if r.cachedState == nil || r.cachedState.Revision > revision {
r.reset()
}
for r.eventHistoryIndex < len(r.eventHistory) && r.cachedState.Revision < revision {
nextRequest, nextRevision, nextIndex := r.next()
newState, _ := r.cachedState.Step(nextRequest)
if newState.Revision != nextRevision {
return EtcdState{}, fmt.Errorf("model returned different revision than one present in event history, model: %d, event: %d", newState.Revision, nextRevision)
}
r.cachedState = &newState
r.eventHistoryIndex = nextIndex
}
if r.eventHistoryIndex > len(r.eventHistory) && r.cachedState.Revision < revision {
return EtcdState{}, fmt.Errorf("requested revision higher then available in even history, requested: %d, model: %d", revision, r.cachedState.Revision)
}
return *r.cachedState, nil
}
func (r *EtcdReplay) reset() {
state := freshEtcdState()
r.cachedState = &state
r.eventHistoryIndex = 0
}
func (r *EtcdReplay) next() (request EtcdRequest, revision int64, index int) {
revision = r.eventHistory[r.eventHistoryIndex].Revision
index = r.eventHistoryIndex
operations := []EtcdOperation{}
for r.eventHistory[index].Revision == revision {
operations = append(operations, r.eventHistory[index].Op)
index++
}
return EtcdRequest{
Type: Txn,
Txn: &TxnRequest{
OperationsOnSuccess: operations,
},
}, revision, index
}
func operationToRequest(op EtcdOperation) EtcdRequest {
return EtcdRequest{
Type: Txn,
Txn: &TxnRequest{
OperationsOnSuccess: []EtcdOperation{op},
},
}
}
type WatchEvent struct {
Op EtcdOperation
Revision int64
}

View File

@ -34,7 +34,7 @@ type report struct {
lg *zap.Logger
clus *e2e.EtcdProcessCluster
clientReports []traffic.ClientReport
visualizeHistory func(path string)
visualizeHistory func(path string) error
}
func testResultsDirectory(t *testing.T) string {
@ -89,7 +89,10 @@ func (r *report) Report(t *testing.T, force bool) {
}
}
if r.visualizeHistory != nil {
r.visualizeHistory(filepath.Join(path, "history.html"))
err := r.visualizeHistory(filepath.Join(path, "history.html"))
if err != nil {
t.Error(err)
}
}
}

View File

@ -46,22 +46,17 @@ type RecordingClient struct {
}
type WatchResponse struct {
Events []WatchEvent
Events []model.WatchEvent
IsProgressNotify bool
Revision int64
Time time.Duration
}
type TimedWatchEvent struct {
WatchEvent
model.WatchEvent
Time time.Duration
}
type WatchEvent struct {
Op model.EtcdOperation
Revision int64
}
func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) {
cc, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
@ -259,7 +254,7 @@ func ToWatchResponse(r clientv3.WatchResponse, baseTime time.Time) WatchResponse
return resp
}
func toWatchEvent(event clientv3.Event) WatchEvent {
func toWatchEvent(event clientv3.Event) model.WatchEvent {
var op model.OperationType
switch event.Type {
case mvccpb.PUT:
@ -269,7 +264,7 @@ func toWatchEvent(event clientv3.Event) WatchEvent {
default:
panic(fmt.Sprintf("Unexpected event type: %s", event.Type))
}
return WatchEvent{
return model.WatchEvent{
Revision: event.Kv.ModRevision,
Op: model.EtcdOperation{
Type: op,

View File

@ -15,28 +15,92 @@
package validate
import (
"fmt"
"reflect"
"sort"
"testing"
"time"
"github.com/anishathalye/porcupine"
"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
"go.etcd.io/etcd/tests/v3/robustness/model"
)
func validateOperationHistoryAndReturnVisualize(t *testing.T, lg *zap.Logger, operations []porcupine.Operation) (visualize func(basepath string)) {
linearizable, info := porcupine.CheckOperationsVerbose(model.NonDeterministicModel, operations, 5*time.Minute)
if linearizable == porcupine.Illegal {
func validateOperationsAndVisualize(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, eventHistory []model.WatchEvent) (visualize func(basepath string) error) {
const timeout = 5 * time.Minute
lg.Info("Validating linearizable operations", zap.Duration("timeout", timeout))
result, visualize := validateLinearizableOperationAndVisualize(lg, operations, timeout)
switch result {
case porcupine.Illegal:
t.Error("Linearization failed for provided operations")
return
case porcupine.Unknown:
t.Error("Model is not linearizable")
return
case porcupine.Ok:
t.Log("Linearization passed")
default:
t.Fatalf("Unknown Linearization")
}
if linearizable == porcupine.Unknown {
t.Error("Linearization timed out")
}
return func(path string) {
lg.Info("Validating serializable operations")
// TODO: Use linearization result instead of event history to get order of events
// This is currently impossible as porcupine doesn't expose operation order created during linearization.
validateSerializableOperations(t, operations, eventHistory)
return visualize
}
func validateLinearizableOperationAndVisualize(lg *zap.Logger, operations []porcupine.Operation, timeout time.Duration) (result porcupine.CheckResult, visualize func(basepath string) error) {
linearizable, info := porcupine.CheckOperationsVerbose(model.NonDeterministicModel, operations, timeout)
return linearizable, func(path string) error {
lg.Info("Saving visualization", zap.String("path", path))
err := porcupine.VisualizePath(model.NonDeterministicModel, info, path)
if err != nil {
t.Errorf("Failed to visualize, err: %v", err)
return fmt.Errorf("failed to visualize, err: %v", err)
}
return nil
}
}
func validateSerializableOperations(t *testing.T, operations []porcupine.Operation, totalEventHistory []model.WatchEvent) {
staleReads := filterSerializableReads(operations)
if len(staleReads) == 0 {
return
}
sort.Slice(staleReads, func(i, j int) bool {
return staleReads[i].Input.(model.EtcdRequest).Range.Revision < staleReads[j].Input.(model.EtcdRequest).Range.Revision
})
replay := model.NewReplay(totalEventHistory)
for _, read := range staleReads {
request := read.Input.(model.EtcdRequest)
response := read.Output.(model.MaybeEtcdResponse)
validateSerializableOperation(t, replay, request, response)
}
}
func filterSerializableReads(operations []porcupine.Operation) []porcupine.Operation {
resp := []porcupine.Operation{}
for _, op := range operations {
request := op.Input.(model.EtcdRequest)
if request.Type == model.Range && request.Range.Revision != 0 {
resp = append(resp, op)
}
}
return resp
}
func validateSerializableOperation(t *testing.T, replay *model.EtcdReplay, request model.EtcdRequest, response model.MaybeEtcdResponse) {
if response.PartialResponse || response.Err != nil {
return
}
state, err := replay.StateForRevision(request.Range.Revision)
if err != nil {
t.Fatal(err)
}
_, expectResp := state.Step(request)
if !reflect.DeepEqual(response.EtcdResponse.Range, expectResp.Range) {
t.Errorf("Invalid serializable response, diff: %s", cmp.Diff(response.EtcdResponse.Range, expectResp.Range))
}
}

View File

@ -24,14 +24,13 @@ import (
"go.etcd.io/etcd/tests/v3/robustness/traffic"
)
// ValidateAndReturnVisualize return visualize as porcupine.linearizationInfo used to generate visualization is private.
func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []traffic.ClientReport) (visualize func(basepath string)) {
validateWatch(t, cfg, reports)
// TODO: Validate stale reads responses.
// ValidateAndReturnVisualize returns visualize as porcupine.linearizationInfo used to generate visualization is private.
func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []traffic.ClientReport) (visualize func(basepath string) error) {
eventHistory := validateWatch(t, cfg, reports)
allOperations := operations(reports)
watchEvents := uniqueWatchEvents(reports)
newOperations := patchOperationsWithWatchEvents(allOperations, watchEvents)
return validateOperationHistoryAndReturnVisualize(t, lg, newOperations)
patchedOperations := patchOperationsWithWatchEvents(allOperations, watchEvents)
return validateOperationsAndVisualize(t, lg, patchedOperations, eventHistory)
}
func operations(reports []traffic.ClientReport) []porcupine.Operation {

View File

@ -19,10 +19,11 @@ import (
"github.com/google/go-cmp/cmp"
"go.etcd.io/etcd/tests/v3/robustness/model"
"go.etcd.io/etcd/tests/v3/robustness/traffic"
)
func validateWatch(t *testing.T, cfg Config, reports []traffic.ClientReport) {
func validateWatch(t *testing.T, cfg Config, reports []traffic.ClientReport) []model.WatchEvent {
// Validate etcd watch properties defined in https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis
for _, r := range reports {
validateOrdered(t, r)
@ -34,8 +35,10 @@ func validateWatch(t *testing.T, cfg Config, reports []traffic.ClientReport) {
validateEventsMatch(t, reports)
// Expects that longest history encompasses all events.
// TODO: Use combined events from all histories instead of the longest history.
eventHistory := longestEventHistory(reports)
// TODO: Validate that each watch report is reliable, not only the longest one.
validateReliable(t, longestEventHistory(reports))
validateReliable(t, eventHistory)
return watchEvents(eventHistory)
}
func validateBookmarkable(t *testing.T, report traffic.ClientReport) {
@ -127,7 +130,7 @@ func validateEventsMatch(t *testing.T, reports []traffic.ClientReport) {
key string
}
type eventClientId struct {
traffic.WatchEvent
model.WatchEvent
ClientId int
}
revisionKeyToEvent := map[revisionKey]eventClientId{}
@ -158,3 +161,11 @@ func longestEventHistory(report []traffic.ClientReport) []traffic.TimedWatchEven
}
return toWatchEvents(report[longestIndex].Watch)
}
func watchEvents(timed []traffic.TimedWatchEvent) []model.WatchEvent {
result := make([]model.WatchEvent, 0, len(timed))
for _, event := range timed {
result = append(result, event.WatchEvent)
}
return result
}