From 0959448855580976939704cd6f72686cc7c3ebd0 Mon Sep 17 00:00:00 2001 From: evan-gu Date: Mon, 30 Sep 2013 12:18:28 -0400 Subject: [PATCH] add LastIndex and LastTerm in EventHistory --- store/event.go | 25 ++++++++++++++++++++----- store/node.go | 5 ++++- store/watcher.go | 17 +---------------- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/store/event.go b/store/event.go index 29cec3f12..4ca6f5e58 100644 --- a/store/event.go +++ b/store/event.go @@ -90,6 +90,8 @@ func (eq *eventQueue) insert(e *Event) { type EventHistory struct { Queue eventQueue StartIndex uint64 + LastIndex uint64 + LastTerm uint64 rwl sync.RWMutex } @@ -103,29 +105,42 @@ func newEventHistory(capacity int) *EventHistory { } // addEvent function adds event into the eventHistory -func (eh *EventHistory) addEvent(e *Event) { +func (eh *EventHistory) addEvent(e *Event) *Event { eh.rwl.Lock() defer eh.rwl.Unlock() + if e.Index == 0 { + e.Index = eh.LastIndex + } + + if e.Term == 0 { + e.Term = eh.LastTerm + } + eh.Queue.insert(e) eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index + + eh.LastIndex = e.Index + eh.LastTerm = e.Term + + return e } // addEvent with the last event's index and term -func (eh *EventHistory) addEventWithouIndex(action, key string) (e *Event) { +/*func (eh *EventHistory) addEventWithouIndex(action, key string) (e *Event) { eh.rwl.Lock() defer eh.rwl.Unlock() LastEvent := eh.Queue.Events[eh.Queue.back()] - e = newEvent(action, key, LastEvent.Index, LastEvent.Term) + e = newEvent(action, key, LastEvent.Index, LastEvent.Term); eh.Queue.insert(e) eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index - return e -} + return e; +}*/ // scan function is enumerating events from the index in history and // stops till the first point where the key has identified prefix diff --git a/store/node.go b/store/node.go index f09c742c3..214a33fa0 100644 --- a/store/node.go +++ b/store/node.go @@ -265,10 +265,13 @@ func (n *Node) Expire(s *Store) { select { // if timeout, delete the node case <-time.After(duration): + e := newEvent(Expire, n.Path, 0, 0) + n.Remove(true, nil) s.Stats.Inc(ExpireCount) - s.WatcherHub.notifyWithoutIndex(Expire, n.Path) + + s.WatcherHub.notify(e) return diff --git a/store/watcher.go b/store/watcher.go index b039cd9a9..d67c713df 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -104,22 +104,7 @@ func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) { } func (wh *watcherHub) notify(e *Event) { - segments := strings.Split(e.Key, "/") - - currPath := "/" - - // walk through all the paths - for _, segment := range segments { - currPath = path.Join(currPath, segment) - wh.notifyWithPath(e, currPath, false) - } - - wh.EventHistory.addEvent(e) -} - -// notify with last event's index and term -func (wh *watcherHub) notifyWithoutIndex(action, key string) { - e := wh.EventHistory.addEventWithouIndex(action, key) + e = wh.EventHistory.addEvent(e) segments := strings.Split(e.Key, "/")