2016-05-13 06:51:48 +03:00
// Copyright 2015 The etcd Authors
2015-11-08 22:15:39 +03:00
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package lease
import (
2018-03-10 00:14:14 +03:00
"container/heap"
2018-07-17 23:17:15 +03:00
"context"
2016-01-05 09:53:11 +03:00
"encoding/binary"
2016-01-08 04:52:48 +03:00
"errors"
2016-01-05 21:57:59 +03:00
"math"
2016-10-01 02:45:52 +03:00
"sort"
2015-11-08 22:15:39 +03:00
"sync"
"time"
2018-08-29 03:13:25 +03:00
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/lease/leasepb"
"go.etcd.io/etcd/mvcc/backend"
2018-07-17 23:10:34 +03:00
"go.uber.org/zap"
2015-11-08 22:15:39 +03:00
)
2017-09-06 15:56:34 +03:00
// NoLease is a special LeaseID representing the absence of a lease.
const NoLease = LeaseID ( 0 )
2016-01-07 13:09:50 +03:00
2018-03-08 03:32:04 +03:00
// MaxLeaseTTL is the maximum lease TTL value
const MaxLeaseTTL = 9000000000
2015-11-08 22:15:39 +03:00
var (
	// forever is the zero time; a lease whose expiry equals it never expires.
	forever = time.Time{}

	// leaseBucketName is the backend bucket in which leases are persisted.
	leaseBucketName = []byte("lease")

	// maximum number of leases to revoke per second; configurable for tests
	leaseRevokeRate = 1000

	// maximum number of lease checkpoints recorded to the consensus log per second; configurable for tests
	leaseCheckpointRate = 1000

	// maximum number of lease checkpoints to batch into a single consensus log entry
	maxLeaseCheckpointBatchSize = 1000
)

var (
	// ErrNotPrimary is returned when an operation requires the primary lessor.
	ErrNotPrimary = errors.New("not a primary lessor")
	// ErrLeaseNotFound is returned when the requested lease does not exist.
	ErrLeaseNotFound = errors.New("lease not found")
	// ErrLeaseExists is returned when granting a lease whose ID is already in use.
	ErrLeaseExists = errors.New("lease already exists")
	// ErrLeaseTTLTooLarge is returned when the requested TTL exceeds MaxLeaseTTL.
	ErrLeaseTTLTooLarge = errors.New("too large lease TTL")
)
2017-01-05 09:30:03 +03:00
// TxnDelete is the delete-only subset of a write transaction. It is declared
// in this package so that lease does not have to import mvcc (which would
// create a circular dependency).
type TxnDelete interface {
	// DeleteRange deletes the given key or key range within the transaction.
	DeleteRange(key, end []byte) (n, rev int64)
	// End finishes the transaction.
	End()
}
2017-01-05 09:30:03 +03:00
// RangeDeleter creates a new TxnDelete each time it is called.
type RangeDeleter func() TxnDelete
2018-07-17 23:17:15 +03:00
// Checkpointer submits the remaining TTLs of leases to the consensus log.
// It is declared in this package to avoid a circular dependency with mvcc.
type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest)
2017-01-05 09:30:03 +03:00
// LeaseID is the numeric identifier of a lease.
type LeaseID int64
2016-06-17 22:13:01 +03:00
// Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
2016-01-06 00:49:25 +03:00
type Lessor interface {
2017-01-05 09:30:03 +03:00
// SetRangeDeleter lets the lessor create TxnDeletes to the store.
// Lessor deletes the items in the revoked or expired lease by creating
// new TxnDeletes.
SetRangeDeleter ( rd RangeDeleter )
2016-01-08 21:06:33 +03:00
2018-07-17 23:17:15 +03:00
SetCheckpointer ( cp Checkpointer )
2016-01-06 00:49:25 +03:00
// Grant grants a lease that expires at least after TTL seconds.
2016-01-20 08:09:09 +03:00
Grant ( id LeaseID , ttl int64 ) ( * Lease , error )
2016-01-06 00:49:25 +03:00
// Revoke revokes a lease with given ID. The item attached to the
// given lease will be removed. If the ID does not exist, an error
// will be returned.
Revoke ( id LeaseID ) error
2016-01-08 04:52:48 +03:00
2018-07-17 23:17:15 +03:00
// Checkpoint applies the remainingTTL of a lease. The remainingTTL is used in Promote to set
// the expiry of leases to less than the full TTL when possible.
Checkpoint ( id LeaseID , remainingTTL int64 ) error
2016-01-08 21:06:33 +03:00
// Attach attaches given leaseItem to the lease with given LeaseID.
// If the lease does not exist, an error will be returned.
Attach ( id LeaseID , items [ ] LeaseItem ) error
2016-10-12 07:37:09 +03:00
// GetLease returns LeaseID for given item.
// If no lease found, NoLease value will be returned.
GetLease ( item LeaseItem ) LeaseID
2016-02-04 02:03:54 +03:00
// Detach detaches given leaseItem from the lease with given LeaseID.
// If the lease does not exist, an error will be returned.
Detach ( id LeaseID , items [ ] LeaseItem ) error
2016-01-08 04:52:48 +03:00
// Promote promotes the lessor to be the primary lessor. Primary lessor manages
// the expiration and renew of leases.
2016-03-14 06:19:47 +03:00
// Newly promoted lessor renew the TTL of all lease to extend + previous TTL.
Promote ( extend time . Duration )
2016-01-08 04:52:48 +03:00
// Demote demotes the lessor from being the primary lessor.
Demote ( )
2016-01-10 00:26:45 +03:00
// Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist,
// an error will be returned.
Renew ( id LeaseID ) ( int64 , error )
2016-01-09 00:17:54 +03:00
2016-01-23 02:48:06 +03:00
// Lookup gives the lease at a given lease id, if any
Lookup ( id LeaseID ) * Lease
2017-08-14 21:19:09 +03:00
// Leases lists all leases.
Leases ( ) [ ] * Lease
2016-01-12 20:00:57 +03:00
// ExpiredLeasesC returns a chan that is used to receive expired leases.
2016-01-09 00:17:54 +03:00
ExpiredLeasesC ( ) <- chan [ ] * Lease
2016-01-12 20:00:57 +03:00
2016-03-11 04:06:10 +03:00
// Recover recovers the lessor state from the given backend and RangeDeleter.
Recover ( b backend . Backend , rd RangeDeleter )
2016-01-12 20:00:57 +03:00
// Stop stops the lessor for managing leases. The behavior of calling Stop multiple
// times is undefined.
Stop ( )
2016-01-06 00:49:25 +03:00
}
// lessor implements Lessor interface.
2015-11-08 22:15:39 +03:00
// TODO: use clockwork for testability.
type lessor struct {
2018-01-25 11:19:28 +03:00
mu sync . RWMutex
2016-01-08 04:52:48 +03:00
2016-10-03 11:01:56 +03:00
// demotec is set when the lessor is the primary.
// demotec will be closed if the lessor is demoted.
demotec chan struct { }
2016-01-08 04:52:48 +03:00
2018-07-17 23:17:15 +03:00
leaseMap map [ LeaseID ] * Lease
leaseHeap LeaseQueue
leaseCheckpointHeap LeaseQueue
itemMap map [ LeaseItem ] LeaseID
2016-10-12 07:37:09 +03:00
2015-11-10 00:33:12 +03:00
// When a lease expires, the lessor will delete the
2016-01-09 03:20:48 +03:00
// leased range (or key) by the RangeDeleter.
rd RangeDeleter
2015-11-10 00:33:12 +03:00
2018-07-17 23:17:15 +03:00
// When a lease's deadline should be persisted to preserve the remaining TTL across leader
// elections and restarts, the lessor will checkpoint the lease by the Checkpointer.
cp Checkpointer
2016-01-05 09:53:11 +03:00
// backend to persist leases. We only persist lease ID and expiry for now.
// The leased items can be recovered by iterating all the keys in kv.
b backend . Backend
2016-08-02 20:29:23 +03:00
// minLeaseTTL is the minimum lease TTL that can be granted for a lease. Any
// requests for shorter TTLs are extended to the minimum TTL.
minLeaseTTL int64
2016-01-09 00:17:54 +03:00
expiredC chan [ ] * Lease
2016-01-12 20:00:57 +03:00
// stopC is a channel whose closure indicates that the lessor should be stopped.
stopC chan struct { }
// doneC is a channel whose closure indicates that the lessor is stopped.
doneC chan struct { }
2018-07-17 23:10:34 +03:00
lg * zap . Logger
2018-07-17 23:17:15 +03:00
// Wait duration between lease checkpoints.
checkpointInterval time . Duration
2018-07-17 23:10:34 +03:00
}
// LessorConfig carries the tunable settings passed to NewLessor.
type LessorConfig struct {
	// MinLeaseTTL is the minimum TTL, in seconds, of any lease.
	MinLeaseTTL int64
	// CheckpointInterval is the wait duration between lease checkpoints;
	// a zero value selects the lessor's default.
	CheckpointInterval time.Duration
}
2018-07-17 23:10:34 +03:00
func NewLessor ( lg * zap . Logger , b backend . Backend , cfg LessorConfig ) Lessor {
return newLessor ( lg , b , cfg )
2016-01-06 00:49:25 +03:00
}
2018-07-17 23:10:34 +03:00
func newLessor ( lg * zap . Logger , b backend . Backend , cfg LessorConfig ) * lessor {
2018-07-17 23:17:15 +03:00
checkpointInterval := cfg . CheckpointInterval
if checkpointInterval == 0 {
checkpointInterval = 5 * time . Minute
}
2016-01-05 09:53:11 +03:00
l := & lessor {
2018-07-17 23:17:15 +03:00
leaseMap : make ( map [ LeaseID ] * Lease ) ,
itemMap : make ( map [ LeaseItem ] LeaseID ) ,
leaseHeap : make ( LeaseQueue , 0 ) ,
leaseCheckpointHeap : make ( LeaseQueue , 0 ) ,
b : b ,
minLeaseTTL : cfg . MinLeaseTTL ,
checkpointInterval : checkpointInterval ,
2016-01-14 12:28:29 +03:00
// expiredC is a small buffered chan to avoid unnecessary blocking.
2016-01-09 00:17:54 +03:00
expiredC : make ( chan [ ] * Lease , 16 ) ,
2016-01-12 20:00:57 +03:00
stopC : make ( chan struct { } ) ,
doneC : make ( chan struct { } ) ,
2018-07-17 23:10:34 +03:00
lg : lg ,
2015-11-08 22:15:39 +03:00
}
2016-01-05 21:57:59 +03:00
l . initAndRecover ( )
2016-01-05 09:53:11 +03:00
2016-01-08 04:52:48 +03:00
go l . runLoop ( )
2016-01-05 09:53:11 +03:00
return l
2015-11-08 22:15:39 +03:00
}
2016-10-03 11:01:56 +03:00
// isPrimary reports whether this lessor is currently the primary lessor,
// i.e. the one managing lease expiration and renewal. The lessor is primary
// exactly while demotec is set.
//
// In etcd the raft leader is the primary. Raft permits concurrent leaders
// with different terms, so two primaries may coexist for at most a leader
// election timeout. The stale primary cannot affect correctness because its
// proposals carry a smaller term and will not be committed.
//
// TODO: raft follower do not forward lease management proposals. There might be a
// very small window (within second normally which depends on go scheduling) that
// a raft follow is the primary between the raft leader demotion and lessor demotion.
// Usually this should not be a problem. Lease should not be that sensitive to timing.
func (le *lessor) isPrimary() bool {
	primary := le.demotec != nil
	return primary
}
2016-01-09 03:20:48 +03:00
func ( le * lessor ) SetRangeDeleter ( rd RangeDeleter ) {
2016-01-08 21:06:33 +03:00
le . mu . Lock ( )
defer le . mu . Unlock ( )
2016-01-09 03:20:48 +03:00
le . rd = rd
2016-01-08 21:06:33 +03:00
}
2018-07-17 23:17:15 +03:00
// SetCheckpointer replaces the Checkpointer used to checkpoint leases.
func (le *lessor) SetCheckpointer(cp Checkpointer) {
	le.mu.Lock()
	le.cp = cp
	le.mu.Unlock()
}
2016-01-20 08:09:09 +03:00
func ( le * lessor ) Grant ( id LeaseID , ttl int64 ) ( * Lease , error ) {
if id == NoLease {
return nil , ErrLeaseNotFound
}
2018-03-08 03:32:04 +03:00
if ttl > MaxLeaseTTL {
return nil , ErrLeaseTTLTooLarge
}
2016-06-17 22:13:01 +03:00
// TODO: when lessor is under high load, it should give out lease
// with longer TTL to reduce renew load.
2016-10-03 11:01:56 +03:00
l := & Lease {
ID : id ,
2016-10-06 20:31:36 +03:00
ttl : ttl ,
2016-10-03 11:01:56 +03:00
itemSet : make ( map [ LeaseItem ] struct { } ) ,
revokec : make ( chan struct { } ) ,
}
2015-11-08 22:15:39 +03:00
le . mu . Lock ( )
defer le . mu . Unlock ( )
2016-01-20 08:09:09 +03:00
if _ , ok := le . leaseMap [ id ] ; ok {
return nil , ErrLeaseExists
}
2016-01-09 01:29:33 +03:00
2016-10-06 20:31:36 +03:00
if l . ttl < le . minLeaseTTL {
l . ttl = le . minLeaseTTL
2016-08-02 20:29:23 +03:00
}
2016-10-03 11:01:56 +03:00
if le . isPrimary ( ) {
2016-03-14 06:19:47 +03:00
l . refresh ( 0 )
2016-01-09 01:29:33 +03:00
} else {
l . forever ( )
}
2015-11-08 22:15:39 +03:00
le . leaseMap [ id ] = l
2018-07-17 23:17:15 +03:00
item := & LeaseWithTime { id : l . ID , time : l . expiry . UnixNano ( ) }
2018-03-10 00:14:14 +03:00
heap . Push ( & le . leaseHeap , item )
2016-01-05 09:53:11 +03:00
l . persistTo ( le . b )
2018-05-24 07:34:48 +03:00
leaseTotalTTLs . Observe ( float64 ( l . ttl ) )
leaseGranted . Inc ( )
2018-07-17 23:17:15 +03:00
if le . isPrimary ( ) {
le . scheduleCheckpointIfNeeded ( l )
}
2016-01-20 08:09:09 +03:00
return l , nil
2015-11-08 22:15:39 +03:00
}
2016-01-05 21:16:50 +03:00
func ( le * lessor ) Revoke ( id LeaseID ) error {
2015-11-08 22:15:39 +03:00
le . mu . Lock ( )
l := le . leaseMap [ id ]
if l == nil {
2016-02-04 02:03:54 +03:00
le . mu . Unlock ( )
2016-01-08 21:06:33 +03:00
return ErrLeaseNotFound
2015-11-08 22:15:39 +03:00
}
2016-10-03 11:01:56 +03:00
defer close ( l . revokec )
2016-02-04 02:03:54 +03:00
// unlock before doing external work
le . mu . Unlock ( )
2015-11-08 22:15:39 +03:00
2016-08-05 06:39:32 +03:00
if le . rd == nil {
return nil
}
2016-08-04 18:06:33 +03:00
2017-01-05 09:30:03 +03:00
txn := le . rd ( )
2016-10-01 02:45:52 +03:00
// sort keys so deletes are in same order among all members,
// otherwise the backened hashes will be different
2017-03-08 21:18:19 +03:00
keys := l . Keys ( )
2016-10-01 02:45:52 +03:00
sort . StringSlice ( keys ) . Sort ( )
for _ , key := range keys {
2017-01-05 09:30:03 +03:00
txn . DeleteRange ( [ ] byte ( key ) , nil )
2015-11-10 00:33:12 +03:00
}
2016-08-05 06:39:32 +03:00
le . mu . Lock ( )
defer le . mu . Unlock ( )
delete ( le . leaseMap , l . ID )
// lease deletion needs to be in the same backend transaction with the
// kv deletion. Or we might end up with not executing the revoke or not
// deleting the keys if etcdserver fails in between.
le . b . BatchTx ( ) . UnsafeDelete ( leaseBucketName , int64ToBytes ( int64 ( l . ID ) ) )
2017-01-05 09:30:03 +03:00
txn . End ( )
2018-05-24 07:34:48 +03:00
leaseRevoked . Inc ( )
2015-11-08 22:15:39 +03:00
return nil
}
2018-07-17 23:17:15 +03:00
// Checkpoint records remainingTTL on the lease with the given ID. Only the
// remainingTTL field is updated here; applying it to the lease expiry is left
// to Promote. An unknown ID is ignored. The returned error is always nil.
func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
	le.mu.Lock()
	defer le.mu.Unlock()

	lease, found := le.leaseMap[id]
	if !found {
		return nil
	}

	lease.remainingTTL = remainingTTL
	if le.isPrimary() {
		// schedule the next checkpoint as needed
		le.scheduleCheckpointIfNeeded(lease)
	}
	return nil
}
2016-01-05 09:53:11 +03:00
// Renew renews an existing lease. If the given lease does not exist or
// has expired, an error will be returned.
2016-01-10 00:26:45 +03:00
func ( le * lessor ) Renew ( id LeaseID ) ( int64 , error ) {
2019-02-21 07:10:04 +03:00
le . mu . RLock ( )
2016-10-03 11:01:56 +03:00
if ! le . isPrimary ( ) {
2016-01-10 00:26:45 +03:00
// forward renew request to primary instead of returning error.
2019-02-21 07:10:04 +03:00
le . mu . RUnlock ( )
2016-01-10 00:26:45 +03:00
return - 1 , ErrNotPrimary
2016-01-08 04:52:48 +03:00
}
2016-10-03 11:01:56 +03:00
demotec := le . demotec
2015-11-08 22:15:39 +03:00
l := le . leaseMap [ id ]
if l == nil {
2019-02-21 07:10:04 +03:00
le . mu . RUnlock ( )
2016-01-10 00:26:45 +03:00
return - 1 , ErrLeaseNotFound
2015-11-08 22:15:39 +03:00
}
2019-02-21 07:10:04 +03:00
// Clear remaining TTL when we renew if it is set
clearRemainingTTL := le . cp != nil && l . remainingTTL > 0
2015-11-08 22:15:39 +03:00
2019-02-21 07:10:04 +03:00
le . mu . RUnlock ( )
2016-10-03 11:01:56 +03:00
if l . expired ( ) {
select {
// A expired lease might be pending for revoking or going through
// quorum to be revoked. To be accurate, renew request must wait for the
// deletion to complete.
case <- l . revokec :
return - 1 , ErrLeaseNotFound
// The expired lease might fail to be revoked if the primary changes.
// The caller will retry on ErrNotPrimary.
case <- demotec :
return - 1 , ErrNotPrimary
case <- le . stopC :
return - 1 , ErrNotPrimary
}
}
2018-07-17 23:17:15 +03:00
// Clear remaining TTL when we renew if it is set
// By applying a RAFT entry only when the remainingTTL is already set, we limit the number
// of RAFT entries written per lease to a max of 2 per checkpoint interval.
2019-02-21 07:10:04 +03:00
if clearRemainingTTL {
2018-07-17 23:17:15 +03:00
le . cp ( context . Background ( ) , & pb . LeaseCheckpointRequest { Checkpoints : [ ] * pb . LeaseCheckpoint { { ID : int64 ( l . ID ) , Remaining_TTL : 0 } } } )
}
2019-02-21 07:10:04 +03:00
le . mu . Lock ( )
2016-03-14 06:19:47 +03:00
l . refresh ( 0 )
2018-07-17 23:17:15 +03:00
item := & LeaseWithTime { id : l . ID , time : l . expiry . UnixNano ( ) }
2018-03-10 00:14:14 +03:00
heap . Push ( & le . leaseHeap , item )
2019-02-21 07:10:04 +03:00
le . mu . Unlock ( )
2018-05-24 07:34:48 +03:00
leaseRenewed . Inc ( )
2016-10-06 20:31:36 +03:00
return l . ttl , nil
2015-11-08 22:15:39 +03:00
}
2016-01-23 02:48:06 +03:00
func ( le * lessor ) Lookup ( id LeaseID ) * Lease {
2018-01-25 11:19:28 +03:00
le . mu . RLock ( )
defer le . mu . RUnlock ( )
2016-09-30 20:09:28 +03:00
return le . leaseMap [ id ]
2016-01-23 02:48:06 +03:00
}
2017-08-14 21:19:09 +03:00
// unsafeLeases returns all leases in leaseMap as a slice, in map iteration
// order. It takes no lock itself.
func (le *lessor) unsafeLeases() []*Lease {
	all := make([]*Lease, 0, len(le.leaseMap))
	for id := range le.leaseMap {
		all = append(all, le.leaseMap[id])
	}
	return all
}
func ( le * lessor ) Leases ( ) [ ] * Lease {
2018-01-25 11:19:28 +03:00
le . mu . RLock ( )
2017-08-14 21:19:09 +03:00
ls := le . unsafeLeases ( )
2018-01-25 11:19:28 +03:00
le . mu . RUnlock ( )
2018-06-06 18:05:23 +03:00
sort . Sort ( leasesByExpiry ( ls ) )
2017-08-14 21:19:09 +03:00
return ls
}
2016-03-14 06:19:47 +03:00
func ( le * lessor ) Promote ( extend time . Duration ) {
2016-01-08 04:52:48 +03:00
le . mu . Lock ( )
defer le . mu . Unlock ( )
2016-10-03 11:01:56 +03:00
le . demotec = make ( chan struct { } )
2016-01-09 00:17:54 +03:00
// refresh the expiries of all leases.
for _ , l := range le . leaseMap {
2017-06-17 03:51:01 +03:00
l . refresh ( extend )
2018-07-17 23:17:15 +03:00
item := & LeaseWithTime { id : l . ID , time : l . expiry . UnixNano ( ) }
2018-03-10 00:14:14 +03:00
heap . Push ( & le . leaseHeap , item )
2016-01-09 00:17:54 +03:00
}
2016-01-08 04:52:48 +03:00
2017-06-17 03:51:01 +03:00
if len ( le . leaseMap ) < leaseRevokeRate {
// no possibility of lease pile-up
return
}
// adjust expiries in case of overlap
2017-08-14 21:19:09 +03:00
leases := le . unsafeLeases ( )
2018-06-06 18:05:23 +03:00
sort . Sort ( leasesByExpiry ( leases ) )
2017-06-17 03:51:01 +03:00
baseWindow := leases [ 0 ] . Remaining ( )
nextWindow := baseWindow + time . Second
expires := 0
// have fewer expires than the total revoke rate so piled up leases
// don't consume the entire revoke limit
targetExpiresPerSecond := ( 3 * leaseRevokeRate ) / 4
for _ , l := range leases {
remaining := l . Remaining ( )
if remaining > nextWindow {
baseWindow = remaining
nextWindow = baseWindow + time . Second
expires = 1
continue
}
expires ++
if expires <= targetExpiresPerSecond {
continue
}
rateDelay := float64 ( time . Second ) * ( float64 ( expires ) / float64 ( targetExpiresPerSecond ) )
// If leases are extended by n seconds, leases n seconds ahead of the
// base window should be extended by only one second.
rateDelay -= float64 ( remaining - baseWindow )
delay := time . Duration ( rateDelay )
nextWindow = baseWindow + delay
l . refresh ( delay + extend )
2018-07-17 23:17:15 +03:00
item := & LeaseWithTime { id : l . ID , time : l . expiry . UnixNano ( ) }
2018-03-10 00:14:14 +03:00
heap . Push ( & le . leaseHeap , item )
2018-07-17 23:17:15 +03:00
le . scheduleCheckpointIfNeeded ( l )
2017-06-15 02:08:16 +03:00
}
}
2017-06-17 03:51:01 +03:00
// leasesByExpiry implements sort.Interface, ordering leases by ascending
// remaining time.
type leasesByExpiry []*Lease

// Len reports the number of leases.
func (s leasesByExpiry) Len() int {
	return len(s)
}

// Less reports whether lease i has less remaining time than lease j.
func (s leasesByExpiry) Less(i, j int) bool {
	return s[i].Remaining() < s[j].Remaining()
}

// Swap exchanges the leases at positions i and j.
func (s leasesByExpiry) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}
2016-01-08 04:52:48 +03:00
func ( le * lessor ) Demote ( ) {
le . mu . Lock ( )
defer le . mu . Unlock ( )
2016-01-09 00:17:54 +03:00
// set the expiries of all leases to forever
for _ , l := range le . leaseMap {
2016-01-09 01:29:33 +03:00
l . forever ( )
2016-01-09 00:17:54 +03:00
}
2018-07-17 23:17:15 +03:00
le . clearScheduledLeasesCheckpoints ( )
2016-10-03 11:01:56 +03:00
if le . demotec != nil {
close ( le . demotec )
le . demotec = nil
}
2016-01-08 04:52:48 +03:00
}
2015-11-08 22:15:39 +03:00
// Attach attaches items to the lease with given ID. When the lease
// expires, the attached items will be automatically removed.
// If the given lease does not exist, an error will be returned.
2016-01-08 21:06:33 +03:00
func ( le * lessor ) Attach ( id LeaseID , items [ ] LeaseItem ) error {
2015-11-08 22:15:39 +03:00
le . mu . Lock ( )
defer le . mu . Unlock ( )
l := le . leaseMap [ id ]
if l == nil {
2016-01-08 21:06:33 +03:00
return ErrLeaseNotFound
2015-11-08 22:15:39 +03:00
}
2017-03-08 21:18:19 +03:00
l . mu . Lock ( )
2015-11-08 22:15:39 +03:00
for _ , it := range items {
l . itemSet [ it ] = struct { } { }
2016-10-12 07:37:09 +03:00
le . itemMap [ it ] = id
2015-11-08 22:15:39 +03:00
}
2017-03-08 21:18:19 +03:00
l . mu . Unlock ( )
2015-11-08 22:15:39 +03:00
return nil
}
2016-10-12 07:37:09 +03:00
func ( le * lessor ) GetLease ( item LeaseItem ) LeaseID {
2018-01-25 11:19:28 +03:00
le . mu . RLock ( )
2016-10-12 07:37:09 +03:00
id := le . itemMap [ item ]
2018-01-25 11:19:28 +03:00
le . mu . RUnlock ( )
2016-10-12 07:37:09 +03:00
return id
}
2016-02-04 02:03:54 +03:00
// Detach detaches items from the lease with given ID.
// If the given lease does not exist, an error will be returned.
func ( le * lessor ) Detach ( id LeaseID , items [ ] LeaseItem ) error {
le . mu . Lock ( )
defer le . mu . Unlock ( )
l := le . leaseMap [ id ]
if l == nil {
return ErrLeaseNotFound
}
2017-03-08 21:18:19 +03:00
l . mu . Lock ( )
2016-02-04 02:03:54 +03:00
for _ , it := range items {
delete ( l . itemSet , it )
2016-10-12 07:37:09 +03:00
delete ( le . itemMap , it )
2016-02-04 02:03:54 +03:00
}
2017-03-08 21:18:19 +03:00
l . mu . Unlock ( )
2016-02-04 02:03:54 +03:00
return nil
}
2016-01-09 03:20:48 +03:00
func ( le * lessor ) Recover ( b backend . Backend , rd RangeDeleter ) {
2016-01-06 00:49:25 +03:00
le . mu . Lock ( )
defer le . mu . Unlock ( )
le . b = b
2016-01-09 03:20:48 +03:00
le . rd = rd
2016-01-06 00:49:25 +03:00
le . leaseMap = make ( map [ LeaseID ] * Lease )
2016-10-12 07:37:09 +03:00
le . itemMap = make ( map [ LeaseItem ] LeaseID )
2016-01-06 00:49:25 +03:00
le . initAndRecover ( )
}
2016-01-09 00:17:54 +03:00
// ExpiredLeasesC exposes the receive side of the expired-leases channel.
func (le *lessor) ExpiredLeasesC() <-chan []*Lease {
	var out <-chan []*Lease = le.expiredC
	return out
}
2016-01-12 20:00:57 +03:00
// Stop signals the background loop to exit by closing stopC and then blocks
// until doneC is closed. Calling it twice closes stopC twice, which panics.
func (le *lessor) Stop() {
	close(le.stopC)
	// Wait for runLoop to acknowledge.
	<-le.doneC
}
2016-01-08 04:52:48 +03:00
func ( le * lessor ) runLoop ( ) {
2016-01-12 20:00:57 +03:00
defer close ( le . doneC )
2016-01-08 04:52:48 +03:00
for {
2018-07-17 23:17:15 +03:00
le . revokeExpiredLeases ( )
le . checkpointScheduledLeases ( )
2017-09-03 17:49:53 +03:00
2018-07-17 23:17:15 +03:00
select {
case <- time . After ( 500 * time . Millisecond ) :
case <- le . stopC :
return
2016-01-09 00:17:54 +03:00
}
2018-07-17 23:17:15 +03:00
}
}
2016-01-09 00:17:54 +03:00
2018-07-17 23:17:15 +03:00
// revokeExpiredLeases finds all leases past their expiry and sends them to epxired channel for
// to be revoked.
func ( le * lessor ) revokeExpiredLeases ( ) {
var ls [ ] * Lease
// rate limit
revokeLimit := leaseRevokeRate / 2
le . mu . RLock ( )
if le . isPrimary ( ) {
ls = le . findExpiredLeases ( revokeLimit )
}
le . mu . RUnlock ( )
if len ( ls ) != 0 {
2016-01-12 20:00:57 +03:00
select {
case <- le . stopC :
return
2018-07-17 23:17:15 +03:00
case le . expiredC <- ls :
default :
// the receiver of expiredC is probably busy handling
// other stuff
// let's try this next time after 500ms
}
}
}
// checkpointScheduledLeases finds all scheduled lease checkpoints that are due and
// submits them to the checkpointer to persist them to the consensus log.
func ( le * lessor ) checkpointScheduledLeases ( ) {
var cps [ ] * pb . LeaseCheckpoint
// rate limit
for i := 0 ; i < leaseCheckpointRate / 2 ; i ++ {
le . mu . Lock ( )
if le . isPrimary ( ) {
cps = le . findDueScheduledCheckpoints ( maxLeaseCheckpointBatchSize )
}
le . mu . Unlock ( )
if len ( cps ) != 0 {
le . cp ( context . Background ( ) , & pb . LeaseCheckpointRequest { Checkpoints : cps } )
}
if len ( cps ) < maxLeaseCheckpointBatchSize {
return
2016-01-12 20:00:57 +03:00
}
2016-01-08 04:52:48 +03:00
}
}
2018-07-17 23:17:15 +03:00
// clearScheduledLeasesCheckpoints drops every scheduled checkpoint by
// replacing the checkpoint heap with a fresh empty queue.
func (le *lessor) clearScheduledLeasesCheckpoints() {
	empty := make(LeaseQueue, 0)
	le.leaseCheckpointHeap = empty
}
2018-04-03 02:27:16 +03:00
// expireExists returns true if expiry items exist.
2018-04-03 02:31:58 +03:00
// It pops only when expiry item exists.
2018-04-03 02:27:16 +03:00
// "next" is true, to indicate that it may exist in next attempt.
func ( le * lessor ) expireExists ( ) ( l * Lease , ok bool , next bool ) {
if le . leaseHeap . Len ( ) == 0 {
return nil , false , false
}
2018-04-03 02:31:58 +03:00
item := le . leaseHeap [ 0 ]
2018-04-03 02:27:16 +03:00
l = le . leaseMap [ item . id ]
if l == nil {
// lease has expired or been revoked
// no need to revoke (nothing is expiry)
2018-04-03 02:31:58 +03:00
heap . Pop ( & le . leaseHeap ) // O(log N)
2018-04-03 02:27:16 +03:00
return nil , false , true
}
2018-07-17 23:17:15 +03:00
if time . Now ( ) . UnixNano ( ) < item . time /* expiration time */ {
2018-04-03 02:27:16 +03:00
// Candidate expirations are caught up, reinsert this item
// and no need to revoke (nothing is expiry)
return l , false , false
}
// if the lease is actually expired, add to the removal list. If it is not expired, we can ignore it because another entry will have been inserted into the heap
2018-04-03 02:31:58 +03:00
heap . Pop ( & le . leaseHeap ) // O(log N)
2018-04-03 02:27:16 +03:00
return l , true , false
}
2017-09-03 17:49:53 +03:00
// findExpiredLeases loops leases in the leaseMap until reaching expired limit
// and returns the expired leases that needed to be revoked.
func ( le * lessor ) findExpiredLeases ( limit int ) [ ] * Lease {
2016-01-06 00:49:25 +03:00
leases := make ( [ ] * Lease , 0 , 16 )
2015-11-08 22:15:39 +03:00
2018-03-10 00:14:14 +03:00
for {
2018-04-03 02:27:16 +03:00
l , ok , next := le . expireExists ( )
if ! ok && ! next {
2018-03-10 00:14:14 +03:00
break
}
2018-04-03 02:27:16 +03:00
if ! ok {
2018-03-10 00:14:14 +03:00
continue
}
2018-04-03 02:27:16 +03:00
if next {
continue
2018-03-10 00:14:14 +03:00
}
2016-10-03 11:01:56 +03:00
if l . expired ( ) {
2015-11-08 22:15:39 +03:00
leases = append ( leases , l )
2017-09-03 17:49:53 +03:00
// reach expired limit
if len ( leases ) == limit {
break
}
2015-11-08 22:15:39 +03:00
}
}
return leases
}
2018-07-17 23:17:15 +03:00
// scheduleCheckpointIfNeeded queues a checkpoint for lease one checkpoint
// interval from now, provided a Checkpointer is set and the lease's remaining
// TTL is longer than the checkpoint interval.
func (le *lessor) scheduleCheckpointIfNeeded(lease *Lease) {
	if le.cp == nil {
		return
	}
	if lease.RemainingTTL() <= int64(le.checkpointInterval.Seconds()) {
		// The lease would expire before the next checkpoint is due.
		return
	}

	if le.lg != nil {
		le.lg.Debug("Scheduling lease checkpoint",
			zap.Int64("leaseID", int64(lease.ID)),
			zap.Duration("intervalSeconds", le.checkpointInterval),
		)
	}

	due := time.Now().Add(le.checkpointInterval)
	heap.Push(&le.leaseCheckpointHeap, &LeaseWithTime{
		id:   lease.ID,
		time: due.UnixNano(),
	})
}
// findDueScheduledCheckpoints pops scheduled checkpoints whose time has come
// and returns up to checkpointLimit checkpoint records for them. Entries for
// leases that no longer exist, have already expired, or whose remaining TTL
// is not below their full TTL are dropped without producing a record.
// It returns nil when no Checkpointer is set.
func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCheckpoint {
	if le.cp == nil {
		return nil
	}

	now := time.Now()
	nowNanos := now.UnixNano()
	cps := []*pb.LeaseCheckpoint{}

	for le.leaseCheckpointHeap.Len() > 0 && len(cps) < checkpointLimit {
		head := le.leaseCheckpointHeap[0]
		if head.time /* next checkpoint time */ > nowNanos {
			// The earliest scheduled checkpoint is not due yet.
			break
		}
		heap.Pop(&le.leaseCheckpointHeap)

		lease, found := le.leaseMap[head.id]
		if !found || !now.Before(lease.expiry) {
			continue
		}

		remainingTTL := int64(math.Ceil(lease.expiry.Sub(now).Seconds()))
		if remainingTTL >= lease.ttl {
			continue
		}

		if le.lg != nil {
			le.lg.Debug("Checkpointing lease",
				zap.Int64("leaseID", int64(head.id)),
				zap.Int64("remainingTTL", remainingTTL),
			)
		}
		cps = append(cps, &pb.LeaseCheckpoint{ID: int64(head.id), Remaining_TTL: remainingTTL})
	}
	return cps
}
2016-01-05 21:57:59 +03:00
func ( le * lessor ) initAndRecover ( ) {
tx := le . b . BatchTx ( )
tx . Lock ( )
tx . UnsafeCreateBucket ( leaseBucketName )
_ , vs := tx . UnsafeRange ( leaseBucketName , int64ToBytes ( 0 ) , int64ToBytes ( math . MaxInt64 ) , 0 )
// TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
for i := range vs {
var lpb leasepb . Lease
err := lpb . Unmarshal ( vs [ i ] )
if err != nil {
2016-01-07 21:20:58 +03:00
tx . Unlock ( )
2016-01-05 21:57:59 +03:00
panic ( "failed to unmarshal lease proto item" )
}
2016-01-06 00:49:25 +03:00
ID := LeaseID ( lpb . ID )
2016-08-02 20:29:23 +03:00
if lpb . TTL < le . minLeaseTTL {
lpb . TTL = le . minLeaseTTL
}
2016-01-06 00:49:25 +03:00
le . leaseMap [ ID ] = & Lease {
ID : ID ,
2016-10-06 20:31:36 +03:00
ttl : lpb . TTL ,
2016-01-05 21:57:59 +03:00
// itemSet will be filled in when recover key-value pairs
2016-01-09 00:17:54 +03:00
// set expiry to forever, refresh when promoted
2016-01-25 05:00:31 +03:00
itemSet : make ( map [ LeaseItem ] struct { } ) ,
expiry : forever ,
2016-10-03 11:01:56 +03:00
revokec : make ( chan struct { } ) ,
2016-01-05 21:57:59 +03:00
}
}
2018-03-10 00:14:14 +03:00
heap . Init ( & le . leaseHeap )
2018-07-17 23:17:15 +03:00
heap . Init ( & le . leaseCheckpointHeap )
2016-01-07 21:20:58 +03:00
tx . Unlock ( )
2016-01-05 21:57:59 +03:00
le . b . ForceCommit ( )
}
2016-01-06 00:49:25 +03:00
type Lease struct {
2018-07-17 23:17:15 +03:00
ID LeaseID
ttl int64 // time to live of the lease in seconds
remainingTTL int64 // remaining time to live in seconds, if zero valued it is considered unset and the full ttl should be used
2017-09-06 15:56:34 +03:00
// expiryMu protects concurrent accesses to expiry
expiryMu sync . RWMutex
// expiry is time when lease should expire. no expiration when expiry.IsZero() is true
expiry time . Time
2015-11-08 22:15:39 +03:00
2017-03-08 21:18:19 +03:00
// mu protects concurrent accesses to itemSet
mu sync . RWMutex
2016-01-08 21:06:33 +03:00
itemSet map [ LeaseItem ] struct { }
2016-10-03 11:01:56 +03:00
revokec chan struct { }
}
2016-10-06 20:31:36 +03:00
func ( l * Lease ) expired ( ) bool {
2016-10-03 11:01:56 +03:00
return l . Remaining ( ) <= 0
2015-11-08 22:15:39 +03:00
}
2016-10-06 20:31:36 +03:00
func ( l * Lease ) persistTo ( b backend . Backend ) {
2016-01-06 00:49:25 +03:00
key := int64ToBytes ( int64 ( l . ID ) )
2016-01-05 09:53:11 +03:00
2018-07-17 23:17:15 +03:00
lpb := leasepb . Lease { ID : int64 ( l . ID ) , TTL : l . ttl , RemainingTTL : l . remainingTTL }
2016-01-05 09:53:11 +03:00
val , err := lpb . Marshal ( )
if err != nil {
panic ( "failed to marshal lease proto item" )
}
b . BatchTx ( ) . Lock ( )
b . BatchTx ( ) . UnsafePut ( leaseBucketName , key , val )
b . BatchTx ( ) . Unlock ( )
}
2016-10-06 20:31:36 +03:00
// TTL returns the lease's full time to live, in seconds.
func (l *Lease) TTL() int64 {
	ttl := l.ttl
	return ttl
}
2018-07-17 23:17:15 +03:00
// RemainingTTL returns the last checkpointed remaining TTL of the lease, or
// the full TTL when no positive remaining TTL has been recorded.
// TODO(jpbetz): do not expose this utility method
func (l *Lease) RemainingTTL() int64 {
	if l.remainingTTL <= 0 {
		return l.ttl
	}
	return l.remainingTTL
}
2016-08-02 20:29:23 +03:00
// refresh refreshes the expiry of the lease.
2016-03-14 06:19:47 +03:00
func ( l * Lease ) refresh ( extend time . Duration ) {
2018-07-17 23:17:15 +03:00
newExpiry := time . Now ( ) . Add ( extend + time . Duration ( l . RemainingTTL ( ) ) * time . Second )
2017-09-06 15:56:34 +03:00
l . expiryMu . Lock ( )
defer l . expiryMu . Unlock ( )
l . expiry = newExpiry
2015-11-08 22:15:39 +03:00
}
2016-01-09 01:29:33 +03:00
// forever sets the expiry of lease to be forever.
2017-09-06 15:56:34 +03:00
func ( l * Lease ) forever ( ) {
l . expiryMu . Lock ( )
defer l . expiryMu . Unlock ( )
l . expiry = forever
}
2016-01-09 01:29:33 +03:00
2016-09-09 02:11:46 +03:00
// Keys returns all the keys attached to the lease.
func ( l * Lease ) Keys ( ) [ ] string {
2017-03-08 21:18:19 +03:00
l . mu . RLock ( )
2016-09-09 02:11:46 +03:00
keys := make ( [ ] string , 0 , len ( l . itemSet ) )
for k := range l . itemSet {
keys = append ( keys , k . Key )
}
2017-03-08 21:18:19 +03:00
l . mu . RUnlock ( )
2016-09-09 02:11:46 +03:00
return keys
}
// Remaining returns the remaining time of the lease.
func ( l * Lease ) Remaining ( ) time . Duration {
2017-09-06 15:56:34 +03:00
l . expiryMu . RLock ( )
defer l . expiryMu . RUnlock ( )
if l . expiry . IsZero ( ) {
return time . Duration ( math . MaxInt64 )
}
2017-09-08 04:41:36 +03:00
return time . Until ( l . expiry )
2016-09-09 02:11:46 +03:00
}
2016-01-08 21:06:33 +03:00
// LeaseItem identifies, by key, something that can be attached to a lease.
type LeaseItem struct {
	Key string
}
2016-01-05 09:53:11 +03:00
// int64ToBytes encodes n as an 8-byte big-endian slice.
func int64ToBytes(n int64) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], uint64(n))
	return buf[:]
}
2016-01-08 21:06:33 +03:00
// FakeLessor is a fake implementation of Lessor interface.
// Used for testing only.
2016-03-11 04:06:10 +03:00
type FakeLessor struct { }
2016-01-08 21:06:33 +03:00
func ( fl * FakeLessor ) SetRangeDeleter ( dr RangeDeleter ) { }
2018-07-17 23:17:15 +03:00
func ( fl * FakeLessor ) SetCheckpointer ( cp Checkpointer ) { }
2016-01-20 08:09:09 +03:00
func ( fl * FakeLessor ) Grant ( id LeaseID , ttl int64 ) ( * Lease , error ) { return nil , nil }
2016-01-08 21:06:33 +03:00
func ( fl * FakeLessor ) Revoke ( id LeaseID ) error { return nil }
2018-07-17 23:17:15 +03:00
func ( fl * FakeLessor ) Checkpoint ( id LeaseID , remainingTTL int64 ) error { return nil }
2016-01-08 21:06:33 +03:00
func ( fl * FakeLessor ) Attach ( id LeaseID , items [ ] LeaseItem ) error { return nil }
2016-10-12 07:37:09 +03:00
func ( fl * FakeLessor ) GetLease ( item LeaseItem ) LeaseID { return 0 }
2016-02-04 02:03:54 +03:00
func ( fl * FakeLessor ) Detach ( id LeaseID , items [ ] LeaseItem ) error { return nil }
2016-03-14 06:19:47 +03:00
func ( fl * FakeLessor ) Promote ( extend time . Duration ) { }
2016-01-08 21:06:33 +03:00
func ( fl * FakeLessor ) Demote ( ) { }
2016-01-10 00:26:45 +03:00
func ( fl * FakeLessor ) Renew ( id LeaseID ) ( int64 , error ) { return 10 , nil }
2016-01-08 21:06:33 +03:00
2017-09-03 09:46:40 +03:00
func ( fl * FakeLessor ) Lookup ( id LeaseID ) * Lease { return nil }
2016-01-23 02:48:06 +03:00
2017-09-03 09:46:40 +03:00
func ( fl * FakeLessor ) Leases ( ) [ ] * Lease { return nil }
2017-08-14 21:19:09 +03:00
2016-01-08 21:06:33 +03:00
func ( fl * FakeLessor ) ExpiredLeasesC ( ) <- chan [ ] * Lease { return nil }
2016-01-12 20:00:57 +03:00
2016-03-11 04:06:10 +03:00
func ( fl * FakeLessor ) Recover ( b backend . Backend , rd RangeDeleter ) { }
2016-01-12 20:00:57 +03:00
func ( fl * FakeLessor ) Stop ( ) { }