tests: Propagate logger through linearizability tests
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
dependabot/go_modules/go.uber.org/atomic-1.10.0
parent
a8b6b05564
commit
0c004a6ce4
|
@ -78,14 +78,14 @@ var (
|
|||
)
|
||||
|
||||
type Failpoint interface {
|
||||
Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error
|
||||
Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error
|
||||
Name() string
|
||||
Available(e2e.EtcdProcess) bool
|
||||
}
|
||||
|
||||
type killFailpoint struct{}
|
||||
|
||||
func (f killFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
|
||||
func (f killFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
|
||||
member := clus.Procs[rand.Int()%len(clus.Procs)]
|
||||
|
||||
killCtx, cancel := context.WithTimeout(ctx, triggerTimeout)
|
||||
|
@ -93,10 +93,11 @@ func (f killFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.Etcd
|
|||
for member.IsRunning() {
|
||||
err := member.Kill()
|
||||
if err != nil {
|
||||
t.Logf("sending kill signal failed: %v", err)
|
||||
lg.Info("Sending kill signal failed", zap.Error(err))
|
||||
}
|
||||
err = member.Wait(killCtx)
|
||||
if err != nil && !strings.Contains(err.Error(), "unexpected exit code") {
|
||||
lg.Info("Failed to kill the process", zap.Error(err))
|
||||
return fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err)
|
||||
}
|
||||
}
|
||||
|
@ -129,35 +130,36 @@ const (
|
|||
Leader failpointTarget = "Leader"
|
||||
)
|
||||
|
||||
func (f goPanicFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
|
||||
func (f goPanicFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
|
||||
member := f.pickMember(t, clus)
|
||||
|
||||
triggerCtx, cancel := context.WithTimeout(ctx, triggerTimeout)
|
||||
defer cancel()
|
||||
|
||||
for member.IsRunning() {
|
||||
t.Logf("setting up gofailpoint %q", f.Name())
|
||||
lg.Info("Setting up gofailpoint", zap.String("failpoint", f.Name()))
|
||||
err := member.Failpoints().Setup(triggerCtx, f.failpoint, "panic")
|
||||
if err != nil {
|
||||
t.Logf("gofailpoint setup failed: %v", err)
|
||||
lg.Info("goFailpoint setup failed", zap.String("failpoint", f.Name()), zap.Error(err))
|
||||
}
|
||||
if !member.IsRunning() {
|
||||
// TODO: Check member logs to confirm that etcd stopped running because of a panic caused by the proper gofailpoint.
|
||||
break
|
||||
}
|
||||
if f.trigger != nil {
|
||||
t.Logf("triggering gofailpoint")
|
||||
lg.Info("Triggering gofailpoint", zap.String("failpoint", f.Name()))
|
||||
err = f.trigger(triggerCtx, member)
|
||||
if err != nil {
|
||||
t.Logf("triggering gofailpoint failed: %v", err)
|
||||
lg.Info("gofailpoint trigger failed", zap.String("failpoint", f.Name()), zap.Error(err))
|
||||
}
|
||||
}
|
||||
t.Logf("waiting for process to exist")
|
||||
lg.Info("Waiting for member to exist", zap.String("member", member.Config().Name))
|
||||
err = member.Wait(triggerCtx)
|
||||
if err != nil && !strings.Contains(err.Error(), "unexpected exit code") {
|
||||
return fmt.Errorf("failed to trigger a gofailpoint within %s, err: %w", triggerTimeout, err)
|
||||
lg.Info("Member didn't exit as expected", zap.String("member", member.Config().Name), zap.Error(err))
|
||||
return fmt.Errorf("member didn't exit as expected: %v", err)
|
||||
}
|
||||
t.Logf("process existed")
|
||||
lg.Info("Member existed as expected", zap.String("member", member.Config().Name))
|
||||
}
|
||||
|
||||
err := member.Start(ctx)
|
||||
|
@ -236,7 +238,7 @@ type randomFailpoint struct {
|
|||
failpoints []Failpoint
|
||||
}
|
||||
|
||||
func (f randomFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
|
||||
func (f randomFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
|
||||
availableFailpoints := make([]Failpoint, 0, len(f.failpoints))
|
||||
for _, failpoint := range f.failpoints {
|
||||
count := 0
|
||||
|
@ -250,8 +252,8 @@ func (f randomFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.Et
|
|||
}
|
||||
}
|
||||
failpoint := availableFailpoints[rand.Int()%len(availableFailpoints)]
|
||||
t.Logf("Triggering %v failpoint\n", failpoint.Name())
|
||||
return failpoint.Trigger(t, ctx, clus)
|
||||
lg.Info("Triggering failpoint\n", zap.String("failpoint", failpoint.Name()))
|
||||
return failpoint.Trigger(ctx, t, lg, clus)
|
||||
}
|
||||
|
||||
func (f randomFailpoint) Name() string {
|
||||
|
@ -266,15 +268,15 @@ type blackholePeerNetworkFailpoint struct {
|
|||
duration time.Duration
|
||||
}
|
||||
|
||||
func (f blackholePeerNetworkFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
|
||||
func (f blackholePeerNetworkFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
|
||||
member := clus.Procs[rand.Int()%len(clus.Procs)]
|
||||
proxy := member.PeerProxy()
|
||||
|
||||
proxy.BlackholeTx()
|
||||
proxy.BlackholeRx()
|
||||
t.Logf("Blackholing traffic from and to %s", member.Config().Name)
|
||||
lg.Info("Blackholing traffic from and to member", zap.String("member", member.Config().Name))
|
||||
time.Sleep(f.duration)
|
||||
t.Logf("Traffic restored for %s", member.Config().Name)
|
||||
lg.Info("Traffic restored from and to member", zap.String("member", member.Config().Name))
|
||||
proxy.UnblackholeTx()
|
||||
proxy.UnblackholeRx()
|
||||
return nil
|
||||
|
@ -294,15 +296,15 @@ type delayPeerNetworkFailpoint struct {
|
|||
randomizedLatency time.Duration
|
||||
}
|
||||
|
||||
func (f delayPeerNetworkFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
|
||||
func (f delayPeerNetworkFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
|
||||
member := clus.Procs[rand.Int()%len(clus.Procs)]
|
||||
proxy := member.PeerProxy()
|
||||
|
||||
proxy.DelayRx(f.baseLatency, f.randomizedLatency)
|
||||
proxy.DelayTx(f.baseLatency, f.randomizedLatency)
|
||||
t.Logf("Delaying traffic from and to %s by %v +/- %v", member.Config().Name, f.baseLatency, f.randomizedLatency)
|
||||
lg.Info("Delaying traffic from and to member", zap.String("member", member.Config().Name), zap.Duration("baseLatency", f.baseLatency), zap.Duration("randomizedLatency", f.randomizedLatency))
|
||||
time.Sleep(f.duration)
|
||||
t.Logf("Traffic delay removed for %s", member.Config().Name)
|
||||
lg.Info("Traffic delay removed", zap.String("member", member.Config().Name))
|
||||
proxy.UndelayRx()
|
||||
proxy.UndelayTx()
|
||||
return nil
|
||||
|
|
|
@ -28,6 +28,8 @@ import (
|
|||
"github.com/anishathalye/porcupine"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
|
@ -150,13 +152,15 @@ func TestLinearizability(t *testing.T) {
|
|||
}
|
||||
|
||||
t.Run(scenario.name, func(t *testing.T) {
|
||||
lg := zaptest.NewLogger(t)
|
||||
scenario.config.Logger = lg
|
||||
ctx := context.Background()
|
||||
clus, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&scenario.config))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer clus.Close()
|
||||
operations, events := testLinearizability(ctx, t, clus, FailpointConfig{
|
||||
operations, events := testLinearizability(ctx, t, lg, clus, FailpointConfig{
|
||||
failpoint: scenario.failpoint,
|
||||
count: 1,
|
||||
retries: 3,
|
||||
|
@ -166,30 +170,30 @@ func TestLinearizability(t *testing.T) {
|
|||
longestHistory, remainingEvents := pickLongestHistory(events)
|
||||
validateEventsMatch(t, longestHistory, remainingEvents)
|
||||
operations = patchOperationBasedOnWatchEvents(operations, longestHistory)
|
||||
checkOperationsAndPersistResults(t, operations, clus)
|
||||
checkOperationsAndPersistResults(t, lg, operations, clus)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testLinearizability(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, failpoint FailpointConfig, traffic trafficConfig) (operations []porcupine.Operation, events [][]watchEvent) {
|
||||
func testLinearizability(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint FailpointConfig, traffic trafficConfig) (operations []porcupine.Operation, events [][]watchEvent) {
|
||||
// Run multiple test components (traffic, failpoints, etc) in parallel and use canceling context to propagate stop signal.
|
||||
g := errgroup.Group{}
|
||||
trafficCtx, trafficCancel := context.WithCancel(ctx)
|
||||
g.Go(func() error {
|
||||
triggerFailpoints(ctx, t, clus, failpoint)
|
||||
triggerFailpoints(ctx, t, lg, clus, failpoint)
|
||||
time.Sleep(time.Second)
|
||||
trafficCancel()
|
||||
return nil
|
||||
})
|
||||
watchCtx, watchCancel := context.WithCancel(ctx)
|
||||
g.Go(func() error {
|
||||
operations = simulateTraffic(trafficCtx, t, clus, traffic)
|
||||
operations = simulateTraffic(trafficCtx, t, lg, clus, traffic)
|
||||
time.Sleep(time.Second)
|
||||
watchCancel()
|
||||
return nil
|
||||
})
|
||||
g.Go(func() error {
|
||||
events = collectClusterWatchEvents(watchCtx, t, clus)
|
||||
events = collectClusterWatchEvents(watchCtx, t, lg, clus)
|
||||
return nil
|
||||
})
|
||||
g.Wait()
|
||||
|
@ -285,7 +289,7 @@ func hasUniqueWriteOperation(request *model.TxnRequest) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config FailpointConfig) {
|
||||
func triggerFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config FailpointConfig) {
|
||||
var err error
|
||||
successes := 0
|
||||
failures := 0
|
||||
|
@ -297,9 +301,10 @@ func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessC
|
|||
}
|
||||
for successes < config.count && failures < config.retries {
|
||||
time.Sleep(config.waitBetweenTriggers)
|
||||
err = config.failpoint.Trigger(t, ctx, clus)
|
||||
lg.Info("Triggering failpoint\n", zap.String("failpoint", config.failpoint.Name()))
|
||||
err = config.failpoint.Trigger(ctx, t, lg, clus)
|
||||
if err != nil {
|
||||
t.Logf("Failed to trigger failpoint %q, err: %v\n", config.failpoint.Name(), err)
|
||||
lg.Info("Failed to trigger failpoint", zap.String("failpoint", config.failpoint.Name()), zap.Error(err))
|
||||
failures++
|
||||
continue
|
||||
}
|
||||
|
@ -317,7 +322,7 @@ type FailpointConfig struct {
|
|||
waitBetweenTriggers time.Duration
|
||||
}
|
||||
|
||||
func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config trafficConfig) []porcupine.Operation {
|
||||
func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config trafficConfig) []porcupine.Operation {
|
||||
mux := sync.Mutex{}
|
||||
endpoints := clus.EndpointsV3()
|
||||
|
||||
|
@ -348,10 +353,10 @@ func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessClu
|
|||
wg.Wait()
|
||||
endTime := time.Now()
|
||||
operations := h.Operations()
|
||||
t.Logf("Recorded %d operations", len(operations))
|
||||
lg.Info("Recorded operations", zap.Int("count", len(operations)))
|
||||
|
||||
qps := float64(len(operations)) / float64(endTime.Sub(startTime)) * float64(time.Second)
|
||||
t.Logf("Average traffic: %f qps", qps)
|
||||
lg.Info("Average traffic", zap.Float64("qps", qps))
|
||||
if qps < config.minimalQPS {
|
||||
t.Errorf("Requiring minimal %f qps for test results to be reliable, got %f qps", config.minimalQPS, qps)
|
||||
}
|
||||
|
@ -383,7 +388,7 @@ func validateEventsMatch(t *testing.T, longestHistory []watchEvent, other [][]wa
|
|||
}
|
||||
}
|
||||
|
||||
func checkOperationsAndPersistResults(t *testing.T, operations []porcupine.Operation, clus *e2e.EtcdProcessCluster) {
|
||||
func checkOperationsAndPersistResults(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, clus *e2e.EtcdProcessCluster) {
|
||||
path, err := testResultsDirectory(t)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
|
@ -397,21 +402,21 @@ func checkOperationsAndPersistResults(t *testing.T, operations []porcupine.Opera
|
|||
t.Error("Linearization timed out")
|
||||
}
|
||||
if linearizable != porcupine.Ok {
|
||||
persistOperationHistory(t, path, operations)
|
||||
persistMemberDataDir(t, clus, path)
|
||||
persistOperationHistory(t, lg, path, operations)
|
||||
persistMemberDataDir(t, lg, clus, path)
|
||||
}
|
||||
|
||||
visualizationPath := filepath.Join(path, "history.html")
|
||||
t.Logf("saving visualization to %q", visualizationPath)
|
||||
lg.Info("Saving visualization", zap.String("path", visualizationPath))
|
||||
err = porcupine.VisualizePath(model.Etcd, info, visualizationPath)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to visualize, err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func persistOperationHistory(t *testing.T, path string, operations []porcupine.Operation) {
|
||||
func persistOperationHistory(t *testing.T, lg *zap.Logger, path string, operations []porcupine.Operation) {
|
||||
historyFilePath := filepath.Join(path, "history.json")
|
||||
t.Logf("saving operation history to %q", historyFilePath)
|
||||
lg.Info("Saving operation history", zap.String("path", historyFilePath))
|
||||
file, err := os.OpenFile(historyFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to save operation history: %v", err)
|
||||
|
@ -427,14 +432,14 @@ func persistOperationHistory(t *testing.T, path string, operations []porcupine.O
|
|||
}
|
||||
}
|
||||
|
||||
func persistMemberDataDir(t *testing.T, clus *e2e.EtcdProcessCluster, path string) {
|
||||
func persistMemberDataDir(t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, path string) {
|
||||
for _, member := range clus.Procs {
|
||||
memberDataDir := filepath.Join(path, member.Config().Name)
|
||||
err := os.RemoveAll(memberDataDir)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
t.Logf("saving %s data dir to %q", member.Config().Name, memberDataDir)
|
||||
lg.Info("Saving member data dir", zap.String("member", member.Config().Name), zap.String("path", memberDataDir))
|
||||
err = os.Rename(member.Config().DataDirPath, memberDataDir)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
|
|
|
@ -28,7 +28,7 @@ import (
|
|||
"go.etcd.io/etcd/tests/v3/linearizability/model"
|
||||
)
|
||||
|
||||
func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster) [][]watchEvent {
|
||||
func collectClusterWatchEvents(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) [][]watchEvent {
|
||||
mux := sync.Mutex{}
|
||||
var wg sync.WaitGroup
|
||||
memberEvents := make([][]watchEvent, len(clus.Procs))
|
||||
|
@ -47,7 +47,7 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd
|
|||
go func(i int, c *clientv3.Client) {
|
||||
defer wg.Done()
|
||||
defer c.Close()
|
||||
events := collectMemberWatchEvents(ctx, t, c)
|
||||
events := collectMemberWatchEvents(ctx, lg, c)
|
||||
mux.Lock()
|
||||
memberEvents[i] = events
|
||||
mux.Unlock()
|
||||
|
@ -57,7 +57,7 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd
|
|||
return memberEvents
|
||||
}
|
||||
|
||||
func collectMemberWatchEvents(ctx context.Context, t *testing.T, c *clientv3.Client) []watchEvent {
|
||||
func collectMemberWatchEvents(ctx context.Context, lg *zap.Logger, c *clientv3.Client) []watchEvent {
|
||||
events := []watchEvent{}
|
||||
var lastRevision int64 = 1
|
||||
for {
|
||||
|
@ -88,7 +88,7 @@ func collectMemberWatchEvents(ctx context.Context, t *testing.T, c *clientv3.Cli
|
|||
})
|
||||
}
|
||||
if resp.Err() != nil {
|
||||
t.Logf("Watch error: %v", resp.Err())
|
||||
lg.Info("Watch error", zap.Error(resp.Err()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue