clientv3: ctx-ize KV

release-2.3
Anthony Romano 2016-02-09 17:33:30 -08:00
parent a60da8f3ce
commit 51c4894f62
8 changed files with 79 additions and 61 deletions

View File

@ -39,6 +39,7 @@ func TestKVPut(t *testing.T) {
defer lapi.Close()
kv := clientv3.NewKV(clus.RandClient())
ctx := context.TODO()
resp, err := lapi.Create(context.Background(), 10)
if err != nil {
@ -54,10 +55,10 @@ func TestKVPut(t *testing.T) {
}
for i, tt := range tests {
if _, err := kv.Put(tt.key, tt.val, tt.leaseID); err != nil {
if _, err := kv.Put(ctx, tt.key, tt.val, tt.leaseID); err != nil {
t.Fatalf("#%d: couldn't put %q (%v)", i, tt.key, err)
}
resp, err := kv.Get(tt.key)
resp, err := kv.Get(ctx, tt.key)
if err != nil {
t.Fatalf("#%d: couldn't get key (%v)", i, err)
}
@ -80,14 +81,15 @@ func TestKVRange(t *testing.T) {
defer clus.Terminate(t)
kv := clientv3.NewKV(clus.RandClient())
ctx := context.TODO()
keySet := []string{"a", "b", "c", "c", "c", "foo", "foo/abc", "fop"}
for i, key := range keySet {
if _, err := kv.Put(key, "", lease.NoLease); err != nil {
if _, err := kv.Put(ctx, key, "", lease.NoLease); err != nil {
t.Fatalf("#%d: couldn't put %q (%v)", i, key, err)
}
}
resp, err := kv.Get(keySet[0])
resp, err := kv.Get(ctx, keySet[0])
if err != nil {
t.Fatalf("couldn't get key (%v)", err)
}
@ -173,7 +175,7 @@ func TestKVRange(t *testing.T) {
if tt.sortOption != nil {
opts = append(opts, clientv3.WithSort(tt.sortOption.Target, tt.sortOption.Order))
}
resp, err := kv.Get(tt.begin, opts...)
resp, err := kv.Get(ctx, tt.begin, opts...)
if err != nil {
t.Fatalf("#%d: couldn't range (%v)", i, err)
}
@ -193,10 +195,11 @@ func TestKVDeleteRange(t *testing.T) {
defer clus.Terminate(t)
kv := clientv3.NewKV(clus.RandClient())
ctx := context.TODO()
keySet := []string{"a", "b", "c", "c", "c", "d", "e", "f"}
for i, key := range keySet {
if _, err := kv.Put(key, "", lease.NoLease); err != nil {
if _, err := kv.Put(ctx, key, "", lease.NoLease); err != nil {
t.Fatalf("#%d: couldn't put %q (%v)", i, key, err)
}
}
@ -210,14 +213,14 @@ func TestKVDeleteRange(t *testing.T) {
}
for i, tt := range tests {
dresp, err := kv.DeleteRange(tt.key, tt.end)
dresp, err := kv.DeleteRange(ctx, tt.key, tt.end)
if err != nil {
t.Fatalf("#%d: couldn't delete range (%v)", i, err)
}
if dresp.Header.Revision != tt.delRev {
t.Fatalf("#%d: dresp.Header.Revision got %d, want %d", i, dresp.Header.Revision, tt.delRev)
}
resp, err := kv.Get(tt.key, clientv3.WithRange(tt.end))
resp, err := kv.Get(ctx, tt.key, clientv3.WithRange(tt.end))
if err != nil {
t.Fatalf("#%d: couldn't get key (%v)", i, err)
}
@ -234,22 +237,23 @@ func TestKVDelete(t *testing.T) {
defer clus.Terminate(t)
kv := clientv3.NewKV(clus.RandClient())
ctx := context.TODO()
presp, err := kv.Put("foo", "", lease.NoLease)
presp, err := kv.Put(ctx, "foo", "", lease.NoLease)
if err != nil {
t.Fatalf("couldn't put 'foo' (%v)", err)
}
if presp.Header.Revision != 2 {
t.Fatalf("presp.Header.Revision got %d, want %d", presp.Header.Revision, 2)
}
resp, err := kv.Delete("foo")
resp, err := kv.Delete(ctx, "foo")
if err != nil {
t.Fatalf("couldn't delete key (%v)", err)
}
if resp.Header.Revision != 3 {
t.Fatalf("resp.Header.Revision got %d, want %d", resp.Header.Revision, 3)
}
gresp, err := kv.Get("foo")
gresp, err := kv.Get(ctx, "foo")
if err != nil {
t.Fatalf("couldn't get key (%v)", err)
}
@ -265,32 +269,33 @@ func TestKVCompact(t *testing.T) {
defer clus.Terminate(t)
kv := clientv3.NewKV(clus.RandClient())
ctx := context.TODO()
for i := 0; i < 10; i++ {
if _, err := kv.Put("foo", "bar", lease.NoLease); err != nil {
if _, err := kv.Put(ctx, "foo", "bar", lease.NoLease); err != nil {
t.Fatalf("couldn't put 'foo' (%v)", err)
}
}
err := kv.Compact(7)
err := kv.Compact(ctx, 7)
if err != nil {
t.Fatalf("couldn't compact kv space (%v)", err)
}
err = kv.Compact(7)
err = kv.Compact(ctx, 7)
if err == nil || err != v3rpc.ErrCompacted {
t.Fatalf("error got %v, want %v", err, v3rpc.ErrFutureRev)
}
wc := clientv3.NewWatcher(clus.RandClient())
defer wc.Close()
wchan := wc.Watch(context.TODO(), "foo", 3)
wchan := wc.Watch(ctx, "foo", 3)
_, ok := <-wchan
if ok {
t.Fatalf("wchan ok got %v, want false", ok)
}
err = kv.Compact(1000)
err = kv.Compact(ctx, 1000)
if err == nil || err != v3rpc.ErrFutureRev {
t.Fatalf("error got %v, want %v", err, v3rpc.ErrFutureRev)
}
@ -304,8 +309,9 @@ func TestKVGetRetry(t *testing.T) {
defer clus.Terminate(t)
kv := clientv3.NewKV(clus.Client(0))
ctx := context.TODO()
if _, err := kv.Put("foo", "bar", 0); err != nil {
if _, err := kv.Put(ctx, "foo", "bar", 0); err != nil {
t.Fatal(err)
}
@ -315,7 +321,7 @@ func TestKVGetRetry(t *testing.T) {
donec := make(chan struct{})
go func() {
// Get will fail, but reconnect will trigger
gresp, gerr := kv.Get("foo")
gresp, gerr := kv.Get(ctx, "foo")
if gerr != nil {
t.Fatal(gerr)
}
@ -352,10 +358,12 @@ func TestKVPutFailGetRetry(t *testing.T) {
defer clus.Terminate(t)
kv := clientv3.NewKV(clus.Client(0))
ctx := context.TODO()
clus.Members[0].Stop(t)
<-clus.Members[0].StopNotify()
_, err := kv.Put("foo", "bar", 0)
_, err := kv.Put(ctx, "foo", "bar", 0)
if err == nil {
t.Fatalf("got success on disconnected put, wanted error")
}
@ -363,7 +371,7 @@ func TestKVPutFailGetRetry(t *testing.T) {
donec := make(chan struct{})
go func() {
// Get will fail, but reconnect will trigger
gresp, gerr := kv.Get("foo")
gresp, gerr := kv.Get(ctx, "foo")
if gerr != nil {
t.Fatal(gerr)
}

View File

@ -42,7 +42,7 @@ func TestLeaseCreate(t *testing.T) {
t.Errorf("failed to create lease %v", err)
}
_, err = kv.Put("foo", "bar", lease.LeaseID(resp.ID))
_, err = kv.Put(context.TODO(), "foo", "bar", lease.LeaseID(resp.ID))
if err != nil {
t.Fatalf("failed to create key with lease %v", err)
}
@ -69,7 +69,7 @@ func TestLeaseRevoke(t *testing.T) {
t.Errorf("failed to revoke lease %v", err)
}
_, err = kv.Put("foo", "bar", lease.LeaseID(resp.ID))
_, err = kv.Put(context.TODO(), "foo", "bar", lease.LeaseID(resp.ID))
if err != v3rpc.ErrLeaseNotFound {
t.Fatalf("err = %v, want %v", err, v3rpc.ErrLeaseNotFound)
}

View File

@ -18,6 +18,7 @@ import (
"testing"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
@ -30,12 +31,14 @@ func TestTxnWriteFail(t *testing.T) {
defer clus.Terminate(t)
kv := clientv3.NewKV(clus.Client(0))
ctx := context.TODO()
clus.Members[0].Stop(t)
<-clus.Members[0].StopNotify()
donec := make(chan struct{})
go func() {
resp, err := kv.Txn().Then(clientv3.OpPut("foo", "bar", 0)).Commit()
resp, err := kv.Txn(ctx).Then(clientv3.OpPut("foo", "bar", 0)).Commit()
if err == nil {
t.Fatalf("expected error, got response %v", resp)
}
@ -57,7 +60,7 @@ func TestTxnWriteFail(t *testing.T) {
donec <- struct{}{}
// and ensure the put didn't take
gresp, gerr := kv.Get("foo")
gresp, gerr := kv.Get(ctx, "foo")
if gerr != nil {
t.Fatal(gerr)
}
@ -92,7 +95,8 @@ func TestTxnReadRetry(t *testing.T) {
donec := make(chan struct{})
go func() {
_, err := kv.Txn().Then(clientv3.OpGet("foo")).Commit()
ctx := context.TODO()
_, err := kv.Txn(ctx).Then(clientv3.OpGet("foo")).Commit()
if err != nil {
t.Fatalf("expected response, got error %v", err)
}
@ -117,12 +121,14 @@ func TestTxnSuccess(t *testing.T) {
defer clus.Terminate(t)
kv := clientv3.NewKV(clus.Client(0))
_, err := kv.Txn().Then(clientv3.OpPut("foo", "bar", 0)).Commit()
ctx := context.TODO()
_, err := kv.Txn(ctx).Then(clientv3.OpPut("foo", "bar", 0)).Commit()
if err != nil {
t.Fatal(err)
}
resp, err := kv.Get("foo")
resp, err := kv.Get(ctx, "foo")
if err != nil {
t.Fatal(err)
}

View File

@ -142,10 +142,11 @@ func testWatchMultiWatcher(t *testing.T, wctx *watchctx) {
<-readyc
}
// generate events
ctx := context.TODO()
for i := 0; i < numKeyUpdates; i++ {
for _, k := range keys {
v := fmt.Sprintf("%s-%d", k, i)
if _, err := wctx.kv.Put(k, v, 0); err != nil {
if _, err := wctx.kv.Put(ctx, k, v, 0); err != nil {
t.Fatal(err)
}
}
@ -220,7 +221,7 @@ func testWatchCancelRunning(t *testing.T, wctx *watchctx) {
if wctx.ch = wctx.w.Watch(ctx, "a", 0); wctx.ch == nil {
t.Fatalf("expected non-nil watcher channel")
}
if _, err := wctx.kv.Put("a", "a", 0); err != nil {
if _, err := wctx.kv.Put(ctx, "a", "a", 0); err != nil {
t.Fatal(err)
}
cancel()
@ -245,7 +246,7 @@ func testWatchCancelRunning(t *testing.T, wctx *watchctx) {
}
func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
if _, err := wctx.kv.Put(key, val, 0); err != nil {
if _, err := wctx.kv.Put(context.TODO(), key, val, 0); err != nil {
t.Fatal(err)
}
select {

View File

@ -36,7 +36,7 @@ type KV interface {
// Note that key,value can be plain bytes array and string is
// an immutable representation of that bytes array.
// To get a string of bytes, do string([]byte(0x10, 0x20)).
Put(key, val string, leaseID lease.LeaseID) (*PutResponse, error)
Put(ctx context.Context, key, val string, leaseID lease.LeaseID) (*PutResponse, error)
// Get retrieves keys.
// By default, Get will return the value for "key", if any.
@ -45,19 +45,19 @@ type KV interface {
// if the required revision is compacted, the request will fail with ErrCompacted .
// When passed WithLimit(limit), the number of returned keys is bounded by limit.
// When passed WithSort(), the keys will be sorted.
Get(key string, opts ...OpOption) (*GetResponse, error)
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
// DeleteRange deletes the given range [key, end).
DeleteRange(key, end string) (*DeleteRangeResponse, error)
DeleteRange(ctx context.Context, key, end string) (*DeleteRangeResponse, error)
// Delete is like DeleteRange. A shortcut for deleting single key like [key, key+1).
Delete(key string) (*DeleteResponse, error)
Delete(ctx context.Context, key string) (*DeleteResponse, error)
// Compact compacts etcd KV history before the given rev.
Compact(rev int64) error
Compact(ctx context.Context, rev int64) error
// Txn creates a transaction.
Txn() Txn
Txn(ctx context.Context) Txn
}
type kv struct {
@ -80,41 +80,41 @@ func NewKV(c *Client) KV {
}
}
func (kv *kv) Put(key, val string, leaseID lease.LeaseID) (*PutResponse, error) {
r, err := kv.do(OpPut(key, val, leaseID))
func (kv *kv) Put(ctx context.Context, key, val string, leaseID lease.LeaseID) (*PutResponse, error) {
r, err := kv.do(ctx, OpPut(key, val, leaseID))
if err != nil {
return nil, err
}
return (*PutResponse)(r.GetResponsePut()), nil
}
func (kv *kv) Get(key string, opts ...OpOption) (*GetResponse, error) {
r, err := kv.do(OpGet(key, opts...))
func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) {
r, err := kv.do(ctx, OpGet(key, opts...))
if err != nil {
return nil, err
}
return (*GetResponse)(r.GetResponseRange()), nil
}
func (kv *kv) DeleteRange(key, end string) (*DeleteRangeResponse, error) {
r, err := kv.do(OpDeleteRange(key, end))
func (kv *kv) DeleteRange(ctx context.Context, key, end string) (*DeleteRangeResponse, error) {
r, err := kv.do(ctx, OpDeleteRange(key, end))
if err != nil {
return nil, err
}
return (*DeleteRangeResponse)(r.GetResponseDeleteRange()), nil
}
func (kv *kv) Delete(key string) (*DeleteResponse, error) {
r, err := kv.do(OpDelete(key))
func (kv *kv) Delete(ctx context.Context, key string) (*DeleteResponse, error) {
r, err := kv.do(ctx, OpDelete(key))
if err != nil {
return nil, err
}
return (*DeleteResponse)(r.GetResponseDeleteRange()), nil
}
func (kv *kv) Compact(rev int64) error {
func (kv *kv) Compact(ctx context.Context, rev int64) error {
r := &pb.CompactionRequest{Revision: rev}
_, err := kv.getRemote().Compact(context.TODO(), r)
_, err := kv.getRemote().Compact(ctx, r)
if err == nil {
return nil
}
@ -127,13 +127,14 @@ func (kv *kv) Compact(rev int64) error {
return err
}
func (kv *kv) Txn() Txn {
func (kv *kv) Txn(ctx context.Context) Txn {
return &txn{
kv: kv,
kv: kv,
ctx: ctx,
}
}
func (kv *kv) do(op Op) (*pb.ResponseUnion, error) {
func (kv *kv) do(ctx context.Context, op Op) (*pb.ResponseUnion, error) {
for {
var err error
switch op.t {
@ -146,7 +147,7 @@ func (kv *kv) do(op Op) (*pb.ResponseUnion, error) {
r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target)
}
resp, err = kv.getRemote().Range(context.TODO(), r)
resp, err = kv.getRemote().Range(ctx, r)
if err == nil {
respu := &pb.ResponseUnion_ResponseRange{ResponseRange: resp}
return &pb.ResponseUnion{Response: respu}, nil
@ -154,7 +155,7 @@ func (kv *kv) do(op Op) (*pb.ResponseUnion, error) {
case tPut:
var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)}
resp, err = kv.getRemote().Put(context.TODO(), r)
resp, err = kv.getRemote().Put(ctx, r)
if err == nil {
respu := &pb.ResponseUnion_ResponsePut{ResponsePut: resp}
return &pb.ResponseUnion{Response: respu}, nil
@ -162,7 +163,7 @@ func (kv *kv) do(op Op) (*pb.ResponseUnion, error) {
case tDeleteRange:
var resp *pb.DeleteRangeResponse
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end}
resp, err = kv.getRemote().DeleteRange(context.TODO(), r)
resp, err = kv.getRemote().DeleteRange(ctx, r)
if err == nil {
respu := &pb.ResponseUnion_ResponseDeleteRange{ResponseDeleteRange: resp}
return &pb.ResponseUnion{Response: respu}, nil

View File

@ -51,7 +51,8 @@ type Txn interface {
}
type txn struct {
kv *kv
kv *kv
ctx context.Context
mu sync.Mutex
cif bool
@ -138,7 +139,7 @@ func (txn *txn) Commit() (*TxnResponse, error) {
for {
r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
resp, err := kv.getRemote().Txn(context.TODO(), r)
resp, err := kv.getRemote().Txn(txn.ctx, r)
if err == nil {
return (*TxnResponse)(resp), nil
}

View File

@ -44,7 +44,7 @@ func TestTxnPanics(t *testing.T) {
{
f: func() {
defer df()
kv.Txn().If(cmp).If(cmp)
kv.Txn(nil).If(cmp).If(cmp)
},
err: "cannot call If twice!",
@ -52,7 +52,7 @@ func TestTxnPanics(t *testing.T) {
{
f: func() {
defer df()
kv.Txn().Then(op).If(cmp)
kv.Txn(nil).Then(op).If(cmp)
},
err: "cannot call If after Then!",
@ -60,7 +60,7 @@ func TestTxnPanics(t *testing.T) {
{
f: func() {
defer df()
kv.Txn().Else(op).If(cmp)
kv.Txn(nil).Else(op).If(cmp)
},
err: "cannot call If after Else!",
@ -68,7 +68,7 @@ func TestTxnPanics(t *testing.T) {
{
f: func() {
defer df()
kv.Txn().Then(op).Then(op)
kv.Txn(nil).Then(op).Then(op)
},
err: "cannot call Then twice!",
@ -76,7 +76,7 @@ func TestTxnPanics(t *testing.T) {
{
f: func() {
defer df()
kv.Txn().Else(op).Then(op)
kv.Txn(nil).Else(op).Then(op)
},
err: "cannot call Then after Else!",
@ -84,7 +84,7 @@ func TestTxnPanics(t *testing.T) {
{
f: func() {
defer df()
kv.Txn().Else(op).Else(op)
kv.Txn(nil).Else(op).Else(op)
},
err: "cannot call Else twice!",

View File

@ -19,6 +19,7 @@ import (
"strconv"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
)
@ -43,7 +44,7 @@ func compactionCommandFunc(cmd *cobra.Command, args []string) {
}
c := mustClient(cmd)
if cerr := clientv3.NewKV(c).Compact(rev); cerr != nil {
if cerr := clientv3.NewKV(c).Compact(context.TODO(), rev); cerr != nil {
ExitWithError(ExitError, cerr)
return
}