storage: rename watching -> watcher
parent
34187a4fbe
commit
807db7e2aa
|
@ -67,20 +67,20 @@ var (
|
|||
Help: "Total number of watch streams.",
|
||||
})
|
||||
|
||||
watchingGauge = prometheus.NewGauge(
|
||||
watcherGauge = prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "storage",
|
||||
Name: "watching_total",
|
||||
Help: "Total number of watchings.",
|
||||
Name: "watcher_total",
|
||||
Help: "Total number of watchers.",
|
||||
})
|
||||
|
||||
slowWatchingGauge = prometheus.NewGauge(
|
||||
slowWatcherGauge = prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "storage",
|
||||
Name: "slow_watching_total",
|
||||
Help: "Total number of unsynced slow watchings.",
|
||||
Name: "slow_watcher_total",
|
||||
Help: "Total number of unsynced slow watchers.",
|
||||
})
|
||||
|
||||
totalEventsCounter = prometheus.NewCounter(
|
||||
|
@ -144,8 +144,8 @@ func init() {
|
|||
prometheus.MustRegister(txnCounter)
|
||||
prometheus.MustRegister(keysGauge)
|
||||
prometheus.MustRegister(watchStreamGauge)
|
||||
prometheus.MustRegister(watchingGauge)
|
||||
prometheus.MustRegister(slowWatchingGauge)
|
||||
prometheus.MustRegister(watcherGauge)
|
||||
prometheus.MustRegister(slowWatcherGauge)
|
||||
prometheus.MustRegister(totalEventsCounter)
|
||||
prometheus.MustRegister(pendingEventsGauge)
|
||||
prometheus.MustRegister(indexCompactionPauseDurations)
|
||||
|
|
|
@ -33,7 +33,7 @@ const (
|
|||
)
|
||||
|
||||
type watchable interface {
|
||||
watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watching, CancelFunc)
|
||||
watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watcher, CancelFunc)
|
||||
}
|
||||
|
||||
type watchableStore struct {
|
||||
|
@ -41,12 +41,12 @@ type watchableStore struct {
|
|||
|
||||
*store
|
||||
|
||||
// contains all unsynced watching that needs to sync events that have happened
|
||||
unsynced map[*watching]struct{}
|
||||
// contains all unsynced watchers that needs to sync with events that have happened
|
||||
unsynced map[*watcher]struct{}
|
||||
|
||||
// contains all synced watching that are tracking the events that will happen
|
||||
// The key of the map is the key that the watching is watching on.
|
||||
synced map[string]map[*watching]struct{}
|
||||
// contains all synced watchers that are in sync with the progress of the store.
|
||||
// The key of the map is the key that the watcher watches on.
|
||||
synced map[string]map[*watcher]struct{}
|
||||
tx *ongoingTx
|
||||
|
||||
stopc chan struct{}
|
||||
|
@ -56,12 +56,12 @@ type watchableStore struct {
|
|||
func newWatchableStore(path string) *watchableStore {
|
||||
s := &watchableStore{
|
||||
store: newDefaultStore(path),
|
||||
unsynced: make(map[*watching]struct{}),
|
||||
synced: make(map[string]map[*watching]struct{}),
|
||||
unsynced: make(map[*watcher]struct{}),
|
||||
synced: make(map[string]map[*watcher]struct{}),
|
||||
stopc: make(chan struct{}),
|
||||
}
|
||||
s.wg.Add(1)
|
||||
go s.syncWatchingsLoop()
|
||||
go s.syncWatchersLoop()
|
||||
return s
|
||||
}
|
||||
|
||||
|
@ -185,11 +185,11 @@ func (s *watchableStore) NewWatchStream() WatchStream {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watching, CancelFunc) {
|
||||
func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watcher, CancelFunc) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
wa := &watching{
|
||||
wa := &watcher{
|
||||
key: key,
|
||||
prefix: prefix,
|
||||
cur: startRev,
|
||||
|
@ -199,23 +199,23 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch c
|
|||
|
||||
k := string(key)
|
||||
if startRev == 0 {
|
||||
if err := unsafeAddWatching(&s.synced, k, wa); err != nil {
|
||||
log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
|
||||
if err := unsafeAddWatcher(&s.synced, k, wa); err != nil {
|
||||
log.Panicf("error unsafeAddWatcher (%v) for key %s", err, k)
|
||||
}
|
||||
} else {
|
||||
slowWatchingGauge.Inc()
|
||||
slowWatcherGauge.Inc()
|
||||
s.unsynced[wa] = struct{}{}
|
||||
}
|
||||
watchingGauge.Inc()
|
||||
watcherGauge.Inc()
|
||||
|
||||
cancel := CancelFunc(func() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
// remove global references of the watching
|
||||
// remove global references of the watcher
|
||||
if _, ok := s.unsynced[wa]; ok {
|
||||
delete(s.unsynced, wa)
|
||||
slowWatchingGauge.Dec()
|
||||
watchingGauge.Dec()
|
||||
slowWatcherGauge.Dec()
|
||||
watcherGauge.Dec()
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -227,7 +227,7 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch c
|
|||
if len(v) == 0 {
|
||||
delete(s.synced, k)
|
||||
}
|
||||
watchingGauge.Dec()
|
||||
watcherGauge.Dec()
|
||||
}
|
||||
}
|
||||
// If we cannot find it, it should have finished watch.
|
||||
|
@ -236,13 +236,13 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch c
|
|||
return wa, cancel
|
||||
}
|
||||
|
||||
// syncWatchingsLoop syncs the watching in the unsyncd map every 100ms.
|
||||
func (s *watchableStore) syncWatchingsLoop() {
|
||||
// syncWatchersLoop syncs the watcher in the unsyncd map every 100ms.
|
||||
func (s *watchableStore) syncWatchersLoop() {
|
||||
defer s.wg.Done()
|
||||
|
||||
for {
|
||||
s.mu.Lock()
|
||||
s.syncWatchings()
|
||||
s.syncWatchers()
|
||||
s.mu.Unlock()
|
||||
|
||||
select {
|
||||
|
@ -253,12 +253,12 @@ func (s *watchableStore) syncWatchingsLoop() {
|
|||
}
|
||||
}
|
||||
|
||||
// syncWatchings periodically syncs unsynced watchings by: Iterate all unsynced
|
||||
// watchings to get the minimum revision within its range, skipping the
|
||||
// watching if its current revision is behind the compact revision of the
|
||||
// syncWatchers periodically syncs unsynced watchers by: Iterate all unsynced
|
||||
// watchers to get the minimum revision within its range, skipping the
|
||||
// watcher if its current revision is behind the compact revision of the
|
||||
// store. And use this minimum revision to get all key-value pairs. Then send
|
||||
// those events to watchings.
|
||||
func (s *watchableStore) syncWatchings() {
|
||||
// those events to watchers.
|
||||
func (s *watchableStore) syncWatchers() {
|
||||
s.store.mu.Lock()
|
||||
defer s.store.mu.Unlock()
|
||||
|
||||
|
@ -266,7 +266,7 @@ func (s *watchableStore) syncWatchings() {
|
|||
return
|
||||
}
|
||||
|
||||
// in order to find key-value pairs from unsynced watchings, we need to
|
||||
// in order to find key-value pairs from unsynced watchers, we need to
|
||||
// find min revision index, and these revisions can be used to
|
||||
// query the backend store of key-value pairs
|
||||
minRev := int64(math.MaxInt64)
|
||||
|
@ -275,17 +275,17 @@ func (s *watchableStore) syncWatchings() {
|
|||
compactionRev := s.store.compactMainRev
|
||||
|
||||
// TODO: change unsynced struct type same to this
|
||||
keyToUnsynced := make(map[string]map[*watching]struct{})
|
||||
keyToUnsynced := make(map[string]map[*watcher]struct{})
|
||||
|
||||
for w := range s.unsynced {
|
||||
k := string(w.key)
|
||||
|
||||
if w.cur > curRev {
|
||||
panic("watching current revision should not exceed current revision")
|
||||
panic("watcher current revision should not exceed current revision")
|
||||
}
|
||||
|
||||
if w.cur < compactionRev {
|
||||
// TODO: return error compacted to that watching instead of
|
||||
// TODO: return error compacted to that watcher instead of
|
||||
// just removing it sliently from unsynced.
|
||||
delete(s.unsynced, w)
|
||||
continue
|
||||
|
@ -296,7 +296,7 @@ func (s *watchableStore) syncWatchings() {
|
|||
}
|
||||
|
||||
if _, ok := keyToUnsynced[k]; !ok {
|
||||
keyToUnsynced[k] = make(map[*watching]struct{})
|
||||
keyToUnsynced[k] = make(map[*watcher]struct{})
|
||||
}
|
||||
keyToUnsynced[k][w] = struct{}{}
|
||||
}
|
||||
|
@ -338,35 +338,35 @@ func (s *watchableStore) syncWatchings() {
|
|||
evs = append(evs, ev)
|
||||
}
|
||||
|
||||
for w, es := range newWatchingToEventMap(keyToUnsynced, evs) {
|
||||
for w, es := range newWatcherToEventMap(keyToUnsynced, evs) {
|
||||
select {
|
||||
case w.ch <- es:
|
||||
pendingEventsGauge.Add(float64(len(es)))
|
||||
default:
|
||||
// TODO: handle the full unsynced watchings.
|
||||
// continue to process other watchings for now, the full ones
|
||||
// TODO: handle the full unsynced watchers.
|
||||
// continue to process other watchers for now, the full ones
|
||||
// will be processed next time and hopefully it will not be full.
|
||||
continue
|
||||
}
|
||||
k := string(w.key)
|
||||
if err := unsafeAddWatching(&s.synced, k, w); err != nil {
|
||||
log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
|
||||
if err := unsafeAddWatcher(&s.synced, k, w); err != nil {
|
||||
log.Panicf("error unsafeAddWatcher (%v) for key %s", err, k)
|
||||
}
|
||||
delete(s.unsynced, w)
|
||||
}
|
||||
|
||||
slowWatchingGauge.Set(float64(len(s.unsynced)))
|
||||
slowWatcherGauge.Set(float64(len(s.unsynced)))
|
||||
}
|
||||
|
||||
// handle handles the change of the happening event on all watchings.
|
||||
// handle handles the change of the happening event on all watchers.
|
||||
func (s *watchableStore) handle(rev int64, evs []storagepb.Event) {
|
||||
s.notify(rev, evs)
|
||||
}
|
||||
|
||||
// notify notifies the fact that given event at the given rev just happened to
|
||||
// watchings that watch on the key of the event.
|
||||
// watchers that watch on the key of the event.
|
||||
func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
|
||||
we := newWatchingToEventMap(s.synced, evs)
|
||||
we := newWatcherToEventMap(s.synced, evs)
|
||||
for _, wm := range s.synced {
|
||||
for w := range wm {
|
||||
if _, ok := we[w]; !ok {
|
||||
|
@ -377,11 +377,11 @@ func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
|
|||
case w.ch <- es:
|
||||
pendingEventsGauge.Add(float64(len(es)))
|
||||
default:
|
||||
// move slow watching to unsynced
|
||||
// move slow watcher to unsynced
|
||||
w.cur = rev
|
||||
s.unsynced[w] = struct{}{}
|
||||
delete(wm, w)
|
||||
slowWatchingGauge.Inc()
|
||||
slowWatcherGauge.Inc()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -414,52 +414,52 @@ func (tx *ongoingTx) del(k string) {
|
|||
}
|
||||
}
|
||||
|
||||
type watching struct {
|
||||
// the watching key
|
||||
type watcher struct {
|
||||
// the watcher key
|
||||
key []byte
|
||||
// prefix indicates if watching is on a key or a prefix.
|
||||
// If prefix is true, the watching is on a prefix.
|
||||
// prefix indicates if watcher is on a key or a prefix.
|
||||
// If prefix is true, the watcher is on a prefix.
|
||||
prefix bool
|
||||
// cur is the current watching revision.
|
||||
// cur is the current watcher revision.
|
||||
// If cur is behind the current revision of the KV,
|
||||
// watching is unsynced and needs to catch up.
|
||||
// watcher is unsynced and needs to catch up.
|
||||
cur int64
|
||||
id int64
|
||||
|
||||
// a chan to send out the watched events.
|
||||
// The chan might be shared with other watchings.
|
||||
// The chan might be shared with other watchers.
|
||||
ch chan<- []storagepb.Event
|
||||
}
|
||||
|
||||
// unsafeAddWatching puts watching with key k into watchableStore's synced.
|
||||
// unsafeAddWatcher puts watcher with key k into watchableStore's synced.
|
||||
// Make sure to this is thread-safe using mutex before and after.
|
||||
func unsafeAddWatching(synced *map[string]map[*watching]struct{}, k string, wa *watching) error {
|
||||
func unsafeAddWatcher(synced *map[string]map[*watcher]struct{}, k string, wa *watcher) error {
|
||||
if wa == nil {
|
||||
return fmt.Errorf("nil watching received")
|
||||
return fmt.Errorf("nil watcher received")
|
||||
}
|
||||
mp := *synced
|
||||
if v, ok := mp[k]; ok {
|
||||
if _, ok := v[wa]; ok {
|
||||
return fmt.Errorf("put the same watch twice: %+v", wa)
|
||||
return fmt.Errorf("put the same watcher twice: %+v", wa)
|
||||
} else {
|
||||
v[wa] = struct{}{}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
mp[k] = make(map[*watching]struct{})
|
||||
mp[k] = make(map[*watcher]struct{})
|
||||
mp[k][wa] = struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
||||
// newWatchingToEventMap creates a map that has watching as key and events as
|
||||
// value. It enables quick events look up by watching.
|
||||
func newWatchingToEventMap(sm map[string]map[*watching]struct{}, evs []storagepb.Event) map[*watching][]storagepb.Event {
|
||||
watchingToEvents := make(map[*watching][]storagepb.Event)
|
||||
// newWatcherToEventMap creates a map that has watcher as key and events as
|
||||
// value. It enables quick events look up by watcher.
|
||||
func newWatcherToEventMap(sm map[string]map[*watcher]struct{}, evs []storagepb.Event) map[*watcher][]storagepb.Event {
|
||||
watcherToEvents := make(map[*watcher][]storagepb.Event)
|
||||
for _, ev := range evs {
|
||||
key := string(ev.Kv.Key)
|
||||
|
||||
// check all prefixes of the key to notify all corresponded watchings
|
||||
// check all prefixes of the key to notify all corresponded watchers
|
||||
for i := 0; i <= len(key); i++ {
|
||||
k := string(key[:i])
|
||||
|
||||
|
@ -469,20 +469,20 @@ func newWatchingToEventMap(sm map[string]map[*watching]struct{}, evs []storagepb
|
|||
}
|
||||
|
||||
for w := range wm {
|
||||
// the watching needs to be notified when either it watches prefix or
|
||||
// the watcher needs to be notified when either it watches prefix or
|
||||
// the key is exactly matched.
|
||||
if !w.prefix && i != len(ev.Kv.Key) {
|
||||
continue
|
||||
}
|
||||
ev.WatchID = w.id
|
||||
|
||||
if _, ok := watchingToEvents[w]; !ok {
|
||||
watchingToEvents[w] = []storagepb.Event{}
|
||||
if _, ok := watcherToEvents[w]; !ok {
|
||||
watcherToEvents[w] = []storagepb.Event{}
|
||||
}
|
||||
watchingToEvents[w] = append(watchingToEvents[w], ev)
|
||||
watcherToEvents[w] = append(watcherToEvents[w], ev)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return watchingToEvents
|
||||
return watcherToEvents
|
||||
}
|
||||
|
|
|
@ -34,11 +34,11 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
|
|||
// in unsynced for this benchmark.
|
||||
s := &watchableStore{
|
||||
store: newDefaultStore(tmpPath),
|
||||
unsynced: make(map[*watching]struct{}),
|
||||
unsynced: make(map[*watcher]struct{}),
|
||||
|
||||
// to make the test not crash from assigning to nil map.
|
||||
// 'synced' doesn't get populated in this test.
|
||||
synced: make(map[string]map[*watching]struct{}),
|
||||
synced: make(map[string]map[*watcher]struct{}),
|
||||
}
|
||||
|
||||
defer func() {
|
||||
|
|
|
@ -63,7 +63,7 @@ func TestNewWatcherCancel(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestCancelUnsynced tests if running CancelFunc removes watchings from unsynced.
|
||||
// TestCancelUnsynced tests if running CancelFunc removes watchers from unsynced.
|
||||
func TestCancelUnsynced(t *testing.T) {
|
||||
// manually create watchableStore instead of newWatchableStore
|
||||
// because newWatchableStore automatically calls syncWatchers
|
||||
|
@ -71,11 +71,11 @@ func TestCancelUnsynced(t *testing.T) {
|
|||
// in unsynced to test if syncWatchers works as expected.
|
||||
s := &watchableStore{
|
||||
store: newDefaultStore(tmpPath),
|
||||
unsynced: make(map[*watching]struct{}),
|
||||
unsynced: make(map[*watcher]struct{}),
|
||||
|
||||
// to make the test not crash from assigning to nil map.
|
||||
// 'synced' doesn't get populated in this test.
|
||||
synced: make(map[string]map[*watching]struct{}),
|
||||
synced: make(map[string]map[*watcher]struct{}),
|
||||
}
|
||||
|
||||
defer func() {
|
||||
|
@ -112,21 +112,20 @@ func TestCancelUnsynced(t *testing.T) {
|
|||
// After running CancelFunc
|
||||
//
|
||||
// unsynced should be empty
|
||||
// because cancel removes watching from unsynced
|
||||
// because cancel removes watcher from unsynced
|
||||
if len(s.unsynced) != 0 {
|
||||
t.Errorf("unsynced size = %d, want 0", len(s.unsynced))
|
||||
}
|
||||
}
|
||||
|
||||
// TestSyncWatchings populates unsynced watching map and
|
||||
// tests syncWatchings method to see if it correctly sends
|
||||
// events to channel of unsynced watchings and moves these
|
||||
// watchings to synced.
|
||||
func TestSyncWatchings(t *testing.T) {
|
||||
// TestSyncWatchers populates unsynced watcher map and tests syncWatchers
|
||||
// method to see if it correctly sends events to channel of unsynced watchers
|
||||
// and moves these watchers to synced.
|
||||
func TestSyncWatchers(t *testing.T) {
|
||||
s := &watchableStore{
|
||||
store: newDefaultStore(tmpPath),
|
||||
unsynced: make(map[*watching]struct{}),
|
||||
synced: make(map[string]map[*watching]struct{}),
|
||||
unsynced: make(map[*watcher]struct{}),
|
||||
synced: make(map[string]map[*watcher]struct{}),
|
||||
}
|
||||
|
||||
defer func() {
|
||||
|
@ -148,7 +147,7 @@ func TestSyncWatchings(t *testing.T) {
|
|||
w.Watch(testKey, true, 1)
|
||||
}
|
||||
|
||||
// Before running s.syncWatchings()
|
||||
// Before running s.syncWatchers()
|
||||
//
|
||||
// synced should be empty
|
||||
// because we manually populate unsynced only
|
||||
|
@ -161,27 +160,27 @@ func TestSyncWatchings(t *testing.T) {
|
|||
t.Errorf("unsynced size = %d, want %d", len(s.unsynced), watcherN)
|
||||
}
|
||||
|
||||
// this should move all unsynced watchings
|
||||
// this should move all unsynced watchers
|
||||
// to synced ones
|
||||
s.syncWatchings()
|
||||
s.syncWatchers()
|
||||
|
||||
// After running s.syncWatchings()
|
||||
// After running s.syncWatchers()
|
||||
//
|
||||
// synced should not be empty
|
||||
// because syncWatchings populates synced
|
||||
// because syncwatchers populates synced
|
||||
// in this test case
|
||||
if len(s.synced[string(testKey)]) == 0 {
|
||||
t.Errorf("synced[string(testKey)] size = 0, want %d", len(s.synced[string(testKey)]))
|
||||
}
|
||||
// unsynced should be empty
|
||||
// because syncWatchings is expected to move
|
||||
// all watchings from unsynced to synced
|
||||
// because syncwatchers is expected to move
|
||||
// all watchers from unsynced to synced
|
||||
// in this test case
|
||||
if len(s.unsynced) != 0 {
|
||||
t.Errorf("unsynced size = %d, want 0", len(s.unsynced))
|
||||
}
|
||||
|
||||
// All of the watchings actually share one channel
|
||||
// All of the watchers actually share one channel
|
||||
// so we only need to check one shared channel
|
||||
// (See watcher.go for more detail).
|
||||
if len(w.(*watchStream).ch) != watcherN {
|
||||
|
@ -202,7 +201,7 @@ func TestSyncWatchings(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestUnsafeAddWatching(t *testing.T) {
|
||||
func TestUnsafeAddWatcher(t *testing.T) {
|
||||
s := newWatchableStore(tmpPath)
|
||||
defer func() {
|
||||
s.store.Close()
|
||||
|
@ -213,18 +212,18 @@ func TestUnsafeAddWatching(t *testing.T) {
|
|||
s.Put(testKey, testValue)
|
||||
|
||||
size := 10
|
||||
ws := make([]*watching, size)
|
||||
ws := make([]*watcher, size)
|
||||
for i := 0; i < size; i++ {
|
||||
ws[i] = &watching{
|
||||
ws[i] = &watcher{
|
||||
key: testKey,
|
||||
prefix: true,
|
||||
cur: 0,
|
||||
}
|
||||
}
|
||||
// to test if unsafeAddWatching is correctly updating
|
||||
// synced map when adding new watching.
|
||||
// to test if unsafeAddWatcher is correctly updating
|
||||
// synced map when adding new watcher.
|
||||
for i, wa := range ws {
|
||||
if err := unsafeAddWatching(&s.synced, string(testKey), wa); err != nil {
|
||||
if err := unsafeAddWatcher(&s.synced, string(testKey), wa); err != nil {
|
||||
t.Errorf("#%d: error = %v, want nil", i, err)
|
||||
}
|
||||
if v, ok := s.synced[string(testKey)]; !ok {
|
||||
|
@ -240,11 +239,11 @@ func TestUnsafeAddWatching(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestNewMapWatchingToEventMap(t *testing.T) {
|
||||
func TestNewMapwatcherToEventMap(t *testing.T) {
|
||||
k0, k1, k2 := []byte("foo0"), []byte("foo1"), []byte("foo2")
|
||||
v0, v1, v2 := []byte("bar0"), []byte("bar1"), []byte("bar2")
|
||||
|
||||
ws := []*watching{{key: k0}, {key: k1}, {key: k2}}
|
||||
ws := []*watcher{{key: k0}, {key: k1}, {key: k2}}
|
||||
|
||||
evs := []storagepb.Event{
|
||||
{
|
||||
|
@ -262,63 +261,63 @@ func TestNewMapWatchingToEventMap(t *testing.T) {
|
|||
}
|
||||
|
||||
tests := []struct {
|
||||
sync map[string]map[*watching]struct{}
|
||||
sync map[string]map[*watcher]struct{}
|
||||
evs []storagepb.Event
|
||||
|
||||
wwe map[*watching][]storagepb.Event
|
||||
wwe map[*watcher][]storagepb.Event
|
||||
}{
|
||||
// no watching in sync, some events should return empty wwe
|
||||
// no watcher in sync, some events should return empty wwe
|
||||
{
|
||||
map[string]map[*watching]struct{}{},
|
||||
map[string]map[*watcher]struct{}{},
|
||||
evs,
|
||||
map[*watching][]storagepb.Event{},
|
||||
map[*watcher][]storagepb.Event{},
|
||||
},
|
||||
|
||||
// one watching in sync, one event that does not match the key of that
|
||||
// watching should return empty wwe
|
||||
// one watcher in sync, one event that does not match the key of that
|
||||
// watcher should return empty wwe
|
||||
{
|
||||
map[string]map[*watching]struct{}{
|
||||
map[string]map[*watcher]struct{}{
|
||||
string(k2): {ws[2]: struct{}{}},
|
||||
},
|
||||
evs[:1],
|
||||
map[*watching][]storagepb.Event{},
|
||||
map[*watcher][]storagepb.Event{},
|
||||
},
|
||||
|
||||
// one watching in sync, one event that matches the key of that
|
||||
// watching should return wwe with that matching watching
|
||||
// one watcher in sync, one event that matches the key of that
|
||||
// watcher should return wwe with that matching watcher
|
||||
{
|
||||
map[string]map[*watching]struct{}{
|
||||
map[string]map[*watcher]struct{}{
|
||||
string(k1): {ws[1]: struct{}{}},
|
||||
},
|
||||
evs[1:2],
|
||||
map[*watching][]storagepb.Event{
|
||||
map[*watcher][]storagepb.Event{
|
||||
ws[1]: evs[1:2],
|
||||
},
|
||||
},
|
||||
|
||||
// two watchings in sync that watches two different keys, one event
|
||||
// that matches the key of only one of the watching should return wwe
|
||||
// with the matching watching
|
||||
// two watchers in sync that watches two different keys, one event
|
||||
// that matches the key of only one of the watcher should return wwe
|
||||
// with the matching watcher
|
||||
{
|
||||
map[string]map[*watching]struct{}{
|
||||
map[string]map[*watcher]struct{}{
|
||||
string(k0): {ws[0]: struct{}{}},
|
||||
string(k2): {ws[2]: struct{}{}},
|
||||
},
|
||||
evs[2:],
|
||||
map[*watching][]storagepb.Event{
|
||||
map[*watcher][]storagepb.Event{
|
||||
ws[2]: evs[2:],
|
||||
},
|
||||
},
|
||||
|
||||
// two watchings in sync that watches the same key, two events that
|
||||
// match the keys should return wwe with those two watchings
|
||||
// two watchers in sync that watches the same key, two events that
|
||||
// match the keys should return wwe with those two watchers
|
||||
{
|
||||
map[string]map[*watching]struct{}{
|
||||
map[string]map[*watcher]struct{}{
|
||||
string(k0): {ws[0]: struct{}{}},
|
||||
string(k1): {ws[1]: struct{}{}},
|
||||
},
|
||||
evs[:2],
|
||||
map[*watching][]storagepb.Event{
|
||||
map[*watcher][]storagepb.Event{
|
||||
ws[0]: evs[:1],
|
||||
ws[1]: evs[1:2],
|
||||
},
|
||||
|
@ -326,7 +325,7 @@ func TestNewMapWatchingToEventMap(t *testing.T) {
|
|||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
gwe := newWatchingToEventMap(tt.sync, tt.evs)
|
||||
gwe := newWatcherToEventMap(tt.sync, tt.evs)
|
||||
if len(gwe) != len(tt.wwe) {
|
||||
t.Errorf("#%d: len(gwe) got = %d, want = %d", i, len(gwe), len(tt.wwe))
|
||||
}
|
||||
|
|
|
@ -21,13 +21,15 @@ import (
|
|||
)
|
||||
|
||||
type WatchStream interface {
|
||||
// Watch watches the events happening or happened on the given key
|
||||
// or key prefix from the given startRev.
|
||||
// Watch creates a watcher. The watcher watches the events happening or
|
||||
// happened on the given key or key prefix from the given startRev.
|
||||
//
|
||||
// The whole event history can be watched unless compacted.
|
||||
// If `prefix` is true, watch observes all events whose key prefix could be the given `key`.
|
||||
// If `startRev` <=0, watch observes events after currentRev.
|
||||
// The returned `id` is the ID of this watching. It appears as WatchID
|
||||
// in events that are sent to this watching.
|
||||
//
|
||||
// The returned `id` is the ID of this watcher. It appears as WatchID
|
||||
// in events that are sent to the created watcher through stream channel.
|
||||
Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc)
|
||||
|
||||
// Chan returns a chan. All watched events will be sent to the returned chan.
|
||||
|
@ -37,14 +39,15 @@ type WatchStream interface {
|
|||
Close()
|
||||
}
|
||||
|
||||
// watchStream contains a collection of watching that share
|
||||
// watchStream contains a collection of watchers that share
|
||||
// one streaming chan to send out watched events and other control events.
|
||||
type watchStream struct {
|
||||
watchable watchable
|
||||
ch chan []storagepb.Event
|
||||
|
||||
mu sync.Mutex // guards fields below it
|
||||
nextID int64 // nextID is the ID allocated for next new watching
|
||||
mu sync.Mutex // guards fields below it
|
||||
// nextID is the ID pre-allocated for next new watcher in this stream
|
||||
nextID int64
|
||||
closed bool
|
||||
cancels []CancelFunc
|
||||
}
|
||||
|
|
|
@ -16,8 +16,8 @@ package storage
|
|||
|
||||
import "testing"
|
||||
|
||||
// TestWatcherWatchID tests that each watcher provides unique watch ID,
|
||||
// and the watched event attaches the correct watch ID.
|
||||
// TestWatcherWatchID tests that each watcher provides unique watchID,
|
||||
// and the watched event attaches the correct watchID.
|
||||
func TestWatcherWatchID(t *testing.T) {
|
||||
s := WatchableKV(newWatchableStore(tmpPath))
|
||||
defer cleanup(s, tmpPath)
|
||||
|
@ -47,7 +47,7 @@ func TestWatcherWatchID(t *testing.T) {
|
|||
|
||||
s.Put([]byte("foo2"), []byte("bar"))
|
||||
|
||||
// unsynced watchings
|
||||
// unsynced watchers
|
||||
for i := 10; i < 20; i++ {
|
||||
id, cancel := w.Watch([]byte("foo2"), false, 1)
|
||||
if _, ok := idm[id]; ok {
|
||||
|
|
Loading…
Reference in New Issue