clientv3: Split out grpc balancer builder to ensure there is a balancer per ClientConn

release-3.4
Joe Betz 2018-04-10 10:44:50 -07:00 committed by Gyuho Lee
parent 12acfc057a
commit 309208dbef
3 changed files with 84 additions and 77 deletions

View File

@ -28,11 +28,67 @@ import (
_ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
)
// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
// must be invoked at initialization time.
func RegisterBuilder(cfg Config) {
bb := &builder{cfg}
balancer.Register(bb)
bb.cfg.Logger.Info(
"registered balancer",
zap.String("policy", bb.cfg.Policy.String()),
zap.String("name", bb.cfg.Name),
)
}
type builder struct {
cfg Config
}
// Build is called initially when creating "ccBalancerWrapper".
// "grpc.Dial" is called to this client connection.
// Then, resolved addreses will be handled via "HandleResolvedAddrs".
func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
bb := &baseBalancer{
policy: b.cfg.Policy,
name: b.cfg.Policy.String(),
lg: b.cfg.Logger,
addrToSc: make(map[resolver.Address]balancer.SubConn),
scToAddr: make(map[balancer.SubConn]resolver.Address),
scToSt: make(map[balancer.SubConn]connectivity.State),
currentConn: nil,
csEvltr: &connectivityStateEvaluator{},
// initialize picker always returns "ErrNoSubConnAvailable"
Picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
}
if b.cfg.Name != "" {
bb.name = b.cfg.Name
}
if bb.lg == nil {
bb.lg = zap.NewNop()
}
// TODO: support multiple connections
bb.mu.Lock()
bb.currentConn = cc
bb.mu.Unlock()
bb.lg.Info(
"built balancer",
zap.String("policy", bb.policy.String()),
zap.String("resolver-target", cc.Target()),
)
return bb
}
// Name implements "grpc/balancer.Builder" interface.
func (b *builder) Name() string { return b.cfg.Name }
// Balancer defines client balancer interface.
type Balancer interface {
// Builder is called at the beginning to initialize sub-connection states and picker.
balancer.Builder
// Balancer is called on specified client connection. Client initiates gRPC
// connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
// addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
@ -63,60 +119,6 @@ type baseBalancer struct {
picker.Picker
}
// New returns a new balancer from specified picker policy.
func New(cfg Config) Balancer {
bb := &baseBalancer{
policy: cfg.Policy,
name: cfg.Policy.String(),
lg: cfg.Logger,
addrToSc: make(map[resolver.Address]balancer.SubConn),
scToAddr: make(map[balancer.SubConn]resolver.Address),
scToSt: make(map[balancer.SubConn]connectivity.State),
currentConn: nil,
csEvltr: &connectivityStateEvaluator{},
// initialize picker always returns "ErrNoSubConnAvailable"
Picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
}
if cfg.Name != "" {
bb.name = cfg.Name
}
if bb.lg == nil {
bb.lg = zap.NewNop()
}
balancer.Register(bb)
bb.lg.Info(
"registered balancer",
zap.String("policy", bb.policy.String()),
zap.String("name", bb.name),
)
return bb
}
// Name implements "grpc/balancer.Builder" interface.
func (bb *baseBalancer) Name() string { return bb.name }
// Build implements "grpc/balancer.Builder" interface.
// Build is called initially when creating "ccBalancerWrapper".
// "grpc.Dial" is called to this client connection.
// Then, resolved addreses will be handled via "HandleResolvedAddrs".
func (bb *baseBalancer) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
// TODO: support multiple connections
bb.mu.Lock()
bb.currentConn = cc
bb.mu.Unlock()
bb.lg.Info(
"built balancer",
zap.String("policy", bb.policy.String()),
zap.String("resolver-target", cc.Target()),
)
return bb
}
// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
// gRPC sends initial or updated resolved addresses from "Build".
func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {

View File

@ -112,8 +112,15 @@ func (r *Resolver) InitialAddrs(addrs []resolver.Address) {
r.Unlock()
}
func (r *Resolver) InitialEndpoints(eps []string) {
// InitialEndpoints sets the initial endpoints to for the resolver and returns a grpc dial target.
// This should be called before dialing. The endpoints may be updated after the dial using NewAddress.
// At least one endpoint is required.
func (r *Resolver) InitialEndpoints(eps []string) (string, error) {
if len(eps) < 1 {
return "", fmt.Errorf("At least one endpoint is required, but got: %v", eps)
}
r.InitialAddrs(epsToAddrs(eps...))
return r.Target(eps[0]), nil
}
// TODO: use balancer.epsToAddrs

View File

@ -45,13 +45,13 @@ var (
ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
ErrOldCluster = errors.New("etcdclient: old cluster version")
defaultBalancer balancer.Balancer
roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String())
)
func init() {
defaultBalancer = balancer.New(balancer.Config{
balancer.RegisterBuilder(balancer.Config{
Policy: picker.RoundrobinBalanced,
Name: fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String()),
Name: roundRobinBalancerName,
Logger: zap.NewNop(), // zap.NewExample(),
})
}
@ -433,24 +433,22 @@ func newClient(cfg *Config) (*Client, error) {
client.callOpts = callOpts
}
clientId := fmt.Sprintf("client-%s", strconv.FormatInt(time.Now().UnixNano(), 36))
rsv := endpoint.EndpointResolver(clientId)
rsv.InitialEndpoints(cfg.Endpoints)
targets := []string{}
for _, ep := range cfg.Endpoints {
targets = append(targets, fmt.Sprintf("endpoint://%s/%s", clientId, ep))
}
client.resolver = rsv
client.balancer = defaultBalancer // TODO: allow alternate balancers to be passed in via config?
// use Endpoints[0] so that for https:// without any tls config given, then
// grpc will assume the certificate server name is the endpoint host.
conn, err := client.dial(targets[0], grpc.WithBalancerName(client.balancer.Name()))
// Prepare a 'endpoint://<unique-client-id>/' resolver for the client and create a endpoint target to pass
// to dial so the client knows to use this resolver.
client.resolver = endpoint.EndpointResolver(fmt.Sprintf("client-%s", strconv.FormatInt(time.Now().UnixNano(), 36)))
target, err := client.resolver.InitialEndpoints(cfg.Endpoints)
if err != nil {
client.cancel()
rsv.Close()
client.resolver.Close()
return nil, err
}
// Use an provided endpoint target so that for https:// without any tls config given, then
// grpc will assume the certificate server name is the endpoint host.
conn, err := client.dial(target, grpc.WithBalancerName(roundRobinBalancerName))
if err != nil {
client.cancel()
client.resolver.Close()
return nil, err
}
// TODO: With the old grpc balancer interface, we waited until the dial timeout