etcd-tester: move stresser and checker to tester

These really belong in the tester code; the stressers and
checkers are higher-order operations that are orchestrated
by the tester. They're not really cluster primitives.
release-3.1
Anthony Romano 2016-10-28 20:11:32 -04:00
parent 4a08678ce1
commit 86c4a74139
4 changed files with 92 additions and 80 deletions
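
The hunks below juggle two interfaces, so it helps to have their shape in view. The sketch here is reconstructed from the call sites in this diff (Stress() returns an error and is also launched in goroutines, Cancel() stops a stresser, Report() yields success/failure counts, Check() returns an error), not quoted from the tree, so the real declarations may differ in detail:

// Minimal sketch of the two interfaces this commit relocates,
// inferred from their call sites in the hunks below.
type Stresser interface {
	// Stress starts stressing a member; it is run in a goroutine at
	// setup and called directly when stressers are restarted.
	Stress() error
	// Cancel stops the stresser.
	Cancel()
	// Report returns counts of successful and failed stress requests.
	Report() (success int, failure int)
}

type Checker interface {
	// Check verifies cluster state after a failure round.
	Check() error
}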

View File

@@ -39,17 +39,9 @@ type agentConfig struct {
 }
 
 type cluster struct {
-	agents               []agentConfig
-	v2Only               bool // to be deprecated
-	consistencyCheck     bool
-	Size                 int
-	Stressers            []Stresser
-	stressBuilder        stressBuilder
-	leaseStresserBuilder leaseStresserBuilder
-	Checker              Checker
+	agents  []agentConfig
+	v2Only  bool // to be deprecated
+	Size    int
 	Members []*member
 }
@@ -100,27 +92,6 @@ func (c *cluster) bootstrap() error {
 		}
 	}
 
-	c.Stressers = make([]Stresser, 0)
-	leaseStressers := make([]Stresser, len(members))
-	for i, m := range members {
-		lStresser := c.leaseStresserBuilder(m)
-		leaseStressers[i] = lStresser
-		c.Stressers = append(c.Stressers, c.stressBuilder(m), lStresser)
-	}
-
-	for i := range c.Stressers {
-		go c.Stressers[i].Stress()
-	}
-
-	var checkers []Checker
-	if c.consistencyCheck && !c.v2Only {
-		checkers = append(checkers, newHashChecker(hashAndRevGetter(c)), newLeaseChecker(leaseStressers))
-	} else {
-		checkers = append(checkers, newNoChecker())
-	}
-	c.Checker = newCompositeChecker(checkers)
-
 	c.Size = size
 	c.Members = members
 	return nil
@@ -167,15 +138,6 @@ func (c *cluster) GetLeader() (int, error) {
 	return 0, fmt.Errorf("no leader found")
 }
 
-func (c *cluster) Report() (success, failure int) {
-	for _, stress := range c.Stressers {
-		s, f := stress.Report()
-		success += s
-		failure += f
-	}
-	return
-}
-
 func (c *cluster) Cleanup() error {
 	var lasterr error
 	for _, m := range c.Members {
@@ -183,10 +145,6 @@ func (c *cluster) Cleanup() error {
 			lasterr = err
 		}
 	}
-	for _, s := range c.Stressers {
-		s.Cancel()
-	}
 	return lasterr
 }
@@ -194,9 +152,6 @@ func (c *cluster) Terminate() {
 	for _, m := range c.Members {
 		m.Agent.Terminate()
 	}
-	for _, s := range c.Stressers {
-		s.Cancel()
-	}
 }
 
 func (c *cluster) Status() ClusterStatus {
@@ -217,6 +172,22 @@ func (c *cluster) Status() ClusterStatus {
 	return cs
 }
 
+func (c *cluster) maxRev() (rev int64, err error) {
+	for _, m := range c.Members {
+		curRev, _, curErr := m.RevHash()
+		if curErr != nil {
+			err = curErr
+		}
+		if curRev > rev {
+			rev = curRev
+		}
+	}
+	if rev == 0 {
+		return 0, err
+	}
+	return rev, nil
+}
+
 func (c *cluster) getRevisionHash() (map[string]int64, map[string]int64, error) {
 	revs := make(map[string]int64)
 	hashes := make(map[string]int64)
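
The new maxRev helper above scans every member's RevHash and keeps the largest revision, tolerating per-member errors unless no revision was observed at all. Later in this diff the checker is built as newHashChecker(hashAndRevGetter(tt.cluster)), which reads as a type conversion, suggesting hashAndRevGetter is a small interface that *cluster satisfies via getRevisionHash. A plausible sketch, with the method signature copied from the context lines above (the interface name appears in the diff; its exact declaration does not):

// Plausible shape of hashAndRevGetter: a narrow interface satisfied by
// *cluster, so the hash checker needn't depend on the whole cluster type.
type hashAndRevGetter interface {
	getRevisionHash() (revs map[string]int64, hashes map[string]int64, err error)
}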

View File

@@ -136,26 +136,29 @@ func (f *failureUntilSnapshot) Inject(c *cluster, round int) error {
 	if err := f.failure.Inject(c, round); err != nil {
 		return err
 	}
 	if c.Size < 3 {
 		return nil
 	}
-	start, _ := c.Report()
-	end := start
+	startRev, err := c.maxRev()
+	if err != nil {
+		return err
+	}
+	lastRev := startRev
 	// A normal healthy cluster could accept at least 1000 req/s.
 	// Give it three times that long to create a new snapshot.
 	retry := snapshotCount / 1000 * 3
 	for j := 0; j < retry; j++ {
-		end, _ = c.Report()
+		if lastRev, err = c.maxRev(); err != nil {
+			return err
+		}
 		// If the number of proposals committed is bigger than snapshot count,
 		// a new snapshot should have been created.
-		if end-start > snapshotCount {
+		if lastRev-startRev > snapshotCount {
 			return nil
 		}
 		time.Sleep(time.Second)
 	}
-	return fmt.Errorf("cluster too slow: only commit %d requests in %ds", end-start, retry)
+	return fmt.Errorf("cluster too slow: only commit %d requests in %ds", lastRev-startRev, retry)
 }
 
 func (f *failureUntilSnapshot) Desc() string {
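
The retry budget above is plain arithmetic: the comments assume a healthy cluster commits at least 1000 proposals per second, so accumulating snapshotCount new revisions should take about snapshotCount/1000 seconds, and the loop polls once per second for three times that long. A worked example, assuming snapshotCount is 10000 (an assumption; the constant is defined elsewhere in this package):

// Assuming snapshotCount = 10000 for illustration:
// retry = 10000 / 1000 * 3 = 30 one-second polls,
// so the cluster gets roughly 30s to commit more than snapshotCount
// proposals before Inject reports "cluster too slow".

The switch from Report() deltas to maxRev() deltas appears forced by the move itself: once the stressers belong to the tester, the cluster can no longer count committed stress requests, so progress is measured by cluster-wide revision growth instead.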

View File

@@ -67,26 +67,9 @@ func main() {
 		agents[i].datadir = *datadir
 	}
 
-	sConfig := &stressConfig{
-		qps:            *stressQPS,
-		keyLargeSize:   int(*stressKeyLargeSize),
-		keySize:        int(*stressKeySize),
-		keySuffixRange: int(*stressKeySuffixRange),
-		v2:             *isV2Only,
-	}
-	lsConfig := &leaseStressConfig{
-		numLeases:    10,
-		keysPerLease: 10,
-		qps:          *stressQPS, // only used to create nop stresser in leaseStresserBuilder
-	}
-
 	c := &cluster{
-		agents:               agents,
-		v2Only:               *isV2Only,
-		stressBuilder:        newStressBuilder(*stresserType, sConfig),
-		leaseStresserBuilder: newLeaseStresserBuilder(*stresserType, lsConfig),
-		consistencyCheck:     *consistencyCheck,
+		agents: agents,
+		v2Only: *isV2Only,
 	}
 
 	if err := c.bootstrap(); err != nil {
@@ -129,10 +112,26 @@ func main() {
 			schedule[i] = failures[caseNum]
 		}
 	}
 
+	sConfig := &stressConfig{
+		qps:            *stressQPS,
+		keyLargeSize:   int(*stressKeyLargeSize),
+		keySize:        int(*stressKeySize),
+		keySuffixRange: int(*stressKeySuffixRange),
+		v2:             *isV2Only,
+	}
+	lsConfig := &leaseStressConfig{
+		numLeases:    10,
+		keysPerLease: 10,
+		qps:          *stressQPS,
+	}
+
 	t := &tester{
-		failures: schedule,
-		cluster:  c,
-		limit:    *limit,
+		failures:             schedule,
+		cluster:              c,
+		limit:                *limit,
+		stressBuilder:        newStressBuilder(*stresserType, sConfig),
+		leaseStresserBuilder: newLeaseStresserBuilder(*stresserType, lsConfig),
+		consistencyCheck:     *consistencyCheck,
 	}
 
 	sh := statusHandler{status: &t.status}
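
main now hands the tester builder functions instead of wiring stressers into the cluster; the tester applies them per member in setupStressers. From the call sites (tt.stressBuilder(m) and tt.leaseStresserBuilder(m) each yield a Stresser for one *member), the builder types are presumably factories along these lines (a sketch of the inferred shape, not the tree's declarations):

// Inferred shape of the builders: main closes a config over the factory,
// setupStressers invokes it once per member.
type stressBuilder func(*member) Stresser
type leaseStresserBuilder func(*member) Stresser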

View File

@@ -25,6 +25,12 @@ type tester struct {
 	limit           int
 	status          Status
 	currentRevision int64
+
+	Stressers            []Stresser
+	stressBuilder        stressBuilder
+	leaseStresserBuilder leaseStresserBuilder
+	Checker              Checker
+	consistencyCheck     bool
 }
 
 // compactQPS is rough number of compact requests per second.
@@ -39,6 +45,8 @@ func (tt *tester) runLoop() {
 		tt.status.Failures = append(tt.status.Failures, f.Desc())
 	}
 
+	tt.setupStressers()
+
 	var prevCompactRev int64
 	for round := 0; round < tt.limit || tt.limit == -1; round++ {
 		tt.status.setRound(round)
@@ -82,6 +90,27 @@ func (tt *tester) runLoop() {
 	plog.Printf("%s functional-tester is finished", tt.logPrefix())
 }
 
+func (tt *tester) setupStressers() {
+	tt.Stressers = make([]Stresser, 0, 2*len(tt.cluster.Members))
+	leaseStressers := make([]Stresser, len(tt.cluster.Members))
+	for i, m := range tt.cluster.Members {
+		lStresser := tt.leaseStresserBuilder(m)
+		leaseStressers[i] = lStresser
+		tt.Stressers = append(tt.Stressers, tt.stressBuilder(m), lStresser)
+	}
+	for i := range tt.Stressers {
+		go tt.Stressers[i].Stress()
+	}
+	if !tt.consistencyCheck || tt.cluster.v2Only {
+		tt.Checker = newNoChecker()
+		return
+	}
+	tt.Checker = newCompositeChecker([]Checker{
+		newHashChecker(hashAndRevGetter(tt.cluster)),
+		newLeaseChecker(leaseStressers),
+	})
+}
+
 func (tt *tester) doRound(round int) error {
 	for j, f := range tt.failures {
 		caseTotalCounter.WithLabelValues(f.Desc()).Inc()
@@ -142,7 +171,7 @@ func (tt *tester) checkConsistency() (err error) {
 		}
 		err = tt.startStressers()
 	}()
-	if err = tt.cluster.Checker.Check(); err != nil {
+	if err = tt.Checker.Check(); err != nil {
 		plog.Printf("%s %v", tt.logPrefix(), err)
 	}
@@ -207,6 +236,7 @@ func (tt *tester) cleanup() error {
 	}
 	caseFailedTotalCounter.WithLabelValues(desc).Inc()
 
+	tt.cancelStressers()
 	plog.Printf("%s cleaning up...", tt.logPrefix())
 	if err := tt.cluster.Cleanup(); err != nil {
 		plog.Warningf("%s cleanup error: %v", tt.logPrefix(), err)
@@ -223,7 +253,7 @@ func (tt *tester) cleanup() error {
 
 func (tt *tester) cancelStressers() {
 	plog.Printf("%s canceling the stressers...", tt.logPrefix())
-	for _, s := range tt.cluster.Stressers {
+	for _, s := range tt.Stressers {
 		s.Cancel()
 	}
 	plog.Printf("%s canceled stressers", tt.logPrefix())
@@ -231,7 +261,7 @@ func (tt *tester) cancelStressers() {
 
 func (tt *tester) startStressers() error {
 	plog.Printf("%s starting the stressers...", tt.logPrefix())
-	for _, s := range tt.cluster.Stressers {
+	for _, s := range tt.Stressers {
 		if err := s.Stress(); err != nil {
 			return err
 		}
@@ -239,3 +269,12 @@ func (tt *tester) startStressers() error {
 	plog.Printf("%s started stressers", tt.logPrefix())
 	return nil
 }
+
+func (tt *tester) Report() (success, failure int) {
+	for _, stresser := range tt.Stressers {
+		s, f := stresser.Report()
+		success += s
+		failure += f
+	}
+	return
+}
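
newNoChecker and newCompositeChecker are used by setupStressers but not shown in this diff. A minimal plausible pair, for illustration only (the real implementations live elsewhere in etcd-tester):

// nopChecker always passes; used when consistency checking is disabled
// or the cluster is v2-only.
type nopChecker struct{}

func newNoChecker() Checker { return nopChecker{} }

func (nopChecker) Check() error { return nil }

// compositeChecker fans Check out to its sub-checkers and surfaces the
// first failure.
type compositeChecker struct{ checkers []Checker }

func newCompositeChecker(checkers []Checker) Checker {
	return &compositeChecker{checkers}
}

func (cc *compositeChecker) Check() error {
	for _, c := range cc.checkers {
		if err := c.Check(); err != nil {
			return err
		}
	}
	return nil
}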