Merge pull request #498 from tobz/actually-hidden-keys
fix(store): properly hide hidden keys from watchers, not just getsrelease-0.4
commit
6b70efbe30
|
@ -290,6 +290,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
|
|||
s.CurrentIndex++
|
||||
|
||||
s.WatcherHub.notify(e)
|
||||
|
||||
s.Stats.Inc(DeleteSuccess)
|
||||
|
||||
return e, nil
|
||||
|
@ -514,6 +515,7 @@ func (s *store) internalCreate(nodePath string, dir bool, value string, unique,
|
|||
s.CurrentIndex = nextIndex
|
||||
|
||||
s.WatcherHub.notify(e)
|
||||
|
||||
return e, nil
|
||||
}
|
||||
|
||||
|
@ -568,6 +570,7 @@ func (s *store) DeleteExpiredKeys(cutoff time.Time) {
|
|||
node.Remove(true, true, callback)
|
||||
|
||||
s.Stats.Inc(ExpireCount)
|
||||
|
||||
s.WatcherHub.notify(e)
|
||||
}
|
||||
|
||||
|
|
|
@ -682,6 +682,106 @@ func TestStoreRecoverWithExpiration(t *testing.T) {
|
|||
assert.Nil(t, e, "")
|
||||
}
|
||||
|
||||
// Ensure that the store can watch for hidden keys as long as it's an exact path match.
|
||||
func TestStoreWatchCreateWithHiddenKey(t *testing.T) {
|
||||
s := newStore()
|
||||
w, _ := s.Watch("/_foo", false, false, 0)
|
||||
s.Create("/_foo", false, "bar", false, Permanent)
|
||||
e := nbselect(w.EventChan)
|
||||
assert.Equal(t, e.Action, "create", "")
|
||||
assert.Equal(t, e.Node.Key, "/_foo", "")
|
||||
e = nbselect(w.EventChan)
|
||||
assert.Nil(t, e, "")
|
||||
}
|
||||
|
||||
// Ensure that the store doesn't see hidden key creates without an exact path match in recursive mode.
|
||||
func TestStoreWatchRecursiveCreateWithHiddenKey(t *testing.T) {
|
||||
s := newStore()
|
||||
w, _ := s.Watch("/foo", true, false, 0)
|
||||
s.Create("/foo/_bar", false, "baz", false, Permanent)
|
||||
e := nbselect(w.EventChan)
|
||||
assert.Nil(t, e, "")
|
||||
w, _ = s.Watch("/foo", true, false, 0)
|
||||
s.Create("/foo/_baz", true, "", false, Permanent)
|
||||
e = nbselect(w.EventChan)
|
||||
assert.Nil(t, e, "")
|
||||
s.Create("/foo/_baz/quux", false, "quux", false, Permanent)
|
||||
e = nbselect(w.EventChan)
|
||||
assert.Nil(t, e, "")
|
||||
}
|
||||
|
||||
// Ensure that the store doesn't see hidden key updates.
|
||||
func TestStoreWatchUpdateWithHiddenKey(t *testing.T) {
|
||||
s := newStore()
|
||||
s.Create("/_foo", false, "bar", false, Permanent)
|
||||
w, _ := s.Watch("/_foo", false, false, 0)
|
||||
s.Update("/_foo", "baz", Permanent)
|
||||
e := nbselect(w.EventChan)
|
||||
assert.Equal(t, e.Action, "update", "")
|
||||
assert.Equal(t, e.Node.Key, "/_foo", "")
|
||||
e = nbselect(w.EventChan)
|
||||
assert.Nil(t, e, "")
|
||||
}
|
||||
|
||||
// Ensure that the store doesn't see hidden key updates without an exact path match in recursive mode.
|
||||
func TestStoreWatchRecursiveUpdateWithHiddenKey(t *testing.T) {
|
||||
s := newStore()
|
||||
s.Create("/foo/_bar", false, "baz", false, Permanent)
|
||||
w, _ := s.Watch("/foo", true, false, 0)
|
||||
s.Update("/foo/_bar", "baz", Permanent)
|
||||
e := nbselect(w.EventChan)
|
||||
assert.Nil(t, e, "")
|
||||
}
|
||||
|
||||
// Ensure that the store can watch for key deletions.
|
||||
func TestStoreWatchDeleteWithHiddenKey(t *testing.T) {
|
||||
s := newStore()
|
||||
s.Create("/_foo", false, "bar", false, Permanent)
|
||||
w, _ := s.Watch("/_foo", false, false, 0)
|
||||
s.Delete("/_foo", false, false)
|
||||
e := nbselect(w.EventChan)
|
||||
assert.Equal(t, e.Action, "delete", "")
|
||||
assert.Equal(t, e.Node.Key, "/_foo", "")
|
||||
e = nbselect(w.EventChan)
|
||||
assert.Nil(t, e, "")
|
||||
}
|
||||
|
||||
// Ensure that the store doesn't see hidden key deletes without an exact path match in recursive mode.
|
||||
func TestStoreWatchRecursiveDeleteWithHiddenKey(t *testing.T) {
|
||||
s := newStore()
|
||||
s.Create("/foo/_bar", false, "baz", false, Permanent)
|
||||
w, _ := s.Watch("/foo", true, false, 0)
|
||||
s.Delete("/foo/_bar", false, false)
|
||||
e := nbselect(w.EventChan)
|
||||
assert.Nil(t, e, "")
|
||||
}
|
||||
|
||||
// Ensure that the store doesn't see expirations of hidden keys.
|
||||
func TestStoreWatchExpireWithHiddenKey(t *testing.T) {
|
||||
s := newStore()
|
||||
|
||||
stopChan := make(chan bool)
|
||||
defer func() {
|
||||
stopChan <- true
|
||||
}()
|
||||
go mockSyncService(s.DeleteExpiredKeys, stopChan)
|
||||
|
||||
s.Create("/_foo", false, "bar", false, time.Now().Add(500*time.Millisecond))
|
||||
s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(1000*time.Millisecond))
|
||||
|
||||
w, _ := s.Watch("/", true, false, 0)
|
||||
c := w.EventChan
|
||||
e := nbselect(c)
|
||||
assert.Nil(t, e, "")
|
||||
time.Sleep(600 * time.Millisecond)
|
||||
e = nbselect(c)
|
||||
assert.Nil(t, e, "")
|
||||
time.Sleep(600 * time.Millisecond)
|
||||
e = nbselect(c)
|
||||
assert.Equal(t, e.Action, "expire", "")
|
||||
assert.Equal(t, e.Node.Key, "/foofoo", "")
|
||||
}
|
||||
|
||||
// Performs a non-blocking select on an event channel.
|
||||
func nbselect(c <-chan *Event) *Event {
|
||||
select {
|
||||
|
|
|
@ -113,11 +113,11 @@ func (wh *watcherHub) notify(e *Event) {
|
|||
}
|
||||
}
|
||||
|
||||
func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
|
||||
func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
|
||||
wh.mutex.Lock()
|
||||
defer wh.mutex.Unlock()
|
||||
|
||||
l, ok := wh.watchers[path]
|
||||
l, ok := wh.watchers[nodePath]
|
||||
if ok {
|
||||
curr := l.Front()
|
||||
|
||||
|
@ -126,7 +126,8 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
|
|||
|
||||
w, _ := curr.Value.(*Watcher)
|
||||
|
||||
if w.notify(e, e.Node.Key == path, deleted) {
|
||||
originalPath := (e.Node.Key == nodePath)
|
||||
if (originalPath || !isHidden(e.Node.Key)) && w.notify(e, originalPath, deleted) {
|
||||
if !w.stream { // do not remove the stream watcher
|
||||
// if we successfully notify a watcher
|
||||
// we need to remove the watcher from the list
|
||||
|
@ -142,7 +143,7 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
|
|||
if l.Len() == 0 {
|
||||
// if we have notified all watcher in the list
|
||||
// we can delete the list
|
||||
delete(wh.watchers, path)
|
||||
delete(wh.watchers, nodePath)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -156,3 +157,9 @@ func (wh *watcherHub) clone() *watcherHub {
|
|||
EventHistory: clonedHistory,
|
||||
}
|
||||
}
|
||||
|
||||
// isHidden checks to see if this path is considered hidden i.e. the
|
||||
// last element is hidden or it's within a hidden directory
|
||||
func isHidden(nodePath string) bool {
|
||||
return strings.Contains(nodePath, "/_")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue