diff --git a/tools/functional-tester/etcd-tester/cluster.go b/tools/functional-tester/etcd-tester/cluster.go index e261999a4..890a327f3 100644 --- a/tools/functional-tester/etcd-tester/cluster.go +++ b/tools/functional-tester/etcd-tester/cluster.go @@ -40,6 +40,7 @@ type cluster struct { stressQPS int stressKeySize int stressKeySuffixRange int + stressKeyRangeLimit int Size int Stressers []Stresser @@ -51,21 +52,6 @@ type ClusterStatus struct { AgentStatuses map[string]client.Status } -// newCluster starts and returns a new cluster. The caller should call Terminate when finished, to shut it down. -func newCluster(agentEndpoints []string, datadir string, stressQPS, stressKeySize, stressKeySuffixRange int, isV2Only bool) (*cluster, error) { - c := &cluster{ - v2Only: isV2Only, - datadir: datadir, - stressQPS: stressQPS, - stressKeySize: stressKeySize, - stressKeySuffixRange: stressKeySuffixRange, - } - if err := c.bootstrap(agentEndpoints); err != nil { - return nil, err - } - return c, nil -} - func (c *cluster) bootstrap(agentEndpoints []string) error { size := len(agentEndpoints) @@ -117,15 +103,16 @@ func (c *cluster) bootstrap(agentEndpoints []string) error { if c.v2Only { c.Stressers[i] = &stresserV2{ Endpoint: m.ClientURL, - KeySize: c.stressKeySize, - KeySuffixRange: c.stressKeySuffixRange, + keySize: c.stressKeySize, + keySuffixRange: c.stressKeySuffixRange, N: stressN, } } else { c.Stressers[i] = &stresser{ Endpoint: m.grpcAddr(), - KeySize: c.stressKeySize, - KeySuffixRange: c.stressKeySuffixRange, + keySize: c.stressKeySize, + keySuffixRange: c.stressKeySuffixRange, + keyRangeLimit: c.stressKeyRangeLimit, qps: c.stressQPS, N: stressN, } diff --git a/tools/functional-tester/etcd-tester/main.go b/tools/functional-tester/etcd-tester/main.go index 01b4f2531..ec2ae97f0 100644 --- a/tools/functional-tester/etcd-tester/main.go +++ b/tools/functional-tester/etcd-tester/main.go @@ -29,8 +29,9 @@ var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcd-tester") func main() { endpointStr := flag.String("agent-endpoints", "localhost:9027", "HTTP RPC endpoints of agents. Do not specify the schema.") datadir := flag.String("data-dir", "agent.etcd", "etcd data directory location on agent machine.") - stressKeySize := flag.Int("stress-key-size", 100, "the size of each key written into etcd.") - stressKeySuffixRange := flag.Int("stress-key-count", 250000, "the count of key range written into etcd.") + stressKeySize := flag.Uint("stress-key-size", 100, "the size of each key written into etcd.") + stressKeySuffixRange := flag.Uint("stress-key-count", 250000, "the count of key range written into etcd.") + stressKeyRangeLimit := flag.Uint("stress-range-limit", 50, "maximum number of keys to range or delete.") limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).") stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.") schedCases := flag.String("schedule-cases", "", "test case schedule") @@ -38,9 +39,15 @@ func main() { isV2Only := flag.Bool("v2-only", false, "'true' to run V2 only tester.") flag.Parse() - endpoints := strings.Split(*endpointStr, ",") - c, err := newCluster(endpoints, *datadir, *stressQPS, *stressKeySize, *stressKeySuffixRange, *isV2Only) - if err != nil { + c := &cluster{ + v2Only: *isV2Only, + datadir: *datadir, + stressQPS: *stressQPS, + stressKeySize: int(*stressKeySize), + stressKeySuffixRange: int(*stressKeySuffixRange), + stressKeyRangeLimit: int(*stressKeyRangeLimit), + } + if err := c.bootstrap(strings.Split(*endpointStr, ",")); err != nil { plog.Fatal(err) } defer c.Terminate() diff --git a/tools/functional-tester/etcd-tester/stresser.go b/tools/functional-tester/etcd-tester/stresser.go index 00d8f80fd..d578b5958 100644 --- a/tools/functional-tester/etcd-tester/stresser.go +++ b/tools/functional-tester/etcd-tester/stresser.go @@ -90,11 +90,12 @@ func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc { } } -func newStressRangePrefix(kvc pb.KVClient, keySuffixRange int) stressFunc { +func newStressRangeInterval(kvc pb.KVClient, keySuffixRange, keyRangeLimit int) stressFunc { return func(ctx context.Context) error { _, err := kvc.Range(ctx, &pb.RangeRequest{ Key: []byte("foo"), - RangeEnd: []byte(fmt.Sprintf("foo%d", rand.Intn(keySuffixRange))), + RangeEnd: []byte(fmt.Sprintf("foo%d", keySuffixRange)), + Limit: int64(keyRangeLimit), }, grpc.FailFast(false)) return err } @@ -109,11 +110,23 @@ func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc { } } -func newStressDeletePrefix(kvc pb.KVClient, keySuffixRange int) stressFunc { +func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange, keyRangeLimit int) stressFunc { return func(ctx context.Context) error { - _, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{ + resp, _ := kvc.Range(ctx, &pb.RangeRequest{ Key: []byte("foo"), - RangeEnd: []byte(fmt.Sprintf("foo%d", rand.Intn(keySuffixRange))), + RangeEnd: []byte(fmt.Sprintf("foo%d", keySuffixRange)), + Limit: int64(keyRangeLimit), + }, grpc.FailFast(false)) + + start, end := []byte("foo"), []byte(fmt.Sprintf("foo%d", keyRangeLimit)) + if resp != nil && resp.Count > 0 { + start = resp.Kvs[0].Key + end = resp.Kvs[len(resp.Kvs)-1].Key + } + + _, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{ + Key: start, + RangeEnd: end, }, grpc.FailFast(false)) return err } @@ -131,8 +144,9 @@ type Stresser interface { type stresser struct { Endpoint string - KeySize int - KeySuffixRange int + keySize int + keySuffixRange int + keyRangeLimit int qps int N int @@ -171,11 +185,11 @@ func (s *stresser) Stress() error { kvc := pb.NewKVClient(conn) var stressEntries = []stressEntry{ - {weight: 0.7, f: newStressPut(kvc, s.KeySuffixRange, s.KeySize)}, - {weight: 0.07, f: newStressRange(kvc, s.KeySuffixRange)}, - {weight: 0.07, f: newStressRangePrefix(kvc, s.KeySuffixRange)}, - {weight: 0.07, f: newStressDelete(kvc, s.KeySuffixRange)}, - {weight: 0.07, f: newStressDeletePrefix(kvc, s.KeySuffixRange)}, + {weight: 0.7, f: newStressPut(kvc, s.keySuffixRange, s.keySize)}, + {weight: 0.07, f: newStressRange(kvc, s.keySuffixRange)}, + {weight: 0.07, f: newStressRangeInterval(kvc, s.keySuffixRange, s.keyRangeLimit)}, + {weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)}, + {weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange, s.keyRangeLimit)}, } s.stressTable = createStressTable(stressEntries) @@ -272,8 +286,8 @@ func (s *stresser) Report() (int, int) { type stresserV2 struct { Endpoint string - KeySize int - KeySuffixRange int + keySize int + keySuffixRange int N int @@ -308,8 +322,8 @@ func (s *stresserV2) Stress() error { go func() { for { setctx, setcancel := context.WithTimeout(ctx, clientV2.DefaultRequestTimeout) - key := fmt.Sprintf("foo%d", rand.Intn(s.KeySuffixRange)) - _, err := kv.Set(setctx, key, string(randBytes(s.KeySize)), nil) + key := fmt.Sprintf("foo%d", rand.Intn(s.keySuffixRange)) + _, err := kv.Set(setctx, key, string(randBytes(s.keySize)), nil) setcancel() if err == context.Canceled { return