From cdb1e34799c7962bde6aeae42f58eda3e6077b3b Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 21 Sep 2016 09:10:25 -0700 Subject: [PATCH] clientv3: add 'Sync' method --- clientv3/balancer.go | 16 +++++++++++++++- clientv3/client.go | 33 +++++++++++++++++++++++++++++++++ clientv3/config.go | 10 ++++++++-- 3 files changed, 56 insertions(+), 3 deletions(-) diff --git a/clientv3/balancer.go b/clientv3/balancer.go index 284a22297..011f6cf04 100644 --- a/clientv3/balancer.go +++ b/clientv3/balancer.go @@ -96,10 +96,24 @@ func getHost2ep(eps []string) map[string]string { } func (b *simpleBalancer) updateAddrs(eps []string) { + np := getHost2ep(eps) + b.mu.Lock() defer b.mu.Unlock() - b.host2ep = getHost2ep(eps) + match := len(np) == len(b.host2ep) + for k, v := range np { + if b.host2ep[k] != v { + match = false + break + } + } + if match { + // same endpoints, so no need to update address + return + } + + b.host2ep = np addrs := make([]grpc.Address, 0, len(eps)) for i := range eps { diff --git a/clientv3/client.go b/clientv3/client.go index d36105ba8..d8b04a43f 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -105,6 +105,38 @@ func (c *Client) SetEndpoints(eps ...string) { c.balancer.updateAddrs(eps) } +// Sync synchronizes client's endpoints with the known endpoints from the etcd membership. +func (c *Client) Sync(ctx context.Context) error { + mresp, err := c.MemberList(ctx) + if err != nil { + return err + } + var eps []string + for _, m := range mresp.Members { + eps = append(eps, m.ClientURLs...) + } + c.SetEndpoints(eps...) + return nil +} + +func (c *Client) autoSync() { + if c.cfg.AutoSyncInterval == time.Duration(0) { + return + } + + for { + select { + case <-c.ctx.Done(): + return + case <-time.After(c.cfg.AutoSyncInterval): + ctx, _ := context.WithTimeout(c.ctx, 5*time.Second) + if err := c.Sync(ctx); err != nil && err != c.ctx.Err() { + logger.Println("Auto sync endpoints failed:", err) + } + } + } +} + type authTokenCredential struct { token string } @@ -292,6 +324,7 @@ func newClient(cfg *Config) (*Client, error) { logger.Set(log.New(ioutil.Discard, "", 0)) } + go client.autoSync() return client, nil } diff --git a/clientv3/config.go b/clientv3/config.go index 066b41ece..4f92d7d26 100644 --- a/clientv3/config.go +++ b/clientv3/config.go @@ -28,6 +28,10 @@ type Config struct { // Endpoints is a list of URLs Endpoints []string + // AutoSyncInterval is the interval to update endpoints with its latest members. + // 0 disables auto-sync. By default auto-sync is disabled. + AutoSyncInterval time.Duration + // DialTimeout is the timeout for failing to establish a connection. DialTimeout time.Duration @@ -46,6 +50,7 @@ type Config struct { type yamlConfig struct { Endpoints []string `json:"endpoints"` + AutoSyncInterval time.Duration `json:"auto-sync-interval"` DialTimeout time.Duration `json:"dial-timeout"` InsecureTransport bool `json:"insecure-transport"` InsecureSkipTLSVerify bool `json:"insecure-skip-tls-verify"` @@ -68,8 +73,9 @@ func configFromFile(fpath string) (*Config, error) { } cfg := &Config{ - Endpoints: yc.Endpoints, - DialTimeout: yc.DialTimeout, + Endpoints: yc.Endpoints, + AutoSyncInterval: yc.AutoSyncInterval, + DialTimeout: yc.DialTimeout, } if yc.InsecureTransport {