Compare commits

...

12 Commits

Author SHA1 Message Date
Benjamin Wang 66553d4f07
Merge pull request #16090 from tjungblu/changelog_leaseput
update change logs with lease put improvements
2023-06-21 13:23:16 +01:00
Thomas Jungblut e3f2638aea update change logs with lease put improvements
Signed-off-by: Thomas Jungblut <tjungblu@redhat.com>
2023-06-21 12:18:20 +02:00
Benjamin Wang de6415801e
Merge pull request #16110 from etcd-io/dependabot/github_actions/github/codeql-action-2.20.0
build(deps): bump github/codeql-action from 2.3.6 to 2.20.0
2023-06-20 14:15:52 +01:00
Marek Siarkowicz fd3e338d88
Merge pull request #16115 from serathius/robustness-kubernetes-tune
tests/robustness: Tune Kubernetes tests to reduce number of delete requests
2023-06-20 11:16:02 +02:00
Marek Siarkowicz 486462a907
Merge pull request #16114 from serathius/robustness-test-name-separate
tests/robustness: Separate traffic name from cluster setup in test name
2023-06-20 11:15:41 +02:00
Marek Siarkowicz 519617cfd0 tests/robustness: Tune Kubernetes tests to reduce number of delete requests
Having too many delete requests is bad as they are not unique requests, so
linearization is more prone to timeout on them.

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
2023-06-20 09:45:23 +02:00
Marek Siarkowicz 1217548acf tests/robustness: Separate traffic name from cluster setup in test name
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
2023-06-20 09:16:36 +02:00
dependabot[bot] 1d472bb6e4
build(deps): bump github/codeql-action from 2.3.6 to 2.20.0
Bumps [github/codeql-action](https://github.com/github/codeql-action) from 2.3.6 to 2.20.0.
- [Release notes](https://github.com/github/codeql-action/releases)
- [Changelog](https://github.com/github/codeql-action/blob/main/CHANGELOG.md)
- [Commits](83f0fe6c49...6c089f53dd)

---
updated-dependencies:
- dependency-name: github/codeql-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-06-19 18:01:17 +00:00
Marek Siarkowicz 9c659eb4e0
Merge pull request #16072 from serathius/robustness-stale-read
Validate stale read
2023-06-19 18:22:08 +02:00
Marek Siarkowicz 1420292b10
Merge pull request #16092 from serathius/robustness-etcdctl-traffic-client
Robustness etcd traffic client
2023-06-19 16:10:55 +02:00
Marek Siarkowicz 1663600bec tests/robustness: Validate stale get requests by replaying etcd state
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
2023-06-19 14:17:38 +02:00
Marek Siarkowicz 09b9f889e7 tests/robustness: Refactor etcd traffic client
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
2023-06-19 12:08:17 +02:00
17 changed files with 280 additions and 92 deletions

View File

@ -40,7 +40,7 @@ jobs:
uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # v3.5.3
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@83f0fe6c4988d98a455712a27f0255212bba9bd4 # v2.3.6
uses: github/codeql-action/init@6c089f53dd51dc3fc7e599c3cb5356453a52ca9e # v2.20.0
with:
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
@ -50,6 +50,6 @@ jobs:
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@83f0fe6c4988d98a455712a27f0255212bba9bd4 # v2.3.6
uses: github/codeql-action/autobuild@6c089f53dd51dc3fc7e599c3cb5356453a52ca9e # v2.20.0
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@83f0fe6c4988d98a455712a27f0255212bba9bd4 # v2.3.6
uses: github/codeql-action/analyze@6c089f53dd51dc3fc7e599c3cb5356453a52ca9e # v2.20.0

View File

@ -50,6 +50,6 @@ jobs:
# Upload the results to GitHub's code scanning dashboard.
- name: "Upload to code-scanning"
uses: github/codeql-action/upload-sarif@83f0fe6c4988d98a455712a27f0255212bba9bd4 # tag=v1.0.26
uses: github/codeql-action/upload-sarif@6c089f53dd51dc3fc7e599c3cb5356453a52ca9e # tag=v1.0.26
with:
sarif_file: results.sarif

View File

@ -8,6 +8,7 @@ Previous change logs can be found at [CHANGELOG-3.3](https://github.com/etcd-io/
### etcd server
- Fix [corruption check may get a `ErrCompacted` error when server has just been compacted](https://github.com/etcd-io/etcd/pull/16047)
- Improve [Lease put performance for the case that auth is disabled or the user is admin](https://github.com/etcd-io/etcd/pull/16020)
## v3.4.26 (2023-05-12)

View File

@ -8,6 +8,7 @@ Previous change logs can be found at [CHANGELOG-3.4](https://github.com/etcd-io/
### etcd server
- Fix [corruption check may get a `ErrCompacted` error when server has just been compacted](https://github.com/etcd-io/etcd/pull/16048)
- Improve [Lease put performance for the case that auth is disabled or the user is admin](https://github.com/etcd-io/etcd/pull/16019)
### etcd grpc-proxy
- Fix [Memberlist results not updated when proxy node down](https://github.com/etcd-io/etcd/pull/15907).

View File

@ -39,7 +39,7 @@ func TestRobustness(t *testing.T) {
scenarios := []testScenario{}
for _, traffic := range []traffic.Config{traffic.LowTraffic, traffic.HighTraffic, traffic.KubernetesTraffic} {
scenarios = append(scenarios, testScenario{
name: traffic.Name + "ClusterOfSize1",
name: traffic.Name + "/ClusterOfSize1",
traffic: traffic,
cluster: *e2e.NewConfig(
e2e.WithClusterSize(1),
@ -61,7 +61,7 @@ func TestRobustness(t *testing.T) {
clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithSnapshotCatchUpEntries(100))
}
scenarios = append(scenarios, testScenario{
name: traffic.Name + "ClusterOfSize3",
name: traffic.Name + "/ClusterOfSize3",
traffic: traffic,
cluster: *e2e.NewConfig(clusterOfSize3Options...),
})

View File

@ -47,12 +47,12 @@ var DeterministicModel = porcupine.Model{
return string(data)
},
Step: func(st interface{}, in interface{}, out interface{}) (bool, interface{}) {
var s etcdState
var s EtcdState
err := json.Unmarshal([]byte(st.(string)), &s)
if err != nil {
panic(err)
}
ok, s := s.Step(in.(EtcdRequest), out.(EtcdResponse))
ok, s := s.apply(in.(EtcdRequest), out.(EtcdResponse))
data, err := json.Marshal(s)
if err != nil {
panic(err)
@ -64,20 +64,20 @@ var DeterministicModel = porcupine.Model{
},
}
type etcdState struct {
type EtcdState struct {
Revision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
}
func (s etcdState) Step(request EtcdRequest, response EtcdResponse) (bool, etcdState) {
newState, modelResponse := s.step(request)
func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) {
newState, modelResponse := s.Step(request)
return Match(MaybeEtcdResponse{EtcdResponse: response}, modelResponse), newState
}
func freshEtcdState() etcdState {
return etcdState{
func freshEtcdState() EtcdState {
return EtcdState{
Revision: 1,
KeyValues: map[string]ValueRevision{},
KeyLeases: map[string]int64{},
@ -85,8 +85,8 @@ func freshEtcdState() etcdState {
}
}
// step handles a successful request, returning updated state and response it would generate.
func (s etcdState) step(request EtcdRequest) (etcdState, MaybeEtcdResponse) {
// Step handles a successful request, returning updated state and response it would generate.
func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
newKVs := map[string]ValueRevision{}
for k, v := range s.KeyValues {
newKVs[k] = v
@ -185,7 +185,7 @@ func (s etcdState) step(request EtcdRequest) (etcdState, MaybeEtcdResponse) {
}
}
func (s etcdState) getRange(key string, options RangeOptions) RangeResponse {
func (s EtcdState) getRange(key string, options RangeOptions) RangeResponse {
response := RangeResponse{
KVs: []KeyValue{},
}
@ -217,7 +217,7 @@ func (s etcdState) getRange(key string, options RangeOptions) RangeResponse {
return response
}
func detachFromOldLease(s etcdState, key string) etcdState {
func detachFromOldLease(s EtcdState, key string) EtcdState {
if oldLeaseId, ok := s.KeyLeases[key]; ok {
delete(s.Leases[oldLeaseId].Keys, key)
delete(s.KeyLeases, key)
@ -225,7 +225,7 @@ func detachFromOldLease(s etcdState, key string) etcdState {
return s
}
func attachToNewLease(s etcdState, leaseID int64, key string) etcdState {
func attachToNewLease(s EtcdState, leaseID int64, key string) EtcdState {
s.KeyLeases[key] = leaseID
s.Leases[leaseID].Keys[key] = leased
return s

View File

@ -33,12 +33,12 @@ func TestModelDeterministic(t *testing.T) {
if op.expectFailure == ok {
t.Logf("state: %v", state)
t.Errorf("Unexpected operation result, expect: %v, got: %v, operation: %s", !op.expectFailure, ok, DeterministicModel.DescribeOperation(op.req, op.resp.EtcdResponse))
var loadedState etcdState
var loadedState EtcdState
err := json.Unmarshal([]byte(state.(string)), &loadedState)
if err != nil {
t.Fatalf("Failed to load state: %v", err)
}
_, resp := loadedState.step(op.req)
_, resp := loadedState.Step(op.req)
t.Errorf("Response diff: %s", cmp.Diff(op.resp, resp))
break
}

View File

@ -40,7 +40,7 @@ var NonDeterministicModel = porcupine.Model{
if err != nil {
panic(err)
}
ok, states := states.Step(in.(EtcdRequest), out.(MaybeEtcdResponse))
ok, states := states.apply(in.(EtcdRequest), out.(MaybeEtcdResponse))
data, err := json.Marshal(states)
if err != nil {
panic(err)
@ -52,27 +52,27 @@ var NonDeterministicModel = porcupine.Model{
},
}
type nonDeterministicState []etcdState
type nonDeterministicState []EtcdState
func (states nonDeterministicState) Step(request EtcdRequest, response MaybeEtcdResponse) (bool, nonDeterministicState) {
func (states nonDeterministicState) apply(request EtcdRequest, response MaybeEtcdResponse) (bool, nonDeterministicState) {
var newStates nonDeterministicState
switch {
case response.Err != nil:
newStates = states.stepFailedRequest(request)
newStates = states.stepFailedResponse(request)
case response.PartialResponse:
newStates = states.stepPartialRequest(request, response.EtcdResponse.Revision)
newStates = states.applyResponseRevision(request, response.EtcdResponse.Revision)
default:
newStates = states.stepSuccessfulRequest(request, response.EtcdResponse)
newStates = states.applySuccessfulResponse(request, response.EtcdResponse)
}
return len(newStates) > 0, newStates
}
// stepFailedRequest duplicates number of states by considering request persisted and lost.
func (states nonDeterministicState) stepFailedRequest(request EtcdRequest) nonDeterministicState {
// stepFailedResponse duplicates number of states by considering both cases, request was persisted and request was lost.
func (states nonDeterministicState) stepFailedResponse(request EtcdRequest) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states)*2)
for _, s := range states {
newStates = append(newStates, s)
newState, _ := s.step(request)
newState, _ := s.Step(request)
if !reflect.DeepEqual(newState, s) {
newStates = append(newStates, newState)
}
@ -80,11 +80,11 @@ func (states nonDeterministicState) stepFailedRequest(request EtcdRequest) nonDe
return newStates
}
// stepPartialRequest filters possible states by leaving ony states that would return proper revision.
func (states nonDeterministicState) stepPartialRequest(request EtcdRequest, responseRevision int64) nonDeterministicState {
// applyResponseRevision filters possible states by leaving ony states that would return proper revision.
func (states nonDeterministicState) applyResponseRevision(request EtcdRequest, responseRevision int64) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states))
for _, s := range states {
newState, modelResponse := s.step(request)
newState, modelResponse := s.Step(request)
if modelResponse.Revision == responseRevision {
newStates = append(newStates, newState)
}
@ -92,11 +92,11 @@ func (states nonDeterministicState) stepPartialRequest(request EtcdRequest, resp
return newStates
}
// stepSuccessfulRequest filters possible states by leaving ony states that would respond correctly.
func (states nonDeterministicState) stepSuccessfulRequest(request EtcdRequest, response EtcdResponse) nonDeterministicState {
// applySuccessfulResponse filters possible states by leaving ony states that would respond correctly.
func (states nonDeterministicState) applySuccessfulResponse(request EtcdRequest, response EtcdResponse) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states))
for _, s := range states {
newState, modelResponse := s.step(request)
newState, modelResponse := s.Step(request)
if Match(modelResponse, MaybeEtcdResponse{EtcdResponse: response}) {
newStates = append(newStates, newState)
}

View File

@ -339,7 +339,7 @@ func TestModelNonDeterministic(t *testing.T) {
t.Fatalf("Failed to load state: %v", err)
}
for i, s := range loadedState {
_, resp := s.step(op.req)
_, resp := s.Step(op.req)
t.Errorf("For state %d, response diff: %s", i, cmp.Diff(op.resp, resp))
}
break

View File

@ -0,0 +1,99 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
"fmt"
)
func NewReplay(eventHistory []WatchEvent) *EtcdReplay {
var lastEventRevision int64 = 1
for _, event := range eventHistory {
if event.Revision > lastEventRevision && event.Revision != lastEventRevision+1 {
panic("Replay requires a complete event history")
}
lastEventRevision = event.Revision
}
return &EtcdReplay{
eventHistory: eventHistory,
}
}
type EtcdReplay struct {
eventHistory []WatchEvent
// Cached state and event index used for it's calculation
cachedState *EtcdState
eventHistoryIndex int
}
func (r *EtcdReplay) StateForRevision(revision int64) (EtcdState, error) {
if revision < 1 {
return EtcdState{}, fmt.Errorf("invalid revision: %d", revision)
}
if r.cachedState == nil || r.cachedState.Revision > revision {
r.reset()
}
for r.eventHistoryIndex < len(r.eventHistory) && r.cachedState.Revision < revision {
nextRequest, nextRevision, nextIndex := r.next()
newState, _ := r.cachedState.Step(nextRequest)
if newState.Revision != nextRevision {
return EtcdState{}, fmt.Errorf("model returned different revision than one present in event history, model: %d, event: %d", newState.Revision, nextRevision)
}
r.cachedState = &newState
r.eventHistoryIndex = nextIndex
}
if r.eventHistoryIndex > len(r.eventHistory) && r.cachedState.Revision < revision {
return EtcdState{}, fmt.Errorf("requested revision higher then available in even history, requested: %d, model: %d", revision, r.cachedState.Revision)
}
return *r.cachedState, nil
}
func (r *EtcdReplay) reset() {
state := freshEtcdState()
r.cachedState = &state
r.eventHistoryIndex = 0
}
func (r *EtcdReplay) next() (request EtcdRequest, revision int64, index int) {
revision = r.eventHistory[r.eventHistoryIndex].Revision
index = r.eventHistoryIndex
operations := []EtcdOperation{}
for r.eventHistory[index].Revision == revision {
operations = append(operations, r.eventHistory[index].Op)
index++
}
return EtcdRequest{
Type: Txn,
Txn: &TxnRequest{
OperationsOnSuccess: operations,
},
}, revision, index
}
func operationToRequest(op EtcdOperation) EtcdRequest {
return EtcdRequest{
Type: Txn,
Txn: &TxnRequest{
OperationsOnSuccess: []EtcdOperation{op},
},
}
}
type WatchEvent struct {
Op EtcdOperation
Revision int64
}

View File

@ -34,7 +34,7 @@ type report struct {
lg *zap.Logger
clus *e2e.EtcdProcessCluster
clientReports []traffic.ClientReport
visualizeHistory func(path string)
visualizeHistory func(path string) error
}
func testResultsDirectory(t *testing.T) string {
@ -89,7 +89,10 @@ func (r *report) Report(t *testing.T, force bool) {
}
}
if r.visualizeHistory != nil {
r.visualizeHistory(filepath.Join(path, "history.html"))
err := r.visualizeHistory(filepath.Join(path, "history.html"))
if err != nil {
t.Error(err)
}
}
}

View File

@ -46,22 +46,17 @@ type RecordingClient struct {
}
type WatchResponse struct {
Events []WatchEvent
Events []model.WatchEvent
IsProgressNotify bool
Revision int64
Time time.Duration
}
type TimedWatchEvent struct {
WatchEvent
model.WatchEvent
Time time.Duration
}
type WatchEvent struct {
Op model.EtcdOperation
Revision int64
}
func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) {
cc, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
@ -259,7 +254,7 @@ func ToWatchResponse(r clientv3.WatchResponse, baseTime time.Time) WatchResponse
return resp
}
func toWatchEvent(event clientv3.Event) WatchEvent {
func toWatchEvent(event clientv3.Event) model.WatchEvent {
var op model.OperationType
switch event.Type {
case mvccpb.PUT:
@ -269,7 +264,7 @@ func toWatchEvent(event clientv3.Event) WatchEvent {
default:
panic(fmt.Sprintf("Unexpected event type: %s", event.Type))
}
return WatchEvent{
return model.WatchEvent{
Revision: event.Kv.ModRevision,
Op: model.EtcdOperation{
Type: op,

View File

@ -100,6 +100,13 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.
lastOperationSucceeded := true
var lastRev int64
var requestType etcdRequestType
client := etcdTrafficClient{
etcdTraffic: t,
client: c,
limiter: limiter,
idProvider: ids,
leaseStorage: lm,
}
for {
select {
case <-ctx.Done():
@ -115,7 +122,7 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.
} else {
requestType = Get
}
rev, err := t.Request(ctx, c, requestType, limiter, key, ids, lm, lastRev)
rev, err := client.Request(ctx, requestType, key, lastRev)
lastOperationSucceeded = err == nil
if err != nil {
continue
@ -127,87 +134,95 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.
}
}
func (t etcdTraffic) Request(ctx context.Context, c *RecordingClient, request etcdRequestType, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, lastRev int64) (rev int64, err error) {
type etcdTrafficClient struct {
etcdTraffic
client *RecordingClient
limiter *rate.Limiter
idProvider identity.Provider
leaseStorage identity.LeaseIdStorage
}
func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType, key string, lastRev int64) (rev int64, err error) {
opCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
switch request {
case StaleGet:
_, rev, err = c.Get(opCtx, key, lastRev)
_, rev, err = c.client.Get(opCtx, key, lastRev)
case Get:
_, rev, err = c.Get(opCtx, key, 0)
_, rev, err = c.client.Get(opCtx, key, 0)
case Put:
var resp *clientv3.PutResponse
resp, err = c.Put(opCtx, key, fmt.Sprintf("%d", id.NewRequestId()))
resp, err = c.client.Put(opCtx, key, fmt.Sprintf("%d", c.idProvider.NewRequestId()))
if resp != nil {
rev = resp.Header.Revision
}
case LargePut:
var resp *clientv3.PutResponse
resp, err = c.Put(opCtx, key, randString(t.largePutSize))
resp, err = c.client.Put(opCtx, key, randString(c.largePutSize))
if resp != nil {
rev = resp.Header.Revision
}
case Delete:
var resp *clientv3.DeleteResponse
resp, err = c.Delete(opCtx, key)
resp, err = c.client.Delete(opCtx, key)
if resp != nil {
rev = resp.Header.Revision
}
case MultiOpTxn:
var resp *clientv3.TxnResponse
resp, err = c.Txn(opCtx, nil, t.pickMultiTxnOps(id), nil)
resp, err = c.client.Txn(opCtx, nil, c.pickMultiTxnOps(), nil)
if resp != nil {
rev = resp.Header.Revision
}
case CompareAndSet:
var kv *mvccpb.KeyValue
kv, rev, err = c.Get(opCtx, key, 0)
kv, rev, err = c.client.Get(opCtx, key, 0)
if err == nil {
limiter.Wait(ctx)
c.limiter.Wait(ctx)
var expectedRevision int64
if kv != nil {
expectedRevision = kv.ModRevision
}
txnCtx, txnCancel := context.WithTimeout(ctx, RequestTimeout)
var resp *clientv3.TxnResponse
resp, err = c.Txn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", id.NewRequestId()))}, nil)
resp, err = c.client.Txn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", c.idProvider.NewRequestId()))}, nil)
txnCancel()
if resp != nil {
rev = resp.Header.Revision
}
}
case PutWithLease:
leaseId := lm.LeaseId(c.id)
leaseId := c.leaseStorage.LeaseId(c.client.id)
if leaseId == 0 {
var resp *clientv3.LeaseGrantResponse
resp, err = c.LeaseGrant(opCtx, t.leaseTTL)
resp, err = c.client.LeaseGrant(opCtx, c.leaseTTL)
if resp != nil {
leaseId = int64(resp.ID)
rev = resp.ResponseHeader.Revision
}
if err == nil {
lm.AddLeaseId(c.id, leaseId)
limiter.Wait(ctx)
c.leaseStorage.AddLeaseId(c.client.id, leaseId)
c.limiter.Wait(ctx)
}
}
if leaseId != 0 {
putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout)
var resp *clientv3.PutResponse
resp, err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.NewRequestId()), leaseId)
resp, err = c.client.PutWithLease(putCtx, key, fmt.Sprintf("%d", c.idProvider.NewRequestId()), leaseId)
putCancel()
if resp != nil {
rev = resp.Header.Revision
}
}
case LeaseRevoke:
leaseId := lm.LeaseId(c.id)
leaseId := c.leaseStorage.LeaseId(c.client.id)
if leaseId != 0 {
var resp *clientv3.LeaseRevokeResponse
resp, err = c.LeaseRevoke(opCtx, leaseId)
resp, err = c.client.LeaseRevoke(opCtx, leaseId)
//if LeaseRevoke has failed, do not remove the mapping.
if err == nil {
lm.RemoveLeaseId(c.id)
c.leaseStorage.RemoveLeaseId(c.client.id)
}
if resp != nil {
rev = resp.Header.Revision
@ -215,7 +230,7 @@ func (t etcdTraffic) Request(ctx context.Context, c *RecordingClient, request et
}
case Defragment:
var resp *clientv3.DefragmentResponse
resp, err = c.Defragment(opCtx)
resp, err = c.client.Defragment(opCtx)
if resp != nil {
rev = resp.Header.Revision
}
@ -226,13 +241,13 @@ func (t etcdTraffic) Request(ctx context.Context, c *RecordingClient, request et
return rev, err
}
func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) {
keys := rand.Perm(t.keyCount)
func (c etcdTrafficClient) pickMultiTxnOps() (ops []clientv3.Op) {
keys := rand.Perm(c.keyCount)
opTypes := make([]model.OperationType, 4)
atLeastOnePut := false
for i := 0; i < MultiOpTxnOpCount; i++ {
opTypes[i] = t.pickOperationType()
opTypes[i] = c.pickOperationType()
if opTypes[i] == model.PutOperation {
atLeastOnePut = true
}
@ -248,7 +263,7 @@ func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op)
case model.RangeOperation:
ops = append(ops, clientv3.OpGet(key))
case model.PutOperation:
value := fmt.Sprintf("%d", ids.NewRequestId())
value := fmt.Sprintf("%d", c.idProvider.NewRequestId())
ops = append(ops, clientv3.OpPut(key, value))
case model.DeleteOperation:
ops = append(ops, clientv3.OpDelete(key))

View File

@ -37,13 +37,13 @@ var (
maximalQPS: 1000,
clientCount: 12,
Traffic: kubernetesTraffic{
averageKeyCount: 5,
averageKeyCount: 10,
resource: "pods",
namespace: "default",
writeChoices: []choiceWeight[KubernetesRequestType]{
{choice: KubernetesUpdate, weight: 75},
{choice: KubernetesDelete, weight: 15},
{choice: KubernetesCreate, weight: 10},
{choice: KubernetesUpdate, weight: 90},
{choice: KubernetesDelete, weight: 5},
{choice: KubernetesCreate, weight: 5},
},
},
}

View File

@ -15,28 +15,92 @@
package validate
import (
"fmt"
"reflect"
"sort"
"testing"
"time"
"github.com/anishathalye/porcupine"
"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
"go.etcd.io/etcd/tests/v3/robustness/model"
)
func validateOperationHistoryAndReturnVisualize(t *testing.T, lg *zap.Logger, operations []porcupine.Operation) (visualize func(basepath string)) {
linearizable, info := porcupine.CheckOperationsVerbose(model.NonDeterministicModel, operations, 5*time.Minute)
if linearizable == porcupine.Illegal {
func validateOperationsAndVisualize(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, eventHistory []model.WatchEvent) (visualize func(basepath string) error) {
const timeout = 5 * time.Minute
lg.Info("Validating linearizable operations", zap.Duration("timeout", timeout))
result, visualize := validateLinearizableOperationAndVisualize(lg, operations, timeout)
switch result {
case porcupine.Illegal:
t.Error("Linearization failed for provided operations")
return
case porcupine.Unknown:
t.Error("Model is not linearizable")
return
case porcupine.Ok:
t.Log("Linearization passed")
default:
t.Fatalf("Unknown Linearization")
}
if linearizable == porcupine.Unknown {
t.Error("Linearization timed out")
}
return func(path string) {
lg.Info("Validating serializable operations")
// TODO: Use linearization result instead of event history to get order of events
// This is currently impossible as porcupine doesn't expose operation order created during linearization.
validateSerializableOperations(t, operations, eventHistory)
return visualize
}
func validateLinearizableOperationAndVisualize(lg *zap.Logger, operations []porcupine.Operation, timeout time.Duration) (result porcupine.CheckResult, visualize func(basepath string) error) {
linearizable, info := porcupine.CheckOperationsVerbose(model.NonDeterministicModel, operations, timeout)
return linearizable, func(path string) error {
lg.Info("Saving visualization", zap.String("path", path))
err := porcupine.VisualizePath(model.NonDeterministicModel, info, path)
if err != nil {
t.Errorf("Failed to visualize, err: %v", err)
return fmt.Errorf("failed to visualize, err: %v", err)
}
return nil
}
}
func validateSerializableOperations(t *testing.T, operations []porcupine.Operation, totalEventHistory []model.WatchEvent) {
staleReads := filterSerializableReads(operations)
if len(staleReads) == 0 {
return
}
sort.Slice(staleReads, func(i, j int) bool {
return staleReads[i].Input.(model.EtcdRequest).Range.Revision < staleReads[j].Input.(model.EtcdRequest).Range.Revision
})
replay := model.NewReplay(totalEventHistory)
for _, read := range staleReads {
request := read.Input.(model.EtcdRequest)
response := read.Output.(model.MaybeEtcdResponse)
validateSerializableOperation(t, replay, request, response)
}
}
func filterSerializableReads(operations []porcupine.Operation) []porcupine.Operation {
resp := []porcupine.Operation{}
for _, op := range operations {
request := op.Input.(model.EtcdRequest)
if request.Type == model.Range && request.Range.Revision != 0 {
resp = append(resp, op)
}
}
return resp
}
func validateSerializableOperation(t *testing.T, replay *model.EtcdReplay, request model.EtcdRequest, response model.MaybeEtcdResponse) {
if response.PartialResponse || response.Err != nil {
return
}
state, err := replay.StateForRevision(request.Range.Revision)
if err != nil {
t.Fatal(err)
}
_, expectResp := state.Step(request)
if !reflect.DeepEqual(response.EtcdResponse.Range, expectResp.Range) {
t.Errorf("Invalid serializable response, diff: %s", cmp.Diff(response.EtcdResponse.Range, expectResp.Range))
}
}

View File

@ -24,14 +24,13 @@ import (
"go.etcd.io/etcd/tests/v3/robustness/traffic"
)
// ValidateAndReturnVisualize return visualize as porcupine.linearizationInfo used to generate visualization is private.
func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []traffic.ClientReport) (visualize func(basepath string)) {
validateWatch(t, cfg, reports)
// TODO: Validate stale reads responses.
// ValidateAndReturnVisualize returns visualize as porcupine.linearizationInfo used to generate visualization is private.
func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []traffic.ClientReport) (visualize func(basepath string) error) {
eventHistory := validateWatch(t, cfg, reports)
allOperations := operations(reports)
watchEvents := uniqueWatchEvents(reports)
newOperations := patchOperationsWithWatchEvents(allOperations, watchEvents)
return validateOperationHistoryAndReturnVisualize(t, lg, newOperations)
patchedOperations := patchOperationsWithWatchEvents(allOperations, watchEvents)
return validateOperationsAndVisualize(t, lg, patchedOperations, eventHistory)
}
func operations(reports []traffic.ClientReport) []porcupine.Operation {

View File

@ -19,10 +19,11 @@ import (
"github.com/google/go-cmp/cmp"
"go.etcd.io/etcd/tests/v3/robustness/model"
"go.etcd.io/etcd/tests/v3/robustness/traffic"
)
func validateWatch(t *testing.T, cfg Config, reports []traffic.ClientReport) {
func validateWatch(t *testing.T, cfg Config, reports []traffic.ClientReport) []model.WatchEvent {
// Validate etcd watch properties defined in https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis
for _, r := range reports {
validateOrdered(t, r)
@ -34,8 +35,10 @@ func validateWatch(t *testing.T, cfg Config, reports []traffic.ClientReport) {
validateEventsMatch(t, reports)
// Expects that longest history encompasses all events.
// TODO: Use combined events from all histories instead of the longest history.
eventHistory := longestEventHistory(reports)
// TODO: Validate that each watch report is reliable, not only the longest one.
validateReliable(t, longestEventHistory(reports))
validateReliable(t, eventHistory)
return watchEvents(eventHistory)
}
func validateBookmarkable(t *testing.T, report traffic.ClientReport) {
@ -127,7 +130,7 @@ func validateEventsMatch(t *testing.T, reports []traffic.ClientReport) {
key string
}
type eventClientId struct {
traffic.WatchEvent
model.WatchEvent
ClientId int
}
revisionKeyToEvent := map[revisionKey]eventClientId{}
@ -158,3 +161,11 @@ func longestEventHistory(report []traffic.ClientReport) []traffic.TimedWatchEven
}
return toWatchEvents(report[longestIndex].Watch)
}
func watchEvents(timed []traffic.TimedWatchEvent) []model.WatchEvent {
result := make([]model.WatchEvent, 0, len(timed))
for _, event := range timed {
result = append(result, event.WatchEvent)
}
return result
}