Compare commits
63 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
992dbd4d1e | ||
![]() |
b39c0f9471 | ||
![]() |
3381ef1602 | ||
![]() |
c096dc2cc5 | ||
![]() |
3e99b42612 | ||
![]() |
cf7be488a9 | ||
![]() |
65fff06adc | ||
![]() |
87b4e08c29 | ||
![]() |
216be8b79b | ||
![]() |
dfcf82b6ff | ||
![]() |
9197907515 | ||
![]() |
14883cad78 | ||
![]() |
4e7691ddcc | ||
![]() |
8a68ae95ec | ||
![]() |
ef1d332298 | ||
![]() |
116c442615 | ||
![]() |
e07fb41140 | ||
![]() |
2c616b0c3d | ||
![]() |
dd2803c4a6 | ||
![]() |
4855ca62b5 | ||
![]() |
bb205caa68 | ||
![]() |
a1d6802da2 | ||
![]() |
79d80bd259 | ||
![]() |
081519c323 | ||
![]() |
e0d5a028d5 | ||
![]() |
a421a604d6 | ||
![]() |
0fbf49df11 | ||
![]() |
fb5080b306 | ||
![]() |
cac6ce756d | ||
![]() |
9f58e57a3c | ||
![]() |
22c25dd4e7 | ||
![]() |
92a7b5df80 | ||
![]() |
3f1fe618ad | ||
![]() |
b8547734ae | ||
![]() |
78a13e67a0 | ||
![]() |
84d11a51c1 | ||
![]() |
a9c4b98756 | ||
![]() |
5531e3b0f5 | ||
![]() |
c2623bb840 | ||
![]() |
f46b4677c0 | ||
![]() |
09843d5d90 | ||
![]() |
be3e6f6ed5 | ||
![]() |
d84dd18637 | ||
![]() |
b7ff47f9d5 | ||
![]() |
fab24fbdab | ||
![]() |
b3ee996629 | ||
![]() |
06da6cf983 | ||
![]() |
9c00100550 | ||
![]() |
1d7a2ca520 | ||
![]() |
e90934ec71 | ||
![]() |
23c5c71426 | ||
![]() |
72a2483d42 | ||
![]() |
53eae781fe | ||
![]() |
7b1b7def84 | ||
![]() |
df000fd776 | ||
![]() |
fd61be4814 | ||
![]() |
781cc0be83 | ||
![]() |
ebe351e3b4 | ||
![]() |
e31510975a | ||
![]() |
43b0cafcb6 | ||
![]() |
169af4470e | ||
![]() |
c4c487eaca | ||
![]() |
6bb88b9617 |
66
.travis.yml
66
.travis.yml
@@ -14,20 +14,20 @@ notifications:
|
||||
|
||||
env:
|
||||
matrix:
|
||||
- TARGET=linux-amd64-build
|
||||
- TARGET=linux-amd64-unit
|
||||
- TARGET=linux-amd64-integration
|
||||
- TARGET=linux-amd64-functional
|
||||
- TARGET=linux-386-build
|
||||
- TARGET=linux-amd64-unit
|
||||
- TARGET=all-build
|
||||
- TARGET=linux-386-unit
|
||||
- TARGET=darwin-amd64-build
|
||||
- TARGET=windows-amd64-build
|
||||
- TARGET=linux-arm-build
|
||||
- TARGET=linux-arm64-build
|
||||
- TARGET=linux-ppc64le-build
|
||||
|
||||
matrix:
|
||||
fast_finish: true
|
||||
allow_failures:
|
||||
- go: 1.8.7
|
||||
env: TARGET=linux-386-unit
|
||||
exclude:
|
||||
- go: tip
|
||||
env: TARGET=linux-386-unit
|
||||
|
||||
before_install:
|
||||
- if [[ $TRAVIS_GO_VERSION == 1.* ]]; then docker pull gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION}; fi
|
||||
@@ -39,16 +39,6 @@ script:
|
||||
- echo "TRAVIS_GO_VERSION=${TRAVIS_GO_VERSION}"
|
||||
- >
|
||||
case "${TARGET}" in
|
||||
linux-amd64-build)
|
||||
docker run --rm \
|
||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
||||
/bin/bash -c "GOARCH=amd64 PASSES='build' ./test"
|
||||
;;
|
||||
linux-amd64-unit)
|
||||
docker run --rm \
|
||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
||||
/bin/bash -c "GOARCH=amd64 PASSES='unit' ./test"
|
||||
;;
|
||||
linux-amd64-integration)
|
||||
docker run --rm \
|
||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
||||
@@ -59,39 +49,25 @@ script:
|
||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
||||
/bin/bash -c "./build && GOARCH=amd64 PASSES='functional' ./test"
|
||||
;;
|
||||
linux-386-build)
|
||||
linux-amd64-unit)
|
||||
docker run --rm \
|
||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
||||
/bin/bash -c "GOARCH=386 PASSES='build' ./test"
|
||||
/bin/bash -c "GOARCH=amd64 PASSES='unit' ./test"
|
||||
;;
|
||||
all-build)
|
||||
docker run --rm \
|
||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
||||
/bin/bash -c "GOARCH=amd64 PASSES='build' ./test \
|
||||
&& GOARCH=386 PASSES='build' ./test \
|
||||
&& GO_BUILD_FLAGS='-v' GOOS=darwin GOARCH=amd64 ./build \
|
||||
&& GO_BUILD_FLAGS='-v' GOOS=windows GOARCH=amd64 ./build \
|
||||
&& GO_BUILD_FLAGS='-v' GOARCH=arm ./build \
|
||||
&& GO_BUILD_FLAGS='-v' GOARCH=arm64 ./build \
|
||||
&& GO_BUILD_FLAGS='-v' GOARCH=ppc64le ./build"
|
||||
;;
|
||||
linux-386-unit)
|
||||
docker run --rm \
|
||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
||||
/bin/bash -c "GOARCH=386 PASSES='unit' ./test"
|
||||
;;
|
||||
darwin-amd64-build)
|
||||
docker run --rm \
|
||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
||||
/bin/bash -c "GO_BUILD_FLAGS='-v' GOOS=darwin GOARCH=amd64 ./build"
|
||||
;;
|
||||
windows-amd64-build)
|
||||
docker run --rm \
|
||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
||||
/bin/bash -c "GO_BUILD_FLAGS='-v' GOOS=windows GOARCH=amd64 ./build"
|
||||
;;
|
||||
linux-arm-build)
|
||||
docker run --rm \
|
||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
||||
/bin/bash -c "GO_BUILD_FLAGS='-v' GOARCH=arm ./build"
|
||||
;;
|
||||
linux-arm64-build)
|
||||
docker run --rm \
|
||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
||||
/bin/bash -c "GO_BUILD_FLAGS='-v' GOARCH=arm64 ./build"
|
||||
;;
|
||||
linux-ppc64le-build)
|
||||
docker run --rm \
|
||||
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
|
||||
/bin/bash -c "GO_BUILD_FLAGS='-v' GOARCH=ppc64le ./build"
|
||||
;;
|
||||
esac
|
||||
|
17
Makefile
17
Makefile
@@ -20,12 +20,16 @@ clean:
|
||||
rm -f ./codecov
|
||||
rm -rf ./agent-*
|
||||
rm -rf ./covdir
|
||||
rm -f ./*.coverprofile
|
||||
rm -f ./*.log
|
||||
rm -f ./bin/Dockerfile-release
|
||||
rm -rf ./bin/*.etcd
|
||||
rm -rf ./default.etcd
|
||||
rm -rf ./tests/e2e/default.etcd
|
||||
rm -rf ./gopath
|
||||
rm -rf ./gopath.proto
|
||||
rm -rf ./release
|
||||
rm -f ./snapshot/localhost:*
|
||||
rm -f ./integration/127.0.0.1:* ./integration/localhost:*
|
||||
rm -f ./clientv3/integration/127.0.0.1:* ./clientv3/integration/localhost:*
|
||||
rm -f ./clientv3/ordering/127.0.0.1:* ./clientv3/ordering/localhost:*
|
||||
@@ -46,7 +50,8 @@ docker-remove:
|
||||
|
||||
|
||||
|
||||
GO_VERSION ?= 1.10.1
|
||||
# GO_VERSION ?= 1.10.3
|
||||
GO_VERSION ?= 1.8.7
|
||||
ETCD_VERSION ?= $(shell git rev-parse --short HEAD || echo "GitNotFound")
|
||||
|
||||
TEST_SUFFIX = $(shell date +%s | base64 | head -c 15)
|
||||
@@ -61,16 +66,16 @@ endif
|
||||
|
||||
# Example:
|
||||
# GO_VERSION=1.8.7 make build-docker-test
|
||||
# GO_VERSION=1.9.5 make build-docker-test
|
||||
# GO_VERSION=1.9.7 make build-docker-test
|
||||
# make build-docker-test
|
||||
#
|
||||
# gcloud docker -- login -u _json_key -p "$(cat /etc/gcp-key-etcd-development.json)" https://gcr.io
|
||||
# GO_VERSION=1.8.7 make push-docker-test
|
||||
# GO_VERSION=1.9.5 make push-docker-test
|
||||
# GO_VERSION=1.9.7 make push-docker-test
|
||||
# make push-docker-test
|
||||
#
|
||||
# gsutil -m acl ch -u allUsers:R -r gs://artifacts.etcd-development.appspot.com
|
||||
# GO_VERSION=1.9.5 make pull-docker-test
|
||||
# GO_VERSION=1.9.7 make pull-docker-test
|
||||
# make pull-docker-test
|
||||
|
||||
build-docker-test:
|
||||
@@ -118,7 +123,7 @@ compile-setup-gopath-with-docker-test:
|
||||
#
|
||||
# Local machine:
|
||||
# TEST_OPTS="PASSES='fmt'" make test
|
||||
# TEST_OPTS="PASSES='fmt bom dep compile build unit'" make test
|
||||
# TEST_OPTS="PASSES='fmt bom dep build unit'" make test
|
||||
# TEST_OPTS="PASSES='build unit release integration_e2e functional'" make test
|
||||
# TEST_OPTS="PASSES='build grpcproxy'" make test
|
||||
#
|
||||
@@ -128,7 +133,7 @@ compile-setup-gopath-with-docker-test:
|
||||
# TEST_OPTS="VERBOSE=2 PASSES='unit'" make docker-test
|
||||
#
|
||||
# Travis CI (test with docker):
|
||||
# TEST_OPTS="PASSES='fmt bom dep compile build unit'" make docker-test
|
||||
# TEST_OPTS="PASSES='fmt bom dep build unit'" make docker-test
|
||||
#
|
||||
# Semaphore CI (test with docker):
|
||||
# TEST_OPTS="PASSES='build unit release integration_e2e functional'" make docker-test
|
||||
|
@@ -58,11 +58,9 @@ func (m *Mutex) Lock(ctx context.Context) error {
|
||||
|
||||
// wait for deletion revisions prior to myKey
|
||||
err = waitDeletes(ctx, client, m.pfx, m.myRev-1)
|
||||
// release lock key if cancelled
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// release lock key if wait failed
|
||||
if err != nil {
|
||||
m.Unlock(client.Ctx())
|
||||
default:
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@@ -308,6 +308,49 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestLeaseKeepAliveFullResponseQueue ensures when response
|
||||
// queue is full thus dropping keepalive response sends,
|
||||
// keepalive request is sent with the same rate of TTL / 3.
|
||||
func TestLeaseKeepAliveFullResponseQueue(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
lapi := clus.Client(0)
|
||||
|
||||
// expect lease keepalive every 10-second
|
||||
lresp, err := lapi.Grant(context.Background(), 30)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create lease %v", err)
|
||||
}
|
||||
id := lresp.ID
|
||||
|
||||
old := clientv3.LeaseResponseChSize
|
||||
defer func() {
|
||||
clientv3.LeaseResponseChSize = old
|
||||
}()
|
||||
clientv3.LeaseResponseChSize = 0
|
||||
|
||||
// never fetch from response queue, and let it become full
|
||||
_, err = lapi.KeepAlive(context.Background(), id)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to keepalive lease %v", err)
|
||||
}
|
||||
|
||||
// TTL should not be refreshed after 3 seconds
|
||||
// expect keepalive to be triggered after TTL/3
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
tr, terr := lapi.TimeToLive(context.Background(), id)
|
||||
if terr != nil {
|
||||
t.Fatalf("failed to get lease information %v", terr)
|
||||
}
|
||||
if tr.TTL >= 29 {
|
||||
t.Errorf("unexpected kept-alive lease TTL %d", tr.TTL)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeaseGrantNewAfterClose(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
|
@@ -63,12 +63,15 @@ const (
|
||||
// defaultTTL is the assumed lease TTL used for the first keepalive
|
||||
// deadline before the actual TTL is known to the client.
|
||||
defaultTTL = 5 * time.Second
|
||||
// a small buffer to store unsent lease responses.
|
||||
leaseResponseChSize = 16
|
||||
// NoLease is a lease ID for the absence of a lease.
|
||||
NoLease LeaseID = 0
|
||||
)
|
||||
|
||||
// LeaseResponseChSize is the size of buffer to store unsent lease responses.
|
||||
// WARNING: DO NOT UPDATE.
|
||||
// Only for testing purposes.
|
||||
var LeaseResponseChSize = 16
|
||||
|
||||
// ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error.
|
||||
//
|
||||
// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
|
||||
@@ -223,7 +226,7 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption
|
||||
}
|
||||
|
||||
func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
|
||||
ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)
|
||||
ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)
|
||||
|
||||
l.mu.Lock()
|
||||
// ensure that recvKeepAliveLoop is still running
|
||||
@@ -421,9 +424,10 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
|
||||
for _, ch := range ka.chs {
|
||||
select {
|
||||
case ch <- karesp:
|
||||
ka.nextKeepAlive = nextKeepAlive
|
||||
default:
|
||||
}
|
||||
// still advance in order to rate-limit keep-alive sends
|
||||
ka.nextKeepAlive = nextKeepAlive
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -417,6 +417,14 @@ func dbStatus(p string) dbstatus {
|
||||
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
|
||||
|
||||
err = db.View(func(tx *bolt.Tx) error {
|
||||
// check snapshot file integrity first
|
||||
var dbErrStrings []string
|
||||
for dbErr := range tx.Check() {
|
||||
dbErrStrings = append(dbErrStrings, dbErr.Error())
|
||||
}
|
||||
if len(dbErrStrings) > 0 {
|
||||
return fmt.Errorf("snapshot file integrity check failed. %d errors found.\n"+strings.Join(dbErrStrings, "\n"), len(dbErrStrings))
|
||||
}
|
||||
ds.TotalSize = tx.Size()
|
||||
c := tx.Cursor()
|
||||
for next, _ := c.First(); next != nil; next, _ = c.Next() {
|
||||
|
@@ -76,11 +76,11 @@ func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http
|
||||
}
|
||||
|
||||
mh := &membersHandler{
|
||||
sec: sec,
|
||||
server: server,
|
||||
cluster: server.Cluster(),
|
||||
timeout: timeout,
|
||||
clock: clockwork.NewRealClock(),
|
||||
sec: sec,
|
||||
server: server,
|
||||
cluster: server.Cluster(),
|
||||
timeout: timeout,
|
||||
clock: clockwork.NewRealClock(),
|
||||
clientCertAuthEnabled: server.Cfg.ClientCertAuthEnabled,
|
||||
}
|
||||
|
||||
@@ -346,6 +346,26 @@ func serveVars(w http.ResponseWriter, r *http.Request) {
|
||||
fmt.Fprintf(w, "\n}\n")
|
||||
}
|
||||
|
||||
var (
|
||||
healthSuccess = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "server",
|
||||
Name: "health_success",
|
||||
Help: "The total number of successful health checks",
|
||||
})
|
||||
healthFailed = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "server",
|
||||
Name: "health_failures",
|
||||
Help: "The total number of failed health checks",
|
||||
})
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(healthSuccess)
|
||||
prometheus.MustRegister(healthFailed)
|
||||
}
|
||||
|
||||
func healthHandler(server *etcdserver.EtcdServer) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !allowMethod(w, r.Method, "GET") {
|
||||
@@ -353,16 +373,19 @@ func healthHandler(server *etcdserver.EtcdServer) http.HandlerFunc {
|
||||
}
|
||||
if uint64(server.Leader()) == raft.None {
|
||||
http.Error(w, `{"health": "false"}`, http.StatusServiceUnavailable)
|
||||
healthFailed.Inc()
|
||||
return
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
if _, err := server.Do(ctx, etcdserverpb.Request{Method: "QGET"}); err != nil {
|
||||
http.Error(w, `{"health": "false"}`, http.StatusServiceUnavailable)
|
||||
healthFailed.Inc()
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte(`{"health": "true"}`))
|
||||
healthSuccess.Inc()
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -19,6 +19,8 @@ import (
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
|
||||
"github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
@@ -45,5 +47,8 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
|
||||
pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
|
||||
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
|
||||
|
||||
// to display all registered metrics with zero values
|
||||
grpc_prometheus.Register(grpcServer)
|
||||
|
||||
return grpcServer
|
||||
}
|
||||
|
@@ -24,10 +24,13 @@ import (
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -40,7 +43,7 @@ type streamsMap struct {
|
||||
}
|
||||
|
||||
func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
|
||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
||||
if !api.IsCapabilityEnabled(api.V3rpcCapability) {
|
||||
return nil, rpctypes.ErrGRPCNotCapable
|
||||
}
|
||||
@@ -54,10 +57,116 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
|
||||
}
|
||||
}
|
||||
|
||||
return prometheus.UnaryServerInterceptor(ctx, req, info, handler)
|
||||
return logUnaryInterceptor(ctx, req, info, handler)
|
||||
// interceptors are chained manually during backporting, for better readability refer to PR #9990
|
||||
}
|
||||
}
|
||||
|
||||
// logUnaryInterceptor is a gRPC server-side interceptor that prints info on incoming requests for debugging purpose
|
||||
func logUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
||||
startTime := time.Now()
|
||||
resp, err := prometheus.UnaryServerInterceptor(ctx, req, info, handler)
|
||||
// interceptors are chained manually during backporting, for better readability refer to PR #9990
|
||||
if plog.LevelAt(capnslog.DEBUG) {
|
||||
defer logUnaryRequestStats(ctx, info, startTime, req, resp)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func logUnaryRequestStats(ctx context.Context, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
|
||||
duration := time.Since(startTime)
|
||||
remote := "No remote client info."
|
||||
peerInfo, ok := peer.FromContext(ctx)
|
||||
if ok {
|
||||
remote = peerInfo.Addr.String()
|
||||
}
|
||||
var responseType string = info.FullMethod
|
||||
var reqCount, respCount int64
|
||||
var reqSize, respSize int
|
||||
var reqContent string
|
||||
switch _resp := resp.(type) {
|
||||
case *pb.RangeResponse:
|
||||
_req, ok := req.(*pb.RangeRequest)
|
||||
if ok {
|
||||
reqCount = 0
|
||||
reqSize = _req.Size()
|
||||
reqContent = _req.String()
|
||||
}
|
||||
if _resp != nil {
|
||||
respCount = _resp.Count
|
||||
respSize = _resp.Size()
|
||||
}
|
||||
case *pb.PutResponse:
|
||||
_req, ok := req.(*pb.PutRequest)
|
||||
if ok {
|
||||
reqCount = 1
|
||||
reqSize = _req.Size()
|
||||
reqContent = pb.NewLoggablePutRequest(_req).String()
|
||||
// redact value field from request content, see PR #9821
|
||||
}
|
||||
if _resp != nil {
|
||||
respCount = 0
|
||||
respSize = _resp.Size()
|
||||
}
|
||||
case *pb.DeleteRangeResponse:
|
||||
_req, ok := req.(*pb.DeleteRangeRequest)
|
||||
if ok {
|
||||
reqCount = 0
|
||||
reqSize = _req.Size()
|
||||
reqContent = _req.String()
|
||||
}
|
||||
if _resp != nil {
|
||||
respCount = _resp.Deleted
|
||||
respSize = _resp.Size()
|
||||
}
|
||||
case *pb.TxnResponse:
|
||||
_req, ok := req.(*pb.TxnRequest)
|
||||
if ok && _resp != nil {
|
||||
if _resp.Succeeded { // determine the 'actual' count and size of request based on success or failure
|
||||
reqCount = int64(len(_req.GetSuccess()))
|
||||
reqSize = 0
|
||||
for _, r := range _req.GetSuccess() {
|
||||
reqSize += r.Size()
|
||||
}
|
||||
} else {
|
||||
reqCount = int64(len(_req.GetFailure()))
|
||||
reqSize = 0
|
||||
for _, r := range _req.GetFailure() {
|
||||
reqSize += r.Size()
|
||||
}
|
||||
}
|
||||
reqContent = pb.NewLoggableTxnRequest(_req).String()
|
||||
// redact value field from request content, see PR #9821
|
||||
}
|
||||
if _resp != nil {
|
||||
respCount = 0
|
||||
respSize = _resp.Size()
|
||||
}
|
||||
default:
|
||||
reqCount = -1
|
||||
reqSize = -1
|
||||
respCount = -1
|
||||
respSize = -1
|
||||
}
|
||||
|
||||
logGenericRequestStats(startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
|
||||
}
|
||||
|
||||
func logGenericRequestStats(startTime time.Time, duration time.Duration, remote string, responseType string,
|
||||
reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
|
||||
plog.Debugf("start time = %v, "+
|
||||
"time spent = %v, "+
|
||||
"remote = %s, "+
|
||||
"response type = %s, "+
|
||||
"request count = %d, "+
|
||||
"request size = %d, "+
|
||||
"response count = %d, "+
|
||||
"response size = %d, "+
|
||||
"request content = %s",
|
||||
startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent,
|
||||
)
|
||||
}
|
||||
|
||||
func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
|
||||
smap := monitorLeader(s)
|
||||
|
||||
|
@@ -95,6 +95,9 @@ func (s *EtcdServer) newApplierV3() applierV3 {
|
||||
|
||||
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
|
||||
ar := &applyResult{}
|
||||
defer func(start time.Time) {
|
||||
warnOfExpensiveRequest(start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
|
||||
}(time.Now())
|
||||
|
||||
// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
|
||||
switch {
|
||||
|
@@ -105,10 +105,12 @@ func (a *applierV2store) Sync(r *pb.Request) Response {
|
||||
return Response{}
|
||||
}
|
||||
|
||||
// applyV2Request interprets r as a call to store.X and returns a Response interpreted
|
||||
// from store.Event
|
||||
// applyV2Request interprets r as a call to v2store.X
|
||||
// and returns a Response interpreted from v2store.Event
|
||||
func (s *EtcdServer) applyV2Request(r *pb.Request) Response {
|
||||
defer warnOfExpensiveRequest(time.Now(), r, nil, nil)
|
||||
toTTLOptions(r)
|
||||
|
||||
switch r.Method {
|
||||
case "POST":
|
||||
return s.applyV2.Post(r)
|
||||
|
83
etcdserver/backend.go
Normal file
83
etcdserver/backend.go
Normal file
@@ -0,0 +1,83 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package etcdserver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/lease"
|
||||
"github.com/coreos/etcd/mvcc"
|
||||
"github.com/coreos/etcd/mvcc/backend"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/snap"
|
||||
)
|
||||
|
||||
func newBackend(cfg *ServerConfig) backend.Backend {
|
||||
return backend.NewDefaultBackend(backendPath(cfg))
|
||||
}
|
||||
|
||||
func backendPath(cfg *ServerConfig) string {
|
||||
return filepath.Join(cfg.SnapDir(), "db")
|
||||
}
|
||||
|
||||
// openSnapshotBackend renames a snapshot db to the current etcd db and opens it.
|
||||
func openSnapshotBackend(cfg *ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) {
|
||||
snapPath, err := ss.DBFilePath(snapshot.Metadata.Index)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to find database snapshot file (%v)", err)
|
||||
}
|
||||
if err := os.Rename(snapPath, backendPath(cfg)); err != nil {
|
||||
return nil, fmt.Errorf("failed to rename database snapshot file (%v)", err)
|
||||
}
|
||||
return openBackend(cfg), nil
|
||||
}
|
||||
|
||||
// openBackend returns a backend using the current etcd db.
|
||||
func openBackend(cfg *ServerConfig) backend.Backend {
|
||||
fn := backendPath(cfg)
|
||||
beOpened := make(chan backend.Backend)
|
||||
go func() {
|
||||
beOpened <- newBackend(cfg)
|
||||
}()
|
||||
|
||||
select {
|
||||
case be := <-beOpened:
|
||||
return be
|
||||
|
||||
case <-time.After(10 * time.Second):
|
||||
plog.Warningf("another etcd process is using %q and holds the file lock, or loading backend file is taking >10 seconds", fn)
|
||||
plog.Warningf("waiting for it to exit before starting...")
|
||||
}
|
||||
|
||||
return <-beOpened
|
||||
}
|
||||
|
||||
// recoverBackendSnapshot recovers the DB from a snapshot in case etcd crashes
|
||||
// before updating the backend db after persisting raft snapshot to disk,
|
||||
// violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this
|
||||
// case, replace the db with the snapshot db sent by the leader.
|
||||
func recoverSnapshotBackend(cfg *ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
|
||||
var cIndex consistentIndex
|
||||
kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex)
|
||||
defer kv.Close()
|
||||
if snapshot.Metadata.Index <= kv.ConsistentIndex() {
|
||||
return oldbe, nil
|
||||
}
|
||||
oldbe.Close()
|
||||
return openSnapshotBackend(cfg, snap.New(cfg.SnapDir()), snapshot)
|
||||
}
|
175
etcdserver/etcdserverpb/raft_internal_stringer.go
Normal file
175
etcdserver/etcdserverpb/raft_internal_stringer.go
Normal file
@@ -0,0 +1,175 @@
|
||||
// Copyright 2018 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package etcdserverpb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
proto "github.com/golang/protobuf/proto"
|
||||
)
|
||||
|
||||
// InternalRaftStringer implements custom proto Stringer:
|
||||
// redact password, replace value fields with value_size fields.
|
||||
type InternalRaftStringer struct {
|
||||
Request *InternalRaftRequest
|
||||
}
|
||||
|
||||
func (as *InternalRaftStringer) String() string {
|
||||
switch {
|
||||
case as.Request.LeaseGrant != nil:
|
||||
return fmt.Sprintf("header:<%s> lease_grant:<ttl:%d-second id:%016x>",
|
||||
as.Request.Header.String(),
|
||||
as.Request.LeaseGrant.TTL,
|
||||
as.Request.LeaseGrant.ID,
|
||||
)
|
||||
case as.Request.LeaseRevoke != nil:
|
||||
return fmt.Sprintf("header:<%s> lease_revoke:<id:%016x>",
|
||||
as.Request.Header.String(),
|
||||
as.Request.LeaseRevoke.ID,
|
||||
)
|
||||
case as.Request.Authenticate != nil:
|
||||
return fmt.Sprintf("header:<%s> authenticate:<name:%s simple_token:%s>",
|
||||
as.Request.Header.String(),
|
||||
as.Request.Authenticate.Name,
|
||||
as.Request.Authenticate.SimpleToken,
|
||||
)
|
||||
case as.Request.AuthUserAdd != nil:
|
||||
return fmt.Sprintf("header:<%s> auth_user_add:<name:%s>",
|
||||
as.Request.Header.String(),
|
||||
as.Request.AuthUserAdd.Name,
|
||||
)
|
||||
case as.Request.AuthUserChangePassword != nil:
|
||||
return fmt.Sprintf("header:<%s> auth_user_change_password:<name:%s>",
|
||||
as.Request.Header.String(),
|
||||
as.Request.AuthUserChangePassword.Name,
|
||||
)
|
||||
case as.Request.Put != nil:
|
||||
return fmt.Sprintf("header:<%s> put:<%s>",
|
||||
as.Request.Header.String(),
|
||||
NewLoggablePutRequest(as.Request.Put).String(),
|
||||
)
|
||||
case as.Request.Txn != nil:
|
||||
return fmt.Sprintf("header:<%s> txn:<%s>",
|
||||
as.Request.Header.String(),
|
||||
NewLoggableTxnRequest(as.Request.Txn).String(),
|
||||
)
|
||||
default:
|
||||
// nothing to redact
|
||||
}
|
||||
return as.Request.String()
|
||||
}
|
||||
|
||||
// txnRequestStringer implements a custom proto String to replace value bytes fields with value size
|
||||
// fields in any nested txn and put operations.
|
||||
type txnRequestStringer struct {
|
||||
Request *TxnRequest
|
||||
}
|
||||
|
||||
func NewLoggableTxnRequest(request *TxnRequest) *txnRequestStringer {
|
||||
return &txnRequestStringer{request}
|
||||
}
|
||||
|
||||
func (as *txnRequestStringer) String() string {
|
||||
var compare []string
|
||||
for _, c := range as.Request.Compare {
|
||||
switch cv := c.TargetUnion.(type) {
|
||||
case *Compare_Value:
|
||||
compare = append(compare, newLoggableValueCompare(c, cv).String())
|
||||
default:
|
||||
// nothing to redact
|
||||
compare = append(compare, c.String())
|
||||
}
|
||||
}
|
||||
var success []string
|
||||
for _, s := range as.Request.Success {
|
||||
success = append(success, newLoggableRequestOp(s).String())
|
||||
}
|
||||
var failure []string
|
||||
for _, f := range as.Request.Failure {
|
||||
failure = append(failure, newLoggableRequestOp(f).String())
|
||||
}
|
||||
return fmt.Sprintf("compare:<%s> success:<%s> failure:<%s>",
|
||||
strings.Join(compare, " "),
|
||||
strings.Join(success, " "),
|
||||
strings.Join(failure, " "),
|
||||
)
|
||||
}
|
||||
|
||||
// requestOpStringer implements a custom proto String to replace value bytes fields with value
|
||||
// size fields in any nested txn and put operations.
|
||||
type requestOpStringer struct {
|
||||
Op *RequestOp
|
||||
}
|
||||
|
||||
func newLoggableRequestOp(op *RequestOp) *requestOpStringer {
|
||||
return &requestOpStringer{op}
|
||||
}
|
||||
|
||||
func (as *requestOpStringer) String() string {
|
||||
switch op := as.Op.Request.(type) {
|
||||
case *RequestOp_RequestPut:
|
||||
return fmt.Sprintf("request_put:<%s>", NewLoggablePutRequest(op.RequestPut).String())
|
||||
default:
|
||||
// nothing to redact
|
||||
}
|
||||
return as.Op.String()
|
||||
}
|
||||
|
||||
// loggableValueCompare implements a custom proto String for Compare.Value union member types to
|
||||
// replace the value bytes field with a value size field.
|
||||
// To preserve proto encoding of the key and range_end bytes, a faked out proto type is used here.
|
||||
type loggableValueCompare struct {
|
||||
Result Compare_CompareResult `protobuf:"varint,1,opt,name=result,proto3,enum=etcdserverpb.Compare_CompareResult"`
|
||||
Target Compare_CompareTarget `protobuf:"varint,2,opt,name=target,proto3,enum=etcdserverpb.Compare_CompareTarget"`
|
||||
Key []byte `protobuf:"bytes,3,opt,name=key,proto3"`
|
||||
ValueSize int `protobuf:"bytes,7,opt,name=value_size,proto3"`
|
||||
}
|
||||
|
||||
func newLoggableValueCompare(c *Compare, cv *Compare_Value) *loggableValueCompare {
|
||||
return &loggableValueCompare{
|
||||
c.Result,
|
||||
c.Target,
|
||||
c.Key,
|
||||
len(cv.Value),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *loggableValueCompare) Reset() { *m = loggableValueCompare{} }
|
||||
func (m *loggableValueCompare) String() string { return proto.CompactTextString(m) }
|
||||
func (*loggableValueCompare) ProtoMessage() {}
|
||||
|
||||
// loggablePutRequest implements a custom proto String to replace value bytes field with a value
|
||||
// size field.
|
||||
// To preserve proto encoding of the key bytes, a faked out proto type is used here.
|
||||
type loggablePutRequest struct {
|
||||
Key []byte `protobuf:"bytes,1,opt,name=key,proto3"`
|
||||
ValueSize int `protobuf:"varint,2,opt,name=value_size,proto3"`
|
||||
Lease int64 `protobuf:"varint,3,opt,name=lease,proto3"`
|
||||
PrevKv bool `protobuf:"varint,4,opt,name=prev_kv,proto3"`
|
||||
}
|
||||
|
||||
func NewLoggablePutRequest(request *PutRequest) *loggablePutRequest {
|
||||
return &loggablePutRequest{
|
||||
request.Key,
|
||||
len(request.Value),
|
||||
request.Lease,
|
||||
request.PrevKv,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *loggablePutRequest) Reset() { *m = loggablePutRequest{} }
|
||||
func (m *loggablePutRequest) String() string { return proto.CompactTextString(m) }
|
||||
func (*loggablePutRequest) ProtoMessage() {}
|
@@ -15,9 +15,11 @@
|
||||
package etcdserver
|
||||
|
||||
import (
|
||||
goruntime "runtime"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/pkg/runtime"
|
||||
"github.com/coreos/etcd/version"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
@@ -40,6 +42,18 @@ var (
|
||||
Name: "leader_changes_seen_total",
|
||||
Help: "The number of leader changes seen.",
|
||||
})
|
||||
heartbeatSendFailures = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "server",
|
||||
Name: "heartbeat_send_failures_total",
|
||||
Help: "The total number of leader heartbeat send failures (likely overloaded from slow disk).",
|
||||
})
|
||||
slowApplies = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "server",
|
||||
Name: "slow_apply_total",
|
||||
Help: "The total number of slow apply requests (likely overloaded from slow disk).",
|
||||
})
|
||||
proposalsCommitted = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "server",
|
||||
@@ -64,16 +78,70 @@ var (
|
||||
Name: "proposals_failed_total",
|
||||
Help: "The total number of failed proposals seen.",
|
||||
})
|
||||
slowReadIndex = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "server",
|
||||
Name: "slow_read_indexes_total",
|
||||
Help: "The total number of pending read indexes not in sync with leader's or timed out read index requests.",
|
||||
})
|
||||
readIndexFailed = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "server",
|
||||
Name: "read_indexes_failed_total",
|
||||
Help: "The total number of failed read indexes seen.",
|
||||
})
|
||||
quotaBackendBytes = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "server",
|
||||
Name: "quota_backend_bytes",
|
||||
Help: "Current backend storage quota size in bytes.",
|
||||
})
|
||||
currentVersion = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "server",
|
||||
Name: "version",
|
||||
Help: "Which version is running. 1 for 'server_version' label with current version.",
|
||||
},
|
||||
[]string{"server_version"})
|
||||
currentGoVersion = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "server",
|
||||
Name: "go_version",
|
||||
Help: "Which Go version server is running with. 1 for 'server_go_version' label with current version.",
|
||||
},
|
||||
[]string{"server_go_version"})
|
||||
serverID = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "server",
|
||||
Name: "id",
|
||||
Help: "Server or member ID in hexadecimal format. 1 for 'server_id' label with current ID.",
|
||||
},
|
||||
[]string{"server_id"})
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(hasLeader)
|
||||
prometheus.MustRegister(isLeader)
|
||||
prometheus.MustRegister(leaderChanges)
|
||||
prometheus.MustRegister(heartbeatSendFailures)
|
||||
prometheus.MustRegister(slowApplies)
|
||||
prometheus.MustRegister(proposalsCommitted)
|
||||
prometheus.MustRegister(proposalsApplied)
|
||||
prometheus.MustRegister(proposalsPending)
|
||||
prometheus.MustRegister(proposalsFailed)
|
||||
prometheus.MustRegister(slowReadIndex)
|
||||
prometheus.MustRegister(readIndexFailed)
|
||||
prometheus.MustRegister(quotaBackendBytes)
|
||||
prometheus.MustRegister(currentVersion)
|
||||
prometheus.MustRegister(currentGoVersion)
|
||||
prometheus.MustRegister(serverID)
|
||||
|
||||
currentVersion.With(prometheus.Labels{
|
||||
"server_version": version.Version,
|
||||
}).Set(1)
|
||||
currentGoVersion.With(prometheus.Labels{
|
||||
"server_go_version": goruntime.Version(),
|
||||
}).Set(1)
|
||||
}
|
||||
|
||||
func monitorFileDescriptor(done <-chan struct{}) {
|
||||
|
@@ -50,15 +50,20 @@ const (
|
||||
)
|
||||
|
||||
func NewBackendQuota(s *EtcdServer) Quota {
|
||||
quotaBackendBytes.Set(float64(s.Cfg.QuotaBackendBytes))
|
||||
|
||||
if s.Cfg.QuotaBackendBytes < 0 {
|
||||
// disable quotas if negative
|
||||
plog.Warningf("disabling backend quota")
|
||||
return &passthroughQuota{}
|
||||
}
|
||||
|
||||
if s.Cfg.QuotaBackendBytes == 0 {
|
||||
quotaBackendBytes.Set(float64(backend.DefaultQuotaBytes))
|
||||
// use default size if no quota size given
|
||||
return &backendQuota{s, backend.DefaultQuotaBytes}
|
||||
}
|
||||
|
||||
if s.Cfg.QuotaBackendBytes > backend.MaxQuotaBytes {
|
||||
plog.Warningf("backend quota %v exceeds maximum quota %v; using maximum", s.Cfg.QuotaBackendBytes, backend.MaxQuotaBytes)
|
||||
return &backendQuota{s, backend.MaxQuotaBytes}
|
||||
|
@@ -293,6 +293,7 @@ func (r *raftNode) sendMessages(ms []raftpb.Message) {
|
||||
// TODO: limit request rate.
|
||||
plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
|
||||
plog.Warningf("server is likely overloaded")
|
||||
heartbeatSendFailures.Inc()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -56,8 +56,10 @@ import (
|
||||
"github.com/coreos/etcd/store"
|
||||
"github.com/coreos/etcd/version"
|
||||
"github.com/coreos/etcd/wal"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
@@ -378,6 +380,10 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
||||
plog.Panicf("recovered store from snapshot error: %v", err)
|
||||
}
|
||||
plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)
|
||||
|
||||
if be, err = recoverSnapshotBackend(cfg, be, *snapshot); err != nil {
|
||||
plog.Panicf("recovering backend from snapshot error: %v", err)
|
||||
}
|
||||
}
|
||||
cfg.Print()
|
||||
if !cfg.ForceNewCluster {
|
||||
@@ -437,6 +443,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
||||
reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
|
||||
forceVersionC: make(chan struct{}),
|
||||
}
|
||||
serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)
|
||||
|
||||
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
|
||||
|
||||
@@ -796,14 +803,8 @@ func (s *EtcdServer) run() {
|
||||
|
||||
func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
|
||||
s.applySnapshot(ep, apply)
|
||||
st := time.Now()
|
||||
s.applyEntries(ep, apply)
|
||||
d := time.Since(st)
|
||||
entriesNum := len(apply.entries)
|
||||
if entriesNum != 0 && d > time.Duration(entriesNum)*warnApplyDuration {
|
||||
plog.Warningf("apply entries took too long [%v for %d entries]", d, len(apply.entries))
|
||||
plog.Warningf("avoid queries with large range/delete range!")
|
||||
}
|
||||
|
||||
proposalsApplied.Set(float64(ep.appliedi))
|
||||
s.applyWait.Trigger(ep.appliedi)
|
||||
// wait for the raft routine to finish the disk writes before triggering a
|
||||
|
@@ -15,11 +15,16 @@
|
||||
package etcdserver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/etcdserver/membership"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
"github.com/golang/protobuf/proto"
|
||||
)
|
||||
|
||||
// isConnectedToQuorumSince checks whether the local member is connected to the
|
||||
@@ -95,3 +100,56 @@ func (nc *notifier) notify(err error) {
|
||||
nc.err = err
|
||||
close(nc.c)
|
||||
}
|
||||
|
||||
func warnOfExpensiveRequest(now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
|
||||
var resp string
|
||||
if !isNil(respMsg) {
|
||||
resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
|
||||
}
|
||||
warnOfExpensiveGenericRequest(now, reqStringer, "", resp, err)
|
||||
}
|
||||
|
||||
func warnOfExpensiveReadOnlyTxnRequest(now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
|
||||
reqStringer := pb.NewLoggableTxnRequest(r)
|
||||
var resp string
|
||||
if !isNil(txnResponse) {
|
||||
var resps []string
|
||||
for _, r := range txnResponse.Responses {
|
||||
switch op := r.Response.(type) {
|
||||
case *pb.ResponseOp_ResponseRange:
|
||||
resps = append(resps, fmt.Sprintf("range_response_count:%d", len(op.ResponseRange.Kvs)))
|
||||
default:
|
||||
// only range responses should be in a read only txn request
|
||||
}
|
||||
}
|
||||
resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse))
|
||||
}
|
||||
warnOfExpensiveGenericRequest(now, reqStringer, "read-only range ", resp, err)
|
||||
}
|
||||
|
||||
func warnOfExpensiveReadOnlyRangeRequest(now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
|
||||
var resp string
|
||||
if !isNil(rangeResponse) {
|
||||
resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse))
|
||||
}
|
||||
warnOfExpensiveGenericRequest(now, reqStringer, "read-only range ", resp, err)
|
||||
}
|
||||
|
||||
func warnOfExpensiveGenericRequest(now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
|
||||
// TODO: add metrics
|
||||
d := time.Since(now)
|
||||
if d > warnApplyDuration {
|
||||
var result string
|
||||
if err != nil {
|
||||
result = fmt.Sprintf("error:%v", err)
|
||||
} else {
|
||||
result = resp
|
||||
}
|
||||
plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
|
||||
slowApplies.Inc()
|
||||
}
|
||||
}
|
||||
|
||||
func isNil(msg proto.Message) bool {
|
||||
return msg == nil || reflect.ValueOf(msg).IsNil()
|
||||
}
|
||||
|
@@ -95,21 +95,25 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
|
||||
if s.ClusterVersion() == nil || s.ClusterVersion().LessThan(newRangeClusterVersion) {
|
||||
return s.legacyRange(ctx, r)
|
||||
}
|
||||
var resp *pb.RangeResponse
|
||||
var err error
|
||||
defer func(start time.Time) {
|
||||
warnOfExpensiveReadOnlyRangeRequest(start, r, resp, err)
|
||||
}(time.Now())
|
||||
|
||||
if !r.Serializable {
|
||||
err := s.linearizableReadNotify(ctx)
|
||||
err = s.linearizableReadNotify(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
var resp *pb.RangeResponse
|
||||
var err error
|
||||
chk := func(ai *auth.AuthInfo) error {
|
||||
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
|
||||
}
|
||||
get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
|
||||
if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
||||
return nil, serr
|
||||
err = serr
|
||||
return nil, err
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
@@ -178,6 +182,11 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
|
||||
chk := func(ai *auth.AuthInfo) error {
|
||||
return checkTxnAuth(s.authStore, ai, r)
|
||||
}
|
||||
|
||||
defer func(start time.Time) {
|
||||
warnOfExpensiveReadOnlyTxnRequest(start, r, resp, err)
|
||||
}(time.Now())
|
||||
|
||||
get := func() { resp, err = s.applyV3Base.Txn(r) }
|
||||
if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
||||
return nil, serr
|
||||
@@ -716,8 +725,9 @@ func (s *EtcdServer) linearizableReadLoop() {
|
||||
var rs raft.ReadState
|
||||
|
||||
for {
|
||||
ctx := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(ctx, s.reqIDGen.Next())
|
||||
ctxToSend := make([]byte, 8)
|
||||
id1 := s.reqIDGen.Next()
|
||||
binary.BigEndian.PutUint64(ctxToSend, id1)
|
||||
|
||||
select {
|
||||
case <-s.readwaitc:
|
||||
@@ -733,12 +743,13 @@ func (s *EtcdServer) linearizableReadLoop() {
|
||||
s.readMu.Unlock()
|
||||
|
||||
cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
|
||||
if err := s.r.ReadIndex(cctx, ctx); err != nil {
|
||||
if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
|
||||
cancel()
|
||||
if err == raft.ErrStopped {
|
||||
return
|
||||
}
|
||||
plog.Errorf("failed to get read index from raft: %v", err)
|
||||
readIndexFailed.Inc()
|
||||
nr.notify(err)
|
||||
continue
|
||||
}
|
||||
@@ -751,16 +762,24 @@ func (s *EtcdServer) linearizableReadLoop() {
|
||||
for !timeout && !done {
|
||||
select {
|
||||
case rs = <-s.r.readStateC:
|
||||
done = bytes.Equal(rs.RequestCtx, ctx)
|
||||
done = bytes.Equal(rs.RequestCtx, ctxToSend)
|
||||
if !done {
|
||||
// a previous request might time out. now we should ignore the response of it and
|
||||
// continue waiting for the response of the current requests.
|
||||
plog.Warningf("ignored out-of-date read index response (want %v, got %v)", rs.RequestCtx, ctx)
|
||||
id2 := uint64(0)
|
||||
if len(rs.RequestCtx) == 8 {
|
||||
id2 = binary.BigEndian.Uint64(rs.RequestCtx)
|
||||
}
|
||||
plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2)
|
||||
slowReadIndex.Inc()
|
||||
}
|
||||
|
||||
case <-time.After(s.Cfg.ReqTimeout()):
|
||||
plog.Warningf("timed out waiting for read index response")
|
||||
plog.Warningf("timed out waiting for read index response (local node might have slow network)")
|
||||
nr.notify(ErrTimeout)
|
||||
timeout = true
|
||||
slowReadIndex.Inc()
|
||||
|
||||
case <-s.stopping:
|
||||
return
|
||||
}
|
||||
|
@@ -58,6 +58,10 @@ type Backend interface {
|
||||
Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
|
||||
// Size returns the current size of the backend.
|
||||
Size() int64
|
||||
// SizeInUse returns the current size of the backend logically in use.
|
||||
// Since the backend can manage free space in a non-byte unit such as
|
||||
// number of pages, the returned value can be not exactly accurate in bytes.
|
||||
SizeInUse() int64
|
||||
Defrag() error
|
||||
ForceCommit()
|
||||
Close() error
|
||||
@@ -78,6 +82,10 @@ type backend struct {
|
||||
|
||||
// size is the number of bytes in the backend
|
||||
size int64
|
||||
|
||||
// sizeInUse is the number of bytes actually used in the backend
|
||||
sizeInUse int64
|
||||
|
||||
// commits counts number of commits since start
|
||||
commits int64
|
||||
|
||||
@@ -185,6 +193,10 @@ func (b *backend) Size() int64 {
|
||||
return atomic.LoadInt64(&b.size)
|
||||
}
|
||||
|
||||
func (b *backend) SizeInUse() int64 {
|
||||
return atomic.LoadInt64(&b.sizeInUse)
|
||||
}
|
||||
|
||||
func (b *backend) run() {
|
||||
defer close(b.donec)
|
||||
t := time.NewTimer(b.batchInterval)
|
||||
@@ -213,18 +225,12 @@ func (b *backend) Commits() int64 {
|
||||
}
|
||||
|
||||
func (b *backend) Defrag() error {
|
||||
err := b.defrag()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// commit to update metadata like db.size
|
||||
b.batchTx.Commit()
|
||||
|
||||
return nil
|
||||
return b.defrag()
|
||||
}
|
||||
|
||||
func (b *backend) defrag() error {
|
||||
now := time.Now()
|
||||
|
||||
// TODO: make this non-blocking?
|
||||
// lock batchTx to ensure nobody is using previous tx, and then
|
||||
// close previous ongoing tx.
|
||||
@@ -276,6 +282,14 @@ func (b *backend) defrag() error {
|
||||
plog.Fatalf("cannot begin tx (%s)", err)
|
||||
}
|
||||
|
||||
size := b.batchTx.tx.Size()
|
||||
db := b.db
|
||||
atomic.StoreInt64(&b.size, size)
|
||||
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
|
||||
|
||||
took := time.Since(now)
|
||||
defragDurations.Observe(took.Seconds())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -136,15 +136,15 @@ func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) err
|
||||
// Commit commits a previous tx and begins a new writable one.
|
||||
func (t *batchTx) Commit() {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
t.commit(false)
|
||||
t.Unlock()
|
||||
}
|
||||
|
||||
// CommitAndStop commits the previous tx and do not create a new one.
|
||||
func (t *batchTx) CommitAndStop() {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
t.commit(true)
|
||||
t.Unlock()
|
||||
}
|
||||
|
||||
func (t *batchTx) Unlock() {
|
||||
@@ -160,28 +160,14 @@ func (t *batchTx) commit(stop bool) {
|
||||
// commit the last tx
|
||||
if t.tx != nil {
|
||||
if t.pending == 0 && !stop {
|
||||
t.backend.mu.RLock()
|
||||
defer t.backend.mu.RUnlock()
|
||||
|
||||
// batchTx.commit(true) calls *bolt.Tx.Commit, which
|
||||
// initializes *bolt.Tx.db and *bolt.Tx.meta as nil,
|
||||
// and subsequent *bolt.Tx.Size() call panics.
|
||||
//
|
||||
// This nil pointer reference panic happens when:
|
||||
// 1. batchTx.commit(false) from newBatchTx
|
||||
// 2. batchTx.commit(true) from stopping backend
|
||||
// 3. batchTx.commit(false) from inflight mvcc Hash call
|
||||
//
|
||||
// Check if db is nil to prevent this panic
|
||||
if t.tx.DB() != nil {
|
||||
atomic.StoreInt64(&t.backend.size, t.tx.Size())
|
||||
}
|
||||
return
|
||||
}
|
||||
start := time.Now()
|
||||
|
||||
// gofail: var beforeCommit struct{}
|
||||
err = t.tx.Commit()
|
||||
// gofail: var afterCommit struct{}
|
||||
|
||||
commitDurations.Observe(time.Since(start).Seconds())
|
||||
atomic.AddInt64(&t.backend.commits, 1)
|
||||
|
||||
@@ -202,5 +188,9 @@ func (t *batchTx) commit(stop bool) {
|
||||
if err != nil {
|
||||
plog.Fatalf("cannot begin tx (%s)", err)
|
||||
}
|
||||
atomic.StoreInt64(&t.backend.size, t.tx.Size())
|
||||
|
||||
size := t.tx.Size()
|
||||
db := t.backend.db
|
||||
atomic.StoreInt64(&t.backend.size, size)
|
||||
atomic.StoreInt64(&t.backend.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
|
||||
}
|
||||
|
@@ -22,10 +22,26 @@ var (
|
||||
Subsystem: "disk",
|
||||
Name: "backend_commit_duration_seconds",
|
||||
Help: "The latency distributions of commit called by backend.",
|
||||
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
|
||||
|
||||
// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
|
||||
// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
|
||||
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
|
||||
})
|
||||
|
||||
defragDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "disk",
|
||||
Name: "backend_defrag_duration_seconds",
|
||||
Help: "The latency distribution of backend defragmentation.",
|
||||
|
||||
// 100 MB usually takes 1 sec, so start with 10 MB of 100 ms
|
||||
// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
|
||||
// highest bucket start of 0.1 sec * 2^12 == 409.6 sec
|
||||
Buckets: prometheus.ExponentialBuckets(.1, 2, 13),
|
||||
})
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(commitDurations)
|
||||
prometheus.MustRegister(defragDurations)
|
||||
}
|
||||
|
@@ -217,7 +217,6 @@ func (s *store) txnEnd(txnID int64) error {
|
||||
}
|
||||
s.currentRev.sub = 0
|
||||
|
||||
dbTotalSize.Set(float64(s.b.Size()))
|
||||
s.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
@@ -336,9 +335,14 @@ func init() {
|
||||
func (s *store) Hash() (uint32, int64, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
start := time.Now()
|
||||
|
||||
s.b.ForceCommit()
|
||||
|
||||
h, err := s.b.Hash(DefaultIgnores)
|
||||
|
||||
hashDurations.Observe(time.Since(start).Seconds())
|
||||
rev := s.currentRev.main
|
||||
return h, rev, err
|
||||
}
|
||||
@@ -374,6 +378,13 @@ func (s *store) Restore(b backend.Backend) error {
|
||||
}
|
||||
|
||||
func (s *store) restore() error {
|
||||
reportDbTotalSizeInBytesMu.Lock()
|
||||
reportDbTotalSizeInBytes = func() float64 { return float64(s.b.Size()) }
|
||||
reportDbTotalSizeInBytesMu.Unlock()
|
||||
reportDbTotalSizeInUseInBytesMu.Lock()
|
||||
reportDbTotalSizeInUseInBytes = func() float64 { return float64(s.b.SizeInUse()) }
|
||||
reportDbTotalSizeInUseInBytesMu.Unlock()
|
||||
|
||||
min, max := newRevBytes(), newRevBytes()
|
||||
revToBytes(revision{main: 1}, min)
|
||||
revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
|
||||
|
@@ -613,6 +613,7 @@ type fakeBackend struct {
|
||||
func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx }
|
||||
func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil }
|
||||
func (b *fakeBackend) Size() int64 { return 0 }
|
||||
func (b *fakeBackend) SizeInUse() int64 { return 0 }
|
||||
func (b *fakeBackend) Snapshot() backend.Snapshot { return nil }
|
||||
func (b *fakeBackend) ForceCommit() {}
|
||||
func (b *fakeBackend) Defrag() error { return nil }
|
||||
|
@@ -15,6 +15,8 @@
|
||||
package mvcc
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
@@ -129,11 +131,62 @@ var (
|
||||
Buckets: prometheus.ExponentialBuckets(100, 2, 14),
|
||||
})
|
||||
|
||||
dbTotalSize = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
dbTotalSizeDebugging = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Namespace: "etcd_debugging",
|
||||
Subsystem: "mvcc",
|
||||
Name: "db_total_size_in_bytes",
|
||||
Help: "Total size of the underlying database in bytes.",
|
||||
Help: "Total size of the underlying database physically allocated in bytes. Use etcd_mvcc_db_total_size_in_bytes",
|
||||
},
|
||||
func() float64 {
|
||||
reportDbTotalSizeInBytesMu.RLock()
|
||||
defer reportDbTotalSizeInBytesMu.RUnlock()
|
||||
return reportDbTotalSizeInBytes()
|
||||
},
|
||||
)
|
||||
// overridden
|
||||
|
||||
dbTotalSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "mvcc",
|
||||
Name: "db_total_size_in_bytes",
|
||||
Help: "Total size of the underlying database physically allocated in bytes.",
|
||||
},
|
||||
func() float64 {
|
||||
reportDbTotalSizeInBytesMu.RLock()
|
||||
defer reportDbTotalSizeInBytesMu.RUnlock()
|
||||
return reportDbTotalSizeInBytes()
|
||||
},
|
||||
)
|
||||
// overridden by mvcc initialization
|
||||
reportDbTotalSizeInBytesMu sync.RWMutex
|
||||
reportDbTotalSizeInBytes = func() float64 { return 0 }
|
||||
|
||||
dbTotalSizeInUse = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "mvcc",
|
||||
Name: "db_total_size_in_use_in_bytes",
|
||||
Help: "Total size of the underlying database logically in use in bytes.",
|
||||
},
|
||||
func() float64 {
|
||||
reportDbTotalSizeInUseInBytesMu.RLock()
|
||||
defer reportDbTotalSizeInUseInBytesMu.RUnlock()
|
||||
return reportDbTotalSizeInUseInBytes()
|
||||
},
|
||||
)
|
||||
// overridden by mvcc initialization
|
||||
reportDbTotalSizeInUseInBytesMu sync.RWMutex
|
||||
reportDbTotalSizeInUseInBytes func() float64 = func() float64 { return 0 }
|
||||
|
||||
hashDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "mvcc",
|
||||
Name: "hash_duration_seconds",
|
||||
Help: "The latency distribution of storage hash operation.",
|
||||
|
||||
// 100 MB usually takes 100 ms, so start with 10 MB of 10 ms
|
||||
// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
|
||||
// highest bucket start of 0.01 sec * 2^14 == 163.84 sec
|
||||
Buckets: prometheus.ExponentialBuckets(.01, 2, 15),
|
||||
})
|
||||
)
|
||||
|
||||
@@ -151,7 +204,10 @@ func init() {
|
||||
prometheus.MustRegister(indexCompactionPauseDurations)
|
||||
prometheus.MustRegister(dbCompactionPauseDurations)
|
||||
prometheus.MustRegister(dbCompactionTotalDurations)
|
||||
prometheus.MustRegister(dbTotalSizeDebugging)
|
||||
prometheus.MustRegister(dbTotalSize)
|
||||
prometheus.MustRegister(dbTotalSizeInUse)
|
||||
prometheus.MustRegister(hashDurations)
|
||||
}
|
||||
|
||||
// ReportEventReceived reports that an event is received.
|
||||
|
@@ -267,6 +267,7 @@ func (s *watchableStore) Restore(b backend.Backend) error {
	}

	for wa := range s.synced.watchers {
		wa.restore = true
		s.unsynced.add(wa)
	}
	s.synced = newWatcherGroup()
@@ -549,6 +550,14 @@ type watcher struct {
	// compacted is set when the watcher is removed because of compaction
	compacted bool

	// restore is true when the watcher is being restored from leader snapshot,
	// which means that this watcher has just been moved from the "synced" to the
	// "unsynced" watcher group, possibly with a future revision from when it was
	// first added to the "synced" watcher group.
	// An "unsynced" watcher's revision must always be <= the current revision,
	// except when the watcher was just moved over from the "synced" watcher group.
	restore bool

	// minRev is the minimum revision update the watcher will accept
	minRev int64
	id     WatchID
@@ -336,6 +336,62 @@ func TestWatchRestore(t *testing.T) {
	t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
}

// TestWatchRestoreSyncedWatcher tests such a case that:
// 1. watcher is created with a future revision "math.MaxInt64 - 2"
// 2. watcher with a future revision is added to "synced" watcher group
// 3. restore/overwrite storage with a snapshot of a higher last revision
// 4. restore operation moves "synced" to "unsynced" watcher group
// 5. choose the watcher from step 1, without panic
func TestWatchRestoreSyncedWatcher(t *testing.T) {
	b1, b1Path := backend.NewDefaultTmpBackend()
	s1 := newWatchableStore(b1, &lease.FakeLessor{}, nil)
	defer cleanup(s1, b1, b1Path)

	b2, b2Path := backend.NewDefaultTmpBackend()
	s2 := newWatchableStore(b2, &lease.FakeLessor{}, nil)
	defer cleanup(s2, b2, b2Path)

	testKey, testValue := []byte("foo"), []byte("bar")
	rev := s1.Put(testKey, testValue, lease.NoLease)
	startRev := rev + 2

	// create a watcher with a future revision
	// add to "synced" watcher group (startRev > s.store.currentRev)
	w1 := s1.NewWatchStream()
	w1.Watch(testKey, nil, startRev)

	// make "s2" end up with a higher last revision
	s2.Put(testKey, testValue, lease.NoLease)
	s2.Put(testKey, testValue, lease.NoLease)

	// overwrite storage with higher revisions
	if err := s1.Restore(b2); err != nil {
		t.Fatal(err)
	}

	// wait for next "syncWatchersLoop" iteration
	// and the unsynced watcher should be chosen
	time.Sleep(2 * time.Second)

	// trigger events for "startRev"
	s1.Put(testKey, testValue, lease.NoLease)

	select {
	case resp := <-w1.Chan():
		if resp.Revision != startRev {
			t.Fatalf("resp.Revision expect %d, got %d", startRev, resp.Revision)
		}
		if len(resp.Events) != 1 {
			t.Fatalf("len(resp.Events) expect 1, got %d", len(resp.Events))
		}
		if resp.Events[0].Kv.ModRevision != startRev {
			t.Fatalf("resp.Events[0].Kv.ModRevision expect %d, got %d", startRev, resp.Events[0].Kv.ModRevision)
		}
	case <-time.After(time.Second):
		t.Fatal("failed to receive event in 1 second")
	}
}

// TestWatchBatchUnsynced tests batching on unsynced watchers
func TestWatchBatchUnsynced(t *testing.T) {
	b, tmpPath := backend.NewDefaultTmpBackend()
@@ -15,6 +15,7 @@
package mvcc

import (
	"fmt"
	"math"

	"github.com/coreos/etcd/mvcc/mvccpb"
@@ -238,7 +239,15 @@ func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
	minRev := int64(math.MaxInt64)
	for w := range wg.watchers {
		if w.minRev > curRev {
			panic("watcher current revision should not exceed current revision")
			// after network partition, possibly choosing future revision watcher from restore operation
			// with watch key "proxy-namespace__lostleader" and revision "math.MaxInt64 - 2"
			// do not panic when such watcher had been moved from "synced" watcher during restore operation
			if !w.restore {
				panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
			}

			// mark 'restore' done, since it's chosen
			w.restore = false
		}
		if w.minRev < compactRev {
			select {
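The relaxed invariant in `chooseAll` above can be seen in isolation with a toy example. This is a standalone sketch, not etcd code: it keeps only the `minRev`/`restore` fields and omits compaction and watcher-group bookkeeping.

```go
package main

import (
	"fmt"
	"math"
)

// watcher mirrors just the two fields the invariant cares about.
type watcher struct {
	minRev  int64
	restore bool
}

// chooseMinRev applies the relaxed rule from the hunk above: an unsynced
// watcher's minRev may only exceed curRev if it was just moved over by a
// snapshot restore, in which case the restore flag is cleared instead of panicking.
func chooseMinRev(watchers []*watcher, curRev int64) int64 {
	minRev := int64(math.MaxInt64)
	for _, w := range watchers {
		if w.minRev > curRev {
			if !w.restore {
				panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
			}
			w.restore = false
		}
		if w.minRev < minRev {
			minRev = w.minRev
		}
	}
	return minRev
}

func main() {
	ws := []*watcher{
		{minRev: 3},
		{minRev: math.MaxInt64 - 2, restore: true}, // future-revision watcher moved by restore
	}
	fmt.Println(chooseMinRev(ws, 5)) // prints 3, and no panic
}
```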
@@ -21,6 +21,7 @@ import (
	"net/http"
	"path"
	"strings"
	"time"

	pioutil "github.com/coreos/etcd/pkg/ioutil"
	"github.com/coreos/etcd/pkg/types"
@@ -153,6 +154,8 @@ func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, c
	}
}

const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER"

// ServeHTTP serves HTTP request to receive and process snapshot message.
//
// If request sender dies without closing underlying TCP connection,
@@ -163,9 +166,12 @@ func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, c
// received and processed.
// 2. this case should happen rarely, so no further optimization is done.
func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	start := time.Now()

	if r.Method != "POST" {
		w.Header().Set("Allow", "POST")
		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
		snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
		return
	}

@@ -173,6 +179,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

	if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
		http.Error(w, err.Error(), http.StatusPreconditionFailed)
		snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
		return
	}

@@ -185,19 +192,22 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	dec := &messageDecoder{r: r.Body}
	// let snapshots be very large since they can exceed 512MB for large installations
	m, err := dec.decodeLimit(uint64(1 << 63))
	from := types.ID(m.From).String()
	if err != nil {
		msg := fmt.Sprintf("failed to decode raft message (%v)", err)
		plog.Errorf(msg)
		http.Error(w, msg, http.StatusBadRequest)
		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
		snapshotReceiveFailures.WithLabelValues(from).Inc()
		return
	}

	receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
	receivedBytes.WithLabelValues(from).Add(float64(m.Size()))

	if m.Type != raftpb.MsgSnap {
		plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
		http.Error(w, "wrong raft message type", http.StatusBadRequest)
		snapshotReceiveFailures.WithLabelValues(from).Inc()
		return
	}

@@ -208,9 +218,10 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
		msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
		plog.Error(msg)
		http.Error(w, msg, http.StatusInternalServerError)
		snapshotReceiveFailures.WithLabelValues(from).Inc()
		return
	}
	receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(n))
	receivedBytes.WithLabelValues(from).Add(float64(n))
	plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))

	if err := h.r.Process(context.TODO(), m); err != nil {
@@ -223,12 +234,16 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
			msg := fmt.Sprintf("failed to process raft message (%v)", err)
			plog.Warningf(msg)
			http.Error(w, msg, http.StatusInternalServerError)
			snapshotReceiveFailures.WithLabelValues(from).Inc()
		}
		return
	}
	// Write StatusNoContent header after the message has been processed by
	// raft, which facilitates the client to report MsgSnap status.
	w.WriteHeader(http.StatusNoContent)

	snapshotReceive.WithLabelValues(from).Inc()
	snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
}

type streamHandler struct {
@@ -53,6 +53,68 @@ var (
		[]string{"From"},
	)

	snapshotSend = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "etcd",
		Subsystem: "network",
		Name:      "snapshot_send_success",
		Help:      "Total number of successful snapshot sends",
	},
		[]string{"To"},
	)

	snapshotSendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "etcd",
		Subsystem: "network",
		Name:      "snapshot_send_failures",
		Help:      "Total number of snapshot send failures",
	},
		[]string{"To"},
	)

	snapshotSendSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "etcd",
		Subsystem: "network",
		Name:      "snapshot_send_total_duration_seconds",
		Help:      "Total latency distributions of v3 snapshot sends",

		// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
		// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
		Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
	},
		[]string{"To"},
	)

	snapshotReceive = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "etcd",
		Subsystem: "network",
		Name:      "snapshot_receive_success",
		Help:      "Total number of successful snapshot receives",
	},
		[]string{"From"},
	)

	snapshotReceiveFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "etcd",
		Subsystem: "network",
		Name:      "snapshot_receive_failures",
		Help:      "Total number of snapshot receive failures",
	},
		[]string{"From"},
	)

	snapshotReceiveSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "etcd",
		Subsystem: "network",
		Name:      "snapshot_receive_total_duration_seconds",
		Help:      "Total latency distributions of v3 snapshot receives",

		// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
		// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
		Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
	},
		[]string{"From"},
	)

	rtts = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "etcd",
		Subsystem: "network",
@@ -69,5 +131,13 @@ func init() {
	prometheus.MustRegister(receivedBytes)
	prometheus.MustRegister(sentFailures)
	prometheus.MustRegister(recvFailures)

	prometheus.MustRegister(snapshotSend)
	prometheus.MustRegister(snapshotSendFailures)
	prometheus.MustRegister(snapshotSendSeconds)
	prometheus.MustRegister(snapshotReceive)
	prometheus.MustRegister(snapshotReceiveFailures)
	prometheus.MustRegister(snapshotReceiveSeconds)

	prometheus.MustRegister(rtts)
}
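The counters and histograms added here are driven from the snapshot sender and the snapshot HTTP handler shown elsewhere in this compare. As a compact illustration of the intended pattern only (the names and the `send` wrapper below are illustrative, not etcd's API): a send path increments the success counter and observes elapsed seconds on success, and increments the failure counter on error.

```go
package main

import (
	"errors"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	sendSuccess = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "example", Subsystem: "network", Name: "snapshot_send_success",
		Help: "Total number of successful snapshot sends",
	}, []string{"To"})
	sendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "example", Subsystem: "network", Name: "snapshot_send_failures",
		Help: "Total number of snapshot send failures",
	}, []string{"To"})
	sendSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "example", Subsystem: "network", Name: "snapshot_send_total_duration_seconds",
		Help:    "Total latency distributions of snapshot sends",
		Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
	}, []string{"To"})
)

// send wraps a snapshot transfer and records per-peer outcome metrics.
func send(to string, doSend func() error) {
	start := time.Now()
	if err := doSend(); err != nil {
		sendFailures.WithLabelValues(to).Inc()
		return
	}
	sendSuccess.WithLabelValues(to).Inc()
	sendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds())
}

func main() {
	prometheus.MustRegister(sendSuccess, sendFailures, sendSeconds)
	send("peer-1", func() error { return nil })
	send("peer-2", func() error { return errors.New("transfer failed") })
}
```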
@@ -56,7 +56,7 @@ func (s *peerStatus) deactivate(failure failureType, reason string) {
	msg := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason)
	if s.active {
		plog.Errorf(msg)
		plog.Infof("peer %s became inactive", s.id)
		plog.Infof("peer %s became inactive (message send to peer failed)", s.id)
		s.active = false
		s.since = time.Time{}
		return
@@ -17,6 +17,7 @@ package rafthttp
import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/xiang90/probing"
)

@@ -28,7 +29,15 @@ var (
	statusErrorInterval = 5 * time.Second
)

func addPeerToProber(p probing.Prober, id string, us []string) {
const (
	// RoundTripperNameRaftMessage is the name of round-tripper that sends
	// all other Raft messages, other than "snap.Message".
	RoundTripperNameRaftMessage = "ROUND_TRIPPER_RAFT_MESSAGE"
	// RoundTripperNameSnapshot is the name of round-tripper that sends merged snapshot message.
	RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT"
)

func addPeerToProber(p probing.Prober, id string, us []string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
	hus := make([]string, len(us))
	for i := range us {
		hus[i] = us[i] + ProbingPrefix
@@ -40,26 +49,26 @@ func addPeerToProber(p probing.Prober, id string, us []string) {
	if err != nil {
		plog.Errorf("failed to add peer %s into prober", id)
	} else {
		go monitorProbingStatus(s, id)
		go monitorProbingStatus(s, id, roundTripperName, rttSecProm)
	}
}

func monitorProbingStatus(s probing.Status, id string) {
func monitorProbingStatus(s probing.Status, id string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
	// set the first interval short to log error early.
	interval := statusErrorInterval
	for {
		select {
		case <-time.After(interval):
			if !s.Health() {
				plog.Warningf("health check for peer %s could not connect: %v", id, s.Err())
				plog.Warningf("health check for peer %s could not connect: %v (prober %q)", id, s.Err(), roundTripperName)
				interval = statusErrorInterval
			} else {
				interval = statusMonitoringInterval
			}
			if s.ClockDiff() > time.Second {
				plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
				plog.Warningf("the clock difference against peer %s is too high [%v > %v] (prober %q)", id, s.ClockDiff(), time.Second, roundTripperName)
			}
			rtts.WithLabelValues(id).Observe(s.SRTT().Seconds())
			rttSecProm.WithLabelValues(id).Observe(s.SRTT().Seconds())
		case <-s.StopNotify():
			return
		}
@@ -63,7 +63,10 @@ func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *pe
func (s *snapshotSender) stop() { close(s.stopc) }

func (s *snapshotSender) send(merged snap.Message) {
	start := time.Now()

	m := merged.Message
	to := types.ID(m.To).String()

	body := createSnapBody(merged)
	defer body.Close()
@@ -91,14 +94,18 @@ func (s *snapshotSender) send(merged snap.Message) {
		// machine knows about it, it would pause a while and retry sending
		// new snapshot message.
		s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
		sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
		sentFailures.WithLabelValues(to).Inc()
		snapshotSendFailures.WithLabelValues(to).Inc()
		return
	}
	s.status.activate()
	s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
	plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))

	sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(merged.TotalSize))
	sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize))

	snapshotSend.WithLabelValues(to).Inc()
	snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds())
}

// post posts the given request.
@@ -26,6 +26,7 @@ import (
	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/coreos/etcd/snap"

	"github.com/coreos/pkg/capnslog"
	"github.com/xiang90/probing"
	"golang.org/x/net/context"
@@ -121,7 +122,8 @@ type Transport struct {
	remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
	peers   map[types.ID]Peer    // peers map

	prober probing.Prober
	pipelineProber probing.Prober
	streamProber   probing.Prober
}

func (t *Transport) Start() error {
@@ -136,7 +138,8 @@ func (t *Transport) Start() error {
	}
	t.remotes = make(map[types.ID]*remote)
	t.peers = make(map[types.ID]Peer)
	t.prober = probing.NewProber(t.pipelineRt)
	t.pipelineProber = probing.NewProber(t.pipelineRt)
	t.streamProber = probing.NewProber(t.streamRt)
	return nil
}

@@ -197,7 +200,8 @@ func (t *Transport) Stop() {
	for _, p := range t.peers {
		p.stop()
	}
	t.prober.RemoveAll()
	t.pipelineProber.RemoveAll()
	t.streamProber.RemoveAll()
	if tr, ok := t.streamRt.(*http.Transport); ok {
		tr.CloseIdleConnections()
	}
@@ -276,8 +280,8 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
	}
	fs := t.LeaderStats.Follower(id.String())
	t.peers[id] = startPeer(t, urls, id, fs)
	addPeerToProber(t.prober, id.String(), us)

	addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)
	addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)
	plog.Infof("added peer %s", id)
}

@@ -304,7 +308,8 @@ func (t *Transport) removePeer(id types.ID) {
	}
	delete(t.peers, id)
	delete(t.LeaderStats.Followers, id.String())
	t.prober.Remove(id.String())
	t.pipelineProber.Remove(id.String())
	t.streamProber.Remove(id.String())
	plog.Infof("removed peer %s", id)
}

@@ -321,8 +326,10 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) {
	}
	t.peers[id].update(urls)

	t.prober.Remove(id.String())
	addPeerToProber(t.prober, id.String(), us)
	t.pipelineProber.Remove(id.String())
	addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)
	t.streamProber.Remove(id.String())
	addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)
	plog.Infof("updated peer %s", id)
}

@@ -35,8 +35,10 @@ func TestTransportSend(t *testing.T) {
	peer1 := newFakePeer()
	peer2 := newFakePeer()
	tr := &Transport{
		ServerStats: ss,
		peers:       map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
		ServerStats:    ss,
		peers:          map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
		pipelineProber: probing.NewProber(nil),
		streamProber:   probing.NewProber(nil),
	}
	wmsgsIgnored := []raftpb.Message{
		// bad local message
@@ -72,8 +74,10 @@ func TestTransportCutMend(t *testing.T) {
	peer1 := newFakePeer()
	peer2 := newFakePeer()
	tr := &Transport{
		ServerStats: ss,
		peers:       map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
		ServerStats:    ss,
		peers:          map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
		pipelineProber: probing.NewProber(nil),
		streamProber:   probing.NewProber(nil),
	}

	tr.CutPeer(types.ID(1))
@@ -100,10 +104,11 @@ func TestTransportCutMend(t *testing.T) {
func TestTransportAdd(t *testing.T) {
	ls := stats.NewLeaderStats("")
	tr := &Transport{
		LeaderStats: ls,
		streamRt:    &roundTripperRecorder{},
		peers:       make(map[types.ID]Peer),
		prober:      probing.NewProber(nil),
		LeaderStats:    ls,
		streamRt:       &roundTripperRecorder{},
		peers:          make(map[types.ID]Peer),
		pipelineProber: probing.NewProber(nil),
		streamProber:   probing.NewProber(nil),
	}
	tr.AddPeer(1, []string{"http://localhost:2380"})

@@ -128,10 +133,11 @@ func TestTransportAdd(t *testing.T) {

func TestTransportRemove(t *testing.T) {
	tr := &Transport{
		LeaderStats: stats.NewLeaderStats(""),
		streamRt:    &roundTripperRecorder{},
		peers:       make(map[types.ID]Peer),
		prober:      probing.NewProber(nil),
		LeaderStats:    stats.NewLeaderStats(""),
		streamRt:       &roundTripperRecorder{},
		peers:          make(map[types.ID]Peer),
		pipelineProber: probing.NewProber(nil),
		streamProber:   probing.NewProber(nil),
	}
	tr.AddPeer(1, []string{"http://localhost:2380"})
	tr.RemovePeer(types.ID(1))
@@ -145,8 +151,9 @@ func TestTransportRemove(t *testing.T) {
func TestTransportUpdate(t *testing.T) {
	peer := newFakePeer()
	tr := &Transport{
		peers:  map[types.ID]Peer{types.ID(1): peer},
		prober: probing.NewProber(nil),
		peers:          map[types.ID]Peer{types.ID(1): peer},
		pipelineProber: probing.NewProber(nil),
		streamProber:   probing.NewProber(nil),
	}
	u := "http://localhost:2380"
	tr.UpdatePeer(types.ID(1), []string{u})
@@ -159,13 +166,14 @@ func TestTransportUpdate(t *testing.T) {
func TestTransportErrorc(t *testing.T) {
	errorc := make(chan error, 1)
	tr := &Transport{
		Raft:        &fakeRaft{},
		LeaderStats: stats.NewLeaderStats(""),
		ErrorC:      errorc,
		streamRt:    newRespRoundTripper(http.StatusForbidden, nil),
		pipelineRt:  newRespRoundTripper(http.StatusForbidden, nil),
		peers:       make(map[types.ID]Peer),
		prober:      probing.NewProber(nil),
		Raft:           &fakeRaft{},
		LeaderStats:    stats.NewLeaderStats(""),
		ErrorC:         errorc,
		streamRt:       newRespRoundTripper(http.StatusForbidden, nil),
		pipelineRt:     newRespRoundTripper(http.StatusForbidden, nil),
		peers:          make(map[types.ID]Peer),
		pipelineProber: probing.NewProber(nil),
		streamProber:   probing.NewProber(nil),
	}
	tr.AddPeer(1, []string{"http://localhost:2380"})
	defer tr.Stop()
@@ -20,6 +20,7 @@ import (
	"io/ioutil"
	"os"
	"path/filepath"
	"time"

	"github.com/coreos/etcd/pkg/fileutil"
)
@@ -27,6 +28,8 @@ import (
// SaveDBFrom saves snapshot of the database from the given reader. It
// guarantees the save operation is atomic.
func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
	start := time.Now()

	f, err := ioutil.TempFile(s.dir, "tmp")
	if err != nil {
		return 0, err
@@ -34,7 +37,9 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
	var n int64
	n, err = io.Copy(f, r)
	if err == nil {
		fsyncStart := time.Now()
		err = fileutil.Fsync(f)
		snapDBFsyncSec.Observe(time.Since(fsyncStart).Seconds())
	}
	f.Close()
	if err != nil {
@@ -54,6 +59,7 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {

	plog.Infof("saved database snapshot to disk [total bytes: %d]", n)

	snapDBSaveSec.Observe(time.Since(start).Seconds())
	return n, nil
}

@@ -33,9 +33,33 @@ var (
		Help:    "The marshalling cost distributions of save called by snapshot.",
		Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
	})

	snapDBSaveSec = prometheus.NewHistogram(prometheus.HistogramOpts{
		Namespace: "etcd",
		Subsystem: "snap_db",
		Name:      "save_total_duration_seconds",
		Help:      "The total latency distributions of v3 snapshot save",

		// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
		// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
		Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
	})

	snapDBFsyncSec = prometheus.NewHistogram(prometheus.HistogramOpts{
		Namespace: "etcd",
		Subsystem: "snap_db",
		Name:      "fsync_duration_seconds",
		Help:      "The latency distributions of fsyncing .snap.db file",

		// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
		// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
		Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
	})
)

func init() {
	prometheus.MustRegister(saveDurations)
	prometheus.MustRegister(marshallingDurations)
	prometheus.MustRegister(snapDBSaveSec)
	prometheus.MustRegister(snapDBFsyncSec)
}
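The bucket comments in this hunk (and in the rafthttp histograms above) state the upper bounds directly. They can be double-checked with `prometheus.ExponentialBuckets`, which is the same helper these declarations use; this is just a quick verification sketch, not part of the change.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// snap_db save histogram: 10 buckets, first upper bound 0.1s, factor 2.
	save := prometheus.ExponentialBuckets(0.1, 2, 10)
	fmt.Println(save[len(save)-1]) // 51.2 (0.1 * 2^9)

	// snap_db fsync histogram: 14 buckets, first upper bound 1ms, factor 2.
	fsync := prometheus.ExponentialBuckets(0.001, 2, 14)
	fmt.Println(fsync[len(fsync)-1]) // 8.192 (0.001 * 2^13)
}
```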
@@ -5,17 +5,13 @@ if ! [[ "$0" =~ "tests/semaphore.test.bash" ]]; then
  exit 255
fi

TEST_SUFFIX=$(date +%s | base64 | head -c 15)
<<COMMENT
# amd64-e2e
tests/semaphore.test.bash
sudo HOST_TMP_DIR=/tmp TEST_OPTS="PASSES='build release e2e' MANUAL_VER=v3.3.7" make docker-test

TEST_OPTS="PASSES='build unit release integration_e2e functional' MANUAL_VER=v3.1.12"
if [ "$TEST_ARCH" == "386" ]; then
  TEST_OPTS="GOARCH=386 PASSES='build unit integration_e2e'"
fi
# 386-e2e
sudo HOST_TMP_DIR=/tmp TEST_OPTS="GOARCH=386 PASSES='build e2e'" make docker-test
COMMENT

docker run \
  --rm \
  --volume=`pwd`:/go/src/github.com/coreos/etcd \
  gcr.io/etcd-development/etcd-test:go1.8.7 \
  /bin/bash -c "${TEST_OPTS} ./test 2>&1 | tee test-${TEST_SUFFIX}.log"

! egrep "(--- FAIL:|panic: test timed out|appears to have leaked)" -B50 -A10 test-${TEST_SUFFIX}.log
sudo HOST_TMP_DIR=/tmp TEST_OPTS="PASSES='build release e2e' MANUAL_VER=v3.1.18" make docker-test
@@ -26,7 +26,7 @@ import (
var (
	// MinClusterVersion is the min cluster version this etcd binary is compatible with.
	MinClusterVersion = "3.0.0"
	Version           = "3.1.15"
	Version           = "3.1.20"
	APIVersion        = "unknown"

	// Git SHA Value will be set during build