Merge pull request #14388 from niconorsk/fix/notify-systemd-when-cluster-ready-times-out

etcdmain: Honour ExperimentalWaitClusterReadyTimeout in startEtcd
dependabot/go_modules/go.uber.org/atomic-1.10.0
Benjamin Wang 2022-08-26 18:34:52 +08:00 committed by GitHub
commit dc4b810195
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 65 additions and 15 deletions

View File

@ -19,6 +19,7 @@ import (
"os"
"runtime"
"strings"
"time"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/client/pkg/v3/logutil"
@ -207,6 +208,8 @@ func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
select {
case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
case <-time.After(cfg.ExperimentalWaitClusterReadyTimeout):
e.GetLogger().Warn("startEtcd: timed out waiting for the ready notification")
}
return e.Server.StopNotify(), e.Err(), nil
}

View File

@ -61,7 +61,7 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) {
t.Log("Downgrade enabled, validating if cluster is ready for downgrade")
for i := 0; i < len(epc.Procs); i++ {
expectLog(t, epc.Procs[i], "The server is ready to downgrade")
e2e.AssertProcessLogs(t, epc.Procs[i], "The server is ready to downgrade")
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: currentVersionStr})
}
t.Log("Cluster is ready for downgrade")
@ -73,7 +73,7 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) {
startEtcd(t, epc.Procs[i], lastReleaseBinary)
}
t.Log("All members downgraded, validating downgrade")
expectLog(t, leader(t, epc), "the cluster has been downgraded")
e2e.AssertProcessLogs(t, leader(t, epc), "the cluster has been downgraded")
for i := 0; i < len(epc.Procs); i++ {
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: lastVersionStr})
}
@ -164,17 +164,6 @@ func validateVersion(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, member e2e
}
}
func expectLog(t *testing.T, ep e2e.EtcdProcess, expectLog string) {
t.Helper()
var err error
testutils.ExecuteWithTimeout(t, 30*time.Second, func() {
_, err = ep.Logs().Expect(expectLog)
})
if err != nil {
t.Fatal(err)
}
}
func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

View File

@ -151,7 +151,7 @@ func firstMatch(t *testing.T, expectLine string, logs ...e2e.LogsExpect) string
match := make(chan string, len(logs))
for i := range logs {
go func(l e2e.LogsExpect) {
line, _ := l.Expect(expectLine)
line, _ := l.ExpectWithContext(context.TODO(), expectLine)
match <- line
}(logs[i])
}

View File

@ -0,0 +1,44 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
"testing"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)
func TestInitDaemonNotifyWithoutQuorum(t *testing.T) {
// Initialize a cluster with 3 members
epc, err := e2e.InitEtcdProcessCluster(t, e2e.NewConfigAutoTLS())
if err != nil {
t.Fatalf("Failed to initilize the etcd cluster: %v", err)
}
// Remove two members, so that only one etcd will get started
epc.Procs = epc.Procs[:1]
// Start the etcd cluster with only one member
if err := epc.Start(); err != nil {
t.Fatalf("Failed to start the etcd cluster: %v", err)
}
// Expect log message indicating time out waiting for quorum hit
e2e.AssertProcessLogs(t, epc.Procs[0], "startEtcd: timed out waiting for the ready notification")
// Expect log message indicating systemd notify message has been sent
e2e.AssertProcessLogs(t, epc.Procs[0], "notifying init daemon")
epc.Close()
}

View File

@ -15,9 +15,12 @@
package e2e
import (
"context"
"fmt"
"net/url"
"os"
"testing"
"time"
"go.uber.org/zap"
@ -48,7 +51,7 @@ type EtcdProcess interface {
}
type LogsExpect interface {
Expect(string) (string, error)
ExpectWithContext(context.Context, string) (string, error)
Lines() []string
LineCount() int
}
@ -179,3 +182,14 @@ func (ep *EtcdServerProcess) Logs() LogsExpect {
}
return ep.proc
}
func AssertProcessLogs(t *testing.T, ep EtcdProcess, expectLog string) {
t.Helper()
var err error
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err = ep.Logs().ExpectWithContext(ctx, expectLog)
if err != nil {
t.Fatal(err)
}
}