clean up from yifan

release-0.4
Xiang Li 2013-09-28 16:26:19 -07:00
parent a568c6dc75
commit da83ee223b
19 changed files with 443 additions and 586 deletions

View File

@ -9,7 +9,7 @@ import (
"time"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/file_system"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
)
@ -39,7 +39,7 @@ func (c *CreateCommand) CommandName() string {
// Create node
func (c *CreateCommand) Apply(server *raft.Server) (interface{}, error) {
e, err := etcdFs.Create(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
e, err := etcdStore.Create(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
if err != nil {
debug(err)
@ -63,7 +63,7 @@ func (c *UpdateCommand) CommandName() string {
// Update node
func (c *UpdateCommand) Apply(server *raft.Server) (interface{}, error) {
e, err := etcdFs.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
e, err := etcdStore.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
if err != nil {
debug(err)
@ -89,7 +89,7 @@ func (c *TestAndSetCommand) CommandName() string {
// Set the key-value pair if the current value of the key equals to the given prevValue
func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) {
e, err := etcdFs.TestAndSet(c.Key, c.PrevValue, c.PrevIndex,
e, err := etcdStore.TestAndSet(c.Key, c.PrevValue, c.PrevIndex,
c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
if err != nil {
@ -114,7 +114,7 @@ func (c *GetCommand) CommandName() string {
// Get the value of key
func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) {
e, err := etcdFs.Get(c.Key, c.Recursive, c.Sorted, server.CommitIndex(), server.Term())
e, err := etcdStore.Get(c.Key, c.Recursive, c.Sorted, server.CommitIndex(), server.Term())
if err != nil {
debug(err)
@ -137,7 +137,7 @@ func (c *DeleteCommand) CommandName() string {
// Delete the key
func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) {
e, err := etcdFs.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term())
e, err := etcdStore.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term())
if err != nil {
debug(err)
@ -160,7 +160,7 @@ func (c *WatchCommand) CommandName() string {
}
func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
eventChan, err := etcdFs.Watch(c.Key, c.Recursive, c.SinceIndex, server.CommitIndex(), server.Term())
eventChan, err := etcdStore.Watch(c.Key, c.Recursive, c.SinceIndex, server.CommitIndex(), server.Term())
if err != nil {
return nil, err
@ -197,7 +197,7 @@ func (c *JoinCommand) CommandName() string {
func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
// check if the join command is from a previous machine, who lost all its previous log.
e, _ := etcdFs.Get(path.Join("/_etcd/machines", c.Name), false, false, raftServer.CommitIndex(), raftServer.Term())
e, _ := etcdStore.Get(path.Join("/_etcd/machines", c.Name), false, false, raftServer.CommitIndex(), raftServer.Term())
b := make([]byte, 8)
binary.PutUvarint(b, raftServer.CommitIndex())
@ -221,7 +221,7 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
// add machine in etcd storage
key := path.Join("_etcd/machines", c.Name)
value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion)
etcdFs.Create(key, value, fileSystem.Permanent, raftServer.CommitIndex(), raftServer.Term())
etcdStore.Create(key, value, store.Permanent, raftServer.CommitIndex(), raftServer.Term())
if c.Name != r.Name() { // do not add self to the peer list
r.peersStats[c.Name] = &raftPeerStats{MinLatency: 1 << 63}
@ -250,7 +250,7 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) {
// remove machine in etcd storage
key := path.Join("_etcd/machines", c.Name)
_, err := etcdFs.Delete(key, false, raftServer.CommitIndex(), raftServer.Term())
_, err := etcdStore.Delete(key, false, raftServer.CommitIndex(), raftServer.Term())
delete(r.peersStats, c.Name)
if err != nil {

View File

@ -32,27 +32,27 @@ func init() {
errors = make(map[int]string)
// command related errors
errors[100] = "Key Not Found"
errors[101] = "Test Failed" //test and set
errors[102] = "Not A File"
errors[103] = "Reached the max number of machines in the cluster"
errors[104] = "Not A Directory"
errors[105] = "Already exists" // create
errors[106] = "The prefix of given key is a keyword in etcd"
errors[EcodeKeyNotFound] = "Key Not Found"
errors[EcodeTestFailed] = "Test Failed" //test and set
errors[EcodeNotFile] = "Not A File"
errors[EcodeNoMoreMachine] = "Reached the max number of machines in the cluster"
errors[EcodeNotDir] = "Not A Directory"
errors[EcodeNodeExist] = "Already exists" // create
errors[EcodeKeyIsPreserved] = "The prefix of given key is a keyword in etcd"
// Post form related errors
errors[200] = "Value is Required in POST form"
errors[201] = "PrevValue is Required in POST form"
errors[202] = "The given TTL in POST form is not a number"
errors[203] = "The given index in POST form is not a number"
errors[EcodeValueRequired] = "Value is Required in POST form"
errors[EcodePrevValueRequired] = "PrevValue is Required in POST form"
errors[EcodeTTLNaN] = "The given TTL in POST form is not a number"
errors[EcodeIndexNaN] = "The given index in POST form is not a number"
// raft related errors
errors[300] = "Raft Internal Error"
errors[301] = "During Leader Election"
errors[EcodeRaftInternal] = "Raft Internal Error"
errors[EcodeLeaderElect] = "During Leader Election"
// etcd related errors
errors[400] = "watcher is cleared due to etcd recovery"
errors[401] = "The event in requested index is outdated and cleared"
errors[EcodeWatcherCleared] = "watcher is cleared due to etcd recovery"
errors[EcodeEventIndexCleared] = "The event in requested index is outdated and cleared"
}

View File

@ -10,7 +10,7 @@ import (
"strings"
"time"
"github.com/coreos/etcd/file_system"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
)
@ -136,7 +136,7 @@ type TLSConfig struct {
//
//------------------------------------------------------------------------------
var etcdFs *fileSystem.FileSystem
var etcdStore *store.Store
//------------------------------------------------------------------------------
//
@ -204,7 +204,7 @@ func main() {
info := getInfo(dirPath)
// Create etcd key-value store
etcdFs = fileSystem.New()
etcdStore = store.New()
snapConf = newSnapshotConf()

View File

@ -7,7 +7,7 @@ import (
"strings"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/file_system"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
)
@ -120,7 +120,7 @@ func UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error {
}
// TODO: update should give at least one option
if value == "" && expireTime.Sub(fileSystem.Permanent) == 0 {
if value == "" && expireTime.Sub(store.Permanent) == 0 {
return nil
}
@ -223,8 +223,7 @@ func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
// Handler to return the basic stats of etcd
func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
w.WriteHeader(http.StatusOK)
//w.Write(etcdStore.Stats())
w.Write(etcdFs.Stats.GetStats())
w.Write(etcdStore.JsonStats())
w.Write(r.Stats())
return nil
}

View File

@ -1,146 +0,0 @@
package fileSystem
import (
"encoding/json"
"sync"
)
const (
// Operations that will be running serializely
StatsSetsHit = 100
StatsSetsMiss = 101
StatsDeletesHit = 102
StatsDeletesMiss = 103
StatsUpdatesHit = 104
StatsUpdatesMiss = 105
StatsTestAndSetsHit = 106
StatsTestAndSetsMiss = 107
StatsRecoveryHit = 108
StatsRecoveryMiss = 109
// concurrent operations
StatsGetsHit = 200
StatsGetsMiss = 201
StatsWatchHit = 300
StatsWatchMiss = 301
StatsInWatchingNum = 302
StatsSaveHit = 400
StatsSaveMiss = 401
)
type EtcdStats struct {
// Lock for synchronization
rwlock sync.RWMutex
// Number of get requests
GetsHit uint64 `json:"gets_hits"`
GetsMiss uint64 `json:"gets_misses"`
// Number of sets requests
SetsHit uint64 `json:"sets_hits"`
SetsMiss uint64 `json:"sets_misses"`
// Number of delete requests
DeletesHit uint64 `json:"deletes_hits"`
DeletesMiss uint64 `json:"deletes_misses"`
// Number of update requests
UpdatesHit uint64 `json:"updates_hits"`
UpdatesMiss uint64 `json:"updates_misses"`
// Number of testAndSet requests
TestAndSetsHit uint64 `json:"testAndSets_hits"`
TestAndSetsMiss uint64 `json:"testAndSets_misses"`
// Number of Watch requests
WatchHit uint64 `json:"watch_hit"`
WatchMiss uint64 `json:"watch_miss"`
InWatchingNum uint64 `json:"in_watching_number"`
// Number of save requests
SaveHit uint64 `json:"save_hit"`
SaveMiss uint64 `json:"save_miss"`
// Number of recovery requests
RecoveryHit uint64 `json:"recovery_hit"`
RecoveryMiss uint64 `json:"recovery_miss"`
}
func newStats() *EtcdStats {
e := new(EtcdStats)
return e
}
// Status() return the statistics info of etcd storage its recent start
func (e *EtcdStats) GetStats() []byte {
b, _ := json.Marshal(e)
return b
}
func (e *EtcdStats) TotalReads() uint64 {
return e.GetsHit + e.GetsMiss
}
func (e *EtcdStats) TotalWrites() uint64 {
return e.SetsHit + e.SetsMiss +
e.DeletesHit + e.DeletesMiss +
e.UpdatesHit + e.UpdatesMiss +
e.TestAndSetsHit + e.TestAndSetsMiss
}
func (e *EtcdStats) IncStats(field int) {
if field >= 200 {
e.rwlock.Lock()
switch field {
case StatsGetsHit:
e.GetsHit++
case StatsGetsMiss:
e.GetsMiss++
case StatsWatchHit:
e.WatchHit++
case StatsWatchMiss:
e.WatchMiss++
case StatsInWatchingNum:
e.InWatchingNum++
case StatsSaveHit:
e.SaveHit++
case StatsSaveMiss:
e.SaveMiss++
}
e.rwlock.Unlock()
} else {
e.rwlock.RLock()
switch field {
case StatsSetsHit:
e.SetsHit++
case StatsSetsMiss:
e.SetsMiss++
case StatsDeletesHit:
e.DeletesHit++
case StatsDeletesMiss:
e.DeletesMiss++
case StatsUpdatesHit:
e.UpdatesHit++
case StatsUpdatesMiss:
e.UpdatesMiss++
case StatsTestAndSetsHit:
e.TestAndSetsHit++
case StatsTestAndSetsMiss:
e.TestAndSetsMiss++
case StatsRecoveryHit:
e.RecoveryHit++
case StatsRecoveryMiss:
e.RecoveryMiss++
}
e.rwlock.RUnlock()
}
}

View File

@ -1,221 +0,0 @@
package fileSystem
import (
"math/rand"
"testing"
"time"
//"fmt"
)
func TestBasicStats(t *testing.T) {
fs := New()
keys := GenKeys(rand.Intn(100), 5)
i := uint64(0)
GetsHit := uint64(0)
GetsMiss := uint64(0)
SetsHit := uint64(0)
SetsMiss := uint64(0)
DeletesHit := uint64(0)
DeletesMiss := uint64(0)
UpdatesHit := uint64(0)
UpdatesMiss := uint64(0)
TestAndSetsHit := uint64(0)
TestAndSetsMiss := uint64(0)
WatchHit := uint64(0)
WatchMiss := uint64(0)
InWatchingNum := uint64(0)
SaveHit := uint64(0)
SaveMiss := uint64(0)
RecoveryHit := uint64(0)
RecoveryMiss := uint64(0)
for _, k := range keys {
i++
_, err := fs.Create(k, "bar", time.Now().Add(time.Second*time.Duration(rand.Intn(10))), i, 1)
if err != nil {
SetsMiss++
} else {
SetsHit++
}
}
for _, k := range keys {
_, err := fs.Get(k, false, false, i, 1)
if err != nil {
GetsMiss++
} else {
GetsHit++
}
}
for _, k := range keys {
i++
_, err := fs.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(5))), i, 1)
if err != nil {
UpdatesMiss++
} else {
UpdatesHit++
}
}
for _, k := range keys {
_, err := fs.Get(k, false, false, i, 1)
if err != nil {
GetsMiss++
} else {
GetsHit++
}
}
for _, k := range keys {
i++
_, err := fs.TestAndSet(k, "foo", 0, "bar", Permanent, i, 1)
if err != nil {
TestAndSetsMiss++
} else {
TestAndSetsHit++
}
}
//fmt.Printf("#TestAndSet [%d]\n", TestAndSetsHit)
for _, k := range keys {
_, err := fs.Watch(k, false, 0, i, 1)
if err != nil {
WatchMiss++
} else {
WatchHit++
InWatchingNum++
}
}
//fmt.Printf("#Watch [%d]\n", WatchHit)
for _, k := range keys {
_, err := fs.Get(k, false, false, i, 1)
if err != nil {
GetsMiss++
} else {
GetsHit++
}
}
//fmt.Println("fs.index ", fs.Index)
for j := 0; j < 5; j++ {
b := make([]byte, 10)
err := fs.Recovery(b)
if err != nil {
RecoveryMiss++
}
b, err = fs.Save()
if err != nil {
SaveMiss++
} else {
SaveHit++
}
err = fs.Recovery(b)
if err != nil {
RecoveryMiss++
} else {
RecoveryHit++
}
}
//fmt.Println("fs.index after ", fs.Index)
//fmt.Println("stats.inwatching ", fs.Stats.InWatchingNum)
for _, k := range keys {
i++
_, err := fs.Delete(k, false, i, 1)
if err != nil {
DeletesMiss++
} else {
InWatchingNum--
DeletesHit++
}
}
//fmt.Printf("#Delete [%d] stats.deletehit [%d] \n", DeletesHit, fs.Stats.DeletesHit)
for _, k := range keys {
_, err := fs.Get(k, false, false, i, 1)
if err != nil {
GetsMiss++
} else {
GetsHit++
}
}
if GetsHit != fs.Stats.GetsHit {
t.Fatalf("GetsHit [%d] != Stats.GetsHit [%d]", GetsHit, fs.Stats.GetsHit)
}
if GetsMiss != fs.Stats.GetsMiss {
t.Fatalf("GetsMiss [%d] != Stats.GetsMiss [%d]", GetsMiss, fs.Stats.GetsMiss)
}
if SetsHit != fs.Stats.SetsHit {
t.Fatalf("SetsHit [%d] != Stats.SetsHit [%d]", SetsHit, fs.Stats.SetsHit)
}
if SetsMiss != fs.Stats.SetsMiss {
t.Fatalf("SetsMiss [%d] != Stats.SetsMiss [%d]", SetsMiss, fs.Stats.SetsMiss)
}
if DeletesHit != fs.Stats.DeletesHit {
t.Fatalf("DeletesHit [%d] != Stats.DeletesHit [%d]", DeletesHit, fs.Stats.DeletesHit)
}
if DeletesMiss != fs.Stats.DeletesMiss {
t.Fatalf("DeletesMiss [%d] != Stats.DeletesMiss [%d]", DeletesMiss, fs.Stats.DeletesMiss)
}
if UpdatesHit != fs.Stats.UpdatesHit {
t.Fatalf("UpdatesHit [%d] != Stats.UpdatesHit [%d]", UpdatesHit, fs.Stats.UpdatesHit)
}
if UpdatesMiss != fs.Stats.UpdatesMiss {
t.Fatalf("UpdatesMiss [%d] != Stats.UpdatesMiss [%d]", UpdatesMiss, fs.Stats.UpdatesMiss)
}
if TestAndSetsHit != fs.Stats.TestAndSetsHit {
t.Fatalf("TestAndSetsHit [%d] != Stats.TestAndSetsHit [%d]", TestAndSetsHit, fs.Stats.TestAndSetsHit)
}
if TestAndSetsMiss != fs.Stats.TestAndSetsMiss {
t.Fatalf("TestAndSetsMiss [%d] != Stats.TestAndSetsMiss [%d]", TestAndSetsMiss, fs.Stats.TestAndSetsMiss)
}
if SaveHit != fs.Stats.SaveHit {
t.Fatalf("SaveHit [%d] != Stats.SaveHit [%d]", SaveHit, fs.Stats.SaveHit)
}
if SaveMiss != fs.Stats.SaveMiss {
t.Fatalf("SaveMiss [%d] != Stats.SaveMiss [%d]", SaveMiss, fs.Stats.SaveMiss)
}
if WatchHit != fs.Stats.WatchHit {
t.Fatalf("WatchHit [%d] != Stats.WatchHit [%d]", WatchHit, fs.Stats.WatchHit)
}
if WatchMiss != fs.Stats.WatchMiss {
t.Fatalf("WatchMiss [%d] != Stats.WatchMiss [%d]", WatchMiss, fs.Stats.WatchMiss)
}
if InWatchingNum != fs.Stats.InWatchingNum {
t.Fatalf("InWatchingNum [%d] != Stats.InWatchingNum [%d]", InWatchingNum, fs.Stats.InWatchingNum)
}
if RecoveryHit != fs.Stats.RecoveryHit {
t.Fatalf("RecoveryHit [%d] != Stats.RecoveryHit [%d]", RecoveryHit, fs.Stats.RecoveryHit)
}
if RecoveryMiss != fs.Stats.RecoveryMiss {
t.Fatalf("RecoveryMiss [%d] != Stats.RecoveryMiss [%d]", RecoveryMiss, fs.Stats.RecoveryMiss)
}
//fmt.Println(GetsHit, GetsMiss, SetsHit, SetsMiss, DeletesHit, DeletesMiss, UpdatesHit, UpdatesMiss, TestAndSetsHit, TestAndSetsMiss, WatchHit, WatchMiss, InWatchingNum, SaveHit, SaveMiss, RecoveryHit, RecoveryMiss)
}

View File

@ -2,7 +2,7 @@ package main
// machineNum returns the number of machines in the cluster
func machineNum() int {
e, err := etcdFs.Get("/_etcd/machines", false, false, r.CommitIndex(), r.Term())
e, err := etcdStore.Get("/_etcd/machines", false, false, r.CommitIndex(), r.Term())
if err != nil {
return 0

View File

@ -56,7 +56,7 @@ func readURL(nodeName string, urlName string) (string, bool) {
// convert nodeName to url from etcd storage
key := path.Join("/_etcd/machines", nodeName)
e, err := etcdFs.Get(key, false, false, r.CommitIndex(), r.Term())
e, err := etcdStore.Get(key, false, false, r.CommitIndex(), r.Term())
if err != nil {
return "", false

View File

@ -36,7 +36,7 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi
raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, ElectionTimeout)
// Create raft server
server, err := raft.NewServer(name, dirPath, raftTransporter, etcdFs, nil)
server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil)
check(err)

View File

@ -1,4 +1,4 @@
package fileSystem
package store
import (
"fmt"
@ -6,7 +6,7 @@ import (
"sync"
"time"
etcdErr "github.com/xiangli-cmu/etcd/error"
etcdErr "github.com/coreos/etcd/error"
)
const (

View File

@ -1,4 +1,4 @@
package fileSystem
package store
import (
"testing"

View File

@ -1,4 +1,4 @@
package fileSystem
package store
import (
"path"
@ -6,7 +6,7 @@ import (
"sync"
"time"
etcdErr "github.com/xiangli-cmu/etcd/error"
etcdErr "github.com/coreos/etcd/error"
)
var (

91
store/stats.go Normal file
View File

@ -0,0 +1,91 @@
package store
import (
"encoding/json"
"sync/atomic"
)
const (
SetSuccess = 100
SetFail = 101
DeleteSuccess = 102
DeleteFail = 103
UpdateSuccess = 104
UpdateFail = 105
TestAndSetSuccess = 106
TestAndSetFail = 107
GetSuccess = 110
GetFail = 111
)
type Stats struct {
// Number of get requests
GetSuccess uint64 `json:"getsSuccess"`
GetFail uint64 `json:"getsFail"`
// Number of sets requests
SetSuccess uint64 `json:"setsSuccess"`
SetFail uint64 `json:"setsFail"`
// Number of delete requests
DeleteSuccess uint64 `json:"deleteSuccess"`
DeleteFail uint64 `json:"deleteFail"`
// Number of update requests
UpdateSuccess uint64 `json:"updateSuccess"`
UpdateFail uint64 `json:"updateFail"`
// Number of testAndSet requests
TestAndSetSuccess uint64 `json:"testAndSetSuccess"`
TestAndSetFail uint64 `json:"testAndSetFail"`
Watchers uint64 `json:"watchers"`
}
func newStats() *Stats {
s := new(Stats)
return s
}
// Status() return the statistics info of etcd storage its recent start
func (s *Stats) toJson() []byte {
b, _ := json.Marshal(s)
return b
}
func (s *Stats) TotalReads() uint64 {
return s.GetSuccess + s.GetFail
}
func (s *Stats) TotalWrites() uint64 {
return s.SetSuccess + s.SetFail +
s.DeleteSuccess + s.DeleteFail +
s.TestAndSetSuccess + s.TestAndSetFail +
s.UpdateSuccess + s.UpdateFail
}
func (s *Stats) Inc(field int) {
switch field {
case SetSuccess:
atomic.AddUint64(&s.SetSuccess, 1)
case SetFail:
atomic.AddUint64(&s.SetFail, 1)
case DeleteSuccess:
atomic.AddUint64(&s.DeleteSuccess, 1)
case DeleteFail:
atomic.AddUint64(&s.DeleteFail, 1)
case GetSuccess:
atomic.AddUint64(&s.GetSuccess, 1)
case GetFail:
atomic.AddUint64(&s.GetFail, 1)
case UpdateSuccess:
atomic.AddUint64(&s.UpdateSuccess, 1)
case UpdateFail:
atomic.AddUint64(&s.UpdateFail, 1)
case TestAndSetSuccess:
atomic.AddUint64(&s.TestAndSetSuccess, 1)
case TestAndSetFail:
atomic.AddUint64(&s.TestAndSetFail, 1)
}
}

139
store/stats_test.go Normal file
View File

@ -0,0 +1,139 @@
package store
import (
"math/rand"
"testing"
"time"
)
func TestBasicStats(t *testing.T) {
s := New()
keys := GenKeys(rand.Intn(100), 5)
var i uint64
var GetSuccess, GetFail, SetSuccess, SetFail, DeleteSuccess, DeleteFail uint64
var UpdateSuccess, UpdateFail, TestAndSetSuccess, TestAndSetFail, watcher_number uint64
for _, k := range keys {
i++
_, err := s.Create(k, "bar", time.Now().Add(time.Second*time.Duration(rand.Intn(10))), i, 1)
if err != nil {
SetFail++
} else {
SetSuccess++
}
}
for _, k := range keys {
_, err := s.Get(k, false, false, i, 1)
if err != nil {
GetFail++
} else {
GetSuccess++
}
}
for _, k := range keys {
i++
_, err := s.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(5))), i, 1)
if err != nil {
UpdateFail++
} else {
UpdateSuccess++
}
}
for _, k := range keys {
_, err := s.Get(k, false, false, i, 1)
if err != nil {
GetFail++
} else {
GetSuccess++
}
}
for _, k := range keys {
i++
_, err := s.TestAndSet(k, "foo", 0, "bar", Permanent, i, 1)
if err != nil {
TestAndSetFail++
} else {
TestAndSetSuccess++
}
}
for _, k := range keys {
s.Watch(k, false, 0, i, 1)
watcher_number++
}
for _, k := range keys {
_, err := s.Get(k, false, false, i, 1)
if err != nil {
GetFail++
} else {
GetSuccess++
}
}
for _, k := range keys {
i++
_, err := s.Delete(k, false, i, 1)
if err != nil {
DeleteFail++
} else {
watcher_number--
DeleteSuccess++
}
}
for _, k := range keys {
_, err := s.Get(k, false, false, i, 1)
if err != nil {
GetFail++
} else {
GetSuccess++
}
}
if GetSuccess != s.Stats.GetSuccess {
t.Fatalf("GetSuccess [%d] != Stats.GetSuccess [%d]", GetSuccess, s.Stats.GetSuccess)
}
if GetFail != s.Stats.GetFail {
t.Fatalf("GetFail [%d] != Stats.GetFail [%d]", GetFail, s.Stats.GetFail)
}
if SetSuccess != s.Stats.SetSuccess {
t.Fatalf("SetSuccess [%d] != Stats.SetSuccess [%d]", SetSuccess, s.Stats.SetSuccess)
}
if SetFail != s.Stats.SetFail {
t.Fatalf("SetFail [%d] != Stats.SetFail [%d]", SetFail, s.Stats.SetFail)
}
if DeleteSuccess != s.Stats.DeleteSuccess {
t.Fatalf("DeleteSuccess [%d] != Stats.DeleteSuccess [%d]", DeleteSuccess, s.Stats.DeleteSuccess)
}
if DeleteFail != s.Stats.DeleteFail {
t.Fatalf("DeleteFail [%d] != Stats.DeleteFail [%d]", DeleteFail, s.Stats.DeleteFail)
}
if UpdateSuccess != s.Stats.UpdateSuccess {
t.Fatalf("UpdateSuccess [%d] != Stats.UpdateSuccess [%d]", UpdateSuccess, s.Stats.UpdateSuccess)
}
if UpdateFail != s.Stats.UpdateFail {
t.Fatalf("UpdateFail [%d] != Stats.UpdateFail [%d]", UpdateFail, s.Stats.UpdateFail)
}
if TestAndSetSuccess != s.Stats.TestAndSetSuccess {
t.Fatalf("TestAndSetSuccess [%d] != Stats.TestAndSetSuccess [%d]", TestAndSetSuccess, s.Stats.TestAndSetSuccess)
}
if TestAndSetFail != s.Stats.TestAndSetFail {
t.Fatalf("TestAndSetFail [%d] != Stats.TestAndSetFail [%d]", TestAndSetFail, s.Stats.TestAndSetFail)
}
}

View File

@ -1,4 +1,4 @@
package fileSystem
package store
import (
"encoding/json"
@ -8,34 +8,34 @@ import (
"strings"
"time"
etcdErr "github.com/xiangli-cmu/etcd/error"
etcdErr "github.com/coreos/etcd/error"
)
type FileSystem struct {
type Store struct {
Root *Node
WatcherHub *watcherHub
Index uint64
Term uint64
Stats *EtcdStats
Stats *Stats
}
func New() *FileSystem {
fs := new(FileSystem)
fs.Root = newDir("/", 0, 0, nil, "", Permanent)
fs.Stats = newStats()
fs.WatcherHub = newWatchHub(1000, fs.Stats)
func New() *Store {
s := new(Store)
s.Root = newDir("/", 0, 0, nil, "", Permanent)
s.Stats = newStats()
s.WatcherHub = newWatchHub(1000)
return fs
return s
}
func (fs *FileSystem) Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) {
func (s *Store) Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) {
nodePath = path.Clean(path.Join("/", nodePath))
n, err := fs.InternalGet(nodePath, index, term)
n, err := s.InternalGet(nodePath, index, term)
if err != nil {
fs.Stats.IncStats(StatsGetsMiss)
s.Stats.Inc(GetFail)
return nil, err
}
@ -82,61 +82,62 @@ func (fs *FileSystem) Get(nodePath string, recursive, sorted bool, index uint64,
e.TTL = int64(n.ExpireTime.Sub(time.Now())/time.Second) + 1
}
fs.Stats.IncStats(StatsGetsHit)
s.Stats.Inc(GetSuccess)
return e, nil
}
// Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl.
// If the node has already existed, create will fail.
// If any node on the path is a file, create will fail.
func (fs *FileSystem) Create(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
func (s *Store) Create(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
nodePath = path.Clean(path.Join("/", nodePath))
// make sure we can create the node
_, err := fs.InternalGet(nodePath, index, term)
_, err := s.InternalGet(nodePath, index, term)
if err == nil { // key already exists
fs.Stats.IncStats(StatsSetsMiss)
s.Stats.Inc(SetFail)
return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath)
}
etcdError, _ := err.(etcdErr.Error)
if etcdError.ErrorCode == 104 { // we cannot create the key due to meet a file while walking
fs.Stats.IncStats(StatsSetsMiss)
s.Stats.Inc(SetFail)
return nil, err
}
dir, _ := path.Split(nodePath)
// walk through the nodePath, create dirs and get the last directory node
d, err := fs.walk(dir, fs.checkDir)
d, err := s.walk(dir, s.checkDir)
if err != nil {
fs.Stats.IncStats(StatsSetsMiss)
s.Stats.Inc(SetFail)
return nil, err
}
e := newEvent(Create, nodePath, fs.Index, fs.Term)
e := newEvent(Create, nodePath, s.Index, s.Term)
var n *Node
if len(value) != 0 { // create file
e.Value = value
n = newFile(nodePath, value, fs.Index, fs.Term, d, "", expireTime)
n = newFile(nodePath, value, s.Index, s.Term, d, "", expireTime)
} else { // create directory
e.Dir = true
n = newDir(nodePath, fs.Index, fs.Term, d, "", expireTime)
n = newDir(nodePath, s.Index, s.Term, d, "", expireTime)
}
err = d.Add(n)
if err != nil {
fs.Stats.IncStats(StatsSetsMiss)
s.Stats.Inc(SetFail)
return nil, err
}
@ -147,28 +148,28 @@ func (fs *FileSystem) Create(nodePath string, value string, expireTime time.Time
e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
}
fs.WatcherHub.notify(e)
fs.Stats.IncStats(StatsSetsHit)
s.WatcherHub.notify(e)
s.Stats.Inc(SetSuccess)
return e, nil
}
// Update function updates the value/ttl of the node.
// If the node is a file, the value and the ttl can be updated.
// If the node is a directory, only the ttl can be updated.
func (fs *FileSystem) Update(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
n, err := fs.InternalGet(nodePath, index, term)
func (s *Store) Update(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
n, err := s.InternalGet(nodePath, index, term)
if err != nil { // if the node does not exist, return error
fs.Stats.IncStats(StatsUpdatesMiss)
s.Stats.Inc(UpdateFail)
return nil, err
}
e := newEvent(Update, nodePath, fs.Index, fs.Term)
e := newEvent(Update, nodePath, s.Index, s.Term)
if n.IsDir() { // if the node is a directory, we can only update ttl
if len(value) != 0 {
fs.Stats.IncStats(StatsUpdatesMiss)
s.Stats.Inc(UpdateFail)
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath)
}
@ -194,23 +195,23 @@ func (fs *FileSystem) Update(nodePath string, value string, expireTime time.Time
e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
}
fs.WatcherHub.notify(e)
fs.Stats.IncStats(StatsUpdatesHit)
s.WatcherHub.notify(e)
s.Stats.Inc(UpdateSuccess)
return e, nil
}
func (fs *FileSystem) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
f, err := fs.InternalGet(nodePath, index, term)
f, err := s.InternalGet(nodePath, index, term)
if err != nil {
fs.Stats.IncStats(StatsTestAndSetsMiss)
s.Stats.Inc(TestAndSetFail)
return nil, err
}
if f.IsDir() { // can only test and set file
fs.Stats.IncStats(StatsTestAndSetsMiss)
s.Stats.Inc(TestAndSetFail)
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath)
}
@ -221,23 +222,23 @@ func (fs *FileSystem) TestAndSet(nodePath string, prevValue string, prevIndex ui
e.Value = value
f.Write(value, index, term)
fs.WatcherHub.notify(e)
fs.Stats.IncStats(StatsTestAndSetsHit)
s.WatcherHub.notify(e)
s.Stats.Inc(TestAndSetSuccess)
return e, nil
}
cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, f.Value, prevIndex, f.ModifiedIndex)
fs.Stats.IncStats(StatsTestAndSetsMiss)
s.Stats.Inc(TestAndSetFail)
return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause)
}
// Delete function deletes the node at the given path.
// If the node is a directory, recursive must be true to delete it.
func (fs *FileSystem) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) {
n, err := fs.InternalGet(nodePath, index, term)
func (s *Store) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) {
n, err := s.InternalGet(nodePath, index, term)
if err != nil { // if the node does not exist, return error
fs.Stats.IncStats(StatsDeletesMiss)
s.Stats.Inc(DeleteFail)
return nil, err
}
@ -250,37 +251,37 @@ func (fs *FileSystem) Delete(nodePath string, recursive bool, index uint64, term
}
callback := func(path string) { // notify function
fs.WatcherHub.notifyWithPath(e, path, true)
s.WatcherHub.notifyWithPath(e, path, true)
}
err = n.Remove(recursive, callback)
if err != nil {
fs.Stats.IncStats(StatsDeletesMiss)
s.Stats.Inc(DeleteFail)
return nil, err
}
fs.WatcherHub.notify(e)
fs.Stats.IncStats(StatsDeletesHit)
s.WatcherHub.notify(e)
s.Stats.Inc(DeleteSuccess)
return e, nil
}
func (fs *FileSystem) Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) {
fs.Index, fs.Term = index, term
func (s *Store) Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) {
s.Index, s.Term = index, term
if sinceIndex == 0 {
return fs.WatcherHub.watch(prefix, recursive, index+1)
return s.WatcherHub.watch(prefix, recursive, index+1)
}
return fs.WatcherHub.watch(prefix, recursive, sinceIndex)
return s.WatcherHub.watch(prefix, recursive, sinceIndex)
}
// walk function walks all the nodePath and apply the walkFunc on each directory
func (fs *FileSystem) walk(nodePath string, walkFunc func(prev *Node, component string) (*Node, error)) (*Node, error) {
func (s *Store) walk(nodePath string, walkFunc func(prev *Node, component string) (*Node, error)) (*Node, error) {
components := strings.Split(nodePath, "/")
curr := fs.Root
curr := s.Root
var err error
for i := 1; i < len(components); i++ {
@ -299,11 +300,11 @@ func (fs *FileSystem) walk(nodePath string, walkFunc func(prev *Node, component
}
// InternalGet function get the node of the given nodePath.
func (fs *FileSystem) InternalGet(nodePath string, index uint64, term uint64) (*Node, error) {
func (s *Store) InternalGet(nodePath string, index uint64, term uint64) (*Node, error) {
nodePath = path.Clean(path.Join("/", nodePath))
// update file system known index and term
fs.Index, fs.Term = index, term
s.Index, s.Term = index, term
walkFunc := func(parent *Node, name string) (*Node, error) {
@ -319,7 +320,7 @@ func (fs *FileSystem) InternalGet(nodePath string, index uint64, term uint64) (*
return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name))
}
f, err := fs.walk(nodePath, walkFunc)
f, err := s.walk(nodePath, walkFunc)
if err != nil {
return nil, err
@ -332,14 +333,14 @@ func (fs *FileSystem) InternalGet(nodePath string, index uint64, term uint64) (*
// If it is a directory, this function will return the pointer to that node.
// If it does not exist, this function will create a new directory and return the pointer to that node.
// If it is a file, this function will return error.
func (fs *FileSystem) checkDir(parent *Node, dirName string) (*Node, error) {
func (s *Store) checkDir(parent *Node, dirName string) (*Node, error) {
subDir, ok := parent.Children[dirName]
if ok {
return subDir, nil
}
n := newDir(path.Join(parent.Path, dirName), fs.Index, fs.Term, parent, parent.ACL, Permanent)
n := newDir(path.Join(parent.Path, dirName), s.Index, s.Term, parent, parent.ACL, Permanent)
parent.Children[dirName] = n
@ -350,24 +351,20 @@ func (fs *FileSystem) checkDir(parent *Node, dirName string) (*Node, error) {
// Save function will not be able to save the state of watchers.
// Save function will not save the parent field of the node. Or there will
// be cyclic dependencies issue for the json package.
func (fs *FileSystem) Save() ([]byte, error) {
func (s *Store) Save() ([]byte, error) {
clonedStore := New()
clonedStore.Root = s.Root.Clone()
clonedStore.WatcherHub = s.WatcherHub
clonedStore.Index = s.Index
clonedStore.Term = s.Term
clonedStore.Stats = s.Stats
fs.Stats.IncStats(StatsSaveHit)
cloneFs := New()
cloneFs.Root = fs.Root.Clone()
b, err := json.Marshal(fs)
b, err := json.Marshal(clonedStore)
if err != nil {
fs.Stats.IncStats(StatsSaveMiss)
fs.Stats.rwlock.Lock()
fs.Stats.SaveHit-- // restore the savehit
fs.Stats.rwlock.Unlock()
return nil, err
}
fs.Stats.IncStats(StatsSaveHit)
return b, nil
}
@ -375,15 +372,18 @@ func (fs *FileSystem) Save() ([]byte, error) {
// It needs to recovery the parent field of the nodes.
// It needs to delete the expired nodes since the saved time and also
// need to create monitor go routines.
func (fs *FileSystem) Recovery(state []byte) error {
err := json.Unmarshal(state, fs)
func (s *Store) Recovery(state []byte) error {
err := json.Unmarshal(state, s)
if err != nil {
fs.Stats.IncStats(StatsRecoveryMiss)
return err
}
fs.Root.recoverAndclean()
fs.Stats.IncStats(StatsRecoveryHit)
s.Root.recoverAndclean()
return nil
}
func (s *Store) JsonStats() []byte {
s.Stats.Watchers = uint64(s.WatcherHub.count)
return s.Stats.toJson()
}

View File

@ -1,4 +1,4 @@
package fileSystem
package store
import (
"math/rand"
@ -8,46 +8,46 @@ import (
)
func TestCreateAndGet(t *testing.T) {
fs := New()
s := New()
fs.Create("/foobar", "bar", Permanent, 1, 1)
s.Create("/foobar", "bar", Permanent, 1, 1)
// already exist, create should fail
_, err := fs.Create("/foobar", "bar", Permanent, 1, 1)
_, err := s.Create("/foobar", "bar", Permanent, 1, 1)
if err == nil {
t.Fatal("Create should fail")
}
fs.Delete("/foobar", true, 1, 1)
s.Delete("/foobar", true, 1, 1)
// this should create successfully
createAndGet(fs, "/foobar", t)
createAndGet(fs, "/foo/bar", t)
createAndGet(fs, "/foo/foo/bar", t)
createAndGet(s, "/foobar", t)
createAndGet(s, "/foo/bar", t)
createAndGet(s, "/foo/foo/bar", t)
// meet file, create should fail
_, err = fs.Create("/foo/bar/bar", "bar", Permanent, 2, 1)
_, err = s.Create("/foo/bar/bar", "bar", Permanent, 2, 1)
if err == nil {
t.Fatal("Create should fail")
}
// create a directory
_, err = fs.Create("/fooDir", "", Permanent, 3, 1)
_, err = s.Create("/fooDir", "", Permanent, 3, 1)
if err != nil {
t.Fatal("Cannot create /fooDir")
}
e, err := fs.Get("/fooDir", false, false, 3, 1)
e, err := s.Get("/fooDir", false, false, 3, 1)
if err != nil || e.Dir != true {
t.Fatal("Cannot create /fooDir ")
}
// create a file under directory
_, err = fs.Create("/fooDir/bar", "bar", Permanent, 4, 1)
_, err = s.Create("/fooDir/bar", "bar", Permanent, 4, 1)
if err != nil {
t.Fatal("Cannot create /fooDir/bar = bar")
@ -56,21 +56,21 @@ func TestCreateAndGet(t *testing.T) {
}
func TestUpdateFile(t *testing.T) {
fs := New()
s := New()
_, err := fs.Create("/foo/bar", "bar", Permanent, 1, 1)
_, err := s.Create("/foo/bar", "bar", Permanent, 1, 1)
if err != nil {
t.Fatalf("cannot create %s=bar [%s]", "/foo/bar", err.Error())
}
_, err = fs.Update("/foo/bar", "barbar", Permanent, 2, 1)
_, err = s.Update("/foo/bar", "barbar", Permanent, 2, 1)
if err != nil {
t.Fatalf("cannot update %s=barbar [%s]", "/foo/bar", err.Error())
}
e, err := fs.Get("/foo/bar", false, false, 2, 1)
e, err := s.Get("/foo/bar", false, false, 2, 1)
if err != nil {
t.Fatalf("cannot get %s [%s]", "/foo/bar", err.Error())
@ -82,37 +82,37 @@ func TestUpdateFile(t *testing.T) {
// create a directory, update its ttl, to see if it will be deleted
_, err = fs.Create("/foo/foo", "", Permanent, 3, 1)
_, err = s.Create("/foo/foo", "", Permanent, 3, 1)
if err != nil {
t.Fatalf("cannot create dir [%s] [%s]", "/foo/foo", err.Error())
}
_, err = fs.Create("/foo/foo/foo1", "bar1", Permanent, 4, 1)
_, err = s.Create("/foo/foo/foo1", "bar1", Permanent, 4, 1)
if err != nil {
t.Fatal("cannot create [%s]", err.Error())
}
_, err = fs.Create("/foo/foo/foo2", "", Permanent, 5, 1)
_, err = s.Create("/foo/foo/foo2", "", Permanent, 5, 1)
if err != nil {
t.Fatal("cannot create [%s]", err.Error())
}
_, err = fs.Create("/foo/foo/foo2/boo", "boo1", Permanent, 6, 1)
_, err = s.Create("/foo/foo/foo2/boo", "boo1", Permanent, 6, 1)
if err != nil {
t.Fatal("cannot create [%s]", err.Error())
}
expire := time.Now().Add(time.Second * 2)
_, err = fs.Update("/foo/foo", "", expire, 7, 1)
_, err = s.Update("/foo/foo", "", expire, 7, 1)
if err != nil {
t.Fatalf("cannot update dir [%s] [%s]", "/foo/foo", err.Error())
}
// sleep 50ms, it should still reach the node
time.Sleep(time.Microsecond * 50)
e, err = fs.Get("/foo/foo", true, false, 7, 1)
e, err = s.Get("/foo/foo", true, false, 7, 1)
if err != nil || e.Key != "/foo/foo" {
t.Fatalf("cannot get dir before expiration [%s]", err.Error())
@ -132,23 +132,23 @@ func TestUpdateFile(t *testing.T) {
// wait for expiration
time.Sleep(time.Second * 3)
e, err = fs.Get("/foo/foo", true, false, 7, 1)
e, err = s.Get("/foo/foo", true, false, 7, 1)
if err == nil {
t.Fatal("still can get dir after expiration [%s]")
}
_, err = fs.Get("/foo/foo/foo1", true, false, 7, 1)
_, err = s.Get("/foo/foo/foo1", true, false, 7, 1)
if err == nil {
t.Fatal("still can get sub node after expiration [%s]")
}
_, err = fs.Get("/foo/foo/foo2", true, false, 7, 1)
_, err = s.Get("/foo/foo/foo2", true, false, 7, 1)
if err == nil {
t.Fatal("still can get sub dir after expiration [%s]")
}
_, err = fs.Get("/foo/foo/foo2/boo", true, false, 7, 1)
_, err = s.Get("/foo/foo/foo2/boo", true, false, 7, 1)
if err == nil {
t.Fatalf("still can get sub node of sub dir after expiration [%s]", err.Error())
}
@ -156,17 +156,17 @@ func TestUpdateFile(t *testing.T) {
}
func TestListDirectory(t *testing.T) {
fs := New()
s := New()
// create dir /foo
// set key-value /foo/foo=bar
fs.Create("/foo/foo", "bar", Permanent, 1, 1)
s.Create("/foo/foo", "bar", Permanent, 1, 1)
// create dir /foo/fooDir
// set key-value /foo/fooDir/foo=bar
fs.Create("/foo/fooDir/foo", "bar", Permanent, 2, 1)
s.Create("/foo/fooDir/foo", "bar", Permanent, 2, 1)
e, err := fs.Get("/foo", true, false, 2, 1)
e, err := s.Get("/foo", true, false, 2, 1)
if err != nil {
t.Fatalf("%v", err)
@ -191,9 +191,9 @@ func TestListDirectory(t *testing.T) {
// create dir /foo/_hidden
// set key-value /foo/_hidden/foo -> bar
fs.Create("/foo/_hidden/foo", "bar", Permanent, 3, 1)
s.Create("/foo/_hidden/foo", "bar", Permanent, 3, 1)
e, _ = fs.Get("/foo", false, false, 2, 1)
e, _ = s.Get("/foo", false, false, 2, 1)
if len(e.KVPairs) != 2 {
t.Fatalf("hidden node is not hidden! %s", e.KVPairs[2].Key)
@ -201,38 +201,38 @@ func TestListDirectory(t *testing.T) {
}
func TestRemove(t *testing.T) {
fs := New()
s := New()
fs.Create("/foo", "bar", Permanent, 1, 1)
_, err := fs.Delete("/foo", false, 1, 1)
s.Create("/foo", "bar", Permanent, 1, 1)
_, err := s.Delete("/foo", false, 1, 1)
if err != nil {
t.Fatalf("cannot delete %s [%s]", "/foo", err.Error())
}
_, err = fs.Get("/foo", false, false, 1, 1)
_, err = s.Get("/foo", false, false, 1, 1)
if err == nil || err.Error() != "Key Not Found" {
t.Fatalf("can get the node after deletion")
}
fs.Create("/foo/bar", "bar", Permanent, 1, 1)
fs.Create("/foo/car", "car", Permanent, 1, 1)
fs.Create("/foo/dar/dar", "dar", Permanent, 1, 1)
s.Create("/foo/bar", "bar", Permanent, 1, 1)
s.Create("/foo/car", "car", Permanent, 1, 1)
s.Create("/foo/dar/dar", "dar", Permanent, 1, 1)
_, err = fs.Delete("/foo", false, 1, 1)
_, err = s.Delete("/foo", false, 1, 1)
if err == nil {
t.Fatalf("should not be able to delete a directory without recursive")
}
_, err = fs.Delete("/foo", true, 1, 1)
_, err = s.Delete("/foo", true, 1, 1)
if err != nil {
t.Fatalf("cannot delete %s [%s]", "/foo", err.Error())
}
_, err = fs.Get("/foo", false, false, 1, 1)
_, err = s.Get("/foo", false, false, 1, 1)
if err == nil || err.Error() != "Key Not Found" {
t.Fatalf("can get the node after deletion ")
@ -241,13 +241,13 @@ func TestRemove(t *testing.T) {
}
func TestExpire(t *testing.T) {
fs := New()
s := New()
expire := time.Now().Add(time.Second)
fs.Create("/foo", "bar", expire, 1, 1)
s.Create("/foo", "bar", expire, 1, 1)
_, err := fs.InternalGet("/foo", 1, 1)
_, err := s.InternalGet("/foo", 1, 1)
if err != nil {
t.Fatalf("can not get the node")
@ -255,7 +255,7 @@ func TestExpire(t *testing.T) {
time.Sleep(time.Second * 2)
_, err = fs.InternalGet("/foo", 1, 1)
_, err = s.InternalGet("/foo", 1, 1)
if err == nil {
t.Fatalf("can get the node after expiration time")
@ -263,10 +263,10 @@ func TestExpire(t *testing.T) {
// test if we can reach the node before expiration
expire = time.Now().Add(time.Second)
fs.Create("/foo", "bar", expire, 1, 1)
s.Create("/foo", "bar", expire, 1, 1)
time.Sleep(time.Millisecond * 50)
_, err = fs.InternalGet("/foo", 1, 1)
_, err = s.InternalGet("/foo", 1, 1)
if err != nil {
t.Fatalf("cannot get the node before expiration", err.Error())
@ -274,8 +274,8 @@ func TestExpire(t *testing.T) {
expire = time.Now().Add(time.Second)
fs.Create("/foo", "bar", expire, 1, 1)
_, err = fs.Delete("/foo", false, 1, 1)
s.Create("/foo", "bar", expire, 1, 1)
_, err = s.Delete("/foo", false, 1, 1)
if err != nil {
t.Fatalf("cannot delete the node before expiration", err.Error())
@ -284,17 +284,17 @@ func TestExpire(t *testing.T) {
}
func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ?
fs := New()
fs.Create("/foo", "bar", Permanent, 1, 1)
s := New()
s.Create("/foo", "bar", Permanent, 1, 1)
// test on wrong previous value
_, err := fs.TestAndSet("/foo", "barbar", 0, "car", Permanent, 2, 1)
_, err := s.TestAndSet("/foo", "barbar", 0, "car", Permanent, 2, 1)
if err == nil {
t.Fatal("test and set should fail barbar != bar")
}
// test on value
e, err := fs.TestAndSet("/foo", "bar", 0, "car", Permanent, 3, 1)
e, err := s.TestAndSet("/foo", "bar", 0, "car", Permanent, 3, 1)
if err != nil {
t.Fatal("test and set should succeed bar == bar")
@ -305,7 +305,7 @@ func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ?
}
// test on index
e, err = fs.TestAndSet("/foo", "", 3, "bar", Permanent, 4, 1)
e, err = s.TestAndSet("/foo", "", 3, "bar", Permanent, 4, 1)
if err != nil {
t.Fatal("test and set should succeed index 3 == 3")
@ -318,61 +318,61 @@ func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ?
}
func TestWatch(t *testing.T) {
fs := New()
s := New()
// watch at a deeper path
c, _ := fs.WatcherHub.watch("/foo/foo/foo", false, 0)
fs.Create("/foo/foo/foo", "bar", Permanent, 1, 1)
c, _ := s.WatcherHub.watch("/foo/foo/foo", false, 0)
s.Create("/foo/foo/foo", "bar", Permanent, 1, 1)
e := nonblockingRetrive(c)
if e.Key != "/foo/foo/foo" {
t.Fatal("watch for Create node fails")
}
c, _ = fs.WatcherHub.watch("/foo/foo/foo", false, 0)
fs.Update("/foo/foo/foo", "car", Permanent, 2, 1)
c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0)
s.Update("/foo/foo/foo", "car", Permanent, 2, 1)
e = nonblockingRetrive(c)
if e.Key != "/foo/foo/foo" {
t.Fatal("watch for Update node fails")
}
c, _ = fs.WatcherHub.watch("/foo/foo/foo", false, 0)
fs.TestAndSet("/foo/foo/foo", "car", 0, "bar", Permanent, 3, 1)
c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0)
s.TestAndSet("/foo/foo/foo", "car", 0, "bar", Permanent, 3, 1)
e = nonblockingRetrive(c)
if e.Key != "/foo/foo/foo" {
t.Fatal("watch for TestAndSet node fails")
}
c, _ = fs.WatcherHub.watch("/foo/foo/foo", false, 0)
fs.Delete("/foo", true, 4, 1) //recursively delete
c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0)
s.Delete("/foo", true, 4, 1) //recursively delete
e = nonblockingRetrive(c)
if e.Key != "/foo" {
t.Fatal("watch for Delete node fails")
}
// watch at a prefix
c, _ = fs.WatcherHub.watch("/foo", true, 0)
fs.Create("/foo/foo/boo", "bar", Permanent, 5, 1)
c, _ = s.WatcherHub.watch("/foo", true, 0)
s.Create("/foo/foo/boo", "bar", Permanent, 5, 1)
e = nonblockingRetrive(c)
if e.Key != "/foo/foo/boo" {
t.Fatal("watch for Create subdirectory fails")
}
c, _ = fs.WatcherHub.watch("/foo", true, 0)
fs.Update("/foo/foo/boo", "foo", Permanent, 6, 1)
c, _ = s.WatcherHub.watch("/foo", true, 0)
s.Update("/foo/foo/boo", "foo", Permanent, 6, 1)
e = nonblockingRetrive(c)
if e.Key != "/foo/foo/boo" {
t.Fatal("watch for Update subdirectory fails")
}
c, _ = fs.WatcherHub.watch("/foo", true, 0)
fs.TestAndSet("/foo/foo/boo", "foo", 0, "bar", Permanent, 7, 1)
c, _ = s.WatcherHub.watch("/foo", true, 0)
s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", Permanent, 7, 1)
e = nonblockingRetrive(c)
if e.Key != "/foo/foo/boo" {
t.Fatal("watch for TestAndSet subdirectory fails")
}
c, _ = fs.WatcherHub.watch("/foo", true, 0)
fs.Delete("/foo/foo/boo", false, 8, 1)
c, _ = s.WatcherHub.watch("/foo", true, 0)
s.Delete("/foo/foo/boo", false, 8, 1)
e = nonblockingRetrive(c)
if e.Key != "/foo/foo/boo" {
t.Fatal("watch for Delete subdirectory fails")
@ -381,14 +381,14 @@ func TestWatch(t *testing.T) {
}
func TestSort(t *testing.T) {
fs := New()
s := New()
// simulating random creation
keys := GenKeys(80, 4)
i := uint64(1)
for _, k := range keys {
_, err := fs.Create(k, "bar", Permanent, i, 1)
_, err := s.Create(k, "bar", Permanent, i, 1)
if err != nil {
panic(err)
} else {
@ -396,7 +396,7 @@ func TestSort(t *testing.T) {
}
}
e, err := fs.Get("/foo", true, true, i, 1)
e, err := s.Get("/foo", true, true, i, 1)
if err != nil {
t.Fatalf("get dir nodes failed [%s]", err.Error())
}
@ -419,14 +419,14 @@ func TestSort(t *testing.T) {
}
func TestSaveAndRecover(t *testing.T) {
fs := New()
s := New()
// simulating random creation
keys := GenKeys(8, 4)
i := uint64(1)
for _, k := range keys {
_, err := fs.Create(k, "bar", Permanent, i, 1)
_, err := s.Create(k, "bar", Permanent, i, 1)
if err != nil {
panic(err)
} else {
@ -438,8 +438,8 @@ func TestSaveAndRecover(t *testing.T) {
// test if we can reach the node before expiration
expire := time.Now().Add(time.Second)
fs.Create("/foo/foo", "bar", expire, 1, 1)
b, err := fs.Save()
s.Create("/foo/foo", "bar", expire, 1, 1)
b, err := s.Save()
cloneFs := New()
time.Sleep(time.Second)
@ -453,18 +453,18 @@ func TestSaveAndRecover(t *testing.T) {
}
}
if fs.WatcherHub.EventHistory.StartIndex != cloneFs.WatcherHub.EventHistory.StartIndex {
if s.WatcherHub.EventHistory.StartIndex != cloneFs.WatcherHub.EventHistory.StartIndex {
t.Fatal("Error recovered event history start index")
}
for i = 0; int(i) < fs.WatcherHub.EventHistory.Queue.Size; i++ {
if fs.WatcherHub.EventHistory.Queue.Events[i].Key !=
for i = 0; int(i) < s.WatcherHub.EventHistory.Queue.Size; i++ {
if s.WatcherHub.EventHistory.Queue.Events[i].Key !=
cloneFs.WatcherHub.EventHistory.Queue.Events[i].Key {
t.Fatal("Error recovered event history")
}
}
_, err = fs.Get("/foo/foo", false, false, 1, 1)
_, err = s.Get("/foo/foo", false, false, 1, 1)
if err == nil || err.Error() != "Key Not Found" {
t.Fatalf("can get the node after deletion ")
@ -488,14 +488,14 @@ func GenKeys(num int, depth int) []string {
return keys
}
func createAndGet(fs *FileSystem, path string, t *testing.T) {
_, err := fs.Create(path, "bar", Permanent, 1, 1)
func createAndGet(s *Store, path string, t *testing.T) {
_, err := s.Create(path, "bar", Permanent, 1, 1)
if err != nil {
t.Fatalf("cannot create %s=bar [%s]", path, err.Error())
}
e, err := fs.Get(path, false, false, 1, 1)
e, err := s.Get(path, false, false, 1, 1)
if err != nil {
t.Fatalf("cannot get %s [%s]", path, err.Error())

View File

@ -1,16 +1,16 @@
package fileSystem
package store
import (
"container/list"
"path"
"strings"
"sync/atomic"
)
type watcherHub struct {
watchers map[string]*list.List
count uint64 // current number of watchers
count int64 // current number of watchers.
EventHistory *EventHistory
Stats *EtcdStats
}
type watcher struct {
@ -19,11 +19,10 @@ type watcher struct {
sinceIndex uint64
}
func newWatchHub(capacity int, stats *EtcdStats) *watcherHub {
func newWatchHub(capacity int) *watcherHub {
return &watcherHub{
watchers: make(map[string]*list.List),
EventHistory: newEventHistory(capacity),
Stats: stats,
}
}
@ -37,13 +36,9 @@ func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan
e, err := wh.EventHistory.scan(prefix, index)
if err != nil {
wh.Stats.IncStats(StatsWatchMiss)
return nil, err
}
wh.Stats.IncStats(StatsWatchHit)
wh.Stats.IncStats(StatsInWatchingNum)
if e != nil {
eventChan <- e
return eventChan, nil
@ -66,6 +61,8 @@ func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan
wh.watchers[prefix] = l
}
atomic.AddInt64(&wh.count, 1)
return eventChan, nil
}
@ -94,10 +91,8 @@ func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) {
if (w.recursive || force || e.Key == path) && e.Index >= w.sinceIndex {
w.eventChan <- e
wh.Stats.rwlock.Lock() // lock the InWatchingNum
wh.Stats.InWatchingNum--
wh.Stats.rwlock.Unlock()
l.Remove(curr)
atomic.AddInt64(&wh.count, -1)
} else {
notifiedAll = false
}

View File

@ -1,12 +1,12 @@
package fileSystem
package store
import (
"testing"
)
func TestWatcher(t *testing.T) {
fs := New()
wh := fs.WatcherHub
s := New()
wh := s.WatcherHub
c, err := wh.watch("/foo", true, 0)
if err != nil {

View File

@ -15,7 +15,7 @@ import (
"time"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/file_system"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/web"
"github.com/coreos/go-raft"
)
@ -30,12 +30,12 @@ func durationToExpireTime(strDuration string) (time.Time, error) {
duration, err := strconv.Atoi(strDuration)
if err != nil {
return fileSystem.Permanent, err
return store.Permanent, err
}
return time.Now().Add(time.Second * (time.Duration)(duration)), nil
} else {
return fileSystem.Permanent, nil
return store.Permanent, nil
}
}