Introduce grpc-1.30+ compatible client/v3/naming API.

This is not yet implementation, just API and tests to be filled
with implementation in next CLs,
tracked by: https://github.com/etcd-io/etcd/issues/12652

We propose here 3 packages:
 - clientv3/naming/endpoints ->
    That is abstraction layer over etcd that allows to write, read &
    watch Endpoints information. It's independent from GRPC API. It hides
    the storage details.

 - clientv3/naming/endpoints/internal ->
    That contains the grpc's compatible Update class to preserve the
    internal JSON mashalling format.

 - clientv3/naming/resolver ->
   That implements the GRPC resolver API, such that etcd can be
   used for connection.Dial in grpc.

Please see the grpc_naming.md document changes & grpcproxy/cluster.go
new integration, to see how the new abstractions work.
release-3.5
Piotr Tabor 2021-01-21 22:13:38 +01:00
parent 90d1b838ad
commit 5d7c1db3a9
9 changed files with 536 additions and 28 deletions

View File

@ -8,40 +8,41 @@ etcd provides a gRPC resolver to support an alternative name system that fetches
## Using etcd discovery with go-grpc
The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client and given a target for resolution:
The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client:
```go
import (
"go.etcd.io/etcd/v3/clientv3"
etcdnaming "go.etcd.io/etcd/v3/clientv3/naming"
resolver "go.etcd.io/etcd/v3/clientv3/naming/resolver"
"google.golang.org/grpc"
)
...
cli, cerr := clientv3.NewFromURL("http://localhost:2379")
r := &etcdnaming.GRPCResolver{Client: cli}
b := grpc.RoundRobin(r)
conn, gerr := grpc.Dial("my-service", grpc.WithBalancer(b), grpc.WithBlock(), ...)
etcdResolver, err := resolver.NewBuilder(clus.RandClient());
conn, gerr := grpc.Dial("etcd://foo/bar/my-service", grpc.WithResolvers(etcdResolver))
```
## Managing service endpoints
The etcd resolver treats all keys under the prefix of the resolution target following a "/" (e.g., "my-service/") with JSON-encoded go-grpc `naming.Update` values as potential service endpoints. Endpoints are added to the service by creating new keys and removed from the service by deleting keys.
The etcd resolver treats all keys under the prefix of the resolution target following a "/" (e.g., "foo/bar/my-service/")
with JSON-encoded (historically go-grpc `naming.Update`) values as potential service endpoints.
Endpoints are added to the service by creating new keys and removed from the service by deleting keys.
### Adding an endpoint
New endpoints can be added to the service through `etcdctl`:
```sh
ETCDCTL_API=3 etcdctl put my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
ETCDCTL_API=3 etcdctl put foo/bar/my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
```
The etcd client's `GRPCResolver.Update` method can also register new endpoints with a key matching the `Addr`:
The etcd client's `endpoints.Manager` method can also register new endpoints with a key matching the `Addr`:
```go
r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Add, Addr: "1.2.3.4", Metadata: "..."})
em := endpoints.NewManager(client, "foo/bar/my-service")
err := em.AddEndpoint(context.TODO(),"foo/bar/my-service/e1", endpoints.Endpoint{Addr:"1.2.3.4"});
```
### Deleting an endpoint
@ -49,13 +50,14 @@ r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Add, Addr: "1.2.
Hosts can be deleted from the service through `etcdctl`:
```sh
ETCDCTL_API=3 etcdctl del my-service/1.2.3.4
ETCDCTL_API=3 etcdctl del foo/bar/my-service/1.2.3.4
```
The etcd client's `GRPCResolver.Update` method also supports deleting endpoints:
The etcd client's `endpoints.Manager` method also supports deleting endpoints:
```go
r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Delete, Addr: "1.2.3.4"})
em := endpoints.NewManager(client, "foo/bar/my-service")
err := em.DeleteEndpoint(context.TODO(), "foo/bar/my-service/e1");
```
### Registering an endpoint with a lease
@ -67,3 +69,21 @@ lease=`ETCDCTL_API=3 etcdctl lease grant 5 | cut -f2 -d' '`
ETCDCTL_API=3 etcdctl put --lease=$lease my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
ETCDCTL_API=3 etcdctl lease keep-alive $lease
```
In the golang:
```go
em := endpoints.NewManager(client, "foo/bar/my-service")
err := endpoints.AddEndpoint(context.TODO(), "foo/bar/my-service/e1", endpoints.Endpoint{Addr:"1.2.3.4"});
```
### Atomically updating endpoints
If it's desired to modify multiple endpoints in a single transaction, `endpoints.Manager` can be used directly:
```
em := endpoints.NewManager(c, "foo")
err := em.Update(context.TODO(), []*endpoints.UpdateWithOpts{
endpoints.NewDeleteUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.4"}),
endpoints.NewAddUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.14"})})
```

View File

@ -12,45 +12,48 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// Package naming provides an etcd-backed gRPC resolver for discovering gRPC services.
// Package naming provides:
// - subpackage endpoints: an abstraction layer to store and read endpoints
// information from etcd.
// - subpackage resolver: an etcd-backed gRPC resolver for discovering gRPC
// services based on the endpoints configuration
//
// To use, first import the packages:
//
// import (
// "go.etcd.io/etcd/client/v3"
// etcdnaming "go.etcd.io/etcd/client/v3/naming"
//
// "go.etcd.io/etcd/client/v3/naming/endpoints"
// "go.etcd.io/etcd/client/v3/naming/resolver"
// "google.golang.org/grpc"
// "google.golang.org/grpc/naming"
// )
//
// First, register new endpoint addresses for a service:
//
// func etcdAdd(c *clientv3.Client, service, addr string) error {
// r := &etcdnaming.GRPCResolver{Client: c}
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr})
// em := endpoints.NewManager(c, service)
// return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr:addr});
// }
//
// Dial an RPC service using the etcd gRPC resolver and a gRPC Balancer:
//
// func etcdDial(c *clientv3.Client, service string) (*grpc.ClientConn, error) {
// r := &etcdnaming.GRPCResolver{Client: c}
// b := grpc.RoundRobin(r)
// return grpc.Dial(service, grpc.WithBalancer(b))
// etcdResolver, err := resolver.NewBuilder(c);
// if err { return nil, err }
// return grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver))
// }
//
// Optionally, force delete an endpoint:
//
// func etcdDelete(c *clientv3, service, addr string) error {
// r := &etcdnaming.GRPCResolver{Client: c}
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Delete, Addr: "1.2.3.4"})
// em := endpoints.NewManager(c, service)
// return em.DeleteEndpoint(c.Ctx(), service+"/"+addr)
// }
//
// Or register an expiring endpoint with a lease:
//
// func etcdLeaseAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error {
// r := &etcdnaming.GRPCResolver{Client: c}
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr}, clientv3.WithLease(lid))
// func etcdAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error {
// em := endpoints.NewManager(c, service)
// return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr:addr}, clientv3.WithLease(lid));
// }
//
package naming

View File

@ -0,0 +1,82 @@
package endpoints
import (
"context"
clientv3 "go.etcd.io/etcd/client/v3"
)
// Endpoint represents a single address the connection can be established with.
//
// Inspired by: https://pkg.go.dev/google.golang.org/grpc/resolver#Address.
// Please document etcd version since which version each field is supported.
type Endpoint struct {
// Addr is the server address on which a connection will be established.
// Since etcd 3.1
Addr string
// Metadata is the information associated with Addr, which may be used
// to make load balancing decision.
// Since etcd 3.1
Metadata interface{}
}
type Operation uint8
const (
// Add indicates an Endpoint is added.
Add Operation = iota
// Delete indicates an existing address is deleted.
Delete
)
// Update describes a single edit action of an Endpoint.
type Update struct {
// Op - action Add or Delete.
Op Operation
Key string
Endpoint Endpoint
}
// WatchChannel is used to deliver notifications about endpoints updates.
type WatchChannel chan []*Update
// Key2EndpointMap maps etcd key into struct describing the endpoint.
type Key2EndpointMap map[string]Endpoint
// UpdateWithOpts describes endpoint update (add or delete) together
// with etcd options (e.g. to attach an endpoint to a lease).
type UpdateWithOpts struct {
Update
Opts []clientv3.OpOption
}
// NewAddUpdateOpts constructs UpdateWithOpts for endpoint registration.
func NewAddUpdateOpts(key string, endpoint Endpoint, opts ...clientv3.OpOption) *UpdateWithOpts {
return &UpdateWithOpts{Update: Update{Op: Add, Key: key, Endpoint: endpoint}, Opts: opts}
}
// NewDeleteUpdateOpts constructs UpdateWithOpts for endpoint deletion.
func NewDeleteUpdateOpts(key string, opts ...clientv3.OpOption) *UpdateWithOpts {
return &UpdateWithOpts{Update: Update{Op: Delete, Key: key}, Opts: opts}
}
// Manager can be used to add/remove & inspect endpoints stored in etcd for
// a particular target.
type Manager interface {
// Update allows to atomically add/remove a few endpoints from etcd.
Update(ctx context.Context, updates []*UpdateWithOpts) error
// AddEndpoint registers a single endpoint in etcd.
// For more advanced use-cases use the Update method.
AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error
// DeleteEndpoint deletes a single endpoint stored in etcd.
// For more advanced use-cases use the Update method.
DeleteEndpoint(ctx context.Context, key string, opts ...clientv3.OpOption) error
// List returns all the endpoints for the current target as a map.
List(ctx context.Context) (Key2EndpointMap, error)
// NewWatchChannel creates a channel that populates or endpoint updates.
// Cancel the 'ctx' to close the watcher.
NewWatchChannel(ctx context.Context) (WatchChannel, error)
}

View File

@ -0,0 +1,121 @@
package endpoints
// TODO: The API is not yet implemented.
import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming/endpoints/internal"
)
type endpointManager struct {
// TODO: To be implemented, tracked by: https://github.com/etcd-io/etcd/issues/12652
}
func NewManager(client *clientv3.Client, target string) (Manager, error) {
// To be implemented (https://github.com/etcd-io/etcd/issues/12652)
return nil, fmt.Errorf("Not implemented yet")
}
func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) error {
// TODO: For loop in a single transaction:
internalUpdate := &internal.Update{} // translate UpdateWithOpts into json format.
switch internalUpdate.Op {
//case internal.Add:
// var v []byte
// if v, err = json.Marshal(internalUpdate); err != nil {
// return status.Error(codes.InvalidArgument, err.Error())
// }
// _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)
//case internal.Delete:
// _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)
//default:
// return status.Error(codes.InvalidArgument, "naming: bad naming op")
}
return fmt.Errorf("Not implemented yet")
}
func (m *endpointManager) AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error {
return m.Update(ctx, []*UpdateWithOpts{NewAddUpdateOpts(key, endpoint, opts...)})
}
func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts ...clientv3.OpOption) error {
return m.Update(ctx, []*UpdateWithOpts{NewDeleteUpdateOpts(key, opts...)})
}
func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) {
return nil, fmt.Errorf("Not implemented yet")
// TODO: Implementation to be inspired by:
// Next gets the next set of updates from the etcd resolver.
//// Calls to Next should be serialized; concurrent calls are not safe since
//// there is no way to reconcile the update ordering.
//func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
// if gw.wch == nil {
// // first Next() returns all addresses
// return gw.firstNext()
// }
// if gw.err != nil {
// return nil, gw.err
// }
//
// // process new events on target/*
// wr, ok := <-gw.wch
// if !ok {
// gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error())
// return nil, gw.err
// }
// if gw.err = wr.Err(); gw.err != nil {
// return nil, gw.err
// }
//
// updates := make([]*naming.Update, 0, len(wr.Events))
// for _, e := range wr.Events {
// var jupdate naming.Update
// var err error
// switch e.Type {
// case etcd.EventTypePut:
// err = json.Unmarshal(e.Kv.Value, &jupdate)
// jupdate.Op = naming.Add
// case etcd.EventTypeDelete:
// err = json.Unmarshal(e.PrevKv.Value, &jupdate)
// jupdate.Op = naming.Delete
// default:
// continue
// }
// if err == nil {
// updates = append(updates, &jupdate)
// }
// }
// return updates, nil
//}
//
//func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
// // Use serialized request so resolution still works if the target etcd
// // server is partitioned away from the quorum.
// resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
// if gw.err = err; err != nil {
// return nil, err
// }
//
// updates := make([]*naming.Update, 0, len(resp.Kvs))
// for _, kv := range resp.Kvs {
// var jupdate naming.Update
// if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
// continue
// }
// updates = append(updates, &jupdate)
// }
//
// opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
// gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
// return updates, nil
//}
}
func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) {
// TODO: Implementation
return nil, fmt.Errorf("Not implemented yet")
}

View File

@ -0,0 +1,38 @@
package internal
// Operation describes action performed on endpoint (addition vs deletion).
// Must stay JSON-format compatible with:
// https://pkg.go.dev/google.golang.org/grpc@v1.29.1/naming#Operation
type Operation uint8
const (
// Add indicates a new address is added.
Add Operation = iota
// Delete indicates an existing address is deleted.
Delete
)
// Update defines a persistent (JSON marshalled) format representing
// endpoint within the etcd storage.
//
// As the format can be persisted by one version of etcd client library and
// read by other the format must be kept backward compatible and
// in particular must be superset of the grpc(<=1.29.1) naming.Update structure:
// https://pkg.go.dev/google.golang.org/grpc@v1.29.1/naming#Update
//
// Please document since which version of etcd-client given property is supported.
// Please keep the naming consistent with e.g. https://pkg.go.dev/google.golang.org/grpc/resolver#Address.
//
// Notice that it is not valid having both empty string Addr and nil Metadata in an Update.
type Update struct {
// Op indicates the operation of the update.
// Since etcd 3.1.
Op Operation
// Addr is the updated address. It is empty string if there is no address update.
// Since etcd 3.1.
Addr string
// Metadata is the updated metadata. It is nil if there is no metadata update.
// Metadata is not required for a custom naming implementation.
// Since etcd 3.1.
Metadata interface{}
}

View File

@ -0,0 +1,24 @@
package resolver
import (
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc/resolver"
)
type builder struct {
// ...
}
func (b builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
// To be implemented...
// Using endpoints.NewWatcher() to subscribe for endpoints changes.
return nil, nil
}
func (b builder) Scheme() string {
return "etcd"
}
func NewBuilder(client *clientv3.Client) (resolver.Builder, error) {
return builder{}, nil
}

View File

@ -0,0 +1,135 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package naming_test
import (
"context"
"reflect"
"testing"
etcd "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming/endpoints"
"go.etcd.io/etcd/pkg/v3/testutil"
"go.etcd.io/etcd/tests/v3/integration"
)
func TestEndpointManager(t *testing.T) {
t.Skip("Not implemented yet")
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
em, err := endpoints.NewManager(clus.RandClient(), "foo")
if err != nil {
t.Fatal("failed to create EndpointManager", err)
}
ctx, watchCancel := context.WithCancel(context.Background())
defer watchCancel()
w, err := em.NewWatchChannel(ctx)
if err != nil {
t.Fatal("failed to establish watch", err)
}
e1 := endpoints.Endpoint{Addr: "127.0.0.1", Metadata: "metadata"}
err = em.AddEndpoint(context.TODO(), "foo/a1", e1)
if err != nil {
t.Fatal("failed to add foo", err)
}
us := <-w
if us == nil {
t.Fatal("failed to get update", err)
}
wu := endpoints.Update{
Op: endpoints.Add,
Key: "foo/a1",
Endpoint: e1,
}
if !reflect.DeepEqual(us[0], wu) {
t.Fatalf("up = %#v, want %#v", us[0], wu)
}
err = em.DeleteEndpoint(context.TODO(), "foo/a1")
if err != nil {
t.Fatalf("failed to udpate %v", err)
}
us = <-w
if err != nil {
t.Fatalf("failed to get udpate %v", err)
}
wu = endpoints.Update{
Op: endpoints.Delete,
Key: "foo/a1",
}
if !reflect.DeepEqual(us, wu) {
t.Fatalf("up = %#v, want %#v", us[1], wu)
}
}
// TestEndpointManagerAtomicity ensures the resolver will initialize
// correctly with multiple hosts and correctly receive multiple
// updates in a single revision.
func TestEndpointManagerAtomicity(t *testing.T) {
t.Skip("Not implemented yet")
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
c := clus.RandClient()
em, err := endpoints.NewManager(c, "foo")
if err != nil {
t.Fatal("failed to create EndpointManager", err)
}
err = em.Update(context.TODO(), []*endpoints.UpdateWithOpts{
endpoints.NewAddUpdateOpts("foo/host", endpoints.Endpoint{Addr: "127.0.0.1:2000"}),
endpoints.NewAddUpdateOpts("foo/host2", endpoints.Endpoint{Addr: "127.0.0.1:2001"})})
if err != nil {
t.Fatal(err)
}
ctx, watchCancel := context.WithCancel(context.Background())
defer watchCancel()
w, err := em.NewWatchChannel(ctx)
if err != nil {
t.Fatal(err)
}
updates := <-w
if len(updates) != 2 {
t.Fatalf("expected two updates, got %+v", updates)
}
_, err = c.Txn(context.TODO()).Then(etcd.OpDelete("foo/host"), etcd.OpDelete("foo/host2")).Commit()
if err != nil {
t.Fatal(err)
}
updates = <-w
if len(updates) != 2 || (updates[0].Op != endpoints.Delete && updates[1].Op != endpoints.Delete) {
t.Fatalf("expected two delete updates, got %+v", updates)
}
}

View File

@ -0,0 +1,15 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package naming_test
import (
"testing"
"go.etcd.io/etcd/pkg/v3/testutil"
)
func TestMain(m *testing.M) {
testutil.MustTestMainWithLeakDetection(m)
}

View File

@ -0,0 +1,70 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package naming_test
import (
"context"
"testing"
"go.etcd.io/etcd/client/v3/naming/endpoints"
"go.etcd.io/etcd/client/v3/naming/resolver"
"google.golang.org/grpc"
"go.etcd.io/etcd/pkg/v3/testutil"
"go.etcd.io/etcd/tests/v3/integration"
)
// This test mimics scenario described in grpc_naming.md doc.
func TestEtcdGrpcResolver(t *testing.T) {
t.Skip("Not implemented yet")
defer testutil.AfterTest(t)
// s1 := // TODO: Dummy GRPC service listening on 127.0.0.1:20000
// s2 := // TODO: Dummy GRPC service listening on 127.0.0.1:20001
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
em, err := endpoints.NewManager(clus.RandClient(), "foo")
if err != nil {
t.Fatal("failed to create EndpointManager", err)
}
e1 := endpoints.Endpoint{Addr: "127.0.0.1:20000"}
e2 := endpoints.Endpoint{Addr: "127.0.0.1:20001"}
err = em.AddEndpoint(context.TODO(), "foo/e1", e1)
if err != nil {
t.Fatal("failed to add foo", err)
}
etcdResolver, err := resolver.NewBuilder(clus.RandClient())
conn, err := grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver))
if err != nil {
t.Fatal("failed to connect to foo (e1)", err)
}
// TODO: send requests to conn, ensure s1 received it.
em.DeleteEndpoint(context.TODO(), "foo/e1")
em.AddEndpoint(context.TODO(), "foo/e2", e2)
// TODO: Send requests to conn and make sure s2 receive it.
// Might require restarting s1 to break the existing (open) connection.
conn.GetState() // this line is to avoid compiler warning that conn is unused.
}