store: convert Watch to interface
parent
7c8b1a553f
commit
f7444ff300
|
@ -288,7 +288,7 @@ func encodeResponse(ctx context.Context, w http.ResponseWriter, resp etcdserver.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitForEvent(ctx context.Context, w http.ResponseWriter, wa *store.Watcher) (*store.Event, error) {
|
func waitForEvent(ctx context.Context, w http.ResponseWriter, wa store.Watcher) (*store.Event, error) {
|
||||||
// TODO(bmizerany): support streaming?
|
// TODO(bmizerany): support streaming?
|
||||||
defer wa.Remove()
|
defer wa.Remove()
|
||||||
var nch <-chan bool
|
var nch <-chan bool
|
||||||
|
@ -297,7 +297,7 @@ func waitForEvent(ctx context.Context, w http.ResponseWriter, wa *store.Watcher)
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case ev := <-wa.EventChan:
|
case ev := <-wa.EventChan():
|
||||||
return ev, nil
|
return ev, nil
|
||||||
case <-nch:
|
case <-nch:
|
||||||
elog.TODO()
|
elog.TODO()
|
||||||
|
|
|
@ -21,7 +21,7 @@ type SendFunc func(m []raftpb.Message)
|
||||||
|
|
||||||
type Response struct {
|
type Response struct {
|
||||||
Event *store.Event
|
Event *store.Event
|
||||||
Watcher *store.Watcher
|
Watcher store.Watcher
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -51,7 +51,7 @@ type Store interface {
|
||||||
Delete(nodePath string, recursive, dir bool) (*Event, error)
|
Delete(nodePath string, recursive, dir bool) (*Event, error)
|
||||||
CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
|
CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
|
||||||
|
|
||||||
Watch(prefix string, recursive, stream bool, sinceIndex uint64) (*Watcher, error)
|
Watch(prefix string, recursive, stream bool, sinceIndex uint64) (Watcher, error)
|
||||||
|
|
||||||
Save() ([]byte, error)
|
Save() ([]byte, error)
|
||||||
Recovery(state []byte) error
|
Recovery(state []byte) error
|
||||||
|
@ -344,14 +344,14 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui
|
||||||
return e, nil
|
return e, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (*Watcher, error) {
|
func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (Watcher, error) {
|
||||||
s.worldLock.RLock()
|
s.worldLock.RLock()
|
||||||
defer s.worldLock.RUnlock()
|
defer s.worldLock.RUnlock()
|
||||||
|
|
||||||
key = path.Clean(path.Join("/", key))
|
key = path.Clean(path.Join("/", key))
|
||||||
nextIndex := s.CurrentIndex + 1
|
nextIndex := s.CurrentIndex + 1
|
||||||
|
|
||||||
var w *Watcher
|
var w Watcher
|
||||||
var err *etcdErr.Error
|
var err *etcdErr.Error
|
||||||
|
|
||||||
if sinceIndex == 0 {
|
if sinceIndex == 0 {
|
||||||
|
|
|
@ -113,7 +113,7 @@ func BenchmarkWatch(b *testing.B) {
|
||||||
|
|
||||||
e := newEvent("set", kvs[i][0], uint64(i+1), uint64(i+1))
|
e := newEvent("set", kvs[i][0], uint64(i+1), uint64(i+1))
|
||||||
s.WatcherHub.notify(e)
|
s.WatcherHub.notify(e)
|
||||||
<-w.EventChan
|
<-w.EventChan()
|
||||||
s.CurrentIndex++
|
s.CurrentIndex++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,7 +135,7 @@ func BenchmarkWatchWithSet(b *testing.B) {
|
||||||
w, _ := s.Watch(kvs[i][0], false, false, 0)
|
w, _ := s.Watch(kvs[i][0], false, false, 0)
|
||||||
|
|
||||||
s.Set(kvs[i][0], false, "test", Permanent)
|
s.Set(kvs[i][0], false, "test", Permanent)
|
||||||
<-w.EventChan
|
<-w.EventChan()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -145,7 +145,7 @@ func BenchmarkWatchWithSetBatch(b *testing.B) {
|
||||||
kvs, _ := generateNRandomKV(b.N, 128)
|
kvs, _ := generateNRandomKV(b.N, 128)
|
||||||
b.StartTimer()
|
b.StartTimer()
|
||||||
|
|
||||||
watchers := make([]*Watcher, b.N)
|
watchers := make([]Watcher, b.N)
|
||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
watchers[i], _ = s.Watch(kvs[i][0], false, false, 0)
|
watchers[i], _ = s.Watch(kvs[i][0], false, false, 0)
|
||||||
|
@ -156,14 +156,14 @@ func BenchmarkWatchWithSetBatch(b *testing.B) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
<-watchers[i].EventChan
|
<-watchers[i].EventChan()
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkWatchOneKey(b *testing.B) {
|
func BenchmarkWatchOneKey(b *testing.B) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
watchers := make([]*Watcher, b.N)
|
watchers := make([]Watcher, b.N)
|
||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
watchers[i], _ = s.Watch("/foo", false, false, 0)
|
watchers[i], _ = s.Watch("/foo", false, false, 0)
|
||||||
|
@ -172,7 +172,7 @@ func BenchmarkWatchOneKey(b *testing.B) {
|
||||||
s.Set("/foo", false, "", Permanent)
|
s.Set("/foo", false, "", Permanent)
|
||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
<-watchers[i].EventChan
|
<-watchers[i].EventChan()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -535,7 +535,7 @@ func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) {
|
||||||
func TestStoreWatchCreate(t *testing.T) {
|
func TestStoreWatchCreate(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
w, _ := s.Watch("/foo", false, false, 0)
|
w, _ := s.Watch("/foo", false, false, 0)
|
||||||
c := w.EventChan
|
c := w.EventChan()
|
||||||
s.Create("/foo", false, "bar", false, Permanent)
|
s.Create("/foo", false, "bar", false, Permanent)
|
||||||
e := nbselect(c)
|
e := nbselect(c)
|
||||||
assert.Equal(t, e.Action, "create", "")
|
assert.Equal(t, e.Action, "create", "")
|
||||||
|
@ -549,7 +549,7 @@ func TestStoreWatchRecursiveCreate(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
w, _ := s.Watch("/foo", true, false, 0)
|
w, _ := s.Watch("/foo", true, false, 0)
|
||||||
s.Create("/foo/bar", false, "baz", false, Permanent)
|
s.Create("/foo/bar", false, "baz", false, Permanent)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan())
|
||||||
assert.Equal(t, e.Action, "create", "")
|
assert.Equal(t, e.Action, "create", "")
|
||||||
assert.Equal(t, e.Node.Key, "/foo/bar", "")
|
assert.Equal(t, e.Node.Key, "/foo/bar", "")
|
||||||
}
|
}
|
||||||
|
@ -560,7 +560,7 @@ func TestStoreWatchUpdate(t *testing.T) {
|
||||||
s.Create("/foo", false, "bar", false, Permanent)
|
s.Create("/foo", false, "bar", false, Permanent)
|
||||||
w, _ := s.Watch("/foo", false, false, 0)
|
w, _ := s.Watch("/foo", false, false, 0)
|
||||||
s.Update("/foo", "baz", Permanent)
|
s.Update("/foo", "baz", Permanent)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan())
|
||||||
assert.Equal(t, e.Action, "update", "")
|
assert.Equal(t, e.Action, "update", "")
|
||||||
assert.Equal(t, e.Node.Key, "/foo", "")
|
assert.Equal(t, e.Node.Key, "/foo", "")
|
||||||
}
|
}
|
||||||
|
@ -571,7 +571,7 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) {
|
||||||
s.Create("/foo/bar", false, "baz", false, Permanent)
|
s.Create("/foo/bar", false, "baz", false, Permanent)
|
||||||
w, _ := s.Watch("/foo", true, false, 0)
|
w, _ := s.Watch("/foo", true, false, 0)
|
||||||
s.Update("/foo/bar", "baz", Permanent)
|
s.Update("/foo/bar", "baz", Permanent)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan())
|
||||||
assert.Equal(t, e.Action, "update", "")
|
assert.Equal(t, e.Action, "update", "")
|
||||||
assert.Equal(t, e.Node.Key, "/foo/bar", "")
|
assert.Equal(t, e.Node.Key, "/foo/bar", "")
|
||||||
}
|
}
|
||||||
|
@ -582,7 +582,7 @@ func TestStoreWatchDelete(t *testing.T) {
|
||||||
s.Create("/foo", false, "bar", false, Permanent)
|
s.Create("/foo", false, "bar", false, Permanent)
|
||||||
w, _ := s.Watch("/foo", false, false, 0)
|
w, _ := s.Watch("/foo", false, false, 0)
|
||||||
s.Delete("/foo", false, false)
|
s.Delete("/foo", false, false)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan())
|
||||||
assert.Equal(t, e.Action, "delete", "")
|
assert.Equal(t, e.Action, "delete", "")
|
||||||
assert.Equal(t, e.Node.Key, "/foo", "")
|
assert.Equal(t, e.Node.Key, "/foo", "")
|
||||||
}
|
}
|
||||||
|
@ -593,7 +593,7 @@ func TestStoreWatchRecursiveDelete(t *testing.T) {
|
||||||
s.Create("/foo/bar", false, "baz", false, Permanent)
|
s.Create("/foo/bar", false, "baz", false, Permanent)
|
||||||
w, _ := s.Watch("/foo", true, false, 0)
|
w, _ := s.Watch("/foo", true, false, 0)
|
||||||
s.Delete("/foo/bar", false, false)
|
s.Delete("/foo/bar", false, false)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan())
|
||||||
assert.Equal(t, e.Action, "delete", "")
|
assert.Equal(t, e.Action, "delete", "")
|
||||||
assert.Equal(t, e.Node.Key, "/foo/bar", "")
|
assert.Equal(t, e.Node.Key, "/foo/bar", "")
|
||||||
}
|
}
|
||||||
|
@ -604,7 +604,7 @@ func TestStoreWatchCompareAndSwap(t *testing.T) {
|
||||||
s.Create("/foo", false, "bar", false, Permanent)
|
s.Create("/foo", false, "bar", false, Permanent)
|
||||||
w, _ := s.Watch("/foo", false, false, 0)
|
w, _ := s.Watch("/foo", false, false, 0)
|
||||||
s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
|
s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan())
|
||||||
assert.Equal(t, e.Action, "compareAndSwap", "")
|
assert.Equal(t, e.Action, "compareAndSwap", "")
|
||||||
assert.Equal(t, e.Node.Key, "/foo", "")
|
assert.Equal(t, e.Node.Key, "/foo", "")
|
||||||
}
|
}
|
||||||
|
@ -615,7 +615,7 @@ func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
|
||||||
s.Create("/foo/bar", false, "baz", false, Permanent)
|
s.Create("/foo/bar", false, "baz", false, Permanent)
|
||||||
w, _ := s.Watch("/foo", true, false, 0)
|
w, _ := s.Watch("/foo", true, false, 0)
|
||||||
s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent)
|
s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan())
|
||||||
assert.Equal(t, e.Action, "compareAndSwap", "")
|
assert.Equal(t, e.Action, "compareAndSwap", "")
|
||||||
assert.Equal(t, e.Node.Key, "/foo/bar", "")
|
assert.Equal(t, e.Node.Key, "/foo/bar", "")
|
||||||
}
|
}
|
||||||
|
@ -634,7 +634,7 @@ func TestStoreWatchExpire(t *testing.T) {
|
||||||
s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(500*time.Millisecond))
|
s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(500*time.Millisecond))
|
||||||
|
|
||||||
w, _ := s.Watch("/", true, false, 0)
|
w, _ := s.Watch("/", true, false, 0)
|
||||||
c := w.EventChan
|
c := w.EventChan()
|
||||||
e := nbselect(c)
|
e := nbselect(c)
|
||||||
assert.Nil(t, e, "")
|
assert.Nil(t, e, "")
|
||||||
time.Sleep(600 * time.Millisecond)
|
time.Sleep(600 * time.Millisecond)
|
||||||
|
@ -642,7 +642,7 @@ func TestStoreWatchExpire(t *testing.T) {
|
||||||
assert.Equal(t, e.Action, "expire", "")
|
assert.Equal(t, e.Action, "expire", "")
|
||||||
assert.Equal(t, e.Node.Key, "/foo", "")
|
assert.Equal(t, e.Node.Key, "/foo", "")
|
||||||
w, _ = s.Watch("/", true, false, 4)
|
w, _ = s.Watch("/", true, false, 4)
|
||||||
e = nbselect(w.EventChan)
|
e = nbselect(w.EventChan())
|
||||||
assert.Equal(t, e.Action, "expire", "")
|
assert.Equal(t, e.Action, "expire", "")
|
||||||
assert.Equal(t, e.Node.Key, "/foofoo", "")
|
assert.Equal(t, e.Node.Key, "/foofoo", "")
|
||||||
}
|
}
|
||||||
|
@ -653,19 +653,19 @@ func TestStoreWatchStream(t *testing.T) {
|
||||||
w, _ := s.Watch("/foo", false, true, 0)
|
w, _ := s.Watch("/foo", false, true, 0)
|
||||||
// first modification
|
// first modification
|
||||||
s.Create("/foo", false, "bar", false, Permanent)
|
s.Create("/foo", false, "bar", false, Permanent)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan())
|
||||||
assert.Equal(t, e.Action, "create", "")
|
assert.Equal(t, e.Action, "create", "")
|
||||||
assert.Equal(t, e.Node.Key, "/foo", "")
|
assert.Equal(t, e.Node.Key, "/foo", "")
|
||||||
assert.Equal(t, *e.Node.Value, "bar", "")
|
assert.Equal(t, *e.Node.Value, "bar", "")
|
||||||
e = nbselect(w.EventChan)
|
e = nbselect(w.EventChan())
|
||||||
assert.Nil(t, e, "")
|
assert.Nil(t, e, "")
|
||||||
// second modification
|
// second modification
|
||||||
s.Update("/foo", "baz", Permanent)
|
s.Update("/foo", "baz", Permanent)
|
||||||
e = nbselect(w.EventChan)
|
e = nbselect(w.EventChan())
|
||||||
assert.Equal(t, e.Action, "update", "")
|
assert.Equal(t, e.Action, "update", "")
|
||||||
assert.Equal(t, e.Node.Key, "/foo", "")
|
assert.Equal(t, e.Node.Key, "/foo", "")
|
||||||
assert.Equal(t, *e.Node.Value, "baz", "")
|
assert.Equal(t, *e.Node.Value, "baz", "")
|
||||||
e = nbselect(w.EventChan)
|
e = nbselect(w.EventChan())
|
||||||
assert.Nil(t, e, "")
|
assert.Nil(t, e, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -732,10 +732,10 @@ func TestStoreWatchCreateWithHiddenKey(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
w, _ := s.Watch("/_foo", false, false, 0)
|
w, _ := s.Watch("/_foo", false, false, 0)
|
||||||
s.Create("/_foo", false, "bar", false, Permanent)
|
s.Create("/_foo", false, "bar", false, Permanent)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan())
|
||||||
assert.Equal(t, e.Action, "create", "")
|
assert.Equal(t, e.Action, "create", "")
|
||||||
assert.Equal(t, e.Node.Key, "/_foo", "")
|
assert.Equal(t, e.Node.Key, "/_foo", "")
|
||||||
e = nbselect(w.EventChan)
|
e = nbselect(w.EventChan())
|
||||||
assert.Nil(t, e, "")
|
assert.Nil(t, e, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -744,14 +744,14 @@ func TestStoreWatchRecursiveCreateWithHiddenKey(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
w, _ := s.Watch("/foo", true, false, 0)
|
w, _ := s.Watch("/foo", true, false, 0)
|
||||||
s.Create("/foo/_bar", false, "baz", false, Permanent)
|
s.Create("/foo/_bar", false, "baz", false, Permanent)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan())
|
||||||
assert.Nil(t, e, "")
|
assert.Nil(t, e, "")
|
||||||
w, _ = s.Watch("/foo", true, false, 0)
|
w, _ = s.Watch("/foo", true, false, 0)
|
||||||
s.Create("/foo/_baz", true, "", false, Permanent)
|
s.Create("/foo/_baz", true, "", false, Permanent)
|
||||||
e = nbselect(w.EventChan)
|
e = nbselect(w.EventChan())
|
||||||
assert.Nil(t, e, "")
|
assert.Nil(t, e, "")
|
||||||
s.Create("/foo/_baz/quux", false, "quux", false, Permanent)
|
s.Create("/foo/_baz/quux", false, "quux", false, Permanent)
|
||||||
e = nbselect(w.EventChan)
|
e = nbselect(w.EventChan())
|
||||||
assert.Nil(t, e, "")
|
assert.Nil(t, e, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -761,10 +761,10 @@ func TestStoreWatchUpdateWithHiddenKey(t *testing.T) {
|
||||||
s.Create("/_foo", false, "bar", false, Permanent)
|
s.Create("/_foo", false, "bar", false, Permanent)
|
||||||
w, _ := s.Watch("/_foo", false, false, 0)
|
w, _ := s.Watch("/_foo", false, false, 0)
|
||||||
s.Update("/_foo", "baz", Permanent)
|
s.Update("/_foo", "baz", Permanent)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan())
|
||||||
assert.Equal(t, e.Action, "update", "")
|
assert.Equal(t, e.Action, "update", "")
|
||||||
assert.Equal(t, e.Node.Key, "/_foo", "")
|
assert.Equal(t, e.Node.Key, "/_foo", "")
|
||||||
e = nbselect(w.EventChan)
|
e = nbselect(w.EventChan())
|
||||||
assert.Nil(t, e, "")
|
assert.Nil(t, e, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -774,7 +774,7 @@ func TestStoreWatchRecursiveUpdateWithHiddenKey(t *testing.T) {
|
||||||
s.Create("/foo/_bar", false, "baz", false, Permanent)
|
s.Create("/foo/_bar", false, "baz", false, Permanent)
|
||||||
w, _ := s.Watch("/foo", true, false, 0)
|
w, _ := s.Watch("/foo", true, false, 0)
|
||||||
s.Update("/foo/_bar", "baz", Permanent)
|
s.Update("/foo/_bar", "baz", Permanent)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan())
|
||||||
assert.Nil(t, e, "")
|
assert.Nil(t, e, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -784,10 +784,10 @@ func TestStoreWatchDeleteWithHiddenKey(t *testing.T) {
|
||||||
s.Create("/_foo", false, "bar", false, Permanent)
|
s.Create("/_foo", false, "bar", false, Permanent)
|
||||||
w, _ := s.Watch("/_foo", false, false, 0)
|
w, _ := s.Watch("/_foo", false, false, 0)
|
||||||
s.Delete("/_foo", false, false)
|
s.Delete("/_foo", false, false)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan())
|
||||||
assert.Equal(t, e.Action, "delete", "")
|
assert.Equal(t, e.Action, "delete", "")
|
||||||
assert.Equal(t, e.Node.Key, "/_foo", "")
|
assert.Equal(t, e.Node.Key, "/_foo", "")
|
||||||
e = nbselect(w.EventChan)
|
e = nbselect(w.EventChan())
|
||||||
assert.Nil(t, e, "")
|
assert.Nil(t, e, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -797,7 +797,7 @@ func TestStoreWatchRecursiveDeleteWithHiddenKey(t *testing.T) {
|
||||||
s.Create("/foo/_bar", false, "baz", false, Permanent)
|
s.Create("/foo/_bar", false, "baz", false, Permanent)
|
||||||
w, _ := s.Watch("/foo", true, false, 0)
|
w, _ := s.Watch("/foo", true, false, 0)
|
||||||
s.Delete("/foo/_bar", false, false)
|
s.Delete("/foo/_bar", false, false)
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan())
|
||||||
assert.Nil(t, e, "")
|
assert.Nil(t, e, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -815,7 +815,7 @@ func TestStoreWatchExpireWithHiddenKey(t *testing.T) {
|
||||||
s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(1000*time.Millisecond))
|
s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(1000*time.Millisecond))
|
||||||
|
|
||||||
w, _ := s.Watch("/", true, false, 0)
|
w, _ := s.Watch("/", true, false, 0)
|
||||||
c := w.EventChan
|
c := w.EventChan()
|
||||||
e := nbselect(c)
|
e := nbselect(c)
|
||||||
assert.Nil(t, e, "")
|
assert.Nil(t, e, "")
|
||||||
time.Sleep(600 * time.Millisecond)
|
time.Sleep(600 * time.Millisecond)
|
||||||
|
@ -833,7 +833,7 @@ func TestStoreWatchRecursiveCreateDeeperThanHiddenKey(t *testing.T) {
|
||||||
w, _ := s.Watch("/_foo/bar", true, false, 0)
|
w, _ := s.Watch("/_foo/bar", true, false, 0)
|
||||||
s.Create("/_foo/bar/baz", false, "baz", false, Permanent)
|
s.Create("/_foo/bar/baz", false, "baz", false, Permanent)
|
||||||
|
|
||||||
e := nbselect(w.EventChan)
|
e := nbselect(w.EventChan())
|
||||||
assert.NotNil(t, e, "")
|
assert.NotNil(t, e, "")
|
||||||
assert.Equal(t, e.Action, "create", "")
|
assert.Equal(t, e.Action, "create", "")
|
||||||
assert.Equal(t, e.Node.Key, "/_foo/bar/baz", "")
|
assert.Equal(t, e.Node.Key, "/_foo/bar/baz", "")
|
||||||
|
@ -841,7 +841,7 @@ func TestStoreWatchRecursiveCreateDeeperThanHiddenKey(t *testing.T) {
|
||||||
|
|
||||||
// Ensure that slow consumers are handled properly.
|
// Ensure that slow consumers are handled properly.
|
||||||
//
|
//
|
||||||
// Since Watcher.EventChan has a buffer of size 1 we can only queue 1
|
// Since Watcher.EventChan() has a buffer of size 1 we can only queue 1
|
||||||
// event per watcher. If the consumer cannot consume the event on time and
|
// event per watcher. If the consumer cannot consume the event on time and
|
||||||
// another event arrives, the channel is closed and event is discarded.
|
// another event arrives, the channel is closed and event is discarded.
|
||||||
// This test ensures that after closing the channel, the store can continue
|
// This test ensures that after closing the channel, the store can continue
|
||||||
|
|
|
@ -16,8 +16,13 @@ limitations under the License.
|
||||||
|
|
||||||
package store
|
package store
|
||||||
|
|
||||||
type Watcher struct {
|
type Watcher interface {
|
||||||
EventChan chan *Event
|
EventChan() chan *Event
|
||||||
|
Remove()
|
||||||
|
}
|
||||||
|
|
||||||
|
type watcher struct {
|
||||||
|
eventChan chan *Event
|
||||||
stream bool
|
stream bool
|
||||||
recursive bool
|
recursive bool
|
||||||
sinceIndex uint64
|
sinceIndex uint64
|
||||||
|
@ -26,9 +31,13 @@ type Watcher struct {
|
||||||
remove func()
|
remove func()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *watcher) EventChan() chan *Event {
|
||||||
|
return w.eventChan
|
||||||
|
}
|
||||||
|
|
||||||
// notify function notifies the watcher. If the watcher interests in the given path,
|
// notify function notifies the watcher. If the watcher interests in the given path,
|
||||||
// the function will return true.
|
// the function will return true.
|
||||||
func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool {
|
func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool {
|
||||||
// watcher is interested the path in three cases and under one condition
|
// watcher is interested the path in three cases and under one condition
|
||||||
// the condition is that the event happens after the watcher's sinceIndex
|
// the condition is that the event happens after the watcher's sinceIndex
|
||||||
|
|
||||||
|
@ -45,15 +54,15 @@ func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool {
|
||||||
// For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher
|
// For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher
|
||||||
// should get notified even if "/foo" is not the path it is watching.
|
// should get notified even if "/foo" is not the path it is watching.
|
||||||
if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex {
|
if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex {
|
||||||
// We cannot block here if the EventChan capacity is full, otherwise
|
// We cannot block here if the eventChan capacity is full, otherwise
|
||||||
// etcd will hang. EventChan capacity is full when the rate of
|
// etcd will hang. eventChan capacity is full when the rate of
|
||||||
// notifications are higher than our send rate.
|
// notifications are higher than our send rate.
|
||||||
// If this happens, we close the channel.
|
// If this happens, we close the channel.
|
||||||
select {
|
select {
|
||||||
case w.EventChan <- e:
|
case w.eventChan <- e:
|
||||||
default:
|
default:
|
||||||
// We have missed a notification. Remove the watcher.
|
// We have missed a notification. Remove the watcher.
|
||||||
// Removing the watcher also closes the EventChan.
|
// Removing the watcher also closes the eventChan.
|
||||||
w.remove()
|
w.remove()
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
|
@ -63,11 +72,11 @@ func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool {
|
||||||
|
|
||||||
// Remove removes the watcher from watcherHub
|
// Remove removes the watcher from watcherHub
|
||||||
// The actual remove function is guaranteed to only be executed once
|
// The actual remove function is guaranteed to only be executed once
|
||||||
func (w *Watcher) Remove() {
|
func (w *watcher) Remove() {
|
||||||
w.hub.mutex.Lock()
|
w.hub.mutex.Lock()
|
||||||
defer w.hub.mutex.Unlock()
|
defer w.hub.mutex.Unlock()
|
||||||
|
|
||||||
close(w.EventChan)
|
close(w.eventChan)
|
||||||
if w.remove != nil {
|
if w.remove != nil {
|
||||||
w.remove()
|
w.remove()
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,19 +34,19 @@ func newWatchHub(capacity int) *watcherHub {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Watch function returns a watcher.
|
// Watch function returns a Watcher.
|
||||||
// If recursive is true, the first change after index under key will be sent to the event channel of the watcher.
|
// If recursive is true, the first change after index under key will be sent to the event channel of the watcher.
|
||||||
// If recursive is false, the first change after index at key will be sent to the event channel of the watcher.
|
// If recursive is false, the first change after index at key will be sent to the event channel of the watcher.
|
||||||
// If index is zero, watch will start from the current index + 1.
|
// If index is zero, watch will start from the current index + 1.
|
||||||
func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (*Watcher, *etcdErr.Error) {
|
func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (Watcher, *etcdErr.Error) {
|
||||||
event, err := wh.EventHistory.scan(key, recursive, index)
|
event, err := wh.EventHistory.scan(key, recursive, index)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
w := &Watcher{
|
w := &watcher{
|
||||||
EventChan: make(chan *Event, 1), // use a buffered channel
|
eventChan: make(chan *Event, 1), // use a buffered channel
|
||||||
recursive: recursive,
|
recursive: recursive,
|
||||||
stream: stream,
|
stream: stream,
|
||||||
sinceIndex: index,
|
sinceIndex: index,
|
||||||
|
@ -54,7 +54,7 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (*
|
||||||
}
|
}
|
||||||
|
|
||||||
if event != nil {
|
if event != nil {
|
||||||
w.EventChan <- event
|
w.eventChan <- event
|
||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,7 +75,7 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (*
|
||||||
}
|
}
|
||||||
|
|
||||||
w.remove = func() {
|
w.remove = func() {
|
||||||
if w.removed { // avoid remove it twice
|
if w.removed { // avoid removing it twice
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
w.removed = true
|
w.removed = true
|
||||||
|
@ -121,7 +121,7 @@ func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
|
||||||
for curr != nil {
|
for curr != nil {
|
||||||
next := curr.Next() // save reference to the next one in the list
|
next := curr.Next() // save reference to the next one in the list
|
||||||
|
|
||||||
w, _ := curr.Value.(*Watcher)
|
w, _ := curr.Value.(*watcher)
|
||||||
|
|
||||||
originalPath := (e.Node.Key == nodePath)
|
originalPath := (e.Node.Key == nodePath)
|
||||||
if (originalPath || !isHidden(nodePath, e.Node.Key)) && w.notify(e, originalPath, deleted) {
|
if (originalPath || !isHidden(nodePath, e.Node.Key)) && w.notify(e, originalPath, deleted) {
|
||||||
|
|
|
@ -27,7 +27,7 @@ func TestWatcher(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("%v", err)
|
t.Fatalf("%v", err)
|
||||||
}
|
}
|
||||||
c := w.EventChan
|
c := w.EventChan()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-c:
|
case <-c:
|
||||||
|
@ -47,7 +47,7 @@ func TestWatcher(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
w, _ = wh.watch("/foo", false, false, 2)
|
w, _ = wh.watch("/foo", false, false, 2)
|
||||||
c = w.EventChan
|
c = w.EventChan()
|
||||||
|
|
||||||
e = newEvent(Create, "/foo/bar", 2, 2)
|
e = newEvent(Create, "/foo/bar", 2, 2)
|
||||||
|
|
||||||
|
@ -72,7 +72,7 @@ func TestWatcher(t *testing.T) {
|
||||||
|
|
||||||
// ensure we are doing exact matching rather than prefix matching
|
// ensure we are doing exact matching rather than prefix matching
|
||||||
w, _ = wh.watch("/fo", true, false, 1)
|
w, _ = wh.watch("/fo", true, false, 1)
|
||||||
c = w.EventChan
|
c = w.EventChan()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case re = <-c:
|
case re = <-c:
|
||||||
|
|
Loading…
Reference in New Issue