tests/robustness: Implement kubernetes list watch protocol

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
Marek Siarkowicz 2023-05-10 15:50:18 +02:00
parent 05ed91d76d
commit 911c40a347
6 changed files with 235 additions and 95 deletions
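
The core of this change is the Kubernetes list-watch pattern: kubernetesTraffic.Run now splits into two goroutines, an observer that lists the "/registry/<resource>/" prefix, then watches from the list's header revision and mirrors PUT/DELETE events into a local key-to-ModRevision store, and a writer that sizes and targets its writes using that store. Below is a minimal, hedged sketch of the observer's list-then-watch step against the public clientv3 API; the listAndWatch and main functions, the plain map, and the localhost endpoint are illustrative stand-ins for the commit's RecordingClient and storage types.

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// listAndWatch lists every key under prefix, then resumes a watch from the
// revision the list was served at and applies events to the local store.
func listAndWatch(ctx context.Context, c *clientv3.Client, prefix string, store map[string]int64) error {
	// "List": a ranged Get captures each key's ModRevision and the store revision.
	resp, err := c.Get(ctx, prefix, clientv3.WithPrefix())
	if err != nil {
		return err
	}
	for _, kv := range resp.Kvs {
		store[string(kv.Key)] = kv.ModRevision
	}

	// "Watch": continue from the listed revision so no later event is missed.
	watchCtx, cancel := context.WithTimeout(ctx, 400*time.Millisecond) // mirrors WatchTimeout in the commit
	defer cancel()
	watch := c.Watch(watchCtx, prefix, clientv3.WithPrefix(),
		clientv3.WithRev(resp.Header.Revision), clientv3.WithProgressNotify())
	for wresp := range watch {
		for _, ev := range wresp.Events {
			switch ev.Type {
			case clientv3.EventTypePut:
				store[string(ev.Kv.Key)] = ev.Kv.ModRevision
			case clientv3.EventTypeDelete:
				delete(store, string(ev.Kv.Key))
			}
		}
	}
	return nil
}

func main() {
	c, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		panic(err)
	}
	defer c.Close()
	store := map[string]int64{}
	if err := listAndWatch(context.Background(), c, "/registry/pods/", store); err != nil {
		fmt.Println("list+watch stopped:", err)
	}
	fmt.Println("observed keys:", len(store))
}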


@@ -167,7 +167,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce
panicked = false
}
func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) (operations []porcupine.Operation, responses [][]watchResponse) {
func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) (operations []porcupine.Operation, responses [][]traffic.WatchResponse) {
g := errgroup.Group{}
finishTraffic := make(chan struct{})


@@ -25,12 +25,13 @@ import (
"go.uber.org/zap"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/traffic"
)
type report struct {
lg *zap.Logger
clus *e2e.EtcdProcessCluster
responses [][]watchResponse
responses [][]traffic.WatchResponse
events [][]watchEvent
operations []porcupine.Operation
patchedOperations []porcupine.Operation
@@ -94,7 +95,7 @@ func persistMemberDataDir(t *testing.T, lg *zap.Logger, member e2e.EtcdProcess,
}
}
func persistWatchResponses(t *testing.T, lg *zap.Logger, path string, responses []watchResponse) {
func persistWatchResponses(t *testing.T, lg *zap.Logger, path string, responses []traffic.WatchResponse) {
lg.Info("Saving watch responses", zap.String("path", path))
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {


@@ -37,9 +37,17 @@ type RecordingClient struct {
// Only time-measuring operations should be used to record time.
// see https://github.com/golang/go/blob/master/src/time/time.go#L17
baseTime time.Time
watchMux sync.Mutex
watchResponses []WatchResponse
// mux ensures order of request appending.
mux sync.Mutex
history *model.AppendableHistory
opMux sync.Mutex
operations *model.AppendableHistory
}
type WatchResponse struct {
clientv3.WatchResponse
Time time.Duration
}
func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) {
@@ -53,9 +61,9 @@ func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*
return nil, err
}
return &RecordingClient{
client: *cc,
history: model.NewAppendableHistory(ids),
baseTime: baseTime,
client: *cc,
operations: model.NewAppendableHistory(ids),
baseTime: baseTime,
}, nil
}
@@ -63,77 +71,85 @@ func (c *RecordingClient) Close() error {
return c.client.Close()
}
func (c *RecordingClient) Operations() model.History {
return c.history.History
func (c *RecordingClient) Report() ClientReport {
return ClientReport{
Operations: c.operations.History,
Watch: nil,
}
}
type ClientReport struct {
Operations model.History
Watch []WatchResponse
}
func (c *RecordingClient) Get(ctx context.Context, key string) (*mvccpb.KeyValue, error) {
resp, err := c.Range(ctx, key, false)
if err != nil || len(resp) == 0 {
if err != nil || len(resp.Kvs) == 0 {
return nil, err
}
if len(resp) == 1 {
return resp[0], err
if len(resp.Kvs) == 1 {
return resp.Kvs[0], err
}
panic(fmt.Sprintf("Unexpected response size: %d", len(resp)))
panic(fmt.Sprintf("Unexpected response size: %d", len(resp.Kvs)))
}
func (c *RecordingClient) Range(ctx context.Context, key string, withPrefix bool) ([]*mvccpb.KeyValue, error) {
func (c *RecordingClient) Range(ctx context.Context, key string, withPrefix bool) (*clientv3.GetResponse, error) {
ops := []clientv3.OpOption{}
if withPrefix {
ops = append(ops, clientv3.WithPrefix())
}
c.mux.Lock()
defer c.mux.Unlock()
c.opMux.Lock()
defer c.opMux.Unlock()
callTime := time.Since(c.baseTime)
resp, err := c.client.Get(ctx, key, ops...)
if err != nil {
return nil, err
}
returnTime := time.Since(c.baseTime)
c.history.AppendRange(key, withPrefix, callTime, returnTime, resp)
return resp.Kvs, nil
c.operations.AppendRange(key, withPrefix, callTime, returnTime, resp)
return resp, nil
}
func (c *RecordingClient) Put(ctx context.Context, key, value string) error {
c.mux.Lock()
defer c.mux.Unlock()
c.opMux.Lock()
defer c.opMux.Unlock()
callTime := time.Since(c.baseTime)
resp, err := c.client.Put(ctx, key, value)
returnTime := time.Since(c.baseTime)
c.history.AppendPut(key, value, callTime, returnTime, resp, err)
c.operations.AppendPut(key, value, callTime, returnTime, resp, err)
return err
}
func (c *RecordingClient) Delete(ctx context.Context, key string) error {
c.mux.Lock()
defer c.mux.Unlock()
c.opMux.Lock()
defer c.opMux.Unlock()
callTime := time.Since(c.baseTime)
resp, err := c.client.Delete(ctx, key)
returnTime := time.Since(c.baseTime)
c.history.AppendDelete(key, callTime, returnTime, resp, err)
c.operations.AppendDelete(key, callTime, returnTime, resp, err)
return nil
}
func (c *RecordingClient) CompareRevisionAndDelete(ctx context.Context, key string, expectedRevision int64) error {
txn := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpDelete(key))
c.mux.Lock()
defer c.mux.Unlock()
c.opMux.Lock()
defer c.opMux.Unlock()
callTime := time.Since(c.baseTime)
resp, err := txn.Commit()
returnTime := time.Since(c.baseTime)
c.history.AppendCompareRevisionAndDelete(key, expectedRevision, callTime, returnTime, resp, err)
c.operations.AppendCompareRevisionAndDelete(key, expectedRevision, callTime, returnTime, resp, err)
return err
}
func (c *RecordingClient) CompareRevisionAndPut(ctx context.Context, key, value string, expectedRevision int64) error {
txn := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpPut(key, value))
c.mux.Lock()
defer c.mux.Unlock()
c.opMux.Lock()
defer c.opMux.Unlock()
callTime := time.Since(c.baseTime)
resp, err := txn.Commit()
returnTime := time.Since(c.baseTime)
c.history.AppendCompareRevisionAndPut(key, expectedRevision, value, callTime, returnTime, resp, err)
c.operations.AppendCompareRevisionAndPut(key, expectedRevision, value, callTime, returnTime, resp, err)
return err
}
@@ -158,22 +174,22 @@ func (c *RecordingClient) Txn(ctx context.Context, cmp []clientv3.Cmp, ops []cli
).Then(
ops...,
)
c.mux.Lock()
defer c.mux.Unlock()
c.opMux.Lock()
defer c.opMux.Unlock()
callTime := time.Since(c.baseTime)
resp, err := txn.Commit()
returnTime := time.Since(c.baseTime)
c.history.AppendTxn(cmp, ops, callTime, returnTime, resp, err)
c.operations.AppendTxn(cmp, ops, callTime, returnTime, resp, err)
return err
}
func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, error) {
c.mux.Lock()
defer c.mux.Unlock()
c.opMux.Lock()
defer c.opMux.Unlock()
callTime := time.Since(c.baseTime)
resp, err := c.client.Lease.Grant(ctx, ttl)
returnTime := time.Since(c.baseTime)
c.history.AppendLeaseGrant(callTime, returnTime, resp, err)
c.operations.AppendLeaseGrant(callTime, returnTime, resp, err)
var leaseId int64
if resp != nil {
leaseId = int64(resp.ID)
@@ -182,32 +198,53 @@ func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, err
}
func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseId int64) error {
c.mux.Lock()
defer c.mux.Unlock()
c.opMux.Lock()
defer c.opMux.Unlock()
callTime := time.Since(c.baseTime)
resp, err := c.client.Lease.Revoke(ctx, clientv3.LeaseID(leaseId))
returnTime := time.Since(c.baseTime)
c.history.AppendLeaseRevoke(leaseId, callTime, returnTime, resp, err)
c.operations.AppendLeaseRevoke(leaseId, callTime, returnTime, resp, err)
return err
}
func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseId int64) error {
opts := clientv3.WithLease(clientv3.LeaseID(leaseId))
c.mux.Lock()
defer c.mux.Unlock()
c.opMux.Lock()
defer c.opMux.Unlock()
callTime := time.Since(c.baseTime)
resp, err := c.client.Put(ctx, key, value, opts)
returnTime := time.Since(c.baseTime)
c.history.AppendPutWithLease(key, value, leaseId, callTime, returnTime, resp, err)
c.operations.AppendPutWithLease(key, value, leaseId, callTime, returnTime, resp, err)
return err
}
func (c *RecordingClient) Defragment(ctx context.Context) error {
c.mux.Lock()
defer c.mux.Unlock()
c.opMux.Lock()
defer c.opMux.Unlock()
callTime := time.Since(c.baseTime)
resp, err := c.client.Defragment(ctx, c.client.Endpoints()[0])
returnTime := time.Since(c.baseTime)
c.history.AppendDefragment(callTime, returnTime, resp, err)
c.operations.AppendDefragment(callTime, returnTime, resp, err)
return err
}
func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool) clientv3.WatchChan {
ops := []clientv3.OpOption{clientv3.WithProgressNotify()}
if withPrefix {
ops = append(ops, clientv3.WithPrefix())
}
if rev != 0 {
ops = append(ops, clientv3.WithRev(rev))
}
respCh := make(chan clientv3.WatchResponse)
go func() {
defer close(respCh)
for r := range c.client.Watch(ctx, key, ops...) {
c.watchMux.Lock()
c.watchResponses = append(c.watchResponses, WatchResponse{r, time.Since(c.baseTime)})
c.watchMux.Unlock()
respCh <- r
}
}()
return respCh
}
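
RecordingClient.Watch relays the underlying clientv3 watch channel through a goroutine, appending every response to watchResponses together with a monotonic Time offset from baseTime; the new watchMux guards only that slice, separate from opMux, so watch recording never serializes with KV operations. Callers consume the result like a plain clientv3.WatchChan. A hedged in-package sketch of that usage (observe and apply are illustrative names; the traffic package's existing imports are assumed):

// observe drains a recorded watch and hands each response to the caller,
// e.g. storage.Update in kubernetesTraffic.Run; recording is a side effect.
func observe(ctx context.Context, c *RecordingClient, prefix string, rev int64, apply func(clientv3.WatchResponse)) {
	watchCtx, cancel := context.WithTimeout(ctx, WatchTimeout)
	defer cancel()
	for resp := range c.Watch(watchCtx, prefix, rev, true) {
		apply(resp)
	}
}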


@@ -16,12 +16,16 @@ package traffic
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/stringutil"
"go.etcd.io/etcd/tests/v3/robustness/identity"
)
@@ -61,42 +65,82 @@ const (
)
func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) {
for {
select {
case <-ctx.Done():
return
case <-finish:
return
default:
s := newStorage()
keyPrefix := "/registry/" + t.resource + "/"
g := errgroup.Group{}
g.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-finish:
return nil
default:
}
resp, err := t.Range(ctx, c, keyPrefix, true)
if err != nil {
continue
}
s.Reset(resp)
limiter.Wait(ctx)
watchCtx, cancel := context.WithTimeout(ctx, WatchTimeout)
for e := range c.Watch(watchCtx, keyPrefix, resp.Header.Revision, true) {
s.Update(e)
}
cancel()
}
objects, err := t.Range(ctx, c, "/registry/"+t.resource+"/", true)
if err != nil {
continue
})
g.Go(func() error {
lastWriteFailed := false
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-finish:
return nil
default:
}
// Avoid multiple failed writes in a row
if lastWriteFailed {
resp, err := t.Range(ctx, c, keyPrefix, true)
if err != nil {
continue
}
s.Reset(resp)
limiter.Wait(ctx)
}
err := t.Write(ctx, c, ids, s)
lastWriteFailed = err != nil
if err != nil {
continue
}
limiter.Wait(ctx)
}
limiter.Wait(ctx)
err = t.Write(ctx, c, ids, objects)
if err != nil {
continue
}
limiter.Wait(ctx)
}
})
g.Wait()
}
func (t kubernetesTraffic) Write(ctx context.Context, c *RecordingClient, ids identity.Provider, objects []*mvccpb.KeyValue) (err error) {
func (t kubernetesTraffic) Write(ctx context.Context, c *RecordingClient, ids identity.Provider, s *storage) (err error) {
writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
if len(objects) < t.averageKeyCount/2 {
defer cancel()
count := s.Count()
if count < t.averageKeyCount/2 {
err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId()))
} else {
randomPod := objects[rand.Intn(len(objects))]
if len(objects) > t.averageKeyCount*3/2 {
err = t.Delete(writeCtx, c, string(randomPod.Key), randomPod.ModRevision)
key, rev := s.PickRandom()
if rev == 0 {
return errors.New("storage empty")
}
if count > t.averageKeyCount*3/2 {
err = t.Delete(writeCtx, c, key, rev)
} else {
op := KubernetesRequestType(pickRandom(t.writeChoices))
op := pickRandom(t.writeChoices)
switch op {
case KubernetesDelete:
err = t.Delete(writeCtx, c, string(randomPod.Key), randomPod.ModRevision)
err = t.Delete(writeCtx, c, key, rev)
case KubernetesUpdate:
err = t.Update(writeCtx, c, string(randomPod.Key), fmt.Sprintf("%d", ids.NewRequestId()), randomPod.ModRevision)
err = t.Update(writeCtx, c, key, fmt.Sprintf("%d", ids.NewRequestId()), rev)
case KubernetesCreate:
err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId()))
default:
@@ -104,7 +148,6 @@ func (t kubernetesTraffic) Write(ctx context.Context, c *RecordingClient, ids id
}
}
}
cancel()
return err
}
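
Write now sizes the keyspace from the live storage count instead of a one-shot range: below averageKeyCount/2 it only creates, above averageKeyCount*3/2 it only deletes, and in between it draws a weighted random request type; a zero revision from PickRandom means the local store is empty and the write is aborted. A hedged in-package sketch of that policy (chooseOp is an illustrative name, not part of the commit):

// chooseOp reduces the sizing rules to a single decision.
func chooseOp(count, averageKeyCount int, random KubernetesRequestType) KubernetesRequestType {
	switch {
	case count < averageKeyCount/2:
		return KubernetesCreate // keyspace too small: only grow it
	case count > averageKeyCount*3/2:
		return KubernetesDelete // keyspace too large: only shrink it
	default:
		return random // otherwise use the weighted create/update/delete choice
	}
}
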
@@ -112,7 +155,7 @@ func (t kubernetesTraffic) generateKey() string {
return fmt.Sprintf("/registry/%s/%s/%s", t.resource, t.namespace, stringutil.RandString(5))
}
func (t kubernetesTraffic) Range(ctx context.Context, c *RecordingClient, key string, withPrefix bool) ([]*mvccpb.KeyValue, error) {
func (t kubernetesTraffic) Range(ctx context.Context, c *RecordingClient, key string, withPrefix bool) (*clientv3.GetResponse, error) {
ctx, cancel := context.WithTimeout(ctx, RequestTimeout)
resp, err := c.Range(ctx, key, withPrefix)
cancel()
@@ -136,3 +179,65 @@ func (t kubernetesTraffic) Delete(ctx context.Context, c *RecordingClient, key s
cancel()
return err
}
type storage struct {
mux sync.RWMutex
keyRevision map[string]int64
revision int64
}
func newStorage() *storage {
return &storage{
keyRevision: map[string]int64{},
}
}
func (s *storage) Update(resp clientv3.WatchResponse) {
s.mux.Lock()
defer s.mux.Unlock()
for _, e := range resp.Events {
if e.Kv.ModRevision < s.revision {
continue
}
s.revision = e.Kv.ModRevision
switch e.Type {
case mvccpb.PUT:
s.keyRevision[string(e.Kv.Key)] = e.Kv.ModRevision
case mvccpb.DELETE:
delete(s.keyRevision, string(e.Kv.Key))
}
}
}
func (s *storage) Reset(resp *clientv3.GetResponse) {
s.mux.Lock()
defer s.mux.Unlock()
if resp.Header.Revision <= s.revision {
return
}
s.keyRevision = make(map[string]int64, len(resp.Kvs))
for _, kv := range resp.Kvs {
s.keyRevision[string(kv.Key)] = kv.ModRevision
}
s.revision = resp.Header.Revision
}
func (s *storage) Count() int {
s.mux.RLock()
defer s.mux.RUnlock()
return len(s.keyRevision)
}
func (s *storage) PickRandom() (key string, rev int64) {
s.mux.RLock()
defer s.mux.RUnlock()
n := rand.Intn(len(s.keyRevision))
i := 0
for k, v := range s.keyRevision {
if i == n {
return k, v
}
i++
}
return "", 0
}
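
The storage type is the observer's client-side cache: Reset seeds it from a list response (ignoring responses at or below the stored revision), Update applies watch events at or above that revision, and Count/PickRandom feed the writer goroutine. A hedged in-package usage sketch (exampleStorageUse is an illustrative name; the traffic package's imports are assumed):

func exampleStorageUse(getResp *clientv3.GetResponse, watchResp clientv3.WatchResponse) {
	s := newStorage()
	s.Reset(getResp)    // seed key -> ModRevision from a list
	s.Update(watchResp) // keep it current from the watch stream
	if s.Count() > 0 {
		if key, rev := s.PickRandom(); rev != 0 {
			_ = key // candidate for a compare-ModRevision update or delete at rev
		}
	}
}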


@@ -32,6 +32,7 @@ import (
var (
DefaultLeaseTTL int64 = 7200
RequestTimeout = 40 * time.Millisecond
WatchTimeout = 400 * time.Millisecond
MultiOpTxnOpCount = 4
)
@@ -63,7 +64,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
config.traffic.Run(ctx, clientId, c, limiter, ids, lm, finish)
mux.Lock()
h = h.Merge(c.Operations())
h = h.Merge(c.operations.History)
mux.Unlock()
}(c, i)
}
@@ -76,7 +77,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
if err != nil {
t.Error(err)
}
h = h.Merge(cc.Operations())
h = h.Merge(cc.operations.History)
operations := h.Operations()
lg.Info("Recorded operations", zap.Int("count", len(operations)))


@@ -29,12 +29,13 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/model"
"go.etcd.io/etcd/tests/v3/robustness/traffic"
)
func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time) [][]watchResponse {
func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time) [][]traffic.WatchResponse {
mux := sync.Mutex{}
var wg sync.WaitGroup
memberResponses := make([][]watchResponse, len(clus.Procs))
memberResponses := make([][]traffic.WatchResponse, len(clus.Procs))
memberMaxRevisionChans := make([]chan int64, len(clus.Procs))
for i, member := range clus.Procs {
c, err := clientv3.New(clientv3.Config{
@@ -75,7 +76,7 @@ type watchConfig struct {
}
// watchMember collects all responses until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed.
func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time) (resps []watchResponse) {
func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time) (resps []traffic.WatchResponse) {
var maxRevision int64 = 0
var lastRevision int64 = 0
ctx, cancel := context.WithCancel(ctx)
@@ -111,7 +112,7 @@ func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevis
if resp.Err() == nil {
// using time.Since time-measuring operation to get monotonic clock reading
// see https://github.com/golang/go/blob/master/src/time/time.go#L17
resps = append(resps, watchResponse{resp, time.Since(baseTime)})
resps = append(resps, traffic.WatchResponse{WatchResponse: resp, Time: time.Since(baseTime)})
} else if !resp.Canceled {
t.Errorf("Watch stream received error, err %v", resp.Err())
}
@@ -126,7 +127,7 @@ func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevis
}
}
func watchResponsesMaxRevision(responses []watchResponse) int64 {
func watchResponsesMaxRevision(responses []traffic.WatchResponse) int64 {
var maxRevision int64
for _, response := range responses {
for _, event := range response.Events {
@@ -138,13 +139,13 @@ func watchResponsesMaxRevision(responses []watchResponse) int64 {
return maxRevision
}
func validateWatchResponses(t *testing.T, clus *e2e.EtcdProcessCluster, responses [][]watchResponse, expectProgressNotify bool) {
func validateWatchResponses(t *testing.T, clus *e2e.EtcdProcessCluster, responses [][]traffic.WatchResponse, expectProgressNotify bool) {
for i, member := range clus.Procs {
validateMemberWatchResponses(t, member.Config().Name, responses[i], expectProgressNotify)
}
}
func validateMemberWatchResponses(t *testing.T, memberId string, responses []watchResponse, expectProgressNotify bool) {
func validateMemberWatchResponses(t *testing.T, memberId string, responses []traffic.WatchResponse, expectProgressNotify bool) {
// Validate watch is correctly configured to ensure proper testing
validateGotAtLeastOneProgressNotify(t, memberId, responses, expectProgressNotify)
@@ -156,7 +157,7 @@ func validateMemberWatchResponses(t *testing.T, memberId string, responses []wat
validateRenewable(t, memberId, responses)
}
func validateGotAtLeastOneProgressNotify(t *testing.T, memberId string, responses []watchResponse, expectProgressNotify bool) {
func validateGotAtLeastOneProgressNotify(t *testing.T, memberId string, responses []traffic.WatchResponse, expectProgressNotify bool) {
var gotProgressNotify = false
var lastHeadRevision int64 = 1
for _, resp := range responses {
@@ -171,7 +172,7 @@ func validateGotAtLeastOneProgressNotify(t *testing.T, memberId string, response
}
}
func validateRenewable(t *testing.T, memberId string, responses []watchResponse) {
func validateRenewable(t *testing.T, memberId string, responses []traffic.WatchResponse) {
var lastProgressNotifyRevision int64 = 0
for _, resp := range responses {
for _, event := range resp.Events {
@@ -185,7 +186,7 @@ func validateRenewable(t *testing.T, memberId string, responses []watchResponse)
}
}
func validateOrderedAndReliable(t *testing.T, memberId string, responses []watchResponse) {
func validateOrderedAndReliable(t *testing.T, memberId string, responses []traffic.WatchResponse) {
var lastEventRevision int64 = 1
for _, resp := range responses {
for _, event := range resp.Events {
@@ -201,7 +202,7 @@ func validateOrderedAndReliable(t *testing.T, memberId string, responses []watch
}
}
func validateUnique(t *testing.T, memberId string, responses []watchResponse) {
func validateUnique(t *testing.T, memberId string, responses []traffic.WatchResponse) {
type revisionKey struct {
revision int64
key string
@@ -218,7 +219,7 @@ func validateUnique(t *testing.T, memberId string, responses []watchResponse) {
}
}
func validateAtomic(t *testing.T, memberId string, responses []watchResponse) {
func validateAtomic(t *testing.T, memberId string, responses []traffic.WatchResponse) {
var lastEventRevision int64 = 1
for _, resp := range responses {
if len(resp.Events) > 0 {
@@ -230,7 +231,7 @@ func validateAtomic(t *testing.T, memberId string, responses []watchResponse) {
}
}
func toWatchEvents(responses []watchResponse) (events []watchEvent) {
func toWatchEvents(responses []traffic.WatchResponse) (events []watchEvent) {
for _, resp := range responses {
for _, event := range resp.Events {
var op model.OperationType
@@ -241,7 +242,7 @@ func toWatchEvents(responses []watchResponse) (events []watchEvent) {
op = model.Delete
}
events = append(events, watchEvent{
Time: resp.time,
Time: resp.Time,
Revision: event.Kv.ModRevision,
Op: model.EtcdOperation{
Type: op,
@@ -254,11 +255,6 @@ func toWatchEvents(responses []watchResponse) (events []watchEvent) {
return events
}
type watchResponse struct {
clientv3.WatchResponse
time time.Duration
}
type watchEvent struct {
Op model.EtcdOperation
Revision int64
@@ -354,7 +350,7 @@ func hasUniqueWriteOperation(request *model.TxnRequest) bool {
return false
}
func watchEvents(responses [][]watchResponse) [][]watchEvent {
func watchEvents(responses [][]traffic.WatchResponse) [][]watchEvent {
ops := make([][]watchEvent, len(responses))
for i, resps := range responses {
ops[i] = toWatchEvents(resps)