tests: Implement LargePut requests

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
dependabot/go_modules/go.uber.org/atomic-1.10.0
Marek Siarkowicz 2023-01-24 21:43:57 +01:00
parent 91ec368927
commit ef0bdbe489
6 changed files with 111 additions and 39 deletions

View File

@ -47,24 +47,36 @@ var (
minimalQPS: 100,
maximalQPS: 200,
clientCount: 8,
traffic: readWriteSingleKey{keyCount: 4, leaseTTL: DefaultLeaseTTL, writes: []requestChance{
{operation: Put, chance: 50},
{operation: Delete, chance: 10},
{operation: PutWithLease, chance: 10},
{operation: LeaseRevoke, chance: 10},
{operation: CompareAndSet, chance: 10},
{operation: Defragment, chance: 10},
}},
traffic: traffic{
keyCount: 4,
leaseTTL: DefaultLeaseTTL,
largePutSize: 32769,
writes: []requestChance{
{operation: Put, chance: 50},
{operation: LargePut, chance: 5},
{operation: Delete, chance: 10},
{operation: PutWithLease, chance: 10},
{operation: LeaseRevoke, chance: 10},
{operation: CompareAndSet, chance: 10},
{operation: Defragment, chance: 5},
},
},
}
HighTraffic = trafficConfig{
name: "HighTraffic",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
traffic: readWriteSingleKey{keyCount: 4, leaseTTL: DefaultLeaseTTL, writes: []requestChance{
{operation: Put, chance: 90},
{operation: Defragment, chance: 10},
}},
traffic: traffic{
keyCount: 4,
largePutSize: 32769,
leaseTTL: DefaultLeaseTTL,
writes: []requestChance{
{operation: Put, chance: 90},
{operation: LargePut, chance: 5},
{operation: Defragment, chance: 5},
},
},
}
defaultTraffic = LowTraffic
trafficList = []trafficConfig{

View File

@ -213,7 +213,7 @@ func getRequest(key string) EtcdRequest {
}
func getResponse(value string, revision int64) EtcdResponse {
return EtcdResponse{Txn: &TxnResponse{OpsResult: []EtcdOperationResult{{Value: value}}}, Revision: revision}
return EtcdResponse{Txn: &TxnResponse{OpsResult: []EtcdOperationResult{{Value: ToValueOrHash(value)}}}, Revision: revision}
}
func failedResponse(err error) EtcdResponse {
@ -225,7 +225,7 @@ func unknownResponse(revision int64) EtcdResponse {
}
func putRequest(key, value string) EtcdRequest {
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Ops: []EtcdOperation{{Type: Put, Key: key, Value: value}}}}
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Ops: []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(value)}}}}
}
func putResponse(revision int64) EtcdResponse {
@ -241,7 +241,7 @@ func deleteResponse(deleted int64, revision int64) EtcdResponse {
}
func txnRequest(key, expectValue, newValue string) EtcdRequest {
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Conds: []EtcdCondition{{Key: key, ExpectedValue: expectValue}}, Ops: []EtcdOperation{{Type: Put, Key: key, Value: newValue}}}}
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Conds: []EtcdCondition{{Key: key, ExpectedValue: ToValueOrHash(expectValue)}}, Ops: []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(newValue)}}}}
}
func txnResponse(succeeded bool, revision int64) EtcdResponse {
@ -253,7 +253,7 @@ func txnResponse(succeeded bool, revision int64) EtcdResponse {
}
func putWithLeaseRequest(key, value string, leaseID int64) EtcdRequest {
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Ops: []EtcdOperation{{Type: Put, Key: key, Value: value, LeaseID: leaseID}}}}
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Ops: []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(value), LeaseID: leaseID}}}}
}
func leaseGrantRequest(leaseID int64) EtcdRequest {

View File

@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"github.com/anishathalye/porcupine"
"hash/fnv"
"reflect"
"strings"
)
@ -76,13 +77,13 @@ type TxnRequest struct {
type EtcdCondition struct {
Key string
ExpectedValue string
ExpectedValue ValueOrHash
}
type EtcdOperation struct {
Type OperationType
Key string
Value string
Value ValueOrHash
LeaseID int64
}
@ -120,7 +121,7 @@ func Match(r1, r2 EtcdResponse) bool {
}
type EtcdOperationResult struct {
Value string
Value ValueOrHash
Deleted int64
}
@ -134,11 +135,28 @@ type PossibleStates []EtcdState
type EtcdState struct {
Revision int64
KeyValues map[string]string
KeyValues map[string]ValueOrHash
KeyLeases map[string]int64
Leases map[int64]EtcdLease
}
type ValueOrHash struct {
Value string
Hash uint32
}
func ToValueOrHash(value string) ValueOrHash {
v := ValueOrHash{}
if len(value) < 20 {
v.Value = value
} else {
h := fnv.New32a()
h.Write([]byte(value))
v.Hash = h.Sum32()
}
return v
}
func describeEtcdRequestResponse(request EtcdRequest, response EtcdResponse) string {
return fmt.Sprintf("%s -> %s", describeEtcdRequest(request), describeEtcdResponse(request, response))
}
@ -181,7 +199,7 @@ func describeEtcdRequest(request EtcdRequest) string {
func describeEtcdConditions(conds []EtcdCondition) string {
opsDescription := make([]string, len(conds))
for i := range conds {
opsDescription[i] = fmt.Sprintf("%s==%q", conds[i].Key, conds[i].ExpectedValue)
opsDescription[i] = fmt.Sprintf("%s==%s", conds[i].Key, describeValueOrHash(conds[i].ExpectedValue))
}
return strings.Join(opsDescription, " && ")
}
@ -211,9 +229,9 @@ func describeEtcdOperation(op EtcdOperation) string {
return fmt.Sprintf("get(%q)", op.Key)
case Put:
if op.LeaseID != 0 {
return fmt.Sprintf("put(%q, %q, %d)", op.Key, op.Value, op.LeaseID)
return fmt.Sprintf("put(%q, %s, %d)", op.Key, describeValueOrHash(op.Value), op.LeaseID)
}
return fmt.Sprintf("put(%q, %q, nil)", op.Key, op.Value)
return fmt.Sprintf("put(%q, %s, nil)", op.Key, describeValueOrHash(op.Value))
case Delete:
return fmt.Sprintf("delete(%q)", op.Key)
default:
@ -224,10 +242,7 @@ func describeEtcdOperation(op EtcdOperation) string {
func describeEtcdOperationResponse(op OperationType, resp EtcdOperationResult) string {
switch op {
case Get:
if resp.Value == "" {
return "nil"
}
return fmt.Sprintf("%q", resp.Value)
return describeValueOrHash(resp.Value)
case Put:
return fmt.Sprintf("ok")
case Delete:
@ -237,6 +252,16 @@ func describeEtcdOperationResponse(op OperationType, resp EtcdOperationResult) s
}
}
func describeValueOrHash(value ValueOrHash) string {
if value.Hash != 0 {
return fmt.Sprintf("hash: %d", value.Hash)
}
if value.Value == "" {
return "nil"
}
return fmt.Sprintf("%q", value.Value)
}
func step(states PossibleStates, request EtcdRequest, response EtcdResponse) (bool, PossibleStates) {
if len(states) == 0 {
// states were not initialized
@ -257,7 +282,7 @@ func step(states PossibleStates, request EtcdRequest, response EtcdResponse) (bo
func initState(request EtcdRequest, response EtcdResponse) EtcdState {
state := EtcdState{
Revision: response.Revision,
KeyValues: map[string]string{},
KeyValues: map[string]ValueOrHash{},
KeyLeases: map[string]int64{},
Leases: map[int64]EtcdLease{},
}
@ -270,7 +295,7 @@ func initState(request EtcdRequest, response EtcdResponse) EtcdState {
opResp := response.Txn.OpsResult[i]
switch op.Type {
case Get:
if opResp.Value != "" {
if opResp.Value.Value != "" && opResp.Value.Hash == 0 {
state.KeyValues[op.Key] = opResp.Value
}
case Put:
@ -319,7 +344,7 @@ func applyRequest(states PossibleStates, request EtcdRequest, response EtcdRespo
// applyRequestToSingleState handles a successful request, returning updated state and response it would generate.
func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, EtcdResponse) {
newKVs := map[string]string{}
newKVs := map[string]ValueOrHash{}
for k, v := range s.KeyValues {
newKVs[k] = v
}

View File

@ -65,6 +65,17 @@ func TestModelStep(t *testing.T) {
{req: getRequest("key2"), resp: getResponse("12", 2)},
},
},
{
name: "Get response data should match large put",
operations: []testOperation{
{req: putRequest("key", "012345678901234567890"), resp: putResponse(1)},
{req: getRequest("key"), resp: getResponse("123456789012345678901", 1), failure: true},
{req: getRequest("key"), resp: getResponse("012345678901234567890", 1)},
{req: putRequest("key", "123456789012345678901"), resp: putResponse(2)},
{req: getRequest("key"), resp: getResponse("123456789012345678901", 2)},
{req: getRequest("key"), resp: getResponse("012345678901234567890", 2), failure: true},
},
},
{
name: "Put must increase revision by 1",
operations: []testOperation{
@ -612,6 +623,11 @@ func TestModelDescribe(t *testing.T) {
resp: getResponse("2", 2),
expectDescribe: `get("key2") -> "2", rev: 2`,
},
{
req: getRequest("key2b"),
resp: getResponse("01234567890123456789", 2),
expectDescribe: `get("key2b") -> hash: 2945867837, rev: 2`,
},
{
req: putRequest("key3", "3"),
resp: putResponse(3),
@ -622,6 +638,11 @@ func TestModelDescribe(t *testing.T) {
resp: putResponse(3),
expectDescribe: `put("key3b", "3b", 3) -> ok, rev: 3`,
},
{
req: putRequest("key3c", "01234567890123456789"),
resp: putResponse(3),
expectDescribe: `put("key3c", hash: 2945867837, nil) -> ok, rev: 3`,
},
{
req: putRequest("key4", "4"),
resp: failedResponse(errors.New("failed")),

View File

@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math/rand"
"strings"
"time"
"golang.org/x/time/rate"
@ -36,6 +37,7 @@ type TrafficRequestType string
const (
Get TrafficRequestType = "get"
Put TrafficRequestType = "put"
LargePut TrafficRequestType = "largePut"
Delete TrafficRequestType = "delete"
PutWithLease TrafficRequestType = "putWithLease"
LeaseRevoke TrafficRequestType = "leaseRevoke"
@ -47,10 +49,11 @@ type Traffic interface {
Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage)
}
type readWriteSingleKey struct {
keyCount int
writes []requestChance
leaseTTL int64
type traffic struct {
keyCount int
writes []requestChance
leaseTTL int64
largePutSize int
}
type requestChance struct {
@ -58,7 +61,7 @@ type requestChance struct {
chance int
}
func (t readWriteSingleKey) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage) {
func (t traffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage) {
for {
select {
@ -77,7 +80,7 @@ func (t readWriteSingleKey) Run(ctx context.Context, clientId int, c *recordingC
}
}
func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string) ([]*mvccpb.KeyValue, error) {
func (t traffic) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string) ([]*mvccpb.KeyValue, error) {
getCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
resp, err := c.Get(getCtx, key)
cancel()
@ -87,13 +90,15 @@ func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limite
return resp, err
}
func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, newValue string, lm identity.LeaseIdStorage, cid int, lastValues []*mvccpb.KeyValue) error {
func (t traffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, newValue string, lm identity.LeaseIdStorage, cid int, lastValues []*mvccpb.KeyValue) error {
writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
var err error
switch t.pickWriteRequest() {
case Put:
err = c.Put(writeCtx, key, newValue)
case LargePut:
err = c.Put(writeCtx, key, randString(t.largePutSize))
case Delete:
err = c.Delete(writeCtx, key)
case CompareAndSet:
@ -137,7 +142,7 @@ func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limit
return err
}
func (t readWriteSingleKey) pickWriteRequest() TrafficRequestType {
func (t traffic) pickWriteRequest() TrafficRequestType {
sum := 0
for _, op := range t.writes {
sum += op.chance
@ -151,3 +156,12 @@ func (t readWriteSingleKey) pickWriteRequest() TrafficRequestType {
}
panic("unexpected")
}
func randString(size int) string {
data := strings.Builder{}
data.Grow(size)
for i := 0; i < size; i++ {
data.WriteByte(byte(int('a') + rand.Intn(26)))
}
return data.String()
}

View File

@ -83,7 +83,7 @@ func collectMemberWatchEvents(ctx context.Context, t *testing.T, c *clientv3.Cli
Op: model.EtcdOperation{
Type: op,
Key: string(event.Kv.Key),
Value: string(event.Kv.Value),
Value: model.ToValueOrHash(string(event.Kv.Value)),
},
})
}