benchmark: fix watch command

Fix https://github.com/coreos/etcd/issues/5099.
release-3.0
Gyu-Ho Lee 2016-05-19 09:57:35 -07:00
parent 8e4a83c830
commit 9ca84e814f
1 changed files with 12 additions and 11 deletions

View File

@ -88,6 +88,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
}
watched := make([]string, watchedKeyTotal)
numWatchers := make(map[string]int)
for i := range watched {
k := make([]byte, watchKeySize)
if watchSeqKeys {
@ -96,6 +97,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
binary.PutVarint(k, int64(rand.Intn(watchKeySpaceSize)))
}
watched[i] = string(k)
numWatchers[watched[i]] = numWatchers[watched[i]] + 1
}
requests := make(chan string, totalClients)
@ -137,8 +139,10 @@ func watchFunc(cmd *cobra.Command, args []string) {
<-pdoneC
// put phase
// total number of puts * number of watchers on each key
eventsTotal = watchPutTotal * (watchTotal / watchedKeyTotal)
eventsTotal = 0
for i := 0; i < watchPutTotal; i++ {
eventsTotal += numWatchers[watched[i%len(watched)]]
}
results = make(chan result)
bar = pb.New(eventsTotal)
@ -157,7 +161,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
pdoneC = printRate(results)
go func() {
for i := 0; i < eventsTotal; i++ {
for i := 0; i < watchPutTotal; i++ {
putreqc <- v3.OpPut(watched[i%(len(watched))], "data")
// TODO: use a real rate-limiter instead of sleep.
time.Sleep(time.Second / time.Duration(watchPutRate))
@ -165,9 +169,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
close(putreqc)
}()
for range streams {
<-recvCompletedNotifier
}
<-recvCompletedNotifier
bar.Finish()
fmt.Printf("Watch events received summary:\n")
close(results)
@ -194,16 +196,15 @@ func doWatch(stream v3.Watcher, requests <-chan string) {
func recvWatchChan(wch v3.WatchChan) {
for range wch {
if atomic.LoadInt32(&nrRecvCompleted) == int32(eventsTotal) {
recvCompletedNotifier <- struct{}{}
break
}
st := time.Now()
results <- result{duration: time.Since(st), happened: time.Now()}
bar.Increment()
atomic.AddInt32(&nrRecvCompleted, 1)
if atomic.LoadInt32(&nrRecvCompleted) == int32(eventsTotal) {
recvCompletedNotifier <- struct{}{}
break
}
}
}