From c338882d7af1baf0f36cf902c1d4b283e5cdf68f Mon Sep 17 00:00:00 2001 From: Bogdan Kanivets Date: Sun, 14 May 2023 12:57:59 -0700 Subject: [PATCH] tests/robustness: use monotonic clock for watch events see: https://github.com/etcd-io/etcd/pull/15323 For consistency watch events should also use only time-measurement operations. fixes: https://github.com/etcd-io/etcd/issues/15328 Signed-off-by: Bogdan Kanivets --- tests/robustness/linearizability_test.go | 6 ++++-- tests/robustness/traffic/client.go | 5 ++++- tests/robustness/traffic/traffic.go | 6 +++--- tests/robustness/watch.go | 16 +++++++++------- 4 files changed, 20 insertions(+), 13 deletions(-) diff --git a/tests/robustness/linearizability_test.go b/tests/robustness/linearizability_test.go index f38ec3960..3c545abdd 100644 --- a/tests/robustness/linearizability_test.go +++ b/tests/robustness/linearizability_test.go @@ -171,6 +171,8 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu g := errgroup.Group{} finishTraffic := make(chan struct{}) + // baseTime is used to get monotonic clock reading when recording operations/watch events + baseTime := time.Now() g.Go(func() error { defer close(finishTraffic) injectFailpoints(ctx, t, lg, clus, s.failpoint) @@ -180,12 +182,12 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu maxRevisionChan := make(chan int64, 1) g.Go(func() error { defer close(maxRevisionChan) - operations = traffic.SimulateTraffic(ctx, t, lg, clus, s.traffic, finishTraffic) + operations = traffic.SimulateTraffic(ctx, t, lg, clus, s.traffic, finishTraffic, baseTime) maxRevisionChan <- operationsMaxRevision(operations) return nil }) g.Go(func() error { - responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch) + responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime) return nil }) g.Wait() diff --git a/tests/robustness/traffic/client.go b/tests/robustness/traffic/client.go index 95a314c52..68910b484 100644 --- a/tests/robustness/traffic/client.go +++ b/tests/robustness/traffic/client.go @@ -32,7 +32,10 @@ import ( // clientv3.Client) that records all the requests and responses made. Doesn't // allow for concurrent requests to confirm to model.AppendableHistory requirements. type RecordingClient struct { - client clientv3.Client + client clientv3.Client + // baseTime is needed to achieve monotonic clock reading. + // Only time-measuring operations should be used to record time. + // see https://github.com/golang/go/blob/master/src/time/time.go#L17 baseTime time.Time // mux ensures order of request appending. mux sync.Mutex diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go index c5fb93663..98c20a972 100644 --- a/tests/robustness/traffic/traffic.go +++ b/tests/robustness/traffic/traffic.go @@ -35,7 +35,7 @@ var ( MultiOpTxnOpCount = 4 ) -func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config Config, finish <-chan struct{}) []porcupine.Operation { +func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config Config, finish <-chan struct{}, baseTime time.Time) []porcupine.Operation { mux := sync.Mutex{} endpoints := clus.EndpointsGRPC() @@ -45,7 +45,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200) startTime := time.Now() - cc, err := NewClient(endpoints, ids, startTime) + cc, err := NewClient(endpoints, ids, baseTime) if err != nil { t.Fatal(err) } @@ -53,7 +53,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 wg := sync.WaitGroup{} for i := 0; i < config.clientCount; i++ { wg.Add(1) - c, err := NewClient([]string{endpoints[i%len(endpoints)]}, ids, startTime) + c, err := NewClient([]string{endpoints[i%len(endpoints)]}, ids, baseTime) if err != nil { t.Fatal(err) } diff --git a/tests/robustness/watch.go b/tests/robustness/watch.go index 5cfe7a4a8..40c9c0621 100644 --- a/tests/robustness/watch.go +++ b/tests/robustness/watch.go @@ -31,7 +31,7 @@ import ( "go.etcd.io/etcd/tests/v3/robustness/model" ) -func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64, cfg watchConfig) [][]watchResponse { +func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time) [][]watchResponse { mux := sync.Mutex{} var wg sync.WaitGroup memberResponses := make([][]watchResponse, len(clus.Procs)) @@ -52,7 +52,7 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd go func(i int, c *clientv3.Client) { defer wg.Done() defer c.Close() - responses := watchMember(ctx, t, c, memberChan, cfg) + responses := watchMember(ctx, t, c, memberChan, cfg, baseTime) mux.Lock() memberResponses[i] = responses mux.Unlock() @@ -75,7 +75,7 @@ type watchConfig struct { } // watchMember collects all responses until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed. -func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64, cfg watchConfig) (resps []watchResponse) { +func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time) (resps []watchResponse) { var maxRevision int64 = 0 var lastRevision int64 = 0 ctx, cancel := context.WithCancel(ctx) @@ -109,7 +109,9 @@ func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevis c.RequestProgress(ctx) } if resp.Err() == nil { - resps = append(resps, watchResponse{resp, time.Now()}) + // using time.Since time-measuring operation to get monotonic clock reading + // see https://github.com/golang/go/blob/master/src/time/time.go#L17 + resps = append(resps, watchResponse{resp, time.Since(baseTime)}) } else if !resp.Canceled { t.Errorf("Watch stream received error, err %v", resp.Err()) } @@ -254,13 +256,13 @@ func toWatchEvents(responses []watchResponse) (events []watchEvent) { type watchResponse struct { clientv3.WatchResponse - time time.Time + time time.Duration } type watchEvent struct { Op model.EtcdOperation Revision int64 - Time time.Time + Time time.Duration } func patchOperationBasedOnWatchEvents(operations []porcupine.Operation, watchEvents []watchEvent) []porcupine.Operation { @@ -282,7 +284,7 @@ func patchOperationBasedOnWatchEvents(operations []porcupine.Operation, watchEve event := matchWatchEvent(request.Txn, persisted) if event != nil { // Set revision and time based on watchEvent. - op.Return = event.Time.UnixNano() + op.Return = event.Time.Nanoseconds() op.Output = model.EtcdNonDeterministicResponse{ EtcdResponse: model.EtcdResponse{Revision: event.Revision}, ResultUnknown: true,