etcd/store/watcher.go

130 lines
2.7 KiB
Go
Raw Normal View History

2013-06-18 22:14:22 +04:00
package store
import (
"path"
2013-06-30 21:09:05 +04:00
"strconv"
2013-06-18 22:14:22 +04:00
"strings"
2013-06-21 02:59:23 +04:00
)
2013-07-16 21:44:09 +04:00
2013-07-10 00:14:12 +04:00
//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------
2013-06-18 22:14:22 +04:00
2013-07-10 00:14:12 +04:00
// WatcherHub is where the client register its watcher
type WatcherHub struct {
2013-07-10 00:14:12 +04:00
watchers map[string][]*Watcher
}
2013-07-10 00:14:12 +04:00
// Currently watcher only contains a response channel
type Watcher struct {
C chan *Response
2013-06-18 22:14:22 +04:00
}
2013-07-10 00:14:12 +04:00
// Create a new watcherHub
2013-08-05 04:17:40 +04:00
func newWatcherHub() *WatcherHub {
w := new(WatcherHub)
2013-07-10 00:14:12 +04:00
w.watchers = make(map[string][]*Watcher)
2013-06-18 22:14:22 +04:00
return w
}
2013-07-10 00:14:12 +04:00
// Create a new watcher
2013-08-05 04:17:40 +04:00
func NewWatcher() *Watcher {
return &Watcher{C: make(chan *Response, 1)}
2013-06-18 22:14:22 +04:00
}
2013-07-10 00:14:12 +04:00
// Add a watcher to the watcherHub
2013-07-16 21:40:41 +04:00
func (w *WatcherHub) addWatcher(prefix string, watcher *Watcher, sinceIndex uint64,
2013-08-07 07:00:50 +04:00
responseStartIndex uint64, currentIndex uint64, resMap map[string]*Response) error {
2013-06-18 22:14:22 +04:00
2013-07-10 00:14:12 +04:00
prefix = path.Clean("/" + prefix)
2013-06-18 22:14:22 +04:00
2013-07-10 00:14:12 +04:00
if sinceIndex != 0 && sinceIndex >= responseStartIndex {
for i := sinceIndex; i <= currentIndex; i++ {
if checkResponse(prefix, i, resMap) {
2013-08-07 07:00:50 +04:00
watcher.C <- resMap[strconv.FormatUint(i, 10)]
2013-06-30 02:29:10 +04:00
return nil
}
}
}
_, ok := w.watchers[prefix]
2013-06-18 22:14:22 +04:00
if !ok {
2013-07-10 00:14:12 +04:00
w.watchers[prefix] = make([]*Watcher, 0)
2013-06-18 22:14:22 +04:00
}
2013-08-07 07:03:32 +04:00
w.watchers[prefix] = append(w.watchers[prefix], watcher)
2013-06-18 22:14:22 +04:00
return nil
}
2013-07-10 00:14:12 +04:00
// Check if the response has what we are watching
2013-08-07 07:00:50 +04:00
func checkResponse(prefix string, index uint64, resMap map[string]*Response) bool {
2013-06-30 02:29:10 +04:00
2013-08-07 07:00:50 +04:00
resp, ok := resMap[strconv.FormatUint(index, 10)]
2013-06-30 02:29:10 +04:00
2013-06-30 21:09:05 +04:00
if !ok {
// not storage system command
2013-06-30 02:29:10 +04:00
return false
2013-06-30 21:09:05 +04:00
} else {
path := resp.Key
if strings.HasPrefix(path, prefix) {
prefixLen := len(prefix)
if len(path) == prefixLen || path[prefixLen] == '/' {
return true
}
2013-06-30 02:29:10 +04:00
}
}
return false
}
2013-07-10 00:14:12 +04:00
// Notify the watcher a action happened
func (w *WatcherHub) notify(resp Response) error {
resp.Key = path.Clean(resp.Key)
segments := strings.Split(resp.Key, "/")
2013-06-18 22:14:22 +04:00
currPath := "/"
// walk through all the pathes
for _, segment := range segments {
currPath = path.Join(currPath, segment)
watchers, ok := w.watchers[currPath]
2013-06-18 22:14:22 +04:00
if ok {
2013-07-10 00:14:12 +04:00
newWatchers := make([]*Watcher, 0)
2013-06-18 22:14:22 +04:00
// notify all the watchers
for _, watcher := range watchers {
watcher.C <- &resp
2013-06-18 22:14:22 +04:00
}
2013-06-21 02:59:23 +04:00
if len(newWatchers) == 0 {
// we have notified all the watchers at this path
// delete the map
delete(w.watchers, currPath)
} else {
w.watchers[currPath] = newWatchers
}
2013-06-18 22:14:22 +04:00
}
}
return nil
2013-06-21 02:59:23 +04:00
}
// stopWatchers stops all the watchers
// This function is used when the etcd recovery from a snapshot at runtime
func (w *WatcherHub) stopWatchers() {
2013-08-04 10:30:15 +04:00
for _, subWatchers := range w.watchers {
for _, watcher := range subWatchers {
watcher.C <- nil
}
}
w.watchers = nil
2013-08-04 10:30:15 +04:00
}