clientv3: serialize updating notifych in balancer

FIXES #7283
release-3.2
fanmin shi 2017-03-08 13:39:11 -08:00
parent 7a25257fb2
commit 8baaa06cce
2 changed files with 163 additions and 6 deletions
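Before this change, Up() and the down() closure it returns sent endpoint updates on the unbuffered notifyCh while holding b.mu, and Close() acquired that same lock. If gRPC stops draining Notify() during a rapid connect/teardown cycle, the send blocks with the lock held and Close() blocks behind it forever. A minimal sketch of that failure mode, with illustrative names rather than the real etcd types:

	package main

	import (
		"sync"
		"time"
	)

	type balancer struct {
		mu       sync.Mutex
		notifyCh chan string // unbuffered; normally drained by gRPC
	}

	func (b *balancer) up(addr string) {
		b.mu.Lock()
		defer b.mu.Unlock()
		b.notifyCh <- addr // blocks forever once the reader stops draining
	}

	func (b *balancer) close() {
		b.mu.Lock() // stuck behind up(): deadlock
		defer b.mu.Unlock()
		close(b.notifyCh)
	}

	func main() {
		b := &balancer{notifyCh: make(chan string)}
		go b.up("127.0.0.1:2379")         // nobody receives, so the send holds mu
		time.Sleep(10 * time.Millisecond) // let up() grab the lock first
		b.close()                         // runtime reports: all goroutines are asleep
	}

The commit breaks the cycle by making one goroutine, updateNotifyLoop, the sole sender on notifyCh, with every send paired against a stopc case so shutdown can always interrupt it.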

clientv3/balancer.go

@@ -47,6 +47,15 @@ type simpleBalancer struct {
 	// upc closes when upEps transitions from empty to non-zero or the balancer closes.
 	upc chan struct{}
 
+	// downc closes when grpc calls down() on pinAddr
+	downc chan struct{}
+
+	// stopc is closed to signal updateNotifyLoop should stop.
+	stopc chan struct{}
+
+	// donec closes when all goroutines are exited
+	donec chan struct{}
+
 	// grpc issues TLS cert checks using the string passed into dial so
 	// that string must be the host. To recover the full scheme://host URL,
 	// have a map from hosts to the original endpoint.
@@ -71,8 +80,12 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
 		notifyCh: notifyCh,
 		readyc:   make(chan struct{}),
 		upc:      make(chan struct{}),
+		stopc:    make(chan struct{}),
+		downc:    make(chan struct{}),
+		donec:    make(chan struct{}),
 		host2ep:  getHost2ep(eps),
 	}
+	go sb.updateNotifyLoop()
 	return sb
 }
@@ -131,6 +144,50 @@ func (b *simpleBalancer) updateAddrs(eps []string) {
 	}
 }
 
+func (b *simpleBalancer) updateNotifyLoop() {
+	defer close(b.donec)
+
+	for {
+		b.mu.RLock()
+		upc := b.upc
+		b.mu.RUnlock()
+		var downc chan struct{}
+		select {
+		case <-upc:
+			var addr string
+			b.mu.RLock()
+			addr = b.pinAddr
+			// Up() sets pinAddr and downc as a pair under b.mu
+			downc = b.downc
+			b.mu.RUnlock()
+			if addr == "" {
+				break
+			}
+			// close opened connections that are not pinAddr
+			// this ensures only one connection is open per client
+			select {
+			case b.notifyCh <- []grpc.Address{{Addr: addr}}:
+			case <-b.stopc:
+				return
+			}
+		}
+
+		select {
+		case <-downc:
+			b.mu.RLock()
+			addrs := b.addrs
+			b.mu.RUnlock()
+			select {
+			case b.notifyCh <- addrs:
+			case <-b.stopc:
+				return
+			}
+		case <-b.stopc:
+			return
+		}
+	}
+}
+
 func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 	b.mu.Lock()
 	defer b.mu.Unlock()
@@ -145,20 +202,18 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 	if b.pinAddr == "" {
 		// notify waiting Get()s and pin first connected address
 		close(b.upc)
+		b.downc = make(chan struct{})
 		b.pinAddr = addr.Addr
 		// notify client that a connection is up
 		b.readyOnce.Do(func() { close(b.readyc) })
-		// close opened connections that are not pinAddr
-		// this ensures only one connection is open per client
-		b.notifyCh <- []grpc.Address{addr}
 	}
 	return func(err error) {
 		b.mu.Lock()
 		if b.pinAddr == addr.Addr {
 			b.upc = make(chan struct{})
+			close(b.downc)
 			b.pinAddr = ""
-			b.notifyCh <- b.addrs
 		}
 		b.mu.Unlock()
 	}
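After this hunk, Up() and the closure it returns never touch notifyCh. They only flip the upc/downc pair under b.mu, and updateNotifyLoop turns those edges into notifications outside the lock. A condensed sketch of that signaling protocol (simplified: sync.Mutex instead of the balancer's RWMutex, the notify loop omitted):

	package balancer

	import "sync"

	type balancer struct {
		mu      sync.Mutex
		upc     chan struct{} // closed while an endpoint is pinned
		downc   chan struct{} // closed when the pinned endpoint drops
		pinAddr string
	}

	func (b *balancer) up(addr string) (down func()) {
		b.mu.Lock()
		close(b.upc)                  // edge: an address came up
		b.downc = make(chan struct{}) // re-arm the matching down edge
		b.pinAddr = addr
		b.mu.Unlock()
		return func() {
			b.mu.Lock()
			b.upc = make(chan struct{}) // re-arm for the next up edge
			close(b.downc)              // edge: the pinned address went down
			b.pinAddr = ""
			b.mu.Unlock()
		}
	}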
@@ -214,14 +269,15 @@ func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }
 func (b *simpleBalancer) Close() error {
 	b.mu.Lock()
-	defer b.mu.Unlock()
 	// In case gRPC calls close twice. TODO: remove the checking
 	// when we are sure that gRPC wont call close twice.
 	if b.closed {
+		b.mu.Unlock()
+		<-b.donec
 		return nil
 	}
 	b.closed = true
-	close(b.notifyCh)
+	close(b.stopc)
 	b.pinAddr = ""
 	// In the case of following scenario:
@@ -236,6 +292,13 @@ func (b *simpleBalancer) Close() error {
 		// terminate all waiting Get()s
 		close(b.upc)
 	}
+
+	b.mu.Unlock()
+
+	// wait for updateNotifyLoop to finish
+	<-b.donec
+	close(b.notifyCh)
+
 	return nil
 }
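The reordering in Close() is the heart of the shutdown path: notifyCh may only be closed once updateNotifyLoop has provably exited, because closing a channel that another goroutine may still send on panics. A self-contained sketch of the handshake, using only the three channels the diff introduces or moves:

	package main

	func main() {
		stopc, donec := make(chan struct{}), make(chan struct{})
		notifyCh := make(chan string)

		// sender goroutine, analogous to updateNotifyLoop: it owns all sends
		go func() {
			defer close(donec)
			for {
				select {
				case notifyCh <- "127.0.0.1:2379":
				case <-stopc:
					return
				}
			}
		}()

		close(stopc)    // 1. signal the loop to stop
		<-donec         // 2. wait until it has actually returned
		close(notifyCh) // 3. no sender remains, so closing cannot panic
	}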

clientv3/balancer_test.go

@@ -16,9 +16,14 @@ package clientv3
 import (
 	"errors"
+	"net"
+	"sync"
 	"testing"
 	"time"
 
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/pkg/testutil"
+
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 )
@@ -124,3 +129,92 @@ func TestBalancerGetBlocking(t *testing.T) {
 		t.Errorf("Get() with no up endpoints should timeout, got %v", err)
 	}
 }
+
+// TestBalancerDoNotBlockOnClose ensures that balancer and grpc don't deadlock each other
+// due to rapid open/close conn. The deadlock causes balancer.Close() to block forever.
+// See issue: https://github.com/coreos/etcd/issues/7283 for more detail.
+func TestBalancerDoNotBlockOnClose(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	kcl := newKillConnListener(t, 3)
+	defer kcl.close()
+
+	for i := 0; i < 5; i++ {
+		sb := newSimpleBalancer(kcl.endpoints())
+		conn, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(sb))
+		if err != nil {
+			t.Fatal(err)
+		}
+		kvc := pb.NewKVClient(conn)
+		<-sb.readyc
+
+		for j := 0; j < 100; j++ {
+			go kvc.Range(context.TODO(), &pb.RangeRequest{}, grpc.FailFast(false))
+		}
+		// balancer.Close() might block
+		// if balancer and grpc deadlock each other.
+		closec := make(chan struct{})
+		go func() {
+			defer close(closec)
+			sb.Close()
+		}()
+		go conn.Close()
+		select {
+		case <-closec:
+		case <-time.After(3 * time.Second):
+			testutil.FatalStack(t, "balancer close timeout")
+		}
+	}
+}
+
+// killConnListener listens incoming conn and kills it immediately.
+type killConnListener struct {
+	wg    sync.WaitGroup
+	eps   []string
+	stopc chan struct{}
+	t     *testing.T
+}
+
+func newKillConnListener(t *testing.T, size int) *killConnListener {
+	kcl := &killConnListener{stopc: make(chan struct{}), t: t}
+	for i := 0; i < size; i++ {
+		ln, err := net.Listen("tcp", ":0")
+		if err != nil {
+			t.Fatal(err)
+		}
+		kcl.eps = append(kcl.eps, ln.Addr().String())
+		kcl.wg.Add(1)
+		go kcl.listen(ln)
+	}
+	return kcl
+}
+
+func (kcl *killConnListener) endpoints() []string {
+	return kcl.eps
+}
+
+func (kcl *killConnListener) listen(l net.Listener) {
+	go func() {
+		defer kcl.wg.Done()
+		for {
+			conn, err := l.Accept()
+			select {
+			case <-kcl.stopc:
+				return
+			default:
+			}
+			if err != nil {
+				kcl.t.Fatal(err)
+			}
+			time.Sleep(1 * time.Millisecond)
+			conn.Close()
+		}
+	}()
+	<-kcl.stopc
+	l.Close()
+}
+
+func (kcl *killConnListener) close() {
+	close(kcl.stopc)
+	kcl.wg.Wait()
+}
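killConnListener can also be exercised on its own. A hypothetical companion test (not part of this commit; the name and assertions are invented for illustration) that verifies the kill behavior directly:

	// testKillConnListenerDropsConn is a hypothetical check: dial an endpoint
	// and expect the listener to drop the connection, which surfaces to the
	// client as a read error (io.EOF).
	func testKillConnListenerDropsConn(t *testing.T) {
		kcl := newKillConnListener(t, 1)
		defer kcl.close()

		conn, err := net.Dial("tcp", kcl.endpoints()[0])
		if err != nil {
			t.Fatal(err)
		}
		defer conn.Close()

		buf := make([]byte, 1)
		if _, err := conn.Read(buf); err == nil {
			t.Fatal("expected killConnListener to drop the connection")
		}
	}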