*: Change gRPC proxy to expose etcd server endpoint /metrics
This PR resolves an issue where the `/metrics` endpoints exposed by the proxy were not returning metrics of the etcd members servers but of the proxy itself. Signed-off-by: Sam Batschelet <sbatsche@redhat.com>release-3.4
parent
cc08c1bd2e
commit
9915d02022
|
@ -225,3 +225,28 @@ Finally, test the TLS termination by putting a key into the proxy over http:
|
||||||
$ ETCDCTL_API=3 etcdctl --endpoints=http://localhost:12379 put abc def
|
$ ETCDCTL_API=3 etcdctl --endpoints=http://localhost:12379 put abc def
|
||||||
# OK
|
# OK
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## Metrics and Health
|
||||||
|
|
||||||
|
The gRPC proxy exposes `/health` and Prometheus `/metrics` endpoints for the etcd members defined by `--endpoints`. An alternative define an additional URL that will respond to both the `/metrics` and `/health` endpoints with the `--metrics-addr` flag.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
$ etcd grpc-proxy start \
|
||||||
|
--endpoints https://localhost:2379 \
|
||||||
|
--metrics-addr https://0.0.0.0:4443 \
|
||||||
|
--listen-addr 127.0.0.1:23790 \
|
||||||
|
--key client.key \
|
||||||
|
--key-file proxy-server.key \
|
||||||
|
--cert client.crt \
|
||||||
|
--cert-file proxy-server.crt \
|
||||||
|
--cacert ca.pem \
|
||||||
|
--trusted-ca-file proxy-ca.pem
|
||||||
|
```
|
||||||
|
|
||||||
|
### Known issue
|
||||||
|
|
||||||
|
The main interface of the proxy serves both HTTP2 and HTTP/1.1. If proxy is setup with TLS as show in the above example, when using a client such as cURL against the listening interface will require explicitly setting the protocol to HTTP/1.1 on the request to return `/metrics` or `/health`. By using the `--metrics-addr` flag the secondary interface will not have this requirement.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
$ curl --cacert proxy-ca.pem --key proxy-client.key --cert proxy-client.crt https://127.0.0.1:23790/metrics --http1.1
|
||||||
|
```
|
||||||
|
|
|
@ -16,6 +16,8 @@ package etcdmain
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
|
@ -31,7 +33,6 @@ import (
|
||||||
"go.etcd.io/etcd/clientv3/leasing"
|
"go.etcd.io/etcd/clientv3/leasing"
|
||||||
"go.etcd.io/etcd/clientv3/namespace"
|
"go.etcd.io/etcd/clientv3/namespace"
|
||||||
"go.etcd.io/etcd/clientv3/ordering"
|
"go.etcd.io/etcd/clientv3/ordering"
|
||||||
"go.etcd.io/etcd/etcdserver/api/etcdhttp"
|
|
||||||
"go.etcd.io/etcd/etcdserver/api/v3election/v3electionpb"
|
"go.etcd.io/etcd/etcdserver/api/v3election/v3electionpb"
|
||||||
"go.etcd.io/etcd/etcdserver/api/v3lock/v3lockpb"
|
"go.etcd.io/etcd/etcdserver/api/v3lock/v3lockpb"
|
||||||
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
|
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
|
||||||
|
@ -114,7 +115,7 @@ func newGRPCProxyStartCommand() *cobra.Command {
|
||||||
cmd.Flags().StringVar(&grpcProxyListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
|
cmd.Flags().StringVar(&grpcProxyListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
|
||||||
cmd.Flags().StringVar(&grpcProxyDNSCluster, "discovery-srv", "", "domain name to query for SRV records describing cluster endpoints")
|
cmd.Flags().StringVar(&grpcProxyDNSCluster, "discovery-srv", "", "domain name to query for SRV records describing cluster endpoints")
|
||||||
cmd.Flags().StringVar(&grpcProxyDNSClusterServiceName, "discovery-srv-name", "", "service name to query when using DNS discovery")
|
cmd.Flags().StringVar(&grpcProxyDNSClusterServiceName, "discovery-srv-name", "", "service name to query when using DNS discovery")
|
||||||
cmd.Flags().StringVar(&grpcProxyMetricsListenAddr, "metrics-addr", "", "listen for /metrics requests on an additional interface")
|
cmd.Flags().StringVar(&grpcProxyMetricsListenAddr, "metrics-addr", "", "listen for endpoint /metrics requests on an additional interface")
|
||||||
cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
|
cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
|
||||||
cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
|
cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
|
||||||
cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)")
|
cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)")
|
||||||
|
@ -184,7 +185,6 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
|
||||||
lg.Info("gRPC proxy server TLS", zap.String("tls-info", fmt.Sprintf("%+v", tlsinfo)))
|
lg.Info("gRPC proxy server TLS", zap.String("tls-info", fmt.Sprintf("%+v", tlsinfo)))
|
||||||
}
|
}
|
||||||
m := mustListenCMux(lg, tlsinfo)
|
m := mustListenCMux(lg, tlsinfo)
|
||||||
|
|
||||||
grpcl := m.Match(cmux.HTTP2())
|
grpcl := m.Match(cmux.HTTP2())
|
||||||
defer func() {
|
defer func() {
|
||||||
grpcl.Close()
|
grpcl.Close()
|
||||||
|
@ -192,6 +192,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
client := mustNewClient(lg)
|
client := mustNewClient(lg)
|
||||||
|
httpClient := mustNewHTTPClient(lg)
|
||||||
|
|
||||||
srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client)
|
srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client)
|
||||||
errc := make(chan error)
|
errc := make(chan error)
|
||||||
|
@ -202,7 +203,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
|
||||||
mhttpl := mustMetricsListener(lg, tlsinfo)
|
mhttpl := mustMetricsListener(lg, tlsinfo)
|
||||||
go func() {
|
go func() {
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
etcdhttp.HandlePrometheus(mux)
|
grpcproxy.HandleMetrics(mux, httpClient, client.Endpoints())
|
||||||
grpcproxy.HandleHealth(mux, client)
|
grpcproxy.HandleHealth(mux, client)
|
||||||
lg.Info("gRPC proxy server metrics URL serving")
|
lg.Info("gRPC proxy server metrics URL serving")
|
||||||
herr := http.Serve(mhttpl, mux)
|
herr := http.Serve(mhttpl, mux)
|
||||||
|
@ -372,16 +373,14 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
|
||||||
v3electionpb.RegisterElectionServer(server, electionp)
|
v3electionpb.RegisterElectionServer(server, electionp)
|
||||||
v3lockpb.RegisterLockServer(server, lockp)
|
v3lockpb.RegisterLockServer(server, lockp)
|
||||||
|
|
||||||
// set zero values for metrics registered for this grpc server
|
|
||||||
grpc_prometheus.Register(server)
|
|
||||||
|
|
||||||
return server
|
return server
|
||||||
}
|
}
|
||||||
|
|
||||||
func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Client) (*http.Server, net.Listener) {
|
func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Client) (*http.Server, net.Listener) {
|
||||||
|
httpClient := mustNewHTTPClient(lg)
|
||||||
httpmux := http.NewServeMux()
|
httpmux := http.NewServeMux()
|
||||||
httpmux.HandleFunc("/", http.NotFound)
|
httpmux.HandleFunc("/", http.NotFound)
|
||||||
etcdhttp.HandlePrometheus(httpmux)
|
grpcproxy.HandleMetrics(httpmux, httpClient, c.Endpoints())
|
||||||
grpcproxy.HandleHealth(httpmux, c)
|
grpcproxy.HandleHealth(httpmux, c)
|
||||||
if grpcProxyEnablePprof {
|
if grpcProxyEnablePprof {
|
||||||
for p, h := range debugutil.PProfHandlers() {
|
for p, h := range debugutil.PProfHandlers() {
|
||||||
|
@ -406,6 +405,43 @@ func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c
|
||||||
return srvhttp, m.Match(cmux.Any())
|
return srvhttp, m.Match(cmux.Any())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func mustNewHTTPClient(lg *zap.Logger) *http.Client {
|
||||||
|
transport, err := newHTTPTransport(grpcProxyCA, grpcProxyCert, grpcProxyKey)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintln(os.Stderr, err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
return &http.Client{Transport: transport}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newHTTPTransport(ca, cert, key string) (*http.Transport, error) {
|
||||||
|
tr := &http.Transport{}
|
||||||
|
|
||||||
|
if ca != "" && cert != "" && key != "" {
|
||||||
|
caCert, err := ioutil.ReadFile(ca)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
keyPair, err := tls.LoadX509KeyPair(cert, key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
caPool := x509.NewCertPool()
|
||||||
|
caPool.AppendCertsFromPEM(caCert)
|
||||||
|
|
||||||
|
tlsConfig := &tls.Config{
|
||||||
|
Certificates: []tls.Certificate{keyPair},
|
||||||
|
RootCAs: caPool,
|
||||||
|
}
|
||||||
|
tlsConfig.BuildNameToCertificate()
|
||||||
|
tr.TLSClientConfig = tlsConfig
|
||||||
|
} else if grpcProxyInsecureSkipTLSVerify {
|
||||||
|
tlsConfig := &tls.Config{InsecureSkipVerify: grpcProxyInsecureSkipTLSVerify}
|
||||||
|
tr.TLSClientConfig = tlsConfig
|
||||||
|
}
|
||||||
|
return tr, nil
|
||||||
|
}
|
||||||
|
|
||||||
func mustMetricsListener(lg *zap.Logger, tlsinfo *transport.TLSInfo) net.Listener {
|
func mustMetricsListener(lg *zap.Logger, tlsinfo *transport.TLSInfo) net.Listener {
|
||||||
murl, err := url.Parse(grpcProxyMetricsListenAddr)
|
murl, err := url.Parse(grpcProxyMetricsListenAddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -29,19 +29,19 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
pathMetrics = "/metrics"
|
PathMetrics = "/metrics"
|
||||||
PathHealth = "/health"
|
PathHealth = "/health"
|
||||||
)
|
)
|
||||||
|
|
||||||
// HandleMetricsHealth registers metrics and health handlers.
|
// HandleMetricsHealth registers metrics and health handlers.
|
||||||
func HandleMetricsHealth(mux *http.ServeMux, srv etcdserver.ServerV2) {
|
func HandleMetricsHealth(mux *http.ServeMux, srv etcdserver.ServerV2) {
|
||||||
mux.Handle(pathMetrics, promhttp.Handler())
|
mux.Handle(PathMetrics, promhttp.Handler())
|
||||||
mux.Handle(PathHealth, NewHealthHandler(func() Health { return checkHealth(srv) }))
|
mux.Handle(PathHealth, NewHealthHandler(func() Health { return checkHealth(srv) }))
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandlePrometheus registers prometheus handler on '/metrics'.
|
// HandlePrometheus registers prometheus handler on '/metrics'.
|
||||||
func HandlePrometheus(mux *http.ServeMux) {
|
func HandlePrometheus(mux *http.ServeMux) {
|
||||||
mux.Handle(pathMetrics, promhttp.Handler())
|
mux.Handle(PathMetrics, promhttp.Handler())
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHealthHandler handles '/health' requests.
|
// NewHealthHandler handles '/health' requests.
|
||||||
|
|
|
@ -14,7 +14,17 @@
|
||||||
|
|
||||||
package grpcproxy
|
package grpcproxy
|
||||||
|
|
||||||
import "github.com/prometheus/client_golang/prometheus"
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"math/rand"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"go.etcd.io/etcd/etcdserver/api/etcdhttp"
|
||||||
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
watchersCoalescing = prometheus.NewGauge(prometheus.GaugeOpts{
|
watchersCoalescing = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
|
@ -56,3 +66,49 @@ func init() {
|
||||||
prometheus.MustRegister(cacheHits)
|
prometheus.MustRegister(cacheHits)
|
||||||
prometheus.MustRegister(cachedMisses)
|
prometheus.MustRegister(cachedMisses)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HandleMetrics performs a GET request against etcd endpoint and returns '/metrics'.
|
||||||
|
func HandleMetrics(mux *http.ServeMux, c *http.Client, eps []string) {
|
||||||
|
// random shuffle endpoints
|
||||||
|
r := rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
|
||||||
|
if len(eps) > 1 {
|
||||||
|
eps = shuffleEndpoints(r, eps)
|
||||||
|
}
|
||||||
|
|
||||||
|
pathMetrics := etcdhttp.PathMetrics
|
||||||
|
mux.HandleFunc(pathMetrics, func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
target := fmt.Sprintf("%s%s", eps[0], pathMetrics)
|
||||||
|
if !strings.HasPrefix(target, "http") {
|
||||||
|
scheme := "http"
|
||||||
|
if r.TLS != nil {
|
||||||
|
scheme = "https"
|
||||||
|
}
|
||||||
|
target = fmt.Sprintf("%s://%s", scheme, target)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := c.Get(target)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, "Internal server error", http.StatusInternalServerError)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
w.Header().Set("Content-Type", "text/plain; version=0.0.4")
|
||||||
|
body, _ := ioutil.ReadAll(resp.Body)
|
||||||
|
fmt.Fprintf(w, "%s", body)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func shuffleEndpoints(r *rand.Rand, eps []string) []string {
|
||||||
|
// copied from Go 1.9<= rand.Rand.Perm
|
||||||
|
n := len(eps)
|
||||||
|
p := make([]int, n)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
j := r.Intn(i + 1)
|
||||||
|
p[i] = p[j]
|
||||||
|
p[j] = i
|
||||||
|
}
|
||||||
|
neps := make([]string, n)
|
||||||
|
for i, k := range p {
|
||||||
|
neps[i] = eps[k]
|
||||||
|
}
|
||||||
|
return neps
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue