etcd-runner: add --prefix flag, allows inf round, and minor vars refactoring in watch runner.
parent
debc69e1f2
commit
77fbe10dfc
|
@ -24,10 +24,19 @@ import (
|
||||||
|
|
||||||
"github.com/coreos/etcd/clientv3"
|
"github.com/coreos/etcd/clientv3"
|
||||||
"github.com/coreos/etcd/pkg/stringutil"
|
"github.com/coreos/etcd/pkg/stringutil"
|
||||||
|
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
runningTime time.Duration // time for which operation should be performed
|
||||||
|
noOfPrefixes int // total number of prefixes which will be watched upon
|
||||||
|
watchPerPrefix int // number of watchers per prefix
|
||||||
|
watchPrefix string // prefix append to keys in watcher
|
||||||
|
totalKeys int // total number of keys for operation
|
||||||
|
)
|
||||||
|
|
||||||
// NewWatchCommand returns the cobra command for "watcher runner".
|
// NewWatchCommand returns the cobra command for "watcher runner".
|
||||||
func NewWatchCommand() *cobra.Command {
|
func NewWatchCommand() *cobra.Command {
|
||||||
cmd := &cobra.Command{
|
cmd := &cobra.Command{
|
||||||
|
@ -35,12 +44,12 @@ func NewWatchCommand() *cobra.Command {
|
||||||
Short: "Performs watch operation",
|
Short: "Performs watch operation",
|
||||||
Run: runWatcherFunc,
|
Run: runWatcherFunc,
|
||||||
}
|
}
|
||||||
cmd.Flags().IntVar(&rounds, "rounds", 100, "number of rounds to run")
|
|
||||||
cmd.Flags().DurationVar(&runningTime, "running-time", 60, "number of seconds to run")
|
cmd.Flags().DurationVar(&runningTime, "running-time", 60, "number of seconds to run")
|
||||||
|
cmd.Flags().StringVar(&watchPrefix, "prefix", "", "the prefix to append on all keys")
|
||||||
cmd.Flags().IntVar(&noOfPrefixes, "total-prefixes", 10, "total no of prefixes to use")
|
cmd.Flags().IntVar(&noOfPrefixes, "total-prefixes", 10, "total no of prefixes to use")
|
||||||
cmd.Flags().IntVar(&watchPerPrefix, "watch-per-prefix", 10, "number of watchers per prefix")
|
cmd.Flags().IntVar(&watchPerPrefix, "watch-per-prefix", 10, "number of watchers per prefix")
|
||||||
cmd.Flags().IntVar(&reqRate, "req-rate", 30, "rate at which put request will be performed")
|
|
||||||
cmd.Flags().IntVar(&totalKeys, "total-keys", 1000, "total number of keys to watch")
|
cmd.Flags().IntVar(&totalKeys, "total-keys", 1000, "total number of keys to watch")
|
||||||
|
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,7 +59,7 @@ func runWatcherFunc(cmd *cobra.Command, args []string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
for round := 0; round < rounds; round++ {
|
for round := 0; round < rounds || rounds <= 0; round++ {
|
||||||
fmt.Println("round", round)
|
fmt.Println("round", round)
|
||||||
performWatchOnPrefixes(ctx, cmd, round)
|
performWatchOnPrefixes(ctx, cmd, round)
|
||||||
}
|
}
|
||||||
|
@ -94,7 +103,7 @@ func performWatchOnPrefixes(ctx context.Context, cmd *cobra.Command, round int)
|
||||||
if err = limiter.Wait(ctxt); err != nil {
|
if err = limiter.Wait(ctxt); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err = putKeyAtMostOnce(ctxt, client, roundPrefix+"-"+prefix+"-"+key); err != nil {
|
if err = putKeyAtMostOnce(ctxt, client, watchPrefix+"-"+roundPrefix+"-"+prefix+"-"+key); err != nil {
|
||||||
log.Fatalf("failed to put key: %v", err)
|
log.Fatalf("failed to put key: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -112,15 +121,15 @@ func performWatchOnPrefixes(ctx context.Context, cmd *cobra.Command, round int)
|
||||||
rc := newClient(eps, dialTimeout)
|
rc := newClient(eps, dialTimeout)
|
||||||
rcs = append(rcs, rc)
|
rcs = append(rcs, rc)
|
||||||
|
|
||||||
watchPrefix := roundPrefix + "-" + prefix
|
wprefix := watchPrefix + "-" + roundPrefix + "-" + prefix
|
||||||
|
|
||||||
wc := rc.Watch(ctxc, watchPrefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
|
wc := rc.Watch(ctxc, wprefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
|
||||||
wcs = append(wcs, wc)
|
wcs = append(wcs, wc)
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
checkWatchResponse(wc, watchPrefix, keys)
|
checkWatchResponse(wc, wprefix, keys)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -139,7 +148,7 @@ func performWatchOnPrefixes(ctx context.Context, cmd *cobra.Command, round int)
|
||||||
rc.Close()
|
rc.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = deletePrefix(ctx, client, roundPrefix); err != nil {
|
if err = deletePrefix(ctx, client, watchPrefix); err != nil {
|
||||||
log.Fatalf("failed to clean up keys after test: %v", err)
|
log.Fatalf("failed to clean up keys after test: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -148,7 +157,7 @@ func checkWatchResponse(wc clientv3.WatchChan, prefix string, keys []string) {
|
||||||
for n := 0; n < len(keys); {
|
for n := 0; n < len(keys); {
|
||||||
wr, more := <-wc
|
wr, more := <-wc
|
||||||
if !more {
|
if !more {
|
||||||
log.Fatalf("expect more keys (received %d/%d) for %s", len(keys), n, prefix)
|
log.Fatalf("expect more keys (received %d/%d) for %s", n, len(keys), prefix)
|
||||||
}
|
}
|
||||||
for _, event := range wr.Events {
|
for _, event := range wr.Events {
|
||||||
expectedKey := prefix + "-" + keys[n]
|
expectedKey := prefix + "-" + keys[n]
|
||||||
|
|
Loading…
Reference in New Issue