diff --git a/lease/lessor.go b/lease/lessor.go index df8596ee3..3418cf565 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -18,7 +18,6 @@ import ( "encoding/binary" "errors" "math" - "math/rand" "sort" "sync" "sync/atomic" @@ -33,15 +32,14 @@ const ( // NoLease is a special LeaseID representing the absence of a lease. NoLease = LeaseID(0) - // maximum number of leases to revoke per iteration - // TODO: make this configurable? - leaseRevokeRate = 1000 + forever = monotime.Time(math.MaxInt64) ) var ( leaseBucketName = []byte("lease") - forever = monotime.Time(math.MaxInt64) + // maximum number of leases to revoke per second; configurable for tests + leaseRevokeRate = 1000 ErrNotPrimary = errors.New("not a primary lessor") ErrLeaseNotFound = errors.New("lease not found") @@ -327,21 +325,54 @@ func (le *lessor) Promote(extend time.Duration) { // refresh the expiries of all leases. for _, l := range le.leaseMap { - // randomize expiry with 士10%, otherwise leases of same TTL - // will expire all at the same time, - l.refresh(extend + computeRandomDelta(l.ttl)) + l.refresh(extend) + } + + if len(le.leaseMap) < leaseRevokeRate { + // no possibility of lease pile-up + return + } + + // adjust expiries in case of overlap + leases := make([]*Lease, 0, len(le.leaseMap)) + for _, l := range le.leaseMap { + leases = append(leases, l) + } + sort.Sort(leasesByExpiry(leases)) + + baseWindow := leases[0].Remaining() + nextWindow := baseWindow + time.Second + expires := 0 + // have fewer expires than the total revoke rate so piled up leases + // don't consume the entire revoke limit + targetExpiresPerSecond := (3 * leaseRevokeRate) / 4 + for _, l := range leases { + remaining := l.Remaining() + if remaining > nextWindow { + baseWindow = remaining + nextWindow = baseWindow + time.Second + expires = 1 + continue + } + expires++ + if expires <= targetExpiresPerSecond { + continue + } + rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond)) + // If leases are extended by n seconds, leases n seconds ahead of the + // base window should be extended by only one second. + rateDelay -= float64(remaining - baseWindow) + delay := time.Duration(rateDelay) + nextWindow = baseWindow + delay + l.refresh(delay + extend) } } -func computeRandomDelta(seconds int64) time.Duration { - var delta int64 - if seconds > 10 { - delta = int64(float64(seconds) * 0.1 * rand.Float64()) - } else { - delta = rand.Int63n(10) - } - return time.Duration(delta) * time.Second -} +type leasesByExpiry []*Lease + +func (le leasesByExpiry) Len() int { return len(le) } +func (le leasesByExpiry) Less(i, j int) bool { return le[i].Remaining() < le[j].Remaining() } +func (le leasesByExpiry) Swap(i, j int) { le[i], le[j] = le[j], le[i] } func (le *lessor) Demote() { le.mu.Lock() diff --git a/lease/lessor_test.go b/lease/lessor_test.go index 93ea91c88..e70c56d6b 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -26,7 +26,6 @@ import ( "time" "github.com/coreos/etcd/mvcc/backend" - "github.com/coreos/etcd/pkg/monotime" ) const ( @@ -211,14 +210,24 @@ func TestLessorRenew(t *testing.T) { } } -// TestLessorRenewRandomize ensures Lessor renews with randomized expiry. -func TestLessorRenewRandomize(t *testing.T) { +// TestLessorRenewExtendPileup ensures Lessor extends leases on promotion if too many +// expire at the same time. +func TestLessorRenewExtendPileup(t *testing.T) { + oldRevokeRate := leaseRevokeRate + defer func() { leaseRevokeRate = oldRevokeRate }() + leaseRevokeRate = 10 + dir, be := NewTestBackend(t) defer os.RemoveAll(dir) le := newLessor(be, minLeaseTTL) - for i := LeaseID(1); i <= 10; i++ { - if _, err := le.Grant(i, 3600); err != nil { + ttl := int64(10) + for i := 1; i <= leaseRevokeRate*10; i++ { + if _, err := le.Grant(LeaseID(2*i), ttl); err != nil { + t.Fatal(err) + } + // ttls that overlap spillover for ttl=10 + if _, err := le.Grant(LeaseID(2*i+1), ttl+1); err != nil { t.Fatal(err) } } @@ -232,16 +241,23 @@ func TestLessorRenewRandomize(t *testing.T) { defer be.Close() le = newLessor(be, minLeaseTTL) - now := monotime.Now() - - // extend after recovery should randomize expiries + // extend after recovery should extend expiration on lease pile-up le.Promote(0) + windowCounts := make(map[int64]int) for _, l := range le.leaseMap { - leftSeconds := uint64(float64(l.expiry-now) * float64(1e-9)) - pc := (float64(leftSeconds-3600) / float64(3600)) * 100 - if pc > 10.0 || pc < -10.0 || pc == 0 { // should be within 士10% - t.Fatalf("expected randomized expiry, got %d seconds (ttl: 3600)", leftSeconds) + // round up slightly for baseline ttl + s := int64(l.Remaining().Seconds() + 0.1) + windowCounts[s]++ + } + + for i := ttl; i < ttl+20; i++ { + c := windowCounts[i] + if c > leaseRevokeRate { + t.Errorf("expected at most %d expiring at %ds, got %d", leaseRevokeRate, i, c) + } + if c < leaseRevokeRate/2 { + t.Errorf("expected at least %d expiring at %ds, got %d", leaseRevokeRate/2, i, c) } } }