2016-06-29 08:25:10 +03:00
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdmain
import (
2016-11-21 21:39:34 +03:00
"crypto/tls"
2016-06-29 08:25:10 +03:00
"fmt"
2017-07-18 01:36:19 +03:00
"math"
2016-06-29 08:25:10 +03:00
"net"
2016-11-19 03:34:54 +03:00
"net/http"
2017-07-12 20:06:58 +03:00
"net/url"
2016-06-29 08:25:10 +03:00
"os"
"time"
"github.com/coreos/etcd/clientv3"
2017-03-20 05:30:21 +03:00
"github.com/coreos/etcd/clientv3/namespace"
2017-05-10 22:19:09 +03:00
"github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
"github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
2016-06-29 08:25:10 +03:00
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
2017-04-04 01:15:47 +03:00
"github.com/coreos/etcd/pkg/debugutil"
2016-06-29 08:25:10 +03:00
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/proxy/grpcproxy"
2016-11-19 03:34:54 +03:00
"github.com/cockroachdb/cmux"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
2017-02-28 04:09:36 +03:00
"github.com/spf13/cobra"
"google.golang.org/grpc"
2016-06-29 08:25:10 +03:00
)
// Package-level settings for the "grpc-proxy start" command. Each variable
// is bound to a command-line flag in newGRPCProxyStartCommand.
var (
	// addresses the proxy listens on
	grpcProxyListenAddr        string
	grpcProxyMetricsListenAddr string

	// how the backing etcd endpoints are found
	grpcProxyEndpoints         []string
	grpcProxyDNSCluster        string
	grpcProxyInsecureDiscovery bool

	// tls for connecting to etcd
	grpcProxyCA   string
	grpcProxyCert string
	grpcProxyKey  string

	// tls for clients connecting to proxy
	grpcProxyListenCA   string
	grpcProxyListenCert string
	grpcProxyListenKey  string

	// proxy registration under a shared resolver prefix
	grpcProxyAdvertiseClientURL string
	grpcProxyResolverPrefix     string
	grpcProxyResolverTTL        int

	// key prefix applied to proxied requests
	grpcProxyNamespace string

	// whether pprof handlers are served over HTTP
	grpcProxyEnablePprof bool
)
func init ( ) {
rootCmd . AddCommand ( newGRPCProxyCommand ( ) )
}
// newGRPCProxyCommand returns the cobra command for "grpc-proxy", with the
// "start" subcommand already attached to it.
func newGRPCProxyCommand() *cobra.Command {
	parent := new(cobra.Command)
	parent.Use = "grpc-proxy <subcommand>"
	parent.Short = "grpc-proxy related command"

	parent.AddCommand(newGRPCProxyStartCommand())
	return parent
}
func newGRPCProxyStartCommand ( ) * cobra . Command {
cmd := cobra . Command {
Use : "start" ,
Short : "start the grpc proxy" ,
Run : startGRPCProxy ,
}
cmd . Flags ( ) . StringVar ( & grpcProxyListenAddr , "listen-addr" , "127.0.0.1:23790" , "listen address" )
2017-04-06 01:25:22 +03:00
cmd . Flags ( ) . StringVar ( & grpcProxyDNSCluster , "discovery-srv" , "" , "DNS domain used to bootstrap initial cluster" )
2017-07-12 20:06:58 +03:00
cmd . Flags ( ) . StringVar ( & grpcProxyMetricsListenAddr , "metrics-addr" , "" , "listen for /metrics requests on an additional interface" )
2017-04-06 01:25:22 +03:00
cmd . Flags ( ) . BoolVar ( & grpcProxyInsecureDiscovery , "insecure-discovery" , false , "accept insecure SRV records" )
2016-06-29 08:25:10 +03:00
cmd . Flags ( ) . StringSliceVar ( & grpcProxyEndpoints , "endpoints" , [ ] string { "127.0.0.1:2379" } , "comma separated etcd cluster endpoints" )
2017-02-28 04:09:36 +03:00
cmd . Flags ( ) . StringVar ( & grpcProxyAdvertiseClientURL , "advertise-client-url" , "127.0.0.1:23790" , "advertise address to register (must be reachable by client)" )
cmd . Flags ( ) . StringVar ( & grpcProxyResolverPrefix , "resolver-prefix" , "" , "prefix to use for registering proxy (must be shared with other grpc-proxy members)" )
cmd . Flags ( ) . IntVar ( & grpcProxyResolverTTL , "resolver-ttl" , 0 , "specify TTL, in seconds, when registering proxy endpoints" )
2017-03-20 05:30:21 +03:00
cmd . Flags ( ) . StringVar ( & grpcProxyNamespace , "namespace" , "" , "string to prefix to all keys for namespacing requests" )
2017-04-04 01:15:47 +03:00
cmd . Flags ( ) . BoolVar ( & grpcProxyEnablePprof , "enable-pprof" , false , ` Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/" ` )
2016-06-29 08:25:10 +03:00
2017-07-18 01:36:19 +03:00
// client TLS for connecting to server
cmd . Flags ( ) . StringVar ( & grpcProxyCert , "cert" , "" , "identify secure connections with etcd servers using this TLS certificate file" )
cmd . Flags ( ) . StringVar ( & grpcProxyKey , "key" , "" , "identify secure connections with etcd servers using this TLS key file" )
cmd . Flags ( ) . StringVar ( & grpcProxyCA , "cacert" , "" , "verify certificates of TLS-enabled secure etcd servers using this CA bundle" )
// client TLS for connecting to proxy
cmd . Flags ( ) . StringVar ( & grpcProxyListenCert , "cert-file" , "" , "identify secure connections to the proxy using this TLS certificate file" )
cmd . Flags ( ) . StringVar ( & grpcProxyListenKey , "key-file" , "" , "identify secure connections to the proxy using this TLS key file" )
cmd . Flags ( ) . StringVar ( & grpcProxyListenCA , "trusted-ca-file" , "" , "verify certificates of TLS-enabled secure proxy using this CA bundle" )
2016-06-29 08:25:10 +03:00
return & cmd
}
func startGRPCProxy ( cmd * cobra . Command , args [ ] string ) {
2017-07-18 01:36:19 +03:00
checkArgs ( )
tlsinfo := newTLS ( grpcProxyListenCA , grpcProxyListenCert , grpcProxyListenKey )
if tlsinfo != nil {
plog . Infof ( "ServerTLS: %s" , tlsinfo )
}
m := mustListenCMux ( tlsinfo )
grpcl := m . Match ( cmux . HTTP2 ( ) )
defer func ( ) {
grpcl . Close ( )
plog . Infof ( "stopping listening for grpc-proxy client requests on %s" , grpcProxyListenAddr )
} ( )
client := mustNewClient ( )
srvhttp , httpl := mustHTTPListener ( m , tlsinfo )
errc := make ( chan error )
go func ( ) { errc <- newGRPCProxyServer ( client ) . Serve ( grpcl ) } ( )
go func ( ) { errc <- srvhttp . Serve ( httpl ) } ( )
go func ( ) { errc <- m . Serve ( ) } ( )
if len ( grpcProxyMetricsListenAddr ) > 0 {
mhttpl := mustMetricsListener ( tlsinfo )
go func ( ) {
mux := http . NewServeMux ( )
mux . Handle ( "/metrics" , prometheus . Handler ( ) )
plog . Fatal ( http . Serve ( mhttpl , mux ) )
} ( )
}
// grpc-proxy is initialized, ready to serve
notifySystemd ( )
fmt . Fprintln ( os . Stderr , <- errc )
os . Exit ( 1 )
}
func checkArgs ( ) {
2017-02-28 04:09:36 +03:00
if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 {
fmt . Fprintln ( os . Stderr , fmt . Errorf ( "invalid resolver-ttl %d" , grpcProxyResolverTTL ) )
os . Exit ( 1 )
}
if grpcProxyResolverPrefix == "" && grpcProxyResolverTTL > 0 {
fmt . Fprintln ( os . Stderr , fmt . Errorf ( "invalid resolver-prefix %q" , grpcProxyResolverPrefix ) )
os . Exit ( 1 )
}
if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL > 0 && grpcProxyAdvertiseClientURL == "" {
fmt . Fprintln ( os . Stderr , fmt . Errorf ( "invalid advertise-client-url %q" , grpcProxyAdvertiseClientURL ) )
os . Exit ( 1 )
}
2017-07-18 01:36:19 +03:00
}
2017-02-28 04:09:36 +03:00
2017-07-18 01:36:19 +03:00
func mustNewClient ( ) * clientv3 . Client {
2017-05-05 07:57:54 +03:00
srvs := discoverEndpoints ( grpcProxyDNSCluster , grpcProxyCA , grpcProxyInsecureDiscovery )
2017-07-18 01:36:19 +03:00
eps := srvs . Endpoints
if len ( eps ) == 0 {
eps = grpcProxyEndpoints
2017-04-06 01:25:22 +03:00
}
2017-07-18 01:36:19 +03:00
cfg , err := newClientCfg ( eps )
2016-06-29 08:25:10 +03:00
if err != nil {
fmt . Fprintln ( os . Stderr , err )
os . Exit ( 1 )
}
2017-07-18 01:36:19 +03:00
client , err := clientv3 . New ( * cfg )
if err != nil {
2016-11-19 03:34:54 +03:00
fmt . Fprintln ( os . Stderr , err )
os . Exit ( 1 )
}
2017-07-18 01:36:19 +03:00
return client
}
// newClientCfg returns a clientv3 configuration for the given endpoints with
// a 5 second dial timeout. TLS is configured when any of the etcd-facing TLS
// flags (CA, cert, key) is set; an error from building the TLS client
// configuration is returned to the caller.
func newClientCfg(eps []string) (*clientv3.Config, error) {
	cfg := &clientv3.Config{
		Endpoints:   eps,
		DialTimeout: 5 * time.Second,
	}

	// set tls if any one tls option set
	tlsinfo := newTLS(grpcProxyCA, grpcProxyCert, grpcProxyKey)
	if tlsinfo == nil {
		// TODO: support insecure tls
		return cfg, nil
	}

	clientTLS, err := tlsinfo.ClientConfig()
	if err != nil {
		return nil, err
	}
	cfg.TLS = clientTLS
	plog.Infof("ClientTLS: %s", tlsinfo)

	// TODO: support insecure tls
	return cfg, nil
}
2016-06-29 08:25:10 +03:00
2017-07-18 01:36:19 +03:00
// newTLS returns a TLSInfo holding the given CA, certificate and key file
// paths, or nil when all three are empty.
func newTLS(ca, cert, key string) *transport.TLSInfo {
	if ca != "" || cert != "" || key != "" {
		info := new(transport.TLSInfo)
		info.CAFile = ca
		info.CertFile = cert
		info.KeyFile = key
		return info
	}
	return nil
}
func mustListenCMux ( tlsinfo * transport . TLSInfo ) cmux . CMux {
l , err := net . Listen ( "tcp" , grpcProxyListenAddr )
2016-06-29 08:25:10 +03:00
if err != nil {
fmt . Fprintln ( os . Stderr , err )
os . Exit ( 1 )
}
2017-07-18 01:36:19 +03:00
var tlscfg * tls . Config
scheme := "http"
if tlsinfo != nil {
if tlscfg , err = tlsinfo . ServerConfig ( ) ; err != nil {
plog . Fatal ( err )
}
scheme = "https"
}
if l , err = transport . NewKeepAliveListener ( l , scheme , tlscfg ) ; err != nil {
2016-06-29 08:25:10 +03:00
fmt . Fprintln ( os . Stderr , err )
os . Exit ( 1 )
}
2017-07-18 01:36:19 +03:00
plog . Infof ( "listening for grpc-proxy client requests on %s" , grpcProxyListenAddr )
return cmux . New ( l )
}
2016-06-29 08:25:10 +03:00
2017-07-18 01:36:19 +03:00
func newGRPCProxyServer ( client * clientv3 . Client ) * grpc . Server {
2017-03-20 05:30:21 +03:00
if len ( grpcProxyNamespace ) > 0 {
client . KV = namespace . NewKV ( client . KV , grpcProxyNamespace )
client . Watcher = namespace . NewWatcher ( client . Watcher , grpcProxyNamespace )
client . Lease = namespace . NewLease ( client . Lease , grpcProxyNamespace )
}
2017-01-14 03:20:09 +03:00
kvp , _ := grpcproxy . NewKvProxy ( client )
2016-12-30 22:24:23 +03:00
watchp , _ := grpcproxy . NewWatchProxy ( client )
2017-02-28 04:09:36 +03:00
if grpcProxyResolverPrefix != "" {
grpcproxy . Register ( client , grpcProxyResolverPrefix , grpcProxyAdvertiseClientURL , grpcProxyResolverTTL )
}
clusterp , _ := grpcproxy . NewClusterProxy ( client , grpcProxyAdvertiseClientURL , grpcProxyResolverPrefix )
2017-02-14 00:38:46 +03:00
leasep , _ := grpcproxy . NewLeaseProxy ( client )
2016-07-18 23:26:27 +03:00
mainp := grpcproxy . NewMaintenanceProxy ( client )
2016-07-19 00:05:41 +03:00
authp := grpcproxy . NewAuthProxy ( client )
2017-05-10 22:19:09 +03:00
electionp := grpcproxy . NewElectionProxy ( client )
lockp := grpcproxy . NewLockProxy ( client )
2016-06-29 08:25:10 +03:00
2016-11-19 03:34:54 +03:00
server := grpc . NewServer (
grpc . StreamInterceptor ( grpc_prometheus . StreamServerInterceptor ) ,
grpc . UnaryInterceptor ( grpc_prometheus . UnaryServerInterceptor ) ,
2017-07-18 01:36:19 +03:00
grpc . MaxConcurrentStreams ( math . MaxUint32 ) ,
2016-11-19 03:34:54 +03:00
)
2017-07-18 01:36:19 +03:00
2016-06-29 08:25:10 +03:00
pb . RegisterKVServer ( server , kvp )
2016-06-29 07:16:52 +03:00
pb . RegisterWatchServer ( server , watchp )
2016-07-16 22:15:24 +03:00
pb . RegisterClusterServer ( server , clusterp )
2016-07-16 20:00:14 +03:00
pb . RegisterLeaseServer ( server , leasep )
2016-07-18 23:26:27 +03:00
pb . RegisterMaintenanceServer ( server , mainp )
2016-07-19 00:05:41 +03:00
pb . RegisterAuthServer ( server , authp )
2017-05-10 22:19:09 +03:00
v3electionpb . RegisterElectionServer ( server , electionp )
v3lockpb . RegisterLockServer ( server , lockp )
2017-07-18 01:36:19 +03:00
return server
}
2016-11-19 03:34:54 +03:00
2017-07-18 01:36:19 +03:00
func mustHTTPListener ( m cmux . CMux , tlsinfo * transport . TLSInfo ) ( * http . Server , net . Listener ) {
2016-11-19 03:34:54 +03:00
httpmux := http . NewServeMux ( )
httpmux . HandleFunc ( "/" , http . NotFound )
httpmux . Handle ( "/metrics" , prometheus . Handler ( ) )
2017-04-04 01:15:47 +03:00
if grpcProxyEnablePprof {
for p , h := range debugutil . PProfHandlers ( ) {
httpmux . Handle ( p , h )
}
plog . Infof ( "pprof is enabled under %s" , debugutil . HTTPPrefixPProf )
}
2017-07-18 01:36:19 +03:00
srvhttp := & http . Server { Handler : httpmux }
2017-04-04 01:15:47 +03:00
2017-07-18 01:36:19 +03:00
if tlsinfo == nil {
return srvhttp , m . Match ( cmux . HTTP1 ( ) )
2016-11-21 21:39:34 +03:00
}
2017-07-12 20:06:58 +03:00
2017-07-18 01:36:19 +03:00
srvTLS , err := tlsinfo . ServerConfig ( )
if err != nil {
plog . Fatalf ( "could not setup TLS (%v)" , err )
2017-07-12 20:06:58 +03:00
}
2017-07-18 01:36:19 +03:00
srvhttp . TLSConfig = srvTLS
return srvhttp , m . Match ( cmux . Any ( ) )
2016-06-29 08:25:10 +03:00
}
2017-07-18 01:36:19 +03:00
func mustMetricsListener ( tlsinfo * transport . TLSInfo ) net . Listener {
murl , err := url . Parse ( grpcProxyMetricsListenAddr )
if err != nil {
fmt . Fprintf ( os . Stderr , "cannot parse %q" , grpcProxyMetricsListenAddr )
os . Exit ( 1 )
2016-06-29 08:25:10 +03:00
}
2017-07-18 01:36:19 +03:00
ml , err := transport . NewListener ( murl . Host , murl . Scheme , tlsinfo )
if err != nil {
fmt . Fprintln ( os . Stderr , err )
os . Exit ( 1 )
2016-06-29 08:25:10 +03:00
}
2017-07-18 01:36:19 +03:00
plog . Info ( "grpc-proxy: listening for metrics on " , murl . String ( ) )
return ml
2016-06-29 08:25:10 +03:00
}