Get rid of legacy client/v3/naming API.

Update grpcproxy to use the new abstractions.
release-3.5
Piotr Tabor 2021-01-21 22:17:10 +01:00
parent 3fddea9669
commit a836a8045b
6 changed files with 60 additions and 329 deletions

View File

@ -43,7 +43,7 @@ func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts)
ops := make([]clientv3.Op, 0, len(updates))
for _, update := range updates {
if !strings.HasPrefix(update.Key, m.target+"/") {
return status.Errorf(codes.InvalidArgument, "endpoints: endpoint key should be prefixed with %s/", m.target)
return status.Errorf(codes.InvalidArgument, "endpoints: endpoint key should be prefixed with '%s/' got: '%s'", m.target, update.Key)
}
switch update.Op {

View File

@ -1,133 +0,0 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package naming
import (
"context"
"encoding/json"
"fmt"
etcd "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/naming"
"google.golang.org/grpc/status"
)
var ErrWatcherClosed = fmt.Errorf("naming: watch closed")
// GRPCResolver creates a grpc.Watcher for a target to track its resolution changes.
type GRPCResolver struct {
// Client is an initialized etcd client.
Client *etcd.Client
}
func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) {
switch nm.Op {
case naming.Add:
var v []byte
if v, err = json.Marshal(nm); err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
_, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)
case naming.Delete:
_, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)
default:
return status.Error(codes.InvalidArgument, "naming: bad naming op")
}
return err
}
func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {
ctx, cancel := context.WithCancel(context.Background())
w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel}
return w, nil
}
type gRPCWatcher struct {
c *etcd.Client
target string
ctx context.Context
cancel context.CancelFunc
wch etcd.WatchChan
err error
}
// Next gets the next set of updates from the etcd resolver.
// Calls to Next should be serialized; concurrent calls are not safe since
// there is no way to reconcile the update ordering.
func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
if gw.wch == nil {
// first Next() returns all addresses
return gw.firstNext()
}
if gw.err != nil {
return nil, gw.err
}
// process new events on target/*
wr, ok := <-gw.wch
if !ok {
gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error())
return nil, gw.err
}
if gw.err = wr.Err(); gw.err != nil {
return nil, gw.err
}
updates := make([]*naming.Update, 0, len(wr.Events))
for _, e := range wr.Events {
var jupdate naming.Update
var err error
switch e.Type {
case etcd.EventTypePut:
err = json.Unmarshal(e.Kv.Value, &jupdate)
jupdate.Op = naming.Add
case etcd.EventTypeDelete:
err = json.Unmarshal(e.PrevKv.Value, &jupdate)
jupdate.Op = naming.Delete
default:
continue
}
if err == nil {
updates = append(updates, &jupdate)
}
}
return updates, nil
}
func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
// Use serialized request so resolution still works if the target etcd
// server is partitioned away from the quorum.
resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
if gw.err = err; err != nil {
return nil, err
}
updates := make([]*naming.Update, 0, len(resp.Kvs))
for _, kv := range resp.Kvs {
var jupdate naming.Update
if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
continue
}
updates = append(updates, &jupdate)
}
opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
return updates, nil
}
func (gw *gRPCWatcher) Close() { gw.cancel() }

View File

@ -22,13 +22,11 @@ import (
"sync"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming"
"go.etcd.io/etcd/client/v3/naming/endpoints"
"golang.org/x/time/rate"
"go.uber.org/zap"
"golang.org/x/time/rate"
gnaming "google.golang.org/grpc/naming"
)
// allow maximum 1 retry per second
@ -38,39 +36,51 @@ type clusterProxy struct {
lg *zap.Logger
clus clientv3.Cluster
ctx context.Context
gr *naming.GRPCResolver
// advertise client URL
advaddr string
prefix string
em endpoints.Manager
umu sync.RWMutex
umap map[string]gnaming.Update
umap map[string]endpoints.Endpoint
}
// NewClusterProxy takes optional prefix to fetch grpc-proxy member endpoints.
// The returned channel is closed when there is grpc-proxy endpoint registered
// and the client's context is canceled so the 'register' loop returns.
// TODO: Expand the API to report creation errors
func NewClusterProxy(lg *zap.Logger, c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{}) {
if lg == nil {
lg = zap.NewNop()
}
var em endpoints.Manager
if advaddr != "" && prefix != "" {
var err error
if em, err = endpoints.NewManager(c, prefix); err != nil {
lg.Error("failed to provision endpointsManager", zap.String("prefix", prefix), zap.Error(err))
return nil, nil
}
}
cp := &clusterProxy{
lg: lg,
clus: c.Cluster,
ctx: c.Ctx(),
gr: &naming.GRPCResolver{Client: c},
advaddr: advaddr,
prefix: prefix,
umap: make(map[string]gnaming.Update),
umap: make(map[string]endpoints.Endpoint),
em: em,
}
donec := make(chan struct{})
if advaddr != "" && prefix != "" {
if em != nil {
go func() {
defer close(donec)
cp.resolve(prefix)
cp.establishEndpointWatch(prefix)
}()
return cp, donec
}
@ -79,38 +89,36 @@ func NewClusterProxy(lg *zap.Logger, c *clientv3.Client, advaddr string, prefix
return cp, donec
}
func (cp *clusterProxy) resolve(prefix string) {
func (cp *clusterProxy) establishEndpointWatch(prefix string) {
rm := rate.NewLimiter(rate.Limit(resolveRetryRate), resolveRetryRate)
for rm.Wait(cp.ctx) == nil {
wa, err := cp.gr.Resolve(prefix)
wc, err := cp.em.NewWatchChannel(cp.ctx)
if err != nil {
cp.lg.Warn("failed to resolve prefix", zap.String("prefix", prefix), zap.Error(err))
cp.lg.Warn("failed to establish endpoint watch", zap.String("prefix", prefix), zap.Error(err))
continue
}
cp.monitor(wa)
cp.monitor(wc)
}
}
func (cp *clusterProxy) monitor(wa gnaming.Watcher) {
for cp.ctx.Err() == nil {
ups, err := wa.Next()
if err != nil {
cp.lg.Warn("clusterProxy watcher error", zap.Error(err))
if rpctypes.ErrorDesc(err) == naming.ErrWatcherClosed.Error() {
return
func (cp *clusterProxy) monitor(wa endpoints.WatchChannel) {
for {
select {
case <-cp.ctx.Done():
cp.lg.Info("watching endpoints interrupted", zap.Error(cp.ctx.Err()))
return
case updates := <-wa:
cp.umu.Lock()
for _, up := range updates {
switch up.Op {
case endpoints.Add:
cp.umap[up.Endpoint.Addr] = up.Endpoint
case endpoints.Delete:
delete(cp.umap, up.Endpoint.Addr)
}
}
cp.umu.Unlock()
}
cp.umu.Lock()
for i := range ups {
switch ups[i].Op {
case gnaming.Add:
cp.umap[ups[i].Addr] = *ups[i]
case gnaming.Delete:
delete(cp.umap, ups[i].Addr)
}
}
cp.umu.Unlock()
}
}

View File

@ -20,11 +20,10 @@ import (
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/client/v3/naming"
"go.etcd.io/etcd/client/v3/naming/endpoints"
"go.uber.org/zap"
"golang.org/x/time/rate"
gnaming "google.golang.org/grpc/naming"
)
// allow maximum 1 retry per second
@ -68,8 +67,12 @@ func registerSession(lg *zap.Logger, c *clientv3.Client, prefix string, addr str
return nil, err
}
gr := &naming.GRPCResolver{Client: c}
if err = gr.Update(c.Ctx(), prefix, gnaming.Update{Op: gnaming.Add, Addr: addr, Metadata: getMeta()}, clientv3.WithLease(ss.Lease())); err != nil {
em, err := endpoints.NewManager(c, prefix)
if err != nil {
return nil, err
}
endpoint := endpoints.Endpoint{Addr: addr, Metadata: getMeta()}
if err = em.AddEndpoint(c.Ctx(), prefix+"/"+addr, endpoint, clientv3.WithLease(ss.Lease())); err != nil {
return nil, err
}

View File

@ -1,139 +0,0 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3test
import (
"context"
"encoding/json"
"reflect"
"testing"
etcd "go.etcd.io/etcd/client/v3"
namingv3 "go.etcd.io/etcd/client/v3/naming"
"go.etcd.io/etcd/pkg/v3/testutil"
"go.etcd.io/etcd/tests/v3/integration"
"google.golang.org/grpc/naming"
)
func TestGRPCResolver(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
r := namingv3.GRPCResolver{
Client: clus.RandClient(),
}
w, err := r.Resolve("foo")
if err != nil {
t.Fatal("failed to resolve foo", err)
}
defer w.Close()
addOp := naming.Update{Op: naming.Add, Addr: "127.0.0.1", Metadata: "metadata"}
err = r.Update(context.TODO(), "foo", addOp)
if err != nil {
t.Fatal("failed to add foo", err)
}
us, err := w.Next()
if err != nil {
t.Fatal("failed to get udpate", err)
}
wu := &naming.Update{
Op: naming.Add,
Addr: "127.0.0.1",
Metadata: "metadata",
}
if !reflect.DeepEqual(us[0], wu) {
t.Fatalf("up = %#v, want %#v", us[0], wu)
}
delOp := naming.Update{Op: naming.Delete, Addr: "127.0.0.1"}
err = r.Update(context.TODO(), "foo", delOp)
if err != nil {
t.Fatalf("failed to udpate %v", err)
}
us, err = w.Next()
if err != nil {
t.Fatalf("failed to get udpate %v", err)
}
wu = &naming.Update{
Op: naming.Delete,
Addr: "127.0.0.1",
Metadata: "metadata",
}
if !reflect.DeepEqual(us[0], wu) {
t.Fatalf("up = %#v, want %#v", us[0], wu)
}
}
// TestGRPCResolverMulti ensures the resolver will initialize
// correctly with multiple hosts and correctly receive multiple
// updates in a single revision.
func TestGRPCResolverMulti(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
c := clus.RandClient()
v, verr := json.Marshal(naming.Update{Addr: "127.0.0.1", Metadata: "md"})
if verr != nil {
t.Fatal(verr)
}
if _, err := c.Put(context.TODO(), "foo/host", string(v)); err != nil {
t.Fatal(err)
}
if _, err := c.Put(context.TODO(), "foo/host2", string(v)); err != nil {
t.Fatal(err)
}
r := namingv3.GRPCResolver{Client: c}
w, err := r.Resolve("foo")
if err != nil {
t.Fatal("failed to resolve foo", err)
}
defer w.Close()
updates, nerr := w.Next()
if nerr != nil {
t.Fatal(nerr)
}
if len(updates) != 2 {
t.Fatalf("expected two updates, got %+v", updates)
}
_, err = c.Txn(context.TODO()).Then(etcd.OpDelete("foo/host"), etcd.OpDelete("foo/host2")).Commit()
if err != nil {
t.Fatal(err)
}
updates, nerr = w.Next()
if nerr != nil {
t.Fatal(nerr)
}
if len(updates) != 2 || (updates[0].Op != naming.Delete && updates[1].Op != naming.Delete) {
t.Fatalf("expected two updates, got %+v", updates)
}
}

View File

@ -19,13 +19,12 @@ import (
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming"
"go.etcd.io/etcd/client/v3/naming/endpoints"
"go.etcd.io/etcd/pkg/v3/testutil"
"go.etcd.io/etcd/server/v3/proxy/grpcproxy"
"go.etcd.io/etcd/tests/v3/integration"
"go.uber.org/zap"
gnaming "google.golang.org/grpc/naming"
)
func TestRegister(t *testing.T) {
@ -37,26 +36,16 @@ func TestRegister(t *testing.T) {
paddr := clus.Members[0].GRPCAddr()
testPrefix := "test-name"
wa := createWatcher(t, cli, testPrefix)
ups, err := wa.Next()
if err != nil {
t.Fatal(err)
}
if len(ups) != 0 {
t.Fatalf("len(ups) expected 0, got %d (%v)", len(ups), ups)
}
wa := mustCreateWatcher(t, cli, testPrefix)
donec := grpcproxy.Register(zap.NewExample(), cli, testPrefix, paddr, 5)
ups, err = wa.Next()
if err != nil {
t.Fatal(err)
}
ups := <-wa
if len(ups) != 1 {
t.Fatalf("len(ups) expected 1, got %d (%v)", len(ups), ups)
}
if ups[0].Addr != paddr {
t.Fatalf("ups[0].Addr expected %q, got %q", paddr, ups[0].Addr)
if ups[0].Endpoint.Addr != paddr {
t.Fatalf("ups[0].Addr expected %q, got %q", paddr, ups[0].Endpoint.Addr)
}
cli.Close()
@ -68,11 +57,14 @@ func TestRegister(t *testing.T) {
}
}
func createWatcher(t *testing.T, c *clientv3.Client, prefix string) gnaming.Watcher {
gr := &naming.GRPCResolver{Client: c}
watcher, err := gr.Resolve(prefix)
func mustCreateWatcher(t *testing.T, c *clientv3.Client, prefix string) endpoints.WatchChannel {
em, err := endpoints.NewManager(c, prefix)
if err != nil {
t.Fatalf("failed to create endpoints.Manager: %v", err)
}
wc, err := em.NewWatchChannel(c.Ctx())
if err != nil {
t.Fatalf("failed to resolve %q (%v)", prefix, err)
}
return watcher
return wc
}