diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 3e1570fef..562d8ad39 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -148,13 +148,21 @@ func (r *raftNode) start(s *EtcdServer) { } atomic.StoreUint64(&r.lead, rd.SoftState.Lead) if rd.RaftState == raft.StateLeader { + // TODO: raft should send server a notification through chan when + // it promotes or demotes instead of modifying server directly. syncC = r.s.SyncTicker + if r.s.lessor != nil { + r.s.lessor.Promote() + } // TODO: remove the nil checking // current test utility does not provide the stats if r.s.stats != nil { r.s.stats.BecomeLeader() } } else { + if r.s.lessor != nil { + r.s.lessor.Demote() + } syncC = nil } } diff --git a/lease/lessor.go b/lease/lessor.go index 4b26377fe..f328a937f 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -16,6 +16,7 @@ package lease import ( "encoding/binary" + "errors" "fmt" "math" "sync" @@ -35,6 +36,7 @@ var ( minLeaseTerm = 5 * time.Second leaseBucketName = []byte("lease") + ErrNotPrimary = errors.New("not a primary lessor") ) type LeaseID int64 @@ -56,12 +58,39 @@ type Lessor interface { // given lease will be removed. If the ID does not exist, an error // will be returned. Revoke(id LeaseID) error + + // Promote promotes the lessor to be the primary lessor. Primary lessor manages + // the expiration and renew of leases. + Promote() + + // Demote demotes the lessor from being the primary lessor. + Demote() + + // Renew renews a lease with given ID. If the ID does not exist, an error + // will be returned. + Renew(id LeaseID) error } // lessor implements Lessor interface. // TODO: use clockwork for testability. type lessor struct { mu sync.Mutex + + // primary indicates if this lessor is the primary lessor. The primary + // lessor manages lease expiration and renew. + // + // in etcd, raft leader is the primary. Thus there might be two primary + // leaders at the same time (raft allows concurrent leader but with different term) + // for at most a leader election timeout. + // The old primary leader cannot affect the correctness since its proposal has a + // smaller term and will not be committed. + // + // TODO: raft follower do not forward lease management proposals. There might be a + // very small window (within second normally which depends on go scheduling) that + // a raft follow is the primary between the raft leader demotion and lessor demotion. + // Usually this should not be a problem. Lease should not be that sensitive to timing. + primary bool + // TODO: probably this should be a heap with a secondary // id index. // Now it is O(N) to loop over the leases to find expired ones. @@ -101,6 +130,8 @@ func newLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor { } l.initAndRecover() + go l.runLoop() + return l } @@ -153,6 +184,10 @@ func (le *lessor) Renew(id LeaseID) error { le.mu.Lock() defer le.mu.Unlock() + if !le.primary { + return ErrNotPrimary + } + l := le.leaseMap[id] if l == nil { return fmt.Errorf("lease: cannot find lease %x", id) @@ -163,6 +198,20 @@ func (le *lessor) Renew(id LeaseID) error { return nil } +func (le *lessor) Promote() { + le.mu.Lock() + defer le.mu.Unlock() + + le.primary = true +} + +func (le *lessor) Demote() { + le.mu.Lock() + defer le.mu.Unlock() + + le.primary = false +} + // Attach attaches items to the lease with given ID. When the lease // expires, the attached items will be automatically removed. // If the given lease does not exist, an error will be returned. @@ -192,6 +241,22 @@ func (le *lessor) Recover(b backend.Backend, dr DeleteableRange) { le.initAndRecover() } +func (le *lessor) runLoop() { + // TODO: stop runLoop + for { + le.mu.Lock() + if le.primary { + le.revokeExpiredLeases(le.findExpiredLeases()) + } + le.mu.Unlock() + time.Sleep(500 * time.Millisecond) + } +} + +func (le *lessor) revokeExpiredLeases(expired []*Lease) { + // TODO: send revoke request to these expired lease through raft. +} + // findExpiredLeases loops all the leases in the leaseMap and returns the expired // leases that needed to be revoked. func (le *lessor) findExpiredLeases() []*Lease { @@ -202,6 +267,8 @@ func (le *lessor) findExpiredLeases() []*Lease { now := time.Now() for _, l := range le.leaseMap { + // TODO: probably should change to <= 100-500 millisecond to + // make up committing latency. if l.expiry.Sub(now) <= 0 { leases = append(leases, l) }