Merge pull request #6506 from mitake/decouple-stresser

functional-tester: decouple stresser from tester
release-3.1
Gyu-Ho Lee 2016-09-23 10:05:03 -07:00 committed by GitHub
commit 4ef44d1130
3 changed files with 78 additions and 28 deletions

View File

@ -25,7 +25,6 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
"golang.org/x/time/rate"
"google.golang.org/grpc"
)
@ -53,6 +52,8 @@ type cluster struct {
Stressers []Stresser
Members []*member
stressBuilder stressBuilder
}
type ClusterStatus struct {
@ -102,29 +103,9 @@ func (c *cluster) bootstrap() error {
}
}
// TODO: Too intensive stressers can panic etcd member with
// 'out of memory' error. Put rate limits in server side.
stressN := 100
c.Stressers = make([]Stresser, len(members))
limiter := rate.NewLimiter(rate.Limit(c.stressQPS), c.stressQPS)
for i, m := range members {
if c.v2Only {
c.Stressers[i] = &stresserV2{
Endpoint: m.ClientURL,
keySize: c.stressKeySize,
keySuffixRange: c.stressKeySuffixRange,
N: stressN,
}
} else {
c.Stressers[i] = &stresser{
Endpoint: m.grpcAddr(),
keyLargeSize: c.stressKeyLargeSize,
keySize: c.stressKeySize,
keySuffixRange: c.stressKeySuffixRange,
N: stressN,
rateLimiter: limiter,
}
}
c.Stressers[i] = c.stressBuilder(m)
go c.Stressers[i].Stress()
}

View File

@ -48,6 +48,7 @@ func main() {
schedCases := flag.String("schedule-cases", "", "test case schedule")
consistencyCheck := flag.Bool("consistency-check", true, "true to check consistency (revision, hash)")
isV2Only := flag.Bool("v2-only", false, "'true' to run V2 only tester.")
stresserType := flag.String("stresser", "default", "specify stresser (\"default\" or \"nop\").")
flag.Parse()
eps := strings.Split(*endpointStr, ",")
@ -63,13 +64,18 @@ func main() {
agents[i].datadir = *datadir
}
sConfig := &stressConfig{
qps: *stressQPS,
keyLargeSize: int(*stressKeyLargeSize),
keySize: int(*stressKeySize),
keySuffixRange: int(*stressKeySuffixRange),
v2: *isV2Only,
}
c := &cluster{
agents: agents,
v2Only: *isV2Only,
stressQPS: *stressQPS,
stressKeyLargeSize: int(*stressKeyLargeSize),
stressKeySize: int(*stressKeySize),
stressKeySuffixRange: int(*stressKeySuffixRange),
agents: agents,
v2Only: *isV2Only,
stressBuilder: newStressBuilder(*stresserType, sConfig),
}
if err := c.bootstrap(); err != nil {

View File

@ -369,3 +369,66 @@ func randBytes(size int) []byte {
}
return data
}
// nopStresser implements Stresser that does nothing
type nopStresser struct {
start time.Time
qps int
}
func (s *nopStresser) Stress() error { return nil }
func (s *nopStresser) Cancel() {}
func (s *nopStresser) Report() (int, int) {
return int(time.Since(s.start).Seconds()) * s.qps, 0
}
// stressConfig carries the settings from which newStressBuilder
// constructs stressers; the fields are forwarded into the stresser,
// stresserV2, or nopStresser literals built there.
type stressConfig struct {
	qps            int  // request rate; feeds rate.NewLimiter and nopStresser's synthetic Report
	keyLargeSize   int  // forwarded to stresser.keyLargeSize (v3 only) — see stresser for exact semantics
	keySize        int  // forwarded to keySize on both v2 and v3 stressers
	keySuffixRange int  // forwarded to keySuffixRange on both v2 and v3 stressers
	v2             bool // true builds a stresserV2 (ClientURL endpoint), false a v3 stresser (gRPC endpoint)
}
type stressBuilder func(m *member) Stresser
// newStressBuilder returns a stressBuilder for the named stresser type.
// "nop" yields no-op stressers that only fabricate Report numbers;
// "default" yields the real v2 or v3 stresser (per sc.v2), all sharing
// one rate limiter. Any other name panics via plog.
func newStressBuilder(s string, sc *stressConfig) stressBuilder {
	switch s {
	case "nop":
		return func(*member) Stresser {
			return &nopStresser{start: time.Now(), qps: sc.qps}
		}

	case "default":
		// TODO: Too intensive stressers can panic etcd member with
		// 'out of memory' error. Put rate limits in server side.
		const stressN = 100
		// One limiter shared by every stresser built from this builder,
		// so sc.qps bounds the cluster-wide request rate.
		limiter := rate.NewLimiter(rate.Limit(sc.qps), sc.qps)

		return func(m *member) Stresser {
			if sc.v2 {
				return &stresserV2{
					Endpoint:       m.ClientURL,
					keySize:        sc.keySize,
					keySuffixRange: sc.keySuffixRange,
					N:              stressN,
				}
			}
			return &stresser{
				Endpoint:       m.grpcAddr(),
				keyLargeSize:   sc.keyLargeSize,
				keySize:        sc.keySize,
				keySuffixRange: sc.keySuffixRange,
				N:              stressN,
				rateLimiter:    limiter,
			}
		}

	default:
		plog.Panicf("unknown stresser type: %s\n", s)
		return nil // unreachable; Panicf does not return
	}
}