Merge pull request #6124 from heyitsanthony/share-limiter

functional-tester: share limiter among stresser
release-3.1
Xiang Li 2016-08-07 19:17:06 -07:00 committed by GitHub
commit fb7c4da361
3 changed files with 9 additions and 5 deletions

View File

@ -25,6 +25,7 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
"golang.org/x/time/rate"
"google.golang.org/grpc"
)
@ -99,6 +100,7 @@ func (c *cluster) bootstrap(agentEndpoints []string) error {
// 'out of memory' error. Put rate limits in server side.
stressN := 100
c.Stressers = make([]Stresser, len(members))
limiter := rate.NewLimiter(rate.Limit(c.stressQPS), c.stressQPS)
for i, m := range members {
if c.v2Only {
c.Stressers[i] = &stresserV2{
@ -113,8 +115,8 @@ func (c *cluster) bootstrap(agentEndpoints []string) error {
keySize: c.stressKeySize,
keySuffixRange: c.stressKeySuffixRange,
keyRangeLimit: c.stressKeyRangeLimit,
qps: c.stressQPS,
N: stressN,
rateLimiter: limiter,
}
}
go c.Stressers[i].Stress()

View File

@ -33,7 +33,7 @@ func main() {
stressKeySuffixRange := flag.Uint("stress-key-count", 250000, "the count of key range written into etcd.")
stressKeyRangeLimit := flag.Uint("stress-range-limit", 50, "maximum number of keys to range or delete.")
limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
stressQPS := flag.Int("stress-qps", 3000, "maximum number of stresser requests per second.")
stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.")
schedCases := flag.String("schedule-cases", "", "test case schedule")
consistencyCheck := flag.Bool("consistency-check", true, "true to check consistency (revision, hash)")
isV2Only := flag.Bool("v2-only", false, "'true' to run V2 only tester.")

View File

@ -148,8 +148,7 @@ type stresser struct {
keySuffixRange int
keyRangeLimit int
qps int
N int
N int
mu sync.Mutex
wg *sync.WaitGroup
@ -166,6 +165,10 @@ type stresser struct {
}
func (s *stresser) Stress() error {
if s.rateLimiter == nil {
panic("expect rateLimiter to be set")
}
// TODO: add backoff option
conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure())
if err != nil {
@ -180,7 +183,6 @@ func (s *stresser) Stress() error {
s.conn = conn
s.cancel = cancel
s.wg = wg
s.rateLimiter = rate.NewLimiter(rate.Limit(s.qps), s.qps)
s.mu.Unlock()
kvc := pb.NewKVClient(conn)