Merge pull request #74 from philips/cherry-pick-mistobaan

cherry pick out Mistobaan's patches
release-0.4
Brandon Philips 2013-08-08 16:53:26 -07:00
commit 28685651bb
4 changed files with 16 additions and 24 deletions

View File

@ -216,7 +216,7 @@ func main() {
if argInfo.WebPort != -1 {
// start web
etcdStore.SetMessager(&storeMsg)
etcdStore.SetMessager(storeMsg)
go webHelper()
go web.Start(raftServer, argInfo.WebPort)
}

View File

@ -35,7 +35,7 @@ type Store struct {
// The string channel to send messages to the outside world
// Now we use it to send changes to the hub of the web service
messager *chan string
messager chan<- string
// A map to keep the recent response to the clients
ResponseMap map[string]*Response
@ -141,7 +141,7 @@ func CreateStore(max int) *Store {
}
// Set the messager of the store
func (s *Store) SetMessager(messager *chan string) {
func (s *Store) SetMessager(messager chan<- string) {
s.messager = messager
}
@ -205,7 +205,7 @@ func (s *Store) internalSet(key string, value string, expireTime time.Time, inde
} else {
// If we want the permanent node to have expire time
// We need to create create a go routine with a channel
// We need to create a go routine with a channel
if isExpire {
node.update = make(chan time.Time)
go s.monitorExpiration(key, node.update, expireTime)
@ -224,8 +224,7 @@ func (s *Store) internalSet(key string, value string, expireTime time.Time, inde
// Send to the messager
if s.messager != nil && err == nil {
*s.messager <- string(msg)
s.messager <- string(msg)
}
s.addToResponseMap(index, &resp)
@ -257,8 +256,7 @@ func (s *Store) internalSet(key string, value string, expireTime time.Time, inde
// Send to the messager
if s.messager != nil && err == nil {
*s.messager <- string(msg)
s.messager <- string(msg)
}
s.addToResponseMap(index, &resp)
@ -440,8 +438,7 @@ func (s *Store) internalDelete(key string, index uint64) ([]byte, error) {
// notify the messager
if s.messager != nil && err == nil {
*s.messager <- string(msg)
s.messager <- string(msg)
}
s.addToResponseMap(index, &resp)
@ -486,7 +483,7 @@ func (s *Store) TestAndSet(key string, prevValue string, value string, expireTim
// The watchHub will send response to the channel when any key under the prefix
// changes [since the sinceIndex if given]
func (s *Store) AddWatcher(prefix string, watcher *Watcher, sinceIndex uint64) error {
return s.watcher.addWatcher(prefix, watcher, sinceIndex, s.ResponseStartIndex, s.Index, &s.ResponseMap)
return s.watcher.addWatcher(prefix, watcher, sinceIndex, s.ResponseStartIndex, s.Index, s.ResponseMap)
}
// This function should be created as a go routine to delete the key-value pair
@ -526,8 +523,7 @@ func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime
// notify the messager
if s.messager != nil && err == nil {
*s.messager <- string(msg)
s.messager <- string(msg)
}
return

View File

@ -36,14 +36,14 @@ func NewWatcher() *Watcher {
// Add a watcher to the watcherHub
func (w *WatcherHub) addWatcher(prefix string, watcher *Watcher, sinceIndex uint64,
responseStartIndex uint64, currentIndex uint64, resMap *map[string]*Response) error {
responseStartIndex uint64, currentIndex uint64, resMap map[string]*Response) error {
prefix = path.Clean("/" + prefix)
if sinceIndex != 0 && sinceIndex >= responseStartIndex {
for i := sinceIndex; i <= currentIndex; i++ {
if checkResponse(prefix, i, resMap) {
watcher.C <- (*resMap)[strconv.FormatUint(i, 10)]
watcher.C <- resMap[strconv.FormatUint(i, 10)]
return nil
}
}
@ -52,22 +52,18 @@ func (w *WatcherHub) addWatcher(prefix string, watcher *Watcher, sinceIndex uint
_, ok := w.watchers[prefix]
if !ok {
w.watchers[prefix] = make([]*Watcher, 0)
w.watchers[prefix] = append(w.watchers[prefix], watcher)
} else {
w.watchers[prefix] = append(w.watchers[prefix], watcher)
}
w.watchers[prefix] = append(w.watchers[prefix], watcher)
return nil
}
// Check if the response has what we are watching
func checkResponse(prefix string, index uint64, resMap *map[string]*Response) bool {
func checkResponse(prefix string, index uint64, resMap map[string]*Response) bool {
resp, ok := (*resMap)[strconv.FormatUint(index, 10)]
resp, ok := resMap[strconv.FormatUint(index, 10)]
if !ok {
// not storage system command

View File

@ -54,7 +54,7 @@ func TestWatch(t *testing.T) {
}
}
// BenchmarkWatch creates 10K watchers watch at /foo/[paht] each time.
// BenchmarkWatch creates 10K watchers watch at /foo/[path] each time.
// Path is randomly chosen with max depth 10.
// It should take less than 15ms to wake up 10K watchers.
func BenchmarkWatch(b *testing.B) {