functional-tester/tester: use "*clientv3.Client" for lease stresser

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
release-3.4
Gyuho Lee 2018-04-03 13:33:35 -07:00
parent 00ed41d175
commit a80a1a6356
1 changed files with 30 additions and 44 deletions

View File

@ -22,8 +22,8 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/tools/functional-tester/rpcpb" "github.com/coreos/etcd/tools/functional-tester/rpcpb"
"go.uber.org/zap" "go.uber.org/zap"
@ -41,11 +41,9 @@ type leaseStresser struct {
lg *zap.Logger lg *zap.Logger
m *rpcpb.Member m *rpcpb.Member
cancel func() cli *clientv3.Client
conn *grpc.ClientConn
kvc pb.KVClient
lc pb.LeaseClient
ctx context.Context ctx context.Context
cancel func()
rateLimiter *rate.Limiter rateLimiter *rate.Limiter
// atomicModifiedKey records the number of keys created and deleted during a test case // atomicModifiedKey records the number of keys created and deleted during a test case
@ -118,7 +116,6 @@ func (ls *leaseStresser) setupOnce() error {
} }
ls.aliveLeases = &atomicLeases{leases: make(map[int64]time.Time)} ls.aliveLeases = &atomicLeases{leases: make(map[int64]time.Time)}
return nil return nil
} }
@ -132,20 +129,19 @@ func (ls *leaseStresser) Stress() error {
return err return err
} }
conn, err := ls.m.DialEtcdGRPCServer(grpc.WithBackoffMaxDelay(1 * time.Second)) ctx, cancel := context.WithCancel(context.Background())
ls.ctx = ctx
ls.cancel = cancel
cli, err := ls.m.CreateEtcdClient(grpc.WithBackoffMaxDelay(1 * time.Second))
if err != nil { if err != nil {
return fmt.Errorf("%v (%s)", err, ls.m.EtcdClientEndpoint) return fmt.Errorf("%v (%s)", err, ls.m.EtcdClientEndpoint)
} }
ls.conn = conn ls.cli = cli
ls.kvc = pb.NewKVClient(conn)
ls.lc = pb.NewLeaseClient(conn)
ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)} ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
ls.shortLivedLeases = &atomicLeases{leases: make(map[int64]time.Time)} ls.shortLivedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
ctx, cancel := context.WithCancel(context.Background())
ls.cancel = cancel
ls.ctx = ctx
ls.runWg.Add(1) ls.runWg.Add(1)
go ls.run() go ls.run()
return nil return nil
@ -299,17 +295,17 @@ func (ls *leaseStresser) randomlyDropLeases() {
} }
func (ls *leaseStresser) createLease(ttl int64) (int64, error) { func (ls *leaseStresser) createLease(ttl int64) (int64, error) {
resp, err := ls.lc.LeaseGrant(ls.ctx, &pb.LeaseGrantRequest{TTL: ttl}) resp, err := ls.cli.Grant(ls.ctx, ttl)
if err != nil { if err != nil {
return -1, err return -1, err
} }
return resp.ID, nil return int64(resp.ID), nil
} }
func (ls *leaseStresser) keepLeaseAlive(leaseID int64) { func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
defer ls.aliveWg.Done() defer ls.aliveWg.Done()
ctx, cancel := context.WithCancel(ls.ctx) ctx, cancel := context.WithCancel(ls.ctx)
stream, err := ls.lc.LeaseKeepAlive(ctx) stream, err := ls.cli.KeepAlive(ctx, clientv3.LeaseID(leaseID))
defer func() { cancel() }() defer func() { cancel() }()
for { for {
select { select {
@ -347,36 +343,21 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
) )
cancel() cancel()
ctx, cancel = context.WithCancel(ls.ctx) ctx, cancel = context.WithCancel(ls.ctx)
stream, err = ls.lc.LeaseKeepAlive(ctx) stream, err = ls.cli.KeepAlive(ctx, clientv3.LeaseID(leaseID))
cancel() cancel()
continue continue
} }
ls.lg.Debug( ls.lg.Debug(
"keepLeaseAlive stream sends lease keepalive request", "keepLeaseAlive waiting on lease stream",
zap.String("endpoint", ls.m.EtcdClientEndpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
) )
err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID})
if err != nil {
ls.lg.Debug(
"keepLeaseAlive stream failed to send lease keepalive request",
zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err),
)
continue
}
leaseRenewTime := time.Now() leaseRenewTime := time.Now()
ls.lg.Debug( respRC := <-stream
"keepLeaseAlive stream sent lease keepalive request",
zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
)
respRC, err := stream.Recv()
if err != nil { if err != nil {
ls.lg.Debug( ls.lg.Debug(
"keepLeaseAlive stream failed to receive lease keepalive response", "keepLeaseAlive failed to receive lease keepalive response",
zap.String("endpoint", ls.m.EtcdClientEndpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err), zap.Error(err),
@ -409,16 +390,18 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
// the format of key is the concat of leaseID + '_' + '<order of key creation>' // the format of key is the concat of leaseID + '_' + '<order of key creation>'
// e.g 5186835655248304152_0 for first created key and 5186835655248304152_1 for second created key // e.g 5186835655248304152_0 for first created key and 5186835655248304152_1 for second created key
func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error { func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error {
var txnPuts []*pb.RequestOp var txnPuts []clientv3.Op
for j := 0; j < ls.keysPerLease; j++ { for j := 0; j < ls.keysPerLease; j++ {
txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(fmt.Sprintf("%d%s%d", leaseID, "_", j)), txnput := clientv3.OpPut(
Value: []byte(fmt.Sprintf("bar")), Lease: leaseID}}} fmt.Sprintf("%d%s%d", leaseID, "_", j),
fmt.Sprintf("bar"),
clientv3.WithLease(clientv3.LeaseID(leaseID)),
)
txnPuts = append(txnPuts, txnput) txnPuts = append(txnPuts, txnput)
} }
// keep retrying until lease is not found or ctx is being canceled // keep retrying until lease is not found or ctx is being canceled
for ls.ctx.Err() == nil { for ls.ctx.Err() == nil {
txn := &pb.TxnRequest{Success: txnPuts} _, err := ls.cli.Txn(ls.ctx).Then(txnPuts...).Commit()
_, err := ls.kvc.Txn(ls.ctx, txn)
if err == nil { if err == nil {
// since all created keys will be deleted too, the number of operations on keys will be roughly 2x the number of created keys // since all created keys will be deleted too, the number of operations on keys will be roughly 2x the number of created keys
atomic.AddInt64(&ls.atomicModifiedKey, 2*int64(ls.keysPerLease)) atomic.AddInt64(&ls.atomicModifiedKey, 2*int64(ls.keysPerLease))
@ -437,9 +420,10 @@ func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
if rand.Intn(2) != 0 { if rand.Intn(2) != 0 {
return false, nil return false, nil
} }
// keep retrying until a lease is dropped or ctx is being canceled // keep retrying until a lease is dropped or ctx is being canceled
for ls.ctx.Err() == nil { for ls.ctx.Err() == nil {
_, err := ls.lc.LeaseRevoke(ls.ctx, &pb.LeaseRevokeRequest{ID: leaseID}) _, err := ls.cli.Revoke(ls.ctx, clientv3.LeaseID(leaseID))
if err == nil || rpctypes.Error(err) == rpctypes.ErrLeaseNotFound { if err == nil || rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
return true, nil return true, nil
} }
@ -454,7 +438,9 @@ func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
return false, ls.ctx.Err() return false, ls.ctx.Err()
} }
func (ls *leaseStresser) Pause() { ls.Close() } func (ls *leaseStresser) Pause() {
ls.Close()
}
func (ls *leaseStresser) Close() { func (ls *leaseStresser) Close() {
ls.lg.Info( ls.lg.Info(
@ -464,7 +450,7 @@ func (ls *leaseStresser) Close() {
ls.cancel() ls.cancel()
ls.runWg.Wait() ls.runWg.Wait()
ls.aliveWg.Wait() ls.aliveWg.Wait()
ls.conn.Close() ls.cli.Close()
ls.lg.Info( ls.lg.Info(
"lease stresser is closed", "lease stresser is closed",
zap.String("endpoint", ls.m.EtcdClientEndpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),