From 807db7e2aa874d264c822d063d4a609581def588 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 2 Jan 2016 20:20:22 -0800 Subject: [PATCH] storage: rename watching -> watcher --- storage/metrics.go | 16 ++-- storage/watchable_store.go | 130 +++++++++++++------------- storage/watchable_store_bench_test.go | 4 +- storage/watchable_store_test.go | 99 ++++++++++---------- storage/watcher.go | 17 ++-- storage/watcher_test.go | 6 +- 6 files changed, 137 insertions(+), 135 deletions(-) diff --git a/storage/metrics.go b/storage/metrics.go index 02aad13e8..8f01a732f 100644 --- a/storage/metrics.go +++ b/storage/metrics.go @@ -67,20 +67,20 @@ var ( Help: "Total number of watch streams.", }) - watchingGauge = prometheus.NewGauge( + watcherGauge = prometheus.NewGauge( prometheus.GaugeOpts{ Namespace: "etcd", Subsystem: "storage", - Name: "watching_total", - Help: "Total number of watchings.", + Name: "watcher_total", + Help: "Total number of watchers.", }) - slowWatchingGauge = prometheus.NewGauge( + slowWatcherGauge = prometheus.NewGauge( prometheus.GaugeOpts{ Namespace: "etcd", Subsystem: "storage", - Name: "slow_watching_total", - Help: "Total number of unsynced slow watchings.", + Name: "slow_watcher_total", + Help: "Total number of unsynced slow watchers.", }) totalEventsCounter = prometheus.NewCounter( @@ -144,8 +144,8 @@ func init() { prometheus.MustRegister(txnCounter) prometheus.MustRegister(keysGauge) prometheus.MustRegister(watchStreamGauge) - prometheus.MustRegister(watchingGauge) - prometheus.MustRegister(slowWatchingGauge) + prometheus.MustRegister(watcherGauge) + prometheus.MustRegister(slowWatcherGauge) prometheus.MustRegister(totalEventsCounter) prometheus.MustRegister(pendingEventsGauge) prometheus.MustRegister(indexCompactionPauseDurations) diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 1527d349f..41847b9e0 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -33,7 +33,7 @@ const ( ) type watchable interface { - watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watching, CancelFunc) + watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watcher, CancelFunc) } type watchableStore struct { @@ -41,12 +41,12 @@ type watchableStore struct { *store - // contains all unsynced watching that needs to sync events that have happened - unsynced map[*watching]struct{} + // contains all unsynced watchers that needs to sync with events that have happened + unsynced map[*watcher]struct{} - // contains all synced watching that are tracking the events that will happen - // The key of the map is the key that the watching is watching on. - synced map[string]map[*watching]struct{} + // contains all synced watchers that are in sync with the progress of the store. + // The key of the map is the key that the watcher watches on. + synced map[string]map[*watcher]struct{} tx *ongoingTx stopc chan struct{} @@ -56,12 +56,12 @@ type watchableStore struct { func newWatchableStore(path string) *watchableStore { s := &watchableStore{ store: newDefaultStore(path), - unsynced: make(map[*watching]struct{}), - synced: make(map[string]map[*watching]struct{}), + unsynced: make(map[*watcher]struct{}), + synced: make(map[string]map[*watcher]struct{}), stopc: make(chan struct{}), } s.wg.Add(1) - go s.syncWatchingsLoop() + go s.syncWatchersLoop() return s } @@ -185,11 +185,11 @@ func (s *watchableStore) NewWatchStream() WatchStream { } } -func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watching, CancelFunc) { +func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watcher, CancelFunc) { s.mu.Lock() defer s.mu.Unlock() - wa := &watching{ + wa := &watcher{ key: key, prefix: prefix, cur: startRev, @@ -199,23 +199,23 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch c k := string(key) if startRev == 0 { - if err := unsafeAddWatching(&s.synced, k, wa); err != nil { - log.Panicf("error unsafeAddWatching (%v) for key %s", err, k) + if err := unsafeAddWatcher(&s.synced, k, wa); err != nil { + log.Panicf("error unsafeAddWatcher (%v) for key %s", err, k) } } else { - slowWatchingGauge.Inc() + slowWatcherGauge.Inc() s.unsynced[wa] = struct{}{} } - watchingGauge.Inc() + watcherGauge.Inc() cancel := CancelFunc(func() { s.mu.Lock() defer s.mu.Unlock() - // remove global references of the watching + // remove global references of the watcher if _, ok := s.unsynced[wa]; ok { delete(s.unsynced, wa) - slowWatchingGauge.Dec() - watchingGauge.Dec() + slowWatcherGauge.Dec() + watcherGauge.Dec() return } @@ -227,7 +227,7 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch c if len(v) == 0 { delete(s.synced, k) } - watchingGauge.Dec() + watcherGauge.Dec() } } // If we cannot find it, it should have finished watch. @@ -236,13 +236,13 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch c return wa, cancel } -// syncWatchingsLoop syncs the watching in the unsyncd map every 100ms. -func (s *watchableStore) syncWatchingsLoop() { +// syncWatchersLoop syncs the watcher in the unsyncd map every 100ms. +func (s *watchableStore) syncWatchersLoop() { defer s.wg.Done() for { s.mu.Lock() - s.syncWatchings() + s.syncWatchers() s.mu.Unlock() select { @@ -253,12 +253,12 @@ func (s *watchableStore) syncWatchingsLoop() { } } -// syncWatchings periodically syncs unsynced watchings by: Iterate all unsynced -// watchings to get the minimum revision within its range, skipping the -// watching if its current revision is behind the compact revision of the +// syncWatchers periodically syncs unsynced watchers by: Iterate all unsynced +// watchers to get the minimum revision within its range, skipping the +// watcher if its current revision is behind the compact revision of the // store. And use this minimum revision to get all key-value pairs. Then send -// those events to watchings. -func (s *watchableStore) syncWatchings() { +// those events to watchers. +func (s *watchableStore) syncWatchers() { s.store.mu.Lock() defer s.store.mu.Unlock() @@ -266,7 +266,7 @@ func (s *watchableStore) syncWatchings() { return } - // in order to find key-value pairs from unsynced watchings, we need to + // in order to find key-value pairs from unsynced watchers, we need to // find min revision index, and these revisions can be used to // query the backend store of key-value pairs minRev := int64(math.MaxInt64) @@ -275,17 +275,17 @@ func (s *watchableStore) syncWatchings() { compactionRev := s.store.compactMainRev // TODO: change unsynced struct type same to this - keyToUnsynced := make(map[string]map[*watching]struct{}) + keyToUnsynced := make(map[string]map[*watcher]struct{}) for w := range s.unsynced { k := string(w.key) if w.cur > curRev { - panic("watching current revision should not exceed current revision") + panic("watcher current revision should not exceed current revision") } if w.cur < compactionRev { - // TODO: return error compacted to that watching instead of + // TODO: return error compacted to that watcher instead of // just removing it sliently from unsynced. delete(s.unsynced, w) continue @@ -296,7 +296,7 @@ func (s *watchableStore) syncWatchings() { } if _, ok := keyToUnsynced[k]; !ok { - keyToUnsynced[k] = make(map[*watching]struct{}) + keyToUnsynced[k] = make(map[*watcher]struct{}) } keyToUnsynced[k][w] = struct{}{} } @@ -338,35 +338,35 @@ func (s *watchableStore) syncWatchings() { evs = append(evs, ev) } - for w, es := range newWatchingToEventMap(keyToUnsynced, evs) { + for w, es := range newWatcherToEventMap(keyToUnsynced, evs) { select { case w.ch <- es: pendingEventsGauge.Add(float64(len(es))) default: - // TODO: handle the full unsynced watchings. - // continue to process other watchings for now, the full ones + // TODO: handle the full unsynced watchers. + // continue to process other watchers for now, the full ones // will be processed next time and hopefully it will not be full. continue } k := string(w.key) - if err := unsafeAddWatching(&s.synced, k, w); err != nil { - log.Panicf("error unsafeAddWatching (%v) for key %s", err, k) + if err := unsafeAddWatcher(&s.synced, k, w); err != nil { + log.Panicf("error unsafeAddWatcher (%v) for key %s", err, k) } delete(s.unsynced, w) } - slowWatchingGauge.Set(float64(len(s.unsynced))) + slowWatcherGauge.Set(float64(len(s.unsynced))) } -// handle handles the change of the happening event on all watchings. +// handle handles the change of the happening event on all watchers. func (s *watchableStore) handle(rev int64, evs []storagepb.Event) { s.notify(rev, evs) } // notify notifies the fact that given event at the given rev just happened to -// watchings that watch on the key of the event. +// watchers that watch on the key of the event. func (s *watchableStore) notify(rev int64, evs []storagepb.Event) { - we := newWatchingToEventMap(s.synced, evs) + we := newWatcherToEventMap(s.synced, evs) for _, wm := range s.synced { for w := range wm { if _, ok := we[w]; !ok { @@ -377,11 +377,11 @@ func (s *watchableStore) notify(rev int64, evs []storagepb.Event) { case w.ch <- es: pendingEventsGauge.Add(float64(len(es))) default: - // move slow watching to unsynced + // move slow watcher to unsynced w.cur = rev s.unsynced[w] = struct{}{} delete(wm, w) - slowWatchingGauge.Inc() + slowWatcherGauge.Inc() } } } @@ -414,52 +414,52 @@ func (tx *ongoingTx) del(k string) { } } -type watching struct { - // the watching key +type watcher struct { + // the watcher key key []byte - // prefix indicates if watching is on a key or a prefix. - // If prefix is true, the watching is on a prefix. + // prefix indicates if watcher is on a key or a prefix. + // If prefix is true, the watcher is on a prefix. prefix bool - // cur is the current watching revision. + // cur is the current watcher revision. // If cur is behind the current revision of the KV, - // watching is unsynced and needs to catch up. + // watcher is unsynced and needs to catch up. cur int64 id int64 // a chan to send out the watched events. - // The chan might be shared with other watchings. + // The chan might be shared with other watchers. ch chan<- []storagepb.Event } -// unsafeAddWatching puts watching with key k into watchableStore's synced. +// unsafeAddWatcher puts watcher with key k into watchableStore's synced. // Make sure to this is thread-safe using mutex before and after. -func unsafeAddWatching(synced *map[string]map[*watching]struct{}, k string, wa *watching) error { +func unsafeAddWatcher(synced *map[string]map[*watcher]struct{}, k string, wa *watcher) error { if wa == nil { - return fmt.Errorf("nil watching received") + return fmt.Errorf("nil watcher received") } mp := *synced if v, ok := mp[k]; ok { if _, ok := v[wa]; ok { - return fmt.Errorf("put the same watch twice: %+v", wa) + return fmt.Errorf("put the same watcher twice: %+v", wa) } else { v[wa] = struct{}{} } return nil } - mp[k] = make(map[*watching]struct{}) + mp[k] = make(map[*watcher]struct{}) mp[k][wa] = struct{}{} return nil } -// newWatchingToEventMap creates a map that has watching as key and events as -// value. It enables quick events look up by watching. -func newWatchingToEventMap(sm map[string]map[*watching]struct{}, evs []storagepb.Event) map[*watching][]storagepb.Event { - watchingToEvents := make(map[*watching][]storagepb.Event) +// newWatcherToEventMap creates a map that has watcher as key and events as +// value. It enables quick events look up by watcher. +func newWatcherToEventMap(sm map[string]map[*watcher]struct{}, evs []storagepb.Event) map[*watcher][]storagepb.Event { + watcherToEvents := make(map[*watcher][]storagepb.Event) for _, ev := range evs { key := string(ev.Kv.Key) - // check all prefixes of the key to notify all corresponded watchings + // check all prefixes of the key to notify all corresponded watchers for i := 0; i <= len(key); i++ { k := string(key[:i]) @@ -469,20 +469,20 @@ func newWatchingToEventMap(sm map[string]map[*watching]struct{}, evs []storagepb } for w := range wm { - // the watching needs to be notified when either it watches prefix or + // the watcher needs to be notified when either it watches prefix or // the key is exactly matched. if !w.prefix && i != len(ev.Kv.Key) { continue } ev.WatchID = w.id - if _, ok := watchingToEvents[w]; !ok { - watchingToEvents[w] = []storagepb.Event{} + if _, ok := watcherToEvents[w]; !ok { + watcherToEvents[w] = []storagepb.Event{} } - watchingToEvents[w] = append(watchingToEvents[w], ev) + watcherToEvents[w] = append(watcherToEvents[w], ev) } } } - return watchingToEvents + return watcherToEvents } diff --git a/storage/watchable_store_bench_test.go b/storage/watchable_store_bench_test.go index e7b9be5d1..350b7063e 100644 --- a/storage/watchable_store_bench_test.go +++ b/storage/watchable_store_bench_test.go @@ -34,11 +34,11 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { // in unsynced for this benchmark. s := &watchableStore{ store: newDefaultStore(tmpPath), - unsynced: make(map[*watching]struct{}), + unsynced: make(map[*watcher]struct{}), // to make the test not crash from assigning to nil map. // 'synced' doesn't get populated in this test. - synced: make(map[string]map[*watching]struct{}), + synced: make(map[string]map[*watcher]struct{}), } defer func() { diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go index 81c8511e5..2895f637e 100644 --- a/storage/watchable_store_test.go +++ b/storage/watchable_store_test.go @@ -63,7 +63,7 @@ func TestNewWatcherCancel(t *testing.T) { } } -// TestCancelUnsynced tests if running CancelFunc removes watchings from unsynced. +// TestCancelUnsynced tests if running CancelFunc removes watchers from unsynced. func TestCancelUnsynced(t *testing.T) { // manually create watchableStore instead of newWatchableStore // because newWatchableStore automatically calls syncWatchers @@ -71,11 +71,11 @@ func TestCancelUnsynced(t *testing.T) { // in unsynced to test if syncWatchers works as expected. s := &watchableStore{ store: newDefaultStore(tmpPath), - unsynced: make(map[*watching]struct{}), + unsynced: make(map[*watcher]struct{}), // to make the test not crash from assigning to nil map. // 'synced' doesn't get populated in this test. - synced: make(map[string]map[*watching]struct{}), + synced: make(map[string]map[*watcher]struct{}), } defer func() { @@ -112,21 +112,20 @@ func TestCancelUnsynced(t *testing.T) { // After running CancelFunc // // unsynced should be empty - // because cancel removes watching from unsynced + // because cancel removes watcher from unsynced if len(s.unsynced) != 0 { t.Errorf("unsynced size = %d, want 0", len(s.unsynced)) } } -// TestSyncWatchings populates unsynced watching map and -// tests syncWatchings method to see if it correctly sends -// events to channel of unsynced watchings and moves these -// watchings to synced. -func TestSyncWatchings(t *testing.T) { +// TestSyncWatchers populates unsynced watcher map and tests syncWatchers +// method to see if it correctly sends events to channel of unsynced watchers +// and moves these watchers to synced. +func TestSyncWatchers(t *testing.T) { s := &watchableStore{ store: newDefaultStore(tmpPath), - unsynced: make(map[*watching]struct{}), - synced: make(map[string]map[*watching]struct{}), + unsynced: make(map[*watcher]struct{}), + synced: make(map[string]map[*watcher]struct{}), } defer func() { @@ -148,7 +147,7 @@ func TestSyncWatchings(t *testing.T) { w.Watch(testKey, true, 1) } - // Before running s.syncWatchings() + // Before running s.syncWatchers() // // synced should be empty // because we manually populate unsynced only @@ -161,27 +160,27 @@ func TestSyncWatchings(t *testing.T) { t.Errorf("unsynced size = %d, want %d", len(s.unsynced), watcherN) } - // this should move all unsynced watchings + // this should move all unsynced watchers // to synced ones - s.syncWatchings() + s.syncWatchers() - // After running s.syncWatchings() + // After running s.syncWatchers() // // synced should not be empty - // because syncWatchings populates synced + // because syncwatchers populates synced // in this test case if len(s.synced[string(testKey)]) == 0 { t.Errorf("synced[string(testKey)] size = 0, want %d", len(s.synced[string(testKey)])) } // unsynced should be empty - // because syncWatchings is expected to move - // all watchings from unsynced to synced + // because syncwatchers is expected to move + // all watchers from unsynced to synced // in this test case if len(s.unsynced) != 0 { t.Errorf("unsynced size = %d, want 0", len(s.unsynced)) } - // All of the watchings actually share one channel + // All of the watchers actually share one channel // so we only need to check one shared channel // (See watcher.go for more detail). if len(w.(*watchStream).ch) != watcherN { @@ -202,7 +201,7 @@ func TestSyncWatchings(t *testing.T) { } } -func TestUnsafeAddWatching(t *testing.T) { +func TestUnsafeAddWatcher(t *testing.T) { s := newWatchableStore(tmpPath) defer func() { s.store.Close() @@ -213,18 +212,18 @@ func TestUnsafeAddWatching(t *testing.T) { s.Put(testKey, testValue) size := 10 - ws := make([]*watching, size) + ws := make([]*watcher, size) for i := 0; i < size; i++ { - ws[i] = &watching{ + ws[i] = &watcher{ key: testKey, prefix: true, cur: 0, } } - // to test if unsafeAddWatching is correctly updating - // synced map when adding new watching. + // to test if unsafeAddWatcher is correctly updating + // synced map when adding new watcher. for i, wa := range ws { - if err := unsafeAddWatching(&s.synced, string(testKey), wa); err != nil { + if err := unsafeAddWatcher(&s.synced, string(testKey), wa); err != nil { t.Errorf("#%d: error = %v, want nil", i, err) } if v, ok := s.synced[string(testKey)]; !ok { @@ -240,11 +239,11 @@ func TestUnsafeAddWatching(t *testing.T) { } } -func TestNewMapWatchingToEventMap(t *testing.T) { +func TestNewMapwatcherToEventMap(t *testing.T) { k0, k1, k2 := []byte("foo0"), []byte("foo1"), []byte("foo2") v0, v1, v2 := []byte("bar0"), []byte("bar1"), []byte("bar2") - ws := []*watching{{key: k0}, {key: k1}, {key: k2}} + ws := []*watcher{{key: k0}, {key: k1}, {key: k2}} evs := []storagepb.Event{ { @@ -262,63 +261,63 @@ func TestNewMapWatchingToEventMap(t *testing.T) { } tests := []struct { - sync map[string]map[*watching]struct{} + sync map[string]map[*watcher]struct{} evs []storagepb.Event - wwe map[*watching][]storagepb.Event + wwe map[*watcher][]storagepb.Event }{ - // no watching in sync, some events should return empty wwe + // no watcher in sync, some events should return empty wwe { - map[string]map[*watching]struct{}{}, + map[string]map[*watcher]struct{}{}, evs, - map[*watching][]storagepb.Event{}, + map[*watcher][]storagepb.Event{}, }, - // one watching in sync, one event that does not match the key of that - // watching should return empty wwe + // one watcher in sync, one event that does not match the key of that + // watcher should return empty wwe { - map[string]map[*watching]struct{}{ + map[string]map[*watcher]struct{}{ string(k2): {ws[2]: struct{}{}}, }, evs[:1], - map[*watching][]storagepb.Event{}, + map[*watcher][]storagepb.Event{}, }, - // one watching in sync, one event that matches the key of that - // watching should return wwe with that matching watching + // one watcher in sync, one event that matches the key of that + // watcher should return wwe with that matching watcher { - map[string]map[*watching]struct{}{ + map[string]map[*watcher]struct{}{ string(k1): {ws[1]: struct{}{}}, }, evs[1:2], - map[*watching][]storagepb.Event{ + map[*watcher][]storagepb.Event{ ws[1]: evs[1:2], }, }, - // two watchings in sync that watches two different keys, one event - // that matches the key of only one of the watching should return wwe - // with the matching watching + // two watchers in sync that watches two different keys, one event + // that matches the key of only one of the watcher should return wwe + // with the matching watcher { - map[string]map[*watching]struct{}{ + map[string]map[*watcher]struct{}{ string(k0): {ws[0]: struct{}{}}, string(k2): {ws[2]: struct{}{}}, }, evs[2:], - map[*watching][]storagepb.Event{ + map[*watcher][]storagepb.Event{ ws[2]: evs[2:], }, }, - // two watchings in sync that watches the same key, two events that - // match the keys should return wwe with those two watchings + // two watchers in sync that watches the same key, two events that + // match the keys should return wwe with those two watchers { - map[string]map[*watching]struct{}{ + map[string]map[*watcher]struct{}{ string(k0): {ws[0]: struct{}{}}, string(k1): {ws[1]: struct{}{}}, }, evs[:2], - map[*watching][]storagepb.Event{ + map[*watcher][]storagepb.Event{ ws[0]: evs[:1], ws[1]: evs[1:2], }, @@ -326,7 +325,7 @@ func TestNewMapWatchingToEventMap(t *testing.T) { } for i, tt := range tests { - gwe := newWatchingToEventMap(tt.sync, tt.evs) + gwe := newWatcherToEventMap(tt.sync, tt.evs) if len(gwe) != len(tt.wwe) { t.Errorf("#%d: len(gwe) got = %d, want = %d", i, len(gwe), len(tt.wwe)) } diff --git a/storage/watcher.go b/storage/watcher.go index 09b9ec8cb..cc66c81ef 100644 --- a/storage/watcher.go +++ b/storage/watcher.go @@ -21,13 +21,15 @@ import ( ) type WatchStream interface { - // Watch watches the events happening or happened on the given key - // or key prefix from the given startRev. + // Watch creates a watcher. The watcher watches the events happening or + // happened on the given key or key prefix from the given startRev. + // // The whole event history can be watched unless compacted. // If `prefix` is true, watch observes all events whose key prefix could be the given `key`. // If `startRev` <=0, watch observes events after currentRev. - // The returned `id` is the ID of this watching. It appears as WatchID - // in events that are sent to this watching. + // + // The returned `id` is the ID of this watcher. It appears as WatchID + // in events that are sent to the created watcher through stream channel. Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc) // Chan returns a chan. All watched events will be sent to the returned chan. @@ -37,14 +39,15 @@ type WatchStream interface { Close() } -// watchStream contains a collection of watching that share +// watchStream contains a collection of watchers that share // one streaming chan to send out watched events and other control events. type watchStream struct { watchable watchable ch chan []storagepb.Event - mu sync.Mutex // guards fields below it - nextID int64 // nextID is the ID allocated for next new watching + mu sync.Mutex // guards fields below it + // nextID is the ID pre-allocated for next new watcher in this stream + nextID int64 closed bool cancels []CancelFunc } diff --git a/storage/watcher_test.go b/storage/watcher_test.go index 060a189f9..af92aa743 100644 --- a/storage/watcher_test.go +++ b/storage/watcher_test.go @@ -16,8 +16,8 @@ package storage import "testing" -// TestWatcherWatchID tests that each watcher provides unique watch ID, -// and the watched event attaches the correct watch ID. +// TestWatcherWatchID tests that each watcher provides unique watchID, +// and the watched event attaches the correct watchID. func TestWatcherWatchID(t *testing.T) { s := WatchableKV(newWatchableStore(tmpPath)) defer cleanup(s, tmpPath) @@ -47,7 +47,7 @@ func TestWatcherWatchID(t *testing.T) { s.Put([]byte("foo2"), []byte("bar")) - // unsynced watchings + // unsynced watchers for i := 10; i < 20; i++ { id, cancel := w.Watch([]byte("foo2"), false, 1) if _, ok := idm[id]; ok {