2016-05-13 06:51:48 +03:00
// Copyright 2015 The etcd Authors
2015-11-08 22:15:39 +03:00
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package lease
import (
2018-03-10 00:14:14 +03:00
"container/heap"
2016-01-05 09:53:11 +03:00
"encoding/binary"
2016-01-08 04:52:48 +03:00
"errors"
2016-01-05 21:57:59 +03:00
"math"
2016-10-01 02:45:52 +03:00
"sort"
2015-11-08 22:15:39 +03:00
"sync"
"time"
2016-01-05 09:53:11 +03:00
"github.com/coreos/etcd/lease/leasepb"
2016-04-25 22:32:58 +03:00
"github.com/coreos/etcd/mvcc/backend"
2015-11-08 22:15:39 +03:00
)
2017-09-06 15:56:34 +03:00
// NoLease is a special LeaseID representing the absence of a lease.
const NoLease = LeaseID ( 0 )
2016-01-07 13:09:50 +03:00
2018-03-08 03:32:04 +03:00
// MaxLeaseTTL is the maximum lease TTL value
const MaxLeaseTTL = 9000000000
2015-11-08 22:15:39 +03:00
var (
	// forever is the zero time.Time; a lease whose expiry is zero never expires.
	forever = time.Time{}

	// leaseBucketName names the backend bucket in which leases are stored.
	leaseBucketName = []byte("lease")

	// leaseRevokeRate is the maximum number of leases to revoke per second.
	// It is a variable so that tests can change it.
	leaseRevokeRate = 1000

	// Sentinel errors returned by the lessor.
	ErrNotPrimary       = errors.New("not a primary lessor")
	ErrLeaseNotFound    = errors.New("lease not found")
	ErrLeaseExists      = errors.New("lease already exists")
	ErrLeaseTTLTooLarge = errors.New("too large lease TTL")
)
2017-01-05 09:30:03 +03:00
// TxnDelete is the delete-only view of a write transaction. It is declared
// in this package so that lease does not have to import mvcc, which would
// create a circular dependency.
type TxnDelete interface {
	// DeleteRange deletes the given key range and reports n and rev.
	DeleteRange(key, end []byte) (n, rev int64)
	// End finishes the transaction.
	End()
}
2017-01-05 09:30:03 +03:00
// RangeDeleter is a TxnDelete constructor. The lessor calls it to obtain a
// delete transaction when it revokes a lease.
type RangeDeleter func ( ) TxnDelete
// LeaseID is the identifier of a lease. The value 0 (NoLease) is reserved
// to mean "no lease".
type LeaseID int64
2016-06-17 22:13:01 +03:00
// Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
2016-01-06 00:49:25 +03:00
type Lessor interface {
2017-01-05 09:30:03 +03:00
// SetRangeDeleter lets the lessor create TxnDeletes to the store.
// Lessor deletes the items in the revoked or expired lease by creating
// new TxnDeletes.
SetRangeDeleter ( rd RangeDeleter )
2016-01-08 21:06:33 +03:00
2016-01-06 00:49:25 +03:00
// Grant grants a lease that expires at least after TTL seconds.
2016-01-20 08:09:09 +03:00
Grant ( id LeaseID , ttl int64 ) ( * Lease , error )
2016-01-06 00:49:25 +03:00
// Revoke revokes a lease with given ID. The item attached to the
// given lease will be removed. If the ID does not exist, an error
// will be returned.
Revoke ( id LeaseID ) error
2016-01-08 04:52:48 +03:00
2016-01-08 21:06:33 +03:00
// Attach attaches given leaseItem to the lease with given LeaseID.
// If the lease does not exist, an error will be returned.
Attach ( id LeaseID , items [ ] LeaseItem ) error
2016-10-12 07:37:09 +03:00
// GetLease returns LeaseID for given item.
// If no lease found, NoLease value will be returned.
GetLease ( item LeaseItem ) LeaseID
2016-02-04 02:03:54 +03:00
// Detach detaches given leaseItem from the lease with given LeaseID.
// If the lease does not exist, an error will be returned.
Detach ( id LeaseID , items [ ] LeaseItem ) error
2016-01-08 04:52:48 +03:00
// Promote promotes the lessor to be the primary lessor. Primary lessor manages
// the expiration and renew of leases.
2016-03-14 06:19:47 +03:00
// Newly promoted lessor renew the TTL of all lease to extend + previous TTL.
Promote ( extend time . Duration )
2016-01-08 04:52:48 +03:00
// Demote demotes the lessor from being the primary lessor.
Demote ( )
2016-01-10 00:26:45 +03:00
// Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist,
// an error will be returned.
Renew ( id LeaseID ) ( int64 , error )
2016-01-09 00:17:54 +03:00
2016-01-23 02:48:06 +03:00
// Lookup gives the lease at a given lease id, if any
Lookup ( id LeaseID ) * Lease
2017-08-14 21:19:09 +03:00
// Leases lists all leases.
Leases ( ) [ ] * Lease
2016-01-12 20:00:57 +03:00
// ExpiredLeasesC returns a chan that is used to receive expired leases.
2016-01-09 00:17:54 +03:00
ExpiredLeasesC ( ) <- chan [ ] * Lease
2016-01-12 20:00:57 +03:00
2016-03-11 04:06:10 +03:00
// Recover recovers the lessor state from the given backend and RangeDeleter.
Recover ( b backend . Backend , rd RangeDeleter )
2016-01-12 20:00:57 +03:00
// Stop stops the lessor for managing leases. The behavior of calling Stop multiple
// times is undefined.
Stop ( )
2016-01-06 00:49:25 +03:00
}
// lessor implements Lessor interface.
2015-11-08 22:15:39 +03:00
// TODO: use clockwork for testability.
type lessor struct {
2018-01-25 11:19:28 +03:00
mu sync . RWMutex
2016-01-08 04:52:48 +03:00
2016-10-03 11:01:56 +03:00
// demotec is set when the lessor is the primary.
// demotec will be closed if the lessor is demoted.
demotec chan struct { }
2016-01-08 04:52:48 +03:00
2018-03-10 00:14:14 +03:00
leaseMap map [ LeaseID ] * Lease
leaseHeap LeaseQueue
itemMap map [ LeaseItem ] LeaseID
2016-10-12 07:37:09 +03:00
2015-11-10 00:33:12 +03:00
// When a lease expires, the lessor will delete the
2016-01-09 03:20:48 +03:00
// leased range (or key) by the RangeDeleter.
rd RangeDeleter
2015-11-10 00:33:12 +03:00
2016-01-05 09:53:11 +03:00
// backend to persist leases. We only persist lease ID and expiry for now.
// The leased items can be recovered by iterating all the keys in kv.
b backend . Backend
2016-08-02 20:29:23 +03:00
// minLeaseTTL is the minimum lease TTL that can be granted for a lease. Any
// requests for shorter TTLs are extended to the minimum TTL.
minLeaseTTL int64
2016-01-09 00:17:54 +03:00
expiredC chan [ ] * Lease
2016-01-12 20:00:57 +03:00
// stopC is a channel whose closure indicates that the lessor should be stopped.
stopC chan struct { }
// doneC is a channel whose closure indicates that the lessor is stopped.
doneC chan struct { }
2015-11-08 22:15:39 +03:00
}
2016-08-02 20:29:23 +03:00
func NewLessor ( b backend . Backend , minLeaseTTL int64 ) Lessor {
return newLessor ( b , minLeaseTTL )
2016-01-06 00:49:25 +03:00
}
2016-08-02 20:29:23 +03:00
func newLessor ( b backend . Backend , minLeaseTTL int64 ) * lessor {
2016-01-05 09:53:11 +03:00
l := & lessor {
2016-08-02 20:29:23 +03:00
leaseMap : make ( map [ LeaseID ] * Lease ) ,
2016-10-12 07:37:09 +03:00
itemMap : make ( map [ LeaseItem ] LeaseID ) ,
2018-03-10 00:14:14 +03:00
leaseHeap : make ( LeaseQueue , 0 ) ,
2016-08-02 20:29:23 +03:00
b : b ,
minLeaseTTL : minLeaseTTL ,
2016-01-14 12:28:29 +03:00
// expiredC is a small buffered chan to avoid unnecessary blocking.
2016-01-09 00:17:54 +03:00
expiredC : make ( chan [ ] * Lease , 16 ) ,
2016-01-12 20:00:57 +03:00
stopC : make ( chan struct { } ) ,
doneC : make ( chan struct { } ) ,
2015-11-08 22:15:39 +03:00
}
2016-01-05 21:57:59 +03:00
l . initAndRecover ( )
2016-01-05 09:53:11 +03:00
2016-01-08 04:52:48 +03:00
go l . runLoop ( )
2016-01-05 09:53:11 +03:00
return l
2015-11-08 22:15:39 +03:00
}
2016-10-03 11:01:56 +03:00
// isPrimary reports whether this lessor is the primary lessor. The primary
// lessor manages lease expiration and renewal.
//
// In etcd the raft leader is the primary. There may therefore be two
// primaries at the same time (raft allows concurrent leaders with different
// terms) for at most one leader election timeout. The old primary cannot
// affect correctness because its proposals carry a smaller term and will
// not be committed.
//
// TODO: raft followers do not forward lease management proposals. There may
// be a very small window (normally within a second, depending on Go
// scheduling) in which a raft follower is still the primary, between the
// raft leader demotion and the lessor demotion. Usually this should not be
// a problem; leases should not be that sensitive to timing.
func (le *lessor) isPrimary() bool {
	return le.demotec != nil
}
2016-01-09 03:20:48 +03:00
func ( le * lessor ) SetRangeDeleter ( rd RangeDeleter ) {
2016-01-08 21:06:33 +03:00
le . mu . Lock ( )
defer le . mu . Unlock ( )
2016-01-09 03:20:48 +03:00
le . rd = rd
2016-01-08 21:06:33 +03:00
}
2016-01-20 08:09:09 +03:00
func ( le * lessor ) Grant ( id LeaseID , ttl int64 ) ( * Lease , error ) {
if id == NoLease {
return nil , ErrLeaseNotFound
}
2018-03-08 03:32:04 +03:00
if ttl > MaxLeaseTTL {
return nil , ErrLeaseTTLTooLarge
}
2016-06-17 22:13:01 +03:00
// TODO: when lessor is under high load, it should give out lease
// with longer TTL to reduce renew load.
2016-10-03 11:01:56 +03:00
l := & Lease {
ID : id ,
2016-10-06 20:31:36 +03:00
ttl : ttl ,
2016-10-03 11:01:56 +03:00
itemSet : make ( map [ LeaseItem ] struct { } ) ,
revokec : make ( chan struct { } ) ,
}
2015-11-08 22:15:39 +03:00
le . mu . Lock ( )
defer le . mu . Unlock ( )
2016-01-20 08:09:09 +03:00
if _ , ok := le . leaseMap [ id ] ; ok {
return nil , ErrLeaseExists
}
2016-01-09 01:29:33 +03:00
2016-10-06 20:31:36 +03:00
if l . ttl < le . minLeaseTTL {
l . ttl = le . minLeaseTTL
2016-08-02 20:29:23 +03:00
}
2016-10-03 11:01:56 +03:00
if le . isPrimary ( ) {
2016-03-14 06:19:47 +03:00
l . refresh ( 0 )
2016-01-09 01:29:33 +03:00
} else {
l . forever ( )
}
2015-11-08 22:15:39 +03:00
le . leaseMap [ id ] = l
2018-04-03 00:58:34 +03:00
item := & LeaseWithTime { id : l . ID , expiration : l . expiry . UnixNano ( ) }
2018-03-10 00:14:14 +03:00
heap . Push ( & le . leaseHeap , item )
2016-01-05 09:53:11 +03:00
l . persistTo ( le . b )
2018-05-24 07:34:48 +03:00
leaseTotalTTLs . Observe ( float64 ( l . ttl ) )
leaseGranted . Inc ( )
2016-01-20 08:09:09 +03:00
return l , nil
2015-11-08 22:15:39 +03:00
}
2016-01-05 21:16:50 +03:00
func ( le * lessor ) Revoke ( id LeaseID ) error {
2015-11-08 22:15:39 +03:00
le . mu . Lock ( )
l := le . leaseMap [ id ]
if l == nil {
2016-02-04 02:03:54 +03:00
le . mu . Unlock ( )
2016-01-08 21:06:33 +03:00
return ErrLeaseNotFound
2015-11-08 22:15:39 +03:00
}
2016-10-03 11:01:56 +03:00
defer close ( l . revokec )
2016-02-04 02:03:54 +03:00
// unlock before doing external work
le . mu . Unlock ( )
2015-11-08 22:15:39 +03:00
2016-08-05 06:39:32 +03:00
if le . rd == nil {
return nil
}
2016-08-04 18:06:33 +03:00
2017-01-05 09:30:03 +03:00
txn := le . rd ( )
2016-10-01 02:45:52 +03:00
// sort keys so deletes are in same order among all members,
// otherwise the backened hashes will be different
2017-03-08 21:18:19 +03:00
keys := l . Keys ( )
2016-10-01 02:45:52 +03:00
sort . StringSlice ( keys ) . Sort ( )
for _ , key := range keys {
2017-01-05 09:30:03 +03:00
txn . DeleteRange ( [ ] byte ( key ) , nil )
2015-11-10 00:33:12 +03:00
}
2016-08-05 06:39:32 +03:00
le . mu . Lock ( )
defer le . mu . Unlock ( )
delete ( le . leaseMap , l . ID )
// lease deletion needs to be in the same backend transaction with the
// kv deletion. Or we might end up with not executing the revoke or not
// deleting the keys if etcdserver fails in between.
le . b . BatchTx ( ) . UnsafeDelete ( leaseBucketName , int64ToBytes ( int64 ( l . ID ) ) )
2017-01-05 09:30:03 +03:00
txn . End ( )
2018-05-24 07:34:48 +03:00
leaseRevoked . Inc ( )
2015-11-08 22:15:39 +03:00
return nil
}
2016-01-05 09:53:11 +03:00
// Renew renews an existing lease. If the given lease does not exist or
// has expired, an error will be returned.
2016-01-10 00:26:45 +03:00
func ( le * lessor ) Renew ( id LeaseID ) ( int64 , error ) {
2015-11-08 22:15:39 +03:00
le . mu . Lock ( )
2016-10-03 11:01:56 +03:00
unlock := func ( ) { le . mu . Unlock ( ) }
defer func ( ) { unlock ( ) } ( )
if ! le . isPrimary ( ) {
2016-01-10 00:26:45 +03:00
// forward renew request to primary instead of returning error.
return - 1 , ErrNotPrimary
2016-01-08 04:52:48 +03:00
}
2016-10-03 11:01:56 +03:00
demotec := le . demotec
2015-11-08 22:15:39 +03:00
l := le . leaseMap [ id ]
if l == nil {
2016-01-10 00:26:45 +03:00
return - 1 , ErrLeaseNotFound
2015-11-08 22:15:39 +03:00
}
2016-10-03 11:01:56 +03:00
if l . expired ( ) {
le . mu . Unlock ( )
unlock = func ( ) { }
select {
// A expired lease might be pending for revoking or going through
// quorum to be revoked. To be accurate, renew request must wait for the
// deletion to complete.
case <- l . revokec :
return - 1 , ErrLeaseNotFound
// The expired lease might fail to be revoked if the primary changes.
// The caller will retry on ErrNotPrimary.
case <- demotec :
return - 1 , ErrNotPrimary
case <- le . stopC :
return - 1 , ErrNotPrimary
}
}
2016-03-14 06:19:47 +03:00
l . refresh ( 0 )
2018-04-03 00:58:34 +03:00
item := & LeaseWithTime { id : l . ID , expiration : l . expiry . UnixNano ( ) }
2018-03-10 00:14:14 +03:00
heap . Push ( & le . leaseHeap , item )
2018-05-24 07:34:48 +03:00
leaseRenewed . Inc ( )
2016-10-06 20:31:36 +03:00
return l . ttl , nil
2015-11-08 22:15:39 +03:00
}
2016-01-23 02:48:06 +03:00
func ( le * lessor ) Lookup ( id LeaseID ) * Lease {
2018-01-25 11:19:28 +03:00
le . mu . RLock ( )
defer le . mu . RUnlock ( )
2016-09-30 20:09:28 +03:00
return le . leaseMap [ id ]
2016-01-23 02:48:06 +03:00
}
2017-08-14 21:19:09 +03:00
// unsafeLeases returns every lease in leaseMap as a new slice, in map
// iteration order. It takes no lock itself; callers are expected to hold
// le.mu.
func (le *lessor) unsafeLeases() []*Lease {
	all := make([]*Lease, 0, len(le.leaseMap))
	for id := range le.leaseMap {
		all = append(all, le.leaseMap[id])
	}
	return all
}
func ( le * lessor ) Leases ( ) [ ] * Lease {
2018-01-25 11:19:28 +03:00
le . mu . RLock ( )
2017-08-14 21:19:09 +03:00
ls := le . unsafeLeases ( )
2018-01-25 11:19:28 +03:00
le . mu . RUnlock ( )
2018-06-06 18:05:23 +03:00
sort . Sort ( leasesByExpiry ( ls ) )
2017-08-14 21:19:09 +03:00
return ls
}
2016-03-14 06:19:47 +03:00
func ( le * lessor ) Promote ( extend time . Duration ) {
2016-01-08 04:52:48 +03:00
le . mu . Lock ( )
defer le . mu . Unlock ( )
2016-10-03 11:01:56 +03:00
le . demotec = make ( chan struct { } )
2016-01-09 00:17:54 +03:00
// refresh the expiries of all leases.
for _ , l := range le . leaseMap {
2017-06-17 03:51:01 +03:00
l . refresh ( extend )
2018-04-03 00:58:34 +03:00
item := & LeaseWithTime { id : l . ID , expiration : l . expiry . UnixNano ( ) }
2018-03-10 00:14:14 +03:00
heap . Push ( & le . leaseHeap , item )
2016-01-09 00:17:54 +03:00
}
2016-01-08 04:52:48 +03:00
2017-06-17 03:51:01 +03:00
if len ( le . leaseMap ) < leaseRevokeRate {
// no possibility of lease pile-up
return
}
// adjust expiries in case of overlap
2017-08-14 21:19:09 +03:00
leases := le . unsafeLeases ( )
2018-06-06 18:05:23 +03:00
sort . Sort ( leasesByExpiry ( leases ) )
2017-06-17 03:51:01 +03:00
baseWindow := leases [ 0 ] . Remaining ( )
nextWindow := baseWindow + time . Second
expires := 0
// have fewer expires than the total revoke rate so piled up leases
// don't consume the entire revoke limit
targetExpiresPerSecond := ( 3 * leaseRevokeRate ) / 4
for _ , l := range leases {
remaining := l . Remaining ( )
if remaining > nextWindow {
baseWindow = remaining
nextWindow = baseWindow + time . Second
expires = 1
continue
}
expires ++
if expires <= targetExpiresPerSecond {
continue
}
rateDelay := float64 ( time . Second ) * ( float64 ( expires ) / float64 ( targetExpiresPerSecond ) )
// If leases are extended by n seconds, leases n seconds ahead of the
// base window should be extended by only one second.
rateDelay -= float64 ( remaining - baseWindow )
delay := time . Duration ( rateDelay )
nextWindow = baseWindow + delay
l . refresh ( delay + extend )
2018-04-03 00:58:34 +03:00
item := & LeaseWithTime { id : l . ID , expiration : l . expiry . UnixNano ( ) }
2018-03-10 00:14:14 +03:00
heap . Push ( & le . leaseHeap , item )
2017-06-15 02:08:16 +03:00
}
}
2017-06-17 03:51:01 +03:00
// leasesByExpiry implements sort.Interface, ordering leases by ascending
// remaining time.
type leasesByExpiry []*Lease

// Len returns the number of leases.
func (le leasesByExpiry) Len() int {
	return len(le)
}

// Less reports whether lease i has less time remaining than lease j.
func (le leasesByExpiry) Less(i, j int) bool {
	return le[i].Remaining() < le[j].Remaining()
}

// Swap exchanges leases i and j.
func (le leasesByExpiry) Swap(i, j int) {
	le[i], le[j] = le[j], le[i]
}
2016-01-08 04:52:48 +03:00
func ( le * lessor ) Demote ( ) {
le . mu . Lock ( )
defer le . mu . Unlock ( )
2016-01-09 00:17:54 +03:00
// set the expiries of all leases to forever
for _ , l := range le . leaseMap {
2016-01-09 01:29:33 +03:00
l . forever ( )
2016-01-09 00:17:54 +03:00
}
2016-10-03 11:01:56 +03:00
if le . demotec != nil {
close ( le . demotec )
le . demotec = nil
}
2016-01-08 04:52:48 +03:00
}
2015-11-08 22:15:39 +03:00
// Attach attaches items to the lease with given ID. When the lease
// expires, the attached items will be automatically removed.
// If the given lease does not exist, an error will be returned.
2016-01-08 21:06:33 +03:00
func ( le * lessor ) Attach ( id LeaseID , items [ ] LeaseItem ) error {
2015-11-08 22:15:39 +03:00
le . mu . Lock ( )
defer le . mu . Unlock ( )
l := le . leaseMap [ id ]
if l == nil {
2016-01-08 21:06:33 +03:00
return ErrLeaseNotFound
2015-11-08 22:15:39 +03:00
}
2017-03-08 21:18:19 +03:00
l . mu . Lock ( )
2015-11-08 22:15:39 +03:00
for _ , it := range items {
l . itemSet [ it ] = struct { } { }
2016-10-12 07:37:09 +03:00
le . itemMap [ it ] = id
2015-11-08 22:15:39 +03:00
}
2017-03-08 21:18:19 +03:00
l . mu . Unlock ( )
2015-11-08 22:15:39 +03:00
return nil
}
2016-10-12 07:37:09 +03:00
func ( le * lessor ) GetLease ( item LeaseItem ) LeaseID {
2018-01-25 11:19:28 +03:00
le . mu . RLock ( )
2016-10-12 07:37:09 +03:00
id := le . itemMap [ item ]
2018-01-25 11:19:28 +03:00
le . mu . RUnlock ( )
2016-10-12 07:37:09 +03:00
return id
}
2016-02-04 02:03:54 +03:00
// Detach detaches items from the lease with given ID.
// If the given lease does not exist, an error will be returned.
func ( le * lessor ) Detach ( id LeaseID , items [ ] LeaseItem ) error {
le . mu . Lock ( )
defer le . mu . Unlock ( )
l := le . leaseMap [ id ]
if l == nil {
return ErrLeaseNotFound
}
2017-03-08 21:18:19 +03:00
l . mu . Lock ( )
2016-02-04 02:03:54 +03:00
for _ , it := range items {
delete ( l . itemSet , it )
2016-10-12 07:37:09 +03:00
delete ( le . itemMap , it )
2016-02-04 02:03:54 +03:00
}
2017-03-08 21:18:19 +03:00
l . mu . Unlock ( )
2016-02-04 02:03:54 +03:00
return nil
}
2016-01-09 03:20:48 +03:00
func ( le * lessor ) Recover ( b backend . Backend , rd RangeDeleter ) {
2016-01-06 00:49:25 +03:00
le . mu . Lock ( )
defer le . mu . Unlock ( )
le . b = b
2016-01-09 03:20:48 +03:00
le . rd = rd
2016-01-06 00:49:25 +03:00
le . leaseMap = make ( map [ LeaseID ] * Lease )
2016-10-12 07:37:09 +03:00
le . itemMap = make ( map [ LeaseItem ] LeaseID )
2016-01-06 00:49:25 +03:00
le . initAndRecover ( )
}
2016-01-09 00:17:54 +03:00
// ExpiredLeasesC returns the receive-only side of the channel on which the
// lessor publishes batches of expired leases.
func (le *lessor) ExpiredLeasesC() <-chan []*Lease {
	return le.expiredC
}
2016-01-12 20:00:57 +03:00
// Stop signals the background loop to exit by closing stopC and then blocks
// until the loop has closed doneC. Calling Stop a second time closes an
// already closed channel.
func (le *lessor) Stop() {
	close(le.stopC)
	<-le.doneC
}
2016-01-08 04:52:48 +03:00
func ( le * lessor ) runLoop ( ) {
2016-01-12 20:00:57 +03:00
defer close ( le . doneC )
2016-01-08 04:52:48 +03:00
for {
2016-01-09 00:17:54 +03:00
var ls [ ] * Lease
2017-09-03 17:49:53 +03:00
// rate limit
revokeLimit := leaseRevokeRate / 2
2018-01-25 11:19:28 +03:00
le . mu . RLock ( )
2016-10-03 11:01:56 +03:00
if le . isPrimary ( ) {
2017-09-03 17:49:53 +03:00
ls = le . findExpiredLeases ( revokeLimit )
2016-01-08 04:52:48 +03:00
}
2018-01-25 11:19:28 +03:00
le . mu . RUnlock ( )
2016-01-09 00:17:54 +03:00
if len ( ls ) != 0 {
select {
2016-01-12 20:00:57 +03:00
case <- le . stopC :
return
2016-01-09 00:17:54 +03:00
case le . expiredC <- ls :
default :
// the receiver of expiredC is probably busy handling
// other stuff
// let's try this next time after 500ms
}
}
2016-01-12 20:00:57 +03:00
select {
case <- time . After ( 500 * time . Millisecond ) :
case <- le . stopC :
return
}
2016-01-08 04:52:48 +03:00
}
}
2018-04-03 02:27:16 +03:00
// expireExists returns true if expiry items exist.
2018-04-03 02:31:58 +03:00
// It pops only when expiry item exists.
2018-04-03 02:27:16 +03:00
// "next" is true, to indicate that it may exist in next attempt.
func ( le * lessor ) expireExists ( ) ( l * Lease , ok bool , next bool ) {
if le . leaseHeap . Len ( ) == 0 {
return nil , false , false
}
2018-04-03 02:31:58 +03:00
item := le . leaseHeap [ 0 ]
2018-04-03 02:27:16 +03:00
l = le . leaseMap [ item . id ]
if l == nil {
// lease has expired or been revoked
// no need to revoke (nothing is expiry)
2018-04-03 02:31:58 +03:00
heap . Pop ( & le . leaseHeap ) // O(log N)
2018-04-03 02:27:16 +03:00
return nil , false , true
}
if time . Now ( ) . UnixNano ( ) < item . expiration {
// Candidate expirations are caught up, reinsert this item
// and no need to revoke (nothing is expiry)
return l , false , false
}
// if the lease is actually expired, add to the removal list. If it is not expired, we can ignore it because another entry will have been inserted into the heap
2018-04-03 02:31:58 +03:00
heap . Pop ( & le . leaseHeap ) // O(log N)
2018-04-03 02:27:16 +03:00
return l , true , false
}
2017-09-03 17:49:53 +03:00
// findExpiredLeases loops leases in the leaseMap until reaching expired limit
// and returns the expired leases that needed to be revoked.
func ( le * lessor ) findExpiredLeases ( limit int ) [ ] * Lease {
2016-01-06 00:49:25 +03:00
leases := make ( [ ] * Lease , 0 , 16 )
2015-11-08 22:15:39 +03:00
2018-03-10 00:14:14 +03:00
for {
2018-04-03 02:27:16 +03:00
l , ok , next := le . expireExists ( )
if ! ok && ! next {
2018-03-10 00:14:14 +03:00
break
}
2018-04-03 02:27:16 +03:00
if ! ok {
2018-03-10 00:14:14 +03:00
continue
}
2018-04-03 02:27:16 +03:00
if next {
continue
2018-03-10 00:14:14 +03:00
}
2016-10-03 11:01:56 +03:00
if l . expired ( ) {
2015-11-08 22:15:39 +03:00
leases = append ( leases , l )
2017-09-03 17:49:53 +03:00
// reach expired limit
if len ( leases ) == limit {
break
}
2015-11-08 22:15:39 +03:00
}
}
return leases
}
2016-01-05 21:57:59 +03:00
func ( le * lessor ) initAndRecover ( ) {
tx := le . b . BatchTx ( )
tx . Lock ( )
tx . UnsafeCreateBucket ( leaseBucketName )
_ , vs := tx . UnsafeRange ( leaseBucketName , int64ToBytes ( 0 ) , int64ToBytes ( math . MaxInt64 ) , 0 )
// TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
for i := range vs {
var lpb leasepb . Lease
err := lpb . Unmarshal ( vs [ i ] )
if err != nil {
2016-01-07 21:20:58 +03:00
tx . Unlock ( )
2016-01-05 21:57:59 +03:00
panic ( "failed to unmarshal lease proto item" )
}
2016-01-06 00:49:25 +03:00
ID := LeaseID ( lpb . ID )
2016-08-02 20:29:23 +03:00
if lpb . TTL < le . minLeaseTTL {
lpb . TTL = le . minLeaseTTL
}
2016-01-06 00:49:25 +03:00
le . leaseMap [ ID ] = & Lease {
ID : ID ,
2016-10-06 20:31:36 +03:00
ttl : lpb . TTL ,
2016-01-05 21:57:59 +03:00
// itemSet will be filled in when recover key-value pairs
2016-01-09 00:17:54 +03:00
// set expiry to forever, refresh when promoted
2016-01-25 05:00:31 +03:00
itemSet : make ( map [ LeaseItem ] struct { } ) ,
expiry : forever ,
2016-10-03 11:01:56 +03:00
revokec : make ( chan struct { } ) ,
2016-01-05 21:57:59 +03:00
}
}
2018-03-10 00:14:14 +03:00
heap . Init ( & le . leaseHeap )
2016-01-07 21:20:58 +03:00
tx . Unlock ( )
2016-01-05 21:57:59 +03:00
le . b . ForceCommit ( )
}
2016-01-06 00:49:25 +03:00
type Lease struct {
ID LeaseID
2016-10-06 20:31:36 +03:00
ttl int64 // time to live in seconds
2017-09-06 15:56:34 +03:00
// expiryMu protects concurrent accesses to expiry
expiryMu sync . RWMutex
// expiry is time when lease should expire. no expiration when expiry.IsZero() is true
expiry time . Time
2015-11-08 22:15:39 +03:00
2017-03-08 21:18:19 +03:00
// mu protects concurrent accesses to itemSet
mu sync . RWMutex
2016-01-08 21:06:33 +03:00
itemSet map [ LeaseItem ] struct { }
2016-10-03 11:01:56 +03:00
revokec chan struct { }
}
2016-10-06 20:31:36 +03:00
func ( l * Lease ) expired ( ) bool {
2016-10-03 11:01:56 +03:00
return l . Remaining ( ) <= 0
2015-11-08 22:15:39 +03:00
}
2016-10-06 20:31:36 +03:00
func ( l * Lease ) persistTo ( b backend . Backend ) {
2016-01-06 00:49:25 +03:00
key := int64ToBytes ( int64 ( l . ID ) )
2016-01-05 09:53:11 +03:00
2018-04-30 23:59:51 +03:00
lpb := leasepb . Lease { ID : int64 ( l . ID ) , TTL : l . ttl }
2016-01-05 09:53:11 +03:00
val , err := lpb . Marshal ( )
if err != nil {
panic ( "failed to marshal lease proto item" )
}
b . BatchTx ( ) . Lock ( )
b . BatchTx ( ) . UnsafePut ( leaseBucketName , key , val )
b . BatchTx ( ) . Unlock ( )
}
2016-10-06 20:31:36 +03:00
// TTL returns the lease's time to live, in seconds.
func (l *Lease) TTL() int64 {
	return l.ttl
}
2016-08-02 20:29:23 +03:00
// refresh refreshes the expiry of the lease.
2016-03-14 06:19:47 +03:00
func ( l * Lease ) refresh ( extend time . Duration ) {
2017-09-06 15:56:34 +03:00
newExpiry := time . Now ( ) . Add ( extend + time . Duration ( l . ttl ) * time . Second )
l . expiryMu . Lock ( )
defer l . expiryMu . Unlock ( )
l . expiry = newExpiry
2015-11-08 22:15:39 +03:00
}
2016-01-09 01:29:33 +03:00
// forever sets the expiry of lease to be forever.
2017-09-06 15:56:34 +03:00
func ( l * Lease ) forever ( ) {
l . expiryMu . Lock ( )
defer l . expiryMu . Unlock ( )
l . expiry = forever
}
2016-01-09 01:29:33 +03:00
2016-09-09 02:11:46 +03:00
// Keys returns all the keys attached to the lease.
func ( l * Lease ) Keys ( ) [ ] string {
2017-03-08 21:18:19 +03:00
l . mu . RLock ( )
2016-09-09 02:11:46 +03:00
keys := make ( [ ] string , 0 , len ( l . itemSet ) )
for k := range l . itemSet {
keys = append ( keys , k . Key )
}
2017-03-08 21:18:19 +03:00
l . mu . RUnlock ( )
2016-09-09 02:11:46 +03:00
return keys
}
// Remaining returns the remaining time of the lease.
func ( l * Lease ) Remaining ( ) time . Duration {
2017-09-06 15:56:34 +03:00
l . expiryMu . RLock ( )
defer l . expiryMu . RUnlock ( )
if l . expiry . IsZero ( ) {
return time . Duration ( math . MaxInt64 )
}
2017-09-08 04:41:36 +03:00
return time . Until ( l . expiry )
2016-09-09 02:11:46 +03:00
}
2016-01-08 21:06:33 +03:00
// LeaseItem is an item that can be attached to a lease, identified by its
// key.
type LeaseItem struct {
	// Key is the key of the attached item.
	Key string
}
2016-01-05 09:53:11 +03:00
// int64ToBytes encodes n as 8 bytes in big-endian order.
func int64ToBytes(n int64) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], uint64(n))
	return buf[:]
}
2016-01-08 21:06:33 +03:00
// FakeLessor is a fake implementation of Lessor interface.
// Used for testing only.
2016-03-11 04:06:10 +03:00
type FakeLessor struct { }
2016-01-08 21:06:33 +03:00
func ( fl * FakeLessor ) SetRangeDeleter ( dr RangeDeleter ) { }
2016-01-20 08:09:09 +03:00
func ( fl * FakeLessor ) Grant ( id LeaseID , ttl int64 ) ( * Lease , error ) { return nil , nil }
2016-01-08 21:06:33 +03:00
func ( fl * FakeLessor ) Revoke ( id LeaseID ) error { return nil }
func ( fl * FakeLessor ) Attach ( id LeaseID , items [ ] LeaseItem ) error { return nil }
2016-10-12 07:37:09 +03:00
func ( fl * FakeLessor ) GetLease ( item LeaseItem ) LeaseID { return 0 }
2016-02-04 02:03:54 +03:00
func ( fl * FakeLessor ) Detach ( id LeaseID , items [ ] LeaseItem ) error { return nil }
2016-03-14 06:19:47 +03:00
func ( fl * FakeLessor ) Promote ( extend time . Duration ) { }
2016-01-08 21:06:33 +03:00
func ( fl * FakeLessor ) Demote ( ) { }
2016-01-10 00:26:45 +03:00
func ( fl * FakeLessor ) Renew ( id LeaseID ) ( int64 , error ) { return 10 , nil }
2016-01-08 21:06:33 +03:00
2017-09-03 09:46:40 +03:00
func ( fl * FakeLessor ) Lookup ( id LeaseID ) * Lease { return nil }
2016-01-23 02:48:06 +03:00
2017-09-03 09:46:40 +03:00
func ( fl * FakeLessor ) Leases ( ) [ ] * Lease { return nil }
2017-08-14 21:19:09 +03:00
2016-01-08 21:06:33 +03:00
func ( fl * FakeLessor ) ExpiredLeasesC ( ) <- chan [ ] * Lease { return nil }
2016-01-12 20:00:57 +03:00
2016-03-11 04:06:10 +03:00
func ( fl * FakeLessor ) Recover ( b backend . Backend , rd RangeDeleter ) { }
2016-01-12 20:00:57 +03:00
func ( fl * FakeLessor ) Stop ( ) { }