clientv3: Fix dialer for new balancer to correctly handle first arg as endpoint, not hostname

release-3.4
Joe Betz 2018-04-10 16:58:52 -07:00 committed by Gyuho Lee
parent 309208dbef
commit 7ac2a2dd20
4 changed files with 27 additions and 41 deletions

View File

@ -16,7 +16,9 @@ package balancer
import (
"fmt"
"strconv"
"sync"
"time"
"github.com/coreos/etcd/clientv3/balancer/picker"
@ -50,6 +52,7 @@ type builder struct {
// Then, resolved addresses will be handled via "HandleResolvedAddrs".
func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
bb := &baseBalancer{
id: strconv.FormatInt(time.Now().UnixNano(), 36),
policy: b.cfg.Policy,
name: b.cfg.Policy.String(),
lg: b.cfg.Logger,
@ -78,6 +81,7 @@ func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balan
bb.lg.Info(
"built balancer",
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
zap.String("resolver-target", cc.Target()),
)
@ -102,6 +106,7 @@ type Balancer interface {
}
type baseBalancer struct {
id string
policy picker.Policy
name string
lg *zap.Logger
@ -123,10 +128,10 @@ type baseBalancer struct {
// gRPC sends initial or updated resolved addresses from "Build".
func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
if err != nil {
bb.lg.Warn("HandleResolvedAddrs called with error", zap.Error(err))
bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
return
}
bb.lg.Info("resolved", zap.Strings("addresses", addrsToStrings(addrs)))
bb.lg.Info("resolved", zap.String("balancer-id", bb.id), zap.Strings("addresses", addrsToStrings(addrs)))
bb.mu.Lock()
defer bb.mu.Unlock()
@ -137,7 +142,7 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
if _, ok := bb.addrToSc[addr]; !ok {
sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
if err != nil {
bb.lg.Warn("NewSubConn failed", zap.Error(err), zap.String("address", addr.Addr))
bb.lg.Warn("NewSubConn failed", zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
continue
}
bb.addrToSc[addr] = sc
@ -155,6 +160,7 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
bb.lg.Info(
"removed subconn",
zap.String("balancer-id", bb.id),
zap.String("address", addr.Addr),
zap.String("subconn", scToString(sc)),
)
@ -176,6 +182,7 @@ func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connecti
if !ok {
bb.lg.Warn(
"state change for an unknown subconn",
zap.String("balancer-id", bb.id),
zap.String("subconn", scToString(sc)),
zap.String("state", s.String()),
)
@ -184,6 +191,7 @@ func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connecti
bb.lg.Info(
"state changed",
zap.String("balancer-id", bb.id),
zap.Bool("connected", s == connectivity.Ready),
zap.String("subconn", scToString(sc)),
zap.String("address", bb.scToAddr[sc].Addr),
@ -221,6 +229,11 @@ func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connecti
func (bb *baseBalancer) regeneratePicker() {
if bb.currentState == connectivity.TransientFailure {
bb.lg.Info(
"generated transient error picker",
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
)
bb.Picker = picker.NewErr(balancer.ErrTransientFailure)
return
}
@ -247,6 +260,7 @@ func (bb *baseBalancer) regeneratePicker() {
bb.lg.Info(
"generated picker",
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
zap.Strings("subconn-ready", scsToStrings(addrToSc)),
zap.Int("subconn-size", len(addrToSc)),

View File

@ -100,7 +100,6 @@ type Resolver struct {
clusterName string
cc resolver.ClientConn
addrs []resolver.Address
hostToAddr map[string]resolver.Address
sync.RWMutex
}
@ -108,7 +107,6 @@ type Resolver struct {
func (r *Resolver) InitialAddrs(addrs []resolver.Address) {
r.Lock()
r.addrs = addrs
r.hostToAddr = keyAddrsByHost(addrs)
r.Unlock()
}
@ -133,37 +131,13 @@ func epsToAddrs(eps ...string) (addrs []resolver.Address) {
}
// NewAddress updates the addresses of the resolver.
func (r *Resolver) NewAddress(addrs []resolver.Address) error {
if r.cc == nil {
return fmt.Errorf("resolver not yet built, use InitialAddrs to provide initialization endpoints")
}
func (r *Resolver) NewAddress(addrs []resolver.Address) {
r.Lock()
r.addrs = addrs
r.hostToAddr = keyAddrsByHost(addrs)
r.Unlock()
r.cc.NewAddress(addrs)
return nil
}
func keyAddrsByHost(addrs []resolver.Address) map[string]resolver.Address {
// TODO: etcd may be running on multiple ports on the same host, what to do? Keep a list of addresses?
byHost := make(map[string]resolver.Address, len(addrs))
for _, addr := range addrs {
_, host, _ := ParseEndpoint(addr.Addr)
byHost[host] = addr
if r.cc != nil {
r.cc.NewAddress(addrs)
}
return byHost
}
// Endpoint gets the resolver address for the host, if any.
func (r *Resolver) Endpoint(host string) string {
var addr string
r.RLock()
if a, ok := r.hostToAddr[host]; ok {
addr = a.Addr
}
r.RUnlock()
return addr
}
func (*Resolver) ResolveNow(o resolver.ResolveNowOption) {}

View File

@ -143,17 +143,15 @@ func (c *Client) Endpoints() (eps []string) {
// SetEndpoints updates client's endpoints.
func (c *Client) SetEndpoints(eps ...string) {
c.mu.Lock()
c.cfg.Endpoints = eps
c.mu.Unlock()
var addrs []resolver.Address
for _, ep := range eps {
addrs = append(addrs, resolver.Address{Addr: ep})
}
c.mu.Lock()
defer c.mu.Unlock()
c.cfg.Endpoints = eps
c.resolver.NewAddress(addrs)
// TODO: Does the new grpc balancer provide a way to block until the endpoint changes are propagated?
/*if c.balancer.NeedUpdate() {
select {
@ -252,9 +250,8 @@ func (c *Client) dialSetupOpts(target string, dopts ...grpc.DialOption) (opts []
}
opts = append(opts, dopts...)
f := func(host string, t time.Duration) (net.Conn, error) {
// TODO: eliminate this ParseEndpoint call, the endpoint is already parsed by the resolver.
proto, host, _ := endpoint.ParseEndpoint(c.resolver.Endpoint(host))
f := func(dialEp string, t time.Duration) (net.Conn, error) {
proto, host, _ := endpoint.ParseEndpoint(dialEp)
if host == "" && ep != "" {
// dialing an endpoint not in the balancer; use
// endpoint passed into dial

View File

@ -82,6 +82,7 @@ func TestDetectKvOrderViolation(t *testing.T) {
clus.Members[2].Restart(t)
// force OrderingKv to query the third member
cli.SetEndpoints(clus.Members[2].GRPCAddr())
time.Sleep(2 * time.Second) // FIXME: Figure out how to pause SetEndpoints sufficiently that this is not needed
_, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable())
if err != errOrderViolation {
@ -147,7 +148,7 @@ func TestDetectTxnOrderViolation(t *testing.T) {
clus.Members[2].Restart(t)
// force OrderingKv to query the third member
cli.SetEndpoints(clus.Members[2].GRPCAddr())
time.Sleep(2 * time.Second) // FIXME: Figure out how to pause SetEndpoints sufficiently that this is not needed
_, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable())
if err != errOrderViolation {
t.Fatalf("expected %v, got %v", errOrderViolation, err)