Merge pull request #15604 from jmhbnz/robustness-ensure-healthy-clus

tests: Ensure healthy cluster before and after robustness failpoint
storage-doc
Marek Siarkowicz 2023-04-04 14:22:52 +02:00 committed by GitHub
commit f9d124974b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 46 additions and 1 deletions

View File

@ -23,6 +23,7 @@ import (
"time"
"go.uber.org/zap"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
@ -98,18 +99,62 @@ func triggerFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *
}
for successes < config.count && failures < config.retries {
time.Sleep(config.waitBetweenTriggers)
lg.Info("Triggering failpoint\n", zap.String("failpoint", config.failpoint.Name()))
lg.Info("Verifying cluster health before failpoint", zap.String("failpoint", config.failpoint.Name()))
if err = verifyClusterHealth(ctx, t, clus); err != nil {
t.Errorf("failed to verify cluster health before failpoint injection, err: %v", err)
return
}
lg.Info("Triggering failpoint", zap.String("failpoint", config.failpoint.Name()))
err = config.failpoint.Trigger(ctx, t, lg, clus)
if err != nil {
lg.Info("Failed to trigger failpoint", zap.String("failpoint", config.failpoint.Name()), zap.Error(err))
failures++
continue
}
lg.Info("Verifying cluster health after failpoint", zap.String("failpoint", config.failpoint.Name()))
if err = verifyClusterHealth(ctx, t, clus); err != nil {
t.Errorf("failed to verify cluster health after failpoint injection, err: %v", err)
return
}
successes++
}
if successes < config.count || failures >= config.retries {
t.Errorf("failed to trigger failpoints enough times, err: %v", err)
}
return
}
func verifyClusterHealth(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster) error {
for i := 0; i < len(clus.Procs); i++ {
clusterClient, err := clientv3.New(clientv3.Config{
Endpoints: clus.Procs[i].EndpointsGRPC(),
Logger: zap.NewNop(),
DialKeepAliveTime: 1 * time.Millisecond,
DialKeepAliveTimeout: 5 * time.Millisecond,
})
if err != nil {
return fmt.Errorf("Error creating client for cluster %s: %v", clus.Procs[i].Config().Name, err)
}
defer clusterClient.Close()
cli := healthpb.NewHealthClient(clusterClient.ActiveConnection())
resp, err := cli.Check(ctx, &healthpb.HealthCheckRequest{})
if err != nil {
return fmt.Errorf("Error checking member %s health: %v", clus.Procs[i].Config().Name, err)
}
if resp.Status != healthpb.HealthCheckResponse_SERVING {
return fmt.Errorf("Member %s health status expected %s, got %s",
clus.Procs[i].Config().Name,
healthpb.HealthCheckResponse_SERVING,
resp.Status)
}
}
return nil
}
type FailpointConfig struct {