diff --git a/command.go b/command.go index 1c014e4d0..9ab35934b 100644 --- a/command.go +++ b/command.go @@ -9,7 +9,7 @@ import ( "time" etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/file_system" + "github.com/coreos/etcd/store" "github.com/coreos/go-raft" ) @@ -39,7 +39,7 @@ func (c *CreateCommand) CommandName() string { // Create node func (c *CreateCommand) Apply(server *raft.Server) (interface{}, error) { - e, err := etcdFs.Create(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) + e, err := etcdStore.Create(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) if err != nil { debug(err) @@ -63,7 +63,7 @@ func (c *UpdateCommand) CommandName() string { // Update node func (c *UpdateCommand) Apply(server *raft.Server) (interface{}, error) { - e, err := etcdFs.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) + e, err := etcdStore.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) if err != nil { debug(err) @@ -89,7 +89,7 @@ func (c *TestAndSetCommand) CommandName() string { // Set the key-value pair if the current value of the key equals to the given prevValue func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) { - e, err := etcdFs.TestAndSet(c.Key, c.PrevValue, c.PrevIndex, + e, err := etcdStore.TestAndSet(c.Key, c.PrevValue, c.PrevIndex, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) if err != nil { @@ -114,7 +114,7 @@ func (c *GetCommand) CommandName() string { // Get the value of key func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) { - e, err := etcdFs.Get(c.Key, c.Recursive, c.Sorted, server.CommitIndex(), server.Term()) + e, err := etcdStore.Get(c.Key, c.Recursive, c.Sorted, server.CommitIndex(), server.Term()) if err != nil { debug(err) @@ -137,7 +137,7 @@ func (c *DeleteCommand) CommandName() string { // Delete the key func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) { - e, err := etcdFs.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term()) + e, err := etcdStore.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term()) if err != nil { debug(err) @@ -160,7 +160,7 @@ func (c *WatchCommand) CommandName() string { } func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) { - eventChan, err := etcdFs.Watch(c.Key, c.Recursive, c.SinceIndex, server.CommitIndex(), server.Term()) + eventChan, err := etcdStore.Watch(c.Key, c.Recursive, c.SinceIndex, server.CommitIndex(), server.Term()) if err != nil { return nil, err @@ -197,7 +197,7 @@ func (c *JoinCommand) CommandName() string { func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { // check if the join command is from a previous machine, who lost all its previous log. - e, _ := etcdFs.Get(path.Join("/_etcd/machines", c.Name), false, false, raftServer.CommitIndex(), raftServer.Term()) + e, _ := etcdStore.Get(path.Join("/_etcd/machines", c.Name), false, false, raftServer.CommitIndex(), raftServer.Term()) b := make([]byte, 8) binary.PutUvarint(b, raftServer.CommitIndex()) @@ -221,7 +221,7 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { // add machine in etcd storage key := path.Join("_etcd/machines", c.Name) value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion) - etcdFs.Create(key, value, fileSystem.Permanent, raftServer.CommitIndex(), raftServer.Term()) + etcdStore.Create(key, value, store.Permanent, raftServer.CommitIndex(), raftServer.Term()) if c.Name != r.Name() { // do not add self to the peer list r.peersStats[c.Name] = &raftPeerStats{MinLatency: 1 << 63} @@ -250,7 +250,7 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) { // remove machine in etcd storage key := path.Join("_etcd/machines", c.Name) - _, err := etcdFs.Delete(key, false, raftServer.CommitIndex(), raftServer.Term()) + _, err := etcdStore.Delete(key, false, raftServer.CommitIndex(), raftServer.Term()) delete(r.peersStats, c.Name) if err != nil { diff --git a/error/error.go b/error/error.go index d97158cf1..643c3244b 100644 --- a/error/error.go +++ b/error/error.go @@ -32,27 +32,27 @@ func init() { errors = make(map[int]string) // command related errors - errors[100] = "Key Not Found" - errors[101] = "Test Failed" //test and set - errors[102] = "Not A File" - errors[103] = "Reached the max number of machines in the cluster" - errors[104] = "Not A Directory" - errors[105] = "Already exists" // create - errors[106] = "The prefix of given key is a keyword in etcd" + errors[EcodeKeyNotFound] = "Key Not Found" + errors[EcodeTestFailed] = "Test Failed" //test and set + errors[EcodeNotFile] = "Not A File" + errors[EcodeNoMoreMachine] = "Reached the max number of machines in the cluster" + errors[EcodeNotDir] = "Not A Directory" + errors[EcodeNodeExist] = "Already exists" // create + errors[EcodeKeyIsPreserved] = "The prefix of given key is a keyword in etcd" // Post form related errors - errors[200] = "Value is Required in POST form" - errors[201] = "PrevValue is Required in POST form" - errors[202] = "The given TTL in POST form is not a number" - errors[203] = "The given index in POST form is not a number" + errors[EcodeValueRequired] = "Value is Required in POST form" + errors[EcodePrevValueRequired] = "PrevValue is Required in POST form" + errors[EcodeTTLNaN] = "The given TTL in POST form is not a number" + errors[EcodeIndexNaN] = "The given index in POST form is not a number" // raft related errors - errors[300] = "Raft Internal Error" - errors[301] = "During Leader Election" + errors[EcodeRaftInternal] = "Raft Internal Error" + errors[EcodeLeaderElect] = "During Leader Election" // etcd related errors - errors[400] = "watcher is cleared due to etcd recovery" - errors[401] = "The event in requested index is outdated and cleared" + errors[EcodeWatcherCleared] = "watcher is cleared due to etcd recovery" + errors[EcodeEventIndexCleared] = "The event in requested index is outdated and cleared" } diff --git a/etcd.go b/etcd.go index cba7809f4..80c8dd122 100644 --- a/etcd.go +++ b/etcd.go @@ -10,7 +10,7 @@ import ( "strings" "time" - "github.com/coreos/etcd/file_system" + "github.com/coreos/etcd/store" "github.com/coreos/go-raft" ) @@ -136,7 +136,7 @@ type TLSConfig struct { // //------------------------------------------------------------------------------ -var etcdFs *fileSystem.FileSystem +var etcdStore *store.Store //------------------------------------------------------------------------------ // @@ -204,7 +204,7 @@ func main() { info := getInfo(dirPath) // Create etcd key-value store - etcdFs = fileSystem.New() + etcdStore = store.New() snapConf = newSnapshotConf() diff --git a/etcd_handlers.go b/etcd_handlers.go index d8f42fc7e..09e5e8252 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -7,7 +7,7 @@ import ( "strings" etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/file_system" + "github.com/coreos/etcd/store" "github.com/coreos/go-raft" ) @@ -120,7 +120,7 @@ func UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error { } // TODO: update should give at least one option - if value == "" && expireTime.Sub(fileSystem.Permanent) == 0 { + if value == "" && expireTime.Sub(store.Permanent) == 0 { return nil } @@ -223,8 +223,7 @@ func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error { // Handler to return the basic stats of etcd func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error { w.WriteHeader(http.StatusOK) - //w.Write(etcdStore.Stats()) - w.Write(etcdFs.Stats.GetStats()) + w.Write(etcdStore.JsonStats()) w.Write(r.Stats()) return nil } diff --git a/file_system/stats.go b/file_system/stats.go deleted file mode 100644 index c05655efc..000000000 --- a/file_system/stats.go +++ /dev/null @@ -1,146 +0,0 @@ -package fileSystem - -import ( - "encoding/json" - "sync" -) - -const ( - // Operations that will be running serializely - StatsSetsHit = 100 - StatsSetsMiss = 101 - StatsDeletesHit = 102 - StatsDeletesMiss = 103 - StatsUpdatesHit = 104 - StatsUpdatesMiss = 105 - StatsTestAndSetsHit = 106 - StatsTestAndSetsMiss = 107 - StatsRecoveryHit = 108 - StatsRecoveryMiss = 109 - - // concurrent operations - StatsGetsHit = 200 - StatsGetsMiss = 201 - - StatsWatchHit = 300 - StatsWatchMiss = 301 - StatsInWatchingNum = 302 - - StatsSaveHit = 400 - StatsSaveMiss = 401 -) - -type EtcdStats struct { - - // Lock for synchronization - rwlock sync.RWMutex - - // Number of get requests - GetsHit uint64 `json:"gets_hits"` - GetsMiss uint64 `json:"gets_misses"` - - // Number of sets requests - SetsHit uint64 `json:"sets_hits"` - SetsMiss uint64 `json:"sets_misses"` - - // Number of delete requests - DeletesHit uint64 `json:"deletes_hits"` - DeletesMiss uint64 `json:"deletes_misses"` - - // Number of update requests - UpdatesHit uint64 `json:"updates_hits"` - UpdatesMiss uint64 `json:"updates_misses"` - - // Number of testAndSet requests - TestAndSetsHit uint64 `json:"testAndSets_hits"` - TestAndSetsMiss uint64 `json:"testAndSets_misses"` - - // Number of Watch requests - WatchHit uint64 `json:"watch_hit"` - WatchMiss uint64 `json:"watch_miss"` - InWatchingNum uint64 `json:"in_watching_number"` - - // Number of save requests - SaveHit uint64 `json:"save_hit"` - SaveMiss uint64 `json:"save_miss"` - - // Number of recovery requests - RecoveryHit uint64 `json:"recovery_hit"` - RecoveryMiss uint64 `json:"recovery_miss"` -} - -func newStats() *EtcdStats { - e := new(EtcdStats) - return e -} - -// Status() return the statistics info of etcd storage its recent start -func (e *EtcdStats) GetStats() []byte { - b, _ := json.Marshal(e) - return b -} - -func (e *EtcdStats) TotalReads() uint64 { - return e.GetsHit + e.GetsMiss -} - -func (e *EtcdStats) TotalWrites() uint64 { - return e.SetsHit + e.SetsMiss + - e.DeletesHit + e.DeletesMiss + - e.UpdatesHit + e.UpdatesMiss + - e.TestAndSetsHit + e.TestAndSetsMiss -} - -func (e *EtcdStats) IncStats(field int) { - if field >= 200 { - e.rwlock.Lock() - - switch field { - case StatsGetsHit: - e.GetsHit++ - case StatsGetsMiss: - e.GetsMiss++ - case StatsWatchHit: - e.WatchHit++ - case StatsWatchMiss: - e.WatchMiss++ - case StatsInWatchingNum: - e.InWatchingNum++ - case StatsSaveHit: - e.SaveHit++ - case StatsSaveMiss: - e.SaveMiss++ - } - - e.rwlock.Unlock() - - } else { - e.rwlock.RLock() - - switch field { - case StatsSetsHit: - e.SetsHit++ - case StatsSetsMiss: - e.SetsMiss++ - case StatsDeletesHit: - e.DeletesHit++ - case StatsDeletesMiss: - e.DeletesMiss++ - case StatsUpdatesHit: - e.UpdatesHit++ - case StatsUpdatesMiss: - e.UpdatesMiss++ - case StatsTestAndSetsHit: - e.TestAndSetsHit++ - case StatsTestAndSetsMiss: - e.TestAndSetsMiss++ - case StatsRecoveryHit: - e.RecoveryHit++ - case StatsRecoveryMiss: - e.RecoveryMiss++ - } - - e.rwlock.RUnlock() - } - -} diff --git a/file_system/stats_test.go b/file_system/stats_test.go deleted file mode 100644 index 38bc42c4f..000000000 --- a/file_system/stats_test.go +++ /dev/null @@ -1,221 +0,0 @@ -package fileSystem - -import ( - "math/rand" - "testing" - "time" - //"fmt" -) - -func TestBasicStats(t *testing.T) { - fs := New() - keys := GenKeys(rand.Intn(100), 5) - - i := uint64(0) - GetsHit := uint64(0) - GetsMiss := uint64(0) - SetsHit := uint64(0) - SetsMiss := uint64(0) - DeletesHit := uint64(0) - DeletesMiss := uint64(0) - UpdatesHit := uint64(0) - UpdatesMiss := uint64(0) - TestAndSetsHit := uint64(0) - TestAndSetsMiss := uint64(0) - WatchHit := uint64(0) - WatchMiss := uint64(0) - InWatchingNum := uint64(0) - SaveHit := uint64(0) - SaveMiss := uint64(0) - RecoveryHit := uint64(0) - RecoveryMiss := uint64(0) - - for _, k := range keys { - i++ - _, err := fs.Create(k, "bar", time.Now().Add(time.Second*time.Duration(rand.Intn(10))), i, 1) - if err != nil { - SetsMiss++ - } else { - SetsHit++ - } - } - - for _, k := range keys { - _, err := fs.Get(k, false, false, i, 1) - if err != nil { - GetsMiss++ - } else { - GetsHit++ - } - } - - for _, k := range keys { - i++ - _, err := fs.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(5))), i, 1) - if err != nil { - UpdatesMiss++ - } else { - UpdatesHit++ - } - } - - for _, k := range keys { - _, err := fs.Get(k, false, false, i, 1) - if err != nil { - GetsMiss++ - } else { - GetsHit++ - } - } - - for _, k := range keys { - i++ - _, err := fs.TestAndSet(k, "foo", 0, "bar", Permanent, i, 1) - if err != nil { - TestAndSetsMiss++ - } else { - TestAndSetsHit++ - } - } - - //fmt.Printf("#TestAndSet [%d]\n", TestAndSetsHit) - - for _, k := range keys { - _, err := fs.Watch(k, false, 0, i, 1) - if err != nil { - WatchMiss++ - } else { - WatchHit++ - InWatchingNum++ - } - } - - //fmt.Printf("#Watch [%d]\n", WatchHit) - - for _, k := range keys { - _, err := fs.Get(k, false, false, i, 1) - if err != nil { - GetsMiss++ - } else { - GetsHit++ - } - } - - //fmt.Println("fs.index ", fs.Index) - for j := 0; j < 5; j++ { - b := make([]byte, 10) - err := fs.Recovery(b) - if err != nil { - RecoveryMiss++ - } - - b, err = fs.Save() - if err != nil { - SaveMiss++ - } else { - SaveHit++ - } - - err = fs.Recovery(b) - if err != nil { - RecoveryMiss++ - } else { - RecoveryHit++ - } - } - //fmt.Println("fs.index after ", fs.Index) - //fmt.Println("stats.inwatching ", fs.Stats.InWatchingNum) - - for _, k := range keys { - i++ - _, err := fs.Delete(k, false, i, 1) - if err != nil { - DeletesMiss++ - } else { - InWatchingNum-- - DeletesHit++ - } - } - - //fmt.Printf("#Delete [%d] stats.deletehit [%d] \n", DeletesHit, fs.Stats.DeletesHit) - - for _, k := range keys { - _, err := fs.Get(k, false, false, i, 1) - if err != nil { - GetsMiss++ - } else { - GetsHit++ - } - } - - if GetsHit != fs.Stats.GetsHit { - t.Fatalf("GetsHit [%d] != Stats.GetsHit [%d]", GetsHit, fs.Stats.GetsHit) - } - - if GetsMiss != fs.Stats.GetsMiss { - t.Fatalf("GetsMiss [%d] != Stats.GetsMiss [%d]", GetsMiss, fs.Stats.GetsMiss) - } - - if SetsHit != fs.Stats.SetsHit { - t.Fatalf("SetsHit [%d] != Stats.SetsHit [%d]", SetsHit, fs.Stats.SetsHit) - } - - if SetsMiss != fs.Stats.SetsMiss { - t.Fatalf("SetsMiss [%d] != Stats.SetsMiss [%d]", SetsMiss, fs.Stats.SetsMiss) - } - - if DeletesHit != fs.Stats.DeletesHit { - t.Fatalf("DeletesHit [%d] != Stats.DeletesHit [%d]", DeletesHit, fs.Stats.DeletesHit) - } - - if DeletesMiss != fs.Stats.DeletesMiss { - t.Fatalf("DeletesMiss [%d] != Stats.DeletesMiss [%d]", DeletesMiss, fs.Stats.DeletesMiss) - } - - if UpdatesHit != fs.Stats.UpdatesHit { - t.Fatalf("UpdatesHit [%d] != Stats.UpdatesHit [%d]", UpdatesHit, fs.Stats.UpdatesHit) - } - - if UpdatesMiss != fs.Stats.UpdatesMiss { - t.Fatalf("UpdatesMiss [%d] != Stats.UpdatesMiss [%d]", UpdatesMiss, fs.Stats.UpdatesMiss) - } - - if TestAndSetsHit != fs.Stats.TestAndSetsHit { - t.Fatalf("TestAndSetsHit [%d] != Stats.TestAndSetsHit [%d]", TestAndSetsHit, fs.Stats.TestAndSetsHit) - } - - if TestAndSetsMiss != fs.Stats.TestAndSetsMiss { - t.Fatalf("TestAndSetsMiss [%d] != Stats.TestAndSetsMiss [%d]", TestAndSetsMiss, fs.Stats.TestAndSetsMiss) - } - - if SaveHit != fs.Stats.SaveHit { - t.Fatalf("SaveHit [%d] != Stats.SaveHit [%d]", SaveHit, fs.Stats.SaveHit) - } - - if SaveMiss != fs.Stats.SaveMiss { - t.Fatalf("SaveMiss [%d] != Stats.SaveMiss [%d]", SaveMiss, fs.Stats.SaveMiss) - } - - if WatchHit != fs.Stats.WatchHit { - t.Fatalf("WatchHit [%d] != Stats.WatchHit [%d]", WatchHit, fs.Stats.WatchHit) - } - - if WatchMiss != fs.Stats.WatchMiss { - t.Fatalf("WatchMiss [%d] != Stats.WatchMiss [%d]", WatchMiss, fs.Stats.WatchMiss) - } - - if InWatchingNum != fs.Stats.InWatchingNum { - t.Fatalf("InWatchingNum [%d] != Stats.InWatchingNum [%d]", InWatchingNum, fs.Stats.InWatchingNum) - } - - if RecoveryHit != fs.Stats.RecoveryHit { - t.Fatalf("RecoveryHit [%d] != Stats.RecoveryHit [%d]", RecoveryHit, fs.Stats.RecoveryHit) - } - - if RecoveryMiss != fs.Stats.RecoveryMiss { - t.Fatalf("RecoveryMiss [%d] != Stats.RecoveryMiss [%d]", RecoveryMiss, fs.Stats.RecoveryMiss) - } - - //fmt.Println(GetsHit, GetsMiss, SetsHit, SetsMiss, DeletesHit, DeletesMiss, UpdatesHit, UpdatesMiss, TestAndSetsHit, TestAndSetsMiss, WatchHit, WatchMiss, InWatchingNum, SaveHit, SaveMiss, RecoveryHit, RecoveryMiss) - -} diff --git a/machines.go b/machines.go index 1988353d5..b863c50fb 100644 --- a/machines.go +++ b/machines.go @@ -2,7 +2,7 @@ package main // machineNum returns the number of machines in the cluster func machineNum() int { - e, err := etcdFs.Get("/_etcd/machines", false, false, r.CommitIndex(), r.Term()) + e, err := etcdStore.Get("/_etcd/machines", false, false, r.CommitIndex(), r.Term()) if err != nil { return 0 diff --git a/name_url_map.go b/name_url_map.go index 38e1ecc15..1192f3fda 100644 --- a/name_url_map.go +++ b/name_url_map.go @@ -56,7 +56,7 @@ func readURL(nodeName string, urlName string) (string, bool) { // convert nodeName to url from etcd storage key := path.Join("/_etcd/machines", nodeName) - e, err := etcdFs.Get(key, false, false, r.CommitIndex(), r.Term()) + e, err := etcdStore.Get(key, false, false, r.CommitIndex(), r.Term()) if err != nil { return "", false diff --git a/raft_server.go b/raft_server.go index d88f266f8..506e4771a 100644 --- a/raft_server.go +++ b/raft_server.go @@ -36,7 +36,7 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, ElectionTimeout) // Create raft server - server, err := raft.NewServer(name, dirPath, raftTransporter, etcdFs, nil) + server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil) check(err) diff --git a/file_system/event.go b/store/event.go similarity index 98% rename from file_system/event.go rename to store/event.go index 3053d85cd..ca7b0389b 100644 --- a/file_system/event.go +++ b/store/event.go @@ -1,4 +1,4 @@ -package fileSystem +package store import ( "fmt" @@ -6,7 +6,7 @@ import ( "sync" "time" - etcdErr "github.com/xiangli-cmu/etcd/error" + etcdErr "github.com/coreos/etcd/error" ) const ( diff --git a/file_system/event_test.go b/store/event_test.go similarity index 98% rename from file_system/event_test.go rename to store/event_test.go index 146f8cd4d..0d19dd52a 100644 --- a/file_system/event_test.go +++ b/store/event_test.go @@ -1,4 +1,4 @@ -package fileSystem +package store import ( "testing" diff --git a/file_system/node.go b/store/node.go similarity index 99% rename from file_system/node.go rename to store/node.go index 587cfb430..8502f70d1 100644 --- a/file_system/node.go +++ b/store/node.go @@ -1,4 +1,4 @@ -package fileSystem +package store import ( "path" @@ -6,7 +6,7 @@ import ( "sync" "time" - etcdErr "github.com/xiangli-cmu/etcd/error" + etcdErr "github.com/coreos/etcd/error" ) var ( diff --git a/store/stats.go b/store/stats.go new file mode 100644 index 000000000..1f98625a0 --- /dev/null +++ b/store/stats.go @@ -0,0 +1,91 @@ +package store + +import ( + "encoding/json" + "sync/atomic" +) + +const ( + SetSuccess = 100 + SetFail = 101 + DeleteSuccess = 102 + DeleteFail = 103 + UpdateSuccess = 104 + UpdateFail = 105 + TestAndSetSuccess = 106 + TestAndSetFail = 107 + GetSuccess = 110 + GetFail = 111 +) + +type Stats struct { + + // Number of get requests + GetSuccess uint64 `json:"getsSuccess"` + GetFail uint64 `json:"getsFail"` + + // Number of sets requests + SetSuccess uint64 `json:"setsSuccess"` + SetFail uint64 `json:"setsFail"` + + // Number of delete requests + DeleteSuccess uint64 `json:"deleteSuccess"` + DeleteFail uint64 `json:"deleteFail"` + + // Number of update requests + UpdateSuccess uint64 `json:"updateSuccess"` + UpdateFail uint64 `json:"updateFail"` + + // Number of testAndSet requests + TestAndSetSuccess uint64 `json:"testAndSetSuccess"` + TestAndSetFail uint64 `json:"testAndSetFail"` + + Watchers uint64 `json:"watchers"` +} + +func newStats() *Stats { + s := new(Stats) + return s +} + +// Status() return the statistics info of etcd storage its recent start +func (s *Stats) toJson() []byte { + b, _ := json.Marshal(s) + return b +} + +func (s *Stats) TotalReads() uint64 { + return s.GetSuccess + s.GetFail +} + +func (s *Stats) TotalWrites() uint64 { + return s.SetSuccess + s.SetFail + + s.DeleteSuccess + s.DeleteFail + + s.TestAndSetSuccess + s.TestAndSetFail + + s.UpdateSuccess + s.UpdateFail +} + +func (s *Stats) Inc(field int) { + switch field { + case SetSuccess: + atomic.AddUint64(&s.SetSuccess, 1) + case SetFail: + atomic.AddUint64(&s.SetFail, 1) + case DeleteSuccess: + atomic.AddUint64(&s.DeleteSuccess, 1) + case DeleteFail: + atomic.AddUint64(&s.DeleteFail, 1) + case GetSuccess: + atomic.AddUint64(&s.GetSuccess, 1) + case GetFail: + atomic.AddUint64(&s.GetFail, 1) + case UpdateSuccess: + atomic.AddUint64(&s.UpdateSuccess, 1) + case UpdateFail: + atomic.AddUint64(&s.UpdateFail, 1) + case TestAndSetSuccess: + atomic.AddUint64(&s.TestAndSetSuccess, 1) + case TestAndSetFail: + atomic.AddUint64(&s.TestAndSetFail, 1) + } +} diff --git a/store/stats_test.go b/store/stats_test.go new file mode 100644 index 000000000..ff67f328a --- /dev/null +++ b/store/stats_test.go @@ -0,0 +1,139 @@ +package store + +import ( + "math/rand" + "testing" + "time" +) + +func TestBasicStats(t *testing.T) { + s := New() + keys := GenKeys(rand.Intn(100), 5) + + var i uint64 + var GetSuccess, GetFail, SetSuccess, SetFail, DeleteSuccess, DeleteFail uint64 + var UpdateSuccess, UpdateFail, TestAndSetSuccess, TestAndSetFail, watcher_number uint64 + + for _, k := range keys { + i++ + _, err := s.Create(k, "bar", time.Now().Add(time.Second*time.Duration(rand.Intn(10))), i, 1) + if err != nil { + SetFail++ + } else { + SetSuccess++ + } + } + + for _, k := range keys { + _, err := s.Get(k, false, false, i, 1) + if err != nil { + GetFail++ + } else { + GetSuccess++ + } + } + + for _, k := range keys { + i++ + _, err := s.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(5))), i, 1) + if err != nil { + UpdateFail++ + } else { + UpdateSuccess++ + } + } + + for _, k := range keys { + _, err := s.Get(k, false, false, i, 1) + if err != nil { + GetFail++ + } else { + GetSuccess++ + } + } + + for _, k := range keys { + i++ + _, err := s.TestAndSet(k, "foo", 0, "bar", Permanent, i, 1) + if err != nil { + TestAndSetFail++ + } else { + TestAndSetSuccess++ + } + } + + for _, k := range keys { + s.Watch(k, false, 0, i, 1) + watcher_number++ + } + + for _, k := range keys { + _, err := s.Get(k, false, false, i, 1) + if err != nil { + GetFail++ + } else { + GetSuccess++ + } + } + + for _, k := range keys { + i++ + _, err := s.Delete(k, false, i, 1) + if err != nil { + DeleteFail++ + } else { + watcher_number-- + DeleteSuccess++ + } + } + + for _, k := range keys { + _, err := s.Get(k, false, false, i, 1) + if err != nil { + GetFail++ + } else { + GetSuccess++ + } + } + + if GetSuccess != s.Stats.GetSuccess { + t.Fatalf("GetSuccess [%d] != Stats.GetSuccess [%d]", GetSuccess, s.Stats.GetSuccess) + } + + if GetFail != s.Stats.GetFail { + t.Fatalf("GetFail [%d] != Stats.GetFail [%d]", GetFail, s.Stats.GetFail) + } + + if SetSuccess != s.Stats.SetSuccess { + t.Fatalf("SetSuccess [%d] != Stats.SetSuccess [%d]", SetSuccess, s.Stats.SetSuccess) + } + + if SetFail != s.Stats.SetFail { + t.Fatalf("SetFail [%d] != Stats.SetFail [%d]", SetFail, s.Stats.SetFail) + } + + if DeleteSuccess != s.Stats.DeleteSuccess { + t.Fatalf("DeleteSuccess [%d] != Stats.DeleteSuccess [%d]", DeleteSuccess, s.Stats.DeleteSuccess) + } + + if DeleteFail != s.Stats.DeleteFail { + t.Fatalf("DeleteFail [%d] != Stats.DeleteFail [%d]", DeleteFail, s.Stats.DeleteFail) + } + + if UpdateSuccess != s.Stats.UpdateSuccess { + t.Fatalf("UpdateSuccess [%d] != Stats.UpdateSuccess [%d]", UpdateSuccess, s.Stats.UpdateSuccess) + } + + if UpdateFail != s.Stats.UpdateFail { + t.Fatalf("UpdateFail [%d] != Stats.UpdateFail [%d]", UpdateFail, s.Stats.UpdateFail) + } + + if TestAndSetSuccess != s.Stats.TestAndSetSuccess { + t.Fatalf("TestAndSetSuccess [%d] != Stats.TestAndSetSuccess [%d]", TestAndSetSuccess, s.Stats.TestAndSetSuccess) + } + + if TestAndSetFail != s.Stats.TestAndSetFail { + t.Fatalf("TestAndSetFail [%d] != Stats.TestAndSetFail [%d]", TestAndSetFail, s.Stats.TestAndSetFail) + } + +} diff --git a/file_system/file_system.go b/store/store.go similarity index 64% rename from file_system/file_system.go rename to store/store.go index 0ed740253..73c6bf3ec 100644 --- a/file_system/file_system.go +++ b/store/store.go @@ -1,4 +1,4 @@ -package fileSystem +package store import ( "encoding/json" @@ -8,34 +8,34 @@ import ( "strings" "time" - etcdErr "github.com/xiangli-cmu/etcd/error" + etcdErr "github.com/coreos/etcd/error" ) -type FileSystem struct { +type Store struct { Root *Node WatcherHub *watcherHub Index uint64 Term uint64 - Stats *EtcdStats + Stats *Stats } -func New() *FileSystem { - fs := new(FileSystem) - fs.Root = newDir("/", 0, 0, nil, "", Permanent) - fs.Stats = newStats() - fs.WatcherHub = newWatchHub(1000, fs.Stats) +func New() *Store { + s := new(Store) + s.Root = newDir("/", 0, 0, nil, "", Permanent) + s.Stats = newStats() + s.WatcherHub = newWatchHub(1000) - return fs + return s } -func (fs *FileSystem) Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) { +func (s *Store) Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) { nodePath = path.Clean(path.Join("/", nodePath)) - n, err := fs.InternalGet(nodePath, index, term) + n, err := s.InternalGet(nodePath, index, term) if err != nil { - fs.Stats.IncStats(StatsGetsMiss) + s.Stats.Inc(GetFail) return nil, err } @@ -82,61 +82,62 @@ func (fs *FileSystem) Get(nodePath string, recursive, sorted bool, index uint64, e.TTL = int64(n.ExpireTime.Sub(time.Now())/time.Second) + 1 } - fs.Stats.IncStats(StatsGetsHit) + s.Stats.Inc(GetSuccess) + return e, nil } // Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl. // If the node has already existed, create will fail. // If any node on the path is a file, create will fail. -func (fs *FileSystem) Create(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { +func (s *Store) Create(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { nodePath = path.Clean(path.Join("/", nodePath)) // make sure we can create the node - _, err := fs.InternalGet(nodePath, index, term) + _, err := s.InternalGet(nodePath, index, term) if err == nil { // key already exists - fs.Stats.IncStats(StatsSetsMiss) + s.Stats.Inc(SetFail) return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath) } etcdError, _ := err.(etcdErr.Error) if etcdError.ErrorCode == 104 { // we cannot create the key due to meet a file while walking - fs.Stats.IncStats(StatsSetsMiss) + s.Stats.Inc(SetFail) return nil, err } dir, _ := path.Split(nodePath) // walk through the nodePath, create dirs and get the last directory node - d, err := fs.walk(dir, fs.checkDir) + d, err := s.walk(dir, s.checkDir) if err != nil { - fs.Stats.IncStats(StatsSetsMiss) + s.Stats.Inc(SetFail) return nil, err } - e := newEvent(Create, nodePath, fs.Index, fs.Term) + e := newEvent(Create, nodePath, s.Index, s.Term) var n *Node if len(value) != 0 { // create file e.Value = value - n = newFile(nodePath, value, fs.Index, fs.Term, d, "", expireTime) + n = newFile(nodePath, value, s.Index, s.Term, d, "", expireTime) } else { // create directory e.Dir = true - n = newDir(nodePath, fs.Index, fs.Term, d, "", expireTime) + n = newDir(nodePath, s.Index, s.Term, d, "", expireTime) } err = d.Add(n) if err != nil { - fs.Stats.IncStats(StatsSetsMiss) + s.Stats.Inc(SetFail) return nil, err } @@ -147,28 +148,28 @@ func (fs *FileSystem) Create(nodePath string, value string, expireTime time.Time e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1 } - fs.WatcherHub.notify(e) - fs.Stats.IncStats(StatsSetsHit) + s.WatcherHub.notify(e) + s.Stats.Inc(SetSuccess) return e, nil } // Update function updates the value/ttl of the node. // If the node is a file, the value and the ttl can be updated. // If the node is a directory, only the ttl can be updated. -func (fs *FileSystem) Update(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { - n, err := fs.InternalGet(nodePath, index, term) +func (s *Store) Update(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { + n, err := s.InternalGet(nodePath, index, term) if err != nil { // if the node does not exist, return error - fs.Stats.IncStats(StatsUpdatesMiss) + s.Stats.Inc(UpdateFail) return nil, err } - e := newEvent(Update, nodePath, fs.Index, fs.Term) + e := newEvent(Update, nodePath, s.Index, s.Term) if n.IsDir() { // if the node is a directory, we can only update ttl if len(value) != 0 { - fs.Stats.IncStats(StatsUpdatesMiss) + s.Stats.Inc(UpdateFail) return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath) } @@ -194,23 +195,23 @@ func (fs *FileSystem) Update(nodePath string, value string, expireTime time.Time e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1 } - fs.WatcherHub.notify(e) - fs.Stats.IncStats(StatsUpdatesHit) + s.WatcherHub.notify(e) + s.Stats.Inc(UpdateSuccess) return e, nil } -func (fs *FileSystem) TestAndSet(nodePath string, prevValue string, prevIndex uint64, +func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { - f, err := fs.InternalGet(nodePath, index, term) + f, err := s.InternalGet(nodePath, index, term) if err != nil { - fs.Stats.IncStats(StatsTestAndSetsMiss) + s.Stats.Inc(TestAndSetFail) return nil, err } if f.IsDir() { // can only test and set file - fs.Stats.IncStats(StatsTestAndSetsMiss) + s.Stats.Inc(TestAndSetFail) return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath) } @@ -221,23 +222,23 @@ func (fs *FileSystem) TestAndSet(nodePath string, prevValue string, prevIndex ui e.Value = value f.Write(value, index, term) - fs.WatcherHub.notify(e) - fs.Stats.IncStats(StatsTestAndSetsHit) + s.WatcherHub.notify(e) + s.Stats.Inc(TestAndSetSuccess) return e, nil } cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, f.Value, prevIndex, f.ModifiedIndex) - fs.Stats.IncStats(StatsTestAndSetsMiss) + s.Stats.Inc(TestAndSetFail) return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause) } // Delete function deletes the node at the given path. // If the node is a directory, recursive must be true to delete it. -func (fs *FileSystem) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) { - n, err := fs.InternalGet(nodePath, index, term) +func (s *Store) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) { + n, err := s.InternalGet(nodePath, index, term) if err != nil { // if the node does not exist, return error - fs.Stats.IncStats(StatsDeletesMiss) + s.Stats.Inc(DeleteFail) return nil, err } @@ -250,37 +251,37 @@ func (fs *FileSystem) Delete(nodePath string, recursive bool, index uint64, term } callback := func(path string) { // notify function - fs.WatcherHub.notifyWithPath(e, path, true) + s.WatcherHub.notifyWithPath(e, path, true) } err = n.Remove(recursive, callback) if err != nil { - fs.Stats.IncStats(StatsDeletesMiss) + s.Stats.Inc(DeleteFail) return nil, err } - fs.WatcherHub.notify(e) - fs.Stats.IncStats(StatsDeletesHit) + s.WatcherHub.notify(e) + s.Stats.Inc(DeleteSuccess) return e, nil } -func (fs *FileSystem) Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) { - fs.Index, fs.Term = index, term +func (s *Store) Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) { + s.Index, s.Term = index, term if sinceIndex == 0 { - return fs.WatcherHub.watch(prefix, recursive, index+1) + return s.WatcherHub.watch(prefix, recursive, index+1) } - return fs.WatcherHub.watch(prefix, recursive, sinceIndex) + return s.WatcherHub.watch(prefix, recursive, sinceIndex) } // walk function walks all the nodePath and apply the walkFunc on each directory -func (fs *FileSystem) walk(nodePath string, walkFunc func(prev *Node, component string) (*Node, error)) (*Node, error) { +func (s *Store) walk(nodePath string, walkFunc func(prev *Node, component string) (*Node, error)) (*Node, error) { components := strings.Split(nodePath, "/") - curr := fs.Root + curr := s.Root var err error for i := 1; i < len(components); i++ { @@ -299,11 +300,11 @@ func (fs *FileSystem) walk(nodePath string, walkFunc func(prev *Node, component } // InternalGet function get the node of the given nodePath. -func (fs *FileSystem) InternalGet(nodePath string, index uint64, term uint64) (*Node, error) { +func (s *Store) InternalGet(nodePath string, index uint64, term uint64) (*Node, error) { nodePath = path.Clean(path.Join("/", nodePath)) // update file system known index and term - fs.Index, fs.Term = index, term + s.Index, s.Term = index, term walkFunc := func(parent *Node, name string) (*Node, error) { @@ -319,7 +320,7 @@ func (fs *FileSystem) InternalGet(nodePath string, index uint64, term uint64) (* return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name)) } - f, err := fs.walk(nodePath, walkFunc) + f, err := s.walk(nodePath, walkFunc) if err != nil { return nil, err @@ -332,14 +333,14 @@ func (fs *FileSystem) InternalGet(nodePath string, index uint64, term uint64) (* // If it is a directory, this function will return the pointer to that node. // If it does not exist, this function will create a new directory and return the pointer to that node. // If it is a file, this function will return error. -func (fs *FileSystem) checkDir(parent *Node, dirName string) (*Node, error) { +func (s *Store) checkDir(parent *Node, dirName string) (*Node, error) { subDir, ok := parent.Children[dirName] if ok { return subDir, nil } - n := newDir(path.Join(parent.Path, dirName), fs.Index, fs.Term, parent, parent.ACL, Permanent) + n := newDir(path.Join(parent.Path, dirName), s.Index, s.Term, parent, parent.ACL, Permanent) parent.Children[dirName] = n @@ -350,24 +351,20 @@ func (fs *FileSystem) checkDir(parent *Node, dirName string) (*Node, error) { // Save function will not be able to save the state of watchers. // Save function will not save the parent field of the node. Or there will // be cyclic dependencies issue for the json package. -func (fs *FileSystem) Save() ([]byte, error) { +func (s *Store) Save() ([]byte, error) { + clonedStore := New() + clonedStore.Root = s.Root.Clone() + clonedStore.WatcherHub = s.WatcherHub + clonedStore.Index = s.Index + clonedStore.Term = s.Term + clonedStore.Stats = s.Stats - fs.Stats.IncStats(StatsSaveHit) - cloneFs := New() - cloneFs.Root = fs.Root.Clone() - - b, err := json.Marshal(fs) + b, err := json.Marshal(clonedStore) if err != nil { - fs.Stats.IncStats(StatsSaveMiss) - fs.Stats.rwlock.Lock() - fs.Stats.SaveHit-- // restore the savehit - fs.Stats.rwlock.Unlock() - return nil, err } - fs.Stats.IncStats(StatsSaveHit) return b, nil } @@ -375,15 +372,18 @@ func (fs *FileSystem) Save() ([]byte, error) { // It needs to recovery the parent field of the nodes. // It needs to delete the expired nodes since the saved time and also // need to create monitor go routines. -func (fs *FileSystem) Recovery(state []byte) error { - err := json.Unmarshal(state, fs) +func (s *Store) Recovery(state []byte) error { + err := json.Unmarshal(state, s) if err != nil { - fs.Stats.IncStats(StatsRecoveryMiss) return err } - fs.Root.recoverAndclean() - fs.Stats.IncStats(StatsRecoveryHit) + s.Root.recoverAndclean() return nil } + +func (s *Store) JsonStats() []byte { + s.Stats.Watchers = uint64(s.WatcherHub.count) + return s.Stats.toJson() +} diff --git a/file_system/file_system_test.go b/store/store_test.go similarity index 68% rename from file_system/file_system_test.go rename to store/store_test.go index 0478f9c30..259310ff4 100644 --- a/file_system/file_system_test.go +++ b/store/store_test.go @@ -1,4 +1,4 @@ -package fileSystem +package store import ( "math/rand" @@ -8,46 +8,46 @@ import ( ) func TestCreateAndGet(t *testing.T) { - fs := New() + s := New() - fs.Create("/foobar", "bar", Permanent, 1, 1) + s.Create("/foobar", "bar", Permanent, 1, 1) // already exist, create should fail - _, err := fs.Create("/foobar", "bar", Permanent, 1, 1) + _, err := s.Create("/foobar", "bar", Permanent, 1, 1) if err == nil { t.Fatal("Create should fail") } - fs.Delete("/foobar", true, 1, 1) + s.Delete("/foobar", true, 1, 1) // this should create successfully - createAndGet(fs, "/foobar", t) - createAndGet(fs, "/foo/bar", t) - createAndGet(fs, "/foo/foo/bar", t) + createAndGet(s, "/foobar", t) + createAndGet(s, "/foo/bar", t) + createAndGet(s, "/foo/foo/bar", t) // meet file, create should fail - _, err = fs.Create("/foo/bar/bar", "bar", Permanent, 2, 1) + _, err = s.Create("/foo/bar/bar", "bar", Permanent, 2, 1) if err == nil { t.Fatal("Create should fail") } // create a directory - _, err = fs.Create("/fooDir", "", Permanent, 3, 1) + _, err = s.Create("/fooDir", "", Permanent, 3, 1) if err != nil { t.Fatal("Cannot create /fooDir") } - e, err := fs.Get("/fooDir", false, false, 3, 1) + e, err := s.Get("/fooDir", false, false, 3, 1) if err != nil || e.Dir != true { t.Fatal("Cannot create /fooDir ") } // create a file under directory - _, err = fs.Create("/fooDir/bar", "bar", Permanent, 4, 1) + _, err = s.Create("/fooDir/bar", "bar", Permanent, 4, 1) if err != nil { t.Fatal("Cannot create /fooDir/bar = bar") @@ -56,21 +56,21 @@ func TestCreateAndGet(t *testing.T) { } func TestUpdateFile(t *testing.T) { - fs := New() + s := New() - _, err := fs.Create("/foo/bar", "bar", Permanent, 1, 1) + _, err := s.Create("/foo/bar", "bar", Permanent, 1, 1) if err != nil { t.Fatalf("cannot create %s=bar [%s]", "/foo/bar", err.Error()) } - _, err = fs.Update("/foo/bar", "barbar", Permanent, 2, 1) + _, err = s.Update("/foo/bar", "barbar", Permanent, 2, 1) if err != nil { t.Fatalf("cannot update %s=barbar [%s]", "/foo/bar", err.Error()) } - e, err := fs.Get("/foo/bar", false, false, 2, 1) + e, err := s.Get("/foo/bar", false, false, 2, 1) if err != nil { t.Fatalf("cannot get %s [%s]", "/foo/bar", err.Error()) @@ -82,37 +82,37 @@ func TestUpdateFile(t *testing.T) { // create a directory, update its ttl, to see if it will be deleted - _, err = fs.Create("/foo/foo", "", Permanent, 3, 1) + _, err = s.Create("/foo/foo", "", Permanent, 3, 1) if err != nil { t.Fatalf("cannot create dir [%s] [%s]", "/foo/foo", err.Error()) } - _, err = fs.Create("/foo/foo/foo1", "bar1", Permanent, 4, 1) + _, err = s.Create("/foo/foo/foo1", "bar1", Permanent, 4, 1) if err != nil { t.Fatal("cannot create [%s]", err.Error()) } - _, err = fs.Create("/foo/foo/foo2", "", Permanent, 5, 1) + _, err = s.Create("/foo/foo/foo2", "", Permanent, 5, 1) if err != nil { t.Fatal("cannot create [%s]", err.Error()) } - _, err = fs.Create("/foo/foo/foo2/boo", "boo1", Permanent, 6, 1) + _, err = s.Create("/foo/foo/foo2/boo", "boo1", Permanent, 6, 1) if err != nil { t.Fatal("cannot create [%s]", err.Error()) } expire := time.Now().Add(time.Second * 2) - _, err = fs.Update("/foo/foo", "", expire, 7, 1) + _, err = s.Update("/foo/foo", "", expire, 7, 1) if err != nil { t.Fatalf("cannot update dir [%s] [%s]", "/foo/foo", err.Error()) } // sleep 50ms, it should still reach the node time.Sleep(time.Microsecond * 50) - e, err = fs.Get("/foo/foo", true, false, 7, 1) + e, err = s.Get("/foo/foo", true, false, 7, 1) if err != nil || e.Key != "/foo/foo" { t.Fatalf("cannot get dir before expiration [%s]", err.Error()) @@ -132,23 +132,23 @@ func TestUpdateFile(t *testing.T) { // wait for expiration time.Sleep(time.Second * 3) - e, err = fs.Get("/foo/foo", true, false, 7, 1) + e, err = s.Get("/foo/foo", true, false, 7, 1) if err == nil { t.Fatal("still can get dir after expiration [%s]") } - _, err = fs.Get("/foo/foo/foo1", true, false, 7, 1) + _, err = s.Get("/foo/foo/foo1", true, false, 7, 1) if err == nil { t.Fatal("still can get sub node after expiration [%s]") } - _, err = fs.Get("/foo/foo/foo2", true, false, 7, 1) + _, err = s.Get("/foo/foo/foo2", true, false, 7, 1) if err == nil { t.Fatal("still can get sub dir after expiration [%s]") } - _, err = fs.Get("/foo/foo/foo2/boo", true, false, 7, 1) + _, err = s.Get("/foo/foo/foo2/boo", true, false, 7, 1) if err == nil { t.Fatalf("still can get sub node of sub dir after expiration [%s]", err.Error()) } @@ -156,17 +156,17 @@ func TestUpdateFile(t *testing.T) { } func TestListDirectory(t *testing.T) { - fs := New() + s := New() // create dir /foo // set key-value /foo/foo=bar - fs.Create("/foo/foo", "bar", Permanent, 1, 1) + s.Create("/foo/foo", "bar", Permanent, 1, 1) // create dir /foo/fooDir // set key-value /foo/fooDir/foo=bar - fs.Create("/foo/fooDir/foo", "bar", Permanent, 2, 1) + s.Create("/foo/fooDir/foo", "bar", Permanent, 2, 1) - e, err := fs.Get("/foo", true, false, 2, 1) + e, err := s.Get("/foo", true, false, 2, 1) if err != nil { t.Fatalf("%v", err) @@ -191,9 +191,9 @@ func TestListDirectory(t *testing.T) { // create dir /foo/_hidden // set key-value /foo/_hidden/foo -> bar - fs.Create("/foo/_hidden/foo", "bar", Permanent, 3, 1) + s.Create("/foo/_hidden/foo", "bar", Permanent, 3, 1) - e, _ = fs.Get("/foo", false, false, 2, 1) + e, _ = s.Get("/foo", false, false, 2, 1) if len(e.KVPairs) != 2 { t.Fatalf("hidden node is not hidden! %s", e.KVPairs[2].Key) @@ -201,38 +201,38 @@ func TestListDirectory(t *testing.T) { } func TestRemove(t *testing.T) { - fs := New() + s := New() - fs.Create("/foo", "bar", Permanent, 1, 1) - _, err := fs.Delete("/foo", false, 1, 1) + s.Create("/foo", "bar", Permanent, 1, 1) + _, err := s.Delete("/foo", false, 1, 1) if err != nil { t.Fatalf("cannot delete %s [%s]", "/foo", err.Error()) } - _, err = fs.Get("/foo", false, false, 1, 1) + _, err = s.Get("/foo", false, false, 1, 1) if err == nil || err.Error() != "Key Not Found" { t.Fatalf("can get the node after deletion") } - fs.Create("/foo/bar", "bar", Permanent, 1, 1) - fs.Create("/foo/car", "car", Permanent, 1, 1) - fs.Create("/foo/dar/dar", "dar", Permanent, 1, 1) + s.Create("/foo/bar", "bar", Permanent, 1, 1) + s.Create("/foo/car", "car", Permanent, 1, 1) + s.Create("/foo/dar/dar", "dar", Permanent, 1, 1) - _, err = fs.Delete("/foo", false, 1, 1) + _, err = s.Delete("/foo", false, 1, 1) if err == nil { t.Fatalf("should not be able to delete a directory without recursive") } - _, err = fs.Delete("/foo", true, 1, 1) + _, err = s.Delete("/foo", true, 1, 1) if err != nil { t.Fatalf("cannot delete %s [%s]", "/foo", err.Error()) } - _, err = fs.Get("/foo", false, false, 1, 1) + _, err = s.Get("/foo", false, false, 1, 1) if err == nil || err.Error() != "Key Not Found" { t.Fatalf("can get the node after deletion ") @@ -241,13 +241,13 @@ func TestRemove(t *testing.T) { } func TestExpire(t *testing.T) { - fs := New() + s := New() expire := time.Now().Add(time.Second) - fs.Create("/foo", "bar", expire, 1, 1) + s.Create("/foo", "bar", expire, 1, 1) - _, err := fs.InternalGet("/foo", 1, 1) + _, err := s.InternalGet("/foo", 1, 1) if err != nil { t.Fatalf("can not get the node") @@ -255,7 +255,7 @@ func TestExpire(t *testing.T) { time.Sleep(time.Second * 2) - _, err = fs.InternalGet("/foo", 1, 1) + _, err = s.InternalGet("/foo", 1, 1) if err == nil { t.Fatalf("can get the node after expiration time") @@ -263,10 +263,10 @@ func TestExpire(t *testing.T) { // test if we can reach the node before expiration expire = time.Now().Add(time.Second) - fs.Create("/foo", "bar", expire, 1, 1) + s.Create("/foo", "bar", expire, 1, 1) time.Sleep(time.Millisecond * 50) - _, err = fs.InternalGet("/foo", 1, 1) + _, err = s.InternalGet("/foo", 1, 1) if err != nil { t.Fatalf("cannot get the node before expiration", err.Error()) @@ -274,8 +274,8 @@ func TestExpire(t *testing.T) { expire = time.Now().Add(time.Second) - fs.Create("/foo", "bar", expire, 1, 1) - _, err = fs.Delete("/foo", false, 1, 1) + s.Create("/foo", "bar", expire, 1, 1) + _, err = s.Delete("/foo", false, 1, 1) if err != nil { t.Fatalf("cannot delete the node before expiration", err.Error()) @@ -284,17 +284,17 @@ func TestExpire(t *testing.T) { } func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ? - fs := New() - fs.Create("/foo", "bar", Permanent, 1, 1) + s := New() + s.Create("/foo", "bar", Permanent, 1, 1) // test on wrong previous value - _, err := fs.TestAndSet("/foo", "barbar", 0, "car", Permanent, 2, 1) + _, err := s.TestAndSet("/foo", "barbar", 0, "car", Permanent, 2, 1) if err == nil { t.Fatal("test and set should fail barbar != bar") } // test on value - e, err := fs.TestAndSet("/foo", "bar", 0, "car", Permanent, 3, 1) + e, err := s.TestAndSet("/foo", "bar", 0, "car", Permanent, 3, 1) if err != nil { t.Fatal("test and set should succeed bar == bar") @@ -305,7 +305,7 @@ func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ? } // test on index - e, err = fs.TestAndSet("/foo", "", 3, "bar", Permanent, 4, 1) + e, err = s.TestAndSet("/foo", "", 3, "bar", Permanent, 4, 1) if err != nil { t.Fatal("test and set should succeed index 3 == 3") @@ -318,61 +318,61 @@ func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ? } func TestWatch(t *testing.T) { - fs := New() + s := New() // watch at a deeper path - c, _ := fs.WatcherHub.watch("/foo/foo/foo", false, 0) - fs.Create("/foo/foo/foo", "bar", Permanent, 1, 1) + c, _ := s.WatcherHub.watch("/foo/foo/foo", false, 0) + s.Create("/foo/foo/foo", "bar", Permanent, 1, 1) e := nonblockingRetrive(c) if e.Key != "/foo/foo/foo" { t.Fatal("watch for Create node fails") } - c, _ = fs.WatcherHub.watch("/foo/foo/foo", false, 0) - fs.Update("/foo/foo/foo", "car", Permanent, 2, 1) + c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0) + s.Update("/foo/foo/foo", "car", Permanent, 2, 1) e = nonblockingRetrive(c) if e.Key != "/foo/foo/foo" { t.Fatal("watch for Update node fails") } - c, _ = fs.WatcherHub.watch("/foo/foo/foo", false, 0) - fs.TestAndSet("/foo/foo/foo", "car", 0, "bar", Permanent, 3, 1) + c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0) + s.TestAndSet("/foo/foo/foo", "car", 0, "bar", Permanent, 3, 1) e = nonblockingRetrive(c) if e.Key != "/foo/foo/foo" { t.Fatal("watch for TestAndSet node fails") } - c, _ = fs.WatcherHub.watch("/foo/foo/foo", false, 0) - fs.Delete("/foo", true, 4, 1) //recursively delete + c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0) + s.Delete("/foo", true, 4, 1) //recursively delete e = nonblockingRetrive(c) if e.Key != "/foo" { t.Fatal("watch for Delete node fails") } // watch at a prefix - c, _ = fs.WatcherHub.watch("/foo", true, 0) - fs.Create("/foo/foo/boo", "bar", Permanent, 5, 1) + c, _ = s.WatcherHub.watch("/foo", true, 0) + s.Create("/foo/foo/boo", "bar", Permanent, 5, 1) e = nonblockingRetrive(c) if e.Key != "/foo/foo/boo" { t.Fatal("watch for Create subdirectory fails") } - c, _ = fs.WatcherHub.watch("/foo", true, 0) - fs.Update("/foo/foo/boo", "foo", Permanent, 6, 1) + c, _ = s.WatcherHub.watch("/foo", true, 0) + s.Update("/foo/foo/boo", "foo", Permanent, 6, 1) e = nonblockingRetrive(c) if e.Key != "/foo/foo/boo" { t.Fatal("watch for Update subdirectory fails") } - c, _ = fs.WatcherHub.watch("/foo", true, 0) - fs.TestAndSet("/foo/foo/boo", "foo", 0, "bar", Permanent, 7, 1) + c, _ = s.WatcherHub.watch("/foo", true, 0) + s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", Permanent, 7, 1) e = nonblockingRetrive(c) if e.Key != "/foo/foo/boo" { t.Fatal("watch for TestAndSet subdirectory fails") } - c, _ = fs.WatcherHub.watch("/foo", true, 0) - fs.Delete("/foo/foo/boo", false, 8, 1) + c, _ = s.WatcherHub.watch("/foo", true, 0) + s.Delete("/foo/foo/boo", false, 8, 1) e = nonblockingRetrive(c) if e.Key != "/foo/foo/boo" { t.Fatal("watch for Delete subdirectory fails") @@ -381,14 +381,14 @@ func TestWatch(t *testing.T) { } func TestSort(t *testing.T) { - fs := New() + s := New() // simulating random creation keys := GenKeys(80, 4) i := uint64(1) for _, k := range keys { - _, err := fs.Create(k, "bar", Permanent, i, 1) + _, err := s.Create(k, "bar", Permanent, i, 1) if err != nil { panic(err) } else { @@ -396,7 +396,7 @@ func TestSort(t *testing.T) { } } - e, err := fs.Get("/foo", true, true, i, 1) + e, err := s.Get("/foo", true, true, i, 1) if err != nil { t.Fatalf("get dir nodes failed [%s]", err.Error()) } @@ -419,14 +419,14 @@ func TestSort(t *testing.T) { } func TestSaveAndRecover(t *testing.T) { - fs := New() + s := New() // simulating random creation keys := GenKeys(8, 4) i := uint64(1) for _, k := range keys { - _, err := fs.Create(k, "bar", Permanent, i, 1) + _, err := s.Create(k, "bar", Permanent, i, 1) if err != nil { panic(err) } else { @@ -438,8 +438,8 @@ func TestSaveAndRecover(t *testing.T) { // test if we can reach the node before expiration expire := time.Now().Add(time.Second) - fs.Create("/foo/foo", "bar", expire, 1, 1) - b, err := fs.Save() + s.Create("/foo/foo", "bar", expire, 1, 1) + b, err := s.Save() cloneFs := New() time.Sleep(time.Second) @@ -453,18 +453,18 @@ func TestSaveAndRecover(t *testing.T) { } } - if fs.WatcherHub.EventHistory.StartIndex != cloneFs.WatcherHub.EventHistory.StartIndex { + if s.WatcherHub.EventHistory.StartIndex != cloneFs.WatcherHub.EventHistory.StartIndex { t.Fatal("Error recovered event history start index") } - for i = 0; int(i) < fs.WatcherHub.EventHistory.Queue.Size; i++ { - if fs.WatcherHub.EventHistory.Queue.Events[i].Key != + for i = 0; int(i) < s.WatcherHub.EventHistory.Queue.Size; i++ { + if s.WatcherHub.EventHistory.Queue.Events[i].Key != cloneFs.WatcherHub.EventHistory.Queue.Events[i].Key { t.Fatal("Error recovered event history") } } - _, err = fs.Get("/foo/foo", false, false, 1, 1) + _, err = s.Get("/foo/foo", false, false, 1, 1) if err == nil || err.Error() != "Key Not Found" { t.Fatalf("can get the node after deletion ") @@ -488,14 +488,14 @@ func GenKeys(num int, depth int) []string { return keys } -func createAndGet(fs *FileSystem, path string, t *testing.T) { - _, err := fs.Create(path, "bar", Permanent, 1, 1) +func createAndGet(s *Store, path string, t *testing.T) { + _, err := s.Create(path, "bar", Permanent, 1, 1) if err != nil { t.Fatalf("cannot create %s=bar [%s]", path, err.Error()) } - e, err := fs.Get(path, false, false, 1, 1) + e, err := s.Get(path, false, false, 1, 1) if err != nil { t.Fatalf("cannot get %s [%s]", path, err.Error()) diff --git a/file_system/watcher.go b/store/watcher.go similarity index 84% rename from file_system/watcher.go rename to store/watcher.go index 30fc89da7..1e946e3c0 100644 --- a/file_system/watcher.go +++ b/store/watcher.go @@ -1,16 +1,16 @@ -package fileSystem +package store import ( "container/list" "path" "strings" + "sync/atomic" ) type watcherHub struct { watchers map[string]*list.List - count uint64 // current number of watchers + count int64 // current number of watchers. EventHistory *EventHistory - Stats *EtcdStats } type watcher struct { @@ -19,11 +19,10 @@ type watcher struct { sinceIndex uint64 } -func newWatchHub(capacity int, stats *EtcdStats) *watcherHub { +func newWatchHub(capacity int) *watcherHub { return &watcherHub{ watchers: make(map[string]*list.List), EventHistory: newEventHistory(capacity), - Stats: stats, } } @@ -37,13 +36,9 @@ func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan e, err := wh.EventHistory.scan(prefix, index) if err != nil { - wh.Stats.IncStats(StatsWatchMiss) return nil, err } - wh.Stats.IncStats(StatsWatchHit) - wh.Stats.IncStats(StatsInWatchingNum) - if e != nil { eventChan <- e return eventChan, nil @@ -66,6 +61,8 @@ func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan wh.watchers[prefix] = l } + atomic.AddInt64(&wh.count, 1) + return eventChan, nil } @@ -94,10 +91,8 @@ func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) { if (w.recursive || force || e.Key == path) && e.Index >= w.sinceIndex { w.eventChan <- e - wh.Stats.rwlock.Lock() // lock the InWatchingNum - wh.Stats.InWatchingNum-- - wh.Stats.rwlock.Unlock() l.Remove(curr) + atomic.AddInt64(&wh.count, -1) } else { notifiedAll = false } diff --git a/file_system/watcher_test.go b/store/watcher_test.go similarity index 92% rename from file_system/watcher_test.go rename to store/watcher_test.go index 132f367f3..90c23c59e 100644 --- a/file_system/watcher_test.go +++ b/store/watcher_test.go @@ -1,12 +1,12 @@ -package fileSystem +package store import ( "testing" ) func TestWatcher(t *testing.T) { - fs := New() - wh := fs.WatcherHub + s := New() + wh := s.WatcherHub c, err := wh.watch("/foo", true, 0) if err != nil { diff --git a/util.go b/util.go index 960d12f4a..434de1a24 100644 --- a/util.go +++ b/util.go @@ -15,7 +15,7 @@ import ( "time" etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/file_system" + "github.com/coreos/etcd/store" "github.com/coreos/etcd/web" "github.com/coreos/go-raft" ) @@ -30,12 +30,12 @@ func durationToExpireTime(strDuration string) (time.Time, error) { duration, err := strconv.Atoi(strDuration) if err != nil { - return fileSystem.Permanent, err + return store.Permanent, err } return time.Now().Add(time.Second * (time.Duration)(duration)), nil } else { - return fileSystem.Permanent, nil + return store.Permanent, nil } }