2016-06-29 08:25:10 +03:00
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdmain
import (
2016-11-21 21:39:34 +03:00
"crypto/tls"
2016-06-29 08:25:10 +03:00
"fmt"
"net"
2016-11-19 03:34:54 +03:00
"net/http"
2017-07-12 20:06:58 +03:00
"net/url"
2016-06-29 08:25:10 +03:00
"os"
"time"
"github.com/coreos/etcd/clientv3"
2017-03-20 05:30:21 +03:00
"github.com/coreos/etcd/clientv3/namespace"
2017-05-10 22:19:09 +03:00
"github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
"github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
2016-06-29 08:25:10 +03:00
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
2017-04-04 01:15:47 +03:00
"github.com/coreos/etcd/pkg/debugutil"
2016-06-29 08:25:10 +03:00
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/proxy/grpcproxy"
2016-11-19 03:34:54 +03:00
"github.com/cockroachdb/cmux"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
2017-02-28 04:09:36 +03:00
"github.com/spf13/cobra"
"google.golang.org/grpc"
2016-06-29 08:25:10 +03:00
)
var (
2017-04-06 01:25:22 +03:00
grpcProxyListenAddr string
2017-07-12 20:06:58 +03:00
grpcProxyMetricsListenAddr string
2017-04-06 01:25:22 +03:00
grpcProxyEndpoints [ ] string
grpcProxyDNSCluster string
grpcProxyInsecureDiscovery bool
grpcProxyCert string
grpcProxyKey string
grpcProxyCA string
2017-03-20 05:30:21 +03:00
2017-02-28 04:09:36 +03:00
grpcProxyAdvertiseClientURL string
grpcProxyResolverPrefix string
grpcProxyResolverTTL int
2017-03-20 05:30:21 +03:00
grpcProxyNamespace string
2017-04-04 01:15:47 +03:00
grpcProxyEnablePprof bool
2016-06-29 08:25:10 +03:00
)
func init ( ) {
rootCmd . AddCommand ( newGRPCProxyCommand ( ) )
}
// newGRPCProxyCommand returns the cobra command for "grpc-proxy".
func newGRPCProxyCommand ( ) * cobra . Command {
lpc := & cobra . Command {
Use : "grpc-proxy <subcommand>" ,
Short : "grpc-proxy related command" ,
}
lpc . AddCommand ( newGRPCProxyStartCommand ( ) )
return lpc
}
func newGRPCProxyStartCommand ( ) * cobra . Command {
cmd := cobra . Command {
Use : "start" ,
Short : "start the grpc proxy" ,
Run : startGRPCProxy ,
}
cmd . Flags ( ) . StringVar ( & grpcProxyListenAddr , "listen-addr" , "127.0.0.1:23790" , "listen address" )
2017-04-06 01:25:22 +03:00
cmd . Flags ( ) . StringVar ( & grpcProxyDNSCluster , "discovery-srv" , "" , "DNS domain used to bootstrap initial cluster" )
2017-07-12 20:06:58 +03:00
cmd . Flags ( ) . StringVar ( & grpcProxyMetricsListenAddr , "metrics-addr" , "" , "listen for /metrics requests on an additional interface" )
2017-04-06 01:25:22 +03:00
cmd . Flags ( ) . BoolVar ( & grpcProxyInsecureDiscovery , "insecure-discovery" , false , "accept insecure SRV records" )
2016-06-29 08:25:10 +03:00
cmd . Flags ( ) . StringSliceVar ( & grpcProxyEndpoints , "endpoints" , [ ] string { "127.0.0.1:2379" } , "comma separated etcd cluster endpoints" )
cmd . Flags ( ) . StringVar ( & grpcProxyCert , "cert" , "" , "identify secure connections with etcd servers using this TLS certificate file" )
cmd . Flags ( ) . StringVar ( & grpcProxyKey , "key" , "" , "identify secure connections with etcd servers using this TLS key file" )
cmd . Flags ( ) . StringVar ( & grpcProxyCA , "cacert" , "" , "verify certificates of TLS-enabled secure etcd servers using this CA bundle" )
2017-02-28 04:09:36 +03:00
cmd . Flags ( ) . StringVar ( & grpcProxyAdvertiseClientURL , "advertise-client-url" , "127.0.0.1:23790" , "advertise address to register (must be reachable by client)" )
cmd . Flags ( ) . StringVar ( & grpcProxyResolverPrefix , "resolver-prefix" , "" , "prefix to use for registering proxy (must be shared with other grpc-proxy members)" )
cmd . Flags ( ) . IntVar ( & grpcProxyResolverTTL , "resolver-ttl" , 0 , "specify TTL, in seconds, when registering proxy endpoints" )
2017-03-20 05:30:21 +03:00
cmd . Flags ( ) . StringVar ( & grpcProxyNamespace , "namespace" , "" , "string to prefix to all keys for namespacing requests" )
2017-04-04 01:15:47 +03:00
cmd . Flags ( ) . BoolVar ( & grpcProxyEnablePprof , "enable-pprof" , false , ` Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/" ` )
2016-06-29 08:25:10 +03:00
return & cmd
}
func startGRPCProxy ( cmd * cobra . Command , args [ ] string ) {
2017-02-28 04:09:36 +03:00
if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 {
fmt . Fprintln ( os . Stderr , fmt . Errorf ( "invalid resolver-ttl %d" , grpcProxyResolverTTL ) )
os . Exit ( 1 )
}
if grpcProxyResolverPrefix == "" && grpcProxyResolverTTL > 0 {
fmt . Fprintln ( os . Stderr , fmt . Errorf ( "invalid resolver-prefix %q" , grpcProxyResolverPrefix ) )
os . Exit ( 1 )
}
if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL > 0 && grpcProxyAdvertiseClientURL == "" {
fmt . Fprintln ( os . Stderr , fmt . Errorf ( "invalid advertise-client-url %q" , grpcProxyAdvertiseClientURL ) )
os . Exit ( 1 )
}
2017-05-05 07:57:54 +03:00
srvs := discoverEndpoints ( grpcProxyDNSCluster , grpcProxyCA , grpcProxyInsecureDiscovery )
if len ( srvs . Endpoints ) != 0 {
grpcProxyEndpoints = srvs . Endpoints
2017-04-06 01:25:22 +03:00
}
2016-06-29 08:25:10 +03:00
l , err := net . Listen ( "tcp" , grpcProxyListenAddr )
if err != nil {
fmt . Fprintln ( os . Stderr , err )
os . Exit ( 1 )
}
2016-11-19 03:34:54 +03:00
if l , err = transport . NewKeepAliveListener ( l , "tcp" , nil ) ; err != nil {
fmt . Fprintln ( os . Stderr , err )
os . Exit ( 1 )
}
plog . Infof ( "listening for grpc-proxy client requests on %s" , grpcProxyListenAddr )
defer func ( ) {
l . Close ( )
plog . Infof ( "stopping listening for grpc-proxy client requests on %s" , grpcProxyListenAddr )
} ( )
m := cmux . New ( l )
2016-06-29 08:25:10 +03:00
2017-07-12 20:06:58 +03:00
cfg , cfgtls , err := newClientCfg ( )
2016-06-29 08:25:10 +03:00
if err != nil {
fmt . Fprintln ( os . Stderr , err )
os . Exit ( 1 )
}
client , err := clientv3 . New ( * cfg )
if err != nil {
fmt . Fprintln ( os . Stderr , err )
os . Exit ( 1 )
}
2017-03-20 05:30:21 +03:00
if len ( grpcProxyNamespace ) > 0 {
client . KV = namespace . NewKV ( client . KV , grpcProxyNamespace )
client . Watcher = namespace . NewWatcher ( client . Watcher , grpcProxyNamespace )
client . Lease = namespace . NewLease ( client . Lease , grpcProxyNamespace )
}
2017-01-14 03:20:09 +03:00
kvp , _ := grpcproxy . NewKvProxy ( client )
2016-12-30 22:24:23 +03:00
watchp , _ := grpcproxy . NewWatchProxy ( client )
2017-02-28 04:09:36 +03:00
if grpcProxyResolverPrefix != "" {
grpcproxy . Register ( client , grpcProxyResolverPrefix , grpcProxyAdvertiseClientURL , grpcProxyResolverTTL )
}
clusterp , _ := grpcproxy . NewClusterProxy ( client , grpcProxyAdvertiseClientURL , grpcProxyResolverPrefix )
2017-02-14 00:38:46 +03:00
leasep , _ := grpcproxy . NewLeaseProxy ( client )
2016-07-18 23:26:27 +03:00
mainp := grpcproxy . NewMaintenanceProxy ( client )
2016-07-19 00:05:41 +03:00
authp := grpcproxy . NewAuthProxy ( client )
2017-05-10 22:19:09 +03:00
electionp := grpcproxy . NewElectionProxy ( client )
lockp := grpcproxy . NewLockProxy ( client )
2016-06-29 08:25:10 +03:00
2016-11-19 03:34:54 +03:00
server := grpc . NewServer (
grpc . StreamInterceptor ( grpc_prometheus . StreamServerInterceptor ) ,
grpc . UnaryInterceptor ( grpc_prometheus . UnaryServerInterceptor ) ,
)
2016-06-29 08:25:10 +03:00
pb . RegisterKVServer ( server , kvp )
2016-06-29 07:16:52 +03:00
pb . RegisterWatchServer ( server , watchp )
2016-07-16 22:15:24 +03:00
pb . RegisterClusterServer ( server , clusterp )
2016-07-16 20:00:14 +03:00
pb . RegisterLeaseServer ( server , leasep )
2016-07-18 23:26:27 +03:00
pb . RegisterMaintenanceServer ( server , mainp )
2016-07-19 00:05:41 +03:00
pb . RegisterAuthServer ( server , authp )
2017-05-10 22:19:09 +03:00
v3electionpb . RegisterElectionServer ( server , electionp )
v3lockpb . RegisterLockServer ( server , lockp )
2016-11-19 03:34:54 +03:00
errc := make ( chan error )
grpcl := m . Match ( cmux . HTTP2 ( ) )
go func ( ) { errc <- server . Serve ( grpcl ) } ( )
httpmux := http . NewServeMux ( )
httpmux . HandleFunc ( "/" , http . NotFound )
httpmux . Handle ( "/metrics" , prometheus . Handler ( ) )
2017-04-04 01:15:47 +03:00
if grpcProxyEnablePprof {
for p , h := range debugutil . PProfHandlers ( ) {
httpmux . Handle ( p , h )
}
plog . Infof ( "pprof is enabled under %s" , debugutil . HTTPPrefixPProf )
}
2016-11-19 03:34:54 +03:00
srvhttp := & http . Server {
Handler : httpmux ,
}
2016-11-21 21:39:34 +03:00
var httpl net . Listener
if cfg . TLS != nil {
srvhttp . TLSConfig = cfg . TLS
httpl = tls . NewListener ( m . Match ( cmux . Any ( ) ) , cfg . TLS )
} else {
httpl = m . Match ( cmux . HTTP1 ( ) )
}
2016-11-19 03:34:54 +03:00
go func ( ) { errc <- srvhttp . Serve ( httpl ) } ( )
go func ( ) { errc <- m . Serve ( ) } ( )
2016-06-29 08:25:10 +03:00
2017-07-12 20:06:58 +03:00
if len ( grpcProxyMetricsListenAddr ) > 0 {
murl , err := url . Parse ( grpcProxyMetricsListenAddr )
if err != nil {
fmt . Fprintf ( os . Stderr , "cannot parse %q" , grpcProxyMetricsListenAddr )
os . Exit ( 1 )
}
ml , err := transport . NewListener ( murl . Host , murl . Scheme , cfgtls )
if err != nil {
fmt . Fprintln ( os . Stderr , err )
os . Exit ( 1 )
}
mux := http . NewServeMux ( )
mux . Handle ( "/metrics" , prometheus . Handler ( ) )
go func ( ) {
plog . Info ( "grpc-proxy: listening for metrics on " , murl . String ( ) )
plog . Fatal ( http . Serve ( ml , mux ) )
} ( )
}
2017-03-09 20:11:43 +03:00
// grpc-proxy is initialized, ready to serve
notifySystemd ( )
2016-11-19 03:34:54 +03:00
fmt . Fprintln ( os . Stderr , <- errc )
os . Exit ( 1 )
2016-06-29 08:25:10 +03:00
}
2017-07-12 20:06:58 +03:00
func newClientCfg ( ) ( * clientv3 . Config , * transport . TLSInfo , error ) {
2016-06-29 08:25:10 +03:00
// set tls if any one tls option set
var cfgtls * transport . TLSInfo
tlsinfo := transport . TLSInfo { }
if grpcProxyCert != "" {
tlsinfo . CertFile = grpcProxyCert
cfgtls = & tlsinfo
}
if grpcProxyKey != "" {
tlsinfo . KeyFile = grpcProxyKey
cfgtls = & tlsinfo
}
if grpcProxyCA != "" {
tlsinfo . CAFile = grpcProxyCA
cfgtls = & tlsinfo
}
cfg := clientv3 . Config {
Endpoints : grpcProxyEndpoints ,
DialTimeout : 5 * time . Second ,
}
if cfgtls != nil {
clientTLS , err := cfgtls . ClientConfig ( )
if err != nil {
2017-07-12 20:06:58 +03:00
return nil , nil , err
2016-06-29 08:25:10 +03:00
}
cfg . TLS = clientTLS
}
// TODO: support insecure tls
2017-07-12 20:06:58 +03:00
return & cfg , cfgtls , nil
2016-06-29 08:25:10 +03:00
}