lessor: extend leases on promote if expires will be rate limited
Instead of unconditionally randomizing, extend leases on promotion if too many leases expire within the same time span. If the server has few leases or spread out expires, there will be no extension.release-3.3
parent
310a09691f
commit
c38c00f7c3
|
@ -18,7 +18,6 @@ import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
@ -33,15 +32,14 @@ const (
|
||||||
// NoLease is a special LeaseID representing the absence of a lease.
|
// NoLease is a special LeaseID representing the absence of a lease.
|
||||||
NoLease = LeaseID(0)
|
NoLease = LeaseID(0)
|
||||||
|
|
||||||
// maximum number of leases to revoke per iteration
|
forever = monotime.Time(math.MaxInt64)
|
||||||
// TODO: make this configurable?
|
|
||||||
leaseRevokeRate = 1000
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
leaseBucketName = []byte("lease")
|
leaseBucketName = []byte("lease")
|
||||||
|
|
||||||
forever = monotime.Time(math.MaxInt64)
|
// maximum number of leases to revoke per second; configurable for tests
|
||||||
|
leaseRevokeRate = 1000
|
||||||
|
|
||||||
ErrNotPrimary = errors.New("not a primary lessor")
|
ErrNotPrimary = errors.New("not a primary lessor")
|
||||||
ErrLeaseNotFound = errors.New("lease not found")
|
ErrLeaseNotFound = errors.New("lease not found")
|
||||||
|
@ -327,21 +325,54 @@ func (le *lessor) Promote(extend time.Duration) {
|
||||||
|
|
||||||
// refresh the expiries of all leases.
|
// refresh the expiries of all leases.
|
||||||
for _, l := range le.leaseMap {
|
for _, l := range le.leaseMap {
|
||||||
// randomize expiry with 士10%, otherwise leases of same TTL
|
l.refresh(extend)
|
||||||
// will expire all at the same time,
|
}
|
||||||
l.refresh(extend + computeRandomDelta(l.ttl))
|
|
||||||
|
if len(le.leaseMap) < leaseRevokeRate {
|
||||||
|
// no possibility of lease pile-up
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// adjust expiries in case of overlap
|
||||||
|
leases := make([]*Lease, 0, len(le.leaseMap))
|
||||||
|
for _, l := range le.leaseMap {
|
||||||
|
leases = append(leases, l)
|
||||||
|
}
|
||||||
|
sort.Sort(leasesByExpiry(leases))
|
||||||
|
|
||||||
|
baseWindow := leases[0].Remaining()
|
||||||
|
nextWindow := baseWindow + time.Second
|
||||||
|
expires := 0
|
||||||
|
// have fewer expires than the total revoke rate so piled up leases
|
||||||
|
// don't consume the entire revoke limit
|
||||||
|
targetExpiresPerSecond := (3 * leaseRevokeRate) / 4
|
||||||
|
for _, l := range leases {
|
||||||
|
remaining := l.Remaining()
|
||||||
|
if remaining > nextWindow {
|
||||||
|
baseWindow = remaining
|
||||||
|
nextWindow = baseWindow + time.Second
|
||||||
|
expires = 1
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
expires++
|
||||||
|
if expires <= targetExpiresPerSecond {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond))
|
||||||
|
// If leases are extended by n seconds, leases n seconds ahead of the
|
||||||
|
// base window should be extended by only one second.
|
||||||
|
rateDelay -= float64(remaining - baseWindow)
|
||||||
|
delay := time.Duration(rateDelay)
|
||||||
|
nextWindow = baseWindow + delay
|
||||||
|
l.refresh(delay + extend)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func computeRandomDelta(seconds int64) time.Duration {
|
type leasesByExpiry []*Lease
|
||||||
var delta int64
|
|
||||||
if seconds > 10 {
|
func (le leasesByExpiry) Len() int { return len(le) }
|
||||||
delta = int64(float64(seconds) * 0.1 * rand.Float64())
|
func (le leasesByExpiry) Less(i, j int) bool { return le[i].Remaining() < le[j].Remaining() }
|
||||||
} else {
|
func (le leasesByExpiry) Swap(i, j int) { le[i], le[j] = le[j], le[i] }
|
||||||
delta = rand.Int63n(10)
|
|
||||||
}
|
|
||||||
return time.Duration(delta) * time.Second
|
|
||||||
}
|
|
||||||
|
|
||||||
func (le *lessor) Demote() {
|
func (le *lessor) Demote() {
|
||||||
le.mu.Lock()
|
le.mu.Lock()
|
||||||
|
|
|
@ -26,7 +26,6 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/mvcc/backend"
|
"github.com/coreos/etcd/mvcc/backend"
|
||||||
"github.com/coreos/etcd/pkg/monotime"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -211,14 +210,24 @@ func TestLessorRenew(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestLessorRenewRandomize ensures Lessor renews with randomized expiry.
|
// TestLessorRenewExtendPileup ensures Lessor extends leases on promotion if too many
|
||||||
func TestLessorRenewRandomize(t *testing.T) {
|
// expire at the same time.
|
||||||
|
func TestLessorRenewExtendPileup(t *testing.T) {
|
||||||
|
oldRevokeRate := leaseRevokeRate
|
||||||
|
defer func() { leaseRevokeRate = oldRevokeRate }()
|
||||||
|
leaseRevokeRate = 10
|
||||||
|
|
||||||
dir, be := NewTestBackend(t)
|
dir, be := NewTestBackend(t)
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
le := newLessor(be, minLeaseTTL)
|
le := newLessor(be, minLeaseTTL)
|
||||||
for i := LeaseID(1); i <= 10; i++ {
|
ttl := int64(10)
|
||||||
if _, err := le.Grant(i, 3600); err != nil {
|
for i := 1; i <= leaseRevokeRate*10; i++ {
|
||||||
|
if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
// ttls that overlap spillover for ttl=10
|
||||||
|
if _, err := le.Grant(LeaseID(2*i+1), ttl+1); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -232,16 +241,23 @@ func TestLessorRenewRandomize(t *testing.T) {
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
le = newLessor(be, minLeaseTTL)
|
le = newLessor(be, minLeaseTTL)
|
||||||
|
|
||||||
now := monotime.Now()
|
// extend after recovery should extend expiration on lease pile-up
|
||||||
|
|
||||||
// extend after recovery should randomize expiries
|
|
||||||
le.Promote(0)
|
le.Promote(0)
|
||||||
|
|
||||||
|
windowCounts := make(map[int64]int)
|
||||||
for _, l := range le.leaseMap {
|
for _, l := range le.leaseMap {
|
||||||
leftSeconds := uint64(float64(l.expiry-now) * float64(1e-9))
|
// round up slightly for baseline ttl
|
||||||
pc := (float64(leftSeconds-3600) / float64(3600)) * 100
|
s := int64(l.Remaining().Seconds() + 0.1)
|
||||||
if pc > 10.0 || pc < -10.0 || pc == 0 { // should be within 士10%
|
windowCounts[s]++
|
||||||
t.Fatalf("expected randomized expiry, got %d seconds (ttl: 3600)", leftSeconds)
|
}
|
||||||
|
|
||||||
|
for i := ttl; i < ttl+20; i++ {
|
||||||
|
c := windowCounts[i]
|
||||||
|
if c > leaseRevokeRate {
|
||||||
|
t.Errorf("expected at most %d expiring at %ds, got %d", leaseRevokeRate, i, c)
|
||||||
|
}
|
||||||
|
if c < leaseRevokeRate/2 {
|
||||||
|
t.Errorf("expected at least %d expiring at %ds, got %d", leaseRevokeRate/2, i, c)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue