tests: Make linearizability traffic extendable

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
Marek Siarkowicz 2022-11-14 23:09:53 +01:00
parent ca8baeb308
commit 2fc1485f29
3 changed files with 68 additions and 23 deletions


@@ -83,7 +83,7 @@ func TestLinearizability(t *testing.T) {
 				minimalQPS:  minimalQPS,
 				maximalQPS:  maximalQPS,
 				clientCount: 8,
-				traffic:     PutGetTraffic,
+				traffic:     DefaultTraffic,
 			}
 			testLinearizability(context.Background(), t, tc.config, failpoint, traffic)
 		})


@@ -21,10 +21,12 @@ import (
 	"github.com/anishathalye/porcupine"
 )
 
-type Operation int8
+type Operation string
 
-const Get Operation = 0
-const Put Operation = 1
+const (
+	Get Operation = "get"
+	Put Operation = "put"
+)
 
 type etcdRequest struct {
 	op Operation


@@ -17,48 +17,91 @@ package linearizability
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"time"
 
 	"golang.org/x/time/rate"
 )
 
 var (
-	PutGetTraffic Traffic = putGetTraffic{}
+	DefaultTraffic Traffic = readWriteSingleKey{key: "key", writes: []opChance{{operation: Put, chance: 100}}}
 )
 
 type Traffic interface {
 	Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter)
 }
 
-type putGetTraffic struct{}
+type readWriteSingleKey struct {
+	key    string
+	writes []opChance
+}
 
-func (t putGetTraffic) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter) {
+type opChance struct {
+	operation Operation
+	chance    int
+}
+
+func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter) {
 	maxOperationsPerClient := 1000000
-	id := maxOperationsPerClient * c.id
-	key := "key"
-	for i := 0; i < maxOperationsPerClient; {
+	minId := maxOperationsPerClient * c.id
+	maxId := maxOperationsPerClient * (c.id + 1)
+	for writeId := minId; writeId < maxId; {
 		select {
 		case <-ctx.Done():
 			return
 		default:
 		}
-		getCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
-		err := c.Get(getCtx, key)
-		cancel()
+		// Execute one read per write to avoid the operation history including too many failed writes when etcd is down.
+		err := t.Read(ctx, c, limiter)
 		if err != nil {
 			continue
 		}
-		limiter.Wait(ctx)
-		putData := fmt.Sprintf("%d", id+i)
-		putCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
-		err = c.Put(putCtx, key, putData)
-		cancel()
-		if err != nil {
-			continue
-		}
-		limiter.Wait(ctx)
-		i++
+		// Provide each write with a unique id to make it easier to validate the operation history.
+		t.Write(ctx, c, limiter, writeId)
+		writeId++
 	}
 	return
 }
+
+func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter) error {
+	getCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
+	err := c.Get(getCtx, t.key)
+	cancel()
+	if err == nil {
+		limiter.Wait(ctx)
+	}
+	return err
+}
+
+func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, id int) error {
+	putCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
+	var err error
+	switch t.pickWriteOperation() {
+	case Put:
+		err = c.Put(putCtx, t.key, fmt.Sprintf("%d", id))
+	default:
+		panic("invalid operation")
+	}
+	cancel()
+	if err == nil {
+		limiter.Wait(ctx)
+	}
+	return err
+}
+
+func (t readWriteSingleKey) pickWriteOperation() Operation {
+	sum := 0
+	for _, op := range t.writes {
+		sum += op.chance
+	}
+	roll := rand.Int() % sum
+	for _, op := range t.writes {
+		if roll < op.chance {
+			return op.operation
+		}
+		roll -= op.chance
+	}
+	panic("unexpected")
+}
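
The extendability this commit introduces is that a traffic pattern is now described by data (a key plus a weighted list of write operations) instead of being hard-coded in Run. As a hypothetical sketch, not part of this commit, a second pattern could be declared by reusing the new types; the name heavyPutTraffic and the key "other-key" are made up for illustration, and any new Operation added to writes would also need a matching case in readWriteSingleKey.Write, which currently handles only Put:

var heavyPutTraffic Traffic = readWriteSingleKey{
	key: "other-key",
	// Weights are relative: pickWriteOperation sums the chance fields and rolls
	// rand.Int() % sum, so a single entry with chance 100 is always picked.
	writes: []opChance{{operation: Put, chance: 100}},
}

pickWriteOperation then walks the writes slice, subtracting each entry's chance from the roll until the roll falls inside an entry's range, so an entry with chance 80 out of a total of 100 is chosen roughly 80% of the time.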