From 2bc666292efdc004595c5b22db619157bc6950fc Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Tue, 27 Mar 2018 18:27:28 -0700 Subject: [PATCH] functional-tester/tester: initial commit Signed-off-by: Gyuho Lee --- tools/functional-tester/tester/checks.go | 319 ++++++++ tools/functional-tester/tester/cluster.go | 724 ++++++++++++++++++ .../functional-tester/tester/cluster_test.go | 162 ++++ tools/functional-tester/tester/doc.go | 16 + tools/functional-tester/tester/failure.go | 30 + .../tester/failure_case_blackhole.go | 49 ++ .../tester/failure_case_delay.go | 41 + .../tester/failure_case_external.go | 44 ++ .../tester/failure_case_failpoints.go | 159 ++++ .../tester/failure_case_kill.go | 210 +++++ .../tester/failure_case_no_op.go | 26 + .../tester/failure_case_slow_network.go | 85 ++ .../functional-tester/tester/local-test.yaml | 126 +++ tools/functional-tester/tester/metrics.go | 62 ++ tools/functional-tester/tester/stress.go | 202 +++++ tools/functional-tester/tester/stress_key.go | 345 +++++++++ .../functional-tester/tester/stress_lease.go | 485 ++++++++++++ .../functional-tester/tester/stress_runner.go | 97 +++ tools/functional-tester/tester/tester.go | 274 +++++++ tools/functional-tester/tester/utils.go | 79 ++ 20 files changed, 3535 insertions(+) create mode 100644 tools/functional-tester/tester/checks.go create mode 100644 tools/functional-tester/tester/cluster.go create mode 100644 tools/functional-tester/tester/cluster_test.go create mode 100644 tools/functional-tester/tester/doc.go create mode 100644 tools/functional-tester/tester/failure.go create mode 100644 tools/functional-tester/tester/failure_case_blackhole.go create mode 100644 tools/functional-tester/tester/failure_case_delay.go create mode 100644 tools/functional-tester/tester/failure_case_external.go create mode 100644 tools/functional-tester/tester/failure_case_failpoints.go create mode 100644 tools/functional-tester/tester/failure_case_kill.go create mode 100644 tools/functional-tester/tester/failure_case_no_op.go create mode 100644 tools/functional-tester/tester/failure_case_slow_network.go create mode 100644 tools/functional-tester/tester/local-test.yaml create mode 100644 tools/functional-tester/tester/metrics.go create mode 100644 tools/functional-tester/tester/stress.go create mode 100644 tools/functional-tester/tester/stress_key.go create mode 100644 tools/functional-tester/tester/stress_lease.go create mode 100644 tools/functional-tester/tester/stress_runner.go create mode 100644 tools/functional-tester/tester/tester.go create mode 100644 tools/functional-tester/tester/utils.go diff --git a/tools/functional-tester/tester/checks.go b/tools/functional-tester/tester/checks.go new file mode 100644 index 000000000..23e76398b --- /dev/null +++ b/tools/functional-tester/tester/checks.go @@ -0,0 +1,319 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package tester + +import ( + "context" + "fmt" + "time" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + + "go.uber.org/zap" + "google.golang.org/grpc" +) + +const retries = 7 + +type Checker interface { + // Check returns an error if the system fails a consistency check. + Check() error +} + +type hashAndRevGetter interface { + getRevisionHash() (revs map[string]int64, hashes map[string]int64, err error) +} + +type hashChecker struct { + logger *zap.Logger + hrg hashAndRevGetter +} + +func newHashChecker(logger *zap.Logger, hrg hashAndRevGetter) Checker { + return &hashChecker{ + logger: logger, + hrg: hrg, + } +} + +const leaseCheckerTimeout = 10 * time.Second + +func (hc *hashChecker) checkRevAndHashes() (err error) { + var ( + revs map[string]int64 + hashes map[string]int64 + ) + + // retries in case of transient failure or etcd cluster has not stablized yet. + for i := 0; i < retries; i++ { + revs, hashes, err = hc.hrg.getRevisionHash() + if err != nil { + hc.logger.Warn( + "failed to get revision and hash", + zap.Int("retries", i), + zap.Error(err), + ) + } else { + sameRev := getSameValue(revs) + sameHashes := getSameValue(hashes) + if sameRev && sameHashes { + return nil + } + hc.logger.Warn( + "retrying; etcd cluster is not stable", + zap.Int("retries", i), + zap.Bool("same-revisions", sameRev), + zap.Bool("same-hashes", sameHashes), + zap.String("revisions", fmt.Sprintf("%+v", revs)), + zap.String("hashes", fmt.Sprintf("%+v", hashes)), + ) + } + time.Sleep(time.Second) + } + + if err != nil { + return fmt.Errorf("failed revision and hash check (%v)", err) + } + + return fmt.Errorf("etcd cluster is not stable: [revisions: %v] and [hashes: %v]", revs, hashes) +} + +func (hc *hashChecker) Check() error { + return hc.checkRevAndHashes() +} + +type leaseChecker struct { + logger *zap.Logger + + // TODO: use Member + endpoint string + + ls *leaseStresser + leaseClient pb.LeaseClient + kvc pb.KVClient +} + +func (lc *leaseChecker) Check() error { + conn, err := grpc.Dial(lc.ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1)) + if err != nil { + return fmt.Errorf("%v (%s)", err, lc.ls.endpoint) + } + defer func() { + if conn != nil { + conn.Close() + } + }() + lc.kvc = pb.NewKVClient(conn) + lc.leaseClient = pb.NewLeaseClient(conn) + if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil { + return err + } + if err := lc.check(false, lc.ls.aliveLeases.leases); err != nil { + return err + } + return lc.checkShortLivedLeases() +} + +// checkShortLivedLeases ensures leases expire. +func (lc *leaseChecker) checkShortLivedLeases() error { + ctx, cancel := context.WithTimeout(context.Background(), leaseCheckerTimeout) + errc := make(chan error) + defer cancel() + for leaseID := range lc.ls.shortLivedLeases.leases { + go func(id int64) { + errc <- lc.checkShortLivedLease(ctx, id) + }(leaseID) + } + + var errs []error + for range lc.ls.shortLivedLeases.leases { + if err := <-errc; err != nil { + errs = append(errs, err) + } + } + return errsToError(errs) +} + +func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64) (err error) { + // retry in case of transient failure or lease is expired but not yet revoked due to the fact that etcd cluster didn't have enought time to delete it. 
+ var resp *pb.LeaseTimeToLiveResponse + for i := 0; i < retries; i++ { + resp, err = lc.getLeaseByID(ctx, leaseID) + // lease not found, for ~v3.1 compatibilities, check ErrLeaseNotFound + if (err == nil && resp.TTL == -1) || (err != nil && rpctypes.Error(err) == rpctypes.ErrLeaseNotFound) { + return nil + } + if err != nil { + lc.logger.Debug( + "retrying; Lease TimeToLive failed", + zap.Int("retries", i), + zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), + zap.Error(err), + ) + continue + } + if resp.TTL > 0 { + dur := time.Duration(resp.TTL) * time.Second + lc.logger.Debug( + "lease has not been expired, wait until expire", + zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), + zap.Int64("ttl", resp.TTL), + zap.Duration("wait-duration", dur), + ) + time.Sleep(dur) + } else { + lc.logger.Debug( + "lease expired but not yet revoked", + zap.Int("retries", i), + zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), + zap.Int64("ttl", resp.TTL), + zap.Duration("wait-duration", time.Second), + ) + time.Sleep(time.Second) + } + if err = lc.checkLease(ctx, false, leaseID); err != nil { + continue + } + return nil + } + return err +} + +func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID int64) error { + keysExpired, err := lc.hasKeysAttachedToLeaseExpired(ctx, leaseID) + if err != nil { + lc.logger.Warn( + "hasKeysAttachedToLeaseExpired failed", + zap.String("endpoint", lc.endpoint), + zap.Error(err), + ) + return err + } + leaseExpired, err := lc.hasLeaseExpired(ctx, leaseID) + if err != nil { + lc.logger.Warn( + "hasLeaseExpired failed", + zap.String("endpoint", lc.endpoint), + zap.Error(err), + ) + return err + } + if leaseExpired != keysExpired { + return fmt.Errorf("lease %v expiration mismatch (lease expired=%v, keys expired=%v)", leaseID, leaseExpired, keysExpired) + } + if leaseExpired != expired { + return fmt.Errorf("lease %v expected expired=%v, got %v", leaseID, expired, leaseExpired) + } + return nil +} + +func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error { + ctx, cancel := context.WithTimeout(context.Background(), leaseCheckerTimeout) + defer cancel() + for leaseID := range leases { + if err := lc.checkLease(ctx, expired, leaseID); err != nil { + return err + } + } + return nil +} + +func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*pb.LeaseTimeToLiveResponse, error) { + ltl := &pb.LeaseTimeToLiveRequest{ID: leaseID, Keys: true} + return lc.leaseClient.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false)) +} + +func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) { + // keep retrying until lease's state is known or ctx is being canceled + for ctx.Err() == nil { + resp, err := lc.getLeaseByID(ctx, leaseID) + if err != nil { + // for ~v3.1 compatibilities + if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound { + return true, nil + } + } else { + return resp.TTL == -1, nil + } + lc.logger.Warn( + "hasLeaseExpired getLeaseByID failed", + zap.String("endpoint", lc.endpoint), + zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), + zap.Error(err), + ) + } + return false, ctx.Err() +} + +// The keys attached to the lease has the format of "_" where idx is the ordering key creation +// Since the format of keys contains about leaseID, finding keys base on "" prefix +// determines whether the attached keys for a given leaseID has been deleted or not +func (lc *leaseChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) { + resp, err 
:= lc.kvc.Range(ctx, &pb.RangeRequest{ + Key: []byte(fmt.Sprintf("%d", leaseID)), + RangeEnd: []byte(clientv3.GetPrefixRangeEnd(fmt.Sprintf("%d", leaseID))), + }, grpc.FailFast(false)) + if err != nil { + lc.logger.Warn( + "hasKeysAttachedToLeaseExpired failed", + zap.String("endpoint", lc.endpoint), + zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), + zap.Error(err), + ) + return false, err + } + return len(resp.Kvs) == 0, nil +} + +// compositeChecker implements a checker that runs a slice of Checkers concurrently. +type compositeChecker struct{ checkers []Checker } + +func newCompositeChecker(checkers []Checker) Checker { + return &compositeChecker{checkers} +} + +func (cchecker *compositeChecker) Check() error { + errc := make(chan error) + for _, c := range cchecker.checkers { + go func(chk Checker) { errc <- chk.Check() }(c) + } + var errs []error + for range cchecker.checkers { + if err := <-errc; err != nil { + errs = append(errs, err) + } + } + return errsToError(errs) +} + +type runnerChecker struct { + errc chan error +} + +func (rc *runnerChecker) Check() error { + select { + case err := <-rc.errc: + return err + default: + return nil + } +} + +type noChecker struct{} + +func newNoChecker() Checker { return &noChecker{} } +func (nc *noChecker) Check() error { return nil } diff --git a/tools/functional-tester/tester/cluster.go b/tools/functional-tester/tester/cluster.go new file mode 100644 index 000000000..89302e8c6 --- /dev/null +++ b/tools/functional-tester/tester/cluster.go @@ -0,0 +1,724 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tester + +import ( + "context" + "errors" + "fmt" + "io/ioutil" + "net/http" + "path/filepath" + "strings" + "time" + + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/debugutil" + "github.com/coreos/etcd/tools/functional-tester/rpcpb" + "golang.org/x/time/rate" + + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.uber.org/zap" + "google.golang.org/grpc" + yaml "gopkg.in/yaml.v2" +) + +// Cluster defines tester cluster. 
+type Cluster struct { + logger *zap.Logger + + agentConns []*grpc.ClientConn + agentClients []rpcpb.TransportClient + agentStreams []rpcpb.Transport_TransportClient + agentRequests []*rpcpb.Request + + testerHTTPServer *http.Server + + Members []*rpcpb.Member `yaml:"agent-configs"` + Tester *rpcpb.Tester `yaml:"tester-config"` + + failures []Failure + + rateLimiter *rate.Limiter + stresser Stresser + checker Checker + + currentRevision int64 + rd int + cs int +} + +func newCluster(logger *zap.Logger, fpath string) (*Cluster, error) { + logger.Info("reading configuration file", zap.String("path", fpath)) + bts, err := ioutil.ReadFile(fpath) + if err != nil { + return nil, err + } + logger.Info("opened configuration file", zap.String("path", fpath)) + + clus := &Cluster{logger: logger} + if err = yaml.Unmarshal(bts, clus); err != nil { + return nil, err + } + + for i := range clus.Members { + if clus.Members[i].BaseDir == "" { + return nil, fmt.Errorf("Members[i].BaseDir cannot be empty (got %q)", clus.Members[i].BaseDir) + } + if clus.Members[i].EtcdLogPath == "" { + return nil, fmt.Errorf("Members[i].EtcdLogPath cannot be empty (got %q)", clus.Members[i].EtcdLogPath) + } + + if clus.Members[i].Etcd.Name == "" { + return nil, fmt.Errorf("'--name' cannot be empty (got %+v)", clus.Members[i]) + } + if clus.Members[i].Etcd.DataDir == "" { + return nil, fmt.Errorf("'--data-dir' cannot be empty (got %+v)", clus.Members[i]) + } + if clus.Members[i].Etcd.SnapshotCount == 0 { + return nil, fmt.Errorf("'--snapshot-count' cannot be 0 (got %+v)", clus.Members[i].Etcd.SnapshotCount) + } + if clus.Members[i].Etcd.DataDir == "" { + return nil, fmt.Errorf("'--data-dir' cannot be empty (got %q)", clus.Members[i].Etcd.DataDir) + } + if clus.Members[i].Etcd.WALDir == "" { + clus.Members[i].Etcd.WALDir = filepath.Join(clus.Members[i].Etcd.DataDir, "member", "wal") + } + + port := "" + listenClientPorts := make([]string, len(clus.Members)) + for i, u := range clus.Members[i].Etcd.ListenClientURLs { + if !isValidURL(u) { + return nil, fmt.Errorf("'--listen-client-urls' has valid URL %q", u) + } + listenClientPorts[i], err = getPort(u) + if err != nil { + return nil, fmt.Errorf("'--listen-client-urls' has no port %q", u) + } + } + for i, u := range clus.Members[i].Etcd.AdvertiseClientURLs { + if !isValidURL(u) { + return nil, fmt.Errorf("'--advertise-client-urls' has valid URL %q", u) + } + port, err = getPort(u) + if err != nil { + return nil, fmt.Errorf("'--advertise-client-urls' has no port %q", u) + } + if clus.Members[i].EtcdClientProxy && listenClientPorts[i] == port { + return nil, fmt.Errorf("clus.Members[%d] requires client port proxy, but advertise port %q conflicts with listener port %q", i, port, listenClientPorts[i]) + } + } + + listenPeerPorts := make([]string, len(clus.Members)) + for i, u := range clus.Members[i].Etcd.ListenPeerURLs { + if !isValidURL(u) { + return nil, fmt.Errorf("'--listen-peer-urls' has valid URL %q", u) + } + listenPeerPorts[i], err = getPort(u) + if err != nil { + return nil, fmt.Errorf("'--listen-peer-urls' has no port %q", u) + } + } + for i, u := range clus.Members[i].Etcd.InitialAdvertisePeerURLs { + if !isValidURL(u) { + return nil, fmt.Errorf("'--initial-advertise-peer-urls' has valid URL %q", u) + } + port, err = getPort(u) + if err != nil { + return nil, fmt.Errorf("'--initial-advertise-peer-urls' has no port %q", u) + } + if clus.Members[i].EtcdPeerProxy && listenPeerPorts[i] == port { + return nil, fmt.Errorf("clus.Members[%d] requires peer port proxy, but 
advertise port %q conflicts with listener port %q", i, port, listenPeerPorts[i]) + } + } + + if !strings.HasPrefix(clus.Members[i].EtcdLogPath, clus.Members[i].BaseDir) { + return nil, fmt.Errorf("EtcdLogPath must be prefixed with BaseDir (got %q)", clus.Members[i].EtcdLogPath) + } + if !strings.HasPrefix(clus.Members[i].Etcd.DataDir, clus.Members[i].BaseDir) { + return nil, fmt.Errorf("Etcd.DataDir must be prefixed with BaseDir (got %q)", clus.Members[i].Etcd.DataDir) + } + + // TODO: support separate WALDir that can be handled via failure-archive + if !strings.HasPrefix(clus.Members[i].Etcd.WALDir, clus.Members[i].BaseDir) { + return nil, fmt.Errorf("Etcd.WALDir must be prefixed with BaseDir (got %q)", clus.Members[i].Etcd.WALDir) + } + + if len(clus.Tester.FailureCases) == 0 { + return nil, errors.New("FailureCases not found") + } + } + + for _, v := range clus.Tester.FailureCases { + if _, ok := rpcpb.FailureCase_value[v]; !ok { + return nil, fmt.Errorf("%q is not defined in 'rpcpb.FailureCase_value'", v) + } + } + + for _, v := range clus.Tester.StressTypes { + if _, ok := rpcpb.StressType_value[v]; !ok { + return nil, fmt.Errorf("StressType is unknown; got %q", v) + } + } + if clus.Tester.StressKeySuffixRangeTxn > 100 { + return nil, fmt.Errorf("StressKeySuffixRangeTxn maximum value is 100, got %v", clus.Tester.StressKeySuffixRangeTxn) + } + if clus.Tester.StressKeyTxnOps > 64 { + return nil, fmt.Errorf("StressKeyTxnOps maximum value is 64, got %v", clus.Tester.StressKeyTxnOps) + } + + return clus, err +} + +// TODO: status handler + +var dialOpts = []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithTimeout(5 * time.Second), + grpc.WithBlock(), +} + +// NewCluster creates a client from a tester configuration. +func NewCluster(logger *zap.Logger, fpath string) (*Cluster, error) { + clus, err := newCluster(logger, fpath) + if err != nil { + return nil, err + } + + clus.agentConns = make([]*grpc.ClientConn, len(clus.Members)) + clus.agentClients = make([]rpcpb.TransportClient, len(clus.Members)) + clus.agentStreams = make([]rpcpb.Transport_TransportClient, len(clus.Members)) + clus.agentRequests = make([]*rpcpb.Request, len(clus.Members)) + clus.failures = make([]Failure, 0) + + for i, ap := range clus.Members { + logger.Info("connecting", zap.String("agent-address", ap.AgentAddr)) + var err error + clus.agentConns[i], err = grpc.Dial(ap.AgentAddr, dialOpts...) 
+ if err != nil { + return nil, err + } + clus.agentClients[i] = rpcpb.NewTransportClient(clus.agentConns[i]) + logger.Info("connected", zap.String("agent-address", ap.AgentAddr)) + + logger.Info("creating stream", zap.String("agent-address", ap.AgentAddr)) + clus.agentStreams[i], err = clus.agentClients[i].Transport(context.Background()) + if err != nil { + return nil, err + } + logger.Info("created stream", zap.String("agent-address", ap.AgentAddr)) + } + + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + if clus.Tester.EnablePprof { + for p, h := range debugutil.PProfHandlers() { + mux.Handle(p, h) + } + } + clus.testerHTTPServer = &http.Server{ + Addr: clus.Tester.TesterAddr, + Handler: mux, + } + go clus.serveTesterServer() + + for _, cs := range clus.Tester.FailureCases { + switch cs { + case "KILL_ONE_FOLLOWER": + clus.failures = append(clus.failures, newFailureKillOne()) // TODO + case "KILL_LEADER": + clus.failures = append(clus.failures, newFailureKillLeader()) + case "KILL_ONE_FOLLOWER_FOR_LONG": + clus.failures = append(clus.failures, newFailureKillOneForLongTime()) // TODO + case "KILL_LEADER_FOR_LONG": + clus.failures = append(clus.failures, newFailureKillLeaderForLongTime()) + case "KILL_QUORUM": + clus.failures = append(clus.failures, newFailureKillQuorum()) + case "KILL_ALL": + clus.failures = append(clus.failures, newFailureKillAll()) + case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER": + clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOne()) // TODO + case "BLACKHOLE_PEER_PORT_TX_RX_LEADER_ONE": + clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOne()) // TODO + case "BLACKHOLE_PEER_PORT_TX_RX_ALL": + clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxAll()) + case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER": + clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneMember()) // TODO + case "DELAY_PEER_PORT_TX_RX_LEADER": + clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader()) // TODO + case "DELAY_PEER_PORT_TX_RX_ALL": + clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll()) // TODO + case "FAILPOINTS": + fpFailures, fperr := failpointFailures(clus) + if len(fpFailures) == 0 { + clus.logger.Info("no failpoints found!", zap.Error(fperr)) + } + clus.failures = append(clus.failures, fpFailures...) 
+ case "NO_FAIL": + clus.failures = append(clus.failures, newFailureNoOp()) + case "EXTERNAL": + clus.failures = append(clus.failures, newFailureExternal(clus.Tester.ExternalExecPath)) + default: + return nil, fmt.Errorf("unknown failure %q", cs) + } + } + + clus.rateLimiter = rate.NewLimiter( + rate.Limit(int(clus.Tester.StressQPS)), + int(clus.Tester.StressQPS), + ) + clus.updateStresserChecker() + return clus, nil +} + +func (clus *Cluster) serveTesterServer() { + clus.logger.Info( + "started tester HTTP server", + zap.String("tester-address", clus.Tester.TesterAddr), + ) + err := clus.testerHTTPServer.ListenAndServe() + clus.logger.Info( + "tester HTTP server returned", + zap.String("tester-address", clus.Tester.TesterAddr), + zap.Error(err), + ) + if err != nil && err != http.ErrServerClosed { + clus.logger.Fatal("tester HTTP errored", zap.Error(err)) + } +} + +func (clus *Cluster) updateStresserChecker() { + clus.logger.Info( + "updating stressers", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + ) + + cs := &compositeStresser{} + for idx := range clus.Members { + cs.stressers = append(cs.stressers, newStresser(clus, idx)) + } + clus.stresser = cs + + clus.checker = newHashChecker(clus.logger, hashAndRevGetter(clus)) + if schk := cs.Checker(); schk != nil { + clus.checker = newCompositeChecker([]Checker{clus.checker, schk}) + } + + clus.logger.Info( + "updated stressers", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + ) +} + +func (clus *Cluster) startStresser() (err error) { + clus.logger.Info( + "starting stressers", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + ) + err = clus.stresser.Stress() + clus.logger.Info( + "started stressers", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + ) + return err +} + +func (clus *Cluster) closeStresser() { + clus.logger.Info( + "closing stressers", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + ) + clus.stresser.Close() + clus.logger.Info( + "closed stressers", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + ) +} + +func (clus *Cluster) pauseStresser() { + clus.logger.Info( + "pausing stressers", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + ) + clus.stresser.Pause() + clus.logger.Info( + "paused stressers", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + ) +} + +func (clus *Cluster) checkConsistency() (err error) { + defer func() { + if err != nil { + return + } + if err = clus.updateRevision(); err != nil { + clus.logger.Warn( + "updateRevision failed", + zap.Error(err), + ) + return + } + err = clus.startStresser() + }() + + clus.logger.Info( + "checking consistency and invariant of cluster", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + zap.String("desc", clus.failures[clus.cs].Desc()), + ) + if err = clus.checker.Check(); err != nil { + clus.logger.Warn( + "checker.Check failed", + zap.Error(err), + ) + return err + } + clus.logger.Info( + "checked consistency and invariant of cluster", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + zap.String("desc", clus.failures[clus.cs].Desc()), + ) + + return err +} + +// Bootstrap bootstraps etcd cluster the very first time. +// After this, just continue to call kill/restart. +func (clus *Cluster) Bootstrap() error { + // this is the only time that creates request from scratch + return clus.broadcastOperation(rpcpb.Operation_InitialStartEtcd) +} + +// FailArchive sends "FailArchive" operation. 
+func (clus *Cluster) FailArchive() error { + return clus.broadcastOperation(rpcpb.Operation_FailArchive) +} + +// Restart sends "Restart" operation. +func (clus *Cluster) Restart() error { + return clus.broadcastOperation(rpcpb.Operation_RestartEtcd) +} + +func (clus *Cluster) broadcastOperation(op rpcpb.Operation) error { + for i := range clus.agentStreams { + err := clus.sendOperation(i, op) + if err != nil { + if op == rpcpb.Operation_DestroyEtcdAgent && + strings.Contains(err.Error(), "rpc error: code = Unavailable desc = transport is closing") { + // agent server has already closed; + // so this error is expected + clus.logger.Info( + "successfully destroyed", + zap.String("member", clus.Members[i].EtcdClientEndpoint), + ) + continue + } + return err + } + } + return nil +} + +func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error { + if op == rpcpb.Operation_InitialStartEtcd { + clus.agentRequests[idx] = &rpcpb.Request{ + Operation: op, + Member: clus.Members[idx], + Tester: clus.Tester, + } + } else { + clus.agentRequests[idx].Operation = op + } + + clus.logger.Info( + "sending request", + zap.String("operation", op.String()), + zap.String("to", clus.Members[idx].EtcdClientEndpoint), + ) + err := clus.agentStreams[idx].Send(clus.agentRequests[idx]) + clus.logger.Info( + "sent request", + zap.String("operation", op.String()), + zap.String("to", clus.Members[idx].EtcdClientEndpoint), + zap.Error(err), + ) + if err != nil { + return err + } + + clus.logger.Info( + "receiving response", + zap.String("operation", op.String()), + zap.String("from", clus.Members[idx].EtcdClientEndpoint), + ) + resp, err := clus.agentStreams[idx].Recv() + if resp != nil { + clus.logger.Info( + "received response", + zap.String("operation", op.String()), + zap.String("from", clus.Members[idx].EtcdClientEndpoint), + zap.Bool("success", resp.Success), + zap.String("status", resp.Status), + zap.Error(err), + ) + } else { + clus.logger.Info( + "received empty response", + zap.String("operation", op.String()), + zap.String("from", clus.Members[idx].EtcdClientEndpoint), + zap.Error(err), + ) + } + if err != nil { + return err + } + + if !resp.Success { + err = errors.New(resp.Status) + } + return err +} + +// DestroyEtcdAgents terminates all tester connections to agents and etcd servers. +func (clus *Cluster) DestroyEtcdAgents() { + clus.logger.Info("destroying etcd servers and agents") + err := clus.broadcastOperation(rpcpb.Operation_DestroyEtcdAgent) + if err != nil { + clus.logger.Warn("failed to destroy etcd servers and agents", zap.Error(err)) + } else { + clus.logger.Info("destroyed etcd servers and agents") + } + + for i, conn := range clus.agentConns { + clus.logger.Info("closing connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr)) + err := conn.Close() + clus.logger.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err)) + } + + // TODO: closing stresser connections to etcd + + if clus.testerHTTPServer != nil { + clus.logger.Info("closing tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr)) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + err := clus.testerHTTPServer.Shutdown(ctx) + cancel() + clus.logger.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr), zap.Error(err)) + } +} + +// WaitHealth ensures all members are healthy +// by writing a test key to etcd cluster. 
+func (clus *Cluster) WaitHealth() error { + var err error + // wait 60s to check cluster health. + // TODO: set it to a reasonable value. It is set that high because + // follower may use long time to catch up the leader when reboot under + // reasonable workload (https://github.com/coreos/etcd/issues/2698) + for i := 0; i < 60; i++ { + for _, m := range clus.Members { + clus.logger.Info( + "writing health key", + zap.Int("retries", i), + zap.String("endpoint", m.EtcdClientEndpoint), + ) + if err = m.WriteHealthKey(); err != nil { + clus.logger.Warn( + "writing health key failed", + zap.Int("retries", i), + zap.String("endpoint", m.EtcdClientEndpoint), + zap.Error(err), + ) + break + } + clus.logger.Info( + "successfully wrote health key", + zap.Int("retries", i), + zap.String("endpoint", m.EtcdClientEndpoint), + ) + } + if err == nil { + clus.logger.Info( + "writing health key success on all members", + zap.Int("retries", i), + ) + return nil + } + time.Sleep(time.Second) + } + return err +} + +// GetLeader returns the index of leader and error if any. +func (clus *Cluster) GetLeader() (int, error) { + for i, m := range clus.Members { + isLeader, err := m.IsLeader() + if isLeader || err != nil { + return i, err + } + } + return 0, fmt.Errorf("no leader found") +} + +// maxRev returns the maximum revision found on the cluster. +func (clus *Cluster) maxRev() (rev int64, err error) { + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + revc, errc := make(chan int64, len(clus.Members)), make(chan error, len(clus.Members)) + for i := range clus.Members { + go func(m *rpcpb.Member) { + mrev, merr := m.Rev(ctx) + revc <- mrev + errc <- merr + }(clus.Members[i]) + } + for i := 0; i < len(clus.Members); i++ { + if merr := <-errc; merr != nil { + err = merr + } + if mrev := <-revc; mrev > rev { + rev = mrev + } + } + return rev, err +} + +func (clus *Cluster) getRevisionHash() (map[string]int64, map[string]int64, error) { + revs := make(map[string]int64) + hashes := make(map[string]int64) + for _, m := range clus.Members { + rev, hash, err := m.RevHash() + if err != nil { + return nil, nil, err + } + revs[m.EtcdClientEndpoint] = rev + hashes[m.EtcdClientEndpoint] = hash + } + return revs, hashes, nil +} + +func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) { + if rev <= 0 { + return nil + } + + for i, m := range clus.Members { + conn, derr := m.DialEtcdGRPCServer() + if derr != nil { + clus.logger.Warn( + "compactKV dial failed", + zap.String("endpoint", m.EtcdClientEndpoint), + zap.Error(derr), + ) + err = derr + continue + } + kvc := pb.NewKVClient(conn) + + clus.logger.Info( + "starting compaction", + zap.String("endpoint", m.EtcdClientEndpoint), + zap.Int64("revision", rev), + zap.Duration("timeout", timeout), + ) + + now := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + _, cerr := kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev, Physical: true}, grpc.FailFast(false)) + cancel() + + conn.Close() + succeed := true + if cerr != nil { + if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 { + clus.logger.Info( + "compact error is ignored", + zap.String("endpoint", m.EtcdClientEndpoint), + zap.Int64("revision", rev), + zap.Error(cerr), + ) + } else { + clus.logger.Warn( + "compact failed", + zap.String("endpoint", m.EtcdClientEndpoint), + zap.Int64("revision", rev), + zap.Error(cerr), + ) + err = cerr + succeed = false + } + } + + if succeed { + clus.logger.Info( + 
"finished compaction", + zap.String("endpoint", m.EtcdClientEndpoint), + zap.Int64("revision", rev), + zap.Duration("timeout", timeout), + zap.Duration("took", time.Since(now)), + ) + } + } + return err +} + +func (clus *Cluster) checkCompact(rev int64) error { + if rev == 0 { + return nil + } + for _, m := range clus.Members { + if err := m.CheckCompact(rev); err != nil { + return err + } + } + return nil +} + +func (clus *Cluster) defrag() error { + clus.logger.Info( + "defragmenting", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + ) + for _, m := range clus.Members { + if err := m.Defrag(); err != nil { + clus.logger.Warn( + "defrag failed", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + zap.Error(err), + ) + return err + } + } + clus.logger.Info( + "defragmented", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + ) + return nil +} + +func (clus *Cluster) Report() int64 { return clus.stresser.ModifiedKeys() } diff --git a/tools/functional-tester/tester/cluster_test.go b/tools/functional-tester/tester/cluster_test.go new file mode 100644 index 000000000..331606d19 --- /dev/null +++ b/tools/functional-tester/tester/cluster_test.go @@ -0,0 +1,162 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package tester + +import ( + "reflect" + "testing" + + "github.com/coreos/etcd/tools/functional-tester/rpcpb" + + "go.uber.org/zap" +) + +func Test_newCluster(t *testing.T) { + exp := &Cluster{ + Members: []*rpcpb.Member{ + { + EtcdExecPath: "./bin/etcd", + AgentAddr: "127.0.0.1:19027", + FailpointHTTPAddr: "http://127.0.0.1:7381", + BaseDir: "/tmp/etcd-agent-data-1", + EtcdLogPath: "/tmp/etcd-agent-data-1/current-etcd.log", + EtcdClientTLS: false, + EtcdClientProxy: false, + EtcdPeerProxy: true, + EtcdClientEndpoint: "127.0.0.1:1379", + Etcd: &rpcpb.Etcd{ + Name: "s1", + DataDir: "/tmp/etcd-agent-data-1/etcd.data", + WALDir: "/tmp/etcd-agent-data-1/etcd.data/member/wal", + ListenClientURLs: []string{"http://127.0.0.1:1379"}, + AdvertiseClientURLs: []string{"http://127.0.0.1:1379"}, + ListenPeerURLs: []string{"http://127.0.0.1:1380"}, + InitialAdvertisePeerURLs: []string{"http://127.0.0.1:13800"}, + InitialCluster: "s1=http://127.0.0.1:13800,s2=http://127.0.0.1:23800,s3=http://127.0.0.1:33800", + InitialClusterState: "new", + InitialClusterToken: "tkn", + SnapshotCount: 10000, + QuotaBackendBytes: 10740000000, + PreVote: true, + InitialCorruptCheck: true, + }, + }, + { + EtcdExecPath: "./bin/etcd", + AgentAddr: "127.0.0.1:29027", + FailpointHTTPAddr: "http://127.0.0.1:7382", + BaseDir: "/tmp/etcd-agent-data-2", + EtcdLogPath: "/tmp/etcd-agent-data-2/current-etcd.log", + EtcdClientTLS: false, + EtcdClientProxy: false, + EtcdPeerProxy: true, + EtcdClientEndpoint: "127.0.0.1:2379", + Etcd: &rpcpb.Etcd{ + Name: "s2", + DataDir: "/tmp/etcd-agent-data-2/etcd.data", + WALDir: "/tmp/etcd-agent-data-2/etcd.data/member/wal", + ListenClientURLs: []string{"http://127.0.0.1:2379"}, + AdvertiseClientURLs: []string{"http://127.0.0.1:2379"}, + ListenPeerURLs: []string{"http://127.0.0.1:2380"}, + InitialAdvertisePeerURLs: []string{"http://127.0.0.1:23800"}, + InitialCluster: "s1=http://127.0.0.1:13800,s2=http://127.0.0.1:23800,s3=http://127.0.0.1:33800", + InitialClusterState: "new", + InitialClusterToken: "tkn", + SnapshotCount: 10000, + QuotaBackendBytes: 10740000000, + PreVote: true, + InitialCorruptCheck: true, + }, + }, + { + EtcdExecPath: "./bin/etcd", + AgentAddr: "127.0.0.1:39027", + FailpointHTTPAddr: "http://127.0.0.1:7383", + BaseDir: "/tmp/etcd-agent-data-3", + EtcdLogPath: "/tmp/etcd-agent-data-3/current-etcd.log", + EtcdClientTLS: false, + EtcdClientProxy: false, + EtcdPeerProxy: true, + EtcdClientEndpoint: "127.0.0.1:3379", + Etcd: &rpcpb.Etcd{ + Name: "s3", + DataDir: "/tmp/etcd-agent-data-3/etcd.data", + WALDir: "/tmp/etcd-agent-data-3/etcd.data/member/wal", + ListenClientURLs: []string{"http://127.0.0.1:3379"}, + AdvertiseClientURLs: []string{"http://127.0.0.1:3379"}, + ListenPeerURLs: []string{"http://127.0.0.1:3380"}, + InitialAdvertisePeerURLs: []string{"http://127.0.0.1:33800"}, + InitialCluster: "s1=http://127.0.0.1:13800,s2=http://127.0.0.1:23800,s3=http://127.0.0.1:33800", + InitialClusterState: "new", + InitialClusterToken: "tkn", + SnapshotCount: 10000, + QuotaBackendBytes: 10740000000, + PreVote: true, + InitialCorruptCheck: true, + }, + }, + }, + Tester: &rpcpb.Tester{ + TesterNetwork: "tcp", + TesterAddr: "127.0.0.1:9028", + DelayLatencyMs: 500, + DelayLatencyMsRv: 50, + RoundLimit: 1, + ExitOnFailure: true, + ConsistencyCheck: true, + EnablePprof: true, + FailureCases: []string{ + "KILL_ONE_FOLLOWER", + "KILL_LEADER", + "KILL_ONE_FOLLOWER_FOR_LONG", + "KILL_LEADER_FOR_LONG", + "KILL_QUORUM", + "KILL_ALL", + "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER", + 
"BLACKHOLE_PEER_PORT_TX_RX_LEADER_ONE", + "BLACKHOLE_PEER_PORT_TX_RX_ALL", + "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER", + "DELAY_PEER_PORT_TX_RX_LEADER", + "DELAY_PEER_PORT_TX_RX_ALL", + }, + FailpointCommands: []string{`panic("etcd-tester")`}, + RunnerExecPath: "/etcd-runner", + ExternalExecPath: "", + StressTypes: []string{"KV", "LEASE"}, + StressKeySize: 100, + StressKeySizeLarge: 32769, + StressKeySuffixRange: 250000, + StressKeySuffixRangeTxn: 100, + StressKeyTxnOps: 10, + StressQPS: 1000, + }, + } + + logger, err := zap.NewProduction() + if err != nil { + t.Fatal(err) + } + defer logger.Sync() + + cfg, err := newCluster(logger, "./local-test.yaml") + if err != nil { + t.Fatal(err) + } + cfg.logger = nil + + if !reflect.DeepEqual(exp, cfg) { + t.Fatalf("expected %+v, got %+v", exp, cfg) + } +} diff --git a/tools/functional-tester/tester/doc.go b/tools/functional-tester/tester/doc.go new file mode 100644 index 000000000..d1e23e941 --- /dev/null +++ b/tools/functional-tester/tester/doc.go @@ -0,0 +1,16 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package tester implements functional-tester tester server. +package tester diff --git a/tools/functional-tester/tester/failure.go b/tools/functional-tester/tester/failure.go new file mode 100644 index 000000000..4947c86ae --- /dev/null +++ b/tools/functional-tester/tester/failure.go @@ -0,0 +1,30 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tester + +// Failure defines failure injection interface. +// To add a fail case: +// 1. implement "Failure" interface +// 2. define fail case name in "rpcpb.FailureCase" +type Failure interface { + // Inject injeccts the failure into the testing cluster at the given + // round. When calling the function, the cluster should be in health. + Inject(clus *Cluster, round int) error + // Recover recovers the injected failure caused by the injection of the + // given round and wait for the recovery of the testing cluster. 
+ Recover(clus *Cluster, round int) error + // Desc returns a description of the failure + Desc() string +} diff --git a/tools/functional-tester/tester/failure_case_blackhole.go b/tools/functional-tester/tester/failure_case_blackhole.go new file mode 100644 index 000000000..44e698d04 --- /dev/null +++ b/tools/functional-tester/tester/failure_case_blackhole.go @@ -0,0 +1,49 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tester + +import "github.com/coreos/etcd/tools/functional-tester/rpcpb" + +func injectBlackholePeerPortTxRx(clus *Cluster, idx int) error { + return clus.sendOperation(idx, rpcpb.Operation_BlackholePeerPortTxRx) +} + +func recoverBlackholePeerPortTxRx(clus *Cluster, idx int) error { + return clus.sendOperation(idx, rpcpb.Operation_UnblackholePeerPortTxRx) +} + +func newFailureBlackholePeerPortTxRxOne() Failure { + f := &failureOne{ + description: "blackhole peer port on one member", + injectMember: injectBlackholePeerPortTxRx, + recoverMember: recoverBlackholePeerPortTxRx, + } + return &failureDelay{ + Failure: f, + delayDuration: triggerElectionDur, + } +} + +func newFailureBlackholePeerPortTxRxAll() Failure { + f := &failureAll{ + description: "blackhole peer port on all members", + injectMember: injectBlackholePeerPortTxRx, + recoverMember: recoverBlackholePeerPortTxRx, + } + return &failureDelay{ + Failure: f, + delayDuration: triggerElectionDur, + } +} diff --git a/tools/functional-tester/tester/failure_case_delay.go b/tools/functional-tester/tester/failure_case_delay.go new file mode 100644 index 000000000..3a7c975a1 --- /dev/null +++ b/tools/functional-tester/tester/failure_case_delay.go @@ -0,0 +1,41 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
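A minimal sketch of the two steps listed in failure.go ("implement Failure", "define the fail case name in rpcpb.FailureCase"): the type below simply reuses the kill helpers and the description type from failure_case_kill.go. The type name and the FailureCase it would map to are hypothetical, for illustration only.

type failureKillRoundRobin struct{ description }

func (f *failureKillRoundRobin) Inject(clus *Cluster, round int) error {
	// pick a member deterministically from the round number
	return injectKill(clus, round%len(clus.Members))
}

func (f *failureKillRoundRobin) Recover(clus *Cluster, round int) error {
	if err := recoverKill(clus, round%len(clus.Members)); err != nil {
		return err
	}
	// wait until all members accept writes again
	return clus.WaitHealth()
}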
+ +package tester + +import ( + "time" + + "go.uber.org/zap" +) + +type failureDelay struct { + Failure + delayDuration time.Duration +} + +func (f *failureDelay) Inject(clus *Cluster, round int) error { + if err := f.Failure.Inject(clus, round); err != nil { + return err + } + if f.delayDuration > 0 { + clus.logger.Info( + "sleeping in failureDelay", + zap.Duration("delay", f.delayDuration), + zap.String("case", f.Failure.Desc()), + ) + time.Sleep(f.delayDuration) + } + return nil +} diff --git a/tools/functional-tester/tester/failure_case_external.go b/tools/functional-tester/tester/failure_case_external.go new file mode 100644 index 000000000..4f88b60f3 --- /dev/null +++ b/tools/functional-tester/tester/failure_case_external.go @@ -0,0 +1,44 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tester + +import ( + "fmt" + "os/exec" +) + +type failureExternal struct { + Failure + + description string + scriptPath string +} + +func (f *failureExternal) Inject(clus *Cluster, round int) error { + return exec.Command(f.scriptPath, "enable", fmt.Sprintf("%d", round)).Run() +} + +func (f *failureExternal) Recover(clus *Cluster, round int) error { + return exec.Command(f.scriptPath, "disable", fmt.Sprintf("%d", round)).Run() +} + +func (f *failureExternal) Desc() string { return f.description } + +func newFailureExternal(scriptPath string) Failure { + return &failureExternal{ + description: fmt.Sprintf("external fault injector (script: %q)", scriptPath), + scriptPath: scriptPath, + } +} diff --git a/tools/functional-tester/tester/failure_case_failpoints.go b/tools/functional-tester/tester/failure_case_failpoints.go new file mode 100644 index 000000000..e6ceaf442 --- /dev/null +++ b/tools/functional-tester/tester/failure_case_failpoints.go @@ -0,0 +1,159 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
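Sketch of how failureDelay above is meant to be composed: wrap any base Failure so that, after injection, the tester sleeps long enough for the cluster to react (for example, elect a new leader) before moving on to recovery. The helper name withElectionDelay is illustrative; triggerElectionDur is the constant defined in failure_case_slow_network.go.

func withElectionDelay(f Failure) Failure {
	return &failureDelay{
		Failure:       f,
		delayDuration: triggerElectionDur,
	}
}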
+ +package tester + +import ( + "fmt" + "io/ioutil" + "net/http" + "strings" + "sync" + "time" +) + +type failpointStats struct { + mu sync.Mutex + // crashes counts the number of crashes for a failpoint + crashes map[string]int +} + +var fpStats failpointStats + +func failpointFailures(clus *Cluster) (ret []Failure, err error) { + var fps []string + fps, err = failpointPaths(clus.Members[0].FailpointHTTPAddr) + if err != nil { + return nil, err + } + // create failure objects for all failpoints + for _, fp := range fps { + if len(fp) == 0 { + continue + } + fpFails := failuresFromFailpoint(fp, clus.Tester.FailpointCommands) + // wrap in delays so failpoint has time to trigger + for i, fpf := range fpFails { + if strings.Contains(fp, "Snap") { + // hack to trigger snapshot failpoints + fpFails[i] = &failureUntilSnapshot{fpf} + } else { + fpFails[i] = &failureDelay{fpf, 3 * time.Second} + } + } + ret = append(ret, fpFails...) + } + fpStats.crashes = make(map[string]int) + return ret, err +} + +func failpointPaths(endpoint string) ([]string, error) { + resp, err := http.Get(endpoint) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, rerr := ioutil.ReadAll(resp.Body) + if rerr != nil { + return nil, rerr + } + var fps []string + for _, l := range strings.Split(string(body), "\n") { + fp := strings.Split(l, "=")[0] + fps = append(fps, fp) + } + return fps, nil +} + +// failpoints follows FreeBSD KFAIL_POINT syntax. +// e.g. panic("etcd-tester"),1*sleep(1000)->panic("etcd-tester") +func failuresFromFailpoint(fp string, failpointCommands []string) (fs []Failure) { + recov := makeRecoverFailpoint(fp) + for _, fcmd := range failpointCommands { + inject := makeInjectFailpoint(fp, fcmd) + fs = append(fs, []Failure{ + &failureOne{ + description: description(fmt.Sprintf("failpoint %s (one: %s)", fp, fcmd)), + injectMember: inject, + recoverMember: recov, + }, + &failureAll{ + description: description(fmt.Sprintf("failpoint %s (all: %s)", fp, fcmd)), + injectMember: inject, + recoverMember: recov, + }, + &failureQuorum{ + description: description(fmt.Sprintf("failpoint %s (majority: %s)", fp, fcmd)), + injectMember: inject, + recoverMember: recov, + }, + &failureLeader{ + failureByFunc{ + description: description(fmt.Sprintf("failpoint %s (leader: %s)", fp, fcmd)), + injectMember: inject, + recoverMember: recov, + }, + 0, + }, + }...) 
+ } + return fs +} + +func makeInjectFailpoint(fp, val string) injectMemberFunc { + return func(clus *Cluster, idx int) (err error) { + return putFailpoint(clus.Members[idx].FailpointHTTPAddr, fp, val) + } +} + +func makeRecoverFailpoint(fp string) recoverMemberFunc { + return func(clus *Cluster, idx int) error { + if err := delFailpoint(clus.Members[idx].FailpointHTTPAddr, fp); err == nil { + return nil + } + // node not responding, likely dead from fp panic; restart + fpStats.mu.Lock() + fpStats.crashes[fp]++ + fpStats.mu.Unlock() + return recoverKill(clus, idx) + } +} + +func putFailpoint(ep, fp, val string) error { + req, _ := http.NewRequest(http.MethodPut, ep+"/"+fp, strings.NewReader(val)) + c := http.Client{} + resp, err := c.Do(req) + if err != nil { + return err + } + resp.Body.Close() + if resp.StatusCode/100 != 2 { + return fmt.Errorf("failed to PUT %s=%s at %s (%v)", fp, val, ep, resp.Status) + } + return nil +} + +func delFailpoint(ep, fp string) error { + req, _ := http.NewRequest(http.MethodDelete, ep+"/"+fp, strings.NewReader("")) + c := http.Client{} + resp, err := c.Do(req) + if err != nil { + return err + } + resp.Body.Close() + if resp.StatusCode/100 != 2 { + return fmt.Errorf("failed to DELETE %s at %s (%v)", fp, ep, resp.Status) + } + return nil +} diff --git a/tools/functional-tester/tester/failure_case_kill.go b/tools/functional-tester/tester/failure_case_kill.go new file mode 100644 index 000000000..223e946bf --- /dev/null +++ b/tools/functional-tester/tester/failure_case_kill.go @@ -0,0 +1,210 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
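Illustrative only: activating and then clearing a single failpoint through the helpers above. The endpoint matches FailpointHTTPAddr of the first member in local-test.yaml; the failpoint name is just an example and assumes the etcd binary was built with gofail enabled.

func toggleFailpointExample() error {
	ep := "http://127.0.0.1:7381"
	// arm the failpoint so the next hit panics the member
	if err := putFailpoint(ep, "etcdserver/raftBeforeSave", `panic("etcd-tester")`); err != nil {
		return err
	}
	// disarm it again
	return delFailpoint(ep, "etcdserver/raftBeforeSave")
}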
+ +package tester + +import ( + "fmt" + "math/rand" + "time" + + "github.com/coreos/etcd/tools/functional-tester/rpcpb" +) + +const snapshotCount = 10000 + +func injectKill(clus *Cluster, idx int) error { + return clus.sendOperation(idx, rpcpb.Operation_KillEtcd) +} + +func recoverKill(clus *Cluster, idx int) error { + return clus.sendOperation(idx, rpcpb.Operation_RestartEtcd) +} + +func newFailureKillAll() Failure { + return &failureAll{ + description: "kill all members", + injectMember: injectKill, + recoverMember: recoverKill, + } +} + +func newFailureKillQuorum() Failure { + return &failureQuorum{ + description: "kill quorum of the cluster", + injectMember: injectKill, + recoverMember: recoverKill, + } +} + +func newFailureKillOne() Failure { + return &failureOne{ + description: "kill one random member", + injectMember: injectKill, + recoverMember: recoverKill, + } +} + +func newFailureKillLeader() Failure { + ff := failureByFunc{ + description: "kill leader member", + injectMember: injectKill, + recoverMember: recoverKill, + } + return &failureLeader{ff, 0} +} + +func newFailureKillOneForLongTime() Failure { + return &failureUntilSnapshot{newFailureKillOne()} +} + +func newFailureKillLeaderForLongTime() Failure { + return &failureUntilSnapshot{newFailureKillLeader()} +} + +type description string + +func (d description) Desc() string { return string(d) } + +type injectMemberFunc func(*Cluster, int) error +type recoverMemberFunc func(*Cluster, int) error + +type failureByFunc struct { + description + injectMember injectMemberFunc + recoverMember recoverMemberFunc +} + +// TODO: support kill follower +type failureOne failureByFunc +type failureAll failureByFunc +type failureQuorum failureByFunc + +type failureLeader struct { + failureByFunc + idx int +} + +// failureUntilSnapshot injects a failure and waits for a snapshot event +type failureUntilSnapshot struct{ Failure } + +func (f *failureOne) Inject(clus *Cluster, round int) error { + return f.injectMember(clus, round%len(clus.Members)) +} + +func (f *failureOne) Recover(clus *Cluster, round int) error { + if err := f.recoverMember(clus, round%len(clus.Members)); err != nil { + return err + } + return clus.WaitHealth() +} + +func (f *failureAll) Inject(clus *Cluster, round int) error { + for i := range clus.Members { + if err := f.injectMember(clus, i); err != nil { + return err + } + } + return nil +} + +func (f *failureAll) Recover(clus *Cluster, round int) error { + for i := range clus.Members { + if err := f.recoverMember(clus, i); err != nil { + return err + } + } + return clus.WaitHealth() +} + +func (f *failureQuorum) Inject(clus *Cluster, round int) error { + for i := range killMap(len(clus.Members), round) { + if err := f.injectMember(clus, i); err != nil { + return err + } + } + return nil +} + +func (f *failureQuorum) Recover(clus *Cluster, round int) error { + for i := range killMap(len(clus.Members), round) { + if err := f.recoverMember(clus, i); err != nil { + return err + } + } + return nil +} + +func (f *failureLeader) Inject(clus *Cluster, round int) error { + idx, err := clus.GetLeader() + if err != nil { + return err + } + f.idx = idx + return f.injectMember(clus, idx) +} + +func (f *failureLeader) Recover(clus *Cluster, round int) error { + if err := f.recoverMember(clus, f.idx); err != nil { + return err + } + return clus.WaitHealth() +} + +func (f *failureUntilSnapshot) Inject(clus *Cluster, round int) error { + if err := f.Failure.Inject(clus, round); err != nil { + return err + } + if len(clus.Members) < 3 { 
+ return nil + } + // maxRev may fail since failure just injected, retry if failed. + startRev, err := clus.maxRev() + for i := 0; i < 10 && startRev == 0; i++ { + startRev, err = clus.maxRev() + } + if startRev == 0 { + return err + } + lastRev := startRev + // Normal healthy cluster could accept 1000req/s at least. + // Give it 3-times time to create a new snapshot. + retry := snapshotCount / 1000 * 3 + for j := 0; j < retry; j++ { + lastRev, _ = clus.maxRev() + // If the number of proposals committed is bigger than snapshot count, + // a new snapshot should have been created. + if lastRev-startRev > snapshotCount { + return nil + } + time.Sleep(time.Second) + } + return fmt.Errorf("cluster too slow: only commit %d requests in %ds", lastRev-startRev, retry) +} + +func (f *failureUntilSnapshot) Desc() string { + return f.Failure.Desc() + " for a long time and expect it to recover from an incoming snapshot" +} + +func killMap(size int, seed int) map[int]bool { + m := make(map[int]bool) + r := rand.New(rand.NewSource(int64(seed))) + majority := size/2 + 1 + for { + m[r.Intn(size)] = true + if len(m) >= majority { + return m + } + } +} diff --git a/tools/functional-tester/tester/failure_case_no_op.go b/tools/functional-tester/tester/failure_case_no_op.go new file mode 100644 index 000000000..719430290 --- /dev/null +++ b/tools/functional-tester/tester/failure_case_no_op.go @@ -0,0 +1,26 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tester + +type failureNoOp failureByFunc + +func (f *failureNoOp) Inject(clus *Cluster, round int) error { return nil } +func (f *failureNoOp) Recover(clus *Cluster, round int) error { return nil } + +func newFailureNoOp() Failure { + return &failureNoOp{ + description: "no failure", + } +} diff --git a/tools/functional-tester/tester/failure_case_slow_network.go b/tools/functional-tester/tester/failure_case_slow_network.go new file mode 100644 index 000000000..5bd932680 --- /dev/null +++ b/tools/functional-tester/tester/failure_case_slow_network.go @@ -0,0 +1,85 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
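A small sketch of killMap above: for a given cluster size and round it returns a reproducible, seed-derived set of member indexes whose size is a bare majority (size/2 + 1), which is the set failureQuorum injects into. Assumes the standard "fmt" import; the function name is illustrative.

func exampleKillMap() {
	victims := killMap(5, 7) // 5-member cluster, round 7 -> 3 distinct indexes
	for idx := range victims {
		fmt.Println("would kill member", idx)
	}
}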
+ +package tester + +import ( + "fmt" + "time" + + "github.com/coreos/etcd/tools/functional-tester/rpcpb" +) + +const ( + // TODO + slowNetworkLatency = 500 // 500 millisecond + + // delay duration to trigger leader election (default election timeout 1s) + triggerElectionDur = 5 * time.Second + + // Wait more when it recovers from slow network, because network layer + // needs extra time to propagate traffic control (tc command) change. + // Otherwise, we get different hash values from the previous revision. + // For more detail, please see https://github.com/coreos/etcd/issues/5121. + waitRecover = 5 * time.Second +) + +func injectDelayPeerPortTxRx(clus *Cluster, idx int) error { + return clus.sendOperation(idx, rpcpb.Operation_DelayPeerPortTxRx) +} + +func recoverDelayPeerPortTxRx(clus *Cluster, idx int) error { + err := clus.sendOperation(idx, rpcpb.Operation_UndelayPeerPortTxRx) + time.Sleep(waitRecover) + return err +} + +func newFailureDelayPeerPortTxRxOneMember() Failure { + desc := fmt.Sprintf("delay one member's network by adding %d ms latency", slowNetworkLatency) + f := &failureOne{ + description: description(desc), + injectMember: injectDelayPeerPortTxRx, + recoverMember: recoverDelayPeerPortTxRx, + } + return &failureDelay{ + Failure: f, + delayDuration: triggerElectionDur, + } +} + +func newFailureDelayPeerPortTxRxLeader() Failure { + desc := fmt.Sprintf("delay leader's network by adding %d ms latency", slowNetworkLatency) + ff := failureByFunc{ + description: description(desc), + injectMember: injectDelayPeerPortTxRx, + recoverMember: recoverDelayPeerPortTxRx, + } + f := &failureLeader{ff, 0} + return &failureDelay{ + Failure: f, + delayDuration: triggerElectionDur, + } +} + +func newFailureDelayPeerPortTxRxAll() Failure { + f := &failureAll{ + description: "delay all members' network", + injectMember: injectDelayPeerPortTxRx, + recoverMember: recoverDelayPeerPortTxRx, + } + return &failureDelay{ + Failure: f, + delayDuration: triggerElectionDur, + } +} diff --git a/tools/functional-tester/tester/local-test.yaml b/tools/functional-tester/tester/local-test.yaml new file mode 100644 index 000000000..bc55c9d81 --- /dev/null +++ b/tools/functional-tester/tester/local-test.yaml @@ -0,0 +1,126 @@ +agent-configs: +- etcd-exec-path: ./bin/etcd + agent-addr: 127.0.0.1:19027 + failpoint-http-addr: http://127.0.0.1:7381 + base-dir: /tmp/etcd-agent-data-1 + etcd-log-path: /tmp/etcd-agent-data-1/current-etcd.log + etcd-client-tls: false + etcd-client-proxy: false + etcd-peer-proxy: true + etcd-client-endpoint: 127.0.0.1:1379 + etcd-config: + name: s1 + data-dir: /tmp/etcd-agent-data-1/etcd.data + wal-dir: /tmp/etcd-agent-data-1/etcd.data/member/wal + listen-client-urls: ["http://127.0.0.1:1379"] + advertise-client-urls: ["http://127.0.0.1:1379"] + listen-peer-urls: ["http://127.0.0.1:1380"] + initial-advertise-peer-urls: ["http://127.0.0.1:13800"] + initial-cluster: s1=http://127.0.0.1:13800,s2=http://127.0.0.1:23800,s3=http://127.0.0.1:33800 + initial-cluster-state: new + initial-cluster-token: tkn + snapshot-count: 10000 + quota-backend-bytes: 10740000000 # 10 GiB + pre-vote: true + initial-corrupt-check: true +- etcd-exec-path: ./bin/etcd + agent-addr: 127.0.0.1:29027 + failpoint-http-addr: http://127.0.0.1:7382 + base-dir: /tmp/etcd-agent-data-2 + etcd-log-path: /tmp/etcd-agent-data-2/current-etcd.log + etcd-client-tls: false + etcd-client-proxy: false + etcd-peer-proxy: true + etcd-client-endpoint: 127.0.0.1:2379 + etcd-config: + name: s2 + data-dir: /tmp/etcd-agent-data-2/etcd.data 
+ wal-dir: /tmp/etcd-agent-data-2/etcd.data/member/wal + listen-client-urls: ["http://127.0.0.1:2379"] + advertise-client-urls: ["http://127.0.0.1:2379"] + listen-peer-urls: ["http://127.0.0.1:2380"] + initial-advertise-peer-urls: ["http://127.0.0.1:23800"] + initial-cluster: s1=http://127.0.0.1:13800,s2=http://127.0.0.1:23800,s3=http://127.0.0.1:33800 + initial-cluster-state: new + initial-cluster-token: tkn + snapshot-count: 10000 + quota-backend-bytes: 10740000000 # 10 GiB + pre-vote: true + initial-corrupt-check: true +- etcd-exec-path: ./bin/etcd + agent-addr: 127.0.0.1:39027 + failpoint-http-addr: http://127.0.0.1:7383 + base-dir: /tmp/etcd-agent-data-3 + etcd-log-path: /tmp/etcd-agent-data-3/current-etcd.log + etcd-client-tls: false + etcd-client-proxy: false + etcd-peer-proxy: true + etcd-client-endpoint: 127.0.0.1:3379 + etcd-config: + name: s3 + data-dir: /tmp/etcd-agent-data-3/etcd.data + wal-dir: /tmp/etcd-agent-data-3/etcd.data/member/wal + listen-client-urls: ["http://127.0.0.1:3379"] + advertise-client-urls: ["http://127.0.0.1:3379"] + listen-peer-urls: ["http://127.0.0.1:3380"] + initial-advertise-peer-urls: ["http://127.0.0.1:33800"] + initial-cluster: s1=http://127.0.0.1:13800,s2=http://127.0.0.1:23800,s3=http://127.0.0.1:33800 + initial-cluster-state: new + initial-cluster-token: tkn + snapshot-count: 10000 + quota-backend-bytes: 10740000000 # 10 GiB + pre-vote: true + initial-corrupt-check: true + +tester-config: + tester-network: tcp + tester-addr: 127.0.0.1:9028 + + delay-latency-ms: 500 + delay-latency-ms-rv: 50 + + round-limit: 1 + exit-on-failure: true + consistency-check: true + enable-pprof: true + + failure-cases: + - KILL_ONE_FOLLOWER + - KILL_LEADER + - KILL_ONE_FOLLOWER_FOR_LONG + - KILL_LEADER_FOR_LONG + - KILL_QUORUM + - KILL_ALL + - BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER + - BLACKHOLE_PEER_PORT_TX_RX_LEADER_ONE + - BLACKHOLE_PEER_PORT_TX_RX_ALL + - DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER + - DELAY_PEER_PORT_TX_RX_LEADER + - DELAY_PEER_PORT_TX_RX_ALL + + # TODO: shuffle + # fail-shuffle: true + + failpoint-commands: + - panic("etcd-tester") + # failpoint-commands: + # - panic("etcd-tester"),1*sleep(1000) + + runner-exec-path: /etcd-runner + external-exec-path: "" + + stress-types: + - KV + - LEASE + # - NO_STRESS + # - ELECTION_RUNNER + # - WATCH_RUNNER + # - LOCK_RACER_RUNNER + # - LEASE_RUNNER + + stress-key-size: 100 + stress-key-size-large: 32769 + stress-key-suffix-range: 250000 + stress-key-suffix-range-txn: 100 + stress-key-txn-ops: 10 + stress-qps: 1000 diff --git a/tools/functional-tester/tester/metrics.go b/tools/functional-tester/tester/metrics.go new file mode 100644 index 000000000..f8546ddbd --- /dev/null +++ b/tools/functional-tester/tester/metrics.go @@ -0,0 +1,62 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package tester
+
+import "github.com/prometheus/client_golang/prometheus"
+
+var (
+	caseTotalCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "etcd",
+			Subsystem: "functional_tester",
+			Name:      "case_total",
+			Help:      "Total number of finished test cases",
+		},
+		[]string{"desc"},
+	)
+
+	caseFailedTotalCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "etcd",
+			Subsystem: "functional_tester",
+			Name:      "case_failed_total",
+			Help:      "Total number of failed test cases",
+		},
+		[]string{"desc"},
+	)
+
+	roundTotalCounter = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: "etcd",
+			Subsystem: "functional_tester",
+			Name:      "round_total",
+			Help:      "Total number of finished test rounds.",
+		})
+
+	roundFailedTotalCounter = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: "etcd",
+			Subsystem: "functional_tester",
+			Name:      "round_failed_total",
+			Help:      "Total number of failed test rounds.",
+		})
+)
+
+func init() {
+	prometheus.MustRegister(caseTotalCounter)
+	prometheus.MustRegister(caseFailedTotalCounter)
+	prometheus.MustRegister(roundTotalCounter)
+	prometheus.MustRegister(roundFailedTotalCounter)
+}
diff --git a/tools/functional-tester/tester/stress.go b/tools/functional-tester/tester/stress.go
new file mode 100644
index 000000000..155320aab
--- /dev/null
+++ b/tools/functional-tester/tester/stress.go
@@ -0,0 +1,202 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tester
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+)
+
+type Stresser interface {
+	// Stress starts to stress the etcd cluster.
+	Stress() error
+	// Pause stops the stresser from sending requests to etcd. Resume by calling Stress.
+	Pause()
+	// Close releases all of the Stresser's resources.
+	Close()
+	// ModifiedKeys reports the number of keys created and deleted by the stresser.
+	ModifiedKeys() int64
+	// Checker returns an invariant checker for after the stresser is canceled.
+	Checker() Checker
+}
+
+// nopStresser implements a Stresser that does nothing.
+type nopStresser struct {
+	start time.Time
+	qps   int
+}
+
+func (s *nopStresser) Stress() error { return nil }
+func (s *nopStresser) Pause()        {}
+func (s *nopStresser) Close()        {}
+func (s *nopStresser) ModifiedKeys() int64 {
+	return 0
+}
+func (s *nopStresser) Checker() Checker { return nil }
+
+// compositeStresser implements a Stresser that runs a slice of
+// stressing clients concurrently.
+type compositeStresser struct {
+	stressers []Stresser
+}
+
+func (cs *compositeStresser) Stress() error {
+	for i, s := range cs.stressers {
+		if err := s.Stress(); err != nil {
+			for j := 0; j < i; j++ {
+				cs.stressers[j].Close()
+			}
+			return err
+		}
+	}
+	return nil
+}
+
+func (cs *compositeStresser) Pause() {
+	var wg sync.WaitGroup
+	wg.Add(len(cs.stressers))
+	for i := range cs.stressers {
+		go func(s Stresser) {
+			defer wg.Done()
+			s.Pause()
+		}(cs.stressers[i])
+	}
+	wg.Wait()
+}
+
+func (cs *compositeStresser) Close() {
+	var wg sync.WaitGroup
+	wg.Add(len(cs.stressers))
+	for i := range cs.stressers {
+		go func(s Stresser) {
+			defer wg.Done()
+			s.Close()
+		}(cs.stressers[i])
+	}
+	wg.Wait()
+}
+
+func (cs *compositeStresser) ModifiedKeys() (modifiedKey int64) {
+	for _, stress := range cs.stressers {
+		modifiedKey += stress.ModifiedKeys()
+	}
+	return modifiedKey
+}
+
+func (cs *compositeStresser) Checker() Checker {
+	var chks []Checker
+	for _, s := range cs.stressers {
+		if chk := s.Checker(); chk != nil {
+			chks = append(chks, chk)
+		}
+	}
+	if len(chks) == 0 {
+		return nil
+	}
+	return newCompositeChecker(chks)
+}
+
+// newStresser creates a composite stresser from the configured stress types.
+func newStresser(clus *Cluster, idx int) Stresser {
+	stressers := make([]Stresser, len(clus.Tester.StressTypes))
+	for i, stype := range clus.Tester.StressTypes {
+		clus.logger.Info("creating stresser", zap.String("type", stype))
+
+		switch stype {
+		case "NO_STRESS":
+			stressers[i] = &nopStresser{start: time.Now(), qps: int(clus.rateLimiter.Limit())}
+
+		case "KV":
+			// TODO: Overly intensive stressing clients can panic an etcd member with
+			// an 'out of memory' error. Put rate limits on the server side.
+			stressers[i] = &keyStresser{
+				logger:            clus.logger,
+				Endpoint:          clus.Members[idx].EtcdClientEndpoint,
+				keySize:           int(clus.Tester.StressKeySize),
+				keyLargeSize:      int(clus.Tester.StressKeySizeLarge),
+				keySuffixRange:    int(clus.Tester.StressKeySuffixRange),
+				keyTxnSuffixRange: int(clus.Tester.StressKeySuffixRangeTxn),
+				keyTxnOps:         int(clus.Tester.StressKeyTxnOps),
+				N:                 100,
+				rateLimiter:       clus.rateLimiter,
+			}
+
+		case "LEASE":
+			stressers[i] = &leaseStresser{
+				logger:       clus.logger,
+				endpoint:     clus.Members[idx].EtcdClientEndpoint,
+				numLeases:    10, // TODO: configurable
+				keysPerLease: 10, // TODO: configurable
+				rateLimiter:  clus.rateLimiter,
+			}
+
+		case "ELECTION_RUNNER":
+			reqRate := 100
+			args := []string{
+				"election",
+				fmt.Sprintf("%v", time.Now().UnixNano()), // election name as current nano time
+				"--dial-timeout=10s",
+				"--endpoints", clus.Members[idx].EtcdClientEndpoint,
+				"--total-client-connections=10",
+				"--rounds=0", // runs forever
+				"--req-rate", fmt.Sprintf("%v", reqRate),
+			}
+			stressers[i] = newRunnerStresser(
+				clus.Tester.RunnerExecPath,
+				args,
+				clus.rateLimiter,
+				reqRate,
+			)
+
+		case "WATCH_RUNNER":
+			reqRate := 100
+			args := []string{
+				"watcher",
+				"--prefix", fmt.Sprintf("%v", time.Now().UnixNano()), // prefix all keys with nano time
+				"--total-keys=1",
+				"--total-prefixes=1",
+				"--watch-per-prefix=1",
+				"--endpoints", clus.Members[idx].EtcdClientEndpoint,
+				"--rounds=0", // runs forever
+				"--req-rate", fmt.Sprintf("%v", reqRate),
+			}
+			stressers[i] = newRunnerStresser(clus.Tester.RunnerExecPath, args, clus.rateLimiter, reqRate)
+
+		case "LOCK_RACER_RUNNER":
+			reqRate := 100
+			args := []string{
+				"lock-racer",
+				fmt.Sprintf("%v", time.Now().UnixNano()), // locker name as current nano time
+				"--endpoints", clus.Members[idx].EtcdClientEndpoint,
+				"--total-client-connections=10",
+				"--rounds=0", // runs forever
+				"--req-rate", fmt.Sprintf("%v", reqRate),
+			}
+			stressers[i] = newRunnerStresser(clus.Tester.RunnerExecPath, args, clus.rateLimiter, reqRate)
+
+		case "LEASE_RUNNER":
+			args := []string{
+				"lease-renewer",
+				"--ttl=30",
+				"--endpoints", clus.Members[idx].EtcdClientEndpoint,
+			}
+			stressers[i] = newRunnerStresser(clus.Tester.RunnerExecPath, args, clus.rateLimiter, 0)
+		}
+	}
+	return &compositeStresser{stressers}
+}
diff --git a/tools/functional-tester/tester/stress_key.go b/tools/functional-tester/tester/stress_key.go
new file mode 100644
index 000000000..12ce46e97
--- /dev/null
+++ b/tools/functional-tester/tester/stress_key.go
@@ -0,0 +1,345 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tester
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+	"go.uber.org/zap"
+	"golang.org/x/time/rate"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/transport"
+)
+
+type keyStresser struct {
+	logger *zap.Logger
+
+	// TODO: use Member
+	Endpoint string
+
+	keySize           int
+	keyLargeSize      int
+	keySuffixRange    int
+	keyTxnSuffixRange int
+	keyTxnOps         int
+
+	N int
+
+	rateLimiter *rate.Limiter
+
+	wg sync.WaitGroup
+
+	cancel func()
+	conn   *grpc.ClientConn
+	// atomicModifiedKeys records the number of keys created and deleted by the stresser.
+ atomicModifiedKeys int64 + + stressTable *stressTable +} + +func (s *keyStresser) Stress() error { + // TODO: add backoff option + conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure()) + if err != nil { + return fmt.Errorf("%v (%s)", err, s.Endpoint) + } + ctx, cancel := context.WithCancel(context.Background()) + + s.wg.Add(s.N) + s.conn = conn + s.cancel = cancel + + kvc := pb.NewKVClient(conn) + + var stressEntries = []stressEntry{ + {weight: 0.7, f: newStressPut(kvc, s.keySuffixRange, s.keySize)}, + { + weight: 0.7 * float32(s.keySize) / float32(s.keyLargeSize), + f: newStressPut(kvc, s.keySuffixRange, s.keyLargeSize), + }, + {weight: 0.07, f: newStressRange(kvc, s.keySuffixRange)}, + {weight: 0.07, f: newStressRangeInterval(kvc, s.keySuffixRange)}, + {weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)}, + {weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)}, + } + if s.keyTxnSuffixRange > 0 { + // adjust to make up ±70% of workloads with writes + stressEntries[0].weight = 0.35 + stressEntries = append(stressEntries, stressEntry{ + weight: 0.35, + f: newStressTxn(kvc, s.keyTxnSuffixRange, s.keyTxnOps), + }) + } + s.stressTable = createStressTable(stressEntries) + + for i := 0; i < s.N; i++ { + go s.run(ctx) + } + + s.logger.Info( + "key stresser started in background", + zap.String("endpoint", s.Endpoint), + ) + return nil +} + +func (s *keyStresser) run(ctx context.Context) { + defer s.wg.Done() + + for { + if err := s.rateLimiter.Wait(ctx); err == context.Canceled { + return + } + + // TODO: 10-second is enough timeout to cover leader failure + // and immediate leader election. Find out what other cases this + // could be timed out. + sctx, scancel := context.WithTimeout(ctx, 10*time.Second) + err, modifiedKeys := s.stressTable.choose()(sctx) + scancel() + if err == nil { + atomic.AddInt64(&s.atomicModifiedKeys, modifiedKeys) + continue + } + + switch rpctypes.ErrorDesc(err) { + case context.DeadlineExceeded.Error(): + // This retries when request is triggered at the same time as + // leader failure. When we terminate the leader, the request to + // that leader cannot be processed, and times out. Also requests + // to followers cannot be forwarded to the old leader, so timing out + // as well. We want to keep stressing until the cluster elects a + // new leader and start processing requests again. + case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error(): + // This retries when request is triggered at the same time as + // leader failure and follower nodes receive time out errors + // from losing their leader. Followers should retry to connect + // to the new leader. + case etcdserver.ErrStopped.Error(): + // one of the etcd nodes stopped from failure injection + case transport.ErrConnClosing.Desc: + // server closed the transport (failure injected node) + case rpctypes.ErrNotCapable.Error(): + // capability check has not been done (in the beginning) + case rpctypes.ErrTooManyRequests.Error(): + // hitting the recovering member. 
+ case context.Canceled.Error(): + // from stresser.Cancel method: + return + case grpc.ErrClientConnClosing.Error(): + // from stresser.Cancel method: + return + default: + s.logger.Warn( + "key stresser exited with error", + zap.String("endpoint", s.Endpoint), + zap.Error(err), + ) + return + } + } +} + +func (s *keyStresser) Pause() { + s.Close() +} + +func (s *keyStresser) Close() { + s.cancel() + s.conn.Close() + s.wg.Wait() + + s.logger.Info( + "key stresser is closed", + zap.String("endpoint", s.Endpoint), + ) +} + +func (s *keyStresser) ModifiedKeys() int64 { + return atomic.LoadInt64(&s.atomicModifiedKeys) +} + +func (s *keyStresser) Checker() Checker { return nil } + +type stressFunc func(ctx context.Context) (err error, modifiedKeys int64) + +type stressEntry struct { + weight float32 + f stressFunc +} + +type stressTable struct { + entries []stressEntry + sumWeights float32 +} + +func createStressTable(entries []stressEntry) *stressTable { + st := stressTable{entries: entries} + for _, entry := range st.entries { + st.sumWeights += entry.weight + } + return &st +} + +func (st *stressTable) choose() stressFunc { + v := rand.Float32() * st.sumWeights + var sum float32 + var idx int + for i := range st.entries { + sum += st.entries[i].weight + if sum >= v { + idx = i + break + } + } + return st.entries[idx].f +} + +func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc { + return func(ctx context.Context) (error, int64) { + _, err := kvc.Put(ctx, &pb.PutRequest{ + Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))), + Value: randBytes(keySize), + }, grpc.FailFast(false)) + return err, 1 + } +} + +func newStressTxn(kvc pb.KVClient, keyTxnSuffixRange, txnOps int) stressFunc { + keys := make([]string, keyTxnSuffixRange) + for i := range keys { + keys[i] = fmt.Sprintf("/k%03d", i) + } + return writeTxn(kvc, keys, txnOps) +} + +func writeTxn(kvc pb.KVClient, keys []string, txnOps int) stressFunc { + return func(ctx context.Context) (error, int64) { + ks := make(map[string]struct{}, txnOps) + for len(ks) != txnOps { + ks[keys[rand.Intn(len(keys))]] = struct{}{} + } + selected := make([]string, 0, txnOps) + for k := range ks { + selected = append(selected, k) + } + com, delOp, putOp := getTxnReqs(selected[0], "bar00") + txnReq := &pb.TxnRequest{ + Compare: []*pb.Compare{com}, + Success: []*pb.RequestOp{delOp}, + Failure: []*pb.RequestOp{putOp}, + } + + // add nested txns if any + for i := 1; i < txnOps; i++ { + k, v := selected[i], fmt.Sprintf("bar%02d", i) + com, delOp, putOp = getTxnReqs(k, v) + nested := &pb.RequestOp{ + Request: &pb.RequestOp_RequestTxn{ + RequestTxn: &pb.TxnRequest{ + Compare: []*pb.Compare{com}, + Success: []*pb.RequestOp{delOp}, + Failure: []*pb.RequestOp{putOp}, + }, + }, + } + txnReq.Success = append(txnReq.Success, nested) + txnReq.Failure = append(txnReq.Failure, nested) + } + + _, err := kvc.Txn(ctx, txnReq, grpc.FailFast(false)) + return err, int64(txnOps) + } +} + +func getTxnReqs(key, val string) (com *pb.Compare, delOp *pb.RequestOp, putOp *pb.RequestOp) { + // if key exists (version > 0) + com = &pb.Compare{ + Key: []byte(key), + Target: pb.Compare_VERSION, + Result: pb.Compare_GREATER, + TargetUnion: &pb.Compare_Version{Version: 0}, + } + delOp = &pb.RequestOp{ + Request: &pb.RequestOp_RequestDeleteRange{ + RequestDeleteRange: &pb.DeleteRangeRequest{ + Key: []byte(key), + }, + }, + } + putOp = &pb.RequestOp{ + Request: &pb.RequestOp_RequestPut{ + RequestPut: &pb.PutRequest{ + Key: []byte(key), + Value: []byte(val), + }, + 
}, + } + return com, delOp, putOp +} + +func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc { + return func(ctx context.Context) (error, int64) { + _, err := kvc.Range(ctx, &pb.RangeRequest{ + Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))), + }, grpc.FailFast(false)) + return err, 0 + } +} + +func newStressRangeInterval(kvc pb.KVClient, keySuffixRange int) stressFunc { + return func(ctx context.Context) (error, int64) { + start := rand.Intn(keySuffixRange) + end := start + 500 + _, err := kvc.Range(ctx, &pb.RangeRequest{ + Key: []byte(fmt.Sprintf("foo%016x", start)), + RangeEnd: []byte(fmt.Sprintf("foo%016x", end)), + }, grpc.FailFast(false)) + return err, 0 + } +} + +func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc { + return func(ctx context.Context) (error, int64) { + _, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{ + Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))), + }, grpc.FailFast(false)) + return err, 1 + } +} + +func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange int) stressFunc { + return func(ctx context.Context) (error, int64) { + start := rand.Intn(keySuffixRange) + end := start + 500 + resp, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{ + Key: []byte(fmt.Sprintf("foo%016x", start)), + RangeEnd: []byte(fmt.Sprintf("foo%016x", end)), + }, grpc.FailFast(false)) + if err == nil { + return nil, resp.Deleted + } + return err, 0 + } +} diff --git a/tools/functional-tester/tester/stress_lease.go b/tools/functional-tester/tester/stress_lease.go new file mode 100644 index 000000000..bf65d294a --- /dev/null +++ b/tools/functional-tester/tester/stress_lease.go @@ -0,0 +1,485 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tester + +import ( + "context" + "fmt" + "math/rand" + "sync" + "sync/atomic" + "time" + + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + + "go.uber.org/zap" + "golang.org/x/time/rate" + "google.golang.org/grpc" +) + +const ( + // time to live for lease + TTL = 120 + TTLShort = 2 +) + +type leaseStresser struct { + logger *zap.Logger + + endpoint string + cancel func() + conn *grpc.ClientConn + kvc pb.KVClient + lc pb.LeaseClient + ctx context.Context + + rateLimiter *rate.Limiter + // atomicModifiedKey records the number of keys created and deleted during a test case + atomicModifiedKey int64 + numLeases int + keysPerLease int + + aliveLeases *atomicLeases + revokedLeases *atomicLeases + shortLivedLeases *atomicLeases + + runWg sync.WaitGroup + aliveWg sync.WaitGroup +} + +type atomicLeases struct { + // rwLock is used to protect read/write access of leases map + // which are accessed and modified by different go routines. 
+ rwLock sync.RWMutex + leases map[int64]time.Time +} + +func (al *atomicLeases) add(leaseID int64, t time.Time) { + al.rwLock.Lock() + al.leases[leaseID] = t + al.rwLock.Unlock() +} + +func (al *atomicLeases) update(leaseID int64, t time.Time) { + al.rwLock.Lock() + _, ok := al.leases[leaseID] + if ok { + al.leases[leaseID] = t + } + al.rwLock.Unlock() +} + +func (al *atomicLeases) read(leaseID int64) (rv time.Time, ok bool) { + al.rwLock.RLock() + rv, ok = al.leases[leaseID] + al.rwLock.RUnlock() + return rv, ok +} + +func (al *atomicLeases) remove(leaseID int64) { + al.rwLock.Lock() + delete(al.leases, leaseID) + al.rwLock.Unlock() +} + +func (al *atomicLeases) getLeasesMap() map[int64]time.Time { + leasesCopy := make(map[int64]time.Time) + al.rwLock.RLock() + for k, v := range al.leases { + leasesCopy[k] = v + } + al.rwLock.RUnlock() + return leasesCopy +} + +func (ls *leaseStresser) setupOnce() error { + if ls.aliveLeases != nil { + return nil + } + if ls.numLeases == 0 { + panic("expect numLeases to be set") + } + if ls.keysPerLease == 0 { + panic("expect keysPerLease to be set") + } + + ls.aliveLeases = &atomicLeases{leases: make(map[int64]time.Time)} + + return nil +} + +func (ls *leaseStresser) Stress() error { + ls.logger.Info( + "lease stresser is started", + zap.String("endpoint", ls.endpoint), + ) + + if err := ls.setupOnce(); err != nil { + return err + } + + conn, err := grpc.Dial(ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1*time.Second)) + if err != nil { + return fmt.Errorf("%v (%s)", err, ls.endpoint) + } + ls.conn = conn + ls.kvc = pb.NewKVClient(conn) + ls.lc = pb.NewLeaseClient(conn) + ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)} + ls.shortLivedLeases = &atomicLeases{leases: make(map[int64]time.Time)} + + ctx, cancel := context.WithCancel(context.Background()) + ls.cancel = cancel + ls.ctx = ctx + + ls.runWg.Add(1) + go ls.run() + return nil +} + +func (ls *leaseStresser) run() { + defer ls.runWg.Done() + ls.restartKeepAlives() + for { + // the number of keys created and deleted is roughly 2x the number of created keys for an iteration. + // the rateLimiter therefore consumes 2x ls.numLeases*ls.keysPerLease tokens where each token represents a create/delete operation for key. 
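+		// For example, with the values used by newStresser in this patch
+		// (numLeases=10, keysPerLease=10), each iteration reserves
+		// 2*10*10 = 200 tokens from the shared rate limiter before creating leases.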
+		err := ls.rateLimiter.WaitN(ls.ctx, 2*ls.numLeases*ls.keysPerLease)
+		if err == context.Canceled {
+			return
+		}
+
+		ls.logger.Debug(
+			"lease stresser is creating leases",
+			zap.String("endpoint", ls.endpoint),
+		)
+		ls.createLeases()
+		ls.logger.Debug(
+			"lease stresser created leases",
+			zap.String("endpoint", ls.endpoint),
+		)
+
+		ls.logger.Debug(
+			"lease stresser is dropping leases",
+			zap.String("endpoint", ls.endpoint),
+		)
+		ls.randomlyDropLeases()
+		ls.logger.Debug(
+			"lease stresser dropped leases",
+			zap.String("endpoint", ls.endpoint),
+		)
+	}
+}
+
+func (ls *leaseStresser) restartKeepAlives() {
+	for leaseID := range ls.aliveLeases.getLeasesMap() {
+		ls.aliveWg.Add(1)
+		go func(id int64) {
+			ls.keepLeaseAlive(id)
+		}(leaseID)
+	}
+}
+
+func (ls *leaseStresser) createLeases() {
+	ls.createAliveLeases()
+	ls.createShortLivedLeases()
+}
+
+func (ls *leaseStresser) createAliveLeases() {
+	neededLeases := ls.numLeases - len(ls.aliveLeases.getLeasesMap())
+	var wg sync.WaitGroup
+	for i := 0; i < neededLeases; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			leaseID, err := ls.createLeaseWithKeys(TTL)
+			if err != nil {
+				ls.logger.Debug(
+					"createLeaseWithKeys failed",
+					zap.String("endpoint", ls.endpoint),
+					zap.Error(err),
+				)
+				return
+			}
+			ls.aliveLeases.add(leaseID, time.Now())
+			// keep track of all the keep-lease-alive goroutines
+			ls.aliveWg.Add(1)
+			go ls.keepLeaseAlive(leaseID)
+		}()
+	}
+	wg.Wait()
+}
+
+func (ls *leaseStresser) createShortLivedLeases() {
+	// one round of createLeases() might not create all the short-lived leases we want due to failures.
+	// thus, create the remaining short-lived leases in a future round.
+	neededLeases := ls.numLeases - len(ls.shortLivedLeases.getLeasesMap())
+	var wg sync.WaitGroup
+	for i := 0; i < neededLeases; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			leaseID, err := ls.createLeaseWithKeys(TTLShort)
+			if err != nil {
+				return
+			}
+			ls.shortLivedLeases.add(leaseID, time.Now())
+		}()
+	}
+	wg.Wait()
+}
+
+func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) {
+	leaseID, err := ls.createLease(ttl)
+	if err != nil {
+		ls.logger.Debug(
+			"createLease failed",
+			zap.String("endpoint", ls.endpoint),
+			zap.Error(err),
+		)
+		return -1, err
+	}
+
+	ls.logger.Debug(
+		"createLease created lease",
+		zap.String("endpoint", ls.endpoint),
+		zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
+	)
+	if err := ls.attachKeysWithLease(leaseID); err != nil {
+		return -1, err
+	}
+	return leaseID, nil
+}
+
+func (ls *leaseStresser) randomlyDropLeases() {
+	var wg sync.WaitGroup
+	for l := range ls.aliveLeases.getLeasesMap() {
+		wg.Add(1)
+		go func(leaseID int64) {
+			defer wg.Done()
+			dropped, err := ls.randomlyDropLease(leaseID)
+			// if randomlyDropLease encountered an error such as the context being canceled, remove the lease from aliveLeases
+			// because we can't tell whether the lease was dropped or not.
+			if err != nil {
+				ls.logger.Debug(
+					"randomlyDropLease failed",
+					zap.String("endpoint", ls.endpoint),
+					zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
+					zap.Error(err),
+				)
+				ls.aliveLeases.remove(leaseID)
+				return
+			}
+			if !dropped {
+				return
+			}
+			ls.logger.Debug(
+				"randomlyDropLease dropped a lease",
+				zap.String("endpoint", ls.endpoint),
+				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
+			)
+			ls.revokedLeases.add(leaseID, time.Now())
+			ls.aliveLeases.remove(leaseID)
+		}(l)
+	}
+	wg.Wait()
+}
+
+func (ls *leaseStresser) createLease(ttl int64) (int64, error) {
+	resp, err := ls.lc.LeaseGrant(ls.ctx, &pb.LeaseGrantRequest{TTL: ttl})
+	if err != nil {
+		return -1, err
+	}
+	return resp.ID, nil
+}
+
+func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
+	defer ls.aliveWg.Done()
+	ctx, cancel := context.WithCancel(ls.ctx)
+	stream, err := ls.lc.LeaseKeepAlive(ctx)
+	defer func() { cancel() }()
+	for {
+		select {
+		case <-time.After(500 * time.Millisecond):
+		case <-ls.ctx.Done():
+			ls.logger.Debug(
+				"keepLeaseAlive context canceled",
+				zap.String("endpoint", ls.endpoint),
+				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
+				zap.Error(ls.ctx.Err()),
+			)
+			// it is possible that a lease expires during the invariant-checking phase but not during the keepLeaseAlive() phase.
+			// this scenario happens when an alive lease is just about to expire as keepLeaseAlive() exits and then expires while invariants are checked.
+			// to circumvent that scenario, check each lease before the keepalive loop exits to see if it has been renewed within the last TTL/2 duration.
+			// if it has been renewed, invariant checking has at least TTL/2 before the lease expires, which is long enough for the checking to finish.
+			// if it has not been renewed, remove the lease from the alive map so that the lease doesn't expire during invariant checking.
+			renewTime, ok := ls.aliveLeases.read(leaseID)
+			if ok && renewTime.Add(TTL/2*time.Second).Before(time.Now()) {
+				ls.aliveLeases.remove(leaseID)
+				ls.logger.Debug(
+					"keepLeaseAlive lease has not been renewed, dropping it",
+					zap.String("endpoint", ls.endpoint),
+					zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
+				)
+			}
+			return
+		}
+
+		if err != nil {
+			ls.logger.Debug(
+				"keepLeaseAlive failed to create lease keepalive stream",
+				zap.String("endpoint", ls.endpoint),
+				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
+				zap.Error(err),
+			)
+			cancel()
+			ctx, cancel = context.WithCancel(ls.ctx)
+			stream, err = ls.lc.LeaseKeepAlive(ctx)
+			cancel()
+			continue
+		}
+
+		ls.logger.Debug(
+			"keepLeaseAlive stream sends lease keepalive request",
+			zap.String("endpoint", ls.endpoint),
+			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
+		)
+		err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID})
+		if err != nil {
+			ls.logger.Debug(
+				"keepLeaseAlive stream failed to send lease keepalive request",
+				zap.String("endpoint", ls.endpoint),
+				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
+				zap.Error(err),
+			)
+			continue
+		}
+		leaseRenewTime := time.Now()
+		ls.logger.Debug(
+			"keepLeaseAlive stream sent lease keepalive request",
+			zap.String("endpoint", ls.endpoint),
+			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
+		)
+		respRC, err := stream.Recv()
+		if err != nil {
+			ls.logger.Debug(
+				"keepLeaseAlive stream failed to receive lease keepalive response",
+				zap.String("endpoint", ls.endpoint),
+				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
+				zap.Error(err),
+			)
+			continue
+		}
+		// lease expires once its TTL reaches 0
+		// don't send keepalive if the lease 
has expired + if respRC.TTL <= 0 { + ls.logger.Debug( + "keepLeaseAlive stream received lease keepalive response TTL <= 0", + zap.String("endpoint", ls.endpoint), + zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), + zap.Int64("ttl", respRC.TTL), + ) + ls.aliveLeases.remove(leaseID) + return + } + // renew lease timestamp only if lease is present + ls.logger.Debug( + "keepLeaseAlive renewed a lease", + zap.String("endpoint", ls.endpoint), + zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), + ) + ls.aliveLeases.update(leaseID, leaseRenewTime) + } +} + +// attachKeysWithLease function attaches keys to the lease. +// the format of key is the concat of leaseID + '_' + '' +// e.g 5186835655248304152_0 for first created key and 5186835655248304152_1 for second created key +func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error { + var txnPuts []*pb.RequestOp + for j := 0; j < ls.keysPerLease; j++ { + txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(fmt.Sprintf("%d%s%d", leaseID, "_", j)), + Value: []byte(fmt.Sprintf("bar")), Lease: leaseID}}} + txnPuts = append(txnPuts, txnput) + } + // keep retrying until lease is not found or ctx is being canceled + for ls.ctx.Err() == nil { + txn := &pb.TxnRequest{Success: txnPuts} + _, err := ls.kvc.Txn(ls.ctx, txn) + if err == nil { + // since all created keys will be deleted too, the number of operations on keys will be roughly 2x the number of created keys + atomic.AddInt64(&ls.atomicModifiedKey, 2*int64(ls.keysPerLease)) + return nil + } + if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound { + return err + } + } + return ls.ctx.Err() +} + +// randomlyDropLease drops the lease only when the rand.Int(2) returns 1. +// This creates a 50/50 percents chance of dropping a lease +func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) { + if rand.Intn(2) != 0 { + return false, nil + } + // keep retrying until a lease is dropped or ctx is being canceled + for ls.ctx.Err() == nil { + _, err := ls.lc.LeaseRevoke(ls.ctx, &pb.LeaseRevokeRequest{ID: leaseID}) + if err == nil || rpctypes.Error(err) == rpctypes.ErrLeaseNotFound { + return true, nil + } + } + + ls.logger.Debug( + "randomlyDropLease error", + zap.String("endpoint", ls.endpoint), + zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), + zap.Error(ls.ctx.Err()), + ) + return false, ls.ctx.Err() +} + +func (ls *leaseStresser) Pause() { + ls.Close() +} + +func (ls *leaseStresser) Close() { + ls.logger.Info( + "lease stresser is closing", + zap.String("endpoint", ls.endpoint), + ) + ls.cancel() + ls.runWg.Wait() + ls.aliveWg.Wait() + ls.conn.Close() + ls.logger.Info( + "lease stresser is closed", + zap.String("endpoint", ls.endpoint), + ) +} + +func (ls *leaseStresser) ModifiedKeys() int64 { + return atomic.LoadInt64(&ls.atomicModifiedKey) +} + +func (ls *leaseStresser) Checker() Checker { + return &leaseChecker{ + logger: ls.logger, + endpoint: ls.endpoint, + ls: ls, + } +} diff --git a/tools/functional-tester/tester/stress_runner.go b/tools/functional-tester/tester/stress_runner.go new file mode 100644 index 000000000..7ed6139b3 --- /dev/null +++ b/tools/functional-tester/tester/stress_runner.go @@ -0,0 +1,97 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tester + +import ( + "fmt" + "io/ioutil" + "os/exec" + "syscall" + + "golang.org/x/time/rate" +) + +type runnerStresser struct { + cmd *exec.Cmd + cmdStr string + args []string + rl *rate.Limiter + reqRate int + + errc chan error + donec chan struct{} +} + +func newRunnerStresser(cmdStr string, args []string, rl *rate.Limiter, reqRate int) *runnerStresser { + rl.SetLimit(rl.Limit() - rate.Limit(reqRate)) + return &runnerStresser{ + cmdStr: cmdStr, + args: args, + rl: rl, + reqRate: reqRate, + errc: make(chan error, 1), + donec: make(chan struct{}), + } +} + +func (rs *runnerStresser) setupOnce() (err error) { + if rs.cmd != nil { + return nil + } + + rs.cmd = exec.Command(rs.cmdStr, rs.args...) + stderr, err := rs.cmd.StderrPipe() + if err != nil { + return err + } + + go func() { + defer close(rs.donec) + out, err := ioutil.ReadAll(stderr) + if err != nil { + rs.errc <- err + } else { + rs.errc <- fmt.Errorf("(%v %v) stderr %v", rs.cmdStr, rs.args, string(out)) + } + }() + + return rs.cmd.Start() +} + +func (rs *runnerStresser) Stress() (err error) { + if err = rs.setupOnce(); err != nil { + return err + } + return syscall.Kill(rs.cmd.Process.Pid, syscall.SIGCONT) +} + +func (rs *runnerStresser) Pause() { + syscall.Kill(rs.cmd.Process.Pid, syscall.SIGSTOP) +} + +func (rs *runnerStresser) Close() { + syscall.Kill(rs.cmd.Process.Pid, syscall.SIGINT) + rs.cmd.Wait() + <-rs.donec + rs.rl.SetLimit(rs.rl.Limit() + rate.Limit(rs.reqRate)) +} + +func (rs *runnerStresser) ModifiedKeys() int64 { + return 1 +} + +func (rs *runnerStresser) Checker() Checker { + return &runnerChecker{rs.errc} +} diff --git a/tools/functional-tester/tester/tester.go b/tools/functional-tester/tester/tester.go new file mode 100644 index 000000000..86bb601dc --- /dev/null +++ b/tools/functional-tester/tester/tester.go @@ -0,0 +1,274 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tester + +import ( + "fmt" + "os" + "time" + + "go.uber.org/zap" +) + +// compactQPS is rough number of compact requests per second. +// Previous tests showed etcd can compact about 60,000 entries per second. +const compactQPS = 50000 + +// StartTester starts tester. 
+func (clus *Cluster) StartTester() { + // TODO: upate status + clus.startStresser() + + var preModifiedKey int64 + for round := 0; round < int(clus.Tester.RoundLimit) || clus.Tester.RoundLimit == -1; round++ { + roundTotalCounter.Inc() + + clus.rd = round + if err := clus.doRound(round); err != nil { + clus.logger.Warn( + "doRound failed; returning", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + zap.Error(err), + ) + if clus.cleanup() != nil { + return + } + // reset preModifiedKey after clean up + preModifiedKey = 0 + continue + } + // -1 so that logPrefix doesn't print out 'case' + clus.cs = -1 + + revToCompact := max(0, clus.currentRevision-10000) + currentModifiedKey := clus.stresser.ModifiedKeys() + modifiedKey := currentModifiedKey - preModifiedKey + preModifiedKey = currentModifiedKey + timeout := 10 * time.Second + timeout += time.Duration(modifiedKey/compactQPS) * time.Second + clus.logger.Info( + "compacting", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + zap.Duration("timeout", timeout), + ) + if err := clus.compact(revToCompact, timeout); err != nil { + clus.logger.Warn( + "compact failed", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + zap.Error(err), + ) + if err = clus.cleanup(); err != nil { + clus.logger.Warn( + "cleanup failed", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + zap.Error(err), + ) + return + } + // reset preModifiedKey after clean up + preModifiedKey = 0 + } + if round > 0 && round%500 == 0 { // every 500 rounds + if err := clus.defrag(); err != nil { + clus.logger.Warn( + "defrag failed; returning", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + zap.Error(err), + ) + clus.failed() + return + } + } + } + + clus.logger.Info( + "functional-tester is finished", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + ) +} + +func (clus *Cluster) doRound(round int) error { + for i, f := range clus.failures { + clus.cs = i + + caseTotalCounter.WithLabelValues(f.Desc()).Inc() + + if err := clus.WaitHealth(); err != nil { + return fmt.Errorf("wait full health error: %v", err) + } + + clus.logger.Info( + "injecting failure", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + zap.String("desc", f.Desc()), + ) + if err := f.Inject(clus, round); err != nil { + return fmt.Errorf("injection error: %v", err) + } + clus.logger.Info( + "injected failure", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + zap.String("desc", f.Desc()), + ) + + clus.logger.Info( + "recovering failure", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + zap.String("desc", f.Desc()), + ) + if err := f.Recover(clus, round); err != nil { + return fmt.Errorf("recovery error: %v", err) + } + clus.logger.Info( + "recovered failure", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + zap.String("desc", f.Desc()), + ) + + clus.pauseStresser() + + if err := clus.WaitHealth(); err != nil { + return fmt.Errorf("wait full health error: %v", err) + } + if err := clus.checkConsistency(); err != nil { + return fmt.Errorf("tt.checkConsistency error (%v)", err) + } + + clus.logger.Info( + "success", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + zap.String("desc", f.Desc()), + ) + } + return nil +} + +func (clus *Cluster) updateRevision() error { + revs, _, err := clus.getRevisionHash() + for _, rev := range revs { + clus.currentRevision = rev + break // just need get one of the current revisions + } + + clus.logger.Info( + "updated current revision", + zap.Int64("current-revision", 
clus.currentRevision), + ) + return err +} + +func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) { + clus.pauseStresser() + defer func() { + if err == nil { + err = clus.startStresser() + } + }() + + clus.logger.Info( + "compacting storage", + zap.Int64("current-revision", clus.currentRevision), + zap.Int64("compact-revision", rev), + ) + if err = clus.compactKV(rev, timeout); err != nil { + return err + } + clus.logger.Info( + "compacted storage", + zap.Int64("current-revision", clus.currentRevision), + zap.Int64("compact-revision", rev), + ) + + clus.logger.Info( + "checking compaction", + zap.Int64("current-revision", clus.currentRevision), + zap.Int64("compact-revision", rev), + ) + if err = clus.checkCompact(rev); err != nil { + clus.logger.Warn( + "checkCompact failed", + zap.Int64("current-revision", clus.currentRevision), + zap.Int64("compact-revision", rev), + zap.Error(err), + ) + return err + } + clus.logger.Info( + "confirmed compaction", + zap.Int64("current-revision", clus.currentRevision), + zap.Int64("compact-revision", rev), + ) + + return nil +} + +func (clus *Cluster) failed() { + if !clus.Tester.ExitOnFailure { + return + } + + clus.logger.Info( + "exiting on failure", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + ) + clus.DestroyEtcdAgents() + os.Exit(2) +} + +func (clus *Cluster) cleanup() error { + defer clus.failed() + + roundFailedTotalCounter.Inc() + desc := "compact/defrag" + if clus.cs != -1 { + desc = clus.failures[clus.cs].Desc() + } + caseFailedTotalCounter.WithLabelValues(desc).Inc() + + clus.closeStresser() + if err := clus.FailArchive(); err != nil { + clus.logger.Warn( + "Cleanup failed", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + zap.Error(err), + ) + return err + } + if err := clus.Restart(); err != nil { + clus.logger.Warn( + "Restart failed", + zap.Int("round", clus.rd), + zap.Int("case", clus.cs), + zap.Error(err), + ) + return err + } + + clus.updateStresserChecker() + return nil +} diff --git a/tools/functional-tester/tester/utils.go b/tools/functional-tester/tester/utils.go new file mode 100644 index 000000000..74e34146d --- /dev/null +++ b/tools/functional-tester/tester/utils.go @@ -0,0 +1,79 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package tester + +import ( + "fmt" + "math/rand" + "net" + "net/url" + "strings" +) + +func isValidURL(u string) bool { + _, err := url.Parse(u) + return err == nil +} + +func getPort(addr string) (port string, err error) { + urlAddr, err := url.Parse(addr) + if err != nil { + return "", err + } + _, port, err = net.SplitHostPort(urlAddr.Host) + if err != nil { + return "", err + } + return port, nil +} + +func getSameValue(vals map[string]int64) bool { + var rv int64 + for _, v := range vals { + if rv == 0 { + rv = v + } + if rv != v { + return false + } + } + return true +} + +func max(n1, n2 int64) int64 { + if n1 > n2 { + return n1 + } + return n2 +} + +func errsToError(errs []error) error { + if len(errs) == 0 { + return nil + } + stringArr := make([]string, len(errs)) + for i, err := range errs { + stringArr[i] = err.Error() + } + return fmt.Errorf(strings.Join(stringArr, ", ")) +} + +func randBytes(size int) []byte { + data := make([]byte, size) + for i := 0; i < size; i++ { + data[i] = byte(int('a') + rand.Intn(26)) + } + return data +}
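
For reference, the small helpers in utils.go can be exercised in isolation. The following is a minimal, illustrative sketch and not part of the patch; getSameValue is copied from utils.go above, and errsToError is reproduced with errors.New in place of fmt.Errorf to avoid a non-constant format string. The sample revision values are made up.

	package main

	import (
		"errors"
		"fmt"
		"strings"
	)

	// getSameValue is copied from tools/functional-tester/tester/utils.go above.
	// It reports whether every member in the map observed the same value.
	func getSameValue(vals map[string]int64) bool {
		var rv int64
		for _, v := range vals {
			if rv == 0 {
				rv = v
			}
			if rv != v {
				return false
			}
		}
		return true
	}

	// errsToError mirrors the helper in utils.go, joining multiple errors into one.
	func errsToError(errs []error) error {
		if len(errs) == 0 {
			return nil
		}
		stringArr := make([]string, len(errs))
		for i, err := range errs {
			stringArr[i] = err.Error()
		}
		return errors.New(strings.Join(stringArr, ", "))
	}

	func main() {
		// all members report the same revision
		fmt.Println(getSameValue(map[string]int64{"s1": 100, "s2": 100, "s3": 100})) // true
		// one member lags behind
		fmt.Println(getSameValue(map[string]int64{"s1": 100, "s2": 100, "s3": 99})) // false

		// several per-lease check errors collapse into a single error value
		fmt.Println(errsToError([]error{errors.New("lease not revoked"), errors.New("timeout")}))
	}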