2016-06-29 08:25:10 +03:00
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdmain
import (
2017-07-27 03:10:10 +03:00
"context"
2016-06-29 08:25:10 +03:00
"fmt"
2017-07-18 01:36:19 +03:00
"math"
2016-06-29 08:25:10 +03:00
"net"
2016-11-19 03:34:54 +03:00
"net/http"
2017-07-12 20:06:58 +03:00
"net/url"
2016-06-29 08:25:10 +03:00
"os"
2017-07-19 00:09:32 +03:00
"path/filepath"
2016-06-29 08:25:10 +03:00
"time"
"github.com/coreos/etcd/clientv3"
2017-06-29 01:09:44 +03:00
"github.com/coreos/etcd/clientv3/leasing"
2017-03-20 05:30:21 +03:00
"github.com/coreos/etcd/clientv3/namespace"
2017-07-27 03:10:10 +03:00
"github.com/coreos/etcd/clientv3/ordering"
2017-07-24 21:22:49 +03:00
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
2017-05-10 22:19:09 +03:00
"github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
"github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
2016-06-29 08:25:10 +03:00
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
2017-04-04 01:15:47 +03:00
"github.com/coreos/etcd/pkg/debugutil"
2016-06-29 08:25:10 +03:00
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/proxy/grpcproxy"
2016-11-19 03:34:54 +03:00
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
2017-08-20 04:22:58 +03:00
"github.com/soheilhy/cmux"
2017-02-28 04:09:36 +03:00
"github.com/spf13/cobra"
"google.golang.org/grpc"
2016-06-29 08:25:10 +03:00
)
var (
2017-04-06 01:25:22 +03:00
grpcProxyListenAddr string
2017-07-12 20:06:58 +03:00
grpcProxyMetricsListenAddr string
2017-04-06 01:25:22 +03:00
grpcProxyEndpoints [ ] string
grpcProxyDNSCluster string
grpcProxyInsecureDiscovery bool
2017-07-19 00:09:32 +03:00
grpcProxyDataDir string
2017-07-18 01:36:19 +03:00
// tls for connecting to etcd
2017-07-19 00:09:32 +03:00
grpcProxyCA string
grpcProxyCert string
grpcProxyKey string
grpcProxyInsecureSkipTLSVerify bool
2017-07-18 01:36:19 +03:00
// tls for clients connecting to proxy
2017-07-19 00:09:32 +03:00
grpcProxyListenCA string
grpcProxyListenCert string
grpcProxyListenKey string
grpcProxyListenAutoTLS bool
2017-07-19 00:59:35 +03:00
grpcProxyListenCRL string
2017-03-20 05:30:21 +03:00
2017-02-28 04:09:36 +03:00
grpcProxyAdvertiseClientURL string
grpcProxyResolverPrefix string
grpcProxyResolverTTL int
2017-03-20 05:30:21 +03:00
grpcProxyNamespace string
2017-06-29 01:09:44 +03:00
grpcProxyLeasing string
2017-04-04 01:15:47 +03:00
2017-07-27 03:10:10 +03:00
grpcProxyEnablePprof bool
grpcProxyEnableOrdering bool
2016-06-29 08:25:10 +03:00
)
func init ( ) {
rootCmd . AddCommand ( newGRPCProxyCommand ( ) )
}
// newGRPCProxyCommand returns the cobra command for "grpc-proxy".
func newGRPCProxyCommand ( ) * cobra . Command {
lpc := & cobra . Command {
Use : "grpc-proxy <subcommand>" ,
Short : "grpc-proxy related command" ,
}
lpc . AddCommand ( newGRPCProxyStartCommand ( ) )
return lpc
}
func newGRPCProxyStartCommand ( ) * cobra . Command {
cmd := cobra . Command {
Use : "start" ,
Short : "start the grpc proxy" ,
Run : startGRPCProxy ,
}
cmd . Flags ( ) . StringVar ( & grpcProxyListenAddr , "listen-addr" , "127.0.0.1:23790" , "listen address" )
2017-04-06 01:25:22 +03:00
cmd . Flags ( ) . StringVar ( & grpcProxyDNSCluster , "discovery-srv" , "" , "DNS domain used to bootstrap initial cluster" )
2017-07-12 20:06:58 +03:00
cmd . Flags ( ) . StringVar ( & grpcProxyMetricsListenAddr , "metrics-addr" , "" , "listen for /metrics requests on an additional interface" )
2017-04-06 01:25:22 +03:00
cmd . Flags ( ) . BoolVar ( & grpcProxyInsecureDiscovery , "insecure-discovery" , false , "accept insecure SRV records" )
2016-06-29 08:25:10 +03:00
cmd . Flags ( ) . StringSliceVar ( & grpcProxyEndpoints , "endpoints" , [ ] string { "127.0.0.1:2379" } , "comma separated etcd cluster endpoints" )
2017-02-28 04:09:36 +03:00
cmd . Flags ( ) . StringVar ( & grpcProxyAdvertiseClientURL , "advertise-client-url" , "127.0.0.1:23790" , "advertise address to register (must be reachable by client)" )
cmd . Flags ( ) . StringVar ( & grpcProxyResolverPrefix , "resolver-prefix" , "" , "prefix to use for registering proxy (must be shared with other grpc-proxy members)" )
cmd . Flags ( ) . IntVar ( & grpcProxyResolverTTL , "resolver-ttl" , 0 , "specify TTL, in seconds, when registering proxy endpoints" )
2017-03-20 05:30:21 +03:00
cmd . Flags ( ) . StringVar ( & grpcProxyNamespace , "namespace" , "" , "string to prefix to all keys for namespacing requests" )
2017-04-04 01:15:47 +03:00
cmd . Flags ( ) . BoolVar ( & grpcProxyEnablePprof , "enable-pprof" , false , ` Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/" ` )
2017-07-19 00:09:32 +03:00
cmd . Flags ( ) . StringVar ( & grpcProxyDataDir , "data-dir" , "default.proxy" , "Data directory for persistent data" )
2016-06-29 08:25:10 +03:00
2017-07-18 01:36:19 +03:00
// client TLS for connecting to server
cmd . Flags ( ) . StringVar ( & grpcProxyCert , "cert" , "" , "identify secure connections with etcd servers using this TLS certificate file" )
cmd . Flags ( ) . StringVar ( & grpcProxyKey , "key" , "" , "identify secure connections with etcd servers using this TLS key file" )
cmd . Flags ( ) . StringVar ( & grpcProxyCA , "cacert" , "" , "verify certificates of TLS-enabled secure etcd servers using this CA bundle" )
2017-07-19 00:09:32 +03:00
cmd . Flags ( ) . BoolVar ( & grpcProxyInsecureSkipTLSVerify , "insecure-skip-tls-verify" , false , "skip authentication of etcd server TLS certificates" )
2017-07-18 01:36:19 +03:00
// client TLS for connecting to proxy
cmd . Flags ( ) . StringVar ( & grpcProxyListenCert , "cert-file" , "" , "identify secure connections to the proxy using this TLS certificate file" )
cmd . Flags ( ) . StringVar ( & grpcProxyListenKey , "key-file" , "" , "identify secure connections to the proxy using this TLS key file" )
cmd . Flags ( ) . StringVar ( & grpcProxyListenCA , "trusted-ca-file" , "" , "verify certificates of TLS-enabled secure proxy using this CA bundle" )
2017-07-19 00:09:32 +03:00
cmd . Flags ( ) . BoolVar ( & grpcProxyListenAutoTLS , "auto-tls" , false , "proxy TLS using generated certificates" )
2017-07-19 00:59:35 +03:00
cmd . Flags ( ) . StringVar ( & grpcProxyListenCRL , "client-crl-file" , "" , "proxy client certificate revocation list file." )
2017-07-18 01:36:19 +03:00
2017-07-27 03:10:10 +03:00
// experimental flags
cmd . Flags ( ) . BoolVar ( & grpcProxyEnableOrdering , "experimental-serializable-ordering" , false , "Ensure serializable reads have monotonically increasing store revisions across endpoints." )
2017-06-29 01:09:44 +03:00
cmd . Flags ( ) . StringVar ( & grpcProxyLeasing , "experimental-leasing-prefix" , "" , "leasing metadata prefix for disconnected linearized reads." )
2016-06-29 08:25:10 +03:00
return & cmd
}
func startGRPCProxy ( cmd * cobra . Command , args [ ] string ) {
2017-07-18 01:36:19 +03:00
checkArgs ( )
tlsinfo := newTLS ( grpcProxyListenCA , grpcProxyListenCert , grpcProxyListenKey )
2017-07-19 00:09:32 +03:00
if tlsinfo == nil && grpcProxyListenAutoTLS {
host := [ ] string { "https://" + grpcProxyListenAddr }
dir := filepath . Join ( grpcProxyDataDir , "fixtures" , "proxy" )
autoTLS , err := transport . SelfCert ( dir , host )
if err != nil {
plog . Fatal ( err )
}
tlsinfo = & autoTLS
}
2017-07-18 01:36:19 +03:00
if tlsinfo != nil {
plog . Infof ( "ServerTLS: %s" , tlsinfo )
}
m := mustListenCMux ( tlsinfo )
grpcl := m . Match ( cmux . HTTP2 ( ) )
defer func ( ) {
grpcl . Close ( )
plog . Infof ( "stopping listening for grpc-proxy client requests on %s" , grpcProxyListenAddr )
} ( )
client := mustNewClient ( )
2017-07-29 01:24:17 +03:00
srvhttp , httpl := mustHTTPListener ( m , tlsinfo , client )
2017-07-18 01:36:19 +03:00
errc := make ( chan error )
go func ( ) { errc <- newGRPCProxyServer ( client ) . Serve ( grpcl ) } ( )
go func ( ) { errc <- srvhttp . Serve ( httpl ) } ( )
go func ( ) { errc <- m . Serve ( ) } ( )
if len ( grpcProxyMetricsListenAddr ) > 0 {
mhttpl := mustMetricsListener ( tlsinfo )
go func ( ) {
mux := http . NewServeMux ( )
2017-07-24 21:22:49 +03:00
etcdhttp . HandlePrometheus ( mux )
2017-07-29 01:24:17 +03:00
grpcproxy . HandleHealth ( mux , client )
2017-07-18 01:36:19 +03:00
plog . Fatal ( http . Serve ( mhttpl , mux ) )
} ( )
}
// grpc-proxy is initialized, ready to serve
notifySystemd ( )
fmt . Fprintln ( os . Stderr , <- errc )
os . Exit ( 1 )
}
func checkArgs ( ) {
2017-02-28 04:09:36 +03:00
if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 {
fmt . Fprintln ( os . Stderr , fmt . Errorf ( "invalid resolver-ttl %d" , grpcProxyResolverTTL ) )
os . Exit ( 1 )
}
if grpcProxyResolverPrefix == "" && grpcProxyResolverTTL > 0 {
fmt . Fprintln ( os . Stderr , fmt . Errorf ( "invalid resolver-prefix %q" , grpcProxyResolverPrefix ) )
os . Exit ( 1 )
}
if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL > 0 && grpcProxyAdvertiseClientURL == "" {
fmt . Fprintln ( os . Stderr , fmt . Errorf ( "invalid advertise-client-url %q" , grpcProxyAdvertiseClientURL ) )
os . Exit ( 1 )
}
2017-07-18 01:36:19 +03:00
}
2017-02-28 04:09:36 +03:00
2017-07-18 01:36:19 +03:00
func mustNewClient ( ) * clientv3 . Client {
2017-05-05 07:57:54 +03:00
srvs := discoverEndpoints ( grpcProxyDNSCluster , grpcProxyCA , grpcProxyInsecureDiscovery )
2017-07-18 01:36:19 +03:00
eps := srvs . Endpoints
if len ( eps ) == 0 {
eps = grpcProxyEndpoints
2017-04-06 01:25:22 +03:00
}
2017-07-18 01:36:19 +03:00
cfg , err := newClientCfg ( eps )
2016-06-29 08:25:10 +03:00
if err != nil {
fmt . Fprintln ( os . Stderr , err )
os . Exit ( 1 )
}
2017-07-24 12:09:37 +03:00
cfg . DialOptions = append ( cfg . DialOptions ,
grpc . WithUnaryInterceptor ( grpcproxy . AuthUnaryClientInterceptor ) )
2017-07-28 10:28:28 +03:00
cfg . DialOptions = append ( cfg . DialOptions ,
grpc . WithStreamInterceptor ( grpcproxy . AuthStreamClientInterceptor ) )
2017-07-18 01:36:19 +03:00
client , err := clientv3 . New ( * cfg )
if err != nil {
2016-11-19 03:34:54 +03:00
fmt . Fprintln ( os . Stderr , err )
os . Exit ( 1 )
}
2017-07-18 01:36:19 +03:00
return client
}
func newClientCfg ( eps [ ] string ) ( * clientv3 . Config , error ) {
// set tls if any one tls option set
cfg := clientv3 . Config {
Endpoints : eps ,
DialTimeout : 5 * time . Second ,
}
2017-07-19 00:09:32 +03:00
tls := newTLS ( grpcProxyCA , grpcProxyCert , grpcProxyKey )
if tls == nil && grpcProxyInsecureSkipTLSVerify {
tls = & transport . TLSInfo { }
}
if tls != nil {
2017-07-18 01:36:19 +03:00
clientTLS , err := tls . ClientConfig ( )
if err != nil {
return nil , err
}
2017-07-19 00:09:32 +03:00
clientTLS . InsecureSkipVerify = grpcProxyInsecureSkipTLSVerify
2017-07-18 01:36:19 +03:00
cfg . TLS = clientTLS
plog . Infof ( "ClientTLS: %s" , tls )
}
return & cfg , nil
}
2016-06-29 08:25:10 +03:00
2017-07-18 01:36:19 +03:00
func newTLS ( ca , cert , key string ) * transport . TLSInfo {
if ca == "" && cert == "" && key == "" {
return nil
}
return & transport . TLSInfo { CAFile : ca , CertFile : cert , KeyFile : key }
}
func mustListenCMux ( tlsinfo * transport . TLSInfo ) cmux . CMux {
l , err := net . Listen ( "tcp" , grpcProxyListenAddr )
2016-06-29 08:25:10 +03:00
if err != nil {
fmt . Fprintln ( os . Stderr , err )
os . Exit ( 1 )
}
2017-07-19 00:59:35 +03:00
if l , err = transport . NewKeepAliveListener ( l , "tcp" , nil ) ; err != nil {
fmt . Fprintln ( os . Stderr , err )
os . Exit ( 1 )
}
2017-07-18 01:36:19 +03:00
if tlsinfo != nil {
2017-07-19 00:59:35 +03:00
tlsinfo . CRLFile = grpcProxyListenCRL
if l , err = transport . NewTLSListener ( l , tlsinfo ) ; err != nil {
2017-07-18 01:36:19 +03:00
plog . Fatal ( err )
}
2016-06-29 08:25:10 +03:00
}
2017-07-19 00:59:35 +03:00
2017-07-18 01:36:19 +03:00
plog . Infof ( "listening for grpc-proxy client requests on %s" , grpcProxyListenAddr )
return cmux . New ( l )
}
2016-06-29 08:25:10 +03:00
2017-07-18 01:36:19 +03:00
func newGRPCProxyServer ( client * clientv3 . Client ) * grpc . Server {
2017-07-27 03:10:10 +03:00
if grpcProxyEnableOrdering {
vf := ordering . NewOrderViolationSwitchEndpointClosure ( * client )
client . KV = ordering . NewKV ( client . KV , vf )
plog . Infof ( "waiting for linearized read from cluster to recover ordering" )
for {
_ , err := client . KV . Get ( context . TODO ( ) , "_" , clientv3 . WithKeysOnly ( ) )
if err == nil {
break
}
plog . Warningf ( "ordering recovery failed, retrying in 1s (%v)" , err )
time . Sleep ( time . Second )
}
}
2017-03-20 05:30:21 +03:00
if len ( grpcProxyNamespace ) > 0 {
client . KV = namespace . NewKV ( client . KV , grpcProxyNamespace )
client . Watcher = namespace . NewWatcher ( client . Watcher , grpcProxyNamespace )
client . Lease = namespace . NewLease ( client . Lease , grpcProxyNamespace )
}
2017-06-29 01:09:44 +03:00
if len ( grpcProxyLeasing ) > 0 {
2017-08-18 23:16:49 +03:00
client . KV , _ , _ = leasing . NewKV ( client , grpcProxyLeasing )
2017-06-29 01:09:44 +03:00
}
2017-01-14 03:20:09 +03:00
kvp , _ := grpcproxy . NewKvProxy ( client )
2016-12-30 22:24:23 +03:00
watchp , _ := grpcproxy . NewWatchProxy ( client )
2017-02-28 04:09:36 +03:00
if grpcProxyResolverPrefix != "" {
grpcproxy . Register ( client , grpcProxyResolverPrefix , grpcProxyAdvertiseClientURL , grpcProxyResolverTTL )
}
clusterp , _ := grpcproxy . NewClusterProxy ( client , grpcProxyAdvertiseClientURL , grpcProxyResolverPrefix )
2017-02-14 00:38:46 +03:00
leasep , _ := grpcproxy . NewLeaseProxy ( client )
2016-07-18 23:26:27 +03:00
mainp := grpcproxy . NewMaintenanceProxy ( client )
2016-07-19 00:05:41 +03:00
authp := grpcproxy . NewAuthProxy ( client )
2017-05-10 22:19:09 +03:00
electionp := grpcproxy . NewElectionProxy ( client )
lockp := grpcproxy . NewLockProxy ( client )
2016-06-29 08:25:10 +03:00
2016-11-19 03:34:54 +03:00
server := grpc . NewServer (
grpc . StreamInterceptor ( grpc_prometheus . StreamServerInterceptor ) ,
grpc . UnaryInterceptor ( grpc_prometheus . UnaryServerInterceptor ) ,
2017-07-18 01:36:19 +03:00
grpc . MaxConcurrentStreams ( math . MaxUint32 ) ,
2016-11-19 03:34:54 +03:00
)
2017-07-18 01:36:19 +03:00
2016-06-29 08:25:10 +03:00
pb . RegisterKVServer ( server , kvp )
2016-06-29 07:16:52 +03:00
pb . RegisterWatchServer ( server , watchp )
2016-07-16 22:15:24 +03:00
pb . RegisterClusterServer ( server , clusterp )
2016-07-16 20:00:14 +03:00
pb . RegisterLeaseServer ( server , leasep )
2016-07-18 23:26:27 +03:00
pb . RegisterMaintenanceServer ( server , mainp )
2016-07-19 00:05:41 +03:00
pb . RegisterAuthServer ( server , authp )
2017-05-10 22:19:09 +03:00
v3electionpb . RegisterElectionServer ( server , electionp )
v3lockpb . RegisterLockServer ( server , lockp )
2017-11-15 13:21:29 +03:00
// set zero values for metrics registered for this grpc server
grpc_prometheus . Register ( server )
2017-07-18 01:36:19 +03:00
return server
}
2016-11-19 03:34:54 +03:00
2017-07-29 01:24:17 +03:00
func mustHTTPListener ( m cmux . CMux , tlsinfo * transport . TLSInfo , c * clientv3 . Client ) ( * http . Server , net . Listener ) {
2016-11-19 03:34:54 +03:00
httpmux := http . NewServeMux ( )
httpmux . HandleFunc ( "/" , http . NotFound )
2017-07-24 21:22:49 +03:00
etcdhttp . HandlePrometheus ( httpmux )
2017-07-29 01:24:17 +03:00
grpcproxy . HandleHealth ( httpmux , c )
2017-04-04 01:15:47 +03:00
if grpcProxyEnablePprof {
for p , h := range debugutil . PProfHandlers ( ) {
httpmux . Handle ( p , h )
}
plog . Infof ( "pprof is enabled under %s" , debugutil . HTTPPrefixPProf )
}
2017-07-18 01:36:19 +03:00
srvhttp := & http . Server { Handler : httpmux }
2017-04-04 01:15:47 +03:00
2017-07-18 01:36:19 +03:00
if tlsinfo == nil {
return srvhttp , m . Match ( cmux . HTTP1 ( ) )
2016-11-21 21:39:34 +03:00
}
2017-07-12 20:06:58 +03:00
2017-07-18 01:36:19 +03:00
srvTLS , err := tlsinfo . ServerConfig ( )
if err != nil {
plog . Fatalf ( "could not setup TLS (%v)" , err )
2017-07-12 20:06:58 +03:00
}
2017-07-18 01:36:19 +03:00
srvhttp . TLSConfig = srvTLS
return srvhttp , m . Match ( cmux . Any ( ) )
2016-06-29 08:25:10 +03:00
}
2017-07-18 01:36:19 +03:00
func mustMetricsListener ( tlsinfo * transport . TLSInfo ) net . Listener {
murl , err := url . Parse ( grpcProxyMetricsListenAddr )
if err != nil {
fmt . Fprintf ( os . Stderr , "cannot parse %q" , grpcProxyMetricsListenAddr )
os . Exit ( 1 )
2016-06-29 08:25:10 +03:00
}
2017-07-18 01:36:19 +03:00
ml , err := transport . NewListener ( murl . Host , murl . Scheme , tlsinfo )
if err != nil {
fmt . Fprintln ( os . Stderr , err )
os . Exit ( 1 )
2016-06-29 08:25:10 +03:00
}
2017-07-18 01:36:19 +03:00
plog . Info ( "grpc-proxy: listening for metrics on " , murl . String ( ) )
return ml
2016-06-29 08:25:10 +03:00
}