Compare commits


12 Commits

Author SHA1 Message Date
Benjamin Wang 66553d4f07
Merge pull request #16090 from tjungblu/changelog_leaseput
update change logs with lease put improvements
2023-06-21 13:23:16 +01:00
Thomas Jungblut e3f2638aea update change logs with lease put improvements
Signed-off-by: Thomas Jungblut <tjungblu@redhat.com>
2023-06-21 12:18:20 +02:00
Benjamin Wang de6415801e
Merge pull request #16110 from etcd-io/dependabot/github_actions/github/codeql-action-2.20.0
build(deps): bump github/codeql-action from 2.3.6 to 2.20.0
2023-06-20 14:15:52 +01:00
Marek Siarkowicz fd3e338d88
Merge pull request #16115 from serathius/robustness-kubernetes-tune
tests/robustness: Tune Kubernetes tests to reduce number of delete requests
2023-06-20 11:16:02 +02:00
Marek Siarkowicz 486462a907
Merge pull request #16114 from serathius/robustness-test-name-separate
tests/robustness: Separate traffic name from cluster setup in test name
2023-06-20 11:15:41 +02:00
Marek Siarkowicz 519617cfd0 tests/robustness: Tune Kubernetes tests to reduce number of delete requests
Having too many delete requests is bad because they are not unique requests,
so linearization is more prone to time out on them.

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
2023-06-20 09:45:23 +02:00
Marek Siarkowicz 1217548acf tests/robustness: Separate traffic name from cluster setup in test name
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
2023-06-20 09:16:36 +02:00
dependabot[bot] 1d472bb6e4
build(deps): bump github/codeql-action from 2.3.6 to 2.20.0
Bumps [github/codeql-action](https://github.com/github/codeql-action) from 2.3.6 to 2.20.0.
- [Release notes](https://github.com/github/codeql-action/releases)
- [Changelog](https://github.com/github/codeql-action/blob/main/CHANGELOG.md)
- [Commits](83f0fe6c49...6c089f53dd)

---
updated-dependencies:
- dependency-name: github/codeql-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-06-19 18:01:17 +00:00
Marek Siarkowicz 9c659eb4e0
Merge pull request #16072 from serathius/robustness-stale-read
Validate stale read
2023-06-19 18:22:08 +02:00
Marek Siarkowicz 1420292b10
Merge pull request #16092 from serathius/robustness-etcdctl-traffic-client
Robustness etcd traffic client
2023-06-19 16:10:55 +02:00
Marek Siarkowicz 1663600bec tests/robustness: Validate stale get requests by replaying etcd state
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
2023-06-19 14:17:38 +02:00
Marek Siarkowicz 09b9f889e7 tests/robustness: Refactor etcd traffic client
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
2023-06-19 12:08:17 +02:00
19 changed files with 283 additions and 95 deletions


@@ -40,7 +40,7 @@ jobs:
       uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # v3.5.3
     # Initializes the CodeQL tools for scanning.
     - name: Initialize CodeQL
-      uses: github/codeql-action/init@83f0fe6c4988d98a455712a27f0255212bba9bd4 # v2.3.6
+      uses: github/codeql-action/init@6c089f53dd51dc3fc7e599c3cb5356453a52ca9e # v2.20.0
       with:
         # If you wish to specify custom queries, you can do so here or in a config file.
         # By default, queries listed here will override any specified in a config file.
@@ -50,6 +50,6 @@ jobs:
     # Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
     # If this step fails, then you should remove it and run the build manually (see below)
     - name: Autobuild
-      uses: github/codeql-action/autobuild@83f0fe6c4988d98a455712a27f0255212bba9bd4 # v2.3.6
+      uses: github/codeql-action/autobuild@6c089f53dd51dc3fc7e599c3cb5356453a52ca9e # v2.20.0
     - name: Perform CodeQL Analysis
-      uses: github/codeql-action/analyze@83f0fe6c4988d98a455712a27f0255212bba9bd4 # v2.3.6
+      uses: github/codeql-action/analyze@6c089f53dd51dc3fc7e599c3cb5356453a52ca9e # v2.20.0


@@ -50,6 +50,6 @@ jobs:
     # Upload the results to GitHub's code scanning dashboard.
     - name: "Upload to code-scanning"
-      uses: github/codeql-action/upload-sarif@83f0fe6c4988d98a455712a27f0255212bba9bd4 # tag=v1.0.26
+      uses: github/codeql-action/upload-sarif@6c089f53dd51dc3fc7e599c3cb5356453a52ca9e # tag=v1.0.26
       with:
         sarif_file: results.sarif


@@ -8,6 +8,7 @@ Previous change logs can be found at [CHANGELOG-3.3](https://github.com/etcd-io/
 ### etcd server
 - Fix [corruption check may get a `ErrCompacted` error when server has just been compacted](https://github.com/etcd-io/etcd/pull/16047)
+- Improve [Lease put performance for the case that auth is disabled or the user is admin](https://github.com/etcd-io/etcd/pull/16020)
 ## v3.4.26 (2023-05-12)


@@ -8,6 +8,7 @@ Previous change logs can be found at [CHANGELOG-3.4](https://github.com/etcd-io/
 ### etcd server
 - Fix [corruption check may get a `ErrCompacted` error when server has just been compacted](https://github.com/etcd-io/etcd/pull/16048)
+- Improve [Lease put performance for the case that auth is disabled or the user is admin](https://github.com/etcd-io/etcd/pull/16019)
 ### etcd grpc-proxy
 - Fix [Memberlist results not updated when proxy node down](https://github.com/etcd-io/etcd/pull/15907).

go.mod

@@ -72,7 +72,7 @@ require (
     github.com/prometheus/client_golang v1.15.1 // indirect
     github.com/prometheus/client_model v0.4.0 // indirect
     github.com/prometheus/common v0.43.0 // indirect
-    github.com/prometheus/procfs v0.11.0 // indirect
+    github.com/prometheus/procfs v0.9.0 // indirect
     github.com/rivo/uniseg v0.2.0 // indirect
     github.com/sirupsen/logrus v1.8.1 // indirect
     github.com/soheilhy/cmux v0.1.5 // indirect

go.sum

@@ -236,8 +236,8 @@ github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUo
 github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
 github.com/prometheus/common v0.43.0 h1:iq+BVjvYLei5f27wiuNiB1DN6DYQkp1c8Bx0Vykh5us=
 github.com/prometheus/common v0.43.0/go.mod h1:NCvr5cQIh3Y/gy73/RdVtC9r8xxrxwJnB+2lB3BxrFc=
-github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk=
-github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
+github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI=
+github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
 github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
 github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
 github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=


@@ -39,7 +39,7 @@ func TestRobustness(t *testing.T) {
     scenarios := []testScenario{}
     for _, traffic := range []traffic.Config{traffic.LowTraffic, traffic.HighTraffic, traffic.KubernetesTraffic} {
         scenarios = append(scenarios, testScenario{
-            name:    traffic.Name + "ClusterOfSize1",
+            name:    traffic.Name + "/ClusterOfSize1",
             traffic: traffic,
             cluster: *e2e.NewConfig(
                 e2e.WithClusterSize(1),
@@ -61,7 +61,7 @@ func TestRobustness(t *testing.T) {
             clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithSnapshotCatchUpEntries(100))
         }
         scenarios = append(scenarios, testScenario{
-            name:    traffic.Name + "ClusterOfSize3",
+            name:    traffic.Name + "/ClusterOfSize3",
             traffic: traffic,
             cluster: *e2e.NewConfig(clusterOfSize3Options...),
         })
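The "/" separator makes each scenario read as a nested subtest path, so a traffic profile and a cluster size can be filtered independently with -run. The t.Run wiring below is an illustrative sketch, not part of this diff:

    // Illustrative only: a name such as "Kubernetes/ClusterOfSize3" shows up as
    // TestRobustness/Kubernetes/ClusterOfSize3 and can be matched per path element.
    for _, s := range scenarios {
        s := s
        t.Run(s.name, func(t *testing.T) {
            // run the scenario against s.cluster with s.traffic
        })
    }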


@@ -47,12 +47,12 @@ var DeterministicModel = porcupine.Model{
         return string(data)
     },
     Step: func(st interface{}, in interface{}, out interface{}) (bool, interface{}) {
-        var s etcdState
+        var s EtcdState
         err := json.Unmarshal([]byte(st.(string)), &s)
         if err != nil {
             panic(err)
         }
-        ok, s := s.Step(in.(EtcdRequest), out.(EtcdResponse))
+        ok, s := s.apply(in.(EtcdRequest), out.(EtcdResponse))
         data, err := json.Marshal(s)
         if err != nil {
             panic(err)
@@ -64,20 +64,20 @@ var DeterministicModel = porcupine.Model{
     },
 }
-type etcdState struct {
+type EtcdState struct {
     Revision  int64
     KeyValues map[string]ValueRevision
     KeyLeases map[string]int64
     Leases    map[int64]EtcdLease
 }
-func (s etcdState) Step(request EtcdRequest, response EtcdResponse) (bool, etcdState) {
-    newState, modelResponse := s.step(request)
+func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) {
+    newState, modelResponse := s.Step(request)
     return Match(MaybeEtcdResponse{EtcdResponse: response}, modelResponse), newState
 }
-func freshEtcdState() etcdState {
-    return etcdState{
+func freshEtcdState() EtcdState {
+    return EtcdState{
         Revision:  1,
         KeyValues: map[string]ValueRevision{},
         KeyLeases: map[string]int64{},
@@ -85,8 +85,8 @@ func freshEtcdState() etcdState {
     }
 }
-// step handles a successful request, returning updated state and response it would generate.
-func (s etcdState) step(request EtcdRequest) (etcdState, MaybeEtcdResponse) {
+// Step handles a successful request, returning updated state and response it would generate.
+func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
     newKVs := map[string]ValueRevision{}
     for k, v := range s.KeyValues {
         newKVs[k] = v
@@ -185,7 +185,7 @@ func (s etcdState) step(request EtcdRequest) (etcdState, MaybeEtcdResponse) {
     }
 }
-func (s etcdState) getRange(key string, options RangeOptions) RangeResponse {
+func (s EtcdState) getRange(key string, options RangeOptions) RangeResponse {
     response := RangeResponse{
         KVs: []KeyValue{},
     }
@@ -217,7 +217,7 @@ func (s etcdState) getRange(key string, options RangeOptions) RangeResponse {
     return response
 }
-func detachFromOldLease(s etcdState, key string) etcdState {
+func detachFromOldLease(s EtcdState, key string) EtcdState {
     if oldLeaseId, ok := s.KeyLeases[key]; ok {
         delete(s.Leases[oldLeaseId].Keys, key)
         delete(s.KeyLeases, key)
@@ -225,7 +225,7 @@ func detachFromOldLease(s etcdState, key string) etcdState {
     return s
 }
-func attachToNewLease(s etcdState, leaseID int64, key string) etcdState {
+func attachToNewLease(s EtcdState, leaseID int64, key string) EtcdState {
     s.KeyLeases[key] = leaseID
     s.Leases[leaseID].Keys[key] = leased
     return s
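After the rename the package exposes a single-request transition, Step, while the porcupine-facing check became the unexported apply. A minimal sketch of the two entry points inside package model; the wrapper function and its arguments are assumptions, the signatures come from the diff above:

    // exampleModelUsage is illustrative, not part of this compare.
    func exampleModelUsage(request EtcdRequest, observed EtcdResponse) (MaybeEtcdResponse, bool) {
        state := freshEtcdState()
        // Step: the response the model itself would generate for the request.
        _, modelResponse := state.Step(request)
        // apply: whether a response actually observed by a client matches the model.
        ok, _ := state.apply(request, observed)
        return modelResponse, ok
    }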


@@ -33,12 +33,12 @@ func TestModelDeterministic(t *testing.T) {
             if op.expectFailure == ok {
                 t.Logf("state: %v", state)
                 t.Errorf("Unexpected operation result, expect: %v, got: %v, operation: %s", !op.expectFailure, ok, DeterministicModel.DescribeOperation(op.req, op.resp.EtcdResponse))
-                var loadedState etcdState
+                var loadedState EtcdState
                 err := json.Unmarshal([]byte(state.(string)), &loadedState)
                 if err != nil {
                     t.Fatalf("Failed to load state: %v", err)
                 }
-                _, resp := loadedState.step(op.req)
+                _, resp := loadedState.Step(op.req)
                 t.Errorf("Response diff: %s", cmp.Diff(op.resp, resp))
                 break
             }


@@ -40,7 +40,7 @@ var NonDeterministicModel = porcupine.Model{
         if err != nil {
             panic(err)
         }
-        ok, states := states.Step(in.(EtcdRequest), out.(MaybeEtcdResponse))
+        ok, states := states.apply(in.(EtcdRequest), out.(MaybeEtcdResponse))
         data, err := json.Marshal(states)
         if err != nil {
             panic(err)
@@ -52,27 +52,27 @@ var NonDeterministicModel = porcupine.Model{
     },
 }
-type nonDeterministicState []etcdState
+type nonDeterministicState []EtcdState
-func (states nonDeterministicState) Step(request EtcdRequest, response MaybeEtcdResponse) (bool, nonDeterministicState) {
+func (states nonDeterministicState) apply(request EtcdRequest, response MaybeEtcdResponse) (bool, nonDeterministicState) {
     var newStates nonDeterministicState
     switch {
     case response.Err != nil:
-        newStates = states.stepFailedRequest(request)
+        newStates = states.stepFailedResponse(request)
     case response.PartialResponse:
-        newStates = states.stepPartialRequest(request, response.EtcdResponse.Revision)
+        newStates = states.applyResponseRevision(request, response.EtcdResponse.Revision)
     default:
-        newStates = states.stepSuccessfulRequest(request, response.EtcdResponse)
+        newStates = states.applySuccessfulResponse(request, response.EtcdResponse)
     }
     return len(newStates) > 0, newStates
 }
-// stepFailedRequest duplicates number of states by considering request persisted and lost.
-func (states nonDeterministicState) stepFailedRequest(request EtcdRequest) nonDeterministicState {
+// stepFailedResponse duplicates number of states by considering both cases, request was persisted and request was lost.
+func (states nonDeterministicState) stepFailedResponse(request EtcdRequest) nonDeterministicState {
     newStates := make(nonDeterministicState, 0, len(states)*2)
     for _, s := range states {
         newStates = append(newStates, s)
-        newState, _ := s.step(request)
+        newState, _ := s.Step(request)
         if !reflect.DeepEqual(newState, s) {
             newStates = append(newStates, newState)
         }
@@ -80,11 +80,11 @@ func (states nonDeterministicState) stepFailedRequest(request EtcdRequest) nonDe
     return newStates
 }
-// stepPartialRequest filters possible states by leaving ony states that would return proper revision.
-func (states nonDeterministicState) stepPartialRequest(request EtcdRequest, responseRevision int64) nonDeterministicState {
+// applyResponseRevision filters possible states by leaving ony states that would return proper revision.
+func (states nonDeterministicState) applyResponseRevision(request EtcdRequest, responseRevision int64) nonDeterministicState {
     newStates := make(nonDeterministicState, 0, len(states))
     for _, s := range states {
-        newState, modelResponse := s.step(request)
+        newState, modelResponse := s.Step(request)
         if modelResponse.Revision == responseRevision {
             newStates = append(newStates, newState)
         }
@@ -92,11 +92,11 @@ func (states nonDeterministicState) stepPartialRequest(request EtcdRequest, resp
     return newStates
 }
-// stepSuccessfulRequest filters possible states by leaving ony states that would respond correctly.
-func (states nonDeterministicState) stepSuccessfulRequest(request EtcdRequest, response EtcdResponse) nonDeterministicState {
+// applySuccessfulResponse filters possible states by leaving ony states that would respond correctly.
+func (states nonDeterministicState) applySuccessfulResponse(request EtcdRequest, response EtcdResponse) nonDeterministicState {
     newStates := make(nonDeterministicState, 0, len(states))
     for _, s := range states {
-        newState, modelResponse := s.step(request)
+        newState, modelResponse := s.Step(request)
         if Match(modelResponse, MaybeEtcdResponse{EtcdResponse: response}) {
             newStates = append(newStates, newState)
         }
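The renamed stepFailedResponse is what makes this model non-deterministic: when a request's outcome is unknown, both interpretations are kept. A small sketch under the same signatures; the helper itself is illustrative, not from the diff:

    // illustrativeFailedStep shows that a response with Err set goes through
    // stepFailedResponse, so each candidate state can fork into "request was
    // lost" and "request was persisted".
    func illustrativeFailedStep(request EtcdRequest, timedOut MaybeEtcdResponse) (bool, nonDeterministicState) {
        states := nonDeterministicState{freshEtcdState()}
        return states.apply(request, timedOut)
    }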


@@ -339,7 +339,7 @@ func TestModelNonDeterministic(t *testing.T) {
                 t.Fatalf("Failed to load state: %v", err)
             }
             for i, s := range loadedState {
-                _, resp := s.step(op.req)
+                _, resp := s.Step(op.req)
                 t.Errorf("For state %d, response diff: %s", i, cmp.Diff(op.resp, resp))
             }
             break


@@ -0,0 +1,99 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
"fmt"
)
func NewReplay(eventHistory []WatchEvent) *EtcdReplay {
var lastEventRevision int64 = 1
for _, event := range eventHistory {
if event.Revision > lastEventRevision && event.Revision != lastEventRevision+1 {
panic("Replay requires a complete event history")
}
lastEventRevision = event.Revision
}
return &EtcdReplay{
eventHistory: eventHistory,
}
}
type EtcdReplay struct {
eventHistory []WatchEvent
// Cached state and event index used for it's calculation
cachedState *EtcdState
eventHistoryIndex int
}
func (r *EtcdReplay) StateForRevision(revision int64) (EtcdState, error) {
if revision < 1 {
return EtcdState{}, fmt.Errorf("invalid revision: %d", revision)
}
if r.cachedState == nil || r.cachedState.Revision > revision {
r.reset()
}
for r.eventHistoryIndex < len(r.eventHistory) && r.cachedState.Revision < revision {
nextRequest, nextRevision, nextIndex := r.next()
newState, _ := r.cachedState.Step(nextRequest)
if newState.Revision != nextRevision {
return EtcdState{}, fmt.Errorf("model returned different revision than one present in event history, model: %d, event: %d", newState.Revision, nextRevision)
}
r.cachedState = &newState
r.eventHistoryIndex = nextIndex
}
if r.eventHistoryIndex > len(r.eventHistory) && r.cachedState.Revision < revision {
return EtcdState{}, fmt.Errorf("requested revision higher then available in even history, requested: %d, model: %d", revision, r.cachedState.Revision)
}
return *r.cachedState, nil
}
func (r *EtcdReplay) reset() {
state := freshEtcdState()
r.cachedState = &state
r.eventHistoryIndex = 0
}
func (r *EtcdReplay) next() (request EtcdRequest, revision int64, index int) {
revision = r.eventHistory[r.eventHistoryIndex].Revision
index = r.eventHistoryIndex
operations := []EtcdOperation{}
for r.eventHistory[index].Revision == revision {
operations = append(operations, r.eventHistory[index].Op)
index++
}
return EtcdRequest{
Type: Txn,
Txn: &TxnRequest{
OperationsOnSuccess: operations,
},
}, revision, index
}
func operationToRequest(op EtcdOperation) EtcdRequest {
return EtcdRequest{
Type: Txn,
Txn: &TxnRequest{
OperationsOnSuccess: []EtcdOperation{op},
},
}
}
type WatchEvent struct {
Op EtcdOperation
Revision int64
}
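The consumer of this new replay type appears further down in this compare (validateSerializableOperation); condensed into one hypothetical helper, the intended flow looks roughly like this:

    // checkStaleRead is a sketch, not code from this compare: replay the watch
    // history up to the revision the stale Range asked for, step the model, and
    // compare the model's answer with the observed response.
    func checkStaleRead(t *testing.T, events []model.WatchEvent, req model.EtcdRequest, resp model.MaybeEtcdResponse) {
        replay := model.NewReplay(events)
        state, err := replay.StateForRevision(req.Range.Revision)
        if err != nil {
            t.Fatal(err)
        }
        _, want := state.Step(req)
        if diff := cmp.Diff(resp.EtcdResponse.Range, want.Range); diff != "" {
            t.Errorf("unexpected stale read result, diff: %s", diff)
        }
    }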


@@ -34,7 +34,7 @@ type report struct {
     lg            *zap.Logger
     clus          *e2e.EtcdProcessCluster
     clientReports []traffic.ClientReport
-    visualizeHistory func(path string)
+    visualizeHistory func(path string) error
 }
 func testResultsDirectory(t *testing.T) string {
@@ -89,7 +89,10 @@ func (r *report) Report(t *testing.T, force bool) {
         }
     }
     if r.visualizeHistory != nil {
-        r.visualizeHistory(filepath.Join(path, "history.html"))
+        err := r.visualizeHistory(filepath.Join(path, "history.html"))
+        if err != nil {
+            t.Error(err)
+        }
     }
 }


@@ -46,22 +46,17 @@ type RecordingClient struct {
 }
 type WatchResponse struct {
-    Events           []WatchEvent
+    Events           []model.WatchEvent
     IsProgressNotify bool
     Revision         int64
     Time             time.Duration
 }
 type TimedWatchEvent struct {
-    WatchEvent
+    model.WatchEvent
     Time time.Duration
 }
-type WatchEvent struct {
-    Op       model.EtcdOperation
-    Revision int64
-}
 func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) {
     cc, err := clientv3.New(clientv3.Config{
         Endpoints: endpoints,
@@ -259,7 +254,7 @@ func ToWatchResponse(r clientv3.WatchResponse, baseTime time.Time) WatchResponse
     return resp
 }
-func toWatchEvent(event clientv3.Event) WatchEvent {
+func toWatchEvent(event clientv3.Event) model.WatchEvent {
     var op model.OperationType
     switch event.Type {
     case mvccpb.PUT:
@@ -269,7 +264,7 @@ func toWatchEvent(event clientv3.Event) WatchEvent {
     default:
         panic(fmt.Sprintf("Unexpected event type: %s", event.Type))
     }
-    return WatchEvent{
+    return model.WatchEvent{
         Revision: event.Kv.ModRevision,
         Op: model.EtcdOperation{
             Type: op,


@@ -100,6 +100,13 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.
     lastOperationSucceeded := true
     var lastRev int64
     var requestType etcdRequestType
+    client := etcdTrafficClient{
+        etcdTraffic:  t,
+        client:       c,
+        limiter:      limiter,
+        idProvider:   ids,
+        leaseStorage: lm,
+    }
     for {
         select {
         case <-ctx.Done():
@@ -115,7 +122,7 @@
         } else {
             requestType = Get
         }
-        rev, err := t.Request(ctx, c, requestType, limiter, key, ids, lm, lastRev)
+        rev, err := client.Request(ctx, requestType, key, lastRev)
         lastOperationSucceeded = err == nil
         if err != nil {
             continue
@@ -127,87 +134,95 @@
     }
 }
-func (t etcdTraffic) Request(ctx context.Context, c *RecordingClient, request etcdRequestType, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, lastRev int64) (rev int64, err error) {
+type etcdTrafficClient struct {
+    etcdTraffic
+    client       *RecordingClient
+    limiter      *rate.Limiter
+    idProvider   identity.Provider
+    leaseStorage identity.LeaseIdStorage
+}
+func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType, key string, lastRev int64) (rev int64, err error) {
     opCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
     switch request {
     case StaleGet:
-        _, rev, err = c.Get(opCtx, key, lastRev)
+        _, rev, err = c.client.Get(opCtx, key, lastRev)
     case Get:
-        _, rev, err = c.Get(opCtx, key, 0)
+        _, rev, err = c.client.Get(opCtx, key, 0)
     case Put:
         var resp *clientv3.PutResponse
-        resp, err = c.Put(opCtx, key, fmt.Sprintf("%d", id.NewRequestId()))
+        resp, err = c.client.Put(opCtx, key, fmt.Sprintf("%d", c.idProvider.NewRequestId()))
        if resp != nil {
            rev = resp.Header.Revision
        }
     case LargePut:
         var resp *clientv3.PutResponse
-        resp, err = c.Put(opCtx, key, randString(t.largePutSize))
+        resp, err = c.client.Put(opCtx, key, randString(c.largePutSize))
        if resp != nil {
            rev = resp.Header.Revision
        }
     case Delete:
         var resp *clientv3.DeleteResponse
-        resp, err = c.Delete(opCtx, key)
+        resp, err = c.client.Delete(opCtx, key)
        if resp != nil {
            rev = resp.Header.Revision
        }
     case MultiOpTxn:
         var resp *clientv3.TxnResponse
-        resp, err = c.Txn(opCtx, nil, t.pickMultiTxnOps(id), nil)
+        resp, err = c.client.Txn(opCtx, nil, c.pickMultiTxnOps(), nil)
        if resp != nil {
            rev = resp.Header.Revision
        }
     case CompareAndSet:
         var kv *mvccpb.KeyValue
-        kv, rev, err = c.Get(opCtx, key, 0)
+        kv, rev, err = c.client.Get(opCtx, key, 0)
         if err == nil {
-            limiter.Wait(ctx)
+            c.limiter.Wait(ctx)
             var expectedRevision int64
             if kv != nil {
                 expectedRevision = kv.ModRevision
             }
             txnCtx, txnCancel := context.WithTimeout(ctx, RequestTimeout)
             var resp *clientv3.TxnResponse
-            resp, err = c.Txn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", id.NewRequestId()))}, nil)
+            resp, err = c.client.Txn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", c.idProvider.NewRequestId()))}, nil)
             txnCancel()
             if resp != nil {
                 rev = resp.Header.Revision
             }
         }
     case PutWithLease:
-        leaseId := lm.LeaseId(c.id)
+        leaseId := c.leaseStorage.LeaseId(c.client.id)
         if leaseId == 0 {
             var resp *clientv3.LeaseGrantResponse
-            resp, err = c.LeaseGrant(opCtx, t.leaseTTL)
+            resp, err = c.client.LeaseGrant(opCtx, c.leaseTTL)
             if resp != nil {
                 leaseId = int64(resp.ID)
                 rev = resp.ResponseHeader.Revision
             }
             if err == nil {
-                lm.AddLeaseId(c.id, leaseId)
-                limiter.Wait(ctx)
+                c.leaseStorage.AddLeaseId(c.client.id, leaseId)
+                c.limiter.Wait(ctx)
             }
         }
         if leaseId != 0 {
             putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout)
             var resp *clientv3.PutResponse
-            resp, err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.NewRequestId()), leaseId)
+            resp, err = c.client.PutWithLease(putCtx, key, fmt.Sprintf("%d", c.idProvider.NewRequestId()), leaseId)
             putCancel()
             if resp != nil {
                 rev = resp.Header.Revision
             }
         }
     case LeaseRevoke:
-        leaseId := lm.LeaseId(c.id)
+        leaseId := c.leaseStorage.LeaseId(c.client.id)
         if leaseId != 0 {
             var resp *clientv3.LeaseRevokeResponse
-            resp, err = c.LeaseRevoke(opCtx, leaseId)
+            resp, err = c.client.LeaseRevoke(opCtx, leaseId)
             //if LeaseRevoke has failed, do not remove the mapping.
             if err == nil {
-                lm.RemoveLeaseId(c.id)
+                c.leaseStorage.RemoveLeaseId(c.client.id)
             }
             if resp != nil {
                 rev = resp.Header.Revision
@@ -215,7 +230,7 @@ func (t etcdTraffic) Request(ctx context.Context, c *RecordingClient, request et
         }
     case Defragment:
         var resp *clientv3.DefragmentResponse
-        resp, err = c.Defragment(opCtx)
+        resp, err = c.client.Defragment(opCtx)
        if resp != nil {
            rev = resp.Header.Revision
        }
@@ -226,13 +241,13 @@ func (t etcdTraffic) Request(ctx context.Context, c *RecordingClient, request et
     return rev, err
 }
-func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) {
-    keys := rand.Perm(t.keyCount)
+func (c etcdTrafficClient) pickMultiTxnOps() (ops []clientv3.Op) {
+    keys := rand.Perm(c.keyCount)
     opTypes := make([]model.OperationType, 4)
     atLeastOnePut := false
     for i := 0; i < MultiOpTxnOpCount; i++ {
-        opTypes[i] = t.pickOperationType()
+        opTypes[i] = c.pickOperationType()
         if opTypes[i] == model.PutOperation {
             atLeastOnePut = true
         }
@@ -248,7 +263,7 @@ func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op)
         case model.RangeOperation:
             ops = append(ops, clientv3.OpGet(key))
         case model.PutOperation:
-            value := fmt.Sprintf("%d", ids.NewRequestId())
+            value := fmt.Sprintf("%d", c.idProvider.NewRequestId())
             ops = append(ops, clientv3.OpPut(key, value))
         case model.DeleteOperation:
             ops = append(ops, clientv3.OpDelete(key))


@@ -37,13 +37,13 @@ var (
         maximalQPS:  1000,
         clientCount: 12,
         Traffic: kubernetesTraffic{
-            averageKeyCount: 5,
+            averageKeyCount: 10,
             resource:        "pods",
             namespace:       "default",
             writeChoices: []choiceWeight[KubernetesRequestType]{
-                {choice: KubernetesUpdate, weight: 75},
-                {choice: KubernetesDelete, weight: 15},
-                {choice: KubernetesCreate, weight: 10},
+                {choice: KubernetesUpdate, weight: 90},
+                {choice: KubernetesDelete, weight: 5},
+                {choice: KubernetesCreate, weight: 5},
             },
         },
     }
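Per the commit message earlier in this compare, deletes are de-emphasised because they are not unique requests and make linearization more likely to time out. The weights above feed a weighted random pick along these lines; the helper name and exact signature are assumptions, only choiceWeight comes from the diff:

    // pickWeighted is an illustrative stand-in for the traffic package's own
    // weighted-choice helper: with the new weights it returns KubernetesUpdate
    // about 90% of the time and KubernetesDelete/KubernetesCreate about 5% each.
    func pickWeighted[T any](choices []choiceWeight[T]) T {
        sum := 0
        for _, c := range choices {
            sum += c.weight
        }
        roll := rand.Intn(sum) // math/rand
        for _, c := range choices {
            roll -= c.weight
            if roll < 0 {
                return c.choice
            }
        }
        return choices[len(choices)-1].choice
    }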


@@ -15,28 +15,92 @@
 package validate
 import (
+    "fmt"
+    "reflect"
+    "sort"
     "testing"
     "time"
     "github.com/anishathalye/porcupine"
+    "github.com/google/go-cmp/cmp"
     "go.uber.org/zap"
     "go.etcd.io/etcd/tests/v3/robustness/model"
 )
-func validateOperationHistoryAndReturnVisualize(t *testing.T, lg *zap.Logger, operations []porcupine.Operation) (visualize func(basepath string)) {
-    linearizable, info := porcupine.CheckOperationsVerbose(model.NonDeterministicModel, operations, 5*time.Minute)
-    if linearizable == porcupine.Illegal {
-        t.Error("Model is not linearizable")
-    }
-    if linearizable == porcupine.Unknown {
-        t.Error("Linearization timed out")
-    }
-    return func(path string) {
-        lg.Info("Saving visualization", zap.String("path", path))
-        err := porcupine.VisualizePath(model.NonDeterministicModel, info, path)
-        if err != nil {
-            t.Errorf("Failed to visualize, err: %v", err)
-        }
-    }
-}
+func validateOperationsAndVisualize(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, eventHistory []model.WatchEvent) (visualize func(basepath string) error) {
+    const timeout = 5 * time.Minute
+    lg.Info("Validating linearizable operations", zap.Duration("timeout", timeout))
+    result, visualize := validateLinearizableOperationAndVisualize(lg, operations, timeout)
+    switch result {
+    case porcupine.Illegal:
+        t.Error("Linearization failed for provided operations")
+        return
+    case porcupine.Unknown:
+        t.Error("Model is not linearizable")
+        return
+    case porcupine.Ok:
+        t.Log("Linearization passed")
+    default:
+        t.Fatalf("Unknown Linearization")
+    }
+    lg.Info("Validating serializable operations")
+    // TODO: Use linearization result instead of event history to get order of events
+    // This is currently impossible as porcupine doesn't expose operation order created during linearization.
+    validateSerializableOperations(t, operations, eventHistory)
+    return visualize
+}
+func validateLinearizableOperationAndVisualize(lg *zap.Logger, operations []porcupine.Operation, timeout time.Duration) (result porcupine.CheckResult, visualize func(basepath string) error) {
+    linearizable, info := porcupine.CheckOperationsVerbose(model.NonDeterministicModel, operations, timeout)
+    return linearizable, func(path string) error {
+        lg.Info("Saving visualization", zap.String("path", path))
+        err := porcupine.VisualizePath(model.NonDeterministicModel, info, path)
+        if err != nil {
+            return fmt.Errorf("failed to visualize, err: %v", err)
+        }
+        return nil
+    }
+}
+func validateSerializableOperations(t *testing.T, operations []porcupine.Operation, totalEventHistory []model.WatchEvent) {
+    staleReads := filterSerializableReads(operations)
+    if len(staleReads) == 0 {
+        return
+    }
+    sort.Slice(staleReads, func(i, j int) bool {
+        return staleReads[i].Input.(model.EtcdRequest).Range.Revision < staleReads[j].Input.(model.EtcdRequest).Range.Revision
+    })
+    replay := model.NewReplay(totalEventHistory)
+    for _, read := range staleReads {
+        request := read.Input.(model.EtcdRequest)
+        response := read.Output.(model.MaybeEtcdResponse)
+        validateSerializableOperation(t, replay, request, response)
+    }
+}
+func filterSerializableReads(operations []porcupine.Operation) []porcupine.Operation {
+    resp := []porcupine.Operation{}
+    for _, op := range operations {
+        request := op.Input.(model.EtcdRequest)
+        if request.Type == model.Range && request.Range.Revision != 0 {
+            resp = append(resp, op)
+        }
+    }
+    return resp
+}
+func validateSerializableOperation(t *testing.T, replay *model.EtcdReplay, request model.EtcdRequest, response model.MaybeEtcdResponse) {
+    if response.PartialResponse || response.Err != nil {
+        return
+    }
+    state, err := replay.StateForRevision(request.Range.Revision)
+    if err != nil {
+        t.Fatal(err)
+    }
+    _, expectResp := state.Step(request)
+    if !reflect.DeepEqual(response.EtcdResponse.Range, expectResp.Range) {
+        t.Errorf("Invalid serializable response, diff: %s", cmp.Diff(response.EtcdResponse.Range, expectResp.Range))
+    }
+}


@@ -24,14 +24,13 @@ import (
     "go.etcd.io/etcd/tests/v3/robustness/traffic"
 )
-// ValidateAndReturnVisualize return visualize as porcupine.linearizationInfo used to generate visualization is private.
-func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []traffic.ClientReport) (visualize func(basepath string)) {
-    validateWatch(t, cfg, reports)
-    // TODO: Validate stale reads responses.
+// ValidateAndReturnVisualize returns visualize as porcupine.linearizationInfo used to generate visualization is private.
+func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []traffic.ClientReport) (visualize func(basepath string) error) {
+    eventHistory := validateWatch(t, cfg, reports)
     allOperations := operations(reports)
     watchEvents := uniqueWatchEvents(reports)
-    newOperations := patchOperationsWithWatchEvents(allOperations, watchEvents)
-    return validateOperationHistoryAndReturnVisualize(t, lg, newOperations)
+    patchedOperations := patchOperationsWithWatchEvents(allOperations, watchEvents)
+    return validateOperationsAndVisualize(t, lg, patchedOperations, eventHistory)
 }
 func operations(reports []traffic.ClientReport) []porcupine.Operation {
func operations(reports []traffic.ClientReport) []porcupine.Operation { func operations(reports []traffic.ClientReport) []porcupine.Operation {


@@ -19,10 +19,11 @@ import (
     "github.com/google/go-cmp/cmp"
+    "go.etcd.io/etcd/tests/v3/robustness/model"
     "go.etcd.io/etcd/tests/v3/robustness/traffic"
 )
-func validateWatch(t *testing.T, cfg Config, reports []traffic.ClientReport) {
+func validateWatch(t *testing.T, cfg Config, reports []traffic.ClientReport) []model.WatchEvent {
     // Validate etcd watch properties defined in https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis
     for _, r := range reports {
         validateOrdered(t, r)
@@ -34,8 +35,10 @@ func validateWatch(t *testing.T, cfg Config, reports []traffic.ClientReport) {
     validateEventsMatch(t, reports)
     // Expects that longest history encompasses all events.
     // TODO: Use combined events from all histories instead of the longest history.
+    eventHistory := longestEventHistory(reports)
     // TODO: Validate that each watch report is reliable, not only the longest one.
-    validateReliable(t, longestEventHistory(reports))
+    validateReliable(t, eventHistory)
+    return watchEvents(eventHistory)
 }
 func validateBookmarkable(t *testing.T, report traffic.ClientReport) {
@@ -127,7 +130,7 @@ func validateEventsMatch(t *testing.T, reports []traffic.ClientReport) {
         key      string
     }
     type eventClientId struct {
-        traffic.WatchEvent
+        model.WatchEvent
         ClientId int
     }
     revisionKeyToEvent := map[revisionKey]eventClientId{}
@@ -158,3 +161,11 @@ func longestEventHistory(report []traffic.ClientReport) []traffic.TimedWatchEven
     }
     return toWatchEvents(report[longestIndex].Watch)
 }
func watchEvents(timed []traffic.TimedWatchEvent) []model.WatchEvent {
result := make([]model.WatchEvent, 0, len(timed))
for _, event := range timed {
result = append(result, event.WatchEvent)
}
return result
}