From 811fbc5672e686611f825ce0f466fb226f4c76bd Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 8 Jan 2016 10:10:05 -0800 Subject: [PATCH] etcdmain: support keep alive listeners on limit listener connections Fixes #4171 --- etcdmain/etcd.go | 3 +- pkg/transport/keepalive_listener.go | 19 +++++--- pkg/transport/limit_listen.go | 70 +++++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 9 deletions(-) create mode 100644 pkg/transport/limit_listen.go diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index b66ad26f0..cc61af556 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -31,7 +31,6 @@ import ( systemdutil "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-systemd/util" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus" - "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/netutil" "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" "github.com/coreos/etcd/discovery" "github.com/coreos/etcd/etcdserver" @@ -245,7 +244,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { if fdLimit <= reservedInternalFDNum { plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum) } - l = netutil.LimitListener(l, int(fdLimit-reservedInternalFDNum)) + l = transport.LimitListener(l, int(fdLimit-reservedInternalFDNum)) } // Do not wrap around this listener if TLS Info is set. diff --git a/pkg/transport/keepalive_listener.go b/pkg/transport/keepalive_listener.go index 5efcd55b8..1fe1ba80d 100644 --- a/pkg/transport/keepalive_listener.go +++ b/pkg/transport/keepalive_listener.go @@ -21,6 +21,11 @@ import ( "time" ) +type keepAliveConn interface { + SetKeepAlive(bool) error + SetKeepAlivePeriod(d time.Duration) error +} + // NewKeepAliveListener returns a listener that listens on the given address. // Be careful when wrap around KeepAliveListener with another Listener if TLSInfo is not nil. // Some pkgs (like go/http) might expect Listener to return TLSConn type to start TLS handshake. @@ -50,13 +55,13 @@ func (kln *keepaliveListener) Accept() (net.Conn, error) { if err != nil { return nil, err } - tcpc := c.(*net.TCPConn) + kac := c.(keepAliveConn) // detection time: tcp_keepalive_time + tcp_keepalive_probes + tcp_keepalive_intvl // default on linux: 30 + 8 * 30 // default on osx: 30 + 8 * 75 - tcpc.SetKeepAlive(true) - tcpc.SetKeepAlivePeriod(30 * time.Second) - return tcpc, nil + kac.SetKeepAlive(true) + kac.SetKeepAlivePeriod(30 * time.Second) + return c, nil } // A tlsKeepaliveListener implements a network listener (net.Listener) for TLS connections. @@ -72,12 +77,12 @@ func (l *tlsKeepaliveListener) Accept() (c net.Conn, err error) { if err != nil { return } - tcpc := c.(*net.TCPConn) + kac := c.(keepAliveConn) // detection time: tcp_keepalive_time + tcp_keepalive_probes + tcp_keepalive_intvl // default on linux: 30 + 8 * 30 // default on osx: 30 + 8 * 75 - tcpc.SetKeepAlive(true) - tcpc.SetKeepAlivePeriod(30 * time.Second) + kac.SetKeepAlive(true) + kac.SetKeepAlivePeriod(30 * time.Second) c = tls.Server(c, l.config) return } diff --git a/pkg/transport/limit_listen.go b/pkg/transport/limit_listen.go new file mode 100644 index 000000000..8a81a6b93 --- /dev/null +++ b/pkg/transport/limit_listen.go @@ -0,0 +1,70 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package netutil provides network utility functions, complementing the more +// common ones in the net package. +package transport + +import ( + "errors" + "net" + "sync" + "time" +) + +var ( + ErrNotTCP = errors.New("only tcp connections have keepalive") +) + +// LimitListener returns a Listener that accepts at most n simultaneous +// connections from the provided Listener. +func LimitListener(l net.Listener, n int) net.Listener { + return &limitListener{l, make(chan struct{}, n)} +} + +type limitListener struct { + net.Listener + sem chan struct{} +} + +func (l *limitListener) acquire() { l.sem <- struct{}{} } +func (l *limitListener) release() { <-l.sem } + +func (l *limitListener) Accept() (net.Conn, error) { + l.acquire() + c, err := l.Listener.Accept() + if err != nil { + l.release() + return nil, err + } + return &limitListenerConn{Conn: c, release: l.release}, nil +} + +type limitListenerConn struct { + net.Conn + releaseOnce sync.Once + release func() +} + +func (l *limitListenerConn) Close() error { + err := l.Conn.Close() + l.releaseOnce.Do(l.release) + return err +} + +func (l *limitListenerConn) SetKeepAlive(doKeepAlive bool) error { + tcpc, ok := l.Conn.(*net.TCPConn) + if !ok { + return ErrNotTCP + } + return tcpc.SetKeepAlive(doKeepAlive) +} + +func (l *limitListenerConn) SetKeepAlivePeriod(d time.Duration) error { + tcpc, ok := l.Conn.(*net.TCPConn) + if !ok { + return ErrNotTCP + } + return tcpc.SetKeepAlivePeriod(d) +}