From b8ac1d082b7045aeffcfcba5825dbebc3e27a12a Mon Sep 17 00:00:00 2001 From: evan-gu Date: Mon, 30 Sep 2013 22:10:40 -0400 Subject: [PATCH] fix race between Expire() and others, fix UpdateTTL(), modified watcher to catch Expire() --- store/event.go | 29 ++++++++------------ store/event_test.go | 12 ++++----- store/node.go | 16 +++++++---- store/stats_test.go | 12 +++++++++ store/store.go | 5 ++-- store/store_test.go | 63 +++++++++++++++++++------------------------ store/watcher.go | 2 +- store/watcher_test.go | 15 +++++------ 8 files changed, 78 insertions(+), 76 deletions(-) diff --git a/store/event.go b/store/event.go index 4ca6f5e58..e95d50f1a 100644 --- a/store/event.go +++ b/store/event.go @@ -16,6 +16,8 @@ const ( Delete = "delete" TestAndSet = "testAndSet" Expire = "expire" + UndefIndex = 0 + UndefTerm = 0 ) type Event struct { @@ -92,6 +94,7 @@ type EventHistory struct { StartIndex uint64 LastIndex uint64 LastTerm uint64 + DupIndex uint64 rwl sync.RWMutex } @@ -109,12 +112,16 @@ func (eh *EventHistory) addEvent(e *Event) *Event { eh.rwl.Lock() defer eh.rwl.Unlock() - if e.Index == 0 { + DupIndex := uint64(0) + + if e.Index == UndefIndex { e.Index = eh.LastIndex + DupIndex = 1 } - if e.Term == 0 { + if e.Term == UndefTerm { e.Term = eh.LastTerm + DupIndex = 1 } eh.Queue.insert(e) @@ -123,32 +130,18 @@ func (eh *EventHistory) addEvent(e *Event) *Event { eh.LastIndex = e.Index eh.LastTerm = e.Term + eh.DupIndex += DupIndex return e } -// addEvent with the last event's index and term -/*func (eh *EventHistory) addEventWithouIndex(action, key string) (e *Event) { - eh.rwl.Lock() - defer eh.rwl.Unlock() - - LastEvent := eh.Queue.Events[eh.Queue.back()] - e = newEvent(action, key, LastEvent.Index, LastEvent.Term); - - eh.Queue.insert(e) - - eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index - - return e; -}*/ - // scan function is enumerating events from the index in history and // stops till the first point where the key has identified prefix func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) { eh.rwl.RLock() defer eh.rwl.RUnlock() - start := index - eh.StartIndex + start := index - eh.StartIndex + eh.DupIndex // the index should locate after the event history's StartIndex // and before its size diff --git a/store/event_test.go b/store/event_test.go index 0d19dd52a..e5b35061c 100644 --- a/store/event_test.go +++ b/store/event_test.go @@ -13,7 +13,7 @@ func TestEventQueue(t *testing.T) { // Add for i := 0; i < 200; i++ { - e := newEvent(Create, "/foo", uint64(i), 0) + e := newEvent(Create, "/foo", uint64(i), 1) eh.addEvent(e) } @@ -37,11 +37,11 @@ func TestScanHistory(t *testing.T) { eh := newEventHistory(100) // Add - eh.addEvent(newEvent(Create, "/foo", 1, 0)) - eh.addEvent(newEvent(Create, "/foo/bar", 2, 0)) - eh.addEvent(newEvent(Create, "/foo/foo", 3, 0)) - eh.addEvent(newEvent(Create, "/foo/bar/bar", 4, 0)) - eh.addEvent(newEvent(Create, "/foo/foo/foo", 5, 0)) + eh.addEvent(newEvent(Create, "/foo", 1, 1)) + eh.addEvent(newEvent(Create, "/foo/bar", 2, 1)) + eh.addEvent(newEvent(Create, "/foo/foo", 3, 1)) + eh.addEvent(newEvent(Create, "/foo/bar/bar", 4, 1)) + eh.addEvent(newEvent(Create, "/foo/foo/foo", 5, 1)) e, err := eh.scan("/foo", 1) if err != nil || e.Index != 1 { diff --git a/store/node.go b/store/node.go index 214a33fa0..f82f01cd6 100644 --- a/store/node.go +++ b/store/node.go @@ -66,6 +66,7 @@ func newDir(nodePath string, createIndex uint64, createTerm uint64, parent *Node // If the node is a directory and recursive is true, the function will recursively remove // add nodes under the receiver node. func (n *Node) Remove(recursive bool, callback func(path string)) error { + n.mu.Lock() defer n.mu.Unlock() @@ -87,6 +88,7 @@ func (n *Node) Remove(recursive bool, callback func(path string)) error { n.stopExpire <- true n.status = removed + } return nil @@ -265,14 +267,14 @@ func (n *Node) Expire(s *Store) { select { // if timeout, delete the node case <-time.After(duration): - e := newEvent(Expire, n.Path, 0, 0) + s.worldLock.Lock() + e := newEvent(Expire, n.Path, UndefIndex, UndefTerm) + s.WatcherHub.notify(e) n.Remove(true, nil) - s.Stats.Inc(ExpireCount) - s.WatcherHub.notify(e) - + s.worldLock.Unlock() return // if stopped, return @@ -364,7 +366,11 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair { func (n *Node) UpdateTTL(expireTime time.Time, s *Store) { if !n.IsPermanent() { - n.stopExpire <- true // suspend it to modify the expiration + expired, _ := n.IsExpired() + + if !expired { + n.stopExpire <- true // suspend it to modify the expiration + } } if expireTime.Sub(Permanent) != 0 { diff --git a/store/stats_test.go b/store/stats_test.go index 207df825f..52cd1c8f8 100644 --- a/store/stats_test.go +++ b/store/stats_test.go @@ -24,6 +24,8 @@ func TestBasicStats(t *testing.T) { } } + //fmt.Println("create") + time.Sleep(time.Second * 3) for _, k := range keys { @@ -35,6 +37,8 @@ func TestBasicStats(t *testing.T) { } } + //fmt.Println("get") + for _, k := range keys { i++ _, err := s.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1) @@ -45,6 +49,8 @@ func TestBasicStats(t *testing.T) { } } + //fmt.Println("update") + time.Sleep(time.Second * 3) for _, k := range keys { @@ -66,11 +72,15 @@ func TestBasicStats(t *testing.T) { } } + //fmt.Println("get testAndSet") + for _, k := range keys { s.Watch(k, false, 0, i, 1) watcher_number++ } + //fmt.Println("watch") + for _, k := range keys { _, err := s.Get(k, false, false, i, 1) if err != nil { @@ -91,6 +101,8 @@ func TestBasicStats(t *testing.T) { } } + //fmt.Println("get delete") + for _, k := range keys { _, err := s.Get(k, false, false, i, 1) if err != nil { diff --git a/store/store.go b/store/store.go index 51f49f51f..e6b8cf1b9 100644 --- a/store/store.go +++ b/store/store.go @@ -164,6 +164,7 @@ func (s *Store) Create(nodePath string, value string, expireTime time.Time, inde // If the node is a file, the value and the ttl can be updated. // If the node is a directory, only the ttl can be updated. func (s *Store) Update(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { + s.worldLock.RLock() defer s.worldLock.RUnlock() @@ -171,15 +172,16 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde if err != nil { // if the node does not exist, return error s.Stats.Inc(UpdateFail) + return nil, err } e := newEvent(Update, nodePath, s.Index, s.Term) if n.IsDir() { // if the node is a directory, we can only update ttl - if len(value) != 0 { s.Stats.Inc(UpdateFail) + return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath) } @@ -195,7 +197,6 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde // update ttl n.UpdateTTL(expireTime, s) - e.Expiration = &n.ExpireTime e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1 diff --git a/store/store_test.go b/store/store_test.go index 01a934d4a..a16e57032 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -320,88 +320,88 @@ func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ? func TestWatch(t *testing.T) { s := New() // watch at a deeper path - c, _ := s.WatcherHub.watch("/foo/foo/foo", false, 0) + c, _ := s.Watch("/foo/foo/foo", false, 0, 0, 1) s.Create("/foo/foo/foo", "bar", Permanent, 1, 1) e := nonblockingRetrive(c) - if e.Key != "/foo/foo/foo" { - t.Fatal("watch for Create node fails") + if e.Key != "/foo/foo/foo" || e.Action != Create { + t.Fatal("watch for Create node fails ", e) } - c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0) + c, _ = s.Watch("/foo/foo/foo", false, 0, 1, 1) s.Update("/foo/foo/foo", "car", Permanent, 2, 1) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/foo" { - t.Fatal("watch for Update node fails") + if e.Key != "/foo/foo/foo" || e.Action != Update { + t.Fatal("watch for Update node fails ", e) } - c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0) + c, _ = s.Watch("/foo/foo/foo", false, 0, 2, 1) s.TestAndSet("/foo/foo/foo", "car", 0, "bar", Permanent, 3, 1) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/foo" { + if e.Key != "/foo/foo/foo" || e.Action != TestAndSet { t.Fatal("watch for TestAndSet node fails") } - c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0) + c, _ = s.Watch("/foo/foo/foo", false, 0, 3, 1) s.Delete("/foo", true, 4, 1) //recursively delete e = nonblockingRetrive(c) - if e.Key != "/foo" { - t.Fatal("watch for Delete node fails") + if e.Key != "/foo" || e.Action != Delete { + t.Fatal("watch for Delete node fails ", e) } // watch at a prefix - c, _ = s.WatcherHub.watch("/foo", true, 0) + c, _ = s.Watch("/foo", true, 0, 4, 1) s.Create("/foo/foo/boo", "bar", Permanent, 5, 1) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/boo" { + if e.Key != "/foo/foo/boo" || e.Action != Create { t.Fatal("watch for Create subdirectory fails") } - c, _ = s.WatcherHub.watch("/foo", true, 0) + c, _ = s.Watch("/foo", true, 0, 5, 1) s.Update("/foo/foo/boo", "foo", Permanent, 6, 1) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/boo" { + if e.Key != "/foo/foo/boo" || e.Action != Update { t.Fatal("watch for Update subdirectory fails") } - c, _ = s.WatcherHub.watch("/foo", true, 0) + c, _ = s.Watch("/foo", true, 0, 6, 1) s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", Permanent, 7, 1) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/boo" { + if e.Key != "/foo/foo/boo" || e.Action != TestAndSet { t.Fatal("watch for TestAndSet subdirectory fails") } - c, _ = s.WatcherHub.watch("/foo", true, 0) + c, _ = s.Watch("/foo", true, 0, 7, 1) s.Delete("/foo/foo/boo", false, 8, 1) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/boo" { + if e.Key != "/foo/foo/boo" || e.Action != Delete { t.Fatal("watch for Delete subdirectory fails") } // watch expire s.Create("/foo/foo/boo", "foo", time.Now().Add(time.Second*1), 9, 1) - c, _ = s.WatcherHub.watch("/foo", true, 0) + c, _ = s.Watch("/foo", true, 0, 9, 1) time.Sleep(time.Second * 2) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/boo" || e.Index != 9 { + if e.Key != "/foo/foo/boo" || e.Action != Expire || e.Index != 9 { t.Fatal("watch for Expiration of Create() subdirectory fails ", e) } s.Create("/foo/foo/boo", "foo", Permanent, 10, 1) s.Update("/foo/foo/boo", "bar", time.Now().Add(time.Second*1), 11, 1) - c, _ = s.WatcherHub.watch("/foo", true, 0) + c, _ = s.Watch("/foo", true, 0, 11, 1) time.Sleep(time.Second * 2) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/boo" || e.Index != 11 { + if e.Key != "/foo/foo/boo" || e.Action != Expire || e.Index != 11 { t.Fatal("watch for Expiration of Update() subdirectory fails ", e) } s.Create("/foo/foo/boo", "foo", Permanent, 12, 1) s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", time.Now().Add(time.Second*1), 13, 1) - c, _ = s.WatcherHub.watch("/foo", true, 0) + c, _ = s.Watch("/foo", true, 0, 13, 1) time.Sleep(time.Second * 2) e = nonblockingRetrive(c) - if e.Key != "/foo/foo/boo" || e.Index != 13 { + if e.Key != "/foo/foo/boo" || e.Action != Expire || e.Index != 13 { t.Fatal("watch for Expiration of TestAndSet() subdirectory fails ", e) } @@ -479,21 +479,12 @@ func TestSaveAndRecover(t *testing.T) { panic(err) } } - + s.worldLock.RLock() + defer s.worldLock.RUnlock() if s.WatcherHub.EventHistory.StartIndex != cloneFs.WatcherHub.EventHistory.StartIndex { t.Fatal("Error recovered event history start index") } - //t.Log("watcherhub.size: ", s.WatcherHub.EventHistory.Queue.Size) - //for i = 0; int(i) < s.WatcherHub.EventHistory.Queue.Size; i++ { - // t.Log(s.WatcherHub.EventHistory.Queue.Events[i]) - //} - // - //t.Log("ClonedWatcherhub.size: ", cloneFs.WatcherHub.EventHistory.Queue.Size) - //for i = 0; int(i) < cloneFs.WatcherHub.EventHistory.Queue.Size; i++ { - // t.Log(cloneFs.WatcherHub.EventHistory.Queue.Events[i]) - //} - for i = 0; int(i) < cloneFs.WatcherHub.EventHistory.Queue.Size; i++ { if s.WatcherHub.EventHistory.Queue.Events[i].Key != cloneFs.WatcherHub.EventHistory.Queue.Events[i].Key { diff --git a/store/watcher.go b/store/watcher.go index d67c713df..7576a866b 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -47,7 +47,7 @@ func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan w := &watcher{ eventChan: eventChan, recursive: recursive, - sinceIndex: index, + sinceIndex: index - 1, // to catch Expire() } l, ok := wh.watchers[prefix] diff --git a/store/watcher_test.go b/store/watcher_test.go index 90c23c59e..e437422ad 100644 --- a/store/watcher_test.go +++ b/store/watcher_test.go @@ -7,8 +7,7 @@ import ( func TestWatcher(t *testing.T) { s := New() wh := s.WatcherHub - c, err := wh.watch("/foo", true, 0) - + c, err := wh.watch("/foo", true, 1) if err != nil { t.Fatal("%v", err) } @@ -20,7 +19,7 @@ func TestWatcher(t *testing.T) { // do nothing } - e := newEvent(Create, "/foo/bar", 1, 0) + e := newEvent(Create, "/foo/bar", 1, 1) wh.notify(e) @@ -30,20 +29,20 @@ func TestWatcher(t *testing.T) { t.Fatal("recv != send") } - c, _ = wh.watch("/foo", false, 0) + c, _ = wh.watch("/foo", false, 2) - e = newEvent(Create, "/foo/bar", 1, 0) + e = newEvent(Create, "/foo/bar", 2, 1) wh.notify(e) select { - case <-c: - t.Fatal("should not receive from channel if not recursive") + case re = <-c: + t.Fatal("should not receive from channel if not recursive ", re) default: // do nothing } - e = newEvent(Create, "/foo", 1, 0) + e = newEvent(Create, "/foo", 3, 1) wh.notify(e)