etcd-tester: add failpoint cases

Fixes #5754
release-3.1
Anthony Romano 2016-06-28 18:50:52 -07:00
parent 8d4701bb1d
commit a5f043c85b
6 changed files with 197 additions and 16 deletions

View File

@ -27,7 +27,10 @@ import (
"github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
)
const peerURLPort = 2380
const (
peerURLPort = 2380
failpointPort = 2381
)
type cluster struct {
v2Only bool // to be deprecated
@ -75,11 +78,12 @@ func (c *cluster) bootstrap(agentEndpoints []string) error {
return err
}
members[i] = &member{
Agent: agent,
Endpoint: u,
Name: fmt.Sprintf("etcd-%d", i),
ClientURL: fmt.Sprintf("http://%s:2379", host),
PeerURL: fmt.Sprintf("http://%s:%d", host, peerURLPort),
Agent: agent,
Endpoint: u,
Name: fmt.Sprintf("etcd-%d", i),
ClientURL: fmt.Sprintf("http://%s:2379", host),
PeerURL: fmt.Sprintf("http://%s:%d", host, peerURLPort),
FailpointURL: fmt.Sprintf("http://%s:%d", host, failpointPort),
}
memberNameURLs[i] = members[i].ClusterEntry()
}

View File

@ -0,0 +1,155 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"fmt"
"io/ioutil"
"net/http"
"strings"
"sync"
"time"
)
// failpointStats keeps per-failpoint crash counters.
type failpointStats struct {
	// mu must be held while reading or writing crashes.
	mu sync.Mutex
	// crashes maps a failpoint name to the number of crashes recorded for it.
	crashes map[string]int
}

// fpStats is the package-wide crash tally updated by the failpoint recover
// functions.
var fpStats failpointStats
// failpointFailures builds the failure cases for every failpoint listed at
// the first cluster member's FailpointURL.
//
// Each failure produced by failuresFromFailpoint is wrapped so the failpoint
// has a chance to fire: failpoints whose name contains "Snap" are wrapped in
// failureUntilSnapshot, all others in a failureDelay of three seconds. The
// crash counters in fpStats are reset before returning. If the failpoint list
// cannot be fetched, the fetch error is returned and no failures are built.
func failpointFailures(c *cluster) (ret []failure, err error) {
	paths, perr := failpointPaths(c.Members[0].FailpointURL)
	if perr != nil {
		return nil, perr
	}

	for _, path := range paths {
		if path == "" {
			continue
		}
		// hack: snapshot failpoints only trigger once a snapshot happens
		waitForSnapshot := strings.Contains(path, "Snap")
		for _, base := range failuresFromFailpoint(path) {
			var wrapped failure
			if waitForSnapshot {
				wrapped = &failureUntilSnapshot{base}
			} else {
				wrapped = &failureDelay{base, 3 * time.Second}
			}
			ret = append(ret, wrapped)
		}
	}

	fpStats.crashes = make(map[string]int)
	return ret, nil
}
// failpointPaths issues a GET against endpoint and returns the failpoint
// names found in the response body.
//
// The body is split into lines and each line is cut at its first "="; the
// text before it, with surrounding whitespace removed, is taken as the
// failpoint name. Lines that yield an empty name (blank lines, the empty tail
// after a trailing newline) are skipped.
//
// An error is returned if the request fails, the body cannot be read, or the
// server answers with a non-2xx status. The status check keeps an error page
// from being parsed as a list of failpoints and matches the handling in
// putFailpoint and delFailpoint.
func failpointPaths(endpoint string) ([]string, error) {
	resp, err := http.Get(endpoint)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	body, rerr := ioutil.ReadAll(resp.Body)
	if rerr != nil {
		return nil, rerr
	}
	if resp.StatusCode/100 != 2 {
		return nil, fmt.Errorf("failed to GET failpoints at %s (%v)", endpoint, resp.Status)
	}

	var fps []string
	for _, l := range strings.Split(string(body), "\n") {
		fp := strings.TrimSpace(strings.Split(l, "=")[0])
		if fp == "" {
			continue
		}
		fps = append(fps, fp)
	}
	return fps, nil
}
// failuresFromFailpoint returns the four failure cases for failpoint fp:
// panic one member, panic all members, panic a majority, and panic the
// leader. Every case shares one inject function, which sets the failpoint to
// panic("etcd-tester"), and one recover function from makeRecoverFailpoint.
func failuresFromFailpoint(fp string) []failure {
	var (
		injectFn  = makeInjectFailpoint(fp, `panic("etcd-tester")`)
		recoverFn = makeRecoverFailpoint(fp)
		prefix    = "failpoint " + fp + " panic "
	)

	one := &failureOne{
		description:   description(prefix + "one"),
		injectMember:  injectFn,
		recoverMember: recoverFn,
	}
	all := &failureAll{
		description:   description(prefix + "all"),
		injectMember:  injectFn,
		recoverMember: recoverFn,
	}
	majority := &failureMajority{
		description:   description(prefix + "majority"),
		injectMember:  injectFn,
		recoverMember: recoverFn,
	}
	leader := &failureLeader{
		failureByFunc{
			description:   description(prefix + "leader"),
			injectMember:  injectFn,
			recoverMember: recoverFn,
		},
		0,
	}

	return []failure{one, all, majority, leader}
}
// makeInjectFailpoint returns an inject function that sets failpoint fp to
// val on the given member by issuing a PUT against its FailpointURL.
func makeInjectFailpoint(fp, val string) injectMemberFunc {
	inject := func(m *member) error {
		return putFailpoint(m.FailpointURL, fp, val)
	}
	return inject
}
// makeRecoverFailpoint returns a recover function for failpoint fp.
//
// The returned function first tries to clear the failpoint on the member via
// delFailpoint. If that succeeds nothing more is done. If it fails, the
// member is assumed to have died from the failpoint panic: a crash is
// recorded for fp in fpStats and the result of recoverStop(m) is returned.
func makeRecoverFailpoint(fp string) recoverMemberFunc {
	return func(m *member) error {
		if err := delFailpoint(m.FailpointURL, fp); err == nil {
			return nil
		}
		// node not responding, likely dead from fp panic; restart
		fpStats.mu.Lock()
		if fpStats.crashes == nil {
			// writing to a nil map panics; allocate on first use in case
			// the counters were never initialized
			fpStats.crashes = make(map[string]int)
		}
		fpStats.crashes[fp]++
		fpStats.mu.Unlock()
		return recoverStop(m)
	}
}
// putFailpoint sets failpoint fp to val by sending a PUT with body val to
// ep + "/" + fp.
//
// It returns an error if the request cannot be built (for example when ep or
// fp does not form a valid URL), if the request cannot be sent, or if the
// server responds with a non-2xx status.
func putFailpoint(ep, fp, val string) error {
	req, err := http.NewRequest(http.MethodPut, ep+"/"+fp, strings.NewReader(val))
	if err != nil {
		// a nil request would make Client.Do panic; report the bad URL instead
		return err
	}
	c := http.Client{}
	resp, err := c.Do(req)
	if err != nil {
		return err
	}
	resp.Body.Close()
	if resp.StatusCode/100 != 2 {
		return fmt.Errorf("failed to PUT %s=%s at %s (%v)", fp, val, ep, resp.Status)
	}
	return nil
}
// delFailpoint clears failpoint fp by sending a DELETE to ep + "/" + fp.
//
// It returns an error if the request cannot be built (for example when ep or
// fp does not form a valid URL), if the request cannot be sent, or if the
// server responds with a non-2xx status.
func delFailpoint(ep, fp string) error {
	req, err := http.NewRequest(http.MethodDelete, ep+"/"+fp, strings.NewReader(""))
	if err != nil {
		// a nil request would make Client.Do panic; report the bad URL instead
		return err
	}
	c := http.Client{}
	resp, err := c.Do(req)
	if err != nil {
		return err
	}
	resp.Body.Close()
	if resp.StatusCode/100 != 2 {
		return fmt.Errorf("failed to DELETE %s at %s (%v)", fp, ep, resp.Status)
	}
	return nil
}

View File

@ -52,8 +52,13 @@ type failureLeader struct {
idx int
}
// failureDelay injects a failure and waits for a snapshot event
type failureDelay struct{ failure }
type failureDelay struct {
failure
delayDuration time.Duration
}
// failureUntilSnapshot injects a failure and waits for a snapshot event
type failureUntilSnapshot struct{ failure }
func (f *failureOne) Inject(c *cluster, round int) error {
return f.injectMember(c.Members[round%c.Size])
@ -122,6 +127,14 @@ func (f *failureDelay) Inject(c *cluster, round int) error {
if err := f.failure.Inject(c, round); err != nil {
return err
}
time.Sleep(f.delayDuration)
return nil
}
func (f *failureUntilSnapshot) Inject(c *cluster, round int) error {
if err := f.failure.Inject(c, round); err != nil {
return err
}
if c.Size < 3 {
return nil
@ -144,7 +157,7 @@ func (f *failureDelay) Inject(c *cluster, round int) error {
return fmt.Errorf("cluster too slow: only commit %d requests in %ds", end-start, retry)
}
func (f *failureDelay) Desc() string {
func (f *failureUntilSnapshot) Desc() string {
return f.failure.Desc() + " for a long time and expect it to recover from an incoming snapshot"
}

View File

@ -71,11 +71,11 @@ func newFailureKillLeader() failure {
}
func newFailureKillOneForLongTime() failure {
return &failureDelay{newFailureKillOne()}
return &failureUntilSnapshot{newFailureKillOne()}
}
func newFailureKillLeaderForLongTime() failure {
return &failureDelay{newFailureKillLeader()}
return &failureUntilSnapshot{newFailureKillLeader()}
}
func injectDropPort(m *member) error { return m.Agent.DropPort(peerURLPort) }

View File

@ -58,6 +58,14 @@ func main() {
newFailureSlowNetworkAll(),
}
// ensure cluster is fully booted to know failpoints are available
c.WaitHealth()
fpFailures, fperr := failpointFailures(c)
if len(fpFailures) == 0 {
plog.Infof("no failpoints found (%v)", fperr)
}
failures = append(failures, fpFailures...)
schedule := failures
if schedCases != nil && *schedCases != "" {
cases := strings.Split(*schedCases, " ")

View File

@ -29,11 +29,12 @@ import (
)
type member struct {
Agent client.Agent
Endpoint string
Name string
ClientURL string
PeerURL string
Agent client.Agent
Endpoint string
Name string
ClientURL string
PeerURL string
FailpointURL string
}
func (m *member) ClusterEntry() string { return m.Name + "=" + m.PeerURL }