Compare commits
19 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
3c8740a793 | ||
![]() |
9cd3eefd01 | ||
![]() |
47d3dea2a9 | ||
![]() |
aaa85715c3 | ||
![]() |
5cf80a6229 | ||
![]() |
069bce1384 | ||
![]() |
7c164a8948 | ||
![]() |
ff5fb05bec | ||
![]() |
a977795f2d | ||
![]() |
aedfe5458a | ||
![]() |
e7888805e1 | ||
![]() |
7fbfdc2b6a | ||
![]() |
5c19bd24f0 | ||
![]() |
683a643fba | ||
![]() |
660dc83e19 | ||
![]() |
ffcddac2ff | ||
![]() |
6d8052314b | ||
![]() |
5e4d852e95 | ||
![]() |
3827d6bd2d |
@@ -113,10 +113,9 @@ func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOp
|
||||
return nil, status.Errorf(codes.Unimplemented, "clientv3/retry_interceptor: cannot retry on ClientStreams, set Disable()")
|
||||
}
|
||||
newStreamer, err := streamer(ctx, desc, cc, method, grpcOpts...)
|
||||
logger.Warn("retry stream intercept", zap.Error(err))
|
||||
if err != nil {
|
||||
// TODO(mwitkow): Maybe dial and transport errors should be retriable?
|
||||
return nil, err
|
||||
logger.Error("streamer failed to create ClientStream", zap.Error(err))
|
||||
return nil, err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
|
||||
}
|
||||
retryingStreamer := &serverStreamingRetryingStream{
|
||||
client: c,
|
||||
@@ -185,6 +184,7 @@ func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error {
|
||||
if !attemptRetry {
|
||||
return lastErr // success or hard failure
|
||||
}
|
||||
|
||||
// We start off from attempt 1, because zeroth was already made on normal SendMsg().
|
||||
for attempt := uint(1); attempt < s.callOpts.max; attempt++ {
|
||||
if err := waitRetryBackoff(s.ctx, attempt, s.callOpts); err != nil {
|
||||
@@ -192,12 +192,13 @@ func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error {
|
||||
}
|
||||
newStream, err := s.reestablishStreamAndResendBuffer(s.ctx)
|
||||
if err != nil {
|
||||
// TODO(mwitkow): Maybe dial and transport errors should be retriable?
|
||||
return err
|
||||
s.client.lg.Error("failed reestablishStreamAndResendBuffer", zap.Error(err))
|
||||
return err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
|
||||
}
|
||||
s.setStream(newStream)
|
||||
|
||||
s.client.lg.Warn("retrying RecvMsg", zap.Error(lastErr))
|
||||
attemptRetry, lastErr = s.receiveMsgAndIndicateRetry(m)
|
||||
//fmt.Printf("Received message and indicate: %v %v\n", attemptRetry, lastErr)
|
||||
if !attemptRetry {
|
||||
return lastErr
|
||||
}
|
||||
|
@@ -36,6 +36,7 @@ import (
|
||||
"github.com/coreos/etcd/version"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
// RaftCluster is a list of Members that belong to the same raft cluster
|
||||
@@ -368,6 +369,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*semver.Version
|
||||
} else {
|
||||
plog.Noticef("set the initial cluster version to %v", version.Cluster(ver.String()))
|
||||
}
|
||||
oldVer := c.version
|
||||
c.version = ver
|
||||
mustDetectDowngrade(c.version)
|
||||
if c.store != nil {
|
||||
@@ -376,6 +378,10 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*semver.Version
|
||||
if c.be != nil {
|
||||
mustSaveClusterVersionToBackend(c.be, ver)
|
||||
}
|
||||
if oldVer != nil {
|
||||
ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(oldVer.String())}).Set(0)
|
||||
}
|
||||
ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(ver.String())}).Set(1)
|
||||
onSet(ver)
|
||||
}
|
||||
|
||||
|
31
etcdserver/membership/metrics.go
Normal file
31
etcdserver/membership/metrics.go
Normal file
@@ -0,0 +1,31 @@
|
||||
// Copyright 2018 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package membership
|
||||
|
||||
import "github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
var (
|
||||
ClusterVersionMetrics = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "cluster",
|
||||
Name: "version",
|
||||
Help: "Which version is running. 1 for 'cluster_version' label with current cluster version",
|
||||
},
|
||||
[]string{"cluster_version"})
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(ClusterVersionMetrics)
|
||||
}
|
@@ -602,6 +602,7 @@ func (s *EtcdServer) start() {
|
||||
s.leaderChanged = make(chan struct{})
|
||||
if s.ClusterVersion() != nil {
|
||||
plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String()))
|
||||
membership.ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(s.ClusterVersion().String())}).Set(1)
|
||||
} else {
|
||||
plog.Infof("starting server... [version: %v, cluster version: to_be_decided]", version.Version)
|
||||
}
|
||||
@@ -612,12 +613,13 @@ func (s *EtcdServer) start() {
|
||||
|
||||
func (s *EtcdServer) purgeFile() {
|
||||
var dberrc, serrc, werrc <-chan error
|
||||
var dbdonec, sdonec, wdonec <-chan struct{}
|
||||
if s.Cfg.MaxSnapFiles > 0 {
|
||||
dberrc = fileutil.PurgeFile(s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.done)
|
||||
serrc = fileutil.PurgeFile(s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.done)
|
||||
dbdonec, dberrc = fileutil.PurgeFileWithDoneNotify(s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping)
|
||||
sdonec, serrc = fileutil.PurgeFileWithDoneNotify(s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping)
|
||||
}
|
||||
if s.Cfg.MaxWALFiles > 0 {
|
||||
werrc = fileutil.PurgeFile(s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.done)
|
||||
wdonec, werrc = fileutil.PurgeFileWithDoneNotify(s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.stopping)
|
||||
}
|
||||
select {
|
||||
case e := <-dberrc:
|
||||
@@ -627,6 +629,15 @@ func (s *EtcdServer) purgeFile() {
|
||||
case e := <-werrc:
|
||||
plog.Fatalf("failed to purge wal file %v", e)
|
||||
case <-s.stopping:
|
||||
if dbdonec != nil {
|
||||
<-dbdonec
|
||||
}
|
||||
if sdonec != nil {
|
||||
<-sdonec
|
||||
}
|
||||
if wdonec != nil {
|
||||
<-wdonec
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@@ -238,6 +238,14 @@ var (
|
||||
// overridden by mvcc initialization
|
||||
reportCompactRevMu sync.RWMutex
|
||||
reportCompactRev = func() float64 { return 0 }
|
||||
|
||||
totalPutSizeGauge = prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: "etcd_debugging",
|
||||
Subsystem: "mvcc",
|
||||
Name: "total_put_size_in_bytes",
|
||||
Help: "The total size of put kv pairs seen by this member.",
|
||||
})
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -262,6 +270,7 @@ func init() {
|
||||
prometheus.MustRegister(hashRevDurations)
|
||||
prometheus.MustRegister(currentRev)
|
||||
prometheus.MustRegister(compactRev)
|
||||
prometheus.MustRegister(totalPutSizeGauge)
|
||||
}
|
||||
|
||||
// ReportEventReceived reports that an event is received.
|
||||
|
@@ -23,14 +23,15 @@ type metricsTxnWrite struct {
|
||||
ranges uint
|
||||
puts uint
|
||||
deletes uint
|
||||
putSize int64
|
||||
}
|
||||
|
||||
func newMetricsTxnRead(tr TxnRead) TxnRead {
|
||||
return &metricsTxnWrite{&txnReadWrite{tr}, 0, 0, 0}
|
||||
return &metricsTxnWrite{&txnReadWrite{tr}, 0, 0, 0, 0}
|
||||
}
|
||||
|
||||
func newMetricsTxnWrite(tw TxnWrite) TxnWrite {
|
||||
return &metricsTxnWrite{tw, 0, 0, 0}
|
||||
return &metricsTxnWrite{tw, 0, 0, 0, 0}
|
||||
}
|
||||
|
||||
func (tw *metricsTxnWrite) Range(key, end []byte, ro RangeOptions) (*RangeResult, error) {
|
||||
@@ -45,6 +46,8 @@ func (tw *metricsTxnWrite) DeleteRange(key, end []byte) (n, rev int64) {
|
||||
|
||||
func (tw *metricsTxnWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
|
||||
tw.puts++
|
||||
size := int64(len(key) + len(value))
|
||||
tw.putSize += size
|
||||
return tw.TxnWrite.Put(key, value, lease)
|
||||
}
|
||||
|
||||
@@ -55,5 +58,6 @@ func (tw *metricsTxnWrite) End() {
|
||||
}
|
||||
rangeCounter.Add(float64(tw.ranges))
|
||||
putCounter.Add(float64(tw.puts))
|
||||
totalPutSizeGauge.Add(float64(tw.putSize))
|
||||
deleteCounter.Add(float64(tw.deletes))
|
||||
}
|
||||
|
@@ -23,13 +23,23 @@ import (
|
||||
)
|
||||
|
||||
func PurgeFile(dirname string, suffix string, max uint, interval time.Duration, stop <-chan struct{}) <-chan error {
|
||||
return purgeFile(dirname, suffix, max, interval, stop, nil)
|
||||
return purgeFile(dirname, suffix, max, interval, stop, nil, nil)
|
||||
}
|
||||
|
||||
func PurgeFileWithDoneNotify(dirname string, suffix string, max uint, interval time.Duration, stop <-chan struct{}) (<-chan struct{}, <-chan error) {
|
||||
doneC := make(chan struct{})
|
||||
errC := purgeFile(dirname, suffix, max, interval, stop, nil, doneC)
|
||||
return doneC, errC
|
||||
}
|
||||
|
||||
// purgeFile is the internal implementation for PurgeFile which can post purged files to purgec if non-nil.
|
||||
func purgeFile(dirname string, suffix string, max uint, interval time.Duration, stop <-chan struct{}, purgec chan<- string) <-chan error {
|
||||
// if donec is non-nil, the function closes it to notify its exit.
|
||||
func purgeFile(dirname string, suffix string, max uint, interval time.Duration, stop <-chan struct{}, purgec chan<- string, donec chan<- struct{}) <-chan error {
|
||||
errC := make(chan error, 1)
|
||||
go func() {
|
||||
if donec != nil {
|
||||
defer close(donec)
|
||||
}
|
||||
for {
|
||||
fnames, err := ReadDir(dirname)
|
||||
if err != nil {
|
||||
|
@@ -43,7 +43,7 @@ func TestPurgeFile(t *testing.T) {
|
||||
stop, purgec := make(chan struct{}), make(chan string, 10)
|
||||
|
||||
// keep 3 most recent files
|
||||
errch := purgeFile(dir, "test", 3, time.Millisecond, stop, purgec)
|
||||
errch := purgeFile(dir, "test", 3, time.Millisecond, stop, purgec, nil)
|
||||
select {
|
||||
case f := <-purgec:
|
||||
t.Errorf("unexpected purge on %q", f)
|
||||
@@ -114,7 +114,7 @@ func TestPurgeFileHoldingLockFile(t *testing.T) {
|
||||
}
|
||||
|
||||
stop, purgec := make(chan struct{}), make(chan string, 10)
|
||||
errch := purgeFile(dir, "test", 3, time.Millisecond, stop, purgec)
|
||||
errch := purgeFile(dir, "test", 3, time.Millisecond, stop, purgec, nil)
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
select {
|
||||
|
@@ -57,6 +57,11 @@ function main {
|
||||
cd release
|
||||
setup_env "${PROJ}" "${VER}"
|
||||
|
||||
if [[ $(go env GOOS) == "darwin" ]]; then
|
||||
echo "Please use linux machine for release builds."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
for os in darwin windows linux; do
|
||||
export GOOS=${os}
|
||||
TARGET_ARCHS=("amd64")
|
||||
|
@@ -37,12 +37,6 @@ main() {
|
||||
exit 1
|
||||
fi
|
||||
|
||||
KEYID=$(gpg --list-keys --with-colons| awk -F: '/^pub:/ { print $5 }')
|
||||
if [[ -z "${KEYID}" ]]; then
|
||||
echo "Failed to load gpg key. Is gpg set up correctly for etcd releases?"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Expected umask for etcd release artifacts
|
||||
umask 022
|
||||
|
||||
@@ -51,7 +45,7 @@ main() {
|
||||
if [ ! -d "${reldir}/etcd" ]; then
|
||||
mkdir -p "${reldir}"
|
||||
cd "${reldir}"
|
||||
git clone git@github.com:etcd-io/etcd.git --branch "${BRANCH}"
|
||||
git clone https://github.com/etcd-io/etcd.git --branch "${BRANCH}"
|
||||
fi
|
||||
cd "${reldir}/etcd"
|
||||
|
||||
@@ -112,9 +106,28 @@ main() {
|
||||
echo "Skipping tag step. git tag ${RELEASE_VERSION} already exists."
|
||||
else
|
||||
echo "Tagging release..."
|
||||
KEYID=$(gpg --list-keys --with-colons| awk -F: '/^pub:/ { print $5 }')
|
||||
if [[ -z "${KEYID}" ]]; then
|
||||
echo "Failed to load gpg key. Is gpg set up correctly for etcd releases?"
|
||||
exit 1
|
||||
fi
|
||||
git tag --local-user "${KEYID}" --sign "${RELEASE_VERSION}" --message "${RELEASE_VERSION}"
|
||||
fi
|
||||
|
||||
# Verify the latest commit has the version tag
|
||||
local tag="$(git describe --exact-match HEAD)"
|
||||
if [ "${tag}" != "${RELEASE_VERSION}" ]; then
|
||||
echo "Error: Expected HEAD to be tagged with ${RELEASE_VERSION}, but 'git describe --exact-match HEAD' reported: ${tag}"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Verify the version tag is on the right branch
|
||||
local branch=$(git branch --contains "${RELEASE_VERSION}")
|
||||
if [ "${branch}" != "release-${MINOR_VERSION}" ]; then
|
||||
echo "Error: Git tag ${RELEASE_VERSION} should be on branch release-${MINOR_VERSION} but is on ${branch}"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Push the tag change if it's not already been pushed.
|
||||
read -p "Push etcd ${RELEASE_VERSION} tag [y/N]? " confirm
|
||||
[[ "${confirm,,}" == "y" ]] || exit 1
|
||||
@@ -181,6 +194,28 @@ main() {
|
||||
gsutil -m acl ch -u allUsers:R -r gs://artifacts.etcd-development.appspot.com
|
||||
fi
|
||||
|
||||
### Release validation
|
||||
mkdir -p downloads
|
||||
|
||||
# Check image versions
|
||||
for IMAGE in "quay.io/coreos/etcd:${RELEASE_VERSION}" "gcr.io/etcd-development/etcd:${RELEASE_VERSION}"; do
|
||||
local image_version=$(docker run --rm "${IMAGE}" etcd --version | grep "etcd Version" | awk -F: '{print $2}' | tr -d '[:space:]')
|
||||
if [ "${image_version}" != "${VERSION}" ]; then
|
||||
echo "Check failed: etcd --version output for ${IMAGE} is incorrect: ${image_version}"
|
||||
exit 1
|
||||
fi
|
||||
done
|
||||
|
||||
# Check gsutil binary versions
|
||||
local BINARY_TGZ="etcd-${RELEASE_VERSION}-$(go env GOOS)-amd64.tar.gz"
|
||||
gsutil cp "gs://etcd/${RELEASE_VERSION}/${BINARY_TGZ}" downloads
|
||||
tar -zx -C downloads -f "downloads/${BINARY_TGZ}"
|
||||
local binary_version=$("./downloads/etcd-${RELEASE_VERSION}-$(go env GOOS)-amd64/etcd" --version | grep "etcd Version" | awk -F: '{print $2}' | tr -d '[:space:]')
|
||||
if [ "${binary_version}" != "${VERSION}" ]; then
|
||||
echo "Check failed: etcd --version output for ${BINARY_TGZ} from gs://etcd/${RELEASE_VERSION} is incorrect: ${binary_version}"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# TODO: signing process
|
||||
echo ""
|
||||
echo "WARNING: The release has not been signed and published to github. This must be done manually."
|
||||
|
@@ -103,6 +103,11 @@ func TestReleaseUpgrade(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// expect upgraded cluster version
|
||||
if err := cURLGet(cx.epc, cURLReq{endpoint: "/metrics", expected: fmt.Sprintf(`etcd_cluster_version{cluster_version="%s"} 1`, version.Cluster(version.Version)), metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
|
||||
cx.t.Fatalf("failed get with curl (%v)", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReleaseUpgradeWithRestart(t *testing.T) {
|
||||
|
59
tests/e2e/metrics_test.go
Normal file
59
tests/e2e/metrics_test.go
Normal file
@@ -0,0 +1,59 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package e2e
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/coreos/etcd/version"
|
||||
)
|
||||
|
||||
func TestV3MetricsSecure(t *testing.T) {
|
||||
cfg := configTLS
|
||||
cfg.clusterSize = 1
|
||||
cfg.metricsURLScheme = "https"
|
||||
testCtl(t, metricsTest)
|
||||
}
|
||||
|
||||
func TestV3MetricsInsecure(t *testing.T) {
|
||||
cfg := configTLS
|
||||
cfg.clusterSize = 1
|
||||
cfg.metricsURLScheme = "http"
|
||||
testCtl(t, metricsTest)
|
||||
}
|
||||
|
||||
func metricsTest(cx ctlCtx) {
|
||||
if err := ctlV3Put(cx, "k", "v", ""); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
if err := cURLGet(cx.epc, cURLReq{endpoint: "/metrics", expected: `etcd_debugging_mvcc_keys_total 1`, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
|
||||
cx.t.Fatalf("failed get with curl (%v)", err)
|
||||
}
|
||||
if err := cURLGet(cx.epc, cURLReq{endpoint: "/metrics", expected: fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version), metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
|
||||
cx.t.Fatalf("failed get with curl (%v)", err)
|
||||
}
|
||||
ver := version.Version
|
||||
if strings.HasSuffix(ver, "+git") {
|
||||
ver = strings.Replace(ver, "+git", "", 1)
|
||||
}
|
||||
if err := cURLGet(cx.epc, cURLReq{endpoint: "/metrics", expected: fmt.Sprintf(`etcd_cluster_version{cluster_version="%s"} 1`, version.Cluster(version.Version)), metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
|
||||
cx.t.Fatalf("failed get with curl (%v)", err)
|
||||
}
|
||||
if err := cURLGet(cx.epc, cURLReq{endpoint: "/health", expected: `{"health":"true"}`, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
|
||||
cx.t.Fatalf("failed get with curl (%v)", err)
|
||||
}
|
||||
}
|
@@ -26,7 +26,7 @@ import (
|
||||
var (
|
||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||
MinClusterVersion = "3.0.0"
|
||||
Version = "3.3.16"
|
||||
Version = "3.3.18"
|
||||
APIVersion = "unknown"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
|
Reference in New Issue
Block a user