Prepare `balancer` interfaces for `>=google.golang.org/grpc@1.30.0` upgrade.

release-3.5
Danny Hermes 2021-02-02 15:39:03 -06:00
parent 50ca440c49
commit ea34f8dbc6
4 changed files with 68 additions and 2 deletions

View File

@ -23,6 +23,7 @@ import (
"go.etcd.io/etcd/client/v3/balancer/connectivity"
"go.etcd.io/etcd/client/v3/balancer/picker"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc/balancer"
grpcconnectivity "google.golang.org/grpc/connectivity"
@ -31,6 +32,12 @@ import (
_ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
)
// NOTE: Ensure
// - `baseBalancer` satisfies `balancer.V2Balancer`.
var (
_ balancer.V2Balancer = (*baseBalancer)(nil)
)
// Config defines balancer configurations.
type Config struct {
// Policy configures balancer policy.
@ -138,12 +145,29 @@ type baseBalancer struct {
picker picker.Picker
}
// UpdateClientConnState implements "grpc/balancer.V2Balancer" interface.
func (bb *baseBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
return bb.handleResolvedWithError(ccs.ResolverState.Addresses, nil)
}
// ResolverError implements "grpc/balancer.V2Balancer" interface.
func (bb *baseBalancer) ResolverError(err error) {
bb.HandleResolvedAddrs(nil, err)
}
// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
// gRPC sends initial or updated resolved addresses from "Build".
func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
_ = bb.handleResolvedWithError(addrs, err)
}
// handleResolvedWithError is an implementation shared both by `HandleResolvedAddrs()`,
// which is part of the `Balancer` interface as well as `UpdateClientConnState()`,
// which is part of the `V2Balancer` interface.
func (bb *baseBalancer) handleResolvedWithError(addrs []resolver.Address, err error) error {
if err != nil {
bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
return
return err
}
bb.lg.Info("resolved",
zap.String("picker", bb.picker.String()),
@ -155,12 +179,14 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
defer bb.mu.Unlock()
resolved := make(map[resolver.Address]struct{})
warnedErrors := []error{}
for _, addr := range addrs {
resolved[addr] = struct{}{}
if _, ok := bb.addrToSc[addr]; !ok {
sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
if err != nil {
bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
warnedErrors = append(warnedErrors, err)
continue
}
bb.lg.Info("created subconn", zap.String("address", addr.Addr))
@ -191,6 +217,15 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
// (DO NOT) delete(bb.scToSt, sc)
}
}
// TODO: Consider just returning `ErrBadResolverState` if `warnedErrors` is
// not empty.
return multierr.Combine(warnedErrors...)
}
// UpdateSubConnState implements "grpc/balancer.V2Balancer" interface.
func (bb *baseBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
bb.HandleSubConnStateChange(sc, s.ConnectivityState)
}
// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.

View File

@ -20,6 +20,12 @@ import (
"google.golang.org/grpc/balancer"
)
// NOTE: Ensure
// - `errPickerV2` satisfies `balancer.V2Picker`.
var (
_ balancer.V2Picker = (*errPickerV2)(nil)
)
// NewErr returns a picker that always returns err on "Pick".
func NewErr(err error) Picker {
return &errPicker{p: Error, err: err}
@ -37,3 +43,11 @@ func (ep *errPicker) String() string {
func (ep *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
return nil, nil, ep.err
}
type errPickerV2 struct {
errPicker
}
func (ep2 *errPickerV2) Pick(opts balancer.PickInfo) (balancer.PickResult, error) {
return balancer.PickResult{}, ep2.errPicker.err
}

View File

@ -24,6 +24,12 @@ import (
"google.golang.org/grpc/resolver"
)
// NOTE: Ensure
// - `rrBalancedV2` satisfies `balancer.V2Picker`.
var (
_ balancer.V2Picker = (*rrBalancedV2)(nil)
)
// newRoundrobinBalanced returns a new roundrobin balanced picker.
func newRoundrobinBalanced(cfg Config) Picker {
scs := make([]balancer.SubConn, 0, len(cfg.SubConnToResolverAddress))
@ -52,7 +58,7 @@ type rrBalanced struct {
func (rb *rrBalanced) String() string { return rb.p.String() }
// Pick is called for every client request.
func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
func (rb *rrBalanced) Pick(_ context.Context, opts balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
rb.mu.RLock()
n := len(rb.scs)
rb.mu.RUnlock()
@ -93,3 +99,13 @@ func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balance
}
return sc, doneFunc, nil
}
type rrBalancedV2 struct {
rrBalanced
}
func (rb2 *rrBalancedV2) Pick(opts balancer.PickInfo) (balancer.PickResult, error) {
sc, doneFunc, err := rb2.rrBalanced.Pick(context.TODO(), opts)
pr := balancer.PickResult{SubConn: sc, Done: doneFunc}
return pr, err
}

View File

@ -9,6 +9,7 @@ require (
github.com/prometheus/client_golang v1.5.1
go.etcd.io/etcd/api/v3 v3.5.0-pre
go.etcd.io/etcd/pkg/v3 v3.5.0-pre
go.uber.org/multierr v1.5.0
go.uber.org/zap v1.16.0
google.golang.org/grpc v1.29.1
sigs.k8s.io/yaml v1.2.0