2016-07-10 21:06:08 +03:00
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package embed
import (
2017-05-05 02:56:08 +03:00
"context"
2016-07-10 21:06:08 +03:00
"fmt"
2017-05-05 02:56:08 +03:00
"io/ioutil"
defaultLog "log"
2016-07-10 21:06:08 +03:00
"net"
"net/http"
2017-03-16 05:31:10 +03:00
"path/filepath"
2017-03-17 20:15:34 +03:00
"sync"
2017-05-05 02:56:08 +03:00
"time"
2016-07-10 21:06:08 +03:00
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v2http"
"github.com/coreos/etcd/pkg/cors"
2017-04-04 01:15:47 +03:00
"github.com/coreos/etcd/pkg/debugutil"
2016-07-10 21:06:08 +03:00
runtimeutil "github.com/coreos/etcd/pkg/runtime"
"github.com/coreos/etcd/pkg/transport"
2016-07-14 02:02:49 +03:00
"github.com/coreos/etcd/pkg/types"
2016-07-10 21:06:08 +03:00
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/pkg/capnslog"
)
// plog is the logger used by this package; it is created for repository
// "github.com/coreos/etcd" and package name "embed".
var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "embed")
// reservedInternalFDNum is the number of file descriptors set aside for
// etcd's own use (disk and transport) before client connections are limited.
//
// Disk usage: the snap pkg needs 1 to read/write a snapshot. The wal pkg
// normally needs at most 2 to read/lock/write WALs; it needs 2 when reading
// all logs after some snapshot index that sits at the end of the second-last
// file and the head of the last one. Purging needs 1 to read the directory,
// and the fd monitor needs 1.
//
// Transport usage: rafthttp builds two long-polling connections and at most
// four temporary connections with each member. There are at most 9 members
// in a cluster, so it should reserve 96.
//
// For safety, the total reserved number is set to 150.
const reservedInternalFDNum = 150
// Etcd contains a running etcd server and its listeners.
type Etcd struct {
2017-05-05 02:56:08 +03:00
Peers [ ] * peerListener
2016-07-10 21:06:08 +03:00
Clients [ ] net . Listener
Server * etcdserver . EtcdServer
cfg Config
2017-03-17 20:15:34 +03:00
stopc chan struct { }
2016-07-10 21:06:08 +03:00
errc chan error
sctxs map [ string ] * serveCtx
2017-03-17 20:15:34 +03:00
closeOnce sync . Once
2016-07-10 21:06:08 +03:00
}
2017-05-05 02:56:08 +03:00
// peerListener couples a peer network listener with the functions used to
// serve on it and to shut it down.
type peerListener struct {
	// Listener is the embedded network listener for one peer URL.
	net.Listener

	// serve runs the peer server on this listener; it is assigned in StartEtcd.
	serve func() error
	// close releases the listener. Before serving starts it closes the raw
	// listener; StartEtcd overwrites it with http.Server.Shutdown.
	close func(context.Context) error
}
2016-07-10 21:06:08 +03:00
// StartEtcd launches the etcd server and HTTP handlers for client/server communication.
2016-08-29 09:04:06 +03:00
// The returned Etcd.Server is not guaranteed to have joined the cluster. Wait
// on the Etcd.Server.ReadyNotify() channel to know when it completes and is ready for use.
2016-07-10 21:06:08 +03:00
func StartEtcd ( inCfg * Config ) ( e * Etcd , err error ) {
if err = inCfg . Validate ( ) ; err != nil {
return nil , err
}
2017-05-02 00:12:40 +03:00
serving := false
2017-03-17 20:15:34 +03:00
e = & Etcd { cfg : * inCfg , stopc : make ( chan struct { } ) }
2016-07-10 21:06:08 +03:00
cfg := & e . cfg
defer func ( ) {
2017-05-02 00:12:40 +03:00
if e == nil || err == nil {
return
2016-07-10 21:06:08 +03:00
}
2017-05-02 00:12:40 +03:00
if ! serving {
// errored before starting gRPC server for serveCtx.grpcServerC
for _ , sctx := range e . sctxs {
close ( sctx . grpcServerC )
}
}
e . Close ( )
e = nil
2016-07-10 21:06:08 +03:00
} ( )
if e . Peers , err = startPeerListeners ( cfg ) ; err != nil {
return
}
if e . sctxs , err = startClientListeners ( cfg ) ; err != nil {
return
}
for _ , sctx := range e . sctxs {
e . Clients = append ( e . Clients , sctx . l )
}
2016-07-14 02:02:49 +03:00
var (
urlsmap types . URLsMap
token string
)
if ! isMemberInitialized ( cfg ) {
urlsmap , token , err = cfg . PeerURLsMapAndToken ( "etcd" )
if err != nil {
2016-07-16 00:34:06 +03:00
return e , fmt . Errorf ( "error setting up initial cluster: %v" , err )
2016-07-14 02:02:49 +03:00
}
2016-07-10 21:06:08 +03:00
}
2017-05-10 23:55:06 +03:00
srvcfg := etcdserver . ServerConfig {
2016-07-10 21:06:08 +03:00
Name : cfg . Name ,
ClientURLs : cfg . ACUrls ,
PeerURLs : cfg . APUrls ,
DataDir : cfg . Dir ,
DedicatedWALDir : cfg . WalDir ,
SnapCount : cfg . SnapCount ,
MaxSnapFiles : cfg . MaxSnapFiles ,
MaxWALFiles : cfg . MaxWalFiles ,
InitialPeerURLsMap : urlsmap ,
InitialClusterToken : token ,
DiscoveryURL : cfg . Durl ,
DiscoveryProxy : cfg . Dproxy ,
NewCluster : cfg . IsNewCluster ( ) ,
ForceNewCluster : cfg . ForceNewCluster ,
PeerTLSInfo : cfg . PeerTLSInfo ,
TickMs : cfg . TickMs ,
ElectionTicks : cfg . ElectionTicks ( ) ,
AutoCompactionRetention : cfg . AutoCompactionRetention ,
2017-06-17 01:12:49 +03:00
AutoCompactionMode : cfg . AutoCompactionMode ,
2016-07-10 21:06:08 +03:00
QuotaBackendBytes : cfg . QuotaBackendBytes ,
2017-05-24 03:50:20 +03:00
MaxTxnOps : cfg . MaxTxnOps ,
2017-05-23 02:54:50 +03:00
MaxRequestBytes : cfg . MaxRequestBytes ,
2016-07-10 21:06:08 +03:00
StrictReconfigCheck : cfg . StrictReconfigCheck ,
2016-07-21 02:23:24 +03:00
ClientCertAuthEnabled : cfg . ClientTLSInfo . ClientCertAuth ,
2016-07-21 08:13:57 +03:00
AuthToken : cfg . AuthToken ,
2016-07-10 21:06:08 +03:00
}
if e . Server , err = etcdserver . NewServer ( srvcfg ) ; err != nil {
return
}
2017-05-05 02:56:08 +03:00
// configure peer handlers after rafthttp.Transport started
ph := v2http . NewPeerHandler ( e . Server )
for i := range e . Peers {
srv := & http . Server {
Handler : ph ,
ReadTimeout : 5 * time . Minute ,
ErrorLog : defaultLog . New ( ioutil . Discard , "" , 0 ) , // do not log user error
}
e . Peers [ i ] . serve = func ( ) error {
return srv . Serve ( e . Peers [ i ] . Listener )
}
e . Peers [ i ] . close = func ( ctx context . Context ) error {
// gracefully shutdown http.Server
// close open listeners, idle connections
// until context cancel or time-out
return srv . Shutdown ( ctx )
}
}
2016-07-10 21:06:08 +03:00
// buffer channel so goroutines on closed connections won't wait forever
e . errc = make ( chan error , len ( e . Peers ) + len ( e . Clients ) + 2 * len ( e . sctxs ) )
e . Server . Start ( )
2016-07-21 18:45:07 +03:00
if err = e . serve ( ) ; err != nil {
return
}
2017-05-02 00:12:40 +03:00
serving = true
2016-07-10 21:06:08 +03:00
return
}
2016-09-29 05:05:03 +03:00
// Config returns the current configuration. The value is returned by copy,
// so modifying it does not affect e.
func (e *Etcd) Config() Config {
	cfg := e.cfg
	return cfg
}
2016-07-10 21:06:08 +03:00
func ( e * Etcd ) Close ( ) {
2017-03-17 20:15:34 +03:00
e . closeOnce . Do ( func ( ) { close ( e . stopc ) } )
2017-04-14 19:24:52 +03:00
// (gRPC server) stops accepting new connections,
// RPCs, and blocks until all pending RPCs are finished
for _ , sctx := range e . sctxs {
for gs := range sctx . grpcServerC {
gs . GracefulStop ( )
}
}
2016-07-10 21:06:08 +03:00
for _ , sctx := range e . sctxs {
sctx . cancel ( )
}
for i := range e . Clients {
if e . Clients [ i ] != nil {
e . Clients [ i ] . Close ( )
}
}
2017-05-05 02:56:08 +03:00
// close rafthttp transports
2016-07-10 21:06:08 +03:00
if e . Server != nil {
e . Server . Stop ( )
}
2017-05-05 02:56:08 +03:00
// close all idle connections in peer handler (wait up to 1-second)
for i := range e . Peers {
if e . Peers [ i ] != nil && e . Peers [ i ] . close != nil {
ctx , cancel := context . WithTimeout ( context . Background ( ) , time . Second )
e . Peers [ i ] . close ( ctx )
cancel ( )
}
}
2016-07-10 21:06:08 +03:00
}
// Err returns the receive-only channel on which errors from the serving
// goroutines are delivered.
func (e *Etcd) Err() <-chan error {
	return e.errc
}
2017-05-05 02:56:08 +03:00
func startPeerListeners ( cfg * Config ) ( peers [ ] * peerListener , err error ) {
2016-07-10 21:06:08 +03:00
if cfg . PeerAutoTLS && cfg . PeerTLSInfo . Empty ( ) {
phosts := make ( [ ] string , len ( cfg . LPUrls ) )
for i , u := range cfg . LPUrls {
phosts [ i ] = u . Host
}
2017-03-16 05:31:10 +03:00
cfg . PeerTLSInfo , err = transport . SelfCert ( filepath . Join ( cfg . Dir , "fixtures" , "peer" ) , phosts )
2016-07-10 21:06:08 +03:00
if err != nil {
plog . Fatalf ( "could not get certs (%v)" , err )
}
} else if cfg . PeerAutoTLS {
plog . Warningf ( "ignoring peer auto TLS since certs given" )
}
if ! cfg . PeerTLSInfo . Empty ( ) {
plog . Infof ( "peerTLS: %s" , cfg . PeerTLSInfo )
}
2017-05-05 02:56:08 +03:00
peers = make ( [ ] * peerListener , len ( cfg . LPUrls ) )
2016-07-10 21:06:08 +03:00
defer func ( ) {
if err == nil {
return
}
2017-05-05 02:56:08 +03:00
for i := range peers {
if peers [ i ] != nil && peers [ i ] . close != nil {
plog . Info ( "stopping listening for peers on " , cfg . LPUrls [ i ] . String ( ) )
peers [ i ] . close ( context . Background ( ) )
2016-07-10 21:06:08 +03:00
}
}
} ( )
for i , u := range cfg . LPUrls {
if u . Scheme == "http" {
if ! cfg . PeerTLSInfo . Empty ( ) {
plog . Warningf ( "The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files." , u . String ( ) )
}
if cfg . PeerTLSInfo . ClientCertAuth {
plog . Warningf ( "The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url." , u . String ( ) )
}
}
2017-05-05 02:56:08 +03:00
peers [ i ] = & peerListener { close : func ( context . Context ) error { return nil } }
peers [ i ] . Listener , err = rafthttp . NewListener ( u , & cfg . PeerTLSInfo )
if err != nil {
2016-07-10 21:06:08 +03:00
return nil , err
}
2017-05-05 02:56:08 +03:00
// once serve, overwrite with 'http.Server.Shutdown'
peers [ i ] . close = func ( context . Context ) error {
return peers [ i ] . Listener . Close ( )
}
2016-07-10 21:06:08 +03:00
plog . Info ( "listening for peers on " , u . String ( ) )
}
2017-05-05 02:56:08 +03:00
return peers , nil
2016-07-10 21:06:08 +03:00
}
func startClientListeners ( cfg * Config ) ( sctxs map [ string ] * serveCtx , err error ) {
if cfg . ClientAutoTLS && cfg . ClientTLSInfo . Empty ( ) {
chosts := make ( [ ] string , len ( cfg . LCUrls ) )
for i , u := range cfg . LCUrls {
chosts [ i ] = u . Host
}
2017-03-16 05:31:10 +03:00
cfg . ClientTLSInfo , err = transport . SelfCert ( filepath . Join ( cfg . Dir , "fixtures" , "client" ) , chosts )
2016-07-10 21:06:08 +03:00
if err != nil {
plog . Fatalf ( "could not get certs (%v)" , err )
}
} else if cfg . ClientAutoTLS {
plog . Warningf ( "ignoring client auto TLS since certs given" )
}
2016-12-09 23:37:35 +03:00
if cfg . EnablePprof {
2017-04-04 01:15:47 +03:00
plog . Infof ( "pprof is enabled under %s" , debugutil . HTTPPrefixPProf )
2016-12-09 23:37:35 +03:00
}
2016-07-10 21:06:08 +03:00
sctxs = make ( map [ string ] * serveCtx )
for _ , u := range cfg . LCUrls {
sctx := newServeCtx ( )
2016-07-19 02:41:41 +03:00
if u . Scheme == "http" || u . Scheme == "unix" {
2016-07-10 21:06:08 +03:00
if ! cfg . ClientTLSInfo . Empty ( ) {
plog . Warningf ( "The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files." , u . String ( ) )
}
if cfg . ClientTLSInfo . ClientCertAuth {
plog . Warningf ( "The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url." , u . String ( ) )
}
}
2016-07-19 02:41:41 +03:00
if ( u . Scheme == "https" || u . Scheme == "unixs" ) && cfg . ClientTLSInfo . Empty ( ) {
2016-07-10 21:06:08 +03:00
return nil , fmt . Errorf ( "TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme" , u . String ( ) )
}
2016-07-19 02:41:41 +03:00
proto := "tcp"
2017-04-01 17:30:41 +03:00
addr := u . Host
2016-07-19 02:41:41 +03:00
if u . Scheme == "unix" || u . Scheme == "unixs" {
proto = "unix"
2017-04-01 17:30:41 +03:00
addr = u . Host + u . Path
2016-07-19 02:41:41 +03:00
}
sctx . secure = u . Scheme == "https" || u . Scheme == "unixs"
2016-07-10 21:06:08 +03:00
sctx . insecure = ! sctx . secure
2017-04-01 17:30:41 +03:00
if oldctx := sctxs [ addr ] ; oldctx != nil {
2016-07-10 21:06:08 +03:00
oldctx . secure = oldctx . secure || sctx . secure
oldctx . insecure = oldctx . insecure || sctx . insecure
continue
}
2017-04-01 17:30:41 +03:00
if sctx . l , err = net . Listen ( proto , addr ) ; err != nil {
2016-07-10 21:06:08 +03:00
return nil , err
}
2017-07-07 00:19:06 +03:00
// net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking
// hosts that disable ipv6. So, use the address given by the user.
sctx . addr = addr
2016-07-10 21:06:08 +03:00
if fdLimit , fderr := runtimeutil . FDLimit ( ) ; fderr == nil {
if fdLimit <= reservedInternalFDNum {
plog . Fatalf ( "file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage" , fdLimit , reservedInternalFDNum )
}
sctx . l = transport . LimitListener ( sctx . l , int ( fdLimit - reservedInternalFDNum ) )
}
2016-07-19 02:41:41 +03:00
if proto == "tcp" {
if sctx . l , err = transport . NewKeepAliveListener ( sctx . l , "tcp" , nil ) ; err != nil {
return nil , err
}
2016-07-10 21:06:08 +03:00
}
plog . Info ( "listening for client requests on " , u . Host )
defer func ( ) {
if err != nil {
sctx . l . Close ( )
plog . Info ( "stopping listening for client requests on " , u . Host )
}
} ( )
2016-12-14 21:12:40 +03:00
for k := range cfg . UserHandlers {
sctx . userHandlers [ k ] = cfg . UserHandlers [ k ]
}
2017-01-22 13:21:19 +03:00
sctx . serviceRegister = cfg . ServiceRegister
2017-02-27 22:01:05 +03:00
if cfg . EnablePprof || cfg . Debug {
2016-12-09 23:37:35 +03:00
sctx . registerPprof ( )
}
2017-02-27 22:01:05 +03:00
if cfg . Debug {
sctx . registerTrace ( )
}
2017-04-01 17:30:41 +03:00
sctxs [ addr ] = sctx
2016-07-10 21:06:08 +03:00
}
return sctxs , nil
}
func ( e * Etcd ) serve ( ) ( err error ) {
if ! e . cfg . ClientTLSInfo . Empty ( ) {
plog . Infof ( "ClientTLS: %s" , e . cfg . ClientTLSInfo )
}
if e . cfg . CorsInfo . String ( ) != "" {
plog . Infof ( "cors = %s" , e . cfg . CorsInfo )
}
// Start the peer server in a goroutine
2017-05-05 02:56:08 +03:00
for _ , pl := range e . Peers {
go func ( l * peerListener ) {
e . errHandler ( l . serve ( ) )
} ( pl )
2016-07-10 21:06:08 +03:00
}
// Start a client server goroutine for each listen address
2017-01-16 19:55:26 +03:00
var v2h http . Handler
if e . Config ( ) . EnableV2 {
v2h = http . Handler ( & cors . CORSHandler {
Handler : v2http . NewClientHandler ( e . Server , e . Server . Cfg . ReqTimeout ( ) ) ,
Info : e . cfg . CorsInfo ,
} )
}
2016-07-10 21:06:08 +03:00
for _ , sctx := range e . sctxs {
go func ( s * serveCtx ) {
2017-06-17 05:04:57 +03:00
e . errHandler ( s . serve ( e . Server , & e . cfg . ClientTLSInfo , v2h , e . errHandler ) )
2016-07-10 21:06:08 +03:00
} ( sctx )
}
return nil
}
2017-03-17 20:15:34 +03:00
// errHandler forwards err to the error channel unless e has been closed.
// If the stop channel is already closed, err is dropped without attempting
// a send; otherwise the call blocks until err is sent or the stop channel
// is closed.
func (e *Etcd) errHandler(err error) {
	stopped := e.stopc

	// Check for shutdown first so a closed Etcd never reports the error,
	// even when the error channel still has room.
	select {
	case <-stopped:
		return
	default:
	}

	select {
	case e.errc <- err:
	case <-stopped:
	}
}