2016-07-10 21:06:08 +03:00
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package embed
import (
2017-05-05 02:56:08 +03:00
"context"
2017-08-17 22:12:44 +03:00
"crypto/tls"
2016-07-10 21:06:08 +03:00
"fmt"
2017-05-05 02:56:08 +03:00
"io/ioutil"
defaultLog "log"
2016-07-10 21:06:08 +03:00
"net"
"net/http"
2017-07-12 20:05:16 +03:00
"net/url"
2018-03-27 03:12:43 +03:00
"sort"
2017-09-26 20:06:45 +03:00
"strconv"
2017-03-17 20:15:34 +03:00
"sync"
2017-05-05 02:56:08 +03:00
"time"
2016-07-10 21:06:08 +03:00
"github.com/coreos/etcd/etcdserver"
2017-05-10 05:39:29 +03:00
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
2018-05-21 20:14:47 +03:00
"github.com/coreos/etcd/etcdserver/api/rafthttp"
2016-07-10 21:06:08 +03:00
"github.com/coreos/etcd/etcdserver/api/v2http"
2017-08-11 08:26:46 +03:00
"github.com/coreos/etcd/etcdserver/api/v2v3"
"github.com/coreos/etcd/etcdserver/api/v3client"
2017-08-17 22:12:44 +03:00
"github.com/coreos/etcd/etcdserver/api/v3rpc"
2017-04-04 01:15:47 +03:00
"github.com/coreos/etcd/pkg/debugutil"
2016-07-10 21:06:08 +03:00
runtimeutil "github.com/coreos/etcd/pkg/runtime"
"github.com/coreos/etcd/pkg/transport"
2016-07-14 02:02:49 +03:00
"github.com/coreos/etcd/pkg/types"
2017-08-17 22:12:44 +03:00
2017-11-20 20:35:39 +03:00
"github.com/coreos/pkg/capnslog"
2018-01-06 17:56:44 +03:00
"github.com/grpc-ecosystem/go-grpc-prometheus"
2017-08-20 04:22:58 +03:00
"github.com/soheilhy/cmux"
2018-04-15 08:52:39 +03:00
"go.uber.org/zap"
2017-08-17 22:12:44 +03:00
"google.golang.org/grpc"
2017-08-25 23:26:28 +03:00
"google.golang.org/grpc/keepalive"
2016-07-10 21:06:08 +03:00
)
// plog is the package-level capnslog logger, created for the
// "github.com/coreos/etcd" repository and the "embed" package. Functions in
// this file fall back to it when no zap logger is set on the Config.
var plog = capnslog . NewPackageLogger ( "github.com/coreos/etcd" , "embed" )
const (
// reservedInternalFDNum is the number of file descriptors set aside for
// internal use; internal fd usage includes disk usage and transport usage.
//
// Disk: to read/write a snapshot, the snap pkg needs 1. In the normal case,
// the wal pkg needs at most 2 to read/lock/write WALs. One case in which it
// needs 2 is reading all logs after some snapshot index that is located at
// the end of the second-to-last WAL and the head of the last one. Purging
// needs to read the directory, so it needs 1. The fd monitor needs 1.
//
// Transport: rafthttp builds two long-polling connections and at most
// four temporary connections with each member. There are at most 9 members
// in a cluster, so it should reserve 96.
//
// For safety, the total reserved number is set to 150.
reservedInternalFDNum = 150
)
// Etcd contains a running etcd server and its listeners.
type Etcd struct {
2017-11-20 20:35:39 +03:00
Peers [ ] * peerListener
Clients [ ] net . Listener
// a map of contexts for the servers that serves client requests.
sctxs map [ string ] * serveCtx
2017-07-12 20:05:16 +03:00
metricsListeners [ ] net . Listener
2017-11-20 20:35:39 +03:00
Server * etcdserver . EtcdServer
2016-07-10 21:06:08 +03:00
cfg Config
2017-03-17 20:15:34 +03:00
stopc chan struct { }
2016-07-10 21:06:08 +03:00
errc chan error
2017-03-17 20:15:34 +03:00
closeOnce sync . Once
2016-07-10 21:06:08 +03:00
}
2017-05-05 02:56:08 +03:00
// peerListener pairs a peer-facing net.Listener with the hooks used to serve
// traffic on it and to shut it down.
type peerListener struct {
	net.Listener

	// serve runs the peer server for this listener; it is installed by
	// servePeers.
	serve func() error
	// close shuts the listener down. It starts out as a no-op, becomes a
	// plain Listener.Close once the listener is open, and is replaced by a
	// server shutdown once servePeers has run.
	close func(context.Context) error
}
2016-07-10 21:06:08 +03:00
// StartEtcd launches the etcd server and HTTP handlers for client/server communication.
2016-08-29 09:04:06 +03:00
// The returned Etcd.Server is not guaranteed to have joined the cluster. Wait
// on the Etcd.Server.ReadyNotify() channel to know when it completes and is ready for use.
2016-07-10 21:06:08 +03:00
func StartEtcd ( inCfg * Config ) ( e * Etcd , err error ) {
if err = inCfg . Validate ( ) ; err != nil {
return nil , err
}
2017-05-02 00:12:40 +03:00
serving := false
2017-03-17 20:15:34 +03:00
e = & Etcd { cfg : * inCfg , stopc : make ( chan struct { } ) }
2016-07-10 21:06:08 +03:00
cfg := & e . cfg
defer func ( ) {
2017-05-02 00:12:40 +03:00
if e == nil || err == nil {
return
2016-07-10 21:06:08 +03:00
}
2017-05-02 00:12:40 +03:00
if ! serving {
2017-12-07 22:02:17 +03:00
// errored before starting gRPC server for serveCtx.serversC
2017-05-02 00:12:40 +03:00
for _ , sctx := range e . sctxs {
2017-12-07 22:02:17 +03:00
close ( sctx . serversC )
2017-05-02 00:12:40 +03:00
}
}
e . Close ( )
e = nil
2016-07-10 21:06:08 +03:00
} ( )
2018-04-17 23:27:55 +03:00
if e . cfg . logger != nil {
e . cfg . logger . Info (
"configuring peer listeners" ,
zap . Strings ( "listen-peer-urls" , e . cfg . getLPURLs ( ) ) ,
)
}
if e . Peers , err = configurePeerListeners ( cfg ) ; err != nil {
2017-11-11 05:28:57 +03:00
return e , err
2016-07-10 21:06:08 +03:00
}
2018-04-17 23:27:55 +03:00
if e . cfg . logger != nil {
e . cfg . logger . Info (
"configuring client listeners" ,
zap . Strings ( "listen-client-urls" , e . cfg . getLCURLs ( ) ) ,
)
}
if e . sctxs , err = configureClientListeners ( cfg ) ; err != nil {
2017-11-11 05:28:57 +03:00
return e , err
2016-07-10 21:06:08 +03:00
}
2018-04-17 23:27:55 +03:00
2016-07-10 21:06:08 +03:00
for _ , sctx := range e . sctxs {
e . Clients = append ( e . Clients , sctx . l )
}
2016-07-14 02:02:49 +03:00
var (
urlsmap types . URLsMap
token string
)
2017-11-22 23:56:03 +03:00
memberInitialized := true
2016-07-14 02:02:49 +03:00
if ! isMemberInitialized ( cfg ) {
2017-11-22 23:56:03 +03:00
memberInitialized = false
2016-07-14 02:02:49 +03:00
urlsmap , token , err = cfg . PeerURLsMapAndToken ( "etcd" )
if err != nil {
2016-07-16 00:34:06 +03:00
return e , fmt . Errorf ( "error setting up initial cluster: %v" , err )
2016-07-14 02:02:49 +03:00
}
2016-07-10 21:06:08 +03:00
}
2017-09-29 03:31:09 +03:00
// AutoCompactionRetention defaults to "0" if not set.
if len ( cfg . AutoCompactionRetention ) == 0 {
cfg . AutoCompactionRetention = "0"
}
2018-02-21 04:21:07 +03:00
autoCompactionRetention , err := parseCompactionRetention ( cfg . AutoCompactionMode , cfg . AutoCompactionRetention )
if err != nil {
return e , err
2017-09-26 20:06:45 +03:00
}
2017-05-10 23:55:06 +03:00
srvcfg := etcdserver . ServerConfig {
2018-04-20 00:03:17 +03:00
Name : cfg . Name ,
ClientURLs : cfg . ACUrls ,
PeerURLs : cfg . APUrls ,
DataDir : cfg . Dir ,
DedicatedWALDir : cfg . WalDir ,
2018-05-19 00:38:39 +03:00
SnapshotCount : cfg . SnapshotCount ,
2018-04-20 00:03:17 +03:00
MaxSnapFiles : cfg . MaxSnapFiles ,
MaxWALFiles : cfg . MaxWalFiles ,
InitialPeerURLsMap : urlsmap ,
InitialClusterToken : token ,
DiscoveryURL : cfg . Durl ,
DiscoveryProxy : cfg . Dproxy ,
NewCluster : cfg . IsNewCluster ( ) ,
PeerTLSInfo : cfg . PeerTLSInfo ,
TickMs : cfg . TickMs ,
ElectionTicks : cfg . ElectionTicks ( ) ,
InitialElectionTickAdvance : cfg . InitialElectionTickAdvance ,
AutoCompactionRetention : autoCompactionRetention ,
AutoCompactionMode : cfg . AutoCompactionMode ,
QuotaBackendBytes : cfg . QuotaBackendBytes ,
MaxTxnOps : cfg . MaxTxnOps ,
MaxRequestBytes : cfg . MaxRequestBytes ,
StrictReconfigCheck : cfg . StrictReconfigCheck ,
ClientCertAuthEnabled : cfg . ClientTLSInfo . ClientCertAuth ,
AuthToken : cfg . AuthToken ,
2018-05-03 21:43:32 +03:00
BcryptCost : cfg . BcryptCost ,
2018-04-20 00:03:17 +03:00
CORS : cfg . CORS ,
HostWhitelist : cfg . HostWhitelist ,
InitialCorruptCheck : cfg . ExperimentalInitialCorruptCheck ,
CorruptCheckTime : cfg . ExperimentalCorruptCheckTime ,
PreVote : cfg . PreVote ,
Logger : cfg . logger ,
LoggerConfig : cfg . loggerConfig ,
2018-04-25 01:40:52 +03:00
LoggerCore : cfg . loggerCore ,
LoggerWriteSyncer : cfg . loggerWriteSyncer ,
2018-04-20 00:03:17 +03:00
Debug : cfg . Debug ,
ForceNewCluster : cfg . ForceNewCluster ,
2016-07-10 21:06:08 +03:00
}
if e . Server , err = etcdserver . NewServer ( srvcfg ) ; err != nil {
2017-11-11 05:28:57 +03:00
return e , err
2016-07-10 21:06:08 +03:00
}
2018-03-27 03:19:12 +03:00
if len ( e . cfg . CORS ) > 0 {
ss := make ( [ ] string , 0 , len ( e . cfg . CORS ) )
for v := range e . cfg . CORS {
ss = append ( ss , v )
}
sort . Strings ( ss )
2018-04-15 08:52:39 +03:00
if e . cfg . logger != nil {
2018-04-17 23:27:55 +03:00
e . cfg . logger . Info ( "configured CORS" , zap . Strings ( "cors" , ss ) )
2018-04-15 08:52:39 +03:00
} else {
plog . Infof ( "%s starting with cors %q" , e . Server . ID ( ) , ss )
}
2018-03-27 03:19:12 +03:00
}
if len ( e . cfg . HostWhitelist ) > 0 {
ss := make ( [ ] string , 0 , len ( e . cfg . HostWhitelist ) )
for v := range e . cfg . HostWhitelist {
ss = append ( ss , v )
}
sort . Strings ( ss )
2018-04-15 08:52:39 +03:00
if e . cfg . logger != nil {
2018-04-17 23:27:55 +03:00
e . cfg . logger . Info ( "configured host whitelist" , zap . Strings ( "hosts" , ss ) )
2018-04-15 08:52:39 +03:00
} else {
plog . Infof ( "%s starting with host whitelist %q" , e . Server . ID ( ) , ss )
}
2018-03-27 03:19:12 +03:00
}
2016-07-10 21:06:08 +03:00
2017-08-17 22:12:44 +03:00
// buffer channel so goroutines on closed connections won't wait forever
e . errc = make ( chan error , len ( e . Peers ) + len ( e . Clients ) + 2 * len ( e . sctxs ) )
2017-11-22 23:56:03 +03:00
// newly started member ("memberInitialized==false")
// does not need corruption check
if memberInitialized {
if err = e . Server . CheckInitialHashKV ( ) ; err != nil {
// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
// (nothing to close since rafthttp transports have not been started)
e . Server = nil
return e , err
}
}
2017-08-17 22:12:44 +03:00
e . Server . Start ( )
2017-11-20 20:35:39 +03:00
if err = e . servePeers ( ) ; err != nil {
return e , err
2017-08-17 22:12:44 +03:00
}
2017-11-20 20:35:39 +03:00
if err = e . serveClients ( ) ; err != nil {
return e , err
2017-05-05 02:56:08 +03:00
}
2017-11-20 20:35:39 +03:00
if err = e . serveMetrics ( ) ; err != nil {
2017-11-11 05:28:57 +03:00
return e , err
2016-07-21 18:45:07 +03:00
}
2017-11-20 20:35:39 +03:00
2018-04-17 23:27:55 +03:00
if e . cfg . logger != nil {
e . cfg . logger . Info (
"now serving peer/client/metrics" ,
zap . String ( "local-member-id" , e . Server . ID ( ) . String ( ) ) ,
zap . Strings ( "initial-advertise-peer-urls" , e . cfg . getAPURLs ( ) ) ,
zap . Strings ( "listen-peer-urls" , e . cfg . getLPURLs ( ) ) ,
zap . Strings ( "advertise-client-urls" , e . cfg . getACURLs ( ) ) ,
zap . Strings ( "listen-client-urls" , e . cfg . getLCURLs ( ) ) ,
zap . Strings ( "listen-metrics-urls" , e . cfg . getMetricsURLs ( ) ) ,
)
}
2017-05-02 00:12:40 +03:00
serving = true
2017-11-11 05:28:57 +03:00
return e , nil
2016-07-10 21:06:08 +03:00
}
2016-09-29 05:05:03 +03:00
// Config returns a copy of the current configuration.
func (e *Etcd) Config() Config {
	current := e.cfg
	return current
}
2017-12-07 22:02:17 +03:00
// Close gracefully shuts down all servers/listeners.
// Client requests will be terminated with request timeout.
// After timeout, enforce remaning requests be closed immediately.
2016-07-10 21:06:08 +03:00
func ( e * Etcd ) Close ( ) {
2018-05-02 23:26:39 +03:00
fields := [ ] zap . Field {
2018-04-27 00:22:33 +03:00
zap . String ( "name" , e . cfg . Name ) ,
zap . String ( "data-dir" , e . cfg . Dir ) ,
zap . Strings ( "advertise-peer-urls" , e . cfg . getAPURLs ( ) ) ,
zap . Strings ( "advertise-client-urls" , e . cfg . getACURLs ( ) ) ,
}
lg := e . GetLogger ( )
if lg != nil {
lg . Info ( "closing etcd server" , fields ... )
}
defer func ( ) {
if lg != nil {
lg . Info ( "closed etcd server" , fields ... )
lg . Sync ( )
}
} ( )
2017-03-17 20:15:34 +03:00
e . closeOnce . Do ( func ( ) { close ( e . stopc ) } )
2017-12-07 22:02:17 +03:00
// close client requests with request timeout
timeout := 2 * time . Second
2017-12-07 09:20:41 +03:00
if e . Server != nil {
2017-12-07 22:02:17 +03:00
timeout = e . Server . Cfg . ReqTimeout ( )
2017-04-14 19:24:52 +03:00
}
2016-07-10 21:06:08 +03:00
for _ , sctx := range e . sctxs {
2017-12-07 22:02:17 +03:00
for ss := range sctx . serversC {
ctx , cancel := context . WithTimeout ( context . Background ( ) , timeout )
stopServers ( ctx , ss )
cancel ( )
}
}
for _ , sctx := range e . sctxs {
sctx . cancel ( )
2016-07-10 21:06:08 +03:00
}
2017-12-07 09:20:41 +03:00
2016-07-10 21:06:08 +03:00
for i := range e . Clients {
if e . Clients [ i ] != nil {
e . Clients [ i ] . Close ( )
}
}
2017-12-07 22:02:17 +03:00
2017-07-12 20:05:16 +03:00
for i := range e . metricsListeners {
e . metricsListeners [ i ] . Close ( )
}
2017-05-05 02:56:08 +03:00
// close rafthttp transports
2016-07-10 21:06:08 +03:00
if e . Server != nil {
e . Server . Stop ( )
}
2017-05-05 02:56:08 +03:00
// close all idle connections in peer handler (wait up to 1-second)
for i := range e . Peers {
if e . Peers [ i ] != nil && e . Peers [ i ] . close != nil {
ctx , cancel := context . WithTimeout ( context . Background ( ) , time . Second )
e . Peers [ i ] . close ( ctx )
cancel ( )
}
}
2016-07-10 21:06:08 +03:00
}
2017-12-07 22:02:17 +03:00
func stopServers ( ctx context . Context , ss * servers ) {
shutdownNow := func ( ) {
// first, close the http.Server
ss . http . Shutdown ( ctx )
// then close grpc.Server; cancels all active RPCs
ss . grpc . Stop ( )
}
// do not grpc.Server.GracefulStop with TLS enabled etcd server
// See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
// and https://github.com/coreos/etcd/issues/8916
if ss . secure {
shutdownNow ( )
return
2017-08-17 22:12:44 +03:00
}
2017-12-07 22:02:17 +03:00
2017-08-17 22:12:44 +03:00
ch := make ( chan struct { } )
go func ( ) {
defer close ( ch )
// close listeners to stop accepting new connections,
// will block on any existing transports
2017-12-07 22:02:17 +03:00
ss . grpc . GracefulStop ( )
2017-08-17 22:12:44 +03:00
} ( )
2017-12-07 22:02:17 +03:00
2017-08-17 22:12:44 +03:00
// wait until all pending RPCs are finished
select {
case <- ch :
2017-12-07 22:02:17 +03:00
case <- ctx . Done ( ) :
2017-08-17 22:12:44 +03:00
// took too long, manually close open transports
// e.g. watch streams
2017-12-07 22:02:17 +03:00
shutdownNow ( )
2017-08-17 22:12:44 +03:00
// concurrent GracefulStop should be interrupted
<- ch
}
}
2016-07-10 21:06:08 +03:00
// Err returns the receive-only channel on which errors from the serving
// goroutines are delivered.
func (e *Etcd) Err() <-chan error {
	return e.errc
}
2018-04-17 23:27:55 +03:00
func configurePeerListeners ( cfg * Config ) ( peers [ ] * peerListener , err error ) {
2018-06-04 18:45:16 +03:00
if err = updateCipherSuites ( & cfg . PeerTLSInfo , cfg . CipherSuites ) ; err != nil {
return nil , err
}
2017-07-18 00:34:59 +03:00
if err = cfg . PeerSelfCert ( ) ; err != nil {
2018-04-15 08:52:39 +03:00
if cfg . logger != nil {
cfg . logger . Fatal ( "failed to get peer self-signed certs" , zap . Error ( err ) )
} else {
plog . Fatalf ( "could not get certs (%v)" , err )
}
2016-07-10 21:06:08 +03:00
}
if ! cfg . PeerTLSInfo . Empty ( ) {
2018-04-15 08:52:39 +03:00
if cfg . logger != nil {
2018-06-04 18:45:16 +03:00
cfg . logger . Info (
"starting with peer TLS" ,
zap . String ( "tls-info" , fmt . Sprintf ( "%+v" , cfg . PeerTLSInfo ) ) ,
zap . Strings ( "cipher-suites" , cfg . CipherSuites ) ,
)
2018-04-15 08:52:39 +03:00
} else {
plog . Infof ( "peerTLS: %s" , cfg . PeerTLSInfo )
}
2016-07-10 21:06:08 +03:00
}
2017-05-05 02:56:08 +03:00
peers = make ( [ ] * peerListener , len ( cfg . LPUrls ) )
2016-07-10 21:06:08 +03:00
defer func ( ) {
if err == nil {
return
}
2017-05-05 02:56:08 +03:00
for i := range peers {
if peers [ i ] != nil && peers [ i ] . close != nil {
2018-04-15 08:52:39 +03:00
if cfg . logger != nil {
2018-04-17 23:27:55 +03:00
cfg . logger . Warn (
"closing peer listener" ,
zap . String ( "address" , cfg . LPUrls [ i ] . String ( ) ) ,
zap . Error ( err ) ,
)
2018-04-15 08:52:39 +03:00
} else {
plog . Info ( "stopping listening for peers on " , cfg . LPUrls [ i ] . String ( ) )
}
2017-12-07 22:02:17 +03:00
ctx , cancel := context . WithTimeout ( context . Background ( ) , time . Second )
peers [ i ] . close ( ctx )
cancel ( )
2016-07-10 21:06:08 +03:00
}
}
} ( )
for i , u := range cfg . LPUrls {
if u . Scheme == "http" {
if ! cfg . PeerTLSInfo . Empty ( ) {
2018-04-15 08:52:39 +03:00
if cfg . logger != nil {
cfg . logger . Warn ( "scheme is HTTP while key and cert files are present; ignoring key and cert files" , zap . String ( "peer-url" , u . String ( ) ) )
} else {
plog . Warningf ( "The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files." , u . String ( ) )
}
2016-07-10 21:06:08 +03:00
}
if cfg . PeerTLSInfo . ClientCertAuth {
2018-04-15 08:52:39 +03:00
if cfg . logger != nil {
cfg . logger . Warn ( "scheme is HTTP while --peer-client-cert-auth is enabled; ignoring client cert auth for this URL" , zap . String ( "peer-url" , u . String ( ) ) )
} else {
plog . Warningf ( "The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url." , u . String ( ) )
}
2016-07-10 21:06:08 +03:00
}
}
2017-05-05 02:56:08 +03:00
peers [ i ] = & peerListener { close : func ( context . Context ) error { return nil } }
peers [ i ] . Listener , err = rafthttp . NewListener ( u , & cfg . PeerTLSInfo )
if err != nil {
2016-07-10 21:06:08 +03:00
return nil , err
}
2017-05-05 02:56:08 +03:00
// once serve, overwrite with 'http.Server.Shutdown'
peers [ i ] . close = func ( context . Context ) error {
return peers [ i ] . Listener . Close ( )
}
2016-07-10 21:06:08 +03:00
}
2017-05-05 02:56:08 +03:00
return peers , nil
2016-07-10 21:06:08 +03:00
}
2017-11-20 20:35:39 +03:00
// configure peer handlers after rafthttp.Transport started
func ( e * Etcd ) servePeers ( ) ( err error ) {
2018-05-02 22:53:46 +03:00
ph := etcdhttp . NewPeerHandler ( e . GetLogger ( ) , e . Server )
2017-11-20 20:35:39 +03:00
var peerTLScfg * tls . Config
if ! e . cfg . PeerTLSInfo . Empty ( ) {
if peerTLScfg , err = e . cfg . PeerTLSInfo . ServerConfig ( ) ; err != nil {
return err
}
}
2017-12-07 22:02:17 +03:00
2017-11-20 20:35:39 +03:00
for _ , p := range e . Peers {
2018-04-17 23:27:55 +03:00
u := p . Listener . Addr ( ) . String ( )
2017-11-20 20:35:39 +03:00
gs := v3rpc . Server ( e . Server , peerTLScfg )
m := cmux . New ( p . Listener )
go gs . Serve ( m . Match ( cmux . HTTP2 ( ) ) )
srv := & http . Server {
Handler : grpcHandlerFunc ( gs , ph ) ,
ReadTimeout : 5 * time . Minute ,
ErrorLog : defaultLog . New ( ioutil . Discard , "" , 0 ) , // do not log user error
}
go srv . Serve ( m . Match ( cmux . Any ( ) ) )
p . serve = func ( ) error { return m . Serve ( ) }
p . close = func ( ctx context . Context ) error {
// gracefully shutdown http.Server
// close open listeners, idle connections
// until context cancel or time-out
2018-04-17 23:27:55 +03:00
if e . cfg . logger != nil {
e . cfg . logger . Info (
"stopping serving peer traffic" ,
zap . String ( "address" , u ) ,
)
}
2017-12-07 22:02:17 +03:00
stopServers ( ctx , & servers { secure : peerTLScfg != nil , grpc : gs , http : srv } )
2018-04-17 23:27:55 +03:00
if e . cfg . logger != nil {
e . cfg . logger . Info (
"stopped serving peer traffic" ,
zap . String ( "address" , u ) ,
)
}
2017-12-07 22:02:17 +03:00
return nil
2017-11-20 20:35:39 +03:00
}
}
// start peer servers in a goroutine
for _ , pl := range e . Peers {
go func ( l * peerListener ) {
2018-04-17 23:27:55 +03:00
u := l . Addr ( ) . String ( )
if e . cfg . logger != nil {
e . cfg . logger . Info (
"serving peer traffic" ,
zap . String ( "address" , u ) ,
)
} else {
plog . Info ( "listening for peers on " , u )
}
2017-11-20 20:35:39 +03:00
e . errHandler ( l . serve ( ) )
} ( pl )
}
return nil
}
2018-04-17 23:27:55 +03:00
func configureClientListeners ( cfg * Config ) ( sctxs map [ string ] * serveCtx , err error ) {
2018-06-04 18:45:16 +03:00
if err = updateCipherSuites ( & cfg . ClientTLSInfo , cfg . CipherSuites ) ; err != nil {
return nil , err
}
2017-07-18 00:34:59 +03:00
if err = cfg . ClientSelfCert ( ) ; err != nil {
2018-04-15 08:52:39 +03:00
if cfg . logger != nil {
cfg . logger . Fatal ( "failed to get client self-signed certs" , zap . Error ( err ) )
} else {
plog . Fatalf ( "could not get certs (%v)" , err )
}
2016-07-10 21:06:08 +03:00
}
2016-12-09 23:37:35 +03:00
if cfg . EnablePprof {
2018-04-15 08:52:39 +03:00
if cfg . logger != nil {
cfg . logger . Info ( "pprof is enabled" , zap . String ( "path" , debugutil . HTTPPrefixPProf ) )
} else {
plog . Infof ( "pprof is enabled under %s" , debugutil . HTTPPrefixPProf )
}
2016-12-09 23:37:35 +03:00
}
2016-07-10 21:06:08 +03:00
sctxs = make ( map [ string ] * serveCtx )
for _ , u := range cfg . LCUrls {
2018-04-15 08:52:39 +03:00
sctx := newServeCtx ( cfg . logger )
2016-07-19 02:41:41 +03:00
if u . Scheme == "http" || u . Scheme == "unix" {
2016-07-10 21:06:08 +03:00
if ! cfg . ClientTLSInfo . Empty ( ) {
2018-04-15 08:52:39 +03:00
if cfg . logger != nil {
cfg . logger . Warn ( "scheme is HTTP while key and cert files are present; ignoring key and cert files" , zap . String ( "client-url" , u . String ( ) ) )
} else {
plog . Warningf ( "The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files." , u . String ( ) )
}
2016-07-10 21:06:08 +03:00
}
if cfg . ClientTLSInfo . ClientCertAuth {
2018-04-15 08:52:39 +03:00
if cfg . logger != nil {
cfg . logger . Warn ( "scheme is HTTP while --client-cert-auth is enabled; ignoring client cert auth for this URL" , zap . String ( "client-url" , u . String ( ) ) )
} else {
plog . Warningf ( "The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url." , u . String ( ) )
}
2016-07-10 21:06:08 +03:00
}
}
2016-07-19 02:41:41 +03:00
if ( u . Scheme == "https" || u . Scheme == "unixs" ) && cfg . ClientTLSInfo . Empty ( ) {
2016-07-10 21:06:08 +03:00
return nil , fmt . Errorf ( "TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme" , u . String ( ) )
}
2018-02-26 03:52:38 +03:00
network := "tcp"
2017-04-01 17:30:41 +03:00
addr := u . Host
2016-07-19 02:41:41 +03:00
if u . Scheme == "unix" || u . Scheme == "unixs" {
2018-02-26 03:52:38 +03:00
network = "unix"
2017-04-01 17:30:41 +03:00
addr = u . Host + u . Path
2016-07-19 02:41:41 +03:00
}
2018-02-26 03:52:38 +03:00
sctx . network = network
2016-07-19 02:41:41 +03:00
sctx . secure = u . Scheme == "https" || u . Scheme == "unixs"
2016-07-10 21:06:08 +03:00
sctx . insecure = ! sctx . secure
2017-04-01 17:30:41 +03:00
if oldctx := sctxs [ addr ] ; oldctx != nil {
2016-07-10 21:06:08 +03:00
oldctx . secure = oldctx . secure || sctx . secure
oldctx . insecure = oldctx . insecure || sctx . insecure
continue
}
2018-02-26 03:52:38 +03:00
if sctx . l , err = net . Listen ( network , addr ) ; err != nil {
2016-07-10 21:06:08 +03:00
return nil , err
}
2017-07-07 00:19:06 +03:00
// net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking
// hosts that disable ipv6. So, use the address given by the user.
sctx . addr = addr
2016-07-10 21:06:08 +03:00
if fdLimit , fderr := runtimeutil . FDLimit ( ) ; fderr == nil {
if fdLimit <= reservedInternalFDNum {
2018-04-15 08:52:39 +03:00
if cfg . logger != nil {
cfg . logger . Fatal (
"file descriptor limit of etcd process is too low; please set higher" ,
zap . Uint64 ( "limit" , fdLimit ) ,
zap . Int ( "recommended-limit" , reservedInternalFDNum ) ,
)
} else {
plog . Fatalf ( "file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage" , fdLimit , reservedInternalFDNum )
}
2016-07-10 21:06:08 +03:00
}
sctx . l = transport . LimitListener ( sctx . l , int ( fdLimit - reservedInternalFDNum ) )
}
2018-02-26 03:52:38 +03:00
if network == "tcp" {
if sctx . l , err = transport . NewKeepAliveListener ( sctx . l , network , nil ) ; err != nil {
2016-07-19 02:41:41 +03:00
return nil , err
}
2016-07-10 21:06:08 +03:00
}
defer func ( ) {
2018-04-17 23:27:55 +03:00
if err == nil {
return
}
sctx . l . Close ( )
if cfg . logger != nil {
cfg . logger . Warn (
"closing peer listener" ,
zap . String ( "address" , u . Host ) ,
zap . Error ( err ) ,
)
} else {
plog . Info ( "stopping listening for client requests on " , u . Host )
2016-07-10 21:06:08 +03:00
}
} ( )
2016-12-14 21:12:40 +03:00
for k := range cfg . UserHandlers {
sctx . userHandlers [ k ] = cfg . UserHandlers [ k ]
}
2017-01-22 13:21:19 +03:00
sctx . serviceRegister = cfg . ServiceRegister
2017-02-27 22:01:05 +03:00
if cfg . EnablePprof || cfg . Debug {
2016-12-09 23:37:35 +03:00
sctx . registerPprof ( )
}
2017-02-27 22:01:05 +03:00
if cfg . Debug {
sctx . registerTrace ( )
}
2017-04-01 17:30:41 +03:00
sctxs [ addr ] = sctx
2016-07-10 21:06:08 +03:00
}
return sctxs , nil
}
2017-11-20 20:35:39 +03:00
func ( e * Etcd ) serveClients ( ) ( err error ) {
2016-07-10 21:06:08 +03:00
if ! e . cfg . ClientTLSInfo . Empty ( ) {
2018-04-15 08:52:39 +03:00
if e . cfg . logger != nil {
2018-04-17 23:27:55 +03:00
e . cfg . logger . Info (
"starting with client TLS" ,
zap . String ( "tls-info" , fmt . Sprintf ( "%+v" , e . cfg . ClientTLSInfo ) ) ,
2018-06-04 18:45:16 +03:00
zap . Strings ( "cipher-suites" , e . cfg . CipherSuites ) ,
2018-04-17 23:27:55 +03:00
)
2018-04-15 08:52:39 +03:00
} else {
plog . Infof ( "ClientTLS: %s" , e . cfg . ClientTLSInfo )
}
2016-07-10 21:06:08 +03:00
}
// Start a client server goroutine for each listen address
2017-05-10 05:39:29 +03:00
var h http . Handler
2017-01-16 19:55:26 +03:00
if e . Config ( ) . EnableV2 {
2017-08-11 08:26:46 +03:00
if len ( e . Config ( ) . ExperimentalEnableV2V3 ) > 0 {
2018-04-16 14:01:05 +03:00
srv := v2v3 . NewServer ( e . cfg . logger , v3client . New ( e . Server ) , e . cfg . ExperimentalEnableV2V3 )
2018-05-02 21:34:43 +03:00
h = v2http . NewClientHandler ( e . GetLogger ( ) , srv , e . Server . Cfg . ReqTimeout ( ) )
2017-08-11 08:26:46 +03:00
} else {
2018-05-02 21:34:43 +03:00
h = v2http . NewClientHandler ( e . GetLogger ( ) , e . Server , e . Server . Cfg . ReqTimeout ( ) )
2017-08-11 08:26:46 +03:00
}
2017-05-10 05:39:29 +03:00
} else {
mux := http . NewServeMux ( )
etcdhttp . HandleBasic ( mux , e . Server )
h = mux
2017-01-16 19:55:26 +03:00
}
2017-05-10 05:39:29 +03:00
2017-08-25 23:26:28 +03:00
gopts := [ ] grpc . ServerOption { }
if e . cfg . GRPCKeepAliveMinTime > time . Duration ( 0 ) {
gopts = append ( gopts , grpc . KeepaliveEnforcementPolicy ( keepalive . EnforcementPolicy {
MinTime : e . cfg . GRPCKeepAliveMinTime ,
PermitWithoutStream : false ,
} ) )
}
if e . cfg . GRPCKeepAliveInterval > time . Duration ( 0 ) &&
e . cfg . GRPCKeepAliveTimeout > time . Duration ( 0 ) {
gopts = append ( gopts , grpc . KeepaliveParams ( keepalive . ServerParameters {
Time : e . cfg . GRPCKeepAliveInterval ,
Timeout : e . cfg . GRPCKeepAliveTimeout ,
} ) )
}
2017-11-20 20:35:39 +03:00
// start client servers in a goroutine
2016-07-10 21:06:08 +03:00
for _ , sctx := range e . sctxs {
go func ( s * serveCtx ) {
2017-08-25 23:26:28 +03:00
e . errHandler ( s . serve ( e . Server , & e . cfg . ClientTLSInfo , h , e . errHandler , gopts ... ) )
2016-07-10 21:06:08 +03:00
} ( sctx )
}
2017-11-20 20:35:39 +03:00
return nil
}
2017-07-12 20:05:16 +03:00
2017-11-20 20:35:39 +03:00
func ( e * Etcd ) serveMetrics ( ) ( err error ) {
2018-01-06 17:56:44 +03:00
if e . cfg . Metrics == "extensive" {
grpc_prometheus . EnableHandlingTimeHistogram ( )
}
2017-07-12 20:05:16 +03:00
if len ( e . cfg . ListenMetricsUrls ) > 0 {
metricsMux := http . NewServeMux ( )
2017-07-24 21:22:49 +03:00
etcdhttp . HandleMetricsHealth ( metricsMux , e . Server )
2017-07-12 20:05:16 +03:00
for _ , murl := range e . cfg . ListenMetricsUrls {
2017-07-24 21:22:49 +03:00
tlsInfo := & e . cfg . ClientTLSInfo
if murl . Scheme == "http" {
tlsInfo = nil
}
ml , err := transport . NewListener ( murl . Host , murl . Scheme , tlsInfo )
2017-07-12 20:05:16 +03:00
if err != nil {
return err
}
e . metricsListeners = append ( e . metricsListeners , ml )
go func ( u url . URL , ln net . Listener ) {
2018-04-15 08:52:39 +03:00
if e . cfg . logger != nil {
2018-04-17 23:27:55 +03:00
e . cfg . logger . Info (
"serving metrics" ,
zap . String ( "address" , u . String ( ) ) ,
)
2018-04-15 08:52:39 +03:00
} else {
plog . Info ( "listening for metrics on " , u . String ( ) )
}
2017-07-12 20:05:16 +03:00
e . errHandler ( http . Serve ( ln , metricsMux ) )
} ( murl , ml )
}
}
2016-07-10 21:06:08 +03:00
return nil
}
2017-03-17 20:15:34 +03:00
// errHandler reports err on the error channel unless the Etcd has been
// stopped. Once stopc is closed the error is dropped.
func (e *Etcd) errHandler(err error) {
	stopped := e.stopc

	// Check for stop first: if only the combined select below were used,
	// it could pick the send even though stopc is already closed.
	select {
	case <-stopped:
		return
	default:
	}

	// Deliver the error, giving up if the Etcd is stopped meanwhile.
	select {
	case e.errc <- err:
	case <-stopped:
	}
}
2018-02-21 04:21:07 +03:00
2018-04-15 08:52:39 +03:00
// GetLogger returns the configured zap logger, read under the config's
// logger read lock. The result may be nil.
func (e *Etcd) GetLogger() *zap.Logger {
	e.cfg.loggerMu.RLock()
	defer e.cfg.loggerMu.RUnlock()
	return e.cfg.logger
}
2018-02-21 04:21:07 +03:00
// parseCompactionRetention converts the auto-compaction retention setting
// into a time.Duration.
//
// When retention is a plain integer it is interpreted according to mode: for
// CompactorModeRevision the integer itself is stored in the duration value,
// and for CompactorModePeriodic it is a number of hours. For any other mode
// an integer retention yields 0. When retention is not an integer it is
// parsed with time.ParseDuration (e.g. "30m"), regardless of mode.
//
// An error is returned when retention is neither an integer nor a valid
// duration, or when it is negative.
func parseCompactionRetention(mode, retention string) (ret time.Duration, err error) {
	h, atoiErr := strconv.Atoi(retention)
	if atoiErr == nil {
		// A negative retention is meaningless in either mode; reject it
		// instead of handing a negative duration to the caller.
		if h < 0 {
			return 0, fmt.Errorf("error parsing CompactionRetention: negative value %q is not allowed", retention)
		}
		switch mode {
		case CompactorModeRevision:
			ret = time.Duration(int64(h))
		case CompactorModePeriodic:
			ret = time.Duration(int64(h)) * time.Hour
		}
		return ret, nil
	}

	// not an integer: treat it as a duration string (periodic compaction)
	ret, err = time.ParseDuration(retention)
	if err != nil {
		return 0, fmt.Errorf("error parsing CompactionRetention: %v", err)
	}
	if ret < 0 {
		return 0, fmt.Errorf("error parsing CompactionRetention: negative value %q is not allowed", retention)
	}
	return ret, nil
}