etcd server shouldn't wait for the ready notification infinitely on startup
parent
7101e8569d
commit
1713dc67b5
|
@ -35,6 +35,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
|
|||
- Add [`etcd --experimental-max-learners`](https://github.com/etcd-io/etcd/pull/13377) flag to allow configuration of learner max membership.
|
||||
- Add [`etcd --experimental-enable-lease-checkpoint-persist`](https://github.com/etcd-io/etcd/pull/13508) flag to handle upgrade from v3.5.2 clusters with this feature enabled.
|
||||
- Add [`etcdctl make-mirror --rev`](https://github.com/etcd-io/etcd/pull/13519) flag to support incremental mirror.
|
||||
- Add [`etcd --experimental-wait-cluster-ready-timeout`](https://github.com/etcd-io/etcd/pull/13525) flag to wait for cluster to be ready before serving client requests.
|
||||
- Fix [non mutating requests pass through quotaKVServer when NOSPACE](https://github.com/etcd-io/etcd/pull/13435)
|
||||
- Fix [exclude the same alarm type activated by multiple peers](https://github.com/etcd-io/etcd/pull/13467).
|
||||
- Fix [Provide a better liveness probe for when etcd runs as a Kubernetes pod](https://github.com/etcd-io/etcd/pull/13399)
|
||||
|
|
|
@ -80,6 +80,10 @@ type ServerConfig struct {
|
|||
TickMs uint
|
||||
ElectionTicks int
|
||||
|
||||
// WaitClusterReadyTimeout is the maximum time to wait for the
|
||||
// cluster to be ready on startup before serving client requests.
|
||||
WaitClusterReadyTimeout time.Duration
|
||||
|
||||
// InitialElectionTickAdvance is true, then local member fast-forwards
|
||||
// election ticks to speed up "initial" leader election trigger. This
|
||||
// benefits the case of larger election ticks. For instance, cross
|
||||
|
|
|
@ -60,6 +60,7 @@ const (
|
|||
DefaultGRPCKeepAliveInterval = 2 * time.Hour
|
||||
DefaultGRPCKeepAliveTimeout = 20 * time.Second
|
||||
DefaultDowngradeCheckTime = 5 * time.Second
|
||||
DefaultWaitClusterReadyTimeout = 5 * time.Second
|
||||
|
||||
DefaultListenPeerURLs = "http://localhost:2380"
|
||||
DefaultListenClientURLs = "http://localhost:2379"
|
||||
|
@ -212,14 +213,15 @@ type Config struct {
|
|||
// Note that cipher suites are prioritized in the given order.
|
||||
CipherSuites []string `json:"cipher-suites"`
|
||||
|
||||
ClusterState string `json:"initial-cluster-state"`
|
||||
DNSCluster string `json:"discovery-srv"`
|
||||
DNSClusterServiceName string `json:"discovery-srv-name"`
|
||||
Dproxy string `json:"discovery-proxy"`
|
||||
Durl string `json:"discovery"`
|
||||
InitialCluster string `json:"initial-cluster"`
|
||||
InitialClusterToken string `json:"initial-cluster-token"`
|
||||
StrictReconfigCheck bool `json:"strict-reconfig-check"`
|
||||
ClusterState string `json:"initial-cluster-state"`
|
||||
DNSCluster string `json:"discovery-srv"`
|
||||
DNSClusterServiceName string `json:"discovery-srv-name"`
|
||||
Dproxy string `json:"discovery-proxy"`
|
||||
Durl string `json:"discovery"`
|
||||
InitialCluster string `json:"initial-cluster"`
|
||||
InitialClusterToken string `json:"initial-cluster-token"`
|
||||
StrictReconfigCheck bool `json:"strict-reconfig-check"`
|
||||
ExperimentalWaitClusterReadyTimeout time.Duration `json:"wait-cluster-ready-timeout"`
|
||||
|
||||
// AutoCompactionMode is either 'periodic' or 'revision'.
|
||||
AutoCompactionMode string `json:"auto-compaction-mode"`
|
||||
|
@ -471,8 +473,9 @@ func NewConfig() *Config {
|
|||
APUrls: []url.URL{*apurl},
|
||||
ACUrls: []url.URL{*acurl},
|
||||
|
||||
ClusterState: ClusterStateFlagNew,
|
||||
InitialClusterToken: "etcd-cluster",
|
||||
ClusterState: ClusterStateFlagNew,
|
||||
InitialClusterToken: "etcd-cluster",
|
||||
ExperimentalWaitClusterReadyTimeout: DefaultWaitClusterReadyTimeout,
|
||||
|
||||
StrictReconfigCheck: DefaultStrictReconfigCheck,
|
||||
Metrics: "basic",
|
||||
|
|
|
@ -181,6 +181,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
|||
PeerTLSInfo: cfg.PeerTLSInfo,
|
||||
TickMs: cfg.TickMs,
|
||||
ElectionTicks: cfg.ElectionTicks(),
|
||||
WaitClusterReadyTimeout: cfg.ExperimentalWaitClusterReadyTimeout,
|
||||
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
|
||||
AutoCompactionRetention: autoCompactionRetention,
|
||||
AutoCompactionMode: cfg.AutoCompactionMode,
|
||||
|
@ -321,6 +322,7 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized
|
|||
zap.Bool("force-new-cluster", sc.ForceNewCluster),
|
||||
zap.String("heartbeat-interval", fmt.Sprintf("%v", time.Duration(sc.TickMs)*time.Millisecond)),
|
||||
zap.String("election-timeout", fmt.Sprintf("%v", time.Duration(sc.ElectionTicks*int(sc.TickMs))*time.Millisecond)),
|
||||
zap.String("wait-cluster-ready-timeout", sc.WaitClusterReadyTimeout.String()),
|
||||
zap.Bool("initial-election-tick-advance", sc.InitialElectionTickAdvance),
|
||||
zap.Uint64("snapshot-count", sc.SnapshotCount),
|
||||
zap.Uint64("snapshot-catchup-entries", sc.SnapshotCatchUpEntries),
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
etcdservergw "go.etcd.io/etcd/api/v3/etcdserverpb/gw"
|
||||
"go.etcd.io/etcd/client/pkg/v3/transport"
|
||||
|
@ -93,7 +94,15 @@ func (sctx *serveCtx) serve(
|
|||
errHandler func(error),
|
||||
gopts ...grpc.ServerOption) (err error) {
|
||||
logger := defaultLog.New(io.Discard, "etcdhttp", 0)
|
||||
<-s.ReadyNotify()
|
||||
|
||||
// When the quorum isn't satisfied, then etcd server will be blocked
|
||||
// on <-s.ReadyNotify(). Set a timeout here so that the etcd server
|
||||
// can continue to serve serializable read request.
|
||||
select {
|
||||
case <-time.After(s.Cfg.WaitClusterReadyTimeout):
|
||||
sctx.lg.Warn("timed out waiting for the ready notification")
|
||||
case <-s.ReadyNotify():
|
||||
}
|
||||
|
||||
sctx.lg.Info("ready to serve client requests")
|
||||
|
||||
|
|
|
@ -293,6 +293,7 @@ func newConfig() *config {
|
|||
fs.BoolVar(&cfg.ec.ExperimentalTxnModeWriteWithSharedBuffer, "experimental-txn-mode-write-with-shared-buffer", true, "Enable the write transaction to use a shared buffer in its readonly check operations.")
|
||||
fs.UintVar(&cfg.ec.ExperimentalBootstrapDefragThresholdMegabytes, "experimental-bootstrap-defrag-threshold-megabytes", 0, "Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.")
|
||||
fs.IntVar(&cfg.ec.ExperimentalMaxLearners, "experimental-max-learners", membership.DefaultMaxLearners, "Sets the maximum number of learners that can be available in the cluster membership.")
|
||||
fs.DurationVar(&cfg.ec.ExperimentalWaitClusterReadyTimeout, "experimental-wait-cluster-ready-timeout", cfg.ec.ExperimentalWaitClusterReadyTimeout, "Maximum duration to wait for the cluster to be ready.")
|
||||
|
||||
// unsafe
|
||||
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
|
||||
|
|
|
@ -250,6 +250,8 @@ Experimental feature:
|
|||
Set time duration after which a warning is generated if a unary request takes more than this duration.
|
||||
--experimental-max-learners '1'
|
||||
Set the max number of learner members allowed in the cluster membership.
|
||||
--experimental-wait-cluster-ready-timeout '5s'
|
||||
Set the maximum time duration to wait for the cluster to be ready.
|
||||
|
||||
Unsafe feature:
|
||||
--force-new-cluster 'false'
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
// Copyright 2021 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// When the quorum isn't satisfied, then each etcd member isn't able to
|
||||
// publish/register server information(i.e., clientURL) into the cluster.
|
||||
// Accordingly, the v2 proxy can't get any member's clientURL, so this
|
||||
// case will fail for sure in this case.
|
||||
//
|
||||
// todo(ahrtr): When v2 proxy is removed, then we can remove the go build
|
||||
// lines below.
|
||||
//go:build !cluster_proxy
|
||||
// +build !cluster_proxy
|
||||
|
||||
package e2e
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"go.etcd.io/etcd/tests/v3/framework/e2e"
|
||||
)
|
||||
|
||||
func TestSerializableReadWithoutQuorum(t *testing.T) {
|
||||
// Initialize a cluster with 3 members
|
||||
epc, err := e2e.InitEtcdProcessCluster(t, e2e.NewConfigAutoTLS())
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to initilize the etcd cluster: %v", err)
|
||||
}
|
||||
|
||||
// Remove two members, so that only one etcd will get started
|
||||
epc.Procs = epc.Procs[:1]
|
||||
|
||||
// Start the etcd cluster with only one member
|
||||
if err := epc.Start(); err != nil {
|
||||
t.Fatalf("Failed to start the etcd cluster: %v", err)
|
||||
}
|
||||
|
||||
// construct the ctl context
|
||||
cx := getDefaultCtlCtx(t)
|
||||
cx.epc = epc
|
||||
|
||||
// run serializable test and wait for result
|
||||
runCtlTest(t, serializableReadTest, nil, cx)
|
||||
|
||||
// run linearizable test and wait for result
|
||||
runCtlTest(t, linearizableReadTest, nil, cx)
|
||||
}
|
||||
|
||||
func serializableReadTest(cx ctlCtx) {
|
||||
cx.quorum = false
|
||||
if err := ctlV3Get(cx, []string{"key1"}, []kv{}...); err != nil {
|
||||
cx.t.Errorf("serializableReadTest failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func linearizableReadTest(cx ctlCtx) {
|
||||
cx.quorum = true
|
||||
if err := ctlV3Get(cx, []string{"key1"}, []kv{}...); err == nil {
|
||||
cx.t.Error("linearizableReadTest is expected to fail, but it succeeded")
|
||||
}
|
||||
}
|
|
@ -213,14 +213,18 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
|
|||
testCtlWithOffline(t, testFunc, nil, opts...)
|
||||
}
|
||||
|
||||
func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc func(ctlCtx), opts ...ctlOption) {
|
||||
e2e.BeforeTest(t)
|
||||
|
||||
ret := ctlCtx{
|
||||
func getDefaultCtlCtx(t *testing.T) ctlCtx {
|
||||
return ctlCtx{
|
||||
t: t,
|
||||
cfg: *e2e.NewConfigAutoTLS(),
|
||||
dialTimeout: 7 * time.Second,
|
||||
}
|
||||
}
|
||||
|
||||
func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc func(ctlCtx), opts ...ctlOption) {
|
||||
e2e.BeforeTest(t)
|
||||
|
||||
ret := getDefaultCtlCtx(t)
|
||||
ret.applyOpts(opts)
|
||||
|
||||
if !ret.quorum {
|
||||
|
@ -244,15 +248,19 @@ func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc fun
|
|||
ret.epc = epc
|
||||
ret.dataDir = epc.Procs[0].Config().DataDirPath
|
||||
|
||||
runCtlTest(t, testFunc, testOfflineFunc, ret)
|
||||
}
|
||||
|
||||
func runCtlTest(t *testing.T, testFunc func(ctlCtx), testOfflineFunc func(ctlCtx), cx ctlCtx) {
|
||||
defer func() {
|
||||
if ret.envMap != nil {
|
||||
for k := range ret.envMap {
|
||||
if cx.envMap != nil {
|
||||
for k := range cx.envMap {
|
||||
os.Unsetenv(k)
|
||||
}
|
||||
ret.envMap = make(map[string]string)
|
||||
cx.envMap = make(map[string]string)
|
||||
}
|
||||
if ret.epc != nil {
|
||||
if errC := ret.epc.Close(); errC != nil {
|
||||
if cx.epc != nil {
|
||||
if errC := cx.epc.Close(); errC != nil {
|
||||
t.Fatalf("error closing etcd processes (%v)", errC)
|
||||
}
|
||||
}
|
||||
|
@ -261,12 +269,12 @@ func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc fun
|
|||
donec := make(chan struct{})
|
||||
go func() {
|
||||
defer close(donec)
|
||||
testFunc(ret)
|
||||
testFunc(cx)
|
||||
t.Log("---testFunc logic DONE")
|
||||
}()
|
||||
|
||||
timeout := 2*ret.dialTimeout + time.Second
|
||||
if ret.dialTimeout == 0 {
|
||||
timeout := 2*cx.dialTimeout + time.Second
|
||||
if cx.dialTimeout == 0 {
|
||||
timeout = 30 * time.Second
|
||||
}
|
||||
select {
|
||||
|
@ -276,12 +284,12 @@ func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc fun
|
|||
}
|
||||
|
||||
t.Log("closing test cluster...")
|
||||
assert.NoError(t, epc.Close())
|
||||
epc = nil
|
||||
assert.NoError(t, cx.epc.Close())
|
||||
cx.epc = nil
|
||||
t.Log("closed test cluster...")
|
||||
|
||||
if testOfflineFunc != nil {
|
||||
testOfflineFunc(ret)
|
||||
testOfflineFunc(cx)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -176,6 +176,26 @@ type EtcdProcessClusterConfig struct {
|
|||
// NewEtcdProcessCluster launches a new cluster from etcd processes, returning
|
||||
// a new EtcdProcessCluster once all nodes are ready to accept client requests.
|
||||
func NewEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) {
|
||||
epc, err := InitEtcdProcessCluster(t, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cfg.RollingStart {
|
||||
if err := epc.RollingStart(); err != nil {
|
||||
return nil, fmt.Errorf("Cannot rolling-start: %v", err)
|
||||
}
|
||||
} else {
|
||||
if err := epc.Start(); err != nil {
|
||||
return nil, fmt.Errorf("Cannot start: %v", err)
|
||||
}
|
||||
}
|
||||
return epc, nil
|
||||
}
|
||||
|
||||
// InitEtcdProcessCluster initializes a new cluster based on the given config.
|
||||
// It doesn't start the cluster.
|
||||
func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) {
|
||||
SkipInShortMode(t)
|
||||
|
||||
etcdCfgs := cfg.EtcdServerProcessConfigs(t)
|
||||
|
@ -190,20 +210,11 @@ func NewEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdPr
|
|||
proc, err := NewEtcdProcess(etcdCfgs[i])
|
||||
if err != nil {
|
||||
epc.Close()
|
||||
return nil, fmt.Errorf("Cannot configure: %v", err)
|
||||
return nil, fmt.Errorf("cannot configure: %v", err)
|
||||
}
|
||||
epc.Procs[i] = proc
|
||||
}
|
||||
|
||||
if cfg.RollingStart {
|
||||
if err := epc.RollingStart(); err != nil {
|
||||
return nil, fmt.Errorf("Cannot rolling-start: %v", err)
|
||||
}
|
||||
} else {
|
||||
if err := epc.Start(); err != nil {
|
||||
return nil, fmt.Errorf("Cannot start: %v", err)
|
||||
}
|
||||
}
|
||||
return epc, nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue