tests: Move linearizability model and identity to dedicated packages

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
dependabot/go_modules/go.uber.org/atomic-1.10.0
Marek Siarkowicz 2023-01-16 19:16:21 +01:00
parent 677e5281e0
commit 96e2a7fbd6
9 changed files with 104 additions and 96 deletions

View File

@ -22,14 +22,16 @@ import (
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/linearizability/identity"
"go.etcd.io/etcd/tests/v3/linearizability/model"
)
type recordingClient struct {
client clientv3.Client
history *appendableHistory
history *model.AppendableHistory
}
func NewClient(endpoints []string, ids idProvider) (*recordingClient, error) {
func NewClient(endpoints []string, ids identity.Provider) (*recordingClient, error) {
cc, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
Logger: zap.NewNop(),
@ -41,7 +43,7 @@ func NewClient(endpoints []string, ids idProvider) (*recordingClient, error) {
}
return &recordingClient{
client: *cc,
history: newAppendableHistory(ids),
history: model.NewAppendableHistory(ids),
}, nil
}

View File

@ -12,16 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package linearizability
package identity
import "sync/atomic"
type idProvider interface {
type Provider interface {
ClientId() int
RequestId() int
}
func newIdProvider() idProvider {
func NewIdProvider() Provider {
return &atomicProvider{}
}

View File

@ -12,19 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package linearizability
package identity
import (
"sync"
)
type clientId2LeaseIdMapper interface {
type LeaseIdStorage interface {
LeaseId(int) int64
AddLeaseId(int, int64)
RemoveLeaseId(int)
}
func newClientId2LeaseIdMapper() clientId2LeaseIdMapper {
func NewLeaseIdStorage() LeaseIdStorage {
return &atomicClientId2LeaseIdMapper{m: map[int]int64{}}
}

View File

@ -31,6 +31,8 @@ import (
"golang.org/x/time/rate"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/linearizability/identity"
"go.etcd.io/etcd/tests/v3/linearizability/model"
)
const (
@ -139,14 +141,14 @@ func testLinearizability(ctx context.Context, t *testing.T, clus *e2e.EtcdProces
func patchOperationBasedOnWatchEvents(operations []porcupine.Operation, watchEvents []watchEvent) []porcupine.Operation {
newOperations := make([]porcupine.Operation, 0, len(operations))
persisted := map[EtcdOperation]watchEvent{}
persisted := map[model.EtcdOperation]watchEvent{}
for _, op := range watchEvents {
persisted[op.Op] = op
}
lastObservedOperation := lastOperationObservedInWatch(operations, persisted)
for _, op := range operations {
resp := op.Output.(EtcdResponse)
resp := op.Output.(model.EtcdResponse)
if resp.Err == nil || op.Call > lastObservedOperation.Call {
// No need to patch successfully requests and cannot patch requests outside observed window.
newOperations = append(newOperations, op)
@ -156,7 +158,7 @@ func patchOperationBasedOnWatchEvents(operations []porcupine.Operation, watchEve
if event != nil {
// Set revision and time based on watchEvent.
op.Return = event.Time.UnixNano()
op.Output = EtcdResponse{
op.Output = model.EtcdResponse{
Revision: event.Revision,
ResultUnknown: true,
}
@ -173,7 +175,7 @@ func patchOperationBasedOnWatchEvents(operations []porcupine.Operation, watchEve
return newOperations
}
func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents map[EtcdOperation]watchEvent) porcupine.Operation {
func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents map[model.EtcdOperation]watchEvent) porcupine.Operation {
var maxCallTime int64
var lastOperation porcupine.Operation
for _, op := range operations {
@ -186,17 +188,17 @@ func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents
return lastOperation
}
func matchWatchEvent(op porcupine.Operation, watchEvents map[EtcdOperation]watchEvent) (event *watchEvent, hasUniqueWriteOperation bool) {
request := op.Input.(EtcdRequest)
func matchWatchEvent(op porcupine.Operation, watchEvents map[model.EtcdOperation]watchEvent) (event *watchEvent, hasUniqueWriteOperation bool) {
request := op.Input.(model.EtcdRequest)
for _, etcdOp := range request.Ops {
if isWrite(etcdOp.Type) && inUnique(etcdOp.Type) {
if model.IsWrite(etcdOp.Type) && model.IsUnique(etcdOp.Type) {
// We expect all put to be unique as they write unique value.
hasUniqueWriteOperation = true
opType := etcdOp.Type
if opType == PutWithLease {
opType = Put
if opType == model.PutWithLease {
opType = model.Put
}
event, ok := watchEvents[EtcdOperation{
event, ok := watchEvents[model.EtcdOperation{
Type: opType,
Key: etcdOp.Key,
Value: etcdOp.Value,
@ -210,9 +212,9 @@ func matchWatchEvent(op porcupine.Operation, watchEvents map[EtcdOperation]watch
}
func hasWriteOperation(op porcupine.Operation) bool {
request := op.Input.(EtcdRequest)
request := op.Input.(model.EtcdRequest)
for _, etcdOp := range request.Ops {
if isWrite(etcdOp.Type) {
if model.IsWrite(etcdOp.Type) {
return true
}
}
@ -255,9 +257,9 @@ func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessClu
mux := sync.Mutex{}
endpoints := clus.EndpointsV3()
ids := newIdProvider()
lm := newClientId2LeaseIdMapper()
h := history{}
ids := identity.NewIdProvider()
lm := identity.NewLeaseIdStorage()
h := model.History{}
limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200)
startTime := time.Now()
@ -269,15 +271,15 @@ func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessClu
if err != nil {
t.Fatal(err)
}
go func(c *recordingClient) {
go func(c *recordingClient, clientId int) {
defer wg.Done()
defer c.Close()
config.traffic.Run(ctx, c, limiter, ids, lm)
config.traffic.Run(ctx, clientId, c, limiter, ids, lm)
mux.Lock()
h = h.Merge(c.history.history)
h = h.Merge(c.history.History)
mux.Unlock()
}(c)
}(c, i)
}
wg.Wait()
endTime := time.Now()
@ -322,7 +324,7 @@ func checkOperationsAndPersistResults(t *testing.T, operations []porcupine.Opera
t.Error(err)
}
linearizable, info := porcupine.CheckOperationsVerbose(etcdModel, operations, 0)
linearizable, info := porcupine.CheckOperationsVerbose(model.Etcd, operations, 0)
if linearizable != porcupine.Ok {
t.Error("Model is not linearizable")
persistMemberDataDir(t, clus, path)
@ -330,7 +332,7 @@ func checkOperationsAndPersistResults(t *testing.T, operations []porcupine.Opera
visualizationPath := filepath.Join(path, "history.html")
t.Logf("saving visualization to %q", visualizationPath)
err = porcupine.VisualizePath(etcdModel, info, visualizationPath)
err = porcupine.VisualizePath(model.Etcd, info, visualizationPath)
if err != nil {
t.Errorf("Failed to visualize, err: %v", err)
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package linearizability
package model
import (
"time"
@ -20,28 +20,29 @@ import (
"github.com/anishathalye/porcupine"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/linearizability/identity"
)
type appendableHistory struct {
type AppendableHistory struct {
// id of the next write operation. If needed a new id might be requested from idProvider.
id int
idProvider idProvider
idProvider identity.Provider
history
History
}
func newAppendableHistory(ids idProvider) *appendableHistory {
return &appendableHistory{
func NewAppendableHistory(ids identity.Provider) *AppendableHistory {
return &AppendableHistory{
id: ids.ClientId(),
idProvider: ids,
history: history{
History: History{
successful: []porcupine.Operation{},
failed: []porcupine.Operation{},
},
}
}
func (h *appendableHistory) AppendGet(key string, start, end time.Time, resp *clientv3.GetResponse) {
func (h *AppendableHistory) AppendGet(key string, start, end time.Time, resp *clientv3.GetResponse) {
var readData string
if len(resp.Kvs) == 1 {
readData = string(resp.Kvs[0].Value)
@ -59,7 +60,7 @@ func (h *appendableHistory) AppendGet(key string, start, end time.Time, resp *cl
})
}
func (h *appendableHistory) AppendPut(key, value string, start, end time.Time, resp *clientv3.PutResponse, err error) {
func (h *AppendableHistory) AppendPut(key, value string, start, end time.Time, resp *clientv3.PutResponse, err error) {
request := putRequest(key, value)
if err != nil {
h.appendFailed(request, start, err)
@ -78,7 +79,7 @@ func (h *appendableHistory) AppendPut(key, value string, start, end time.Time, r
})
}
func (h *appendableHistory) AppendPutWithLease(key, value string, leaseID int64, start, end time.Time, resp *clientv3.PutResponse, err error) {
func (h *AppendableHistory) AppendPutWithLease(key, value string, leaseID int64, start, end time.Time, resp *clientv3.PutResponse, err error) {
request := putWithLeaseRequest(key, value, leaseID)
if err != nil {
h.appendFailed(request, start, err)
@ -97,7 +98,7 @@ func (h *appendableHistory) AppendPutWithLease(key, value string, leaseID int64,
})
}
func (h *appendableHistory) AppendLeaseGrant(start, end time.Time, resp *clientv3.LeaseGrantResponse, err error) {
func (h *AppendableHistory) AppendLeaseGrant(start, end time.Time, resp *clientv3.LeaseGrantResponse, err error) {
var leaseID int64
if resp != nil {
leaseID = int64(resp.ID)
@ -120,7 +121,7 @@ func (h *appendableHistory) AppendLeaseGrant(start, end time.Time, resp *clientv
})
}
func (h *appendableHistory) AppendLeaseRevoke(id int64, start time.Time, end time.Time, resp *clientv3.LeaseRevokeResponse, err error) {
func (h *AppendableHistory) AppendLeaseRevoke(id int64, start time.Time, end time.Time, resp *clientv3.LeaseRevokeResponse, err error) {
request := leaseRevokeRequest(id)
if err != nil {
h.appendFailed(request, start, err)
@ -139,7 +140,7 @@ func (h *appendableHistory) AppendLeaseRevoke(id int64, start time.Time, end tim
})
}
func (h *appendableHistory) AppendDelete(key string, start, end time.Time, resp *clientv3.DeleteResponse, err error) {
func (h *AppendableHistory) AppendDelete(key string, start, end time.Time, resp *clientv3.DeleteResponse, err error) {
request := deleteRequest(key)
if err != nil {
h.appendFailed(request, start, err)
@ -160,7 +161,7 @@ func (h *appendableHistory) AppendDelete(key string, start, end time.Time, resp
})
}
func (h *appendableHistory) AppendTxn(key, expectValue, newValue string, start, end time.Time, resp *clientv3.TxnResponse, err error) {
func (h *AppendableHistory) AppendTxn(key, expectValue, newValue string, start, end time.Time, resp *clientv3.TxnResponse, err error) {
request := txnRequest(key, expectValue, newValue)
if err != nil {
h.appendFailed(request, start, err)
@ -179,7 +180,7 @@ func (h *appendableHistory) AppendTxn(key, expectValue, newValue string, start,
})
}
func (h *appendableHistory) appendFailed(request EtcdRequest, start time.Time, err error) {
func (h *AppendableHistory) appendFailed(request EtcdRequest, start time.Time, err error) {
h.failed = append(h.failed, porcupine.Operation{
ClientId: h.id,
Input: request,
@ -256,15 +257,15 @@ func leaseRevokeResponse(revision int64) EtcdResponse {
return EtcdResponse{OpsResult: []EtcdOperationResult{{}}, Revision: revision}
}
type history struct {
type History struct {
successful []porcupine.Operation
// failed requests are kept separate as we don't know return time of failed operations.
// Based on https://github.com/anishathalye/porcupine/issues/10
failed []porcupine.Operation
}
func (h history) Merge(h2 history) history {
result := history{
func (h History) Merge(h2 History) History {
result := History{
successful: make([]porcupine.Operation, 0, len(h.successful)+len(h2.successful)),
failed: make([]porcupine.Operation, 0, len(h.failed)+len(h2.failed)),
}
@ -275,7 +276,7 @@ func (h history) Merge(h2 history) history {
return result
}
func (h history) Operations() []porcupine.Operation {
func (h History) Operations() []porcupine.Operation {
operations := make([]porcupine.Operation, 0, len(h.successful)+len(h.failed))
var maxTime int64
for _, op := range h.successful {

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package linearizability
package model
import (
"encoding/json"
@ -35,11 +35,33 @@ const (
LeaseRevoke OperationType = "leaseRevoke"
)
func isWrite(t OperationType) bool {
var Etcd = porcupine.Model{
Init: func() interface{} {
return "[]" // empty PossibleStates
},
Step: func(st interface{}, in interface{}, out interface{}) (bool, interface{}) {
var states PossibleStates
err := json.Unmarshal([]byte(st.(string)), &states)
if err != nil {
panic(err)
}
ok, states := step(states, in.(EtcdRequest), out.(EtcdResponse))
data, err := json.Marshal(states)
if err != nil {
panic(err)
}
return ok, string(data)
},
DescribeOperation: func(in, out interface{}) string {
return describeEtcdRequestResponse(in.(EtcdRequest), out.(EtcdResponse))
},
}
func IsWrite(t OperationType) bool {
return t == Put || t == Delete || t == PutWithLease || t == LeaseRevoke || t == LeaseGrant
}
func inUnique(t OperationType) bool {
func IsUnique(t OperationType) bool {
return t == Put || t == PutWithLease
}
@ -92,28 +114,6 @@ type EtcdState struct {
Leases map[int64]EtcdLease
}
var etcdModel = porcupine.Model{
Init: func() interface{} {
return "[]" // empty PossibleStates
},
Step: func(st interface{}, in interface{}, out interface{}) (bool, interface{}) {
var states PossibleStates
err := json.Unmarshal([]byte(st.(string)), &states)
if err != nil {
panic(err)
}
ok, states := step(states, in.(EtcdRequest), out.(EtcdResponse))
data, err := json.Marshal(states)
if err != nil {
panic(err)
}
return ok, string(data)
},
DescribeOperation: func(in, out interface{}) string {
return describeEtcdRequestResponse(in.(EtcdRequest), out.(EtcdResponse))
},
}
func describeEtcdRequestResponse(request EtcdRequest, response EtcdResponse) string {
prefix := describeEtcdOperations(request.Ops)
if len(request.Conds) != 0 {

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package linearizability
package model
import (
"errors"
@ -521,12 +521,12 @@ func TestModelStep(t *testing.T) {
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
state := etcdModel.Init()
state := Etcd.Init()
for _, op := range tc.operations {
ok, newState := etcdModel.Step(state, op.req, op.resp)
ok, newState := Etcd.Step(state, op.req, op.resp)
if ok != !op.failure {
t.Logf("state: %v", state)
t.Errorf("Unexpected operation result, expect: %v, got: %v, operation: %s", !op.failure, ok, etcdModel.DescribeOperation(op.req, op.resp))
t.Errorf("Unexpected operation result, expect: %v, got: %v, operation: %s", !op.failure, ok, Etcd.DescribeOperation(op.req, op.resp))
}
if ok {
state = newState
@ -601,7 +601,7 @@ func TestModelDescribe(t *testing.T) {
},
}
for _, tc := range tcs {
assert.Equal(t, tc.expectDescribe, etcdModel.DescribeOperation(tc.req, tc.resp))
assert.Equal(t, tc.expectDescribe, Etcd.DescribeOperation(tc.req, tc.resp))
}
}

View File

@ -23,16 +23,18 @@ import (
"golang.org/x/time/rate"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/tests/v3/linearizability/identity"
"go.etcd.io/etcd/tests/v3/linearizability/model"
)
var (
DefaultLeaseTTL int64 = 7200
RequestTimeout = 40 * time.Millisecond
DefaultTraffic Traffic = readWriteSingleKey{keyCount: 4, leaseTTL: DefaultLeaseTTL, writes: []opChance{{operation: Put, chance: 50}, {operation: Delete, chance: 10}, {operation: PutWithLease, chance: 10}, {operation: LeaseRevoke, chance: 10}, {operation: Txn, chance: 20}}}
DefaultTraffic Traffic = readWriteSingleKey{keyCount: 4, leaseTTL: DefaultLeaseTTL, writes: []opChance{{operation: model.Put, chance: 50}, {operation: model.Delete, chance: 10}, {operation: model.PutWithLease, chance: 10}, {operation: model.LeaseRevoke, chance: 10}, {operation: model.Txn, chance: 20}}}
)
type Traffic interface {
Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, ids idProvider, lm clientId2LeaseIdMapper)
Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage)
}
type readWriteSingleKey struct {
@ -42,11 +44,11 @@ type readWriteSingleKey struct {
}
type opChance struct {
operation OperationType
operation model.OperationType
chance int
}
func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, ids idProvider, lm clientId2LeaseIdMapper) {
func (t readWriteSingleKey) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage) {
for {
select {
@ -61,7 +63,7 @@ func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter
continue
}
// Provide each write with unique id to make it easier to validate operation history.
t.Write(ctx, c, limiter, key, fmt.Sprintf("%d", ids.RequestId()), lm, c.history.id, resp)
t.Write(ctx, c, limiter, key, fmt.Sprintf("%d", ids.RequestId()), lm, clientId, resp)
}
}
@ -75,22 +77,22 @@ func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limite
return resp, err
}
func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, newValue string, lm clientId2LeaseIdMapper, cid int, lastValues []*mvccpb.KeyValue) error {
func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, newValue string, lm identity.LeaseIdStorage, cid int, lastValues []*mvccpb.KeyValue) error {
writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
var err error
switch t.pickWriteOperation() {
case Put:
case model.Put:
err = c.Put(writeCtx, key, newValue)
case Delete:
case model.Delete:
err = c.Delete(writeCtx, key)
case Txn:
case model.Txn:
var expectValue string
if len(lastValues) != 0 {
expectValue = string(lastValues[0].Value)
}
err = c.Txn(writeCtx, key, expectValue, newValue)
case PutWithLease:
case model.PutWithLease:
leaseId := lm.LeaseId(cid)
if leaseId == 0 {
leaseId, err = c.LeaseGrant(writeCtx, t.leaseTTL)
@ -104,7 +106,7 @@ func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limit
err = c.PutWithLease(putCtx, key, newValue, leaseId)
putCancel()
}
case LeaseRevoke:
case model.LeaseRevoke:
leaseId := lm.LeaseId(cid)
if leaseId != 0 {
err = c.LeaseRevoke(writeCtx, leaseId)
@ -123,7 +125,7 @@ func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limit
return err
}
func (t readWriteSingleKey) pickWriteOperation() OperationType {
func (t readWriteSingleKey) pickWriteOperation() model.OperationType {
sum := 0
for _, op := range t.writes {
sum += op.chance

View File

@ -25,6 +25,7 @@ import (
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/linearizability/model"
)
func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster) [][]watchEvent {
@ -69,17 +70,17 @@ func collectMemberWatchEvents(ctx context.Context, t *testing.T, c *clientv3.Cli
lastRevision = resp.Header.Revision
time := time.Now()
for _, event := range resp.Events {
var op OperationType
var op model.OperationType
switch event.Type {
case mvccpb.PUT:
op = Put
op = model.Put
case mvccpb.DELETE:
op = Delete
op = model.Delete
}
events = append(events, watchEvent{
Time: time,
Revision: event.Kv.ModRevision,
Op: EtcdOperation{
Op: model.EtcdOperation{
Type: op,
Key: string(event.Kv.Key),
Value: string(event.Kv.Value),
@ -94,7 +95,7 @@ func collectMemberWatchEvents(ctx context.Context, t *testing.T, c *clientv3.Cli
}
type watchEvent struct {
Op EtcdOperation
Op model.EtcdOperation
Revision int64
Time time.Time
}