diff --git a/tools/functional-tester/tester/checks.go b/tools/functional-tester/tester/checks.go index 8817bcca4..f18e391ad 100644 --- a/tools/functional-tester/tester/checks.go +++ b/tools/functional-tester/tester/checks.go @@ -39,14 +39,14 @@ type hashAndRevGetter interface { } type hashChecker struct { - logger *zap.Logger - hrg hashAndRevGetter + lg *zap.Logger + hrg hashAndRevGetter } -func newHashChecker(logger *zap.Logger, hrg hashAndRevGetter) Checker { +func newHashChecker(lg *zap.Logger, hrg hashAndRevGetter) Checker { return &hashChecker{ - logger: logger, - hrg: hrg, + lg: lg, + hrg: hrg, } } @@ -62,7 +62,7 @@ func (hc *hashChecker) checkRevAndHashes() (err error) { for i := 0; i < retries; i++ { revs, hashes, err = hc.hrg.getRevisionHash() if err != nil { - hc.logger.Warn( + hc.lg.Warn( "failed to get revision and hash", zap.Int("retries", i), zap.Error(err), @@ -73,7 +73,7 @@ func (hc *hashChecker) checkRevAndHashes() (err error) { if sameRev && sameHashes { return nil } - hc.logger.Warn( + hc.lg.Warn( "retrying; etcd cluster is not stable", zap.Int("retries", i), zap.Bool("same-revisions", sameRev), @@ -97,7 +97,7 @@ func (hc *hashChecker) Check() error { } type leaseChecker struct { - logger *zap.Logger + lg *zap.Logger endpoint string // TODO: use Member @@ -157,7 +157,7 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64) return nil } if err != nil { - lc.logger.Debug( + lc.lg.Debug( "retrying; Lease TimeToLive failed", zap.Int("retries", i), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), @@ -167,7 +167,7 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64) } if resp.TTL > 0 { dur := time.Duration(resp.TTL) * time.Second - lc.logger.Debug( + lc.lg.Debug( "lease has not been expired, wait until expire", zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.Int64("ttl", resp.TTL), @@ -175,7 +175,7 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64) ) time.Sleep(dur) } else { - lc.logger.Debug( + lc.lg.Debug( "lease expired but not yet revoked", zap.Int("retries", i), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), @@ -195,7 +195,7 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64) func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID int64) error { keysExpired, err := lc.hasKeysAttachedToLeaseExpired(ctx, leaseID) if err != nil { - lc.logger.Warn( + lc.lg.Warn( "hasKeysAttachedToLeaseExpired failed", zap.String("endpoint", lc.endpoint), zap.Error(err), @@ -204,7 +204,7 @@ func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID in } leaseExpired, err := lc.hasLeaseExpired(ctx, leaseID) if err != nil { - lc.logger.Warn( + lc.lg.Warn( "hasLeaseExpired failed", zap.String("endpoint", lc.endpoint), zap.Error(err), @@ -248,7 +248,7 @@ func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (boo } else { return resp.TTL == -1, nil } - lc.logger.Warn( + lc.lg.Warn( "hasLeaseExpired getLeaseByID failed", zap.String("endpoint", lc.endpoint), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), @@ -267,7 +267,7 @@ func (lc *leaseChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, lease RangeEnd: []byte(clientv3.GetPrefixRangeEnd(fmt.Sprintf("%d", leaseID))), }, grpc.FailFast(false)) if err != nil { - lc.logger.Warn( + lc.lg.Warn( "hasKeysAttachedToLeaseExpired failed", zap.String("endpoint", lc.endpoint), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), diff --git a/tools/functional-tester/tester/cluster.go b/tools/functional-tester/tester/cluster.go index f94287515..60b14097b 100644 --- a/tools/functional-tester/tester/cluster.go +++ b/tools/functional-tester/tester/cluster.go @@ -38,7 +38,7 @@ import ( // Cluster defines tester cluster. type Cluster struct { - logger *zap.Logger + lg *zap.Logger agentConns []*grpc.ClientConn agentClients []rpcpb.TransportClient @@ -61,15 +61,15 @@ type Cluster struct { cs int } -func newCluster(logger *zap.Logger, fpath string) (*Cluster, error) { - logger.Info("reading configuration file", zap.String("path", fpath)) +func newCluster(lg *zap.Logger, fpath string) (*Cluster, error) { + lg.Info("reading configuration file", zap.String("path", fpath)) bts, err := ioutil.ReadFile(fpath) if err != nil { return nil, err } - logger.Info("opened configuration file", zap.String("path", fpath)) + lg.Info("opened configuration file", zap.String("path", fpath)) - clus := &Cluster{logger: logger} + clus := &Cluster{lg: lg} if err = yaml.Unmarshal(bts, clus); err != nil { return nil, err } @@ -192,8 +192,8 @@ var dialOpts = []grpc.DialOption{ } // NewCluster creates a client from a tester configuration. -func NewCluster(logger *zap.Logger, fpath string) (*Cluster, error) { - clus, err := newCluster(logger, fpath) +func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) { + clus, err := newCluster(lg, fpath) if err != nil { return nil, err } @@ -205,21 +205,21 @@ func NewCluster(logger *zap.Logger, fpath string) (*Cluster, error) { clus.failures = make([]Failure, 0) for i, ap := range clus.Members { - logger.Info("connecting", zap.String("agent-address", ap.AgentAddr)) + clus.lg.Info("connecting", zap.String("agent-address", ap.AgentAddr)) var err error clus.agentConns[i], err = grpc.Dial(ap.AgentAddr, dialOpts...) if err != nil { return nil, err } clus.agentClients[i] = rpcpb.NewTransportClient(clus.agentConns[i]) - logger.Info("connected", zap.String("agent-address", ap.AgentAddr)) + clus.lg.Info("connected", zap.String("agent-address", ap.AgentAddr)) - logger.Info("creating stream", zap.String("agent-address", ap.AgentAddr)) + clus.lg.Info("creating stream", zap.String("agent-address", ap.AgentAddr)) clus.agentStreams[i], err = clus.agentClients[i].Transport(context.Background()) if err != nil { return nil, err } - logger.Info("created stream", zap.String("agent-address", ap.AgentAddr)) + clus.lg.Info("created stream", zap.String("agent-address", ap.AgentAddr)) } mux := http.NewServeMux() @@ -246,18 +246,18 @@ func NewCluster(logger *zap.Logger, fpath string) (*Cluster, error) { } func (clus *Cluster) serveTesterServer() { - clus.logger.Info( + clus.lg.Info( "started tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr), ) err := clus.testerHTTPServer.ListenAndServe() - clus.logger.Info( + clus.lg.Info( "tester HTTP server returned", zap.String("tester-address", clus.Tester.TesterAddr), zap.Error(err), ) if err != nil && err != http.ErrServerClosed { - clus.logger.Fatal("tester HTTP errored", zap.Error(err)) + clus.lg.Fatal("tester HTTP errored", zap.Error(err)) } } @@ -291,7 +291,7 @@ func (clus *Cluster) updateFailures() { case "FAILPOINTS": fpFailures, fperr := failpointFailures(clus) if len(fpFailures) == 0 { - clus.logger.Info("no failpoints found!", zap.Error(fperr)) + clus.lg.Info("no failpoints found!", zap.Error(fperr)) } clus.failures = append(clus.failures, fpFailures...) case "NO_FAIL": @@ -316,13 +316,13 @@ func (clus *Cluster) shuffleFailures() { n := len(clus.failures) cp := coprime(n) - clus.logger.Info("shuffling test failure cases", zap.Int("total", n)) + clus.lg.Info("shuffling test failure cases", zap.Int("total", n)) fs := make([]Failure, n) for i := 0; i < n; i++ { fs[i] = clus.failures[(cp*i+offset)%n] } clus.failures = fs - clus.logger.Info("shuffled test failure cases", zap.Int("total", n)) + clus.lg.Info("shuffled test failure cases", zap.Int("total", n)) } /* @@ -354,7 +354,7 @@ func gcd(x, y int) int { } func (clus *Cluster) updateStresserChecker() { - clus.logger.Info( + clus.lg.Info( "updating stressers", zap.Int("round", clus.rd), zap.Int("case", clus.cs), @@ -367,7 +367,7 @@ func (clus *Cluster) updateStresserChecker() { clus.stresser = cs if clus.Tester.ConsistencyCheck { - clus.checker = newHashChecker(clus.logger, hashAndRevGetter(clus)) + clus.checker = newHashChecker(clus.lg, hashAndRevGetter(clus)) if schk := cs.Checker(); schk != nil { clus.checker = newCompositeChecker([]Checker{clus.checker, schk}) } @@ -375,7 +375,7 @@ func (clus *Cluster) updateStresserChecker() { clus.checker = newNoChecker() } - clus.logger.Info( + clus.lg.Info( "updated stressers", zap.Int("round", clus.rd), zap.Int("case", clus.cs), @@ -383,13 +383,13 @@ func (clus *Cluster) updateStresserChecker() { } func (clus *Cluster) startStresser() (err error) { - clus.logger.Info( + clus.lg.Info( "starting stressers", zap.Int("round", clus.rd), zap.Int("case", clus.cs), ) err = clus.stresser.Stress() - clus.logger.Info( + clus.lg.Info( "started stressers", zap.Int("round", clus.rd), zap.Int("case", clus.cs), @@ -398,13 +398,13 @@ func (clus *Cluster) startStresser() (err error) { } func (clus *Cluster) closeStresser() { - clus.logger.Info( + clus.lg.Info( "closing stressers", zap.Int("round", clus.rd), zap.Int("case", clus.cs), ) clus.stresser.Close() - clus.logger.Info( + clus.lg.Info( "closed stressers", zap.Int("round", clus.rd), zap.Int("case", clus.cs), @@ -412,13 +412,13 @@ func (clus *Cluster) closeStresser() { } func (clus *Cluster) pauseStresser() { - clus.logger.Info( + clus.lg.Info( "pausing stressers", zap.Int("round", clus.rd), zap.Int("case", clus.cs), ) clus.stresser.Pause() - clus.logger.Info( + clus.lg.Info( "paused stressers", zap.Int("round", clus.rd), zap.Int("case", clus.cs), @@ -431,7 +431,7 @@ func (clus *Cluster) checkConsistency() (err error) { return } if err = clus.updateRevision(); err != nil { - clus.logger.Warn( + clus.lg.Warn( "updateRevision failed", zap.Error(err), ) @@ -440,20 +440,20 @@ func (clus *Cluster) checkConsistency() (err error) { err = clus.startStresser() }() - clus.logger.Info( + clus.lg.Info( "checking consistency and invariant of cluster", zap.Int("round", clus.rd), zap.Int("case", clus.cs), zap.String("desc", clus.failures[clus.cs].Desc()), ) if err = clus.checker.Check(); err != nil { - clus.logger.Warn( + clus.lg.Warn( "checker.Check failed", zap.Error(err), ) return err } - clus.logger.Info( + clus.lg.Info( "checked consistency and invariant of cluster", zap.Int("round", clus.rd), zap.Int("case", clus.cs), @@ -488,7 +488,7 @@ func (clus *Cluster) broadcastOperation(op rpcpb.Operation) error { strings.Contains(err.Error(), "rpc error: code = Unavailable desc = transport is closing") { // agent server has already closed; // so this error is expected - clus.logger.Info( + clus.lg.Info( "successfully destroyed", zap.String("member", clus.Members[i].EtcdClientEndpoint), ) @@ -511,13 +511,13 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error { clus.agentRequests[idx].Operation = op } - clus.logger.Info( + clus.lg.Info( "sending request", zap.String("operation", op.String()), zap.String("to", clus.Members[idx].EtcdClientEndpoint), ) err := clus.agentStreams[idx].Send(clus.agentRequests[idx]) - clus.logger.Info( + clus.lg.Info( "sent request", zap.String("operation", op.String()), zap.String("to", clus.Members[idx].EtcdClientEndpoint), @@ -527,14 +527,14 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error { return err } - clus.logger.Info( + clus.lg.Info( "receiving response", zap.String("operation", op.String()), zap.String("from", clus.Members[idx].EtcdClientEndpoint), ) resp, err := clus.agentStreams[idx].Recv() if resp != nil { - clus.logger.Info( + clus.lg.Info( "received response", zap.String("operation", op.String()), zap.String("from", clus.Members[idx].EtcdClientEndpoint), @@ -543,7 +543,7 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error { zap.Error(err), ) } else { - clus.logger.Info( + clus.lg.Info( "received empty response", zap.String("operation", op.String()), zap.String("from", clus.Members[idx].EtcdClientEndpoint), @@ -562,26 +562,26 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error { // DestroyEtcdAgents terminates all tester connections to agents and etcd servers. func (clus *Cluster) DestroyEtcdAgents() { - clus.logger.Info("destroying etcd servers and agents") + clus.lg.Info("destroying etcd servers and agents") err := clus.broadcastOperation(rpcpb.Operation_DestroyEtcdAgent) if err != nil { - clus.logger.Warn("failed to destroy etcd servers and agents", zap.Error(err)) + clus.lg.Warn("failed to destroy etcd servers and agents", zap.Error(err)) } else { - clus.logger.Info("destroyed etcd servers and agents") + clus.lg.Info("destroyed etcd servers and agents") } for i, conn := range clus.agentConns { - clus.logger.Info("closing connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr)) + clus.lg.Info("closing connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr)) err := conn.Close() - clus.logger.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err)) + clus.lg.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err)) } if clus.testerHTTPServer != nil { - clus.logger.Info("closing tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr)) + clus.lg.Info("closing tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr)) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) err := clus.testerHTTPServer.Shutdown(ctx) cancel() - clus.logger.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr), zap.Error(err)) + clus.lg.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr), zap.Error(err)) } } @@ -595,13 +595,13 @@ func (clus *Cluster) WaitHealth() error { // reasonable workload (https://github.com/coreos/etcd/issues/2698) for i := 0; i < 60; i++ { for _, m := range clus.Members { - clus.logger.Info( + clus.lg.Info( "writing health key", zap.Int("retries", i), zap.String("endpoint", m.EtcdClientEndpoint), ) if err = m.WriteHealthKey(); err != nil { - clus.logger.Warn( + clus.lg.Warn( "writing health key failed", zap.Int("retries", i), zap.String("endpoint", m.EtcdClientEndpoint), @@ -609,14 +609,14 @@ func (clus *Cluster) WaitHealth() error { ) break } - clus.logger.Info( + clus.lg.Info( "wrote health key", zap.Int("retries", i), zap.String("endpoint", m.EtcdClientEndpoint), ) } if err == nil { - clus.logger.Info( + clus.lg.Info( "writing health key success on all members", zap.Int("retries", i), ) @@ -683,7 +683,7 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) { for i, m := range clus.Members { conn, derr := m.DialEtcdGRPCServer() if derr != nil { - clus.logger.Warn( + clus.lg.Warn( "compactKV dial failed", zap.String("endpoint", m.EtcdClientEndpoint), zap.Error(derr), @@ -693,7 +693,7 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) { } kvc := pb.NewKVClient(conn) - clus.logger.Info( + clus.lg.Info( "compacting", zap.String("endpoint", m.EtcdClientEndpoint), zap.Int64("compact-revision", rev), @@ -709,14 +709,14 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) { succeed := true if cerr != nil { if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 { - clus.logger.Info( + clus.lg.Info( "compact error is ignored", zap.String("endpoint", m.EtcdClientEndpoint), zap.Int64("compact-revision", rev), zap.Error(cerr), ) } else { - clus.logger.Warn( + clus.lg.Warn( "compact failed", zap.String("endpoint", m.EtcdClientEndpoint), zap.Int64("compact-revision", rev), @@ -728,7 +728,7 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) { } if succeed { - clus.logger.Info( + clus.lg.Info( "compacted", zap.String("endpoint", m.EtcdClientEndpoint), zap.Int64("compact-revision", rev), @@ -753,14 +753,14 @@ func (clus *Cluster) checkCompact(rev int64) error { } func (clus *Cluster) defrag() error { - clus.logger.Info( + clus.lg.Info( "defragmenting", zap.Int("round", clus.rd), zap.Int("case", clus.cs), ) for _, m := range clus.Members { if err := m.Defrag(); err != nil { - clus.logger.Warn( + clus.lg.Warn( "defrag failed", zap.Int("round", clus.rd), zap.Int("case", clus.cs), @@ -769,7 +769,7 @@ func (clus *Cluster) defrag() error { return err } } - clus.logger.Info( + clus.lg.Info( "defragmented", zap.Int("round", clus.rd), zap.Int("case", clus.cs), diff --git a/tools/functional-tester/tester/cluster_tester.go b/tools/functional-tester/tester/cluster_tester.go index 8e2b4662c..9d6928ce8 100644 --- a/tools/functional-tester/tester/cluster_tester.go +++ b/tools/functional-tester/tester/cluster_tester.go @@ -37,7 +37,7 @@ func (clus *Cluster) StartTester() { clus.rd = round if err := clus.doRound(); err != nil { - clus.logger.Warn( + clus.lg.Warn( "doRound failed; returning", zap.Int("round", clus.rd), zap.Int("case", clus.cs), @@ -60,21 +60,21 @@ func (clus *Cluster) StartTester() { preModifiedKey = currentModifiedKey timeout := 10 * time.Second timeout += time.Duration(modifiedKey/compactQPS) * time.Second - clus.logger.Info( + clus.lg.Info( "compacting", zap.Int("round", clus.rd), zap.Int("case", clus.cs), zap.Duration("timeout", timeout), ) if err := clus.compact(revToCompact, timeout); err != nil { - clus.logger.Warn( + clus.lg.Warn( "compact failed", zap.Int("round", clus.rd), zap.Int("case", clus.cs), zap.Error(err), ) if err = clus.cleanup(); err != nil { - clus.logger.Warn( + clus.lg.Warn( "cleanup failed", zap.Int("round", clus.rd), zap.Int("case", clus.cs), @@ -87,7 +87,7 @@ func (clus *Cluster) StartTester() { } if round > 0 && round%500 == 0 { // every 500 rounds if err := clus.defrag(); err != nil { - clus.logger.Warn( + clus.lg.Warn( "defrag failed; returning", zap.Int("round", clus.rd), zap.Int("case", clus.cs), @@ -99,7 +99,7 @@ func (clus *Cluster) StartTester() { } } - clus.logger.Info( + clus.lg.Info( "functional-tester passed", zap.Int("round", clus.rd), zap.Int("case", clus.cs), @@ -111,7 +111,7 @@ func (clus *Cluster) doRound() error { clus.shuffleFailures() } - clus.logger.Info( + clus.lg.Info( "starting round", zap.Int("round", clus.rd), zap.Strings("failures", clus.failureStrings()), @@ -121,12 +121,12 @@ func (clus *Cluster) doRound() error { caseTotalCounter.WithLabelValues(f.Desc()).Inc() - clus.logger.Info("wait health before injecting failures") + clus.lg.Info("wait health before injecting failures") if err := clus.WaitHealth(); err != nil { return fmt.Errorf("wait full health error: %v", err) } - clus.logger.Info( + clus.lg.Info( "injecting failure", zap.Int("round", clus.rd), zap.Int("case", clus.cs), @@ -135,7 +135,7 @@ func (clus *Cluster) doRound() error { if err := f.Inject(clus); err != nil { return fmt.Errorf("injection error: %v", err) } - clus.logger.Info( + clus.lg.Info( "injected failure", zap.Int("round", clus.rd), zap.Int("case", clus.cs), @@ -145,7 +145,7 @@ func (clus *Cluster) doRound() error { // if run local, recovering server may conflict // with stressing client ports // TODO: use unix for local tests - clus.logger.Info( + clus.lg.Info( "recovering failure", zap.Int("round", clus.rd), zap.Int("case", clus.cs), @@ -154,26 +154,26 @@ func (clus *Cluster) doRound() error { if err := f.Recover(clus); err != nil { return fmt.Errorf("recovery error: %v", err) } - clus.logger.Info( + clus.lg.Info( "recovered failure", zap.Int("round", clus.rd), zap.Int("case", clus.cs), zap.String("desc", f.Desc()), ) - clus.logger.Info("pausing stresser after failure recovery, before wait health") + clus.lg.Info("pausing stresser after failure recovery, before wait health") clus.pauseStresser() - clus.logger.Info("wait health after recovering failures") + clus.lg.Info("wait health after recovering failures") if err := clus.WaitHealth(); err != nil { return fmt.Errorf("wait full health error: %v", err) } - clus.logger.Info("check consistency after recovering failures") + clus.lg.Info("check consistency after recovering failures") if err := clus.checkConsistency(); err != nil { return fmt.Errorf("tt.checkConsistency error (%v)", err) } - clus.logger.Info( + clus.lg.Info( "failure case passed", zap.Int("round", clus.rd), zap.Int("case", clus.cs), @@ -181,7 +181,7 @@ func (clus *Cluster) doRound() error { ) } - clus.logger.Info( + clus.lg.Info( "finished round", zap.Int("round", clus.rd), zap.Strings("failures", clus.failureStrings()), @@ -196,7 +196,7 @@ func (clus *Cluster) updateRevision() error { break // just need get one of the current revisions } - clus.logger.Info( + clus.lg.Info( "updated current revision", zap.Int64("current-revision", clus.currentRevision), ) @@ -204,7 +204,7 @@ func (clus *Cluster) updateRevision() error { } func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) { - clus.logger.Info("pausing stresser before compact") + clus.lg.Info("pausing stresser before compact") clus.pauseStresser() defer func() { if err == nil { @@ -212,7 +212,7 @@ func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) { } }() - clus.logger.Info( + clus.lg.Info( "compacting storage", zap.Int64("current-revision", clus.currentRevision), zap.Int64("compact-revision", rev), @@ -220,19 +220,19 @@ func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) { if err = clus.compactKV(rev, timeout); err != nil { return err } - clus.logger.Info( + clus.lg.Info( "compacted storage", zap.Int64("current-revision", clus.currentRevision), zap.Int64("compact-revision", rev), ) - clus.logger.Info( + clus.lg.Info( "checking compaction", zap.Int64("current-revision", clus.currentRevision), zap.Int64("compact-revision", rev), ) if err = clus.checkCompact(rev); err != nil { - clus.logger.Warn( + clus.lg.Warn( "checkCompact failed", zap.Int64("current-revision", clus.currentRevision), zap.Int64("compact-revision", rev), @@ -240,7 +240,7 @@ func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) { ) return err } - clus.logger.Info( + clus.lg.Info( "confirmed compaction", zap.Int64("current-revision", clus.currentRevision), zap.Int64("compact-revision", rev), @@ -254,7 +254,7 @@ func (clus *Cluster) failed() { return } - clus.logger.Info( + clus.lg.Info( "exiting on failure", zap.Int("round", clus.rd), zap.Int("case", clus.cs), @@ -275,7 +275,7 @@ func (clus *Cluster) cleanup() error { clus.closeStresser() if err := clus.FailArchive(); err != nil { - clus.logger.Warn( + clus.lg.Warn( "cleanup failed", zap.Int("round", clus.rd), zap.Int("case", clus.cs), @@ -284,7 +284,7 @@ func (clus *Cluster) cleanup() error { return err } if err := clus.Restart(); err != nil { - clus.logger.Warn( + clus.lg.Warn( "restart failed", zap.Int("round", clus.rd), zap.Int("case", clus.cs), diff --git a/tools/functional-tester/tester/failure_case_delay.go b/tools/functional-tester/tester/failure_case_delay.go index ba6d5a291..882a61888 100644 --- a/tools/functional-tester/tester/failure_case_delay.go +++ b/tools/functional-tester/tester/failure_case_delay.go @@ -30,7 +30,7 @@ func (f *failureDelay) Inject(clus *Cluster) error { return err } if f.delayDuration > 0 { - clus.logger.Info( + clus.lg.Info( "sleeping in failureDelay", zap.Duration("delay", f.delayDuration), zap.String("case", f.Failure.Desc()), diff --git a/tools/functional-tester/tester/stress.go b/tools/functional-tester/tester/stress.go index d23a88e9b..646fed1f8 100644 --- a/tools/functional-tester/tester/stress.go +++ b/tools/functional-tester/tester/stress.go @@ -21,6 +21,7 @@ import ( "go.uber.org/zap" ) +// Stresser defines stressing client operations. type Stresser interface { // Stress starts to stress the etcd cluster Stress() error @@ -38,7 +39,7 @@ type Stresser interface { func newStresser(clus *Cluster, idx int) Stresser { stressers := make([]Stresser, len(clus.Tester.StressTypes)) for i, stype := range clus.Tester.StressTypes { - clus.logger.Info("creating stresser", zap.String("type", stype)) + clus.lg.Info("creating stresser", zap.String("type", stype)) switch stype { case "NO_STRESS": @@ -48,7 +49,7 @@ func newStresser(clus *Cluster, idx int) Stresser { // TODO: Too intensive stressing clients can panic etcd member with // 'out of memory' error. Put rate limits in server side. stressers[i] = &keyStresser{ - logger: clus.logger, + lg: clus.lg, Endpoint: clus.Members[idx].EtcdClientEndpoint, keySize: int(clus.Tester.StressKeySize), keyLargeSize: int(clus.Tester.StressKeySizeLarge), @@ -61,7 +62,7 @@ func newStresser(clus *Cluster, idx int) Stresser { case "LEASE": stressers[i] = &leaseStresser{ - logger: clus.logger, + lg: clus.lg, endpoint: clus.Members[idx].EtcdClientEndpoint, numLeases: 10, // TODO: configurable keysPerLease: 10, // TODO: configurable diff --git a/tools/functional-tester/tester/stress_key.go b/tools/functional-tester/tester/stress_key.go index fc291beec..49fba0b26 100644 --- a/tools/functional-tester/tester/stress_key.go +++ b/tools/functional-tester/tester/stress_key.go @@ -33,7 +33,7 @@ import ( ) type keyStresser struct { - logger *zap.Logger + lg *zap.Logger Endpoint string // TODO: use Member @@ -96,7 +96,7 @@ func (s *keyStresser) Stress() error { go s.run(ctx) } - s.logger.Info( + s.lg.Info( "key stresser started in background", zap.String("endpoint", s.Endpoint), ) @@ -150,7 +150,7 @@ func (s *keyStresser) run(ctx context.Context) { // from stresser.Cancel method: return default: - s.logger.Warn( + s.lg.Warn( "key stresser exited with error", zap.String("endpoint", s.Endpoint), zap.Error(err), @@ -169,7 +169,7 @@ func (s *keyStresser) Close() { s.conn.Close() s.wg.Wait() - s.logger.Info( + s.lg.Info( "key stresser is closed", zap.String("endpoint", s.Endpoint), ) diff --git a/tools/functional-tester/tester/stress_lease.go b/tools/functional-tester/tester/stress_lease.go index bf65d294a..3c5f3c2d9 100644 --- a/tools/functional-tester/tester/stress_lease.go +++ b/tools/functional-tester/tester/stress_lease.go @@ -37,7 +37,7 @@ const ( ) type leaseStresser struct { - logger *zap.Logger + lg *zap.Logger endpoint string cancel func() @@ -122,7 +122,7 @@ func (ls *leaseStresser) setupOnce() error { } func (ls *leaseStresser) Stress() error { - ls.logger.Info( + ls.lg.Info( "lease stresser is started", zap.String("endpoint", ls.endpoint), ) @@ -161,22 +161,22 @@ func (ls *leaseStresser) run() { return } - ls.logger.Debug( + ls.lg.Debug( "lease stresser is creating leases", zap.String("endpoint", ls.endpoint), ) ls.createLeases() - ls.logger.Debug( + ls.lg.Debug( "lease stresser created leases", zap.String("endpoint", ls.endpoint), ) - ls.logger.Debug( + ls.lg.Debug( "lease stresser is dropped leases", zap.String("endpoint", ls.endpoint), ) ls.randomlyDropLeases() - ls.logger.Debug( + ls.lg.Debug( "lease stresser dropped leases", zap.String("endpoint", ls.endpoint), ) @@ -206,7 +206,7 @@ func (ls *leaseStresser) createAliveLeases() { defer wg.Done() leaseID, err := ls.createLeaseWithKeys(TTL) if err != nil { - ls.logger.Debug( + ls.lg.Debug( "createLeaseWithKeys failed", zap.String("endpoint", ls.endpoint), zap.Error(err), @@ -244,7 +244,7 @@ func (ls *leaseStresser) createShortLivedLeases() { func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) { leaseID, err := ls.createLease(ttl) if err != nil { - ls.logger.Debug( + ls.lg.Debug( "createLease failed", zap.String("endpoint", ls.endpoint), zap.Error(err), @@ -252,7 +252,7 @@ func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) { return -1, err } - ls.logger.Debug( + ls.lg.Debug( "createLease created lease", zap.String("endpoint", ls.endpoint), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), @@ -273,7 +273,7 @@ func (ls *leaseStresser) randomlyDropLeases() { // if randomlyDropLease encountered an error such as context is cancelled, remove the lease from aliveLeases // because we can't tell whether the lease is dropped or not. if err != nil { - ls.logger.Debug( + ls.lg.Debug( "randomlyDropLease failed", zap.String("endpoint", ls.endpoint), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), @@ -285,7 +285,7 @@ func (ls *leaseStresser) randomlyDropLeases() { if !dropped { return } - ls.logger.Debug( + ls.lg.Debug( "randomlyDropLease dropped a lease", zap.String("endpoint", ls.endpoint), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), @@ -314,7 +314,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) { select { case <-time.After(500 * time.Millisecond): case <-ls.ctx.Done(): - ls.logger.Debug( + ls.lg.Debug( "keepLeaseAlive context canceled", zap.String("endpoint", ls.endpoint), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), @@ -328,7 +328,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) { renewTime, ok := ls.aliveLeases.read(leaseID) if ok && renewTime.Add(TTL/2*time.Second).Before(time.Now()) { ls.aliveLeases.remove(leaseID) - ls.logger.Debug( + ls.lg.Debug( "keepLeaseAlive lease has not been renewed, dropped it", zap.String("endpoint", ls.endpoint), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), @@ -338,7 +338,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) { } if err != nil { - ls.logger.Debug( + ls.lg.Debug( "keepLeaseAlive lease creates stream error", zap.String("endpoint", ls.endpoint), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), @@ -351,14 +351,14 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) { continue } - ls.logger.Debug( + ls.lg.Debug( "keepLeaseAlive stream sends lease keepalive request", zap.String("endpoint", ls.endpoint), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), ) err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID}) if err != nil { - ls.logger.Debug( + ls.lg.Debug( "keepLeaseAlive stream failed to send lease keepalive request", zap.String("endpoint", ls.endpoint), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), @@ -367,14 +367,14 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) { continue } leaseRenewTime := time.Now() - ls.logger.Debug( + ls.lg.Debug( "keepLeaseAlive stream sent lease keepalive request", zap.String("endpoint", ls.endpoint), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), ) respRC, err := stream.Recv() if err != nil { - ls.logger.Debug( + ls.lg.Debug( "keepLeaseAlive stream failed to receive lease keepalive response", zap.String("endpoint", ls.endpoint), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), @@ -385,7 +385,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) { // lease expires after TTL become 0 // don't send keepalive if the lease has expired if respRC.TTL <= 0 { - ls.logger.Debug( + ls.lg.Debug( "keepLeaseAlive stream received lease keepalive response TTL <= 0", zap.String("endpoint", ls.endpoint), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), @@ -395,7 +395,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) { return } // renew lease timestamp only if lease is present - ls.logger.Debug( + ls.lg.Debug( "keepLeaseAlive renewed a lease", zap.String("endpoint", ls.endpoint), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), @@ -444,7 +444,7 @@ func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) { } } - ls.logger.Debug( + ls.lg.Debug( "randomlyDropLease error", zap.String("endpoint", ls.endpoint), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), @@ -458,7 +458,7 @@ func (ls *leaseStresser) Pause() { } func (ls *leaseStresser) Close() { - ls.logger.Info( + ls.lg.Info( "lease stresser is closing", zap.String("endpoint", ls.endpoint), ) @@ -466,7 +466,7 @@ func (ls *leaseStresser) Close() { ls.runWg.Wait() ls.aliveWg.Wait() ls.conn.Close() - ls.logger.Info( + ls.lg.Info( "lease stresser is closed", zap.String("endpoint", ls.endpoint), ) @@ -478,7 +478,7 @@ func (ls *leaseStresser) ModifiedKeys() int64 { func (ls *leaseStresser) Checker() Checker { return &leaseChecker{ - logger: ls.logger, + lg: ls.lg, endpoint: ls.endpoint, ls: ls, }