From f292a4c9532e8084e3ac197f9513f6c77d4b1b21 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Fri, 17 Mar 2017 10:15:34 -0700 Subject: [PATCH] embed: don't return error when closing on embed etcd FIXES #7019 --- embed/etcd.go | 24 +++++++++++++++++++++--- embed/serve.go | 8 ++++---- integration/embed_test.go | 5 +++++ 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/embed/etcd.go b/embed/etcd.go index 81ad5f5a3..43be1a3bd 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -20,6 +20,7 @@ import ( "net" "net/http" "path/filepath" + "sync" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/api/v2http" @@ -54,8 +55,11 @@ type Etcd struct { Server *etcdserver.EtcdServer cfg Config + stopc chan struct{} errc chan error sctxs map[string]*serveCtx + + closeOnce sync.Once } // StartEtcd launches the etcd server and HTTP handlers for client/server communication. @@ -65,7 +69,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { if err = inCfg.Validate(); err != nil { return nil, err } - e = &Etcd{cfg: *inCfg} + e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})} cfg := &e.cfg defer func() { if e != nil && err != nil { @@ -141,6 +145,8 @@ func (e *Etcd) Config() Config { } func (e *Etcd) Close() { + e.closeOnce.Do(func() { close(e.stopc) }) + for _, sctx := range e.sctxs { sctx.cancel() } @@ -319,7 +325,7 @@ func (e *Etcd) serve() (err error) { ph := v2http.NewPeerHandler(e.Server) for _, l := range e.Peers { go func(l net.Listener) { - e.errc <- servePeerHTTP(l, ph) + e.errHandler(servePeerHTTP(l, ph)) }(l) } @@ -335,8 +341,20 @@ func (e *Etcd) serve() (err error) { // read timeout does not work with http close notify // TODO: https://github.com/golang/go/issues/9524 go func(s *serveCtx) { - e.errc <- s.serve(e.Server, ctlscfg, v2h, e.errc) + e.errHandler(s.serve(e.Server, ctlscfg, v2h, e.errHandler)) }(sctx) } return nil } + +func (e *Etcd) errHandler(err error) { + select { + case <-e.stopc: + return + default: + } + select { + case <-e.stopc: + case e.errc <- err: + } +} diff --git a/embed/serve.go b/embed/serve.go index e173f4893..fb05df730 100644 --- a/embed/serve.go +++ b/embed/serve.go @@ -62,7 +62,7 @@ func newServeCtx() *serveCtx { // serve accepts incoming connections on the listener l, // creating a new service goroutine for each. The service goroutines // read requests and then call handler to reply to them. -func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler, errc chan<- error) error { +func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler, errHandler func(error)) error { logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0) <-s.ReadyNotify() plog.Info("ready to serve client requests") @@ -76,7 +76,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle sctx.serviceRegister(gs) } grpcl := m.Match(cmux.HTTP2()) - go func() { errc <- gs.Serve(grpcl) }() + go func() { errHandler(gs.Serve(grpcl)) }() opts := []grpc.DialOption{ grpc.WithInsecure(), @@ -93,7 +93,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle ErrorLog: logger, // do not log user error } httpl := m.Match(cmux.HTTP1()) - go func() { errc <- srvhttp.Serve(httpl) }() + go func() { errHandler(srvhttp.Serve(httpl)) }() plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.l.Addr().String()) } @@ -124,7 +124,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle TLSConfig: tlscfg, ErrorLog: logger, // do not log user error } - go func() { errc <- srv.Serve(tlsl) }() + go func() { errHandler(srv.Serve(tlsl)) }() plog.Infof("serving client requests on %s", sctx.l.Addr().String()) } diff --git a/integration/embed_test.go b/integration/embed_test.go index f37bfc626..8cba0b396 100644 --- a/integration/embed_test.go +++ b/integration/embed_test.go @@ -94,6 +94,11 @@ func TestEmbedEtcd(t *testing.T) { t.Errorf("%d: expected %d clients, got %d", i, tt.wclients, len(e.Clients)) } e.Close() + select { + case err := <-e.Err(): + t.Errorf("#%d: unexpected error on close (%v)", i, err) + default: + } } }