diff --git a/tools/functional-tester/etcd-tester/cluster.go b/tools/functional-tester/etcd-tester/cluster.go index 3d93a43f4..94631fc79 100644 --- a/tools/functional-tester/etcd-tester/cluster.go +++ b/tools/functional-tester/etcd-tester/cluster.go @@ -39,17 +39,9 @@ type agentConfig struct { } type cluster struct { - agents []agentConfig - - v2Only bool // to be deprecated - consistencyCheck bool - Size int - - Stressers []Stresser - stressBuilder stressBuilder - leaseStresserBuilder leaseStresserBuilder - Checker Checker - + agents []agentConfig + v2Only bool // to be deprecated + Size int Members []*member } @@ -100,27 +92,6 @@ func (c *cluster) bootstrap() error { } } - c.Stressers = make([]Stresser, 0) - leaseStressers := make([]Stresser, len(members)) - for i, m := range members { - lStresser := c.leaseStresserBuilder(m) - leaseStressers[i] = lStresser - c.Stressers = append(c.Stressers, c.stressBuilder(m), lStresser) - } - - for i := range c.Stressers { - go c.Stressers[i].Stress() - } - - var checkers []Checker - if c.consistencyCheck && !c.v2Only { - checkers = append(checkers, newHashChecker(hashAndRevGetter(c)), newLeaseChecker(leaseStressers)) - } else { - checkers = append(checkers, newNoChecker()) - } - - c.Checker = newCompositeChecker(checkers) - c.Size = size c.Members = members return nil @@ -167,15 +138,6 @@ func (c *cluster) GetLeader() (int, error) { return 0, fmt.Errorf("no leader found") } -func (c *cluster) Report() (success, failure int) { - for _, stress := range c.Stressers { - s, f := stress.Report() - success += s - failure += f - } - return -} - func (c *cluster) Cleanup() error { var lasterr error for _, m := range c.Members { @@ -183,10 +145,6 @@ func (c *cluster) Cleanup() error { lasterr = err } } - for _, s := range c.Stressers { - s.Cancel() - } - return lasterr } @@ -194,9 +152,6 @@ func (c *cluster) Terminate() { for _, m := range c.Members { m.Agent.Terminate() } - for _, s := range c.Stressers { - s.Cancel() - } } func (c *cluster) Status() ClusterStatus { @@ -217,6 +172,22 @@ func (c *cluster) Status() ClusterStatus { return cs } +func (c *cluster) maxRev() (rev int64, err error) { + for _, m := range c.Members { + curRev, _, curErr := m.RevHash() + if curErr != nil { + err = curErr + } + if curRev > rev { + rev = curRev + } + } + if rev == 0 { + return 0, err + } + return rev, nil +} + func (c *cluster) getRevisionHash() (map[string]int64, map[string]int64, error) { revs := make(map[string]int64) hashes := make(map[string]int64) diff --git a/tools/functional-tester/etcd-tester/failure.go b/tools/functional-tester/etcd-tester/failure.go index 82392087d..4d78d5514 100644 --- a/tools/functional-tester/etcd-tester/failure.go +++ b/tools/functional-tester/etcd-tester/failure.go @@ -136,26 +136,29 @@ func (f *failureUntilSnapshot) Inject(c *cluster, round int) error { if err := f.failure.Inject(c, round); err != nil { return err } - if c.Size < 3 { return nil } - - start, _ := c.Report() - end := start + startRev, err := c.maxRev() + if err != nil { + return err + } + lastRev := startRev // Normal healthy cluster could accept 1000req/s at least. // Give it 3-times time to create a new snapshot. retry := snapshotCount / 1000 * 3 for j := 0; j < retry; j++ { - end, _ = c.Report() + if lastRev, err = c.maxRev(); err != nil { + return err + } // If the number of proposals committed is bigger than snapshot count, // a new snapshot should have been created. - if end-start > snapshotCount { + if lastRev-startRev > snapshotCount { return nil } time.Sleep(time.Second) } - return fmt.Errorf("cluster too slow: only commit %d requests in %ds", end-start, retry) + return fmt.Errorf("cluster too slow: only commit %d requests in %ds", lastRev-startRev, retry) } func (f *failureUntilSnapshot) Desc() string { diff --git a/tools/functional-tester/etcd-tester/main.go b/tools/functional-tester/etcd-tester/main.go index 55b8e2d2d..bbb095319 100644 --- a/tools/functional-tester/etcd-tester/main.go +++ b/tools/functional-tester/etcd-tester/main.go @@ -67,26 +67,9 @@ func main() { agents[i].datadir = *datadir } - sConfig := &stressConfig{ - qps: *stressQPS, - keyLargeSize: int(*stressKeyLargeSize), - keySize: int(*stressKeySize), - keySuffixRange: int(*stressKeySuffixRange), - v2: *isV2Only, - } - - lsConfig := &leaseStressConfig{ - numLeases: 10, - keysPerLease: 10, - qps: *stressQPS, // only used to create nop stresser in leaseStresserBuilder - } - c := &cluster{ - agents: agents, - v2Only: *isV2Only, - stressBuilder: newStressBuilder(*stresserType, sConfig), - leaseStresserBuilder: newLeaseStresserBuilder(*stresserType, lsConfig), - consistencyCheck: *consistencyCheck, + agents: agents, + v2Only: *isV2Only, } if err := c.bootstrap(); err != nil { @@ -129,10 +112,26 @@ func main() { schedule[i] = failures[caseNum] } } + + sConfig := &stressConfig{ + qps: *stressQPS, + keyLargeSize: int(*stressKeyLargeSize), + keySize: int(*stressKeySize), + keySuffixRange: int(*stressKeySuffixRange), + v2: *isV2Only, + } + lsConfig := &leaseStressConfig{ + numLeases: 10, + keysPerLease: 10, + qps: *stressQPS, + } t := &tester{ - failures: schedule, - cluster: c, - limit: *limit, + failures: schedule, + cluster: c, + limit: *limit, + stressBuilder: newStressBuilder(*stresserType, sConfig), + leaseStresserBuilder: newLeaseStresserBuilder(*stresserType, lsConfig), + consistencyCheck: *consistencyCheck, } sh := statusHandler{status: &t.status} diff --git a/tools/functional-tester/etcd-tester/tester.go b/tools/functional-tester/etcd-tester/tester.go index 8f5f70eaa..1216e1798 100644 --- a/tools/functional-tester/etcd-tester/tester.go +++ b/tools/functional-tester/etcd-tester/tester.go @@ -25,6 +25,12 @@ type tester struct { limit int status Status currentRevision int64 + + Stressers []Stresser + stressBuilder stressBuilder + leaseStresserBuilder leaseStresserBuilder + Checker Checker + consistencyCheck bool } // compactQPS is rough number of compact requests per second. @@ -39,6 +45,8 @@ func (tt *tester) runLoop() { tt.status.Failures = append(tt.status.Failures, f.Desc()) } + tt.setupStressers() + var prevCompactRev int64 for round := 0; round < tt.limit || tt.limit == -1; round++ { tt.status.setRound(round) @@ -82,6 +90,27 @@ func (tt *tester) runLoop() { plog.Printf("%s functional-tester is finished", tt.logPrefix()) } +func (tt *tester) setupStressers() { + tt.Stressers = make([]Stresser, 0) + leaseStressers := make([]Stresser, 0, 2*len(tt.cluster.Members)) + for i, m := range tt.cluster.Members { + lStresser := tt.leaseStresserBuilder(m) + leaseStressers[i] = lStresser + tt.Stressers = append(tt.Stressers, tt.stressBuilder(m), lStresser) + } + for i := range tt.Stressers { + go tt.Stressers[i].Stress() + } + if !tt.consistencyCheck || tt.cluster.v2Only { + tt.Checker = newNoChecker() + return + } + tt.Checker = newCompositeChecker([]Checker{ + newHashChecker(hashAndRevGetter(tt.cluster)), + newLeaseChecker(leaseStressers)}, + ) +} + func (tt *tester) doRound(round int) error { for j, f := range tt.failures { caseTotalCounter.WithLabelValues(f.Desc()).Inc() @@ -142,7 +171,7 @@ func (tt *tester) checkConsistency() (err error) { } err = tt.startStressers() }() - if err = tt.cluster.Checker.Check(); err != nil { + if err = tt.Checker.Check(); err != nil { plog.Printf("%s %v", tt.logPrefix(), err) } @@ -207,6 +236,7 @@ func (tt *tester) cleanup() error { } caseFailedTotalCounter.WithLabelValues(desc).Inc() + tt.cancelStressers() plog.Printf("%s cleaning up...", tt.logPrefix()) if err := tt.cluster.Cleanup(); err != nil { plog.Warningf("%s cleanup error: %v", tt.logPrefix(), err) @@ -223,7 +253,7 @@ func (tt *tester) cleanup() error { func (tt *tester) cancelStressers() { plog.Printf("%s canceling the stressers...", tt.logPrefix()) - for _, s := range tt.cluster.Stressers { + for _, s := range tt.Stressers { s.Cancel() } plog.Printf("%s canceled stressers", tt.logPrefix()) @@ -231,7 +261,7 @@ func (tt *tester) cancelStressers() { func (tt *tester) startStressers() error { plog.Printf("%s starting the stressers...", tt.logPrefix()) - for _, s := range tt.cluster.Stressers { + for _, s := range tt.Stressers { if err := s.Stress(); err != nil { return err } @@ -239,3 +269,12 @@ func (tt *tester) startStressers() error { plog.Printf("%s started stressers", tt.logPrefix()) return nil } + +func (tt *tester) Report() (success, failure int) { + for _, stresser := range tt.Stressers { + s, f := stresser.Report() + success += s + failure += f + } + return +}