From 153ba928300ddb44be6af5d7063e6da32c6ee70e Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 17 Aug 2017 12:12:44 -0700 Subject: [PATCH] embed: serve basic v3 grpc over peer port --- embed/etcd.go | 99 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 59 insertions(+), 40 deletions(-) diff --git a/embed/etcd.go b/embed/etcd.go index e69adbfd6..7388cfd13 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -16,6 +16,7 @@ package embed import ( "context" + "crypto/tls" "fmt" "io/ioutil" defaultLog "log" @@ -28,6 +29,7 @@ import ( "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/api/etcdhttp" "github.com/coreos/etcd/etcdserver/api/v2http" + "github.com/coreos/etcd/etcdserver/api/v3rpc" "github.com/coreos/etcd/pkg/cors" "github.com/coreos/etcd/pkg/debugutil" runtimeutil "github.com/coreos/etcd/pkg/runtime" @@ -35,6 +37,9 @@ import ( "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/rafthttp" "github.com/coreos/pkg/capnslog" + + "github.com/cockroachdb/cmux" + "google.golang.org/grpc" ) var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "embed") @@ -152,29 +157,39 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { return } - // configure peer handlers after rafthttp.Transport started - ph := etcdhttp.NewPeerHandler(e.Server) - for _, p := range e.Peers { - srv := &http.Server{ - Handler: ph, - ReadTimeout: 5 * time.Minute, - ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error - } - - l := p.Listener - p.serve = func() error { return srv.Serve(l) } - p.close = func(ctx context.Context) error { - // gracefully shutdown http.Server - // close open listeners, idle connections - // until context cancel or time-out - return srv.Shutdown(ctx) - } - } - // buffer channel so goroutines on closed connections won't wait forever e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs)) e.Server.Start() + + // configure peer handlers after rafthttp.Transport started + ph := etcdhttp.NewPeerHandler(e.Server) + var peerTLScfg *tls.Config + if !cfg.PeerTLSInfo.Empty() { + if peerTLScfg, err = cfg.PeerTLSInfo.ServerConfig(); err != nil { + return + } + } + for _, p := range e.Peers { + gs := v3rpc.Server(e.Server, peerTLScfg) + m := cmux.New(p.Listener) + go gs.Serve(m.Match(cmux.HTTP2())) + srv := &http.Server{ + Handler: grpcHandlerFunc(gs, ph), + ReadTimeout: 5 * time.Minute, + ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error + } + go srv.Serve(m.Match(cmux.Any())) + p.serve = func() error { return m.Serve() } + p.close = func(ctx context.Context) error { + // gracefully shutdown http.Server + // close open listeners, idle connections + // until context cancel or time-out + e.stopGRPCServer(gs) + return srv.Shutdown(ctx) + } + } + if err = e.serve(); err != nil { return } @@ -190,29 +205,9 @@ func (e *Etcd) Config() Config { func (e *Etcd) Close() { e.closeOnce.Do(func() { close(e.stopc) }) - timeout := 2 * time.Second - if e.Server != nil { - timeout = e.Server.Cfg.ReqTimeout() - } for _, sctx := range e.sctxs { for gs := range sctx.grpcServerC { - ch := make(chan struct{}) - go func() { - defer close(ch) - // close listeners to stop accepting new connections, - // will block on any existing transports - gs.GracefulStop() - }() - // wait until all pending RPCs are finished - select { - case <-ch: - case <-time.After(timeout): - // took too long, manually close open transports - // e.g. watch streams - gs.Stop() - // concurrent GracefulStop should be interrupted - <-ch - } + e.stopGRPCServer(gs) } } @@ -243,6 +238,30 @@ func (e *Etcd) Close() { } } +func (e *Etcd) stopGRPCServer(gs *grpc.Server) { + timeout := 2 * time.Second + if e.Server != nil { + timeout = e.Server.Cfg.ReqTimeout() + } + ch := make(chan struct{}) + go func() { + defer close(ch) + // close listeners to stop accepting new connections, + // will block on any existing transports + gs.GracefulStop() + }() + // wait until all pending RPCs are finished + select { + case <-ch: + case <-time.After(timeout): + // took too long, manually close open transports + // e.g. watch streams + gs.Stop() + // concurrent GracefulStop should be interrupted + <-ch + } +} + func (e *Etcd) Err() <-chan error { return e.errc } func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {