functional-tester/tester: initial commit

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
release-3.4
Gyuho Lee 2018-03-27 18:27:28 -07:00
parent ca54bc22c7
commit 2bc666292e
20 changed files with 3535 additions and 0 deletions

@ -0,0 +1,319 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import (
"context"
"fmt"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"go.uber.org/zap"
"google.golang.org/grpc"
)
const retries = 7
type Checker interface {
// Check returns an error if the system fails a consistency check.
Check() error
}
type hashAndRevGetter interface {
getRevisionHash() (revs map[string]int64, hashes map[string]int64, err error)
}
type hashChecker struct {
logger *zap.Logger
hrg hashAndRevGetter
}
func newHashChecker(logger *zap.Logger, hrg hashAndRevGetter) Checker {
return &hashChecker{
logger: logger,
hrg: hrg,
}
}
const leaseCheckerTimeout = 10 * time.Second
func (hc *hashChecker) checkRevAndHashes() (err error) {
var (
revs map[string]int64
hashes map[string]int64
)
// retry in case of transient failures, or when the etcd cluster has not stabilized yet.
for i := 0; i < retries; i++ {
revs, hashes, err = hc.hrg.getRevisionHash()
if err != nil {
hc.logger.Warn(
"failed to get revision and hash",
zap.Int("retries", i),
zap.Error(err),
)
} else {
sameRev := getSameValue(revs)
sameHashes := getSameValue(hashes)
if sameRev && sameHashes {
return nil
}
hc.logger.Warn(
"retrying; etcd cluster is not stable",
zap.Int("retries", i),
zap.Bool("same-revisions", sameRev),
zap.Bool("same-hashes", sameHashes),
zap.String("revisions", fmt.Sprintf("%+v", revs)),
zap.String("hashes", fmt.Sprintf("%+v", hashes)),
)
}
time.Sleep(time.Second)
}
if err != nil {
return fmt.Errorf("failed revision and hash check (%v)", err)
}
return fmt.Errorf("etcd cluster is not stable: [revisions: %v] and [hashes: %v]", revs, hashes)
}
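// getSameValue is defined elsewhere in this package. A minimal sketch of
// the helper the loop above assumes (true when every value in the map is
// identical):
//
//	func getSameValue(vals map[string]int64) bool {
//		var first int64
//		i := 0
//		for _, v := range vals {
//			if i == 0 {
//				first = v
//			} else if v != first {
//				return false
//			}
//			i++
//		}
//		return true
//	}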
func (hc *hashChecker) Check() error {
return hc.checkRevAndHashes()
}
type leaseChecker struct {
logger *zap.Logger
// TODO: use Member
endpoint string
ls *leaseStresser
leaseClient pb.LeaseClient
kvc pb.KVClient
}
func (lc *leaseChecker) Check() error {
conn, err := grpc.Dial(lc.ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1))
if err != nil {
return fmt.Errorf("%v (%s)", err, lc.ls.endpoint)
}
defer func() {
if conn != nil {
conn.Close()
}
}()
lc.kvc = pb.NewKVClient(conn)
lc.leaseClient = pb.NewLeaseClient(conn)
if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil {
return err
}
if err := lc.check(false, lc.ls.aliveLeases.leases); err != nil {
return err
}
return lc.checkShortLivedLeases()
}
// checkShortLivedLeases ensures leases expire.
func (lc *leaseChecker) checkShortLivedLeases() error {
ctx, cancel := context.WithTimeout(context.Background(), leaseCheckerTimeout)
errc := make(chan error)
defer cancel()
for leaseID := range lc.ls.shortLivedLeases.leases {
go func(id int64) {
errc <- lc.checkShortLivedLease(ctx, id)
}(leaseID)
}
var errs []error
for range lc.ls.shortLivedLeases.leases {
if err := <-errc; err != nil {
errs = append(errs, err)
}
}
return errsToError(errs)
}
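// errsToError is defined elsewhere in this package. A plausible sketch,
// assuming it collapses a slice of errors into a single error (nil when
// the slice is empty):
//
//	func errsToError(errs []error) error {
//		if len(errs) == 0 {
//			return nil
//		}
//		msgs := make([]string, 0, len(errs))
//		for _, err := range errs {
//			msgs = append(msgs, err.Error())
//		}
//		return errors.New(strings.Join(msgs, "; "))
//	}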
func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64) (err error) {
// retry in case of transient failure, or when the lease has expired but has not yet been revoked because the etcd cluster did not have enough time to delete it.
var resp *pb.LeaseTimeToLiveResponse
for i := 0; i < retries; i++ {
resp, err = lc.getLeaseByID(ctx, leaseID)
// lease not found; for ~v3.1 compatibility, also check ErrLeaseNotFound
if (err == nil && resp.TTL == -1) || (err != nil && rpctypes.Error(err) == rpctypes.ErrLeaseNotFound) {
return nil
}
if err != nil {
lc.logger.Debug(
"retrying; Lease TimeToLive failed",
zap.Int("retries", i),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err),
)
continue
}
if resp.TTL > 0 {
dur := time.Duration(resp.TTL) * time.Second
lc.logger.Debug(
"lease has not been expired, wait until expire",
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Int64("ttl", resp.TTL),
zap.Duration("wait-duration", dur),
)
time.Sleep(dur)
} else {
lc.logger.Debug(
"lease expired but not yet revoked",
zap.Int("retries", i),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Int64("ttl", resp.TTL),
zap.Duration("wait-duration", time.Second),
)
time.Sleep(time.Second)
}
if err = lc.checkLease(ctx, false, leaseID); err != nil {
continue
}
return nil
}
return err
}
func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID int64) error {
keysExpired, err := lc.hasKeysAttachedToLeaseExpired(ctx, leaseID)
if err != nil {
lc.logger.Warn(
"hasKeysAttachedToLeaseExpired failed",
zap.String("endpoint", lc.endpoint),
zap.Error(err),
)
return err
}
leaseExpired, err := lc.hasLeaseExpired(ctx, leaseID)
if err != nil {
lc.logger.Warn(
"hasLeaseExpired failed",
zap.String("endpoint", lc.endpoint),
zap.Error(err),
)
return err
}
if leaseExpired != keysExpired {
return fmt.Errorf("lease %v expiration mismatch (lease expired=%v, keys expired=%v)", leaseID, leaseExpired, keysExpired)
}
if leaseExpired != expired {
return fmt.Errorf("lease %v expected expired=%v, got %v", leaseID, expired, leaseExpired)
}
return nil
}
func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error {
ctx, cancel := context.WithTimeout(context.Background(), leaseCheckerTimeout)
defer cancel()
for leaseID := range leases {
if err := lc.checkLease(ctx, expired, leaseID); err != nil {
return err
}
}
return nil
}
func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*pb.LeaseTimeToLiveResponse, error) {
ltl := &pb.LeaseTimeToLiveRequest{ID: leaseID, Keys: true}
return lc.leaseClient.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false))
}
func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
// keep retrying until the lease's state is known or ctx is canceled
for ctx.Err() == nil {
resp, err := lc.getLeaseByID(ctx, leaseID)
if err != nil {
// for ~v3.1 compatibilities
if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
return true, nil
}
} else {
return resp.TTL == -1, nil
}
lc.logger.Warn(
"hasLeaseExpired getLeaseByID failed",
zap.String("endpoint", lc.endpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err),
)
}
return false, ctx.Err()
}
// Keys attached to a lease have the format "<leaseID>_<idx>", where idx is
// the order of key creation. Since the key format embeds the leaseID,
// ranging over the "<leaseID>" prefix determines whether the keys attached
// to a given leaseID have been deleted.
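// For example, a lease with ID 42 (hypothetical) and two attached keys
// would own keys "42_0" and "42_1", so a range over prefix "42" finds them.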
func (lc *leaseChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
resp, err := lc.kvc.Range(ctx, &pb.RangeRequest{
Key: []byte(fmt.Sprintf("%d", leaseID)),
RangeEnd: []byte(clientv3.GetPrefixRangeEnd(fmt.Sprintf("%d", leaseID))),
}, grpc.FailFast(false))
if err != nil {
lc.logger.Warn(
"hasKeysAttachedToLeaseExpired failed",
zap.String("endpoint", lc.endpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err),
)
return false, err
}
return len(resp.Kvs) == 0, nil
}
// compositeChecker implements a checker that runs a slice of Checkers concurrently.
type compositeChecker struct{ checkers []Checker }
func newCompositeChecker(checkers []Checker) Checker {
return &compositeChecker{checkers}
}
func (cchecker *compositeChecker) Check() error {
errc := make(chan error)
for _, c := range cchecker.checkers {
go func(chk Checker) { errc <- chk.Check() }(c)
}
var errs []error
for range cchecker.checkers {
if err := <-errc; err != nil {
errs = append(errs, err)
}
}
return errsToError(errs)
}
type runnerChecker struct {
errc chan error
}
func (rc *runnerChecker) Check() error {
select {
case err := <-rc.errc:
return err
default:
return nil
}
}
type noChecker struct{}
func newNoChecker() Checker { return &noChecker{} }
func (nc *noChecker) Check() error { return nil }
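// Usage sketch (illustrative, not part of this commit): checkers compose,
// so several invariants can be verified under a single Check call.
//
//	rc := &runnerChecker{errc: make(chan error, 1)}
//	chk := newCompositeChecker([]Checker{newNoChecker(), rc})
//	err := chk.Check() // nil unless rc.errc holds a pending error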

@ -0,0 +1,724 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import (
"context"
"errors"
"fmt"
"io/ioutil"
"net/http"
"path/filepath"
"strings"
"time"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/debugutil"
"github.com/coreos/etcd/tools/functional-tester/rpcpb"
"golang.org/x/time/rate"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"google.golang.org/grpc"
yaml "gopkg.in/yaml.v2"
)
// Cluster defines tester cluster.
type Cluster struct {
logger *zap.Logger
agentConns []*grpc.ClientConn
agentClients []rpcpb.TransportClient
agentStreams []rpcpb.Transport_TransportClient
agentRequests []*rpcpb.Request
testerHTTPServer *http.Server
Members []*rpcpb.Member `yaml:"agent-configs"`
Tester *rpcpb.Tester `yaml:"tester-config"`
failures []Failure
rateLimiter *rate.Limiter
stresser Stresser
checker Checker
currentRevision int64
rd int
cs int
}
func newCluster(logger *zap.Logger, fpath string) (*Cluster, error) {
logger.Info("reading configuration file", zap.String("path", fpath))
bts, err := ioutil.ReadFile(fpath)
if err != nil {
return nil, err
}
logger.Info("opened configuration file", zap.String("path", fpath))
clus := &Cluster{logger: logger}
if err = yaml.Unmarshal(bts, clus); err != nil {
return nil, err
}
for i := range clus.Members {
if clus.Members[i].BaseDir == "" {
return nil, fmt.Errorf("Members[i].BaseDir cannot be empty (got %q)", clus.Members[i].BaseDir)
}
if clus.Members[i].EtcdLogPath == "" {
return nil, fmt.Errorf("Members[i].EtcdLogPath cannot be empty (got %q)", clus.Members[i].EtcdLogPath)
}
if clus.Members[i].Etcd.Name == "" {
return nil, fmt.Errorf("'--name' cannot be empty (got %+v)", clus.Members[i])
}
if clus.Members[i].Etcd.DataDir == "" {
return nil, fmt.Errorf("'--data-dir' cannot be empty (got %+v)", clus.Members[i])
}
if clus.Members[i].Etcd.SnapshotCount == 0 {
return nil, fmt.Errorf("'--snapshot-count' cannot be 0 (got %+v)", clus.Members[i].Etcd.SnapshotCount)
}
if clus.Members[i].Etcd.WALDir == "" {
clus.Members[i].Etcd.WALDir = filepath.Join(clus.Members[i].Etcd.DataDir, "member", "wal")
}
port := ""
listenClientPorts := make([]string, len(clus.Members))
for i, u := range clus.Members[i].Etcd.ListenClientURLs {
if !isValidURL(u) {
return nil, fmt.Errorf("'--listen-client-urls' has valid URL %q", u)
}
listenClientPorts[i], err = getPort(u)
if err != nil {
return nil, fmt.Errorf("'--listen-client-urls' has no port %q", u)
}
}
for i, u := range clus.Members[i].Etcd.AdvertiseClientURLs {
if !isValidURL(u) {
return nil, fmt.Errorf("'--advertise-client-urls' has valid URL %q", u)
}
port, err = getPort(u)
if err != nil {
return nil, fmt.Errorf("'--advertise-client-urls' has no port %q", u)
}
if clus.Members[i].EtcdClientProxy && listenClientPorts[i] == port {
return nil, fmt.Errorf("clus.Members[%d] requires client port proxy, but advertise port %q conflicts with listener port %q", i, port, listenClientPorts[i])
}
}
listenPeerPorts := make([]string, len(clus.Members))
for i, u := range clus.Members[i].Etcd.ListenPeerURLs {
if !isValidURL(u) {
return nil, fmt.Errorf("'--listen-peer-urls' has valid URL %q", u)
}
listenPeerPorts[i], err = getPort(u)
if err != nil {
return nil, fmt.Errorf("'--listen-peer-urls' has no port %q", u)
}
}
for i, u := range clus.Members[i].Etcd.InitialAdvertisePeerURLs {
if !isValidURL(u) {
return nil, fmt.Errorf("'--initial-advertise-peer-urls' has valid URL %q", u)
}
port, err = getPort(u)
if err != nil {
return nil, fmt.Errorf("'--initial-advertise-peer-urls' has no port %q", u)
}
if clus.Members[i].EtcdPeerProxy && listenPeerPorts[i] == port {
return nil, fmt.Errorf("clus.Members[%d] requires peer port proxy, but advertise port %q conflicts with listener port %q", i, port, listenPeerPorts[i])
}
}
if !strings.HasPrefix(clus.Members[i].EtcdLogPath, clus.Members[i].BaseDir) {
return nil, fmt.Errorf("EtcdLogPath must be prefixed with BaseDir (got %q)", clus.Members[i].EtcdLogPath)
}
if !strings.HasPrefix(clus.Members[i].Etcd.DataDir, clus.Members[i].BaseDir) {
return nil, fmt.Errorf("Etcd.DataDir must be prefixed with BaseDir (got %q)", clus.Members[i].Etcd.DataDir)
}
// TODO: support separate WALDir that can be handled via failure-archive
if !strings.HasPrefix(clus.Members[i].Etcd.WALDir, clus.Members[i].BaseDir) {
return nil, fmt.Errorf("Etcd.WALDir must be prefixed with BaseDir (got %q)", clus.Members[i].Etcd.WALDir)
}
}
if len(clus.Tester.FailureCases) == 0 {
return nil, errors.New("FailureCases not found")
}
for _, v := range clus.Tester.FailureCases {
if _, ok := rpcpb.FailureCase_value[v]; !ok {
return nil, fmt.Errorf("%q is not defined in 'rpcpb.FailureCase_value'", v)
}
}
for _, v := range clus.Tester.StressTypes {
if _, ok := rpcpb.StressType_value[v]; !ok {
return nil, fmt.Errorf("StressType is unknown; got %q", v)
}
}
if clus.Tester.StressKeySuffixRangeTxn > 100 {
return nil, fmt.Errorf("StressKeySuffixRangeTxn maximum value is 100, got %v", clus.Tester.StressKeySuffixRangeTxn)
}
if clus.Tester.StressKeyTxnOps > 64 {
return nil, fmt.Errorf("StressKeyTxnOps maximum value is 64, got %v", clus.Tester.StressKeyTxnOps)
}
return clus, err
}
// TODO: status handler
var dialOpts = []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithTimeout(5 * time.Second),
grpc.WithBlock(),
}
// NewCluster creates a client from a tester configuration.
func NewCluster(logger *zap.Logger, fpath string) (*Cluster, error) {
clus, err := newCluster(logger, fpath)
if err != nil {
return nil, err
}
clus.agentConns = make([]*grpc.ClientConn, len(clus.Members))
clus.agentClients = make([]rpcpb.TransportClient, len(clus.Members))
clus.agentStreams = make([]rpcpb.Transport_TransportClient, len(clus.Members))
clus.agentRequests = make([]*rpcpb.Request, len(clus.Members))
clus.failures = make([]Failure, 0)
for i, ap := range clus.Members {
logger.Info("connecting", zap.String("agent-address", ap.AgentAddr))
var err error
clus.agentConns[i], err = grpc.Dial(ap.AgentAddr, dialOpts...)
if err != nil {
return nil, err
}
clus.agentClients[i] = rpcpb.NewTransportClient(clus.agentConns[i])
logger.Info("connected", zap.String("agent-address", ap.AgentAddr))
logger.Info("creating stream", zap.String("agent-address", ap.AgentAddr))
clus.agentStreams[i], err = clus.agentClients[i].Transport(context.Background())
if err != nil {
return nil, err
}
logger.Info("created stream", zap.String("agent-address", ap.AgentAddr))
}
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
if clus.Tester.EnablePprof {
for p, h := range debugutil.PProfHandlers() {
mux.Handle(p, h)
}
}
clus.testerHTTPServer = &http.Server{
Addr: clus.Tester.TesterAddr,
Handler: mux,
}
go clus.serveTesterServer()
for _, cs := range clus.Tester.FailureCases {
switch cs {
case "KILL_ONE_FOLLOWER":
clus.failures = append(clus.failures, newFailureKillOne()) // TODO
case "KILL_LEADER":
clus.failures = append(clus.failures, newFailureKillLeader())
case "KILL_ONE_FOLLOWER_FOR_LONG":
clus.failures = append(clus.failures, newFailureKillOneForLongTime()) // TODO
case "KILL_LEADER_FOR_LONG":
clus.failures = append(clus.failures, newFailureKillLeaderForLongTime())
case "KILL_QUORUM":
clus.failures = append(clus.failures, newFailureKillQuorum())
case "KILL_ALL":
clus.failures = append(clus.failures, newFailureKillAll())
case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER":
clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOne()) // TODO
case "BLACKHOLE_PEER_PORT_TX_RX_LEADER_ONE":
clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOne()) // TODO
case "BLACKHOLE_PEER_PORT_TX_RX_ALL":
clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxAll())
case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneMember()) // TODO
case "DELAY_PEER_PORT_TX_RX_LEADER":
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader()) // TODO
case "DELAY_PEER_PORT_TX_RX_ALL":
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll()) // TODO
case "FAILPOINTS":
fpFailures, fperr := failpointFailures(clus)
if len(fpFailures) == 0 {
clus.logger.Info("no failpoints found!", zap.Error(fperr))
}
clus.failures = append(clus.failures, fpFailures...)
case "NO_FAIL":
clus.failures = append(clus.failures, newFailureNoOp())
case "EXTERNAL":
clus.failures = append(clus.failures, newFailureExternal(clus.Tester.ExternalExecPath))
default:
return nil, fmt.Errorf("unknown failure %q", cs)
}
}
clus.rateLimiter = rate.NewLimiter(
rate.Limit(int(clus.Tester.StressQPS)),
int(clus.Tester.StressQPS),
)
clus.updateStresserChecker()
return clus, nil
}
func (clus *Cluster) serveTesterServer() {
clus.logger.Info(
"started tester HTTP server",
zap.String("tester-address", clus.Tester.TesterAddr),
)
err := clus.testerHTTPServer.ListenAndServe()
clus.logger.Info(
"tester HTTP server returned",
zap.String("tester-address", clus.Tester.TesterAddr),
zap.Error(err),
)
if err != nil && err != http.ErrServerClosed {
clus.logger.Fatal("tester HTTP errored", zap.Error(err))
}
}
func (clus *Cluster) updateStresserChecker() {
clus.logger.Info(
"updating stressers",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
cs := &compositeStresser{}
for idx := range clus.Members {
cs.stressers = append(cs.stressers, newStresser(clus, idx))
}
clus.stresser = cs
clus.checker = newHashChecker(clus.logger, hashAndRevGetter(clus))
if schk := cs.Checker(); schk != nil {
clus.checker = newCompositeChecker([]Checker{clus.checker, schk})
}
clus.logger.Info(
"updated stressers",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
}
func (clus *Cluster) startStresser() (err error) {
clus.logger.Info(
"starting stressers",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
err = clus.stresser.Stress()
clus.logger.Info(
"started stressers",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
return err
}
func (clus *Cluster) closeStresser() {
clus.logger.Info(
"closing stressers",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
clus.stresser.Close()
clus.logger.Info(
"closed stressers",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
}
func (clus *Cluster) pauseStresser() {
clus.logger.Info(
"pausing stressers",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
clus.stresser.Pause()
clus.logger.Info(
"paused stressers",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
}
func (clus *Cluster) checkConsistency() (err error) {
defer func() {
if err != nil {
return
}
if err = clus.updateRevision(); err != nil {
clus.logger.Warn(
"updateRevision failed",
zap.Error(err),
)
return
}
err = clus.startStresser()
}()
clus.logger.Info(
"checking consistency and invariant of cluster",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.String("desc", clus.failures[clus.cs].Desc()),
)
if err = clus.checker.Check(); err != nil {
clus.logger.Warn(
"checker.Check failed",
zap.Error(err),
)
return err
}
clus.logger.Info(
"checked consistency and invariant of cluster",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.String("desc", clus.failures[clus.cs].Desc()),
)
return err
}
// Bootstrap bootstraps the etcd cluster the very first time.
// After this, just continue to call kill/restart.
func (clus *Cluster) Bootstrap() error {
// this is the only time a request is created from scratch
return clus.broadcastOperation(rpcpb.Operation_InitialStartEtcd)
}
// FailArchive sends "FailArchive" operation.
func (clus *Cluster) FailArchive() error {
return clus.broadcastOperation(rpcpb.Operation_FailArchive)
}
// Restart sends "Restart" operation.
func (clus *Cluster) Restart() error {
return clus.broadcastOperation(rpcpb.Operation_RestartEtcd)
}
func (clus *Cluster) broadcastOperation(op rpcpb.Operation) error {
for i := range clus.agentStreams {
err := clus.sendOperation(i, op)
if err != nil {
if op == rpcpb.Operation_DestroyEtcdAgent &&
strings.Contains(err.Error(), "rpc error: code = Unavailable desc = transport is closing") {
// agent server has already closed;
// so this error is expected
clus.logger.Info(
"successfully destroyed",
zap.String("member", clus.Members[i].EtcdClientEndpoint),
)
continue
}
return err
}
}
return nil
}
func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
if op == rpcpb.Operation_InitialStartEtcd {
clus.agentRequests[idx] = &rpcpb.Request{
Operation: op,
Member: clus.Members[idx],
Tester: clus.Tester,
}
} else {
clus.agentRequests[idx].Operation = op
}
clus.logger.Info(
"sending request",
zap.String("operation", op.String()),
zap.String("to", clus.Members[idx].EtcdClientEndpoint),
)
err := clus.agentStreams[idx].Send(clus.agentRequests[idx])
clus.logger.Info(
"sent request",
zap.String("operation", op.String()),
zap.String("to", clus.Members[idx].EtcdClientEndpoint),
zap.Error(err),
)
if err != nil {
return err
}
clus.logger.Info(
"receiving response",
zap.String("operation", op.String()),
zap.String("from", clus.Members[idx].EtcdClientEndpoint),
)
resp, err := clus.agentStreams[idx].Recv()
if resp != nil {
clus.logger.Info(
"received response",
zap.String("operation", op.String()),
zap.String("from", clus.Members[idx].EtcdClientEndpoint),
zap.Bool("success", resp.Success),
zap.String("status", resp.Status),
zap.Error(err),
)
} else {
clus.logger.Info(
"received empty response",
zap.String("operation", op.String()),
zap.String("from", clus.Members[idx].EtcdClientEndpoint),
zap.Error(err),
)
}
if err != nil {
return err
}
if !resp.Success {
err = errors.New(resp.Status)
}
return err
}
// DestroyEtcdAgents terminates all tester connections to agents and etcd servers.
func (clus *Cluster) DestroyEtcdAgents() {
clus.logger.Info("destroying etcd servers and agents")
err := clus.broadcastOperation(rpcpb.Operation_DestroyEtcdAgent)
if err != nil {
clus.logger.Warn("failed to destroy etcd servers and agents", zap.Error(err))
} else {
clus.logger.Info("destroyed etcd servers and agents")
}
for i, conn := range clus.agentConns {
clus.logger.Info("closing connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr))
err := conn.Close()
clus.logger.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err))
}
// TODO: closing stresser connections to etcd
if clus.testerHTTPServer != nil {
clus.logger.Info("closing tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr))
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
err := clus.testerHTTPServer.Shutdown(ctx)
cancel()
clus.logger.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr), zap.Error(err))
}
}
// WaitHealth ensures all members are healthy
// by writing a test key to the etcd cluster.
func (clus *Cluster) WaitHealth() error {
var err error
// wait up to 60 seconds for the cluster health check.
// TODO: set it to a reasonable value. It is set that high because a
// follower may take a long time to catch up with the leader after a
// reboot under a reasonable workload (https://github.com/coreos/etcd/issues/2698)
for i := 0; i < 60; i++ {
for _, m := range clus.Members {
clus.logger.Info(
"writing health key",
zap.Int("retries", i),
zap.String("endpoint", m.EtcdClientEndpoint),
)
if err = m.WriteHealthKey(); err != nil {
clus.logger.Warn(
"writing health key failed",
zap.Int("retries", i),
zap.String("endpoint", m.EtcdClientEndpoint),
zap.Error(err),
)
break
}
clus.logger.Info(
"successfully wrote health key",
zap.Int("retries", i),
zap.String("endpoint", m.EtcdClientEndpoint),
)
}
if err == nil {
clus.logger.Info(
"writing health key success on all members",
zap.Int("retries", i),
)
return nil
}
time.Sleep(time.Second)
}
return err
}
// GetLeader returns the index of the leader member, and an error if any.
func (clus *Cluster) GetLeader() (int, error) {
for i, m := range clus.Members {
isLeader, err := m.IsLeader()
if isLeader || err != nil {
return i, err
}
}
return 0, fmt.Errorf("no leader found")
}
// maxRev returns the maximum revision found on the cluster.
func (clus *Cluster) maxRev() (rev int64, err error) {
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()
revc, errc := make(chan int64, len(clus.Members)), make(chan error, len(clus.Members))
for i := range clus.Members {
go func(m *rpcpb.Member) {
mrev, merr := m.Rev(ctx)
revc <- mrev
errc <- merr
}(clus.Members[i])
}
for i := 0; i < len(clus.Members); i++ {
if merr := <-errc; merr != nil {
err = merr
}
if mrev := <-revc; mrev > rev {
rev = mrev
}
}
return rev, err
}
func (clus *Cluster) getRevisionHash() (map[string]int64, map[string]int64, error) {
revs := make(map[string]int64)
hashes := make(map[string]int64)
for _, m := range clus.Members {
rev, hash, err := m.RevHash()
if err != nil {
return nil, nil, err
}
revs[m.EtcdClientEndpoint] = rev
hashes[m.EtcdClientEndpoint] = hash
}
return revs, hashes, nil
}
func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
if rev <= 0 {
return nil
}
for i, m := range clus.Members {
conn, derr := m.DialEtcdGRPCServer()
if derr != nil {
clus.logger.Warn(
"compactKV dial failed",
zap.String("endpoint", m.EtcdClientEndpoint),
zap.Error(derr),
)
err = derr
continue
}
kvc := pb.NewKVClient(conn)
clus.logger.Info(
"starting compaction",
zap.String("endpoint", m.EtcdClientEndpoint),
zap.Int64("revision", rev),
zap.Duration("timeout", timeout),
)
now := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
_, cerr := kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev, Physical: true}, grpc.FailFast(false))
cancel()
conn.Close()
succeed := true
if cerr != nil {
if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 {
clus.logger.Info(
"compact error is ignored",
zap.String("endpoint", m.EtcdClientEndpoint),
zap.Int64("revision", rev),
zap.Error(cerr),
)
} else {
clus.logger.Warn(
"compact failed",
zap.String("endpoint", m.EtcdClientEndpoint),
zap.Int64("revision", rev),
zap.Error(cerr),
)
err = cerr
succeed = false
}
}
if succeed {
clus.logger.Info(
"finished compaction",
zap.String("endpoint", m.EtcdClientEndpoint),
zap.Int64("revision", rev),
zap.Duration("timeout", timeout),
zap.Duration("took", time.Since(now)),
)
}
}
return err
}
func (clus *Cluster) checkCompact(rev int64) error {
if rev == 0 {
return nil
}
for _, m := range clus.Members {
if err := m.CheckCompact(rev); err != nil {
return err
}
}
return nil
}
func (clus *Cluster) defrag() error {
clus.logger.Info(
"defragmenting",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
for _, m := range clus.Members {
if err := m.Defrag(); err != nil {
clus.logger.Warn(
"defrag failed",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.Error(err),
)
return err
}
}
clus.logger.Info(
"defragmented",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
return nil
}
func (clus *Cluster) Report() int64 { return clus.stresser.ModifiedKeys() }

@ -0,0 +1,162 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import (
"reflect"
"testing"
"github.com/coreos/etcd/tools/functional-tester/rpcpb"
"go.uber.org/zap"
)
func Test_newCluster(t *testing.T) {
exp := &Cluster{
Members: []*rpcpb.Member{
{
EtcdExecPath: "./bin/etcd",
AgentAddr: "127.0.0.1:19027",
FailpointHTTPAddr: "http://127.0.0.1:7381",
BaseDir: "/tmp/etcd-agent-data-1",
EtcdLogPath: "/tmp/etcd-agent-data-1/current-etcd.log",
EtcdClientTLS: false,
EtcdClientProxy: false,
EtcdPeerProxy: true,
EtcdClientEndpoint: "127.0.0.1:1379",
Etcd: &rpcpb.Etcd{
Name: "s1",
DataDir: "/tmp/etcd-agent-data-1/etcd.data",
WALDir: "/tmp/etcd-agent-data-1/etcd.data/member/wal",
ListenClientURLs: []string{"http://127.0.0.1:1379"},
AdvertiseClientURLs: []string{"http://127.0.0.1:1379"},
ListenPeerURLs: []string{"http://127.0.0.1:1380"},
InitialAdvertisePeerURLs: []string{"http://127.0.0.1:13800"},
InitialCluster: "s1=http://127.0.0.1:13800,s2=http://127.0.0.1:23800,s3=http://127.0.0.1:33800",
InitialClusterState: "new",
InitialClusterToken: "tkn",
SnapshotCount: 10000,
QuotaBackendBytes: 10740000000,
PreVote: true,
InitialCorruptCheck: true,
},
},
{
EtcdExecPath: "./bin/etcd",
AgentAddr: "127.0.0.1:29027",
FailpointHTTPAddr: "http://127.0.0.1:7382",
BaseDir: "/tmp/etcd-agent-data-2",
EtcdLogPath: "/tmp/etcd-agent-data-2/current-etcd.log",
EtcdClientTLS: false,
EtcdClientProxy: false,
EtcdPeerProxy: true,
EtcdClientEndpoint: "127.0.0.1:2379",
Etcd: &rpcpb.Etcd{
Name: "s2",
DataDir: "/tmp/etcd-agent-data-2/etcd.data",
WALDir: "/tmp/etcd-agent-data-2/etcd.data/member/wal",
ListenClientURLs: []string{"http://127.0.0.1:2379"},
AdvertiseClientURLs: []string{"http://127.0.0.1:2379"},
ListenPeerURLs: []string{"http://127.0.0.1:2380"},
InitialAdvertisePeerURLs: []string{"http://127.0.0.1:23800"},
InitialCluster: "s1=http://127.0.0.1:13800,s2=http://127.0.0.1:23800,s3=http://127.0.0.1:33800",
InitialClusterState: "new",
InitialClusterToken: "tkn",
SnapshotCount: 10000,
QuotaBackendBytes: 10740000000,
PreVote: true,
InitialCorruptCheck: true,
},
},
{
EtcdExecPath: "./bin/etcd",
AgentAddr: "127.0.0.1:39027",
FailpointHTTPAddr: "http://127.0.0.1:7383",
BaseDir: "/tmp/etcd-agent-data-3",
EtcdLogPath: "/tmp/etcd-agent-data-3/current-etcd.log",
EtcdClientTLS: false,
EtcdClientProxy: false,
EtcdPeerProxy: true,
EtcdClientEndpoint: "127.0.0.1:3379",
Etcd: &rpcpb.Etcd{
Name: "s3",
DataDir: "/tmp/etcd-agent-data-3/etcd.data",
WALDir: "/tmp/etcd-agent-data-3/etcd.data/member/wal",
ListenClientURLs: []string{"http://127.0.0.1:3379"},
AdvertiseClientURLs: []string{"http://127.0.0.1:3379"},
ListenPeerURLs: []string{"http://127.0.0.1:3380"},
InitialAdvertisePeerURLs: []string{"http://127.0.0.1:33800"},
InitialCluster: "s1=http://127.0.0.1:13800,s2=http://127.0.0.1:23800,s3=http://127.0.0.1:33800",
InitialClusterState: "new",
InitialClusterToken: "tkn",
SnapshotCount: 10000,
QuotaBackendBytes: 10740000000,
PreVote: true,
InitialCorruptCheck: true,
},
},
},
Tester: &rpcpb.Tester{
TesterNetwork: "tcp",
TesterAddr: "127.0.0.1:9028",
DelayLatencyMs: 500,
DelayLatencyMsRv: 50,
RoundLimit: 1,
ExitOnFailure: true,
ConsistencyCheck: true,
EnablePprof: true,
FailureCases: []string{
"KILL_ONE_FOLLOWER",
"KILL_LEADER",
"KILL_ONE_FOLLOWER_FOR_LONG",
"KILL_LEADER_FOR_LONG",
"KILL_QUORUM",
"KILL_ALL",
"BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER",
"BLACKHOLE_PEER_PORT_TX_RX_LEADER_ONE",
"BLACKHOLE_PEER_PORT_TX_RX_ALL",
"DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER",
"DELAY_PEER_PORT_TX_RX_LEADER",
"DELAY_PEER_PORT_TX_RX_ALL",
},
FailpointCommands: []string{`panic("etcd-tester")`},
RunnerExecPath: "/etcd-runner",
ExternalExecPath: "",
StressTypes: []string{"KV", "LEASE"},
StressKeySize: 100,
StressKeySizeLarge: 32769,
StressKeySuffixRange: 250000,
StressKeySuffixRangeTxn: 100,
StressKeyTxnOps: 10,
StressQPS: 1000,
},
}
logger, err := zap.NewProduction()
if err != nil {
t.Fatal(err)
}
defer logger.Sync()
cfg, err := newCluster(logger, "./local-test.yaml")
if err != nil {
t.Fatal(err)
}
cfg.logger = nil
if !reflect.DeepEqual(exp, cfg) {
t.Fatalf("expected %+v, got %+v", exp, cfg)
}
}

@ -0,0 +1,16 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package tester implements functional-tester tester server.
package tester

@ -0,0 +1,30 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
// Failure defines failure injection interface.
// To add a fail case:
// 1. implement "Failure" interface
// 2. define fail case name in "rpcpb.FailureCase"
type Failure interface {
// Inject injects the failure into the testing cluster at the given
// round. When the function is called, the cluster should be healthy.
Inject(clus *Cluster, round int) error
// Recover recovers the injected failure caused by the injection of the
// given round and waits for the testing cluster to recover.
Recover(clus *Cluster, round int) error
// Desc returns a description of the failure
Desc() string
}
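// Example (sketch, not part of this commit): a minimal Failure. The type
// below is hypothetical; a real case also defines a matching name in
// rpcpb.FailureCase.
//
//	type failureSleep struct{ d time.Duration }
//
//	func (f *failureSleep) Inject(clus *Cluster, round int) error {
//		time.Sleep(f.d) // "inject" by stalling; a real case perturbs the cluster
//		return nil
//	}
//
//	func (f *failureSleep) Recover(clus *Cluster, round int) error {
//		return clus.WaitHealth() // recovery typically waits for cluster health
//	}
//
//	func (f *failureSleep) Desc() string { return "sleep failure (example)" }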

@ -0,0 +1,49 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import "github.com/coreos/etcd/tools/functional-tester/rpcpb"
func injectBlackholePeerPortTxRx(clus *Cluster, idx int) error {
return clus.sendOperation(idx, rpcpb.Operation_BlackholePeerPortTxRx)
}
func recoverBlackholePeerPortTxRx(clus *Cluster, idx int) error {
return clus.sendOperation(idx, rpcpb.Operation_UnblackholePeerPortTxRx)
}
func newFailureBlackholePeerPortTxRxOne() Failure {
f := &failureOne{
description: "blackhole peer port on one member",
injectMember: injectBlackholePeerPortTxRx,
recoverMember: recoverBlackholePeerPortTxRx,
}
return &failureDelay{
Failure: f,
delayDuration: triggerElectionDur,
}
}
func newFailureBlackholePeerPortTxRxAll() Failure {
f := &failureAll{
description: "blackhole peer port on all members",
injectMember: injectBlackholePeerPortTxRx,
recoverMember: recoverBlackholePeerPortTxRx,
}
return &failureDelay{
Failure: f,
delayDuration: triggerElectionDur,
}
}

@ -0,0 +1,41 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import (
"time"
"go.uber.org/zap"
)
type failureDelay struct {
Failure
delayDuration time.Duration
}
func (f *failureDelay) Inject(clus *Cluster, round int) error {
if err := f.Failure.Inject(clus, round); err != nil {
return err
}
if f.delayDuration > 0 {
clus.logger.Info(
"sleeping in failureDelay",
zap.Duration("delay", f.delayDuration),
zap.String("case", f.Failure.Desc()),
)
time.Sleep(f.delayDuration)
}
return nil
}

@ -0,0 +1,44 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import (
"fmt"
"os/exec"
)
type failureExternal struct {
Failure
description string
scriptPath string
}
func (f *failureExternal) Inject(clus *Cluster, round int) error {
return exec.Command(f.scriptPath, "enable", fmt.Sprintf("%d", round)).Run()
}
func (f *failureExternal) Recover(clus *Cluster, round int) error {
return exec.Command(f.scriptPath, "disable", fmt.Sprintf("%d", round)).Run()
}
func (f *failureExternal) Desc() string { return f.description }
func newFailureExternal(scriptPath string) Failure {
return &failureExternal{
description: fmt.Sprintf("external fault injector (script: %q)", scriptPath),
scriptPath: scriptPath,
}
}
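// The contract implied above for the external script: it is invoked as
// "<script> enable <round>" to inject the fault and "<script> disable <round>"
// to recover, and a nonzero exit status is reported as an error.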

@ -0,0 +1,159 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import (
"fmt"
"io/ioutil"
"net/http"
"strings"
"sync"
"time"
)
type failpointStats struct {
mu sync.Mutex
// crashes counts the number of crashes for a failpoint
crashes map[string]int
}
var fpStats failpointStats
func failpointFailures(clus *Cluster) (ret []Failure, err error) {
var fps []string
fps, err = failpointPaths(clus.Members[0].FailpointHTTPAddr)
if err != nil {
return nil, err
}
// create failure objects for all failpoints
for _, fp := range fps {
if len(fp) == 0 {
continue
}
fpFails := failuresFromFailpoint(fp, clus.Tester.FailpointCommands)
// wrap in delays so failpoint has time to trigger
for i, fpf := range fpFails {
if strings.Contains(fp, "Snap") {
// hack to trigger snapshot failpoints
fpFails[i] = &failureUntilSnapshot{fpf}
} else {
fpFails[i] = &failureDelay{fpf, 3 * time.Second}
}
}
ret = append(ret, fpFails...)
}
fpStats.crashes = make(map[string]int)
return ret, err
}
func failpointPaths(endpoint string) ([]string, error) {
resp, err := http.Get(endpoint)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, rerr := ioutil.ReadAll(resp.Body)
if rerr != nil {
return nil, rerr
}
var fps []string
for _, l := range strings.Split(string(body), "\n") {
fp := strings.Split(l, "=")[0]
fps = append(fps, fp)
}
return fps, nil
}
// Failpoint commands follow the FreeBSD KFAIL_POINT syntax,
// e.g. panic("etcd-tester"),1*sleep(1000)->panic("etcd-tester")
func failuresFromFailpoint(fp string, failpointCommands []string) (fs []Failure) {
recov := makeRecoverFailpoint(fp)
for _, fcmd := range failpointCommands {
inject := makeInjectFailpoint(fp, fcmd)
fs = append(fs, []Failure{
&failureOne{
description: description(fmt.Sprintf("failpoint %s (one: %s)", fp, fcmd)),
injectMember: inject,
recoverMember: recov,
},
&failureAll{
description: description(fmt.Sprintf("failpoint %s (all: %s)", fp, fcmd)),
injectMember: inject,
recoverMember: recov,
},
&failureQuorum{
description: description(fmt.Sprintf("failpoint %s (majority: %s)", fp, fcmd)),
injectMember: inject,
recoverMember: recov,
},
&failureLeader{
failureByFunc{
description: description(fmt.Sprintf("failpoint %s (leader: %s)", fp, fcmd)),
injectMember: inject,
recoverMember: recov,
},
0,
},
}...)
}
return fs
}
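// For a single failpoint and a single command, the function above yields
// four Failure variants. For example (hypothetical failpoint name):
//
//	fs := failuresFromFailpoint("raftBeforeSave", []string{`panic("etcd-tester")`})
//	// fs holds the one-member, all-member, majority, and leader variants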
func makeInjectFailpoint(fp, val string) injectMemberFunc {
return func(clus *Cluster, idx int) (err error) {
return putFailpoint(clus.Members[idx].FailpointHTTPAddr, fp, val)
}
}
func makeRecoverFailpoint(fp string) recoverMemberFunc {
return func(clus *Cluster, idx int) error {
if err := delFailpoint(clus.Members[idx].FailpointHTTPAddr, fp); err == nil {
return nil
}
// node not responding, likely dead from fp panic; restart
fpStats.mu.Lock()
fpStats.crashes[fp]++
fpStats.mu.Unlock()
return recoverKill(clus, idx)
}
}
func putFailpoint(ep, fp, val string) error {
req, _ := http.NewRequest(http.MethodPut, ep+"/"+fp, strings.NewReader(val))
c := http.Client{}
resp, err := c.Do(req)
if err != nil {
return err
}
resp.Body.Close()
if resp.StatusCode/100 != 2 {
return fmt.Errorf("failed to PUT %s=%s at %s (%v)", fp, val, ep, resp.Status)
}
return nil
}
func delFailpoint(ep, fp string) error {
req, _ := http.NewRequest(http.MethodDelete, ep+"/"+fp, strings.NewReader(""))
c := http.Client{}
resp, err := c.Do(req)
if err != nil {
return err
}
resp.Body.Close()
if resp.StatusCode/100 != 2 {
return fmt.Errorf("failed to DELETE %s at %s (%v)", fp, ep, resp.Status)
}
return nil
}
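// Usage sketch for the helpers above (the failpoint name is hypothetical;
// the agent address matches the sample config):
//
//	if err := putFailpoint("http://127.0.0.1:7381", "raftBeforeSave", `panic("etcd-tester")`); err != nil {
//		// the failpoint could not be armed
//	}
//	_ = delFailpoint("http://127.0.0.1:7381", "raftBeforeSave")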

@ -0,0 +1,210 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import (
"fmt"
"math/rand"
"time"
"github.com/coreos/etcd/tools/functional-tester/rpcpb"
)
const snapshotCount = 10000
func injectKill(clus *Cluster, idx int) error {
return clus.sendOperation(idx, rpcpb.Operation_KillEtcd)
}
func recoverKill(clus *Cluster, idx int) error {
return clus.sendOperation(idx, rpcpb.Operation_RestartEtcd)
}
func newFailureKillAll() Failure {
return &failureAll{
description: "kill all members",
injectMember: injectKill,
recoverMember: recoverKill,
}
}
func newFailureKillQuorum() Failure {
return &failureQuorum{
description: "kill quorum of the cluster",
injectMember: injectKill,
recoverMember: recoverKill,
}
}
func newFailureKillOne() Failure {
return &failureOne{
description: "kill one random member",
injectMember: injectKill,
recoverMember: recoverKill,
}
}
func newFailureKillLeader() Failure {
ff := failureByFunc{
description: "kill leader member",
injectMember: injectKill,
recoverMember: recoverKill,
}
return &failureLeader{ff, 0}
}
func newFailureKillOneForLongTime() Failure {
return &failureUntilSnapshot{newFailureKillOne()}
}
func newFailureKillLeaderForLongTime() Failure {
return &failureUntilSnapshot{newFailureKillLeader()}
}
type description string
func (d description) Desc() string { return string(d) }
type injectMemberFunc func(*Cluster, int) error
type recoverMemberFunc func(*Cluster, int) error
type failureByFunc struct {
description
injectMember injectMemberFunc
recoverMember recoverMemberFunc
}
// TODO: support kill follower
type failureOne failureByFunc
type failureAll failureByFunc
type failureQuorum failureByFunc
type failureLeader struct {
failureByFunc
idx int
}
// failureUntilSnapshot injects a failure and waits for a snapshot event
type failureUntilSnapshot struct{ Failure }
func (f *failureOne) Inject(clus *Cluster, round int) error {
return f.injectMember(clus, round%len(clus.Members))
}
func (f *failureOne) Recover(clus *Cluster, round int) error {
if err := f.recoverMember(clus, round%len(clus.Members)); err != nil {
return err
}
return clus.WaitHealth()
}
func (f *failureAll) Inject(clus *Cluster, round int) error {
for i := range clus.Members {
if err := f.injectMember(clus, i); err != nil {
return err
}
}
return nil
}
func (f *failureAll) Recover(clus *Cluster, round int) error {
for i := range clus.Members {
if err := f.recoverMember(clus, i); err != nil {
return err
}
}
return clus.WaitHealth()
}
func (f *failureQuorum) Inject(clus *Cluster, round int) error {
for i := range killMap(len(clus.Members), round) {
if err := f.injectMember(clus, i); err != nil {
return err
}
}
return nil
}
func (f *failureQuorum) Recover(clus *Cluster, round int) error {
for i := range killMap(len(clus.Members), round) {
if err := f.recoverMember(clus, i); err != nil {
return err
}
}
return nil
}
func (f *failureLeader) Inject(clus *Cluster, round int) error {
idx, err := clus.GetLeader()
if err != nil {
return err
}
f.idx = idx
return f.injectMember(clus, idx)
}
func (f *failureLeader) Recover(clus *Cluster, round int) error {
if err := f.recoverMember(clus, f.idx); err != nil {
return err
}
return clus.WaitHealth()
}
func (f *failureUntilSnapshot) Inject(clus *Cluster, round int) error {
if err := f.Failure.Inject(clus, round); err != nil {
return err
}
if len(clus.Members) < 3 {
return nil
}
// maxRev may fail since the failure was just injected; retry if it fails.
startRev, err := clus.maxRev()
for i := 0; i < 10 && startRev == 0; i++ {
startRev, err = clus.maxRev()
}
if startRev == 0 {
return err
}
lastRev := startRev
// A normal healthy cluster can accept at least 1000 req/s.
// Give it three times as long to create a new snapshot
// (snapshotCount/1000*3 = 30 one-second retries here).
retry := snapshotCount / 1000 * 3
for j := 0; j < retry; j++ {
lastRev, _ = clus.maxRev()
// If the number of proposals committed is bigger than snapshot count,
// a new snapshot should have been created.
if lastRev-startRev > snapshotCount {
return nil
}
time.Sleep(time.Second)
}
return fmt.Errorf("cluster too slow: only commit %d requests in %ds", lastRev-startRev, retry)
}
func (f *failureUntilSnapshot) Desc() string {
return f.Failure.Desc() + " for a long time and expect it to recover from an incoming snapshot"
}
func killMap(size int, seed int) map[int]bool {
m := make(map[int]bool)
r := rand.New(rand.NewSource(int64(seed)))
majority := size/2 + 1
for {
m[r.Intn(size)] = true
if len(m) >= majority {
return m
}
}
}
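// For example, with a 3-member cluster killMap picks size/2+1 = 2 distinct
// member indexes; seeding rand with the round number makes the chosen
// quorum deterministic for a given round.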

@ -0,0 +1,26 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
type failureNoOp failureByFunc
func (f *failureNoOp) Inject(clus *Cluster, round int) error { return nil }
func (f *failureNoOp) Recover(clus *Cluster, round int) error { return nil }
func newFailureNoOp() Failure {
return &failureNoOp{
description: "no failure",
}
}

@ -0,0 +1,85 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import (
"fmt"
"time"
"github.com/coreos/etcd/tools/functional-tester/rpcpb"
)
const (
// TODO
slowNetworkLatency = 500 // 500 millisecond
// delay duration to trigger leader election (default election timeout 1s)
triggerElectionDur = 5 * time.Second
// Wait more when it recovers from slow network, because network layer
// needs extra time to propagate traffic control (tc command) change.
// Otherwise, we get different hash values from the previous revision.
// For more detail, please see https://github.com/coreos/etcd/issues/5121.
waitRecover = 5 * time.Second
)
func injectDelayPeerPortTxRx(clus *Cluster, idx int) error {
return clus.sendOperation(idx, rpcpb.Operation_DelayPeerPortTxRx)
}
func recoverDelayPeerPortTxRx(clus *Cluster, idx int) error {
err := clus.sendOperation(idx, rpcpb.Operation_UndelayPeerPortTxRx)
time.Sleep(waitRecover)
return err
}
func newFailureDelayPeerPortTxRxOneMember() Failure {
desc := fmt.Sprintf("delay one member's network by adding %d ms latency", slowNetworkLatency)
f := &failureOne{
description: description(desc),
injectMember: injectDelayPeerPortTxRx,
recoverMember: recoverDelayPeerPortTxRx,
}
return &failureDelay{
Failure: f,
delayDuration: triggerElectionDur,
}
}
func newFailureDelayPeerPortTxRxLeader() Failure {
desc := fmt.Sprintf("delay leader's network by adding %d ms latency", slowNetworkLatency)
ff := failureByFunc{
description: description(desc),
injectMember: injectDelayPeerPortTxRx,
recoverMember: recoverDelayPeerPortTxRx,
}
f := &failureLeader{ff, 0}
return &failureDelay{
Failure: f,
delayDuration: triggerElectionDur,
}
}
func newFailureDelayPeerPortTxRxAll() Failure {
f := &failureAll{
description: "delay all members' network",
injectMember: injectDelayPeerPortTxRx,
recoverMember: recoverDelayPeerPortTxRx,
}
return &failureDelay{
Failure: f,
delayDuration: triggerElectionDur,
}
}

@ -0,0 +1,126 @@
agent-configs:
- etcd-exec-path: ./bin/etcd
agent-addr: 127.0.0.1:19027
failpoint-http-addr: http://127.0.0.1:7381
base-dir: /tmp/etcd-agent-data-1
etcd-log-path: /tmp/etcd-agent-data-1/current-etcd.log
etcd-client-tls: false
etcd-client-proxy: false
etcd-peer-proxy: true
etcd-client-endpoint: 127.0.0.1:1379
etcd-config:
name: s1
data-dir: /tmp/etcd-agent-data-1/etcd.data
wal-dir: /tmp/etcd-agent-data-1/etcd.data/member/wal
listen-client-urls: ["http://127.0.0.1:1379"]
advertise-client-urls: ["http://127.0.0.1:1379"]
listen-peer-urls: ["http://127.0.0.1:1380"]
initial-advertise-peer-urls: ["http://127.0.0.1:13800"]
initial-cluster: s1=http://127.0.0.1:13800,s2=http://127.0.0.1:23800,s3=http://127.0.0.1:33800
initial-cluster-state: new
initial-cluster-token: tkn
snapshot-count: 10000
quota-backend-bytes: 10740000000 # 10 GiB
pre-vote: true
initial-corrupt-check: true
- etcd-exec-path: ./bin/etcd
agent-addr: 127.0.0.1:29027
failpoint-http-addr: http://127.0.0.1:7382
base-dir: /tmp/etcd-agent-data-2
etcd-log-path: /tmp/etcd-agent-data-2/current-etcd.log
etcd-client-tls: false
etcd-client-proxy: false
etcd-peer-proxy: true
etcd-client-endpoint: 127.0.0.1:2379
etcd-config:
name: s2
data-dir: /tmp/etcd-agent-data-2/etcd.data
wal-dir: /tmp/etcd-agent-data-2/etcd.data/member/wal
listen-client-urls: ["http://127.0.0.1:2379"]
advertise-client-urls: ["http://127.0.0.1:2379"]
listen-peer-urls: ["http://127.0.0.1:2380"]
initial-advertise-peer-urls: ["http://127.0.0.1:23800"]
initial-cluster: s1=http://127.0.0.1:13800,s2=http://127.0.0.1:23800,s3=http://127.0.0.1:33800
initial-cluster-state: new
initial-cluster-token: tkn
snapshot-count: 10000
quota-backend-bytes: 10740000000 # 10 GiB
pre-vote: true
initial-corrupt-check: true
- etcd-exec-path: ./bin/etcd
agent-addr: 127.0.0.1:39027
failpoint-http-addr: http://127.0.0.1:7383
base-dir: /tmp/etcd-agent-data-3
etcd-log-path: /tmp/etcd-agent-data-3/current-etcd.log
etcd-client-tls: false
etcd-client-proxy: false
etcd-peer-proxy: true
etcd-client-endpoint: 127.0.0.1:3379
etcd-config:
name: s3
data-dir: /tmp/etcd-agent-data-3/etcd.data
wal-dir: /tmp/etcd-agent-data-3/etcd.data/member/wal
listen-client-urls: ["http://127.0.0.1:3379"]
advertise-client-urls: ["http://127.0.0.1:3379"]
listen-peer-urls: ["http://127.0.0.1:3380"]
initial-advertise-peer-urls: ["http://127.0.0.1:33800"]
initial-cluster: s1=http://127.0.0.1:13800,s2=http://127.0.0.1:23800,s3=http://127.0.0.1:33800
initial-cluster-state: new
initial-cluster-token: tkn
snapshot-count: 10000
quota-backend-bytes: 10740000000 # 10 GiB
pre-vote: true
initial-corrupt-check: true
tester-config:
tester-network: tcp
tester-addr: 127.0.0.1:9028
delay-latency-ms: 500
delay-latency-ms-rv: 50
round-limit: 1
exit-on-failure: true
consistency-check: true
enable-pprof: true
failure-cases:
- KILL_ONE_FOLLOWER
- KILL_LEADER
- KILL_ONE_FOLLOWER_FOR_LONG
- KILL_LEADER_FOR_LONG
- KILL_QUORUM
- KILL_ALL
- BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER
- BLACKHOLE_PEER_PORT_TX_RX_LEADER_ONE
- BLACKHOLE_PEER_PORT_TX_RX_ALL
- DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER
- DELAY_PEER_PORT_TX_RX_LEADER
- DELAY_PEER_PORT_TX_RX_ALL
# TODO: shuffle
# fail-shuffle: true
failpoint-commands:
- panic("etcd-tester")
# failpoint-commands:
# - panic("etcd-tester"),1*sleep(1000)
runner-exec-path: /etcd-runner
external-exec-path: ""
stress-types:
- KV
- LEASE
# - NO_STRESS
# - ELECTION_RUNNER
# - WATCH_RUNNER
# - LOCK_RACER_RUNNER
# - LEASE_RUNNER
stress-key-size: 100
stress-key-size-large: 32769
stress-key-suffix-range: 250000
stress-key-suffix-range-txn: 100
stress-key-txn-ops: 10
stress-qps: 1000

@ -0,0 +1,62 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import "github.com/prometheus/client_golang/prometheus"
var (
caseTotalCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "funcational_tester",
Name: "case_total",
Help: "Total number of finished test cases",
},
[]string{"desc"},
)
caseFailedTotalCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "funcational_tester",
Name: "case_failed_total",
Help: "Total number of failed test cases",
},
[]string{"desc"},
)
roundTotalCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "funcational_tester",
Name: "round_total",
Help: "Total number of finished test rounds.",
})
roundFailedTotalCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "funcational_tester",
Name: "round_failed_total",
Help: "Total number of failed test rounds.",
})
)
func init() {
prometheus.MustRegister(caseTotalCounter)
prometheus.MustRegister(caseFailedTotalCounter)
prometheus.MustRegister(roundTotalCounter)
prometheus.MustRegister(roundFailedTotalCounter)
}
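// Usage sketch (call sites live elsewhere in the tester, not in this file):
// the counters are presumably bumped once per case and per round, e.g.
//
//	caseTotalCounter.WithLabelValues(f.Desc()).Inc()
//	roundTotalCounter.Inc()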

@ -0,0 +1,202 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import (
"fmt"
"sync"
"time"
"go.uber.org/zap"
)
type Stresser interface {
	// Stress starts stressing the etcd cluster.
	Stress() error
	// Pause stops the stresser from sending requests to etcd. Resume by calling Stress.
	Pause()
	// Close releases all of the Stresser's resources.
	Close()
	// ModifiedKeys reports the number of keys created and deleted by the stresser.
	ModifiedKeys() int64
	// Checker returns an invariant checker for after the stresser is canceled.
	Checker() Checker
}
// nopStresser implements a Stresser that does nothing.
type nopStresser struct {
start time.Time
qps int
}
func (s *nopStresser) Stress() error { return nil }
func (s *nopStresser) Pause() {}
func (s *nopStresser) Close() {}
func (s *nopStresser) ModifiedKeys() int64 {
return 0
}
func (s *nopStresser) Checker() Checker { return nil }
// compositeStresser implements a Stresser that runs a slice of
// stressing clients concurrently.
type compositeStresser struct {
stressers []Stresser
}
func (cs *compositeStresser) Stress() error {
for i, s := range cs.stressers {
if err := s.Stress(); err != nil {
for j := 0; j < i; j++ {
cs.stressers[j].Close()
}
return err
}
}
return nil
}
func (cs *compositeStresser) Pause() {
var wg sync.WaitGroup
wg.Add(len(cs.stressers))
for i := range cs.stressers {
go func(s Stresser) {
defer wg.Done()
s.Pause()
}(cs.stressers[i])
}
wg.Wait()
}
func (cs *compositeStresser) Close() {
var wg sync.WaitGroup
wg.Add(len(cs.stressers))
for i := range cs.stressers {
go func(s Stresser) {
defer wg.Done()
s.Close()
}(cs.stressers[i])
}
wg.Wait()
}
func (cs *compositeStresser) ModifiedKeys() (modifiedKey int64) {
for _, stress := range cs.stressers {
modifiedKey += stress.ModifiedKeys()
}
return modifiedKey
}
func (cs *compositeStresser) Checker() Checker {
var chks []Checker
for _, s := range cs.stressers {
if chk := s.Checker(); chk != nil {
chks = append(chks, chk)
}
}
if len(chks) == 0 {
return nil
}
return newCompositeChecker(chks)
}
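// A hedged usage sketch (not part of the tester flow): compose per-member
// stressers, run them, and fold their invariant checkers into one; "ss" is a
// hypothetical slice of already-constructed stressers.
func exampleCompositeRun(ss []Stresser) error {
	cs := &compositeStresser{stressers: ss}
	if err := cs.Stress(); err != nil {
		return err
	}
	defer cs.Close()
	if chk := cs.Checker(); chk != nil {
		return chk.Check()
	}
	return nil
}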
// newStresser creates a composite stresser from the configured stress types.
func newStresser(clus *Cluster, idx int) Stresser {
stressers := make([]Stresser, len(clus.Tester.StressTypes))
for i, stype := range clus.Tester.StressTypes {
clus.logger.Info("creating stresser", zap.String("type", stype))
switch stype {
case "NO_STRESS":
stressers[i] = &nopStresser{start: time.Now(), qps: int(clus.rateLimiter.Limit())}
case "KV":
			// TODO: Overly aggressive stress clients can make an etcd member
			// panic with an 'out of memory' error. Add server-side rate limits.
stressers[i] = &keyStresser{
logger: clus.logger,
Endpoint: clus.Members[idx].EtcdClientEndpoint,
keySize: int(clus.Tester.StressKeySize),
keyLargeSize: int(clus.Tester.StressKeySizeLarge),
keySuffixRange: int(clus.Tester.StressKeySuffixRange),
keyTxnSuffixRange: int(clus.Tester.StressKeySuffixRangeTxn),
keyTxnOps: int(clus.Tester.StressKeyTxnOps),
N: 100,
rateLimiter: clus.rateLimiter,
}
case "LEASE":
stressers[i] = &leaseStresser{
logger: clus.logger,
endpoint: clus.Members[idx].EtcdClientEndpoint,
numLeases: 10, // TODO: configurable
keysPerLease: 10, // TODO: configurable
rateLimiter: clus.rateLimiter,
}
case "ELECTION_RUNNER":
reqRate := 100
args := []string{
"election",
fmt.Sprintf("%v", time.Now().UnixNano()), // election name as current nano time
"--dial-timeout=10s",
"--endpoints", clus.Members[idx].EtcdClientEndpoint,
"--total-client-connections=10",
"--rounds=0", // runs forever
"--req-rate", fmt.Sprintf("%v", reqRate),
}
stressers[i] = newRunnerStresser(
clus.Tester.RunnerExecPath,
args,
clus.rateLimiter,
reqRate,
)
case "WATCH_RUNNER":
reqRate := 100
args := []string{
"watcher",
"--prefix", fmt.Sprintf("%v", time.Now().UnixNano()), // prefix all keys with nano time
"--total-keys=1",
"--total-prefixes=1",
"--watch-per-prefix=1",
"--endpoints", clus.Members[idx].EtcdClientEndpoint,
"--rounds=0", // runs forever
"--req-rate", fmt.Sprintf("%v", reqRate),
}
stressers[i] = newRunnerStresser(clus.Tester.RunnerExecPath, args, clus.rateLimiter, reqRate)
case "LOCK_RACER_RUNNER":
reqRate := 100
args := []string{
"lock-racer",
fmt.Sprintf("%v", time.Now().UnixNano()), // locker name as current nano time
"--endpoints", clus.Members[idx].EtcdClientEndpoint,
"--total-client-connections=10",
"--rounds=0", // runs forever
"--req-rate", fmt.Sprintf("%v", reqRate),
}
stressers[i] = newRunnerStresser(clus.Tester.RunnerExecPath, args, clus.rateLimiter, reqRate)
case "LEASE_RUNNER":
args := []string{
"lease-renewer",
"--ttl=30",
"--endpoints", clus.Members[idx].EtcdClientEndpoint,
}
stressers[i] = newRunnerStresser(clus.Tester.RunnerExecPath, args, clus.rateLimiter, 0)
}
}
return &compositeStresser{stressers}
}

View File

@ -0,0 +1,345 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"go.uber.org/zap"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/transport"
)
type keyStresser struct {
logger *zap.Logger
// TODO: use Member
Endpoint string
keySize int
keyLargeSize int
keySuffixRange int
keyTxnSuffixRange int
keyTxnOps int
N int
rateLimiter *rate.Limiter
wg sync.WaitGroup
cancel func()
conn *grpc.ClientConn
// atomicModifiedKeys records the number of keys created and deleted by the stresser.
atomicModifiedKeys int64
stressTable *stressTable
}
func (s *keyStresser) Stress() error {
// TODO: add backoff option
conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure())
if err != nil {
return fmt.Errorf("%v (%s)", err, s.Endpoint)
}
ctx, cancel := context.WithCancel(context.Background())
s.wg.Add(s.N)
s.conn = conn
s.cancel = cancel
kvc := pb.NewKVClient(conn)
var stressEntries = []stressEntry{
{weight: 0.7, f: newStressPut(kvc, s.keySuffixRange, s.keySize)},
{
weight: 0.7 * float32(s.keySize) / float32(s.keyLargeSize),
f: newStressPut(kvc, s.keySuffixRange, s.keyLargeSize),
},
{weight: 0.07, f: newStressRange(kvc, s.keySuffixRange)},
{weight: 0.07, f: newStressRangeInterval(kvc, s.keySuffixRange)},
{weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)},
{weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)},
}
if s.keyTxnSuffixRange > 0 {
		// adjust the weights so that writes still make up ~70% of the workload
stressEntries[0].weight = 0.35
stressEntries = append(stressEntries, stressEntry{
weight: 0.35,
f: newStressTxn(kvc, s.keyTxnSuffixRange, s.keyTxnOps),
})
}
s.stressTable = createStressTable(stressEntries)
for i := 0; i < s.N; i++ {
go s.run(ctx)
}
s.logger.Info(
"key stresser started in background",
zap.String("endpoint", s.Endpoint),
)
return nil
}
func (s *keyStresser) run(ctx context.Context) {
defer s.wg.Done()
for {
if err := s.rateLimiter.Wait(ctx); err == context.Canceled {
return
}
		// TODO: a 10-second timeout is enough to cover leader failure and an
		// immediate leader election. Find out in which other cases requests
		// can time out.
sctx, scancel := context.WithTimeout(ctx, 10*time.Second)
err, modifiedKeys := s.stressTable.choose()(sctx)
scancel()
if err == nil {
atomic.AddInt64(&s.atomicModifiedKeys, modifiedKeys)
continue
}
switch rpctypes.ErrorDesc(err) {
case context.DeadlineExceeded.Error():
			// This retries when a request coincides with a leader failure.
			// When we terminate the leader, requests to that leader cannot be
			// processed and time out. Requests to followers cannot be
			// forwarded to the old leader either, so they time out as well.
			// We keep stressing until the cluster elects a new leader and
			// starts processing requests again.
case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
			// This retries when a request coincides with a leader failure and
			// follower nodes return timeout errors after losing their leader.
			// Followers should retry against the new leader.
case etcdserver.ErrStopped.Error():
			// one of the etcd nodes was stopped by failure injection
case transport.ErrConnClosing.Desc:
// server closed the transport (failure injected node)
case rpctypes.ErrNotCapable.Error():
// capability check has not been done (in the beginning)
case rpctypes.ErrTooManyRequests.Error():
// hitting the recovering member.
case context.Canceled.Error():
			// canceled via the stresser's cancel function (Close):
return
case grpc.ErrClientConnClosing.Error():
			// the gRPC connection was closed by the stresser's Close:
return
default:
s.logger.Warn(
"key stresser exited with error",
zap.String("endpoint", s.Endpoint),
zap.Error(err),
)
return
}
}
}
func (s *keyStresser) Pause() {
s.Close()
}
func (s *keyStresser) Close() {
s.cancel()
s.conn.Close()
s.wg.Wait()
s.logger.Info(
"key stresser is closed",
zap.String("endpoint", s.Endpoint),
)
}
func (s *keyStresser) ModifiedKeys() int64 {
return atomic.LoadInt64(&s.atomicModifiedKeys)
}
func (s *keyStresser) Checker() Checker { return nil }
type stressFunc func(ctx context.Context) (err error, modifiedKeys int64)
type stressEntry struct {
weight float32
f stressFunc
}
type stressTable struct {
entries []stressEntry
sumWeights float32
}
func createStressTable(entries []stressEntry) *stressTable {
st := stressTable{entries: entries}
for _, entry := range st.entries {
st.sumWeights += entry.weight
}
return &st
}
func (st *stressTable) choose() stressFunc {
v := rand.Float32() * st.sumWeights
var sum float32
var idx int
for i := range st.entries {
sum += st.entries[i].weight
if sum >= v {
idx = i
break
}
}
return st.entries[idx].f
}
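// A hedged sketch (not used by the stresser) illustrating the weighted pick:
// with weights 0.7 and 0.07, choose() returns the first entry roughly ten
// times as often as the second; both stressFuncs here are stand-ins.
func exampleWeightedPick() stressFunc {
	table := createStressTable([]stressEntry{
		{weight: 0.7, f: func(ctx context.Context) (error, int64) { return nil, 1 }},
		{weight: 0.07, f: func(ctx context.Context) (error, int64) { return nil, 0 }},
	})
	return table.choose()
}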
func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
return func(ctx context.Context) (error, int64) {
_, err := kvc.Put(ctx, &pb.PutRequest{
Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
Value: randBytes(keySize),
}, grpc.FailFast(false))
return err, 1
}
}
func newStressTxn(kvc pb.KVClient, keyTxnSuffixRange, txnOps int) stressFunc {
keys := make([]string, keyTxnSuffixRange)
for i := range keys {
keys[i] = fmt.Sprintf("/k%03d", i)
}
return writeTxn(kvc, keys, txnOps)
}
func writeTxn(kvc pb.KVClient, keys []string, txnOps int) stressFunc {
return func(ctx context.Context) (error, int64) {
ks := make(map[string]struct{}, txnOps)
for len(ks) != txnOps {
ks[keys[rand.Intn(len(keys))]] = struct{}{}
}
selected := make([]string, 0, txnOps)
for k := range ks {
selected = append(selected, k)
}
com, delOp, putOp := getTxnReqs(selected[0], "bar00")
txnReq := &pb.TxnRequest{
Compare: []*pb.Compare{com},
Success: []*pb.RequestOp{delOp},
Failure: []*pb.RequestOp{putOp},
}
// add nested txns if any
for i := 1; i < txnOps; i++ {
k, v := selected[i], fmt.Sprintf("bar%02d", i)
com, delOp, putOp = getTxnReqs(k, v)
nested := &pb.RequestOp{
Request: &pb.RequestOp_RequestTxn{
RequestTxn: &pb.TxnRequest{
Compare: []*pb.Compare{com},
Success: []*pb.RequestOp{delOp},
Failure: []*pb.RequestOp{putOp},
},
},
}
txnReq.Success = append(txnReq.Success, nested)
txnReq.Failure = append(txnReq.Failure, nested)
}
_, err := kvc.Txn(ctx, txnReq, grpc.FailFast(false))
return err, int64(txnOps)
}
}
func getTxnReqs(key, val string) (com *pb.Compare, delOp *pb.RequestOp, putOp *pb.RequestOp) {
// if key exists (version > 0)
com = &pb.Compare{
Key: []byte(key),
Target: pb.Compare_VERSION,
Result: pb.Compare_GREATER,
TargetUnion: &pb.Compare_Version{Version: 0},
}
delOp = &pb.RequestOp{
Request: &pb.RequestOp_RequestDeleteRange{
RequestDeleteRange: &pb.DeleteRangeRequest{
Key: []byte(key),
},
},
}
putOp = &pb.RequestOp{
Request: &pb.RequestOp_RequestPut{
RequestPut: &pb.PutRequest{
Key: []byte(key),
Value: []byte(val),
},
},
}
return com, delOp, putOp
}
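// A hedged sketch (not part of the stresser) of the toggle semantics built by
// getTxnReqs: if the key exists (version > 0) the txn deletes it, otherwise it
// puts the value, so repeated txns flip the key's existence; the key and value
// below are hypothetical.
func exampleToggleTxn(ctx context.Context, kvc pb.KVClient) error {
	com, delOp, putOp := getTxnReqs("/k000", "bar00")
	_, err := kvc.Txn(ctx, &pb.TxnRequest{
		Compare: []*pb.Compare{com},
		Success: []*pb.RequestOp{delOp},
		Failure: []*pb.RequestOp{putOp},
	}, grpc.FailFast(false))
	return err
}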
func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
return func(ctx context.Context) (error, int64) {
_, err := kvc.Range(ctx, &pb.RangeRequest{
Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
}, grpc.FailFast(false))
return err, 0
}
}
func newStressRangeInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
return func(ctx context.Context) (error, int64) {
start := rand.Intn(keySuffixRange)
end := start + 500
_, err := kvc.Range(ctx, &pb.RangeRequest{
Key: []byte(fmt.Sprintf("foo%016x", start)),
RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
}, grpc.FailFast(false))
return err, 0
}
}
func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc {
return func(ctx context.Context) (error, int64) {
_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
}, grpc.FailFast(false))
return err, 1
}
}
func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
return func(ctx context.Context) (error, int64) {
start := rand.Intn(keySuffixRange)
end := start + 500
resp, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
Key: []byte(fmt.Sprintf("foo%016x", start)),
RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
}, grpc.FailFast(false))
if err == nil {
return nil, resp.Deleted
}
return err, 0
}
}

View File

@ -0,0 +1,485 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"go.uber.org/zap"
"golang.org/x/time/rate"
"google.golang.org/grpc"
)
const (
// time to live for lease
TTL = 120
TTLShort = 2
)
type leaseStresser struct {
logger *zap.Logger
endpoint string
cancel func()
conn *grpc.ClientConn
kvc pb.KVClient
lc pb.LeaseClient
ctx context.Context
rateLimiter *rate.Limiter
// atomicModifiedKey records the number of keys created and deleted during a test case
atomicModifiedKey int64
numLeases int
keysPerLease int
aliveLeases *atomicLeases
revokedLeases *atomicLeases
shortLivedLeases *atomicLeases
runWg sync.WaitGroup
aliveWg sync.WaitGroup
}
type atomicLeases struct {
	// rwLock protects read/write access to the leases map,
	// which is accessed and modified by multiple goroutines.
rwLock sync.RWMutex
leases map[int64]time.Time
}
func (al *atomicLeases) add(leaseID int64, t time.Time) {
al.rwLock.Lock()
al.leases[leaseID] = t
al.rwLock.Unlock()
}
func (al *atomicLeases) update(leaseID int64, t time.Time) {
al.rwLock.Lock()
_, ok := al.leases[leaseID]
if ok {
al.leases[leaseID] = t
}
al.rwLock.Unlock()
}
func (al *atomicLeases) read(leaseID int64) (rv time.Time, ok bool) {
al.rwLock.RLock()
rv, ok = al.leases[leaseID]
al.rwLock.RUnlock()
return rv, ok
}
func (al *atomicLeases) remove(leaseID int64) {
al.rwLock.Lock()
delete(al.leases, leaseID)
al.rwLock.Unlock()
}
func (al *atomicLeases) getLeasesMap() map[int64]time.Time {
leasesCopy := make(map[int64]time.Time)
al.rwLock.RLock()
for k, v := range al.leases {
leasesCopy[k] = v
}
al.rwLock.RUnlock()
return leasesCopy
}
func (ls *leaseStresser) setupOnce() error {
if ls.aliveLeases != nil {
return nil
}
if ls.numLeases == 0 {
panic("expect numLeases to be set")
}
if ls.keysPerLease == 0 {
panic("expect keysPerLease to be set")
}
ls.aliveLeases = &atomicLeases{leases: make(map[int64]time.Time)}
return nil
}
func (ls *leaseStresser) Stress() error {
ls.logger.Info(
"lease stresser is started",
zap.String("endpoint", ls.endpoint),
)
if err := ls.setupOnce(); err != nil {
return err
}
conn, err := grpc.Dial(ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1*time.Second))
if err != nil {
return fmt.Errorf("%v (%s)", err, ls.endpoint)
}
ls.conn = conn
ls.kvc = pb.NewKVClient(conn)
ls.lc = pb.NewLeaseClient(conn)
ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
ls.shortLivedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
ctx, cancel := context.WithCancel(context.Background())
ls.cancel = cancel
ls.ctx = ctx
ls.runWg.Add(1)
go ls.run()
return nil
}
func (ls *leaseStresser) run() {
defer ls.runWg.Done()
ls.restartKeepAlives()
for {
		// each iteration creates and then deletes keys, so the number of key
		// operations is roughly 2x the number of created keys.
		// the rateLimiter therefore consumes 2*ls.numLeases*ls.keysPerLease
		// tokens, where each token represents one create/delete operation on a key.
err := ls.rateLimiter.WaitN(ls.ctx, 2*ls.numLeases*ls.keysPerLease)
if err == context.Canceled {
return
}
ls.logger.Debug(
"lease stresser is creating leases",
zap.String("endpoint", ls.endpoint),
)
ls.createLeases()
ls.logger.Debug(
"lease stresser created leases",
zap.String("endpoint", ls.endpoint),
)
ls.logger.Debug(
"lease stresser is dropped leases",
zap.String("endpoint", ls.endpoint),
)
ls.randomlyDropLeases()
ls.logger.Debug(
"lease stresser dropped leases",
zap.String("endpoint", ls.endpoint),
)
}
}
func (ls *leaseStresser) restartKeepAlives() {
for leaseID := range ls.aliveLeases.getLeasesMap() {
ls.aliveWg.Add(1)
go func(id int64) {
ls.keepLeaseAlive(id)
}(leaseID)
}
}
func (ls *leaseStresser) createLeases() {
ls.createAliveLeases()
ls.createShortLivedLeases()
}
func (ls *leaseStresser) createAliveLeases() {
neededLeases := ls.numLeases - len(ls.aliveLeases.getLeasesMap())
var wg sync.WaitGroup
for i := 0; i < neededLeases; i++ {
wg.Add(1)
go func() {
defer wg.Done()
leaseID, err := ls.createLeaseWithKeys(TTL)
if err != nil {
ls.logger.Debug(
"createLeaseWithKeys failed",
zap.String("endpoint", ls.endpoint),
zap.Error(err),
)
return
}
ls.aliveLeases.add(leaseID, time.Now())
// keep track of all the keep lease alive go routines
ls.aliveWg.Add(1)
go ls.keepLeaseAlive(leaseID)
}()
}
wg.Wait()
}
func (ls *leaseStresser) createShortLivedLeases() {
	// one round of createLeases() might not create all the short-lived leases
	// we want due to failures.
	// thus, the remaining short-lived leases are created in future rounds.
neededLeases := ls.numLeases - len(ls.shortLivedLeases.getLeasesMap())
var wg sync.WaitGroup
for i := 0; i < neededLeases; i++ {
wg.Add(1)
go func() {
defer wg.Done()
leaseID, err := ls.createLeaseWithKeys(TTLShort)
if err != nil {
return
}
ls.shortLivedLeases.add(leaseID, time.Now())
}()
}
wg.Wait()
}
func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) {
leaseID, err := ls.createLease(ttl)
if err != nil {
ls.logger.Debug(
"createLease failed",
zap.String("endpoint", ls.endpoint),
zap.Error(err),
)
return -1, err
}
ls.logger.Debug(
"createLease created lease",
zap.String("endpoint", ls.endpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
)
if err := ls.attachKeysWithLease(leaseID); err != nil {
return -1, err
}
return leaseID, nil
}
func (ls *leaseStresser) randomlyDropLeases() {
var wg sync.WaitGroup
for l := range ls.aliveLeases.getLeasesMap() {
wg.Add(1)
go func(leaseID int64) {
defer wg.Done()
dropped, err := ls.randomlyDropLease(leaseID)
			// if randomlyDropLease encountered an error, such as a canceled
			// context, remove the lease from aliveLeases because we can't
			// tell whether the lease was dropped or not.
if err != nil {
ls.logger.Debug(
"randomlyDropLease failed",
zap.String("endpoint", ls.endpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err),
)
ls.aliveLeases.remove(leaseID)
return
}
if !dropped {
return
}
ls.logger.Debug(
"randomlyDropLease dropped a lease",
zap.String("endpoint", ls.endpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
)
ls.revokedLeases.add(leaseID, time.Now())
ls.aliveLeases.remove(leaseID)
}(l)
}
wg.Wait()
}
func (ls *leaseStresser) createLease(ttl int64) (int64, error) {
resp, err := ls.lc.LeaseGrant(ls.ctx, &pb.LeaseGrantRequest{TTL: ttl})
if err != nil {
return -1, err
}
return resp.ID, nil
}
func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
defer ls.aliveWg.Done()
ctx, cancel := context.WithCancel(ls.ctx)
stream, err := ls.lc.LeaseKeepAlive(ctx)
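	// the deferred closure (rather than a plain "defer cancel()") is
	// deliberate: cancel is reassigned when the stream is recreated below,
	// and the closure releases whichever context is current on exit.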
defer func() { cancel() }()
for {
select {
case <-time.After(500 * time.Millisecond):
case <-ls.ctx.Done():
ls.logger.Debug(
"keepLeaseAlive context canceled",
zap.String("endpoint", ls.endpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(ls.ctx.Err()),
)
			// it is possible that a lease expires during the invariant-checking phase but not during the keepLeaseAlive() phase.
			// this scenario arises when an alive lease is just about to expire as keepLeaseAlive() exits, and then expires while invariants are checked.
			// to circumvent it, we check each lease before the keepalive loop exits to see whether it was renewed within the last TTL/2.
			// if it was, invariant checking has at least TTL/2 before the lease expires, which is long enough for the checking to finish.
			// if it was not, we remove the lease from the alive map so that it cannot expire during invariant checking.
renewTime, ok := ls.aliveLeases.read(leaseID)
if ok && renewTime.Add(TTL/2*time.Second).Before(time.Now()) {
ls.aliveLeases.remove(leaseID)
ls.logger.Debug(
"keepLeaseAlive lease has not been renewed, dropped it",
zap.String("endpoint", ls.endpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
)
}
return
}
if err != nil {
ls.logger.Debug(
"keepLeaseAlive lease creates stream error",
zap.String("endpoint", ls.endpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err),
)
cancel()
ctx, cancel = context.WithCancel(ls.ctx)
stream, err = ls.lc.LeaseKeepAlive(ctx)
			// do not cancel here: the fresh context must stay live for the
			// recreated stream; the deferred cancel releases it on exit.
continue
}
ls.logger.Debug(
"keepLeaseAlive stream sends lease keepalive request",
zap.String("endpoint", ls.endpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
)
err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID})
if err != nil {
ls.logger.Debug(
"keepLeaseAlive stream failed to send lease keepalive request",
zap.String("endpoint", ls.endpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err),
)
continue
}
leaseRenewTime := time.Now()
ls.logger.Debug(
"keepLeaseAlive stream sent lease keepalive request",
zap.String("endpoint", ls.endpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
)
respRC, err := stream.Recv()
if err != nil {
ls.logger.Debug(
"keepLeaseAlive stream failed to receive lease keepalive response",
zap.String("endpoint", ls.endpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err),
)
continue
}
		// the lease has expired once its TTL reaches 0;
		// don't send keepalives for an expired lease
if respRC.TTL <= 0 {
ls.logger.Debug(
"keepLeaseAlive stream received lease keepalive response TTL <= 0",
zap.String("endpoint", ls.endpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Int64("ttl", respRC.TTL),
)
ls.aliveLeases.remove(leaseID)
return
}
// renew lease timestamp only if lease is present
ls.logger.Debug(
"keepLeaseAlive renewed a lease",
zap.String("endpoint", ls.endpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
)
ls.aliveLeases.update(leaseID, leaseRenewTime)
}
}
// attachKeysWithLease attaches keys to the given lease.
// each key is named '<leaseID>_<order of key creation>',
// e.g. 5186835655248304152_0 for the first created key and 5186835655248304152_1 for the second
func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error {
var txnPuts []*pb.RequestOp
for j := 0; j < ls.keysPerLease; j++ {
txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(fmt.Sprintf("%d%s%d", leaseID, "_", j)),
Value: []byte(fmt.Sprintf("bar")), Lease: leaseID}}}
txnPuts = append(txnPuts, txnput)
}
	// keep retrying until the lease is not found or the context is canceled
for ls.ctx.Err() == nil {
txn := &pb.TxnRequest{Success: txnPuts}
_, err := ls.kvc.Txn(ls.ctx, txn)
if err == nil {
// since all created keys will be deleted too, the number of operations on keys will be roughly 2x the number of created keys
atomic.AddInt64(&ls.atomicModifiedKey, 2*int64(ls.keysPerLease))
return nil
}
if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
return err
}
}
return ls.ctx.Err()
}
// randomlyDropLease drops the lease with a 50% chance,
// i.e. only when rand.Intn(2) returns 0.
func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
if rand.Intn(2) != 0 {
return false, nil
}
	// keep retrying until the lease is dropped or the context is canceled
for ls.ctx.Err() == nil {
_, err := ls.lc.LeaseRevoke(ls.ctx, &pb.LeaseRevokeRequest{ID: leaseID})
if err == nil || rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
return true, nil
}
}
ls.logger.Debug(
"randomlyDropLease error",
zap.String("endpoint", ls.endpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(ls.ctx.Err()),
)
return false, ls.ctx.Err()
}
func (ls *leaseStresser) Pause() {
ls.Close()
}
func (ls *leaseStresser) Close() {
ls.logger.Info(
"lease stresser is closing",
zap.String("endpoint", ls.endpoint),
)
ls.cancel()
ls.runWg.Wait()
ls.aliveWg.Wait()
ls.conn.Close()
ls.logger.Info(
"lease stresser is closed",
zap.String("endpoint", ls.endpoint),
)
}
func (ls *leaseStresser) ModifiedKeys() int64 {
return atomic.LoadInt64(&ls.atomicModifiedKey)
}
func (ls *leaseStresser) Checker() Checker {
return &leaseChecker{
logger: ls.logger,
endpoint: ls.endpoint,
ls: ls,
}
}

View File

@ -0,0 +1,97 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import (
"fmt"
"io/ioutil"
"os/exec"
"syscall"
"golang.org/x/time/rate"
)
type runnerStresser struct {
cmd *exec.Cmd
cmdStr string
args []string
rl *rate.Limiter
reqRate int
errc chan error
donec chan struct{}
}
func newRunnerStresser(cmdStr string, args []string, rl *rate.Limiter, reqRate int) *runnerStresser {
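	// reserve this runner's request rate out of the shared limiter so the
	// other stressers slow down accordingly; Close restores it.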
rl.SetLimit(rl.Limit() - rate.Limit(reqRate))
return &runnerStresser{
cmdStr: cmdStr,
args: args,
rl: rl,
reqRate: reqRate,
errc: make(chan error, 1),
donec: make(chan struct{}),
}
}
func (rs *runnerStresser) setupOnce() (err error) {
if rs.cmd != nil {
return nil
}
rs.cmd = exec.Command(rs.cmdStr, rs.args...)
stderr, err := rs.cmd.StderrPipe()
if err != nil {
return err
}
go func() {
defer close(rs.donec)
out, err := ioutil.ReadAll(stderr)
if err != nil {
rs.errc <- err
} else {
rs.errc <- fmt.Errorf("(%v %v) stderr %v", rs.cmdStr, rs.args, string(out))
}
}()
return rs.cmd.Start()
}
func (rs *runnerStresser) Stress() (err error) {
if err = rs.setupOnce(); err != nil {
return err
}
return syscall.Kill(rs.cmd.Process.Pid, syscall.SIGCONT)
}
func (rs *runnerStresser) Pause() {
syscall.Kill(rs.cmd.Process.Pid, syscall.SIGSTOP)
}
func (rs *runnerStresser) Close() {
syscall.Kill(rs.cmd.Process.Pid, syscall.SIGINT)
rs.cmd.Wait()
<-rs.donec
rs.rl.SetLimit(rs.rl.Limit() + rate.Limit(rs.reqRate))
}
func (rs *runnerStresser) ModifiedKeys() int64 {
return 1
}
func (rs *runnerStresser) Checker() Checker {
return &runnerChecker{rs.errc}
}
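// A hedged lifecycle sketch (not part of the tester): the runner is paused and
// resumed with job-control signals, so Pause suspends the external process
// without losing its state; the command path and args are hypothetical.
func exampleRunnerLifecycle(rl *rate.Limiter) error {
	rs := newRunnerStresser("/etcd-runner", []string{"election", "x"}, rl, 100)
	if err := rs.Stress(); err != nil { // first call starts the process
		return err
	}
	defer rs.Close()   // SIGINT, wait, and restore the shared rate limit
	rs.Pause()         // SIGSTOP: suspend in place
	return rs.Stress() // SIGCONT: resume
}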

View File

@ -0,0 +1,274 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import (
"fmt"
"os"
"time"
"go.uber.org/zap"
)
// compactQPS is the rough number of entries etcd can compact per second.
// Previous tests showed etcd can compact about 60,000 entries per second.
const compactQPS = 50000
// StartTester starts tester.
func (clus *Cluster) StartTester() {
// TODO: update status
clus.startStresser()
var preModifiedKey int64
for round := 0; round < int(clus.Tester.RoundLimit) || clus.Tester.RoundLimit == -1; round++ {
roundTotalCounter.Inc()
clus.rd = round
if err := clus.doRound(round); err != nil {
clus.logger.Warn(
"doRound failed; returning",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.Error(err),
)
if clus.cleanup() != nil {
return
}
			// reset preModifiedKey after cleanup
preModifiedKey = 0
continue
}
		// reset to -1 so that logs don't report a stale 'case'
clus.cs = -1
revToCompact := max(0, clus.currentRevision-10000)
currentModifiedKey := clus.stresser.ModifiedKeys()
modifiedKey := currentModifiedKey - preModifiedKey
preModifiedKey = currentModifiedKey
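		// base 10-second timeout, plus 1 second per compactQPS (50,000)
		// keys modified since the last compaction; e.g. 1,000,000 modified
		// keys adds 20 seconds.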
timeout := 10 * time.Second
timeout += time.Duration(modifiedKey/compactQPS) * time.Second
clus.logger.Info(
"compacting",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.Duration("timeout", timeout),
)
if err := clus.compact(revToCompact, timeout); err != nil {
clus.logger.Warn(
"compact failed",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.Error(err),
)
if err = clus.cleanup(); err != nil {
clus.logger.Warn(
"cleanup failed",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.Error(err),
)
return
}
			// reset preModifiedKey after cleanup
preModifiedKey = 0
}
if round > 0 && round%500 == 0 { // every 500 rounds
if err := clus.defrag(); err != nil {
clus.logger.Warn(
"defrag failed; returning",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.Error(err),
)
clus.failed()
return
}
}
}
clus.logger.Info(
"functional-tester is finished",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
}
func (clus *Cluster) doRound(round int) error {
for i, f := range clus.failures {
clus.cs = i
caseTotalCounter.WithLabelValues(f.Desc()).Inc()
if err := clus.WaitHealth(); err != nil {
return fmt.Errorf("wait full health error: %v", err)
}
clus.logger.Info(
"injecting failure",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.String("desc", f.Desc()),
)
if err := f.Inject(clus, round); err != nil {
return fmt.Errorf("injection error: %v", err)
}
clus.logger.Info(
"injected failure",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.String("desc", f.Desc()),
)
clus.logger.Info(
"recovering failure",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.String("desc", f.Desc()),
)
if err := f.Recover(clus, round); err != nil {
return fmt.Errorf("recovery error: %v", err)
}
clus.logger.Info(
"recovered failure",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.String("desc", f.Desc()),
)
clus.pauseStresser()
if err := clus.WaitHealth(); err != nil {
return fmt.Errorf("wait full health error: %v", err)
}
if err := clus.checkConsistency(); err != nil {
return fmt.Errorf("tt.checkConsistency error (%v)", err)
}
clus.logger.Info(
"success",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.String("desc", f.Desc()),
)
}
return nil
}
func (clus *Cluster) updateRevision() error {
revs, _, err := clus.getRevisionHash()
for _, rev := range revs {
clus.currentRevision = rev
		break // just need one of the current revisions
}
clus.logger.Info(
"updated current revision",
zap.Int64("current-revision", clus.currentRevision),
)
return err
}
func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) {
clus.pauseStresser()
defer func() {
if err == nil {
err = clus.startStresser()
}
}()
clus.logger.Info(
"compacting storage",
zap.Int64("current-revision", clus.currentRevision),
zap.Int64("compact-revision", rev),
)
if err = clus.compactKV(rev, timeout); err != nil {
return err
}
clus.logger.Info(
"compacted storage",
zap.Int64("current-revision", clus.currentRevision),
zap.Int64("compact-revision", rev),
)
clus.logger.Info(
"checking compaction",
zap.Int64("current-revision", clus.currentRevision),
zap.Int64("compact-revision", rev),
)
if err = clus.checkCompact(rev); err != nil {
clus.logger.Warn(
"checkCompact failed",
zap.Int64("current-revision", clus.currentRevision),
zap.Int64("compact-revision", rev),
zap.Error(err),
)
return err
}
clus.logger.Info(
"confirmed compaction",
zap.Int64("current-revision", clus.currentRevision),
zap.Int64("compact-revision", rev),
)
return nil
}
func (clus *Cluster) failed() {
if !clus.Tester.ExitOnFailure {
return
}
clus.logger.Info(
"exiting on failure",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
clus.DestroyEtcdAgents()
os.Exit(2)
}
func (clus *Cluster) cleanup() error {
defer clus.failed()
roundFailedTotalCounter.Inc()
desc := "compact/defrag"
if clus.cs != -1 {
desc = clus.failures[clus.cs].Desc()
}
caseFailedTotalCounter.WithLabelValues(desc).Inc()
clus.closeStresser()
if err := clus.FailArchive(); err != nil {
clus.logger.Warn(
"Cleanup failed",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.Error(err),
)
return err
}
if err := clus.Restart(); err != nil {
clus.logger.Warn(
"Restart failed",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.Error(err),
)
return err
}
clus.updateStresserChecker()
return nil
}

View File

@ -0,0 +1,79 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import (
"fmt"
"math/rand"
"net"
"net/url"
"strings"
)
func isValidURL(u string) bool {
_, err := url.Parse(u)
return err == nil
}
func getPort(addr string) (port string, err error) {
urlAddr, err := url.Parse(addr)
if err != nil {
return "", err
}
_, port, err = net.SplitHostPort(urlAddr.Host)
if err != nil {
return "", err
}
return port, nil
}
func getSameValue(vals map[string]int64) bool {
var rv int64
for _, v := range vals {
if rv == 0 {
rv = v
}
if rv != v {
return false
}
}
return true
}
func max(n1, n2 int64) int64 {
if n1 > n2 {
return n1
}
return n2
}
func errsToError(errs []error) error {
if len(errs) == 0 {
return nil
}
stringArr := make([]string, len(errs))
for i, err := range errs {
stringArr[i] = err.Error()
}
return fmt.Errorf(strings.Join(stringArr, ", "))
}
func randBytes(size int) []byte {
data := make([]byte, size)
for i := 0; i < size; i++ {
data[i] = byte(int('a') + rand.Intn(26))
}
return data
}