Merge pull request #15282 from serathius/linearizability-report-watch
test: Report watch historiesdependabot/go_modules/go.uber.org/atomic-1.10.0
commit
116a3150c0
|
@ -45,6 +45,7 @@ jobs:
|
||||||
esac
|
esac
|
||||||
- name: test-linearizability
|
- name: test-linearizability
|
||||||
run: |
|
run: |
|
||||||
|
# Use --failfast to avoid overriding report generated by failed test
|
||||||
EXPECT_DEBUG=true GO_TEST_FLAGS='-v --count ${{ inputs.count }} --timeout ${{ inputs.testTimeout }} --failfast --run TestLinearizability' RESULTS_DIR=/tmp/linearizability make test-linearizability
|
EXPECT_DEBUG=true GO_TEST_FLAGS='-v --count ${{ inputs.count }} --timeout ${{ inputs.testTimeout }} --failfast --run TestLinearizability' RESULTS_DIR=/tmp/linearizability make test-linearizability
|
||||||
- uses: actions/upload-artifact@v2
|
- uses: actions/upload-artifact@v2
|
||||||
if: always()
|
if: always()
|
||||||
|
|
|
@ -19,7 +19,6 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -167,29 +166,77 @@ func TestLinearizability(t *testing.T) {
|
||||||
lg := zaptest.NewLogger(t)
|
lg := zaptest.NewLogger(t)
|
||||||
scenario.config.Logger = lg
|
scenario.config.Logger = lg
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
clus, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&scenario.config))
|
testLinearizability(ctx, t, lg, scenario.config, scenario.traffic, FailpointConfig{
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer clus.Close()
|
|
||||||
operations, watchResponses := testLinearizability(ctx, t, lg, clus, FailpointConfig{
|
|
||||||
failpoint: scenario.failpoint,
|
failpoint: scenario.failpoint,
|
||||||
count: 1,
|
count: 1,
|
||||||
retries: 3,
|
retries: 3,
|
||||||
waitBetweenTriggers: waitBetweenFailpointTriggers,
|
waitBetweenTriggers: waitBetweenFailpointTriggers,
|
||||||
}, *scenario.traffic)
|
})
|
||||||
forcestopCluster(clus)
|
|
||||||
watchProgressNotifyEnabled := clus.Cfg.WatchProcessNotifyInterval != 0
|
|
||||||
validateWatchResponses(t, watchResponses, watchProgressNotifyEnabled)
|
|
||||||
longestHistory, remainingEvents := watchEventHistory(watchResponses)
|
|
||||||
validateEventsMatch(t, longestHistory, remainingEvents)
|
|
||||||
operations = patchOperationBasedOnWatchEvents(operations, longestHistory)
|
|
||||||
checkOperationsAndPersistResults(t, lg, operations, clus)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testLinearizability(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint FailpointConfig, traffic trafficConfig) (operations []porcupine.Operation, responses [][]watchResponse) {
|
func testLinearizability(ctx context.Context, t *testing.T, lg *zap.Logger, config e2e.EtcdProcessClusterConfig, traffic *trafficConfig, failpoint FailpointConfig) {
|
||||||
|
r := report{lg: lg}
|
||||||
|
var err error
|
||||||
|
r.clus, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&config))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer r.clus.Close()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
r.Report(t)
|
||||||
|
}()
|
||||||
|
r.operations, r.responses = runScenario(ctx, t, lg, r.clus, *traffic, failpoint)
|
||||||
|
forcestopCluster(r.clus)
|
||||||
|
|
||||||
|
watchProgressNotifyEnabled := r.clus.Cfg.WatchProcessNotifyInterval != 0
|
||||||
|
validateWatchResponses(t, r.responses, watchProgressNotifyEnabled)
|
||||||
|
|
||||||
|
r.events = watchEvents(r.responses)
|
||||||
|
validateEventsMatch(t, r.events)
|
||||||
|
|
||||||
|
r.patchedOperations = patchOperationBasedOnWatchEvents(r.operations, longestHistory(r.events))
|
||||||
|
r.visualizeHistory = validateOperationHistoryAndReturnVisualize(t, lg, r.patchedOperations)
|
||||||
|
}
|
||||||
|
|
||||||
|
type report struct {
|
||||||
|
lg *zap.Logger
|
||||||
|
clus *e2e.EtcdProcessCluster
|
||||||
|
responses [][]watchResponse
|
||||||
|
events [][]watchEvent
|
||||||
|
operations []porcupine.Operation
|
||||||
|
patchedOperations []porcupine.Operation
|
||||||
|
visualizeHistory func(path string)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *report) Report(t *testing.T) {
|
||||||
|
path := testResultsDirectory(t)
|
||||||
|
if t.Failed() {
|
||||||
|
for i, member := range r.clus.Procs {
|
||||||
|
memberDataDir := filepath.Join(path, member.Config().Name)
|
||||||
|
persistMemberDataDir(t, r.lg, member, memberDataDir)
|
||||||
|
if r.responses != nil {
|
||||||
|
persistWatchResponses(t, r.lg, filepath.Join(memberDataDir, "responses.json"), r.responses[i])
|
||||||
|
}
|
||||||
|
if r.events != nil {
|
||||||
|
persistWatchEvents(t, r.lg, filepath.Join(memberDataDir, "events.json"), r.events[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if r.operations != nil {
|
||||||
|
persistOperationHistory(t, r.lg, filepath.Join(path, "full-history.json"), r.operations)
|
||||||
|
}
|
||||||
|
if r.patchedOperations != nil {
|
||||||
|
persistOperationHistory(t, r.lg, filepath.Join(path, "patched-history.json"), r.patchedOperations)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if r.visualizeHistory != nil {
|
||||||
|
r.visualizeHistory(filepath.Join(path, "history.html"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, traffic trafficConfig, failpoint FailpointConfig) (operations []porcupine.Operation, responses [][]watchResponse) {
|
||||||
// Run multiple test components (traffic, failpoints, etc) in parallel and use canceling context to propagate stop signal.
|
// Run multiple test components (traffic, failpoints, etc) in parallel and use canceling context to propagate stop signal.
|
||||||
g := errgroup.Group{}
|
g := errgroup.Group{}
|
||||||
trafficCtx, trafficCancel := context.WithCancel(ctx)
|
trafficCtx, trafficCancel := context.WithCancel(ctx)
|
||||||
|
@ -385,34 +432,37 @@ type trafficConfig struct {
|
||||||
traffic Traffic
|
traffic Traffic
|
||||||
}
|
}
|
||||||
|
|
||||||
func watchEventHistory(responses [][]watchResponse) (longest []watchEvent, rest [][]watchEvent) {
|
func watchEvents(responses [][]watchResponse) [][]watchEvent {
|
||||||
ops := make([][]watchEvent, len(responses))
|
ops := make([][]watchEvent, len(responses))
|
||||||
for i, resps := range responses {
|
for i, resps := range responses {
|
||||||
ops[i] = toWatchEvents(resps)
|
ops[i] = toWatchEvents(resps)
|
||||||
}
|
}
|
||||||
|
return ops
|
||||||
sort.Slice(ops, func(i, j int) bool {
|
|
||||||
return len(ops[i]) > len(ops[j])
|
|
||||||
})
|
|
||||||
return ops[0], ops[1:]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func validateEventsMatch(t *testing.T, longestHistory []watchEvent, other [][]watchEvent) {
|
func validateEventsMatch(t *testing.T, histories [][]watchEvent) {
|
||||||
for i := 0; i < len(other); i++ {
|
longestHistory := longestHistory(histories)
|
||||||
length := len(other[i])
|
for i := 0; i < len(histories); i++ {
|
||||||
|
length := len(histories[i])
|
||||||
// We compare prefix of watch events, as we are not guaranteed to collect all events from each node.
|
// We compare prefix of watch events, as we are not guaranteed to collect all events from each node.
|
||||||
if diff := cmp.Diff(longestHistory[:length], other[i][:length], cmpopts.IgnoreFields(watchEvent{}, "Time")); diff != "" {
|
if diff := cmp.Diff(longestHistory[:length], histories[i][:length], cmpopts.IgnoreFields(watchEvent{}, "Time")); diff != "" {
|
||||||
t.Errorf("Events in watches do not match, %s", diff)
|
t.Error("Events in watches do not match")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkOperationsAndPersistResults(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, clus *e2e.EtcdProcessCluster) {
|
func longestHistory(histories [][]watchEvent) []watchEvent {
|
||||||
path, err := testResultsDirectory(t)
|
longestIndex := 0
|
||||||
if err != nil {
|
for i, history := range histories {
|
||||||
t.Error(err)
|
if len(history) > len(histories[longestIndex]) {
|
||||||
|
longestIndex = i
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return histories[longestIndex]
|
||||||
|
}
|
||||||
|
|
||||||
|
// return visualize as porcupine.linearizationInfo used to generate visualization is private
|
||||||
|
func validateOperationHistoryAndReturnVisualize(t *testing.T, lg *zap.Logger, operations []porcupine.Operation) (visualize func(basepath string)) {
|
||||||
linearizable, info := porcupine.CheckOperationsVerbose(model.Etcd, operations, 5*time.Minute)
|
linearizable, info := porcupine.CheckOperationsVerbose(model.Etcd, operations, 5*time.Minute)
|
||||||
if linearizable == porcupine.Illegal {
|
if linearizable == porcupine.Illegal {
|
||||||
t.Error("Model is not linearizable")
|
t.Error("Model is not linearizable")
|
||||||
|
@ -420,23 +470,18 @@ func checkOperationsAndPersistResults(t *testing.T, lg *zap.Logger, operations [
|
||||||
if linearizable == porcupine.Unknown {
|
if linearizable == porcupine.Unknown {
|
||||||
t.Error("Linearization timed out")
|
t.Error("Linearization timed out")
|
||||||
}
|
}
|
||||||
if linearizable != porcupine.Ok {
|
return func(path string) {
|
||||||
persistOperationHistory(t, lg, path, operations)
|
lg.Info("Saving visualization", zap.String("path", path))
|
||||||
persistMemberDataDir(t, lg, clus, path)
|
err := porcupine.VisualizePath(model.Etcd, info, path)
|
||||||
}
|
if err != nil {
|
||||||
|
t.Errorf("Failed to visualize, err: %v", err)
|
||||||
visualizationPath := filepath.Join(path, "history.html")
|
}
|
||||||
lg.Info("Saving visualization", zap.String("path", visualizationPath))
|
|
||||||
err = porcupine.VisualizePath(model.Etcd, info, visualizationPath)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Failed to visualize, err: %v", err)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func persistOperationHistory(t *testing.T, lg *zap.Logger, path string, operations []porcupine.Operation) {
|
func persistOperationHistory(t *testing.T, lg *zap.Logger, path string, operations []porcupine.Operation) {
|
||||||
historyFilePath := filepath.Join(path, "history.json")
|
lg.Info("Saving operation history", zap.String("path", path))
|
||||||
lg.Info("Saving operation history", zap.String("path", historyFilePath))
|
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
|
||||||
file, err := os.OpenFile(historyFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Failed to save operation history: %v", err)
|
t.Errorf("Failed to save operation history: %v", err)
|
||||||
return
|
return
|
||||||
|
@ -451,31 +496,28 @@ func persistOperationHistory(t *testing.T, lg *zap.Logger, path string, operatio
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func persistMemberDataDir(t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, path string) {
|
func persistMemberDataDir(t *testing.T, lg *zap.Logger, member e2e.EtcdProcess, path string) {
|
||||||
for _, member := range clus.Procs {
|
lg.Info("Saving member data dir", zap.String("member", member.Config().Name), zap.String("path", path))
|
||||||
memberDataDir := filepath.Join(path, member.Config().Name)
|
err := os.Rename(member.Config().DataDirPath, path)
|
||||||
err := os.RemoveAll(memberDataDir)
|
if err != nil {
|
||||||
if err != nil {
|
t.Fatal(err)
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
lg.Info("Saving member data dir", zap.String("member", member.Config().Name), zap.String("path", memberDataDir))
|
|
||||||
err = os.Rename(member.Config().DataDirPath, memberDataDir)
|
|
||||||
if err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testResultsDirectory(t *testing.T) (string, error) {
|
func testResultsDirectory(t *testing.T) string {
|
||||||
path, err := filepath.Abs(filepath.Join(resultsDirectory, strings.ReplaceAll(t.Name(), "/", "_")))
|
path, err := filepath.Abs(filepath.Join(resultsDirectory, strings.ReplaceAll(t.Name(), "/", "_")))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return path, err
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
err = os.RemoveAll(path)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
err = os.MkdirAll(path, 0700)
|
err = os.MkdirAll(path, 0700)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return path, err
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
return path, nil
|
return path
|
||||||
}
|
}
|
||||||
|
|
||||||
// forcestopCluster stops the etcd member with signal kill.
|
// forcestopCluster stops the etcd member with signal kill.
|
||||||
|
|
|
@ -16,6 +16,8 @@ package linearizability
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -174,3 +176,37 @@ type watchEvent struct {
|
||||||
Revision int64
|
Revision int64
|
||||||
Time time.Time
|
Time time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func persistWatchResponses(t *testing.T, lg *zap.Logger, path string, responses []watchResponse) {
|
||||||
|
lg.Info("Saving watch responses", zap.String("path", path))
|
||||||
|
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Failed to save watch history: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
encoder := json.NewEncoder(file)
|
||||||
|
for _, resp := range responses {
|
||||||
|
err := encoder.Encode(resp)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Failed to encode response: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func persistWatchEvents(t *testing.T, lg *zap.Logger, path string, events []watchEvent) {
|
||||||
|
lg.Info("Saving watch events", zap.String("path", path))
|
||||||
|
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Failed to save watch history: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
encoder := json.NewEncoder(file)
|
||||||
|
for _, event := range events {
|
||||||
|
err := encoder.Encode(event)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Failed to encode response: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue