Compare commits

26 commits (SHA1):

- 420a452267
- 348edfeae6
- 0d5497a107
- 87418c3432
- 8c9fd1b5e6
- a3c0a99067
- b3ab14ca9a
- 8798c5cd43
- 4e08898571
- 8ac6c888cd
- aca5c8f4b6
- 3535f7a61f
- fae9b6f667
- 66d8194e4d
- 2f0e3fd2df
- cad3cf7b11
- bedba66c69
- 9bc1e15386
- 6e0131e83b
- c0e9e14248
- b763b506ab
- d22ee8423d
- e5531a4d54
- 8dabfe12ca
- 360484a3f0
- f8af50a8d8
.travis.yml
@@ -14,20 +14,20 @@ notifications:
env:
matrix:
- TARGET=linux-amd64-build
- TARGET=linux-amd64-unit
- TARGET=linux-amd64-integration
- TARGET=linux-amd64-functional
- TARGET=linux-386-build
- TARGET=linux-amd64-unit
- TARGET=all-build
- TARGET=linux-386-unit
- TARGET=darwin-amd64-build
- TARGET=windows-amd64-build
- TARGET=linux-arm-build
- TARGET=linux-arm64-build
- TARGET=linux-ppc64le-build

matrix:
fast_finish: true
allow_failures:
- go: 1.8.7
env: TARGET=linux-386-unit
exclude:
- go: tip
env: TARGET=linux-386-unit

before_install:
- if [[ $TRAVIS_GO_VERSION == 1.* ]]; then docker pull gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION}; fi

@@ -39,16 +39,6 @@ script:
- echo "TRAVIS_GO_VERSION=${TRAVIS_GO_VERSION}"
- >
case "${TARGET}" in
linux-amd64-build)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GOARCH=amd64 PASSES='build' ./test"
;;
linux-amd64-unit)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GOARCH=amd64 PASSES='unit' ./test"
;;
linux-amd64-integration)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \

@@ -57,41 +47,27 @@ script:
linux-amd64-functional)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "./build && GOARCH=amd64 PASSES='build functional' ./test"
/bin/bash -c "./build && GOARCH=amd64 PASSES='functional' ./test"
;;
linux-386-build)
linux-amd64-unit)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GOARCH=386 PASSES='build' ./test"
/bin/bash -c "GOARCH=amd64 PASSES='unit' ./test"
;;
all-build)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GOARCH=amd64 PASSES='build' ./test \
&& GOARCH=386 PASSES='build' ./test \
&& GO_BUILD_FLAGS='-v' GOOS=darwin GOARCH=amd64 ./build \
&& GO_BUILD_FLAGS='-v' GOOS=windows GOARCH=amd64 ./build \
&& GO_BUILD_FLAGS='-v' GOARCH=arm ./build \
&& GO_BUILD_FLAGS='-v' GOARCH=arm64 ./build \
&& GO_BUILD_FLAGS='-v' GOARCH=ppc64le ./build"
;;
linux-386-unit)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GOARCH=386 PASSES='unit' ./test"
;;
darwin-amd64-build)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GO_BUILD_FLAGS='-v' GOOS=darwin GOARCH=amd64 ./build"
;;
windows-amd64-build)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GO_BUILD_FLAGS='-v' GOOS=windows GOARCH=amd64 ./build"
;;
linux-arm-build)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GO_BUILD_FLAGS='-v' GOARCH=arm ./build"
;;
linux-arm64-build)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GO_BUILD_FLAGS='-v' GOARCH=arm64 ./build"
;;
linux-ppc64le-build)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GO_BUILD_FLAGS='-v' GOARCH=ppc64le ./build"
;;
esac
@@ -310,6 +310,49 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
}
}

// TestLeaseKeepAliveFullResponseQueue ensures when response
// queue is full thus dropping keepalive response sends,
// keepalive request is sent with the same rate of TTL / 3.
func TestLeaseKeepAliveFullResponseQueue(t *testing.T) {
defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

lapi := clus.Client(0)

// expect lease keepalive every 10-second
lresp, err := lapi.Grant(context.Background(), 30)
if err != nil {
t.Fatalf("failed to create lease %v", err)
}
id := lresp.ID

old := clientv3.LeaseResponseChSize
defer func() {
clientv3.LeaseResponseChSize = old
}()
clientv3.LeaseResponseChSize = 0

// never fetch from response queue, and let it become full
_, err = lapi.KeepAlive(context.Background(), id)
if err != nil {
t.Fatalf("failed to keepalive lease %v", err)
}

// TTL should not be refreshed after 3 seconds
// expect keepalive to be triggered after TTL/3
time.Sleep(3 * time.Second)

tr, terr := lapi.TimeToLive(context.Background(), id)
if terr != nil {
t.Fatalf("failed to get lease information %v", terr)
}
if tr.TTL >= 29 {
t.Errorf("unexpected kept-alive lease TTL %d", tr.TTL)
}
}

func TestLeaseGrantNewAfterClose(t *testing.T) {
defer testutil.AfterTest(t)
@@ -71,8 +71,6 @@ const (
// defaultTTL is the assumed lease TTL used for the first keepalive
// deadline before the actual TTL is known to the client.
defaultTTL = 5 * time.Second
// a small buffer to store unsent lease responses.
leaseResponseChSize = 16
// NoLease is a lease ID for the absence of a lease.
NoLease LeaseID = 0

@@ -80,6 +78,11 @@ const (
retryConnWait = 500 * time.Millisecond
)

// LeaseResponseChSize is the size of buffer to store unsent lease responses.
// WARNING: DO NOT UPDATE.
// Only for testing purposes.
var LeaseResponseChSize = 16

// ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error.
//
// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.

@@ -219,7 +222,7 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption
}

func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)
ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)

l.mu.Lock()
// ensure that recvKeepAliveLoop is still running

@@ -475,9 +478,10 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
for _, ch := range ka.chs {
select {
case ch <- karesp:
ka.nextKeepAlive = nextKeepAlive
default:
}
// still advance in order to rate-limit keep-alive sends
ka.nextKeepAlive = nextKeepAlive
}
}
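For reference, the exported LeaseResponseChSize only matters when a caller never drains the channel returned by KeepAlive; the fix above makes the lessor advance nextKeepAlive even when a send is dropped, so keepalives stay rate-limited at roughly TTL/3. Below is a minimal sketch (not part of the diff) of the usual consumer pattern with the clientv3 API; the endpoint address is an assumption for illustration only.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	// Assumed endpoint for illustration only.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// Grant a 30-second lease, then keep it alive.
	lresp, err := cli.Grant(context.Background(), 30)
	if err != nil {
		log.Fatal(err)
	}
	kaCh, err := cli.KeepAlive(context.Background(), lresp.ID)
	if err != nil {
		log.Fatal(err)
	}

	// Drain the keepalive responses so the buffered channel
	// (sized by LeaseResponseChSize) never fills up.
	for ka := range kaCh {
		log.Printf("lease %x refreshed, TTL=%d", ka.ID, ka.TTL)
	}
	log.Println("keepalive channel closed")
}
```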
@@ -17,8 +17,10 @@ package etcdmain
import (
"crypto/tls"
"fmt"
"math"
"net"
"net/http"
"net/url"
"os"
"time"

@@ -40,12 +42,22 @@ import (

var (
grpcProxyListenAddr string
grpcProxyMetricsListenAddr string
grpcProxyEndpoints []string
grpcProxyDNSCluster string
grpcProxyInsecureDiscovery bool
grpcProxyCert string
grpcProxyKey string
grpcProxyCA string

// tls for connecting to etcd

grpcProxyCA string
grpcProxyCert string
grpcProxyKey string

// tls for clients connecting to proxy

grpcProxyListenCA string
grpcProxyListenCert string
grpcProxyListenKey string

grpcProxyAdvertiseClientURL string
grpcProxyResolverPrefix string

@@ -80,21 +92,67 @@ func newGRPCProxyStartCommand() *cobra.Command {

cmd.Flags().StringVar(&grpcProxyListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
cmd.Flags().StringVar(&grpcProxyDNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster")
cmd.Flags().StringVar(&grpcProxyMetricsListenAddr, "metrics-addr", "", "listen for /metrics requests on an additional interface")
cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")
cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)")
cmd.Flags().StringVar(&grpcProxyResolverPrefix, "resolver-prefix", "", "prefix to use for registering proxy (must be shared with other grpc-proxy members)")
cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints")
cmd.Flags().StringVar(&grpcProxyNamespace, "namespace", "", "string to prefix to all keys for namespacing requests")
cmd.Flags().BoolVar(&grpcProxyEnablePprof, "enable-pprof", false, `Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"`)

// client TLS for connecting to server
cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")

// client TLS for connecting to proxy
cmd.Flags().StringVar(&grpcProxyListenCert, "cert-file", "", "identify secure connections to the proxy using this TLS certificate file")
cmd.Flags().StringVar(&grpcProxyListenKey, "key-file", "", "identify secure connections to the proxy using this TLS key file")
cmd.Flags().StringVar(&grpcProxyListenCA, "trusted-ca-file", "", "verify certificates of TLS-enabled secure proxy using this CA bundle")

return &cmd
}

func startGRPCProxy(cmd *cobra.Command, args []string) {
checkArgs()

tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey)
if tlsinfo != nil {
plog.Infof("ServerTLS: %s", tlsinfo)
}
m := mustListenCMux(tlsinfo)

grpcl := m.Match(cmux.HTTP2())
defer func() {
grpcl.Close()
plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
}()

client := mustNewClient()

srvhttp, httpl := mustHTTPListener(m, tlsinfo)
errc := make(chan error)
go func() { errc <- newGRPCProxyServer(client).Serve(grpcl) }()
go func() { errc <- srvhttp.Serve(httpl) }()
go func() { errc <- m.Serve() }()
if len(grpcProxyMetricsListenAddr) > 0 {
mhttpl := mustMetricsListener(tlsinfo)
go func() {
mux := http.NewServeMux()
mux.Handle("/metrics", prometheus.Handler())
plog.Fatal(http.Serve(mhttpl, mux))
}()
}

// grpc-proxy is initialized, ready to serve
notifySystemd()

fmt.Fprintln(os.Stderr, <-errc)
os.Exit(1)
}

func checkArgs() {
if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 {
fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-ttl %d", grpcProxyResolverTTL))
os.Exit(1)

@@ -107,40 +165,76 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
fmt.Fprintln(os.Stderr, fmt.Errorf("invalid advertise-client-url %q", grpcProxyAdvertiseClientURL))
os.Exit(1)
}
}

func mustNewClient() *clientv3.Client {
srvs := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery)
if len(srvs.Endpoints) != 0 {
grpcProxyEndpoints = srvs.Endpoints
eps := srvs.Endpoints
if len(eps) == 0 {
eps = grpcProxyEndpoints
}

l, err := net.Listen("tcp", grpcProxyListenAddr)
cfg, err := newClientCfg(eps)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
if l, err = transport.NewKeepAliveListener(l, "tcp", nil); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
defer func() {
l.Close()
plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
}()
m := cmux.New(l)

cfg, err := newClientCfg()
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}

client, err := clientv3.New(*cfg)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
return client
}

func newClientCfg(eps []string) (*clientv3.Config, error) {
// set tls if any one tls option set
cfg := clientv3.Config{
Endpoints: eps,
DialTimeout: 5 * time.Second,
}
if tls := newTLS(grpcProxyCA, grpcProxyCert, grpcProxyKey); tls != nil {
clientTLS, err := tls.ClientConfig()
if err != nil {
return nil, err
}
cfg.TLS = clientTLS
plog.Infof("ClientTLS: %s", tls)
}
// TODO: support insecure tls
return &cfg, nil
}

func newTLS(ca, cert, key string) *transport.TLSInfo {
if ca == "" && cert == "" && key == "" {
return nil
}
return &transport.TLSInfo{CAFile: ca, CertFile: cert, KeyFile: key}
}

func mustListenCMux(tlsinfo *transport.TLSInfo) cmux.CMux {
l, err := net.Listen("tcp", grpcProxyListenAddr)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}

var tlscfg *tls.Config
scheme := "http"
if tlsinfo != nil {
if tlscfg, err = tlsinfo.ServerConfig(); err != nil {
plog.Fatal(err)
}
scheme = "https"
}
if l, err = transport.NewKeepAliveListener(l, scheme, tlscfg); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
return cmux.New(l)
}

func newGRPCProxyServer(client *clientv3.Client) *grpc.Server {
if len(grpcProxyNamespace) > 0 {
client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)

@@ -162,7 +256,9 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
server := grpc.NewServer(
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
grpc.MaxConcurrentStreams(math.MaxUint32),
)

pb.RegisterKVServer(server, kvp)
pb.RegisterWatchServer(server, watchp)
pb.RegisterClusterServer(server, clusterp)

@@ -171,12 +267,10 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
pb.RegisterAuthServer(server, authp)
v3electionpb.RegisterElectionServer(server, electionp)
v3lockpb.RegisterLockServer(server, lockp)
return server
}

errc := make(chan error)

grpcl := m.Match(cmux.HTTP2())
go func() { errc <- server.Serve(grpcl) }()

func mustHTTPListener(m cmux.CMux, tlsinfo *transport.TLSInfo) (*http.Server, net.Listener) {
httpmux := http.NewServeMux()
httpmux.HandleFunc("/", http.NotFound)
httpmux.Handle("/metrics", prometheus.Handler())

@@ -186,61 +280,31 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
}
plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf)
}
srvhttp := &http.Server{Handler: httpmux}

srvhttp := &http.Server{
Handler: httpmux,
if tlsinfo == nil {
return srvhttp, m.Match(cmux.HTTP1())
}

var httpl net.Listener
if cfg.TLS != nil {
srvhttp.TLSConfig = cfg.TLS
httpl = tls.NewListener(m.Match(cmux.Any()), cfg.TLS)
} else {
httpl = m.Match(cmux.HTTP1())
srvTLS, err := tlsinfo.ServerConfig()
if err != nil {
plog.Fatalf("could not setup TLS (%v)", err)
}
go func() { errc <- srvhttp.Serve(httpl) }()

go func() { errc <- m.Serve() }()

// grpc-proxy is initialized, ready to serve
notifySystemd()

fmt.Fprintln(os.Stderr, <-errc)
os.Exit(1)
srvhttp.TLSConfig = srvTLS
return srvhttp, m.Match(cmux.Any())
}

func newClientCfg() (*clientv3.Config, error) {
// set tls if any one tls option set
var cfgtls *transport.TLSInfo
tlsinfo := transport.TLSInfo{}
if grpcProxyCert != "" {
tlsinfo.CertFile = grpcProxyCert
cfgtls = &tlsinfo
func mustMetricsListener(tlsinfo *transport.TLSInfo) net.Listener {
murl, err := url.Parse(grpcProxyMetricsListenAddr)
if err != nil {
fmt.Fprintf(os.Stderr, "cannot parse %q", grpcProxyMetricsListenAddr)
os.Exit(1)
}

if grpcProxyKey != "" {
tlsinfo.KeyFile = grpcProxyKey
cfgtls = &tlsinfo
ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsinfo)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}

if grpcProxyCA != "" {
tlsinfo.CAFile = grpcProxyCA
cfgtls = &tlsinfo
}

cfg := clientv3.Config{
Endpoints: grpcProxyEndpoints,
DialTimeout: 5 * time.Second,
}
if cfgtls != nil {
clientTLS, err := cfgtls.ClientConfig()
if err != nil {
return nil, err
}
cfg.TLS = clientTLS
}

// TODO: support insecure tls

return &cfg, nil
plog.Info("grpc-proxy: listening for metrics on ", murl.String())
return ml
}
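The refactor above separates the two TLS paths through pkg/transport: one TLSInfo built from --cert/--key/--cacert is turned into a client-side *tls.Config for dialing etcd (ClientConfig), and a second TLSInfo built from --cert-file/--key-file/--trusted-ca-file is turned into a server-side *tls.Config for terminating connections from the proxy's own clients (ServerConfig). A minimal sketch of that split, outside the proxy code, is below; the certificate file paths are placeholders, not values from this diff.

```go
package main

import (
	"log"

	"github.com/coreos/etcd/pkg/transport"
)

func main() {
	// TLS used by the proxy when it dials the etcd cluster
	// (flags --cert, --key, --cacert). Paths are placeholders.
	etcdFacing := transport.TLSInfo{
		CertFile: "proxy-client.crt",
		KeyFile:  "proxy-client.key",
		CAFile:   "etcd-ca.crt",
	}
	dialTLS, err := etcdFacing.ClientConfig()
	if err != nil {
		log.Fatal(err)
	}

	// TLS used to terminate connections from clients of the proxy
	// (flags --cert-file, --key-file, --trusted-ca-file).
	clientFacing := transport.TLSInfo{
		CertFile: "proxy-server.crt",
		KeyFile:  "proxy-server.key",
		CAFile:   "client-ca.crt",
	}
	serveTLS, err := clientFacing.ServerConfig()
	if err != nil {
		log.Fatal(err)
	}

	log.Printf("dial TLS configured: %v, serve TLS configured: %v", dialTLS != nil, serveTLS != nil)
}
```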
@@ -15,6 +15,7 @@
package etcdserver

import (
goruntime "runtime"
"time"

"github.com/coreos/etcd/pkg/runtime"

@@ -41,6 +42,18 @@
Name: "leader_changes_seen_total",
Help: "The number of leader changes seen.",
})
heartbeatSendFailures = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "heartbeat_send_failures_total",
Help: "The total number of leader heartbeat send failures (likely overloaded from slow disk).",
})
slowApplies = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "slow_apply_total",
Help: "The total number of slow apply requests (likely overloaded from slow disk).",
})
proposalsCommitted = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",

@@ -71,6 +84,18 @@
Name: "lease_expired_total",
Help: "The total number of expired leases.",
})
slowReadIndex = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "slow_read_indexes_total",
Help: "The total number of pending read indexes not in sync with leader's or timed out read index requests.",
})
quotaBackendBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "quota_backend_bytes",
Help: "Current backend storage quota size in bytes.",
})
currentVersion = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",

@@ -78,22 +103,37 @@
Help: "Which version is running. 1 for 'server_version' label with current version.",
},
[]string{"server_version"})
currentGoVersion = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "go_version",
Help: "Which Go version server is running with. 1 for 'server_go_version' label with current version.",
},
[]string{"server_go_version"})
)

func init() {
prometheus.MustRegister(hasLeader)
prometheus.MustRegister(isLeader)
prometheus.MustRegister(leaderChanges)
prometheus.MustRegister(heartbeatSendFailures)
prometheus.MustRegister(slowApplies)
prometheus.MustRegister(proposalsCommitted)
prometheus.MustRegister(proposalsApplied)
prometheus.MustRegister(proposalsPending)
prometheus.MustRegister(proposalsFailed)
prometheus.MustRegister(leaseExpired)
prometheus.MustRegister(slowReadIndex)
prometheus.MustRegister(quotaBackendBytes)
prometheus.MustRegister(currentVersion)
prometheus.MustRegister(currentGoVersion)

currentVersion.With(prometheus.Labels{
"server_version": version.Version,
}).Set(1)
currentGoVersion.With(prometheus.Labels{
"server_go_version": goruntime.Version(),
}).Set(1)
}

func monitorFileDescriptor(done <-chan struct{}) {
@@ -14,9 +14,7 @@

package etcdserver

import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
import pb "github.com/coreos/etcd/etcdserver/etcdserverpb"

const (
// DefaultQuotaBytes is the number of bytes the backend Size may

@@ -58,15 +56,20 @@ const (
)

func NewBackendQuota(s *EtcdServer) Quota {
quotaBackendBytes.Set(float64(s.Cfg.QuotaBackendBytes))

if s.Cfg.QuotaBackendBytes < 0 {
// disable quotas if negative
plog.Warningf("disabling backend quota")
return &passthroughQuota{}
}

if s.Cfg.QuotaBackendBytes == 0 {
// use default size if no quota size given
quotaBackendBytes.Set(float64(DefaultQuotaBytes))
return &backendQuota{s, DefaultQuotaBytes}
}

if s.Cfg.QuotaBackendBytes > MaxQuotaBytes {
plog.Warningf("backend quota %v exceeds maximum recommended quota %v", s.Cfg.QuotaBackendBytes, MaxQuotaBytes)
}
@@ -346,6 +346,7 @@ func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
// TODO: limit request rate.
plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
plog.Warningf("server is likely overloaded")
heartbeatSendFailures.Inc()
}
}
}
@@ -146,6 +146,7 @@ func warnOfExpensiveGenericRequest(now time.Time, reqStringer fmt.Stringer, pref
result = resp
}
plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
slowApplies.Inc()
}
}
@@ -597,8 +597,9 @@ func (s *EtcdServer) linearizableReadLoop() {
var rs raft.ReadState

for {
ctx := make([]byte, 8)
binary.BigEndian.PutUint64(ctx, s.reqIDGen.Next())
ctxToSend := make([]byte, 8)
id1 := s.reqIDGen.Next()
binary.BigEndian.PutUint64(ctxToSend, id1)

select {
case <-s.readwaitc:

@@ -614,7 +615,7 @@ func (s *EtcdServer) linearizableReadLoop() {
s.readMu.Unlock()

cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
if err := s.r.ReadIndex(cctx, ctx); err != nil {
if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
cancel()
if err == raft.ErrStopped {
return

@@ -632,16 +633,24 @@ func (s *EtcdServer) linearizableReadLoop() {
for !timeout && !done {
select {
case rs = <-s.r.readStateC:
done = bytes.Equal(rs.RequestCtx, ctx)
done = bytes.Equal(rs.RequestCtx, ctxToSend)
if !done {
// a previous request might time out. now we should ignore the response of it and
// continue waiting for the response of the current requests.
plog.Warningf("ignored out-of-date read index response (want %v, got %v)", rs.RequestCtx, ctx)
id2 := uint64(0)
if len(rs.RequestCtx) == 8 {
id2 = binary.BigEndian.Uint64(rs.RequestCtx)
}
plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2)
slowReadIndex.Inc()
}

case <-time.After(s.Cfg.ReqTimeout()):
plog.Warningf("timed out waiting for read index response")
nr.notify(ErrTimeout)
timeout = true
slowReadIndex.Inc()

case <-s.stopping:
return
}

@@ -691,6 +700,5 @@ func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error
return authInfo, nil
}
}

return s.AuthStore().AuthInfoFromCtx(ctx)
}
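The rewrite above replaces the raw byte-slice comparison in the warning with decoded request IDs, which makes the log line readable. A standalone sketch of the 8-byte big-endian round trip used for the read-index context (the ID value is arbitrary):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Encode a request ID into the 8-byte context sent with ReadIndex.
	id1 := uint64(12345)
	ctxToSend := make([]byte, 8)
	binary.BigEndian.PutUint64(ctxToSend, id1)

	// Decode the ID back out of a returned ReadState context, guarding
	// against unexpected lengths the same way the server loop does.
	id2 := uint64(0)
	if len(ctxToSend) == 8 {
		id2 = binary.BigEndian.Uint64(ctxToSend)
	}
	fmt.Printf("request ID want %d, got %d\n", id1, id2)
}
```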
@@ -54,6 +54,10 @@ type Backend interface {
Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
// Size returns the current size of the backend.
Size() int64
// SizeInUse returns the current size of the backend logically in use.
// Since the backend can manage free space in a non-byte unit such as
// number of pages, the returned value can be not exactly accurate in bytes.
SizeInUse() int64
Defrag() error
ForceCommit()
Close() error

@@ -74,6 +78,10 @@ type backend struct {

// size is the number of bytes in the backend
size int64

// sizeInUse is the number of bytes actually used in the backend
sizeInUse int64

// commits counts number of commits since start
commits int64

@@ -244,6 +252,10 @@ func (b *backend) Size() int64 {
return atomic.LoadInt64(&b.size)
}

func (b *backend) SizeInUse() int64 {
return atomic.LoadInt64(&b.sizeInUse)
}

func (b *backend) run() {
defer close(b.donec)
t := time.NewTimer(b.batchInterval)

@@ -272,18 +284,12 @@ func (b *backend) Commits() int64 {
}

func (b *backend) Defrag() error {
err := b.defrag()
if err != nil {
return err
}

// commit to update metadata like db.size
b.batchTx.Commit()

return nil
return b.defrag()
}

func (b *backend) defrag() error {
now := time.Now()

// TODO: make this non-blocking?
// lock batchTx to ensure nobody is using previous tx, and then
// close previous ongoing tx.

@@ -341,7 +347,14 @@ func (b *backend) defrag() error {

b.readTx.buf.reset()
b.readTx.tx = b.unsafeBegin(false)
atomic.StoreInt64(&b.size, b.readTx.tx.Size())

size := b.readTx.tx.Size()
db := b.db
atomic.StoreInt64(&b.size, size)
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))

took := time.Since(now)
defragDurations.Observe(took.Seconds())

return nil
}

@@ -402,7 +415,12 @@ func (b *backend) begin(write bool) *bolt.Tx {
b.mu.RLock()
tx := b.unsafeBegin(write)
b.mu.RUnlock()
atomic.StoreInt64(&b.size, tx.Size())

size := tx.Size()
db := tx.DB()
atomic.StoreInt64(&b.size, size)
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))

return tx
}
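SizeInUse is derived from boltdb's own accounting: the physical file size minus the free pages multiplied by the page size, exactly as the begin() and defrag() hunks above do. A small sketch of the same computation against the bolt API (the database path is a placeholder):

```go
package main

import (
	"log"

	bolt "github.com/boltdb/bolt"
)

func main() {
	// Placeholder path; any bolt database file works.
	db, err := bolt.Open("demo.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	err = db.View(func(tx *bolt.Tx) error {
		size := tx.Size() // physical size of the database file
		free := int64(db.Stats().FreePageN) * int64(db.Info().PageSize)
		log.Printf("size=%d bytes, logically in use=%d bytes", size, size-free)
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}
```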
@@ -141,15 +141,15 @@ func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error)
// Commit commits a previous tx and begins a new writable one.
func (t *batchTx) Commit() {
t.Lock()
defer t.Unlock()
t.commit(false)
t.Unlock()
}

// CommitAndStop commits the previous tx and does not create a new one.
func (t *batchTx) CommitAndStop() {
t.Lock()
defer t.Unlock()
t.commit(true)
t.Unlock()
}

func (t *batchTx) Unlock() {

@@ -163,21 +163,15 @@ func (t *batchTx) commit(stop bool) {
// commit the last tx
if t.tx != nil {
if t.pending == 0 && !stop {
t.backend.mu.RLock()
defer t.backend.mu.RUnlock()

// t.tx.DB()==nil if 'CommitAndStop' calls 'batchTx.commit(true)',
// which initializes *bolt.Tx.db and *bolt.Tx.meta as nil; panics t.tx.Size().
// Server must make sure 'batchTx.commit(false)' does not follow
// 'batchTx.commit(true)' (e.g. stopping backend, and inflight Hash call).
atomic.StoreInt64(&t.backend.size, t.tx.Size())
return
}

start := time.Now()

// gofail: var beforeCommit struct{}
err := t.tx.Commit()
// gofail: var afterCommit struct{}

commitDurations.Observe(time.Since(start).Seconds())
atomic.AddInt64(&t.backend.commits, 1)

@@ -222,21 +216,21 @@ func (t *batchTxBuffered) Unlock() {

func (t *batchTxBuffered) Commit() {
t.Lock()
defer t.Unlock()
t.commit(false)
t.Unlock()
}

func (t *batchTxBuffered) CommitAndStop() {
t.Lock()
defer t.Unlock()
t.commit(true)
t.Unlock()
}

func (t *batchTxBuffered) commit(stop bool) {
// all read txs must be closed to acquire boltdb commit rwlock
t.backend.readTx.mu.Lock()
defer t.backend.readTx.mu.Unlock()
t.unsafeCommit(stop)
t.backend.readTx.mu.Unlock()
}

func (t *batchTxBuffered) unsafeCommit(stop bool) {
@@ -22,7 +22,22 @@ var (
Subsystem: "disk",
Name: "backend_commit_duration_seconds",
Help: "The latency distributions of commit called by backend.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),

// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})

defragDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "disk",
Name: "backend_defrag_duration_seconds",
Help: "The latency distribution of backend defragmentation.",

// 100 MB usually takes 1 sec, so start with 10 MB of 100 ms
// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^12 == 409.6 sec
Buckets: prometheus.ExponentialBuckets(.1, 2, 13),
})

snapshotDurations = prometheus.NewHistogram(prometheus.HistogramOpts{

@@ -30,12 +45,15 @@ var (
Subsystem: "disk",
Name: "backend_snapshot_duration_seconds",
Help: "The latency distribution of backend snapshots.",
// 10 ms -> 655 seconds

// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
// highest bucket start of 0.01 sec * 2^16 == 655.36 sec
Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
})
)

func init() {
prometheus.MustRegister(commitDurations)
prometheus.MustRegister(defragDurations)
prometheus.MustRegister(snapshotDurations)
}
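The bucket comments above can be checked directly: prometheus.ExponentialBuckets(start, factor, count) returns count upper bounds starting at start and multiplying by factor, so the commit histogram tops out at 0.001 * 2^13 = 8.192 s and the defrag histogram at 0.1 * 2^12 = 409.6 s. A quick sketch:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// 14 buckets for commits: 0.001s, 0.002s, ..., 8.192s (0.001 * 2^13).
	fmt.Println(prometheus.ExponentialBuckets(0.001, 2, 14))

	// 13 buckets for defrag: 0.1s, 0.2s, ..., 409.6s (0.1 * 2^12).
	fmt.Println(prometheus.ExponentialBuckets(0.1, 2, 13))
}
```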
@@ -150,8 +150,12 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
}

func (s *store) Hash() (hash uint32, revision int64, err error) {
start := time.Now()

s.b.ForceCommit()
h, err := s.b.Hash(DefaultIgnores)

hashDurations.Observe(time.Since(start).Seconds())
return h, s.currentRev, err
}

@@ -245,10 +249,14 @@ func (s *store) Restore(b backend.Backend) error {
}

func (s *store) restore() error {
reportDbTotalSizeInBytesMu.Lock()
b := s.b

reportDbTotalSizeInBytesMu.Lock()
reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
reportDbTotalSizeInBytesMu.Unlock()
reportDbTotalSizeInUseInBytesMu.Lock()
reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
reportDbTotalSizeInUseInBytesMu.Unlock()

min, max := newRevBytes(), newRevBytes()
revToBytes(revision{main: 1}, min)
@@ -638,6 +638,7 @@ func (b *fakeBackend) BatchTx() backend.BatchTx
func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx }
func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil }
func (b *fakeBackend) Size() int64 { return 0 }
func (b *fakeBackend) SizeInUse() int64 { return 0 }
func (b *fakeBackend) Snapshot() backend.Snapshot { return nil }
func (b *fakeBackend) ForceCommit() {}
func (b *fakeBackend) Defrag() error { return nil }
@@ -131,11 +131,23 @@ var (
Buckets: prometheus.ExponentialBuckets(100, 2, 14),
})

dbTotalSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
dbTotalSizeDebugging = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "db_total_size_in_bytes",
Help: "Total size of the underlying database in bytes.",
Help: "Total size of the underlying database physically allocated in bytes. Use etcd_mvcc_db_total_size_in_bytes",
},
func() float64 {
reportDbTotalSizeInBytesMu.RLock()
defer reportDbTotalSizeInBytesMu.RUnlock()
return reportDbTotalSizeInBytes()
},
)
dbTotalSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "mvcc",
Name: "db_total_size_in_bytes",
Help: "Total size of the underlying database physically allocated in bytes.",
},
func() float64 {
reportDbTotalSizeInBytesMu.RLock()

@@ -145,7 +157,35 @@ var (
)
// overridden by mvcc initialization
reportDbTotalSizeInBytesMu sync.RWMutex
reportDbTotalSizeInBytes func() float64 = func() float64 { return 0 }
reportDbTotalSizeInBytes = func() float64 { return 0 }

dbTotalSizeInUse = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "mvcc",
Name: "db_total_size_in_use_in_bytes",
Help: "Total size of the underlying database logically in use in bytes.",
},
func() float64 {
reportDbTotalSizeInUseInBytesMu.RLock()
defer reportDbTotalSizeInUseInBytesMu.RUnlock()
return reportDbTotalSizeInUseInBytes()
},
)
// overridden by mvcc initialization
reportDbTotalSizeInUseInBytesMu sync.RWMutex
reportDbTotalSizeInUseInBytes func() float64 = func() float64 { return 0 }

hashDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "mvcc",
Name: "hash_duration_seconds",
Help: "The latency distribution of storage hash operation.",

// 100 MB usually takes 100 ms, so start with 10 MB of 10 ms
// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
// highest bucket start of 0.01 sec * 2^14 == 163.84 sec
Buckets: prometheus.ExponentialBuckets(.01, 2, 15),
})
)

func init() {

@@ -162,7 +202,10 @@ func init() {
prometheus.MustRegister(indexCompactionPauseDurations)
prometheus.MustRegister(dbCompactionPauseDurations)
prometheus.MustRegister(dbCompactionTotalDurations)
prometheus.MustRegister(dbTotalSizeDebugging)
prometheus.MustRegister(dbTotalSize)
prometheus.MustRegister(dbTotalSizeInUse)
prometheus.MustRegister(hashDurations)
}

// ReportEventReceived reports that an event is received.
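The gauges above rely on a small pattern: a GaugeFunc reads a callback variable that is swapped in under an RWMutex once the backend exists (the restore() hunk earlier in this compare). A self-contained sketch of that pattern, with hypothetical names and an example namespace rather than etcd's:

```go
package main

import (
	"fmt"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	mu       sync.RWMutex
	reportFn = func() float64 { return 0 } // overridden once a data source exists

	dbSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Namespace: "example",
		Subsystem: "mvcc",
		Name:      "db_total_size_in_bytes",
		Help:      "Illustrative gauge backed by a swappable callback.",
	}, func() float64 {
		mu.RLock()
		defer mu.RUnlock()
		return reportFn()
	})
)

func main() {
	prometheus.MustRegister(dbSize)

	// Later, when the real data source is available, swap the callback.
	mu.Lock()
	reportFn = func() float64 { return 4096 }
	mu.Unlock()

	fmt.Println("gauge now reports the live value on every scrape")
}
```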
@@ -7,32 +7,11 @@ fi

<<COMMENT
# amd64-e2e
bash tests/semaphore.test.bash
tests/semaphore.test.bash
sudo HOST_TMP_DIR=/tmp TEST_OPTS="PASSES='build release e2e' MANUAL_VER=v3.3.7" make docker-test

# 386-e2e
TEST_ARCH=386 bash tests/semaphore.test.bash

# grpc-proxy
TEST_OPTS="PASSES='build grpcproxy'" bash tests/semaphore.test.bash

# coverage
TEST_OPTS="coverage" bash tests/semaphore.test.bash
sudo HOST_TMP_DIR=/tmp TEST_OPTS="GOARCH=386 PASSES='build e2e'" make docker-test
COMMENT

if [ -z "${TEST_OPTS}" ]; then
TEST_OPTS="PASSES='build release e2e' MANUAL_VER=v3.2.22"
fi
if [ "${TEST_ARCH}" == "386" ]; then
TEST_OPTS="GOARCH=386 PASSES='build e2e'"
fi

echo "Running tests with" ${TEST_OPTS}
if [ "${TEST_OPTS}" == "PASSES='build grpcproxy'" ]; then
echo "Skip proxy tests for this branch!"
exit 0
elif [ "${TEST_OPTS}" == "coverage" ]; then
echo "Skip coverage tests for this branch!"
exit 0
else
sudo HOST_TMP_DIR=/tmp TEST_OPTS="${TEST_OPTS}" make docker-test
fi
sudo HOST_TMP_DIR=/tmp TEST_OPTS="PASSES='build release e2e' MANUAL_VER=v3.2.23" make docker-test
@@ -26,7 +26,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0"
Version = "3.2.23"
Version = "3.2.24"
APIVersion = "unknown"

// Git SHA Value will be set during build