Merge pull request #6081 from gyuho/functional-tester
etcd-tester: delete/range with limit, clean uprelease-3.1
commit
9764652356
|
@ -40,6 +40,7 @@ type cluster struct {
|
||||||
stressQPS int
|
stressQPS int
|
||||||
stressKeySize int
|
stressKeySize int
|
||||||
stressKeySuffixRange int
|
stressKeySuffixRange int
|
||||||
|
stressKeyRangeLimit int
|
||||||
|
|
||||||
Size int
|
Size int
|
||||||
Stressers []Stresser
|
Stressers []Stresser
|
||||||
|
@ -51,21 +52,6 @@ type ClusterStatus struct {
|
||||||
AgentStatuses map[string]client.Status
|
AgentStatuses map[string]client.Status
|
||||||
}
|
}
|
||||||
|
|
||||||
// newCluster starts and returns a new cluster. The caller should call Terminate when finished, to shut it down.
|
|
||||||
func newCluster(agentEndpoints []string, datadir string, stressQPS, stressKeySize, stressKeySuffixRange int, isV2Only bool) (*cluster, error) {
|
|
||||||
c := &cluster{
|
|
||||||
v2Only: isV2Only,
|
|
||||||
datadir: datadir,
|
|
||||||
stressQPS: stressQPS,
|
|
||||||
stressKeySize: stressKeySize,
|
|
||||||
stressKeySuffixRange: stressKeySuffixRange,
|
|
||||||
}
|
|
||||||
if err := c.bootstrap(agentEndpoints); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return c, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cluster) bootstrap(agentEndpoints []string) error {
|
func (c *cluster) bootstrap(agentEndpoints []string) error {
|
||||||
size := len(agentEndpoints)
|
size := len(agentEndpoints)
|
||||||
|
|
||||||
|
@ -117,15 +103,16 @@ func (c *cluster) bootstrap(agentEndpoints []string) error {
|
||||||
if c.v2Only {
|
if c.v2Only {
|
||||||
c.Stressers[i] = &stresserV2{
|
c.Stressers[i] = &stresserV2{
|
||||||
Endpoint: m.ClientURL,
|
Endpoint: m.ClientURL,
|
||||||
KeySize: c.stressKeySize,
|
keySize: c.stressKeySize,
|
||||||
KeySuffixRange: c.stressKeySuffixRange,
|
keySuffixRange: c.stressKeySuffixRange,
|
||||||
N: stressN,
|
N: stressN,
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
c.Stressers[i] = &stresser{
|
c.Stressers[i] = &stresser{
|
||||||
Endpoint: m.grpcAddr(),
|
Endpoint: m.grpcAddr(),
|
||||||
KeySize: c.stressKeySize,
|
keySize: c.stressKeySize,
|
||||||
KeySuffixRange: c.stressKeySuffixRange,
|
keySuffixRange: c.stressKeySuffixRange,
|
||||||
|
keyRangeLimit: c.stressKeyRangeLimit,
|
||||||
qps: c.stressQPS,
|
qps: c.stressQPS,
|
||||||
N: stressN,
|
N: stressN,
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,8 +29,9 @@ var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcd-tester")
|
||||||
func main() {
|
func main() {
|
||||||
endpointStr := flag.String("agent-endpoints", "localhost:9027", "HTTP RPC endpoints of agents. Do not specify the schema.")
|
endpointStr := flag.String("agent-endpoints", "localhost:9027", "HTTP RPC endpoints of agents. Do not specify the schema.")
|
||||||
datadir := flag.String("data-dir", "agent.etcd", "etcd data directory location on agent machine.")
|
datadir := flag.String("data-dir", "agent.etcd", "etcd data directory location on agent machine.")
|
||||||
stressKeySize := flag.Int("stress-key-size", 100, "the size of each key written into etcd.")
|
stressKeySize := flag.Uint("stress-key-size", 100, "the size of each key written into etcd.")
|
||||||
stressKeySuffixRange := flag.Int("stress-key-count", 250000, "the count of key range written into etcd.")
|
stressKeySuffixRange := flag.Uint("stress-key-count", 250000, "the count of key range written into etcd.")
|
||||||
|
stressKeyRangeLimit := flag.Uint("stress-range-limit", 50, "maximum number of keys to range or delete.")
|
||||||
limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
|
limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
|
||||||
stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.")
|
stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.")
|
||||||
schedCases := flag.String("schedule-cases", "", "test case schedule")
|
schedCases := flag.String("schedule-cases", "", "test case schedule")
|
||||||
|
@ -38,9 +39,15 @@ func main() {
|
||||||
isV2Only := flag.Bool("v2-only", false, "'true' to run V2 only tester.")
|
isV2Only := flag.Bool("v2-only", false, "'true' to run V2 only tester.")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
endpoints := strings.Split(*endpointStr, ",")
|
c := &cluster{
|
||||||
c, err := newCluster(endpoints, *datadir, *stressQPS, *stressKeySize, *stressKeySuffixRange, *isV2Only)
|
v2Only: *isV2Only,
|
||||||
if err != nil {
|
datadir: *datadir,
|
||||||
|
stressQPS: *stressQPS,
|
||||||
|
stressKeySize: int(*stressKeySize),
|
||||||
|
stressKeySuffixRange: int(*stressKeySuffixRange),
|
||||||
|
stressKeyRangeLimit: int(*stressKeyRangeLimit),
|
||||||
|
}
|
||||||
|
if err := c.bootstrap(strings.Split(*endpointStr, ",")); err != nil {
|
||||||
plog.Fatal(err)
|
plog.Fatal(err)
|
||||||
}
|
}
|
||||||
defer c.Terminate()
|
defer c.Terminate()
|
||||||
|
|
|
@ -90,11 +90,12 @@ func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newStressRangePrefix(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
func newStressRangeInterval(kvc pb.KVClient, keySuffixRange, keyRangeLimit int) stressFunc {
|
||||||
return func(ctx context.Context) error {
|
return func(ctx context.Context) error {
|
||||||
_, err := kvc.Range(ctx, &pb.RangeRequest{
|
_, err := kvc.Range(ctx, &pb.RangeRequest{
|
||||||
Key: []byte("foo"),
|
Key: []byte("foo"),
|
||||||
RangeEnd: []byte(fmt.Sprintf("foo%d", rand.Intn(keySuffixRange))),
|
RangeEnd: []byte(fmt.Sprintf("foo%d", keySuffixRange)),
|
||||||
|
Limit: int64(keyRangeLimit),
|
||||||
}, grpc.FailFast(false))
|
}, grpc.FailFast(false))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -109,11 +110,23 @@ func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newStressDeletePrefix(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange, keyRangeLimit int) stressFunc {
|
||||||
return func(ctx context.Context) error {
|
return func(ctx context.Context) error {
|
||||||
_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
|
resp, _ := kvc.Range(ctx, &pb.RangeRequest{
|
||||||
Key: []byte("foo"),
|
Key: []byte("foo"),
|
||||||
RangeEnd: []byte(fmt.Sprintf("foo%d", rand.Intn(keySuffixRange))),
|
RangeEnd: []byte(fmt.Sprintf("foo%d", keySuffixRange)),
|
||||||
|
Limit: int64(keyRangeLimit),
|
||||||
|
}, grpc.FailFast(false))
|
||||||
|
|
||||||
|
start, end := []byte("foo"), []byte(fmt.Sprintf("foo%d", keyRangeLimit))
|
||||||
|
if resp != nil && resp.Count > 0 {
|
||||||
|
start = resp.Kvs[0].Key
|
||||||
|
end = resp.Kvs[len(resp.Kvs)-1].Key
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
|
||||||
|
Key: start,
|
||||||
|
RangeEnd: end,
|
||||||
}, grpc.FailFast(false))
|
}, grpc.FailFast(false))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -131,8 +144,9 @@ type Stresser interface {
|
||||||
type stresser struct {
|
type stresser struct {
|
||||||
Endpoint string
|
Endpoint string
|
||||||
|
|
||||||
KeySize int
|
keySize int
|
||||||
KeySuffixRange int
|
keySuffixRange int
|
||||||
|
keyRangeLimit int
|
||||||
|
|
||||||
qps int
|
qps int
|
||||||
N int
|
N int
|
||||||
|
@ -171,11 +185,11 @@ func (s *stresser) Stress() error {
|
||||||
kvc := pb.NewKVClient(conn)
|
kvc := pb.NewKVClient(conn)
|
||||||
|
|
||||||
var stressEntries = []stressEntry{
|
var stressEntries = []stressEntry{
|
||||||
{weight: 0.7, f: newStressPut(kvc, s.KeySuffixRange, s.KeySize)},
|
{weight: 0.7, f: newStressPut(kvc, s.keySuffixRange, s.keySize)},
|
||||||
{weight: 0.07, f: newStressRange(kvc, s.KeySuffixRange)},
|
{weight: 0.07, f: newStressRange(kvc, s.keySuffixRange)},
|
||||||
{weight: 0.07, f: newStressRangePrefix(kvc, s.KeySuffixRange)},
|
{weight: 0.07, f: newStressRangeInterval(kvc, s.keySuffixRange, s.keyRangeLimit)},
|
||||||
{weight: 0.07, f: newStressDelete(kvc, s.KeySuffixRange)},
|
{weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)},
|
||||||
{weight: 0.07, f: newStressDeletePrefix(kvc, s.KeySuffixRange)},
|
{weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange, s.keyRangeLimit)},
|
||||||
}
|
}
|
||||||
s.stressTable = createStressTable(stressEntries)
|
s.stressTable = createStressTable(stressEntries)
|
||||||
|
|
||||||
|
@ -272,8 +286,8 @@ func (s *stresser) Report() (int, int) {
|
||||||
type stresserV2 struct {
|
type stresserV2 struct {
|
||||||
Endpoint string
|
Endpoint string
|
||||||
|
|
||||||
KeySize int
|
keySize int
|
||||||
KeySuffixRange int
|
keySuffixRange int
|
||||||
|
|
||||||
N int
|
N int
|
||||||
|
|
||||||
|
@ -308,8 +322,8 @@ func (s *stresserV2) Stress() error {
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
setctx, setcancel := context.WithTimeout(ctx, clientV2.DefaultRequestTimeout)
|
setctx, setcancel := context.WithTimeout(ctx, clientV2.DefaultRequestTimeout)
|
||||||
key := fmt.Sprintf("foo%d", rand.Intn(s.KeySuffixRange))
|
key := fmt.Sprintf("foo%d", rand.Intn(s.keySuffixRange))
|
||||||
_, err := kv.Set(setctx, key, string(randBytes(s.KeySize)), nil)
|
_, err := kv.Set(setctx, key, string(randBytes(s.keySize)), nil)
|
||||||
setcancel()
|
setcancel()
|
||||||
if err == context.Canceled {
|
if err == context.Canceled {
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in New Issue