tests: Trigger only failpoints available in binary

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
dependabot/go_modules/go.uber.org/atomic-1.10.0
Marek Siarkowicz 2022-12-26 14:05:51 +01:00
parent a992fb5e92
commit deb4291485
4 changed files with 130 additions and 33 deletions

View File

@ -128,6 +128,10 @@ func (p *proxyEtcdProcess) PeerProxy() proxy.Server {
return nil
}
func (p *proxyEtcdProcess) Failpoints() *BinaryFailpoints {
return p.etcdProc.Failpoints()
}
type proxyProc struct {
lg *zap.Logger
name string

View File

@ -15,8 +15,11 @@
package e2e
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/url"
"os"
"strings"
@ -51,6 +54,7 @@ type EtcdProcess interface {
Close() error
Config() *EtcdServerProcessConfig
PeerProxy() proxy.Server
Failpoints() *BinaryFailpoints
Logs() LogsExpect
Kill() error
}
@ -62,10 +66,11 @@ type LogsExpect interface {
}
type EtcdServerProcess struct {
cfg *EtcdServerProcessConfig
proc *expect.ExpectProcess
proxy proxy.Server
donec chan struct{} // closed when Interact() terminates
cfg *EtcdServerProcessConfig
proc *expect.ExpectProcess
proxy proxy.Server
failpoints *BinaryFailpoints
donec chan struct{} // closed when Interact() terminates
}
type EtcdServerProcessConfig struct {
@ -101,7 +106,11 @@ func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, err
return nil, err
}
}
return &EtcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil
ep := &EtcdServerProcess{cfg: cfg, donec: make(chan struct{})}
if cfg.GoFailPort != 0 {
ep.failpoints = &BinaryFailpoints{member: ep}
}
return ep, nil
}
func (ep *EtcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.ClientURL} }
@ -269,3 +278,72 @@ func AssertProcessLogs(t *testing.T, ep EtcdProcess, expectLog string) {
func (ep *EtcdServerProcess) PeerProxy() proxy.Server {
return ep.proxy
}
func (ep *EtcdServerProcess) Failpoints() *BinaryFailpoints {
return ep.failpoints
}
type BinaryFailpoints struct {
member EtcdProcess
availableCache map[string]struct{}
}
func (f *BinaryFailpoints) Setup(ctx context.Context, failpoint, payload string) error {
host := fmt.Sprintf("127.0.0.1:%d", f.member.Config().GoFailPort)
failpointUrl := url.URL{
Scheme: "http",
Host: host,
Path: failpoint,
}
r, err := http.NewRequestWithContext(ctx, "PUT", failpointUrl.String(), bytes.NewBuffer([]byte(payload)))
if err != nil {
return err
}
resp, err := httpClient.Do(r)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
return fmt.Errorf("bad status code: %d", resp.StatusCode)
}
return nil
}
var httpClient = http.Client{
Timeout: 10 * time.Millisecond,
}
func (f *BinaryFailpoints) Available() map[string]struct{} {
if f.availableCache == nil {
fs, err := fetchFailpoints(f.member)
if err != nil {
panic(err)
}
f.availableCache = fs
}
return f.availableCache
}
func fetchFailpoints(member EtcdProcess) (map[string]struct{}, error) {
address := fmt.Sprintf("127.0.0.1:%d", member.Config().GoFailPort)
failpointUrl := url.URL{
Scheme: "http",
Host: address,
}
resp, err := http.Get(failpointUrl.String())
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
text := strings.ReplaceAll(string(body), "=", "")
failpoints := map[string]struct{}{}
for _, f := range strings.Split(text, "\n") {
failpoints[f] = struct{}{}
}
return failpoints, nil
}

View File

@ -15,12 +15,9 @@
package linearizability
import (
"bytes"
"context"
"fmt"
"math/rand"
"net/http"
"net/url"
"strings"
"testing"
"time"
@ -83,6 +80,7 @@ var (
type Failpoint interface {
Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error
Name() string
Available(e2e.EtcdProcess) bool
}
type killFailpoint struct{}
@ -114,6 +112,10 @@ func (f killFailpoint) Name() string {
return "Kill"
}
func (f killFailpoint) Available(e2e.EtcdProcess) bool {
return true
}
type goPanicFailpoint struct {
failpoint string
trigger func(ctx context.Context, member e2e.EtcdProcess) error
@ -129,13 +131,12 @@ const (
func (f goPanicFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
member := f.pickMember(t, clus)
address := fmt.Sprintf("127.0.0.1:%d", member.Config().GoFailPort)
triggerCtx, cancel := context.WithTimeout(ctx, triggerTimeout)
defer cancel()
for member.IsRunning() {
err := setupGoFailpoint(triggerCtx, address, f.failpoint, "panic")
err := member.Failpoints().Setup(triggerCtx, f.failpoint, "panic")
if err != nil {
t.Logf("gofailpoint setup failed: %v", err)
}
@ -169,25 +170,14 @@ func (f goPanicFailpoint) pickMember(t *testing.T, clus *e2e.EtcdProcessCluster)
}
}
func setupGoFailpoint(ctx context.Context, host, failpoint, payload string) error {
failpointUrl := url.URL{
Scheme: "http",
Host: host,
Path: failpoint,
func (f goPanicFailpoint) Available(member e2e.EtcdProcess) bool {
memberFailpoints := member.Failpoints()
if memberFailpoints == nil {
return false
}
r, err := http.NewRequestWithContext(ctx, "PUT", failpointUrl.String(), bytes.NewBuffer([]byte(payload)))
if err != nil {
return err
}
resp, err := httpClient.Do(r)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
return fmt.Errorf("bad status code: %d", resp.StatusCode)
}
return nil
available := memberFailpoints.Available()
_, found := available[f.failpoint]
return found
}
func (f goPanicFailpoint) Name() string {
@ -234,16 +224,24 @@ func triggerCompact(ctx context.Context, member e2e.EtcdProcess) error {
return nil
}
var httpClient = http.Client{
Timeout: 10 * time.Millisecond,
}
type randomFailpoint struct {
failpoints []Failpoint
}
func (f randomFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
failpoint := f.failpoints[rand.Int()%len(f.failpoints)]
availableFailpoints := make([]Failpoint, 0, len(f.failpoints))
for _, failpoint := range f.failpoints {
count := 0
for _, proc := range clus.Procs {
if failpoint.Available(proc) {
count++
}
}
if count == len(clus.Procs) {
availableFailpoints = append(availableFailpoints, failpoint)
}
}
failpoint := availableFailpoints[rand.Int()%len(availableFailpoints)]
t.Logf("Triggering %v failpoint\n", failpoint.Name())
return failpoint.Trigger(t, ctx, clus)
}
@ -252,6 +250,10 @@ func (f randomFailpoint) Name() string {
return "Random"
}
func (f randomFailpoint) Available(e2e.EtcdProcess) bool {
return true
}
type blackholePeerNetworkFailpoint struct {
duration time.Duration
}
@ -274,6 +276,10 @@ func (f blackholePeerNetworkFailpoint) Name() string {
return "blackhole"
}
func (f blackholePeerNetworkFailpoint) Available(clus e2e.EtcdProcess) bool {
return clus.PeerProxy() != nil
}
type delayPeerNetworkFailpoint struct {
duration time.Duration
baseLatency time.Duration
@ -297,3 +303,7 @@ func (f delayPeerNetworkFailpoint) Trigger(t *testing.T, ctx context.Context, cl
func (f delayPeerNetworkFailpoint) Name() string {
return "delay"
}
func (f delayPeerNetworkFailpoint) Available(clus e2e.EtcdProcess) bool {
return clus.PeerProxy() != nil
}

View File

@ -127,6 +127,11 @@ func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessC
var err error
successes := 0
failures := 0
for _, proc := range clus.Procs {
if !config.failpoint.Available(proc) {
return fmt.Errorf("failpoint %q not available on %s", config.failpoint.Name(), proc.Config().Name)
}
}
for successes < config.count && failures < config.retries {
time.Sleep(config.waitBetweenTriggers)
err = config.failpoint.Trigger(t, ctx, clus)