tests: Validate etcd linearizability

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
dependabot/go_modules/go.uber.org/atomic-1.10.0
Marek Siarkowicz 2022-08-29 12:11:34 +02:00
parent e24402d39f
commit 069e26e284
18 changed files with 709 additions and 5 deletions

17
.github/workflows/linearizability.yaml vendored Normal file
View File

@ -0,0 +1,17 @@
name: Linearizability
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: "1.19.1"
- run: |
mkdir -p /tmp/linearizability
EXPECT_DEBUG=true GO_TEST_FLAGS=-v RESULTS_DIR=/tmp/linearizability make test-linearizability
- uses: actions/upload-artifact@v2
if: always()
with:
path: /tmp/linearizability/*

View File

@ -7,25 +7,31 @@ build:
# Tests
GO_TEST_FLAGS?=
.PHONY: test
test:
PASSES="unit integration release e2e" ./scripts/test.sh
PASSES="unit integration release e2e" ./scripts/test.sh $(GO_TEST_FLAGS)
.PHONY: test-unit
test-unit:
PASSES="unit" ./scripts/test.sh
PASSES="unit" ./scripts/test.sh $(GO_TEST_FLAGS)
.PHONY: test-integration
test-integration:
PASSES="integration" ./scripts/test.sh
PASSES="integration" ./scripts/test.sh $(GO_TEST_FLAGS)
.PHONY: test-e2e
test-e2e: build
PASSES="e2e" ./scripts/test.sh
PASSES="e2e" ./scripts/test.sh $(GO_TEST_FLAGS)
.PHONY: test-e2e-release
test-e2e-release: build
PASSES="release e2e" ./scripts/test.sh
PASSES="release e2e" ./scripts/test.sh $(GO_TEST_FLAGS)
.PHONY: test-linearizability
test-linearizability: build
PASSES="linearizability" ./scripts/test.sh $(GO_TEST_FLAGS)
# Static analysis

View File

@ -8,6 +8,15 @@
}
]
},
{
"project": "github.com/anishathalye/porcupine",
"licenses": [
{
"type": "MIT License",
"confidence": 1
}
]
},
{
"project": "github.com/benbjohnson/clock",
"licenses": [

View File

@ -182,6 +182,11 @@ func (ep *ExpectProcess) Signal(sig os.Signal) error {
return ep.cmd.Process.Signal(sig)
}
func (ep *ExpectProcess) Wait() error {
_, err := ep.cmd.Process.Wait()
return err
}
// Close waits for the expect process to exit.
// Close currently does not return error if process exited with !=0 status.
// TODO: Close should expose underlying process failure by default.

View File

@ -124,6 +124,11 @@ function e2e_pass {
run_for_module "tests" go_test "./common/..." "keep_going" : --tags=e2e -timeout="${TIMEOUT:-30m}" "${RUN_ARG[@]}" "$@"
}
function linearizability_pass {
# e2e tests are running pre-build binary. Settings like --race,-cover,-cpu does not have any impact.
run_for_module "tests" go_test "./linearizability/..." "keep_going" : -timeout="${TIMEOUT:-30m}" "${RUN_ARG[@]}" "$@"
}
function integration_e2e_pass {
run_pass "integration" "${@}"
run_pass "e2e" "${@}"

View File

@ -109,6 +109,10 @@ func (c *e2eCluster) Client(cfg clientv3.AuthConfig) (Client, error) {
return e2eClient{etcdctl}, nil
}
func (c *e2eCluster) Endpoints() []string {
return c.EndpointsV3()
}
func (c *e2eCluster) Members() (ms []Member) {
for _, proc := range c.EtcdProcessCluster.Procs {
ms = append(ms, e2eMember{EtcdProcess: proc, Cfg: c.Cfg})

View File

@ -103,6 +103,14 @@ func (p *proxyEtcdProcess) Logs() LogsExpect {
return p.etcdProc.Logs()
}
func (p *proxyEtcdProcess) Kill() error {
return p.etcdProc.Kill()
}
func (p *proxyEtcdProcess) Wait() error {
return p.etcdProc.Wait()
}
type proxyProc struct {
lg *zap.Logger
name string

View File

@ -19,6 +19,7 @@ import (
"fmt"
"net/url"
"os"
"syscall"
"testing"
"time"
@ -38,12 +39,14 @@ type EtcdProcess interface {
EndpointsV3() []string
EndpointsMetrics() []string
Wait() error
Start(ctx context.Context) error
Restart(ctx context.Context) error
Stop() error
Close() error
Config() *EtcdServerProcessConfig
Logs() LogsExpect
Kill() error
}
type LogsExpect interface {
@ -173,6 +176,22 @@ func (ep *EtcdServerProcess) Logs() LogsExpect {
return ep.proc
}
func (ep *EtcdServerProcess) Kill() error {
ep.cfg.lg.Info("killing server...", zap.String("name", ep.cfg.Name))
return ep.proc.Signal(syscall.SIGKILL)
}
func (ep *EtcdServerProcess) Wait() error {
err := ep.proc.Wait()
if err != nil {
ep.cfg.lg.Error("failed to wait for server exit", zap.String("name", ep.cfg.Name))
return err
}
ep.cfg.lg.Info("server exited", zap.String("name", ep.cfg.Name))
ep.proc = nil
return nil
}
func AssertProcessLogs(t *testing.T, ep EtcdProcess, expectLog string) {
t.Helper()
var err error

View File

@ -33,6 +33,7 @@ type Cluster interface {
Client(cfg clientv3.AuthConfig) (Client, error)
WaitLeader(t testing.TB) int
Close() error
Endpoints() []string
}
type Member interface {

View File

@ -15,6 +15,7 @@ replace (
)
require (
github.com/anishathalye/porcupine v0.1.2
github.com/coreos/go-semver v0.3.0
github.com/dustin/go-humanize v1.0.0
github.com/gogo/protobuf v1.3.2

View File

@ -43,6 +43,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/anishathalye/porcupine v0.1.2 h1:eqWNeLcnTzXt6usipDJ4RFn6XOWqY5wEqBYVG3yFLSE=
github.com/anishathalye/porcupine v0.1.2/go.mod h1:/X9OQYnVb7DzfKCQVO4tI1Aq+o56UJW+RvN/5U4EuZA=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=

View File

@ -0,0 +1,87 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package linearizability
import (
"context"
"time"
"github.com/anishathalye/porcupine"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
type recordingClient struct {
client clientv3.Client
id int
operations []porcupine.Operation
}
func NewClient(endpoints []string, id int) (*recordingClient, error) {
cc, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
Logger: zap.NewNop(),
DialKeepAliveTime: 1 * time.Millisecond,
DialKeepAliveTimeout: 5 * time.Millisecond,
})
if err != nil {
return nil, err
}
return &recordingClient{
client: *cc,
id: id,
operations: []porcupine.Operation{},
}, nil
}
func (c *recordingClient) Close() error {
return c.client.Close()
}
func (c *recordingClient) Get(ctx context.Context, key string) error {
callTime := time.Now()
resp, err := c.client.Get(ctx, key)
returnTime := time.Now()
if err != nil {
return err
}
var readData string
if len(resp.Kvs) == 1 {
readData = string(resp.Kvs[0].Value)
}
c.operations = append(c.operations, porcupine.Operation{
ClientId: c.id,
Input: etcdRequest{op: Get, key: key},
Call: callTime.UnixNano(),
Output: etcdResponse{getData: readData},
Return: returnTime.UnixNano(),
})
return nil
}
func (c *recordingClient) Put(ctx context.Context, key, value string) error {
callTime := time.Now()
_, err := c.client.Put(ctx, key, value)
returnTime := time.Now()
c.operations = append(c.operations, porcupine.Operation{
ClientId: c.id,
Input: etcdRequest{op: Put, key: key, putData: value},
Call: callTime.UnixNano(),
Output: etcdResponse{err: err},
Return: returnTime.UnixNano(),
})
return nil
}

View File

@ -0,0 +1,49 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package linearizability
import (
"context"
"math/rand"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)
var (
KillFailpoint Failpoint = killFailpoint{}
)
type Failpoint interface {
Trigger(ctx context.Context, clus *e2e.EtcdProcessCluster) error
}
type killFailpoint struct{}
func (f killFailpoint) Trigger(ctx context.Context, clus *e2e.EtcdProcessCluster) error {
member := clus.Procs[rand.Int()%len(clus.Procs)]
err := member.Kill()
if err != nil {
return err
}
err = member.Wait()
if err != nil {
return err
}
err = member.Start(ctx)
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,183 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package linearizability
import (
"context"
"fmt"
"path/filepath"
"strings"
"sync"
"testing"
"time"
"github.com/anishathalye/porcupine"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"golang.org/x/time/rate"
)
const (
// minimalQPS is used to validate if enough traffic is send to make tests accurate.
minimalQPS = 100.0
// maximalQPS limits number of requests send to etcd to avoid linearizability analysis taking too long.
maximalQPS = 200.0
// failpointTriggersCount
failpointTriggersCount = 60
// waitBetweenFailpointTriggers
waitBetweenFailpointTriggers = time.Second
)
func TestLinearizability(t *testing.T) {
testRunner.BeforeTest(t)
tcs := []struct {
name string
failpoint Failpoint
config e2e.EtcdProcessClusterConfig
}{
{
name: "KillClusterOfSize1",
failpoint: KillFailpoint,
config: e2e.EtcdProcessClusterConfig{
ClusterSize: 1,
},
},
{
name: "KillClusterOfSize3",
failpoint: KillFailpoint,
config: e2e.EtcdProcessClusterConfig{
ClusterSize: 3,
},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
failpoint := FailpointConfig{
failpoint: tc.failpoint,
count: failpointTriggersCount,
waitBetweenTriggers: waitBetweenFailpointTriggers,
}
traffic := trafficConfig{
minimalQPS: minimalQPS,
maximalQPS: maximalQPS,
clientCount: 8,
traffic: PutGetTraffic,
}
testLinearizability(context.Background(), t, tc.config, failpoint, traffic)
})
}
}
func testLinearizability(ctx context.Context, t *testing.T, config e2e.EtcdProcessClusterConfig, failpoint FailpointConfig, traffic trafficConfig) {
clus, err := e2e.NewEtcdProcessCluster(ctx, t, &config)
if err != nil {
t.Fatal(err)
}
defer clus.Close()
ctx, cancel := context.WithCancel(ctx)
go func() {
defer cancel()
err := triggerFailpoints(ctx, clus, failpoint)
if err != nil {
t.Error(err)
}
}()
operations := simulateTraffic(ctx, t, clus, traffic)
clus.Close()
linearizable, info := porcupine.CheckOperationsVerbose(etcdModel, operations, 0)
if linearizable != porcupine.Ok {
t.Error("Model is not linearizable")
}
path, err := filepath.Abs(filepath.Join(resultsDirectory, strings.Replace(t.Name(), "/", "_", -1)+".html"))
if err != nil {
t.Error(err)
}
err = porcupine.VisualizePath(etcdModel, info, path)
if err != nil {
t.Errorf("Failed to visualize, err: %v", err)
}
t.Logf("saving visualization to %q", path)
}
func triggerFailpoints(ctx context.Context, clus *e2e.EtcdProcessCluster, config FailpointConfig) error {
var err error
successes := 0
failures := 0
time.Sleep(config.waitBetweenTriggers)
for successes < config.count && failures < config.count {
err = config.failpoint.Trigger(ctx, clus)
if err != nil {
failures++
continue
}
successes++
time.Sleep(config.waitBetweenTriggers)
}
if successes < config.count || failures >= config.count {
return fmt.Errorf("failed to trigger failpoints enough times, err: %v", err)
}
return nil
}
type FailpointConfig struct {
failpoint Failpoint
count int
waitBetweenTriggers time.Duration
}
func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config trafficConfig) (operations []porcupine.Operation) {
mux := sync.Mutex{}
endpoints := clus.EndpointsV3()
limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200)
startTime := time.Now()
wg := sync.WaitGroup{}
for i := 0; i < config.clientCount; i++ {
wg.Add(1)
endpoints := []string{endpoints[i%len(endpoints)]}
c, err := NewClient(endpoints, i)
if err != nil {
t.Fatal(err)
}
go func(c *recordingClient) {
defer wg.Done()
defer c.Close()
config.traffic.Run(ctx, c, limiter)
mux.Lock()
operations = append(operations, c.operations...)
mux.Unlock()
}(c)
}
wg.Wait()
endTime := time.Now()
t.Logf("Recorded %d operations", len(operations))
qps := float64(len(operations)) / float64(endTime.Sub(startTime)) * float64(time.Second)
t.Logf("Average traffic: %f qps", qps)
if qps < config.minimalQPS {
t.Errorf("Requiring minimal %f qps for test results to be reliable, got %f qps", config.minimalQPS, qps)
}
return operations
}
type trafficConfig struct {
minimalQPS float64
maximalQPS float64
clientCount int
traffic Traffic
}

View File

@ -0,0 +1,41 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package linearizability
import (
"os"
"path/filepath"
"testing"
"go.etcd.io/etcd/tests/v3/framework"
)
var testRunner = framework.E2eTestRunner
var resultsDirectory string
func TestMain(m *testing.M) {
var ok bool
var err error
resultsDirectory, ok = os.LookupEnv("RESULTS_DIR")
if !ok {
resultsDirectory = "/tmp/"
}
resultsDirectory, err = filepath.Abs(resultsDirectory)
if err != nil {
panic(err)
}
testRunner.TestMain(m)
}

View File

@ -0,0 +1,120 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package linearizability
import (
"encoding/json"
"fmt"
"github.com/anishathalye/porcupine"
)
type Operation int8
const Get Operation = 0
const Put Operation = 1
type etcdRequest struct {
op Operation
key string
putData string
}
type etcdResponse struct {
getData string
err error
}
type EtcdState struct {
Key string
Value string
FailedWrites map[string]struct{}
}
var etcdModel = porcupine.Model{
Init: func() interface{} { return "{}" },
Step: func(st interface{}, in interface{}, out interface{}) (bool, interface{}) {
var state EtcdState
err := json.Unmarshal([]byte(st.(string)), &state)
if err != nil {
panic(err)
}
if state.FailedWrites == nil {
state.FailedWrites = map[string]struct{}{}
}
ok, state := step(state, in.(etcdRequest), out.(etcdResponse))
data, err := json.Marshal(state)
if err != nil {
panic(err)
}
return ok, string(data)
},
DescribeOperation: func(in, out interface{}) string {
request := in.(etcdRequest)
response := out.(etcdResponse)
var resp string
switch request.op {
case Get:
if response.err != nil {
resp = response.err.Error()
} else {
resp = response.getData
}
return fmt.Sprintf("get(%q) -> %q", request.key, resp)
case Put:
if response.err != nil {
resp = response.err.Error()
} else {
resp = "ok"
}
return fmt.Sprintf("put(%q, %q) -> %s", request.key, request.putData, resp)
default:
return "<invalid>"
}
},
}
func step(state EtcdState, request etcdRequest, response etcdResponse) (bool, EtcdState) {
if request.key == "" {
panic("Invalid request")
}
if state.Key == "" {
state.Key = request.key
}
if state.Key != request.key {
panic("Multiple keys not supported")
}
switch request.op {
case Get:
if state.Value == response.getData {
return true, state
}
for write := range state.FailedWrites {
if write == response.getData {
state.Value = response.getData
delete(state.FailedWrites, write)
return true, state
}
}
case Put:
if response.err == nil {
state.Value = request.putData
} else {
state.FailedWrites[request.putData] = struct{}{}
}
return true, state
}
return false, state
}

View File

@ -0,0 +1,83 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package linearizability
import (
"errors"
"github.com/anishathalye/porcupine"
"testing"
)
func TestModel(t *testing.T) {
tcs := []struct {
name string
okOperations []porcupine.Operation
failOperation *porcupine.Operation
}{
{
name: "Etcd must return what was written",
okOperations: []porcupine.Operation{
{Input: etcdRequest{op: Put, key: "key", putData: "1"}, Output: etcdResponse{}},
{Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: "1"}},
},
failOperation: &porcupine.Operation{Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: "2"}},
},
{
name: "Etcd can crash after storing result but before returning success to client",
okOperations: []porcupine.Operation{
{Input: etcdRequest{op: Put, key: "key", putData: "1"}, Output: etcdResponse{err: errors.New("failed")}},
{Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: "1"}},
},
},
{
name: "Etcd can crash before storing result",
okOperations: []porcupine.Operation{
{Input: etcdRequest{op: Put, key: "key", putData: "1"}, Output: etcdResponse{err: errors.New("failed")}},
{Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: ""}},
},
},
{
name: "Etcd can continue errored request after it failed",
okOperations: []porcupine.Operation{
{Input: etcdRequest{op: Put, key: "key", putData: "1"}, Output: etcdResponse{err: errors.New("failed")}},
{Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: ""}},
{Input: etcdRequest{op: Put, key: "key"}, Output: etcdResponse{getData: "2"}},
{Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: "1"}},
},
failOperation: &porcupine.Operation{Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: ""}},
},
}
for _, tc := range tcs {
var ok bool
t.Run(tc.name, func(t *testing.T) {
state := etcdModel.Init()
for _, op := range tc.okOperations {
t.Logf("state: %v", state)
ok, state = etcdModel.Step(state, op.Input, op.Output)
if !ok {
t.Errorf("Unexpected failed operation: %s", etcdModel.DescribeOperation(op.Input, op.Output))
}
}
if tc.failOperation != nil {
t.Logf("state: %v", state)
ok, state = etcdModel.Step(state, tc.failOperation.Input, tc.failOperation.Output)
if ok {
t.Errorf("Unexpected succesfull operation: %s", etcdModel.DescribeOperation(tc.failOperation.Input, tc.failOperation.Output))
}
}
})
}
}

View File

@ -0,0 +1,64 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package linearizability
import (
"context"
"fmt"
"time"
"golang.org/x/time/rate"
)
var (
PutGetTraffic Traffic = putGetTraffic{}
)
type Traffic interface {
Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter)
}
type putGetTraffic struct{}
func (t putGetTraffic) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter) {
maxOperationsPerClient := 1000000
id := maxOperationsPerClient * c.id
key := "key"
for i := 0; i < maxOperationsPerClient; {
select {
case <-ctx.Done():
return
default:
}
getCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
err := c.Get(getCtx, key)
cancel()
if err != nil {
continue
}
limiter.Wait(ctx)
putData := fmt.Sprintf("%d", id+i)
putCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
err = c.Put(putCtx, key, putData)
cancel()
if err != nil {
continue
}
limiter.Wait(ctx)
i++
}
return
}