Compare commits
26 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
420a452267 | ||
![]() |
348edfeae6 | ||
![]() |
0d5497a107 | ||
![]() |
87418c3432 | ||
![]() |
8c9fd1b5e6 | ||
![]() |
a3c0a99067 | ||
![]() |
b3ab14ca9a | ||
![]() |
8798c5cd43 | ||
![]() |
4e08898571 | ||
![]() |
8ac6c888cd | ||
![]() |
aca5c8f4b6 | ||
![]() |
3535f7a61f | ||
![]() |
fae9b6f667 | ||
![]() |
66d8194e4d | ||
![]() |
2f0e3fd2df | ||
![]() |
cad3cf7b11 | ||
![]() |
bedba66c69 | ||
![]() |
9bc1e15386 | ||
![]() |
6e0131e83b | ||
![]() |
c0e9e14248 | ||
![]() |
b763b506ab | ||
![]() |
d22ee8423d | ||
![]() |
e5531a4d54 | ||
![]() |
8dabfe12ca | ||
![]() |
360484a3f0 | ||
![]() |
f8af50a8d8 |
68
.travis.yml
68
.travis.yml
@@ -14,20 +14,20 @@ notifications:
|
|||||||
|
|
||||||
env:
|
env:
|
||||||
matrix:
|
matrix:
|
||||||
- TARGET=linux-amd64-build
|
|
||||||
- TARGET=linux-amd64-unit
|
|
||||||
- TARGET=linux-amd64-integration
|
- TARGET=linux-amd64-integration
|
||||||
- TARGET=linux-amd64-functional
|
- TARGET=linux-amd64-functional
|
||||||
- TARGET=linux-386-build
|
- TARGET=linux-amd64-unit
|
||||||
|
- TARGET=all-build
|
||||||
- TARGET=linux-386-unit
|
- TARGET=linux-386-unit
|
||||||
- TARGET=darwin-amd64-build
|
|
||||||
- TARGET=windows-amd64-build
|
|
||||||
- TARGET=linux-arm-build
|
|
||||||
- TARGET=linux-arm64-build
|
|
||||||
- TARGET=linux-ppc64le-build
|
|
||||||
|
|
||||||
matrix:
|
matrix:
|
||||||
fast_finish: true
|
fast_finish: true
|
||||||
|
allow_failures:
|
||||||
|
- go: 1.8.7
|
||||||
|
env: TARGET=linux-386-unit
|
||||||
|
exclude:
|
||||||
|
- go: tip
|
||||||
|
env: TARGET=linux-386-unit
|
||||||
|
|
||||||
before_install:
|
before_install:
|
||||||
- if [[ $TRAVIS_GO_VERSION == 1.* ]]; then docker pull gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION}; fi
|
- if [[ $TRAVIS_GO_VERSION == 1.* ]]; then docker pull gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION}; fi
|
||||||
@@ -39,16 +39,6 @@ script:
|
|||||||
- echo "TRAVIS_GO_VERSION=${TRAVIS_GO_VERSION}"
|
- echo "TRAVIS_GO_VERSION=${TRAVIS_GO_VERSION}"
|
||||||
- >
|
- >
|
||||||
case "${TARGET}" in
|
case "${TARGET}" in
|
||||||
linux-amd64-build)
|
|
||||||
docker run --rm \
|
|
||||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
|
||||||
/bin/bash -c "GOARCH=amd64 PASSES='build' ./test"
|
|
||||||
;;
|
|
||||||
linux-amd64-unit)
|
|
||||||
docker run --rm \
|
|
||||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
|
||||||
/bin/bash -c "GOARCH=amd64 PASSES='unit' ./test"
|
|
||||||
;;
|
|
||||||
linux-amd64-integration)
|
linux-amd64-integration)
|
||||||
docker run --rm \
|
docker run --rm \
|
||||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
||||||
@@ -57,41 +47,27 @@ script:
|
|||||||
linux-amd64-functional)
|
linux-amd64-functional)
|
||||||
docker run --rm \
|
docker run --rm \
|
||||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
||||||
/bin/bash -c "./build && GOARCH=amd64 PASSES='build functional' ./test"
|
/bin/bash -c "./build && GOARCH=amd64 PASSES='functional' ./test"
|
||||||
;;
|
;;
|
||||||
linux-386-build)
|
linux-amd64-unit)
|
||||||
docker run --rm \
|
docker run --rm \
|
||||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
||||||
/bin/bash -c "GOARCH=386 PASSES='build' ./test"
|
/bin/bash -c "GOARCH=amd64 PASSES='unit' ./test"
|
||||||
|
;;
|
||||||
|
all-build)
|
||||||
|
docker run --rm \
|
||||||
|
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
||||||
|
/bin/bash -c "GOARCH=amd64 PASSES='build' ./test \
|
||||||
|
&& GOARCH=386 PASSES='build' ./test \
|
||||||
|
&& GO_BUILD_FLAGS='-v' GOOS=darwin GOARCH=amd64 ./build \
|
||||||
|
&& GO_BUILD_FLAGS='-v' GOOS=windows GOARCH=amd64 ./build \
|
||||||
|
&& GO_BUILD_FLAGS='-v' GOARCH=arm ./build \
|
||||||
|
&& GO_BUILD_FLAGS='-v' GOARCH=arm64 ./build \
|
||||||
|
&& GO_BUILD_FLAGS='-v' GOARCH=ppc64le ./build"
|
||||||
;;
|
;;
|
||||||
linux-386-unit)
|
linux-386-unit)
|
||||||
docker run --rm \
|
docker run --rm \
|
||||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
||||||
/bin/bash -c "GOARCH=386 PASSES='unit' ./test"
|
/bin/bash -c "GOARCH=386 PASSES='unit' ./test"
|
||||||
;;
|
;;
|
||||||
darwin-amd64-build)
|
|
||||||
docker run --rm \
|
|
||||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
|
||||||
/bin/bash -c "GO_BUILD_FLAGS='-v' GOOS=darwin GOARCH=amd64 ./build"
|
|
||||||
;;
|
|
||||||
windows-amd64-build)
|
|
||||||
docker run --rm \
|
|
||||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
|
||||||
/bin/bash -c "GO_BUILD_FLAGS='-v' GOOS=windows GOARCH=amd64 ./build"
|
|
||||||
;;
|
|
||||||
linux-arm-build)
|
|
||||||
docker run --rm \
|
|
||||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
|
||||||
/bin/bash -c "GO_BUILD_FLAGS='-v' GOARCH=arm ./build"
|
|
||||||
;;
|
|
||||||
linux-arm64-build)
|
|
||||||
docker run --rm \
|
|
||||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
|
||||||
/bin/bash -c "GO_BUILD_FLAGS='-v' GOARCH=arm64 ./build"
|
|
||||||
;;
|
|
||||||
linux-ppc64le-build)
|
|
||||||
docker run --rm \
|
|
||||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
|
||||||
/bin/bash -c "GO_BUILD_FLAGS='-v' GOARCH=ppc64le ./build"
|
|
||||||
;;
|
|
||||||
esac
|
esac
|
||||||
|
@@ -310,6 +310,49 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestLeaseKeepAliveFullResponseQueue ensures when response
|
||||||
|
// queue is full thus dropping keepalive response sends,
|
||||||
|
// keepalive request is sent with the same rate of TTL / 3.
|
||||||
|
func TestLeaseKeepAliveFullResponseQueue(t *testing.T) {
|
||||||
|
defer testutil.AfterTest(t)
|
||||||
|
|
||||||
|
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||||
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
|
lapi := clus.Client(0)
|
||||||
|
|
||||||
|
// expect lease keepalive every 10-second
|
||||||
|
lresp, err := lapi.Grant(context.Background(), 30)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create lease %v", err)
|
||||||
|
}
|
||||||
|
id := lresp.ID
|
||||||
|
|
||||||
|
old := clientv3.LeaseResponseChSize
|
||||||
|
defer func() {
|
||||||
|
clientv3.LeaseResponseChSize = old
|
||||||
|
}()
|
||||||
|
clientv3.LeaseResponseChSize = 0
|
||||||
|
|
||||||
|
// never fetch from response queue, and let it become full
|
||||||
|
_, err = lapi.KeepAlive(context.Background(), id)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to keepalive lease %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TTL should not be refreshed after 3 seconds
|
||||||
|
// expect keepalive to be triggered after TTL/3
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
|
||||||
|
tr, terr := lapi.TimeToLive(context.Background(), id)
|
||||||
|
if terr != nil {
|
||||||
|
t.Fatalf("failed to get lease information %v", terr)
|
||||||
|
}
|
||||||
|
if tr.TTL >= 29 {
|
||||||
|
t.Errorf("unexpected kept-alive lease TTL %d", tr.TTL)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestLeaseGrantNewAfterClose(t *testing.T) {
|
func TestLeaseGrantNewAfterClose(t *testing.T) {
|
||||||
defer testutil.AfterTest(t)
|
defer testutil.AfterTest(t)
|
||||||
|
|
||||||
|
@@ -71,8 +71,6 @@ const (
|
|||||||
// defaultTTL is the assumed lease TTL used for the first keepalive
|
// defaultTTL is the assumed lease TTL used for the first keepalive
|
||||||
// deadline before the actual TTL is known to the client.
|
// deadline before the actual TTL is known to the client.
|
||||||
defaultTTL = 5 * time.Second
|
defaultTTL = 5 * time.Second
|
||||||
// a small buffer to store unsent lease responses.
|
|
||||||
leaseResponseChSize = 16
|
|
||||||
// NoLease is a lease ID for the absence of a lease.
|
// NoLease is a lease ID for the absence of a lease.
|
||||||
NoLease LeaseID = 0
|
NoLease LeaseID = 0
|
||||||
|
|
||||||
@@ -80,6 +78,11 @@ const (
|
|||||||
retryConnWait = 500 * time.Millisecond
|
retryConnWait = 500 * time.Millisecond
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// LeaseResponseChSize is the size of buffer to store unsent lease responses.
|
||||||
|
// WARNING: DO NOT UPDATE.
|
||||||
|
// Only for testing purposes.
|
||||||
|
var LeaseResponseChSize = 16
|
||||||
|
|
||||||
// ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error.
|
// ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error.
|
||||||
//
|
//
|
||||||
// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
|
// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
|
||||||
@@ -219,7 +222,7 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
|
func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
|
||||||
ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)
|
ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)
|
||||||
|
|
||||||
l.mu.Lock()
|
l.mu.Lock()
|
||||||
// ensure that recvKeepAliveLoop is still running
|
// ensure that recvKeepAliveLoop is still running
|
||||||
@@ -475,9 +478,10 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
|
|||||||
for _, ch := range ka.chs {
|
for _, ch := range ka.chs {
|
||||||
select {
|
select {
|
||||||
case ch <- karesp:
|
case ch <- karesp:
|
||||||
ka.nextKeepAlive = nextKeepAlive
|
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
// still advance in order to rate-limit keep-alive sends
|
||||||
|
ka.nextKeepAlive = nextKeepAlive
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -17,8 +17,10 @@ package etcdmain
|
|||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -40,12 +42,22 @@ import (
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
grpcProxyListenAddr string
|
grpcProxyListenAddr string
|
||||||
|
grpcProxyMetricsListenAddr string
|
||||||
grpcProxyEndpoints []string
|
grpcProxyEndpoints []string
|
||||||
grpcProxyDNSCluster string
|
grpcProxyDNSCluster string
|
||||||
grpcProxyInsecureDiscovery bool
|
grpcProxyInsecureDiscovery bool
|
||||||
grpcProxyCert string
|
|
||||||
grpcProxyKey string
|
// tls for connecting to etcd
|
||||||
grpcProxyCA string
|
|
||||||
|
grpcProxyCA string
|
||||||
|
grpcProxyCert string
|
||||||
|
grpcProxyKey string
|
||||||
|
|
||||||
|
// tls for clients connecting to proxy
|
||||||
|
|
||||||
|
grpcProxyListenCA string
|
||||||
|
grpcProxyListenCert string
|
||||||
|
grpcProxyListenKey string
|
||||||
|
|
||||||
grpcProxyAdvertiseClientURL string
|
grpcProxyAdvertiseClientURL string
|
||||||
grpcProxyResolverPrefix string
|
grpcProxyResolverPrefix string
|
||||||
@@ -80,21 +92,67 @@ func newGRPCProxyStartCommand() *cobra.Command {
|
|||||||
|
|
||||||
cmd.Flags().StringVar(&grpcProxyListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
|
cmd.Flags().StringVar(&grpcProxyListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
|
||||||
cmd.Flags().StringVar(&grpcProxyDNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster")
|
cmd.Flags().StringVar(&grpcProxyDNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster")
|
||||||
|
cmd.Flags().StringVar(&grpcProxyMetricsListenAddr, "metrics-addr", "", "listen for /metrics requests on an additional interface")
|
||||||
cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
|
cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
|
||||||
cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
|
cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
|
||||||
cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
|
|
||||||
cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
|
|
||||||
cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")
|
|
||||||
cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)")
|
cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)")
|
||||||
cmd.Flags().StringVar(&grpcProxyResolverPrefix, "resolver-prefix", "", "prefix to use for registering proxy (must be shared with other grpc-proxy members)")
|
cmd.Flags().StringVar(&grpcProxyResolverPrefix, "resolver-prefix", "", "prefix to use for registering proxy (must be shared with other grpc-proxy members)")
|
||||||
cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints")
|
cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints")
|
||||||
cmd.Flags().StringVar(&grpcProxyNamespace, "namespace", "", "string to prefix to all keys for namespacing requests")
|
cmd.Flags().StringVar(&grpcProxyNamespace, "namespace", "", "string to prefix to all keys for namespacing requests")
|
||||||
cmd.Flags().BoolVar(&grpcProxyEnablePprof, "enable-pprof", false, `Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"`)
|
cmd.Flags().BoolVar(&grpcProxyEnablePprof, "enable-pprof", false, `Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"`)
|
||||||
|
|
||||||
|
// client TLS for connecting to server
|
||||||
|
cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
|
||||||
|
cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
|
||||||
|
cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")
|
||||||
|
|
||||||
|
// client TLS for connecting to proxy
|
||||||
|
cmd.Flags().StringVar(&grpcProxyListenCert, "cert-file", "", "identify secure connections to the proxy using this TLS certificate file")
|
||||||
|
cmd.Flags().StringVar(&grpcProxyListenKey, "key-file", "", "identify secure connections to the proxy using this TLS key file")
|
||||||
|
cmd.Flags().StringVar(&grpcProxyListenCA, "trusted-ca-file", "", "verify certificates of TLS-enabled secure proxy using this CA bundle")
|
||||||
|
|
||||||
return &cmd
|
return &cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
func startGRPCProxy(cmd *cobra.Command, args []string) {
|
func startGRPCProxy(cmd *cobra.Command, args []string) {
|
||||||
|
checkArgs()
|
||||||
|
|
||||||
|
tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey)
|
||||||
|
if tlsinfo != nil {
|
||||||
|
plog.Infof("ServerTLS: %s", tlsinfo)
|
||||||
|
}
|
||||||
|
m := mustListenCMux(tlsinfo)
|
||||||
|
|
||||||
|
grpcl := m.Match(cmux.HTTP2())
|
||||||
|
defer func() {
|
||||||
|
grpcl.Close()
|
||||||
|
plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
|
||||||
|
}()
|
||||||
|
|
||||||
|
client := mustNewClient()
|
||||||
|
|
||||||
|
srvhttp, httpl := mustHTTPListener(m, tlsinfo)
|
||||||
|
errc := make(chan error)
|
||||||
|
go func() { errc <- newGRPCProxyServer(client).Serve(grpcl) }()
|
||||||
|
go func() { errc <- srvhttp.Serve(httpl) }()
|
||||||
|
go func() { errc <- m.Serve() }()
|
||||||
|
if len(grpcProxyMetricsListenAddr) > 0 {
|
||||||
|
mhttpl := mustMetricsListener(tlsinfo)
|
||||||
|
go func() {
|
||||||
|
mux := http.NewServeMux()
|
||||||
|
mux.Handle("/metrics", prometheus.Handler())
|
||||||
|
plog.Fatal(http.Serve(mhttpl, mux))
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// grpc-proxy is initialized, ready to serve
|
||||||
|
notifySystemd()
|
||||||
|
|
||||||
|
fmt.Fprintln(os.Stderr, <-errc)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkArgs() {
|
||||||
if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 {
|
if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 {
|
||||||
fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-ttl %d", grpcProxyResolverTTL))
|
fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-ttl %d", grpcProxyResolverTTL))
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
@@ -107,40 +165,76 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
|
|||||||
fmt.Fprintln(os.Stderr, fmt.Errorf("invalid advertise-client-url %q", grpcProxyAdvertiseClientURL))
|
fmt.Fprintln(os.Stderr, fmt.Errorf("invalid advertise-client-url %q", grpcProxyAdvertiseClientURL))
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func mustNewClient() *clientv3.Client {
|
||||||
srvs := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery)
|
srvs := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery)
|
||||||
if len(srvs.Endpoints) != 0 {
|
eps := srvs.Endpoints
|
||||||
grpcProxyEndpoints = srvs.Endpoints
|
if len(eps) == 0 {
|
||||||
|
eps = grpcProxyEndpoints
|
||||||
}
|
}
|
||||||
|
cfg, err := newClientCfg(eps)
|
||||||
l, err := net.Listen("tcp", grpcProxyListenAddr)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Fprintln(os.Stderr, err)
|
fmt.Fprintln(os.Stderr, err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
if l, err = transport.NewKeepAliveListener(l, "tcp", nil); err != nil {
|
|
||||||
fmt.Fprintln(os.Stderr, err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
|
|
||||||
defer func() {
|
|
||||||
l.Close()
|
|
||||||
plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
|
|
||||||
}()
|
|
||||||
m := cmux.New(l)
|
|
||||||
|
|
||||||
cfg, err := newClientCfg()
|
|
||||||
if err != nil {
|
|
||||||
fmt.Fprintln(os.Stderr, err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
client, err := clientv3.New(*cfg)
|
client, err := clientv3.New(*cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Fprintln(os.Stderr, err)
|
fmt.Fprintln(os.Stderr, err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
return client
|
||||||
|
}
|
||||||
|
|
||||||
|
func newClientCfg(eps []string) (*clientv3.Config, error) {
|
||||||
|
// set tls if any one tls option set
|
||||||
|
cfg := clientv3.Config{
|
||||||
|
Endpoints: eps,
|
||||||
|
DialTimeout: 5 * time.Second,
|
||||||
|
}
|
||||||
|
if tls := newTLS(grpcProxyCA, grpcProxyCert, grpcProxyKey); tls != nil {
|
||||||
|
clientTLS, err := tls.ClientConfig()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
cfg.TLS = clientTLS
|
||||||
|
plog.Infof("ClientTLS: %s", tls)
|
||||||
|
}
|
||||||
|
// TODO: support insecure tls
|
||||||
|
return &cfg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTLS(ca, cert, key string) *transport.TLSInfo {
|
||||||
|
if ca == "" && cert == "" && key == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return &transport.TLSInfo{CAFile: ca, CertFile: cert, KeyFile: key}
|
||||||
|
}
|
||||||
|
|
||||||
|
func mustListenCMux(tlsinfo *transport.TLSInfo) cmux.CMux {
|
||||||
|
l, err := net.Listen("tcp", grpcProxyListenAddr)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintln(os.Stderr, err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
var tlscfg *tls.Config
|
||||||
|
scheme := "http"
|
||||||
|
if tlsinfo != nil {
|
||||||
|
if tlscfg, err = tlsinfo.ServerConfig(); err != nil {
|
||||||
|
plog.Fatal(err)
|
||||||
|
}
|
||||||
|
scheme = "https"
|
||||||
|
}
|
||||||
|
if l, err = transport.NewKeepAliveListener(l, scheme, tlscfg); err != nil {
|
||||||
|
fmt.Fprintln(os.Stderr, err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
|
||||||
|
return cmux.New(l)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newGRPCProxyServer(client *clientv3.Client) *grpc.Server {
|
||||||
if len(grpcProxyNamespace) > 0 {
|
if len(grpcProxyNamespace) > 0 {
|
||||||
client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
|
client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
|
||||||
client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
|
client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
|
||||||
@@ -162,7 +256,9 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
|
|||||||
server := grpc.NewServer(
|
server := grpc.NewServer(
|
||||||
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
|
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
|
||||||
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
|
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
|
||||||
|
grpc.MaxConcurrentStreams(math.MaxUint32),
|
||||||
)
|
)
|
||||||
|
|
||||||
pb.RegisterKVServer(server, kvp)
|
pb.RegisterKVServer(server, kvp)
|
||||||
pb.RegisterWatchServer(server, watchp)
|
pb.RegisterWatchServer(server, watchp)
|
||||||
pb.RegisterClusterServer(server, clusterp)
|
pb.RegisterClusterServer(server, clusterp)
|
||||||
@@ -171,12 +267,10 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
|
|||||||
pb.RegisterAuthServer(server, authp)
|
pb.RegisterAuthServer(server, authp)
|
||||||
v3electionpb.RegisterElectionServer(server, electionp)
|
v3electionpb.RegisterElectionServer(server, electionp)
|
||||||
v3lockpb.RegisterLockServer(server, lockp)
|
v3lockpb.RegisterLockServer(server, lockp)
|
||||||
|
return server
|
||||||
|
}
|
||||||
|
|
||||||
errc := make(chan error)
|
func mustHTTPListener(m cmux.CMux, tlsinfo *transport.TLSInfo) (*http.Server, net.Listener) {
|
||||||
|
|
||||||
grpcl := m.Match(cmux.HTTP2())
|
|
||||||
go func() { errc <- server.Serve(grpcl) }()
|
|
||||||
|
|
||||||
httpmux := http.NewServeMux()
|
httpmux := http.NewServeMux()
|
||||||
httpmux.HandleFunc("/", http.NotFound)
|
httpmux.HandleFunc("/", http.NotFound)
|
||||||
httpmux.Handle("/metrics", prometheus.Handler())
|
httpmux.Handle("/metrics", prometheus.Handler())
|
||||||
@@ -186,61 +280,31 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
|
|||||||
}
|
}
|
||||||
plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf)
|
plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf)
|
||||||
}
|
}
|
||||||
|
srvhttp := &http.Server{Handler: httpmux}
|
||||||
|
|
||||||
srvhttp := &http.Server{
|
if tlsinfo == nil {
|
||||||
Handler: httpmux,
|
return srvhttp, m.Match(cmux.HTTP1())
|
||||||
}
|
}
|
||||||
|
|
||||||
var httpl net.Listener
|
srvTLS, err := tlsinfo.ServerConfig()
|
||||||
if cfg.TLS != nil {
|
if err != nil {
|
||||||
srvhttp.TLSConfig = cfg.TLS
|
plog.Fatalf("could not setup TLS (%v)", err)
|
||||||
httpl = tls.NewListener(m.Match(cmux.Any()), cfg.TLS)
|
|
||||||
} else {
|
|
||||||
httpl = m.Match(cmux.HTTP1())
|
|
||||||
}
|
}
|
||||||
go func() { errc <- srvhttp.Serve(httpl) }()
|
srvhttp.TLSConfig = srvTLS
|
||||||
|
return srvhttp, m.Match(cmux.Any())
|
||||||
go func() { errc <- m.Serve() }()
|
|
||||||
|
|
||||||
// grpc-proxy is initialized, ready to serve
|
|
||||||
notifySystemd()
|
|
||||||
|
|
||||||
fmt.Fprintln(os.Stderr, <-errc)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newClientCfg() (*clientv3.Config, error) {
|
func mustMetricsListener(tlsinfo *transport.TLSInfo) net.Listener {
|
||||||
// set tls if any one tls option set
|
murl, err := url.Parse(grpcProxyMetricsListenAddr)
|
||||||
var cfgtls *transport.TLSInfo
|
if err != nil {
|
||||||
tlsinfo := transport.TLSInfo{}
|
fmt.Fprintf(os.Stderr, "cannot parse %q", grpcProxyMetricsListenAddr)
|
||||||
if grpcProxyCert != "" {
|
os.Exit(1)
|
||||||
tlsinfo.CertFile = grpcProxyCert
|
|
||||||
cfgtls = &tlsinfo
|
|
||||||
}
|
}
|
||||||
|
ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsinfo)
|
||||||
if grpcProxyKey != "" {
|
if err != nil {
|
||||||
tlsinfo.KeyFile = grpcProxyKey
|
fmt.Fprintln(os.Stderr, err)
|
||||||
cfgtls = &tlsinfo
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
plog.Info("grpc-proxy: listening for metrics on ", murl.String())
|
||||||
if grpcProxyCA != "" {
|
return ml
|
||||||
tlsinfo.CAFile = grpcProxyCA
|
|
||||||
cfgtls = &tlsinfo
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg := clientv3.Config{
|
|
||||||
Endpoints: grpcProxyEndpoints,
|
|
||||||
DialTimeout: 5 * time.Second,
|
|
||||||
}
|
|
||||||
if cfgtls != nil {
|
|
||||||
clientTLS, err := cfgtls.ClientConfig()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
cfg.TLS = clientTLS
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: support insecure tls
|
|
||||||
|
|
||||||
return &cfg, nil
|
|
||||||
}
|
}
|
||||||
|
@@ -15,6 +15,7 @@
|
|||||||
package etcdserver
|
package etcdserver
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
goruntime "runtime"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/runtime"
|
"github.com/coreos/etcd/pkg/runtime"
|
||||||
@@ -41,6 +42,18 @@ var (
|
|||||||
Name: "leader_changes_seen_total",
|
Name: "leader_changes_seen_total",
|
||||||
Help: "The number of leader changes seen.",
|
Help: "The number of leader changes seen.",
|
||||||
})
|
})
|
||||||
|
heartbeatSendFailures = prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "server",
|
||||||
|
Name: "heartbeat_send_failures_total",
|
||||||
|
Help: "The total number of leader heartbeat send failures (likely overloaded from slow disk).",
|
||||||
|
})
|
||||||
|
slowApplies = prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "server",
|
||||||
|
Name: "slow_apply_total",
|
||||||
|
Help: "The total number of slow apply requests (likely overloaded from slow disk).",
|
||||||
|
})
|
||||||
proposalsCommitted = prometheus.NewGauge(prometheus.GaugeOpts{
|
proposalsCommitted = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
Namespace: "etcd",
|
Namespace: "etcd",
|
||||||
Subsystem: "server",
|
Subsystem: "server",
|
||||||
@@ -71,6 +84,18 @@ var (
|
|||||||
Name: "lease_expired_total",
|
Name: "lease_expired_total",
|
||||||
Help: "The total number of expired leases.",
|
Help: "The total number of expired leases.",
|
||||||
})
|
})
|
||||||
|
slowReadIndex = prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "server",
|
||||||
|
Name: "slow_read_indexes_total",
|
||||||
|
Help: "The total number of pending read indexes not in sync with leader's or timed out read index requests.",
|
||||||
|
})
|
||||||
|
quotaBackendBytes = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "server",
|
||||||
|
Name: "quota_backend_bytes",
|
||||||
|
Help: "Current backend storage quota size in bytes.",
|
||||||
|
})
|
||||||
currentVersion = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
currentVersion = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||||
Namespace: "etcd",
|
Namespace: "etcd",
|
||||||
Subsystem: "server",
|
Subsystem: "server",
|
||||||
@@ -78,22 +103,37 @@ var (
|
|||||||
Help: "Which version is running. 1 for 'server_version' label with current version.",
|
Help: "Which version is running. 1 for 'server_version' label with current version.",
|
||||||
},
|
},
|
||||||
[]string{"server_version"})
|
[]string{"server_version"})
|
||||||
|
currentGoVersion = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "server",
|
||||||
|
Name: "go_version",
|
||||||
|
Help: "Which Go version server is running with. 1 for 'server_go_version' label with current version.",
|
||||||
|
},
|
||||||
|
[]string{"server_go_version"})
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
prometheus.MustRegister(hasLeader)
|
prometheus.MustRegister(hasLeader)
|
||||||
prometheus.MustRegister(isLeader)
|
prometheus.MustRegister(isLeader)
|
||||||
prometheus.MustRegister(leaderChanges)
|
prometheus.MustRegister(leaderChanges)
|
||||||
|
prometheus.MustRegister(heartbeatSendFailures)
|
||||||
|
prometheus.MustRegister(slowApplies)
|
||||||
prometheus.MustRegister(proposalsCommitted)
|
prometheus.MustRegister(proposalsCommitted)
|
||||||
prometheus.MustRegister(proposalsApplied)
|
prometheus.MustRegister(proposalsApplied)
|
||||||
prometheus.MustRegister(proposalsPending)
|
prometheus.MustRegister(proposalsPending)
|
||||||
prometheus.MustRegister(proposalsFailed)
|
prometheus.MustRegister(proposalsFailed)
|
||||||
prometheus.MustRegister(leaseExpired)
|
prometheus.MustRegister(leaseExpired)
|
||||||
|
prometheus.MustRegister(slowReadIndex)
|
||||||
|
prometheus.MustRegister(quotaBackendBytes)
|
||||||
prometheus.MustRegister(currentVersion)
|
prometheus.MustRegister(currentVersion)
|
||||||
|
prometheus.MustRegister(currentGoVersion)
|
||||||
|
|
||||||
currentVersion.With(prometheus.Labels{
|
currentVersion.With(prometheus.Labels{
|
||||||
"server_version": version.Version,
|
"server_version": version.Version,
|
||||||
}).Set(1)
|
}).Set(1)
|
||||||
|
currentGoVersion.With(prometheus.Labels{
|
||||||
|
"server_go_version": goruntime.Version(),
|
||||||
|
}).Set(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func monitorFileDescriptor(done <-chan struct{}) {
|
func monitorFileDescriptor(done <-chan struct{}) {
|
||||||
|
@@ -14,9 +14,7 @@
|
|||||||
|
|
||||||
package etcdserver
|
package etcdserver
|
||||||
|
|
||||||
import (
|
import pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// DefaultQuotaBytes is the number of bytes the backend Size may
|
// DefaultQuotaBytes is the number of bytes the backend Size may
|
||||||
@@ -58,15 +56,20 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func NewBackendQuota(s *EtcdServer) Quota {
|
func NewBackendQuota(s *EtcdServer) Quota {
|
||||||
|
quotaBackendBytes.Set(float64(s.Cfg.QuotaBackendBytes))
|
||||||
|
|
||||||
if s.Cfg.QuotaBackendBytes < 0 {
|
if s.Cfg.QuotaBackendBytes < 0 {
|
||||||
// disable quotas if negative
|
// disable quotas if negative
|
||||||
plog.Warningf("disabling backend quota")
|
plog.Warningf("disabling backend quota")
|
||||||
return &passthroughQuota{}
|
return &passthroughQuota{}
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.Cfg.QuotaBackendBytes == 0 {
|
if s.Cfg.QuotaBackendBytes == 0 {
|
||||||
// use default size if no quota size given
|
// use default size if no quota size given
|
||||||
|
quotaBackendBytes.Set(float64(DefaultQuotaBytes))
|
||||||
return &backendQuota{s, DefaultQuotaBytes}
|
return &backendQuota{s, DefaultQuotaBytes}
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.Cfg.QuotaBackendBytes > MaxQuotaBytes {
|
if s.Cfg.QuotaBackendBytes > MaxQuotaBytes {
|
||||||
plog.Warningf("backend quota %v exceeds maximum recommended quota %v", s.Cfg.QuotaBackendBytes, MaxQuotaBytes)
|
plog.Warningf("backend quota %v exceeds maximum recommended quota %v", s.Cfg.QuotaBackendBytes, MaxQuotaBytes)
|
||||||
}
|
}
|
||||||
|
@@ -346,6 +346,7 @@ func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
|
|||||||
// TODO: limit request rate.
|
// TODO: limit request rate.
|
||||||
plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
|
plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
|
||||||
plog.Warningf("server is likely overloaded")
|
plog.Warningf("server is likely overloaded")
|
||||||
|
heartbeatSendFailures.Inc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -146,6 +146,7 @@ func warnOfExpensiveGenericRequest(now time.Time, reqStringer fmt.Stringer, pref
|
|||||||
result = resp
|
result = resp
|
||||||
}
|
}
|
||||||
plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
|
plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
|
||||||
|
slowApplies.Inc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -597,8 +597,9 @@ func (s *EtcdServer) linearizableReadLoop() {
|
|||||||
var rs raft.ReadState
|
var rs raft.ReadState
|
||||||
|
|
||||||
for {
|
for {
|
||||||
ctx := make([]byte, 8)
|
ctxToSend := make([]byte, 8)
|
||||||
binary.BigEndian.PutUint64(ctx, s.reqIDGen.Next())
|
id1 := s.reqIDGen.Next()
|
||||||
|
binary.BigEndian.PutUint64(ctxToSend, id1)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-s.readwaitc:
|
case <-s.readwaitc:
|
||||||
@@ -614,7 +615,7 @@ func (s *EtcdServer) linearizableReadLoop() {
|
|||||||
s.readMu.Unlock()
|
s.readMu.Unlock()
|
||||||
|
|
||||||
cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
|
cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
|
||||||
if err := s.r.ReadIndex(cctx, ctx); err != nil {
|
if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
|
||||||
cancel()
|
cancel()
|
||||||
if err == raft.ErrStopped {
|
if err == raft.ErrStopped {
|
||||||
return
|
return
|
||||||
@@ -632,16 +633,24 @@ func (s *EtcdServer) linearizableReadLoop() {
|
|||||||
for !timeout && !done {
|
for !timeout && !done {
|
||||||
select {
|
select {
|
||||||
case rs = <-s.r.readStateC:
|
case rs = <-s.r.readStateC:
|
||||||
done = bytes.Equal(rs.RequestCtx, ctx)
|
done = bytes.Equal(rs.RequestCtx, ctxToSend)
|
||||||
if !done {
|
if !done {
|
||||||
// a previous request might time out. now we should ignore the response of it and
|
// a previous request might time out. now we should ignore the response of it and
|
||||||
// continue waiting for the response of the current requests.
|
// continue waiting for the response of the current requests.
|
||||||
plog.Warningf("ignored out-of-date read index response (want %v, got %v)", rs.RequestCtx, ctx)
|
id2 := uint64(0)
|
||||||
|
if len(rs.RequestCtx) == 8 {
|
||||||
|
id2 = binary.BigEndian.Uint64(rs.RequestCtx)
|
||||||
|
}
|
||||||
|
plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2)
|
||||||
|
slowReadIndex.Inc()
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-time.After(s.Cfg.ReqTimeout()):
|
case <-time.After(s.Cfg.ReqTimeout()):
|
||||||
plog.Warningf("timed out waiting for read index response")
|
plog.Warningf("timed out waiting for read index response")
|
||||||
nr.notify(ErrTimeout)
|
nr.notify(ErrTimeout)
|
||||||
timeout = true
|
timeout = true
|
||||||
|
slowReadIndex.Inc()
|
||||||
|
|
||||||
case <-s.stopping:
|
case <-s.stopping:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -691,6 +700,5 @@ func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error
|
|||||||
return authInfo, nil
|
return authInfo, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.AuthStore().AuthInfoFromCtx(ctx)
|
return s.AuthStore().AuthInfoFromCtx(ctx)
|
||||||
}
|
}
|
||||||
|
@@ -54,6 +54,10 @@ type Backend interface {
|
|||||||
Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
|
Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
|
||||||
// Size returns the current size of the backend.
|
// Size returns the current size of the backend.
|
||||||
Size() int64
|
Size() int64
|
||||||
|
// SizeInUse returns the current size of the backend logically in use.
|
||||||
|
// Since the backend can manage free space in a non-byte unit such as
|
||||||
|
// number of pages, the returned value can be not exactly accurate in bytes.
|
||||||
|
SizeInUse() int64
|
||||||
Defrag() error
|
Defrag() error
|
||||||
ForceCommit()
|
ForceCommit()
|
||||||
Close() error
|
Close() error
|
||||||
@@ -74,6 +78,10 @@ type backend struct {
|
|||||||
|
|
||||||
// size is the number of bytes in the backend
|
// size is the number of bytes in the backend
|
||||||
size int64
|
size int64
|
||||||
|
|
||||||
|
// sizeInUse is the number of bytes actually used in the backend
|
||||||
|
sizeInUse int64
|
||||||
|
|
||||||
// commits counts number of commits since start
|
// commits counts number of commits since start
|
||||||
commits int64
|
commits int64
|
||||||
|
|
||||||
@@ -244,6 +252,10 @@ func (b *backend) Size() int64 {
|
|||||||
return atomic.LoadInt64(&b.size)
|
return atomic.LoadInt64(&b.size)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *backend) SizeInUse() int64 {
|
||||||
|
return atomic.LoadInt64(&b.sizeInUse)
|
||||||
|
}
|
||||||
|
|
||||||
func (b *backend) run() {
|
func (b *backend) run() {
|
||||||
defer close(b.donec)
|
defer close(b.donec)
|
||||||
t := time.NewTimer(b.batchInterval)
|
t := time.NewTimer(b.batchInterval)
|
||||||
@@ -272,18 +284,12 @@ func (b *backend) Commits() int64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (b *backend) Defrag() error {
|
func (b *backend) Defrag() error {
|
||||||
err := b.defrag()
|
return b.defrag()
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// commit to update metadata like db.size
|
|
||||||
b.batchTx.Commit()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *backend) defrag() error {
|
func (b *backend) defrag() error {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
// TODO: make this non-blocking?
|
// TODO: make this non-blocking?
|
||||||
// lock batchTx to ensure nobody is using previous tx, and then
|
// lock batchTx to ensure nobody is using previous tx, and then
|
||||||
// close previous ongoing tx.
|
// close previous ongoing tx.
|
||||||
@@ -341,7 +347,14 @@ func (b *backend) defrag() error {
|
|||||||
|
|
||||||
b.readTx.buf.reset()
|
b.readTx.buf.reset()
|
||||||
b.readTx.tx = b.unsafeBegin(false)
|
b.readTx.tx = b.unsafeBegin(false)
|
||||||
atomic.StoreInt64(&b.size, b.readTx.tx.Size())
|
|
||||||
|
size := b.readTx.tx.Size()
|
||||||
|
db := b.db
|
||||||
|
atomic.StoreInt64(&b.size, size)
|
||||||
|
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
|
||||||
|
|
||||||
|
took := time.Since(now)
|
||||||
|
defragDurations.Observe(took.Seconds())
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -402,7 +415,12 @@ func (b *backend) begin(write bool) *bolt.Tx {
|
|||||||
b.mu.RLock()
|
b.mu.RLock()
|
||||||
tx := b.unsafeBegin(write)
|
tx := b.unsafeBegin(write)
|
||||||
b.mu.RUnlock()
|
b.mu.RUnlock()
|
||||||
atomic.StoreInt64(&b.size, tx.Size())
|
|
||||||
|
size := tx.Size()
|
||||||
|
db := tx.DB()
|
||||||
|
atomic.StoreInt64(&b.size, size)
|
||||||
|
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
|
||||||
|
|
||||||
return tx
|
return tx
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -141,15 +141,15 @@ func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error)
|
|||||||
// Commit commits a previous tx and begins a new writable one.
|
// Commit commits a previous tx and begins a new writable one.
|
||||||
func (t *batchTx) Commit() {
|
func (t *batchTx) Commit() {
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
|
||||||
t.commit(false)
|
t.commit(false)
|
||||||
|
t.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// CommitAndStop commits the previous tx and does not create a new one.
|
// CommitAndStop commits the previous tx and does not create a new one.
|
||||||
func (t *batchTx) CommitAndStop() {
|
func (t *batchTx) CommitAndStop() {
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
|
||||||
t.commit(true)
|
t.commit(true)
|
||||||
|
t.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *batchTx) Unlock() {
|
func (t *batchTx) Unlock() {
|
||||||
@@ -163,21 +163,15 @@ func (t *batchTx) commit(stop bool) {
|
|||||||
// commit the last tx
|
// commit the last tx
|
||||||
if t.tx != nil {
|
if t.tx != nil {
|
||||||
if t.pending == 0 && !stop {
|
if t.pending == 0 && !stop {
|
||||||
t.backend.mu.RLock()
|
|
||||||
defer t.backend.mu.RUnlock()
|
|
||||||
|
|
||||||
// t.tx.DB()==nil if 'CommitAndStop' calls 'batchTx.commit(true)',
|
|
||||||
// which initializes *bolt.Tx.db and *bolt.Tx.meta as nil; panics t.tx.Size().
|
|
||||||
// Server must make sure 'batchTx.commit(false)' does not follow
|
|
||||||
// 'batchTx.commit(true)' (e.g. stopping backend, and inflight Hash call).
|
|
||||||
atomic.StoreInt64(&t.backend.size, t.tx.Size())
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
// gofail: var beforeCommit struct{}
|
// gofail: var beforeCommit struct{}
|
||||||
err := t.tx.Commit()
|
err := t.tx.Commit()
|
||||||
// gofail: var afterCommit struct{}
|
// gofail: var afterCommit struct{}
|
||||||
|
|
||||||
commitDurations.Observe(time.Since(start).Seconds())
|
commitDurations.Observe(time.Since(start).Seconds())
|
||||||
atomic.AddInt64(&t.backend.commits, 1)
|
atomic.AddInt64(&t.backend.commits, 1)
|
||||||
|
|
||||||
@@ -222,21 +216,21 @@ func (t *batchTxBuffered) Unlock() {
|
|||||||
|
|
||||||
func (t *batchTxBuffered) Commit() {
|
func (t *batchTxBuffered) Commit() {
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
|
||||||
t.commit(false)
|
t.commit(false)
|
||||||
|
t.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *batchTxBuffered) CommitAndStop() {
|
func (t *batchTxBuffered) CommitAndStop() {
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
|
||||||
t.commit(true)
|
t.commit(true)
|
||||||
|
t.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *batchTxBuffered) commit(stop bool) {
|
func (t *batchTxBuffered) commit(stop bool) {
|
||||||
// all read txs must be closed to acquire boltdb commit rwlock
|
// all read txs must be closed to acquire boltdb commit rwlock
|
||||||
t.backend.readTx.mu.Lock()
|
t.backend.readTx.mu.Lock()
|
||||||
defer t.backend.readTx.mu.Unlock()
|
|
||||||
t.unsafeCommit(stop)
|
t.unsafeCommit(stop)
|
||||||
|
t.backend.readTx.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *batchTxBuffered) unsafeCommit(stop bool) {
|
func (t *batchTxBuffered) unsafeCommit(stop bool) {
|
||||||
|
@@ -22,7 +22,22 @@ var (
|
|||||||
Subsystem: "disk",
|
Subsystem: "disk",
|
||||||
Name: "backend_commit_duration_seconds",
|
Name: "backend_commit_duration_seconds",
|
||||||
Help: "The latency distributions of commit called by backend.",
|
Help: "The latency distributions of commit called by backend.",
|
||||||
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
|
|
||||||
|
// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
|
||||||
|
// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
|
||||||
|
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
|
||||||
|
})
|
||||||
|
|
||||||
|
defragDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "disk",
|
||||||
|
Name: "backend_defrag_duration_seconds",
|
||||||
|
Help: "The latency distribution of backend defragmentation.",
|
||||||
|
|
||||||
|
// 100 MB usually takes 1 sec, so start with 10 MB of 100 ms
|
||||||
|
// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
|
||||||
|
// highest bucket start of 0.1 sec * 2^12 == 409.6 sec
|
||||||
|
Buckets: prometheus.ExponentialBuckets(.1, 2, 13),
|
||||||
})
|
})
|
||||||
|
|
||||||
snapshotDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
|
snapshotDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||||
@@ -30,12 +45,15 @@ var (
|
|||||||
Subsystem: "disk",
|
Subsystem: "disk",
|
||||||
Name: "backend_snapshot_duration_seconds",
|
Name: "backend_snapshot_duration_seconds",
|
||||||
Help: "The latency distribution of backend snapshots.",
|
Help: "The latency distribution of backend snapshots.",
|
||||||
// 10 ms -> 655 seconds
|
|
||||||
|
// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
|
||||||
|
// highest bucket start of 0.01 sec * 2^16 == 655.36 sec
|
||||||
Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
|
Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
prometheus.MustRegister(commitDurations)
|
prometheus.MustRegister(commitDurations)
|
||||||
|
prometheus.MustRegister(defragDurations)
|
||||||
prometheus.MustRegister(snapshotDurations)
|
prometheus.MustRegister(snapshotDurations)
|
||||||
}
|
}
|
||||||
|
@@ -150,8 +150,12 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) Hash() (hash uint32, revision int64, err error) {
|
func (s *store) Hash() (hash uint32, revision int64, err error) {
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
s.b.ForceCommit()
|
s.b.ForceCommit()
|
||||||
h, err := s.b.Hash(DefaultIgnores)
|
h, err := s.b.Hash(DefaultIgnores)
|
||||||
|
|
||||||
|
hashDurations.Observe(time.Since(start).Seconds())
|
||||||
return h, s.currentRev, err
|
return h, s.currentRev, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -245,10 +249,14 @@ func (s *store) Restore(b backend.Backend) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) restore() error {
|
func (s *store) restore() error {
|
||||||
reportDbTotalSizeInBytesMu.Lock()
|
|
||||||
b := s.b
|
b := s.b
|
||||||
|
|
||||||
|
reportDbTotalSizeInBytesMu.Lock()
|
||||||
reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
|
reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
|
||||||
reportDbTotalSizeInBytesMu.Unlock()
|
reportDbTotalSizeInBytesMu.Unlock()
|
||||||
|
reportDbTotalSizeInUseInBytesMu.Lock()
|
||||||
|
reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
|
||||||
|
reportDbTotalSizeInUseInBytesMu.Unlock()
|
||||||
|
|
||||||
min, max := newRevBytes(), newRevBytes()
|
min, max := newRevBytes(), newRevBytes()
|
||||||
revToBytes(revision{main: 1}, min)
|
revToBytes(revision{main: 1}, min)
|
||||||
|
@@ -638,6 +638,7 @@ func (b *fakeBackend) BatchTx() backend.BatchTx
|
|||||||
func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx }
|
func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx }
|
||||||
func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil }
|
func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil }
|
||||||
func (b *fakeBackend) Size() int64 { return 0 }
|
func (b *fakeBackend) Size() int64 { return 0 }
|
||||||
|
func (b *fakeBackend) SizeInUse() int64 { return 0 }
|
||||||
func (b *fakeBackend) Snapshot() backend.Snapshot { return nil }
|
func (b *fakeBackend) Snapshot() backend.Snapshot { return nil }
|
||||||
func (b *fakeBackend) ForceCommit() {}
|
func (b *fakeBackend) ForceCommit() {}
|
||||||
func (b *fakeBackend) Defrag() error { return nil }
|
func (b *fakeBackend) Defrag() error { return nil }
|
||||||
|
@@ -131,11 +131,23 @@ var (
|
|||||||
Buckets: prometheus.ExponentialBuckets(100, 2, 14),
|
Buckets: prometheus.ExponentialBuckets(100, 2, 14),
|
||||||
})
|
})
|
||||||
|
|
||||||
dbTotalSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
dbTotalSizeDebugging = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||||
Namespace: "etcd_debugging",
|
Namespace: "etcd_debugging",
|
||||||
Subsystem: "mvcc",
|
Subsystem: "mvcc",
|
||||||
Name: "db_total_size_in_bytes",
|
Name: "db_total_size_in_bytes",
|
||||||
Help: "Total size of the underlying database in bytes.",
|
Help: "Total size of the underlying database physically allocated in bytes. Use etcd_mvcc_db_total_size_in_bytes",
|
||||||
|
},
|
||||||
|
func() float64 {
|
||||||
|
reportDbTotalSizeInBytesMu.RLock()
|
||||||
|
defer reportDbTotalSizeInBytesMu.RUnlock()
|
||||||
|
return reportDbTotalSizeInBytes()
|
||||||
|
},
|
||||||
|
)
|
||||||
|
dbTotalSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "mvcc",
|
||||||
|
Name: "db_total_size_in_bytes",
|
||||||
|
Help: "Total size of the underlying database physically allocated in bytes.",
|
||||||
},
|
},
|
||||||
func() float64 {
|
func() float64 {
|
||||||
reportDbTotalSizeInBytesMu.RLock()
|
reportDbTotalSizeInBytesMu.RLock()
|
||||||
@@ -145,7 +157,35 @@ var (
|
|||||||
)
|
)
|
||||||
// overridden by mvcc initialization
|
// overridden by mvcc initialization
|
||||||
reportDbTotalSizeInBytesMu sync.RWMutex
|
reportDbTotalSizeInBytesMu sync.RWMutex
|
||||||
reportDbTotalSizeInBytes func() float64 = func() float64 { return 0 }
|
reportDbTotalSizeInBytes = func() float64 { return 0 }
|
||||||
|
|
||||||
|
dbTotalSizeInUse = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "mvcc",
|
||||||
|
Name: "db_total_size_in_use_in_bytes",
|
||||||
|
Help: "Total size of the underlying database logically in use in bytes.",
|
||||||
|
},
|
||||||
|
func() float64 {
|
||||||
|
reportDbTotalSizeInUseInBytesMu.RLock()
|
||||||
|
defer reportDbTotalSizeInUseInBytesMu.RUnlock()
|
||||||
|
return reportDbTotalSizeInUseInBytes()
|
||||||
|
},
|
||||||
|
)
|
||||||
|
// overridden by mvcc initialization
|
||||||
|
reportDbTotalSizeInUseInBytesMu sync.RWMutex
|
||||||
|
reportDbTotalSizeInUseInBytes func() float64 = func() float64 { return 0 }
|
||||||
|
|
||||||
|
hashDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "mvcc",
|
||||||
|
Name: "hash_duration_seconds",
|
||||||
|
Help: "The latency distribution of storage hash operation.",
|
||||||
|
|
||||||
|
// 100 MB usually takes 100 ms, so start with 10 MB of 10 ms
|
||||||
|
// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
|
||||||
|
// highest bucket start of 0.01 sec * 2^14 == 163.84 sec
|
||||||
|
Buckets: prometheus.ExponentialBuckets(.01, 2, 15),
|
||||||
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@@ -162,7 +202,10 @@ func init() {
|
|||||||
prometheus.MustRegister(indexCompactionPauseDurations)
|
prometheus.MustRegister(indexCompactionPauseDurations)
|
||||||
prometheus.MustRegister(dbCompactionPauseDurations)
|
prometheus.MustRegister(dbCompactionPauseDurations)
|
||||||
prometheus.MustRegister(dbCompactionTotalDurations)
|
prometheus.MustRegister(dbCompactionTotalDurations)
|
||||||
|
prometheus.MustRegister(dbTotalSizeDebugging)
|
||||||
prometheus.MustRegister(dbTotalSize)
|
prometheus.MustRegister(dbTotalSize)
|
||||||
|
prometheus.MustRegister(dbTotalSizeInUse)
|
||||||
|
prometheus.MustRegister(hashDurations)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReportEventReceived reports that an event is received.
|
// ReportEventReceived reports that an event is received.
|
||||||
|
@@ -7,32 +7,11 @@ fi
|
|||||||
|
|
||||||
<<COMMENT
|
<<COMMENT
|
||||||
# amd64-e2e
|
# amd64-e2e
|
||||||
bash tests/semaphore.test.bash
|
tests/semaphore.test.bash
|
||||||
|
sudo HOST_TMP_DIR=/tmp TEST_OPTS="PASSES='build release e2e' MANUAL_VER=v3.3.7" make docker-test
|
||||||
|
|
||||||
# 386-e2e
|
# 386-e2e
|
||||||
TEST_ARCH=386 bash tests/semaphore.test.bash
|
sudo HOST_TMP_DIR=/tmp TEST_OPTS="GOARCH=386 PASSES='build e2e'" make docker-test
|
||||||
|
|
||||||
# grpc-proxy
|
|
||||||
TEST_OPTS="PASSES='build grpcproxy'" bash tests/semaphore.test.bash
|
|
||||||
|
|
||||||
# coverage
|
|
||||||
TEST_OPTS="coverage" bash tests/semaphore.test.bash
|
|
||||||
COMMENT
|
COMMENT
|
||||||
|
|
||||||
if [ -z "${TEST_OPTS}" ]; then
|
sudo HOST_TMP_DIR=/tmp TEST_OPTS="PASSES='build release e2e' MANUAL_VER=v3.2.23" make docker-test
|
||||||
TEST_OPTS="PASSES='build release e2e' MANUAL_VER=v3.2.22"
|
|
||||||
fi
|
|
||||||
if [ "${TEST_ARCH}" == "386" ]; then
|
|
||||||
TEST_OPTS="GOARCH=386 PASSES='build e2e'"
|
|
||||||
fi
|
|
||||||
|
|
||||||
echo "Running tests with" ${TEST_OPTS}
|
|
||||||
if [ "${TEST_OPTS}" == "PASSES='build grpcproxy'" ]; then
|
|
||||||
echo "Skip proxy tests for this branch!"
|
|
||||||
exit 0
|
|
||||||
elif [ "${TEST_OPTS}" == "coverage" ]; then
|
|
||||||
echo "Skip coverage tests for this branch!"
|
|
||||||
exit 0
|
|
||||||
else
|
|
||||||
sudo HOST_TMP_DIR=/tmp TEST_OPTS="${TEST_OPTS}" make docker-test
|
|
||||||
fi
|
|
||||||
|
@@ -26,7 +26,7 @@ import (
|
|||||||
var (
|
var (
|
||||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||||
MinClusterVersion = "3.0.0"
|
MinClusterVersion = "3.0.0"
|
||||||
Version = "3.2.23"
|
Version = "3.2.24"
|
||||||
APIVersion = "unknown"
|
APIVersion = "unknown"
|
||||||
|
|
||||||
// Git SHA Value will be set during build
|
// Git SHA Value will be set during build
|
||||||
|
Reference in New Issue
Block a user