functional-tester: add retries to hash checking
allows hashes to converge through retrying.release-3.1
parent
d7bc15300b
commit
bb97adda0d
|
@ -16,12 +16,18 @@ package main
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
const (
|
||||
retries = 7
|
||||
stabilizationPeriod = 3 * time.Second
|
||||
)
|
||||
|
||||
type Checker interface {
|
||||
// Check returns an error if the system fails a consistency check.
|
||||
Check() error
|
||||
|
@ -39,39 +45,92 @@ func newHashChecker(hrg hashAndRevGetter) Checker { return &hashChecker{hrg} }
|
|||
|
||||
const leaseCheckerTimeout = 10 * time.Second
|
||||
|
||||
func (hc *hashChecker) Check() (err error) {
|
||||
plog.Printf("fetching current revisions...")
|
||||
func (hc *hashChecker) checkRevAndHashes() (err error) {
|
||||
// retries in case of transient failure or etcd nodes have not stablized yet.
|
||||
var (
|
||||
revs map[string]int64
|
||||
hashes map[string]int64
|
||||
ok bool
|
||||
revsStable bool
|
||||
hashesStable bool
|
||||
)
|
||||
// retry in case of transient failure
|
||||
for i := 0; i < 3; i++ {
|
||||
revs, hashes, err = hc.hrg.getRevisionHash()
|
||||
if err != nil {
|
||||
plog.Printf("#%d failed to get current revisions (%v)", i, err)
|
||||
for i := 0; i < retries; i++ {
|
||||
revsStable, err = hc.areRevisonsStable()
|
||||
if err != nil || !revsStable {
|
||||
continue
|
||||
}
|
||||
if _, ok = getSameValue(revs); ok {
|
||||
break
|
||||
hashesStable, err = hc.areHashesStable()
|
||||
if err != nil || !hashesStable {
|
||||
continue
|
||||
}
|
||||
// hashes must be stable at this point
|
||||
return nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !revsStable || !hashesStable {
|
||||
return fmt.Errorf("checkRevAndHashes detects inconsistency: [revisions stable %v] [hashes stable %v]", revsStable, hashesStable)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (hc *hashChecker) areRevisonsStable() (rv bool, err error) {
|
||||
var preRevs map[string]int64
|
||||
for i := 0; i < 2; i++ {
|
||||
revs, _, err := hc.hrg.getRevisionHash()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
plog.Printf("#%d inconsistent current revisions %+v", i, revs)
|
||||
time.Sleep(time.Second)
|
||||
_, sameRev := getSameValue(revs)
|
||||
if !sameRev {
|
||||
plog.Printf("current revisions are not consistent: revisions [revisions: %v]", revs)
|
||||
return false, nil
|
||||
}
|
||||
// sleep for N seconds. after that, check to make sure that revisions don't change
|
||||
if i == 0 {
|
||||
preRevs = revs
|
||||
time.Sleep(stabilizationPeriod)
|
||||
} else if !reflect.DeepEqual(revs, preRevs) {
|
||||
// use map comparison logic found in http://stackoverflow.com/questions/18208394/testing-equivalence-of-maps-golang
|
||||
plog.Printf("revisions are not stable: [current revisions: %v] [previous revisions: %v]", revs, preRevs)
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
if !ok || err != nil {
|
||||
return fmt.Errorf("checking current revisions failed [err: %v, revisions: %v]", err, revs)
|
||||
}
|
||||
plog.Printf("all members are consistent with current revisions [revisions: %v]", revs)
|
||||
plog.Printf("revisions are stable: revisions [revisions: %v]", preRevs)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
plog.Printf("checking current storage hashes...")
|
||||
if _, ok = getSameValue(hashes); !ok {
|
||||
return fmt.Errorf("inconsistent hashes [%v]", hashes)
|
||||
func (hc *hashChecker) areHashesStable() (rv bool, err error) {
|
||||
var prevHashes map[string]int64
|
||||
for i := 0; i < 2; i++ {
|
||||
revs, hashes, err := hc.hrg.getRevisionHash()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
_, sameRev := getSameValue(revs)
|
||||
_, sameHashes := getSameValue(hashes)
|
||||
if !sameRev || !sameHashes {
|
||||
plog.Printf("hashes are not stable: revisions [revisions: %v] and hashes [hashes: %v]", revs, hashes)
|
||||
return false, nil
|
||||
}
|
||||
// sleep for N seconds. after that, check to make sure that the hashes and revisions don't change
|
||||
if i == 0 {
|
||||
time.Sleep(stabilizationPeriod)
|
||||
prevHashes = hashes
|
||||
} else if !reflect.DeepEqual(hashes, prevHashes) {
|
||||
// use map comparison logic found in http://stackoverflow.com/questions/18208394/testing-equivalence-of-maps-golang
|
||||
plog.Printf("hashes are not stable: [current hashes: %v] [previous hashes: %v]", hashes, prevHashes)
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
plog.Printf("hashes are stable: hashes [hashes: %v]", prevHashes)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
plog.Printf("all members are consistent with storage hashes")
|
||||
return nil
|
||||
func (hc *hashChecker) Check() error {
|
||||
return hc.checkRevAndHashes()
|
||||
}
|
||||
|
||||
type leaseChecker struct {
|
||||
|
|
|
@ -211,13 +211,14 @@ func (ls *leaseStresser) createLeases() {
|
|||
defer wg.Done()
|
||||
leaseID, err := ls.createLease()
|
||||
if err != nil {
|
||||
plog.Errorf("lease creation error: (%v)", err)
|
||||
plog.Debugf("lease creation error: (%v)", err)
|
||||
return
|
||||
}
|
||||
plog.Debugf("lease %v created", leaseID)
|
||||
// if attaching keys to the lease encountered an error, we don't add the lease to the aliveLeases map
|
||||
// because invariant check on the lease will fail due to keys not found
|
||||
if err := ls.attachKeysWithLease(leaseID); err != nil {
|
||||
plog.Debugf("unable to attach keys to lease %d error (%v)", leaseID, err)
|
||||
return
|
||||
}
|
||||
ls.aliveLeases.add(leaseID, time.Now())
|
||||
|
@ -239,6 +240,7 @@ func (ls *leaseStresser) randomlyDropLeases() {
|
|||
// if randomlyDropLease encountered an error such as context is cancelled, remove the lease from aliveLeases
|
||||
// becasue we can't tell whether the lease is dropped or not.
|
||||
if err != nil {
|
||||
plog.Debugf("drop lease %v has failed error (%v)", leaseID, err)
|
||||
ls.aliveLeases.remove(leaseID)
|
||||
return
|
||||
}
|
||||
|
@ -271,7 +273,6 @@ func (ls *leaseStresser) hasLeaseExpired(ctx context.Context, leaseID int64) (bo
|
|||
// Since the format of keys contains about leaseID, finding keys base on "<leaseID>" prefix
|
||||
// determines whether the attached keys for a given leaseID has been deleted or not
|
||||
func (ls *leaseStresser) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
|
||||
// plog.Infof("retriving keys attached to lease %v", leaseID)
|
||||
resp, err := ls.kvc.Range(ctx, &pb.RangeRequest{
|
||||
Key: []byte(fmt.Sprintf("%d", leaseID)),
|
||||
RangeEnd: []byte(clientv3.GetPrefixRangeEnd(fmt.Sprintf("%d", leaseID))),
|
||||
|
@ -368,7 +369,6 @@ func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error {
|
|||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return ls.ctx.Err()
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue