Compare commits
63 Commits
SHA1
---
992dbd4d1e
b39c0f9471
3381ef1602
c096dc2cc5
3e99b42612
cf7be488a9
65fff06adc
87b4e08c29
216be8b79b
dfcf82b6ff
9197907515
14883cad78
4e7691ddcc
8a68ae95ec
ef1d332298
116c442615
e07fb41140
2c616b0c3d
dd2803c4a6
4855ca62b5
bb205caa68
a1d6802da2
79d80bd259
081519c323
e0d5a028d5
a421a604d6
0fbf49df11
fb5080b306
cac6ce756d
9f58e57a3c
22c25dd4e7
92a7b5df80
3f1fe618ad
b8547734ae
78a13e67a0
84d11a51c1
a9c4b98756
5531e3b0f5
c2623bb840
f46b4677c0
09843d5d90
be3e6f6ed5
d84dd18637
b7ff47f9d5
fab24fbdab
b3ee996629
06da6cf983
9c00100550
1d7a2ca520
e90934ec71
23c5c71426
72a2483d42
53eae781fe
7b1b7def84
df000fd776
fd61be4814
781cc0be83
ebe351e3b4
e31510975a
43b0cafcb6
169af4470e
c4c487eaca
6bb88b9617
`.travis.yml` (66 lines changed)

```diff
@@ -14,20 +14,20 @@ notifications:
 
 env:
 matrix:
-- TARGET=linux-amd64-build
-- TARGET=linux-amd64-unit
 - TARGET=linux-amd64-integration
 - TARGET=linux-amd64-functional
-- TARGET=linux-386-build
+- TARGET=linux-amd64-unit
+- TARGET=all-build
 - TARGET=linux-386-unit
-- TARGET=darwin-amd64-build
-- TARGET=windows-amd64-build
-- TARGET=linux-arm-build
-- TARGET=linux-arm64-build
-- TARGET=linux-ppc64le-build
 
 matrix:
 fast_finish: true
+allow_failures:
+- go: 1.8.7
+env: TARGET=linux-386-unit
+exclude:
+- go: tip
+env: TARGET=linux-386-unit
 
 before_install:
 - if [[ $TRAVIS_GO_VERSION == 1.* ]]; then docker pull gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION}; fi
@@ -39,16 +39,6 @@ script:
 - echo "TRAVIS_GO_VERSION=${TRAVIS_GO_VERSION}"
 - >
 case "${TARGET}" in
-linux-amd64-build)
-docker run --rm \
---volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
-/bin/bash -c "GOARCH=amd64 PASSES='build' ./test"
-;;
-linux-amd64-unit)
-docker run --rm \
---volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
-/bin/bash -c "GOARCH=amd64 PASSES='unit' ./test"
-;;
 linux-amd64-integration)
 docker run --rm \
 --volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
@@ -59,39 +49,25 @@ script:
 --volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
 /bin/bash -c "./build && GOARCH=amd64 PASSES='functional' ./test"
 ;;
-linux-386-build)
+linux-amd64-unit)
 docker run --rm \
 --volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
-/bin/bash -c "GOARCH=386 PASSES='build' ./test"
+/bin/bash -c "GOARCH=amd64 PASSES='unit' ./test"
+;;
+all-build)
+docker run --rm \
+--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
+/bin/bash -c "GOARCH=amd64 PASSES='build' ./test \
+&& GOARCH=386 PASSES='build' ./test \
+&& GO_BUILD_FLAGS='-v' GOOS=darwin GOARCH=amd64 ./build \
+&& GO_BUILD_FLAGS='-v' GOOS=windows GOARCH=amd64 ./build \
+&& GO_BUILD_FLAGS='-v' GOARCH=arm ./build \
+&& GO_BUILD_FLAGS='-v' GOARCH=arm64 ./build \
+&& GO_BUILD_FLAGS='-v' GOARCH=ppc64le ./build"
 ;;
 linux-386-unit)
 docker run --rm \
 --volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
 /bin/bash -c "GOARCH=386 PASSES='unit' ./test"
 ;;
-darwin-amd64-build)
-docker run --rm \
---volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
-/bin/bash -c "GO_BUILD_FLAGS='-v' GOOS=darwin GOARCH=amd64 ./build"
-;;
-windows-amd64-build)
-docker run --rm \
---volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
-/bin/bash -c "GO_BUILD_FLAGS='-v' GOOS=windows GOARCH=amd64 ./build"
-;;
-linux-arm-build)
-docker run --rm \
---volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
-/bin/bash -c "GO_BUILD_FLAGS='-v' GOARCH=arm ./build"
-;;
-linux-arm64-build)
-docker run --rm \
---volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
-/bin/bash -c "GO_BUILD_FLAGS='-v' GOARCH=arm64 ./build"
-;;
-linux-ppc64le-build)
-docker run --rm \
---volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
-/bin/bash -c "GO_BUILD_FLAGS='-v' GOARCH=ppc64le ./build"
-;;
 esac
```
`Makefile` (17 lines changed)

```diff
@@ -20,12 +20,16 @@ clean:
 rm -f ./codecov
 rm -rf ./agent-*
 rm -rf ./covdir
+rm -f ./*.coverprofile
 rm -f ./*.log
 rm -f ./bin/Dockerfile-release
 rm -rf ./bin/*.etcd
+rm -rf ./default.etcd
+rm -rf ./tests/e2e/default.etcd
 rm -rf ./gopath
 rm -rf ./gopath.proto
 rm -rf ./release
+rm -f ./snapshot/localhost:*
 rm -f ./integration/127.0.0.1:* ./integration/localhost:*
 rm -f ./clientv3/integration/127.0.0.1:* ./clientv3/integration/localhost:*
 rm -f ./clientv3/ordering/127.0.0.1:* ./clientv3/ordering/localhost:*
@@ -46,7 +50,8 @@ docker-remove:
 
 
 
-GO_VERSION ?= 1.10.1
+# GO_VERSION ?= 1.10.3
+GO_VERSION ?= 1.8.7
 ETCD_VERSION ?= $(shell git rev-parse --short HEAD || echo "GitNotFound")
 
 TEST_SUFFIX = $(shell date +%s | base64 | head -c 15)
@@ -61,16 +66,16 @@ endif
 
 # Example:
 # GO_VERSION=1.8.7 make build-docker-test
-# GO_VERSION=1.9.5 make build-docker-test
+# GO_VERSION=1.9.7 make build-docker-test
 # make build-docker-test
 #
 # gcloud docker -- login -u _json_key -p "$(cat /etc/gcp-key-etcd-development.json)" https://gcr.io
 # GO_VERSION=1.8.7 make push-docker-test
-# GO_VERSION=1.9.5 make push-docker-test
+# GO_VERSION=1.9.7 make push-docker-test
 # make push-docker-test
 #
 # gsutil -m acl ch -u allUsers:R -r gs://artifacts.etcd-development.appspot.com
-# GO_VERSION=1.9.5 make pull-docker-test
+# GO_VERSION=1.9.7 make pull-docker-test
 # make pull-docker-test
 
 build-docker-test:
@@ -118,7 +123,7 @@ compile-setup-gopath-with-docker-test:
 #
 # Local machine:
 # TEST_OPTS="PASSES='fmt'" make test
-# TEST_OPTS="PASSES='fmt bom dep compile build unit'" make test
+# TEST_OPTS="PASSES='fmt bom dep build unit'" make test
 # TEST_OPTS="PASSES='build unit release integration_e2e functional'" make test
 # TEST_OPTS="PASSES='build grpcproxy'" make test
 #
@@ -128,7 +133,7 @@ compile-setup-gopath-with-docker-test:
 # TEST_OPTS="VERBOSE=2 PASSES='unit'" make docker-test
 #
 # Travis CI (test with docker):
-# TEST_OPTS="PASSES='fmt bom dep compile build unit'" make docker-test
+# TEST_OPTS="PASSES='fmt bom dep build unit'" make docker-test
 #
 # Semaphore CI (test with docker):
 # TEST_OPTS="PASSES='build unit release integration_e2e functional'" make docker-test
```
```diff
@@ -58,11 +58,9 @@ func (m *Mutex) Lock(ctx context.Context) error {
 
 // wait for deletion revisions prior to myKey
 err = waitDeletes(ctx, client, m.pfx, m.myRev-1)
-// release lock key if cancelled
-select {
-case <-ctx.Done():
+// release lock key if wait failed
+if err != nil {
 m.Unlock(client.Ctx())
-default:
 }
 return err
 }
```
```diff
@@ -308,6 +308,49 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
 }
 }
 
+// TestLeaseKeepAliveFullResponseQueue ensures when response
+// queue is full thus dropping keepalive response sends,
+// keepalive request is sent with the same rate of TTL / 3.
+func TestLeaseKeepAliveFullResponseQueue(t *testing.T) {
+defer testutil.AfterTest(t)
+
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+defer clus.Terminate(t)
+
+lapi := clus.Client(0)
+
+// expect lease keepalive every 10-second
+lresp, err := lapi.Grant(context.Background(), 30)
+if err != nil {
+t.Fatalf("failed to create lease %v", err)
+}
+id := lresp.ID
+
+old := clientv3.LeaseResponseChSize
+defer func() {
+clientv3.LeaseResponseChSize = old
+}()
+clientv3.LeaseResponseChSize = 0
+
+// never fetch from response queue, and let it become full
+_, err = lapi.KeepAlive(context.Background(), id)
+if err != nil {
+t.Fatalf("failed to keepalive lease %v", err)
+}
+
+// TTL should not be refreshed after 3 seconds
+// expect keepalive to be triggered after TTL/3
+time.Sleep(3 * time.Second)
+
+tr, terr := lapi.TimeToLive(context.Background(), id)
+if terr != nil {
+t.Fatalf("failed to get lease information %v", terr)
+}
+if tr.TTL >= 29 {
+t.Errorf("unexpected kept-alive lease TTL %d", tr.TTL)
+}
+}
+
 func TestLeaseGrantNewAfterClose(t *testing.T) {
 defer testutil.AfterTest(t)
 
```
```diff
@@ -63,12 +63,15 @@ const (
 // defaultTTL is the assumed lease TTL used for the first keepalive
 // deadline before the actual TTL is known to the client.
 defaultTTL = 5 * time.Second
-// a small buffer to store unsent lease responses.
-leaseResponseChSize = 16
 // NoLease is a lease ID for the absence of a lease.
 NoLease LeaseID = 0
 )
 
+// LeaseResponseChSize is the size of buffer to store unsent lease responses.
+// WARNING: DO NOT UPDATE.
+// Only for testing purposes.
+var LeaseResponseChSize = 16
+
 // ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error.
 //
 // This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
@@ -223,7 +226,7 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption
 }
 
 func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
-ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)
+ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)
 
 l.mu.Lock()
 // ensure that recvKeepAliveLoop is still running
@@ -421,9 +424,10 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
 for _, ch := range ka.chs {
 select {
 case ch <- karesp:
-ka.nextKeepAlive = nextKeepAlive
 default:
 }
+// still advance in order to rate-limit keep-alive sends
+ka.nextKeepAlive = nextKeepAlive
 }
 }
 
```
```diff
@@ -417,6 +417,14 @@ func dbStatus(p string) dbstatus {
 h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
 
 err = db.View(func(tx *bolt.Tx) error {
+// check snapshot file integrity first
+var dbErrStrings []string
+for dbErr := range tx.Check() {
+dbErrStrings = append(dbErrStrings, dbErr.Error())
+}
+if len(dbErrStrings) > 0 {
+return fmt.Errorf("snapshot file integrity check failed. %d errors found.\n"+strings.Join(dbErrStrings, "\n"), len(dbErrStrings))
+}
 ds.TotalSize = tx.Size()
 c := tx.Cursor()
 for next, _ := c.First(); next != nil; next, _ = c.Next() {
```
```diff
@@ -76,11 +76,11 @@ func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http
 }
 
 mh := &membersHandler{
 sec: sec,
 server: server,
 cluster: server.Cluster(),
 timeout: timeout,
 clock: clockwork.NewRealClock(),
 clientCertAuthEnabled: server.Cfg.ClientCertAuthEnabled,
 }
 
@@ -346,6 +346,26 @@ func serveVars(w http.ResponseWriter, r *http.Request) {
 fmt.Fprintf(w, "\n}\n")
 }
 
+var (
+healthSuccess = prometheus.NewCounter(prometheus.CounterOpts{
+Namespace: "etcd",
+Subsystem: "server",
+Name: "health_success",
+Help: "The total number of successful health checks",
+})
+healthFailed = prometheus.NewCounter(prometheus.CounterOpts{
+Namespace: "etcd",
+Subsystem: "server",
+Name: "health_failures",
+Help: "The total number of failed health checks",
+})
+)
+
+func init() {
+prometheus.MustRegister(healthSuccess)
+prometheus.MustRegister(healthFailed)
+}
+
 func healthHandler(server *etcdserver.EtcdServer) http.HandlerFunc {
 return func(w http.ResponseWriter, r *http.Request) {
 if !allowMethod(w, r.Method, "GET") {
@@ -353,16 +373,19 @@ func healthHandler(server *etcdserver.EtcdServer) http.HandlerFunc {
 }
 if uint64(server.Leader()) == raft.None {
 http.Error(w, `{"health": "false"}`, http.StatusServiceUnavailable)
+healthFailed.Inc()
 return
 }
 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 defer cancel()
 if _, err := server.Do(ctx, etcdserverpb.Request{Method: "QGET"}); err != nil {
 http.Error(w, `{"health": "false"}`, http.StatusServiceUnavailable)
+healthFailed.Inc()
 return
 }
 w.WriteHeader(http.StatusOK)
 w.Write([]byte(`{"health": "true"}`))
+healthSuccess.Inc()
 }
 }
 
```
```diff
@@ -19,6 +19,8 @@ import (
 
 "github.com/coreos/etcd/etcdserver"
 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+"github.com/grpc-ecosystem/go-grpc-prometheus"
 "google.golang.org/grpc"
 "google.golang.org/grpc/credentials"
 "google.golang.org/grpc/grpclog"
@@ -45,5 +47,8 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
 pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
 pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
 
+// to display all registered metrics with zero values
+grpc_prometheus.Register(grpcServer)
+
 return grpcServer
 }
```
```diff
@@ -24,10 +24,13 @@ import (
 "github.com/coreos/etcd/pkg/types"
 "github.com/coreos/etcd/raft"
 
+pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+"github.com/coreos/pkg/capnslog"
 prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
 "golang.org/x/net/context"
 "google.golang.org/grpc"
 "google.golang.org/grpc/metadata"
+"google.golang.org/grpc/peer"
 )
 
 const (
@@ -40,7 +43,7 @@ type streamsMap struct {
 }
 
 func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
-return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
+return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
 if !api.IsCapabilityEnabled(api.V3rpcCapability) {
 return nil, rpctypes.ErrGRPCNotCapable
 }
@@ -54,10 +57,116 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
 }
 }
 
-return prometheus.UnaryServerInterceptor(ctx, req, info, handler)
+return logUnaryInterceptor(ctx, req, info, handler)
+// interceptors are chained manually during backporting, for better readability refer to PR #9990
 }
 }
 
+// logUnaryInterceptor is a gRPC server-side interceptor that prints info on incoming requests for debugging purpose
+func logUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+startTime := time.Now()
+resp, err := prometheus.UnaryServerInterceptor(ctx, req, info, handler)
+// interceptors are chained manually during backporting, for better readability refer to PR #9990
+if plog.LevelAt(capnslog.DEBUG) {
+defer logUnaryRequestStats(ctx, info, startTime, req, resp)
+}
+return resp, err
+}
+
+func logUnaryRequestStats(ctx context.Context, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
+duration := time.Since(startTime)
+remote := "No remote client info."
+peerInfo, ok := peer.FromContext(ctx)
+if ok {
+remote = peerInfo.Addr.String()
+}
+var responseType string = info.FullMethod
+var reqCount, respCount int64
+var reqSize, respSize int
+var reqContent string
+switch _resp := resp.(type) {
+case *pb.RangeResponse:
+_req, ok := req.(*pb.RangeRequest)
+if ok {
+reqCount = 0
+reqSize = _req.Size()
+reqContent = _req.String()
+}
+if _resp != nil {
+respCount = _resp.Count
+respSize = _resp.Size()
+}
+case *pb.PutResponse:
+_req, ok := req.(*pb.PutRequest)
+if ok {
+reqCount = 1
+reqSize = _req.Size()
+reqContent = pb.NewLoggablePutRequest(_req).String()
+// redact value field from request content, see PR #9821
+}
+if _resp != nil {
+respCount = 0
+respSize = _resp.Size()
+}
+case *pb.DeleteRangeResponse:
+_req, ok := req.(*pb.DeleteRangeRequest)
+if ok {
+reqCount = 0
+reqSize = _req.Size()
+reqContent = _req.String()
+}
+if _resp != nil {
+respCount = _resp.Deleted
+respSize = _resp.Size()
+}
+case *pb.TxnResponse:
+_req, ok := req.(*pb.TxnRequest)
+if ok && _resp != nil {
+if _resp.Succeeded { // determine the 'actual' count and size of request based on success or failure
+reqCount = int64(len(_req.GetSuccess()))
+reqSize = 0
+for _, r := range _req.GetSuccess() {
+reqSize += r.Size()
+}
+} else {
+reqCount = int64(len(_req.GetFailure()))
+reqSize = 0
+for _, r := range _req.GetFailure() {
+reqSize += r.Size()
+}
+}
+reqContent = pb.NewLoggableTxnRequest(_req).String()
+// redact value field from request content, see PR #9821
+}
+if _resp != nil {
+respCount = 0
+respSize = _resp.Size()
+}
+default:
+reqCount = -1
+reqSize = -1
+respCount = -1
+respSize = -1
+}
+
+logGenericRequestStats(startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
+}
+
+func logGenericRequestStats(startTime time.Time, duration time.Duration, remote string, responseType string,
+reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
+plog.Debugf("start time = %v, "+
+"time spent = %v, "+
+"remote = %s, "+
+"response type = %s, "+
+"request count = %d, "+
+"request size = %d, "+
+"response count = %d, "+
+"response size = %d, "+
+"request content = %s",
+startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent,
+)
+}
+
 func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
 smap := monitorLeader(s)
 
```
```diff
@@ -95,6 +95,9 @@ func (s *EtcdServer) newApplierV3() applierV3 {
 
 func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
 ar := &applyResult{}
+defer func(start time.Time) {
+warnOfExpensiveRequest(start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
+}(time.Now())
 
 // call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
 switch {
```
```diff
@@ -105,10 +105,12 @@ func (a *applierV2store) Sync(r *pb.Request) Response {
 return Response{}
 }
 
-// applyV2Request interprets r as a call to store.X and returns a Response interpreted
-// from store.Event
+// applyV2Request interprets r as a call to v2store.X
+// and returns a Response interpreted from v2store.Event
 func (s *EtcdServer) applyV2Request(r *pb.Request) Response {
+defer warnOfExpensiveRequest(time.Now(), r, nil, nil)
 toTTLOptions(r)
 
 switch r.Method {
 case "POST":
 return s.applyV2.Post(r)
```
`etcdserver/backend.go` (new file, 83 lines)

```go
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

import (
	"fmt"
	"os"
	"path/filepath"
	"time"

	"github.com/coreos/etcd/lease"
	"github.com/coreos/etcd/mvcc"
	"github.com/coreos/etcd/mvcc/backend"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/coreos/etcd/snap"
)

func newBackend(cfg *ServerConfig) backend.Backend {
	return backend.NewDefaultBackend(backendPath(cfg))
}

func backendPath(cfg *ServerConfig) string {
	return filepath.Join(cfg.SnapDir(), "db")
}

// openSnapshotBackend renames a snapshot db to the current etcd db and opens it.
func openSnapshotBackend(cfg *ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) {
	snapPath, err := ss.DBFilePath(snapshot.Metadata.Index)
	if err != nil {
		return nil, fmt.Errorf("failed to find database snapshot file (%v)", err)
	}
	if err := os.Rename(snapPath, backendPath(cfg)); err != nil {
		return nil, fmt.Errorf("failed to rename database snapshot file (%v)", err)
	}
	return openBackend(cfg), nil
}

// openBackend returns a backend using the current etcd db.
func openBackend(cfg *ServerConfig) backend.Backend {
	fn := backendPath(cfg)
	beOpened := make(chan backend.Backend)
	go func() {
		beOpened <- newBackend(cfg)
	}()

	select {
	case be := <-beOpened:
		return be

	case <-time.After(10 * time.Second):
		plog.Warningf("another etcd process is using %q and holds the file lock, or loading backend file is taking >10 seconds", fn)
		plog.Warningf("waiting for it to exit before starting...")
	}

	return <-beOpened
}

// recoverBackendSnapshot recovers the DB from a snapshot in case etcd crashes
// before updating the backend db after persisting raft snapshot to disk,
// violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this
// case, replace the db with the snapshot db sent by the leader.
func recoverSnapshotBackend(cfg *ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
	var cIndex consistentIndex
	kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex)
	defer kv.Close()
	if snapshot.Metadata.Index <= kv.ConsistentIndex() {
		return oldbe, nil
	}
	oldbe.Close()
	return openSnapshotBackend(cfg, snap.New(cfg.SnapDir()), snapshot)
}
```
`etcdserver/etcdserverpb/raft_internal_stringer.go` (new file, 175 lines)

```go
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserverpb

import (
	"fmt"
	"strings"

	proto "github.com/golang/protobuf/proto"
)

// InternalRaftStringer implements custom proto Stringer:
// redact password, replace value fields with value_size fields.
type InternalRaftStringer struct {
	Request *InternalRaftRequest
}

func (as *InternalRaftStringer) String() string {
	switch {
	case as.Request.LeaseGrant != nil:
		return fmt.Sprintf("header:<%s> lease_grant:<ttl:%d-second id:%016x>",
			as.Request.Header.String(),
			as.Request.LeaseGrant.TTL,
			as.Request.LeaseGrant.ID,
		)
	case as.Request.LeaseRevoke != nil:
		return fmt.Sprintf("header:<%s> lease_revoke:<id:%016x>",
			as.Request.Header.String(),
			as.Request.LeaseRevoke.ID,
		)
	case as.Request.Authenticate != nil:
		return fmt.Sprintf("header:<%s> authenticate:<name:%s simple_token:%s>",
			as.Request.Header.String(),
			as.Request.Authenticate.Name,
			as.Request.Authenticate.SimpleToken,
		)
	case as.Request.AuthUserAdd != nil:
		return fmt.Sprintf("header:<%s> auth_user_add:<name:%s>",
			as.Request.Header.String(),
			as.Request.AuthUserAdd.Name,
		)
	case as.Request.AuthUserChangePassword != nil:
		return fmt.Sprintf("header:<%s> auth_user_change_password:<name:%s>",
			as.Request.Header.String(),
			as.Request.AuthUserChangePassword.Name,
		)
	case as.Request.Put != nil:
		return fmt.Sprintf("header:<%s> put:<%s>",
			as.Request.Header.String(),
			NewLoggablePutRequest(as.Request.Put).String(),
		)
	case as.Request.Txn != nil:
		return fmt.Sprintf("header:<%s> txn:<%s>",
			as.Request.Header.String(),
			NewLoggableTxnRequest(as.Request.Txn).String(),
		)
	default:
		// nothing to redact
	}
	return as.Request.String()
}

// txnRequestStringer implements a custom proto String to replace value bytes fields with value size
// fields in any nested txn and put operations.
type txnRequestStringer struct {
	Request *TxnRequest
}

func NewLoggableTxnRequest(request *TxnRequest) *txnRequestStringer {
	return &txnRequestStringer{request}
}

func (as *txnRequestStringer) String() string {
	var compare []string
	for _, c := range as.Request.Compare {
		switch cv := c.TargetUnion.(type) {
		case *Compare_Value:
			compare = append(compare, newLoggableValueCompare(c, cv).String())
		default:
			// nothing to redact
			compare = append(compare, c.String())
		}
	}
	var success []string
	for _, s := range as.Request.Success {
		success = append(success, newLoggableRequestOp(s).String())
	}
	var failure []string
	for _, f := range as.Request.Failure {
		failure = append(failure, newLoggableRequestOp(f).String())
	}
	return fmt.Sprintf("compare:<%s> success:<%s> failure:<%s>",
		strings.Join(compare, " "),
		strings.Join(success, " "),
		strings.Join(failure, " "),
	)
}

// requestOpStringer implements a custom proto String to replace value bytes fields with value
// size fields in any nested txn and put operations.
type requestOpStringer struct {
	Op *RequestOp
}

func newLoggableRequestOp(op *RequestOp) *requestOpStringer {
	return &requestOpStringer{op}
}

func (as *requestOpStringer) String() string {
	switch op := as.Op.Request.(type) {
	case *RequestOp_RequestPut:
		return fmt.Sprintf("request_put:<%s>", NewLoggablePutRequest(op.RequestPut).String())
	default:
		// nothing to redact
	}
	return as.Op.String()
}

// loggableValueCompare implements a custom proto String for Compare.Value union member types to
// replace the value bytes field with a value size field.
// To preserve proto encoding of the key and range_end bytes, a faked out proto type is used here.
type loggableValueCompare struct {
	Result    Compare_CompareResult `protobuf:"varint,1,opt,name=result,proto3,enum=etcdserverpb.Compare_CompareResult"`
	Target    Compare_CompareTarget `protobuf:"varint,2,opt,name=target,proto3,enum=etcdserverpb.Compare_CompareTarget"`
	Key       []byte                `protobuf:"bytes,3,opt,name=key,proto3"`
	ValueSize int                   `protobuf:"bytes,7,opt,name=value_size,proto3"`
}

func newLoggableValueCompare(c *Compare, cv *Compare_Value) *loggableValueCompare {
	return &loggableValueCompare{
		c.Result,
		c.Target,
		c.Key,
		len(cv.Value),
	}
}

func (m *loggableValueCompare) Reset()         { *m = loggableValueCompare{} }
func (m *loggableValueCompare) String() string { return proto.CompactTextString(m) }
func (*loggableValueCompare) ProtoMessage()    {}

// loggablePutRequest implements a custom proto String to replace value bytes field with a value
// size field.
// To preserve proto encoding of the key bytes, a faked out proto type is used here.
type loggablePutRequest struct {
	Key       []byte `protobuf:"bytes,1,opt,name=key,proto3"`
	ValueSize int    `protobuf:"varint,2,opt,name=value_size,proto3"`
	Lease     int64  `protobuf:"varint,3,opt,name=lease,proto3"`
	PrevKv    bool   `protobuf:"varint,4,opt,name=prev_kv,proto3"`
}

func NewLoggablePutRequest(request *PutRequest) *loggablePutRequest {
	return &loggablePutRequest{
		request.Key,
		len(request.Value),
		request.Lease,
		request.PrevKv,
	}
}

func (m *loggablePutRequest) Reset()         { *m = loggablePutRequest{} }
func (m *loggablePutRequest) String() string { return proto.CompactTextString(m) }
func (*loggablePutRequest) ProtoMessage()    {}
```
```diff
@@ -15,9 +15,11 @@
 package etcdserver
 
 import (
+goruntime "runtime"
 "time"
 
 "github.com/coreos/etcd/pkg/runtime"
+"github.com/coreos/etcd/version"
 "github.com/prometheus/client_golang/prometheus"
 )
 
@@ -40,6 +42,18 @@ var (
 Name: "leader_changes_seen_total",
 Help: "The number of leader changes seen.",
 })
+heartbeatSendFailures = prometheus.NewCounter(prometheus.CounterOpts{
+Namespace: "etcd",
+Subsystem: "server",
+Name: "heartbeat_send_failures_total",
+Help: "The total number of leader heartbeat send failures (likely overloaded from slow disk).",
+})
+slowApplies = prometheus.NewCounter(prometheus.CounterOpts{
+Namespace: "etcd",
+Subsystem: "server",
+Name: "slow_apply_total",
+Help: "The total number of slow apply requests (likely overloaded from slow disk).",
+})
 proposalsCommitted = prometheus.NewGauge(prometheus.GaugeOpts{
 Namespace: "etcd",
 Subsystem: "server",
@@ -64,16 +78,70 @@ var (
 Name: "proposals_failed_total",
 Help: "The total number of failed proposals seen.",
 })
+slowReadIndex = prometheus.NewCounter(prometheus.CounterOpts{
+Namespace: "etcd",
+Subsystem: "server",
+Name: "slow_read_indexes_total",
+Help: "The total number of pending read indexes not in sync with leader's or timed out read index requests.",
+})
+readIndexFailed = prometheus.NewCounter(prometheus.CounterOpts{
+Namespace: "etcd",
+Subsystem: "server",
+Name: "read_indexes_failed_total",
+Help: "The total number of failed read indexes seen.",
+})
+quotaBackendBytes = prometheus.NewGauge(prometheus.GaugeOpts{
+Namespace: "etcd",
+Subsystem: "server",
+Name: "quota_backend_bytes",
+Help: "Current backend storage quota size in bytes.",
+})
+currentVersion = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+Namespace: "etcd",
+Subsystem: "server",
+Name: "version",
+Help: "Which version is running. 1 for 'server_version' label with current version.",
+},
+[]string{"server_version"})
+currentGoVersion = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+Namespace: "etcd",
+Subsystem: "server",
+Name: "go_version",
+Help: "Which Go version server is running with. 1 for 'server_go_version' label with current version.",
+},
+[]string{"server_go_version"})
+serverID = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+Namespace: "etcd",
+Subsystem: "server",
+Name: "id",
+Help: "Server or member ID in hexadecimal format. 1 for 'server_id' label with current ID.",
+},
+[]string{"server_id"})
 )
 
 func init() {
 prometheus.MustRegister(hasLeader)
 prometheus.MustRegister(isLeader)
 prometheus.MustRegister(leaderChanges)
+prometheus.MustRegister(heartbeatSendFailures)
+prometheus.MustRegister(slowApplies)
 prometheus.MustRegister(proposalsCommitted)
 prometheus.MustRegister(proposalsApplied)
 prometheus.MustRegister(proposalsPending)
 prometheus.MustRegister(proposalsFailed)
+prometheus.MustRegister(slowReadIndex)
+prometheus.MustRegister(readIndexFailed)
+prometheus.MustRegister(quotaBackendBytes)
+prometheus.MustRegister(currentVersion)
+prometheus.MustRegister(currentGoVersion)
+prometheus.MustRegister(serverID)
+
+currentVersion.With(prometheus.Labels{
+"server_version": version.Version,
+}).Set(1)
+currentGoVersion.With(prometheus.Labels{
+"server_go_version": goruntime.Version(),
+}).Set(1)
 }
 
 func monitorFileDescriptor(done <-chan struct{}) {
```
```diff
@@ -50,15 +50,20 @@ const (
 )
 
 func NewBackendQuota(s *EtcdServer) Quota {
+quotaBackendBytes.Set(float64(s.Cfg.QuotaBackendBytes))
+
 if s.Cfg.QuotaBackendBytes < 0 {
 // disable quotas if negative
 plog.Warningf("disabling backend quota")
 return &passthroughQuota{}
 }
 
 if s.Cfg.QuotaBackendBytes == 0 {
+quotaBackendBytes.Set(float64(backend.DefaultQuotaBytes))
 // use default size if no quota size given
 return &backendQuota{s, backend.DefaultQuotaBytes}
 }
 
 if s.Cfg.QuotaBackendBytes > backend.MaxQuotaBytes {
 plog.Warningf("backend quota %v exceeds maximum quota %v; using maximum", s.Cfg.QuotaBackendBytes, backend.MaxQuotaBytes)
 return &backendQuota{s, backend.MaxQuotaBytes}
```
```diff
@@ -293,6 +293,7 @@ func (r *raftNode) sendMessages(ms []raftpb.Message) {
 // TODO: limit request rate.
 plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
 plog.Warningf("server is likely overloaded")
+heartbeatSendFailures.Inc()
 }
 }
 }
```
```diff
@@ -56,8 +56,10 @@ import (
 "github.com/coreos/etcd/store"
 "github.com/coreos/etcd/version"
 "github.com/coreos/etcd/wal"
 
 "github.com/coreos/go-semver/semver"
 "github.com/coreos/pkg/capnslog"
+"github.com/prometheus/client_golang/prometheus"
 "golang.org/x/net/context"
 )
 
@@ -378,6 +380,10 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 plog.Panicf("recovered store from snapshot error: %v", err)
 }
 plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)
+
+if be, err = recoverSnapshotBackend(cfg, be, *snapshot); err != nil {
+plog.Panicf("recovering backend from snapshot error: %v", err)
+}
 }
 cfg.Print()
 if !cfg.ForceNewCluster {
@@ -437,6 +443,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
 forceVersionC: make(chan struct{}),
 }
+serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)
 
 srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
 
@@ -796,14 +803,8 @@ func (s *EtcdServer) run() {
 
 func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
 s.applySnapshot(ep, apply)
-st := time.Now()
 s.applyEntries(ep, apply)
-d := time.Since(st)
-entriesNum := len(apply.entries)
-if entriesNum != 0 && d > time.Duration(entriesNum)*warnApplyDuration {
-plog.Warningf("apply entries took too long [%v for %d entries]", d, len(apply.entries))
-plog.Warningf("avoid queries with large range/delete range!")
-}
 proposalsApplied.Set(float64(ep.appliedi))
 s.applyWait.Trigger(ep.appliedi)
 // wait for the raft routine to finish the disk writes before triggering a
```
```diff
@@ -15,11 +15,16 @@
 package etcdserver
 
 import (
+"fmt"
+"reflect"
+"strings"
 "time"
 
+pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 "github.com/coreos/etcd/etcdserver/membership"
 "github.com/coreos/etcd/pkg/types"
 "github.com/coreos/etcd/rafthttp"
+"github.com/golang/protobuf/proto"
 )
 
 // isConnectedToQuorumSince checks whether the local member is connected to the
@@ -95,3 +100,56 @@ func (nc *notifier) notify(err error) {
 nc.err = err
 close(nc.c)
 }
+
+func warnOfExpensiveRequest(now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
+var resp string
+if !isNil(respMsg) {
+resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
+}
+warnOfExpensiveGenericRequest(now, reqStringer, "", resp, err)
+}
+
+func warnOfExpensiveReadOnlyTxnRequest(now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
+reqStringer := pb.NewLoggableTxnRequest(r)
+var resp string
+if !isNil(txnResponse) {
+var resps []string
+for _, r := range txnResponse.Responses {
+switch op := r.Response.(type) {
+case *pb.ResponseOp_ResponseRange:
+resps = append(resps, fmt.Sprintf("range_response_count:%d", len(op.ResponseRange.Kvs)))
+default:
+// only range responses should be in a read only txn request
+}
+}
+resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse))
+}
+warnOfExpensiveGenericRequest(now, reqStringer, "read-only range ", resp, err)
+}
+
+func warnOfExpensiveReadOnlyRangeRequest(now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
+var resp string
+if !isNil(rangeResponse) {
+resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse))
+}
+warnOfExpensiveGenericRequest(now, reqStringer, "read-only range ", resp, err)
+}
+
+func warnOfExpensiveGenericRequest(now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
+// TODO: add metrics
+d := time.Since(now)
+if d > warnApplyDuration {
+var result string
+if err != nil {
+result = fmt.Sprintf("error:%v", err)
+} else {
+result = resp
+}
+plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
+slowApplies.Inc()
+}
+}
+
+func isNil(msg proto.Message) bool {
+return msg == nil || reflect.ValueOf(msg).IsNil()
+}
```
@@ -95,21 +95,25 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
|
|||||||
if s.ClusterVersion() == nil || s.ClusterVersion().LessThan(newRangeClusterVersion) {
|
if s.ClusterVersion() == nil || s.ClusterVersion().LessThan(newRangeClusterVersion) {
|
||||||
return s.legacyRange(ctx, r)
|
return s.legacyRange(ctx, r)
|
||||||
}
|
}
|
||||||
|
var resp *pb.RangeResponse
|
||||||
|
var err error
|
||||||
|
defer func(start time.Time) {
|
||||||
|
warnOfExpensiveReadOnlyRangeRequest(start, r, resp, err)
|
||||||
|
}(time.Now())
|
||||||
|
|
||||||
if !r.Serializable {
|
if !r.Serializable {
|
||||||
err := s.linearizableReadNotify(ctx)
|
err = s.linearizableReadNotify(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var resp *pb.RangeResponse
|
|
||||||
var err error
|
|
||||||
chk := func(ai *auth.AuthInfo) error {
|
chk := func(ai *auth.AuthInfo) error {
|
||||||
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
|
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
|
||||||
}
|
}
|
||||||
get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
|
get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
|
||||||
if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
||||||
return nil, serr
|
err = serr
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
@@ -178,6 +182,11 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
|
|||||||
chk := func(ai *auth.AuthInfo) error {
|
chk := func(ai *auth.AuthInfo) error {
|
||||||
return checkTxnAuth(s.authStore, ai, r)
|
return checkTxnAuth(s.authStore, ai, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer func(start time.Time) {
|
||||||
|
warnOfExpensiveReadOnlyTxnRequest(start, r, resp, err)
|
||||||
|
}(time.Now())
|
||||||
|
|
||||||
get := func() { resp, err = s.applyV3Base.Txn(r) }
|
get := func() { resp, err = s.applyV3Base.Txn(r) }
|
||||||
if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
||||||
return nil, serr
|
return nil, serr
@@ -716,8 +725,9 @@ func (s *EtcdServer) linearizableReadLoop() {
 	var rs raft.ReadState
 
 	for {
-		ctx := make([]byte, 8)
-		binary.BigEndian.PutUint64(ctx, s.reqIDGen.Next())
+		ctxToSend := make([]byte, 8)
+		id1 := s.reqIDGen.Next()
+		binary.BigEndian.PutUint64(ctxToSend, id1)
 
 		select {
 		case <-s.readwaitc:

@@ -733,12 +743,13 @@ func (s *EtcdServer) linearizableReadLoop() {
 		s.readMu.Unlock()
 
 		cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
-		if err := s.r.ReadIndex(cctx, ctx); err != nil {
+		if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
 			cancel()
 			if err == raft.ErrStopped {
 				return
 			}
 			plog.Errorf("failed to get read index from raft: %v", err)
+			readIndexFailed.Inc()
 			nr.notify(err)
 			continue
 		}

@@ -751,16 +762,24 @@ func (s *EtcdServer) linearizableReadLoop() {
 		for !timeout && !done {
 			select {
 			case rs = <-s.r.readStateC:
-				done = bytes.Equal(rs.RequestCtx, ctx)
+				done = bytes.Equal(rs.RequestCtx, ctxToSend)
 				if !done {
 					// a previous request might time out. now we should ignore the response of it and
 					// continue waiting for the response of the current requests.
-					plog.Warningf("ignored out-of-date read index response (want %v, got %v)", rs.RequestCtx, ctx)
+					id2 := uint64(0)
+					if len(rs.RequestCtx) == 8 {
+						id2 = binary.BigEndian.Uint64(rs.RequestCtx)
+					}
+					plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2)
+					slowReadIndex.Inc()
 				}
+
 			case <-time.After(s.Cfg.ReqTimeout()):
-				plog.Warningf("timed out waiting for read index response")
+				plog.Warningf("timed out waiting for read index response (local node might have slow network)")
 				nr.notify(ErrTimeout)
 				timeout = true
+				slowReadIndex.Inc()
+
 			case <-s.stopping:
 				return
 			}
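The read-loop hunks replace opaque byte-slice logging with an explicit request ID that is encoded into the 8-byte ReadIndex context and decoded again when an out-of-date response shows up. A small standard-library sketch of that round trip (names are illustrative, not etcd's):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

func main() {
	id1 := uint64(42)
	ctxToSend := make([]byte, 8)
	binary.BigEndian.PutUint64(ctxToSend, id1)

	// A response carries the same 8 bytes back; matching is a byte compare,
	// and the ID can be recovered for a readable warning message.
	resp := ctxToSend
	if bytes.Equal(resp, ctxToSend) {
		fmt.Println("matched request ID", binary.BigEndian.Uint64(resp))
	}

	stale := make([]byte, 8)
	binary.BigEndian.PutUint64(stale, id1-1)
	if !bytes.Equal(stale, ctxToSend) {
		fmt.Printf("ignored out-of-date response (want %d, got %d)\n",
			id1, binary.BigEndian.Uint64(stale))
	}
}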
@@ -58,6 +58,10 @@ type Backend interface {
 	Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
 	// Size returns the current size of the backend.
 	Size() int64
+	// SizeInUse returns the current size of the backend logically in use.
+	// Since the backend can manage free space in a non-byte unit such as
+	// number of pages, the returned value can be not exactly accurate in bytes.
+	SizeInUse() int64
 	Defrag() error
 	ForceCommit()
 	Close() error

@@ -78,6 +82,10 @@ type backend struct {
 
 	// size is the number of bytes in the backend
 	size int64
+
+	// sizeInUse is the number of bytes actually used in the backend
+	sizeInUse int64
+
 	// commits counts number of commits since start
 	commits int64
 

@@ -185,6 +193,10 @@ func (b *backend) Size() int64 {
 	return atomic.LoadInt64(&b.size)
 }
 
+func (b *backend) SizeInUse() int64 {
+	return atomic.LoadInt64(&b.sizeInUse)
+}
+
 func (b *backend) run() {
 	defer close(b.donec)
 	t := time.NewTimer(b.batchInterval)

@@ -213,18 +225,12 @@ func (b *backend) Commits() int64 {
 }
 
 func (b *backend) Defrag() error {
-	err := b.defrag()
-	if err != nil {
-		return err
-	}
-
-	// commit to update metadata like db.size
-	b.batchTx.Commit()
-
-	return nil
+	return b.defrag()
 }
 
 func (b *backend) defrag() error {
+	now := time.Now()
+
 	// TODO: make this non-blocking?
 	// lock batchTx to ensure nobody is using previous tx, and then
 	// close previous ongoing tx.

@@ -276,6 +282,14 @@ func (b *backend) defrag() error {
 		plog.Fatalf("cannot begin tx (%s)", err)
 	}
 
+	size := b.batchTx.tx.Size()
+	db := b.db
+	atomic.StoreInt64(&b.size, size)
+	atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
+
+	took := time.Since(now)
+	defragDurations.Observe(took.Seconds())
+
 	return nil
 }
 
@@ -136,15 +136,15 @@ func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) err
 // Commit commits a previous tx and begins a new writable one.
 func (t *batchTx) Commit() {
 	t.Lock()
-	defer t.Unlock()
 	t.commit(false)
+	t.Unlock()
 }
 
 // CommitAndStop commits the previous tx and do not create a new one.
 func (t *batchTx) CommitAndStop() {
 	t.Lock()
-	defer t.Unlock()
 	t.commit(true)
+	t.Unlock()
 }
 
 func (t *batchTx) Unlock() {

@@ -160,28 +160,14 @@ func (t *batchTx) commit(stop bool) {
 	// commit the last tx
 	if t.tx != nil {
 		if t.pending == 0 && !stop {
-			t.backend.mu.RLock()
-			defer t.backend.mu.RUnlock()
-
-			// batchTx.commit(true) calls *bolt.Tx.Commit, which
-			// initializes *bolt.Tx.db and *bolt.Tx.meta as nil,
-			// and subsequent *bolt.Tx.Size() call panics.
-			//
-			// This nil pointer reference panic happens when:
-			// 1. batchTx.commit(false) from newBatchTx
-			// 2. batchTx.commit(true) from stopping backend
-			// 3. batchTx.commit(false) from inflight mvcc Hash call
-			//
-			// Check if db is nil to prevent this panic
-			if t.tx.DB() != nil {
-				atomic.StoreInt64(&t.backend.size, t.tx.Size())
-			}
 			return
 		}
+
 		start := time.Now()
+
 		// gofail: var beforeCommit struct{}
 		err = t.tx.Commit()
 		// gofail: var afterCommit struct{}
+
 		commitDurations.Observe(time.Since(start).Seconds())
 		atomic.AddInt64(&t.backend.commits, 1)
 

@@ -202,5 +188,9 @@ func (t *batchTx) commit(stop bool) {
 	if err != nil {
 		plog.Fatalf("cannot begin tx (%s)", err)
 	}
-	atomic.StoreInt64(&t.backend.size, t.tx.Size())
+
+	size := t.tx.Size()
+	db := t.backend.db
+	atomic.StoreInt64(&t.backend.size, size)
+	atomic.StoreInt64(&t.backend.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
 }
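Both the defrag path and the commit path above now derive a logical in-use size as the physical file size minus the space held by free pages. The sketch below mirrors that arithmetic against a scratch database; it assumes the boltdb client package ("github.com/boltdb/bolt") that this backend is built on and is not code from the patch:

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"

	bolt "github.com/boltdb/bolt"
)

// logicalSizeInUse mirrors the patch's expression: total size minus
// free-page count times page size.
func logicalSizeInUse(db *bolt.DB, size int64) int64 {
	return size - int64(db.Stats().FreePageN)*int64(db.Info().PageSize)
}

func main() {
	dir, err := ioutil.TempDir("", "backend-example")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)

	db, err := bolt.Open(filepath.Join(dir, "db"), 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Size() is read on an open transaction, just like b.batchTx.tx.Size()
	// in the patch.
	err = db.View(func(tx *bolt.Tx) error {
		fmt.Println("physical:", tx.Size(), "in use:", logicalSizeInUse(db, tx.Size()))
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}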
@@ -22,10 +22,26 @@ var (
 		Subsystem: "disk",
 		Name: "backend_commit_duration_seconds",
 		Help: "The latency distributions of commit called by backend.",
-		Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
+
+		// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
+		// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
+		Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
+	})
+
+	defragDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "disk",
+		Name: "backend_defrag_duration_seconds",
+		Help: "The latency distribution of backend defragmentation.",
+
+		// 100 MB usually takes 1 sec, so start with 10 MB of 100 ms
+		// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
+		// highest bucket start of 0.1 sec * 2^12 == 409.6 sec
+		Buckets: prometheus.ExponentialBuckets(.1, 2, 13),
 	})
 )
 
 func init() {
 	prometheus.MustRegister(commitDurations)
+	prometheus.MustRegister(defragDurations)
 }
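The bucket comments added in this hunk simply spell out the upper bounds that prometheus.ExponentialBuckets generates. A tiny check with the same client library (standalone, illustrative only):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// 14 buckets starting at 1 ms, doubling each time: the last upper bound
	// is 0.001 * 2^13 = 8.192 s, matching the comment on commitDurations.
	fmt.Println(prometheus.ExponentialBuckets(0.001, 2, 14))

	// 13 buckets starting at 100 ms: the last upper bound is
	// 0.1 * 2^12 = 409.6 s, matching the comment on defragDurations.
	fmt.Println(prometheus.ExponentialBuckets(0.1, 2, 13))
}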
@@ -217,7 +217,6 @@ func (s *store) txnEnd(txnID int64) error {
 	}
 	s.currentRev.sub = 0
 
-	dbTotalSize.Set(float64(s.b.Size()))
 	s.mu.Unlock()
 	return nil
 }

@@ -336,9 +335,14 @@ func init() {
 func (s *store) Hash() (uint32, int64, error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+
+	start := time.Now()
+
 	s.b.ForceCommit()
 
 	h, err := s.b.Hash(DefaultIgnores)
+
+	hashDurations.Observe(time.Since(start).Seconds())
 	rev := s.currentRev.main
 	return h, rev, err
 }

@@ -374,6 +378,13 @@ func (s *store) Restore(b backend.Backend) error {
 }
 
 func (s *store) restore() error {
+	reportDbTotalSizeInBytesMu.Lock()
+	reportDbTotalSizeInBytes = func() float64 { return float64(s.b.Size()) }
+	reportDbTotalSizeInBytesMu.Unlock()
+	reportDbTotalSizeInUseInBytesMu.Lock()
+	reportDbTotalSizeInUseInBytes = func() float64 { return float64(s.b.SizeInUse()) }
+	reportDbTotalSizeInUseInBytesMu.Unlock()
+
 	min, max := newRevBytes(), newRevBytes()
 	revToBytes(revision{main: 1}, min)
 	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)

@@ -613,6 +613,7 @@ type fakeBackend struct {
 func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx }
 func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil }
 func (b *fakeBackend) Size() int64 { return 0 }
+func (b *fakeBackend) SizeInUse() int64 { return 0 }
 func (b *fakeBackend) Snapshot() backend.Snapshot { return nil }
 func (b *fakeBackend) ForceCommit() {}
 func (b *fakeBackend) Defrag() error { return nil }
@@ -15,6 +15,8 @@
 package mvcc
 
 import (
+	"sync"
+
 	"github.com/prometheus/client_golang/prometheus"
 )
 

@@ -129,11 +131,62 @@ var (
 		Buckets: prometheus.ExponentialBuckets(100, 2, 14),
 	})
 
-	dbTotalSize = prometheus.NewGauge(prometheus.GaugeOpts{
+	dbTotalSizeDebugging = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
 		Namespace: "etcd_debugging",
 		Subsystem: "mvcc",
 		Name: "db_total_size_in_bytes",
-		Help: "Total size of the underlying database in bytes.",
+		Help: "Total size of the underlying database physically allocated in bytes. Use etcd_mvcc_db_total_size_in_bytes",
+	},
+		func() float64 {
+			reportDbTotalSizeInBytesMu.RLock()
+			defer reportDbTotalSizeInBytesMu.RUnlock()
+			return reportDbTotalSizeInBytes()
+		},
+	)
+	// overridden
+
+	dbTotalSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+		Namespace: "etcd",
+		Subsystem: "mvcc",
+		Name: "db_total_size_in_bytes",
+		Help: "Total size of the underlying database physically allocated in bytes.",
+	},
+		func() float64 {
+			reportDbTotalSizeInBytesMu.RLock()
+			defer reportDbTotalSizeInBytesMu.RUnlock()
+			return reportDbTotalSizeInBytes()
+		},
+	)
+	// overridden by mvcc initialization
+	reportDbTotalSizeInBytesMu sync.RWMutex
+	reportDbTotalSizeInBytes = func() float64 { return 0 }
+
+	dbTotalSizeInUse = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+		Namespace: "etcd",
+		Subsystem: "mvcc",
+		Name: "db_total_size_in_use_in_bytes",
+		Help: "Total size of the underlying database logically in use in bytes.",
+	},
+		func() float64 {
+			reportDbTotalSizeInUseInBytesMu.RLock()
+			defer reportDbTotalSizeInUseInBytesMu.RUnlock()
+			return reportDbTotalSizeInUseInBytes()
+		},
+	)
+	// overridden by mvcc initialization
+	reportDbTotalSizeInUseInBytesMu sync.RWMutex
+	reportDbTotalSizeInUseInBytes func() float64 = func() float64 { return 0 }
+
+	hashDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "mvcc",
+		Name: "hash_duration_seconds",
+		Help: "The latency distribution of storage hash operation.",
+
+		// 100 MB usually takes 100 ms, so start with 10 MB of 10 ms
+		// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
+		// highest bucket start of 0.01 sec * 2^14 == 163.84 sec
+		Buckets: prometheus.ExponentialBuckets(.01, 2, 15),
 	})
 )
 

@@ -151,7 +204,10 @@ func init() {
 	prometheus.MustRegister(indexCompactionPauseDurations)
 	prometheus.MustRegister(dbCompactionPauseDurations)
 	prometheus.MustRegister(dbCompactionTotalDurations)
+	prometheus.MustRegister(dbTotalSizeDebugging)
 	prometheus.MustRegister(dbTotalSize)
+	prometheus.MustRegister(dbTotalSizeInUse)
+	prometheus.MustRegister(hashDurations)
 }
 
 // ReportEventReceived reports that an event is received.
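The pattern introduced here is a GaugeFunc whose value comes from a package-level reporter function guarded by a sync.RWMutex; the store's restore() (earlier hunk) later swaps the real reporter in. A minimal self-contained sketch of that pattern with illustrative names, not etcd's:

package main

import (
	"fmt"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	reportSizeMu sync.RWMutex
	reportSize   = func() float64 { return 0 } // overridden once the backend is known

	dbSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Namespace: "example",
		Subsystem: "mvcc",
		Name:      "db_total_size_in_bytes",
		Help:      "Total size of the underlying database in bytes.",
	},
		func() float64 {
			reportSizeMu.RLock()
			defer reportSizeMu.RUnlock()
			return reportSize()
		},
	)
)

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(dbSize)

	// Later, e.g. in the store's restore path, the reporter is swapped in.
	reportSizeMu.Lock()
	reportSize = func() float64 { return 4096 }
	reportSizeMu.Unlock()

	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Println(mf.GetName(), mf.GetMetric()[0].GetGauge().GetValue())
	}
}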
@@ -267,6 +267,7 @@ func (s *watchableStore) Restore(b backend.Backend) error {
 	}
 
 	for wa := range s.synced.watchers {
+		wa.restore = true
 		s.unsynced.add(wa)
 	}
 	s.synced = newWatcherGroup()

@@ -549,6 +550,14 @@ type watcher struct {
 	// compacted is set when the watcher is removed because of compaction
 	compacted bool
 
+	// restore is true when the watcher is being restored from leader snapshot
+	// which means that this watcher has just been moved from "synced" to "unsynced"
+	// watcher group, possibly with a future revision when it was first added
+	// to the synced watcher
+	// "unsynced" watcher revision must always be <= current revision,
+	// except when the watcher were to be moved from "synced" watcher group
+	restore bool
+
 	// minRev is the minimum revision update the watcher will accept
 	minRev int64
 	id WatchID

@@ -336,6 +336,62 @@ func TestWatchRestore(t *testing.T) {
 	t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
 }
 
+// TestWatchRestoreSyncedWatcher tests such a case that:
+// 1. watcher is created with a future revision "math.MaxInt64 - 2"
+// 2. watcher with a future revision is added to "synced" watcher group
+// 3. restore/overwrite storage with snapshot of a higher lasat revision
+// 4. restore operation moves "synced" to "unsynced" watcher group
+// 5. choose the watcher from step 1, without panic
+func TestWatchRestoreSyncedWatcher(t *testing.T) {
+	b1, b1Path := backend.NewDefaultTmpBackend()
+	s1 := newWatchableStore(b1, &lease.FakeLessor{}, nil)
+	defer cleanup(s1, b1, b1Path)
+
+	b2, b2Path := backend.NewDefaultTmpBackend()
+	s2 := newWatchableStore(b2, &lease.FakeLessor{}, nil)
+	defer cleanup(s2, b2, b2Path)
+
+	testKey, testValue := []byte("foo"), []byte("bar")
+	rev := s1.Put(testKey, testValue, lease.NoLease)
+	startRev := rev + 2
+
+	// create a watcher with a future revision
+	// add to "synced" watcher group (startRev > s.store.currentRev)
+	w1 := s1.NewWatchStream()
+	w1.Watch(testKey, nil, startRev)
+
+	// make "s2" ends up with a higher last revision
+	s2.Put(testKey, testValue, lease.NoLease)
+	s2.Put(testKey, testValue, lease.NoLease)
+
+	// overwrite storage with higher revisions
+	if err := s1.Restore(b2); err != nil {
+		t.Fatal(err)
+	}
+
+	// wait for next "syncWatchersLoop" iteration
+	// and the unsynced watcher should be chosen
+	time.Sleep(2 * time.Second)
+
+	// trigger events for "startRev"
+	s1.Put(testKey, testValue, lease.NoLease)
+
+	select {
+	case resp := <-w1.Chan():
+		if resp.Revision != startRev {
+			t.Fatalf("resp.Revision expect %d, got %d", startRev, resp.Revision)
+		}
+		if len(resp.Events) != 1 {
+			t.Fatalf("len(resp.Events) expect 1, got %d", len(resp.Events))
+		}
+		if resp.Events[0].Kv.ModRevision != startRev {
+			t.Fatalf("resp.Events[0].Kv.ModRevision expect %d, got %d", startRev, resp.Events[0].Kv.ModRevision)
+		}
+	case <-time.After(time.Second):
+		t.Fatal("failed to receive event in 1 second")
+	}
+}
+
 // TestWatchBatchUnsynced tests batching on unsynced watchers
 func TestWatchBatchUnsynced(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
@@ -15,6 +15,7 @@
 package mvcc
 
 import (
+	"fmt"
 	"math"
 
 	"github.com/coreos/etcd/mvcc/mvccpb"

@@ -238,7 +239,15 @@ func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
 	minRev := int64(math.MaxInt64)
 	for w := range wg.watchers {
 		if w.minRev > curRev {
-			panic("watcher current revision should not exceed current revision")
+			// after network partition, possibly choosing future revision watcher from restore operation
+			// with watch key "proxy-namespace__lostleader" and revision "math.MaxInt64 - 2"
+			// do not panic when such watcher had been moved from "synced" watcher during restore operation
+			if !w.restore {
+				panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
+			}
+
+			// mark 'restore' done, since it's chosen
+			w.restore = false
 		}
 		if w.minRev < compactRev {
 			select {
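The chooseAll change tolerates a watcher whose minRev is ahead of curRev exactly once, and only when the watcher carries the restore flag that Restore() sets while moving it out of the synced group. The toy model below uses invented types, not etcd code, to show how the flag is consumed on first use:

package main

import "fmt"

// watcher is a toy stand-in for the mvcc watcher with only the fields that
// matter for this check.
type watcher struct {
	minRev  int64
	restore bool
}

func check(w *watcher, curRev int64) error {
	if w.minRev > curRev {
		if !w.restore {
			return fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev)
		}
		// mark 'restore' done, since it's chosen
		w.restore = false
	}
	return nil
}

func main() {
	w := &watcher{minRev: 1<<62 - 2, restore: true}
	fmt.Println(check(w, 10)) // tolerated once after a restore: <nil>
	fmt.Println(check(w, 10)) // a second occurrence would be a real bug
}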
@@ -21,6 +21,7 @@ import (
 	"net/http"
 	"path"
 	"strings"
+	"time"
 
 	pioutil "github.com/coreos/etcd/pkg/ioutil"
 	"github.com/coreos/etcd/pkg/types"

@@ -153,6 +154,8 @@ func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, c
 	}
 }
 
+const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER"
+
 // ServeHTTP serves HTTP request to receive and process snapshot message.
 //
 // If request sender dies without closing underlying TCP connection,

@@ -163,9 +166,12 @@ func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, c
 // received and processed.
 // 2. this case should happen rarely, so no further optimization is done.
 func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	start := time.Now()
+
 	if r.Method != "POST" {
 		w.Header().Set("Allow", "POST")
 		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+		snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
 		return
 	}
 

@@ -173,6 +179,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 
 	if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
 		http.Error(w, err.Error(), http.StatusPreconditionFailed)
+		snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
 		return
 	}
 

@@ -185,19 +192,22 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	dec := &messageDecoder{r: r.Body}
 	// let snapshots be very large since they can exceed 512MB for large installations
 	m, err := dec.decodeLimit(uint64(1 << 63))
+	from := types.ID(m.From).String()
 	if err != nil {
 		msg := fmt.Sprintf("failed to decode raft message (%v)", err)
 		plog.Errorf(msg)
 		http.Error(w, msg, http.StatusBadRequest)
 		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
+		snapshotReceiveFailures.WithLabelValues(from).Inc()
 		return
 	}
 
-	receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
+	receivedBytes.WithLabelValues(from).Add(float64(m.Size()))
 
 	if m.Type != raftpb.MsgSnap {
 		plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
 		http.Error(w, "wrong raft message type", http.StatusBadRequest)
+		snapshotReceiveFailures.WithLabelValues(from).Inc()
 		return
 	}
 

@@ -208,9 +218,10 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
 		plog.Error(msg)
 		http.Error(w, msg, http.StatusInternalServerError)
+		snapshotReceiveFailures.WithLabelValues(from).Inc()
 		return
 	}
-	receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(n))
+	receivedBytes.WithLabelValues(from).Add(float64(n))
 	plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
 
 	if err := h.r.Process(context.TODO(), m); err != nil {

@@ -223,12 +234,16 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 			msg := fmt.Sprintf("failed to process raft message (%v)", err)
 			plog.Warningf(msg)
 			http.Error(w, msg, http.StatusInternalServerError)
+			snapshotReceiveFailures.WithLabelValues(from).Inc()
 		}
 		return
 	}
 	// Write StatusNoContent header after the message has been processed by
 	// raft, which facilitates the client to report MsgSnap status.
 	w.WriteHeader(http.StatusNoContent)
+
+	snapshotReceive.WithLabelValues(from).Inc()
+	snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
 }
 
 type streamHandler struct {
@@ -53,6 +53,68 @@ var (
 		[]string{"From"},
 	)
 
+	snapshotSend = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name: "snapshot_send_success",
+		Help: "Total number of successful snapshot sends",
+	},
+		[]string{"To"},
+	)
+
+	snapshotSendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name: "snapshot_send_failures",
+		Help: "Total number of snapshot send failures",
+	},
+		[]string{"To"},
+	)
+
+	snapshotSendSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name: "snapshot_send_total_duration_seconds",
+		Help: "Total latency distributions of v3 snapshot sends",
+
+		// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
+		// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
+		Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
+	},
+		[]string{"To"},
+	)
+
+	snapshotReceive = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name: "snapshot_receive_success",
+		Help: "Total number of successful snapshot receives",
+	},
+		[]string{"From"},
+	)
+
+	snapshotReceiveFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name: "snapshot_receive_failures",
+		Help: "Total number of snapshot receive failures",
+	},
+		[]string{"From"},
+	)
+
+	snapshotReceiveSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name: "snapshot_receive_total_duration_seconds",
+		Help: "Total latency distributions of v3 snapshot receives",
+
+		// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
+		// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
+		Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
+	},
+		[]string{"From"},
+	)
+
 	rtts = prometheus.NewHistogramVec(prometheus.HistogramOpts{
 		Namespace: "etcd",
 		Subsystem: "network",

@@ -69,5 +131,13 @@ func init() {
 	prometheus.MustRegister(receivedBytes)
 	prometheus.MustRegister(sentFailures)
 	prometheus.MustRegister(recvFailures)
+
+	prometheus.MustRegister(snapshotSend)
+	prometheus.MustRegister(snapshotSendFailures)
+	prometheus.MustRegister(snapshotSendSeconds)
+	prometheus.MustRegister(snapshotReceive)
+	prometheus.MustRegister(snapshotReceiveFailures)
+	prometheus.MustRegister(snapshotReceiveSeconds)
+
 	prometheus.MustRegister(rtts)
 }
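All of these metrics are labelled by peer ID ("To" or "From"), so every observation goes through WithLabelValues. A small self-contained sketch of the counter-plus-histogram pairing used for snapshot sends; metric names, the peer ID string, and variable names are illustrative, not etcd's:

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	snapSend = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "example",
		Subsystem: "network",
		Name:      "snapshot_send_success",
		Help:      "Total number of successful snapshot sends",
	}, []string{"To"})

	snapSendSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "example",
		Subsystem: "network",
		Name:      "snapshot_send_total_duration_seconds",
		Help:      "Total latency distributions of snapshot sends",
		Buckets:   prometheus.ExponentialBuckets(0.1, 2, 10),
	}, []string{"To"})
)

func main() {
	prometheus.MustRegister(snapSend, snapSendSeconds)

	to := "8e9e05c52164694d" // a peer ID rendered as a string label
	start := time.Now()
	// ... send the snapshot here ...
	snapSend.WithLabelValues(to).Inc()
	snapSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds())
	fmt.Println("recorded send metrics for", to)
}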
@@ -56,7 +56,7 @@ func (s *peerStatus) deactivate(failure failureType, reason string) {
 	msg := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason)
 	if s.active {
 		plog.Errorf(msg)
-		plog.Infof("peer %s became inactive", s.id)
+		plog.Infof("peer %s became inactive (message send to peer failed)", s.id)
 		s.active = false
 		s.since = time.Time{}
 		return

@@ -17,6 +17,7 @@ package rafthttp
 import (
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/xiang90/probing"
 )
 

@@ -28,7 +29,15 @@ var (
 	statusErrorInterval = 5 * time.Second
 )
 
-func addPeerToProber(p probing.Prober, id string, us []string) {
+const (
+	// RoundTripperNameRaftMessage is the name of round-tripper that sends
+	// all other Raft messages, other than "snap.Message".
+	RoundTripperNameRaftMessage = "ROUND_TRIPPER_RAFT_MESSAGE"
+	// RoundTripperNameSnapshot is the name of round-tripper that sends merged snapshot message.
+	RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT"
+)
+
+func addPeerToProber(p probing.Prober, id string, us []string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
 	hus := make([]string, len(us))
 	for i := range us {
 		hus[i] = us[i] + ProbingPrefix

@@ -40,26 +49,26 @@ func addPeerToProber(p probing.Prober, id string, us []string) {
 	if err != nil {
 		plog.Errorf("failed to add peer %s into prober", id)
 	} else {
-		go monitorProbingStatus(s, id)
+		go monitorProbingStatus(s, id, roundTripperName, rttSecProm)
 	}
 }
 
-func monitorProbingStatus(s probing.Status, id string) {
+func monitorProbingStatus(s probing.Status, id string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
 	// set the first interval short to log error early.
 	interval := statusErrorInterval
 	for {
 		select {
 		case <-time.After(interval):
 			if !s.Health() {
-				plog.Warningf("health check for peer %s could not connect: %v", id, s.Err())
+				plog.Warningf("health check for peer %s could not connect: %v (prober %q)", id, s.Err(), roundTripperName)
 				interval = statusErrorInterval
 			} else {
 				interval = statusMonitoringInterval
 			}
 			if s.ClockDiff() > time.Second {
-				plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
+				plog.Warningf("the clock difference against peer %s is too high [%v > %v] (prober %q)", id, s.ClockDiff(), time.Second, roundTripperName)
 			}
-			rtts.WithLabelValues(id).Observe(s.SRTT().Seconds())
+			rttSecProm.WithLabelValues(id).Observe(s.SRTT().Seconds())
 		case <-s.StopNotify():
 			return
 		}
@@ -63,7 +63,10 @@ func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *pe
 func (s *snapshotSender) stop() { close(s.stopc) }
 
 func (s *snapshotSender) send(merged snap.Message) {
+	start := time.Now()
+
 	m := merged.Message
+	to := types.ID(m.To).String()
 
 	body := createSnapBody(merged)
 	defer body.Close()

@@ -91,14 +94,18 @@ func (s *snapshotSender) send(merged snap.Message) {
 		// machine knows about it, it would pause a while and retry sending
 		// new snapshot message.
 		s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
-		sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
+		sentFailures.WithLabelValues(to).Inc()
+		snapshotSendFailures.WithLabelValues(to).Inc()
 		return
 	}
 	s.status.activate()
 	s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
 	plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
 
-	sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(merged.TotalSize))
+	sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize))
+
+	snapshotSend.WithLabelValues(to).Inc()
+	snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds())
 }
 
 // post posts the given request.
@@ -26,6 +26,7 @@ import (
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/snap"
+
 	"github.com/coreos/pkg/capnslog"
 	"github.com/xiang90/probing"
 	"golang.org/x/net/context"

@@ -121,7 +122,8 @@ type Transport struct {
 	remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
 	peers map[types.ID]Peer // peers map
 
-	prober probing.Prober
+	pipelineProber probing.Prober
+	streamProber probing.Prober
 }
 
 func (t *Transport) Start() error {

@@ -136,7 +138,8 @@ func (t *Transport) Start() error {
 	}
 	t.remotes = make(map[types.ID]*remote)
 	t.peers = make(map[types.ID]Peer)
-	t.prober = probing.NewProber(t.pipelineRt)
+	t.pipelineProber = probing.NewProber(t.pipelineRt)
+	t.streamProber = probing.NewProber(t.streamRt)
 	return nil
 }
 

@@ -197,7 +200,8 @@ func (t *Transport) Stop() {
 	for _, p := range t.peers {
 		p.stop()
 	}
-	t.prober.RemoveAll()
+	t.pipelineProber.RemoveAll()
+	t.streamProber.RemoveAll()
 	if tr, ok := t.streamRt.(*http.Transport); ok {
 		tr.CloseIdleConnections()
 	}

@@ -276,8 +280,8 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
 	}
 	fs := t.LeaderStats.Follower(id.String())
 	t.peers[id] = startPeer(t, urls, id, fs)
-	addPeerToProber(t.prober, id.String(), us)
+	addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)
+	addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)
 	plog.Infof("added peer %s", id)
 }
 

@@ -304,7 +308,8 @@ func (t *Transport) removePeer(id types.ID) {
 	}
 	delete(t.peers, id)
 	delete(t.LeaderStats.Followers, id.String())
-	t.prober.Remove(id.String())
+	t.pipelineProber.Remove(id.String())
+	t.streamProber.Remove(id.String())
 	plog.Infof("removed peer %s", id)
 }
 

@@ -321,8 +326,10 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) {
 	}
 	t.peers[id].update(urls)
 
-	t.prober.Remove(id.String())
-	addPeerToProber(t.prober, id.String(), us)
+	t.pipelineProber.Remove(id.String())
+	addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)
+	t.streamProber.Remove(id.String())
+	addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)
 	plog.Infof("updated peer %s", id)
 }
 
@@ -35,8 +35,10 @@ func TestTransportSend(t *testing.T) {
 	peer1 := newFakePeer()
 	peer2 := newFakePeer()
 	tr := &Transport{
 		ServerStats: ss,
 		peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
+		pipelineProber: probing.NewProber(nil),
+		streamProber: probing.NewProber(nil),
 	}
 	wmsgsIgnored := []raftpb.Message{
 		// bad local message

@@ -72,8 +74,10 @@ func TestTransportCutMend(t *testing.T) {
 	peer1 := newFakePeer()
 	peer2 := newFakePeer()
 	tr := &Transport{
 		ServerStats: ss,
 		peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
+		pipelineProber: probing.NewProber(nil),
+		streamProber: probing.NewProber(nil),
 	}
 
 	tr.CutPeer(types.ID(1))

@@ -100,10 +104,11 @@ func TestTransportCutMend(t *testing.T) {
 func TestTransportAdd(t *testing.T) {
 	ls := stats.NewLeaderStats("")
 	tr := &Transport{
 		LeaderStats: ls,
 		streamRt: &roundTripperRecorder{},
 		peers: make(map[types.ID]Peer),
-		prober: probing.NewProber(nil),
+		pipelineProber: probing.NewProber(nil),
+		streamProber: probing.NewProber(nil),
 	}
 	tr.AddPeer(1, []string{"http://localhost:2380"})
 

@@ -128,10 +133,11 @@ func TestTransportAdd(t *testing.T) {
 
 func TestTransportRemove(t *testing.T) {
 	tr := &Transport{
 		LeaderStats: stats.NewLeaderStats(""),
 		streamRt: &roundTripperRecorder{},
 		peers: make(map[types.ID]Peer),
-		prober: probing.NewProber(nil),
+		pipelineProber: probing.NewProber(nil),
+		streamProber: probing.NewProber(nil),
 	}
 	tr.AddPeer(1, []string{"http://localhost:2380"})
 	tr.RemovePeer(types.ID(1))

@@ -145,8 +151,9 @@ func TestTransportRemove(t *testing.T) {
 func TestTransportUpdate(t *testing.T) {
 	peer := newFakePeer()
 	tr := &Transport{
 		peers: map[types.ID]Peer{types.ID(1): peer},
-		prober: probing.NewProber(nil),
+		pipelineProber: probing.NewProber(nil),
+		streamProber: probing.NewProber(nil),
 	}
 	u := "http://localhost:2380"
 	tr.UpdatePeer(types.ID(1), []string{u})

@@ -159,13 +166,14 @@ func TestTransportUpdate(t *testing.T) {
 func TestTransportErrorc(t *testing.T) {
 	errorc := make(chan error, 1)
 	tr := &Transport{
 		Raft: &fakeRaft{},
 		LeaderStats: stats.NewLeaderStats(""),
 		ErrorC: errorc,
 		streamRt: newRespRoundTripper(http.StatusForbidden, nil),
 		pipelineRt: newRespRoundTripper(http.StatusForbidden, nil),
 		peers: make(map[types.ID]Peer),
-		prober: probing.NewProber(nil),
+		pipelineProber: probing.NewProber(nil),
+		streamProber: probing.NewProber(nil),
 	}
 	tr.AddPeer(1, []string{"http://localhost:2380"})
 	defer tr.Stop()
@@ -20,6 +20,7 @@ import (
 	"io/ioutil"
 	"os"
 	"path/filepath"
+	"time"
 
 	"github.com/coreos/etcd/pkg/fileutil"
 )

@@ -27,6 +28,8 @@ import (
 // SaveDBFrom saves snapshot of the database from the given reader. It
 // guarantees the save operation is atomic.
 func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
+	start := time.Now()
+
 	f, err := ioutil.TempFile(s.dir, "tmp")
 	if err != nil {
 		return 0, err

@@ -34,7 +37,9 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
 	var n int64
 	n, err = io.Copy(f, r)
 	if err == nil {
+		fsyncStart := time.Now()
 		err = fileutil.Fsync(f)
+		snapDBFsyncSec.Observe(time.Since(fsyncStart).Seconds())
 	}
 	f.Close()
 	if err != nil {

@@ -54,6 +59,7 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
 
 	plog.Infof("saved database snapshot to disk [total bytes: %d]", n)
 
+	snapDBSaveSec.Observe(time.Since(start).Seconds())
 	return n, nil
 }
 

@@ -33,9 +33,33 @@ var (
 		Help: "The marshalling cost distributions of save called by snapshot.",
 		Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
 	})
+
+	snapDBSaveSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "snap_db",
+		Name: "save_total_duration_seconds",
+		Help: "The total latency distributions of v3 snapshot save",
+
+		// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
+		// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
+		Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
+	})
+
+	snapDBFsyncSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "snap_db",
+		Name: "fsync_duration_seconds",
+		Help: "The latency distributions of fsyncing .snap.db file",
+
+		// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
+		// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
+		Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
+	})
 )
 
 func init() {
 	prometheus.MustRegister(saveDurations)
 	prometheus.MustRegister(marshallingDurations)
+	prometheus.MustRegister(snapDBSaveSec)
+	prometheus.MustRegister(snapDBFsyncSec)
 }
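SaveDBFrom now times two things separately: the fsync alone and the whole save. A condensed sketch of that shape, using os.File.Sync in place of fileutil.Fsync; all names here are illustrative and not the snap package's code:

package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	saveSec = prometheus.NewHistogram(prometheus.HistogramOpts{
		Namespace: "example",
		Subsystem: "snap_db",
		Name:      "save_total_duration_seconds",
		Help:      "The total latency distributions of snapshot save",
		Buckets:   prometheus.ExponentialBuckets(0.1, 2, 10),
	})
	fsyncSec = prometheus.NewHistogram(prometheus.HistogramOpts{
		Namespace: "example",
		Subsystem: "snap_db",
		Name:      "fsync_duration_seconds",
		Help:      "The latency distributions of fsyncing the snapshot file",
		Buckets:   prometheus.ExponentialBuckets(0.001, 2, 14),
	})
)

// saveDB mimics the shape of SaveDBFrom: one timer around the whole save and
// a second one around only the fsync.
func saveDB(path string, data []byte) (int64, error) {
	start := time.Now()

	f, err := os.Create(path)
	if err != nil {
		return 0, err
	}
	n, err := f.Write(data)
	if err == nil {
		fsyncStart := time.Now()
		err = f.Sync() // stand-in for fileutil.Fsync(f)
		fsyncSec.Observe(time.Since(fsyncStart).Seconds())
	}
	f.Close()
	if err != nil {
		return 0, err
	}

	saveSec.Observe(time.Since(start).Seconds())
	return int64(n), nil
}

func main() {
	prometheus.MustRegister(saveSec, fsyncSec)

	dir, err := ioutil.TempDir("", "snap-example")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	n, err := saveDB(dir+"/0000.snap.db", []byte("snapshot bytes"))
	if err != nil {
		panic(err)
	}
	fmt.Println("saved", n, "bytes")
}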
@@ -5,17 +5,13 @@ if ! [[ "$0" =~ "tests/semaphore.test.bash" ]]; then
   exit 255
 fi
 
-TEST_SUFFIX=$(date +%s | base64 | head -c 15)
+<<COMMENT
+# amd64-e2e
+tests/semaphore.test.bash
+sudo HOST_TMP_DIR=/tmp TEST_OPTS="PASSES='build release e2e' MANUAL_VER=v3.3.7" make docker-test
 
-TEST_OPTS="PASSES='build unit release integration_e2e functional' MANUAL_VER=v3.1.12"
-if [ "$TEST_ARCH" == "386" ]; then
-  TEST_OPTS="GOARCH=386 PASSES='build unit integration_e2e'"
-fi
+# 386-e2e
+sudo HOST_TMP_DIR=/tmp TEST_OPTS="GOARCH=386 PASSES='build e2e'" make docker-test
+COMMENT
 
-docker run \
-  --rm \
-  --volume=`pwd`:/go/src/github.com/coreos/etcd \
-  gcr.io/etcd-development/etcd-test:go1.8.7 \
-  /bin/bash -c "${TEST_OPTS} ./test 2>&1 | tee test-${TEST_SUFFIX}.log"
-
-! egrep "(--- FAIL:|panic: test timed out|appears to have leaked)" -B50 -A10 test-${TEST_SUFFIX}.log
+sudo HOST_TMP_DIR=/tmp TEST_OPTS="PASSES='build release e2e' MANUAL_VER=v3.1.18" make docker-test
@@ -26,7 +26,7 @@ import (
 var (
 	// MinClusterVersion is the min cluster version this etcd binary is compatible with.
 	MinClusterVersion = "3.0.0"
-	Version = "3.1.15"
+	Version = "3.1.20"
 	APIVersion = "unknown"
 
 	// Git SHA Value will be set during build