From f7444ff3002548e73e239744c9bbae2bb513401a Mon Sep 17 00:00:00 2001 From: Jonathan Boulle Date: Mon, 8 Sep 2014 16:56:10 -0700 Subject: [PATCH] store: convert Watch to interface --- etcdserver/etcdhttp/http.go | 4 +-- etcdserver/server.go | 2 +- store/store.go | 6 ++-- store/store_bench_test.go | 12 ++++---- store/store_test.go | 56 ++++++++++++++++++------------------- store/watcher.go | 27 ++++++++++++------ store/watcher_hub.go | 14 +++++----- store/watcher_test.go | 6 ++-- 8 files changed, 68 insertions(+), 59 deletions(-) diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index 94b455fec..b7c9e23fa 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -288,7 +288,7 @@ func encodeResponse(ctx context.Context, w http.ResponseWriter, resp etcdserver. return nil } -func waitForEvent(ctx context.Context, w http.ResponseWriter, wa *store.Watcher) (*store.Event, error) { +func waitForEvent(ctx context.Context, w http.ResponseWriter, wa store.Watcher) (*store.Event, error) { // TODO(bmizerany): support streaming? defer wa.Remove() var nch <-chan bool @@ -297,7 +297,7 @@ func waitForEvent(ctx context.Context, w http.ResponseWriter, wa *store.Watcher) } select { - case ev := <-wa.EventChan: + case ev := <-wa.EventChan(): return ev, nil case <-nch: elog.TODO() diff --git a/etcdserver/server.go b/etcdserver/server.go index 914a2c5c4..ccf2e6827 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -21,7 +21,7 @@ type SendFunc func(m []raftpb.Message) type Response struct { Event *store.Event - Watcher *store.Watcher + Watcher store.Watcher err error } diff --git a/store/store.go b/store/store.go index b40dd4fac..e6c874f43 100644 --- a/store/store.go +++ b/store/store.go @@ -51,7 +51,7 @@ type Store interface { Delete(nodePath string, recursive, dir bool) (*Event, error) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) - Watch(prefix string, recursive, stream bool, sinceIndex uint64) (*Watcher, error) + Watch(prefix string, recursive, stream bool, sinceIndex uint64) (Watcher, error) Save() ([]byte, error) Recovery(state []byte) error @@ -344,14 +344,14 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui return e, nil } -func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (*Watcher, error) { +func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (Watcher, error) { s.worldLock.RLock() defer s.worldLock.RUnlock() key = path.Clean(path.Join("/", key)) nextIndex := s.CurrentIndex + 1 - var w *Watcher + var w Watcher var err *etcdErr.Error if sinceIndex == 0 { diff --git a/store/store_bench_test.go b/store/store_bench_test.go index a637d1d0b..6bc80473b 100644 --- a/store/store_bench_test.go +++ b/store/store_bench_test.go @@ -113,7 +113,7 @@ func BenchmarkWatch(b *testing.B) { e := newEvent("set", kvs[i][0], uint64(i+1), uint64(i+1)) s.WatcherHub.notify(e) - <-w.EventChan + <-w.EventChan() s.CurrentIndex++ } @@ -135,7 +135,7 @@ func BenchmarkWatchWithSet(b *testing.B) { w, _ := s.Watch(kvs[i][0], false, false, 0) s.Set(kvs[i][0], false, "test", Permanent) - <-w.EventChan + <-w.EventChan() } } @@ -145,7 +145,7 @@ func BenchmarkWatchWithSetBatch(b *testing.B) { kvs, _ := generateNRandomKV(b.N, 128) b.StartTimer() - watchers := make([]*Watcher, b.N) + watchers := make([]Watcher, b.N) for i := 0; i < b.N; i++ { watchers[i], _ = s.Watch(kvs[i][0], false, false, 0) @@ -156,14 +156,14 @@ func BenchmarkWatchWithSetBatch(b *testing.B) { } for i := 0; i < b.N; i++ { - <-watchers[i].EventChan + <-watchers[i].EventChan() } } func BenchmarkWatchOneKey(b *testing.B) { s := newStore() - watchers := make([]*Watcher, b.N) + watchers := make([]Watcher, b.N) for i := 0; i < b.N; i++ { watchers[i], _ = s.Watch("/foo", false, false, 0) @@ -172,7 +172,7 @@ func BenchmarkWatchOneKey(b *testing.B) { s.Set("/foo", false, "", Permanent) for i := 0; i < b.N; i++ { - <-watchers[i].EventChan + <-watchers[i].EventChan() } } diff --git a/store/store_test.go b/store/store_test.go index dc3ed5cb1..aead54d47 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -535,7 +535,7 @@ func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) { func TestStoreWatchCreate(t *testing.T) { s := newStore() w, _ := s.Watch("/foo", false, false, 0) - c := w.EventChan + c := w.EventChan() s.Create("/foo", false, "bar", false, Permanent) e := nbselect(c) assert.Equal(t, e.Action, "create", "") @@ -549,7 +549,7 @@ func TestStoreWatchRecursiveCreate(t *testing.T) { s := newStore() w, _ := s.Watch("/foo", true, false, 0) s.Create("/foo/bar", false, "baz", false, Permanent) - e := nbselect(w.EventChan) + e := nbselect(w.EventChan()) assert.Equal(t, e.Action, "create", "") assert.Equal(t, e.Node.Key, "/foo/bar", "") } @@ -560,7 +560,7 @@ func TestStoreWatchUpdate(t *testing.T) { s.Create("/foo", false, "bar", false, Permanent) w, _ := s.Watch("/foo", false, false, 0) s.Update("/foo", "baz", Permanent) - e := nbselect(w.EventChan) + e := nbselect(w.EventChan()) assert.Equal(t, e.Action, "update", "") assert.Equal(t, e.Node.Key, "/foo", "") } @@ -571,7 +571,7 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) { s.Create("/foo/bar", false, "baz", false, Permanent) w, _ := s.Watch("/foo", true, false, 0) s.Update("/foo/bar", "baz", Permanent) - e := nbselect(w.EventChan) + e := nbselect(w.EventChan()) assert.Equal(t, e.Action, "update", "") assert.Equal(t, e.Node.Key, "/foo/bar", "") } @@ -582,7 +582,7 @@ func TestStoreWatchDelete(t *testing.T) { s.Create("/foo", false, "bar", false, Permanent) w, _ := s.Watch("/foo", false, false, 0) s.Delete("/foo", false, false) - e := nbselect(w.EventChan) + e := nbselect(w.EventChan()) assert.Equal(t, e.Action, "delete", "") assert.Equal(t, e.Node.Key, "/foo", "") } @@ -593,7 +593,7 @@ func TestStoreWatchRecursiveDelete(t *testing.T) { s.Create("/foo/bar", false, "baz", false, Permanent) w, _ := s.Watch("/foo", true, false, 0) s.Delete("/foo/bar", false, false) - e := nbselect(w.EventChan) + e := nbselect(w.EventChan()) assert.Equal(t, e.Action, "delete", "") assert.Equal(t, e.Node.Key, "/foo/bar", "") } @@ -604,7 +604,7 @@ func TestStoreWatchCompareAndSwap(t *testing.T) { s.Create("/foo", false, "bar", false, Permanent) w, _ := s.Watch("/foo", false, false, 0) s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent) - e := nbselect(w.EventChan) + e := nbselect(w.EventChan()) assert.Equal(t, e.Action, "compareAndSwap", "") assert.Equal(t, e.Node.Key, "/foo", "") } @@ -615,7 +615,7 @@ func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) { s.Create("/foo/bar", false, "baz", false, Permanent) w, _ := s.Watch("/foo", true, false, 0) s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent) - e := nbselect(w.EventChan) + e := nbselect(w.EventChan()) assert.Equal(t, e.Action, "compareAndSwap", "") assert.Equal(t, e.Node.Key, "/foo/bar", "") } @@ -634,7 +634,7 @@ func TestStoreWatchExpire(t *testing.T) { s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(500*time.Millisecond)) w, _ := s.Watch("/", true, false, 0) - c := w.EventChan + c := w.EventChan() e := nbselect(c) assert.Nil(t, e, "") time.Sleep(600 * time.Millisecond) @@ -642,7 +642,7 @@ func TestStoreWatchExpire(t *testing.T) { assert.Equal(t, e.Action, "expire", "") assert.Equal(t, e.Node.Key, "/foo", "") w, _ = s.Watch("/", true, false, 4) - e = nbselect(w.EventChan) + e = nbselect(w.EventChan()) assert.Equal(t, e.Action, "expire", "") assert.Equal(t, e.Node.Key, "/foofoo", "") } @@ -653,19 +653,19 @@ func TestStoreWatchStream(t *testing.T) { w, _ := s.Watch("/foo", false, true, 0) // first modification s.Create("/foo", false, "bar", false, Permanent) - e := nbselect(w.EventChan) + e := nbselect(w.EventChan()) assert.Equal(t, e.Action, "create", "") assert.Equal(t, e.Node.Key, "/foo", "") assert.Equal(t, *e.Node.Value, "bar", "") - e = nbselect(w.EventChan) + e = nbselect(w.EventChan()) assert.Nil(t, e, "") // second modification s.Update("/foo", "baz", Permanent) - e = nbselect(w.EventChan) + e = nbselect(w.EventChan()) assert.Equal(t, e.Action, "update", "") assert.Equal(t, e.Node.Key, "/foo", "") assert.Equal(t, *e.Node.Value, "baz", "") - e = nbselect(w.EventChan) + e = nbselect(w.EventChan()) assert.Nil(t, e, "") } @@ -732,10 +732,10 @@ func TestStoreWatchCreateWithHiddenKey(t *testing.T) { s := newStore() w, _ := s.Watch("/_foo", false, false, 0) s.Create("/_foo", false, "bar", false, Permanent) - e := nbselect(w.EventChan) + e := nbselect(w.EventChan()) assert.Equal(t, e.Action, "create", "") assert.Equal(t, e.Node.Key, "/_foo", "") - e = nbselect(w.EventChan) + e = nbselect(w.EventChan()) assert.Nil(t, e, "") } @@ -744,14 +744,14 @@ func TestStoreWatchRecursiveCreateWithHiddenKey(t *testing.T) { s := newStore() w, _ := s.Watch("/foo", true, false, 0) s.Create("/foo/_bar", false, "baz", false, Permanent) - e := nbselect(w.EventChan) + e := nbselect(w.EventChan()) assert.Nil(t, e, "") w, _ = s.Watch("/foo", true, false, 0) s.Create("/foo/_baz", true, "", false, Permanent) - e = nbselect(w.EventChan) + e = nbselect(w.EventChan()) assert.Nil(t, e, "") s.Create("/foo/_baz/quux", false, "quux", false, Permanent) - e = nbselect(w.EventChan) + e = nbselect(w.EventChan()) assert.Nil(t, e, "") } @@ -761,10 +761,10 @@ func TestStoreWatchUpdateWithHiddenKey(t *testing.T) { s.Create("/_foo", false, "bar", false, Permanent) w, _ := s.Watch("/_foo", false, false, 0) s.Update("/_foo", "baz", Permanent) - e := nbselect(w.EventChan) + e := nbselect(w.EventChan()) assert.Equal(t, e.Action, "update", "") assert.Equal(t, e.Node.Key, "/_foo", "") - e = nbselect(w.EventChan) + e = nbselect(w.EventChan()) assert.Nil(t, e, "") } @@ -774,7 +774,7 @@ func TestStoreWatchRecursiveUpdateWithHiddenKey(t *testing.T) { s.Create("/foo/_bar", false, "baz", false, Permanent) w, _ := s.Watch("/foo", true, false, 0) s.Update("/foo/_bar", "baz", Permanent) - e := nbselect(w.EventChan) + e := nbselect(w.EventChan()) assert.Nil(t, e, "") } @@ -784,10 +784,10 @@ func TestStoreWatchDeleteWithHiddenKey(t *testing.T) { s.Create("/_foo", false, "bar", false, Permanent) w, _ := s.Watch("/_foo", false, false, 0) s.Delete("/_foo", false, false) - e := nbselect(w.EventChan) + e := nbselect(w.EventChan()) assert.Equal(t, e.Action, "delete", "") assert.Equal(t, e.Node.Key, "/_foo", "") - e = nbselect(w.EventChan) + e = nbselect(w.EventChan()) assert.Nil(t, e, "") } @@ -797,7 +797,7 @@ func TestStoreWatchRecursiveDeleteWithHiddenKey(t *testing.T) { s.Create("/foo/_bar", false, "baz", false, Permanent) w, _ := s.Watch("/foo", true, false, 0) s.Delete("/foo/_bar", false, false) - e := nbselect(w.EventChan) + e := nbselect(w.EventChan()) assert.Nil(t, e, "") } @@ -815,7 +815,7 @@ func TestStoreWatchExpireWithHiddenKey(t *testing.T) { s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(1000*time.Millisecond)) w, _ := s.Watch("/", true, false, 0) - c := w.EventChan + c := w.EventChan() e := nbselect(c) assert.Nil(t, e, "") time.Sleep(600 * time.Millisecond) @@ -833,7 +833,7 @@ func TestStoreWatchRecursiveCreateDeeperThanHiddenKey(t *testing.T) { w, _ := s.Watch("/_foo/bar", true, false, 0) s.Create("/_foo/bar/baz", false, "baz", false, Permanent) - e := nbselect(w.EventChan) + e := nbselect(w.EventChan()) assert.NotNil(t, e, "") assert.Equal(t, e.Action, "create", "") assert.Equal(t, e.Node.Key, "/_foo/bar/baz", "") @@ -841,7 +841,7 @@ func TestStoreWatchRecursiveCreateDeeperThanHiddenKey(t *testing.T) { // Ensure that slow consumers are handled properly. // -// Since Watcher.EventChan has a buffer of size 1 we can only queue 1 +// Since Watcher.EventChan() has a buffer of size 1 we can only queue 1 // event per watcher. If the consumer cannot consume the event on time and // another event arrives, the channel is closed and event is discarded. // This test ensures that after closing the channel, the store can continue diff --git a/store/watcher.go b/store/watcher.go index 3c201e0d0..d6b2e6313 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -16,8 +16,13 @@ limitations under the License. package store -type Watcher struct { - EventChan chan *Event +type Watcher interface { + EventChan() chan *Event + Remove() +} + +type watcher struct { + eventChan chan *Event stream bool recursive bool sinceIndex uint64 @@ -26,9 +31,13 @@ type Watcher struct { remove func() } +func (w *watcher) EventChan() chan *Event { + return w.eventChan +} + // notify function notifies the watcher. If the watcher interests in the given path, // the function will return true. -func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool { +func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool { // watcher is interested the path in three cases and under one condition // the condition is that the event happens after the watcher's sinceIndex @@ -45,15 +54,15 @@ func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool { // For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher // should get notified even if "/foo" is not the path it is watching. if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex { - // We cannot block here if the EventChan capacity is full, otherwise - // etcd will hang. EventChan capacity is full when the rate of + // We cannot block here if the eventChan capacity is full, otherwise + // etcd will hang. eventChan capacity is full when the rate of // notifications are higher than our send rate. // If this happens, we close the channel. select { - case w.EventChan <- e: + case w.eventChan <- e: default: // We have missed a notification. Remove the watcher. - // Removing the watcher also closes the EventChan. + // Removing the watcher also closes the eventChan. w.remove() } return true @@ -63,11 +72,11 @@ func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool { // Remove removes the watcher from watcherHub // The actual remove function is guaranteed to only be executed once -func (w *Watcher) Remove() { +func (w *watcher) Remove() { w.hub.mutex.Lock() defer w.hub.mutex.Unlock() - close(w.EventChan) + close(w.eventChan) if w.remove != nil { w.remove() } diff --git a/store/watcher_hub.go b/store/watcher_hub.go index 45b98cfd2..6db9ab661 100644 --- a/store/watcher_hub.go +++ b/store/watcher_hub.go @@ -34,19 +34,19 @@ func newWatchHub(capacity int) *watcherHub { } } -// Watch function returns a watcher. +// Watch function returns a Watcher. // If recursive is true, the first change after index under key will be sent to the event channel of the watcher. // If recursive is false, the first change after index at key will be sent to the event channel of the watcher. // If index is zero, watch will start from the current index + 1. -func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (*Watcher, *etcdErr.Error) { +func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (Watcher, *etcdErr.Error) { event, err := wh.EventHistory.scan(key, recursive, index) if err != nil { return nil, err } - w := &Watcher{ - EventChan: make(chan *Event, 1), // use a buffered channel + w := &watcher{ + eventChan: make(chan *Event, 1), // use a buffered channel recursive: recursive, stream: stream, sinceIndex: index, @@ -54,7 +54,7 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (* } if event != nil { - w.EventChan <- event + w.eventChan <- event return w, nil } @@ -75,7 +75,7 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (* } w.remove = func() { - if w.removed { // avoid remove it twice + if w.removed { // avoid removing it twice return } w.removed = true @@ -121,7 +121,7 @@ func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) { for curr != nil { next := curr.Next() // save reference to the next one in the list - w, _ := curr.Value.(*Watcher) + w, _ := curr.Value.(*watcher) originalPath := (e.Node.Key == nodePath) if (originalPath || !isHidden(nodePath, e.Node.Key)) && w.notify(e, originalPath, deleted) { diff --git a/store/watcher_test.go b/store/watcher_test.go index c2cd154ea..baea41953 100644 --- a/store/watcher_test.go +++ b/store/watcher_test.go @@ -27,7 +27,7 @@ func TestWatcher(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - c := w.EventChan + c := w.EventChan() select { case <-c: @@ -47,7 +47,7 @@ func TestWatcher(t *testing.T) { } w, _ = wh.watch("/foo", false, false, 2) - c = w.EventChan + c = w.EventChan() e = newEvent(Create, "/foo/bar", 2, 2) @@ -72,7 +72,7 @@ func TestWatcher(t *testing.T) { // ensure we are doing exact matching rather than prefix matching w, _ = wh.watch("/fo", true, false, 1) - c = w.EventChan + c = w.EventChan() select { case re = <-c: