diff --git a/etcd.go b/etcd.go index 20ab45392..22c5a1729 100644 --- a/etcd.go +++ b/etcd.go @@ -48,6 +48,8 @@ var dirPath string var ignore bool +var responseBufferSize int + func init() { flag.BoolVar(&verbose, "v", false, "verbose logging") diff --git a/store/store.go b/store/store.go index fbd580bdc..aa22e4dc8 100644 --- a/store/store.go +++ b/store/store.go @@ -5,6 +5,7 @@ import ( "fmt" "path" "time" + "strconv" ) // global store @@ -28,8 +29,13 @@ type Store struct { // now we use it to send changes to the hub of the web service messager *chan string - // previous responses - Responses [1024]Response + // + ResponseMap map[string]Response + + // + ResponseMaxSize int + + ResponseCurrSize uint // at some point, we may need to compact the Response ResponseStartIndex uint64 @@ -71,12 +77,15 @@ func init() { s = createStore() s.messager = nil s.ResponseStartIndex = 0 + s.ResponseMaxSize = 1024 + s.ResponseCurrSize = 0 } // make a new stroe func createStore() *Store { s := new(Store) s.Nodes = make(map[string]Node) + s.ResponseMap = make(map[string]Response) s.ResponseStartIndex = 0 return s } @@ -155,7 +164,7 @@ func Set(key string, value string, expireTime time.Time, index uint64) ([]byte, *s.messager <- string(msg) } - s.Responses[index - s.ResponseStartIndex] = resp + updateMap(index, &resp) return msg, err @@ -182,10 +191,8 @@ func Set(key string, value string, expireTime time.Time, index uint64) ([]byte, *s.messager <- string(msg) } - - s.Responses[index - s.ResponseStartIndex] = resp - - fmt.Println(index - s.ResponseStartIndex) + + updateMap(index, &resp) return msg, err } } @@ -235,6 +242,30 @@ func expire(key string, update chan time.Time, expireTime time.Time) { } } +func updateMap(index uint64, resp *Response) { + + if s.ResponseMaxSize == 0 { + return + } + + strIndex := strconv.FormatUint(index, 10) + s.ResponseMap[strIndex] = *resp + + // unlimited + if s.ResponseMaxSize < 0{ + s.ResponseCurrSize++ + return + } + + if s.ResponseCurrSize == uint(s.ResponseMaxSize) { + s.ResponseStartIndex++ + delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10)) + } else { + s.ResponseCurrSize++ + } +} + + // get the value of the key func Get(key string) Response { key = path.Clean(key) @@ -296,13 +327,14 @@ func Delete(key string, index uint64) ([]byte, error) { *s.messager <- string(msg) } - s.Responses[index - s.ResponseStartIndex] = resp + updateMap(index, &resp) return msg, err } else { resp := Response{DELETE, key, "", "", false, time.Unix(0, 0), 0, index} - s.Responses[index - s.ResponseStartIndex] = resp + + updateMap(index, &resp) return json.Marshal(resp) } diff --git a/store/watcher.go b/store/watcher.go index 9df42eb4a..0f55a018e 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -2,8 +2,8 @@ package store import ( "path" + "strconv" "strings" - "fmt" ) type WatcherHub struct { @@ -41,7 +41,7 @@ func AddWatcher(prefix string, c chan Response, sinceIndex uint64) error { if sinceIndex != 0 && sinceIndex >= s.ResponseStartIndex { for i := sinceIndex; i <= s.Index; i++ { if check(prefix, i) { - c <- s.Responses[i] + c <- s.ResponseMap[strconv.FormatUint(i, 10)] return nil } } @@ -66,22 +66,23 @@ func AddWatcher(prefix string, c chan Response, sinceIndex uint64) error { return nil } -// check if the response has what we are waching +// check if the response has what we are watching func check(prefix string, index uint64) bool { - index = index - s.ResponseStartIndex + resp, ok := s.ResponseMap[strconv.FormatUint(index, 10)] - if index < 0 { + if !ok { + // not storage system command return false - } + } else { + path := resp.Key + if strings.HasPrefix(path, prefix) { + prefixLen := len(prefix) + if len(path) == prefixLen || path[prefixLen] == '/' { + return true + } - path := s.Responses[index].Key - if strings.HasPrefix(path, prefix) { - prefixLen := len(prefix) - if len(path) == prefixLen || path[prefixLen] == '/' { - return true } - } return false