Merge pull request #4306 from heyitsanthony/v3-client

replace raw v3 grpc connections with clientv3.Client
release-2.3
Anthony Romano 2016-01-27 14:52:40 -08:00
commit 163812246f
30 changed files with 308 additions and 358 deletions
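For orientation, a minimal before/after sketch of what this change means for callers. The endpoint, the error handling, and the vendored gRPC import path are illustrative, not part of the diff:

package main

import (
	"log"

	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
	"github.com/coreos/etcd/clientv3"
	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)

func main() {
	// Before: dial gRPC by hand, then build each per-service stub separately.
	conn, err := grpc.Dial("localhost:2379", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	oldKV := pb.NewKVClient(conn)
	_ = oldKV

	// After: one clientv3.Client bundles the KV, Lease, Watch, and Cluster
	// stubs behind a single connection.
	client, err := clientv3.NewFromURL("localhost:2379")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	_ = client.KV
}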

clientv3/client.go (new file)

@ -0,0 +1,98 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3
import (
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
type Client struct {
// KV is the key-value API for the client's connection.
KV pb.KVClient
// Lease is the lease API for the client's connection.
Lease pb.LeaseClient
// Watch is the watch API for the client's connection.
Watch pb.WatchClient
// Cluster is the cluster API for the client's connection.
Cluster pb.ClusterClient
conn *grpc.ClientConn
cfg Config
}
type Config struct {
// Endpoints is a list of URLs
Endpoints []string
// TODO TLS options
}
// New creates a new etcdv3 client from a given configuration.
func New(cfg Config) (*Client, error) {
conn, err := cfg.dialEndpoints()
if err != nil {
return nil, err
}
client := newClient(conn)
client.cfg = cfg
return client, nil
}
// NewFromURL creates a new etcdv3 client from a URL.
func NewFromURL(url string) (*Client, error) {
return New(Config{Endpoints: []string{url}})
}
// NewFromConn creates a new etcdv3 client from an established gRPC connection.
func NewFromConn(conn *grpc.ClientConn) *Client {
return newClient(conn)
}
// Clone creates a copy of client with the old connection and new API clients.
func (c *Client) Clone() *Client {
cl := newClient(c.conn)
cl.cfg = c.cfg
return cl
}
// Close shuts down the client's etcd connections.
func (c *Client) Close() error {
return c.conn.Close()
}
func newClient(conn *grpc.ClientConn) *Client {
return &Client{
KV: pb.NewKVClient(conn),
Lease: pb.NewLeaseClient(conn),
Watch: pb.NewWatchClient(conn),
Cluster: pb.NewClusterClient(conn),
conn: conn,
}
}
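// dialEndpoints dials each endpoint in order, returning the first
// connection that succeeds; if every dial fails, the last error is returned.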
func (cfg *Config) dialEndpoints() (*grpc.ClientConn, error) {
var err error
for _, ep := range cfg.Endpoints {
// TODO: enable grpc.WithTransportCredentials(creds)
conn, curErr := grpc.Dial(ep, grpc.WithInsecure())
if curErr != nil {
err = curErr
} else {
return conn, nil
}
}
return nil, err
}
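A short usage sketch of the API above (endpoints are illustrative). New tries each endpoint in order and keeps the first successful connection; Clone reuses that connection while building fresh API stubs:

package main

import (
	"log"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints: []string{"localhost:2379", "localhost:22379"},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Clone shares the underlying connection, so closing either client
	// closes both; it exists to hand out independent stub sets cheaply.
	clone := client.Clone()
	_ = clone
}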


@ -16,6 +16,7 @@ package recipe
import (
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/storage/storagepb"
)
@ -23,11 +24,11 @@ import (
// Barrier creates a key in etcd to block processes, then deletes the key to
// release all blocked processes.
type Barrier struct {
client *EtcdClient
client *clientv3.Client
key string
}
func NewBarrier(client *EtcdClient, key string) *Barrier {
func NewBarrier(client *clientv3.Client, key string) *Barrier {
return &Barrier{client, key}
}
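A brief usage sketch with the new client type (endpoint and key are illustrative). Hold and Wait match the barrier tests later in this diff; Release is assumed from the description above, deleting the key to unblock waiters:

package main

import (
	"log"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/contrib/recipes"
)

func main() {
	client, err := clientv3.NewFromURL("localhost:2379")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	b := recipe.NewBarrier(client, "test-barrier")
	if err := b.Hold(); err != nil { // create the blocking key
		log.Fatal(err)
	}
	done := make(chan struct{})
	go func() {
		defer close(done)
		// Waiters block until the barrier key is deleted.
		if err := recipe.NewBarrier(client, "test-barrier").Wait(); err != nil {
			log.Print(err)
		}
	}()
	if err := b.Release(); err != nil { // assumed method; unblocks waiters
		log.Fatal(err)
	}
	<-done
}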


@ -18,7 +18,6 @@ import (
"errors"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
spb "github.com/coreos/etcd/storage/storagepb"
)
@ -28,22 +27,8 @@ var (
ErrWaitMismatch = errors.New("unexpected wait result")
)
type EtcdClient struct {
conn *grpc.ClientConn
KV pb.KVClient
Lease pb.LeaseClient
Watch pb.WatchClient
}
func NewEtcdClient(conn *grpc.ClientConn) *EtcdClient {
kv := pb.NewKVClient(conn)
lease := pb.NewLeaseClient(conn)
watch := pb.NewWatchClient(conn)
return &EtcdClient{conn, kv, lease, watch}
}
// deleteRevKey deletes a key by revision, returning false if key is missing
func (ec *EtcdClient) deleteRevKey(key string, rev int64) (bool, error) {
func deleteRevKey(kvc pb.KVClient, key string, rev int64) (bool, error) {
cmp := &pb.Compare{
Result: pb.Compare_EQUAL,
Target: pb.Compare_MOD,
@ -52,7 +37,7 @@ func (ec *EtcdClient) deleteRevKey(key string, rev int64) (bool, error) {
}
req := &pb.RequestUnion{Request: &pb.RequestUnion_RequestDeleteRange{
RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte(key)}}}
txnresp, err := ec.KV.Txn(
txnresp, err := kvc.Txn(
context.TODO(),
&pb.TxnRequest{
Compare: []*pb.Compare{cmp},
@ -67,9 +52,9 @@ func (ec *EtcdClient) deleteRevKey(key string, rev int64) (bool, error) {
return true, nil
}
func (ec *EtcdClient) claimFirstKey(kvs []*spb.KeyValue) (*spb.KeyValue, error) {
func claimFirstKey(kvc pb.KVClient, kvs []*spb.KeyValue) (*spb.KeyValue, error) {
for _, kv := range kvs {
ok, err := ec.deleteRevKey(string(kv.Key), kv.ModRevision)
ok, err := deleteRevKey(kvc, string(kv.Key), kv.ModRevision)
if err != nil {
return nil, err
} else if ok {


@ -20,23 +20,24 @@ import (
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/lease"
)
// RemoteKV is a key/revision pair created by the client and stored on etcd
type RemoteKV struct {
client *EtcdClient
client *clientv3.Client
key string
rev int64
val string
}
func NewKey(client *EtcdClient, key string, leaseID lease.LeaseID) (*RemoteKV, error) {
func NewKey(client *clientv3.Client, key string, leaseID lease.LeaseID) (*RemoteKV, error) {
return NewKV(client, key, "", leaseID)
}
func NewKV(client *EtcdClient, key, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
func NewKV(client *clientv3.Client, key, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
rev, err := putNewKV(client, key, val, leaseID)
if err != nil {
return nil, err
@ -44,7 +45,7 @@ func NewKV(client *EtcdClient, key, val string, leaseID lease.LeaseID) (*RemoteK
return &RemoteKV{client, key, rev, val}, nil
}
func GetRemoteKV(client *EtcdClient, key string) (*RemoteKV, error) {
func GetRemoteKV(client *clientv3.Client, key string) (*RemoteKV, error) {
resp, err := client.KV.Range(
context.TODO(),
&pb.RangeRequest{Key: []byte(key)},
@ -65,11 +66,11 @@ func GetRemoteKV(client *EtcdClient, key string) (*RemoteKV, error) {
val: val}, nil
}
func NewUniqueKey(client *EtcdClient, prefix string) (*RemoteKV, error) {
func NewUniqueKey(client *clientv3.Client, prefix string) (*RemoteKV, error) {
return NewUniqueKV(client, prefix, "", 0)
}
func NewUniqueKV(client *EtcdClient, prefix string, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
func NewUniqueKV(client *clientv3.Client, prefix string, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
for {
newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
rev, err := putNewKV(client, newKey, val, 0)
@ -84,7 +85,7 @@ func NewUniqueKV(client *EtcdClient, prefix string, val string, leaseID lease.Le
// putNewKV attempts to create the given key, only succeeding if the key did
// not yet exist.
func putNewKV(ec *EtcdClient, key, val string, leaseID lease.LeaseID) (int64, error) {
func putNewKV(ec *clientv3.Client, key, val string, leaseID lease.LeaseID) (int64, error) {
cmp := &pb.Compare{
Result: pb.Compare_EQUAL,
Target: pb.Compare_VERSION,
@ -110,13 +111,13 @@ func putNewKV(ec *EtcdClient, key, val string, leaseID lease.LeaseID) (int64, er
}
// NewSequentialKV allocates a new sequential key-value pair at <prefix>/nnnnn
func NewSequentialKV(client *EtcdClient, prefix, val string) (*RemoteKV, error) {
func NewSequentialKV(client *clientv3.Client, prefix, val string) (*RemoteKV, error) {
return newSequentialKV(client, prefix, val, 0)
}
// newSequentialKV allocates a new sequential key <prefix>/nnnnn with a given
// value and lease. Note: a bookkeeping node __<prefix> is also allocated.
func newSequentialKV(client *EtcdClient, prefix, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
func newSequentialKV(client *clientv3.Client, prefix, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
resp, err := NewRange(client, prefix).LastKey()
if err != nil {
return nil, err
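A sketch of the retargeted key helpers (prefix and values are illustrative; a zero lease ID is assumed to mean no attached lease):

package main

import (
	"fmt"
	"log"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/contrib/recipes"
)

func main() {
	client, err := clientv3.NewFromURL("localhost:2379")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Unique key: <prefix>/<timestamp>, retried until the create succeeds.
	ukv, err := recipe.NewUniqueKV(client, "jobs", "payload", 0)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(ukv.Key())

	// Sequential key: <prefix>/nnnnn, ordered by creation.
	skv, err := recipe.NewSequentialKV(client, "queue", "item")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(skv.Key())
}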


@ -17,17 +17,18 @@ package recipe
import (
"sync"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/storage/storagepb"
)
// Mutex implements the sync Locker interface with etcd
type Mutex struct {
client *EtcdClient
client *clientv3.Client
key string
myKey *RemoteKV
}
func NewMutex(client *EtcdClient, key string) *Mutex {
func NewMutex(client *clientv3.Client, key string) *Mutex {
return &Mutex{client, key, nil}
}
@ -80,6 +81,6 @@ func (lm *lockerMutex) Unlock() {
}
}
func NewLocker(client *EtcdClient, key string) sync.Locker {
func NewLocker(client *clientv3.Client, key string) sync.Locker {
return &lockerMutex{NewMutex(client, key)}
}
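A sketch of NewLocker in use; because it returns a sync.Locker, the etcd-backed lock drops into code written against the standard interface (endpoint and key are illustrative):

package main

import (
	"log"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/contrib/recipes"
)

func main() {
	client, err := clientv3.NewFromURL("localhost:2379")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	l := recipe.NewLocker(client, "test-mutex") // sync.Locker backed by etcd
	l.Lock()
	// Critical section: exclusive across every process sharing the key.
	l.Unlock()
}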


@ -17,17 +17,18 @@ package recipe
import (
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/storage/storagepb"
)
// PriorityQueue implements a multi-reader, multi-writer distributed queue.
type PriorityQueue struct {
client *EtcdClient
client *clientv3.Client
key string
}
// NewPriorityQueue creates an etcd priority queue.
func NewPriorityQueue(client *EtcdClient, key string) *PriorityQueue {
func NewPriorityQueue(client *clientv3.Client, key string) *PriorityQueue {
return &PriorityQueue{client, key + "/"}
}
@ -47,7 +48,7 @@ func (q *PriorityQueue) Dequeue() (string, error) {
return "", err
}
kv, err := q.client.claimFirstKey(resp.Kvs)
kv, err := claimFirstKey(q.client.KV, resp.Kvs)
if err != nil {
return "", err
} else if kv != nil {
@ -67,7 +68,7 @@ func (q *PriorityQueue) Dequeue() (string, error) {
return "", err
}
ok, err := q.client.deleteRevKey(string(ev.Kv.Key), ev.Kv.ModRevision)
ok, err := deleteRevKey(q.client.KV, string(ev.Kv.Key), ev.Kv.ModRevision)
if err != nil {
return "", err
} else if !ok {


@ -15,16 +15,17 @@
package recipe
import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/storage/storagepb"
)
// Queue implements a multi-reader, multi-writer distributed queue.
type Queue struct {
client *EtcdClient
client *clientv3.Client
keyPrefix string
}
func NewQueue(client *EtcdClient, keyPrefix string) *Queue {
func NewQueue(client *clientv3.Client, keyPrefix string) *Queue {
return &Queue{client, keyPrefix}
}
@ -42,7 +43,7 @@ func (q *Queue) Dequeue() (string, error) {
return "", err
}
kv, err := q.client.claimFirstKey(resp.Kvs)
kv, err := claimFirstKey(q.client.KV, resp.Kvs)
if err != nil {
return "", err
} else if kv != nil {
@ -62,7 +63,7 @@ func (q *Queue) Dequeue() (string, error) {
return "", err
}
ok, err := q.client.deleteRevKey(string(ev.Kv.Key), ev.Kv.ModRevision)
ok, err := deleteRevKey(q.client.KV, string(ev.Kv.Key), ev.Kv.ModRevision)
if err != nil {
return "", err
} else if !ok {
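A sketch of the queue with the new client (names are illustrative); Enqueue and Dequeue mirror the queue tests later in this diff, with Dequeue blocking until an item can be claimed:

package main

import (
	"fmt"
	"log"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/contrib/recipes"
)

func main() {
	client, err := clientv3.NewFromURL("localhost:2379")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	q := recipe.NewQueue(client, "testq")
	if err := q.Enqueue("job-1"); err != nil {
		log.Fatal(err)
	}
	s, err := q.Dequeue() // blocks until an item is available
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(s)
}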


@ -16,6 +16,7 @@ package recipe
import (
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
@ -26,11 +27,11 @@ type Range struct {
keyEnd []byte
}
func NewRange(client *EtcdClient, key string) *Range {
func NewRange(client *clientv3.Client, key string) *Range {
return NewRangeRev(client, key, 0)
}
func NewRangeRev(client *EtcdClient, key string, rev int64) *Range {
func NewRangeRev(client *clientv3.Client, key string, rev int64) *Range {
return &Range{client.KV, []byte(key), rev, prefixEnd(key)}
}


@ -15,16 +15,17 @@
package recipe
import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/storage/storagepb"
)
type RWMutex struct {
client *EtcdClient
client *clientv3.Client
key string
myKey *RemoteKV
}
func NewRWMutex(client *EtcdClient, key string) *RWMutex {
func NewRWMutex(client *clientv3.Client, key string) *RWMutex {
return &RWMutex{client, key, nil}
}


@ -16,12 +16,13 @@ package recipe
import (
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
// STM implements software transactional memory over etcd
type STM struct {
client *EtcdClient
client *clientv3.Client
// rset holds the read key's value and revision of read
rset map[string]*RemoteKV
// wset holds the write key and its value
@ -32,7 +33,7 @@ type STM struct {
}
// NewSTM creates new transaction loop for a given apply function.
func NewSTM(client *EtcdClient, apply func(*STM) error) <-chan error {
func NewSTM(client *clientv3.Client, apply func(*STM) error) <-chan error {
s := &STM{client: client, apply: apply}
errc := make(chan error, 1)
go func() {
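A sketch of the NewSTM loop (keys are illustrative): apply reads and writes through the STM handle, conflicting transactions are retried, and the final result arrives on the returned error channel:

package main

import (
	"log"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/contrib/recipes"
)

func main() {
	client, err := clientv3.NewFromURL("localhost:2379")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	errc := recipe.NewSTM(client, func(s *recipe.STM) error {
		v, err := s.Get("src") // read is recorded in rset
		if err != nil {
			return err
		}
		s.Put("dst", v) // write is buffered in wset
		return nil
	})
	if err := <-errc; err != nil {
		log.Fatal(err)
	}
}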


@ -16,6 +16,7 @@ package recipe
import (
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/storage"
"github.com/coreos/etcd/storage/storagepb"
@ -30,15 +31,15 @@ type Watcher struct {
lastErr error
}
func NewWatcher(c *EtcdClient, key string, rev int64) (*Watcher, error) {
func NewWatcher(c *clientv3.Client, key string, rev int64) (*Watcher, error) {
return newWatcher(c, key, rev, false)
}
func NewPrefixWatcher(c *EtcdClient, prefix string, rev int64) (*Watcher, error) {
func NewPrefixWatcher(c *clientv3.Client, prefix string, rev int64) (*Watcher, error) {
return newWatcher(c, prefix, rev, true)
}
func newWatcher(c *EtcdClient, key string, rev int64, isPrefix bool) (*Watcher, error) {
func newWatcher(c *clientv3.Client, key string, rev int64, isPrefix bool) (*Watcher, error) {
ctx, cancel := context.WithCancel(context.Background())
w, err := c.Watch.Watch(ctx)
if err != nil {
@ -134,7 +135,7 @@ func (w *Watcher) waitEvents(evs []storagepb.Event_EventType) (*storagepb.Event,
}
// WaitEvents waits on a key until it observes the given events and returns the final one.
func WaitEvents(c *EtcdClient, key string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
w, err := NewWatcher(c, key, rev)
if err != nil {
return nil, err
@ -143,7 +144,7 @@ func WaitEvents(c *EtcdClient, key string, rev int64, evs []storagepb.Event_Even
return w.waitEvents(evs)
}
func WaitPrefixEvents(c *EtcdClient, prefix string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
w, err := NewPrefixWatcher(c, prefix, rev)
if err != nil {
return nil, err
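A sketch of WaitEvents; the key, start revision, and the storagepb.PUT/DELETE constant names are assumptions based on the storagepb event enum. It blocks until the listed events are observed and returns the final one:

package main

import (
	"fmt"
	"log"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/contrib/recipes"
	"github.com/coreos/etcd/storage/storagepb"
)

func main() {
	client, err := clientv3.NewFromURL("localhost:2379")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Wait on "my-key" from revision 1 for a PUT followed by a DELETE.
	ev, err := recipe.WaitEvents(client, "my-key", 1,
		[]storagepb.Event_EventType{storagepb.PUT, storagepb.DELETE})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(ev.Type)
}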


@ -20,7 +20,6 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
@ -44,17 +43,6 @@ func compactionCommandFunc(cmd *cobra.Command, args []string) {
ExitWithError(ExitError, err)
}
endpoint, err := cmd.Flags().GetString("endpoint")
if err != nil {
ExitWithError(ExitError, err)
}
// TODO: enable grpc.WithTransportCredentials(creds)
conn, err := grpc.Dial(endpoint, grpc.WithInsecure())
if err != nil {
ExitWithError(ExitBadConnection, err)
}
kv := pb.NewKVClient(conn)
req := &pb.CompactionRequest{Revision: rev}
kv.Compact(context.Background(), req)
mustClient(cmd).KV.Compact(context.Background(), req)
}


@ -19,7 +19,6 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
@ -44,19 +43,8 @@ func deleteRangeCommandFunc(cmd *cobra.Command, args []string) {
rangeEnd = []byte(args[1])
}
endpoint, err := cmd.Flags().GetString("endpoint")
if err != nil {
ExitWithError(ExitError, err)
}
// TODO: enable grpc.WithTransportCredentials(creds)
conn, err := grpc.Dial(endpoint, grpc.WithInsecure())
if err != nil {
ExitWithError(ExitBadConnection, err)
}
kv := pb.NewKVClient(conn)
req := &pb.DeleteRangeRequest{Key: key, RangeEnd: rangeEnd}
kv.DeleteRange(context.Background(), req)
mustClient(cmd).KV.DeleteRange(context.Background(), req)
if rangeEnd != nil {
fmt.Printf("range [%s, %s) is deleted\n", string(key), string(rangeEnd))


@ -14,8 +14,25 @@
package command
import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/clientv3"
)
// GlobalFlags are flags defined globally
// and inherited by all sub-commands.
type GlobalFlags struct {
Endpoints string
}
func mustClient(cmd *cobra.Command) *clientv3.Client {
endpoint, err := cmd.Flags().GetString("endpoint")
if err != nil {
ExitWithError(ExitError, err)
}
client, err := clientv3.NewFromURL(endpoint)
if err != nil {
ExitWithError(ExitBadConnection, err)
}
return client
}


@ -23,7 +23,6 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
@ -64,19 +63,8 @@ func leaseCreateCommandFunc(cmd *cobra.Command, args []string) {
ExitWithError(ExitBadArgs, fmt.Errorf("bad TTL (%v)", err))
}
endpoint, err := cmd.Flags().GetString("endpoint")
if err != nil {
ExitWithError(ExitError, err)
}
// TODO: enable grpc.WithTransportCredentials(creds)
conn, err := grpc.Dial(endpoint, grpc.WithInsecure())
if err != nil {
ExitWithError(ExitBadConnection, err)
}
lease := pb.NewLeaseClient(conn)
req := &pb.LeaseCreateRequest{TTL: ttl}
resp, err := lease.LeaseCreate(context.Background(), req)
resp, err := mustClient(cmd).Lease.LeaseCreate(context.Background(), req)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to create lease (%v)\n", err)
return
@ -107,19 +95,8 @@ func leaseRevokeCommandFunc(cmd *cobra.Command, args []string) {
ExitWithError(ExitBadArgs, fmt.Errorf("bad lease ID arg (%v), expecting ID in Hex", err))
}
endpoint, err := cmd.Flags().GetString("endpoint")
if err != nil {
ExitWithError(ExitError, err)
}
// TODO: enable grpc.WithTransportCredentials(creds)
conn, err := grpc.Dial(endpoint, grpc.WithInsecure())
if err != nil {
ExitWithError(ExitBadConnection, err)
}
lease := pb.NewLeaseClient(conn)
req := &pb.LeaseRevokeRequest{ID: id}
_, err = lease.LeaseRevoke(context.Background(), req)
_, err = mustClient(cmd).Lease.LeaseRevoke(context.Background(), req)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to revoke lease (%v)\n", err)
return
@ -150,17 +127,7 @@ func leaseKeepAliveCommandFunc(cmd *cobra.Command, args []string) {
ExitWithError(ExitBadArgs, fmt.Errorf("bad lease ID arg (%v), expecting ID in Hex", err))
}
endpoint, err := cmd.Flags().GetString("endpoint")
if err != nil {
ExitWithError(ExitError, err)
}
// TODO: enable grpc.WithTransportCredentials(creds)
conn, err := grpc.Dial(endpoint, grpc.WithInsecure())
if err != nil {
ExitWithError(ExitBadConnection, err)
}
lease := pb.NewLeaseClient(conn)
kStream, err := lease.LeaseKeepAlive(context.TODO())
kStream, err := mustClient(cmd).Lease.LeaseKeepAlive(context.TODO())
if err != nil {
ExitWithError(ExitBadConnection, err)
}


@ -21,7 +21,6 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
@ -109,18 +108,8 @@ func memberAddCommandFunc(cmd *cobra.Command, args []string) {
urls := strings.Split(memberPeerURLs, ",")
endpoint, err := cmd.Flags().GetString("endpoint")
if err != nil {
ExitWithError(ExitError, err)
}
// TODO: enable grpc.WithTransportCredentials(creds)
conn, err := grpc.Dial(endpoint, grpc.WithInsecure())
if err != nil {
ExitWithError(ExitBadConnection, err)
}
mc := pb.NewClusterClient(conn)
resp, err := mc.MemberAdd(context.TODO(), &pb.MemberAddRequest{PeerURLs: urls})
req := &pb.MemberAddRequest{PeerURLs: urls}
resp, err := mustClient(cmd).Cluster.MemberAdd(context.TODO(), req)
if err != nil {
ExitWithError(ExitError, err)
}
@ -139,18 +128,8 @@ func memberRemoveCommandFunc(cmd *cobra.Command, args []string) {
ExitWithError(ExitBadArgs, fmt.Errorf("bad member ID arg (%v), expecting ID in Hex", err))
}
endpoint, err := cmd.Flags().GetString("endpoint")
if err != nil {
ExitWithError(ExitError, err)
}
// TODO: enable grpc.WithTransportCredentials(creds)
conn, err := grpc.Dial(endpoint, grpc.WithInsecure())
if err != nil {
ExitWithError(ExitBadConnection, err)
}
mc := pb.NewClusterClient(conn)
resp, err := mc.MemberRemove(context.TODO(), &pb.MemberRemoveRequest{ID: uint64(id)})
req := &pb.MemberRemoveRequest{ID: uint64(id)}
resp, err := mustClient(cmd).Cluster.MemberRemove(context.TODO(), req)
if err != nil {
ExitWithError(ExitError, err)
}
@ -175,18 +154,8 @@ func memberUpdateCommandFunc(cmd *cobra.Command, args []string) {
urls := strings.Split(memberPeerURLs, ",")
endpoint, err := cmd.Flags().GetString("endpoint")
if err != nil {
ExitWithError(ExitError, err)
}
// TODO: enable grpc.WithTransportCredentials(creds)
conn, err := grpc.Dial(endpoint, grpc.WithInsecure())
if err != nil {
ExitWithError(ExitBadConnection, err)
}
mc := pb.NewClusterClient(conn)
resp, err := mc.MemberUpdate(context.TODO(), &pb.MemberUpdateRequest{ID: uint64(id), PeerURLs: urls})
req := &pb.MemberUpdateRequest{ID: uint64(id), PeerURLs: urls}
resp, err := mustClient(cmd).Cluster.MemberUpdate(context.TODO(), req)
if err != nil {
ExitWithError(ExitError, err)
}
@ -196,18 +165,7 @@ func memberUpdateCommandFunc(cmd *cobra.Command, args []string) {
// memberListCommandFunc executes the "member list" command.
func memberListCommandFunc(cmd *cobra.Command, args []string) {
endpoint, err := cmd.Flags().GetString("endpoint")
if err != nil {
ExitWithError(ExitError, err)
}
// TODO: enable grpc.WithTransportCredentials(creds)
conn, err := grpc.Dial(endpoint, grpc.WithInsecure())
if err != nil {
ExitWithError(ExitBadConnection, err)
}
mc := pb.NewClusterClient(conn)
resp, err := mc.MemberList(context.TODO(), &pb.MemberListRequest{})
resp, err := mustClient(cmd).Cluster.MemberList(context.TODO(), &pb.MemberListRequest{})
if err != nil {
ExitWithError(ExitError, err)
}


@ -20,7 +20,6 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
@ -62,18 +61,7 @@ func putCommandFunc(cmd *cobra.Command, args []string) {
key := []byte(args[0])
value := []byte(args[1])
endpoint, err := cmd.Flags().GetString("endpoint")
if err != nil {
ExitWithError(ExitError, err)
}
// TODO: enable grpc.WithTransportCredentials(creds)
conn, err := grpc.Dial(endpoint, grpc.WithInsecure())
if err != nil {
ExitWithError(ExitBadConnection, err)
}
kv := pb.NewKVClient(conn)
req := &pb.PutRequest{Key: key, Value: value, Lease: id}
kv.Put(context.Background(), req)
mustClient(cmd).KV.Put(context.Background(), req)
fmt.Printf("%s %s\n", key, value)
}


@ -20,7 +20,6 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
@ -55,11 +54,6 @@ func rangeCommandFunc(cmd *cobra.Command, args []string) {
rangeEnd = []byte(args[1])
}
endpoint, err := cmd.Flags().GetString("endpoint")
if err != nil {
ExitWithError(ExitError, err)
}
sortByOrder := pb.RangeRequest_NONE
sortOrder := strings.ToUpper(rangeSortOrder)
switch {
@ -92,12 +86,6 @@ func rangeCommandFunc(cmd *cobra.Command, args []string) {
ExitWithError(ExitBadFeature, fmt.Errorf("bad sort target %v", rangeSortTarget))
}
// TODO: enable grpc.WithTransportCredentials(creds)
conn, err := grpc.Dial(endpoint, grpc.WithInsecure())
if err != nil {
ExitWithError(ExitBadConnection, err)
}
kv := pb.NewKVClient(conn)
req := &pb.RangeRequest{
Key: key,
RangeEnd: rangeEnd,
@ -105,7 +93,10 @@ func rangeCommandFunc(cmd *cobra.Command, args []string) {
SortTarget: sortByTarget,
Limit: int64(rangeLimit),
}
resp, err := kv.Range(context.Background(), req)
resp, err := mustClient(cmd).KV.Range(context.Background(), req)
if err != nil {
ExitWithError(ExitError, err)
}
for _, kv := range resp.Kvs {
fmt.Printf("%s %s\n", string(kv.Key), string(kv.Value))
}


@ -23,7 +23,6 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
@ -50,18 +49,7 @@ func txnCommandFunc(cmd *cobra.Command, args []string) {
next = next(txn, reader)
}
endpoint, err := cmd.Flags().GetString("endpoint")
if err != nil {
ExitWithError(ExitError, err)
}
// TODO: enable grpc.WithTransportCredentials(creds)
conn, err := grpc.Dial(endpoint, grpc.WithInsecure())
if err != nil {
ExitWithError(ExitBadConnection, err)
}
kv := pb.NewKVClient(conn)
resp, err := kv.Txn(context.Background(), txn)
resp, err := mustClient(cmd).KV.Txn(context.Background(), txn)
if err != nil {
ExitWithError(ExitError, err)
}


@ -24,7 +24,6 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
@ -39,18 +38,7 @@ func NewWatchCommand() *cobra.Command {
// watchCommandFunc executes the "watch" command.
func watchCommandFunc(cmd *cobra.Command, args []string) {
endpoint, err := cmd.Flags().GetString("endpoint")
if err != nil {
ExitWithError(ExitInvalidInput, err)
}
// TODO: enable grpc.WithTransportCredentials(creds)
conn, err := grpc.Dial(endpoint, grpc.WithInsecure())
if err != nil {
ExitWithError(ExitBadConnection, err)
}
wAPI := pb.NewWatchClient(conn)
wStream, err := wAPI.Watch(context.TODO())
wStream, err := mustClient(cmd).Watch.Watch(context.TODO())
if err != nil {
ExitWithError(ExitBadConnection, err)
}


@ -32,6 +32,7 @@ import (
"time"
"github.com/coreos/etcd/client"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
"github.com/coreos/etcd/etcdserver/etcdhttp"
@ -729,8 +730,8 @@ func (m *member) listenGRPC() error {
return nil
}
// newGrpcClient creates a new grpc client connection to the member
func NewGRPCClient(m *member) (*grpc.ClientConn, error) {
// NewClientV3 creates a new grpc client connection to the member
func NewClientV3(m *member) (*clientv3.Client, error) {
if m.grpcAddr == "" {
return nil, fmt.Errorf("member not configured for grpc")
}
@ -738,7 +739,11 @@ func NewGRPCClient(m *member) (*grpc.ClientConn, error) {
return net.Dial("unix", a)
}
unixdialer := grpc.WithDialer(f)
return grpc.Dial(m.grpcAddr, grpc.WithInsecure(), unixdialer)
conn, err := grpc.Dial(m.grpcAddr, grpc.WithInsecure(), unixdialer)
if err != nil {
return nil, err
}
return clientv3.NewFromConn(conn), nil
}
// Clone returns a member with the same server configuration. The returned


@ -17,27 +17,27 @@ import (
"testing"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/contrib/recipes"
"github.com/coreos/etcd/pkg/testutil"
)
func TestBarrierSingleNode(t *testing.T) {
defer testutil.AfterTest(t)
clus := newClusterGRPC(t, &clusterConfig{size: 3})
clus := newClusterV3(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
testBarrier(t, 5, func() *grpc.ClientConn { return clus.conns[0] })
testBarrier(t, 5, func() *clientv3.Client { return clus.clients[0] })
}
func TestBarrierMultiNode(t *testing.T) {
defer testutil.AfterTest(t)
clus := newClusterGRPC(t, &clusterConfig{size: 3})
clus := newClusterV3(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
testBarrier(t, 5, func() *grpc.ClientConn { return clus.RandConn() })
testBarrier(t, 5, func() *clientv3.Client { return clus.RandClient() })
}
func testBarrier(t *testing.T, waiters int, chooseConn func() *grpc.ClientConn) {
b := recipe.NewBarrier(recipe.NewEtcdClient(chooseConn()), "test-barrier")
func testBarrier(t *testing.T, waiters int, chooseClient func() *clientv3.Client) {
b := recipe.NewBarrier(chooseClient(), "test-barrier")
if err := b.Hold(); err != nil {
t.Fatalf("could not hold barrier (%v)", err)
}
@ -48,7 +48,7 @@ func testBarrier(t *testing.T, waiters int, chooseConn func() *grpc.ClientConn)
donec := make(chan struct{})
for i := 0; i < waiters; i++ {
go func() {
br := recipe.NewBarrier(recipe.NewEtcdClient(chooseConn()), "test-barrier")
br := recipe.NewBarrier(chooseClient(), "test-barrier")
if err := br.Wait(); err != nil {
t.Fatalf("could not wait on barrier (%v)", err)
}


@ -24,7 +24,7 @@ import (
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/lease"
@ -34,47 +34,47 @@ import (
type clusterV3 struct {
*cluster
conns []*grpc.ClientConn
clients []*clientv3.Client
}
// newClusterGRPC returns a launched cluster with a grpc client connection
// newClusterV3 returns a launched cluster with a grpc client connection
// for each cluster member.
func newClusterGRPC(t *testing.T, cfg *clusterConfig) *clusterV3 {
func newClusterV3(t *testing.T, cfg *clusterConfig) *clusterV3 {
cfg.useV3 = true
cfg.useGRPC = true
clus := &clusterV3{cluster: NewClusterByConfig(t, cfg)}
for _, m := range clus.Members {
conn, err := NewGRPCClient(m)
client, err := NewClientV3(m)
if err != nil {
t.Fatal(err)
}
clus.conns = append(clus.conns, conn)
clus.clients = append(clus.clients, client)
}
clus.Launch(t)
return clus
}
func (c *clusterV3) Terminate(t *testing.T) {
for _, conn := range c.conns {
if err := conn.Close(); err != nil {
for _, client := range c.clients {
if err := client.Close(); err != nil {
t.Error(err)
}
}
c.cluster.Terminate(t)
}
func (c *clusterV3) RandConn() *grpc.ClientConn {
return c.conns[rand.Intn(len(c.conns))]
func (c *clusterV3) RandClient() *clientv3.Client {
return c.clients[rand.Intn(len(c.clients))]
}
// TestV3PutOverwrite puts a key with the v3 api to a random cluster member,
// overwrites it, then checks that the change was applied.
func TestV3PutOverwrite(t *testing.T) {
defer testutil.AfterTest(t)
clus := newClusterGRPC(t, &clusterConfig{size: 3})
clus := newClusterV3(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
kvc := pb.NewKVClient(clus.RandConn())
kvc := clus.RandClient().KV
key := []byte("foo")
reqput := &pb.PutRequest{Key: key, Value: []byte("bar")}
@ -115,10 +115,10 @@ func TestV3PutOverwrite(t *testing.T) {
func TestV3TxnTooManyOps(t *testing.T) {
defer testutil.AfterTest(t)
clus := newClusterGRPC(t, &clusterConfig{size: 3})
clus := newClusterV3(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
kvc := pb.NewKVClient(clus.RandConn())
kvc := clus.RandClient().KV
addCompareOps := func(txn *pb.TxnRequest) {
txn.Compare = append(txn.Compare,
@ -173,10 +173,10 @@ func TestV3TxnTooManyOps(t *testing.T) {
// TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails.
func TestV3PutMissingLease(t *testing.T) {
defer testutil.AfterTest(t)
clus := newClusterGRPC(t, &clusterConfig{size: 3})
clus := newClusterV3(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
kvc := pb.NewKVClient(clus.RandConn())
kvc := clus.RandClient().KV
key := []byte("foo")
preq := &pb.PutRequest{Key: key, Lease: 123456}
tests := []func(){
@ -290,8 +290,8 @@ func TestV3DeleteRange(t *testing.T) {
}
for i, tt := range tests {
clus := newClusterGRPC(t, &clusterConfig{size: 3})
kvc := pb.NewKVClient(clus.RandConn())
clus := newClusterV3(t, &clusterConfig{size: 3})
kvc := clus.RandClient().KV
ks := tt.keySet
for j := range ks {
@ -336,10 +336,10 @@ func TestV3DeleteRange(t *testing.T) {
// TestV3TxnInvaildRange tests txn
func TestV3TxnInvaildRange(t *testing.T) {
defer testutil.AfterTest(t)
clus := newClusterGRPC(t, &clusterConfig{size: 3})
clus := newClusterV3(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
kvc := pb.NewKVClient(clus.RandConn())
kvc := clus.RandClient().KV
preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
for i := 0; i < 3; i++ {
@ -553,9 +553,9 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
}
for i, tt := range tests {
clus := newClusterGRPC(t, &clusterConfig{size: 3})
clus := newClusterV3(t, &clusterConfig{size: 3})
wAPI := pb.NewWatchClient(clus.RandConn())
wAPI := clus.RandClient().Watch
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wStream, err := wAPI.Watch(ctx)
@ -569,7 +569,7 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
go func() {
for _, k := range tt.putKeys {
kvc := pb.NewKVClient(clus.RandConn())
kvc := clus.RandClient().KV
req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
if _, err := kvc.Put(context.TODO(), req); err != nil {
t.Fatalf("#%d: couldn't put key (%v)", i, err)
@ -629,12 +629,11 @@ func TestV3WatchCancelUnsynced(t *testing.T) {
}
func testV3WatchCancel(t *testing.T, startRev int64) {
clus := newClusterGRPC(t, &clusterConfig{size: 3})
wAPI := pb.NewWatchClient(clus.RandConn())
clus := newClusterV3(t, &clusterConfig{size: 3})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wStream, errW := wAPI.Watch(ctx)
wStream, errW := clus.RandClient().Watch.Watch(ctx)
if errW != nil {
t.Fatalf("wAPI.Watch error: %v", errW)
}
@ -669,7 +668,7 @@ func testV3WatchCancel(t *testing.T, startRev int64) {
t.Errorf("cresp.Canceled got = %v, want = true", cresp.Canceled)
}
kvc := pb.NewKVClient(clus.RandConn())
kvc := clus.RandClient().KV
if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil {
t.Errorf("couldn't put key (%v)", err)
}
@ -698,13 +697,12 @@ func TestV3WatchMultipleWatchersUnsynced(t *testing.T) {
// that matches all watchers, and another key that matches only
// one watcher to test if it receives expected events.
func testV3WatchMultipleWatchers(t *testing.T, startRev int64) {
clus := newClusterGRPC(t, &clusterConfig{size: 3})
wAPI := pb.NewWatchClient(clus.RandConn())
kvc := pb.NewKVClient(clus.RandConn())
clus := newClusterV3(t, &clusterConfig{size: 3})
kvc := clus.RandClient().KV
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wStream, errW := wAPI.Watch(ctx)
wStream, errW := clus.RandClient().Watch.Watch(ctx)
if errW != nil {
t.Fatalf("wAPI.Watch error: %v", errW)
}
@ -801,12 +799,11 @@ func TestV3WatchMultipleEventsTxnUnsynced(t *testing.T) {
// testV3WatchMultipleEventsTxn tests Watch APIs when it receives multiple events.
func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) {
clus := newClusterGRPC(t, &clusterConfig{size: 3})
clus := newClusterV3(t, &clusterConfig{size: 3})
wAPI := pb.NewWatchClient(clus.RandConn())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wStream, wErr := wAPI.Watch(ctx)
wStream, wErr := clus.RandClient().Watch.Watch(ctx)
if wErr != nil {
t.Fatalf("wAPI.Watch error: %v", wErr)
}
@ -818,7 +815,7 @@ func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) {
t.Fatalf("wStream.Send error: %v", err)
}
kvc := pb.NewKVClient(clus.RandConn())
kvc := clus.RandClient().KV
txn := pb.TxnRequest{}
for i := 0; i < 3; i++ {
ru := &pb.RequestUnion{}
@ -885,10 +882,10 @@ func (evs eventsSortByKey) Less(i, j int) bool { return bytes.Compare(evs[i].Kv.
func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) {
defer testutil.AfterTest(t)
clus := newClusterGRPC(t, &clusterConfig{size: 3})
clus := newClusterV3(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
kvc := pb.NewKVClient(clus.RandConn())
kvc := clus.RandClient().KV
if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo0"), Value: []byte("bar")}); err != nil {
t.Fatalf("couldn't put key (%v)", err)
@ -897,10 +894,9 @@ func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) {
t.Fatalf("couldn't put key (%v)", err)
}
wAPI := pb.NewWatchClient(clus.RandConn())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wStream, wErr := wAPI.Watch(ctx)
wStream, wErr := clus.RandClient().Watch.Watch(ctx)
if wErr != nil {
t.Fatalf("wAPI.Watch error: %v", wErr)
}
@ -975,9 +971,9 @@ func TestV3WatchMultipleStreamsUnsynced(t *testing.T) {
// testV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams.
func testV3WatchMultipleStreams(t *testing.T, startRev int64) {
clus := newClusterGRPC(t, &clusterConfig{size: 3})
wAPI := pb.NewWatchClient(clus.RandConn())
kvc := pb.NewKVClient(clus.RandConn())
clus := newClusterV3(t, &clusterConfig{size: 3})
wAPI := clus.RandClient().Watch
kvc := clus.RandClient().KV
streams := make([]pb.Watch_WatchClient, 5)
for i := range streams {
@ -1199,9 +1195,9 @@ func TestV3RangeRequest(t *testing.T) {
}
for i, tt := range tests {
clus := newClusterGRPC(t, &clusterConfig{size: 3})
clus := newClusterV3(t, &clusterConfig{size: 3})
for _, k := range tt.putKeys {
kvc := pb.NewKVClient(clus.RandConn())
kvc := clus.RandClient().KV
req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
if _, err := kvc.Put(context.TODO(), req); err != nil {
t.Fatalf("#%d: couldn't put key (%v)", i, err)
@ -1209,7 +1205,7 @@ func TestV3RangeRequest(t *testing.T) {
}
for j, req := range tt.reqs {
kvc := pb.NewKVClient(clus.RandConn())
kvc := clus.RandClient().KV
resp, err := kvc.Range(context.TODO(), &req)
if err != nil {
t.Errorf("#%d.%d: Range error: %v", i, j, err)
@ -1244,7 +1240,7 @@ func TestV3RangeRequest(t *testing.T) {
func TestV3LeaseRevoke(t *testing.T) {
defer testutil.AfterTest(t)
testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {
lc := pb.NewLeaseClient(clus.RandConn())
lc := clus.RandClient().Lease
_, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
return err
})
@ -1253,11 +1249,11 @@ func TestV3LeaseRevoke(t *testing.T) {
// TestV3LeaseCreateById ensures leases may be created by a given id.
func TestV3LeaseCreateByID(t *testing.T) {
defer testutil.AfterTest(t)
clus := newClusterGRPC(t, &clusterConfig{size: 3})
clus := newClusterV3(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
// create fixed lease
lresp, err := pb.NewLeaseClient(clus.RandConn()).LeaseCreate(
lresp, err := clus.RandClient().Lease.LeaseCreate(
context.TODO(),
&pb.LeaseCreateRequest{ID: 1, TTL: 1})
if err != nil {
@ -1268,7 +1264,7 @@ func TestV3LeaseCreateByID(t *testing.T) {
}
// create duplicate fixed lease
lresp, err = pb.NewLeaseClient(clus.RandConn()).LeaseCreate(
lresp, err = clus.RandClient().Lease.LeaseCreate(
context.TODO(),
&pb.LeaseCreateRequest{ID: 1, TTL: 1})
if err != nil {
@ -1279,7 +1275,7 @@ func TestV3LeaseCreateByID(t *testing.T) {
}
// create fresh fixed lease
lresp, err = pb.NewLeaseClient(clus.RandConn()).LeaseCreate(
lresp, err = clus.RandClient().Lease.LeaseCreate(
context.TODO(),
&pb.LeaseCreateRequest{ID: 2, TTL: 1})
if err != nil {
@ -1297,10 +1293,9 @@ func TestV3LeaseExpire(t *testing.T) {
testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {
// let lease lapse; wait for deleted key
wAPI := pb.NewWatchClient(clus.RandConn())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wStream, err := wAPI.Watch(ctx)
wStream, err := clus.RandClient().Watch.Watch(ctx)
if err != nil {
return err
}
@ -1348,7 +1343,7 @@ func TestV3LeaseExpire(t *testing.T) {
func TestV3LeaseKeepAlive(t *testing.T) {
defer testutil.AfterTest(t)
testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {
lc := pb.NewLeaseClient(clus.RandConn())
lc := clus.RandClient().Lease
lreq := &pb.LeaseKeepAliveRequest{ID: leaseID}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -1381,13 +1376,13 @@ func TestV3LeaseKeepAlive(t *testing.T) {
// client to confirm it's visible to the whole cluster.
func TestV3LeaseExists(t *testing.T) {
defer testutil.AfterTest(t)
clus := newClusterGRPC(t, &clusterConfig{size: 3})
clus := newClusterV3(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
// create lease
ctx0, cancel0 := context.WithCancel(context.Background())
defer cancel0()
lresp, err := pb.NewLeaseClient(clus.RandConn()).LeaseCreate(
lresp, err := clus.RandClient().Lease.LeaseCreate(
ctx0,
&pb.LeaseCreateRequest{TTL: 30})
if err != nil {
@ -1400,7 +1395,7 @@ func TestV3LeaseExists(t *testing.T) {
// confirm keepalive
ctx1, cancel1 := context.WithCancel(context.Background())
defer cancel1()
lac, err := pb.NewLeaseClient(clus.RandConn()).LeaseKeepAlive(ctx1)
lac, err := clus.RandClient().Lease.LeaseKeepAlive(ctx1)
if err != nil {
t.Fatal(err)
}
@ -1416,7 +1411,7 @@ func TestV3LeaseExists(t *testing.T) {
// acquireLeaseAndKey creates a new lease and creates an attached key.
func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) {
// create lease
lresp, err := pb.NewLeaseClient(clus.RandConn()).LeaseCreate(
lresp, err := clus.RandClient().Lease.LeaseCreate(
context.TODO(),
&pb.LeaseCreateRequest{TTL: 1})
if err != nil {
@ -1427,7 +1422,7 @@ func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) {
}
// attach to key
put := &pb.PutRequest{Key: []byte(key), Lease: lresp.ID}
if _, err := pb.NewKVClient(clus.RandConn()).Put(context.TODO(), put); err != nil {
if _, err := clus.RandClient().KV.Put(context.TODO(), put); err != nil {
return 0, err
}
return lresp.ID, nil
@ -1436,7 +1431,7 @@ func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) {
// testLeaseRemoveLeasedKey performs some action while holding a lease with an
// attached key "foo", then confirms the key is gone.
func testLeaseRemoveLeasedKey(t *testing.T, act func(*clusterV3, int64) error) {
clus := newClusterGRPC(t, &clusterConfig{size: 3})
clus := newClusterV3(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
leaseID, err := acquireLeaseAndKey(clus, "foo")
@ -1450,7 +1445,7 @@ func testLeaseRemoveLeasedKey(t *testing.T, act func(*clusterV3, int64) error) {
// confirm no key
rreq := &pb.RangeRequest{Key: []byte("foo")}
rresp, err := pb.NewKVClient(clus.RandConn()).Range(context.TODO(), rreq)
rresp, err := clus.RandClient().KV.Range(context.TODO(), rreq)
if err != nil {
t.Fatal(err)
}


@ -18,28 +18,28 @@ import (
"testing"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/contrib/recipes"
)
func TestMutexSingleNode(t *testing.T) {
clus := newClusterGRPC(t, &clusterConfig{size: 3})
clus := newClusterV3(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
testMutex(t, 5, func() *grpc.ClientConn { return clus.conns[0] })
testMutex(t, 5, func() *clientv3.Client { return clus.clients[0] })
}
func TestMutexMultiNode(t *testing.T) {
clus := newClusterGRPC(t, &clusterConfig{size: 3})
clus := newClusterV3(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
testMutex(t, 5, func() *grpc.ClientConn { return clus.RandConn() })
testMutex(t, 5, func() *clientv3.Client { return clus.RandClient() })
}
func testMutex(t *testing.T, waiters int, chooseConn func() *grpc.ClientConn) {
func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) {
// stream lock acquisitions
lockedC := make(chan *recipe.Mutex, 1)
for i := 0; i < waiters; i++ {
go func() {
m := recipe.NewMutex(recipe.NewEtcdClient(chooseConn()), "test-mutex")
m := recipe.NewMutex(chooseClient(), "test-mutex")
if err := m.Lock(); err != nil {
t.Fatalf("could not wait on lock (%v)", err)
}
@ -68,32 +68,32 @@ func testMutex(t *testing.T, waiters int, chooseConn func() *grpc.ClientConn) {
func BenchmarkMutex4Waiters(b *testing.B) {
// XXX switch tests to use TB interface
clus := newClusterGRPC(nil, &clusterConfig{size: 3})
clus := newClusterV3(nil, &clusterConfig{size: 3})
defer clus.Terminate(nil)
for i := 0; i < b.N; i++ {
testMutex(nil, 4, func() *grpc.ClientConn { return clus.RandConn() })
testMutex(nil, 4, func() *clientv3.Client { return clus.RandClient() })
}
}
func TestRWMutexSingleNode(t *testing.T) {
clus := newClusterGRPC(t, &clusterConfig{size: 3})
clus := newClusterV3(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
testRWMutex(t, 5, func() *grpc.ClientConn { return clus.conns[0] })
testRWMutex(t, 5, func() *clientv3.Client { return clus.clients[0] })
}
func TestRWMutexMultiNode(t *testing.T) {
clus := newClusterGRPC(t, &clusterConfig{size: 3})
clus := newClusterV3(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
testRWMutex(t, 5, func() *grpc.ClientConn { return clus.RandConn() })
testRWMutex(t, 5, func() *clientv3.Client { return clus.RandClient() })
}
func testRWMutex(t *testing.T, waiters int, chooseConn func() *grpc.ClientConn) {
func testRWMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) {
// stream rwlock acquisitions
rlockedC := make(chan *recipe.RWMutex, 1)
wlockedC := make(chan *recipe.RWMutex, 1)
for i := 0; i < waiters; i++ {
go func() {
rwm := recipe.NewRWMutex(recipe.NewEtcdClient(chooseConn()), "test-rwmutex")
rwm := recipe.NewRWMutex(chooseClient(), "test-rwmutex")
if rand.Intn(1) == 0 {
if err := rwm.RLock(); err != nil {
t.Fatalf("could not rlock (%v)", err)


@ -29,7 +29,7 @@ const (
// TestQueueOneReaderOneWriter confirms the queue is FIFO
func TestQueueOneReaderOneWriter(t *testing.T) {
clus := newClusterGRPC(t, &clusterConfig{size: 1})
clus := newClusterV3(t, &clusterConfig{size: 1})
defer clus.Terminate(t)
done := make(chan struct{})
@ -37,7 +37,7 @@ func TestQueueOneReaderOneWriter(t *testing.T) {
defer func() {
done <- struct{}{}
}()
etcdc := recipe.NewEtcdClient(clus.RandConn())
etcdc := clus.RandClient()
q := recipe.NewQueue(etcdc, "testq")
for i := 0; i < 5; i++ {
if err := q.Enqueue(fmt.Sprintf("%d", i)); err != nil {
@ -46,7 +46,7 @@ func TestQueueOneReaderOneWriter(t *testing.T) {
}
}()
etcdc := recipe.NewEtcdClient(clus.RandConn())
etcdc := clus.RandClient()
q := recipe.NewQueue(etcdc, "testq")
for i := 0; i < 5; i++ {
s, err := q.Dequeue()
@ -75,7 +75,7 @@ func TestQueueManyReaderManyWriter(t *testing.T) {
// BenchmarkQueue benchmarks Queues using many/many readers/writers
func BenchmarkQueue(b *testing.B) {
// XXX switch tests to use TB interface
clus := newClusterGRPC(nil, &clusterConfig{size: 3})
clus := newClusterV3(nil, &clusterConfig{size: 3})
defer clus.Terminate(nil)
for i := 0; i < b.N; i++ {
testQueueNReaderMWriter(nil, manyQueueClients, manyQueueClients)
@ -84,11 +84,11 @@ func BenchmarkQueue(b *testing.B) {
// TestPrQueue tests whether priority queues respect priorities.
func TestPrQueueOneReaderOneWriter(t *testing.T) {
clus := newClusterGRPC(t, &clusterConfig{size: 1})
clus := newClusterV3(t, &clusterConfig{size: 1})
defer clus.Terminate(t)
// write out five items with random priority
etcdc := recipe.NewEtcdClient(clus.RandConn())
etcdc := clus.RandClient()
q := recipe.NewPriorityQueue(etcdc, "testprq")
for i := 0; i < 5; i++ {
// [0, 2] priority for priority collision to test seq keys
@ -116,7 +116,7 @@ func TestPrQueueOneReaderOneWriter(t *testing.T) {
}
func TestPrQueueManyReaderManyWriter(t *testing.T) {
clus := newClusterGRPC(t, &clusterConfig{size: 3})
clus := newClusterV3(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
rqs := newPriorityQueues(clus, manyQueueClients)
wqs := newPriorityQueues(clus, manyQueueClients)
@ -126,7 +126,7 @@ func TestPrQueueManyReaderManyWriter(t *testing.T) {
// BenchmarkQueue benchmarks Queues using n/n readers/writers
func BenchmarkPrQueueOneReaderOneWriter(b *testing.B) {
// XXX switch tests to use TB interface
clus := newClusterGRPC(nil, &clusterConfig{size: 3})
clus := newClusterV3(nil, &clusterConfig{size: 3})
defer clus.Terminate(nil)
rqs := newPriorityQueues(clus, 1)
wqs := newPriorityQueues(clus, 1)
@ -136,14 +136,14 @@ func BenchmarkPrQueueOneReaderOneWriter(b *testing.B) {
}
func testQueueNReaderMWriter(t *testing.T, n int, m int) {
clus := newClusterGRPC(t, &clusterConfig{size: 3})
clus := newClusterV3(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
testReadersWriters(t, newQueues(clus, n), newQueues(clus, m))
}
func newQueues(clus *clusterV3, n int) (qs []testQueue) {
for i := 0; i < n; i++ {
etcdc := recipe.NewEtcdClient(clus.RandConn())
etcdc := clus.RandClient()
qs = append(qs, recipe.NewQueue(etcdc, "q"))
}
return qs
@ -151,7 +151,7 @@ func newQueues(clus *clusterV3, n int) (qs []testQueue) {
func newPriorityQueues(clus *clusterV3, n int) (qs []testQueue) {
for i := 0; i < n; i++ {
etcdc := recipe.NewEtcdClient(clus.RandConn())
etcdc := clus.RandClient()
q := &flatPriorityQueue{recipe.NewPriorityQueue(etcdc, "prq")}
qs = append(qs, q)
}


@ -24,10 +24,10 @@ import (
// TestSTMConflict tests that conflicts are retried.
func TestSTMConflict(t *testing.T) {
clus := newClusterGRPC(t, &clusterConfig{size: 3})
clus := newClusterV3(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
etcdc := recipe.NewEtcdClient(clus.RandConn())
etcdc := clus.RandClient()
keys := make([]*recipe.RemoteKV, 5)
for i := 0; i < len(keys); i++ {
rk, err := recipe.NewKV(etcdc, fmt.Sprintf("foo-%d", i), "100", 0)
@ -39,7 +39,7 @@ func TestSTMConflict(t *testing.T) {
errc := make([]<-chan error, len(keys))
for i, rk := range keys {
curEtcdc := recipe.NewEtcdClient(clus.RandConn())
curEtcdc := clus.RandClient()
srcKey := rk.Key()
applyf := func(stm *recipe.STM) error {
src, err := stm.Get(srcKey)
@ -89,10 +89,10 @@ func TestSTMConflict(t *testing.T) {
// TestSTMPut confirms a STM put on a new key is visible after commit.
func TestSTMPutNewKey(t *testing.T) {
clus := newClusterGRPC(t, &clusterConfig{size: 1})
clus := newClusterV3(t, &clusterConfig{size: 1})
defer clus.Terminate(t)
etcdc := recipe.NewEtcdClient(clus.RandConn())
etcdc := clus.RandClient()
applyf := func(stm *recipe.STM) error {
stm.Put("foo", "bar")
return nil
@ -113,10 +113,10 @@ func TestSTMPutNewKey(t *testing.T) {
// TestSTMAbort tests that an aborted txn does not modify any keys.
func TestSTMAbort(t *testing.T) {
clus := newClusterGRPC(t, &clusterConfig{size: 1})
clus := newClusterV3(t, &clusterConfig{size: 1})
defer clus.Terminate(t)
etcdc := recipe.NewEtcdClient(clus.RandConn())
etcdc := clus.RandClient()
applyf := func(stm *recipe.STM) error {
stm.Put("foo", "baz")
stm.Abort()


@ -24,7 +24,6 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
)
@ -67,22 +66,14 @@ func putFunc(cmd *cobra.Command, args []string) {
k, v := make([]byte, keySize), mustRandBytes(valSize)
conns := make([]*grpc.ClientConn, totalConns)
for i := range conns {
conns[i] = mustCreateConn()
}
clients := make([]etcdserverpb.KVClient, totalClients)
for i := range clients {
clients[i] = etcdserverpb.NewKVClient(conns[i%int(totalConns)])
}
clients := mustCreateClients(totalClients, totalConns)
bar.Format("Bom !")
bar.Start()
for i := range clients {
wg.Add(1)
go doPut(context.Background(), clients[i], requests)
go doPut(context.Background(), clients[i].KV, requests)
}
pdoneC := printReport(results)


@ -22,7 +22,6 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
)
@ -59,22 +58,14 @@ func rangeFunc(cmd *cobra.Command, args []string) {
requests := make(chan etcdserverpb.RangeRequest, totalClients)
bar = pb.New(rangeTotal)
conns := make([]*grpc.ClientConn, totalConns)
for i := range conns {
conns[i] = mustCreateConn()
}
clients := make([]etcdserverpb.KVClient, totalClients)
for i := range clients {
clients[i] = etcdserverpb.NewKVClient(conns[i%int(totalConns)])
}
clients := mustCreateClients(totalClients, totalConns)
bar.Format("Bom !")
bar.Start()
for i := range clients {
wg.Add(1)
go doRange(clients[i], requests)
go doRange(clients[i].KV, requests)
}
pdoneC := printReport(results)


@ -20,7 +20,7 @@ import (
"os"
"strings"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
"github.com/coreos/etcd/clientv3"
)
var (
@ -29,16 +29,29 @@ var (
dialTotal int
)
func mustCreateConn() *grpc.ClientConn {
func mustCreateConn() *clientv3.Client {
eps := strings.Split(endpoints, ",")
endpoint := eps[dialTotal%len(eps)]
dialTotal++
conn, err := grpc.Dial(endpoint, grpc.WithInsecure())
client, err := clientv3.NewFromURL(endpoint)
if err != nil {
fmt.Fprintf(os.Stderr, "dial error: %v\n", err)
os.Exit(1)
}
return conn
return client
}
func mustCreateClients(totalClients, totalConns uint) []*clientv3.Client {
conns := make([]*clientv3.Client, totalConns)
for i := range conns {
conns[i] = mustCreateConn()
}
clients := make([]*clientv3.Client, totalClients)
for i := range clients {
clients[i] = conns[i%int(totalConns)].Clone()
}
return clients
}
func mustRandBytes(n int) []byte {


@ -24,7 +24,6 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
)
// watchCmd represents the watch command
@ -72,20 +71,12 @@ func watchFunc(cmd *cobra.Command, args []string) {
requests := make(chan etcdserverpb.WatchRequest, totalClients)
conns := make([]*grpc.ClientConn, totalConns)
for i := range conns {
conns[i] = mustCreateConn()
}
clients := make([]etcdserverpb.WatchClient, totalClients)
for i := range clients {
clients[i] = etcdserverpb.NewWatchClient(conns[i%int(totalConns)])
}
clients := mustCreateClients(totalClients, totalConns)
streams := make([]etcdserverpb.Watch_WatchClient, watchTotalStreams)
var err error
for i := range streams {
streams[i], err = clients[i%int(totalClients)].Watch(context.TODO())
streams[i], err = clients[i%len(clients)].Watch.Watch(context.TODO())
if err != nil {
fmt.Fprintln(os.Stderr, "Failed to create watch stream:", err)
os.Exit(1)
@ -124,7 +115,6 @@ func watchFunc(cmd *cobra.Command, args []string) {
<-pdoneC
// put phase
kv := etcdserverpb.NewKVClient(conns[0])
// total number of puts * number of watchers on each key
eventsTotal := watchPutTotal * (watchTotal / watchedKeyTotal)
@ -138,7 +128,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
for i := 0; i < watchPutTotal; i++ {
wg.Add(1)
go doPut(context.TODO(), kv, putreqc)
go doPut(context.TODO(), clients[i%len(clients)].KV, putreqc)
}
pdoneC = printRate(results)