Merge pull request #5456 from gyuho/tester_fix

etcd-tester: fix compact timeout
release-3.0
Gyu-Ho Lee 2016-05-25 18:53:07 -07:00
commit d7fa07cffa
2 changed files with 25 additions and 10 deletions

View File

@ -320,7 +320,7 @@ func (c *cluster) getRevisionHash() (map[string]int64, map[string]int64, error)
return revs, hashes, nil
}
func (c *cluster) compactKV(rev int64) (err error) {
func (c *cluster) compactKV(rev int64, timeout time.Duration) (err error) {
if rev <= 0 {
return nil
}
@ -332,7 +332,7 @@ func (c *cluster) compactKV(rev int64) (err error) {
continue
}
kvc := pb.NewKVClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
_, cerr := kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev, Physical: true})
cancel()
conn.Close()

View File

@ -29,6 +29,10 @@ type tester struct {
currentRevision int64
}
// compactQPS is rough number of compact requests per second.
// Previous tests showed etcd can compact about 60,000 entries per second.
const compactQPS = 50000
func (tt *tester) runLoop() {
tt.status.Since = time.Now()
tt.status.RoundLimit = tt.limit
@ -42,7 +46,10 @@ func (tt *tester) runLoop() {
tt.status.setCase(-1) // -1 so that logPrefix doesn't print out 'case'
roundTotalCounter.Inc()
var failed bool
var (
prevCompactRev int64
failed bool
)
for j, f := range tt.failures {
caseTotalCounter.WithLabelValues(f.Desc()).Inc()
tt.status.setCase(j)
@ -105,7 +112,15 @@ func (tt *tester) runLoop() {
}
revToCompact := max(0, tt.currentRevision-10000)
if err := tt.compact(revToCompact); err != nil {
compactN := revToCompact - prevCompactRev
timeout := 10 * time.Second
if prevCompactRev != 0 && compactN > 0 {
timeout += time.Duration(compactN/compactQPS) * time.Second
}
prevCompactRev = revToCompact
plog.Printf("%s compacting %d entries (timeout %v)", tt.logPrefix(), compactN, timeout)
if err := tt.compact(revToCompact, timeout); err != nil {
plog.Warningf("%s functional-tester returning with error (%v)", tt.logPrefix(), err)
return
}
@ -164,18 +179,18 @@ func (tt *tester) startStressers() {
plog.Printf("%s started stressers", tt.logPrefix())
}
func (tt *tester) compact(rev int64) error {
plog.Printf("%s compacting storage at %d (current revision %d)", tt.logPrefix(), rev, tt.currentRevision)
if err := tt.cluster.compactKV(rev); err != nil {
func (tt *tester) compact(rev int64, timeout time.Duration) error {
plog.Printf("%s compacting storage (current revision %d, compact revision %d)", tt.logPrefix(), tt.currentRevision, rev)
if err := tt.cluster.compactKV(rev, timeout); err != nil {
plog.Printf("%s compactKV error (%v)", tt.logPrefix(), err)
if cerr := tt.cleanup(); cerr != nil {
return fmt.Errorf("%s, %s", err, cerr)
}
return err
}
plog.Printf("%s compacted storage at %d", tt.logPrefix(), rev)
plog.Printf("%s compacted storage (compact revision %d)", tt.logPrefix(), rev)
plog.Printf("%s checking compaction at %d", tt.logPrefix(), rev)
plog.Printf("%s checking compaction (compact revision %d)", tt.logPrefix(), rev)
if err := tt.cluster.checkCompact(rev); err != nil {
plog.Printf("%s checkCompact error (%v)", tt.logPrefix(), err)
if cerr := tt.cleanup(); cerr != nil {
@ -184,7 +199,7 @@ func (tt *tester) compact(rev int64) error {
return err
}
plog.Printf("%s confirmed compaction at %d", tt.logPrefix(), rev)
plog.Printf("%s confirmed compaction (compact revision %d)", tt.logPrefix(), rev)
return nil
}