From 911c40a3471ad39436cb017373348a4aa30ffcfa Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 10 May 2023 15:50:18 +0200 Subject: [PATCH] tests/robustness: Implement kubernetes list watch protocol Signed-off-by: Marek Siarkowicz --- tests/robustness/linearizability_test.go | 2 +- tests/robustness/report.go | 5 +- tests/robustness/traffic/client.go | 123 ++++++++++++------ tests/robustness/traffic/kubernetes.go | 159 +++++++++++++++++++---- tests/robustness/traffic/traffic.go | 5 +- tests/robustness/watch.go | 36 +++-- 6 files changed, 235 insertions(+), 95 deletions(-) diff --git a/tests/robustness/linearizability_test.go b/tests/robustness/linearizability_test.go index 3c545abdd..a658d524c 100644 --- a/tests/robustness/linearizability_test.go +++ b/tests/robustness/linearizability_test.go @@ -167,7 +167,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce panicked = false } -func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) (operations []porcupine.Operation, responses [][]watchResponse) { +func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) (operations []porcupine.Operation, responses [][]traffic.WatchResponse) { g := errgroup.Group{} finishTraffic := make(chan struct{}) diff --git a/tests/robustness/report.go b/tests/robustness/report.go index f4608671e..aeebf8e6e 100644 --- a/tests/robustness/report.go +++ b/tests/robustness/report.go @@ -25,12 +25,13 @@ import ( "go.uber.org/zap" "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/traffic" ) type report struct { lg *zap.Logger clus *e2e.EtcdProcessCluster - responses [][]watchResponse + responses [][]traffic.WatchResponse events [][]watchEvent operations []porcupine.Operation patchedOperations []porcupine.Operation @@ -94,7 +95,7 @@ func persistMemberDataDir(t *testing.T, lg *zap.Logger, member e2e.EtcdProcess, } } -func persistWatchResponses(t *testing.T, lg *zap.Logger, path string, responses []watchResponse) { +func persistWatchResponses(t *testing.T, lg *zap.Logger, path string, responses []traffic.WatchResponse) { lg.Info("Saving watch responses", zap.String("path", path)) file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755) if err != nil { diff --git a/tests/robustness/traffic/client.go b/tests/robustness/traffic/client.go index 68910b484..ec47dc08c 100644 --- a/tests/robustness/traffic/client.go +++ b/tests/robustness/traffic/client.go @@ -37,9 +37,17 @@ type RecordingClient struct { // Only time-measuring operations should be used to record time. // see https://github.com/golang/go/blob/master/src/time/time.go#L17 baseTime time.Time + + watchMux sync.Mutex + watchResponses []WatchResponse // mux ensures order of request appending. - mux sync.Mutex - history *model.AppendableHistory + opMux sync.Mutex + operations *model.AppendableHistory +} + +type WatchResponse struct { + clientv3.WatchResponse + Time time.Duration } func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) { @@ -53,9 +61,9 @@ func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (* return nil, err } return &RecordingClient{ - client: *cc, - history: model.NewAppendableHistory(ids), - baseTime: baseTime, + client: *cc, + operations: model.NewAppendableHistory(ids), + baseTime: baseTime, }, nil } @@ -63,77 +71,85 @@ func (c *RecordingClient) Close() error { return c.client.Close() } -func (c *RecordingClient) Operations() model.History { - return c.history.History +func (c *RecordingClient) Report() ClientReport { + return ClientReport{ + Operations: c.operations.History, + Watch: nil, + } +} + +type ClientReport struct { + Operations model.History + Watch []WatchResponse } func (c *RecordingClient) Get(ctx context.Context, key string) (*mvccpb.KeyValue, error) { resp, err := c.Range(ctx, key, false) - if err != nil || len(resp) == 0 { + if err != nil || len(resp.Kvs) == 0 { return nil, err } - if len(resp) == 1 { - return resp[0], err + if len(resp.Kvs) == 1 { + return resp.Kvs[0], err } - panic(fmt.Sprintf("Unexpected response size: %d", len(resp))) + panic(fmt.Sprintf("Unexpected response size: %d", len(resp.Kvs))) } -func (c *RecordingClient) Range(ctx context.Context, key string, withPrefix bool) ([]*mvccpb.KeyValue, error) { +func (c *RecordingClient) Range(ctx context.Context, key string, withPrefix bool) (*clientv3.GetResponse, error) { ops := []clientv3.OpOption{} if withPrefix { ops = append(ops, clientv3.WithPrefix()) } - c.mux.Lock() - defer c.mux.Unlock() + c.opMux.Lock() + defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Get(ctx, key, ops...) if err != nil { return nil, err } returnTime := time.Since(c.baseTime) - c.history.AppendRange(key, withPrefix, callTime, returnTime, resp) - return resp.Kvs, nil + c.operations.AppendRange(key, withPrefix, callTime, returnTime, resp) + return resp, nil } func (c *RecordingClient) Put(ctx context.Context, key, value string) error { - c.mux.Lock() - defer c.mux.Unlock() + c.opMux.Lock() + defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Put(ctx, key, value) returnTime := time.Since(c.baseTime) - c.history.AppendPut(key, value, callTime, returnTime, resp, err) + c.operations.AppendPut(key, value, callTime, returnTime, resp, err) return err } func (c *RecordingClient) Delete(ctx context.Context, key string) error { - c.mux.Lock() - defer c.mux.Unlock() + c.opMux.Lock() + defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Delete(ctx, key) returnTime := time.Since(c.baseTime) - c.history.AppendDelete(key, callTime, returnTime, resp, err) + c.operations.AppendDelete(key, callTime, returnTime, resp, err) return nil } func (c *RecordingClient) CompareRevisionAndDelete(ctx context.Context, key string, expectedRevision int64) error { txn := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpDelete(key)) - c.mux.Lock() - defer c.mux.Unlock() + c.opMux.Lock() + defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := txn.Commit() returnTime := time.Since(c.baseTime) - c.history.AppendCompareRevisionAndDelete(key, expectedRevision, callTime, returnTime, resp, err) + c.operations.AppendCompareRevisionAndDelete(key, expectedRevision, callTime, returnTime, resp, err) return err } func (c *RecordingClient) CompareRevisionAndPut(ctx context.Context, key, value string, expectedRevision int64) error { txn := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpPut(key, value)) - c.mux.Lock() - defer c.mux.Unlock() + c.opMux.Lock() + defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := txn.Commit() returnTime := time.Since(c.baseTime) - c.history.AppendCompareRevisionAndPut(key, expectedRevision, value, callTime, returnTime, resp, err) + c.operations.AppendCompareRevisionAndPut(key, expectedRevision, value, callTime, returnTime, resp, err) return err } @@ -158,22 +174,22 @@ func (c *RecordingClient) Txn(ctx context.Context, cmp []clientv3.Cmp, ops []cli ).Then( ops..., ) - c.mux.Lock() - defer c.mux.Unlock() + c.opMux.Lock() + defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := txn.Commit() returnTime := time.Since(c.baseTime) - c.history.AppendTxn(cmp, ops, callTime, returnTime, resp, err) + c.operations.AppendTxn(cmp, ops, callTime, returnTime, resp, err) return err } func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, error) { - c.mux.Lock() - defer c.mux.Unlock() + c.opMux.Lock() + defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Lease.Grant(ctx, ttl) returnTime := time.Since(c.baseTime) - c.history.AppendLeaseGrant(callTime, returnTime, resp, err) + c.operations.AppendLeaseGrant(callTime, returnTime, resp, err) var leaseId int64 if resp != nil { leaseId = int64(resp.ID) @@ -182,32 +198,53 @@ func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, err } func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseId int64) error { - c.mux.Lock() - defer c.mux.Unlock() + c.opMux.Lock() + defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Lease.Revoke(ctx, clientv3.LeaseID(leaseId)) returnTime := time.Since(c.baseTime) - c.history.AppendLeaseRevoke(leaseId, callTime, returnTime, resp, err) + c.operations.AppendLeaseRevoke(leaseId, callTime, returnTime, resp, err) return err } func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseId int64) error { opts := clientv3.WithLease(clientv3.LeaseID(leaseId)) - c.mux.Lock() - defer c.mux.Unlock() + c.opMux.Lock() + defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Put(ctx, key, value, opts) returnTime := time.Since(c.baseTime) - c.history.AppendPutWithLease(key, value, leaseId, callTime, returnTime, resp, err) + c.operations.AppendPutWithLease(key, value, leaseId, callTime, returnTime, resp, err) return err } func (c *RecordingClient) Defragment(ctx context.Context) error { - c.mux.Lock() - defer c.mux.Unlock() + c.opMux.Lock() + defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Defragment(ctx, c.client.Endpoints()[0]) returnTime := time.Since(c.baseTime) - c.history.AppendDefragment(callTime, returnTime, resp, err) + c.operations.AppendDefragment(callTime, returnTime, resp, err) return err } + +func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool) clientv3.WatchChan { + ops := []clientv3.OpOption{clientv3.WithProgressNotify()} + if withPrefix { + ops = append(ops, clientv3.WithPrefix()) + } + if rev != 0 { + ops = append(ops, clientv3.WithRev(rev)) + } + respCh := make(chan clientv3.WatchResponse) + go func() { + defer close(respCh) + for r := range c.client.Watch(ctx, key, ops...) { + c.watchMux.Lock() + c.watchResponses = append(c.watchResponses, WatchResponse{r, time.Since(c.baseTime)}) + c.watchMux.Unlock() + respCh <- r + } + }() + return respCh +} diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go index 76299afe1..63f440329 100644 --- a/tests/robustness/traffic/kubernetes.go +++ b/tests/robustness/traffic/kubernetes.go @@ -16,12 +16,16 @@ package traffic import ( "context" + "errors" "fmt" "math/rand" + "sync" + "golang.org/x/sync/errgroup" "golang.org/x/time/rate" "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/pkg/v3/stringutil" "go.etcd.io/etcd/tests/v3/robustness/identity" ) @@ -61,42 +65,82 @@ const ( ) func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { - for { - select { - case <-ctx.Done(): - return - case <-finish: - return - default: + s := newStorage() + keyPrefix := "/registry/" + t.resource + "/" + g := errgroup.Group{} + + g.Go(func() error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-finish: + return nil + default: + } + resp, err := t.Range(ctx, c, keyPrefix, true) + if err != nil { + continue + } + s.Reset(resp) + limiter.Wait(ctx) + watchCtx, cancel := context.WithTimeout(ctx, WatchTimeout) + for e := range c.Watch(watchCtx, keyPrefix, resp.Header.Revision, true) { + s.Update(e) + } + cancel() } - objects, err := t.Range(ctx, c, "/registry/"+t.resource+"/", true) - if err != nil { - continue + }) + g.Go(func() error { + lastWriteFailed := false + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-finish: + return nil + default: + } + // Avoid multiple failed writes in a row + if lastWriteFailed { + resp, err := t.Range(ctx, c, keyPrefix, true) + if err != nil { + continue + } + s.Reset(resp) + limiter.Wait(ctx) + } + err := t.Write(ctx, c, ids, s) + lastWriteFailed = err != nil + if err != nil { + continue + } + limiter.Wait(ctx) } - limiter.Wait(ctx) - err = t.Write(ctx, c, ids, objects) - if err != nil { - continue - } - limiter.Wait(ctx) - } + }) + g.Wait() } -func (t kubernetesTraffic) Write(ctx context.Context, c *RecordingClient, ids identity.Provider, objects []*mvccpb.KeyValue) (err error) { +func (t kubernetesTraffic) Write(ctx context.Context, c *RecordingClient, ids identity.Provider, s *storage) (err error) { writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout) - if len(objects) < t.averageKeyCount/2 { + defer cancel() + count := s.Count() + if count < t.averageKeyCount/2 { err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId())) } else { - randomPod := objects[rand.Intn(len(objects))] - if len(objects) > t.averageKeyCount*3/2 { - err = t.Delete(writeCtx, c, string(randomPod.Key), randomPod.ModRevision) + key, rev := s.PickRandom() + if rev == 0 { + return errors.New("storage empty") + } + if count > t.averageKeyCount*3/2 { + err = t.Delete(writeCtx, c, key, rev) } else { - op := KubernetesRequestType(pickRandom(t.writeChoices)) + op := pickRandom(t.writeChoices) switch op { case KubernetesDelete: - err = t.Delete(writeCtx, c, string(randomPod.Key), randomPod.ModRevision) + err = t.Delete(writeCtx, c, key, rev) case KubernetesUpdate: - err = t.Update(writeCtx, c, string(randomPod.Key), fmt.Sprintf("%d", ids.NewRequestId()), randomPod.ModRevision) + err = t.Update(writeCtx, c, key, fmt.Sprintf("%d", ids.NewRequestId()), rev) case KubernetesCreate: err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId())) default: @@ -104,7 +148,6 @@ func (t kubernetesTraffic) Write(ctx context.Context, c *RecordingClient, ids id } } } - cancel() return err } @@ -112,7 +155,7 @@ func (t kubernetesTraffic) generateKey() string { return fmt.Sprintf("/registry/%s/%s/%s", t.resource, t.namespace, stringutil.RandString(5)) } -func (t kubernetesTraffic) Range(ctx context.Context, c *RecordingClient, key string, withPrefix bool) ([]*mvccpb.KeyValue, error) { +func (t kubernetesTraffic) Range(ctx context.Context, c *RecordingClient, key string, withPrefix bool) (*clientv3.GetResponse, error) { ctx, cancel := context.WithTimeout(ctx, RequestTimeout) resp, err := c.Range(ctx, key, withPrefix) cancel() @@ -136,3 +179,65 @@ func (t kubernetesTraffic) Delete(ctx context.Context, c *RecordingClient, key s cancel() return err } + +type storage struct { + mux sync.RWMutex + keyRevision map[string]int64 + revision int64 +} + +func newStorage() *storage { + return &storage{ + keyRevision: map[string]int64{}, + } +} + +func (s *storage) Update(resp clientv3.WatchResponse) { + s.mux.Lock() + defer s.mux.Unlock() + for _, e := range resp.Events { + if e.Kv.ModRevision < s.revision { + continue + } + s.revision = e.Kv.ModRevision + switch e.Type { + case mvccpb.PUT: + s.keyRevision[string(e.Kv.Key)] = e.Kv.ModRevision + case mvccpb.DELETE: + delete(s.keyRevision, string(e.Kv.Key)) + } + } +} + +func (s *storage) Reset(resp *clientv3.GetResponse) { + s.mux.Lock() + defer s.mux.Unlock() + if resp.Header.Revision <= s.revision { + return + } + s.keyRevision = make(map[string]int64, len(resp.Kvs)) + for _, kv := range resp.Kvs { + s.keyRevision[string(kv.Key)] = kv.ModRevision + } + s.revision = resp.Header.Revision +} + +func (s *storage) Count() int { + s.mux.RLock() + defer s.mux.RUnlock() + return len(s.keyRevision) +} + +func (s *storage) PickRandom() (key string, rev int64) { + s.mux.RLock() + defer s.mux.RUnlock() + n := rand.Intn(len(s.keyRevision)) + i := 0 + for k, v := range s.keyRevision { + if i == n { + return k, v + } + i++ + } + return "", 0 +} diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go index 98c20a972..f476dee20 100644 --- a/tests/robustness/traffic/traffic.go +++ b/tests/robustness/traffic/traffic.go @@ -32,6 +32,7 @@ import ( var ( DefaultLeaseTTL int64 = 7200 RequestTimeout = 40 * time.Millisecond + WatchTimeout = 400 * time.Millisecond MultiOpTxnOpCount = 4 ) @@ -63,7 +64,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 config.traffic.Run(ctx, clientId, c, limiter, ids, lm, finish) mux.Lock() - h = h.Merge(c.Operations()) + h = h.Merge(c.operations.History) mux.Unlock() }(c, i) } @@ -76,7 +77,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 if err != nil { t.Error(err) } - h = h.Merge(cc.Operations()) + h = h.Merge(cc.operations.History) operations := h.Operations() lg.Info("Recorded operations", zap.Int("count", len(operations))) diff --git a/tests/robustness/watch.go b/tests/robustness/watch.go index 40c9c0621..316b42fa6 100644 --- a/tests/robustness/watch.go +++ b/tests/robustness/watch.go @@ -29,12 +29,13 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/framework/e2e" "go.etcd.io/etcd/tests/v3/robustness/model" + "go.etcd.io/etcd/tests/v3/robustness/traffic" ) -func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time) [][]watchResponse { +func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time) [][]traffic.WatchResponse { mux := sync.Mutex{} var wg sync.WaitGroup - memberResponses := make([][]watchResponse, len(clus.Procs)) + memberResponses := make([][]traffic.WatchResponse, len(clus.Procs)) memberMaxRevisionChans := make([]chan int64, len(clus.Procs)) for i, member := range clus.Procs { c, err := clientv3.New(clientv3.Config{ @@ -75,7 +76,7 @@ type watchConfig struct { } // watchMember collects all responses until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed. -func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time) (resps []watchResponse) { +func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time) (resps []traffic.WatchResponse) { var maxRevision int64 = 0 var lastRevision int64 = 0 ctx, cancel := context.WithCancel(ctx) @@ -111,7 +112,7 @@ func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevis if resp.Err() == nil { // using time.Since time-measuring operation to get monotonic clock reading // see https://github.com/golang/go/blob/master/src/time/time.go#L17 - resps = append(resps, watchResponse{resp, time.Since(baseTime)}) + resps = append(resps, traffic.WatchResponse{WatchResponse: resp, Time: time.Since(baseTime)}) } else if !resp.Canceled { t.Errorf("Watch stream received error, err %v", resp.Err()) } @@ -126,7 +127,7 @@ func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevis } } -func watchResponsesMaxRevision(responses []watchResponse) int64 { +func watchResponsesMaxRevision(responses []traffic.WatchResponse) int64 { var maxRevision int64 for _, response := range responses { for _, event := range response.Events { @@ -138,13 +139,13 @@ func watchResponsesMaxRevision(responses []watchResponse) int64 { return maxRevision } -func validateWatchResponses(t *testing.T, clus *e2e.EtcdProcessCluster, responses [][]watchResponse, expectProgressNotify bool) { +func validateWatchResponses(t *testing.T, clus *e2e.EtcdProcessCluster, responses [][]traffic.WatchResponse, expectProgressNotify bool) { for i, member := range clus.Procs { validateMemberWatchResponses(t, member.Config().Name, responses[i], expectProgressNotify) } } -func validateMemberWatchResponses(t *testing.T, memberId string, responses []watchResponse, expectProgressNotify bool) { +func validateMemberWatchResponses(t *testing.T, memberId string, responses []traffic.WatchResponse, expectProgressNotify bool) { // Validate watch is correctly configured to ensure proper testing validateGotAtLeastOneProgressNotify(t, memberId, responses, expectProgressNotify) @@ -156,7 +157,7 @@ func validateMemberWatchResponses(t *testing.T, memberId string, responses []wat validateRenewable(t, memberId, responses) } -func validateGotAtLeastOneProgressNotify(t *testing.T, memberId string, responses []watchResponse, expectProgressNotify bool) { +func validateGotAtLeastOneProgressNotify(t *testing.T, memberId string, responses []traffic.WatchResponse, expectProgressNotify bool) { var gotProgressNotify = false var lastHeadRevision int64 = 1 for _, resp := range responses { @@ -171,7 +172,7 @@ func validateGotAtLeastOneProgressNotify(t *testing.T, memberId string, response } } -func validateRenewable(t *testing.T, memberId string, responses []watchResponse) { +func validateRenewable(t *testing.T, memberId string, responses []traffic.WatchResponse) { var lastProgressNotifyRevision int64 = 0 for _, resp := range responses { for _, event := range resp.Events { @@ -185,7 +186,7 @@ func validateRenewable(t *testing.T, memberId string, responses []watchResponse) } } -func validateOrderedAndReliable(t *testing.T, memberId string, responses []watchResponse) { +func validateOrderedAndReliable(t *testing.T, memberId string, responses []traffic.WatchResponse) { var lastEventRevision int64 = 1 for _, resp := range responses { for _, event := range resp.Events { @@ -201,7 +202,7 @@ func validateOrderedAndReliable(t *testing.T, memberId string, responses []watch } } -func validateUnique(t *testing.T, memberId string, responses []watchResponse) { +func validateUnique(t *testing.T, memberId string, responses []traffic.WatchResponse) { type revisionKey struct { revision int64 key string @@ -218,7 +219,7 @@ func validateUnique(t *testing.T, memberId string, responses []watchResponse) { } } -func validateAtomic(t *testing.T, memberId string, responses []watchResponse) { +func validateAtomic(t *testing.T, memberId string, responses []traffic.WatchResponse) { var lastEventRevision int64 = 1 for _, resp := range responses { if len(resp.Events) > 0 { @@ -230,7 +231,7 @@ func validateAtomic(t *testing.T, memberId string, responses []watchResponse) { } } -func toWatchEvents(responses []watchResponse) (events []watchEvent) { +func toWatchEvents(responses []traffic.WatchResponse) (events []watchEvent) { for _, resp := range responses { for _, event := range resp.Events { var op model.OperationType @@ -241,7 +242,7 @@ func toWatchEvents(responses []watchResponse) (events []watchEvent) { op = model.Delete } events = append(events, watchEvent{ - Time: resp.time, + Time: resp.Time, Revision: event.Kv.ModRevision, Op: model.EtcdOperation{ Type: op, @@ -254,11 +255,6 @@ func toWatchEvents(responses []watchResponse) (events []watchEvent) { return events } -type watchResponse struct { - clientv3.WatchResponse - time time.Duration -} - type watchEvent struct { Op model.EtcdOperation Revision int64 @@ -354,7 +350,7 @@ func hasUniqueWriteOperation(request *model.TxnRequest) bool { return false } -func watchEvents(responses [][]watchResponse) [][]watchEvent { +func watchEvents(responses [][]traffic.WatchResponse) [][]watchEvent { ops := make([][]watchEvent, len(responses)) for i, resps := range responses { ops[i] = toWatchEvents(resps)