add LastIndex and LastTerm in EventHistory

release-0.4
evan-gu 2013-09-30 12:18:28 -04:00
parent 3ae316ac38
commit 0959448855
3 changed files with 25 additions and 22 deletions

View File

@ -90,6 +90,8 @@ func (eq *eventQueue) insert(e *Event) {
type EventHistory struct {
Queue eventQueue
StartIndex uint64
LastIndex uint64
LastTerm uint64
rwl sync.RWMutex
}
@ -103,29 +105,42 @@ func newEventHistory(capacity int) *EventHistory {
}
// addEvent function adds event into the eventHistory
func (eh *EventHistory) addEvent(e *Event) {
func (eh *EventHistory) addEvent(e *Event) *Event {
eh.rwl.Lock()
defer eh.rwl.Unlock()
if e.Index == 0 {
e.Index = eh.LastIndex
}
if e.Term == 0 {
e.Term = eh.LastTerm
}
eh.Queue.insert(e)
eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index
eh.LastIndex = e.Index
eh.LastTerm = e.Term
return e
}
// addEvent with the last event's index and term
func (eh *EventHistory) addEventWithouIndex(action, key string) (e *Event) {
/*func (eh *EventHistory) addEventWithouIndex(action, key string) (e *Event) {
eh.rwl.Lock()
defer eh.rwl.Unlock()
LastEvent := eh.Queue.Events[eh.Queue.back()]
e = newEvent(action, key, LastEvent.Index, LastEvent.Term)
e = newEvent(action, key, LastEvent.Index, LastEvent.Term);
eh.Queue.insert(e)
eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index
return e
}
return e;
}*/
// scan function is enumerating events from the index in history and
// stops till the first point where the key has identified prefix

View File

@ -265,10 +265,13 @@ func (n *Node) Expire(s *Store) {
select {
// if timeout, delete the node
case <-time.After(duration):
e := newEvent(Expire, n.Path, 0, 0)
n.Remove(true, nil)
s.Stats.Inc(ExpireCount)
s.WatcherHub.notifyWithoutIndex(Expire, n.Path)
s.WatcherHub.notify(e)
return

View File

@ -104,22 +104,7 @@ func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) {
}
func (wh *watcherHub) notify(e *Event) {
segments := strings.Split(e.Key, "/")
currPath := "/"
// walk through all the paths
for _, segment := range segments {
currPath = path.Join(currPath, segment)
wh.notifyWithPath(e, currPath, false)
}
wh.EventHistory.addEvent(e)
}
// notify with last event's index and term
func (wh *watcherHub) notifyWithoutIndex(action, key string) {
e := wh.EventHistory.addEventWithouIndex(action, key)
e = wh.EventHistory.addEvent(e)
segments := strings.Split(e.Key, "/")