tests: Add defragment request to linearizability tests
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>dependabot/go_modules/go.uber.org/atomic-1.10.0
parent
63902fafbd
commit
c50e602458
|
@ -125,3 +125,11 @@ func (c *recordingClient) PutWithLease(ctx context.Context, key string, value st
|
|||
c.history.AppendPutWithLease(key, value, int64(leaseId), callTime, returnTime, resp, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *recordingClient) Defragment(ctx context.Context) error {
|
||||
callTime := time.Now()
|
||||
resp, err := c.client.Defragment(ctx, c.client.Endpoints()[0])
|
||||
returnTime := time.Now()
|
||||
c.history.AppendDefragment(callTime, returnTime, resp, err)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -42,8 +42,8 @@ const (
|
|||
)
|
||||
|
||||
var (
|
||||
LowTrafficAllRequests = trafficConfig{
|
||||
name: "LowTrafficAllRequests",
|
||||
LowTraffic = trafficConfig{
|
||||
name: "LowTraffic",
|
||||
minimalQPS: 100,
|
||||
maximalQPS: 200,
|
||||
clientCount: 8,
|
||||
|
@ -52,19 +52,23 @@ var (
|
|||
{operation: Delete, chance: 10},
|
||||
{operation: PutWithLease, chance: 10},
|
||||
{operation: LeaseRevoke, chance: 10},
|
||||
{operation: CompareAndSet, chance: 20},
|
||||
{operation: CompareAndSet, chance: 10},
|
||||
{operation: Defragment, chance: 10},
|
||||
}},
|
||||
}
|
||||
HighTrafficPut = trafficConfig{
|
||||
name: "HighTrafficPut",
|
||||
HighTraffic = trafficConfig{
|
||||
name: "HighTraffic",
|
||||
minimalQPS: 200,
|
||||
maximalQPS: 1000,
|
||||
clientCount: 12,
|
||||
traffic: readWriteSingleKey{keyCount: 4, leaseTTL: DefaultLeaseTTL, writes: []requestChance{{operation: Put, chance: 100}}},
|
||||
traffic: readWriteSingleKey{keyCount: 4, leaseTTL: DefaultLeaseTTL, writes: []requestChance{
|
||||
{operation: Put, chance: 90},
|
||||
{operation: Defragment, chance: 10},
|
||||
}},
|
||||
}
|
||||
defaultTraffic = LowTrafficAllRequests
|
||||
defaultTraffic = LowTraffic
|
||||
trafficList = []trafficConfig{
|
||||
LowTrafficAllRequests, HighTrafficPut,
|
||||
LowTraffic, HighTraffic,
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -122,7 +126,7 @@ func TestLinearizability(t *testing.T) {
|
|||
{
|
||||
name: "Issue13766",
|
||||
failpoint: KillFailpoint,
|
||||
traffic: &HighTrafficPut,
|
||||
traffic: &HighTraffic,
|
||||
config: *e2e.NewConfig(
|
||||
e2e.WithSnapshotCount(100),
|
||||
),
|
||||
|
|
|
@ -180,6 +180,21 @@ func (h *AppendableHistory) AppendTxn(key, expectValue, newValue string, start,
|
|||
})
|
||||
}
|
||||
|
||||
func (h *AppendableHistory) AppendDefragment(start, end time.Time, resp *clientv3.DefragmentResponse, err error) {
|
||||
request := defragmentRequest()
|
||||
if err != nil {
|
||||
h.appendFailed(request, start, err)
|
||||
return
|
||||
}
|
||||
h.successful = append(h.successful, porcupine.Operation{
|
||||
ClientId: h.id,
|
||||
Input: request,
|
||||
Call: start.UnixNano(),
|
||||
Output: defragmentResponse(),
|
||||
Return: end.UnixNano(),
|
||||
})
|
||||
}
|
||||
|
||||
func (h *AppendableHistory) appendFailed(request EtcdRequest, start time.Time, err error) {
|
||||
h.failed = append(h.failed, porcupine.Operation{
|
||||
ClientId: h.id,
|
||||
|
@ -257,6 +272,14 @@ func leaseRevokeResponse(revision int64) EtcdResponse {
|
|||
return EtcdResponse{LeaseRevoke: &LeaseRevokeResponse{}, Revision: revision}
|
||||
}
|
||||
|
||||
func defragmentRequest() EtcdRequest {
|
||||
return EtcdRequest{Type: Defragment, Defragment: &DefragmentRequest{}}
|
||||
}
|
||||
|
||||
func defragmentResponse() EtcdResponse {
|
||||
return EtcdResponse{Defragment: &DefragmentResponse{}}
|
||||
}
|
||||
|
||||
type History struct {
|
||||
successful []porcupine.Operation
|
||||
// failed requests are kept separate as we don't know return time of failed operations.
|
||||
|
|
|
@ -58,6 +58,7 @@ const (
|
|||
Txn RequestType = "txn"
|
||||
LeaseGrant RequestType = "leaseGrant"
|
||||
LeaseRevoke RequestType = "leaseRevoke"
|
||||
Defragment RequestType = "defragment"
|
||||
)
|
||||
|
||||
type EtcdRequest struct {
|
||||
|
@ -65,6 +66,7 @@ type EtcdRequest struct {
|
|||
LeaseGrant *LeaseGrantRequest
|
||||
LeaseRevoke *LeaseRevokeRequest
|
||||
Txn *TxnRequest
|
||||
Defragment *DefragmentRequest
|
||||
}
|
||||
|
||||
type TxnRequest struct {
|
||||
|
@ -90,6 +92,7 @@ type LeaseGrantRequest struct {
|
|||
type LeaseRevokeRequest struct {
|
||||
LeaseID int64
|
||||
}
|
||||
type DefragmentRequest struct{}
|
||||
|
||||
type EtcdResponse struct {
|
||||
Err error
|
||||
|
@ -98,6 +101,7 @@ type EtcdResponse struct {
|
|||
Txn *TxnResponse
|
||||
LeaseGrant *LeaseGrantReponse
|
||||
LeaseRevoke *LeaseRevokeResponse
|
||||
Defragment *DefragmentResponse
|
||||
}
|
||||
|
||||
type TxnResponse struct {
|
||||
|
@ -109,6 +113,7 @@ type LeaseGrantReponse struct {
|
|||
LeaseID int64
|
||||
}
|
||||
type LeaseRevokeResponse struct{}
|
||||
type DefragmentResponse struct{}
|
||||
|
||||
func Match(r1, r2 EtcdResponse) bool {
|
||||
return ((r1.ResultUnknown || r2.ResultUnknown) && (r1.Revision == r2.Revision)) || reflect.DeepEqual(r1, r2)
|
||||
|
@ -147,9 +152,11 @@ func describeEtcdResponse(request EtcdRequest, response EtcdResponse) string {
|
|||
}
|
||||
if request.Type == Txn {
|
||||
return fmt.Sprintf("%s, rev: %d", describeTxnResponse(request.Txn, response.Txn), response.Revision)
|
||||
} else {
|
||||
return fmt.Sprintf("ok, rev: %d", response.Revision)
|
||||
}
|
||||
if response.Revision == 0 {
|
||||
return "ok"
|
||||
}
|
||||
return fmt.Sprintf("ok, rev: %d", response.Revision)
|
||||
}
|
||||
|
||||
func describeEtcdRequest(request EtcdRequest) string {
|
||||
|
@ -164,6 +171,8 @@ func describeEtcdRequest(request EtcdRequest) string {
|
|||
return fmt.Sprintf("leaseGrant(%d)", request.LeaseGrant.LeaseID)
|
||||
case LeaseRevoke:
|
||||
return fmt.Sprintf("leaseRevoke(%d)", request.LeaseRevoke.LeaseID)
|
||||
case Defragment:
|
||||
return fmt.Sprintf("defragment()")
|
||||
default:
|
||||
return fmt.Sprintf("<! unknown request type: %q !>", request.Type)
|
||||
}
|
||||
|
@ -231,7 +240,7 @@ func describeEtcdOperationResponse(op OperationType, resp EtcdOperationResult) s
|
|||
func step(states PossibleStates, request EtcdRequest, response EtcdResponse) (bool, PossibleStates) {
|
||||
if len(states) == 0 {
|
||||
// states were not initialized
|
||||
if response.Err != nil || response.ResultUnknown {
|
||||
if response.Err != nil || response.ResultUnknown || response.Revision == 0 {
|
||||
return true, nil
|
||||
}
|
||||
return true, PossibleStates{initState(request, response)}
|
||||
|
@ -278,6 +287,7 @@ func initState(request EtcdRequest, response EtcdResponse) EtcdState {
|
|||
}
|
||||
state.Leases[request.LeaseGrant.LeaseID] = lease
|
||||
case LeaseRevoke:
|
||||
case Defragment:
|
||||
default:
|
||||
panic(fmt.Sprintf("Unknown request type: %v", request.Type))
|
||||
}
|
||||
|
@ -288,7 +298,9 @@ func initState(request EtcdRequest, response EtcdResponse) EtcdState {
|
|||
func applyFailedRequest(states PossibleStates, request EtcdRequest) PossibleStates {
|
||||
for _, s := range states {
|
||||
newState, _ := applyRequestToSingleState(s, request)
|
||||
states = append(states, newState)
|
||||
if !reflect.DeepEqual(newState, s) {
|
||||
states = append(states, newState)
|
||||
}
|
||||
}
|
||||
return states
|
||||
}
|
||||
|
@ -382,6 +394,8 @@ func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, Etc
|
|||
s.Revision += 1
|
||||
}
|
||||
return s, EtcdResponse{Revision: s.Revision, LeaseRevoke: &LeaseRevokeResponse{}}
|
||||
case Defragment:
|
||||
return s, defragmentResponse()
|
||||
default:
|
||||
panic(fmt.Sprintf("Unknown request type: %v", request.Type))
|
||||
}
|
||||
|
|
|
@ -518,6 +518,59 @@ func TestModelStep(t *testing.T) {
|
|||
{req: leaseRevokeRequest(1), resp: leaseRevokeResponse(9)},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "All request types",
|
||||
operations: []testOperation{
|
||||
{req: leaseGrantRequest(1), resp: leaseGrantResponse(1)},
|
||||
{req: putWithLeaseRequest("key", "1", 1), resp: putResponse(2)},
|
||||
{req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)},
|
||||
{req: putRequest("key", "4"), resp: putResponse(4)},
|
||||
{req: getRequest("key"), resp: getResponse("4", 4)},
|
||||
{req: txnRequest("key", "4", "5"), resp: txnResponse(true, 5)},
|
||||
{req: deleteRequest("key"), resp: deleteResponse(1, 6)},
|
||||
{req: defragmentRequest(), resp: defragmentResponse()},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Defragment success between all other request types",
|
||||
operations: []testOperation{
|
||||
{req: defragmentRequest(), resp: defragmentResponse()},
|
||||
{req: leaseGrantRequest(1), resp: leaseGrantResponse(1)},
|
||||
{req: defragmentRequest(), resp: defragmentResponse()},
|
||||
{req: putWithLeaseRequest("key", "1", 1), resp: putResponse(2)},
|
||||
{req: defragmentRequest(), resp: defragmentResponse()},
|
||||
{req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)},
|
||||
{req: defragmentRequest(), resp: defragmentResponse()},
|
||||
{req: putRequest("key", "4"), resp: putResponse(4)},
|
||||
{req: defragmentRequest(), resp: defragmentResponse()},
|
||||
{req: getRequest("key"), resp: getResponse("4", 4)},
|
||||
{req: defragmentRequest(), resp: defragmentResponse()},
|
||||
{req: txnRequest("key", "4", "5"), resp: txnResponse(true, 5)},
|
||||
{req: defragmentRequest(), resp: defragmentResponse()},
|
||||
{req: deleteRequest("key"), resp: deleteResponse(1, 6)},
|
||||
{req: defragmentRequest(), resp: defragmentResponse()},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Defragment failures between all other request types",
|
||||
operations: []testOperation{
|
||||
{req: defragmentRequest(), resp: failedResponse(errors.New("failed"))},
|
||||
{req: leaseGrantRequest(1), resp: leaseGrantResponse(1)},
|
||||
{req: defragmentRequest(), resp: failedResponse(errors.New("failed"))},
|
||||
{req: putWithLeaseRequest("key", "1", 1), resp: putResponse(2)},
|
||||
{req: defragmentRequest(), resp: failedResponse(errors.New("failed"))},
|
||||
{req: leaseRevokeRequest(1), resp: leaseRevokeResponse(3)},
|
||||
{req: defragmentRequest(), resp: failedResponse(errors.New("failed"))},
|
||||
{req: putRequest("key", "4"), resp: putResponse(4)},
|
||||
{req: defragmentRequest(), resp: failedResponse(errors.New("failed"))},
|
||||
{req: getRequest("key"), resp: getResponse("4", 4)},
|
||||
{req: defragmentRequest(), resp: failedResponse(errors.New("failed"))},
|
||||
{req: txnRequest("key", "4", "5"), resp: txnResponse(true, 5)},
|
||||
{req: defragmentRequest(), resp: failedResponse(errors.New("failed"))},
|
||||
{req: deleteRequest("key"), resp: deleteResponse(1, 6)},
|
||||
{req: defragmentRequest(), resp: failedResponse(errors.New("failed"))},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
|
@ -604,6 +657,11 @@ func TestModelDescribe(t *testing.T) {
|
|||
resp: failedResponse(errors.New("failed")),
|
||||
expectDescribe: `if(key9=="9").then(put("key9", "99", nil)) -> err: "failed"`,
|
||||
},
|
||||
{
|
||||
req: defragmentRequest(),
|
||||
resp: defragmentResponse(),
|
||||
expectDescribe: `defragment() -> ok`,
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
assert.Equal(t, tc.expectDescribe, Etcd.DescribeOperation(tc.req, tc.resp))
|
||||
|
|
|
@ -40,6 +40,7 @@ const (
|
|||
PutWithLease TrafficRequestType = "putWithLease"
|
||||
LeaseRevoke TrafficRequestType = "leaseRevoke"
|
||||
CompareAndSet TrafficRequestType = "compareAndSet"
|
||||
Defragment TrafficRequestType = "defragment"
|
||||
)
|
||||
|
||||
type Traffic interface {
|
||||
|
@ -124,6 +125,8 @@ func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limit
|
|||
lm.RemoveLeaseId(cid)
|
||||
}
|
||||
}
|
||||
case Defragment:
|
||||
err = c.Defragment(writeCtx)
|
||||
default:
|
||||
panic("invalid operation")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue