Merge pull request #15880 from serathius/robustness-stream-id

tests/robustness: Separate stream id from client id and improve AppendableHistory doc
Benjamin Wang 2023-05-12 06:35:05 +08:00 committed by GitHub
commit fa7067d0d8
5 changed files with 40 additions and 36 deletions


@@ -17,8 +17,10 @@ package identity
import "sync/atomic"
type Provider interface {
ClientId() int
RequestId() int
// NewStreamId returns an integer starting from zero to make it render nicely by porcupine visualization.
NewStreamId() int
// NewRequestId returns unique identification used to make write requests unique.
NewRequestId() int
}
func NewIdProvider() Provider {
@@ -26,15 +28,14 @@ func NewIdProvider() Provider {
}
type atomicProvider struct {
clientId atomic.Int64
streamId atomic.Int64
requestId atomic.Int64
}
func (id *atomicProvider) ClientId() int {
// Substract one as ClientId should start from zero.
return int(id.clientId.Add(1) - 1)
func (id *atomicProvider) NewStreamId() int {
return int(id.streamId.Add(1) - 1)
}
func (id *atomicProvider) RequestId() int {
func (id *atomicProvider) NewRequestId() int {
return int(id.requestId.Add(1))
}

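The two generators above have different contracts that are easy to miss in the diff: NewStreamId is dense and zero-based only so the ids render nicely in the porcupine visualization, while NewRequestId merely has to be unique. A minimal, self-contained sketch of those semantics, using a stand-in type rather than the repository's atomicProvider:

package main

import (
	"fmt"
	"sync/atomic"
)

// provider mirrors the two counters shown in the hunk above.
type provider struct {
	streamId  atomic.Int64
	requestId atomic.Int64
}

// NewStreamId yields 0, 1, 2, ...; Add(1) returns the incremented value,
// so subtracting one makes the sequence start at zero.
func (p *provider) NewStreamId() int { return int(p.streamId.Add(1) - 1) }

// NewRequestId yields 1, 2, 3, ...; only uniqueness matters here.
func (p *provider) NewRequestId() int { return int(p.requestId.Add(1)) }

func main() {
	p := &provider{}
	fmt.Println(p.NewStreamId(), p.NewStreamId())   // 0 1
	fmt.Println(p.NewRequestId(), p.NewRequestId()) // 1 2
}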

@@ -46,15 +46,18 @@ func ValidateOperationHistoryAndReturnVisualize(t *testing.T, lg *zap.Logger, op
}
}
// AppendableHistory allows to collect history of operations. Appending needs to
// be done in order of operation execution time (start, end time). All Operations
// time is expected to be calculated as time.Since common base time.
// AppendableHistory allows to collect history of sequential operations.
//
// Appending validates that operations don't overlap and properly handles failed
// requests.
// Ensures that operation history is compatible with porcupine library, by preventing concurrent requests sharing the
// same stream id. For failed requests, we don't know their return time, so generate new stream id.
//
// Appending needs to be done in order of operation execution time (start, end time).
// Operations time should be calculated as time.Since common base time to ensure that Go monotonic time is used.
// More in https://github.com/golang/go/blob/96add980ad27faed627f26ef1ab09e8fe45d6bd1/src/time/time.go#L10.
type AppendableHistory struct {
// id of the next write operation. If needed a new id might be requested from idProvider.
id int
// streamId for the next operation. Used for porcupine.Operation.ClientId as porcupine assumes no concurrent requests.
streamId int
// If needed a new streamId is requested from idProvider.
idProvider identity.Provider
History
@@ -62,7 +65,7 @@ type AppendableHistory struct {
func NewAppendableHistory(ids identity.Provider) *AppendableHistory {
return &AppendableHistory{
id: ids.ClientId(),
streamId: ids.NewStreamId(),
idProvider: ids,
History: History{
successful: []porcupine.Operation{},
@@ -77,7 +80,7 @@ func (h *AppendableHistory) AppendRange(key string, withPrefix bool, start, end
revision = resp.Header.Revision
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.id,
ClientId: h.streamId,
Input: rangeRequest(key, withPrefix, 0),
Call: start.Nanoseconds(),
Output: rangeResponse(resp.Kvs, resp.Count, revision),
@@ -96,7 +99,7 @@ func (h *AppendableHistory) AppendPut(key, value string, start, end time.Duratio
revision = resp.Header.Revision
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.id,
ClientId: h.streamId,
Input: request,
Call: start.Nanoseconds(),
Output: putResponse(revision),
@@ -115,7 +118,7 @@ func (h *AppendableHistory) AppendPutWithLease(key, value string, leaseID int64,
revision = resp.Header.Revision
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.id,
ClientId: h.streamId,
Input: request,
Call: start.Nanoseconds(),
Output: putResponse(revision),
@@ -138,7 +141,7 @@ func (h *AppendableHistory) AppendLeaseGrant(start, end time.Duration, resp *cli
revision = resp.ResponseHeader.Revision
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.id,
ClientId: h.streamId,
Input: request,
Call: start.Nanoseconds(),
Output: leaseGrantResponse(revision),
@@ -157,7 +160,7 @@ func (h *AppendableHistory) AppendLeaseRevoke(id int64, start, end time.Duration
revision = resp.Header.Revision
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.id,
ClientId: h.streamId,
Input: request,
Call: start.Nanoseconds(),
Output: leaseRevokeResponse(revision),
@@ -178,7 +181,7 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, r
deleted = resp.Deleted
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.id,
ClientId: h.streamId,
Input: request,
Call: start.Nanoseconds(),
Output: deleteResponse(deleted, revision),
@@ -201,7 +204,7 @@ func (h *AppendableHistory) AppendCompareRevisionAndDelete(key string, expectedR
deleted = resp.Responses[0].GetResponseDeleteRange().Deleted
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.id,
ClientId: h.streamId,
Input: request,
Call: start.Nanoseconds(),
Output: compareRevisionAndDeleteResponse(resp.Succeeded, deleted, revision),
@@ -220,7 +223,7 @@ func (h *AppendableHistory) AppendCompareRevisionAndPut(key string, expectedRevi
revision = resp.Header.Revision
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.id,
ClientId: h.streamId,
Input: request,
Call: start.Nanoseconds(),
Output: compareRevisionAndPutResponse(resp.Succeeded, revision),
@@ -251,7 +254,7 @@ func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, onSuccess []clientv3.O
results = append(results, toEtcdOperationResult(resp))
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.id,
ClientId: h.streamId,
Input: request,
Call: start.Nanoseconds(),
Output: txnResponse(results, resp.Succeeded, revision),
@@ -352,7 +355,7 @@ func (h *AppendableHistory) AppendDefragment(start, end time.Duration, resp *cli
revision = resp.Header.Revision
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.id,
ClientId: h.streamId,
Input: request,
Call: start.Nanoseconds(),
Output: defragmentResponse(revision),
@@ -377,15 +380,15 @@ func (h *AppendableHistory) appendFailed(request EtcdRequest, call int64, err er
}
}
h.failed = append(h.failed, porcupine.Operation{
ClientId: h.id,
ClientId: h.streamId,
Input: request,
Call: call,
Output: failedResponse(err),
Return: 0, // For failed writes we don't know when request has really finished.
})
// Operations of single client needs to be sequential.
// As we don't know return time of failed operations, all new writes need to be done with new client id.
h.id = h.idProvider.ClientId()
// As we don't know return time of failed operations, all new writes need to be done with new stream id.
h.streamId = h.idProvider.NewStreamId()
}
func getRequest(key string) EtcdRequest {

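The streamId handling above is the heart of the change: every operation recorded by one AppendableHistory reuses a single porcupine ClientId, and a failed request, whose return time is unknown, forces a fresh stream id so porcupine never sees two overlapping operations on the same "client". The following is a simplified, self-contained stand-in for that bookkeeping (the history type, its fields, and the string requests are illustrative; only porcupine.Operation and the ClientId/Return conventions come from the diff):

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/anishathalye/porcupine"
)

// history is a stripped-down stand-in for model.AppendableHistory.
type history struct {
	streamId    int
	newStreamId func() int
	successful  []porcupine.Operation
	failed      []porcupine.Operation
}

func (h *history) appendSuccessful(input, output any, call, ret time.Duration) {
	h.successful = append(h.successful, porcupine.Operation{
		ClientId: h.streamId, // porcupine assumes no concurrent requests per ClientId
		Input:    input,
		Call:     call.Nanoseconds(),
		Output:   output,
		Return:   ret.Nanoseconds(),
	})
}

func (h *history) appendFailed(input any, call time.Duration, err error) {
	h.failed = append(h.failed, porcupine.Operation{
		ClientId: h.streamId,
		Input:    input,
		Call:     call.Nanoseconds(),
		Output:   err,
		Return:   0, // return time of a failed write is unknown
	})
	// The failed request may still take effect later, so all following
	// operations must be recorded under a new stream id.
	h.streamId = h.newStreamId()
}

func main() {
	next := 0
	h := &history{newStreamId: func() int { next++; return next }}
	h.appendSuccessful("put k v1", "ok", 1*time.Millisecond, 2*time.Millisecond)
	h.appendFailed("put k v2", 3*time.Millisecond, errors.New("request timed out"))
	h.appendSuccessful("put k v3", "ok", 5*time.Millisecond, 6*time.Millisecond)
	fmt.Println(h.successful[0].ClientId, h.failed[0].ClientId, h.successful[1].ClientId) // 0 0 1
}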

@@ -30,7 +30,7 @@ import (
// RecordingClient provides a semi etcd client (different interface than
// clientv3.Client) that records all the requests and responses made. Doesn't
// allow for concurrent requests to ensure correct appending to history.
// allow for concurrent requests to conform to model.AppendableHistory requirements.
type RecordingClient struct {
client clientv3.Client
baseTime time.Time


@@ -124,7 +124,7 @@ func (t etcdTraffic) Write(ctx context.Context, c *RecordingClient, limiter *rat
var err error
switch etcdRequestType(pickRandom(t.writeChoices)) {
case Put:
err = c.Put(writeCtx, key, fmt.Sprintf("%d", id.RequestId()))
err = c.Put(writeCtx, key, fmt.Sprintf("%d", id.NewRequestId()))
case LargePut:
err = c.Put(writeCtx, key, randString(t.largePutSize))
case Delete:
@@ -136,7 +136,7 @@ func (t etcdTraffic) Write(ctx context.Context, c *RecordingClient, limiter *rat
if lastValues != nil {
expectRevision = lastValues.ModRevision
}
err = c.CompareRevisionAndPut(writeCtx, key, fmt.Sprintf("%d", id.RequestId()), expectRevision)
err = c.CompareRevisionAndPut(writeCtx, key, fmt.Sprintf("%d", id.NewRequestId()), expectRevision)
case PutWithLease:
leaseId := lm.LeaseId(cid)
if leaseId == 0 {
@@ -148,7 +148,7 @@ func (t etcdTraffic) Write(ctx context.Context, c *RecordingClient, limiter *rat
}
if leaseId != 0 {
putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout)
err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.RequestId()), leaseId)
err = c.PutWithLease(putCtx, key, fmt.Sprintf("%d", id.NewRequestId()), leaseId)
putCancel()
}
case LeaseRevoke:
@@ -191,7 +191,7 @@ func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op)
case model.Range:
ops = append(ops, clientv3.OpGet(key))
case model.Put:
value := fmt.Sprintf("%d", ids.RequestId())
value := fmt.Sprintf("%d", ids.NewRequestId())
ops = append(ops, clientv3.OpPut(key, value))
case model.Delete:
ops = append(ops, clientv3.OpDelete(key))

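A related detail in the traffic generator above: every written value is fmt.Sprintf("%d", id.NewRequestId()), so even repeated writes to the same key stay distinguishable in the recorded history, which is what the "make write requests unique" comment on the interface refers to. A tiny stand-alone illustration, with a stubbed put in place of RecordingClient.Put:

package main

import "fmt"

// put stands in for RecordingClient.Put; the real client also records the
// request and its response into the history.
func put(key, value string) { fmt.Printf("put %s = %s\n", key, value) }

func main() {
	requestId := 0
	newRequestId := func() int { requestId++; return requestId }

	// Writing the same key three times produces three distinct requests,
	// because each write carries a fresh request id as its value.
	for i := 0; i < 3; i++ {
		put("key0", fmt.Sprintf("%d", newRequestId()))
	}
}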

@@ -85,7 +85,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *RecordingCl
func (t kubernetesTraffic) Write(ctx context.Context, c *RecordingClient, ids identity.Provider, objects []*mvccpb.KeyValue) (err error) {
writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
if len(objects) < t.averageKeyCount/2 {
err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.RequestId()))
err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId()))
} else {
randomPod := objects[rand.Intn(len(objects))]
if len(objects) > t.averageKeyCount*3/2 {
@@ -96,9 +96,9 @@ func (t kubernetesTraffic) Write(ctx context.Context, c *RecordingClient, ids id
case KubernetesDelete:
err = t.Delete(writeCtx, c, string(randomPod.Key), randomPod.ModRevision)
case KubernetesUpdate:
err = t.Update(writeCtx, c, string(randomPod.Key), fmt.Sprintf("%d", ids.RequestId()), randomPod.ModRevision)
err = t.Update(writeCtx, c, string(randomPod.Key), fmt.Sprintf("%d", ids.NewRequestId()), randomPod.ModRevision)
case KubernetesCreate:
err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.RequestId()))
err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId()))
default:
panic(fmt.Sprintf("invalid choice: %q", op))
}