diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index d83a86dba..f2b69b238 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -39,6 +39,7 @@ func TestKVPut(t *testing.T) { defer lapi.Close() kv := clientv3.NewKV(clus.RandClient()) + ctx := context.TODO() resp, err := lapi.Create(context.Background(), 10) if err != nil { @@ -54,10 +55,10 @@ func TestKVPut(t *testing.T) { } for i, tt := range tests { - if _, err := kv.Put(tt.key, tt.val, tt.leaseID); err != nil { + if _, err := kv.Put(ctx, tt.key, tt.val, tt.leaseID); err != nil { t.Fatalf("#%d: couldn't put %q (%v)", i, tt.key, err) } - resp, err := kv.Get(tt.key) + resp, err := kv.Get(ctx, tt.key) if err != nil { t.Fatalf("#%d: couldn't get key (%v)", i, err) } @@ -80,14 +81,15 @@ func TestKVRange(t *testing.T) { defer clus.Terminate(t) kv := clientv3.NewKV(clus.RandClient()) + ctx := context.TODO() keySet := []string{"a", "b", "c", "c", "c", "foo", "foo/abc", "fop"} for i, key := range keySet { - if _, err := kv.Put(key, "", lease.NoLease); err != nil { + if _, err := kv.Put(ctx, key, "", lease.NoLease); err != nil { t.Fatalf("#%d: couldn't put %q (%v)", i, key, err) } } - resp, err := kv.Get(keySet[0]) + resp, err := kv.Get(ctx, keySet[0]) if err != nil { t.Fatalf("couldn't get key (%v)", err) } @@ -173,7 +175,7 @@ func TestKVRange(t *testing.T) { if tt.sortOption != nil { opts = append(opts, clientv3.WithSort(tt.sortOption.Target, tt.sortOption.Order)) } - resp, err := kv.Get(tt.begin, opts...) + resp, err := kv.Get(ctx, tt.begin, opts...) if err != nil { t.Fatalf("#%d: couldn't range (%v)", i, err) } @@ -193,10 +195,11 @@ func TestKVDeleteRange(t *testing.T) { defer clus.Terminate(t) kv := clientv3.NewKV(clus.RandClient()) + ctx := context.TODO() keySet := []string{"a", "b", "c", "c", "c", "d", "e", "f"} for i, key := range keySet { - if _, err := kv.Put(key, "", lease.NoLease); err != nil { + if _, err := kv.Put(ctx, key, "", lease.NoLease); err != nil { t.Fatalf("#%d: couldn't put %q (%v)", i, key, err) } } @@ -210,14 +213,14 @@ func TestKVDeleteRange(t *testing.T) { } for i, tt := range tests { - dresp, err := kv.DeleteRange(tt.key, tt.end) + dresp, err := kv.DeleteRange(ctx, tt.key, tt.end) if err != nil { t.Fatalf("#%d: couldn't delete range (%v)", i, err) } if dresp.Header.Revision != tt.delRev { t.Fatalf("#%d: dresp.Header.Revision got %d, want %d", i, dresp.Header.Revision, tt.delRev) } - resp, err := kv.Get(tt.key, clientv3.WithRange(tt.end)) + resp, err := kv.Get(ctx, tt.key, clientv3.WithRange(tt.end)) if err != nil { t.Fatalf("#%d: couldn't get key (%v)", i, err) } @@ -234,22 +237,23 @@ func TestKVDelete(t *testing.T) { defer clus.Terminate(t) kv := clientv3.NewKV(clus.RandClient()) + ctx := context.TODO() - presp, err := kv.Put("foo", "", lease.NoLease) + presp, err := kv.Put(ctx, "foo", "", lease.NoLease) if err != nil { t.Fatalf("couldn't put 'foo' (%v)", err) } if presp.Header.Revision != 2 { t.Fatalf("presp.Header.Revision got %d, want %d", presp.Header.Revision, 2) } - resp, err := kv.Delete("foo") + resp, err := kv.Delete(ctx, "foo") if err != nil { t.Fatalf("couldn't delete key (%v)", err) } if resp.Header.Revision != 3 { t.Fatalf("resp.Header.Revision got %d, want %d", resp.Header.Revision, 3) } - gresp, err := kv.Get("foo") + gresp, err := kv.Get(ctx, "foo") if err != nil { t.Fatalf("couldn't get key (%v)", err) } @@ -265,32 +269,33 @@ func TestKVCompact(t *testing.T) { defer clus.Terminate(t) kv := clientv3.NewKV(clus.RandClient()) + ctx := context.TODO() for i := 0; i < 10; i++ { - if _, err := kv.Put("foo", "bar", lease.NoLease); err != nil { + if _, err := kv.Put(ctx, "foo", "bar", lease.NoLease); err != nil { t.Fatalf("couldn't put 'foo' (%v)", err) } } - err := kv.Compact(7) + err := kv.Compact(ctx, 7) if err != nil { t.Fatalf("couldn't compact kv space (%v)", err) } - err = kv.Compact(7) + err = kv.Compact(ctx, 7) if err == nil || err != v3rpc.ErrCompacted { t.Fatalf("error got %v, want %v", err, v3rpc.ErrFutureRev) } wc := clientv3.NewWatcher(clus.RandClient()) defer wc.Close() - wchan := wc.Watch(context.TODO(), "foo", 3) + wchan := wc.Watch(ctx, "foo", 3) _, ok := <-wchan if ok { t.Fatalf("wchan ok got %v, want false", ok) } - err = kv.Compact(1000) + err = kv.Compact(ctx, 1000) if err == nil || err != v3rpc.ErrFutureRev { t.Fatalf("error got %v, want %v", err, v3rpc.ErrFutureRev) } @@ -304,8 +309,9 @@ func TestKVGetRetry(t *testing.T) { defer clus.Terminate(t) kv := clientv3.NewKV(clus.Client(0)) + ctx := context.TODO() - if _, err := kv.Put("foo", "bar", 0); err != nil { + if _, err := kv.Put(ctx, "foo", "bar", 0); err != nil { t.Fatal(err) } @@ -315,7 +321,7 @@ func TestKVGetRetry(t *testing.T) { donec := make(chan struct{}) go func() { // Get will fail, but reconnect will trigger - gresp, gerr := kv.Get("foo") + gresp, gerr := kv.Get(ctx, "foo") if gerr != nil { t.Fatal(gerr) } @@ -352,10 +358,12 @@ func TestKVPutFailGetRetry(t *testing.T) { defer clus.Terminate(t) kv := clientv3.NewKV(clus.Client(0)) + ctx := context.TODO() + clus.Members[0].Stop(t) <-clus.Members[0].StopNotify() - _, err := kv.Put("foo", "bar", 0) + _, err := kv.Put(ctx, "foo", "bar", 0) if err == nil { t.Fatalf("got success on disconnected put, wanted error") } @@ -363,7 +371,7 @@ func TestKVPutFailGetRetry(t *testing.T) { donec := make(chan struct{}) go func() { // Get will fail, but reconnect will trigger - gresp, gerr := kv.Get("foo") + gresp, gerr := kv.Get(ctx, "foo") if gerr != nil { t.Fatal(gerr) } diff --git a/clientv3/integration/lease_test.go b/clientv3/integration/lease_test.go index 89b6eb7e9..fdbdfe470 100644 --- a/clientv3/integration/lease_test.go +++ b/clientv3/integration/lease_test.go @@ -42,7 +42,7 @@ func TestLeaseCreate(t *testing.T) { t.Errorf("failed to create lease %v", err) } - _, err = kv.Put("foo", "bar", lease.LeaseID(resp.ID)) + _, err = kv.Put(context.TODO(), "foo", "bar", lease.LeaseID(resp.ID)) if err != nil { t.Fatalf("failed to create key with lease %v", err) } @@ -69,7 +69,7 @@ func TestLeaseRevoke(t *testing.T) { t.Errorf("failed to revoke lease %v", err) } - _, err = kv.Put("foo", "bar", lease.LeaseID(resp.ID)) + _, err = kv.Put(context.TODO(), "foo", "bar", lease.LeaseID(resp.ID)) if err != v3rpc.ErrLeaseNotFound { t.Fatalf("err = %v, want %v", err, v3rpc.ErrLeaseNotFound) } diff --git a/clientv3/integration/txn_test.go b/clientv3/integration/txn_test.go index 2c4f87524..6cf0f8e48 100644 --- a/clientv3/integration/txn_test.go +++ b/clientv3/integration/txn_test.go @@ -18,6 +18,7 @@ import ( "testing" "time" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/integration" "github.com/coreos/etcd/pkg/testutil" @@ -30,12 +31,14 @@ func TestTxnWriteFail(t *testing.T) { defer clus.Terminate(t) kv := clientv3.NewKV(clus.Client(0)) + ctx := context.TODO() + clus.Members[0].Stop(t) <-clus.Members[0].StopNotify() donec := make(chan struct{}) go func() { - resp, err := kv.Txn().Then(clientv3.OpPut("foo", "bar", 0)).Commit() + resp, err := kv.Txn(ctx).Then(clientv3.OpPut("foo", "bar", 0)).Commit() if err == nil { t.Fatalf("expected error, got response %v", resp) } @@ -57,7 +60,7 @@ func TestTxnWriteFail(t *testing.T) { donec <- struct{}{} // and ensure the put didn't take - gresp, gerr := kv.Get("foo") + gresp, gerr := kv.Get(ctx, "foo") if gerr != nil { t.Fatal(gerr) } @@ -92,7 +95,8 @@ func TestTxnReadRetry(t *testing.T) { donec := make(chan struct{}) go func() { - _, err := kv.Txn().Then(clientv3.OpGet("foo")).Commit() + ctx := context.TODO() + _, err := kv.Txn(ctx).Then(clientv3.OpGet("foo")).Commit() if err != nil { t.Fatalf("expected response, got error %v", err) } @@ -117,12 +121,14 @@ func TestTxnSuccess(t *testing.T) { defer clus.Terminate(t) kv := clientv3.NewKV(clus.Client(0)) - _, err := kv.Txn().Then(clientv3.OpPut("foo", "bar", 0)).Commit() + ctx := context.TODO() + + _, err := kv.Txn(ctx).Then(clientv3.OpPut("foo", "bar", 0)).Commit() if err != nil { t.Fatal(err) } - resp, err := kv.Get("foo") + resp, err := kv.Get(ctx, "foo") if err != nil { t.Fatal(err) } diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index 93e298786..e343214b0 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -142,10 +142,11 @@ func testWatchMultiWatcher(t *testing.T, wctx *watchctx) { <-readyc } // generate events + ctx := context.TODO() for i := 0; i < numKeyUpdates; i++ { for _, k := range keys { v := fmt.Sprintf("%s-%d", k, i) - if _, err := wctx.kv.Put(k, v, 0); err != nil { + if _, err := wctx.kv.Put(ctx, k, v, 0); err != nil { t.Fatal(err) } } @@ -220,7 +221,7 @@ func testWatchCancelRunning(t *testing.T, wctx *watchctx) { if wctx.ch = wctx.w.Watch(ctx, "a", 0); wctx.ch == nil { t.Fatalf("expected non-nil watcher channel") } - if _, err := wctx.kv.Put("a", "a", 0); err != nil { + if _, err := wctx.kv.Put(ctx, "a", "a", 0); err != nil { t.Fatal(err) } cancel() @@ -245,7 +246,7 @@ func testWatchCancelRunning(t *testing.T, wctx *watchctx) { } func putAndWatch(t *testing.T, wctx *watchctx, key, val string) { - if _, err := wctx.kv.Put(key, val, 0); err != nil { + if _, err := wctx.kv.Put(context.TODO(), key, val, 0); err != nil { t.Fatal(err) } select { diff --git a/clientv3/kv.go b/clientv3/kv.go index 383c392ec..aa60a4209 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -36,7 +36,7 @@ type KV interface { // Note that key,value can be plain bytes array and string is // an immutable representation of that bytes array. // To get a string of bytes, do string([]byte(0x10, 0x20)). - Put(key, val string, leaseID lease.LeaseID) (*PutResponse, error) + Put(ctx context.Context, key, val string, leaseID lease.LeaseID) (*PutResponse, error) // Get retrieves keys. // By default, Get will return the value for "key", if any. @@ -45,19 +45,19 @@ type KV interface { // if the required revision is compacted, the request will fail with ErrCompacted . // When passed WithLimit(limit), the number of returned keys is bounded by limit. // When passed WithSort(), the keys will be sorted. - Get(key string, opts ...OpOption) (*GetResponse, error) + Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) // DeleteRange deletes the given range [key, end). - DeleteRange(key, end string) (*DeleteRangeResponse, error) + DeleteRange(ctx context.Context, key, end string) (*DeleteRangeResponse, error) // Delete is like DeleteRange. A shortcut for deleting single key like [key, key+1). - Delete(key string) (*DeleteResponse, error) + Delete(ctx context.Context, key string) (*DeleteResponse, error) // Compact compacts etcd KV history before the given rev. - Compact(rev int64) error + Compact(ctx context.Context, rev int64) error // Txn creates a transaction. - Txn() Txn + Txn(ctx context.Context) Txn } type kv struct { @@ -80,41 +80,41 @@ func NewKV(c *Client) KV { } } -func (kv *kv) Put(key, val string, leaseID lease.LeaseID) (*PutResponse, error) { - r, err := kv.do(OpPut(key, val, leaseID)) +func (kv *kv) Put(ctx context.Context, key, val string, leaseID lease.LeaseID) (*PutResponse, error) { + r, err := kv.do(ctx, OpPut(key, val, leaseID)) if err != nil { return nil, err } return (*PutResponse)(r.GetResponsePut()), nil } -func (kv *kv) Get(key string, opts ...OpOption) (*GetResponse, error) { - r, err := kv.do(OpGet(key, opts...)) +func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) { + r, err := kv.do(ctx, OpGet(key, opts...)) if err != nil { return nil, err } return (*GetResponse)(r.GetResponseRange()), nil } -func (kv *kv) DeleteRange(key, end string) (*DeleteRangeResponse, error) { - r, err := kv.do(OpDeleteRange(key, end)) +func (kv *kv) DeleteRange(ctx context.Context, key, end string) (*DeleteRangeResponse, error) { + r, err := kv.do(ctx, OpDeleteRange(key, end)) if err != nil { return nil, err } return (*DeleteRangeResponse)(r.GetResponseDeleteRange()), nil } -func (kv *kv) Delete(key string) (*DeleteResponse, error) { - r, err := kv.do(OpDelete(key)) +func (kv *kv) Delete(ctx context.Context, key string) (*DeleteResponse, error) { + r, err := kv.do(ctx, OpDelete(key)) if err != nil { return nil, err } return (*DeleteResponse)(r.GetResponseDeleteRange()), nil } -func (kv *kv) Compact(rev int64) error { +func (kv *kv) Compact(ctx context.Context, rev int64) error { r := &pb.CompactionRequest{Revision: rev} - _, err := kv.getRemote().Compact(context.TODO(), r) + _, err := kv.getRemote().Compact(ctx, r) if err == nil { return nil } @@ -127,13 +127,14 @@ func (kv *kv) Compact(rev int64) error { return err } -func (kv *kv) Txn() Txn { +func (kv *kv) Txn(ctx context.Context) Txn { return &txn{ - kv: kv, + kv: kv, + ctx: ctx, } } -func (kv *kv) do(op Op) (*pb.ResponseUnion, error) { +func (kv *kv) do(ctx context.Context, op Op) (*pb.ResponseUnion, error) { for { var err error switch op.t { @@ -146,7 +147,7 @@ func (kv *kv) do(op Op) (*pb.ResponseUnion, error) { r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target) } - resp, err = kv.getRemote().Range(context.TODO(), r) + resp, err = kv.getRemote().Range(ctx, r) if err == nil { respu := &pb.ResponseUnion_ResponseRange{ResponseRange: resp} return &pb.ResponseUnion{Response: respu}, nil @@ -154,7 +155,7 @@ func (kv *kv) do(op Op) (*pb.ResponseUnion, error) { case tPut: var resp *pb.PutResponse r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)} - resp, err = kv.getRemote().Put(context.TODO(), r) + resp, err = kv.getRemote().Put(ctx, r) if err == nil { respu := &pb.ResponseUnion_ResponsePut{ResponsePut: resp} return &pb.ResponseUnion{Response: respu}, nil @@ -162,7 +163,7 @@ func (kv *kv) do(op Op) (*pb.ResponseUnion, error) { case tDeleteRange: var resp *pb.DeleteRangeResponse r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end} - resp, err = kv.getRemote().DeleteRange(context.TODO(), r) + resp, err = kv.getRemote().DeleteRange(ctx, r) if err == nil { respu := &pb.ResponseUnion_ResponseDeleteRange{ResponseDeleteRange: resp} return &pb.ResponseUnion{Response: respu}, nil diff --git a/clientv3/txn.go b/clientv3/txn.go index 8822d60c5..170fa6ba7 100644 --- a/clientv3/txn.go +++ b/clientv3/txn.go @@ -51,7 +51,8 @@ type Txn interface { } type txn struct { - kv *kv + kv *kv + ctx context.Context mu sync.Mutex cif bool @@ -138,7 +139,7 @@ func (txn *txn) Commit() (*TxnResponse, error) { for { r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas} - resp, err := kv.getRemote().Txn(context.TODO(), r) + resp, err := kv.getRemote().Txn(txn.ctx, r) if err == nil { return (*TxnResponse)(resp), nil } diff --git a/clientv3/txn_test.go b/clientv3/txn_test.go index 1e0bcb8c8..d6bc1e9d4 100644 --- a/clientv3/txn_test.go +++ b/clientv3/txn_test.go @@ -44,7 +44,7 @@ func TestTxnPanics(t *testing.T) { { f: func() { defer df() - kv.Txn().If(cmp).If(cmp) + kv.Txn(nil).If(cmp).If(cmp) }, err: "cannot call If twice!", @@ -52,7 +52,7 @@ func TestTxnPanics(t *testing.T) { { f: func() { defer df() - kv.Txn().Then(op).If(cmp) + kv.Txn(nil).Then(op).If(cmp) }, err: "cannot call If after Then!", @@ -60,7 +60,7 @@ func TestTxnPanics(t *testing.T) { { f: func() { defer df() - kv.Txn().Else(op).If(cmp) + kv.Txn(nil).Else(op).If(cmp) }, err: "cannot call If after Else!", @@ -68,7 +68,7 @@ func TestTxnPanics(t *testing.T) { { f: func() { defer df() - kv.Txn().Then(op).Then(op) + kv.Txn(nil).Then(op).Then(op) }, err: "cannot call Then twice!", @@ -76,7 +76,7 @@ func TestTxnPanics(t *testing.T) { { f: func() { defer df() - kv.Txn().Else(op).Then(op) + kv.Txn(nil).Else(op).Then(op) }, err: "cannot call Then after Else!", @@ -84,7 +84,7 @@ func TestTxnPanics(t *testing.T) { { f: func() { defer df() - kv.Txn().Else(op).Else(op) + kv.Txn(nil).Else(op).Else(op) }, err: "cannot call Else twice!", diff --git a/etcdctlv3/command/compaction_command.go b/etcdctlv3/command/compaction_command.go index 5d60050e4..969922931 100644 --- a/etcdctlv3/command/compaction_command.go +++ b/etcdctlv3/command/compaction_command.go @@ -19,6 +19,7 @@ import ( "strconv" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/clientv3" ) @@ -43,7 +44,7 @@ func compactionCommandFunc(cmd *cobra.Command, args []string) { } c := mustClient(cmd) - if cerr := clientv3.NewKV(c).Compact(rev); cerr != nil { + if cerr := clientv3.NewKV(c).Compact(context.TODO(), rev); cerr != nil { ExitWithError(ExitError, cerr) return }