diff --git a/tests/robustness/failpoints.go b/tests/robustness/failpoints.go
index 77241afe0..05f7f8510 100644
--- a/tests/robustness/failpoints.go
+++ b/tests/robustness/failpoints.go
@@ -23,6 +23,7 @@ import (
 	"time"
 
 	"go.uber.org/zap"
+	healthpb "google.golang.org/grpc/health/grpc_health_v1"
 
 	clientv3 "go.etcd.io/etcd/client/v3"
 	"go.etcd.io/etcd/tests/v3/framework/e2e"
@@ -98,18 +99,62 @@ func triggerFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *
 	}
 	for successes < config.count && failures < config.retries {
 		time.Sleep(config.waitBetweenTriggers)
-		lg.Info("Triggering failpoint\n", zap.String("failpoint", config.failpoint.Name()))
+
+		lg.Info("Verifying cluster health before failpoint", zap.String("failpoint", config.failpoint.Name()))
+		if err = verifyClusterHealth(ctx, t, clus); err != nil {
+			t.Errorf("failed to verify cluster health before failpoint injection, err: %v", err)
+			return
+		}
+
+		lg.Info("Triggering failpoint", zap.String("failpoint", config.failpoint.Name()))
 		err = config.failpoint.Trigger(ctx, t, lg, clus)
 		if err != nil {
 			lg.Info("Failed to trigger failpoint", zap.String("failpoint", config.failpoint.Name()), zap.Error(err))
 			failures++
 			continue
 		}
+
+		lg.Info("Verifying cluster health after failpoint", zap.String("failpoint", config.failpoint.Name()))
+		if err = verifyClusterHealth(ctx, t, clus); err != nil {
+			t.Errorf("failed to verify cluster health after failpoint injection, err: %v", err)
+			return
+		}
+
 		successes++
 	}
 	if successes < config.count || failures >= config.retries {
 		t.Errorf("failed to trigger failpoints enough times, err: %v", err)
 	}
+
+	return
+}
+
+func verifyClusterHealth(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster) error {
+	for i := 0; i < len(clus.Procs); i++ {
+		clusterClient, err := clientv3.New(clientv3.Config{
+			Endpoints:            clus.Procs[i].EndpointsGRPC(),
+			Logger:               zap.NewNop(),
+			DialKeepAliveTime:    1 * time.Millisecond,
+			DialKeepAliveTimeout: 5 * time.Millisecond,
+		})
+		if err != nil {
+			return fmt.Errorf("Error creating client for cluster %s: %v", clus.Procs[i].Config().Name, err)
+		}
+		defer clusterClient.Close()
+
+		cli := healthpb.NewHealthClient(clusterClient.ActiveConnection())
+		resp, err := cli.Check(ctx, &healthpb.HealthCheckRequest{})
+		if err != nil {
+			return fmt.Errorf("Error checking member %s health: %v", clus.Procs[i].Config().Name, err)
+		}
+		if resp.Status != healthpb.HealthCheckResponse_SERVING {
+			return fmt.Errorf("Member %s health status expected %s, got %s",
+				clus.Procs[i].Config().Name,
+				healthpb.HealthCheckResponse_SERVING,
+				resp.Status)
+		}
+	}
+	return nil
 }
 
 type FailpointConfig struct {