diff --git a/third_party/github.com/coreos/go-etcd/etcd/client.go b/third_party/github.com/coreos/go-etcd/etcd/client.go new file mode 100644 index 000000000..703b4cec4 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/client.go @@ -0,0 +1,241 @@ +package etcd + +import ( + "crypto/tls" + "errors" + "io/ioutil" + "net" + "net/http" + "path" + "strings" + "time" +) + +const ( + HTTP = iota + HTTPS +) + +type Cluster struct { + Leader string + Machines []string +} + +type Config struct { + CertFile string + KeyFile string + Scheme string + Timeout time.Duration +} + +type Client struct { + cluster Cluster + config Config + httpClient *http.Client +} + +// Setup a basic conf and cluster +func NewClient() *Client { + + // default leader and machines + cluster := Cluster{ + Leader: "0.0.0.0:4001", + Machines: make([]string, 1), + } + cluster.Machines[0] = "0.0.0.0:4001" + + config := Config{ + // default use http + Scheme: "http", + // default timeout is one second + Timeout: time.Second, + } + + tr := &http.Transport{ + Dial: dialTimeout, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + } + + return &Client{ + cluster: cluster, + config: config, + httpClient: &http.Client{Transport: tr}, + } + +} + +func (c *Client) SetCertAndKey(cert string, key string) (bool, error) { + + if cert != "" && key != "" { + tlsCert, err := tls.LoadX509KeyPair(cert, key) + + if err != nil { + return false, err + } + + tr := &http.Transport{ + TLSClientConfig: &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + InsecureSkipVerify: true, + }, + Dial: dialTimeout, + } + + c.httpClient = &http.Client{Transport: tr} + return true, nil + } + return false, errors.New("Require both cert and key path") +} + +func (c *Client) SetScheme(scheme int) (bool, error) { + if scheme == HTTP { + c.config.Scheme = "http" + return true, nil + } + if scheme == HTTPS { + c.config.Scheme = "https" + return true, nil + } + return false, errors.New("Unknown Scheme") +} + +// Try to sync from the given machine +func (c *Client) SetCluster(machines []string) bool { + success := c.internalSyncCluster(machines) + return success +} + +// sycn cluster information using the existing machine list +func (c *Client) SyncCluster() bool { + success := c.internalSyncCluster(c.cluster.Machines) + return success +} + +// sync cluster information by providing machine list +func (c *Client) internalSyncCluster(machines []string) bool { + for _, machine := range machines { + httpPath := c.createHttpPath(machine, "machines") + resp, err := c.httpClient.Get(httpPath) + if err != nil { + // try another machine in the cluster + continue + } else { + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + // try another machine in the cluster + continue + } + // update Machines List + c.cluster.Machines = strings.Split(string(b), ",") + logger.Debug("sync.machines ", c.cluster.Machines) + return true + } + } + return false +} + +// serverName should contain both hostName and port +func (c *Client) createHttpPath(serverName string, _path string) string { + httpPath := path.Join(serverName, _path) + httpPath = c.config.Scheme + "://" + httpPath + return httpPath +} + +// Dial with timeout. +func dialTimeout(network, addr string) (net.Conn, error) { + return net.DialTimeout(network, addr, time.Second) +} + +func (c *Client) getHttpPath(s ...string) string { + httpPath := path.Join(c.cluster.Leader, version) + + for _, seg := range s { + httpPath = path.Join(httpPath, seg) + } + + httpPath = c.config.Scheme + "://" + httpPath + return httpPath +} + +func (c *Client) updateLeader(httpPath string) { + // httpPath http://127.0.0.1:4001/v1... + leader := strings.Split(httpPath, "://")[1] + // we want to have 127.0.0.1:4001 + + leader = strings.Split(leader, "/")[0] + logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, leader) + c.cluster.Leader = leader +} + +// Wrap GET, POST and internal error handling +func (c *Client) sendRequest(method string, _path string, body string) (*http.Response, error) { + + var resp *http.Response + var err error + var req *http.Request + + retry := 0 + // if we connect to a follower, we will retry until we found a leader + for { + + httpPath := c.getHttpPath(_path) + logger.Debug("send.request.to ", httpPath) + if body == "" { + + req, _ = http.NewRequest(method, httpPath, nil) + + } else { + req, _ = http.NewRequest(method, httpPath, strings.NewReader(body)) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value") + } + + resp, err = c.httpClient.Do(req) + + logger.Debug("recv.response.from ", httpPath) + // network error, change a machine! + if err != nil { + retry++ + if retry > 2*len(c.cluster.Machines) { + return nil, errors.New("Cannot reach servers") + } + num := retry % len(c.cluster.Machines) + logger.Debug("update.leader[", c.cluster.Leader, ",", c.cluster.Machines[num], "]") + c.cluster.Leader = c.cluster.Machines[num] + time.Sleep(time.Millisecond * 200) + continue + } + + if resp != nil { + if resp.StatusCode == http.StatusTemporaryRedirect { + httpPath := resp.Header.Get("Location") + + resp.Body.Close() + + if httpPath == "" { + return nil, errors.New("Cannot get redirection location") + } + + c.updateLeader(httpPath) + logger.Debug("send.redirect") + // try to connect the leader + continue + } else if resp.StatusCode == http.StatusInternalServerError { + retry++ + if retry > 2*len(c.cluster.Machines) { + return nil, errors.New("Cannot reach servers") + } + resp.Body.Close() + continue + } else { + logger.Debug("send.return.response ", httpPath) + break + } + + } + logger.Debug("error.from ", httpPath, " ", err.Error()) + return nil, err + } + return resp, nil +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/client_test.go b/third_party/github.com/coreos/go-etcd/etcd/client_test.go new file mode 100644 index 000000000..45a99e96c --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/client_test.go @@ -0,0 +1,38 @@ +package etcd + +import ( + "fmt" + "testing" +) + +// To pass this test, we need to create a cluster of 3 machines +// The server should be listening on 127.0.0.1:4001, 4002, 4003 +func TestSync(t *testing.T) { + fmt.Println("Make sure there are three nodes at 0.0.0.0:4001-4003") + + c := NewClient() + + success := c.SyncCluster() + if !success { + t.Fatal("cannot sync machines") + } + + badMachines := []string{"abc", "edef"} + + success = c.SetCluster(badMachines) + + if success { + t.Fatal("should not sync on bad machines") + } + + goodMachines := []string{"127.0.0.1:4002"} + + success = c.SetCluster(goodMachines) + + if !success { + t.Fatal("cannot sync machines") + } else { + fmt.Println(c.cluster.Machines) + } + +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/debug.go b/third_party/github.com/coreos/go-etcd/etcd/debug.go new file mode 100644 index 000000000..f35dfae76 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/debug.go @@ -0,0 +1,19 @@ +package etcd + +import ( + "github.com/ccding/go-logging/logging" +) + +var logger, _ = logging.SimpleLogger("go-etcd") + +func init() { + logger.SetLevel(logging.FATAL) +} + +func OpenDebug() { + logger.SetLevel(logging.NOTSET) +} + +func CloseDebug() { + logger.SetLevel(logging.FATAL) +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/delete.go b/third_party/github.com/coreos/go-etcd/etcd/delete.go new file mode 100644 index 000000000..fea169560 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/delete.go @@ -0,0 +1,41 @@ +package etcd + +import ( + "encoding/json" + "github.com/coreos/etcd/store" + "io/ioutil" + "net/http" + "path" +) + +func (c *Client) Delete(key string) (*store.Response, error) { + + resp, err := c.sendRequest("DELETE", path.Join("keys", key), "") + + if err != nil { + return nil, err + } + + b, err := ioutil.ReadAll(resp.Body) + + resp.Body.Close() + + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, handleError(b) + } + + var result store.Response + + err = json.Unmarshal(b, &result) + + if err != nil { + return nil, err + } + + return &result, nil + +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/delete_test.go b/third_party/github.com/coreos/go-etcd/etcd/delete_test.go new file mode 100644 index 000000000..a5f980167 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/delete_test.go @@ -0,0 +1,22 @@ +package etcd + +import ( + "testing" +) + +func TestDelete(t *testing.T) { + + c := NewClient() + + c.Set("foo", "bar", 100) + result, err := c.Delete("foo") + if err != nil { + t.Fatal(err) + } + + if result.PrevValue != "bar" || result.Value != "" { + t.Fatalf("Delete failed with %s %s", result.PrevValue, + result.Value) + } + +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/error.go b/third_party/github.com/coreos/go-etcd/etcd/error.go new file mode 100644 index 000000000..9a3268d60 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/error.go @@ -0,0 +1,24 @@ +package etcd + +import ( + "encoding/json" + "fmt" +) + +type EtcdError struct { + ErrorCode int `json:"errorCode"` + Message string `json:"message"` + Cause string `json:"cause,omitempty"` +} + +func (e EtcdError) Error() string { + return fmt.Sprintf("%d: %s (%s)", e.ErrorCode, e.Message, e.Cause) +} + +func handleError(b []byte) error { + var err EtcdError + + json.Unmarshal(b, &err) + + return err +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/get.go b/third_party/github.com/coreos/go-etcd/etcd/get.go new file mode 100644 index 000000000..b0d16fe20 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/get.go @@ -0,0 +1,83 @@ +package etcd + +import ( + "encoding/json" + "github.com/coreos/etcd/store" + "io/ioutil" + "net/http" + "path" +) + +func (c *Client) Get(key string) ([]*store.Response, error) { + logger.Debugf("get %s [%s]", key, c.cluster.Leader) + resp, err := c.sendRequest("GET", path.Join("keys", key), "") + + if err != nil { + return nil, err + } + + b, err := ioutil.ReadAll(resp.Body) + + resp.Body.Close() + + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + + return nil, handleError(b) + } + + return convertGetResponse(b) + +} + +// GetTo gets the value of the key from a given machine address. +// If the given machine is not available it returns an error. +// Mainly use for testing purpose +func (c *Client) GetFrom(key string, addr string) ([]*store.Response, error) { + httpPath := c.createHttpPath(addr, path.Join(version, "keys", key)) + + resp, err := c.httpClient.Get(httpPath) + + if err != nil { + return nil, err + } + + b, err := ioutil.ReadAll(resp.Body) + + resp.Body.Close() + + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, handleError(b) + } + + return convertGetResponse(b) +} + +// Convert byte stream to response. +func convertGetResponse(b []byte) ([]*store.Response, error) { + + var results []*store.Response + var result *store.Response + + err := json.Unmarshal(b, &result) + + if err != nil { + err = json.Unmarshal(b, &results) + + if err != nil { + return nil, err + } + + } else { + results = make([]*store.Response, 1) + results[0] = result + } + return results, nil +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/get_test.go b/third_party/github.com/coreos/go-etcd/etcd/get_test.go new file mode 100644 index 000000000..8e3852c9f --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/get_test.go @@ -0,0 +1,46 @@ +package etcd + +import ( + "testing" + "time" +) + +func TestGet(t *testing.T) { + + c := NewClient() + + c.Set("foo", "bar", 100) + + // wait for commit + time.Sleep(100 * time.Millisecond) + + results, err := c.Get("foo") + + if err != nil || results[0].Key != "/foo" || results[0].Value != "bar" { + if err != nil { + t.Fatal(err) + } + t.Fatalf("Get failed with %s %s %v", results[0].Key, results[0].Value, results[0].TTL) + } + + results, err = c.Get("goo") + + if err == nil { + t.Fatalf("should not be able to get non-exist key") + } + + results, err = c.GetFrom("foo", "0.0.0.0:4001") + + if err != nil || results[0].Key != "/foo" || results[0].Value != "bar" { + if err != nil { + t.Fatal(err) + } + t.Fatalf("Get failed with %s %s %v", results[0].Key, results[0].Value, results[0].TTL) + } + + results, err = c.GetFrom("foo", "0.0.0.0:4009") + + if err == nil { + t.Fatal("should not get from port 4009") + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/list_test.go b/third_party/github.com/coreos/go-etcd/etcd/list_test.go new file mode 100644 index 000000000..1e98e7645 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/list_test.go @@ -0,0 +1,23 @@ +package etcd + +import ( + "testing" + "time" +) + +func TestList(t *testing.T) { + c := NewClient() + + c.Set("foo_list/foo", "bar", 100) + c.Set("foo_list/fooo", "barbar", 100) + c.Set("foo_list/foooo/foo", "barbarbar", 100) + // wait for commit + time.Sleep(time.Second) + + _, err := c.Get("foo_list") + + if err != nil { + t.Fatal(err) + } + +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/set.go b/third_party/github.com/coreos/go-etcd/etcd/set.go new file mode 100644 index 000000000..78acb9081 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/set.go @@ -0,0 +1,90 @@ +package etcd + +import ( + "encoding/json" + "fmt" + "github.com/coreos/etcd/store" + "io/ioutil" + "net/http" + "net/url" + "path" +) + +func (c *Client) Set(key string, value string, ttl uint64) (*store.Response, error) { + logger.Debugf("set %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader) + v := url.Values{} + v.Set("value", value) + + if ttl > 0 { + v.Set("ttl", fmt.Sprintf("%v", ttl)) + } + + resp, err := c.sendRequest("POST", path.Join("keys", key), v.Encode()) + + if err != nil { + return nil, err + } + + b, err := ioutil.ReadAll(resp.Body) + + resp.Body.Close() + + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + + return nil, handleError(b) + } + + return convertSetResponse(b) + +} + +// SetTo sets the value of the key to a given machine address. +// If the given machine is not available or is not leader it returns an error +// Mainly use for testing purpose. +func (c *Client) SetTo(key string, value string, ttl uint64, addr string) (*store.Response, error) { + v := url.Values{} + v.Set("value", value) + + if ttl > 0 { + v.Set("ttl", fmt.Sprintf("%v", ttl)) + } + + httpPath := c.createHttpPath(addr, path.Join(version, "keys", key)) + + resp, err := c.httpClient.PostForm(httpPath, v) + + if err != nil { + return nil, err + } + + b, err := ioutil.ReadAll(resp.Body) + + resp.Body.Close() + + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, handleError(b) + } + + return convertSetResponse(b) +} + +// Convert byte stream to response. +func convertSetResponse(b []byte) (*store.Response, error) { + var result store.Response + + err := json.Unmarshal(b, &result) + + if err != nil { + return nil, err + } + + return &result, nil +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/set_test.go b/third_party/github.com/coreos/go-etcd/etcd/set_test.go new file mode 100644 index 000000000..dc46608d7 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/set_test.go @@ -0,0 +1,42 @@ +package etcd + +import ( + "testing" + "time" +) + +func TestSet(t *testing.T) { + c := NewClient() + + result, err := c.Set("foo", "bar", 100) + + if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL != 99 { + if err != nil { + t.Fatal(err) + } + + t.Fatalf("Set 1 failed with %s %s %v", result.Key, result.Value, result.TTL) + } + + time.Sleep(time.Second) + + result, err = c.Set("foo", "bar", 100) + + if err != nil || result.Key != "/foo" || result.Value != "bar" || result.PrevValue != "bar" || result.TTL != 99 { + if err != nil { + t.Fatal(err) + } + t.Fatalf("Set 2 failed with %s %s %v", result.Key, result.Value, result.TTL) + } + + result, err = c.SetTo("toFoo", "bar", 100, "0.0.0.0:4001") + + if err != nil || result.Key != "/toFoo" || result.Value != "bar" || result.TTL != 99 { + if err != nil { + t.Fatal(err) + } + + t.Fatalf("SetTo failed with %s %s %v", result.Key, result.Value, result.TTL) + } + +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/testAndSet.go b/third_party/github.com/coreos/go-etcd/etcd/testAndSet.go new file mode 100644 index 000000000..0bd8672ec --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/testAndSet.go @@ -0,0 +1,57 @@ +package etcd + +import ( + "encoding/json" + "fmt" + "github.com/coreos/etcd/store" + "io/ioutil" + "net/http" + "net/url" + "path" +) + +func (c *Client) TestAndSet(key string, prevValue string, value string, ttl uint64) (*store.Response, bool, error) { + logger.Debugf("set %s, %s[%s], ttl: %d, [%s]", key, value, prevValue, ttl, c.cluster.Leader) + v := url.Values{} + v.Set("value", value) + v.Set("prevValue", prevValue) + + if ttl > 0 { + v.Set("ttl", fmt.Sprintf("%v", ttl)) + } + + resp, err := c.sendRequest("POST", path.Join("keys", key), v.Encode()) + + if err != nil { + return nil, false, err + } + + b, err := ioutil.ReadAll(resp.Body) + + resp.Body.Close() + + if err != nil { + + return nil, false, err + } + + if resp.StatusCode != http.StatusOK { + return nil, false, handleError(b) + } + + var result store.Response + + err = json.Unmarshal(b, &result) + + if err != nil { + return nil, false, err + } + + if result.PrevValue == prevValue && result.Value == value { + + return &result, true, nil + } + + return &result, false, nil + +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/testAndSet_test.go b/third_party/github.com/coreos/go-etcd/etcd/testAndSet_test.go new file mode 100644 index 000000000..ba6d0e8f5 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/testAndSet_test.go @@ -0,0 +1,39 @@ +package etcd + +import ( + "testing" + "time" +) + +func TestTestAndSet(t *testing.T) { + c := NewClient() + + c.Set("foo_testAndSet", "bar", 100) + + time.Sleep(time.Second) + + results := make(chan bool, 3) + + for i := 0; i < 3; i++ { + testAndSet("foo_testAndSet", "bar", "barbar", results, c) + } + + count := 0 + + for i := 0; i < 3; i++ { + result := <-results + if result { + count++ + } + } + + if count != 1 { + t.Fatalf("test and set fails %v", count) + } + +} + +func testAndSet(key string, prevValue string, value string, ch chan bool, c *Client) { + _, success, _ := c.TestAndSet(key, prevValue, value, 0) + ch <- success +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/version.go b/third_party/github.com/coreos/go-etcd/etcd/version.go new file mode 100644 index 000000000..b27956805 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/version.go @@ -0,0 +1,3 @@ +package etcd + +var version = "v1" diff --git a/third_party/github.com/coreos/go-etcd/etcd/watch.go b/third_party/github.com/coreos/go-etcd/etcd/watch.go new file mode 100644 index 000000000..c583e9ad1 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/watch.go @@ -0,0 +1,117 @@ +package etcd + +import ( + "encoding/json" + "errors" + "fmt" + "github.com/coreos/etcd/store" + "io/ioutil" + "net/http" + "net/url" + "path" +) + +type respAndErr struct { + resp *http.Response + err error +} + +// Watch any change under the given prefix. +// When a sinceIndex is given, watch will try to scan from that index to the last index +// and will return any changes under the given prefix during the history +// If a receiver channel is given, it will be a long-term watch. Watch will block at the +// channel. And after someone receive the channel, it will go on to watch that prefix. +// If a stop channel is given, client can close long-term watch using the stop channel + +func (c *Client) Watch(prefix string, sinceIndex uint64, receiver chan *store.Response, stop chan bool) (*store.Response, error) { + logger.Debugf("watch %s [%s]", prefix, c.cluster.Leader) + if receiver == nil { + return c.watchOnce(prefix, sinceIndex, stop) + + } else { + for { + resp, err := c.watchOnce(prefix, sinceIndex, stop) + if resp != nil { + sinceIndex = resp.Index + 1 + receiver <- resp + } else { + return nil, err + } + } + } + + return nil, nil +} + +// helper func +// return when there is change under the given prefix +func (c *Client) watchOnce(key string, sinceIndex uint64, stop chan bool) (*store.Response, error) { + + var resp *http.Response + var err error + + if sinceIndex == 0 { + // Get request if no index is given + resp, err = c.sendRequest("GET", path.Join("watch", key), "") + + if err != nil { + return nil, err + } + + } else { + + // Post + v := url.Values{} + v.Set("index", fmt.Sprintf("%v", sinceIndex)) + + ch := make(chan respAndErr) + + if stop != nil { + go func() { + resp, err = c.sendRequest("POST", path.Join("watch", key), v.Encode()) + + ch <- respAndErr{resp, err} + }() + + // select at stop or continue to receive + select { + + case res := <-ch: + resp, err = res.resp, res.err + + case <-stop: + resp, err = nil, errors.New("User stoped watch") + } + } else { + resp, err = c.sendRequest("POST", path.Join("watch", key), v.Encode()) + } + + if err != nil { + return nil, err + } + + } + + b, err := ioutil.ReadAll(resp.Body) + + resp.Body.Close() + + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + + return nil, handleError(b) + } + + var result store.Response + + err = json.Unmarshal(b, &result) + + if err != nil { + return nil, err + } + + return &result, nil +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/watch_test.go b/third_party/github.com/coreos/go-etcd/etcd/watch_test.go new file mode 100644 index 000000000..5e18a2b29 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/watch_test.go @@ -0,0 +1,62 @@ +package etcd + +import ( + "fmt" + "github.com/coreos/etcd/store" + "testing" + "time" +) + +func TestWatch(t *testing.T) { + c := NewClient() + + go setHelper("bar", c) + + result, err := c.Watch("watch_foo", 0, nil, nil) + + if err != nil || result.Key != "/watch_foo/foo" || result.Value != "bar" { + if err != nil { + t.Fatal(err) + } + t.Fatalf("Watch failed with %s %s %v %v", result.Key, result.Value, result.TTL, result.Index) + } + + result, err = c.Watch("watch_foo", result.Index, nil, nil) + + if err != nil || result.Key != "/watch_foo/foo" || result.Value != "bar" { + if err != nil { + t.Fatal(err) + } + t.Fatalf("Watch with Index failed with %s %s %v %v", result.Key, result.Value, result.TTL, result.Index) + } + + ch := make(chan *store.Response, 10) + stop := make(chan bool, 1) + + go setLoop("bar", c) + + go reciver(ch, stop) + + c.Watch("watch_foo", 0, ch, stop) +} + +func setHelper(value string, c *Client) { + time.Sleep(time.Second) + c.Set("watch_foo/foo", value, 100) +} + +func setLoop(value string, c *Client) { + time.Sleep(time.Second) + for i := 0; i < 10; i++ { + newValue := fmt.Sprintf("%s_%v", value, i) + c.Set("watch_foo/foo", newValue, 100) + time.Sleep(time.Second / 10) + } +} + +func reciver(c chan *store.Response, stop chan bool) { + for i := 0; i < 10; i++ { + <-c + } + stop <- true +}