add notification struct

release-0.4
Xiang Li 2013-06-07 11:35:49 -07:00
parent 1f57788f16
commit f8ca35fd77
3 changed files with 53 additions and 23 deletions

View File

@ -26,14 +26,16 @@ func (s *Store) Set(key string, value string) (string, bool) {
if ok {
s.Nodes[key] = value
return oldValue, true
} else {
s.Nodes[key] = value
return "", false
}
}
// get the node of the key
// get the value of the key
func (s *Store) Get(key string) (string, error) {
key = path.Clean(key)
@ -60,6 +62,7 @@ func (s *Store) Delete(key string) (string, error) {
}
}
// save the current state of the storage system
func (s *Store) Save() ([]byte, error) {
b, err := json.Marshal(s)
if err != nil {
@ -68,6 +71,7 @@ func (s *Store) Save() ([]byte, error) {
return b, nil
}
// recovery the state of the stroage system from a previous state
func (s *Store) Recovery(state []byte) error {
err := json.Unmarshal(state, s)
return err

View File

@ -6,43 +6,44 @@ import (
"fmt"
)
// CONSTANTS
type Watcher struct {
chanMap map[string][]chan int
chanMap map[string][]chan Notification
}
type Notification struct {
action int
key string
oldValue string
newValue string
}
func createWatcher() *Watcher {
w := new(Watcher)
w.chanMap = make(map[string][]chan int)
w.chanMap = make(map[string][]chan Notification)
return w
}
func (w *Watcher) add(prefix string, c chan int) error {
func (w *Watcher) add(prefix string, c chan Notification, f func(chan Notification)) error {
prefix = path.Clean(prefix)
fmt.Println("Add ", prefix)
_, ok := w.chanMap[prefix]
if !ok {
w.chanMap[prefix] = make([]chan int, 0)
w.chanMap[prefix] = make([]chan Notification, 0)
w.chanMap[prefix] = append(w.chanMap[prefix], c)
} else {
w.chanMap[prefix] = append(w.chanMap[prefix], c)
}
fmt.Println(len(w.chanMap[prefix]), "@", prefix)
go wait(c)
go f(c)
return nil
}
func wait(c chan int) {
result := <-c
if result == 0 {
fmt.Println("yes")
} else {
fmt.Println("no")
}
}
func (w *Watcher) notify(action int, key string, oldValue string, newValue string) error {
key = path.Clean(key)
@ -50,17 +51,30 @@ func (w *Watcher) notify(action int, key string, oldValue string, newValue strin
currPath := "/"
// walk through all the pathes
for _, segment := range segments {
currPath := path.Join(currPath, segment)
fmt.Println(currPath)
chans, ok := w.chanMap[currPath]
if ok {
fmt.Println("found ", currPath)
n := Notification {action, key, oldValue, newValue}
// notify all the watchers
for _, c := range chans {
c <- 0
c <- n
}
// we have notified all the watchers at this path
// delete the map
delete(w.chanMap, currPath)
}
}
return nil
}

View File

@ -2,13 +2,25 @@ package raftd
import (
"testing"
"fmt"
)
func TestWatch(t *testing.T) {
watcher := createWatcher()
c := make(chan int)
d := make(chan int)
watcher.add("/prefix/", c)
watcher.add("/prefix/", d)
c := make(chan Notification)
d := make(chan Notification)
watcher.add("/", c, say)
watcher.add("/prefix/", d, say)
watcher.notify(0, "/prefix/hihihi", "1", "1")
}
}
func say(c chan Notification) {
result := <-c
if result.action != -1 {
fmt.Println("yes")
} else {
fmt.Println("no")
}
}