diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 40553185f..1c03995ea 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -58,6 +58,8 @@ type applierV3 interface { LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) + LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) + Alarm(*pb.AlarmRequest) (*pb.AlarmResponse, error) Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error) @@ -130,6 +132,8 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant) case r.LeaseRevoke != nil: ar.resp, ar.err = a.s.applyV3.LeaseRevoke(r.LeaseRevoke) + case r.LeaseCheckpoint != nil: + ar.resp, ar.err = a.s.applyV3.LeaseCheckpoint(r.LeaseCheckpoint) case r.Alarm != nil: ar.resp, ar.err = a.s.applyV3.Alarm(r.Alarm) case r.Authenticate != nil: @@ -582,6 +586,16 @@ func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevo return &pb.LeaseRevokeResponse{Header: newHeader(a.s)}, err } +func (a *applierV3backend) LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) { + for _, c := range lc.Checkpoints { + err := a.s.lessor.Checkpoint(lease.LeaseID(c.ID), c.Remaining_TTL) + if err != nil { + return &pb.LeaseCheckpointResponse{Header: newHeader(a.s)}, err + } + } + return &pb.LeaseCheckpointResponse{Header: newHeader(a.s)}, nil +} + func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) { resp := &pb.AlarmResponse{} oldCount := len(a.s.alarmStore.Get(ar.Alarm)) diff --git a/etcdserver/config.go b/etcdserver/config.go index 2381b5a71..6d945b342 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -140,6 +140,9 @@ type ServerConfig struct { Debug bool ForceNewCluster bool + + // LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints. + LeaseCheckpointInterval time.Duration } // VerifyBootstrap sanity-checks the initial config for bootstrap case diff --git a/etcdserver/server.go b/etcdserver/server.go index 699b88e7e..86ff6e1d0 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -519,7 +519,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers. - srv.lessor = lease.NewLessor(srv.getLogger(), srv.be, lease.LessorConfig{MinLeaseTTL: int64(math.Ceil(minTTL.Seconds()))}) + srv.lessor = lease.NewLessor(srv.getLogger(), srv.be, lease.LessorConfig{MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), CheckpointInterval: cfg.LeaseCheckpointInterval}) srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex) if beExist { kvindex := srv.kv.ConsistentIndex() @@ -576,6 +576,10 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { return nil, err } + srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) { + srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp}) + }) + // TODO: move transport initialization near the definition of remote tr := &rafthttp.Transport{ Logger: cfg.Logger, diff --git a/lease/lease_queue.go b/lease/lease_queue.go index 261df9509..5ecb38b59 100644 --- a/lease/lease_queue.go +++ b/lease/lease_queue.go @@ -14,11 +14,14 @@ package lease -// LeaseWithTime contains lease object with expire information. +// LeaseWithTime contains lease object with a time. +// For the lessor's lease heap, time identifies the lease expiration time. +// For the lessor's lease checkpoint heap, the time identifies the next lease checkpoint time. type LeaseWithTime struct { - id LeaseID - expiration int64 - index int + id LeaseID + // Unix nanos timestamp. + time int64 + index int } type LeaseQueue []*LeaseWithTime @@ -26,7 +29,7 @@ type LeaseQueue []*LeaseWithTime func (pq LeaseQueue) Len() int { return len(pq) } func (pq LeaseQueue) Less(i, j int) bool { - return pq[i].expiration < pq[j].expiration + return pq[i].time < pq[j].time } func (pq LeaseQueue) Swap(i, j int) { diff --git a/lease/lease_queue_test.go b/lease/lease_queue_test.go index 201911fd0..2387ae028 100644 --- a/lease/lease_queue_test.go +++ b/lease/lease_queue_test.go @@ -34,7 +34,7 @@ func TestLeaseQueue(t *testing.T) { exp = time.Now().UnixNano() } le.leaseMap[LeaseID(i)] = &Lease{ID: LeaseID(i)} - heap.Push(&le.leaseHeap, &LeaseWithTime{id: LeaseID(i), expiration: exp}) + heap.Push(&le.leaseHeap, &LeaseWithTime{id: LeaseID(i), time: exp}) } // first element must be front diff --git a/lease/lessor.go b/lease/lessor.go index 8f7a9decc..4b81bd81a 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -16,6 +16,7 @@ package lease import ( "container/heap" + "context" "encoding/binary" "errors" "math" @@ -23,6 +24,7 @@ import ( "sync" "time" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/lease/leasepb" "github.com/coreos/etcd/mvcc/backend" "go.uber.org/zap" @@ -42,6 +44,12 @@ var ( // maximum number of leases to revoke per second; configurable for tests leaseRevokeRate = 1000 + // maximum number of lease checkpoints recorded to the consensus log per second; configurable for tests + leaseCheckpointRate = 1000 + + // maximum number of lease checkpoints to batch into a single consensus log entry + maxLeaseCheckpointBatchSize = 1000 + ErrNotPrimary = errors.New("not a primary lessor") ErrLeaseNotFound = errors.New("lease not found") ErrLeaseExists = errors.New("lease already exists") @@ -58,6 +66,10 @@ type TxnDelete interface { // RangeDeleter is a TxnDelete constructor. type RangeDeleter func() TxnDelete +// Checkpointer permits checkpointing of lease remaining TTLs to the consensus log. Defined here to +// avoid circular dependency with mvcc. +type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest) + type LeaseID int64 // Lessor owns leases. It can grant, revoke, renew and modify leases for lessee. @@ -67,6 +79,8 @@ type Lessor interface { // new TxnDeletes. SetRangeDeleter(rd RangeDeleter) + SetCheckpointer(cp Checkpointer) + // Grant grants a lease that expires at least after TTL seconds. Grant(id LeaseID, ttl int64) (*Lease, error) // Revoke revokes a lease with given ID. The item attached to the @@ -74,6 +88,10 @@ type Lessor interface { // will be returned. Revoke(id LeaseID) error + // Checkpoint applies the remainingTTL of a lease. The remainingTTL is used in Promote to set + // the expiry of leases to less than the full TTL when possible. + Checkpoint(id LeaseID, remainingTTL int64) error + // Attach attaches given leaseItem to the lease with given LeaseID. // If the lease does not exist, an error will be returned. Attach(id LeaseID, items []LeaseItem) error @@ -124,14 +142,19 @@ type lessor struct { // demotec will be closed if the lessor is demoted. demotec chan struct{} - leaseMap map[LeaseID]*Lease - leaseHeap LeaseQueue - itemMap map[LeaseItem]LeaseID + leaseMap map[LeaseID]*Lease + leaseHeap LeaseQueue + leaseCheckpointHeap LeaseQueue + itemMap map[LeaseItem]LeaseID // When a lease expires, the lessor will delete the // leased range (or key) by the RangeDeleter. rd RangeDeleter + // When a lease's deadline should be persisted to preserve the remaining TTL across leader + // elections and restarts, the lessor will checkpoint the lease by the Checkpointer. + cp Checkpointer + // backend to persist leases. We only persist lease ID and expiry for now. // The leased items can be recovered by iterating all the keys in kv. b backend.Backend @@ -147,10 +170,14 @@ type lessor struct { doneC chan struct{} lg *zap.Logger + + // Wait duration between lease checkpoints. + checkpointInterval time.Duration } type LessorConfig struct { - MinLeaseTTL int64 + MinLeaseTTL int64 + CheckpointInterval time.Duration } func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor { @@ -158,12 +185,18 @@ func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor { } func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor { + checkpointInterval := cfg.CheckpointInterval + if checkpointInterval == 0 { + checkpointInterval = 5 * time.Minute + } l := &lessor{ - leaseMap: make(map[LeaseID]*Lease), - itemMap: make(map[LeaseItem]LeaseID), - leaseHeap: make(LeaseQueue, 0), - b: b, - minLeaseTTL: cfg.MinLeaseTTL, + leaseMap: make(map[LeaseID]*Lease), + itemMap: make(map[LeaseItem]LeaseID), + leaseHeap: make(LeaseQueue, 0), + leaseCheckpointHeap: make(LeaseQueue, 0), + b: b, + minLeaseTTL: cfg.MinLeaseTTL, + checkpointInterval: checkpointInterval, // expiredC is a small buffered chan to avoid unnecessary blocking. expiredC: make(chan []*Lease, 16), stopC: make(chan struct{}), @@ -201,6 +234,13 @@ func (le *lessor) SetRangeDeleter(rd RangeDeleter) { le.rd = rd } +func (le *lessor) SetCheckpointer(cp Checkpointer) { + le.mu.Lock() + defer le.mu.Unlock() + + le.cp = cp +} + func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { if id == NoLease { return nil, ErrLeaseNotFound @@ -237,12 +277,17 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { } le.leaseMap[id] = l - item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()} + item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()} heap.Push(&le.leaseHeap, item) l.persistTo(le.b) leaseTotalTTLs.Observe(float64(l.ttl)) leaseGranted.Inc() + + if le.isPrimary() { + le.scheduleCheckpointIfNeeded(l) + } + return l, nil } @@ -286,6 +331,21 @@ func (le *lessor) Revoke(id LeaseID) error { return nil } +func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error { + le.mu.Lock() + defer le.mu.Unlock() + + if l, ok := le.leaseMap[id]; ok { + // when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry + l.remainingTTL = remainingTTL + if le.isPrimary() { + // schedule the next checkpoint as needed + le.scheduleCheckpointIfNeeded(l) + } + } + return nil +} + // Renew renews an existing lease. If the given lease does not exist or // has expired, an error will be returned. func (le *lessor) Renew(id LeaseID) (int64, error) { @@ -324,8 +384,15 @@ func (le *lessor) Renew(id LeaseID) (int64, error) { } } + // Clear remaining TTL when we renew if it is set + // By applying a RAFT entry only when the remainingTTL is already set, we limit the number + // of RAFT entries written per lease to a max of 2 per checkpoint interval. + if le.cp != nil && l.remainingTTL > 0 { + le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}}) + } + l.refresh(0) - item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()} + item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()} heap.Push(&le.leaseHeap, item) leaseRenewed.Inc() @@ -363,7 +430,7 @@ func (le *lessor) Promote(extend time.Duration) { // refresh the expiries of all leases. for _, l := range le.leaseMap { l.refresh(extend) - item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()} + item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()} heap.Push(&le.leaseHeap, item) } @@ -401,8 +468,9 @@ func (le *lessor) Promote(extend time.Duration) { delay := time.Duration(rateDelay) nextWindow = baseWindow + delay l.refresh(delay + extend) - item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()} + item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()} heap.Push(&le.leaseHeap, item) + le.scheduleCheckpointIfNeeded(l) } } @@ -421,6 +489,8 @@ func (le *lessor) Demote() { l.forever() } + le.clearScheduledLeasesCheckpoints() + if le.demotec != nil { close(le.demotec) le.demotec = nil @@ -499,28 +569,8 @@ func (le *lessor) runLoop() { defer close(le.doneC) for { - var ls []*Lease - - // rate limit - revokeLimit := leaseRevokeRate / 2 - - le.mu.RLock() - if le.isPrimary() { - ls = le.findExpiredLeases(revokeLimit) - } - le.mu.RUnlock() - - if len(ls) != 0 { - select { - case <-le.stopC: - return - case le.expiredC <- ls: - default: - // the receiver of expiredC is probably busy handling - // other stuff - // let's try this next time after 500ms - } - } + le.revokeExpiredLeases() + le.checkpointScheduledLeases() select { case <-time.After(500 * time.Millisecond): @@ -530,6 +580,59 @@ func (le *lessor) runLoop() { } } +// revokeExpiredLeases finds all leases past their expiry and sends them to epxired channel for +// to be revoked. +func (le *lessor) revokeExpiredLeases() { + var ls []*Lease + + // rate limit + revokeLimit := leaseRevokeRate / 2 + + le.mu.RLock() + if le.isPrimary() { + ls = le.findExpiredLeases(revokeLimit) + } + le.mu.RUnlock() + + if len(ls) != 0 { + select { + case <-le.stopC: + return + case le.expiredC <- ls: + default: + // the receiver of expiredC is probably busy handling + // other stuff + // let's try this next time after 500ms + } + } +} + +// checkpointScheduledLeases finds all scheduled lease checkpoints that are due and +// submits them to the checkpointer to persist them to the consensus log. +func (le *lessor) checkpointScheduledLeases() { + var cps []*pb.LeaseCheckpoint + + // rate limit + for i := 0; i < leaseCheckpointRate/2; i++ { + le.mu.Lock() + if le.isPrimary() { + cps = le.findDueScheduledCheckpoints(maxLeaseCheckpointBatchSize) + } + le.mu.Unlock() + + if len(cps) != 0 { + le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps}) + } + if len(cps) < maxLeaseCheckpointBatchSize { + return + } + } +} + +func (le *lessor) clearScheduledLeasesCheckpoints() { + le.leaseCheckpointHeap = make(LeaseQueue, 0) +} + // expireExists returns true if expiry items exist. // It pops only when expiry item exists. // "next" is true, to indicate that it may exist in next attempt. @@ -547,7 +650,7 @@ func (le *lessor) expireExists() (l *Lease, ok bool, next bool) { return nil, false, true } - if time.Now().UnixNano() < item.expiration { + if time.Now().UnixNano() < item.time /* expiration time */ { // Candidate expirations are caught up, reinsert this item // and no need to revoke (nothing is expiry) return l, false, false @@ -588,6 +691,61 @@ func (le *lessor) findExpiredLeases(limit int) []*Lease { return leases } +func (le *lessor) scheduleCheckpointIfNeeded(lease *Lease) { + if le.cp == nil { + return + } + + if lease.RemainingTTL() > int64(le.checkpointInterval.Seconds()) { + if le.lg != nil { + le.lg.Debug("Scheduling lease checkpoint", + zap.Int64("leaseID", int64(lease.ID)), + zap.Duration("intervalSeconds", le.checkpointInterval), + ) + } + heap.Push(&le.leaseCheckpointHeap, &LeaseWithTime{ + id: lease.ID, + time: time.Now().Add(le.checkpointInterval).UnixNano(), + }) + } +} + +func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCheckpoint { + if le.cp == nil { + return nil + } + + now := time.Now() + cps := []*pb.LeaseCheckpoint{} + for le.leaseCheckpointHeap.Len() > 0 && len(cps) < checkpointLimit { + lt := le.leaseCheckpointHeap[0] + if lt.time /* next checkpoint time */ > now.UnixNano() { + return cps + } + heap.Pop(&le.leaseCheckpointHeap) + var l *Lease + var ok bool + if l, ok = le.leaseMap[lt.id]; !ok { + continue + } + if !now.Before(l.expiry) { + continue + } + remainingTTL := int64(math.Ceil(l.expiry.Sub(now).Seconds())) + if remainingTTL >= l.ttl { + continue + } + if le.lg != nil { + le.lg.Debug("Checkpointing lease", + zap.Int64("leaseID", int64(lt.id)), + zap.Int64("remainingTTL", remainingTTL), + ) + } + cps = append(cps, &pb.LeaseCheckpoint{ID: int64(lt.id), Remaining_TTL: remainingTTL}) + } + return cps +} + func (le *lessor) initAndRecover() { tx := le.b.BatchTx() tx.Lock() @@ -617,14 +775,16 @@ func (le *lessor) initAndRecover() { } } heap.Init(&le.leaseHeap) + heap.Init(&le.leaseCheckpointHeap) tx.Unlock() le.b.ForceCommit() } type Lease struct { - ID LeaseID - ttl int64 // time to live in seconds + ID LeaseID + ttl int64 // time to live of the lease in seconds + remainingTTL int64 // remaining time to live in seconds, if zero valued it is considered unset and the full ttl should be used // expiryMu protects concurrent accesses to expiry expiryMu sync.RWMutex // expiry is time when lease should expire. no expiration when expiry.IsZero() is true @@ -643,7 +803,7 @@ func (l *Lease) expired() bool { func (l *Lease) persistTo(b backend.Backend) { key := int64ToBytes(int64(l.ID)) - lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl} + lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL} val, err := lpb.Marshal() if err != nil { panic("failed to marshal lease proto item") @@ -659,9 +819,18 @@ func (l *Lease) TTL() int64 { return l.ttl } +// RemainingTTL returns the last checkpointed remaining TTL of the lease. +// TODO(jpbetz): do not expose this utility method +func (l *Lease) RemainingTTL() int64 { + if l.remainingTTL > 0 { + return l.remainingTTL + } + return l.ttl +} + // refresh refreshes the expiry of the lease. func (l *Lease) refresh(extend time.Duration) { - newExpiry := time.Now().Add(extend + time.Duration(l.ttl)*time.Second) + newExpiry := time.Now().Add(extend + time.Duration(l.RemainingTTL())*time.Second) l.expiryMu.Lock() defer l.expiryMu.Unlock() l.expiry = newExpiry @@ -711,10 +880,14 @@ type FakeLessor struct{} func (fl *FakeLessor) SetRangeDeleter(dr RangeDeleter) {} +func (fl *FakeLessor) SetCheckpointer(cp Checkpointer) {} + func (fl *FakeLessor) Grant(id LeaseID, ttl int64) (*Lease, error) { return nil, nil } func (fl *FakeLessor) Revoke(id LeaseID) error { return nil } +func (fl *FakeLessor) Checkpoint(id LeaseID, remainingTTL int64) error { return nil } + func (fl *FakeLessor) Attach(id LeaseID, items []LeaseItem) error { return nil } func (fl *FakeLessor) GetLease(item LeaseItem) LeaseID { return 0 } diff --git a/tools/etcd-dump-logs/main.go b/tools/etcd-dump-logs/main.go index 0fe2dca4f..9c9b5e5ba 100644 --- a/tools/etcd-dump-logs/main.go +++ b/tools/etcd-dump-logs/main.go @@ -45,7 +45,7 @@ func main() { entrytype := flag.String("entry-type", "", `If set, filters output by entry type. Must be one or more than one of: ConfigChange, Normal, Request, InternalRaftRequest, IRRRange, IRRPut, IRRDeleteRange, IRRTxn, - IRRCompaction, IRRLeaseGrant, IRRLeaseRevoke`) + IRRCompaction, IRRLeaseGrant, IRRLeaseRevoke, IRRLeaseCheckpoint`) streamdecoder := flag.String("stream-decoder", "", `The name of an executable decoding tool, the executable must process hex encoded lines of binary input (from etcd-dump-logs) and output a hex encoded line of binary for each input line`) @@ -203,6 +203,11 @@ func passIRRLeaseRevoke(entry raftpb.Entry) (bool, string) { return entry.Type == raftpb.EntryNormal && rr.Unmarshal(entry.Data) == nil && rr.LeaseRevoke != nil, "InternalRaftRequest" } +func passIRRLeaseCheckpoint(entry raftpb.Entry) (bool, string) { + var rr etcdserverpb.InternalRaftRequest + return entry.Type == raftpb.EntryNormal && rr.Unmarshal(entry.Data) == nil && rr.LeaseCheckpoint != nil, "InternalRaftRequest" +} + func passRequest(entry raftpb.Entry) (bool, string) { var rr1 etcdserverpb.Request var rr2 etcdserverpb.InternalRaftRequest @@ -272,6 +277,7 @@ func evaluateEntrytypeFlag(entrytype string) []EntryFilter { "IRRCompaction": {passIRRCompaction}, "IRRLeaseGrant": {passIRRLeaseGrant}, "IRRLeaseRevoke": {passIRRLeaseRevoke}, + "IRRLeaseCheckpoint": {passIRRLeaseCheckpoint}, } filters := make([]EntryFilter, 0) if len(entrytypelist) == 0 { @@ -288,7 +294,7 @@ func evaluateEntrytypeFlag(entrytype string) []EntryFilter { Please set entry-type to one or more of the following: ConfigChange, Normal, Request, InternalRaftRequest, IRRRange, IRRPut, IRRDeleteRange, IRRTxn, -IRRCompaction, IRRLeaseGrant, IRRLeaseRevoke`, et) +IRRCompaction, IRRLeaseGrant, IRRLeaseRevoke, IRRLeaseCheckpoint`, et) } }