// Copyright 2017 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package grpcproxy import ( "context" "math" "sync" "go.etcd.io/etcd/v3/clientv3" "golang.org/x/time/rate" ) const ( lostLeaderKey = "__lostleader" // watched to detect leader loss retryPerSecond = 10 ) type leader struct { ctx context.Context w clientv3.Watcher // mu protects leaderc updates. mu sync.RWMutex leaderc chan struct{} disconnc chan struct{} donec chan struct{} } func newLeader(ctx context.Context, w clientv3.Watcher) *leader { l := &leader{ ctx: clientv3.WithRequireLeader(ctx), w: w, leaderc: make(chan struct{}), disconnc: make(chan struct{}), donec: make(chan struct{}), } // begin assuming leader is lost close(l.leaderc) go l.recvLoop() return l } func (l *leader) recvLoop() { defer close(l.donec) limiter := rate.NewLimiter(rate.Limit(retryPerSecond), retryPerSecond) rev := int64(math.MaxInt64 - 2) for limiter.Wait(l.ctx) == nil { wch := l.w.Watch(l.ctx, lostLeaderKey, clientv3.WithRev(rev), clientv3.WithCreatedNotify()) cresp, ok := <-wch if !ok { l.loseLeader() continue } if cresp.Err() != nil { l.loseLeader() if clientv3.IsConnCanceled(cresp.Err()) { close(l.disconnc) return } continue } l.gotLeader() <-wch l.loseLeader() } } func (l *leader) loseLeader() { l.mu.RLock() defer l.mu.RUnlock() select { case <-l.leaderc: default: close(l.leaderc) } } // gotLeader will force update the leadership status to having a leader. func (l *leader) gotLeader() { l.mu.Lock() defer l.mu.Unlock() select { case <-l.leaderc: l.leaderc = make(chan struct{}) default: } } func (l *leader) disconnectNotify() <-chan struct{} { return l.disconnc } func (l *leader) stopNotify() <-chan struct{} { return l.donec } // lostNotify returns a channel that is closed if there has been // a leader loss not yet followed by a leader reacquire. func (l *leader) lostNotify() <-chan struct{} { l.mu.RLock() defer l.mu.RUnlock() return l.leaderc }