etcd/store/watcher_hub.go

201 lines
5.8 KiB
Go
Raw Normal View History

2016-05-13 06:51:48 +03:00
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2013-10-08 09:17:58 +04:00
package store
import (
"container/list"
"path"
"strings"
"sync"
2013-10-08 09:17:58 +04:00
"sync/atomic"
etcdErr "github.com/coreos/etcd/error"
)
// A watcherHub contains all subscribed watchers
// watchers is a map with watched path as key and watcher as value
// EventHistory keeps the old events for watcherHub. It is used to help
// watcher to get a continuous event history. Or a watcher might miss the
// event happens between the end of the first watch command and the start
// of the second command.
type watcherHub struct {
// count must be the first element to keep 64-bit alignment for atomic
// access
count int64 // current number of watchers.
mutex sync.Mutex
watchers map[string]*list.List
EventHistory *EventHistory
2013-10-08 09:17:58 +04:00
}
2016-02-01 08:42:39 +03:00
// newWatchHub creates a watcherHub. The capacity determines how many events we will
2013-10-08 09:17:58 +04:00
// keep in the eventHistory.
// Typically, we only need to keep a small size of history[smaller than 20K].
// Ideally, it should smaller than 20K/s[max throughput] * 2 * 50ms[RTT] = 2000
func newWatchHub(capacity int) *watcherHub {
return &watcherHub{
watchers: make(map[string]*list.List),
EventHistory: newEventHistory(capacity),
2013-10-08 09:17:58 +04:00
}
}
2014-09-09 03:56:10 +04:00
// Watch function returns a Watcher.
// If recursive is true, the first change after index under key will be sent to the event channel of the watcher.
// If recursive is false, the first change after index at key will be sent to the event channel of the watcher.
2013-10-08 09:17:58 +04:00
// If index is zero, watch will start from the current index + 1.
func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeIndex uint64) (Watcher, *etcdErr.Error) {
2015-06-17 16:32:13 +03:00
reportWatchRequest()
event, err := wh.EventHistory.scan(key, recursive, index)
2013-10-08 09:17:58 +04:00
if err != nil {
err.Index = storeIndex
2013-10-08 09:17:58 +04:00
return nil, err
}
2014-09-09 03:56:10 +04:00
w := &watcher{
eventChan: make(chan *Event, 100), // use a buffered channel
2013-12-26 18:06:15 +04:00
recursive: recursive,
stream: stream,
2013-12-26 18:06:15 +04:00
sinceIndex: index,
startIndex: storeIndex,
hub: wh,
2013-12-26 18:06:15 +04:00
}
2013-11-07 09:19:37 +04:00
wh.mutex.Lock()
defer wh.mutex.Unlock()
// If the event exists in the known history, append the EtcdIndex and return immediately
if event != nil {
ne := event.Clone()
ne.EtcdIndex = storeIndex
w.eventChan <- ne
2013-12-26 18:06:15 +04:00
return w, nil
2013-10-08 09:17:58 +04:00
}
l, ok := wh.watchers[key]
2013-10-08 09:17:58 +04:00
2013-12-26 18:06:15 +04:00
var elem *list.Element
2013-10-08 09:17:58 +04:00
if ok { // add the new watcher to the back of the list
2013-12-26 18:06:15 +04:00
elem = l.PushBack(w)
2013-10-08 09:17:58 +04:00
} else { // create a new list and add the new watcher
2013-12-26 18:06:15 +04:00
l = list.New()
elem = l.PushBack(w)
wh.watchers[key] = l
2013-10-08 09:17:58 +04:00
}
w.remove = func() {
2014-09-09 03:56:10 +04:00
if w.removed { // avoid removing it twice
return
}
w.removed = true
2013-12-26 18:06:15 +04:00
l.Remove(elem)
atomic.AddInt64(&wh.count, -1)
2015-06-17 16:32:13 +03:00
reportWatcherRemoved()
2013-12-26 18:06:15 +04:00
if l.Len() == 0 {
delete(wh.watchers, key)
}
}
2013-10-08 09:17:58 +04:00
atomic.AddInt64(&wh.count, 1)
2015-06-17 16:32:13 +03:00
reportWatcherAdded()
2013-10-08 09:17:58 +04:00
2013-12-26 18:06:15 +04:00
return w, nil
2013-10-08 09:17:58 +04:00
}
func (wh *watcherHub) add(e *Event) {
e = wh.EventHistory.addEvent(e)
}
2013-10-08 09:17:58 +04:00
// notify function accepts an event and notify to the watchers.
func (wh *watcherHub) notify(e *Event) {
e = wh.EventHistory.addEvent(e) // add event into the eventHistory
2013-11-28 08:04:52 +04:00
segments := strings.Split(e.Node.Key, "/")
2013-10-08 09:17:58 +04:00
currPath := "/"
// walk through all the segments of the path and notify the watchers
// if the path is "/foo/bar", it will notify watchers with path "/",
// "/foo" and "/foo/bar"
for _, segment := range segments {
currPath = path.Join(currPath, segment)
// notify the watchers who interests in the changes of current path
wh.notifyWatchers(e, currPath, false)
}
}
func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
wh.mutex.Lock()
defer wh.mutex.Unlock()
l, ok := wh.watchers[nodePath]
2013-10-08 09:17:58 +04:00
if ok {
curr := l.Front()
for curr != nil {
2013-10-08 09:17:58 +04:00
next := curr.Next() // save reference to the next one in the list
2014-09-09 03:56:10 +04:00
w, _ := curr.Value.(*watcher)
2013-10-08 09:17:58 +04:00
originalPath := (e.Node.Key == nodePath)
if (originalPath || !isHidden(nodePath, e.Node.Key)) && w.notify(e, originalPath, deleted) {
if !w.stream { // do not remove the stream watcher
// if we successfully notify a watcher
// we need to remove the watcher from the list
// and decrease the counter
2015-04-03 20:13:43 +03:00
w.removed = true
l.Remove(curr)
atomic.AddInt64(&wh.count, -1)
2015-06-17 16:32:13 +03:00
reportWatcherRemoved()
}
2013-10-08 09:17:58 +04:00
}
curr = next // update current to the next element in the list
}
if l.Len() == 0 {
// if we have notified all watcher in the list
// we can delete the list
delete(wh.watchers, nodePath)
2013-10-08 09:17:58 +04:00
}
}
}
// clone function clones the watcherHub and return the cloned one.
// only clone the static content. do not clone the current watchers.
func (wh *watcherHub) clone() *watcherHub {
clonedHistory := wh.EventHistory.clone()
return &watcherHub{
EventHistory: clonedHistory,
}
}
// isHidden checks to see if key path is considered hidden to watch path i.e. the
// last element is hidden or it's within a hidden directory
func isHidden(watchPath, keyPath string) bool {
// When deleting a directory, watchPath might be deeper than the actual keyPath
// For example, when deleting /foo we also need to notify watchers on /foo/bar.
if len(watchPath) > len(keyPath) {
return false
}
// if watch path is just a "/", after path will start without "/"
// add a "/" to deal with the special case when watchPath is "/"
afterPath := path.Clean("/" + keyPath[len(watchPath):])
return strings.Contains(afterPath, "/_")
}