Add support for lease api to linearizability tests

Signed-off-by: Geeta Gharpure <geetagh@amazon.com>
dependabot/go_modules/go.uber.org/atomic-1.10.0
Geeta Gharpure 2022-12-29 16:08:36 -08:00 committed by root
parent a992fb5e92
commit 5b84526e9a
7 changed files with 409 additions and 13 deletions

View File

@ -94,3 +94,32 @@ func (c *recordingClient) Txn(ctx context.Context, key, expectedValue, newValue
c.history.AppendTxn(key, expectedValue, newValue, callTime, returnTime, resp, err)
return err
}
func (c *recordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, error) {
callTime := time.Now()
resp, err := c.client.Lease.Grant(ctx, ttl)
returnTime := time.Now()
c.history.AppendLeaseGrant(callTime, returnTime, resp, err)
var leaseId int64
if resp != nil {
leaseId = int64(resp.ID)
}
return leaseId, err
}
func (c *recordingClient) LeaseRevoke(ctx context.Context, leaseId int64) error {
callTime := time.Now()
resp, err := c.client.Lease.Revoke(ctx, clientv3.LeaseID(leaseId))
returnTime := time.Now()
c.history.AppendLeaseRevoke(leaseId, callTime, returnTime, resp, err)
return err
}
func (c *recordingClient) PutWithLease(ctx context.Context, key string, value string, leaseId int64) error {
callTime := time.Now()
opts := clientv3.WithLease(clientv3.LeaseID(leaseId))
resp, err := c.client.Put(ctx, key, value, opts)
returnTime := time.Now()
c.history.AppendPutWithLease(key, value, int64(leaseId), callTime, returnTime, resp, err)
return err
}

View File

@ -78,6 +78,67 @@ func (h *appendableHistory) AppendPut(key, value string, start, end time.Time, r
})
}
func (h *appendableHistory) AppendPutWithLease(key, value string, leaseID int64, start, end time.Time, resp *clientv3.PutResponse, err error) {
request := putWithLeaseRequest(key, value, leaseID)
if err != nil {
h.appendFailed(request, start, err)
return
}
var revision int64
if resp != nil && resp.Header != nil {
revision = resp.Header.Revision
}
h.successful = append(h.successful, porcupine.Operation{
ClientId: h.id,
Input: request,
Call: start.UnixNano(),
Output: putResponse(revision),
Return: end.UnixNano(),
})
}
func (h *appendableHistory) AppendLeaseGrant(start, end time.Time, resp *clientv3.LeaseGrantResponse, err error) {
var leaseID int64
if resp != nil {
leaseID = int64(resp.ID)
}
request := leaseGrantRequest(leaseID)
if err != nil {
h.appendFailed(request, start, err)
return
}
var revision int64
if resp != nil && resp.ResponseHeader != nil {
revision = resp.ResponseHeader.Revision
}
h.successful = append(h.successful, porcupine.Operation{
ClientId: h.id,
Input: request,
Call: start.UnixNano(),
Output: leaseGrantResponse(revision),
Return: end.UnixNano(),
})
}
func (h *appendableHistory) AppendLeaseRevoke(id int64, start time.Time, end time.Time, resp *clientv3.LeaseRevokeResponse, err error) {
request := leaseRevokeRequest(id)
if err != nil {
h.appendFailed(request, start, err)
return
}
var revision int64
if resp != nil && resp.Header != nil {
revision = resp.Header.Revision
}
h.successful = append(h.successful, porcupine.Operation{
ClientId: h.id,
Input: request,
Call: start.UnixNano(),
Output: leaseRevokeResponse(revision),
Return: end.UnixNano(),
})
}
func (h *appendableHistory) AppendDelete(key string, start, end time.Time, resp *clientv3.DeleteResponse, err error) {
request := deleteRequest(key)
if err != nil {
@ -171,6 +232,26 @@ func txnResponse(succeeded bool, revision int64) EtcdResponse {
return EtcdResponse{Result: result, TxnFailure: !succeeded, Revision: revision}
}
func putWithLeaseRequest(key, value string, leaseID int64) EtcdRequest {
return EtcdRequest{Ops: []EtcdOperation{{Type: PutWithLease, Key: key, Value: value, LeaseID: leaseID}}}
}
func leaseGrantRequest(leaseID int64) EtcdRequest {
return EtcdRequest{Ops: []EtcdOperation{{Type: LeaseGrant, LeaseID: leaseID}}}
}
func leaseGrantResponse(revision int64) EtcdResponse {
return EtcdResponse{Result: []EtcdOperationResult{{}}, Revision: revision}
}
func leaseRevokeRequest(leaseID int64) EtcdRequest {
return EtcdRequest{Ops: []EtcdOperation{{Type: LeaseRevoke, LeaseID: leaseID}}}
}
func leaseRevokeResponse(revision int64) EtcdResponse {
return EtcdResponse{Result: []EtcdOperationResult{{}}, Revision: revision}
}
type history struct {
successful []porcupine.Operation
// failed requests are kept separate as we don't know return time of failed operations.

View File

@ -0,0 +1,53 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package linearizability
import (
"sync"
)
type clientId2LeaseIdMapper interface {
LeaseId(int) int64
AddLeaseId(int, int64)
RemoveLeaseId(int)
}
func newClientId2LeaseIdMapper() clientId2LeaseIdMapper {
return &atomicClientId2LeaseIdMapper{m: map[int]int64{}}
}
type atomicClientId2LeaseIdMapper struct {
sync.RWMutex
// m is used to store clientId to leaseId mapping.
m map[int]int64
}
func (lm *atomicClientId2LeaseIdMapper) LeaseId(clientId int) int64 {
lm.RLock()
defer lm.RUnlock()
return lm.m[clientId]
}
func (lm *atomicClientId2LeaseIdMapper) AddLeaseId(clientId int, leaseId int64) {
lm.Lock()
defer lm.Unlock()
lm.m[clientId] = leaseId
}
func (lm *atomicClientId2LeaseIdMapper) RemoveLeaseId(clientId int) {
lm.Lock()
defer lm.Unlock()
delete(lm.m, clientId)
}

View File

@ -156,6 +156,7 @@ func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessClu
endpoints := clus.EndpointsV3()
ids := newIdProvider()
lm := newClientId2LeaseIdMapper()
h := history{}
limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200)
@ -172,7 +173,7 @@ func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessClu
defer wg.Done()
defer c.Close()
config.traffic.Run(ctx, c, limiter, ids)
config.traffic.Run(ctx, c, limiter, ids, lm)
mux.Lock()
h = h.Merge(c.history.history)
mux.Unlock()

View File

@ -26,10 +26,13 @@ import (
type OperationType string
const (
Get OperationType = "get"
Put OperationType = "put"
Delete OperationType = "delete"
Txn OperationType = "txn"
Get OperationType = "get"
Put OperationType = "put"
Delete OperationType = "delete"
Txn OperationType = "txn"
PutWithLease OperationType = "putWithLease"
LeaseGrant OperationType = "leaseGrant"
LeaseRevoke OperationType = "leaseRevoke"
)
type EtcdRequest struct {
@ -43,9 +46,10 @@ type EtcdCondition struct {
}
type EtcdOperation struct {
Type OperationType
Key string
Value string
Type OperationType
Key string
Value string
LeaseID int64
}
type EtcdResponse struct {
@ -60,11 +64,20 @@ type EtcdOperationResult struct {
Deleted int64
}
var leased = struct{}{}
type EtcdLease struct {
LeaseID int64
Keys map[string]struct{}
}
type PossibleStates []EtcdState
type EtcdState struct {
Revision int64
KeyValues map[string]string
KeyLeases map[string]int64
Leases map[int64]EtcdLease
}
var etcdModel = porcupine.Model{
@ -139,6 +152,12 @@ func describeEtcdOperation(op EtcdOperation) string {
return fmt.Sprintf("delete(%q)", op.Key)
case Txn:
return "<! unsupported: nested transaction !>"
case LeaseGrant:
return fmt.Sprintf("leaseGrant(%d)", op.LeaseID)
case LeaseRevoke:
return fmt.Sprintf("leaseRevoke(%d)", op.LeaseID)
case PutWithLease:
return fmt.Sprintf("putWithLease(%q, %q, %d)", op.Key, op.Value, op.LeaseID)
default:
return fmt.Sprintf("<! unknown op: %q !>", op.Type)
}
@ -157,6 +176,12 @@ func describeEtcdOperationResponse(op OperationType, resp EtcdOperationResult) s
return fmt.Sprintf("deleted: %d", resp.Deleted)
case Txn:
return "<! unsupported: nested transaction !>"
case LeaseGrant:
return fmt.Sprintf("ok")
case LeaseRevoke:
return fmt.Sprintf("ok")
case PutWithLease:
return fmt.Sprintf("ok")
default:
return fmt.Sprintf("<! unknown op: %q !>", op)
}
@ -183,6 +208,8 @@ func initState(request EtcdRequest, response EtcdResponse) EtcdState {
state := EtcdState{
Revision: response.Revision,
KeyValues: map[string]string{},
KeyLeases: map[string]int64{},
Leases: map[int64]EtcdLease{},
}
if response.TxnFailure {
return state
@ -197,6 +224,24 @@ func initState(request EtcdRequest, response EtcdResponse) EtcdState {
case Put:
state.KeyValues[op.Key] = op.Value
case Delete:
case PutWithLease:
if _, ok := state.Leases[op.LeaseID]; ok {
state.KeyValues[op.Key] = op.Value
//detach from old lease id but we dont expect that at init
if _, ok := state.KeyLeases[op.Key]; ok {
panic("old lease id found at init")
}
//attach to new lease id
state.KeyLeases[op.Key] = op.LeaseID
state.Leases[op.LeaseID].Keys[op.Key] = leased
}
case LeaseGrant:
lease := EtcdLease{
LeaseID: op.LeaseID,
Keys: map[string]struct{}{},
}
state.Leases[op.LeaseID] = lease
case LeaseRevoke:
default:
panic("Unknown operation")
}
@ -244,6 +289,7 @@ func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, Etc
s.KeyValues = newKVs
opResp := make([]EtcdOperationResult, len(request.Ops))
increaseRevision := false
for i, op := range request.Ops {
switch op.Type {
case Get:
@ -251,18 +297,68 @@ func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, Etc
case Put:
s.KeyValues[op.Key] = op.Value
increaseRevision = true
s = detachFromOldLease(s, op)
case Delete:
if _, ok := s.KeyValues[op.Key]; ok {
delete(s.KeyValues, op.Key)
increaseRevision = true
s = detachFromOldLease(s, op)
opResp[i].Deleted = 1
}
case PutWithLease:
if _, ok := s.Leases[op.LeaseID]; ok {
//handle put op.
s.KeyValues[op.Key] = op.Value
increaseRevision = true
s = detachFromOldLease(s, op)
s = attachToNewLease(s, op)
}
case LeaseRevoke:
//Delete the keys attached to the lease
keyDeleted := false
for key, _ := range s.Leases[op.LeaseID].Keys {
//same as delete.
if _, ok := s.KeyValues[key]; ok {
if !keyDeleted {
keyDeleted = true
}
delete(s.KeyValues, key)
delete(s.KeyLeases, key)
}
}
//delete the lease
delete(s.Leases, op.LeaseID)
if keyDeleted {
increaseRevision = true
}
case LeaseGrant:
lease := EtcdLease{
LeaseID: op.LeaseID,
Keys: map[string]struct{}{},
}
s.Leases[op.LeaseID] = lease
default:
panic("unsupported operation")
}
}
if increaseRevision {
s.Revision += 1
}
return s, EtcdResponse{Result: opResp, Revision: s.Revision}
}
func detachFromOldLease(s EtcdState, op EtcdOperation) EtcdState {
if oldLeaseId, ok := s.KeyLeases[op.Key]; ok {
delete(s.Leases[oldLeaseId].Keys, op.Key)
delete(s.KeyLeases, op.Key)
}
return s
}
func attachToNewLease(s EtcdState, op EtcdOperation) EtcdState {
s.KeyLeases[op.Key] = op.LeaseID
s.Leases[op.LeaseID].Keys[op.Key] = leased
return s
}

View File

@ -402,6 +402,122 @@ func TestModelStep(t *testing.T) {
{req: txnRequest("key", "8", "10"), resp: txnResponse(false, 9)},
},
},
{
name: "Put with valid lease id should succeed. Put with invalid lease id should fail",
operations: []testOperation{
{req: leaseGrantRequest(1), resp: leaseGrantResponse(1)},
{req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)},
{req: putWithLeaseRequest("key", "3", 2), resp: putResponse(3), failure: true},
{req: getRequest("key"), resp: getResponse("2", 2)},
},
},
{
name: "Put with valid lease id should succeed. Put with expired lease id should fail",
operations: []testOperation{
{req: leaseGrantRequest(1), resp: leaseGrantResponse(1)},
{req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)},
{req: getRequest("key"), resp: getResponse("2", 2)},
{req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)},
{req: putWithLeaseRequest("key", "4", 1), resp: putResponse(4), failure: true},
{req: getRequest("key"), resp: getResponse("", 3)},
},
},
{
name: "Revoke should increment the revision",
operations: []testOperation{
{req: leaseGrantRequest(1), resp: leaseGrantResponse(1)},
{req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)},
{req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)},
{req: getRequest("key"), resp: getResponse("", 3)},
},
},
{
name: "Put following a PutWithLease will detach the key from the lease",
operations: []testOperation{
{req: leaseGrantRequest(1), resp: leaseGrantResponse(1)},
{req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)},
{req: putRequest("key", "3"), resp: putResponse(3)},
{req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)},
{req: getRequest("key"), resp: getResponse("3", 3)},
},
},
{
name: "Change lease. Revoking older lease should not increment revision",
operations: []testOperation{
{req: leaseGrantRequest(1), resp: leaseGrantResponse(1)},
{req: leaseGrantRequest(2), resp: leaseGrantResponse(1)},
{req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)},
{req: putWithLeaseRequest("key", "3", 2), resp: putResponse(3)},
{req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)},
{req: getRequest("key"), resp: getResponse("3", 3)},
{req: leaseRevokeRequest(2), resp: leaseRevokeResponse(4)},
{req: getRequest("key"), resp: getResponse("", 4)},
},
},
{
name: "Update key with same lease",
operations: []testOperation{
{req: leaseGrantRequest(1), resp: leaseGrantResponse(1)},
{req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)},
{req: putWithLeaseRequest("key", "3", 1), resp: putResponse(3)},
{req: getRequest("key"), resp: getResponse("3", 3)},
},
},
{
name: "Deleting a leased key - revoke should not increment revision",
operations: []testOperation{
{req: leaseGrantRequest(1), resp: leaseGrantResponse(1)},
{req: putWithLeaseRequest("key", "2", 1), resp: putResponse(2)},
{req: deleteRequest("key"), resp: deleteResponse(1, 3)},
{req: leaseRevokeRequest(1), resp: leaseRevokeResponse(4), failure: true},
{req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)},
},
},
{
name: "Lease a few keys - revoke should increment revision only once",
operations: []testOperation{
{req: leaseGrantRequest(1), resp: leaseGrantResponse(1)},
{req: putWithLeaseRequest("key1", "1", 1), resp: putResponse(2)},
{req: putWithLeaseRequest("key2", "2", 1), resp: putResponse(3)},
{req: putWithLeaseRequest("key3", "3", 1), resp: putResponse(4)},
{req: putWithLeaseRequest("key4", "4", 1), resp: putResponse(5)},
{req: leaseRevokeRequest(1), resp: leaseRevokeResponse(6)},
},
},
{
name: "Lease some keys then delete some of them. Revoke should increment revision since some keys were still leased",
operations: []testOperation{
{req: leaseGrantRequest(1), resp: leaseGrantResponse(1)},
{req: putWithLeaseRequest("key1", "1", 1), resp: putResponse(2)},
{req: putWithLeaseRequest("key2", "2", 1), resp: putResponse(3)},
{req: putWithLeaseRequest("key3", "3", 1), resp: putResponse(4)},
{req: putWithLeaseRequest("key4", "4", 1), resp: putResponse(5)},
{req: deleteRequest("key1"), resp: deleteResponse(1, 6)},
{req: deleteRequest("key3"), resp: deleteResponse(1, 7)},
{req: deleteRequest("key4"), resp: deleteResponse(1, 8)},
{req: leaseRevokeRequest(1), resp: leaseRevokeResponse(9)},
{req: deleteRequest("key2"), resp: deleteResponse(0, 9)},
{req: getRequest("key1"), resp: getResponse("", 9)},
{req: getRequest("key2"), resp: getResponse("", 9)},
{req: getRequest("key3"), resp: getResponse("", 9)},
{req: getRequest("key4"), resp: getResponse("", 9)},
},
},
{
name: "Lease some keys then delete all of them. Revoke should not increment",
operations: []testOperation{
{req: leaseGrantRequest(1), resp: leaseGrantResponse(1)},
{req: putWithLeaseRequest("key1", "1", 1), resp: putResponse(2)},
{req: putWithLeaseRequest("key2", "2", 1), resp: putResponse(3)},
{req: putWithLeaseRequest("key3", "3", 1), resp: putResponse(4)},
{req: putWithLeaseRequest("key4", "4", 1), resp: putResponse(5)},
{req: deleteRequest("key1"), resp: deleteResponse(1, 6)},
{req: deleteRequest("key2"), resp: deleteResponse(1, 7)},
{req: deleteRequest("key3"), resp: deleteResponse(1, 8)},
{req: deleteRequest("key4"), resp: deleteResponse(1, 9)},
{req: leaseRevokeRequest(1), resp: leaseRevokeResponse(9)},
},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {

View File

@ -26,16 +26,18 @@ import (
)
var (
DefaultTraffic Traffic = readWriteSingleKey{keyCount: 4, writes: []opChance{{operation: Put, chance: 60}, {operation: Delete, chance: 20}, {operation: Txn, chance: 20}}}
DefaultLeaseTTL int64 = 7200
DefaultTraffic Traffic = readWriteSingleKey{keyCount: 4, leaseTTL: DefaultLeaseTTL, writes: []opChance{{operation: Put, chance: 50}, {operation: Delete, chance: 10}, {operation: PutWithLease, chance: 10}, {operation: LeaseRevoke, chance: 10}, {operation: Txn, chance: 20}}}
)
type Traffic interface {
Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, ids idProvider)
Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, ids idProvider, lm clientId2LeaseIdMapper)
}
type readWriteSingleKey struct {
keyCount int
writes []opChance
leaseTTL int64
}
type opChance struct {
@ -43,7 +45,7 @@ type opChance struct {
chance int
}
func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, ids idProvider) {
func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, ids idProvider, lm clientId2LeaseIdMapper) {
for {
select {
@ -58,7 +60,7 @@ func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter
continue
}
// Provide each write with unique id to make it easier to validate operation history.
t.Write(ctx, c, limiter, key, fmt.Sprintf("%d", ids.RequestId()), resp)
t.Write(ctx, c, limiter, key, fmt.Sprintf("%d", ids.RequestId()), lm, c.history.id, resp)
}
}
@ -72,7 +74,7 @@ func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limite
return resp, err
}
func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, newValue string, lastValues []*mvccpb.KeyValue) error {
func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, newValue string, lm clientId2LeaseIdMapper, cid int, lastValues []*mvccpb.KeyValue) error {
putCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
var err error
@ -87,6 +89,24 @@ func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limit
expectValue = string(lastValues[0].Value)
}
err = c.Txn(putCtx, key, expectValue, newValue)
case PutWithLease:
leaseId := lm.LeaseId(cid)
if leaseId == 0 {
leaseId, err = c.LeaseGrant(ctx, t.leaseTTL)
lm.AddLeaseId(cid, leaseId)
}
if leaseId != 0 {
err = c.PutWithLease(putCtx, key, newValue, leaseId)
}
case LeaseRevoke:
leaseId := lm.LeaseId(cid)
if leaseId != 0 {
err = c.LeaseRevoke(putCtx, leaseId)
//if LeaseRevoke has failed, do not remove the mapping.
if err == nil {
lm.RemoveLeaseId(cid)
}
}
default:
panic("invalid operation")
}