diff --git a/clientv3/naming/grpc.go b/clientv3/naming/grpc.go index 100899ea0..0f9973a15 100644 --- a/clientv3/naming/grpc.go +++ b/clientv3/naming/grpc.go @@ -16,99 +16,112 @@ package naming import ( "encoding/json" - "time" - "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/mvcc/mvccpb" + etcd "github.com/coreos/etcd/clientv3" "golang.org/x/net/context" - "google.golang.org/grpc/naming" -) -const ( - gRPCNamingPrefix = "/github.com/grpc/" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/naming" ) // GRPCResolver creates a grpc.Watcher for a target to track its resolution changes. type GRPCResolver struct { - // Client is an initialized etcd client - Client *clientv3.Client - // Timeout for update/delete request. - Timeout time.Duration + // Client is an initialized etcd client. + Client *etcd.Client } -func (gr *GRPCResolver) Add(target string, addr string, metadata interface{}) error { - update := naming.Update{ - Addr: addr, - Metadata: metadata, +func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update) (err error) { + switch nm.Op { + case naming.Add: + var v []byte + if v, err = json.Marshal(nm); err != nil { + return grpc.Errorf(codes.InvalidArgument, err.Error()) + } + _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v)) + case naming.Delete: + _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr) + default: + return grpc.Errorf(codes.InvalidArgument, "naming: bad naming op") } - val, err := json.Marshal(update) - if err != nil { - return err - } - - ctx := context.Background() - if gr.Timeout != 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(context.Background(), gr.Timeout) - defer cancel() - } - - _, err = gr.Client.KV.Put(ctx, gRPCNamingPrefix+target, string(val)) - return err -} - -func (gr *GRPCResolver) Delete(target string) error { - ctx := context.Background() - if gr.Timeout != 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(context.Background(), gr.Timeout) - defer cancel() - } - - _, err := gr.Client.Delete(ctx, gRPCNamingPrefix+target) return err } func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) { - cctx, cancel := context.WithCancel(context.Background()) - - wch := gr.Client.Watch(cctx, gRPCNamingPrefix+target) - - w := &gRPCWatcher{ - cancel: cancel, - wch: wch, - } - + ctx, cancel := context.WithCancel(context.Background()) + w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel} return w, nil } type gRPCWatcher struct { + c *etcd.Client + target string + ctx context.Context cancel context.CancelFunc - wch clientv3.WatchChan + wch etcd.WatchChan + err error } +// Next gets the next set of updates from the etcd resolver. +// Calls to Next should be serialized; concurrent calls are not safe since +// there is no way to reconcile the update ordering. func (gw *gRPCWatcher) Next() ([]*naming.Update, error) { + if gw.wch == nil { + // first Next() returns all addresses + return gw.firstNext() + } + if gw.err != nil { + return nil, gw.err + } + + // process new events on target/* wr, ok := <-gw.wch if !ok { - return nil, wr.Err() + gw.err = grpc.Errorf(codes.Unavailable, "naming: watch closed") + return nil, gw.err + } + if gw.err = wr.Err(); gw.err != nil { + return nil, gw.err } updates := make([]*naming.Update, 0, len(wr.Events)) - for _, e := range wr.Events { + var jupdate naming.Update + var err error switch e.Type { - case mvccpb.PUT: - var jupdate naming.Update - err := json.Unmarshal(e.Kv.Value, &jupdate) - if err != nil { - continue - } + case etcd.EventTypePut: + err = json.Unmarshal(e.Kv.Value, &jupdate) + jupdate.Op = naming.Add + case etcd.EventTypeDelete: + err = json.Unmarshal(e.PrevKv.Value, &jupdate) + jupdate.Op = naming.Delete + } + if err == nil { updates = append(updates, &jupdate) - case mvccpb.DELETE: - updates = append(updates, &naming.Update{Op: naming.Delete}) } } + return updates, nil +} +func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) { + // Use serialized request so resolution still works if the target etcd + // server is partitioned away from the quorum. + resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable()) + if gw.err = err; err != nil { + return nil, err + } + + updates := make([]*naming.Update, 0, len(resp.Kvs)) + for _, kv := range resp.Kvs { + var jupdate naming.Update + if err := json.Unmarshal(kv.Value, &jupdate); err != nil { + continue + } + updates = append(updates, &jupdate) + } + + opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()} + gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...) return updates, nil } diff --git a/clientv3/naming/grpc_test.go b/clientv3/naming/grpc_test.go index 8d0248376..ad2d20662 100644 --- a/clientv3/naming/grpc_test.go +++ b/clientv3/naming/grpc_test.go @@ -15,11 +15,14 @@ package naming import ( + "encoding/json" "reflect" "testing" + "golang.org/x/net/context" "google.golang.org/grpc/naming" + etcd "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/integration" "github.com/coreos/etcd/pkg/testutil" ) @@ -40,7 +43,8 @@ func TestGRPCResolver(t *testing.T) { } defer w.Close() - err = r.Add("foo", "127.0.0.1", "metadata") + addOp := naming.Update{Op: naming.Add, Addr: "127.0.0.1", Metadata: "metadata"} + err = r.Update(context.TODO(), "foo", addOp) if err != nil { t.Fatal("failed to add foo", err) } @@ -60,7 +64,8 @@ func TestGRPCResolver(t *testing.T) { t.Fatalf("up = %#v, want %#v", us[0], wu) } - err = r.Delete("foo") + delOp := naming.Update{Op: naming.Delete, Addr: "127.0.0.1"} + err = r.Update(context.TODO(), "foo", delOp) us, err = w.Next() if err != nil { @@ -68,10 +73,63 @@ func TestGRPCResolver(t *testing.T) { } wu = &naming.Update{ - Op: naming.Delete, + Op: naming.Delete, + Addr: "127.0.0.1", + Metadata: "metadata", } if !reflect.DeepEqual(us[0], wu) { t.Fatalf("up = %#v, want %#v", us[0], wu) } } + +// TestGRPCResolverMultiInit ensures the resolver will initialize +// correctly with multiple hosts and correctly receive multiple +// updates in a single revision. +func TestGRPCResolverMulti(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + c := clus.RandClient() + + v, verr := json.Marshal(naming.Update{Addr: "127.0.0.1", Metadata: "md"}) + if verr != nil { + t.Fatal(verr) + } + if _, err := c.Put(context.TODO(), "foo/host", string(v)); err != nil { + t.Fatal(err) + } + if _, err := c.Put(context.TODO(), "foo/host2", string(v)); err != nil { + t.Fatal(err) + } + + r := GRPCResolver{c} + + w, err := r.Resolve("foo") + if err != nil { + t.Fatal("failed to resolve foo", err) + } + defer w.Close() + + updates, nerr := w.Next() + if nerr != nil { + t.Fatal(nerr) + } + if len(updates) != 2 { + t.Fatalf("expected two updates, got %+v", updates) + } + + _, err = c.Txn(context.TODO()).Then(etcd.OpDelete("foo/host"), etcd.OpDelete("foo/host2")).Commit() + if err != nil { + t.Fatal(err) + } + + updates, nerr = w.Next() + if nerr != nil { + t.Fatal(nerr) + } + if len(updates) != 2 || (updates[0].Op != naming.Delete && updates[1].Op != naming.Delete) { + t.Fatalf("expected two updates, got %+v", updates) + } +}