Merge pull request #6695 from sinsharat/watch_runner_respect_rounds

etcd-runner: watcher runner respect rounds
release-3.1
Xiang Li 2016-10-21 20:28:29 -07:00 committed by GitHub
commit 92c987f75d
1 changed files with 48 additions and 10 deletions

View File

@ -54,7 +54,7 @@ func main() {
case "lease-renewer":
runLeaseRenewer(eps)
case "watcher":
runWatcher(eps)
runWatcher(eps, *round)
default:
fmt.Fprintf(os.Stderr, "unsupported mode %v\n", *mode)
}
@ -219,7 +219,14 @@ func runRacer(eps []string, round int) {
doRounds(rcs, round)
}
func runWatcher(eps []string) {
func runWatcher(eps []string, limit int) {
ctx := context.Background()
for round := 0; round < limit; round++ {
performWatchOnPrefixes(ctx, eps, round)
}
}
func performWatchOnPrefixes(ctx context.Context, eps []string, round int) {
runningTime := 60 * time.Second // time for which operation should be performed
noOfPrefixes := 36 // total number of prefixes which will be watched upon
watchPerPrefix := 10 // number of watchers per prefix
@ -229,6 +236,8 @@ func runWatcher(eps []string) {
prefixes := generateUniqueKeys(5, noOfPrefixes)
keys := generateRandomKeys(10, keyPrePrefix)
roundPrefix := fmt.Sprint("%16x", round)
var (
revision int64
wg sync.WaitGroup
@ -236,7 +245,6 @@ func runWatcher(eps []string) {
err error
)
ctx := context.Background()
// create client for performing get and put operations
client := randClient(eps)
defer client.Close()
@ -253,9 +261,9 @@ func runWatcher(eps []string) {
go func() {
var modrevision int64
for i := 0; i < len(keys); i++ {
for j := 0; j < len(prefixes); j++ {
key := prefixes[j] + "-" + keys[i]
for _, key := range keys {
for _, prefix := range prefixes {
key := roundPrefix + "-" + prefix + "-" + key
// limit key put as per reqRate
if err = limiter.Wait(ctxt); err != nil {
@ -285,16 +293,22 @@ func runWatcher(eps []string) {
}
}()
ctxc, cancelc := context.WithCancel(ctx)
wcs := make([]clientv3.WatchChan, 0)
rcs := make([]*clientv3.Client, 0)
wg.Add(noOfPrefixes * watchPerPrefix)
for i := 0; i < noOfPrefixes; i++ {
for _, prefix := range prefixes {
for j := 0; j < watchPerPrefix; j++ {
go func(prefix string) {
defer wg.Done()
rc := randClient(eps)
defer rc.Close()
rcs = append(rcs, rc)
wc := rc.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
wc := rc.Watch(ctxc, prefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
wcs = append(wcs, wc)
for n := 0; n < len(keys); {
select {
case watchChan := <-wc:
@ -310,10 +324,34 @@ func runWatcher(eps []string) {
return
}
}
}(prefixes[i])
}(roundPrefix + "-" + prefix)
}
}
wg.Wait()
// cancel all watch channels
cancelc()
// verify all watch channels are closed
for e, wc := range wcs {
if _, ok := <-wc; ok {
log.Fatalf("expected wc to be closed, but received %v", e)
}
}
for _, rc := range rcs {
rc.Close()
}
deletePrefixWithRety(client, ctx, roundPrefix)
}
func deletePrefixWithRety(client *clientv3.Client, ctx context.Context, key string) {
for {
if _, err := client.Delete(ctx, key, clientv3.WithRange(key+"z")); err == nil {
return
}
}
}
func getWithRetry(client *clientv3.Client, ctx context.Context, key string) *clientv3.GetResponse {