endpoints: implement Update method for EndpointManager.

- Add integration test for endpoints and resolver.
release-3.5
limeng01 2021-02-04 16:00:00 +08:00
parent e897daaebc
commit 571ed502d4
5 changed files with 312 additions and 38 deletions

View File

@ -4,37 +4,69 @@ package endpoints
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming/endpoints/internal"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type endpointManager struct {
// TODO: To be implemented, tracked by: https://github.com/etcd-io/etcd/issues/12652
// Client is an initialized etcd client.
client *clientv3.Client
target string
}
// NewManager creates an endpoint manager which implements the interface of 'Manager'.
func NewManager(client *clientv3.Client, target string) (Manager, error) {
// To be implemented (https://github.com/etcd-io/etcd/issues/12652)
return nil, fmt.Errorf("Not implemented yet")
if client == nil {
return nil, errors.New("invalid etcd client")
}
if target == "" {
return nil, errors.New("invalid target")
}
em := &endpointManager{
client: client,
target: target,
}
return em, nil
}
func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) error {
// TODO: For loop in a single transaction:
internalUpdate := &internal.Update{} // translate UpdateWithOpts into json format.
switch internalUpdate.Op {
//case internal.Add:
// var v []byte
// if v, err = json.Marshal(internalUpdate); err != nil {
// return status.Error(codes.InvalidArgument, err.Error())
// }
// _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)
//case internal.Delete:
// _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)
//default:
// return status.Error(codes.InvalidArgument, "naming: bad naming op")
func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) (err error) {
ops := make([]clientv3.Op, 0, len(updates))
for _, update := range updates {
if !strings.HasPrefix(update.Key, m.target+"/") {
return status.Errorf(codes.InvalidArgument, "endpoints: endpoint key should be prefixed with %s/", m.target)
}
switch update.Op {
case Add:
internalUpdate := &internal.Update{
Op: internal.Add,
Addr: update.Endpoint.Addr,
Metadata: update.Endpoint.Metadata,
}
var v []byte
if v, err = json.Marshal(internalUpdate); err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
ops = append(ops, clientv3.OpPut(update.Key, string(v), update.Opts...))
case Delete:
ops = append(ops, clientv3.OpDelete(update.Key, update.Opts...))
default:
return status.Error(codes.InvalidArgument, "endpoints: bad update op")
}
}
return fmt.Errorf("Not implemented yet")
_, err = m.client.KV.Txn(ctx).Then(ops...).Commit()
return err
}
func (m *endpointManager) AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error {
@ -116,6 +148,19 @@ func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, er
}
func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) {
// TODO: Implementation
return nil, fmt.Errorf("Not implemented yet")
resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil {
return nil, err
}
eps := make(Key2EndpointMap)
for _, kv := range resp.Kvs {
var iup internal.Update
if err := json.Unmarshal(kv.Value, &iup); err != nil {
continue
}
eps[string(kv.Key)] = Endpoint{Addr: iup.Addr, Metadata: iup.Metadata}
}
return eps, nil
}

View File

@ -21,7 +21,9 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
@ -64,6 +66,7 @@ golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -74,6 +77,7 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201009025420-dfb3f7c4e634 h1:bNEHhJCnrwMKNMmOx3yAynp5vs5/gRy+XWFtZFu7NBM=
golang.org/x/sys v0.0.0-20201009025420-dfb3f7c4e634/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
@ -87,6 +91,7 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=

View File

@ -0,0 +1,86 @@
package grpc_testing
import (
"context"
"fmt"
"net"
"google.golang.org/grpc"
testpb "google.golang.org/grpc/test/grpc_testing"
)
// StubServer is borrowed from the interal package of grpc-go.
// See https://github.com/grpc/grpc-go/blob/master/internal/stubserver/stubserver.go
// Since it cannot be imported directly, we have to copy and paste it here,
// and useless code for our testing is removed.
// StubServer is a server that is easy to customize within individual test
// cases.
type StubServer struct {
// Guarantees we satisfy this interface; panics if unimplemented methods are called.
testpb.TestServiceServer
EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error)
UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
FullDuplexCallF func(stream testpb.TestService_FullDuplexCallServer) error
s *grpc.Server
// Network and Address are parameters for Listen. Defaults will be used if these are empty before Start.
Network string
Address string
cleanups []func() // Lambdas executed in Stop(); populated by Start().
}
// EmptyCall is the handler for testpb.EmptyCall.
func (ss *StubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return ss.EmptyCallF(ctx, in)
}
// UnaryCall is the handler for testpb.UnaryCall.
func (ss *StubServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return ss.UnaryCallF(ctx, in)
}
// FullDuplexCall is the handler for testpb.FullDuplexCall.
func (ss *StubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
return ss.FullDuplexCallF(stream)
}
// Start starts the server and creates a client connected to it.
func (ss *StubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error {
if ss.Network == "" {
ss.Network = "tcp"
}
if ss.Address == "" {
ss.Address = "localhost:0"
}
lis, err := net.Listen(ss.Network, ss.Address)
if err != nil {
return fmt.Errorf("net.Listen(%q, %q) = %v", ss.Network, ss.Address, err)
}
ss.Address = lis.Addr().String()
ss.cleanups = append(ss.cleanups, func() { lis.Close() })
s := grpc.NewServer(sopts...)
testpb.RegisterTestServiceServer(s, ss)
go s.Serve(lis)
ss.cleanups = append(ss.cleanups, s.Stop)
ss.s = s
return nil
}
// Stop stops ss and cleans up all resources it consumed.
func (ss *StubServer) Stop() {
for i := len(ss.cleanups) - 1; i >= 0; i-- {
ss.cleanups[i]()
}
}
// Addr gets the address the server listening on.
func (ss *StubServer) Addr() string {
return ss.Address
}

View File

@ -133,3 +133,84 @@ func TestEndpointManagerAtomicity(t *testing.T) {
t.Fatalf("expected two delete updates, got %+v", updates)
}
}
func TestEndpointManagerCRUD(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
em, err := endpoints.NewManager(clus.RandClient(), "foo")
if err != nil {
t.Fatal("failed to create EndpointManager", err)
}
// Add
k1 := "foo/a1"
e1 := endpoints.Endpoint{Addr: "127.0.0.1", Metadata: "metadata1"}
err = em.AddEndpoint(context.TODO(), k1, e1)
if err != nil {
t.Fatal("failed to add", k1, err)
}
k2 := "foo/a2"
e2 := endpoints.Endpoint{Addr: "127.0.0.2", Metadata: "metadata2"}
err = em.AddEndpoint(context.TODO(), k2, e2)
if err != nil {
t.Fatal("failed to add", k2, err)
}
eps, err := em.List(context.TODO())
if err != nil {
t.Fatal("failed to list foo")
}
if len(eps) != 2 {
t.Fatalf("unexpected the number of endpoints: %d", len(eps))
}
if !reflect.DeepEqual(eps[k1], e1) {
t.Fatalf("unexpected endpoints: %s", k1)
}
if !reflect.DeepEqual(eps[k2], e2) {
t.Fatalf("unexpected endpoints: %s", k2)
}
// Delete
err = em.DeleteEndpoint(context.TODO(), k1)
if err != nil {
t.Fatal("failed to delete", k2, err)
}
eps, err = em.List(context.TODO())
if err != nil {
t.Fatal("failed to list foo")
}
if len(eps) != 1 {
t.Fatalf("unexpected the number of endpoints: %d", len(eps))
}
if !reflect.DeepEqual(eps[k2], e2) {
t.Fatalf("unexpected endpoints: %s", k2)
}
// Update
k3 := "foo/a3"
e3 := endpoints.Endpoint{Addr: "127.0.0.3", Metadata: "metadata3"}
updates := []*endpoints.UpdateWithOpts{
{Update: endpoints.Update{Op: endpoints.Add, Key: k3, Endpoint: e3}},
{Update: endpoints.Update{Op: endpoints.Delete, Key: k2}},
}
err = em.Update(context.TODO(), updates)
if err != nil {
t.Fatal("failed to update", err)
}
eps, err = em.List(context.TODO())
if err != nil {
t.Fatal("failed to list foo")
}
if len(eps) != 1 {
t.Fatalf("unexpected the number of endpoints: %d", len(eps))
}
if !reflect.DeepEqual(eps[k3], e3) {
t.Fatalf("unexpected endpoints: %s", k3)
}
}

View File

@ -15,56 +15,113 @@
package naming_test
import (
"bytes"
"context"
"testing"
"time"
etcdnaming "go.etcd.io/etcd/client/v3/naming"
"go.etcd.io/etcd/client/v3/naming/endpoints"
"go.etcd.io/etcd/client/v3/naming/resolver"
"google.golang.org/grpc"
grpctest "go.etcd.io/etcd/pkg/v3/grpc_testing"
"go.etcd.io/etcd/pkg/v3/testutil"
"go.etcd.io/etcd/tests/v3/integration"
"google.golang.org/grpc"
testpb "google.golang.org/grpc/test/grpc_testing"
)
// This test mimics scenario described in grpc_naming.md doc.
func TestEtcdGrpcResolver(t *testing.T) {
t.Skip("Not implemented yet")
defer testutil.AfterTest(t)
// s1 := // TODO: Dummy GRPC service listening on 127.0.0.1:20000
// s2 := // TODO: Dummy GRPC service listening on 127.0.0.1:20001
s1PayloadBody := []byte{'1'}
s1 := newDummyStubServer(s1PayloadBody)
if err := s1.Start(nil); err != nil {
t.Fatal("failed to start dummy grpc server (s1)", err)
}
defer s1.Stop()
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
s2PayloadBody := []byte{'2'}
s2 := newDummyStubServer(s2PayloadBody)
if err := s2.Start(nil); err != nil {
t.Fatal("failed to start dummy grpc server (s2)", err)
}
defer s2.Stop()
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
em, err := endpoints.NewManager(clus.RandClient(), "foo")
em, err := endpoints.NewManager(clus.Client(0), "foo")
if err != nil {
t.Fatal("failed to create EndpointManager", err)
}
e1 := endpoints.Endpoint{Addr: "127.0.0.1:20000"}
e2 := endpoints.Endpoint{Addr: "127.0.0.1:20001"}
e1 := endpoints.Endpoint{Addr: s1.Addr()}
e2 := endpoints.Endpoint{Addr: s2.Addr()}
err = em.AddEndpoint(context.TODO(), "foo/e1", e1)
if err != nil {
t.Fatal("failed to add foo", err)
}
etcdResolver, err := resolver.NewBuilder(clus.RandClient())
conn, err := grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver))
r := &etcdnaming.GRPCResolver{Client: clus.Client(1)}
b := grpc.RoundRobin(r)
conn, err := grpc.Dial("foo", grpc.WithInsecure(), grpc.WithBalancer(b))
if err != nil {
t.Fatal("failed to connect to foo (e1)", err)
t.Fatal("failed to connect to foo", err)
}
defer conn.Close()
// TODO: send requests to conn, ensure s1 received it.
c := testpb.NewTestServiceClient(conn)
resp, err := c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}, grpc.WaitForReady(true))
if err != nil {
t.Fatal("failed to invoke rpc to foo (e1)", err)
}
if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s1PayloadBody) {
t.Fatalf("unexpected response from foo (e1): %s", resp.GetPayload().GetBody())
}
em.DeleteEndpoint(context.TODO(), "foo/e1")
em.AddEndpoint(context.TODO(), "foo/e2", e2)
// TODO: Send requests to conn and make sure s2 receive it.
// Might require restarting s1 to break the existing (open) connection.
// We use a loop with deadline of 30s to avoid test getting flake
// as it's asynchronous for gRPC Client to update underlying connections.
maxRetries := 300
retryPeriod := 100 * time.Millisecond
retries := 0
for {
time.Sleep(retryPeriod)
retries++
conn.GetState() // this line is to avoid compiler warning that conn is unused.
resp, err = c.UnaryCall(context.TODO(), &testpb.SimpleRequest{})
if err != nil {
if retries < maxRetries {
continue
}
t.Fatal("failed to invoke rpc to foo (e2)", err)
}
if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s2PayloadBody) {
if retries < maxRetries {
continue
}
t.Fatalf("unexpected response from foo (e2): %s", resp.GetPayload().GetBody())
}
break
}
}
func newDummyStubServer(body []byte) *grpctest.StubServer {
return &grpctest.StubServer{
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{
Payload: &testpb.Payload{
Type: testpb.PayloadType_COMPRESSABLE,
Body: body,
},
}, nil
},
}
}