// Copyright 2016 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package mvcc import ( "fmt" "math" "go.etcd.io/etcd/mvcc/mvccpb" "go.etcd.io/etcd/pkg/adt" ) var ( // watchBatchMaxRevs is the maximum distinct revisions that // may be sent to an unsynced watcher at a time. Declared as // var instead of const for testing purposes. watchBatchMaxRevs = 1000 ) type eventBatch struct { // evs is a batch of revision-ordered events evs []mvccpb.Event // revs is the minimum unique revisions observed for this batch revs int // moreRev is first revision with more events following this batch moreRev int64 } func (eb *eventBatch) add(ev mvccpb.Event) { if eb.revs > watchBatchMaxRevs { // maxed out batch size return } if len(eb.evs) == 0 { // base case eb.revs = 1 eb.evs = append(eb.evs, ev) return } // revision accounting ebRev := eb.evs[len(eb.evs)-1].Kv.ModRevision evRev := ev.Kv.ModRevision if evRev > ebRev { eb.revs++ if eb.revs > watchBatchMaxRevs { eb.moreRev = evRev return } } eb.evs = append(eb.evs, ev) } type watcherBatch map[*watcher]*eventBatch func (wb watcherBatch) add(w *watcher, ev mvccpb.Event) { eb := wb[w] if eb == nil { eb = &eventBatch{} wb[w] = eb } eb.add(ev) } // newWatcherBatch maps watchers to their matched events. It enables quick // events look up by watcher. func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch { if len(wg.watchers) == 0 { return nil } wb := make(watcherBatch) for _, ev := range evs { for w := range wg.watcherSetByKey(string(ev.Kv.Key)) { if ev.Kv.ModRevision >= w.minRev { // don't double notify wb.add(w, ev) } } } return wb } type watcherSet map[*watcher]struct{} func (w watcherSet) add(wa *watcher) { if _, ok := w[wa]; ok { panic("add watcher twice!") } w[wa] = struct{}{} } func (w watcherSet) union(ws watcherSet) { for wa := range ws { w.add(wa) } } func (w watcherSet) delete(wa *watcher) { if _, ok := w[wa]; !ok { panic("removing missing watcher!") } delete(w, wa) } type watcherSetByKey map[string]watcherSet func (w watcherSetByKey) add(wa *watcher) { set := w[string(wa.key)] if set == nil { set = make(watcherSet) w[string(wa.key)] = set } set.add(wa) } func (w watcherSetByKey) delete(wa *watcher) bool { k := string(wa.key) if v, ok := w[k]; ok { if _, ok := v[wa]; ok { delete(v, wa) if len(v) == 0 { // remove the set; nothing left delete(w, k) } return true } } return false } // watcherGroup is a collection of watchers organized by their ranges type watcherGroup struct { // keyWatchers has the watchers that watch on a single key keyWatchers watcherSetByKey // ranges has the watchers that watch a range; it is sorted by interval ranges adt.IntervalTree // watchers is the set of all watchers watchers watcherSet } func newWatcherGroup() watcherGroup { return watcherGroup{ keyWatchers: make(watcherSetByKey), ranges: adt.NewIntervalTree(), watchers: make(watcherSet), } } // add puts a watcher in the group. func (wg *watcherGroup) add(wa *watcher) { wg.watchers.add(wa) if wa.end == nil { wg.keyWatchers.add(wa) return } // interval already registered? ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end)) if iv := wg.ranges.Find(ivl); iv != nil { iv.Val.(watcherSet).add(wa) return } // not registered, put in interval tree ws := make(watcherSet) ws.add(wa) wg.ranges.Insert(ivl, ws) } // contains is whether the given key has a watcher in the group. func (wg *watcherGroup) contains(key string) bool { _, ok := wg.keyWatchers[key] return ok || wg.ranges.Intersects(adt.NewStringAffinePoint(key)) } // size gives the number of unique watchers in the group. func (wg *watcherGroup) size() int { return len(wg.watchers) } // delete removes a watcher from the group. func (wg *watcherGroup) delete(wa *watcher) bool { if _, ok := wg.watchers[wa]; !ok { return false } wg.watchers.delete(wa) if wa.end == nil { wg.keyWatchers.delete(wa) return true } ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end)) iv := wg.ranges.Find(ivl) if iv == nil { return false } ws := iv.Val.(watcherSet) delete(ws, wa) if len(ws) == 0 { // remove interval missing watchers if ok := wg.ranges.Delete(ivl); !ok { panic("could not remove watcher from interval tree") } } return true } // choose selects watchers from the watcher group to update func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watcherGroup, int64) { if len(wg.watchers) < maxWatchers { return wg, wg.chooseAll(curRev, compactRev) } ret := newWatcherGroup() for w := range wg.watchers { if maxWatchers <= 0 { break } maxWatchers-- ret.add(w) } return &ret, ret.chooseAll(curRev, compactRev) } func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 { minRev := int64(math.MaxInt64) for w := range wg.watchers { if w.minRev > curRev { // after network partition, possibly choosing future revision watcher from restore operation // with watch key "proxy-namespace__lostleader" and revision "math.MaxInt64 - 2" // do not panic when such watcher had been moved from "synced" watcher during restore operation if !w.restore { panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev)) } // mark 'restore' done, since it's chosen w.restore = false } if w.minRev < compactRev { select { case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}: w.compacted = true wg.delete(w) default: // retry next time } continue } if minRev > w.minRev { minRev = w.minRev } } return minRev } // watcherSetByKey gets the set of watchers that receive events on the given key. func (wg *watcherGroup) watcherSetByKey(key string) watcherSet { wkeys := wg.keyWatchers[key] wranges := wg.ranges.Stab(adt.NewStringAffinePoint(key)) // zero-copy cases switch { case len(wranges) == 0: // no need to merge ranges or copy; reuse single-key set return wkeys case len(wranges) == 0 && len(wkeys) == 0: return nil case len(wranges) == 1 && len(wkeys) == 0: return wranges[0].Val.(watcherSet) } // copy case ret := make(watcherSet) ret.union(wg.keyWatchers[key]) for _, item := range wranges { ret.union(item.Val.(watcherSet)) } return ret }