Compare commits

...

63 Commits

Author SHA1 Message Date
Joe Betz
992dbd4d1e version: bump up to 3.1.20 2018-10-10 11:02:11 -07:00
Gyuho Lee
b39c0f9471 etcdserver: add "etcd_server_read_indexes_failed_total"
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-10-09 18:21:06 -07:00
Gyuho Lee
3381ef1602 rafthttp: probe all raft transports
This PR adds another probing routine to monitor the connection
for Raft message transports. Previously, we only monitored
snapshot transports.

In our production cluster, we found one TCP connection had >8-sec
latencies to a remote peer, but "etcd_network_peer_round_trip_time_seconds"
metrics shows <1-sec latency distribution, which means etcd server
was not sampling enough while such latency spikes happen
outside of snapshot pipeline connection.

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-10-09 18:16:08 -07:00
Gyuho Lee
c096dc2cc5 etcdserver: add "etcd_server_health_success/failures"
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-10-09 18:06:37 -07:00
Jingyi Hu
3e99b42612 Merge pull request #10163 from jingyih/automated-cherry-pick-of-#10153-origin-release-3.1
clientv3: automated cherry pick of #10153 to release 3.1
2018-10-08 18:46:55 -07:00
yura
cf7be488a9 clientv3: concurrency.Mutex.Lock() - preserve invariant
Convenient invariant:
- if werr == nil then lock is supposed to be locked at the moment.

While we could not be confident in stronger invariant ('is exactly locked'),
it were inconvenient that previous code could return `werr == nil` after
Mutex.Unlock.

It could happen when ctx is canceled/timeouted exactly after waitDeletes
successfully returned werr == nil and before `<-ctx.Done()` checked.
While such situation is very rare, it is still possible.

fixes #10111
2018-10-08 16:53:39 -07:00
Gyuho Lee
65fff06adc Merge pull request #10124 from jingyih/cherry-pick-of-#10109-origin-release-3.1
etcdctl: cherry pick of #10109 to release-3.1
2018-09-25 19:55:23 -07:00
Jingyi Hu
87b4e08c29 etcdctl: cherry pick of #10109 to release-3.1
Add snapshot file integrity verification when querying snapshot status.
2018-09-25 17:51:12 -07:00
Gyuho Lee
216be8b79b etcdserver: add "etcd_server_id"
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-08-29 14:49:01 -07:00
Gyuho Lee
dfcf82b6ff etcdserver: clarify read index wait timeout warnings
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-08-29 14:38:22 -07:00
Gyuho Lee
9197907515 rafthttp: clarify "became inactive" warning
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-08-29 14:33:17 -07:00
Gyuho Lee
14883cad78 Merge pull request #10043 from wenjiaswe/automated-cherry-pick-of-#9997-upstream-release-3.1
Automated cherry pick of #9997
2018-08-29 12:42:05 -07:00
Wenjia
4e7691ddcc remove automatic added imports 2018-08-28 15:00:44 -07:00
Gyuho Lee
8a68ae95ec etcdserver/api/rafthttp: add v3 snapshot send/receive metrics
Distribution would be:
0.1 second or more
...
25.6 seconds or more
51.2 seconds or more

etcd_network_snapshot_send_success
etcd_network_snapshot_send_failures
etcd_network_snapshot_send_total_duration_seconds
etcd_network_snapshot_receive_success
etcd_network_snapshot_receive_failures
etcd_network_snapshot_receive_total_duration_seconds

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-08-28 14:34:21 -07:00
Gyuho Lee
ef1d332298 etcdserver/api/snap: add v3 snapshot fsync metrics
etcd_snap_db_fsync_duration_seconds_count
etcd_snap_db_save_total_duration_seconds_bucket

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-08-28 14:16:05 -07:00
Xiang Li
116c442615 Merge pull request #10034 from gyuho/init-metrics-3.1
etcdserver/api/v3rpc: display all registered gRPC metrics at start (v3.1)
2018-08-24 18:52:40 -07:00
Gyuho Lee
e07fb41140 etcdserver/api/v3rpc: display all registered gRPC metrics at start
Previously, only display the one that has been requested at least once.
Now it shows all metrics, as we do in v3.3 and v3.4+.

grpc_server_started_total{grpc_method="Alarm",grpc_service="etcdserverpb.Maintenance",grpc_type="unary"} 0
grpc_server_started_total{grpc_method="AuthDisable",grpc_service="etcdserverpb.Auth",grpc_type="unary"} 0
grpc_server_started_total{grpc_method="AuthEnable",grpc_service="etcdserverpb.Auth",grpc_type="unary"} 0
grpc_server_started_total{grpc_method="Authenticate",grpc_service="etcdserverpb.Auth",grpc_type="unary"} 0
grpc_server_started_total{grpc_method="Compact",grpc_service="etcdserverpb.KV",grpc_type="unary"} 0
grpc_server_started_total{grpc_method="Defragment",grpc_service="etcdserverpb.Maintenance",grpc_type="unary"} 0
grpc_server_started_total{grpc_method="DeleteRange",grpc_service="etcdserverpb.KV",grpc_type="unary"} 0

Should help document metrics.

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-08-22 19:14:58 -07:00
Joe Betz
2c616b0c3d Merge pull request #10030 from jingyih/cherry-pick-of-#9990-origin-release-3.1
etcdserver: cherry pick of #9990 to release 3.1
2018-08-20 15:33:27 -07:00
Jingyi Hu
dd2803c4a6 etcdserver: add grpc interceptor to log info on incoming request to
etcdserver

To improve debuggability of etcd v3.1. Added a grpc interceptor to log
info on incoming requests to etcd server. The log output includes remote
client info, request content (with value field redacted), request
handling latency, response size, etc.

Dependency on zap logger and grpc_middleware is removed during
backporting.

Added checking in logging interceptor. If debug level is disabled, skip
logUnaryRequestStats() to avoid potential performance degradation. (PR #10021)
2018-08-20 14:32:48 -07:00
Jingyi Hu
4855ca62b5 etcdserver: add grpc interceptor to log info on incoming request to etcdserver.
To improve debuggability of etcd v3. Added a grpc interceptor to log
info on incoming requests to etcd server. The log output includes remote
client info, request content (with value field redacted), request
handling latency, response size, etc.

Dependency on zap logger and grpc_middleware is removed during
backporting.

Added checking in logging interceptor. If debug level is disabled, skip
logUnaryRequestStats() to avoid potential performance degradation. (PR #10021)
2018-08-20 13:54:24 -07:00
Joe Betz
bb205caa68 version: bump up to 3.1.19+git 2018-07-24 10:07:31 -07:00
Joe Betz
a1d6802da2 version: bump up to 3.1.19 2018-07-24 10:04:37 -07:00
Gyuho Lee
79d80bd259 etcdserver: add "etcd_server_go_version" metric
Currently, one has to look at server logs manually,
to see what Go version was used to build etcd server.

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-07-23 16:38:10 -07:00
Gyuho Lee
081519c323 clientv3: fix keepalive send interval when response queue is full
client should update next keepalive send time
even when lease keepalive response queue becomes full.

Otherwise, client sends keepalive request every 500ms
regardless of TTL when the send is only expected to happen
with the interval of TTL / 3 at minimum.

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-07-23 08:50:07 -07:00
Gyuho Lee
e0d5a028d5 Merge pull request #9944 from wenjiaswe/automated-cherry-pick-of-#9761-upstream-release-3.1
Automated cherry pick of #9761
2018-07-20 14:51:20 -07:00
Wenjia
a421a604d6 remove hashRevDurations 2018-07-20 13:49:58 -07:00
Gyuho Lee
0fbf49df11 etcdserver: rename to "heartbeat_send_failures_total"
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-07-20 11:40:37 -07:00
Gyuho Lee
fb5080b306 mvcc: add "etcd_mvcc_hash_(rev)_duration_seconds"
etcd_mvcc_hash_duration_seconds
etcd_mvcc_hash_rev_duration_seconds

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-07-20 11:37:06 -07:00
Gyuho Lee
cac6ce756d mvcc/backend: fix defrag duration scale
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-07-20 10:53:26 -07:00
Gyuho Lee
9f58e57a3c mvcc/backend: add "etcd_disk_backend_defrag_duration_seconds"
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-07-20 10:53:26 -07:00
Gyuho Lee
22c25dd4e7 mvcc/backend: document metrics ExponentialBuckets
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-07-20 10:44:52 -07:00
Gyuho Lee
92a7b5df80 mvcc/backend: clean up mutex, logging
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-07-20 10:35:39 -07:00
Gyuho Lee
3f1fe618ad etcdserver: add "etcd_server_slow_apply_total"
{"level":"warn","ts":1527101858.6985068,"caller":"etcdserver/util.go:115","msg":"apply request took too long","took":0.114101529,"expected-duration":0.1,"prefix":"","request":"header:<ID:1029181977902852337> put:<key:\"\\000\\000...

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-07-20 10:25:16 -07:00
Gyuho Lee
b8547734ae etcdserver: add "etcd_server_heartbeat_failures_total"
{"level":"warn","ts":1527101858.4149103,"caller":"etcdserver/raft.go:370","msg":"failed to send out heartbeat; took too long, server is overloaded likely from slow disk","heartbeat-interval":0.1,"expected-duration":0.2,"exceeded-duration":0.025771662}
{"level":"warn","ts":1527101858.4149644,"caller":"etcdserver/raft.go:370","msg":"failed to send out heartbeat; took too long, server is overloaded likely from slow disk","heartbeat-interval":0.1,"expected-duration":0.2,"exceeded-duration":0.034015766}

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-07-20 10:24:40 -07:00
Gyuho Lee
78a13e67a0 mvcc/backend: avoid unnecessary metrics update
https://github.com/coreos/etcd/pull/9300

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-07-03 14:53:20 -07:00
Gyuho Lee
84d11a51c1 mvcc: use "t.tx.DB()" to fetch DB
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-07-03 14:34:20 -07:00
Gyuho Lee
a9c4b98756 mvcc: add "etcd_mvcc_db_total_size_in_use_in_bytes"
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-07-03 14:21:11 -07:00
Gyuho Lee
5531e3b0f5 mvcc: add "etcd_mvcc_db_total_size_in_bytes"
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-07-03 13:51:06 -07:00
Gyuho Lee
c2623bb840 etcdserver: add "etcd_server_quota_backend_bytes"
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-07-03 13:30:10 -07:00
Gyuho Lee
f46b4677c0 etcdserver: add "etcd_server_slow_read_indexes_total"
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-07-03 12:58:29 -07:00
Gyuho Lee
09843d5d90 etcdserver: clarify read index warnings
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-07-03 12:55:31 -07:00
Gyuho Lee
be3e6f6ed5 tests: update test scripts
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-06-18 14:15:52 -07:00
Joe Betz
d84dd18637 version: bump up to 3.1.18+git 2018-06-15 09:51:30 -07:00
Joe Betz
b7ff47f9d5 version: bump up to 3.1.18 2018-06-15 09:47:04 -07:00
Gyuho Lee
fab24fbdab Merge pull request #9848 from wenjiaswe/automated-cherry-pick-of-#8960-upstream-release-3.1
Automated cherry pick of #8960
2018-06-13 16:49:48 -07:00
Joe Betz
b3ee996629 metrics: Add server_version metric 2018-06-13 16:31:18 -07:00
Gyuho Lee
06da6cf983 tests/semaphore.test.bash: update
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-06-13 14:42:45 -07:00
Gyuho Lee
9c00100550 Makefile: update
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-06-13 14:42:10 -07:00
Gyuho Lee
1d7a2ca520 Merge pull request #9838 from jpbetz/automated-cherry-pick-of-#9821-origin-release-3.1-1528833932
etcdserver: Automated cherry pick of detailed "took too long" warnings to release-3.1
2018-06-12 13:54:40 -07:00
Joe Betz
e90934ec71 etcdserver: Fix txn request 'took too long' warnings to use loggable request stringer 2018-06-12 13:22:45 -07:00
Joe Betz
23c5c71426 etcdserver: Add response byte size and range response count to took too long warning 2018-06-12 13:22:45 -07:00
Joe Betz
72a2483d42 etcdserver: Replace value contents with value_size in request took too long warning 2018-06-12 13:15:19 -07:00
Hitoshi Mitake
53eae781fe etcdserver: not print password in the warning message of expensive request
Fix https://github.com/coreos/etcd/issues/9635
2018-06-12 13:15:18 -07:00
Joe Betz
7b1b7def84 etcdserver: Fix to backport of #9288 for pre-RequestV2 code 2018-06-12 13:13:46 -07:00
Xiang
df000fd776 etcdserver: improve request took too long warning 2018-06-12 13:13:46 -07:00
Joe Betz
fd61be4814 version: bump up to 3.1.17+git 2018-06-06 10:36:22 -07:00
Joe Betz
781cc0be83 version: bump up to 3.1.17 2018-06-06 09:54:59 -07:00
Joe Betz
ebe351e3b4 Merge pull request #9808 from jpbetz/snapshot-recover-3.1
etcdserver: Backport snapshot recovery from #7917 to 3.1 branch
2018-06-05 16:13:40 -07:00
Joe Betz
e31510975a etcdserver: Backport snapshot recovery from #7917 to 3.1 branch 2018-06-04 21:52:26 -07:00
Joe Betz
43b0cafcb6 version: bump up to 3.1.16+git 2018-05-31 12:54:30 -07:00
Joe Betz
169af4470e version: bump up to 3.1.16 2018-05-31 12:51:28 -07:00
Gyuho Lee
c4c487eaca mvcc: fix panic by allowing future revision watcher from restore operation
This also happens without gRPC proxy.

Fix panic when gRPC proxy leader watcher is restored:

```
go test -v -tags cluster_proxy -cpu 4 -race -run TestV3WatchRestoreSnapshotUnsync

=== RUN   TestV3WatchRestoreSnapshotUnsync
panic: watcher minimum revision 9223372036854775805 should not exceed current revision 16

goroutine 156 [running]:
github.com/coreos/etcd/mvcc.(*watcherGroup).chooseAll(0xc4202b8720, 0x10, 0xffffffffffffffff, 0x1)
	/home/gyuho/go/src/github.com/coreos/etcd/mvcc/watcher_group.go:242 +0x3b5
github.com/coreos/etcd/mvcc.(*watcherGroup).choose(0xc4202b8720, 0x200, 0x10, 0xffffffffffffffff, 0xc420253378, 0xc420253378)
	/home/gyuho/go/src/github.com/coreos/etcd/mvcc/watcher_group.go:225 +0x289
github.com/coreos/etcd/mvcc.(*watchableStore).syncWatchers(0xc4202b86e0, 0x0)
	/home/gyuho/go/src/github.com/coreos/etcd/mvcc/watchable_store.go:340 +0x237
github.com/coreos/etcd/mvcc.(*watchableStore).syncWatchersLoop(0xc4202b86e0)
	/home/gyuho/go/src/github.com/coreos/etcd/mvcc/watchable_store.go:214 +0x280
created by github.com/coreos/etcd/mvcc.newWatchableStore
	/home/gyuho/go/src/github.com/coreos/etcd/mvcc/watchable_store.go:90 +0x477
exit status 2
FAIL	github.com/coreos/etcd/integration	2.551s
```

gRPC proxy spawns a watcher with a key "proxy-namespace__lostleader"
and watch revision "int64(math.MaxInt64 - 2)" to detect leader loss.
But, when the partitioned node restores, this watcher triggers
panic with "watcher minimum revision ... should not exceed current ...".

This check was added a long time ago, by my PR, when there was no gRPC proxy:

https://github.com/coreos/etcd/pull/4043#discussion_r48457145

> we can remove this checking actually. it is impossible for a unsynced watching to have a future rev. or we should just panic here.

However, now it's possible that a unsynced watcher has a future
revision, when it was moved from a synced watcher group through
restore operation.

This PR adds "restore" flag to indicate that a watcher was moved
from the synced watcher group with restore operation. Otherwise,
the watcher with future revision in an unsynced watcher group
would still panic.

Example logs with future revision watcher from restore operation:

```
{"level":"info","ts":1527196358.9057755,"caller":"mvcc/watcher_group.go:261","msg":"choosing future revision watcher from restore operation","watch-key":"proxy-namespace__lostleader","watch-revision":9223372036854775805,"current-revision":16}
{"level":"info","ts":1527196358.910349,"caller":"mvcc/watcher_group.go:261","msg":"choosing future revision watcher from restore operation","watch-key":"proxy-namespace__lostleader","watch-revision":9223372036854775805,"current-revision":16}
```

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-05-31 11:42:25 -07:00
Joe Betz
6bb88b9617 version: bump up to 3.1.15+git 2018-05-09 10:26:18 -07:00
39 changed files with 1061 additions and 171 deletions

View File

@@ -14,20 +14,20 @@ notifications:
env:
matrix:
- TARGET=linux-amd64-build
- TARGET=linux-amd64-unit
- TARGET=linux-amd64-integration
- TARGET=linux-amd64-functional
- TARGET=linux-386-build
- TARGET=linux-amd64-unit
- TARGET=all-build
- TARGET=linux-386-unit
- TARGET=darwin-amd64-build
- TARGET=windows-amd64-build
- TARGET=linux-arm-build
- TARGET=linux-arm64-build
- TARGET=linux-ppc64le-build
matrix:
fast_finish: true
allow_failures:
- go: 1.8.7
env: TARGET=linux-386-unit
exclude:
- go: tip
env: TARGET=linux-386-unit
before_install:
- if [[ $TRAVIS_GO_VERSION == 1.* ]]; then docker pull gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION}; fi
@@ -39,16 +39,6 @@ script:
- echo "TRAVIS_GO_VERSION=${TRAVIS_GO_VERSION}"
- >
case "${TARGET}" in
linux-amd64-build)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GOARCH=amd64 PASSES='build' ./test"
;;
linux-amd64-unit)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GOARCH=amd64 PASSES='unit' ./test"
;;
linux-amd64-integration)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
@@ -59,39 +49,25 @@ script:
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "./build && GOARCH=amd64 PASSES='functional' ./test"
;;
linux-386-build)
linux-amd64-unit)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GOARCH=386 PASSES='build' ./test"
/bin/bash -c "GOARCH=amd64 PASSES='unit' ./test"
;;
all-build)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GOARCH=amd64 PASSES='build' ./test \
&& GOARCH=386 PASSES='build' ./test \
&& GO_BUILD_FLAGS='-v' GOOS=darwin GOARCH=amd64 ./build \
&& GO_BUILD_FLAGS='-v' GOOS=windows GOARCH=amd64 ./build \
&& GO_BUILD_FLAGS='-v' GOARCH=arm ./build \
&& GO_BUILD_FLAGS='-v' GOARCH=arm64 ./build \
&& GO_BUILD_FLAGS='-v' GOARCH=ppc64le ./build"
;;
linux-386-unit)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GOARCH=386 PASSES='unit' ./test"
;;
darwin-amd64-build)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GO_BUILD_FLAGS='-v' GOOS=darwin GOARCH=amd64 ./build"
;;
windows-amd64-build)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GO_BUILD_FLAGS='-v' GOOS=windows GOARCH=amd64 ./build"
;;
linux-arm-build)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GO_BUILD_FLAGS='-v' GOARCH=arm ./build"
;;
linux-arm64-build)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GO_BUILD_FLAGS='-v' GOARCH=arm64 ./build"
;;
linux-ppc64le-build)
docker run --rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GO_BUILD_FLAGS='-v' GOARCH=ppc64le ./build"
;;
esac

View File

@@ -20,12 +20,16 @@ clean:
rm -f ./codecov
rm -rf ./agent-*
rm -rf ./covdir
rm -f ./*.coverprofile
rm -f ./*.log
rm -f ./bin/Dockerfile-release
rm -rf ./bin/*.etcd
rm -rf ./default.etcd
rm -rf ./tests/e2e/default.etcd
rm -rf ./gopath
rm -rf ./gopath.proto
rm -rf ./release
rm -f ./snapshot/localhost:*
rm -f ./integration/127.0.0.1:* ./integration/localhost:*
rm -f ./clientv3/integration/127.0.0.1:* ./clientv3/integration/localhost:*
rm -f ./clientv3/ordering/127.0.0.1:* ./clientv3/ordering/localhost:*
@@ -46,7 +50,8 @@ docker-remove:
GO_VERSION ?= 1.10.1
# GO_VERSION ?= 1.10.3
GO_VERSION ?= 1.8.7
ETCD_VERSION ?= $(shell git rev-parse --short HEAD || echo "GitNotFound")
TEST_SUFFIX = $(shell date +%s | base64 | head -c 15)
@@ -61,16 +66,16 @@ endif
# Example:
# GO_VERSION=1.8.7 make build-docker-test
# GO_VERSION=1.9.5 make build-docker-test
# GO_VERSION=1.9.7 make build-docker-test
# make build-docker-test
#
# gcloud docker -- login -u _json_key -p "$(cat /etc/gcp-key-etcd-development.json)" https://gcr.io
# GO_VERSION=1.8.7 make push-docker-test
# GO_VERSION=1.9.5 make push-docker-test
# GO_VERSION=1.9.7 make push-docker-test
# make push-docker-test
#
# gsutil -m acl ch -u allUsers:R -r gs://artifacts.etcd-development.appspot.com
# GO_VERSION=1.9.5 make pull-docker-test
# GO_VERSION=1.9.7 make pull-docker-test
# make pull-docker-test
build-docker-test:
@@ -118,7 +123,7 @@ compile-setup-gopath-with-docker-test:
#
# Local machine:
# TEST_OPTS="PASSES='fmt'" make test
# TEST_OPTS="PASSES='fmt bom dep compile build unit'" make test
# TEST_OPTS="PASSES='fmt bom dep build unit'" make test
# TEST_OPTS="PASSES='build unit release integration_e2e functional'" make test
# TEST_OPTS="PASSES='build grpcproxy'" make test
#
@@ -128,7 +133,7 @@ compile-setup-gopath-with-docker-test:
# TEST_OPTS="VERBOSE=2 PASSES='unit'" make docker-test
#
# Travis CI (test with docker):
# TEST_OPTS="PASSES='fmt bom dep compile build unit'" make docker-test
# TEST_OPTS="PASSES='fmt bom dep build unit'" make docker-test
#
# Semaphore CI (test with docker):
# TEST_OPTS="PASSES='build unit release integration_e2e functional'" make docker-test

View File

@@ -58,11 +58,9 @@ func (m *Mutex) Lock(ctx context.Context) error {
// wait for deletion revisions prior to myKey
err = waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if cancelled
select {
case <-ctx.Done():
// release lock key if wait failed
if err != nil {
m.Unlock(client.Ctx())
default:
}
return err
}

View File

@@ -308,6 +308,49 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
}
}
// TestLeaseKeepAliveFullResponseQueue ensures when response
// queue is full thus dropping keepalive response sends,
// keepalive request is sent with the same rate of TTL / 3.
func TestLeaseKeepAliveFullResponseQueue(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
lapi := clus.Client(0)
// expect lease keepalive every 10-second
lresp, err := lapi.Grant(context.Background(), 30)
if err != nil {
t.Fatalf("failed to create lease %v", err)
}
id := lresp.ID
old := clientv3.LeaseResponseChSize
defer func() {
clientv3.LeaseResponseChSize = old
}()
clientv3.LeaseResponseChSize = 0
// never fetch from response queue, and let it become full
_, err = lapi.KeepAlive(context.Background(), id)
if err != nil {
t.Fatalf("failed to keepalive lease %v", err)
}
// TTL should not be refreshed after 3 seconds
// expect keepalive to be triggered after TTL/3
time.Sleep(3 * time.Second)
tr, terr := lapi.TimeToLive(context.Background(), id)
if terr != nil {
t.Fatalf("failed to get lease information %v", terr)
}
if tr.TTL >= 29 {
t.Errorf("unexpected kept-alive lease TTL %d", tr.TTL)
}
}
func TestLeaseGrantNewAfterClose(t *testing.T) {
defer testutil.AfterTest(t)

View File

@@ -63,12 +63,15 @@ const (
// defaultTTL is the assumed lease TTL used for the first keepalive
// deadline before the actual TTL is known to the client.
defaultTTL = 5 * time.Second
// a small buffer to store unsent lease responses.
leaseResponseChSize = 16
// NoLease is a lease ID for the absence of a lease.
NoLease LeaseID = 0
)
// LeaseResponseChSize is the size of buffer to store unsent lease responses.
// WARNING: DO NOT UPDATE.
// Only for testing purposes.
var LeaseResponseChSize = 16
// ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error.
//
// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
@@ -223,7 +226,7 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption
}
func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)
ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)
l.mu.Lock()
// ensure that recvKeepAliveLoop is still running
@@ -421,9 +424,10 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
for _, ch := range ka.chs {
select {
case ch <- karesp:
ka.nextKeepAlive = nextKeepAlive
default:
}
// still advance in order to rate-limit keep-alive sends
ka.nextKeepAlive = nextKeepAlive
}
}

View File

@@ -417,6 +417,14 @@ func dbStatus(p string) dbstatus {
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
err = db.View(func(tx *bolt.Tx) error {
// check snapshot file integrity first
var dbErrStrings []string
for dbErr := range tx.Check() {
dbErrStrings = append(dbErrStrings, dbErr.Error())
}
if len(dbErrStrings) > 0 {
return fmt.Errorf("snapshot file integrity check failed. %d errors found.\n"+strings.Join(dbErrStrings, "\n"), len(dbErrStrings))
}
ds.TotalSize = tx.Size()
c := tx.Cursor()
for next, _ := c.First(); next != nil; next, _ = c.Next() {

View File

@@ -76,11 +76,11 @@ func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http
}
mh := &membersHandler{
sec: sec,
server: server,
cluster: server.Cluster(),
timeout: timeout,
clock: clockwork.NewRealClock(),
sec: sec,
server: server,
cluster: server.Cluster(),
timeout: timeout,
clock: clockwork.NewRealClock(),
clientCertAuthEnabled: server.Cfg.ClientCertAuthEnabled,
}
@@ -346,6 +346,26 @@ func serveVars(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "\n}\n")
}
var (
healthSuccess = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "health_success",
Help: "The total number of successful health checks",
})
healthFailed = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "health_failures",
Help: "The total number of failed health checks",
})
)
func init() {
prometheus.MustRegister(healthSuccess)
prometheus.MustRegister(healthFailed)
}
func healthHandler(server *etcdserver.EtcdServer) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r.Method, "GET") {
@@ -353,16 +373,19 @@ func healthHandler(server *etcdserver.EtcdServer) http.HandlerFunc {
}
if uint64(server.Leader()) == raft.None {
http.Error(w, `{"health": "false"}`, http.StatusServiceUnavailable)
healthFailed.Inc()
return
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if _, err := server.Do(ctx, etcdserverpb.Request{Method: "QGET"}); err != nil {
http.Error(w, `{"health": "false"}`, http.StatusServiceUnavailable)
healthFailed.Inc()
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"health": "true"}`))
healthSuccess.Inc()
}
}

View File

@@ -19,6 +19,8 @@ import (
"github.com/coreos/etcd/etcdserver"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
@@ -45,5 +47,8 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
// to display all registered metrics with zero values
grpc_prometheus.Register(grpcServer)
return grpcServer
}

View File

@@ -24,10 +24,13 @@ import (
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/pkg/capnslog"
prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
)
const (
@@ -40,7 +43,7 @@ type streamsMap struct {
}
func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if !api.IsCapabilityEnabled(api.V3rpcCapability) {
return nil, rpctypes.ErrGRPCNotCapable
}
@@ -54,10 +57,116 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
}
}
return prometheus.UnaryServerInterceptor(ctx, req, info, handler)
return logUnaryInterceptor(ctx, req, info, handler)
// interceptors are chained manually during backporting, for better readability refer to PR #9990
}
}
// logUnaryInterceptor is a gRPC server-side interceptor that prints info on incoming requests for debugging purpose
func logUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
startTime := time.Now()
resp, err := prometheus.UnaryServerInterceptor(ctx, req, info, handler)
// interceptors are chained manually during backporting, for better readability refer to PR #9990
if plog.LevelAt(capnslog.DEBUG) {
defer logUnaryRequestStats(ctx, info, startTime, req, resp)
}
return resp, err
}
func logUnaryRequestStats(ctx context.Context, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
duration := time.Since(startTime)
remote := "No remote client info."
peerInfo, ok := peer.FromContext(ctx)
if ok {
remote = peerInfo.Addr.String()
}
var responseType string = info.FullMethod
var reqCount, respCount int64
var reqSize, respSize int
var reqContent string
switch _resp := resp.(type) {
case *pb.RangeResponse:
_req, ok := req.(*pb.RangeRequest)
if ok {
reqCount = 0
reqSize = _req.Size()
reqContent = _req.String()
}
if _resp != nil {
respCount = _resp.Count
respSize = _resp.Size()
}
case *pb.PutResponse:
_req, ok := req.(*pb.PutRequest)
if ok {
reqCount = 1
reqSize = _req.Size()
reqContent = pb.NewLoggablePutRequest(_req).String()
// redact value field from request content, see PR #9821
}
if _resp != nil {
respCount = 0
respSize = _resp.Size()
}
case *pb.DeleteRangeResponse:
_req, ok := req.(*pb.DeleteRangeRequest)
if ok {
reqCount = 0
reqSize = _req.Size()
reqContent = _req.String()
}
if _resp != nil {
respCount = _resp.Deleted
respSize = _resp.Size()
}
case *pb.TxnResponse:
_req, ok := req.(*pb.TxnRequest)
if ok && _resp != nil {
if _resp.Succeeded { // determine the 'actual' count and size of request based on success or failure
reqCount = int64(len(_req.GetSuccess()))
reqSize = 0
for _, r := range _req.GetSuccess() {
reqSize += r.Size()
}
} else {
reqCount = int64(len(_req.GetFailure()))
reqSize = 0
for _, r := range _req.GetFailure() {
reqSize += r.Size()
}
}
reqContent = pb.NewLoggableTxnRequest(_req).String()
// redact value field from request content, see PR #9821
}
if _resp != nil {
respCount = 0
respSize = _resp.Size()
}
default:
reqCount = -1
reqSize = -1
respCount = -1
respSize = -1
}
logGenericRequestStats(startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
}
func logGenericRequestStats(startTime time.Time, duration time.Duration, remote string, responseType string,
reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
plog.Debugf("start time = %v, "+
"time spent = %v, "+
"remote = %s, "+
"response type = %s, "+
"request count = %d, "+
"request size = %d, "+
"response count = %d, "+
"response size = %d, "+
"request content = %s",
startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent,
)
}
func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
smap := monitorLeader(s)

View File

@@ -95,6 +95,9 @@ func (s *EtcdServer) newApplierV3() applierV3 {
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
ar := &applyResult{}
defer func(start time.Time) {
warnOfExpensiveRequest(start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
}(time.Now())
// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
switch {

View File

@@ -105,10 +105,12 @@ func (a *applierV2store) Sync(r *pb.Request) Response {
return Response{}
}
// applyV2Request interprets r as a call to store.X and returns a Response interpreted
// from store.Event
// applyV2Request interprets r as a call to v2store.X
// and returns a Response interpreted from v2store.Event
func (s *EtcdServer) applyV2Request(r *pb.Request) Response {
defer warnOfExpensiveRequest(time.Now(), r, nil, nil)
toTTLOptions(r)
switch r.Method {
case "POST":
return s.applyV2.Post(r)

83
etcdserver/backend.go Normal file
View File

@@ -0,0 +1,83 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
import (
"fmt"
"os"
"path/filepath"
"time"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
)
func newBackend(cfg *ServerConfig) backend.Backend {
return backend.NewDefaultBackend(backendPath(cfg))
}
func backendPath(cfg *ServerConfig) string {
return filepath.Join(cfg.SnapDir(), "db")
}
// openSnapshotBackend renames a snapshot db to the current etcd db and opens it.
func openSnapshotBackend(cfg *ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) {
snapPath, err := ss.DBFilePath(snapshot.Metadata.Index)
if err != nil {
return nil, fmt.Errorf("failed to find database snapshot file (%v)", err)
}
if err := os.Rename(snapPath, backendPath(cfg)); err != nil {
return nil, fmt.Errorf("failed to rename database snapshot file (%v)", err)
}
return openBackend(cfg), nil
}
// openBackend returns a backend using the current etcd db.
func openBackend(cfg *ServerConfig) backend.Backend {
fn := backendPath(cfg)
beOpened := make(chan backend.Backend)
go func() {
beOpened <- newBackend(cfg)
}()
select {
case be := <-beOpened:
return be
case <-time.After(10 * time.Second):
plog.Warningf("another etcd process is using %q and holds the file lock, or loading backend file is taking >10 seconds", fn)
plog.Warningf("waiting for it to exit before starting...")
}
return <-beOpened
}
// recoverBackendSnapshot recovers the DB from a snapshot in case etcd crashes
// before updating the backend db after persisting raft snapshot to disk,
// violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this
// case, replace the db with the snapshot db sent by the leader.
func recoverSnapshotBackend(cfg *ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
var cIndex consistentIndex
kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex)
defer kv.Close()
if snapshot.Metadata.Index <= kv.ConsistentIndex() {
return oldbe, nil
}
oldbe.Close()
return openSnapshotBackend(cfg, snap.New(cfg.SnapDir()), snapshot)
}

View File

@@ -0,0 +1,175 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserverpb
import (
"fmt"
"strings"
proto "github.com/golang/protobuf/proto"
)
// InternalRaftStringer implements custom proto Stringer:
// redact password, replace value fields with value_size fields.
type InternalRaftStringer struct {
Request *InternalRaftRequest
}
func (as *InternalRaftStringer) String() string {
switch {
case as.Request.LeaseGrant != nil:
return fmt.Sprintf("header:<%s> lease_grant:<ttl:%d-second id:%016x>",
as.Request.Header.String(),
as.Request.LeaseGrant.TTL,
as.Request.LeaseGrant.ID,
)
case as.Request.LeaseRevoke != nil:
return fmt.Sprintf("header:<%s> lease_revoke:<id:%016x>",
as.Request.Header.String(),
as.Request.LeaseRevoke.ID,
)
case as.Request.Authenticate != nil:
return fmt.Sprintf("header:<%s> authenticate:<name:%s simple_token:%s>",
as.Request.Header.String(),
as.Request.Authenticate.Name,
as.Request.Authenticate.SimpleToken,
)
case as.Request.AuthUserAdd != nil:
return fmt.Sprintf("header:<%s> auth_user_add:<name:%s>",
as.Request.Header.String(),
as.Request.AuthUserAdd.Name,
)
case as.Request.AuthUserChangePassword != nil:
return fmt.Sprintf("header:<%s> auth_user_change_password:<name:%s>",
as.Request.Header.String(),
as.Request.AuthUserChangePassword.Name,
)
case as.Request.Put != nil:
return fmt.Sprintf("header:<%s> put:<%s>",
as.Request.Header.String(),
NewLoggablePutRequest(as.Request.Put).String(),
)
case as.Request.Txn != nil:
return fmt.Sprintf("header:<%s> txn:<%s>",
as.Request.Header.String(),
NewLoggableTxnRequest(as.Request.Txn).String(),
)
default:
// nothing to redact
}
return as.Request.String()
}
// txnRequestStringer implements a custom proto String to replace value bytes fields with value size
// fields in any nested txn and put operations.
type txnRequestStringer struct {
Request *TxnRequest
}
func NewLoggableTxnRequest(request *TxnRequest) *txnRequestStringer {
return &txnRequestStringer{request}
}
func (as *txnRequestStringer) String() string {
var compare []string
for _, c := range as.Request.Compare {
switch cv := c.TargetUnion.(type) {
case *Compare_Value:
compare = append(compare, newLoggableValueCompare(c, cv).String())
default:
// nothing to redact
compare = append(compare, c.String())
}
}
var success []string
for _, s := range as.Request.Success {
success = append(success, newLoggableRequestOp(s).String())
}
var failure []string
for _, f := range as.Request.Failure {
failure = append(failure, newLoggableRequestOp(f).String())
}
return fmt.Sprintf("compare:<%s> success:<%s> failure:<%s>",
strings.Join(compare, " "),
strings.Join(success, " "),
strings.Join(failure, " "),
)
}
// requestOpStringer implements a custom proto String to replace value bytes fields with value
// size fields in any nested txn and put operations.
type requestOpStringer struct {
Op *RequestOp
}
func newLoggableRequestOp(op *RequestOp) *requestOpStringer {
return &requestOpStringer{op}
}
func (as *requestOpStringer) String() string {
switch op := as.Op.Request.(type) {
case *RequestOp_RequestPut:
return fmt.Sprintf("request_put:<%s>", NewLoggablePutRequest(op.RequestPut).String())
default:
// nothing to redact
}
return as.Op.String()
}
// loggableValueCompare implements a custom proto String for Compare.Value union member types to
// replace the value bytes field with a value size field.
// To preserve proto encoding of the key and range_end bytes, a faked out proto type is used here.
type loggableValueCompare struct {
Result Compare_CompareResult `protobuf:"varint,1,opt,name=result,proto3,enum=etcdserverpb.Compare_CompareResult"`
Target Compare_CompareTarget `protobuf:"varint,2,opt,name=target,proto3,enum=etcdserverpb.Compare_CompareTarget"`
Key []byte `protobuf:"bytes,3,opt,name=key,proto3"`
ValueSize int `protobuf:"bytes,7,opt,name=value_size,proto3"`
}
func newLoggableValueCompare(c *Compare, cv *Compare_Value) *loggableValueCompare {
return &loggableValueCompare{
c.Result,
c.Target,
c.Key,
len(cv.Value),
}
}
func (m *loggableValueCompare) Reset() { *m = loggableValueCompare{} }
func (m *loggableValueCompare) String() string { return proto.CompactTextString(m) }
func (*loggableValueCompare) ProtoMessage() {}
// loggablePutRequest implements a custom proto String to replace value bytes field with a value
// size field.
// To preserve proto encoding of the key bytes, a faked out proto type is used here.
type loggablePutRequest struct {
Key []byte `protobuf:"bytes,1,opt,name=key,proto3"`
ValueSize int `protobuf:"varint,2,opt,name=value_size,proto3"`
Lease int64 `protobuf:"varint,3,opt,name=lease,proto3"`
PrevKv bool `protobuf:"varint,4,opt,name=prev_kv,proto3"`
}
func NewLoggablePutRequest(request *PutRequest) *loggablePutRequest {
return &loggablePutRequest{
request.Key,
len(request.Value),
request.Lease,
request.PrevKv,
}
}
func (m *loggablePutRequest) Reset() { *m = loggablePutRequest{} }
func (m *loggablePutRequest) String() string { return proto.CompactTextString(m) }
func (*loggablePutRequest) ProtoMessage() {}

View File

@@ -15,9 +15,11 @@
package etcdserver
import (
goruntime "runtime"
"time"
"github.com/coreos/etcd/pkg/runtime"
"github.com/coreos/etcd/version"
"github.com/prometheus/client_golang/prometheus"
)
@@ -40,6 +42,18 @@ var (
Name: "leader_changes_seen_total",
Help: "The number of leader changes seen.",
})
heartbeatSendFailures = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "heartbeat_send_failures_total",
Help: "The total number of leader heartbeat send failures (likely overloaded from slow disk).",
})
slowApplies = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "slow_apply_total",
Help: "The total number of slow apply requests (likely overloaded from slow disk).",
})
proposalsCommitted = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
@@ -64,16 +78,70 @@ var (
Name: "proposals_failed_total",
Help: "The total number of failed proposals seen.",
})
slowReadIndex = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "slow_read_indexes_total",
Help: "The total number of pending read indexes not in sync with leader's or timed out read index requests.",
})
readIndexFailed = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "read_indexes_failed_total",
Help: "The total number of failed read indexes seen.",
})
quotaBackendBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "quota_backend_bytes",
Help: "Current backend storage quota size in bytes.",
})
currentVersion = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "version",
Help: "Which version is running. 1 for 'server_version' label with current version.",
},
[]string{"server_version"})
currentGoVersion = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "go_version",
Help: "Which Go version server is running with. 1 for 'server_go_version' label with current version.",
},
[]string{"server_go_version"})
serverID = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "id",
Help: "Server or member ID in hexadecimal format. 1 for 'server_id' label with current ID.",
},
[]string{"server_id"})
)
func init() {
prometheus.MustRegister(hasLeader)
prometheus.MustRegister(isLeader)
prometheus.MustRegister(leaderChanges)
prometheus.MustRegister(heartbeatSendFailures)
prometheus.MustRegister(slowApplies)
prometheus.MustRegister(proposalsCommitted)
prometheus.MustRegister(proposalsApplied)
prometheus.MustRegister(proposalsPending)
prometheus.MustRegister(proposalsFailed)
prometheus.MustRegister(slowReadIndex)
prometheus.MustRegister(readIndexFailed)
prometheus.MustRegister(quotaBackendBytes)
prometheus.MustRegister(currentVersion)
prometheus.MustRegister(currentGoVersion)
prometheus.MustRegister(serverID)
currentVersion.With(prometheus.Labels{
"server_version": version.Version,
}).Set(1)
currentGoVersion.With(prometheus.Labels{
"server_go_version": goruntime.Version(),
}).Set(1)
}
func monitorFileDescriptor(done <-chan struct{}) {

View File

@@ -50,15 +50,20 @@ const (
)
func NewBackendQuota(s *EtcdServer) Quota {
quotaBackendBytes.Set(float64(s.Cfg.QuotaBackendBytes))
if s.Cfg.QuotaBackendBytes < 0 {
// disable quotas if negative
plog.Warningf("disabling backend quota")
return &passthroughQuota{}
}
if s.Cfg.QuotaBackendBytes == 0 {
quotaBackendBytes.Set(float64(backend.DefaultQuotaBytes))
// use default size if no quota size given
return &backendQuota{s, backend.DefaultQuotaBytes}
}
if s.Cfg.QuotaBackendBytes > backend.MaxQuotaBytes {
plog.Warningf("backend quota %v exceeds maximum quota %v; using maximum", s.Cfg.QuotaBackendBytes, backend.MaxQuotaBytes)
return &backendQuota{s, backend.MaxQuotaBytes}

View File

@@ -293,6 +293,7 @@ func (r *raftNode) sendMessages(ms []raftpb.Message) {
// TODO: limit request rate.
plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
plog.Warningf("server is likely overloaded")
heartbeatSendFailures.Inc()
}
}
}

View File

@@ -56,8 +56,10 @@ import (
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/version"
"github.com/coreos/etcd/wal"
"github.com/coreos/go-semver/semver"
"github.com/coreos/pkg/capnslog"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
)
@@ -378,6 +380,10 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
plog.Panicf("recovered store from snapshot error: %v", err)
}
plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)
if be, err = recoverSnapshotBackend(cfg, be, *snapshot); err != nil {
plog.Panicf("recovering backend from snapshot error: %v", err)
}
}
cfg.Print()
if !cfg.ForceNewCluster {
@@ -437,6 +443,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
forceVersionC: make(chan struct{}),
}
serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
@@ -796,14 +803,8 @@ func (s *EtcdServer) run() {
func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
s.applySnapshot(ep, apply)
st := time.Now()
s.applyEntries(ep, apply)
d := time.Since(st)
entriesNum := len(apply.entries)
if entriesNum != 0 && d > time.Duration(entriesNum)*warnApplyDuration {
plog.Warningf("apply entries took too long [%v for %d entries]", d, len(apply.entries))
plog.Warningf("avoid queries with large range/delete range!")
}
proposalsApplied.Set(float64(ep.appliedi))
s.applyWait.Trigger(ep.appliedi)
// wait for the raft routine to finish the disk writes before triggering a

View File

@@ -15,11 +15,16 @@
package etcdserver
import (
"fmt"
"reflect"
"strings"
"time"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/rafthttp"
"github.com/golang/protobuf/proto"
)
// isConnectedToQuorumSince checks whether the local member is connected to the
@@ -95,3 +100,56 @@ func (nc *notifier) notify(err error) {
nc.err = err
close(nc.c)
}
func warnOfExpensiveRequest(now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
var resp string
if !isNil(respMsg) {
resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
}
warnOfExpensiveGenericRequest(now, reqStringer, "", resp, err)
}
func warnOfExpensiveReadOnlyTxnRequest(now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
reqStringer := pb.NewLoggableTxnRequest(r)
var resp string
if !isNil(txnResponse) {
var resps []string
for _, r := range txnResponse.Responses {
switch op := r.Response.(type) {
case *pb.ResponseOp_ResponseRange:
resps = append(resps, fmt.Sprintf("range_response_count:%d", len(op.ResponseRange.Kvs)))
default:
// only range responses should be in a read only txn request
}
}
resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse))
}
warnOfExpensiveGenericRequest(now, reqStringer, "read-only range ", resp, err)
}
func warnOfExpensiveReadOnlyRangeRequest(now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
var resp string
if !isNil(rangeResponse) {
resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse))
}
warnOfExpensiveGenericRequest(now, reqStringer, "read-only range ", resp, err)
}
func warnOfExpensiveGenericRequest(now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
// TODO: add metrics
d := time.Since(now)
if d > warnApplyDuration {
var result string
if err != nil {
result = fmt.Sprintf("error:%v", err)
} else {
result = resp
}
plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
slowApplies.Inc()
}
}
func isNil(msg proto.Message) bool {
return msg == nil || reflect.ValueOf(msg).IsNil()
}

View File

@@ -95,21 +95,25 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
if s.ClusterVersion() == nil || s.ClusterVersion().LessThan(newRangeClusterVersion) {
return s.legacyRange(ctx, r)
}
var resp *pb.RangeResponse
var err error
defer func(start time.Time) {
warnOfExpensiveReadOnlyRangeRequest(start, r, resp, err)
}(time.Now())
if !r.Serializable {
err := s.linearizableReadNotify(ctx)
err = s.linearizableReadNotify(ctx)
if err != nil {
return nil, err
}
}
var resp *pb.RangeResponse
var err error
chk := func(ai *auth.AuthInfo) error {
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}
get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
err = serr
return nil, err
}
return resp, err
}
@@ -178,6 +182,11 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
chk := func(ai *auth.AuthInfo) error {
return checkTxnAuth(s.authStore, ai, r)
}
defer func(start time.Time) {
warnOfExpensiveReadOnlyTxnRequest(start, r, resp, err)
}(time.Now())
get := func() { resp, err = s.applyV3Base.Txn(r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
@@ -716,8 +725,9 @@ func (s *EtcdServer) linearizableReadLoop() {
var rs raft.ReadState
for {
ctx := make([]byte, 8)
binary.BigEndian.PutUint64(ctx, s.reqIDGen.Next())
ctxToSend := make([]byte, 8)
id1 := s.reqIDGen.Next()
binary.BigEndian.PutUint64(ctxToSend, id1)
select {
case <-s.readwaitc:
@@ -733,12 +743,13 @@ func (s *EtcdServer) linearizableReadLoop() {
s.readMu.Unlock()
cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
if err := s.r.ReadIndex(cctx, ctx); err != nil {
if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
cancel()
if err == raft.ErrStopped {
return
}
plog.Errorf("failed to get read index from raft: %v", err)
readIndexFailed.Inc()
nr.notify(err)
continue
}
@@ -751,16 +762,24 @@ func (s *EtcdServer) linearizableReadLoop() {
for !timeout && !done {
select {
case rs = <-s.r.readStateC:
done = bytes.Equal(rs.RequestCtx, ctx)
done = bytes.Equal(rs.RequestCtx, ctxToSend)
if !done {
// a previous request might time out. now we should ignore the response of it and
// continue waiting for the response of the current requests.
plog.Warningf("ignored out-of-date read index response (want %v, got %v)", rs.RequestCtx, ctx)
id2 := uint64(0)
if len(rs.RequestCtx) == 8 {
id2 = binary.BigEndian.Uint64(rs.RequestCtx)
}
plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2)
slowReadIndex.Inc()
}
case <-time.After(s.Cfg.ReqTimeout()):
plog.Warningf("timed out waiting for read index response")
plog.Warningf("timed out waiting for read index response (local node might have slow network)")
nr.notify(ErrTimeout)
timeout = true
slowReadIndex.Inc()
case <-s.stopping:
return
}

View File

@@ -58,6 +58,10 @@ type Backend interface {
Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
// Size returns the current size of the backend.
Size() int64
// SizeInUse returns the current size of the backend logically in use.
// Since the backend can manage free space in a non-byte unit such as
// number of pages, the returned value can be not exactly accurate in bytes.
SizeInUse() int64
Defrag() error
ForceCommit()
Close() error
@@ -78,6 +82,10 @@ type backend struct {
// size is the number of bytes in the backend
size int64
// sizeInUse is the number of bytes actually used in the backend
sizeInUse int64
// commits counts number of commits since start
commits int64
@@ -185,6 +193,10 @@ func (b *backend) Size() int64 {
return atomic.LoadInt64(&b.size)
}
func (b *backend) SizeInUse() int64 {
return atomic.LoadInt64(&b.sizeInUse)
}
func (b *backend) run() {
defer close(b.donec)
t := time.NewTimer(b.batchInterval)
@@ -213,18 +225,12 @@ func (b *backend) Commits() int64 {
}
func (b *backend) Defrag() error {
err := b.defrag()
if err != nil {
return err
}
// commit to update metadata like db.size
b.batchTx.Commit()
return nil
return b.defrag()
}
func (b *backend) defrag() error {
now := time.Now()
// TODO: make this non-blocking?
// lock batchTx to ensure nobody is using previous tx, and then
// close previous ongoing tx.
@@ -276,6 +282,14 @@ func (b *backend) defrag() error {
plog.Fatalf("cannot begin tx (%s)", err)
}
size := b.batchTx.tx.Size()
db := b.db
atomic.StoreInt64(&b.size, size)
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
took := time.Since(now)
defragDurations.Observe(took.Seconds())
return nil
}

View File

@@ -136,15 +136,15 @@ func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) err
// Commit commits a previous tx and begins a new writable one.
func (t *batchTx) Commit() {
t.Lock()
defer t.Unlock()
t.commit(false)
t.Unlock()
}
// CommitAndStop commits the previous tx and do not create a new one.
func (t *batchTx) CommitAndStop() {
t.Lock()
defer t.Unlock()
t.commit(true)
t.Unlock()
}
func (t *batchTx) Unlock() {
@@ -160,28 +160,14 @@ func (t *batchTx) commit(stop bool) {
// commit the last tx
if t.tx != nil {
if t.pending == 0 && !stop {
t.backend.mu.RLock()
defer t.backend.mu.RUnlock()
// batchTx.commit(true) calls *bolt.Tx.Commit, which
// initializes *bolt.Tx.db and *bolt.Tx.meta as nil,
// and subsequent *bolt.Tx.Size() call panics.
//
// This nil pointer reference panic happens when:
// 1. batchTx.commit(false) from newBatchTx
// 2. batchTx.commit(true) from stopping backend
// 3. batchTx.commit(false) from inflight mvcc Hash call
//
// Check if db is nil to prevent this panic
if t.tx.DB() != nil {
atomic.StoreInt64(&t.backend.size, t.tx.Size())
}
return
}
start := time.Now()
// gofail: var beforeCommit struct{}
err = t.tx.Commit()
// gofail: var afterCommit struct{}
commitDurations.Observe(time.Since(start).Seconds())
atomic.AddInt64(&t.backend.commits, 1)
@@ -202,5 +188,9 @@ func (t *batchTx) commit(stop bool) {
if err != nil {
plog.Fatalf("cannot begin tx (%s)", err)
}
atomic.StoreInt64(&t.backend.size, t.tx.Size())
size := t.tx.Size()
db := t.backend.db
atomic.StoreInt64(&t.backend.size, size)
atomic.StoreInt64(&t.backend.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
}

View File

@@ -22,10 +22,26 @@ var (
Subsystem: "disk",
Name: "backend_commit_duration_seconds",
Help: "The latency distributions of commit called by backend.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})
defragDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "disk",
Name: "backend_defrag_duration_seconds",
Help: "The latency distribution of backend defragmentation.",
// 100 MB usually takes 1 sec, so start with 10 MB of 100 ms
// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^12 == 409.6 sec
Buckets: prometheus.ExponentialBuckets(.1, 2, 13),
})
)
func init() {
prometheus.MustRegister(commitDurations)
prometheus.MustRegister(defragDurations)
}

View File

@@ -217,7 +217,6 @@ func (s *store) txnEnd(txnID int64) error {
}
s.currentRev.sub = 0
dbTotalSize.Set(float64(s.b.Size()))
s.mu.Unlock()
return nil
}
@@ -336,9 +335,14 @@ func init() {
func (s *store) Hash() (uint32, int64, error) {
s.mu.Lock()
defer s.mu.Unlock()
start := time.Now()
s.b.ForceCommit()
h, err := s.b.Hash(DefaultIgnores)
hashDurations.Observe(time.Since(start).Seconds())
rev := s.currentRev.main
return h, rev, err
}
@@ -374,6 +378,13 @@ func (s *store) Restore(b backend.Backend) error {
}
func (s *store) restore() error {
reportDbTotalSizeInBytesMu.Lock()
reportDbTotalSizeInBytes = func() float64 { return float64(s.b.Size()) }
reportDbTotalSizeInBytesMu.Unlock()
reportDbTotalSizeInUseInBytesMu.Lock()
reportDbTotalSizeInUseInBytes = func() float64 { return float64(s.b.SizeInUse()) }
reportDbTotalSizeInUseInBytesMu.Unlock()
min, max := newRevBytes(), newRevBytes()
revToBytes(revision{main: 1}, min)
revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)

View File

@@ -613,6 +613,7 @@ type fakeBackend struct {
func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx }
func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil }
func (b *fakeBackend) Size() int64 { return 0 }
func (b *fakeBackend) SizeInUse() int64 { return 0 }
func (b *fakeBackend) Snapshot() backend.Snapshot { return nil }
func (b *fakeBackend) ForceCommit() {}
func (b *fakeBackend) Defrag() error { return nil }

View File

@@ -15,6 +15,8 @@
package mvcc
import (
"sync"
"github.com/prometheus/client_golang/prometheus"
)
@@ -129,11 +131,62 @@ var (
Buckets: prometheus.ExponentialBuckets(100, 2, 14),
})
dbTotalSize = prometheus.NewGauge(prometheus.GaugeOpts{
dbTotalSizeDebugging = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "db_total_size_in_bytes",
Help: "Total size of the underlying database in bytes.",
Help: "Total size of the underlying database physically allocated in bytes. Use etcd_mvcc_db_total_size_in_bytes",
},
func() float64 {
reportDbTotalSizeInBytesMu.RLock()
defer reportDbTotalSizeInBytesMu.RUnlock()
return reportDbTotalSizeInBytes()
},
)
// overridden
dbTotalSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "mvcc",
Name: "db_total_size_in_bytes",
Help: "Total size of the underlying database physically allocated in bytes.",
},
func() float64 {
reportDbTotalSizeInBytesMu.RLock()
defer reportDbTotalSizeInBytesMu.RUnlock()
return reportDbTotalSizeInBytes()
},
)
// overridden by mvcc initialization
reportDbTotalSizeInBytesMu sync.RWMutex
reportDbTotalSizeInBytes = func() float64 { return 0 }
dbTotalSizeInUse = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "mvcc",
Name: "db_total_size_in_use_in_bytes",
Help: "Total size of the underlying database logically in use in bytes.",
},
func() float64 {
reportDbTotalSizeInUseInBytesMu.RLock()
defer reportDbTotalSizeInUseInBytesMu.RUnlock()
return reportDbTotalSizeInUseInBytes()
},
)
// overridden by mvcc initialization
reportDbTotalSizeInUseInBytesMu sync.RWMutex
reportDbTotalSizeInUseInBytes func() float64 = func() float64 { return 0 }
hashDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "mvcc",
Name: "hash_duration_seconds",
Help: "The latency distribution of storage hash operation.",
// 100 MB usually takes 100 ms, so start with 10 MB of 10 ms
// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
// highest bucket start of 0.01 sec * 2^14 == 163.84 sec
Buckets: prometheus.ExponentialBuckets(.01, 2, 15),
})
)
@@ -151,7 +204,10 @@ func init() {
prometheus.MustRegister(indexCompactionPauseDurations)
prometheus.MustRegister(dbCompactionPauseDurations)
prometheus.MustRegister(dbCompactionTotalDurations)
prometheus.MustRegister(dbTotalSizeDebugging)
prometheus.MustRegister(dbTotalSize)
prometheus.MustRegister(dbTotalSizeInUse)
prometheus.MustRegister(hashDurations)
}
// ReportEventReceived reports that an event is received.

View File

@@ -267,6 +267,7 @@ func (s *watchableStore) Restore(b backend.Backend) error {
}
for wa := range s.synced.watchers {
wa.restore = true
s.unsynced.add(wa)
}
s.synced = newWatcherGroup()
@@ -549,6 +550,14 @@ type watcher struct {
// compacted is set when the watcher is removed because of compaction
compacted bool
// restore is true when the watcher is being restored from leader snapshot
// which means that this watcher has just been moved from "synced" to "unsynced"
// watcher group, possibly with a future revision when it was first added
// to the synced watcher
// "unsynced" watcher revision must always be <= current revision,
// except when the watcher were to be moved from "synced" watcher group
restore bool
// minRev is the minimum revision update the watcher will accept
minRev int64
id WatchID

View File

@@ -336,6 +336,62 @@ func TestWatchRestore(t *testing.T) {
t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
}
// TestWatchRestoreSyncedWatcher tests such a case that:
// 1. watcher is created with a future revision "math.MaxInt64 - 2"
// 2. watcher with a future revision is added to "synced" watcher group
// 3. restore/overwrite storage with snapshot of a higher lasat revision
// 4. restore operation moves "synced" to "unsynced" watcher group
// 5. choose the watcher from step 1, without panic
func TestWatchRestoreSyncedWatcher(t *testing.T) {
b1, b1Path := backend.NewDefaultTmpBackend()
s1 := newWatchableStore(b1, &lease.FakeLessor{}, nil)
defer cleanup(s1, b1, b1Path)
b2, b2Path := backend.NewDefaultTmpBackend()
s2 := newWatchableStore(b2, &lease.FakeLessor{}, nil)
defer cleanup(s2, b2, b2Path)
testKey, testValue := []byte("foo"), []byte("bar")
rev := s1.Put(testKey, testValue, lease.NoLease)
startRev := rev + 2
// create a watcher with a future revision
// add to "synced" watcher group (startRev > s.store.currentRev)
w1 := s1.NewWatchStream()
w1.Watch(testKey, nil, startRev)
// make "s2" ends up with a higher last revision
s2.Put(testKey, testValue, lease.NoLease)
s2.Put(testKey, testValue, lease.NoLease)
// overwrite storage with higher revisions
if err := s1.Restore(b2); err != nil {
t.Fatal(err)
}
// wait for next "syncWatchersLoop" iteration
// and the unsynced watcher should be chosen
time.Sleep(2 * time.Second)
// trigger events for "startRev"
s1.Put(testKey, testValue, lease.NoLease)
select {
case resp := <-w1.Chan():
if resp.Revision != startRev {
t.Fatalf("resp.Revision expect %d, got %d", startRev, resp.Revision)
}
if len(resp.Events) != 1 {
t.Fatalf("len(resp.Events) expect 1, got %d", len(resp.Events))
}
if resp.Events[0].Kv.ModRevision != startRev {
t.Fatalf("resp.Events[0].Kv.ModRevision expect %d, got %d", startRev, resp.Events[0].Kv.ModRevision)
}
case <-time.After(time.Second):
t.Fatal("failed to receive event in 1 second")
}
}
// TestWatchBatchUnsynced tests batching on unsynced watchers
func TestWatchBatchUnsynced(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()

View File

@@ -15,6 +15,7 @@
package mvcc
import (
"fmt"
"math"
"github.com/coreos/etcd/mvcc/mvccpb"
@@ -238,7 +239,15 @@ func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
minRev := int64(math.MaxInt64)
for w := range wg.watchers {
if w.minRev > curRev {
panic("watcher current revision should not exceed current revision")
// after network partition, possibly choosing future revision watcher from restore operation
// with watch key "proxy-namespace__lostleader" and revision "math.MaxInt64 - 2"
// do not panic when such watcher had been moved from "synced" watcher during restore operation
if !w.restore {
panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
}
// mark 'restore' done, since it's chosen
w.restore = false
}
if w.minRev < compactRev {
select {

View File

@@ -21,6 +21,7 @@ import (
"net/http"
"path"
"strings"
"time"
pioutil "github.com/coreos/etcd/pkg/ioutil"
"github.com/coreos/etcd/pkg/types"
@@ -153,6 +154,8 @@ func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, c
}
}
const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER"
// ServeHTTP serves HTTP request to receive and process snapshot message.
//
// If request sender dies without closing underlying TCP connection,
@@ -163,9 +166,12 @@ func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, c
// received and processed.
// 2. this case should happen rarely, so no further optimization is done.
func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
start := time.Now()
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
return
}
@@ -173,6 +179,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
http.Error(w, err.Error(), http.StatusPreconditionFailed)
snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
return
}
@@ -185,19 +192,22 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
dec := &messageDecoder{r: r.Body}
// let snapshots be very large since they can exceed 512MB for large installations
m, err := dec.decodeLimit(uint64(1 << 63))
from := types.ID(m.From).String()
if err != nil {
msg := fmt.Sprintf("failed to decode raft message (%v)", err)
plog.Errorf(msg)
http.Error(w, msg, http.StatusBadRequest)
recvFailures.WithLabelValues(r.RemoteAddr).Inc()
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}
receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
receivedBytes.WithLabelValues(from).Add(float64(m.Size()))
if m.Type != raftpb.MsgSnap {
plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
http.Error(w, "wrong raft message type", http.StatusBadRequest)
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}
@@ -208,9 +218,10 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
plog.Error(msg)
http.Error(w, msg, http.StatusInternalServerError)
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}
receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(n))
receivedBytes.WithLabelValues(from).Add(float64(n))
plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
if err := h.r.Process(context.TODO(), m); err != nil {
@@ -223,12 +234,16 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
msg := fmt.Sprintf("failed to process raft message (%v)", err)
plog.Warningf(msg)
http.Error(w, msg, http.StatusInternalServerError)
snapshotReceiveFailures.WithLabelValues(from).Inc()
}
return
}
// Write StatusNoContent header after the message has been processed by
// raft, which facilitates the client to report MsgSnap status.
w.WriteHeader(http.StatusNoContent)
snapshotReceive.WithLabelValues(from).Inc()
snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
}
type streamHandler struct {

View File

@@ -53,6 +53,68 @@ var (
[]string{"From"},
)
snapshotSend = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send_success",
Help: "Total number of successful snapshot sends",
},
[]string{"To"},
)
snapshotSendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send_failures",
Help: "Total number of snapshot send failures",
},
[]string{"To"},
)
snapshotSendSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send_total_duration_seconds",
Help: "Total latency distributions of v3 snapshot sends",
// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
},
[]string{"To"},
)
snapshotReceive = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive_success",
Help: "Total number of successful snapshot receives",
},
[]string{"From"},
)
snapshotReceiveFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive_failures",
Help: "Total number of snapshot receive failures",
},
[]string{"From"},
)
snapshotReceiveSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive_total_duration_seconds",
Help: "Total latency distributions of v3 snapshot receives",
// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
},
[]string{"From"},
)
rtts = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",
@@ -69,5 +131,13 @@ func init() {
prometheus.MustRegister(receivedBytes)
prometheus.MustRegister(sentFailures)
prometheus.MustRegister(recvFailures)
prometheus.MustRegister(snapshotSend)
prometheus.MustRegister(snapshotSendFailures)
prometheus.MustRegister(snapshotSendSeconds)
prometheus.MustRegister(snapshotReceive)
prometheus.MustRegister(snapshotReceiveFailures)
prometheus.MustRegister(snapshotReceiveSeconds)
prometheus.MustRegister(rtts)
}

View File

@@ -56,7 +56,7 @@ func (s *peerStatus) deactivate(failure failureType, reason string) {
msg := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason)
if s.active {
plog.Errorf(msg)
plog.Infof("peer %s became inactive", s.id)
plog.Infof("peer %s became inactive (message send to peer failed)", s.id)
s.active = false
s.since = time.Time{}
return

View File

@@ -17,6 +17,7 @@ package rafthttp
import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/xiang90/probing"
)
@@ -28,7 +29,15 @@ var (
statusErrorInterval = 5 * time.Second
)
func addPeerToProber(p probing.Prober, id string, us []string) {
const (
// RoundTripperNameRaftMessage is the name of round-tripper that sends
// all other Raft messages, other than "snap.Message".
RoundTripperNameRaftMessage = "ROUND_TRIPPER_RAFT_MESSAGE"
// RoundTripperNameSnapshot is the name of round-tripper that sends merged snapshot message.
RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT"
)
func addPeerToProber(p probing.Prober, id string, us []string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
hus := make([]string, len(us))
for i := range us {
hus[i] = us[i] + ProbingPrefix
@@ -40,26 +49,26 @@ func addPeerToProber(p probing.Prober, id string, us []string) {
if err != nil {
plog.Errorf("failed to add peer %s into prober", id)
} else {
go monitorProbingStatus(s, id)
go monitorProbingStatus(s, id, roundTripperName, rttSecProm)
}
}
func monitorProbingStatus(s probing.Status, id string) {
func monitorProbingStatus(s probing.Status, id string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
// set the first interval short to log error early.
interval := statusErrorInterval
for {
select {
case <-time.After(interval):
if !s.Health() {
plog.Warningf("health check for peer %s could not connect: %v", id, s.Err())
plog.Warningf("health check for peer %s could not connect: %v (prober %q)", id, s.Err(), roundTripperName)
interval = statusErrorInterval
} else {
interval = statusMonitoringInterval
}
if s.ClockDiff() > time.Second {
plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
plog.Warningf("the clock difference against peer %s is too high [%v > %v] (prober %q)", id, s.ClockDiff(), time.Second, roundTripperName)
}
rtts.WithLabelValues(id).Observe(s.SRTT().Seconds())
rttSecProm.WithLabelValues(id).Observe(s.SRTT().Seconds())
case <-s.StopNotify():
return
}

View File

@@ -63,7 +63,10 @@ func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *pe
func (s *snapshotSender) stop() { close(s.stopc) }
func (s *snapshotSender) send(merged snap.Message) {
start := time.Now()
m := merged.Message
to := types.ID(m.To).String()
body := createSnapBody(merged)
defer body.Close()
@@ -91,14 +94,18 @@ func (s *snapshotSender) send(merged snap.Message) {
// machine knows about it, it would pause a while and retry sending
// new snapshot message.
s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
sentFailures.WithLabelValues(to).Inc()
snapshotSendFailures.WithLabelValues(to).Inc()
return
}
s.status.activate()
s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(merged.TotalSize))
sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize))
snapshotSend.WithLabelValues(to).Inc()
snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds())
}
// post posts the given request.

View File

@@ -26,6 +26,7 @@ import (
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/pkg/capnslog"
"github.com/xiang90/probing"
"golang.org/x/net/context"
@@ -121,7 +122,8 @@ type Transport struct {
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
peers map[types.ID]Peer // peers map
prober probing.Prober
pipelineProber probing.Prober
streamProber probing.Prober
}
func (t *Transport) Start() error {
@@ -136,7 +138,8 @@ func (t *Transport) Start() error {
}
t.remotes = make(map[types.ID]*remote)
t.peers = make(map[types.ID]Peer)
t.prober = probing.NewProber(t.pipelineRt)
t.pipelineProber = probing.NewProber(t.pipelineRt)
t.streamProber = probing.NewProber(t.streamRt)
return nil
}
@@ -197,7 +200,8 @@ func (t *Transport) Stop() {
for _, p := range t.peers {
p.stop()
}
t.prober.RemoveAll()
t.pipelineProber.RemoveAll()
t.streamProber.RemoveAll()
if tr, ok := t.streamRt.(*http.Transport); ok {
tr.CloseIdleConnections()
}
@@ -276,8 +280,8 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
}
fs := t.LeaderStats.Follower(id.String())
t.peers[id] = startPeer(t, urls, id, fs)
addPeerToProber(t.prober, id.String(), us)
addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)
addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)
plog.Infof("added peer %s", id)
}
@@ -304,7 +308,8 @@ func (t *Transport) removePeer(id types.ID) {
}
delete(t.peers, id)
delete(t.LeaderStats.Followers, id.String())
t.prober.Remove(id.String())
t.pipelineProber.Remove(id.String())
t.streamProber.Remove(id.String())
plog.Infof("removed peer %s", id)
}
@@ -321,8 +326,10 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) {
}
t.peers[id].update(urls)
t.prober.Remove(id.String())
addPeerToProber(t.prober, id.String(), us)
t.pipelineProber.Remove(id.String())
addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)
t.streamProber.Remove(id.String())
addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)
plog.Infof("updated peer %s", id)
}

View File

@@ -35,8 +35,10 @@ func TestTransportSend(t *testing.T) {
peer1 := newFakePeer()
peer2 := newFakePeer()
tr := &Transport{
ServerStats: ss,
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
ServerStats: ss,
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
wmsgsIgnored := []raftpb.Message{
// bad local message
@@ -72,8 +74,10 @@ func TestTransportCutMend(t *testing.T) {
peer1 := newFakePeer()
peer2 := newFakePeer()
tr := &Transport{
ServerStats: ss,
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
ServerStats: ss,
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.CutPeer(types.ID(1))
@@ -100,10 +104,11 @@ func TestTransportCutMend(t *testing.T) {
func TestTransportAdd(t *testing.T) {
ls := stats.NewLeaderStats("")
tr := &Transport{
LeaderStats: ls,
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
LeaderStats: ls,
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})
@@ -128,10 +133,11 @@ func TestTransportAdd(t *testing.T) {
func TestTransportRemove(t *testing.T) {
tr := &Transport{
LeaderStats: stats.NewLeaderStats(""),
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
LeaderStats: stats.NewLeaderStats(""),
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})
tr.RemovePeer(types.ID(1))
@@ -145,8 +151,9 @@ func TestTransportRemove(t *testing.T) {
func TestTransportUpdate(t *testing.T) {
peer := newFakePeer()
tr := &Transport{
peers: map[types.ID]Peer{types.ID(1): peer},
prober: probing.NewProber(nil),
peers: map[types.ID]Peer{types.ID(1): peer},
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
u := "http://localhost:2380"
tr.UpdatePeer(types.ID(1), []string{u})
@@ -159,13 +166,14 @@ func TestTransportUpdate(t *testing.T) {
func TestTransportErrorc(t *testing.T) {
errorc := make(chan error, 1)
tr := &Transport{
Raft: &fakeRaft{},
LeaderStats: stats.NewLeaderStats(""),
ErrorC: errorc,
streamRt: newRespRoundTripper(http.StatusForbidden, nil),
pipelineRt: newRespRoundTripper(http.StatusForbidden, nil),
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
Raft: &fakeRaft{},
LeaderStats: stats.NewLeaderStats(""),
ErrorC: errorc,
streamRt: newRespRoundTripper(http.StatusForbidden, nil),
pipelineRt: newRespRoundTripper(http.StatusForbidden, nil),
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})
defer tr.Stop()

View File

@@ -20,6 +20,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"time"
"github.com/coreos/etcd/pkg/fileutil"
)
@@ -27,6 +28,8 @@ import (
// SaveDBFrom saves snapshot of the database from the given reader. It
// guarantees the save operation is atomic.
func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
start := time.Now()
f, err := ioutil.TempFile(s.dir, "tmp")
if err != nil {
return 0, err
@@ -34,7 +37,9 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
var n int64
n, err = io.Copy(f, r)
if err == nil {
fsyncStart := time.Now()
err = fileutil.Fsync(f)
snapDBFsyncSec.Observe(time.Since(fsyncStart).Seconds())
}
f.Close()
if err != nil {
@@ -54,6 +59,7 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
plog.Infof("saved database snapshot to disk [total bytes: %d]", n)
snapDBSaveSec.Observe(time.Since(start).Seconds())
return n, nil
}

View File

@@ -33,9 +33,33 @@ var (
Help: "The marshalling cost distributions of save called by snapshot.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})
snapDBSaveSec = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "snap_db",
Name: "save_total_duration_seconds",
Help: "The total latency distributions of v3 snapshot save",
// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
})
snapDBFsyncSec = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "snap_db",
Name: "fsync_duration_seconds",
Help: "The latency distributions of fsyncing .snap.db file",
// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})
)
func init() {
prometheus.MustRegister(saveDurations)
prometheus.MustRegister(marshallingDurations)
prometheus.MustRegister(snapDBSaveSec)
prometheus.MustRegister(snapDBFsyncSec)
}

View File

@@ -5,17 +5,13 @@ if ! [[ "$0" =~ "tests/semaphore.test.bash" ]]; then
exit 255
fi
TEST_SUFFIX=$(date +%s | base64 | head -c 15)
<<COMMENT
# amd64-e2e
tests/semaphore.test.bash
sudo HOST_TMP_DIR=/tmp TEST_OPTS="PASSES='build release e2e' MANUAL_VER=v3.3.7" make docker-test
TEST_OPTS="PASSES='build unit release integration_e2e functional' MANUAL_VER=v3.1.12"
if [ "$TEST_ARCH" == "386" ]; then
TEST_OPTS="GOARCH=386 PASSES='build unit integration_e2e'"
fi
# 386-e2e
sudo HOST_TMP_DIR=/tmp TEST_OPTS="GOARCH=386 PASSES='build e2e'" make docker-test
COMMENT
docker run \
--rm \
--volume=`pwd`:/go/src/github.com/coreos/etcd \
gcr.io/etcd-development/etcd-test:go1.8.7 \
/bin/bash -c "${TEST_OPTS} ./test 2>&1 | tee test-${TEST_SUFFIX}.log"
! egrep "(--- FAIL:|panic: test timed out|appears to have leaked)" -B50 -A10 test-${TEST_SUFFIX}.log
sudo HOST_TMP_DIR=/tmp TEST_OPTS="PASSES='build release e2e' MANUAL_VER=v3.1.18" make docker-test

View File

@@ -26,7 +26,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0"
Version = "3.1.15"
Version = "3.1.20"
APIVersion = "unknown"
// Git SHA Value will be set during build