etcd/store/store.go

663 lines
14 KiB
Go
Raw Normal View History

2013-06-18 22:14:22 +04:00
package store
import (
"encoding/json"
"fmt"
2013-08-18 07:41:15 +04:00
etcdErr "github.com/coreos/etcd/error"
2013-06-21 02:59:23 +04:00
"path"
2013-06-30 21:09:05 +04:00
"strconv"
"sync"
2013-08-06 03:06:23 +04:00
"time"
2013-06-21 02:59:23 +04:00
)
2013-06-18 22:14:22 +04:00
2013-07-10 00:14:12 +04:00
//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------
2013-07-10 00:14:12 +04:00
// The main struct of the Key-Value store
2013-06-18 22:14:22 +04:00
type Store struct {
2013-07-10 00:14:12 +04:00
// key-value store structure
2013-07-03 23:57:23 +04:00
Tree *tree
2013-07-10 00:14:12 +04:00
// This mutex protects everything except add watcher member.
// Add watch member does not depend on the current state of the store.
// And watch will return when other protected function is called and reach
// the watching condition.
// It is needed so that clone() can atomically replicate the Store
// and do the log snapshot in a go routine.
mutex sync.RWMutex
2013-07-10 00:14:12 +04:00
// WatcherHub is where we register all the clients
// who issue a watch request
watcher *WatcherHub
// The string channel to send messages to the outside world
// Now we use it to send changes to the hub of the web service
2013-08-07 05:59:46 +04:00
messager chan<- string
2013-06-30 00:13:51 +04:00
2013-07-10 00:14:12 +04:00
// A map to keep the recent response to the clients
ResponseMap map[string]*Response
2013-06-30 21:09:05 +04:00
2013-07-10 00:14:12 +04:00
// The max number of the recent responses we can record
2013-06-30 21:09:05 +04:00
ResponseMaxSize int
2013-07-10 00:14:12 +04:00
// The current number of the recent responses we have recorded
2013-06-30 21:09:05 +04:00
ResponseCurrSize uint
2013-06-30 02:29:10 +04:00
2013-07-10 00:14:12 +04:00
// The index of the first recent responses we have
2013-06-30 02:29:10 +04:00
ResponseStartIndex uint64
2013-07-10 00:14:12 +04:00
// Current index of the raft machine
2013-06-30 02:29:10 +04:00
Index uint64
2013-08-02 23:31:35 +04:00
// Basic statistics information of etcd storage
BasicStats EtcdStats
2013-06-18 22:14:22 +04:00
}
2013-07-10 00:14:12 +04:00
// A Node represents a Value in the Key-Value pair in the store
// It has its value, expire time and a channel used to update the
// expire time (since we do countdown in a go routine, we need to
// communicate with it via channel)
2013-06-18 22:14:22 +04:00
type Node struct {
2013-07-10 00:14:12 +04:00
// The string value of the node
2013-06-21 02:59:23 +04:00
Value string `json:"value"`
2013-07-10 00:14:12 +04:00
// If the node is a permanent one the ExprieTime will be Unix(0,0)
// Otherwise after the expireTime, the node will be deleted
2013-06-18 22:14:22 +04:00
ExpireTime time.Time `json:"expireTime"`
2013-07-10 00:14:12 +04:00
// A channel to update the expireTime of the node
2013-06-18 22:14:22 +04:00
update chan time.Time `json:"-"`
}
2013-07-10 00:14:12 +04:00
// The response from the store to the user who issue a command
2013-06-18 22:14:22 +04:00
type Response struct {
2013-07-10 00:14:12 +04:00
Action string `json:"action"`
Key string `json:"key"`
2013-07-12 21:10:56 +04:00
Dir bool `json:"dir,omitempty"`
2013-07-12 19:41:28 +04:00
PrevValue string `json:"prevValue,omitempty"`
Value string `json:"value,omitempty"`
2013-07-16 21:40:41 +04:00
// If the key did not exist before the action,
2013-07-12 21:10:56 +04:00
// this field should be set to true
2013-07-16 21:44:09 +04:00
NewKey bool `json:"newKey,omitempty"`
2013-07-12 19:41:28 +04:00
Expiration *time.Time `json:"expiration,omitempty"`
2013-06-29 23:49:05 +04:00
2013-07-10 00:14:12 +04:00
// Time to live in second
2013-07-12 19:41:28 +04:00
TTL int64 `json:"ttl,omitempty"`
2013-06-30 00:48:13 +04:00
2013-07-10 00:14:12 +04:00
// The command index of the raft machine when the command is executed
2013-06-29 23:49:05 +04:00
Index uint64 `json:"index"`
2013-06-18 22:14:22 +04:00
}
2013-07-10 00:14:12 +04:00
// A listNode represent the simplest Key-Value pair with its type
// It is only used when do list opeartion
// We want to have a file system like store, thus we distingush "file"
// and "directory"
2013-07-06 03:22:45 +04:00
type ListNode struct {
2013-07-10 00:14:12 +04:00
Key string
Value string
Type string
2013-07-06 03:22:45 +04:00
}
2013-07-10 00:14:12 +04:00
var PERMANENT = time.Unix(0, 0)
//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------
// Create a new stroe
// Arguement max is the max number of response we want to record
2013-07-01 03:30:41 +04:00
func CreateStore(max int) *Store {
2013-07-10 00:14:12 +04:00
s := new(Store)
2013-07-01 03:30:41 +04:00
s.messager = nil
2013-07-10 00:14:12 +04:00
s.ResponseMap = make(map[string]*Response)
2013-06-30 02:29:10 +04:00
s.ResponseStartIndex = 0
2013-07-01 03:30:41 +04:00
s.ResponseMaxSize = max
s.ResponseCurrSize = 0
2013-07-03 23:57:23 +04:00
2013-07-10 00:14:12 +04:00
s.Tree = &tree{
&treeNode{
Node{
2013-07-03 23:57:23 +04:00
"/",
2013-07-10 00:14:12 +04:00
time.Unix(0, 0),
2013-07-03 23:57:23 +04:00
nil,
},
2013-07-10 00:14:12 +04:00
true,
2013-07-03 23:57:23 +04:00
make(map[string]*treeNode),
},
2013-07-10 00:14:12 +04:00
}
2013-07-03 23:57:23 +04:00
2013-08-05 04:17:40 +04:00
s.watcher = newWatcherHub()
2013-06-18 22:14:22 +04:00
return s
}
2013-07-10 00:14:12 +04:00
// Set the messager of the store
2013-08-07 05:59:46 +04:00
func (s *Store) SetMessager(messager chan<- string) {
2013-06-19 02:04:30 +04:00
s.messager = messager
2013-06-21 02:59:23 +04:00
}
2013-06-19 02:04:30 +04:00
2013-07-10 00:14:12 +04:00
func (s *Store) Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.internalSet(key, value, expireTime, index)
}
2013-06-18 22:14:22 +04:00
// Set the key to value with expiration time
func (s *Store) internalSet(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
2013-07-10 00:14:12 +04:00
//Update index
2013-06-30 02:29:10 +04:00
s.Index = index
2013-06-18 22:14:22 +04:00
2013-08-02 23:31:35 +04:00
//Update stats
s.BasicStats.Sets++
2013-07-10 00:14:12 +04:00
key = path.Clean("/" + key)
2013-06-18 22:14:22 +04:00
2013-07-10 00:14:12 +04:00
isExpire := !expireTime.Equal(PERMANENT)
2013-06-18 22:14:22 +04:00
2013-07-12 19:41:28 +04:00
// base response
resp := Response{
2013-07-16 21:40:41 +04:00
Action: "SET",
2013-07-16 21:44:09 +04:00
Key: key,
Value: value,
Index: index,
2013-07-12 19:41:28 +04:00
}
2013-07-10 00:14:12 +04:00
// When the slow follower receive the set command
// the key may be expired, we should not add the node
// also if the node exist, we need to delete the node
2013-06-18 22:14:22 +04:00
if isExpire && expireTime.Sub(time.Now()) < 0 {
return s.internalDelete(key, index)
2013-06-18 22:14:22 +04:00
}
2013-06-30 00:48:13 +04:00
var TTL int64
2013-07-10 00:14:12 +04:00
// Update ttl
2013-06-30 00:48:13 +04:00
if isExpire {
TTL = int64(expireTime.Sub(time.Now()) / time.Second)
2013-07-12 19:41:28 +04:00
resp.Expiration = &expireTime
2013-07-16 21:40:41 +04:00
resp.TTL = TTL
}
2013-06-30 00:48:13 +04:00
2013-07-10 00:14:12 +04:00
// Get the node
2013-07-03 23:57:23 +04:00
node, ok := s.Tree.get(key)
2013-06-18 22:14:22 +04:00
if ok {
2013-07-10 00:14:12 +04:00
// Update when node exists
// Node is not permanent
if !node.ExpireTime.Equal(PERMANENT) {
2013-06-19 02:04:30 +04:00
2013-07-10 00:14:12 +04:00
// If node is not permanent
// Update its expireTime
2013-06-21 02:59:23 +04:00
node.update <- expireTime
2013-06-18 22:14:22 +04:00
} else {
2013-07-10 00:14:12 +04:00
// If we want the permanent node to have expire time
2013-08-07 06:14:56 +04:00
// We need to create a go routine with a channel
2013-06-18 22:14:22 +04:00
if isExpire {
node.update = make(chan time.Time)
2013-07-10 00:14:12 +04:00
go s.monitorExpiration(key, node.update, expireTime)
2013-06-18 22:14:22 +04:00
}
2013-07-10 00:14:12 +04:00
2013-06-18 22:14:22 +04:00
}
2013-07-10 00:14:12 +04:00
// Update the information of the node
2013-07-03 23:57:23 +04:00
s.Tree.set(key, Node{value, expireTime, node.update})
2013-06-21 02:59:23 +04:00
2013-07-12 19:41:28 +04:00
resp.PrevValue = node.Value
2013-06-19 02:04:30 +04:00
2013-07-10 00:14:12 +04:00
s.watcher.notify(resp)
2013-07-10 00:14:12 +04:00
msg, err := json.Marshal(resp)
2013-07-10 00:14:12 +04:00
// Send to the messager
2013-06-21 02:59:23 +04:00
if s.messager != nil && err == nil {
2013-08-07 05:59:46 +04:00
s.messager <- string(msg)
2013-06-21 02:59:23 +04:00
}
2013-06-19 02:04:30 +04:00
2013-07-10 00:14:12 +04:00
s.addToResponseMap(index, &resp)
2013-06-30 02:29:10 +04:00
2013-06-19 02:04:30 +04:00
return msg, err
2013-06-18 22:14:22 +04:00
2013-07-10 00:14:12 +04:00
// Add new node
2013-06-18 22:14:22 +04:00
} else {
update := make(chan time.Time)
2013-07-12 19:41:28 +04:00
ok := s.Tree.set(key, Node{value, expireTime, update})
if !ok {
2013-08-18 07:41:15 +04:00
return nil, etcdErr.NewError(102, "set: "+key)
2013-07-12 19:41:28 +04:00
}
2013-06-18 22:14:22 +04:00
if isExpire {
2013-07-10 00:14:12 +04:00
go s.monitorExpiration(key, update, expireTime)
2013-06-18 22:14:22 +04:00
}
2013-06-19 02:04:30 +04:00
2013-07-12 19:41:28 +04:00
resp.NewKey = true
msg, err := json.Marshal(resp)
2013-07-10 00:14:12 +04:00
// Nofity the watcher
s.watcher.notify(resp)
2013-06-19 02:04:30 +04:00
2013-07-10 00:14:12 +04:00
// Send to the messager
2013-06-21 02:59:23 +04:00
if s.messager != nil && err == nil {
2013-08-07 05:59:46 +04:00
s.messager <- string(msg)
2013-06-21 02:59:23 +04:00
}
2013-06-30 21:09:05 +04:00
2013-07-10 00:14:12 +04:00
s.addToResponseMap(index, &resp)
2013-06-19 02:04:30 +04:00
return msg, err
2013-06-18 22:14:22 +04:00
}
2013-07-13 03:36:21 +04:00
2013-06-18 22:14:22 +04:00
}
2013-07-12 19:41:28 +04:00
// Get the value of the key and return the raw response
func (s *Store) internalGet(key string) *Response {
2013-07-10 00:14:12 +04:00
key = path.Clean("/" + key)
2013-06-18 22:14:22 +04:00
2013-07-03 23:57:23 +04:00
node, ok := s.Tree.get(key)
2013-06-18 22:14:22 +04:00
if ok {
2013-06-30 00:48:13 +04:00
var TTL int64
var isExpire bool = false
isExpire = !node.ExpireTime.Equal(PERMANENT)
2013-07-12 19:41:28 +04:00
resp := &Response{
2013-07-16 21:40:41 +04:00
Action: "GET",
2013-07-16 21:44:09 +04:00
Key: key,
Value: node.Value,
Index: s.Index,
2013-07-12 19:41:28 +04:00
}
2013-07-10 00:14:12 +04:00
// Update ttl
2013-06-30 00:48:13 +04:00
if isExpire {
TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second)
2013-07-16 21:40:41 +04:00
resp.Expiration = &node.ExpireTime
resp.TTL = TTL
}
2013-06-30 00:48:13 +04:00
2013-07-12 19:41:28 +04:00
return resp
2013-06-30 02:29:10 +04:00
2013-07-10 00:14:12 +04:00
} else {
// we do not found the key
2013-07-12 19:41:28 +04:00
return nil
2013-06-18 22:14:22 +04:00
}
}
2013-07-12 21:10:56 +04:00
// Get all the items under key
// If key is a file return the file
// If key is a directory reuturn an array of files
func (s *Store) Get(key string) ([]byte, error) {
s.mutex.RLock()
defer s.mutex.RUnlock()
resps, err := s.RawGet(key)
2013-07-10 00:14:12 +04:00
if err != nil {
return nil, err
}
2013-08-12 20:24:33 +04:00
key = path.Clean("/" + key)
// If the number of resps == 1 and the response key
// is the key we query, a signal key-value should
// be returned
if len(resps) == 1 && resps[0].Key == key {
return json.Marshal(resps[0])
}
return json.Marshal(resps)
}
func (s *Store) rawGetNode(key string, node *Node) ([]*Response, error) {
resps := make([]*Response, 1)
2013-07-06 03:22:45 +04:00
isExpire := !node.ExpireTime.Equal(PERMANENT)
2013-07-06 03:22:45 +04:00
resps[0] = &Response{
Action: "GET",
Index: s.Index,
Key: key,
Value: node.Value,
}
// Update ttl
if isExpire {
TTL := int64(node.ExpireTime.Sub(time.Now()) / time.Second)
resps[0].Expiration = &node.ExpireTime
resps[0].TTL = TTL
}
return resps, nil
}
func (s *Store) rawGetNodeList(key string, keys []string, nodes []*Node) ([]*Response, error) {
resps := make([]*Response, len(nodes))
// TODO: check if nodes and keys are the same length
for i := 0; i < len(nodes); i++ {
var TTL int64
var isExpire bool = false
isExpire = !nodes[i].ExpireTime.Equal(PERMANENT)
resps[i] = &Response{
Action: "GET",
Index: s.Index,
Key: path.Join(key, keys[i]),
}
if len(nodes[i].Value) != 0 {
resps[i].Value = nodes[i].Value
} else {
resps[i].Dir = true
}
2013-07-12 21:10:56 +04:00
// Update ttl
if isExpire {
TTL = int64(nodes[i].ExpireTime.Sub(time.Now()) / time.Second)
resps[i].Expiration = &nodes[i].ExpireTime
resps[i].TTL = TTL
}
2013-07-12 21:10:56 +04:00
}
2013-07-12 21:10:56 +04:00
return resps, nil
}
2013-07-12 21:10:56 +04:00
func (s *Store) RawGet(key string) ([]*Response, error) {
// Update stats
s.BasicStats.Gets++
2013-07-12 21:10:56 +04:00
key = path.Clean("/" + key)
nodes, keys, ok := s.Tree.list(key)
if !ok {
return nil, etcdErr.NewError(100, "get: "+key)
2013-07-06 03:22:45 +04:00
}
switch node := nodes.(type) {
case *Node:
return s.rawGetNode(key, node)
case []*Node:
return s.rawGetNodeList(key, keys, node)
default:
panic("invalid cast ")
}
2013-07-06 03:22:45 +04:00
}
2013-07-10 00:14:12 +04:00
func (s *Store) Delete(key string, index uint64) ([]byte, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.internalDelete(key, index)
}
// Delete the key
func (s *Store) internalDelete(key string, index uint64) ([]byte, error) {
2013-07-01 22:16:30 +04:00
2013-08-04 08:16:09 +04:00
// Update stats
2013-08-02 23:31:35 +04:00
s.BasicStats.Deletes++
2013-07-10 00:14:12 +04:00
key = path.Clean("/" + key)
2013-06-30 00:13:51 +04:00
2013-08-04 08:16:09 +04:00
// Update index
2013-07-10 00:14:12 +04:00
s.Index = index
2013-06-18 22:14:22 +04:00
2013-07-03 23:57:23 +04:00
node, ok := s.Tree.get(key)
2013-06-18 22:14:22 +04:00
if !ok {
return nil, etcdErr.NewError(100, "delete: "+key)
}
2013-06-18 22:14:22 +04:00
resp := Response{
Action: "DELETE",
Key: key,
PrevValue: node.Value,
Index: index,
}
2013-07-12 19:41:28 +04:00
if node.ExpireTime.Equal(PERMANENT) {
2013-06-18 22:14:22 +04:00
s.Tree.delete(key)
2013-06-19 02:04:30 +04:00
} else {
resp.Expiration = &node.ExpireTime
// Kill the expire go routine
node.update <- PERMANENT
s.Tree.delete(key)
2013-07-16 21:40:41 +04:00
}
msg, err := json.Marshal(resp)
s.watcher.notify(resp)
// notify the messager
if s.messager != nil && err == nil {
s.messager <- string(msg)
}
2013-07-10 00:14:12 +04:00
s.addToResponseMap(index, &resp)
2013-06-19 02:04:30 +04:00
return msg, err
2013-06-18 22:14:22 +04:00
}
2013-07-10 00:14:12 +04:00
// Set the value of the key to the value if the given prevValue is equal to the value of the key
func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
2013-08-04 08:16:09 +04:00
// Update stats
2013-08-02 23:31:35 +04:00
s.BasicStats.TestAndSets++
2013-07-12 19:41:28 +04:00
resp := s.internalGet(key)
2013-07-08 22:00:10 +04:00
2013-07-12 19:41:28 +04:00
if resp == nil {
if prevValue != "" {
errmsg := fmt.Sprintf("TestAndSet: key not found and previousValue is not empty %s:%s ", key, prevValue)
return nil, etcdErr.NewError(100, errmsg)
}
return s.internalSet(key, value, expireTime, index)
2013-07-12 19:41:28 +04:00
}
if resp.Value == prevValue {
2013-07-10 00:14:12 +04:00
// If test succeed, do set
return s.internalSet(key, value, expireTime, index)
2013-07-08 22:00:10 +04:00
} else {
2013-07-10 00:14:12 +04:00
2013-07-12 19:41:28 +04:00
// If fails, return err
2013-08-18 07:41:15 +04:00
return nil, etcdErr.NewError(101, fmt.Sprintf("TestAndSet: %s!=%s",
resp.Value, prevValue))
2013-07-08 22:00:10 +04:00
}
2013-07-10 00:14:12 +04:00
2013-07-08 22:00:10 +04:00
}
2013-07-10 00:14:12 +04:00
// Add a channel to the watchHub.
// The watchHub will send response to the channel when any key under the prefix
// changes [since the sinceIndex if given]
func (s *Store) AddWatcher(prefix string, watcher *Watcher, sinceIndex uint64) error {
2013-08-07 07:00:50 +04:00
return s.watcher.addWatcher(prefix, watcher, sinceIndex, s.ResponseStartIndex, s.Index, s.ResponseMap)
2013-07-10 00:14:12 +04:00
}
// This function should be created as a go routine to delete the key-value pair
// when it reaches expiration time
func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime time.Time) {
2013-07-08 22:00:10 +04:00
duration := expireTime.Sub(time.Now())
for {
select {
2013-07-10 00:14:12 +04:00
// Timeout delete the node
2013-07-08 22:00:10 +04:00
case <-time.After(duration):
node, ok := s.Tree.get(key)
2013-07-10 00:14:12 +04:00
2013-07-08 22:00:10 +04:00
if !ok {
return
2013-07-10 00:14:12 +04:00
2013-07-08 22:00:10 +04:00
} else {
s.mutex.Lock()
2013-07-08 22:00:10 +04:00
s.Tree.delete(key)
2013-07-12 19:41:28 +04:00
resp := Response{
2013-07-16 21:44:09 +04:00
Action: "DELETE",
Key: key,
PrevValue: node.Value,
2013-07-16 21:40:41 +04:00
Expiration: &node.ExpireTime,
2013-07-16 21:44:09 +04:00
Index: s.Index,
2013-07-12 19:41:28 +04:00
}
s.mutex.Unlock()
2013-07-08 22:00:10 +04:00
msg, err := json.Marshal(resp)
2013-07-10 00:14:12 +04:00
s.watcher.notify(resp)
2013-07-08 22:00:10 +04:00
// notify the messager
if s.messager != nil && err == nil {
2013-08-07 05:59:46 +04:00
s.messager <- string(msg)
2013-07-08 22:00:10 +04:00
}
return
}
case updateTime := <-update:
2013-07-10 00:14:12 +04:00
// Update duration
// If the node become a permanent one, the go routine is
2013-07-08 22:00:10 +04:00
// not needed
if updateTime.Equal(PERMANENT) {
return
}
2013-07-10 00:14:12 +04:00
// Update duration
2013-07-08 22:00:10 +04:00
duration = updateTime.Sub(time.Now())
}
}
}
2013-07-10 00:14:12 +04:00
// When we receive a command that will change the state of the key-value store
// We will add the result of it to the ResponseMap for the use of watch command
// Also we may remove the oldest response when we add new one
func (s *Store) addToResponseMap(index uint64, resp *Response) {
2013-07-08 22:00:10 +04:00
2013-07-10 00:14:12 +04:00
// zero case
2013-07-08 22:00:10 +04:00
if s.ResponseMaxSize == 0 {
return
}
strIndex := strconv.FormatUint(index, 10)
s.ResponseMap[strIndex] = resp
2013-07-08 22:00:10 +04:00
// unlimited
2013-07-10 00:14:12 +04:00
if s.ResponseMaxSize < 0 {
2013-07-08 22:00:10 +04:00
s.ResponseCurrSize++
return
}
2013-07-10 00:14:12 +04:00
// if we reach the max point, we need to delete the most latest
// response and update the startIndex
2013-07-08 22:00:10 +04:00
if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
s.ResponseStartIndex++
delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
} else {
s.ResponseCurrSize++
}
}
func (s *Store) clone() *Store {
2013-08-06 03:06:23 +04:00
newStore := &Store{
ResponseMaxSize: s.ResponseMaxSize,
ResponseCurrSize: s.ResponseCurrSize,
ResponseStartIndex: s.ResponseStartIndex,
Index: s.Index,
BasicStats: s.BasicStats,
}
newStore.Tree = s.Tree.clone()
newStore.ResponseMap = make(map[string]*Response)
for index, response := range s.ResponseMap {
newStore.ResponseMap[index] = response
}
return newStore
}
2013-07-10 00:14:12 +04:00
// Save the current state of the storage system
2013-06-21 02:59:23 +04:00
func (s *Store) Save() ([]byte, error) {
2013-08-06 04:03:31 +04:00
// first we clone the store
// json is very slow, we cannot hold the lock for such a long time
s.mutex.Lock()
cloneStore := s.clone()
s.mutex.Unlock()
b, err := json.Marshal(cloneStore)
2013-06-18 22:14:22 +04:00
if err != nil {
fmt.Println(err)
return nil, err
}
return b, nil
}
2013-07-10 00:14:12 +04:00
// Recovery the state of the stroage system from a previous state
2013-06-21 02:59:23 +04:00
func (s *Store) Recovery(state []byte) error {
s.mutex.Lock()
defer s.mutex.Unlock()
// we need to stop all the current watchers
// recovery will clear watcherHub
s.watcher.stopWatchers()
2013-06-18 22:14:22 +04:00
err := json.Unmarshal(state, s)
2013-07-10 00:14:12 +04:00
// The only thing need to change after the recovery is the
// node with expiration time, we need to delete all the node
// that have been expired and setup go routines to monitor the
// other ones
s.checkExpiration()
2013-06-18 22:14:22 +04:00
return err
}
2013-07-10 00:14:12 +04:00
// Clean the expired nodes
// Set up go routines to mon
func (s *Store) checkExpiration() {
s.Tree.traverse(s.checkNode, false)
2013-07-04 02:35:25 +04:00
}
2013-06-18 22:14:22 +04:00
2013-07-10 00:14:12 +04:00
// Check each node
func (s *Store) checkNode(key string, node *Node) {
2013-07-04 02:35:25 +04:00
if node.ExpireTime.Equal(PERMANENT) {
return
2013-07-04 02:35:25 +04:00
} else {
if node.ExpireTime.Sub(time.Now()) >= time.Second {
2013-07-10 00:14:12 +04:00
2013-07-04 02:35:25 +04:00
node.update = make(chan time.Time)
2013-07-10 00:14:12 +04:00
go s.monitorExpiration(key, node.update, node.ExpireTime)
2013-07-04 02:35:25 +04:00
} else {
// we should delete this node
s.Tree.delete(key)
2013-07-04 02:35:25 +04:00
}
}
2013-06-18 22:14:22 +04:00
}