Compare commits
12 Commits
dependabot
...
master
Author | SHA1 | Date |
---|---|---|
Benjamin Wang | 66553d4f07 | |
Thomas Jungblut | e3f2638aea | |
Benjamin Wang | de6415801e | |
Marek Siarkowicz | fd3e338d88 | |
Marek Siarkowicz | 486462a907 | |
Marek Siarkowicz | 519617cfd0 | |
Marek Siarkowicz | 1217548acf | |
dependabot[bot] | 1d472bb6e4 | |
Marek Siarkowicz | 9c659eb4e0 | |
Marek Siarkowicz | 1420292b10 | |
Marek Siarkowicz | 1663600bec | |
Marek Siarkowicz | 09b9f889e7 |
|
@ -40,7 +40,7 @@ jobs:
|
||||||
uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # v3.5.3
|
uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # v3.5.3
|
||||||
# Initializes the CodeQL tools for scanning.
|
# Initializes the CodeQL tools for scanning.
|
||||||
- name: Initialize CodeQL
|
- name: Initialize CodeQL
|
||||||
uses: github/codeql-action/init@83f0fe6c4988d98a455712a27f0255212bba9bd4 # v2.3.6
|
uses: github/codeql-action/init@6c089f53dd51dc3fc7e599c3cb5356453a52ca9e # v2.20.0
|
||||||
with:
|
with:
|
||||||
# If you wish to specify custom queries, you can do so here or in a config file.
|
# If you wish to specify custom queries, you can do so here or in a config file.
|
||||||
# By default, queries listed here will override any specified in a config file.
|
# By default, queries listed here will override any specified in a config file.
|
||||||
|
@ -50,6 +50,6 @@ jobs:
|
||||||
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
|
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
|
||||||
# If this step fails, then you should remove it and run the build manually (see below)
|
# If this step fails, then you should remove it and run the build manually (see below)
|
||||||
- name: Autobuild
|
- name: Autobuild
|
||||||
uses: github/codeql-action/autobuild@83f0fe6c4988d98a455712a27f0255212bba9bd4 # v2.3.6
|
uses: github/codeql-action/autobuild@6c089f53dd51dc3fc7e599c3cb5356453a52ca9e # v2.20.0
|
||||||
- name: Perform CodeQL Analysis
|
- name: Perform CodeQL Analysis
|
||||||
uses: github/codeql-action/analyze@83f0fe6c4988d98a455712a27f0255212bba9bd4 # v2.3.6
|
uses: github/codeql-action/analyze@6c089f53dd51dc3fc7e599c3cb5356453a52ca9e # v2.20.0
|
||||||
|
|
|
@ -50,6 +50,6 @@ jobs:
|
||||||
|
|
||||||
# Upload the results to GitHub's code scanning dashboard.
|
# Upload the results to GitHub's code scanning dashboard.
|
||||||
- name: "Upload to code-scanning"
|
- name: "Upload to code-scanning"
|
||||||
uses: github/codeql-action/upload-sarif@83f0fe6c4988d98a455712a27f0255212bba9bd4 # tag=v1.0.26
|
uses: github/codeql-action/upload-sarif@6c089f53dd51dc3fc7e599c3cb5356453a52ca9e # tag=v1.0.26
|
||||||
with:
|
with:
|
||||||
sarif_file: results.sarif
|
sarif_file: results.sarif
|
||||||
|
|
|
@ -8,6 +8,7 @@ Previous change logs can be found at [CHANGELOG-3.3](https://github.com/etcd-io/
|
||||||
|
|
||||||
### etcd server
|
### etcd server
|
||||||
- Fix [corruption check may get a `ErrCompacted` error when server has just been compacted](https://github.com/etcd-io/etcd/pull/16047)
|
- Fix [corruption check may get a `ErrCompacted` error when server has just been compacted](https://github.com/etcd-io/etcd/pull/16047)
|
||||||
|
- Improve [Lease put performance for the case that auth is disabled or the user is admin](https://github.com/etcd-io/etcd/pull/16020)
|
||||||
|
|
||||||
## v3.4.26 (2023-05-12)
|
## v3.4.26 (2023-05-12)
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,7 @@ Previous change logs can be found at [CHANGELOG-3.4](https://github.com/etcd-io/
|
||||||
|
|
||||||
### etcd server
|
### etcd server
|
||||||
- Fix [corruption check may get a `ErrCompacted` error when server has just been compacted](https://github.com/etcd-io/etcd/pull/16048)
|
- Fix [corruption check may get a `ErrCompacted` error when server has just been compacted](https://github.com/etcd-io/etcd/pull/16048)
|
||||||
|
- Improve [Lease put performance for the case that auth is disabled or the user is admin](https://github.com/etcd-io/etcd/pull/16019)
|
||||||
|
|
||||||
### etcd grpc-proxy
|
### etcd grpc-proxy
|
||||||
- Fix [Memberlist results not updated when proxy node down](https://github.com/etcd-io/etcd/pull/15907).
|
- Fix [Memberlist results not updated when proxy node down](https://github.com/etcd-io/etcd/pull/15907).
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -72,7 +72,7 @@ require (
|
||||||
github.com/prometheus/client_golang v1.15.1 // indirect
|
github.com/prometheus/client_golang v1.15.1 // indirect
|
||||||
github.com/prometheus/client_model v0.4.0 // indirect
|
github.com/prometheus/client_model v0.4.0 // indirect
|
||||||
github.com/prometheus/common v0.43.0 // indirect
|
github.com/prometheus/common v0.43.0 // indirect
|
||||||
github.com/prometheus/procfs v0.11.0 // indirect
|
github.com/prometheus/procfs v0.9.0 // indirect
|
||||||
github.com/rivo/uniseg v0.2.0 // indirect
|
github.com/rivo/uniseg v0.2.0 // indirect
|
||||||
github.com/sirupsen/logrus v1.8.1 // indirect
|
github.com/sirupsen/logrus v1.8.1 // indirect
|
||||||
github.com/soheilhy/cmux v0.1.5 // indirect
|
github.com/soheilhy/cmux v0.1.5 // indirect
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -236,8 +236,8 @@ github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUo
|
||||||
github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
|
github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
|
||||||
github.com/prometheus/common v0.43.0 h1:iq+BVjvYLei5f27wiuNiB1DN6DYQkp1c8Bx0Vykh5us=
|
github.com/prometheus/common v0.43.0 h1:iq+BVjvYLei5f27wiuNiB1DN6DYQkp1c8Bx0Vykh5us=
|
||||||
github.com/prometheus/common v0.43.0/go.mod h1:NCvr5cQIh3Y/gy73/RdVtC9r8xxrxwJnB+2lB3BxrFc=
|
github.com/prometheus/common v0.43.0/go.mod h1:NCvr5cQIh3Y/gy73/RdVtC9r8xxrxwJnB+2lB3BxrFc=
|
||||||
github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk=
|
github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI=
|
||||||
github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
|
github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
|
||||||
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
|
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
|
||||||
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
|
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
|
||||||
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
|
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
|
||||||
|
|
|
@ -39,7 +39,7 @@ func TestRobustness(t *testing.T) {
|
||||||
scenarios := []testScenario{}
|
scenarios := []testScenario{}
|
||||||
for _, traffic := range []traffic.Config{traffic.LowTraffic, traffic.HighTraffic, traffic.KubernetesTraffic} {
|
for _, traffic := range []traffic.Config{traffic.LowTraffic, traffic.HighTraffic, traffic.KubernetesTraffic} {
|
||||||
scenarios = append(scenarios, testScenario{
|
scenarios = append(scenarios, testScenario{
|
||||||
name: traffic.Name + "ClusterOfSize1",
|
name: traffic.Name + "/ClusterOfSize1",
|
||||||
traffic: traffic,
|
traffic: traffic,
|
||||||
cluster: *e2e.NewConfig(
|
cluster: *e2e.NewConfig(
|
||||||
e2e.WithClusterSize(1),
|
e2e.WithClusterSize(1),
|
||||||
|
@ -61,7 +61,7 @@ func TestRobustness(t *testing.T) {
|
||||||
clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithSnapshotCatchUpEntries(100))
|
clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithSnapshotCatchUpEntries(100))
|
||||||
}
|
}
|
||||||
scenarios = append(scenarios, testScenario{
|
scenarios = append(scenarios, testScenario{
|
||||||
name: traffic.Name + "ClusterOfSize3",
|
name: traffic.Name + "/ClusterOfSize3",
|
||||||
traffic: traffic,
|
traffic: traffic,
|
||||||
cluster: *e2e.NewConfig(clusterOfSize3Options...),
|
cluster: *e2e.NewConfig(clusterOfSize3Options...),
|
||||||
})
|
})
|
||||||
|
|
|
@ -47,12 +47,12 @@ var DeterministicModel = porcupine.Model{
|
||||||
return string(data)
|
return string(data)
|
||||||
},
|
},
|
||||||
Step: func(st interface{}, in interface{}, out interface{}) (bool, interface{}) {
|
Step: func(st interface{}, in interface{}, out interface{}) (bool, interface{}) {
|
||||||
var s etcdState
|
var s EtcdState
|
||||||
err := json.Unmarshal([]byte(st.(string)), &s)
|
err := json.Unmarshal([]byte(st.(string)), &s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
ok, s := s.Step(in.(EtcdRequest), out.(EtcdResponse))
|
ok, s := s.apply(in.(EtcdRequest), out.(EtcdResponse))
|
||||||
data, err := json.Marshal(s)
|
data, err := json.Marshal(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
@ -64,20 +64,20 @@ var DeterministicModel = porcupine.Model{
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
type etcdState struct {
|
type EtcdState struct {
|
||||||
Revision int64
|
Revision int64
|
||||||
KeyValues map[string]ValueRevision
|
KeyValues map[string]ValueRevision
|
||||||
KeyLeases map[string]int64
|
KeyLeases map[string]int64
|
||||||
Leases map[int64]EtcdLease
|
Leases map[int64]EtcdLease
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s etcdState) Step(request EtcdRequest, response EtcdResponse) (bool, etcdState) {
|
func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) {
|
||||||
newState, modelResponse := s.step(request)
|
newState, modelResponse := s.Step(request)
|
||||||
return Match(MaybeEtcdResponse{EtcdResponse: response}, modelResponse), newState
|
return Match(MaybeEtcdResponse{EtcdResponse: response}, modelResponse), newState
|
||||||
}
|
}
|
||||||
|
|
||||||
func freshEtcdState() etcdState {
|
func freshEtcdState() EtcdState {
|
||||||
return etcdState{
|
return EtcdState{
|
||||||
Revision: 1,
|
Revision: 1,
|
||||||
KeyValues: map[string]ValueRevision{},
|
KeyValues: map[string]ValueRevision{},
|
||||||
KeyLeases: map[string]int64{},
|
KeyLeases: map[string]int64{},
|
||||||
|
@ -85,8 +85,8 @@ func freshEtcdState() etcdState {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// step handles a successful request, returning updated state and response it would generate.
|
// Step handles a successful request, returning updated state and response it would generate.
|
||||||
func (s etcdState) step(request EtcdRequest) (etcdState, MaybeEtcdResponse) {
|
func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
|
||||||
newKVs := map[string]ValueRevision{}
|
newKVs := map[string]ValueRevision{}
|
||||||
for k, v := range s.KeyValues {
|
for k, v := range s.KeyValues {
|
||||||
newKVs[k] = v
|
newKVs[k] = v
|
||||||
|
@ -185,7 +185,7 @@ func (s etcdState) step(request EtcdRequest) (etcdState, MaybeEtcdResponse) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s etcdState) getRange(key string, options RangeOptions) RangeResponse {
|
func (s EtcdState) getRange(key string, options RangeOptions) RangeResponse {
|
||||||
response := RangeResponse{
|
response := RangeResponse{
|
||||||
KVs: []KeyValue{},
|
KVs: []KeyValue{},
|
||||||
}
|
}
|
||||||
|
@ -217,7 +217,7 @@ func (s etcdState) getRange(key string, options RangeOptions) RangeResponse {
|
||||||
return response
|
return response
|
||||||
}
|
}
|
||||||
|
|
||||||
func detachFromOldLease(s etcdState, key string) etcdState {
|
func detachFromOldLease(s EtcdState, key string) EtcdState {
|
||||||
if oldLeaseId, ok := s.KeyLeases[key]; ok {
|
if oldLeaseId, ok := s.KeyLeases[key]; ok {
|
||||||
delete(s.Leases[oldLeaseId].Keys, key)
|
delete(s.Leases[oldLeaseId].Keys, key)
|
||||||
delete(s.KeyLeases, key)
|
delete(s.KeyLeases, key)
|
||||||
|
@ -225,7 +225,7 @@ func detachFromOldLease(s etcdState, key string) etcdState {
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func attachToNewLease(s etcdState, leaseID int64, key string) etcdState {
|
func attachToNewLease(s EtcdState, leaseID int64, key string) EtcdState {
|
||||||
s.KeyLeases[key] = leaseID
|
s.KeyLeases[key] = leaseID
|
||||||
s.Leases[leaseID].Keys[key] = leased
|
s.Leases[leaseID].Keys[key] = leased
|
||||||
return s
|
return s
|
||||||
|
|
|
@ -33,12 +33,12 @@ func TestModelDeterministic(t *testing.T) {
|
||||||
if op.expectFailure == ok {
|
if op.expectFailure == ok {
|
||||||
t.Logf("state: %v", state)
|
t.Logf("state: %v", state)
|
||||||
t.Errorf("Unexpected operation result, expect: %v, got: %v, operation: %s", !op.expectFailure, ok, DeterministicModel.DescribeOperation(op.req, op.resp.EtcdResponse))
|
t.Errorf("Unexpected operation result, expect: %v, got: %v, operation: %s", !op.expectFailure, ok, DeterministicModel.DescribeOperation(op.req, op.resp.EtcdResponse))
|
||||||
var loadedState etcdState
|
var loadedState EtcdState
|
||||||
err := json.Unmarshal([]byte(state.(string)), &loadedState)
|
err := json.Unmarshal([]byte(state.(string)), &loadedState)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to load state: %v", err)
|
t.Fatalf("Failed to load state: %v", err)
|
||||||
}
|
}
|
||||||
_, resp := loadedState.step(op.req)
|
_, resp := loadedState.Step(op.req)
|
||||||
t.Errorf("Response diff: %s", cmp.Diff(op.resp, resp))
|
t.Errorf("Response diff: %s", cmp.Diff(op.resp, resp))
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,7 +40,7 @@ var NonDeterministicModel = porcupine.Model{
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
ok, states := states.Step(in.(EtcdRequest), out.(MaybeEtcdResponse))
|
ok, states := states.apply(in.(EtcdRequest), out.(MaybeEtcdResponse))
|
||||||
data, err := json.Marshal(states)
|
data, err := json.Marshal(states)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
@ -52,27 +52,27 @@ var NonDeterministicModel = porcupine.Model{
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
type nonDeterministicState []etcdState
|
type nonDeterministicState []EtcdState
|
||||||
|
|
||||||
func (states nonDeterministicState) Step(request EtcdRequest, response MaybeEtcdResponse) (bool, nonDeterministicState) {
|
func (states nonDeterministicState) apply(request EtcdRequest, response MaybeEtcdResponse) (bool, nonDeterministicState) {
|
||||||
var newStates nonDeterministicState
|
var newStates nonDeterministicState
|
||||||
switch {
|
switch {
|
||||||
case response.Err != nil:
|
case response.Err != nil:
|
||||||
newStates = states.stepFailedRequest(request)
|
newStates = states.stepFailedResponse(request)
|
||||||
case response.PartialResponse:
|
case response.PartialResponse:
|
||||||
newStates = states.stepPartialRequest(request, response.EtcdResponse.Revision)
|
newStates = states.applyResponseRevision(request, response.EtcdResponse.Revision)
|
||||||
default:
|
default:
|
||||||
newStates = states.stepSuccessfulRequest(request, response.EtcdResponse)
|
newStates = states.applySuccessfulResponse(request, response.EtcdResponse)
|
||||||
}
|
}
|
||||||
return len(newStates) > 0, newStates
|
return len(newStates) > 0, newStates
|
||||||
}
|
}
|
||||||
|
|
||||||
// stepFailedRequest duplicates number of states by considering request persisted and lost.
|
// stepFailedResponse duplicates number of states by considering both cases, request was persisted and request was lost.
|
||||||
func (states nonDeterministicState) stepFailedRequest(request EtcdRequest) nonDeterministicState {
|
func (states nonDeterministicState) stepFailedResponse(request EtcdRequest) nonDeterministicState {
|
||||||
newStates := make(nonDeterministicState, 0, len(states)*2)
|
newStates := make(nonDeterministicState, 0, len(states)*2)
|
||||||
for _, s := range states {
|
for _, s := range states {
|
||||||
newStates = append(newStates, s)
|
newStates = append(newStates, s)
|
||||||
newState, _ := s.step(request)
|
newState, _ := s.Step(request)
|
||||||
if !reflect.DeepEqual(newState, s) {
|
if !reflect.DeepEqual(newState, s) {
|
||||||
newStates = append(newStates, newState)
|
newStates = append(newStates, newState)
|
||||||
}
|
}
|
||||||
|
@ -80,11 +80,11 @@ func (states nonDeterministicState) stepFailedRequest(request EtcdRequest) nonDe
|
||||||
return newStates
|
return newStates
|
||||||
}
|
}
|
||||||
|
|
||||||
// stepPartialRequest filters possible states by leaving ony states that would return proper revision.
|
// applyResponseRevision filters possible states by leaving ony states that would return proper revision.
|
||||||
func (states nonDeterministicState) stepPartialRequest(request EtcdRequest, responseRevision int64) nonDeterministicState {
|
func (states nonDeterministicState) applyResponseRevision(request EtcdRequest, responseRevision int64) nonDeterministicState {
|
||||||
newStates := make(nonDeterministicState, 0, len(states))
|
newStates := make(nonDeterministicState, 0, len(states))
|
||||||
for _, s := range states {
|
for _, s := range states {
|
||||||
newState, modelResponse := s.step(request)
|
newState, modelResponse := s.Step(request)
|
||||||
if modelResponse.Revision == responseRevision {
|
if modelResponse.Revision == responseRevision {
|
||||||
newStates = append(newStates, newState)
|
newStates = append(newStates, newState)
|
||||||
}
|
}
|
||||||
|
@ -92,11 +92,11 @@ func (states nonDeterministicState) stepPartialRequest(request EtcdRequest, resp
|
||||||
return newStates
|
return newStates
|
||||||
}
|
}
|
||||||
|
|
||||||
// stepSuccessfulRequest filters possible states by leaving ony states that would respond correctly.
|
// applySuccessfulResponse filters possible states by leaving ony states that would respond correctly.
|
||||||
func (states nonDeterministicState) stepSuccessfulRequest(request EtcdRequest, response EtcdResponse) nonDeterministicState {
|
func (states nonDeterministicState) applySuccessfulResponse(request EtcdRequest, response EtcdResponse) nonDeterministicState {
|
||||||
newStates := make(nonDeterministicState, 0, len(states))
|
newStates := make(nonDeterministicState, 0, len(states))
|
||||||
for _, s := range states {
|
for _, s := range states {
|
||||||
newState, modelResponse := s.step(request)
|
newState, modelResponse := s.Step(request)
|
||||||
if Match(modelResponse, MaybeEtcdResponse{EtcdResponse: response}) {
|
if Match(modelResponse, MaybeEtcdResponse{EtcdResponse: response}) {
|
||||||
newStates = append(newStates, newState)
|
newStates = append(newStates, newState)
|
||||||
}
|
}
|
||||||
|
|
|
@ -339,7 +339,7 @@ func TestModelNonDeterministic(t *testing.T) {
|
||||||
t.Fatalf("Failed to load state: %v", err)
|
t.Fatalf("Failed to load state: %v", err)
|
||||||
}
|
}
|
||||||
for i, s := range loadedState {
|
for i, s := range loadedState {
|
||||||
_, resp := s.step(op.req)
|
_, resp := s.Step(op.req)
|
||||||
t.Errorf("For state %d, response diff: %s", i, cmp.Diff(op.resp, resp))
|
t.Errorf("For state %d, response diff: %s", i, cmp.Diff(op.resp, resp))
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
|
|
|
@ -0,0 +1,99 @@
|
||||||
|
// Copyright 2023 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package model
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewReplay(eventHistory []WatchEvent) *EtcdReplay {
|
||||||
|
var lastEventRevision int64 = 1
|
||||||
|
for _, event := range eventHistory {
|
||||||
|
if event.Revision > lastEventRevision && event.Revision != lastEventRevision+1 {
|
||||||
|
panic("Replay requires a complete event history")
|
||||||
|
}
|
||||||
|
lastEventRevision = event.Revision
|
||||||
|
}
|
||||||
|
return &EtcdReplay{
|
||||||
|
eventHistory: eventHistory,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type EtcdReplay struct {
|
||||||
|
eventHistory []WatchEvent
|
||||||
|
|
||||||
|
// Cached state and event index used for it's calculation
|
||||||
|
cachedState *EtcdState
|
||||||
|
eventHistoryIndex int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *EtcdReplay) StateForRevision(revision int64) (EtcdState, error) {
|
||||||
|
if revision < 1 {
|
||||||
|
return EtcdState{}, fmt.Errorf("invalid revision: %d", revision)
|
||||||
|
}
|
||||||
|
if r.cachedState == nil || r.cachedState.Revision > revision {
|
||||||
|
r.reset()
|
||||||
|
}
|
||||||
|
|
||||||
|
for r.eventHistoryIndex < len(r.eventHistory) && r.cachedState.Revision < revision {
|
||||||
|
nextRequest, nextRevision, nextIndex := r.next()
|
||||||
|
newState, _ := r.cachedState.Step(nextRequest)
|
||||||
|
if newState.Revision != nextRevision {
|
||||||
|
return EtcdState{}, fmt.Errorf("model returned different revision than one present in event history, model: %d, event: %d", newState.Revision, nextRevision)
|
||||||
|
}
|
||||||
|
r.cachedState = &newState
|
||||||
|
r.eventHistoryIndex = nextIndex
|
||||||
|
}
|
||||||
|
if r.eventHistoryIndex > len(r.eventHistory) && r.cachedState.Revision < revision {
|
||||||
|
return EtcdState{}, fmt.Errorf("requested revision higher then available in even history, requested: %d, model: %d", revision, r.cachedState.Revision)
|
||||||
|
}
|
||||||
|
return *r.cachedState, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *EtcdReplay) reset() {
|
||||||
|
state := freshEtcdState()
|
||||||
|
r.cachedState = &state
|
||||||
|
r.eventHistoryIndex = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *EtcdReplay) next() (request EtcdRequest, revision int64, index int) {
|
||||||
|
revision = r.eventHistory[r.eventHistoryIndex].Revision
|
||||||
|
index = r.eventHistoryIndex
|
||||||
|
operations := []EtcdOperation{}
|
||||||
|
for r.eventHistory[index].Revision == revision {
|
||||||
|
operations = append(operations, r.eventHistory[index].Op)
|
||||||
|
index++
|
||||||
|
}
|
||||||
|
return EtcdRequest{
|
||||||
|
Type: Txn,
|
||||||
|
Txn: &TxnRequest{
|
||||||
|
OperationsOnSuccess: operations,
|
||||||
|
},
|
||||||
|
}, revision, index
|
||||||
|
}
|
||||||
|
|
||||||
|
func operationToRequest(op EtcdOperation) EtcdRequest {
|
||||||
|
return EtcdRequest{
|
||||||
|
Type: Txn,
|
||||||
|
Txn: &TxnRequest{
|
||||||
|
OperationsOnSuccess: []EtcdOperation{op},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type WatchEvent struct {
|
||||||
|
Op EtcdOperation
|
||||||
|
Revision int64
|
||||||
|
}
|
|
@ -34,7 +34,7 @@ type report struct {
|
||||||
lg *zap.Logger
|
lg *zap.Logger
|
||||||
clus *e2e.EtcdProcessCluster
|
clus *e2e.EtcdProcessCluster
|
||||||
clientReports []traffic.ClientReport
|
clientReports []traffic.ClientReport
|
||||||
visualizeHistory func(path string)
|
visualizeHistory func(path string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func testResultsDirectory(t *testing.T) string {
|
func testResultsDirectory(t *testing.T) string {
|
||||||
|
@ -89,7 +89,10 @@ func (r *report) Report(t *testing.T, force bool) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if r.visualizeHistory != nil {
|
if r.visualizeHistory != nil {
|
||||||
r.visualizeHistory(filepath.Join(path, "history.html"))
|
err := r.visualizeHistory(filepath.Join(path, "history.html"))
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -46,22 +46,17 @@ type RecordingClient struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type WatchResponse struct {
|
type WatchResponse struct {
|
||||||
Events []WatchEvent
|
Events []model.WatchEvent
|
||||||
IsProgressNotify bool
|
IsProgressNotify bool
|
||||||
Revision int64
|
Revision int64
|
||||||
Time time.Duration
|
Time time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type TimedWatchEvent struct {
|
type TimedWatchEvent struct {
|
||||||
WatchEvent
|
model.WatchEvent
|
||||||
Time time.Duration
|
Time time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type WatchEvent struct {
|
|
||||||
Op model.EtcdOperation
|
|
||||||
Revision int64
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) {
|
func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) {
|
||||||
cc, err := clientv3.New(clientv3.Config{
|
cc, err := clientv3.New(clientv3.Config{
|
||||||
Endpoints: endpoints,
|
Endpoints: endpoints,
|
||||||
|
@ -259,7 +254,7 @@ func ToWatchResponse(r clientv3.WatchResponse, baseTime time.Time) WatchResponse
|
||||||
return resp
|
return resp
|
||||||
}
|
}
|
||||||
|
|
||||||
func toWatchEvent(event clientv3.Event) WatchEvent {
|
func toWatchEvent(event clientv3.Event) model.WatchEvent {
|
||||||
var op model.OperationType
|
var op model.OperationType
|
||||||
switch event.Type {
|
switch event.Type {
|
||||||
case mvccpb.PUT:
|
case mvccpb.PUT:
|
||||||
|
@ -269,7 +264,7 @@ func toWatchEvent(event clientv3.Event) WatchEvent {
|
||||||
default:
|
default:
|
||||||
panic(fmt.Sprintf("Unexpected event type: %s", event.Type))
|
panic(fmt.Sprintf("Unexpected event type: %s", event.Type))
|
||||||
}
|
}
|
||||||
return WatchEvent{
|
return model.WatchEvent{
|
||||||
Revision: event.Kv.ModRevision,
|
Revision: event.Kv.ModRevision,
|
||||||
Op: model.EtcdOperation{
|
Op: model.EtcdOperation{
|
||||||
Type: op,
|
Type: op,
|
||||||
|
|
|
@ -100,6 +100,13 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.
|
||||||
lastOperationSucceeded := true
|
lastOperationSucceeded := true
|
||||||
var lastRev int64
|
var lastRev int64
|
||||||
var requestType etcdRequestType
|
var requestType etcdRequestType
|
||||||
|
client := etcdTrafficClient{
|
||||||
|
etcdTraffic: t,
|
||||||
|
client: c,
|
||||||
|
limiter: limiter,
|
||||||
|
idProvider: ids,
|
||||||
|
leaseStorage: lm,
|
||||||
|
}
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
@ -115,7 +122,7 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.
|
||||||
} else {
|
} else {
|
||||||
requestType = Get
|
requestType = Get
|
||||||
}
|
}
|
||||||
rev, err := t.Request(ctx, c, requestType, limiter, key, ids, lm, lastRev)
|
rev, err := client.Request(ctx, requestType, key, lastRev)
|
||||||
lastOperationSucceeded = err == nil
|
lastOperationSucceeded = err == nil
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
|
@ -127,87 +134,95 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t etcdTraffic) Request(ctx context.Context, c *RecordingClient, request etcdRequestType, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, lastRev int64) (rev int64, err error) {
|
type etcdTrafficClient struct {
|
||||||
|
etcdTraffic
|
||||||
|
client *RecordingClient
|
||||||
|
limiter *rate.Limiter
|
||||||
|
idProvider identity.Provider
|
||||||
|
leaseStorage identity.LeaseIdStorage
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType, key string, lastRev int64) (rev int64, err error) {
|
||||||
opCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
|
opCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
|
||||||
|
|
||||||
switch request {
|
switch request {
|
||||||
case StaleGet:
|
case StaleGet:
|
||||||
_, rev, err = c.Get(opCtx, key, lastRev)
|
_, rev, err = c.client.Get(opCtx, key, lastRev)
|
||||||
case Get:
|
case Get:
|
||||||
_, rev, err = c.Get(opCtx, key, 0)
|
_, rev, err = c.client.Get(opCtx, key, 0)
|
||||||
case Put:
|
case Put:
|
||||||
var resp *clientv3.PutResponse
|
var resp *clientv3.PutResponse
|
||||||
resp, err = c.Put(opCtx, key, fmt.Sprintf("%d", id.NewRequestId()))
|
resp, err = c.client.Put(opCtx, key, fmt.Sprintf("%d", c.idProvider.NewRequestId()))
|
||||||
if resp != nil {
|
if resp != nil {
|
||||||
rev = resp.Header.Revision
|
rev = resp.Header.Revision
|
||||||
}
|
}
|
||||||
case LargePut:
|
case LargePut:
|
||||||
var resp *clientv3.PutResponse
|
var resp *clientv3.PutResponse
|
||||||
resp, err = c.Put(opCtx, key, randString(t.largePutSize))
|
resp, err = c.client.Put(opCtx, key, randString(c.largePutSize))
|
||||||
if resp != nil {
|
if resp != nil {
|
||||||
rev = resp.Header.Revision
|
rev = resp.Header.Revision
|
||||||
}
|
}
|
||||||
case Delete:
|
case Delete:
|
||||||
var resp *clientv3.DeleteResponse
|
var resp *clientv3.DeleteResponse
|
||||||
resp, err = c.Delete(opCtx, key)
|
resp, err = c.client.Delete(opCtx, key)
|
||||||
if resp != nil {
|
if resp != nil {
|
||||||
rev = resp.Header.Revision
|
rev = resp.Header.Revision
|
||||||
}
|
}
|
||||||
case MultiOpTxn:
|
case MultiOpTxn:
|
||||||
var resp *clientv3.TxnResponse
|
var resp *clientv3.TxnResponse
|
||||||
resp, err = c.Txn(opCtx, nil, t.pickMultiTxnOps(id), nil)
|
resp, err = c.client.Txn(opCtx, nil, c.pickMultiTxnOps(), nil)
|
||||||
if resp != nil {
|
if resp != nil {
|
||||||
rev = resp.Header.Revision
|
rev = resp.Header.Revision
|
||||||
}
|
}
|
||||||
|
|
||||||
case CompareAndSet:
|
case CompareAndSet:
|
||||||
var kv *mvccpb.KeyValue
|
var kv *mvccpb.KeyValue
|
||||||
kv, rev, err = c.Get(opCtx, key, 0)
|
kv, rev, err = c.client.Get(opCtx, key, 0)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
limiter.Wait(ctx)
|
c.limiter.Wait(ctx)
|
||||||
var expectedRevision int64
|
var expectedRevision int64
|
||||||
if kv != nil {
|
if kv != nil {
|
||||||
expectedRevision = kv.ModRevision
|
expectedRevision = kv.ModRevision
|
||||||
}
|
}
|
||||||
txnCtx, txnCancel := context.WithTimeout(ctx, RequestTimeout)
|
txnCtx, txnCancel := context.WithTimeout(ctx, RequestTimeout)
|
||||||
var resp *clientv3.TxnResponse
|
var resp *clientv3.TxnResponse
|
||||||
resp, err = c.Txn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", id.NewRequestId()))}, nil)
|
resp, err = c.client.Txn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", c.idProvider.NewRequestId()))}, nil)
|
||||||
txnCancel()
|
txnCancel()
|
||||||
if resp != nil {
|
if resp != nil {
|
||||||
rev = resp.Header.Revision
|
rev = resp.Header.Revision
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case PutWithLease:
|
case PutWithLease:
|
||||||
leaseId := lm.LeaseId(c.id)
|
leaseId := c.leaseStorage.LeaseId(c.client.id)
|
||||||
if leaseId == 0 {
|
if leaseId == 0 {
|
||||||
var resp *clientv3.LeaseGrantResponse
|
var resp *clientv3.LeaseGrantResponse
|
||||||
resp, err = c.LeaseGrant(opCtx, t.leaseTTL)
|
resp, err = c.client.LeaseGrant(opCtx, c.leaseTTL)
|
||||||
if resp != nil {
|
if resp != nil {
|
||||||
leaseId = int64(resp.ID)
|
leaseId = int64(resp.ID)
|
||||||
rev = resp.ResponseHeader.Revision
|
rev = resp.ResponseHeader.Revision
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
lm.AddLeaseId(c.id, leaseId)
|
c.leaseStorage.AddLeaseId(c.client.id, leaseId)
|
||||||
limiter.Wait(ctx)
|
c.limiter.Wait(ctx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if leaseId != 0 {
|
if leaseId != 0 {
|
||||||
putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout)
|
putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout)
|
||||||
var resp *clientv3.PutResponse
|
var resp *clientv3.PutResponse
|
||||||
resp, err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.NewRequestId()), leaseId)
|
resp, err = c.client.PutWithLease(putCtx, key, fmt.Sprintf("%d", c.idProvider.NewRequestId()), leaseId)
|
||||||
putCancel()
|
putCancel()
|
||||||
if resp != nil {
|
if resp != nil {
|
||||||
rev = resp.Header.Revision
|
rev = resp.Header.Revision
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case LeaseRevoke:
|
case LeaseRevoke:
|
||||||
leaseId := lm.LeaseId(c.id)
|
leaseId := c.leaseStorage.LeaseId(c.client.id)
|
||||||
if leaseId != 0 {
|
if leaseId != 0 {
|
||||||
var resp *clientv3.LeaseRevokeResponse
|
var resp *clientv3.LeaseRevokeResponse
|
||||||
resp, err = c.LeaseRevoke(opCtx, leaseId)
|
resp, err = c.client.LeaseRevoke(opCtx, leaseId)
|
||||||
//if LeaseRevoke has failed, do not remove the mapping.
|
//if LeaseRevoke has failed, do not remove the mapping.
|
||||||
if err == nil {
|
if err == nil {
|
||||||
lm.RemoveLeaseId(c.id)
|
c.leaseStorage.RemoveLeaseId(c.client.id)
|
||||||
}
|
}
|
||||||
if resp != nil {
|
if resp != nil {
|
||||||
rev = resp.Header.Revision
|
rev = resp.Header.Revision
|
||||||
|
@ -215,7 +230,7 @@ func (t etcdTraffic) Request(ctx context.Context, c *RecordingClient, request et
|
||||||
}
|
}
|
||||||
case Defragment:
|
case Defragment:
|
||||||
var resp *clientv3.DefragmentResponse
|
var resp *clientv3.DefragmentResponse
|
||||||
resp, err = c.Defragment(opCtx)
|
resp, err = c.client.Defragment(opCtx)
|
||||||
if resp != nil {
|
if resp != nil {
|
||||||
rev = resp.Header.Revision
|
rev = resp.Header.Revision
|
||||||
}
|
}
|
||||||
|
@ -226,13 +241,13 @@ func (t etcdTraffic) Request(ctx context.Context, c *RecordingClient, request et
|
||||||
return rev, err
|
return rev, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) {
|
func (c etcdTrafficClient) pickMultiTxnOps() (ops []clientv3.Op) {
|
||||||
keys := rand.Perm(t.keyCount)
|
keys := rand.Perm(c.keyCount)
|
||||||
opTypes := make([]model.OperationType, 4)
|
opTypes := make([]model.OperationType, 4)
|
||||||
|
|
||||||
atLeastOnePut := false
|
atLeastOnePut := false
|
||||||
for i := 0; i < MultiOpTxnOpCount; i++ {
|
for i := 0; i < MultiOpTxnOpCount; i++ {
|
||||||
opTypes[i] = t.pickOperationType()
|
opTypes[i] = c.pickOperationType()
|
||||||
if opTypes[i] == model.PutOperation {
|
if opTypes[i] == model.PutOperation {
|
||||||
atLeastOnePut = true
|
atLeastOnePut = true
|
||||||
}
|
}
|
||||||
|
@ -248,7 +263,7 @@ func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op)
|
||||||
case model.RangeOperation:
|
case model.RangeOperation:
|
||||||
ops = append(ops, clientv3.OpGet(key))
|
ops = append(ops, clientv3.OpGet(key))
|
||||||
case model.PutOperation:
|
case model.PutOperation:
|
||||||
value := fmt.Sprintf("%d", ids.NewRequestId())
|
value := fmt.Sprintf("%d", c.idProvider.NewRequestId())
|
||||||
ops = append(ops, clientv3.OpPut(key, value))
|
ops = append(ops, clientv3.OpPut(key, value))
|
||||||
case model.DeleteOperation:
|
case model.DeleteOperation:
|
||||||
ops = append(ops, clientv3.OpDelete(key))
|
ops = append(ops, clientv3.OpDelete(key))
|
||||||
|
|
|
@ -37,13 +37,13 @@ var (
|
||||||
maximalQPS: 1000,
|
maximalQPS: 1000,
|
||||||
clientCount: 12,
|
clientCount: 12,
|
||||||
Traffic: kubernetesTraffic{
|
Traffic: kubernetesTraffic{
|
||||||
averageKeyCount: 5,
|
averageKeyCount: 10,
|
||||||
resource: "pods",
|
resource: "pods",
|
||||||
namespace: "default",
|
namespace: "default",
|
||||||
writeChoices: []choiceWeight[KubernetesRequestType]{
|
writeChoices: []choiceWeight[KubernetesRequestType]{
|
||||||
{choice: KubernetesUpdate, weight: 75},
|
{choice: KubernetesUpdate, weight: 90},
|
||||||
{choice: KubernetesDelete, weight: 15},
|
{choice: KubernetesDelete, weight: 5},
|
||||||
{choice: KubernetesCreate, weight: 10},
|
{choice: KubernetesCreate, weight: 5},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,28 +15,92 @@
|
||||||
package validate
|
package validate
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/anishathalye/porcupine"
|
"github.com/anishathalye/porcupine"
|
||||||
|
"github.com/google/go-cmp/cmp"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"go.etcd.io/etcd/tests/v3/robustness/model"
|
"go.etcd.io/etcd/tests/v3/robustness/model"
|
||||||
)
|
)
|
||||||
|
|
||||||
func validateOperationHistoryAndReturnVisualize(t *testing.T, lg *zap.Logger, operations []porcupine.Operation) (visualize func(basepath string)) {
|
func validateOperationsAndVisualize(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, eventHistory []model.WatchEvent) (visualize func(basepath string) error) {
|
||||||
linearizable, info := porcupine.CheckOperationsVerbose(model.NonDeterministicModel, operations, 5*time.Minute)
|
const timeout = 5 * time.Minute
|
||||||
if linearizable == porcupine.Illegal {
|
lg.Info("Validating linearizable operations", zap.Duration("timeout", timeout))
|
||||||
|
result, visualize := validateLinearizableOperationAndVisualize(lg, operations, timeout)
|
||||||
|
switch result {
|
||||||
|
case porcupine.Illegal:
|
||||||
|
t.Error("Linearization failed for provided operations")
|
||||||
|
return
|
||||||
|
case porcupine.Unknown:
|
||||||
t.Error("Model is not linearizable")
|
t.Error("Model is not linearizable")
|
||||||
|
return
|
||||||
|
case porcupine.Ok:
|
||||||
|
t.Log("Linearization passed")
|
||||||
|
default:
|
||||||
|
t.Fatalf("Unknown Linearization")
|
||||||
}
|
}
|
||||||
if linearizable == porcupine.Unknown {
|
lg.Info("Validating serializable operations")
|
||||||
t.Error("Linearization timed out")
|
// TODO: Use linearization result instead of event history to get order of events
|
||||||
|
// This is currently impossible as porcupine doesn't expose operation order created during linearization.
|
||||||
|
validateSerializableOperations(t, operations, eventHistory)
|
||||||
|
return visualize
|
||||||
}
|
}
|
||||||
return func(path string) {
|
|
||||||
|
func validateLinearizableOperationAndVisualize(lg *zap.Logger, operations []porcupine.Operation, timeout time.Duration) (result porcupine.CheckResult, visualize func(basepath string) error) {
|
||||||
|
linearizable, info := porcupine.CheckOperationsVerbose(model.NonDeterministicModel, operations, timeout)
|
||||||
|
return linearizable, func(path string) error {
|
||||||
lg.Info("Saving visualization", zap.String("path", path))
|
lg.Info("Saving visualization", zap.String("path", path))
|
||||||
err := porcupine.VisualizePath(model.NonDeterministicModel, info, path)
|
err := porcupine.VisualizePath(model.NonDeterministicModel, info, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Failed to visualize, err: %v", err)
|
return fmt.Errorf("failed to visualize, err: %v", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func validateSerializableOperations(t *testing.T, operations []porcupine.Operation, totalEventHistory []model.WatchEvent) {
|
||||||
|
staleReads := filterSerializableReads(operations)
|
||||||
|
if len(staleReads) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sort.Slice(staleReads, func(i, j int) bool {
|
||||||
|
return staleReads[i].Input.(model.EtcdRequest).Range.Revision < staleReads[j].Input.(model.EtcdRequest).Range.Revision
|
||||||
|
})
|
||||||
|
replay := model.NewReplay(totalEventHistory)
|
||||||
|
for _, read := range staleReads {
|
||||||
|
request := read.Input.(model.EtcdRequest)
|
||||||
|
response := read.Output.(model.MaybeEtcdResponse)
|
||||||
|
validateSerializableOperation(t, replay, request, response)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterSerializableReads(operations []porcupine.Operation) []porcupine.Operation {
|
||||||
|
resp := []porcupine.Operation{}
|
||||||
|
for _, op := range operations {
|
||||||
|
request := op.Input.(model.EtcdRequest)
|
||||||
|
if request.Type == model.Range && request.Range.Revision != 0 {
|
||||||
|
resp = append(resp, op)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return resp
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateSerializableOperation(t *testing.T, replay *model.EtcdReplay, request model.EtcdRequest, response model.MaybeEtcdResponse) {
|
||||||
|
if response.PartialResponse || response.Err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
state, err := replay.StateForRevision(request.Range.Revision)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, expectResp := state.Step(request)
|
||||||
|
if !reflect.DeepEqual(response.EtcdResponse.Range, expectResp.Range) {
|
||||||
|
t.Errorf("Invalid serializable response, diff: %s", cmp.Diff(response.EtcdResponse.Range, expectResp.Range))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,14 +24,13 @@ import (
|
||||||
"go.etcd.io/etcd/tests/v3/robustness/traffic"
|
"go.etcd.io/etcd/tests/v3/robustness/traffic"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ValidateAndReturnVisualize return visualize as porcupine.linearizationInfo used to generate visualization is private.
|
// ValidateAndReturnVisualize returns visualize as porcupine.linearizationInfo used to generate visualization is private.
|
||||||
func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []traffic.ClientReport) (visualize func(basepath string)) {
|
func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []traffic.ClientReport) (visualize func(basepath string) error) {
|
||||||
validateWatch(t, cfg, reports)
|
eventHistory := validateWatch(t, cfg, reports)
|
||||||
// TODO: Validate stale reads responses.
|
|
||||||
allOperations := operations(reports)
|
allOperations := operations(reports)
|
||||||
watchEvents := uniqueWatchEvents(reports)
|
watchEvents := uniqueWatchEvents(reports)
|
||||||
newOperations := patchOperationsWithWatchEvents(allOperations, watchEvents)
|
patchedOperations := patchOperationsWithWatchEvents(allOperations, watchEvents)
|
||||||
return validateOperationHistoryAndReturnVisualize(t, lg, newOperations)
|
return validateOperationsAndVisualize(t, lg, patchedOperations, eventHistory)
|
||||||
}
|
}
|
||||||
|
|
||||||
func operations(reports []traffic.ClientReport) []porcupine.Operation {
|
func operations(reports []traffic.ClientReport) []porcupine.Operation {
|
||||||
|
|
|
@ -19,10 +19,11 @@ import (
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
|
|
||||||
|
"go.etcd.io/etcd/tests/v3/robustness/model"
|
||||||
"go.etcd.io/etcd/tests/v3/robustness/traffic"
|
"go.etcd.io/etcd/tests/v3/robustness/traffic"
|
||||||
)
|
)
|
||||||
|
|
||||||
func validateWatch(t *testing.T, cfg Config, reports []traffic.ClientReport) {
|
func validateWatch(t *testing.T, cfg Config, reports []traffic.ClientReport) []model.WatchEvent {
|
||||||
// Validate etcd watch properties defined in https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis
|
// Validate etcd watch properties defined in https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis
|
||||||
for _, r := range reports {
|
for _, r := range reports {
|
||||||
validateOrdered(t, r)
|
validateOrdered(t, r)
|
||||||
|
@ -34,8 +35,10 @@ func validateWatch(t *testing.T, cfg Config, reports []traffic.ClientReport) {
|
||||||
validateEventsMatch(t, reports)
|
validateEventsMatch(t, reports)
|
||||||
// Expects that longest history encompasses all events.
|
// Expects that longest history encompasses all events.
|
||||||
// TODO: Use combined events from all histories instead of the longest history.
|
// TODO: Use combined events from all histories instead of the longest history.
|
||||||
|
eventHistory := longestEventHistory(reports)
|
||||||
// TODO: Validate that each watch report is reliable, not only the longest one.
|
// TODO: Validate that each watch report is reliable, not only the longest one.
|
||||||
validateReliable(t, longestEventHistory(reports))
|
validateReliable(t, eventHistory)
|
||||||
|
return watchEvents(eventHistory)
|
||||||
}
|
}
|
||||||
|
|
||||||
func validateBookmarkable(t *testing.T, report traffic.ClientReport) {
|
func validateBookmarkable(t *testing.T, report traffic.ClientReport) {
|
||||||
|
@ -127,7 +130,7 @@ func validateEventsMatch(t *testing.T, reports []traffic.ClientReport) {
|
||||||
key string
|
key string
|
||||||
}
|
}
|
||||||
type eventClientId struct {
|
type eventClientId struct {
|
||||||
traffic.WatchEvent
|
model.WatchEvent
|
||||||
ClientId int
|
ClientId int
|
||||||
}
|
}
|
||||||
revisionKeyToEvent := map[revisionKey]eventClientId{}
|
revisionKeyToEvent := map[revisionKey]eventClientId{}
|
||||||
|
@ -158,3 +161,11 @@ func longestEventHistory(report []traffic.ClientReport) []traffic.TimedWatchEven
|
||||||
}
|
}
|
||||||
return toWatchEvents(report[longestIndex].Watch)
|
return toWatchEvents(report[longestIndex].Watch)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func watchEvents(timed []traffic.TimedWatchEvent) []model.WatchEvent {
|
||||||
|
result := make([]model.WatchEvent, 0, len(timed))
|
||||||
|
for _, event := range timed {
|
||||||
|
result = append(result, event.WatchEvent)
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue