Merge pull request #6632 from heyitsanthony/grpc-naming

clientv3/naming: support resolving to multiple hosts
Anthony Romano 2016-10-12 13:18:36 -07:00 committed by GitHub
commit 546873f27e
4 changed files with 201 additions and 63 deletions

View File

@ -0,0 +1,65 @@
# gRPC naming and discovery
etcd provides a gRPC resolver to support an alternative name system that fetches endpoints from etcd for discovering gRPC services. The underlying mechanism is based on watching updates to keys prefixed with the service name.
## Using etcd discovery with go-grpc
The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client and given a target for resolution:
import (
etcdnaming ""
cli, cerr := clientv3.NewFromURL("http://localhost:2379")
r := &etcdnaming.GRPCResolver{Client: cli}
b := grpc.RoundRobin(r)
conn, gerr := grpc.Dial("my-service", grpc.WithBalancer(b))
## Managing service endpoints
The etcd resolver treats every key under the resolution target's prefix, i.e. the target followed by a "/" (e.g., "my-service/"), whose value is a JSON-encoded go-grpc `naming.Update` as a potential service endpoint. Endpoints are added to the service by creating new keys and removed from the service by deleting keys.
### Adding an endpoint
New endpoints can be added to the service through `etcdctl`:
ETCDCTL_API=3 etcdctl put my-service/ '{"Addr":"","Metadata":"..."}'
The etcd client's `GRPCResolver.Update` method can also register new endpoints with a key matching the `Addr`:
r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Add, Addr: "", Metadata: "..."})
### Deleting an endpoint
Endpoints can be deleted from the service through `etcdctl`:
ETCDCTL_API=3 etcdctl del my-service/
The etcd client's `GRPCResolver.Update` method also supports deleting endpoints:
r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Delete, Addr: ""})
### Registering an endpoint with a lease
Registering an endpoint with a lease ensures that if the host can't maintain a keepalive heartbeat (e.g., its machine fails), the endpoint will be removed from the service:
lease=`ETCDCTL_API=3 etcdctl lease grant 5 | cut -f2 -d' '`
ETCDCTL_API=3 etcdctl put --lease=$lease my-service/ '{"Addr":"","Metadata":"..."}'
ETCDCTL_API=3 etcdctl lease keep-alive $lease

View File

@ -14,6 +14,7 @@ The easiest way to get started using etcd as a distributed key-value store is to
- [Interacting with etcd][interacting]
- [API references][api_ref]
- [gRPC gateway][api_grpc_gateway]
- [gRPC naming and discovery][grpc_naming]
- [Embedding etcd][embed_etcd]
- [Experimental features and APIs][experimental]
@ -60,6 +61,7 @@ To learn more about the concepts and internals behind etcd, read the following p
[grpc_naming]: dev-guide/
[failures]: op-guide/
[gateway]: op-guide/
[glossary]: learning/

View File

@ -16,99 +16,112 @@ package naming
import (
etcd ""
const (
gRPCNamingPrefix = "/"
// GRPCResolver creates a naming.Watcher for a target to track its resolution changes.
type GRPCResolver struct {
// Client is an initialized etcd client
Client *clientv3.Client
// Timeout for update/delete request.
Timeout time.Duration
// Client is an initialized etcd client.
Client *etcd.Client
func (gr *GRPCResolver) Add(target string, addr string, metadata interface{}) error {
update := naming.Update{
Addr: addr,
Metadata: metadata,
func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update) (err error) {
switch nm.Op {
case naming.Add:
var v []byte
if v, err = json.Marshal(nm); err != nil {
return grpc.Errorf(codes.InvalidArgument, err.Error())
_, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v))
case naming.Delete:
_, err = gr.Client.Delete(ctx, target+"/"+nm.Addr)
return grpc.Errorf(codes.InvalidArgument, "naming: bad naming op")
val, err := json.Marshal(update)
if err != nil {
return err
ctx := context.Background()
if gr.Timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), gr.Timeout)
defer cancel()
_, err = gr.Client.KV.Put(ctx, gRPCNamingPrefix+target, string(val))
return err
func (gr *GRPCResolver) Delete(target string) error {
ctx := context.Background()
if gr.Timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), gr.Timeout)
defer cancel()
_, err := gr.Client.Delete(ctx, gRPCNamingPrefix+target)
return err
func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {
cctx, cancel := context.WithCancel(context.Background())
wch := gr.Client.Watch(cctx, gRPCNamingPrefix+target)
w := &gRPCWatcher{
cancel: cancel,
wch: wch,
ctx, cancel := context.WithCancel(context.Background())
w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel}
return w, nil
type gRPCWatcher struct {
c *etcd.Client
target string
ctx context.Context
cancel context.CancelFunc
wch clientv3.WatchChan
wch etcd.WatchChan
err error
// Next gets the next set of updates from the etcd resolver.
// Calls to Next should be serialized; concurrent calls are not safe since
// there is no way to reconcile the update ordering.
func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
if gw.wch == nil {
// first Next() returns all addresses
return gw.firstNext()
if gw.err != nil {
return nil, gw.err
// process new events on target/*
wr, ok := <-gw.wch
if !ok {
return nil, wr.Err()
gw.err = grpc.Errorf(codes.Unavailable, "naming: watch closed")
return nil, gw.err
if gw.err = wr.Err(); gw.err != nil {
return nil, gw.err
updates := make([]*naming.Update, 0, len(wr.Events))
for _, e := range wr.Events {
var jupdate naming.Update
var err error
switch e.Type {
case mvccpb.PUT:
var jupdate naming.Update
err := json.Unmarshal(e.Kv.Value, &jupdate)
if err != nil {
case etcd.EventTypePut:
err = json.Unmarshal(e.Kv.Value, &jupdate)
jupdate.Op = naming.Add
case etcd.EventTypeDelete:
err = json.Unmarshal(e.PrevKv.Value, &jupdate)
jupdate.Op = naming.Delete
if err == nil {
updates = append(updates, &jupdate)
case mvccpb.DELETE:
updates = append(updates, &naming.Update{Op: naming.Delete})
return updates, nil
func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
// Use serialized request so resolution still works if the target etcd
// server is partitioned away from the quorum.
resp, err := gw.c.Get(gw.ctx,, etcd.WithPrefix(), etcd.WithSerializable())
if gw.err = err; err != nil {
return nil, err
updates := make([]*naming.Update, 0, len(resp.Kvs))
for _, kv := range resp.Kvs {
var jupdate naming.Update
if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
updates = append(updates, &jupdate)
opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
gw.wch = gw.c.Watch(gw.ctx,, opts...)
return updates, nil

View File

@ -15,11 +15,14 @@
package naming
import (
etcd ""
@ -40,7 +43,8 @@ func TestGRPCResolver(t *testing.T) {
defer w.Close()
err = r.Add("foo", "", "metadata")
addOp := naming.Update{Op: naming.Add, Addr: "", Metadata: "metadata"}
err = r.Update(context.TODO(), "foo", addOp)
if err != nil {
t.Fatal("failed to add foo", err)
@ -60,7 +64,8 @@ func TestGRPCResolver(t *testing.T) {
t.Fatalf("up = %#v, want %#v", us[0], wu)
err = r.Delete("foo")
delOp := naming.Update{Op: naming.Delete, Addr: ""}
err = r.Update(context.TODO(), "foo", delOp)
us, err = w.Next()
if err != nil {
@ -68,10 +73,63 @@ func TestGRPCResolver(t *testing.T) {
wu = &naming.Update{
Op: naming.Delete,
Op: naming.Delete,
Addr: "",
Metadata: "metadata",
if !reflect.DeepEqual(us[0], wu) {
t.Fatalf("up = %#v, want %#v", us[0], wu)
// TestGRPCResolverMulti ensures the resolver will initialize
// correctly with multiple hosts and correctly receive multiple
// updates in a single revision.
func TestGRPCResolverMulti(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
c := clus.RandClient()
v, verr := json.Marshal(naming.Update{Addr: "", Metadata: "md"})
if verr != nil {
if _, err := c.Put(context.TODO(), "foo/host", string(v)); err != nil {
if _, err := c.Put(context.TODO(), "foo/host2", string(v)); err != nil {
r := GRPCResolver{c}
w, err := r.Resolve("foo")
if err != nil {
t.Fatal("failed to resolve foo", err)
defer w.Close()
updates, nerr := w.Next()
if nerr != nil {
if len(updates) != 2 {
t.Fatalf("expected two updates, got %+v", updates)
_, err = c.Txn(context.TODO()).Then(etcd.OpDelete("foo/host"), etcd.OpDelete("foo/host2")).Commit()
if err != nil {
updates, nerr = w.Next()
if nerr != nil {
if len(updates) != 2 || (updates[0].Op != naming.Delete && updates[1].Op != naming.Delete) {
t.Fatalf("expected two updates, got %+v", updates)