From 3ed5d28e2efb46ef2fc1ce3984a2a1a729cd6c5e Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Thu, 26 May 2016 11:37:49 -0700 Subject: [PATCH] etcd-tester: fix, clean up multiple things (#5462) * etcd-tester: more logging, fix typo * etcd-tester: fix prevCompactRev scope Fix https://github.com/coreos/etcd/issues/5440. * etcd-tester: move utils to bottom, clean up logs And remove stresser operation inside defrag * etcd-tester: separate update revision call * etcd-tester: fix cleanup when case is -1 --- .../functional-tester/etcd-tester/cluster.go | 3 +- .../functional-tester/etcd-tester/failure.go | 2 +- tools/functional-tester/etcd-tester/tester.go | 195 +++++++++--------- 3 files changed, 104 insertions(+), 96 deletions(-) diff --git a/tools/functional-tester/etcd-tester/cluster.go b/tools/functional-tester/etcd-tester/cluster.go index 9f3e40cbc..fde467b0d 100644 --- a/tools/functional-tester/etcd-tester/cluster.go +++ b/tools/functional-tester/etcd-tester/cluster.go @@ -175,6 +175,7 @@ func (c *cluster) WaitHealth() error { if err == nil { return nil } + plog.Warningf("#%d setHealthKey error (%v)", i, err) time.Sleep(time.Second) } return err @@ -271,7 +272,7 @@ func setHealthKey(us []string) error { cancel() conn.Close() if err != nil { - return err + return fmt.Errorf("%v (%s)", err, u) } } return nil diff --git a/tools/functional-tester/etcd-tester/failure.go b/tools/functional-tester/etcd-tester/failure.go index b65821ebb..921c61595 100644 --- a/tools/functional-tester/etcd-tester/failure.go +++ b/tools/functional-tester/etcd-tester/failure.go @@ -26,7 +26,7 @@ const ( randomVariation = 50 // Wait more when it recovers from slow network, because network layer - // needs extra time to propogate traffic control (tc command) change. + // needs extra time to propagate traffic control (tc command) change. // Otherwise, we get different hash values from the previous revision. // For more detail, please see https://github.com/coreos/etcd/issues/5121. waitRecover = 5 * time.Second diff --git a/tools/functional-tester/etcd-tester/tester.go b/tools/functional-tester/etcd-tester/tester.go index 1b4be937b..afdd4b725 100644 --- a/tools/functional-tester/etcd-tester/tester.go +++ b/tools/functional-tester/etcd-tester/tester.go @@ -40,16 +40,17 @@ func (tt *tester) runLoop() { for _, f := range tt.failures { tt.status.Failures = append(tt.status.Failures, f.Desc()) } - round := 0 + + var ( + round int + prevCompactRev int64 + ) for { tt.status.setRound(round) tt.status.setCase(-1) // -1 so that logPrefix doesn't print out 'case' roundTotalCounter.Inc() - var ( - prevCompactRev int64 - failed bool - ) + var failed bool for j, f := range tt.failures { caseTotalCounter.WithLabelValues(f.Desc()).Inc() tt.status.setCase(j) @@ -63,9 +64,7 @@ func (tt *tester) runLoop() { break } - plog.Printf("%s starting failure %s", tt.logPrefix(), f.Desc()) - - plog.Printf("%s injecting failure...", tt.logPrefix()) + plog.Printf("%s injecting failure %q", tt.logPrefix(), f.Desc()) if err := f.Inject(tt.cluster, round); err != nil { plog.Printf("%s injection error: %v", tt.logPrefix(), err) if err := tt.cleanup(); err != nil { @@ -76,7 +75,7 @@ func (tt *tester) runLoop() { } plog.Printf("%s injected failure", tt.logPrefix()) - plog.Printf("%s recovering failure...", tt.logPrefix()) + plog.Printf("%s recovering failure %q", tt.logPrefix(), f.Desc()) if err := f.Recover(tt.cluster, round); err != nil { plog.Printf("%s recovery error: %v", tt.logPrefix(), err) if err := tt.cleanup(); err != nil { @@ -92,10 +91,18 @@ func (tt *tester) runLoop() { continue } + if !tt.consistencyCheck { + if err := tt.updateRevision(); err != nil { + plog.Warningf("%s functional-tester returning with tt.updateRevision error (%v)", tt.logPrefix(), err) + return + } + continue + } + var err error - failed, err = tt.updateCurrentRevisionHash(tt.consistencyCheck) + failed, err = tt.checkConsistency() if err != nil { - plog.Warningf("%s functional-tester returning with error (%v)", tt.logPrefix(), err) + plog.Warningf("%s functional-tester returning with tt.checkConsistency error (%v)", tt.logPrefix(), err) return } if failed { @@ -139,44 +146,59 @@ func (tt *tester) runLoop() { } } -func (tt *tester) logPrefix() string { +func (tt *tester) updateRevision() error { + revs, _, err := tt.cluster.getRevisionHash() + for _, rev := range revs { + tt.currentRevision = rev + break // just need get one of the current revisions + } + return err +} + +func (tt *tester) checkConsistency() (failed bool, err error) { + tt.cancelStressers() + defer tt.startStressers() + + plog.Printf("%s updating current revisions...", tt.logPrefix()) var ( - rd = tt.status.getRound() - cs = tt.status.getCase() - prefix = fmt.Sprintf("[round#%d case#%d]", rd, cs) + revs map[string]int64 + hashes map[string]int64 + rerr error + ok bool ) - if cs == -1 { - prefix = fmt.Sprintf("[round#%d]", rd) - } - return prefix -} + for i := 0; i < 7; i++ { + time.Sleep(time.Second) -func (tt *tester) cleanup() error { - roundFailedTotalCounter.Inc() - caseFailedTotalCounter.WithLabelValues(tt.failures[tt.status.Case].Desc()).Inc() + revs, hashes, rerr = tt.cluster.getRevisionHash() + if rerr != nil { + plog.Printf("%s #%d failed to get current revisions (%v)", tt.logPrefix(), i, rerr) + continue + } + if tt.currentRevision, ok = getSameValue(revs); ok { + break + } - plog.Printf("%s cleaning up...", tt.logPrefix()) - if err := tt.cluster.Cleanup(); err != nil { - plog.Printf("%s cleanup error: %v", tt.logPrefix(), err) - return err + plog.Printf("%s #%d inconsistent current revisions %+v", tt.logPrefix(), i, revs) } - return tt.cluster.Bootstrap() -} + plog.Printf("%s updated current revisions with %d", tt.logPrefix(), tt.currentRevision) -func (tt *tester) cancelStressers() { - plog.Printf("%s canceling the stressers...", tt.logPrefix()) - for _, s := range tt.cluster.Stressers { - s.Cancel() + if !ok || rerr != nil { + plog.Printf("%s checking current revisions failed [revisions: %v]", tt.logPrefix(), revs) + failed = true + err = tt.cleanup() + return } - plog.Printf("%s canceled stressers", tt.logPrefix()) -} + plog.Printf("%s all members are consistent with current revisions [revisions: %v]", tt.logPrefix(), revs) -func (tt *tester) startStressers() { - plog.Printf("%s starting the stressers...", tt.logPrefix()) - for _, s := range tt.cluster.Stressers { - go s.Stress() + plog.Printf("%s checking current storage hashes...", tt.logPrefix()) + if _, ok = getSameValue(hashes); !ok { + plog.Printf("%s checking current storage hashes failed [hashes: %v]", tt.logPrefix(), hashes) + failed = true + err = tt.cleanup() + return } - plog.Printf("%s started stressers", tt.logPrefix()) + plog.Printf("%s all members are consistent with storage hashes", tt.logPrefix()) + return } func (tt *tester) compact(rev int64, timeout time.Duration) error { @@ -204,9 +226,6 @@ func (tt *tester) compact(rev int64, timeout time.Duration) error { } func (tt *tester) defrag() error { - tt.cancelStressers() - defer tt.startStressers() - plog.Printf("%s defragmenting...", tt.logPrefix()) if err := tt.cluster.defrag(); err != nil { plog.Printf("%s defrag error (%v)", tt.logPrefix(), err) @@ -220,58 +239,46 @@ func (tt *tester) defrag() error { return nil } -func (tt *tester) updateCurrentRevisionHash(check bool) (failed bool, err error) { - if check { - tt.cancelStressers() - defer tt.startStressers() - } - - plog.Printf("%s updating current revisions...", tt.logPrefix()) +func (tt *tester) logPrefix() string { var ( - revs map[string]int64 - hashes map[string]int64 - rerr error - ok bool + rd = tt.status.getRound() + cs = tt.status.getCase() + prefix = fmt.Sprintf("[round#%d case#%d]", rd, cs) ) - for i := 0; i < 7; i++ { - revs, hashes, rerr = tt.cluster.getRevisionHash() - if rerr != nil { - plog.Printf("%s #%d failed to get current revisions (%v)", tt.logPrefix(), i, rerr) - continue - } - if tt.currentRevision, ok = getSameValue(revs); ok { - break - } - - plog.Printf("%s #%d inconsistent current revisions %+v", tt.logPrefix(), i, revs) - if !check { - break // if consistency check is false, just try once - } - - time.Sleep(time.Second) + if cs == -1 { + prefix = fmt.Sprintf("[round#%d]", rd) } - plog.Printf("%s updated current revisions with %d", tt.logPrefix(), tt.currentRevision) - - if !check { - failed = false - return - } - - if !ok || rerr != nil { - plog.Printf("%s checking current revisions failed [revisions: %v]", tt.logPrefix(), revs) - failed = true - err = tt.cleanup() - return - } - plog.Printf("%s all members are consistent with current revisions [revisions: %v]", tt.logPrefix(), revs) - - plog.Printf("%s checking current storage hashes...", tt.logPrefix()) - if _, ok = getSameValue(hashes); !ok { - plog.Printf("%s checking current storage hashes failed [hashes: %v]", tt.logPrefix(), hashes) - failed = true - err = tt.cleanup() - return - } - plog.Printf("%s all members are consistent with storage hashes", tt.logPrefix()) - return + return prefix +} + +func (tt *tester) cleanup() error { + roundFailedTotalCounter.Inc() + desc := "compact/defrag" + if tt.status.Case != -1 { + desc = tt.failures[tt.status.Case].Desc() + } + caseFailedTotalCounter.WithLabelValues(desc).Inc() + + plog.Printf("%s cleaning up...", tt.logPrefix()) + if err := tt.cluster.Cleanup(); err != nil { + plog.Printf("%s cleanup error: %v", tt.logPrefix(), err) + return err + } + return tt.cluster.Bootstrap() +} + +func (tt *tester) cancelStressers() { + plog.Printf("%s canceling the stressers...", tt.logPrefix()) + for _, s := range tt.cluster.Stressers { + s.Cancel() + } + plog.Printf("%s canceled stressers", tt.logPrefix()) +} + +func (tt *tester) startStressers() { + plog.Printf("%s starting the stressers...", tt.logPrefix()) + for _, s := range tt.cluster.Stressers { + go s.Stress() + } + plog.Printf("%s started stressers", tt.logPrefix()) }