Merge pull request #16092 from serathius/robustness-etcdctl-traffic-client

Robustness etcd traffic client
dependabot/go_modules/tools/mod/golang.org/x/sync-0.3.0
Marek Siarkowicz 2023-06-19 16:10:55 +02:00 committed by GitHub
commit 1420292b10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 39 additions and 24 deletions

View File

@ -100,6 +100,13 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.
lastOperationSucceeded := true
var lastRev int64
var requestType etcdRequestType
client := etcdTrafficClient{
etcdTraffic: t,
client: c,
limiter: limiter,
idProvider: ids,
leaseStorage: lm,
}
for {
select {
case <-ctx.Done():
@ -115,7 +122,7 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.
} else {
requestType = Get
}
rev, err := t.Request(ctx, c, requestType, limiter, key, ids, lm, lastRev)
rev, err := client.Request(ctx, requestType, key, lastRev)
lastOperationSucceeded = err == nil
if err != nil {
continue
@ -127,87 +134,95 @@ func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.
}
}
func (t etcdTraffic) Request(ctx context.Context, c *RecordingClient, request etcdRequestType, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, lastRev int64) (rev int64, err error) {
type etcdTrafficClient struct {
etcdTraffic
client *RecordingClient
limiter *rate.Limiter
idProvider identity.Provider
leaseStorage identity.LeaseIdStorage
}
func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType, key string, lastRev int64) (rev int64, err error) {
opCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
switch request {
case StaleGet:
_, rev, err = c.Get(opCtx, key, lastRev)
_, rev, err = c.client.Get(opCtx, key, lastRev)
case Get:
_, rev, err = c.Get(opCtx, key, 0)
_, rev, err = c.client.Get(opCtx, key, 0)
case Put:
var resp *clientv3.PutResponse
resp, err = c.Put(opCtx, key, fmt.Sprintf("%d", id.NewRequestId()))
resp, err = c.client.Put(opCtx, key, fmt.Sprintf("%d", c.idProvider.NewRequestId()))
if resp != nil {
rev = resp.Header.Revision
}
case LargePut:
var resp *clientv3.PutResponse
resp, err = c.Put(opCtx, key, randString(t.largePutSize))
resp, err = c.client.Put(opCtx, key, randString(c.largePutSize))
if resp != nil {
rev = resp.Header.Revision
}
case Delete:
var resp *clientv3.DeleteResponse
resp, err = c.Delete(opCtx, key)
resp, err = c.client.Delete(opCtx, key)
if resp != nil {
rev = resp.Header.Revision
}
case MultiOpTxn:
var resp *clientv3.TxnResponse
resp, err = c.Txn(opCtx, nil, t.pickMultiTxnOps(id), nil)
resp, err = c.client.Txn(opCtx, nil, c.pickMultiTxnOps(), nil)
if resp != nil {
rev = resp.Header.Revision
}
case CompareAndSet:
var kv *mvccpb.KeyValue
kv, rev, err = c.Get(opCtx, key, 0)
kv, rev, err = c.client.Get(opCtx, key, 0)
if err == nil {
limiter.Wait(ctx)
c.limiter.Wait(ctx)
var expectedRevision int64
if kv != nil {
expectedRevision = kv.ModRevision
}
txnCtx, txnCancel := context.WithTimeout(ctx, RequestTimeout)
var resp *clientv3.TxnResponse
resp, err = c.Txn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", id.NewRequestId()))}, nil)
resp, err = c.client.Txn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", c.idProvider.NewRequestId()))}, nil)
txnCancel()
if resp != nil {
rev = resp.Header.Revision
}
}
case PutWithLease:
leaseId := lm.LeaseId(c.id)
leaseId := c.leaseStorage.LeaseId(c.client.id)
if leaseId == 0 {
var resp *clientv3.LeaseGrantResponse
resp, err = c.LeaseGrant(opCtx, t.leaseTTL)
resp, err = c.client.LeaseGrant(opCtx, c.leaseTTL)
if resp != nil {
leaseId = int64(resp.ID)
rev = resp.ResponseHeader.Revision
}
if err == nil {
lm.AddLeaseId(c.id, leaseId)
limiter.Wait(ctx)
c.leaseStorage.AddLeaseId(c.client.id, leaseId)
c.limiter.Wait(ctx)
}
}
if leaseId != 0 {
putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout)
var resp *clientv3.PutResponse
resp, err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.NewRequestId()), leaseId)
resp, err = c.client.PutWithLease(putCtx, key, fmt.Sprintf("%d", c.idProvider.NewRequestId()), leaseId)
putCancel()
if resp != nil {
rev = resp.Header.Revision
}
}
case LeaseRevoke:
leaseId := lm.LeaseId(c.id)
leaseId := c.leaseStorage.LeaseId(c.client.id)
if leaseId != 0 {
var resp *clientv3.LeaseRevokeResponse
resp, err = c.LeaseRevoke(opCtx, leaseId)
resp, err = c.client.LeaseRevoke(opCtx, leaseId)
//if LeaseRevoke has failed, do not remove the mapping.
if err == nil {
lm.RemoveLeaseId(c.id)
c.leaseStorage.RemoveLeaseId(c.client.id)
}
if resp != nil {
rev = resp.Header.Revision
@ -215,7 +230,7 @@ func (t etcdTraffic) Request(ctx context.Context, c *RecordingClient, request et
}
case Defragment:
var resp *clientv3.DefragmentResponse
resp, err = c.Defragment(opCtx)
resp, err = c.client.Defragment(opCtx)
if resp != nil {
rev = resp.Header.Revision
}
@ -226,13 +241,13 @@ func (t etcdTraffic) Request(ctx context.Context, c *RecordingClient, request et
return rev, err
}
func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op) {
keys := rand.Perm(t.keyCount)
func (c etcdTrafficClient) pickMultiTxnOps() (ops []clientv3.Op) {
keys := rand.Perm(c.keyCount)
opTypes := make([]model.OperationType, 4)
atLeastOnePut := false
for i := 0; i < MultiOpTxnOpCount; i++ {
opTypes[i] = t.pickOperationType()
opTypes[i] = c.pickOperationType()
if opTypes[i] == model.PutOperation {
atLeastOnePut = true
}
@ -248,7 +263,7 @@ func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op)
case model.RangeOperation:
ops = append(ops, clientv3.OpGet(key))
case model.PutOperation:
value := fmt.Sprintf("%d", ids.NewRequestId())
value := fmt.Sprintf("%d", c.idProvider.NewRequestId())
ops = append(ops, clientv3.OpPut(key, value))
case model.DeleteOperation:
ops = append(ops, clientv3.OpDelete(key))