From 59ccefee0f43d5c96b776f21d57952b129425b14 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 28 Dec 2013 14:55:50 +0800 Subject: [PATCH] fix(watchhub.go) add a lock to protect the hashmap --- store/store.go | 4 ++-- store/watcher_hub.go | 19 ++++++++++++++----- store/watcher_test.go | 6 +++--- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/store/store.go b/store/store.go index 15c045d41..6d78e18b4 100644 --- a/store/store.go +++ b/store/store.go @@ -352,10 +352,10 @@ func (s *store) NewWatcher(key string, recursive bool, sinceIndex uint64) (*Watc var err *etcdErr.Error if sinceIndex == 0 { - w, err = s.WatcherHub.watch(key, recursive, nextIndex) + w, err = s.WatcherHub.newWatcher(key, recursive, nextIndex) } else { - w, err = s.WatcherHub.watch(key, recursive, sinceIndex) + w, err = s.WatcherHub.newWatcher(key, recursive, sinceIndex) } if err != nil { diff --git a/store/watcher_hub.go b/store/watcher_hub.go index a8865d49d..64b3bdcf8 100644 --- a/store/watcher_hub.go +++ b/store/watcher_hub.go @@ -4,6 +4,7 @@ import ( "container/list" "path" "strings" + "sync" "sync/atomic" etcdErr "github.com/coreos/etcd/error" @@ -16,6 +17,7 @@ import ( // event happens between the end of the first watch command and the start // of the second command. type watcherHub struct { + mutex sync.Mutex // protect the hash map watchers map[string]*list.List count int64 // current number of watchers. EventHistory *EventHistory @@ -32,11 +34,11 @@ func newWatchHub(capacity int) *watcherHub { } } -// watch function returns an Event channel. -// If recursive is true, the first change after index under key will be sent to the event channel. -// If recursive is false, the first change after index at key will be sent to the event channel. +// newWatcher function returns a watcher. +// If recursive is true, the first change after index under key will be sent to the event channel of the watcher. +// If recursive is false, the first change after index at key will be sent to the event channel of the watcher. // If index is zero, watch will start from the current index + 1. -func (wh *watcherHub) watch(key string, recursive bool, index uint64) (*Watcher, *etcdErr.Error) { +func (wh *watcherHub) newWatcher(key string, recursive bool, index uint64) (*Watcher, *etcdErr.Error) { event, err := wh.EventHistory.scan(key, recursive, index) if err != nil { @@ -51,10 +53,12 @@ func (wh *watcherHub) watch(key string, recursive bool, index uint64) (*Watcher, if event != nil { w.EventChan <- event - return w, nil } + wh.mutex.Lock() + defer wh.mutex.Unlock() + l, ok := wh.watchers[key] var elem *list.Element @@ -69,6 +73,8 @@ func (wh *watcherHub) watch(key string, recursive bool, index uint64) (*Watcher, } w.Remove = func() { + wh.mutex.Lock() + defer wh.mutex.Unlock() l.Remove(elem) if l.Len() == 0 { delete(wh.watchers, key) @@ -100,6 +106,9 @@ func (wh *watcherHub) notify(e *Event) { } func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) { + wh.mutex.Lock() + defer wh.mutex.Unlock() + l, ok := wh.watchers[path] if ok { curr := l.Front() diff --git a/store/watcher_test.go b/store/watcher_test.go index 2600fa161..aa485645a 100644 --- a/store/watcher_test.go +++ b/store/watcher_test.go @@ -23,7 +23,7 @@ import ( func TestWatcher(t *testing.T) { s := newStore() wh := s.WatcherHub - w, err := wh.watch("/foo", true, 1) + w, err := wh.newWatcher("/foo", true, 1) if err != nil { t.Fatalf("%v", err) } @@ -46,7 +46,7 @@ func TestWatcher(t *testing.T) { t.Fatal("recv != send") } - w, _ = wh.watch("/foo", false, 2) + w, _ = wh.newWatcher("/foo", false, 2) c = w.EventChan e = newEvent(Create, "/foo/bar", 2, 2) @@ -71,7 +71,7 @@ func TestWatcher(t *testing.T) { } // ensure we are doing exact matching rather than prefix matching - w, _ = wh.watch("/fo", true, 1) + w, _ = wh.newWatcher("/fo", true, 1) c = w.EventChan select {