tests/robustness: use monotonic clock for watch events
see: https://github.com/etcd-io/etcd/pull/15323 For consistency watch events should also use only time-measurement operations. fixes: https://github.com/etcd-io/etcd/issues/15328 Signed-off-by: Bogdan Kanivets <bkanivets@apple.com>dependabot/go_modules/github.com/prometheus/procfs-0.11.0
parent
14f21a124e
commit
c338882d7a
|
@ -171,6 +171,8 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu
|
||||||
g := errgroup.Group{}
|
g := errgroup.Group{}
|
||||||
finishTraffic := make(chan struct{})
|
finishTraffic := make(chan struct{})
|
||||||
|
|
||||||
|
// baseTime is used to get monotonic clock reading when recording operations/watch events
|
||||||
|
baseTime := time.Now()
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
defer close(finishTraffic)
|
defer close(finishTraffic)
|
||||||
injectFailpoints(ctx, t, lg, clus, s.failpoint)
|
injectFailpoints(ctx, t, lg, clus, s.failpoint)
|
||||||
|
@ -180,12 +182,12 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu
|
||||||
maxRevisionChan := make(chan int64, 1)
|
maxRevisionChan := make(chan int64, 1)
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
defer close(maxRevisionChan)
|
defer close(maxRevisionChan)
|
||||||
operations = traffic.SimulateTraffic(ctx, t, lg, clus, s.traffic, finishTraffic)
|
operations = traffic.SimulateTraffic(ctx, t, lg, clus, s.traffic, finishTraffic, baseTime)
|
||||||
maxRevisionChan <- operationsMaxRevision(operations)
|
maxRevisionChan <- operationsMaxRevision(operations)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch)
|
responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
g.Wait()
|
g.Wait()
|
||||||
|
|
|
@ -32,7 +32,10 @@ import (
|
||||||
// clientv3.Client) that records all the requests and responses made. Doesn't
|
// clientv3.Client) that records all the requests and responses made. Doesn't
|
||||||
// allow for concurrent requests to confirm to model.AppendableHistory requirements.
|
// allow for concurrent requests to confirm to model.AppendableHistory requirements.
|
||||||
type RecordingClient struct {
|
type RecordingClient struct {
|
||||||
client clientv3.Client
|
client clientv3.Client
|
||||||
|
// baseTime is needed to achieve monotonic clock reading.
|
||||||
|
// Only time-measuring operations should be used to record time.
|
||||||
|
// see https://github.com/golang/go/blob/master/src/time/time.go#L17
|
||||||
baseTime time.Time
|
baseTime time.Time
|
||||||
// mux ensures order of request appending.
|
// mux ensures order of request appending.
|
||||||
mux sync.Mutex
|
mux sync.Mutex
|
||||||
|
|
|
@ -35,7 +35,7 @@ var (
|
||||||
MultiOpTxnOpCount = 4
|
MultiOpTxnOpCount = 4
|
||||||
)
|
)
|
||||||
|
|
||||||
func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config Config, finish <-chan struct{}) []porcupine.Operation {
|
func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config Config, finish <-chan struct{}, baseTime time.Time) []porcupine.Operation {
|
||||||
mux := sync.Mutex{}
|
mux := sync.Mutex{}
|
||||||
endpoints := clus.EndpointsGRPC()
|
endpoints := clus.EndpointsGRPC()
|
||||||
|
|
||||||
|
@ -45,7 +45,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
|
||||||
limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200)
|
limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200)
|
||||||
|
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
cc, err := NewClient(endpoints, ids, startTime)
|
cc, err := NewClient(endpoints, ids, baseTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -53,7 +53,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
for i := 0; i < config.clientCount; i++ {
|
for i := 0; i < config.clientCount; i++ {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
c, err := NewClient([]string{endpoints[i%len(endpoints)]}, ids, startTime)
|
c, err := NewClient([]string{endpoints[i%len(endpoints)]}, ids, baseTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,7 @@ import (
|
||||||
"go.etcd.io/etcd/tests/v3/robustness/model"
|
"go.etcd.io/etcd/tests/v3/robustness/model"
|
||||||
)
|
)
|
||||||
|
|
||||||
func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64, cfg watchConfig) [][]watchResponse {
|
func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time) [][]watchResponse {
|
||||||
mux := sync.Mutex{}
|
mux := sync.Mutex{}
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
memberResponses := make([][]watchResponse, len(clus.Procs))
|
memberResponses := make([][]watchResponse, len(clus.Procs))
|
||||||
|
@ -52,7 +52,7 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd
|
||||||
go func(i int, c *clientv3.Client) {
|
go func(i int, c *clientv3.Client) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
defer c.Close()
|
defer c.Close()
|
||||||
responses := watchMember(ctx, t, c, memberChan, cfg)
|
responses := watchMember(ctx, t, c, memberChan, cfg, baseTime)
|
||||||
mux.Lock()
|
mux.Lock()
|
||||||
memberResponses[i] = responses
|
memberResponses[i] = responses
|
||||||
mux.Unlock()
|
mux.Unlock()
|
||||||
|
@ -75,7 +75,7 @@ type watchConfig struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// watchMember collects all responses until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed.
|
// watchMember collects all responses until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed.
|
||||||
func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64, cfg watchConfig) (resps []watchResponse) {
|
func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time) (resps []watchResponse) {
|
||||||
var maxRevision int64 = 0
|
var maxRevision int64 = 0
|
||||||
var lastRevision int64 = 0
|
var lastRevision int64 = 0
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
@ -109,7 +109,9 @@ func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevis
|
||||||
c.RequestProgress(ctx)
|
c.RequestProgress(ctx)
|
||||||
}
|
}
|
||||||
if resp.Err() == nil {
|
if resp.Err() == nil {
|
||||||
resps = append(resps, watchResponse{resp, time.Now()})
|
// using time.Since time-measuring operation to get monotonic clock reading
|
||||||
|
// see https://github.com/golang/go/blob/master/src/time/time.go#L17
|
||||||
|
resps = append(resps, watchResponse{resp, time.Since(baseTime)})
|
||||||
} else if !resp.Canceled {
|
} else if !resp.Canceled {
|
||||||
t.Errorf("Watch stream received error, err %v", resp.Err())
|
t.Errorf("Watch stream received error, err %v", resp.Err())
|
||||||
}
|
}
|
||||||
|
@ -254,13 +256,13 @@ func toWatchEvents(responses []watchResponse) (events []watchEvent) {
|
||||||
|
|
||||||
type watchResponse struct {
|
type watchResponse struct {
|
||||||
clientv3.WatchResponse
|
clientv3.WatchResponse
|
||||||
time time.Time
|
time time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type watchEvent struct {
|
type watchEvent struct {
|
||||||
Op model.EtcdOperation
|
Op model.EtcdOperation
|
||||||
Revision int64
|
Revision int64
|
||||||
Time time.Time
|
Time time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func patchOperationBasedOnWatchEvents(operations []porcupine.Operation, watchEvents []watchEvent) []porcupine.Operation {
|
func patchOperationBasedOnWatchEvents(operations []porcupine.Operation, watchEvents []watchEvent) []porcupine.Operation {
|
||||||
|
@ -282,7 +284,7 @@ func patchOperationBasedOnWatchEvents(operations []porcupine.Operation, watchEve
|
||||||
event := matchWatchEvent(request.Txn, persisted)
|
event := matchWatchEvent(request.Txn, persisted)
|
||||||
if event != nil {
|
if event != nil {
|
||||||
// Set revision and time based on watchEvent.
|
// Set revision and time based on watchEvent.
|
||||||
op.Return = event.Time.UnixNano()
|
op.Return = event.Time.Nanoseconds()
|
||||||
op.Output = model.EtcdNonDeterministicResponse{
|
op.Output = model.EtcdNonDeterministicResponse{
|
||||||
EtcdResponse: model.EtcdResponse{Revision: event.Revision},
|
EtcdResponse: model.EtcdResponse{Revision: event.Revision},
|
||||||
ResultUnknown: true,
|
ResultUnknown: true,
|
||||||
|
|
Loading…
Reference in New Issue