Merge pull request #12671 from ptabor/20210207-grpc-del-balancer

clientv3: Replace balancer with upstream grpc solution
release-3.5
Piotr Tabor 2021-02-17 22:45:13 +01:00 committed by GitHub
commit b67ed4e4aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 232 additions and 1484 deletions

View File

@ -170,15 +170,6 @@
}
]
},
{
"project": "github.com/google/uuid",
"licenses": [
{
"type": "BSD 3-clause \"New\" or \"Revised\" License",
"confidence": 0.9663865546218487
}
]
},
{
"project": "github.com/gorilla/websocket",
"licenses": [

View File

@ -1,328 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package balancer implements client balancer.
package balancer
import (
"strconv"
"sync"
"time"
"go.etcd.io/etcd/client/v3/balancer/connectivity"
"go.etcd.io/etcd/client/v3/balancer/picker"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc/balancer"
grpcconnectivity "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/resolver/dns" // register DNS resolver
_ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
)
// NOTE: Ensure
// - `baseBalancer` satisfies `balancer.V2Balancer`.
var (
_ balancer.V2Balancer = (*baseBalancer)(nil)
)
// Config defines balancer configurations.
type Config struct {
// Policy configures balancer policy.
Policy picker.Policy
// Picker implements gRPC picker.
// Leave empty if "Policy" field is not custom.
// TODO: currently custom policy is not supported.
// Picker picker.Picker
// Name defines an additional name for balancer.
// Useful for balancer testing to avoid register conflicts.
// If empty, defaults to policy name.
Name string
// Logger configures balancer logging.
// If nil, logs are discarded.
Logger *zap.Logger
}
// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
// must be invoked at initialization time.
func RegisterBuilder(cfg Config) {
bb := &builder{cfg}
balancer.Register(bb)
bb.cfg.Logger.Debug(
"registered balancer",
zap.String("policy", bb.cfg.Policy.String()),
zap.String("name", bb.cfg.Name),
)
}
type builder struct {
cfg Config
}
// Build is called initially when creating "ccBalancerWrapper".
// "grpc.Dial" is called to this client connection.
// Then, resolved addresses will be handled via "HandleResolvedAddrs".
func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
bb := &baseBalancer{
id: strconv.FormatInt(time.Now().UnixNano(), 36),
policy: b.cfg.Policy,
name: b.cfg.Name,
lg: b.cfg.Logger,
addrToSc: make(map[resolver.Address]balancer.SubConn),
scToAddr: make(map[balancer.SubConn]resolver.Address),
scToSt: make(map[balancer.SubConn]grpcconnectivity.State),
currentConn: nil,
connectivityRecorder: connectivity.New(b.cfg.Logger),
// initialize picker always returns "ErrNoSubConnAvailable"
picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
}
// TODO: support multiple connections
bb.mu.Lock()
bb.currentConn = cc
bb.mu.Unlock()
bb.lg.Info(
"built balancer",
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
zap.String("resolver-target", cc.Target()),
)
return bb
}
// Name implements "grpc/balancer.Builder" interface.
func (b *builder) Name() string { return b.cfg.Name }
// Balancer defines client balancer interface.
type Balancer interface {
// Balancer is called on specified client connection. Client initiates gRPC
// connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
// addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
// For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
// "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
// changes, thus requires failover logic in this method.
balancer.Balancer
// Picker calls "Pick" for every client request.
picker.Picker
}
type baseBalancer struct {
id string
policy picker.Policy
name string
lg *zap.Logger
mu sync.RWMutex
addrToSc map[resolver.Address]balancer.SubConn
scToAddr map[balancer.SubConn]resolver.Address
scToSt map[balancer.SubConn]grpcconnectivity.State
currentConn balancer.ClientConn
connectivityRecorder connectivity.Recorder
picker picker.Picker
}
// UpdateClientConnState implements "grpc/balancer.V2Balancer" interface.
func (bb *baseBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
return bb.handleResolvedWithError(ccs.ResolverState.Addresses, nil)
}
// ResolverError implements "grpc/balancer.V2Balancer" interface.
func (bb *baseBalancer) ResolverError(err error) {
bb.HandleResolvedAddrs(nil, err)
}
// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
// gRPC sends initial or updated resolved addresses from "Build".
func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
_ = bb.handleResolvedWithError(addrs, err)
}
// handleResolvedWithError is an implementation shared both by `HandleResolvedAddrs()`,
// which is part of the `Balancer` interface as well as `UpdateClientConnState()`,
// which is part of the `V2Balancer` interface.
func (bb *baseBalancer) handleResolvedWithError(addrs []resolver.Address, err error) error {
if err != nil {
bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
return err
}
bb.lg.Info("resolved",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.Strings("addresses", addrsToStrings(addrs)),
)
bb.mu.Lock()
defer bb.mu.Unlock()
resolved := make(map[resolver.Address]struct{})
warnedErrors := []error{}
for _, addr := range addrs {
resolved[addr] = struct{}{}
if _, ok := bb.addrToSc[addr]; !ok {
sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
if err != nil {
bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
warnedErrors = append(warnedErrors, err)
continue
}
bb.lg.Info("created subconn", zap.String("address", addr.Addr))
bb.addrToSc[addr] = sc
bb.scToAddr[sc] = addr
bb.scToSt[sc] = grpcconnectivity.Idle
sc.Connect()
}
}
for addr, sc := range bb.addrToSc {
if _, ok := resolved[addr]; !ok {
// was removed by resolver or failed to create subconn
bb.currentConn.RemoveSubConn(sc)
delete(bb.addrToSc, addr)
bb.lg.Info(
"removed subconn",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("address", addr.Addr),
zap.String("subconn", scToString(sc)),
)
// Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
// The entry will be deleted in HandleSubConnStateChange.
// (DO NOT) delete(bb.scToAddr, sc)
// (DO NOT) delete(bb.scToSt, sc)
}
}
// TODO: Consider just returning `ErrBadResolverState` if `warnedErrors` is
// not empty.
return multierr.Combine(warnedErrors...)
}
// UpdateSubConnState implements "grpc/balancer.V2Balancer" interface.
func (bb *baseBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
bb.HandleSubConnStateChange(sc, s.ConnectivityState)
}
// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) {
bb.mu.Lock()
defer bb.mu.Unlock()
old, ok := bb.scToSt[sc]
if !ok {
bb.lg.Warn(
"state change for an unknown subconn",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("subconn", scToString(sc)),
zap.Int("subconn-size", len(bb.scToAddr)),
zap.String("state", s.String()),
)
return
}
bb.lg.Info(
"state changed",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.Bool("connected", s == grpcconnectivity.Ready),
zap.String("subconn", scToString(sc)),
zap.Int("subconn-size", len(bb.scToAddr)),
zap.String("address", bb.scToAddr[sc].Addr),
zap.String("old-state", old.String()),
zap.String("new-state", s.String()),
)
bb.scToSt[sc] = s
switch s {
case grpcconnectivity.Idle:
sc.Connect()
case grpcconnectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scToSt. Remove state for this sc here.
delete(bb.scToAddr, sc)
delete(bb.scToSt, sc)
}
oldAggrState := bb.connectivityRecorder.GetCurrentState()
bb.connectivityRecorder.RecordTransition(old, s)
// Update balancer picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (s == grpcconnectivity.Ready) != (old == grpcconnectivity.Ready) ||
(bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure) != (oldAggrState == grpcconnectivity.TransientFailure) {
bb.updatePicker()
}
bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker)
}
func (bb *baseBalancer) updatePicker() {
if bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure {
bb.picker = picker.NewErr(balancer.ErrTransientFailure)
bb.lg.Info(
"updated picker to transient error picker",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
)
return
}
// only pass ready subconns to picker
scToAddr := make(map[balancer.SubConn]resolver.Address)
for addr, sc := range bb.addrToSc {
if st, ok := bb.scToSt[sc]; ok && st == grpcconnectivity.Ready {
scToAddr[sc] = addr
}
}
bb.picker = picker.New(picker.Config{
Policy: bb.policy,
Logger: bb.lg,
SubConnToResolverAddress: scToAddr,
})
bb.lg.Info(
"updated picker",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
zap.Strings("subconn-ready", scsToStrings(scToAddr)),
zap.Int("subconn-size", len(scToAddr)),
)
}
// Close implements "grpc/balancer.Balancer" interface.
// Close is a nop because base balancer doesn't have internal state to clean up,
// and it doesn't need to call RemoveSubConn for the SubConns.
func (bb *baseBalancer) Close() {
// TODO
}

View File

@ -1,310 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package balancer
import (
"context"
"fmt"
"strings"
"testing"
"time"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/v3/balancer/picker"
"go.etcd.io/etcd/client/v3/balancer/resolver/endpoint"
"go.etcd.io/etcd/client/v3/mock/mockserver"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)
// TestRoundRobinBalancedResolvableNoFailover ensures that
// requests to a resolvable endpoint can be balanced between
// multiple, if any, nodes. And there needs be no failover.
func TestRoundRobinBalancedResolvableNoFailover(t *testing.T) {
testCases := []struct {
name string
serverCount int
reqN int
network string
}{
{name: "rrBalanced_1", serverCount: 1, reqN: 5, network: "tcp"},
{name: "rrBalanced_1_unix_sockets", serverCount: 1, reqN: 5, network: "unix"},
{name: "rrBalanced_3", serverCount: 3, reqN: 7, network: "tcp"},
{name: "rrBalanced_5", serverCount: 5, reqN: 10, network: "tcp"},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ms, err := mockserver.StartMockServersOnNetwork(tc.serverCount, tc.network)
if err != nil {
t.Fatalf("failed to start mock servers: %v", err)
}
defer ms.Stop()
var eps []string
for _, svr := range ms.Servers {
eps = append(eps, svr.ResolverAddress().Addr)
}
rsv, err := endpoint.NewResolverGroup("nofailover")
if err != nil {
t.Fatal(err)
}
defer rsv.Close()
rsv.SetEndpoints(eps)
name := genName()
cfg := Config{
Policy: picker.RoundrobinBalanced,
Name: name,
Logger: zap.NewExample(),
}
RegisterBuilder(cfg)
conn, err := grpc.Dial(fmt.Sprintf("endpoint://nofailover/*"), grpc.WithInsecure(), grpc.WithBalancerName(name))
if err != nil {
t.Fatalf("failed to dial mock server: %v", err)
}
defer conn.Close()
cli := pb.NewKVClient(conn)
reqFunc := func(ctx context.Context) (picked string, err error) {
var p peer.Peer
_, err = cli.Range(ctx, &pb.RangeRequest{Key: []byte("/x")}, grpc.Peer(&p))
if p.Addr != nil {
picked = p.Addr.String()
}
return picked, err
}
prev, switches := "", 0
for i := 0; i < tc.reqN; i++ {
picked, err := reqFunc(context.Background())
if err != nil {
t.Fatalf("#%d: unexpected failure %v", i, err)
}
if prev == "" {
prev = picked
continue
}
if prev != picked {
switches++
}
prev = picked
}
if tc.serverCount > 1 && switches < tc.reqN-3 { // -3 for initial resolutions
// TODO: FIX ME
t.Skipf("expected balanced loads for %d requests, got switches %d", tc.reqN, switches)
}
})
}
}
// TestRoundRobinBalancedResolvableFailoverFromServerFail ensures that
// loads be rebalanced while one server goes down and comes back.
func TestRoundRobinBalancedResolvableFailoverFromServerFail(t *testing.T) {
serverCount := 5
ms, err := mockserver.StartMockServers(serverCount)
if err != nil {
t.Fatalf("failed to start mock servers: %s", err)
}
defer ms.Stop()
var eps []string
for _, svr := range ms.Servers {
eps = append(eps, svr.ResolverAddress().Addr)
}
rsv, err := endpoint.NewResolverGroup("serverfail")
if err != nil {
t.Fatal(err)
}
defer rsv.Close()
rsv.SetEndpoints(eps)
name := genName()
cfg := Config{
Policy: picker.RoundrobinBalanced,
Name: name,
Logger: zap.NewExample(),
}
RegisterBuilder(cfg)
conn, err := grpc.Dial(fmt.Sprintf("endpoint://serverfail/mock.server"), grpc.WithInsecure(), grpc.WithBalancerName(name))
if err != nil {
t.Fatalf("failed to dial mock server: %s", err)
}
defer conn.Close()
cli := pb.NewKVClient(conn)
reqFunc := func(ctx context.Context) (picked string, err error) {
var p peer.Peer
_, err = cli.Range(ctx, &pb.RangeRequest{Key: []byte("/x")}, grpc.Peer(&p))
if p.Addr != nil {
picked = p.Addr.String()
}
return picked, err
}
// stop first server, loads should be redistributed
// stopped server should never be picked
ms.StopAt(0)
available := make(map[string]struct{})
for i := 1; i < serverCount; i++ {
available[eps[i]] = struct{}{}
}
reqN := 10
prev, switches := "", 0
for i := 0; i < reqN; i++ {
picked, err := reqFunc(context.Background())
if err != nil && strings.Contains(err.Error(), "transport is closing") {
continue
}
if prev == "" { // first failover
if eps[0] == picked {
t.Fatalf("expected failover from %q, picked %q", eps[0], picked)
}
prev = picked
continue
}
if _, ok := available[picked]; !ok {
t.Fatalf("picked unavailable address %q (available %v)", picked, available)
}
if prev != picked {
switches++
}
prev = picked
}
if switches < reqN-3 { // -3 for initial resolutions + failover
// TODO: FIX ME!
t.Skipf("expected balanced loads for %d requests, got switches %d", reqN, switches)
}
// now failed server comes back
ms.StartAt(0)
// enough time for reconnecting to recovered server
time.Sleep(time.Second)
prev, switches = "", 0
recoveredAddr, recovered := eps[0], 0
available[recoveredAddr] = struct{}{}
for i := 0; i < 2*reqN; i++ {
picked, err := reqFunc(context.Background())
if err != nil {
t.Fatalf("#%d: unexpected failure %v", i, err)
}
if prev == "" {
prev = picked
continue
}
if _, ok := available[picked]; !ok {
t.Fatalf("#%d: picked unavailable address %q (available %v)", i, picked, available)
}
if prev != picked {
switches++
}
if picked == recoveredAddr {
recovered++
}
prev = picked
}
if switches < reqN-3 { // -3 for initial resolutions
t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches)
}
if recovered < reqN/serverCount {
t.Fatalf("recovered server %q got only %d requests", recoveredAddr, recovered)
}
}
// TestRoundRobinBalancedResolvableFailoverFromRequestFail ensures that
// loads be rebalanced while some requests are failed.
func TestRoundRobinBalancedResolvableFailoverFromRequestFail(t *testing.T) {
serverCount := 5
ms, err := mockserver.StartMockServers(serverCount)
if err != nil {
t.Fatalf("failed to start mock servers: %s", err)
}
defer ms.Stop()
var eps []string
available := make(map[string]struct{})
for _, svr := range ms.Servers {
eps = append(eps, svr.ResolverAddress().Addr)
available[svr.Address] = struct{}{}
}
rsv, err := endpoint.NewResolverGroup("requestfail")
if err != nil {
t.Fatal(err)
}
defer rsv.Close()
rsv.SetEndpoints(eps)
name := genName()
cfg := Config{
Policy: picker.RoundrobinBalanced,
Name: name,
Logger: zap.NewExample(),
}
RegisterBuilder(cfg)
conn, err := grpc.Dial(fmt.Sprintf("endpoint://requestfail/mock.server"), grpc.WithInsecure(), grpc.WithBalancerName(name))
if err != nil {
t.Fatalf("failed to dial mock server: %s", err)
}
defer conn.Close()
cli := pb.NewKVClient(conn)
reqFunc := func(ctx context.Context) (picked string, err error) {
var p peer.Peer
_, err = cli.Range(ctx, &pb.RangeRequest{Key: []byte("/x")}, grpc.Peer(&p))
if p.Addr != nil {
picked = p.Addr.String()
}
return picked, err
}
reqN := 20
prev, switches := "", 0
for i := 0; i < reqN; i++ {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if i%2 == 0 {
cancel()
}
picked, err := reqFunc(ctx)
if i%2 == 0 {
if s, ok := status.FromError(err); ok && s.Code() != codes.Canceled || picked != "" {
t.Fatalf("#%d: expected %v, got %v", i, context.Canceled, err)
}
continue
}
if prev == "" && picked != "" {
prev = picked
continue
}
if _, ok := available[picked]; !ok {
t.Fatalf("#%d: picked unavailable address %q (available %v)", i, picked, available)
}
if prev != picked {
switches++
}
prev = picked
}
if switches < reqN/2-3 { // -3 for initial resolutions + failover
t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches)
}
}

View File

@ -1,93 +0,0 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package connectivity implements client connectivity operations.
package connectivity
import (
"sync"
"go.uber.org/zap"
"google.golang.org/grpc/connectivity"
)
// Recorder records gRPC connectivity.
type Recorder interface {
GetCurrentState() connectivity.State
RecordTransition(oldState, newState connectivity.State)
}
// New returns a new Recorder.
func New(lg *zap.Logger) Recorder {
return &recorder{lg: lg}
}
// recorder takes the connectivity states of multiple SubConns
// and returns one aggregated connectivity state.
// ref. https://github.com/grpc/grpc-go/blob/master/balancer/balancer.go
type recorder struct {
lg *zap.Logger
mu sync.RWMutex
cur connectivity.State
numReady uint64 // Number of addrConns in ready state.
numConnecting uint64 // Number of addrConns in connecting state.
numTransientFailure uint64 // Number of addrConns in transientFailure.
}
func (rc *recorder) GetCurrentState() (state connectivity.State) {
rc.mu.RLock()
defer rc.mu.RUnlock()
return rc.cur
}
// RecordTransition records state change happening in subConn and based on that
// it evaluates what aggregated state should be.
//
// - If at least one SubConn in Ready, the aggregated state is Ready;
// - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
// - Else the aggregated state is TransientFailure.
//
// Idle and Shutdown are not considered.
//
// ref. https://github.com/grpc/grpc-go/blob/master/balancer/balancer.go
func (rc *recorder) RecordTransition(oldState, newState connectivity.State) {
rc.mu.Lock()
defer rc.mu.Unlock()
for idx, state := range []connectivity.State{oldState, newState} {
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
switch state {
case connectivity.Ready:
rc.numReady += updateVal
case connectivity.Connecting:
rc.numConnecting += updateVal
case connectivity.TransientFailure:
rc.numTransientFailure += updateVal
default:
rc.lg.Warn("connectivity recorder received unknown state", zap.String("connectivity-state", state.String()))
}
}
switch { // must be exclusive, no overlap
case rc.numReady > 0:
rc.cur = connectivity.Ready
case rc.numConnecting > 0:
rc.cur = connectivity.Connecting
default:
rc.cur = connectivity.TransientFailure
}
}

View File

@ -1,16 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package picker defines/implements client balancer picker policy.
package picker

View File

@ -1,53 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package picker
import (
"context"
"google.golang.org/grpc/balancer"
)
// NOTE: Ensure
// - `errPickerV2` satisfies `balancer.V2Picker`.
var (
_ balancer.V2Picker = (*errPickerV2)(nil)
)
// NewErr returns a picker that always returns err on "Pick".
func NewErr(err error) Picker {
return &errPicker{p: Error, err: err}
}
type errPicker struct {
p Policy
err error
}
func (ep *errPicker) String() string {
return ep.p.String()
}
func (ep *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
return nil, nil, ep.err
}
type errPickerV2 struct {
errPicker
}
func (ep2 *errPickerV2) Pick(opts balancer.PickInfo) (balancer.PickResult, error) {
return balancer.PickResult{}, ep2.errPicker.err
}

View File

@ -1,91 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package picker
import (
"fmt"
"go.uber.org/zap"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
)
// Picker defines balancer Picker methods.
type Picker interface {
balancer.Picker
String() string
}
// Config defines picker configuration.
type Config struct {
// Policy specifies etcd clientv3's built in balancer policy.
Policy Policy
// Logger defines picker logging object.
Logger *zap.Logger
// SubConnToResolverAddress maps each gRPC sub-connection to an address.
// Basically, it is a list of addresses that the Picker can pick from.
SubConnToResolverAddress map[balancer.SubConn]resolver.Address
}
// Policy defines balancer picker policy.
type Policy uint8
const (
// Error is error picker policy.
Error Policy = iota
// RoundrobinBalanced balances loads over multiple endpoints
// and implements failover in roundrobin fashion.
RoundrobinBalanced
// Custom defines custom balancer picker.
// TODO: custom picker is not supported yet.
Custom
)
func (p Policy) String() string {
switch p {
case Error:
return "picker-error"
case RoundrobinBalanced:
return "picker-roundrobin-balanced"
case Custom:
panic("'custom' picker policy is not supported yet")
default:
panic(fmt.Errorf("invalid balancer picker policy (%d)", p))
}
}
// New creates a new Picker.
func New(cfg Config) Picker {
switch cfg.Policy {
case Error:
panic("'error' picker policy is not supported here; use 'picker.NewErr'")
case RoundrobinBalanced:
return newRoundrobinBalanced(cfg)
case Custom:
panic("'custom' picker policy is not supported yet")
default:
panic(fmt.Errorf("invalid balancer picker policy (%d)", cfg.Policy))
}
}

View File

@ -1,111 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package picker
import (
"context"
"sync"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
)
// NOTE: Ensure
// - `rrBalancedV2` satisfies `balancer.V2Picker`.
var (
_ balancer.V2Picker = (*rrBalancedV2)(nil)
)
// newRoundrobinBalanced returns a new roundrobin balanced picker.
func newRoundrobinBalanced(cfg Config) Picker {
scs := make([]balancer.SubConn, 0, len(cfg.SubConnToResolverAddress))
for sc := range cfg.SubConnToResolverAddress {
scs = append(scs, sc)
}
return &rrBalanced{
p: RoundrobinBalanced,
lg: cfg.Logger,
scs: scs,
scToAddr: cfg.SubConnToResolverAddress,
}
}
type rrBalanced struct {
p Policy
lg *zap.Logger
mu sync.RWMutex
next int
scs []balancer.SubConn
scToAddr map[balancer.SubConn]resolver.Address
}
func (rb *rrBalanced) String() string { return rb.p.String() }
// Pick is called for every client request.
func (rb *rrBalanced) Pick(_ context.Context, opts balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
rb.mu.RLock()
n := len(rb.scs)
rb.mu.RUnlock()
if n == 0 {
return nil, nil, balancer.ErrNoSubConnAvailable
}
rb.mu.Lock()
cur := rb.next
sc := rb.scs[cur]
picked := rb.scToAddr[sc].Addr
rb.next = (rb.next + 1) % len(rb.scs)
rb.mu.Unlock()
rb.lg.Debug(
"picked",
zap.String("picker", rb.p.String()),
zap.String("address", picked),
zap.Int("subconn-index", cur),
zap.Int("subconn-size", n),
)
doneFunc := func(info balancer.DoneInfo) {
// TODO: error handling?
fss := []zapcore.Field{
zap.Error(info.Err),
zap.String("picker", rb.p.String()),
zap.String("address", picked),
zap.Bool("success", info.Err == nil),
zap.Bool("bytes-sent", info.BytesSent),
zap.Bool("bytes-received", info.BytesReceived),
}
if info.Err == nil {
rb.lg.Debug("balancer done", fss...)
} else {
rb.lg.Warn("balancer failed", fss...)
}
}
return sc, doneFunc, nil
}
type rrBalancedV2 struct {
rrBalanced
}
func (rb2 *rrBalancedV2) Pick(opts balancer.PickInfo) (balancer.PickResult, error) {
sc, doneFunc, err := rb2.rrBalanced.Pick(context.TODO(), opts)
pr := balancer.PickResult{SubConn: sc, Done: doneFunc}
return pr, err
}

View File

@ -1,248 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package endpoint resolves etcd entpoints using grpc targets of the form 'endpoint://<id>/<endpoint>'.
package endpoint
import (
"context"
"fmt"
"net"
"net/url"
"strings"
"sync"
"google.golang.org/grpc/resolver"
)
const scheme = "endpoint"
var (
targetPrefix = fmt.Sprintf("%s://", scheme)
bldr *builder
)
func init() {
bldr = &builder{
resolverGroups: make(map[string]*ResolverGroup),
}
resolver.Register(bldr)
}
type builder struct {
mu sync.RWMutex
resolverGroups map[string]*ResolverGroup
}
// NewResolverGroup creates a new ResolverGroup with the given id.
func NewResolverGroup(id string) (*ResolverGroup, error) {
return bldr.newResolverGroup(id)
}
// ResolverGroup keeps all endpoints of resolvers using a common endpoint://<id>/ target
// up-to-date.
type ResolverGroup struct {
mu sync.RWMutex
id string
endpoints []string
resolvers []*Resolver
}
func (e *ResolverGroup) addResolver(r *Resolver) {
e.mu.Lock()
addrs := epsToAddrs(e.endpoints...)
e.resolvers = append(e.resolvers, r)
e.mu.Unlock()
r.cc.NewAddress(addrs)
}
func (e *ResolverGroup) removeResolver(r *Resolver) {
e.mu.Lock()
for i, er := range e.resolvers {
if er == r {
e.resolvers = append(e.resolvers[:i], e.resolvers[i+1:]...)
break
}
}
e.mu.Unlock()
}
// SetEndpoints updates the endpoints for ResolverGroup. All registered resolver are updated
// immediately with the new endpoints.
func (e *ResolverGroup) SetEndpoints(endpoints []string) {
addrs := epsToAddrs(endpoints...)
e.mu.Lock()
e.endpoints = endpoints
for _, r := range e.resolvers {
r.cc.NewAddress(addrs)
}
e.mu.Unlock()
}
// Target constructs a endpoint target using the endpoint id of the ResolverGroup.
func (e *ResolverGroup) Target(endpoint string) string {
return Target(e.id, endpoint)
}
// Target constructs a endpoint resolver target.
func Target(id, endpoint string) string {
return fmt.Sprintf("%s://%s/%s", scheme, id, endpoint)
}
// IsTarget checks if a given target string in an endpoint resolver target.
func IsTarget(target string) bool {
return strings.HasPrefix(target, "endpoint://")
}
func (e *ResolverGroup) Close() {
bldr.close(e.id)
}
// Build creates or reuses an etcd resolver for the etcd cluster name identified by the authority part of the target.
func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
if len(target.Authority) < 1 {
return nil, fmt.Errorf("'etcd' target scheme requires non-empty authority identifying etcd cluster being routed to")
}
id := target.Authority
es, err := b.getResolverGroup(id)
if err != nil {
return nil, fmt.Errorf("failed to build resolver: %v", err)
}
r := &Resolver{
endpointID: id,
cc: cc,
}
es.addResolver(r)
return r, nil
}
func (b *builder) newResolverGroup(id string) (*ResolverGroup, error) {
b.mu.RLock()
_, ok := b.resolverGroups[id]
b.mu.RUnlock()
if ok {
return nil, fmt.Errorf("Endpoint already exists for id: %s", id)
}
es := &ResolverGroup{id: id}
b.mu.Lock()
b.resolverGroups[id] = es
b.mu.Unlock()
return es, nil
}
func (b *builder) getResolverGroup(id string) (*ResolverGroup, error) {
b.mu.RLock()
es, ok := b.resolverGroups[id]
b.mu.RUnlock()
if !ok {
return nil, fmt.Errorf("ResolverGroup not found for id: %s", id)
}
return es, nil
}
func (b *builder) close(id string) {
b.mu.Lock()
delete(b.resolverGroups, id)
b.mu.Unlock()
}
func (b *builder) Scheme() string {
return scheme
}
// Resolver provides a resolver for a single etcd cluster, identified by name.
type Resolver struct {
endpointID string
cc resolver.ClientConn
sync.RWMutex
}
// TODO: use balancer.epsToAddrs
func epsToAddrs(eps ...string) (addrs []resolver.Address) {
addrs = make([]resolver.Address, 0, len(eps))
for _, ep := range eps {
_, host, _ := ParseEndpoint(ep)
addrs = append(addrs, resolver.Address{Addr: ep, ServerName: host})
}
return addrs
}
func (*Resolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (r *Resolver) Close() {
es, err := bldr.getResolverGroup(r.endpointID)
if err != nil {
return
}
es.removeResolver(r)
}
// ParseEndpoint endpoint parses an endpoint of the form
// (http|https)://<host>*|(unix|unixs)://<path>)
// and returns a protocol ('tcp' or 'unix'),
// host (or filepath if a unix socket),
// scheme (http, https, unix, unixs).
func ParseEndpoint(endpoint string) (proto string, host string, scheme string) {
proto = "tcp"
host = endpoint
url, uerr := url.Parse(endpoint)
if uerr != nil || !strings.Contains(endpoint, "://") {
return proto, host, scheme
}
scheme = url.Scheme
// strip scheme:// prefix since grpc dials by host
host = url.Host
switch url.Scheme {
case "http", "https":
case "unix", "unixs":
proto = "unix"
host = url.Host + url.Path
default:
proto, host = "", ""
}
return proto, host, scheme
}
// ParseTarget parses a endpoint://<id>/<endpoint> string and returns the parsed id and endpoint.
// If the target is malformed, an error is returned.
func ParseTarget(target string) (string, string, error) {
noPrefix := strings.TrimPrefix(target, targetPrefix)
if noPrefix == target {
return "", "", fmt.Errorf("malformed target, %s prefix is required: %s", targetPrefix, target)
}
parts := strings.SplitN(noPrefix, "/", 2)
if len(parts) != 2 {
return "", "", fmt.Errorf("malformed target, expected %s://<id>/<endpoint>, but got %s", scheme, target)
}
return parts[0], parts[1], nil
}
// Dialer dials a endpoint using net.Dialer.
// Context cancelation and timeout are supported.
func Dialer(ctx context.Context, dialEp string) (net.Conn, error) {
proto, host, _ := ParseEndpoint(dialEp)
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
dialer := &net.Dialer{}
if deadline, ok := ctx.Deadline(); ok {
dialer.Deadline = deadline
}
return dialer.DialContext(ctx, proto, host)
}

View File

@ -1,68 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package balancer
import (
"fmt"
"net/url"
"sort"
"sync/atomic"
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
)
func scToString(sc balancer.SubConn) string {
return fmt.Sprintf("%p", sc)
}
func scsToStrings(scs map[balancer.SubConn]resolver.Address) (ss []string) {
ss = make([]string, 0, len(scs))
for sc, a := range scs {
ss = append(ss, fmt.Sprintf("%s (%s)", a.Addr, scToString(sc)))
}
sort.Strings(ss)
return ss
}
func addrsToStrings(addrs []resolver.Address) (ss []string) {
ss = make([]string, len(addrs))
for i := range addrs {
ss[i] = addrs[i].Addr
}
sort.Strings(ss)
return ss
}
func epsToAddrs(eps ...string) (addrs []resolver.Address) {
addrs = make([]resolver.Address, 0, len(eps))
for _, ep := range eps {
u, err := url.Parse(ep)
if err != nil {
addrs = append(addrs, resolver.Address{Addr: ep, Type: resolver.Backend})
continue
}
addrs = append(addrs, resolver.Address{Addr: u.Host, Type: resolver.Backend})
}
return addrs
}
var genN = new(uint32)
func genName() string {
now := time.Now().UnixNano()
return fmt.Sprintf("%X%X", now, atomic.AddUint32(genN, 1))
}

View File

@ -1,34 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package balancer
import (
"reflect"
"testing"
"google.golang.org/grpc/resolver"
)
func Test_epsToAddrs(t *testing.T) {
eps := []string{"https://example.com:2379", "127.0.0.1:2379"}
exp := []resolver.Address{
{Addr: "example.com:2379", Type: resolver.Backend},
{Addr: "127.0.0.1:2379", Type: resolver.Backend},
}
rs := epsToAddrs(eps...)
if !reflect.DeepEqual(rs, exp) {
t.Fatalf("expected %v, got %v", exp, rs)
}
}

View File

@ -18,19 +18,15 @@ import (
"context"
"errors"
"fmt"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/google/uuid"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/v3/balancer"
"go.etcd.io/etcd/client/v3/balancer/picker"
"go.etcd.io/etcd/client/v3/balancer/resolver/endpoint"
"go.etcd.io/etcd/client/v3/credentials"
"go.etcd.io/etcd/client/v3/internal/endpoint"
"go.etcd.io/etcd/client/v3/internal/resolver"
"go.etcd.io/etcd/pkg/v3/logutil"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -43,31 +39,8 @@ import (
var (
ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
ErrOldCluster = errors.New("etcdclient: old cluster version")
roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String())
)
func init() {
lg := zap.NewNop()
if os.Getenv("ETCD_CLIENT_DEBUG") != "" {
lcfg := logutil.DefaultZapLoggerConfig
lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
var err error
lg, err = lcfg.Build() // info level logging
if err != nil {
panic(err)
}
}
// TODO: support custom balancer
balancer.RegisterBuilder(balancer.Config{
Policy: picker.RoundrobinBalanced,
Name: roundRobinBalancerName,
Logger: lg,
})
}
// Client provides and manages an etcd v3 client session.
type Client struct {
Cluster
@ -79,10 +52,10 @@ type Client struct {
conn *grpc.ClientConn
cfg Config
creds grpccredentials.TransportCredentials
resolverGroup *endpoint.ResolverGroup
mu *sync.RWMutex
cfg Config
creds grpccredentials.TransportCredentials
resolver *resolver.EtcdManualResolver
mu *sync.RWMutex
ctx context.Context
cancel context.CancelFunc
@ -152,9 +125,6 @@ func (c *Client) Close() error {
if c.Lease != nil {
c.Lease.Close()
}
if c.resolverGroup != nil {
c.resolverGroup.Close()
}
if c.conn != nil {
return toErr(c.ctx, c.conn.Close())
}
@ -181,7 +151,8 @@ func (c *Client) SetEndpoints(eps ...string) {
c.mu.Lock()
defer c.mu.Unlock()
c.cfg.Endpoints = eps
c.resolverGroup.SetEndpoints(eps)
c.resolver.SetEndpoints(eps)
}
// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
@ -212,29 +183,12 @@ func (c *Client) autoSync() {
err := c.Sync(ctx)
cancel()
if err != nil && err != c.ctx.Err() {
lg.Lvl(4).Infof("Auto sync endpoints failed: %v", err)
c.lg.Info("Auto sync endpoints failed.", zap.Error(err))
}
}
}
}
func (c *Client) processCreds(scheme string) (creds grpccredentials.TransportCredentials) {
creds = c.creds
switch scheme {
case "unix":
case "http":
creds = nil
case "https", "unixs":
if creds != nil {
break
}
creds = credentials.NewBundle(credentials.Config{}).TransportCredentials()
default:
creds = nil
}
return creds
}
// dialSetupOpts gives the dial opts prior to any authentication.
func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) {
if c.cfg.DialKeepAliveTime > 0 {
@ -247,13 +201,12 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts
}
opts = append(opts, dopts...)
dialer := endpoint.Dialer
if creds != nil {
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithInsecure())
}
opts = append(opts, grpc.WithContextDialer(dialer))
grpc.WithDisableRetry()
// Interceptor retry and backoff.
// TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
@ -272,15 +225,11 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts
// Dial connects to a single endpoint using the client's config.
func (c *Client) Dial(ep string) (*grpc.ClientConn, error) {
creds, err := c.directDialCreds(ep)
if err != nil {
return nil, err
}
// Use the grpc passthrough resolver to directly dial a single endpoint.
// This resolver passes through the 'unix' and 'unixs' endpoints schemes used
// by etcd without modification, allowing us to directly dial endpoints and
// using the same dial functions that we use for load balancer dialing.
return c.dial(fmt.Sprintf("passthrough:///%s", ep), creds)
creds := c.credentialsForEndpoint(ep)
// Using ad-hoc created resolver, to guarantee only explicitly given
// endpoint is used.
return c.dial(creds, grpc.WithResolvers(resolver.New(ep)))
}
func (c *Client) getToken(ctx context.Context) error {
@ -304,19 +253,17 @@ func (c *Client) getToken(ctx context.Context) error {
// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host
// of the provided endpoint determines the scheme used for all endpoints of the client connection.
func (c *Client) dialWithBalancer(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
_, host, _ := endpoint.ParseEndpoint(ep)
target := c.resolverGroup.Target(host)
creds := c.dialWithBalancerCreds(ep)
return c.dial(target, creds, dopts...)
creds := c.credentialsForEndpoint(ep)
opts := append(dopts, grpc.WithResolvers(c.resolver))
return c.dial(creds, opts...)
}
// dial configures and dials any grpc balancer target.
func (c *Client) dial(target string, creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts, err := c.dialSetupOpts(creds, dopts...)
if err != nil {
return nil, fmt.Errorf("failed to configure dialer: %v", err)
}
if c.Username != "" && c.Password != "" {
c.authTokenBundle = credentials.NewBundle(credentials.Config{})
opts = append(opts, grpc.WithPerRPCCredentials(c.authTokenBundle.PerRPCCredentials()))
@ -331,43 +278,21 @@ func (c *Client) dial(target string, creds grpccredentials.TransportCredentials,
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
}
conn, err := grpc.DialContext(dctx, target, opts...)
conn, err := grpc.DialContext(dctx, c.resolver.Scheme()+":///", opts...)
if err != nil {
return nil, err
}
return conn, nil
}
func (c *Client) directDialCreds(ep string) (grpccredentials.TransportCredentials, error) {
_, host, scheme := endpoint.ParseEndpoint(ep)
creds := c.creds
if len(scheme) != 0 {
creds = c.processCreds(scheme)
if creds != nil {
clone := creds.Clone()
// Set the server name must to the endpoint hostname without port since grpc
// otherwise attempts to check if x509 cert is valid for the full endpoint
// including the scheme and port, which fails.
overrideServerName, _, err := net.SplitHostPort(host)
if err != nil {
// Either the host didn't have a port or the host could not be parsed. Either way, continue with the
// original host string.
overrideServerName = host
}
clone.OverrideServerName(overrideServerName)
creds = clone
}
func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
if c.creds != nil {
return c.creds
}
return creds, nil
}
func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCredentials {
_, _, scheme := endpoint.ParseEndpoint(ep)
creds := c.creds
if len(scheme) != 0 {
creds = c.processCreds(scheme)
if endpoint.RequiresCredentials(ep) {
return credentials.NewBundle(credentials.Config{}).TransportCredentials()
}
return creds
return nil
}
func newClient(cfg *Config) (*Client, error) {
@ -429,14 +354,7 @@ func newClient(cfg *Config) (*Client, error) {
client.callOpts = callOpts
}
// Prepare a 'endpoint://<unique-client-id>/' resolver for the client and create a endpoint target to pass
// to dial so the client knows to use this resolver.
client.resolverGroup, err = endpoint.NewResolverGroup(fmt.Sprintf("client-%s", uuid.New().String()))
if err != nil {
client.cancel()
return nil, err
}
client.resolverGroup.SetEndpoints(cfg.Endpoints)
client.resolver = resolver.New(cfg.Endpoints...)
if len(cfg.Endpoints) < 1 {
client.cancel()
@ -446,10 +364,10 @@ func newClient(cfg *Config) (*Client, error) {
// Use a provided endpoint target so that for https:// without any tls config given, then
// grpc will assume the certificate server name is the endpoint host.
conn, err := client.dialWithBalancer(dialEndpoint, grpc.WithBalancerName(roundRobinBalancerName))
conn, err := client.dialWithBalancer(dialEndpoint)
if err != nil {
client.cancel()
client.resolverGroup.Close()
client.resolver.Close()
return nil, err
}
// TODO: With the old grpc balancer interface, we waited until the dial timeout

View File

@ -4,12 +4,10 @@ go 1.15
require (
github.com/dustin/go-humanize v1.0.0
github.com/google/uuid v1.1.2
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/prometheus/client_golang v1.5.1
go.etcd.io/etcd/api/v3 v3.5.0-pre
go.etcd.io/etcd/pkg/v3 v3.5.0-pre
go.uber.org/multierr v1.5.0
go.uber.org/zap v1.16.0
google.golang.org/grpc v1.29.1
sigs.k8s.io/yaml v1.2.0

View File

@ -54,8 +54,6 @@ github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.14.6/go.mod h1:zdiPV4Yse/1gnckTHtghG4GkDEdKCRJduHpTxT3/jcw=

View File

@ -0,0 +1,68 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package endpoint
import (
"net/url"
"regexp"
)
var (
STRIP_PORT_REGEXP = regexp.MustCompile("(.*):([0-9]+)")
)
func stripPort(ep string) string {
return STRIP_PORT_REGEXP.ReplaceAllString(ep, "$1")
}
func translateEndpoint(ep string) (addr string, serverName string, requireCreds bool) {
url, err := url.Parse(ep)
if err != nil {
return ep, stripPort(ep), false
}
switch url.Scheme {
case "http", "https":
return url.Host, url.Hostname(), url.Scheme == "https"
case "unix", "unixs":
requireCreds = url.Scheme == "unixs"
if url.Opaque != "" {
return "unix:" + url.Opaque, stripPort(url.Opaque), requireCreds
} else if url.Path != "" {
return "unix://" + url.Host + url.Path, url.Host + url.Path, requireCreds
} else {
return "unix:" + url.Host, url.Hostname(), requireCreds
}
case "":
return url.Host + url.Path, url.Host + url.Path, false
default:
return ep, stripPort(ep), false
}
}
// RequiresCredentials returns whether given endpoint requires
// credentials/certificates for connection.
func RequiresCredentials(ep string) bool {
_, _, requireCreds := translateEndpoint(ep)
return requireCreds
}
// Interpret endpoint parses an endpoint of the form
// (http|https)://<host>*|(unix|unixs)://<path>)
// and returns low-level address (supported by 'net') to connect to,
// and a server name used for x509 certificate matching.
func Interpret(ep string) (address string, serverName string) {
addr, serverName, _ := translateEndpoint(ep)
return addr, serverName
}

View File

@ -0,0 +1,65 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package endpoint
import (
"testing"
)
func TestInterpret(t *testing.T) {
tests := []struct {
endpoint string
wantAddress string
wantServerName string
}{
{"127.0.0.1", "127.0.0.1", "127.0.0.1"},
{"localhost", "localhost", "localhost"},
{"localhost:8080", "localhost:8080", "localhost"},
{"unix:127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
{"unix:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
{"unix://127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
{"unix://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
{"unixs:127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
{"unixs:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
{"unixs://127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
{"unixs://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
{"http://127.0.0.1", "127.0.0.1", "127.0.0.1"},
{"http://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1"},
{"https://127.0.0.1", "127.0.0.1", "127.0.0.1"},
{"https://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1"},
{"https://localhost:20000", "localhost:20000", "localhost"},
{"unix:///tmp/abc", "unix:///tmp/abc", "/tmp/abc"},
{"unixs:///tmp/abc", "unix:///tmp/abc", "/tmp/abc"},
{"etcd.io", "etcd.io", "etcd.io"},
{"http://etcd.io/abc", "etcd.io", "etcd.io"},
{"dns://something-other", "dns://something-other", "dns://something-other"},
}
for _, tt := range tests {
t.Run(tt.endpoint, func(t *testing.T) {
gotAddress, gotServerName := Interpret(tt.endpoint)
if gotAddress != tt.wantAddress {
t.Errorf("Interpret() gotAddress = %v, want %v", gotAddress, tt.wantAddress)
}
if gotServerName != tt.wantServerName {
t.Errorf("Interpret() gotServerName = %v, want %v", gotServerName, tt.wantServerName)
}
})
}
}

View File

@ -0,0 +1,70 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package resolver
import (
"go.etcd.io/etcd/client/v3/internal/endpoint"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
)
// EtcdManualResolver is a Resolver (and resolver.Builder) that can be updated
// using SetEndpoints.
type EtcdManualResolver struct {
*manual.Resolver
endpoints []string
serviceConfig *serviceconfig.ParseResult
}
func New(endpoints ...string) *EtcdManualResolver {
r := manual.NewBuilderWithScheme("etcd-endpoints")
return &EtcdManualResolver{Resolver: r, endpoints: endpoints, serviceConfig: nil}
}
// Build returns itself for Resolver, because it's both a builder and a resolver.
func (r *EtcdManualResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
r.serviceConfig = cc.ParseServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
if r.serviceConfig.Err != nil {
return nil, r.serviceConfig.Err
}
res, err := r.Resolver.Build(target, cc, opts)
if err != nil {
return nil, err
}
// Populates endpoints stored in r into ClientConn (cc).
r.updateState()
return res, nil
}
func (r *EtcdManualResolver) SetEndpoints(endpoints []string) {
r.endpoints = endpoints
r.updateState()
}
func (r EtcdManualResolver) updateState() {
if r.CC != nil {
addresses := make([]resolver.Address, len(r.endpoints))
for i, ep := range r.endpoints {
addr, serverName := endpoint.Interpret(ep)
addresses[i] = resolver.Address{Addr: addr, ServerName: serverName}
}
state := resolver.State{
Addresses: addresses,
ServiceConfig: r.serviceConfig,
}
r.UpdateState(state)
}
}

View File

@ -115,8 +115,6 @@ github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXi
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=

2
go.sum
View File

@ -116,8 +116,6 @@ github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXi
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=

View File

@ -112,8 +112,6 @@ github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXi
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=

View File

@ -116,8 +116,6 @@ github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXi
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=