Merge pull request #418 from xiangli-cmu/cancel_watcher

cancel watcher
release-0.4
Xiang Li 2014-01-08 21:34:32 -08:00
commit 2bfb8f5e4f
7 changed files with 98 additions and 67 deletions

View File

@ -25,11 +25,11 @@ func WatchKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
} }
// Start the watcher on the store. // Start the watcher on the store.
c, err := s.Store().Watch(key, false, sinceIndex) watcher, err := s.Store().Watch(key, false, sinceIndex)
if err != nil { if err != nil {
return etcdErr.NewError(500, key, s.Store().Index()) return etcdErr.NewError(500, key, s.Store().Index())
} }
event := <-c event := <-watcher.EventChan
// Convert event to a response and write to client. // Convert event to a response and write to client.
b, _ := json.Marshal(event.Response(s.Store().Index())) b, _ := json.Marshal(event.Response(s.Store().Index()))

View File

@ -55,7 +55,7 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
} }
// Start the watcher on the store. // Start the watcher on the store.
eventChan, err := s.Store().Watch(key, recursive, sinceIndex) watcher, err := s.Store().Watch(key, recursive, sinceIndex)
if err != nil { if err != nil {
return err return err
} }
@ -65,8 +65,9 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
select { select {
case <-closeChan: case <-closeChan:
watcher.Remove()
return nil return nil
case event = <-eventChan: case event = <-watcher.EventChan:
} }
} else { //get } else { //get

View File

@ -52,7 +52,8 @@ type Store interface {
value string, expireTime time.Time) (*Event, error) value string, expireTime time.Time) (*Event, error)
Delete(nodePath string, recursive, dir bool) (*Event, error) Delete(nodePath string, recursive, dir bool) (*Event, error)
CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error)
Watch(prefix string, recursive bool, sinceIndex uint64) (*Watcher, error)
Save() ([]byte, error) Save() ([]byte, error)
Recovery(state []byte) error Recovery(state []byte) error
@ -339,21 +340,21 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui
return e, nil return e, nil
} }
func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (<-chan *Event, error) { func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (*Watcher, error) {
key = path.Clean(path.Join("/", key)) key = path.Clean(path.Join("/", key))
nextIndex := s.CurrentIndex + 1 nextIndex := s.CurrentIndex + 1
s.worldLock.RLock() s.worldLock.RLock()
defer s.worldLock.RUnlock() defer s.worldLock.RUnlock()
var c <-chan *Event var w *Watcher
var err *etcdErr.Error var err *etcdErr.Error
if sinceIndex == 0 { if sinceIndex == 0 {
c, err = s.WatcherHub.watch(key, recursive, nextIndex) w, err = s.WatcherHub.watch(key, recursive, nextIndex)
} else { } else {
c, err = s.WatcherHub.watch(key, recursive, sinceIndex) w, err = s.WatcherHub.watch(key, recursive, sinceIndex)
} }
if err != nil { if err != nil {
@ -363,7 +364,7 @@ func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (<-chan *Ev
return nil, err return nil, err
} }
return c, nil return w, nil
} }
// walk function walks all the nodePath and apply the walkFunc on each directory // walk function walks all the nodePath and apply the walkFunc on each directory

View File

@ -489,7 +489,8 @@ func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) {
// Ensure that the store can watch for key creation. // Ensure that the store can watch for key creation.
func TestStoreWatchCreate(t *testing.T) { func TestStoreWatchCreate(t *testing.T) {
s := newStore() s := newStore()
c, _ := s.Watch("/foo", false, 0) w, _ := s.Watch("/foo", false, 0)
c := w.EventChan
s.Create("/foo", false, "bar", false, Permanent) s.Create("/foo", false, "bar", false, Permanent)
e := nbselect(c) e := nbselect(c)
assert.Equal(t, e.Action, "create", "") assert.Equal(t, e.Action, "create", "")
@ -501,9 +502,9 @@ func TestStoreWatchCreate(t *testing.T) {
// Ensure that the store can watch for recursive key creation. // Ensure that the store can watch for recursive key creation.
func TestStoreWatchRecursiveCreate(t *testing.T) { func TestStoreWatchRecursiveCreate(t *testing.T) {
s := newStore() s := newStore()
c, _ := s.Watch("/foo", true, 0) w, _ := s.Watch("/foo", true, 0)
s.Create("/foo/bar", false, "baz", false, Permanent) s.Create("/foo/bar", false, "baz", false, Permanent)
e := nbselect(c) e := nbselect(w.EventChan)
assert.Equal(t, e.Action, "create", "") assert.Equal(t, e.Action, "create", "")
assert.Equal(t, e.Node.Key, "/foo/bar", "") assert.Equal(t, e.Node.Key, "/foo/bar", "")
} }
@ -512,9 +513,9 @@ func TestStoreWatchRecursiveCreate(t *testing.T) {
func TestStoreWatchUpdate(t *testing.T) { func TestStoreWatchUpdate(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", false, "bar", false, Permanent) s.Create("/foo", false, "bar", false, Permanent)
c, _ := s.Watch("/foo", false, 0) w, _ := s.Watch("/foo", false, 0)
s.Update("/foo", "baz", Permanent) s.Update("/foo", "baz", Permanent)
e := nbselect(c) e := nbselect(w.EventChan)
assert.Equal(t, e.Action, "update", "") assert.Equal(t, e.Action, "update", "")
assert.Equal(t, e.Node.Key, "/foo", "") assert.Equal(t, e.Node.Key, "/foo", "")
} }
@ -523,9 +524,9 @@ func TestStoreWatchUpdate(t *testing.T) {
func TestStoreWatchRecursiveUpdate(t *testing.T) { func TestStoreWatchRecursiveUpdate(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo/bar", false, "baz", false, Permanent) s.Create("/foo/bar", false, "baz", false, Permanent)
c, _ := s.Watch("/foo", true, 0) w, _ := s.Watch("/foo", true, 0)
s.Update("/foo/bar", "baz", Permanent) s.Update("/foo/bar", "baz", Permanent)
e := nbselect(c) e := nbselect(w.EventChan)
assert.Equal(t, e.Action, "update", "") assert.Equal(t, e.Action, "update", "")
assert.Equal(t, e.Node.Key, "/foo/bar", "") assert.Equal(t, e.Node.Key, "/foo/bar", "")
} }
@ -534,9 +535,9 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) {
func TestStoreWatchDelete(t *testing.T) { func TestStoreWatchDelete(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", false, "bar", false, Permanent) s.Create("/foo", false, "bar", false, Permanent)
c, _ := s.Watch("/foo", false, 0) w, _ := s.Watch("/foo", false, 0)
s.Delete("/foo", false, false) s.Delete("/foo", false, false)
e := nbselect(c) e := nbselect(w.EventChan)
assert.Equal(t, e.Action, "delete", "") assert.Equal(t, e.Action, "delete", "")
assert.Equal(t, e.Node.Key, "/foo", "") assert.Equal(t, e.Node.Key, "/foo", "")
} }
@ -545,9 +546,9 @@ func TestStoreWatchDelete(t *testing.T) {
func TestStoreWatchRecursiveDelete(t *testing.T) { func TestStoreWatchRecursiveDelete(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo/bar", false, "baz", false, Permanent) s.Create("/foo/bar", false, "baz", false, Permanent)
c, _ := s.Watch("/foo", true, 0) w, _ := s.Watch("/foo", true, 0)
s.Delete("/foo/bar", false, false) s.Delete("/foo/bar", false, false)
e := nbselect(c) e := nbselect(w.EventChan)
assert.Equal(t, e.Action, "delete", "") assert.Equal(t, e.Action, "delete", "")
assert.Equal(t, e.Node.Key, "/foo/bar", "") assert.Equal(t, e.Node.Key, "/foo/bar", "")
} }
@ -556,9 +557,9 @@ func TestStoreWatchRecursiveDelete(t *testing.T) {
func TestStoreWatchCompareAndSwap(t *testing.T) { func TestStoreWatchCompareAndSwap(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo", false, "bar", false, Permanent) s.Create("/foo", false, "bar", false, Permanent)
c, _ := s.Watch("/foo", false, 0) w, _ := s.Watch("/foo", false, 0)
s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent) s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
e := nbselect(c) e := nbselect(w.EventChan)
assert.Equal(t, e.Action, "compareAndSwap", "") assert.Equal(t, e.Action, "compareAndSwap", "")
assert.Equal(t, e.Node.Key, "/foo", "") assert.Equal(t, e.Node.Key, "/foo", "")
} }
@ -567,9 +568,9 @@ func TestStoreWatchCompareAndSwap(t *testing.T) {
func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) { func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
s := newStore() s := newStore()
s.Create("/foo/bar", false, "baz", false, Permanent) s.Create("/foo/bar", false, "baz", false, Permanent)
c, _ := s.Watch("/foo", true, 0) w, _ := s.Watch("/foo", true, 0)
s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent) s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent)
e := nbselect(c) e := nbselect(w.EventChan)
assert.Equal(t, e.Action, "compareAndSwap", "") assert.Equal(t, e.Action, "compareAndSwap", "")
assert.Equal(t, e.Node.Key, "/foo/bar", "") assert.Equal(t, e.Node.Key, "/foo/bar", "")
} }
@ -587,15 +588,16 @@ func TestStoreWatchExpire(t *testing.T) {
s.Create("/foo", false, "bar", false, time.Now().Add(500*time.Millisecond)) s.Create("/foo", false, "bar", false, time.Now().Add(500*time.Millisecond))
s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(500*time.Millisecond)) s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(500*time.Millisecond))
c, _ := s.Watch("/", true, 0) w, _ := s.Watch("/", true, 0)
c := w.EventChan
e := nbselect(c) e := nbselect(c)
assert.Nil(t, e, "") assert.Nil(t, e, "")
time.Sleep(600 * time.Millisecond) time.Sleep(600 * time.Millisecond)
e = nbselect(c) e = nbselect(c)
assert.Equal(t, e.Action, "expire", "") assert.Equal(t, e.Action, "expire", "")
assert.Equal(t, e.Node.Key, "/foo", "") assert.Equal(t, e.Node.Key, "/foo", "")
c, _ = s.Watch("/", true, 4) w, _ = s.Watch("/", true, 4)
e = nbselect(c) e = nbselect(w.EventChan)
assert.Equal(t, e.Action, "expire", "") assert.Equal(t, e.Action, "expire", "")
assert.Equal(t, e.Node.Key, "/foofoo", "") assert.Equal(t, e.Node.Key, "/foofoo", "")
} }

View File

@ -16,15 +16,16 @@ limitations under the License.
package store package store
type watcher struct { type Watcher struct {
eventChan chan *Event EventChan chan *Event
recursive bool recursive bool
sinceIndex uint64 sinceIndex uint64
remove func()
} }
// notify function notifies the watcher. If the watcher interests in the given path, // notify function notifies the watcher. If the watcher interests in the given path,
// the function will return true. // the function will return true.
func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool { func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool {
// watcher is interested the path in three cases and under one condition // watcher is interested the path in three cases and under one condition
// the condition is that the event happens after the watcher's sinceIndex // the condition is that the event happens after the watcher's sinceIndex
@ -41,8 +42,19 @@ func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool {
// For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher // For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher
// should get notified even if "/foo" is not the path it is watching. // should get notified even if "/foo" is not the path it is watching.
if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex { if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex {
w.eventChan <- e w.EventChan <- e
return true return true
} }
return false return false
} }
// Remove removes the watcher from watcherHub
func (w *Watcher) Remove() {
if w.remove != nil {
w.remove()
} else {
// We attached a remove function to watcher
// Other pkg cannot change it, so this should not happen
panic("missing Watcher remove function")
}
}

View File

@ -4,6 +4,7 @@ import (
"container/list" "container/list"
"path" "path"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
etcdErr "github.com/coreos/etcd/error" etcdErr "github.com/coreos/etcd/error"
@ -16,6 +17,7 @@ import (
// event happens between the end of the first watch command and the start // event happens between the end of the first watch command and the start
// of the second command. // of the second command.
type watcherHub struct { type watcherHub struct {
mutex sync.Mutex // protect the hash map
watchers map[string]*list.List watchers map[string]*list.List
count int64 // current number of watchers. count int64 // current number of watchers.
EventHistory *EventHistory EventHistory *EventHistory
@ -32,45 +34,57 @@ func newWatchHub(capacity int) *watcherHub {
} }
} }
// watch function returns an Event channel. // Watch function returns a watcher.
// If recursive is true, the first change after index under key will be sent to the event channel. // If recursive is true, the first change after index under key will be sent to the event channel of the watcher.
// If recursive is false, the first change after index at key will be sent to the event channel. // If recursive is false, the first change after index at key will be sent to the event channel of the watcher.
// If index is zero, watch will start from the current index + 1. // If index is zero, watch will start from the current index + 1.
func (wh *watcherHub) watch(key string, recursive bool, index uint64) (<-chan *Event, *etcdErr.Error) { func (wh *watcherHub) watch(key string, recursive bool, index uint64) (*Watcher, *etcdErr.Error) {
event, err := wh.EventHistory.scan(key, recursive, index) event, err := wh.EventHistory.scan(key, recursive, index)
if err != nil { if err != nil {
return nil, err return nil, err
} }
eventChan := make(chan *Event, 1) // use a buffered channel w := &Watcher{
EventChan: make(chan *Event, 1), // use a buffered channel
if event != nil {
eventChan <- event
return eventChan, nil
}
w := &watcher{
eventChan: eventChan,
recursive: recursive, recursive: recursive,
sinceIndex: index, sinceIndex: index,
} }
if event != nil {
w.EventChan <- event
return w, nil
}
wh.mutex.Lock()
defer wh.mutex.Unlock()
l, ok := wh.watchers[key] l, ok := wh.watchers[key]
var elem *list.Element
if ok { // add the new watcher to the back of the list if ok { // add the new watcher to the back of the list
l.PushBack(w) elem = l.PushBack(w)
} else { // create a new list and add the new watcher } else { // create a new list and add the new watcher
l := list.New() l = list.New()
l.PushBack(w) elem = l.PushBack(w)
wh.watchers[key] = l wh.watchers[key] = l
} }
w.remove = func() {
wh.mutex.Lock()
defer wh.mutex.Unlock()
l.Remove(elem)
atomic.AddInt64(&wh.count, -1)
if l.Len() == 0 {
delete(wh.watchers, key)
}
}
atomic.AddInt64(&wh.count, 1) atomic.AddInt64(&wh.count, 1)
return eventChan, nil return w, nil
} }
// notify function accepts an event and notify to the watchers. // notify function accepts an event and notify to the watchers.
@ -93,35 +107,33 @@ func (wh *watcherHub) notify(e *Event) {
} }
func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) { func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
wh.mutex.Lock()
defer wh.mutex.Unlock()
l, ok := wh.watchers[path] l, ok := wh.watchers[path]
if ok { if ok {
curr := l.Front() curr := l.Front()
for { for curr != nil {
if curr == nil { // we have reached the end of the list
if l.Len() == 0 {
// if we have notified all watcher in the list
// we can delete the list
delete(wh.watchers, path)
}
break
}
next := curr.Next() // save reference to the next one in the list next := curr.Next() // save reference to the next one in the list
w, _ := curr.Value.(*watcher) w, _ := curr.Value.(*Watcher)
if w.notify(e, e.Node.Key == path, deleted) { if w.notify(e, e.Node.Key == path, deleted) {
// if we successfully notify a watcher // if we successfully notify a watcher
// we need to remove the watcher from the list // we need to remove the watcher from the list
// and decrease the counter // and decrease the counter
l.Remove(curr) l.Remove(curr)
atomic.AddInt64(&wh.count, -1) atomic.AddInt64(&wh.count, -1)
} }
curr = next // update current to the next curr = next // update current to the next element in the list
}
if l.Len() == 0 {
// if we have notified all watcher in the list
// we can delete the list
delete(wh.watchers, path)
} }
} }
} }

View File

@ -23,10 +23,11 @@ import (
func TestWatcher(t *testing.T) { func TestWatcher(t *testing.T) {
s := newStore() s := newStore()
wh := s.WatcherHub wh := s.WatcherHub
c, err := wh.watch("/foo", true, 1) w, err := wh.watch("/foo", true, 1)
if err != nil { if err != nil {
t.Fatalf("%v", err) t.Fatalf("%v", err)
} }
c := w.EventChan
select { select {
case <-c: case <-c:
@ -45,7 +46,8 @@ func TestWatcher(t *testing.T) {
t.Fatal("recv != send") t.Fatal("recv != send")
} }
c, _ = wh.watch("/foo", false, 2) w, _ = wh.watch("/foo", false, 2)
c = w.EventChan
e = newEvent(Create, "/foo/bar", 2, 2) e = newEvent(Create, "/foo/bar", 2, 2)
@ -69,7 +71,8 @@ func TestWatcher(t *testing.T) {
} }
// ensure we are doing exact matching rather than prefix matching // ensure we are doing exact matching rather than prefix matching
c, _ = wh.watch("/fo", true, 1) w, _ = wh.watch("/fo", true, 1)
c = w.EventChan
select { select {
case re = <-c: case re = <-c: