diff --git a/embed/etcd.go b/embed/etcd.go index da1d8f610..4d23a0400 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -148,6 +148,14 @@ func (e *Etcd) Config() Config { func (e *Etcd) Close() { e.closeOnce.Do(func() { close(e.stopc) }) + // (gRPC server) stops accepting new connections, + // RPCs, and blocks until all pending RPCs are finished + for _, sctx := range e.sctxs { + for gs := range sctx.grpcServerC { + gs.GracefulStop() + } + } + for _, sctx := range e.sctxs { sctx.cancel() } diff --git a/embed/serve.go b/embed/serve.go index 46634b7c5..02c093ff8 100644 --- a/embed/serve.go +++ b/embed/serve.go @@ -52,11 +52,14 @@ type serveCtx struct { userHandlers map[string]http.Handler serviceRegister func(*grpc.Server) + grpcServerC chan *grpc.Server } func newServeCtx() *serveCtx { ctx, cancel := context.WithCancel(context.Background()) - return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler)} + return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler), + grpcServerC: make(chan *grpc.Server, 2), // in case sctx.insecure,sctx.secure true + } } // serve accepts incoming connections on the listener l, @@ -72,8 +75,11 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle servElection := v3election.NewElectionServer(v3c) servLock := v3lock.NewLockServer(v3c) + defer close(sctx.grpcServerC) + if sctx.insecure { gs := v3rpc.Server(s, nil) + sctx.grpcServerC <- gs v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) if sctx.serviceRegister != nil { @@ -103,6 +109,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle if sctx.secure { gs := v3rpc.Server(s, tlscfg) + sctx.grpcServerC <- gs v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) if sctx.serviceRegister != nil {