From ea4ab2a429c579b6766479e91df117b71769346f Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 6 Sep 2013 22:05:11 -0400 Subject: [PATCH 1/7] recursive watch --- file_system/watcher.go | 61 ++++++++++++++++++++++++++++++------- file_system/watcher_test.go | 26 +++++++++++++++- 2 files changed, 75 insertions(+), 12 deletions(-) diff --git a/file_system/watcher.go b/file_system/watcher.go index 237399dba..7cad8b15b 100644 --- a/file_system/watcher.go +++ b/file_system/watcher.go @@ -12,6 +12,11 @@ type watcherHub struct { EventHistory *EventHistory } +type watcher struct { + eventChan chan *Event + recursive bool +} + func newWatchHub(capacity int) *watcherHub { return &watcherHub{ watchers: make(map[string]*list.List), @@ -19,7 +24,11 @@ func newWatchHub(capacity int) *watcherHub { } } -func (wh *watcherHub) watch(prefix string, index uint64) (error, <-chan *Event) { +// watch function returns an Event channel. +// If recursive is true, the first change after index under prefix will be sent to the event channel. +// If recursive is false, the first change after index at prefix will be sent to the event channel. +// If index is zero, watch will start from the current index + 1. +func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (error, <-chan *Event) { eventChan := make(chan *Event, 1) e, err := wh.EventHistory.scan(prefix, index) @@ -33,13 +42,19 @@ func (wh *watcherHub) watch(prefix string, index uint64) (error, <-chan *Event) return nil, eventChan } + w := &watcher{ + eventChan: eventChan, + recursive: recursive, + } + l, ok := wh.watchers[prefix] - if ok { - l.PushBack(eventChan) - } else { + if ok { // add the new watcher to the back of the list + l.PushBack(w) + + } else { // create a new list and add the new watcher l := list.New() - l.PushBack(eventChan) + l.PushBack(w) wh.watchers[prefix] = l } @@ -59,15 +74,39 @@ func (wh *watcherHub) notify(e *Event) { if ok { + curr := l.Front() + notifiedAll := true + for { - element := l.Front() - if element == nil { - delete(wh.watchers, currPath) + + if curr == nil { // we have reached the end of the list + + if notifiedAll { + // if we have notified all watcher in the list + // we can delete the list + delete(wh.watchers, currPath) + } break } - c, _ := element.Value.(chan *Event) - c <- e - l.Remove(element) + + next := curr.Next() // save the next + + w, _ := curr.Value.(*watcher) + + if w.recursive { + w.eventChan <- e + l.Remove(curr) + } else { + if e.Key == currPath { // only notify the same path + w.eventChan <- e + l.Remove(curr) + } else { // we do not notify all watcher in the list + notifiedAll = false + } + } + + curr = next // go to the next one + } } diff --git a/file_system/watcher_test.go b/file_system/watcher_test.go index fcd3fd9b3..c63a489d7 100644 --- a/file_system/watcher_test.go +++ b/file_system/watcher_test.go @@ -6,7 +6,7 @@ import ( func TestWatch(t *testing.T) { wh := newWatchHub(100) - err, c := wh.watch("/foo", 0) + err, c := wh.watch("/foo", true, 0) if err != nil { t.Fatal("%v", err) @@ -28,4 +28,28 @@ func TestWatch(t *testing.T) { if e != re { t.Fatal("recv != send") } + + _, c = wh.watch("/foo", false, 0) + + e = newEvent(Set, "/foo/bar", 1, 0) + + wh.notify(e) + + select { + case <-c: + t.Fatal("should not receive from channel if not recursive") + default: + // do nothing + } + + e = newEvent(Set, "/foo", 1, 0) + + wh.notify(e) + + re = <-c + + if e != re { + t.Fatal("recv != send") + } + } From 907e39edec707806132ecaf75f8e1760b9d1a8b6 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 6 Sep 2013 23:01:11 -0400 Subject: [PATCH 2/7] update operation --- Documentation/etcd-file-system.md | 15 +++-- file_system/file_system.go | 101 ++++++++++++++++++------------ 2 files changed, 72 insertions(+), 44 deletions(-) diff --git a/Documentation/etcd-file-system.md b/Documentation/etcd-file-system.md index c2fc218a8..e4bcd8992 100644 --- a/Documentation/etcd-file-system.md +++ b/Documentation/etcd-file-system.md @@ -30,13 +30,20 @@ Besides the file and directory difference, all nodes have common attributes and - If the node is a directory, the child nodes of the directory will be returned. - If recursive is true, it will recursively get the nodes of the directory. -- **Set** (path, value[optional], ttl [optional]) +- **Create** (path, value[optional], ttl [optional]) - Set the value to a file. Set operation will help to create intermediate directories with no expiration time. - - If the value is given, set will create a file - - If the value is not given, set will crate a directory + Create a file. Create operation will help to create intermediate directories with no expiration time. + - If the file already exists, create will fail. + - If the value is given, set will create a file. + - If the value is not given, set will crate a directory. - If ttl is given, the node will be deleted when it expires. +- **Update** (path, value[optional], ttl [optional]) + + Update the content of the node. + - If the value is given, the value of the key will be updated. + - If ttl is given, the expiration time of the node will be updated. + - **Delete** (path, recursive) Delete the node of given path. diff --git a/file_system/file_system.go b/file_system/file_system.go index a0b7168b9..9149b9834 100644 --- a/file_system/file_system.go +++ b/file_system/file_system.go @@ -65,15 +65,69 @@ func (fs *FileSystem) Get(keyPath string, recusive bool, index uint64, term uint return e, nil } -func (fs *FileSystem) Set(keyPath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { +func (fs *FileSystem) Update(keyPath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { + n, err := fs.InternalGet(keyPath, index, term) + + if err != nil { // if node does not exist, return error + return nil, err + } + + e := newEvent(Set, keyPath, fs.Index, fs.Term) + + if n.IsDir() { // if the node is a directory, we can only update ttl + + if len(value) != 0 { + return nil, etcdErr.NewError(102, keyPath) + } + + if n.ExpireTime != Permanent && expireTime != Permanent { + n.stopExpire <- true + } + + } else { // if the node is a file, we can update value and ttl + e.PrevValue = n.Value + + if len(value) != 0 { + e.Value = value + } + + n.Write(value, index, term) + + if n.ExpireTime != Permanent && expireTime != Permanent { + n.stopExpire <- true + } + + } + + // update ttl + if expireTime != Permanent { + go n.Expire() + e.Expiration = &n.ExpireTime + e.TTL = int64(expireTime.Sub(time.Now()) / time.Second) + } + + return e, nil +} + +func (fs *FileSystem) Create(keyPath string, value string, expireTime time.Time, create bool, index uint64, term uint64) (*Event, error) { keyPath = path.Clean("/" + keyPath) - // update file system known index and term - fs.Index, fs.Term = index, term + // make sure we can create the node + _, err := fs.InternalGet(keyPath, index, term) - dir, name := path.Split(keyPath) + if err != nil { // key already exists + return nil, etcdErr.NewError(105, keyPath) + } - // walk through the keyPath and get the last directory node + etcdError, _ := err.(etcdErr.Error) + + if etcdError.ErrorCode == 104 { // we cannot create the key due to meet a file while walking + return nil, err + } + + dir, _ := path.Split(keyPath) + + // walk through the keyPath, create dirs and get the last directory node d, err := fs.walk(dir, fs.checkDir) if err != nil { @@ -83,28 +137,9 @@ func (fs *FileSystem) Set(keyPath string, value string, expireTime time.Time, in e := newEvent(Set, keyPath, fs.Index, fs.Term) e.Value = value - f, err := d.GetFile(name) + f := newFile(keyPath, value, fs.Index, fs.Term, d, "", expireTime) - if err == nil { - - if f != nil { // update previous file if exist - e.PrevValue = f.Value - f.Write(e.Value, index, term) - - // if the previous ExpireTime is not Permanent and expireTime is given - // we stop the previous expire routine - if f.ExpireTime != Permanent && expireTime != Permanent { - f.stopExpire <- true - } - } else { // create new file - - f = newFile(keyPath, value, fs.Index, fs.Term, d, "", expireTime) - - err = d.Add(f) - - } - - } + err = d.Add(f) if err != nil { return nil, err @@ -125,20 +160,6 @@ func (fs *FileSystem) TestAndSet(keyPath string, prevValue string, prevIndex uin if err != nil { - etcdError, _ := err.(etcdErr.Error) - if etcdError.ErrorCode == 100 { // file does not exist - - if prevValue == "" && prevIndex == 0 { // test against if prevValue is empty - fs.Set(keyPath, value, expireTime, index, term) - e := newEvent(TestAndSet, keyPath, index, term) - e.Value = value - return e, nil - } - - return nil, err - - } - return nil, err } From 4f99b6029183fe9f22089faf7d869a28a327f6f8 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 6 Sep 2013 23:24:01 -0400 Subject: [PATCH 3/7] update test --- file_system/file_system.go | 92 ++++++++++++++++----------------- file_system/file_system_test.go | 72 ++++++++++++++------------ 2 files changed, 86 insertions(+), 78 deletions(-) diff --git a/file_system/file_system.go b/file_system/file_system.go index 9149b9834..3049d2fc7 100644 --- a/file_system/file_system.go +++ b/file_system/file_system.go @@ -65,6 +65,52 @@ func (fs *FileSystem) Get(keyPath string, recusive bool, index uint64, term uint return e, nil } +func (fs *FileSystem) Create(keyPath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { + keyPath = path.Clean("/" + keyPath) + + // make sure we can create the node + _, err := fs.InternalGet(keyPath, index, term) + + if err == nil { // key already exists + return nil, etcdErr.NewError(105, keyPath) + } + + etcdError, _ := err.(etcdErr.Error) + + if etcdError.ErrorCode == 104 { // we cannot create the key due to meet a file while walking + return nil, err + } + + dir, _ := path.Split(keyPath) + + // walk through the keyPath, create dirs and get the last directory node + d, err := fs.walk(dir, fs.checkDir) + + if err != nil { + return nil, err + } + + e := newEvent(Set, keyPath, fs.Index, fs.Term) + e.Value = value + + f := newFile(keyPath, value, fs.Index, fs.Term, d, "", expireTime) + + err = d.Add(f) + + if err != nil { + return nil, err + } + + // Node with TTL + if expireTime != Permanent { + go f.Expire() + e.Expiration = &f.ExpireTime + e.TTL = int64(expireTime.Sub(time.Now()) / time.Second) + } + + return e, nil +} + func (fs *FileSystem) Update(keyPath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { n, err := fs.InternalGet(keyPath, index, term) @@ -109,52 +155,6 @@ func (fs *FileSystem) Update(keyPath string, value string, expireTime time.Time, return e, nil } -func (fs *FileSystem) Create(keyPath string, value string, expireTime time.Time, create bool, index uint64, term uint64) (*Event, error) { - keyPath = path.Clean("/" + keyPath) - - // make sure we can create the node - _, err := fs.InternalGet(keyPath, index, term) - - if err != nil { // key already exists - return nil, etcdErr.NewError(105, keyPath) - } - - etcdError, _ := err.(etcdErr.Error) - - if etcdError.ErrorCode == 104 { // we cannot create the key due to meet a file while walking - return nil, err - } - - dir, _ := path.Split(keyPath) - - // walk through the keyPath, create dirs and get the last directory node - d, err := fs.walk(dir, fs.checkDir) - - if err != nil { - return nil, err - } - - e := newEvent(Set, keyPath, fs.Index, fs.Term) - e.Value = value - - f := newFile(keyPath, value, fs.Index, fs.Term, d, "", expireTime) - - err = d.Add(f) - - if err != nil { - return nil, err - } - - // Node with TTL - if expireTime != Permanent { - go f.Expire() - e.Expiration = &f.ExpireTime - e.TTL = int64(expireTime.Sub(time.Now()) / time.Second) - } - - return e, nil -} - func (fs *FileSystem) TestAndSet(keyPath string, prevValue string, prevIndex uint64, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { f, err := fs.InternalGet(keyPath, index, term) diff --git a/file_system/file_system_test.go b/file_system/file_system_test.go index 0392cde54..8649c8e50 100644 --- a/file_system/file_system_test.go +++ b/file_system/file_system_test.go @@ -5,26 +5,43 @@ import ( "time" ) -func TestSetAndGet(t *testing.T) { +func TestCreateAndGet(t *testing.T) { fs := New() - setAndGet(fs, "/foobar", t) - setAndGet(fs, "/foo/bar", t) - setAndGet(fs, "/foo/foo/bar", t) + + // this should create successfully + createAndGet(fs, "/foobar", t) + createAndGet(fs, "/foo/bar", t) + createAndGet(fs, "/foo/foo/bar", t) + + // already exist, create should fail + _, err := fs.Create("/foobar", "bar", Permanent, 1, 1) + + if err == nil { + t.Fatal("Create should fail") + } + + // meet file, create should fail + _, err = fs.Create("/foo/bar/bar", "bar", Permanent, 1, 1) + + if err == nil { + t.Fatal("Create should fail") + } + } func TestUpdateFile(t *testing.T) { fs := New() - _, err := fs.Set("/foo/bar", "bar", Permanent, 1, 1) + _, err := fs.Create("/foo/bar", "bar", Permanent, 1, 1) if err != nil { - t.Fatalf("cannot set %s=bar [%s]", "/foo/bar", err.Error()) + t.Fatalf("cannot update %s=bar [%s]", "/foo/bar", err.Error()) } - _, err = fs.Set("/foo/bar", "barbar", Permanent, 2, 1) + _, err = fs.Update("/foo/bar", "barbar", Permanent, 2, 1) if err != nil { - t.Fatalf("cannot set %s=barbar [%s]", "/foo/bar", err.Error()) + t.Fatalf("cannot update %s=barbar [%s]", "/foo/bar", err.Error()) } e, err := fs.Get("/foo/bar", false, 2, 1) @@ -43,11 +60,11 @@ func TestListDirectory(t *testing.T) { // create dir /foo // set key-value /foo/foo=bar - fs.Set("/foo/foo", "bar", Permanent, 1, 1) + fs.Create("/foo/foo", "bar", Permanent, 1, 1) // create dir /foo/fooDir // set key-value /foo/fooDir/foo=bar - fs.Set("/foo/fooDir/foo", "bar", Permanent, 2, 1) + fs.Create("/foo/fooDir/foo", "bar", Permanent, 2, 1) e, err := fs.Get("/foo", true, 2, 1) @@ -74,7 +91,7 @@ func TestListDirectory(t *testing.T) { // create dir /foo/_hidden // set key-value /foo/_hidden/foo -> bar - fs.Set("/foo/_hidden/foo", "bar", Permanent, 3, 1) + fs.Create("/foo/_hidden/foo", "bar", Permanent, 3, 1) e, _ = fs.Get("/foo", false, 2, 1) @@ -86,7 +103,7 @@ func TestListDirectory(t *testing.T) { func TestRemove(t *testing.T) { fs := New() - fs.Set("/foo", "bar", Permanent, 1, 1) + fs.Create("/foo", "bar", Permanent, 1, 1) _, err := fs.Delete("/foo", false, 1, 1) if err != nil { @@ -99,9 +116,9 @@ func TestRemove(t *testing.T) { t.Fatalf("can get the node after deletion") } - fs.Set("/foo/bar", "bar", Permanent, 1, 1) - fs.Set("/foo/car", "car", Permanent, 1, 1) - fs.Set("/foo/dar/dar", "dar", Permanent, 1, 1) + fs.Create("/foo/bar", "bar", Permanent, 1, 1) + fs.Create("/foo/car", "car", Permanent, 1, 1) + fs.Create("/foo/dar/dar", "dar", Permanent, 1, 1) _, err = fs.Delete("/foo", false, 1, 1) @@ -128,7 +145,7 @@ func TestExpire(t *testing.T) { expire := time.Now().Add(time.Second) - fs.Set("/foo", "bar", expire, 1, 1) + fs.Create("/foo", "bar", expire, 1, 1) _, err := fs.InternalGet("/foo", 1, 1) @@ -144,7 +161,7 @@ func TestExpire(t *testing.T) { t.Fatalf("can get the node after expiration time") } - fs.Set("/foo", "bar", expire, 1, 1) + fs.Create("/foo", "bar", expire, 1, 1) time.Sleep(time.Millisecond * 50) _, err = fs.InternalGet("/foo", 1, 1) @@ -155,14 +172,14 @@ func TestExpire(t *testing.T) { expire = time.Now().Add(time.Second) - fs.Set("/foo", "bar", expire, 1, 1) + fs.Create("/foo", "bar", expire, 1, 1) fs.Delete("/foo", false, 1, 1) } func TestTestAndSet(t *testing.T) { fs := New() - fs.Set("/foo", "bar", Permanent, 1, 1) + fs.Create("/foo", "bar", Permanent, 1, 1) // test on wrong previous value _, err := fs.TestAndSet("/foo", "barbar", 0, "car", Permanent, 2, 1) @@ -191,23 +208,13 @@ func TestTestAndSet(t *testing.T) { if e.PrevValue != "car" || e.Value != "bar" { t.Fatalf("[%v/%v] [%v/%v]", e.PrevValue, "car", e.Value, "bar") } - - // test on empty previous value - e, err = fs.TestAndSet("/fooDir/foo", "", 0, "bar", Permanent, 4, 1) - if err != nil { - t.Fatal("test on empty node should be succeeded") - } - - if e.Key != "/fooDir/foo" || e.PrevValue != "" || e.Value != "bar" { - t.Fatalf("[%v/%v] [%v/%v] [%v/%v]", e.Key, "/fooDir/foo", e.PrevValue, "", e.Value, "bar") - } } -func setAndGet(fs *FileSystem, path string, t *testing.T) { - _, err := fs.Set(path, "bar", Permanent, 1, 1) +func createAndGet(fs *FileSystem, path string, t *testing.T) { + _, err := fs.Create(path, "bar", Permanent, 1, 1) if err != nil { - t.Fatalf("cannot set %s=bar [%s]", path, err.Error()) + t.Fatalf("cannot create %s=bar [%s]", path, err.Error()) } e, err := fs.Get(path, false, 1, 1) @@ -219,4 +226,5 @@ func setAndGet(fs *FileSystem, path string, t *testing.T) { if e.Value != "bar" { t.Fatalf("expect value of %s is bar [%s]", path, e.Value) } + } From 948044093b69cbf9a6aa0aea79d1b7489e5f6ad5 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 6 Sep 2013 23:36:11 -0400 Subject: [PATCH 4/7] support create directory --- file_system/file_system.go | 27 +++++++++++++++++++-------- file_system/file_system_test.go | 22 +++++++++++++++++++++- file_system/node.go | 5 +++-- 3 files changed, 43 insertions(+), 11 deletions(-) diff --git a/file_system/file_system.go b/file_system/file_system.go index 3049d2fc7..eb19e2ea8 100644 --- a/file_system/file_system.go +++ b/file_system/file_system.go @@ -19,7 +19,7 @@ type FileSystem struct { func New() *FileSystem { return &FileSystem{ - Root: newDir("/", 0, 0, nil, ""), + Root: newDir("/", 0, 0, nil, "", Permanent), WatcherHub: newWatchHub(1000), } @@ -36,6 +36,7 @@ func (fs *FileSystem) Get(keyPath string, recusive bool, index uint64, term uint e := newEvent(Get, keyPath, index, term) if n.IsDir() { // node is dir + e.Dir = true children, _ := n.List() e.KVPairs = make([]KeyValuePair, len(children)) @@ -57,7 +58,6 @@ func (fs *FileSystem) Get(keyPath string, recusive bool, index uint64, term uint // eliminate hidden nodes e.KVPairs = e.KVPairs[:i] - } else { // node is file e.Value = n.Value } @@ -91,11 +91,22 @@ func (fs *FileSystem) Create(keyPath string, value string, expireTime time.Time, } e := newEvent(Set, keyPath, fs.Index, fs.Term) - e.Value = value - f := newFile(keyPath, value, fs.Index, fs.Term, d, "", expireTime) + var n *Node - err = d.Add(f) + if len(value) != 0 { // create file + e.Value = value + + n = newFile(keyPath, value, fs.Index, fs.Term, d, "", expireTime) + + } else { // create directory + e.Dir = true + + n = newDir(keyPath, fs.Index, fs.Term, d, "", expireTime) + + } + + err = d.Add(n) if err != nil { return nil, err @@ -103,8 +114,8 @@ func (fs *FileSystem) Create(keyPath string, value string, expireTime time.Time, // Node with TTL if expireTime != Permanent { - go f.Expire() - e.Expiration = &f.ExpireTime + go n.Expire() + e.Expiration = &n.ExpireTime e.TTL = int64(expireTime.Sub(time.Now()) / time.Second) } @@ -268,7 +279,7 @@ func (fs *FileSystem) checkDir(parent *Node, dirName string) (*Node, error) { return subDir, nil } - n := newDir(path.Join(parent.Path, dirName), fs.Index, fs.Term, parent, parent.ACL) + n := newDir(path.Join(parent.Path, dirName), fs.Index, fs.Term, parent, parent.ACL, Permanent) parent.Children[dirName] = n diff --git a/file_system/file_system_test.go b/file_system/file_system_test.go index 8649c8e50..04f5ad9b6 100644 --- a/file_system/file_system_test.go +++ b/file_system/file_system_test.go @@ -21,12 +21,32 @@ func TestCreateAndGet(t *testing.T) { } // meet file, create should fail - _, err = fs.Create("/foo/bar/bar", "bar", Permanent, 1, 1) + _, err = fs.Create("/foo/bar/bar", "bar", Permanent, 2, 1) if err == nil { t.Fatal("Create should fail") } + // create a directory + _, err = fs.Create("/fooDir", "", Permanent, 3, 1) + + if err != nil { + t.Fatal("Cannot create /fooDir") + } + + e, err := fs.Get("/fooDir", false, 3, 1) + + if err != nil || e.Dir != true { + t.Fatal("Cannot create /fooDir ") + } + + // create a file under directory + _, err = fs.Create("/fooDir/bar", "bar", Permanent, 4, 1) + + if err != nil { + t.Fatal("Cannot create /fooDir/bar = bar") + } + } func TestUpdateFile(t *testing.T) { diff --git a/file_system/node.go b/file_system/node.go index 8c6765773..4d933f55a 100644 --- a/file_system/node.go +++ b/file_system/node.go @@ -49,7 +49,7 @@ func newFile(keyPath string, value string, createIndex uint64, createTerm uint64 } } -func newDir(keyPath string, createIndex uint64, createTerm uint64, parent *Node, ACL string) *Node { +func newDir(keyPath string, createIndex uint64, createTerm uint64, parent *Node, ACL string, expireTime time.Time) *Node { return &Node{ Path: keyPath, CreateIndex: createIndex, @@ -57,6 +57,7 @@ func newDir(keyPath string, createIndex uint64, createTerm uint64, parent *Node, Parent: parent, ACL: ACL, stopExpire: make(chan bool, 1), + ExpireTime: expireTime, Children: make(map[string]*Node), } } @@ -210,7 +211,7 @@ func (n *Node) Clone() *Node { return newFile(n.Path, n.Value, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime) } - clone := newDir(n.Path, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL) + clone := newDir(n.Path, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime) for key, child := range n.Children { clone.Children[key] = child.Clone() From bd8ec6d67b2a6223715cb454865f5090acc32dcd Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 7 Sep 2013 01:05:11 -0400 Subject: [PATCH 5/7] support watch delete --- file_system/file_system.go | 18 ++++--- file_system/file_system_test.go | 23 +++++++++ file_system/node.go | 18 +++++-- file_system/watcher.go | 88 ++++++++++++++++----------------- file_system/watcher_test.go | 4 +- 5 files changed, 94 insertions(+), 57 deletions(-) diff --git a/file_system/file_system.go b/file_system/file_system.go index eb19e2ea8..43f850275 100644 --- a/file_system/file_system.go +++ b/file_system/file_system.go @@ -198,12 +198,6 @@ func (fs *FileSystem) Delete(keyPath string, recurisive bool, index uint64, term return nil, err } - err = n.Remove(recurisive) - - if err != nil { - return nil, err - } - e := newEvent(Delete, keyPath, index, term) if n.IsDir() { @@ -212,6 +206,18 @@ func (fs *FileSystem) Delete(keyPath string, recurisive bool, index uint64, term e.PrevValue = n.Value } + callback := func(path string) { + fs.WatcherHub.notifyWithPath(e, path, true) + } + + err = n.Remove(recurisive, callback) + + if err != nil { + return nil, err + } + + fs.WatcherHub.notify(e) + return e, nil } diff --git a/file_system/file_system_test.go b/file_system/file_system_test.go index 04f5ad9b6..16666f614 100644 --- a/file_system/file_system_test.go +++ b/file_system/file_system_test.go @@ -230,6 +230,29 @@ func TestTestAndSet(t *testing.T) { } } +func TestWatchRemove(t *testing.T) { + fs := New() + fs.Create("/foo/foo/foo", "bar", Permanent, 1, 1) + + // watch at a deeper path + c, _ := fs.WatcherHub.watch("/foo/foo/foo", false, 0) + fs.Delete("/foo", true, 2, 1) + e := <-c + if e.Key != "/foo" { + t.Fatal("watch for delete fails") + } + + fs.Create("/foo/foo/foo", "bar", Permanent, 3, 1) + // watch at a prefix + c, _ = fs.WatcherHub.watch("/foo", true, 0) + fs.Delete("/foo/foo/foo", false, 4, 1) + e = <-c + if e.Key != "/foo/foo/foo" { + t.Fatal("watch for delete fails") + } + +} + func createAndGet(fs *FileSystem, path string, t *testing.T) { _, err := fs.Create(path, "bar", Permanent, 1, 1) diff --git a/file_system/node.go b/file_system/node.go index 4d933f55a..761266768 100644 --- a/file_system/node.go +++ b/file_system/node.go @@ -65,7 +65,7 @@ func newDir(keyPath string, createIndex uint64, createTerm uint64, parent *Node, // Remove function remove the node. // If the node is a directory and recursive is true, the function will recursively remove // add nodes under the receiver node. -func (n *Node) Remove(recursive bool) error { +func (n *Node) Remove(recursive bool, callback func(path string)) error { n.mu.Lock() defer n.mu.Unlock() @@ -80,6 +80,11 @@ func (n *Node) Remove(recursive bool) error { // This is the only pointer to Node object // Handled by garbage collector delete(n.Parent.Children, name) + + if callback != nil { + callback(n.Path) + } + n.stopExpire <- true n.status = removed } @@ -92,13 +97,18 @@ func (n *Node) Remove(recursive bool) error { } for _, child := range n.Children { // delete all children - child.Remove(true) + child.Remove(true, callback) } // delete self _, name := path.Split(n.Path) if n.Parent.Children[name] == n { delete(n.Parent.Children, name) + + if callback != nil { + callback(n.Path) + } + n.stopExpire <- true n.status = removed } @@ -235,14 +245,14 @@ func (n *Node) IsDir() bool { func (n *Node) Expire() { duration := n.ExpireTime.Sub(time.Now()) if duration <= 0 { - n.Remove(true) + n.Remove(true, nil) return } select { // if timeout, delete the node case <-time.After(duration): - n.Remove(true) + n.Remove(true, nil) return // if stopped, return diff --git a/file_system/watcher.go b/file_system/watcher.go index 7cad8b15b..c17d0eb87 100644 --- a/file_system/watcher.go +++ b/file_system/watcher.go @@ -28,18 +28,18 @@ func newWatchHub(capacity int) *watcherHub { // If recursive is true, the first change after index under prefix will be sent to the event channel. // If recursive is false, the first change after index at prefix will be sent to the event channel. // If index is zero, watch will start from the current index + 1. -func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (error, <-chan *Event) { +func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan *Event, error) { eventChan := make(chan *Event, 1) e, err := wh.EventHistory.scan(prefix, index) if err != nil { - return err, nil + return nil, err } if e != nil { eventChan <- e - return nil, eventChan + return eventChan, nil } w := &watcher{ @@ -58,57 +58,55 @@ func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (error, wh.watchers[prefix] = l } - return nil, eventChan + return eventChan, nil +} + +func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) { + l, ok := wh.watchers[path] + + if ok { + + curr := l.Front() + notifiedAll := true + + for { + + if curr == nil { // we have reached the end of the list + + if notifiedAll { + // if we have notified all watcher in the list + // we can delete the list + delete(wh.watchers, path) + } + break + } + + next := curr.Next() // save the next + + w, _ := curr.Value.(*watcher) + + if w.recursive || force || e.Key == path { + w.eventChan <- e + l.Remove(curr) + } else { + notifiedAll = false + } + + curr = next // go to the next one + + } + } } func (wh *watcherHub) notify(e *Event) { segments := strings.Split(e.Key, "/") + currPath := "/" // walk through all the paths for _, segment := range segments { currPath = path.Join(currPath, segment) - - l, ok := wh.watchers[currPath] - - if ok { - - curr := l.Front() - notifiedAll := true - - for { - - if curr == nil { // we have reached the end of the list - - if notifiedAll { - // if we have notified all watcher in the list - // we can delete the list - delete(wh.watchers, currPath) - } - break - } - - next := curr.Next() // save the next - - w, _ := curr.Value.(*watcher) - - if w.recursive { - w.eventChan <- e - l.Remove(curr) - } else { - if e.Key == currPath { // only notify the same path - w.eventChan <- e - l.Remove(curr) - } else { // we do not notify all watcher in the list - notifiedAll = false - } - } - - curr = next // go to the next one - - } - } - + wh.notifyWithPath(e, currPath, false) } } diff --git a/file_system/watcher_test.go b/file_system/watcher_test.go index c63a489d7..b817e64ec 100644 --- a/file_system/watcher_test.go +++ b/file_system/watcher_test.go @@ -6,7 +6,7 @@ import ( func TestWatch(t *testing.T) { wh := newWatchHub(100) - err, c := wh.watch("/foo", true, 0) + c, err := wh.watch("/foo", true, 0) if err != nil { t.Fatal("%v", err) @@ -29,7 +29,7 @@ func TestWatch(t *testing.T) { t.Fatal("recv != send") } - _, c = wh.watch("/foo", false, 0) + c, _ = wh.watch("/foo", false, 0) e = newEvent(Set, "/foo/bar", 1, 0) From f50cf0497dc4f0a7cfb5e387dcbdc72e09160cb0 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 7 Sep 2013 08:54:58 -0400 Subject: [PATCH 6/7] add comments; event.go: add Create and Update const --- file_system/event.go | 3 +- file_system/event_test.go | 12 ++--- file_system/file_system.go | 96 ++++++++++++++++++++----------------- file_system/node.go | 8 ++-- file_system/watcher_test.go | 6 +-- 5 files changed, 68 insertions(+), 57 deletions(-) diff --git a/file_system/event.go b/file_system/event.go index d539ed9da..d8fef7da8 100644 --- a/file_system/event.go +++ b/file_system/event.go @@ -8,7 +8,8 @@ import ( const ( Get = "get" - Set = "set" + Create = "create" + Update = "update" Delete = "delete" TestAndSet = "testAndSet" ) diff --git a/file_system/event_test.go b/file_system/event_test.go index 2c9b69442..53df55ee5 100644 --- a/file_system/event_test.go +++ b/file_system/event_test.go @@ -13,7 +13,7 @@ func TestEventQueue(t *testing.T) { // Add for i := 0; i < 200; i++ { - e := newEvent(Set, "/foo", uint64(i), 0) + e := newEvent(Create, "/foo", uint64(i), 0) eh.addEvent(e) } @@ -33,11 +33,11 @@ func TestScanHistory(t *testing.T) { eh := newEventHistory(100) // Add - eh.addEvent(newEvent(Set, "/foo", 1, 0)) - eh.addEvent(newEvent(Set, "/foo/bar", 2, 0)) - eh.addEvent(newEvent(Set, "/foo/foo", 3, 0)) - eh.addEvent(newEvent(Set, "/foo/bar/bar", 4, 0)) - eh.addEvent(newEvent(Set, "/foo/foo/foo", 5, 0)) + eh.addEvent(newEvent(Create, "/foo", 1, 0)) + eh.addEvent(newEvent(Create, "/foo/bar", 2, 0)) + eh.addEvent(newEvent(Create, "/foo/foo", 3, 0)) + eh.addEvent(newEvent(Create, "/foo/bar/bar", 4, 0)) + eh.addEvent(newEvent(Create, "/foo/foo/foo", 5, 0)) e, err := eh.scan("/foo", 1) if err != nil || e.Index != 1 { diff --git a/file_system/file_system.go b/file_system/file_system.go index 43f850275..8edf0a068 100644 --- a/file_system/file_system.go +++ b/file_system/file_system.go @@ -25,15 +25,15 @@ func New() *FileSystem { } -func (fs *FileSystem) Get(keyPath string, recusive bool, index uint64, term uint64) (*Event, error) { +func (fs *FileSystem) Get(nodePath string, recusive bool, index uint64, term uint64) (*Event, error) { // TODO: add recursive get - n, err := fs.InternalGet(keyPath, index, term) + n, err := fs.InternalGet(nodePath, index, term) if err != nil { return nil, err } - e := newEvent(Get, keyPath, index, term) + e := newEvent(Get, nodePath, index, term) if n.IsDir() { // node is dir e.Dir = true @@ -65,14 +65,17 @@ func (fs *FileSystem) Get(keyPath string, recusive bool, index uint64, term uint return e, nil } -func (fs *FileSystem) Create(keyPath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { - keyPath = path.Clean("/" + keyPath) +// Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl. +// If the node has already existed, create will fail. +// If any node on the path is a file, create will fail. +func (fs *FileSystem) Create(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { + nodePath = path.Clean("/" + nodePath) // make sure we can create the node - _, err := fs.InternalGet(keyPath, index, term) + _, err := fs.InternalGet(nodePath, index, term) if err == nil { // key already exists - return nil, etcdErr.NewError(105, keyPath) + return nil, etcdErr.NewError(105, nodePath) } etcdError, _ := err.(etcdErr.Error) @@ -81,28 +84,28 @@ func (fs *FileSystem) Create(keyPath string, value string, expireTime time.Time, return nil, err } - dir, _ := path.Split(keyPath) + dir, _ := path.Split(nodePath) - // walk through the keyPath, create dirs and get the last directory node + // walk through the nodePath, create dirs and get the last directory node d, err := fs.walk(dir, fs.checkDir) if err != nil { return nil, err } - e := newEvent(Set, keyPath, fs.Index, fs.Term) + e := newEvent(Create, nodePath, fs.Index, fs.Term) var n *Node if len(value) != 0 { // create file e.Value = value - n = newFile(keyPath, value, fs.Index, fs.Term, d, "", expireTime) + n = newFile(nodePath, value, fs.Index, fs.Term, d, "", expireTime) } else { // create directory e.Dir = true - n = newDir(keyPath, fs.Index, fs.Term, d, "", expireTime) + n = newDir(nodePath, fs.Index, fs.Term, d, "", expireTime) } @@ -119,26 +122,26 @@ func (fs *FileSystem) Create(keyPath string, value string, expireTime time.Time, e.TTL = int64(expireTime.Sub(time.Now()) / time.Second) } + fs.WatcherHub.notify(e) return e, nil } -func (fs *FileSystem) Update(keyPath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { - n, err := fs.InternalGet(keyPath, index, term) +// Update function updates the value/ttl of the node. +// If the node is a file, the value and the ttl can be updated. +// If the node is a directory, only the ttl can be updated. +func (fs *FileSystem) Update(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { + n, err := fs.InternalGet(nodePath, index, term) - if err != nil { // if node does not exist, return error + if err != nil { // if the node does not exist, return error return nil, err } - e := newEvent(Set, keyPath, fs.Index, fs.Term) + e := newEvent(Update, nodePath, fs.Index, fs.Term) if n.IsDir() { // if the node is a directory, we can only update ttl if len(value) != 0 { - return nil, etcdErr.NewError(102, keyPath) - } - - if n.ExpireTime != Permanent && expireTime != Permanent { - n.stopExpire <- true + return nil, etcdErr.NewError(102, nodePath) } } else { // if the node is a file, we can update value and ttl @@ -149,25 +152,27 @@ func (fs *FileSystem) Update(keyPath string, value string, expireTime time.Time, } n.Write(value, index, term) - - if n.ExpireTime != Permanent && expireTime != Permanent { - n.stopExpire <- true - } - } // update ttl + if n.ExpireTime != Permanent && expireTime != Permanent { + n.stopExpire <- true + } + if expireTime != Permanent { go n.Expire() e.Expiration = &n.ExpireTime e.TTL = int64(expireTime.Sub(time.Now()) / time.Second) } + fs.WatcherHub.notify(e) return e, nil } -func (fs *FileSystem) TestAndSet(keyPath string, prevValue string, prevIndex uint64, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { - f, err := fs.InternalGet(keyPath, index, term) +func (fs *FileSystem) TestAndSet(nodePath string, prevValue string, prevIndex uint64, + value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { + + f, err := fs.InternalGet(nodePath, index, term) if err != nil { @@ -175,15 +180,18 @@ func (fs *FileSystem) TestAndSet(keyPath string, prevValue string, prevIndex uin } if f.IsDir() { // can only test and set file - return nil, etcdErr.NewError(102, keyPath) + return nil, etcdErr.NewError(102, nodePath) } if f.Value == prevValue || f.ModifiedIndex == prevIndex { // if test succeed, write the value - e := newEvent(TestAndSet, keyPath, index, term) + e := newEvent(TestAndSet, nodePath, index, term) e.PrevValue = f.Value e.Value = value f.Write(value, index, term) + + fs.WatcherHub.notify(e) + return e, nil } @@ -191,14 +199,16 @@ func (fs *FileSystem) TestAndSet(keyPath string, prevValue string, prevIndex uin return nil, etcdErr.NewError(101, cause) } -func (fs *FileSystem) Delete(keyPath string, recurisive bool, index uint64, term uint64) (*Event, error) { - n, err := fs.InternalGet(keyPath, index, term) +// Delete function deletes the node at the given path. +// If the node is a directory, recursive must be true to delete it. +func (fs *FileSystem) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) { + n, err := fs.InternalGet(nodePath, index, term) - if err != nil { + if err != nil { // if the node does not exist, return error return nil, err } - e := newEvent(Delete, keyPath, index, term) + e := newEvent(Delete, nodePath, index, term) if n.IsDir() { e.Dir = true @@ -206,11 +216,11 @@ func (fs *FileSystem) Delete(keyPath string, recurisive bool, index uint64, term e.PrevValue = n.Value } - callback := func(path string) { + callback := func(path string) { // notify function fs.WatcherHub.notifyWithPath(e, path, true) } - err = n.Remove(recurisive, callback) + err = n.Remove(recursive, callback) if err != nil { return nil, err @@ -221,9 +231,9 @@ func (fs *FileSystem) Delete(keyPath string, recurisive bool, index uint64, term return e, nil } -// walk function walks all the keyPath and apply the walkFunc on each directory -func (fs *FileSystem) walk(keyPath string, walkFunc func(prev *Node, component string) (*Node, error)) (*Node, error) { - components := strings.Split(keyPath, "/") +// walk function walks all the nodePath and apply the walkFunc on each directory +func (fs *FileSystem) walk(nodePath string, walkFunc func(prev *Node, component string) (*Node, error)) (*Node, error) { + components := strings.Split(nodePath, "/") curr := fs.Root @@ -243,9 +253,9 @@ func (fs *FileSystem) walk(keyPath string, walkFunc func(prev *Node, component s return curr, nil } -// InternalGet function get the node of the given keyPath. -func (fs *FileSystem) InternalGet(keyPath string, index uint64, term uint64) (*Node, error) { - keyPath = path.Clean("/" + keyPath) +// InternalGet function get the node of the given nodePath. +func (fs *FileSystem) InternalGet(nodePath string, index uint64, term uint64) (*Node, error) { + nodePath = path.Clean("/" + nodePath) // update file system known index and term fs.Index, fs.Term = index, term @@ -264,7 +274,7 @@ func (fs *FileSystem) InternalGet(keyPath string, index uint64, term uint64) (*N return nil, etcdErr.NewError(100, path.Join(parent.Path, name)) } - f, err := fs.walk(keyPath, walkFunc) + f, err := fs.walk(nodePath, walkFunc) if err != nil { return nil, err diff --git a/file_system/node.go b/file_system/node.go index 761266768..362584712 100644 --- a/file_system/node.go +++ b/file_system/node.go @@ -34,9 +34,9 @@ type Node struct { stopExpire chan bool // stop expire routine channel } -func newFile(keyPath string, value string, createIndex uint64, createTerm uint64, parent *Node, ACL string, expireTime time.Time) *Node { +func newFile(nodePath string, value string, createIndex uint64, createTerm uint64, parent *Node, ACL string, expireTime time.Time) *Node { return &Node{ - Path: keyPath, + Path: nodePath, CreateIndex: createIndex, CreateTerm: createTerm, ModifiedIndex: createIndex, @@ -49,9 +49,9 @@ func newFile(keyPath string, value string, createIndex uint64, createTerm uint64 } } -func newDir(keyPath string, createIndex uint64, createTerm uint64, parent *Node, ACL string, expireTime time.Time) *Node { +func newDir(nodePath string, createIndex uint64, createTerm uint64, parent *Node, ACL string, expireTime time.Time) *Node { return &Node{ - Path: keyPath, + Path: nodePath, CreateIndex: createIndex, CreateTerm: createTerm, Parent: parent, diff --git a/file_system/watcher_test.go b/file_system/watcher_test.go index b817e64ec..ea8eae787 100644 --- a/file_system/watcher_test.go +++ b/file_system/watcher_test.go @@ -19,7 +19,7 @@ func TestWatch(t *testing.T) { // do nothing } - e := newEvent(Set, "/foo/bar", 1, 0) + e := newEvent(Create, "/foo/bar", 1, 0) wh.notify(e) @@ -31,7 +31,7 @@ func TestWatch(t *testing.T) { c, _ = wh.watch("/foo", false, 0) - e = newEvent(Set, "/foo/bar", 1, 0) + e = newEvent(Create, "/foo/bar", 1, 0) wh.notify(e) @@ -42,7 +42,7 @@ func TestWatch(t *testing.T) { // do nothing } - e = newEvent(Set, "/foo", 1, 0) + e = newEvent(Create, "/foo", 1, 0) wh.notify(e) From 08057fa6421374a31e2d8343cdc7ccfdcf5c1a87 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 7 Sep 2013 09:14:27 -0400 Subject: [PATCH 7/7] remove todo --- file_system/file_system.go | 1 - 1 file changed, 1 deletion(-) diff --git a/file_system/file_system.go b/file_system/file_system.go index 8edf0a068..154f2d464 100644 --- a/file_system/file_system.go +++ b/file_system/file_system.go @@ -26,7 +26,6 @@ func New() *FileSystem { } func (fs *FileSystem) Get(nodePath string, recusive bool, index uint64, term uint64) (*Event, error) { - // TODO: add recursive get n, err := fs.InternalGet(nodePath, index, term) if err != nil {