From 9e3d045b2b4150d99bb796d4d496d05614e9b86f Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 4 Oct 2014 20:20:45 +0800 Subject: [PATCH] *:discovery hook up --- client/http.go | 12 ++++++-- discovery/discovery.go | 61 +++++++++++++++++++++++++------------ discovery/discovery_test.go | 34 +++------------------ etcdserver/server.go | 27 ++++++++++++---- main.go | 41 +++++++++++++++++++++---- 5 files changed, 112 insertions(+), 63 deletions(-) diff --git a/client/http.go b/client/http.go index 89e607908..3f0386f76 100644 --- a/client/http.go +++ b/client/http.go @@ -14,7 +14,7 @@ import ( "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context" ) -const ( +var ( v2Prefix = "/v2/keys" ) @@ -47,12 +47,18 @@ func NewHTTPClient(tr *http.Transport, ep string, timeout time.Duration) (*httpC return c, nil } +func (c *httpClient) SetPrefix(p string) { + v2Prefix = p +} + func (c *httpClient) Create(key, val string, ttl time.Duration) (*Response, error) { - uintTTL := uint64(ttl.Seconds()) create := &createAction{ Key: key, Value: val, - TTL: &uintTTL, + } + if ttl >= 0 { + uttl := uint64(ttl.Seconds()) + create.TTL = &uttl } ctx, cancel := context.WithTimeout(context.Background(), c.timeout) diff --git a/discovery/discovery.go b/discovery/discovery.go index 35b7180e6..8416d6376 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -3,13 +3,15 @@ package discovery import ( "errors" "fmt" + "net/http" + "net/url" "path" "sort" "strconv" "strings" + "time" "github.com/coreos/etcd/client" - "github.com/coreos/etcd/etcdserver/etcdhttp" ) var ( @@ -21,40 +23,66 @@ var ( ErrFullCluster = errors.New("discovery: cluster is full") ) +type Discoverer interface { + Discover() (string, error) +} + type discovery struct { cluster string id int64 - ctx []byte + config string c client.Client } -func (d *discovery) discover() (*etcdhttp.Peers, error) { +func New(durl string, id int64, config string) (Discoverer, error) { + u, err := url.Parse(durl) + if err != nil { + return nil, err + } + token := u.Path + u.Path = "" + client, err := client.NewHTTPClient(&http.Transport{}, u.String(), time.Second*5) + if err != nil { + return nil, err + } + // discovery service redirects /[key] to /v2/keys/[key] + // set the prefix of client to "" to handle this + client.SetPrefix("") + return &discovery{ + cluster: token, + id: id, + config: config, + c: client, + }, nil +} + +func (d *discovery) Discover() (string, error) { // fast path: if the cluster is full, returns the error // do not need to register itself to the cluster in this // case. if _, _, err := d.checkCluster(); err != nil { - return nil, err + return "", err } if err := d.createSelf(); err != nil { - return nil, err + return "", err } nodes, size, err := d.checkCluster() if err != nil { - return nil, err + return "", err } all, err := d.waitNodes(nodes, size) if err != nil { - return nil, err + return "", err } - return nodesToPeers(all) + return nodesToCluster(all), nil } func (d *discovery) createSelf() error { - resp, err := d.c.Create(d.selfKey(), string(d.ctx), 0) + resp, err := d.c.Create(d.selfKey(), d.config, -1) if err != nil { return err } @@ -87,7 +115,7 @@ func (d *discovery) checkCluster() (client.Nodes, int, error) { nodes := make(client.Nodes, 0) // append non-config keys to nodes for _, n := range resp.Node.Nodes { - if !strings.HasPrefix(n.Key, configKey) { + if !strings.Contains(n.Key, configKey) { nodes = append(nodes, n) } } @@ -97,7 +125,7 @@ func (d *discovery) checkCluster() (client.Nodes, int, error) { // find self position for i := range nodes { - if nodes[i].Key == d.selfKey() { + if strings.Contains(nodes[i].Key, d.selfKey()) { break } if i >= size-1 { @@ -111,7 +139,7 @@ func (d *discovery) waitNodes(nodes client.Nodes, size int) (client.Nodes, error if len(nodes) > size { nodes = nodes[:size] } - w := d.c.RecursiveWatch(d.cluster, nodes[len(nodes)-1].ModifiedIndex) + w := d.c.RecursiveWatch(d.cluster, nodes[len(nodes)-1].ModifiedIndex+1) all := make(client.Nodes, len(nodes)) copy(all, nodes) // wait for others @@ -129,17 +157,12 @@ func (d *discovery) selfKey() string { return path.Join("/", d.cluster, fmt.Sprintf("%d", d.id)) } -func nodesToPeers(ns client.Nodes) (*etcdhttp.Peers, error) { +func nodesToCluster(ns client.Nodes) string { s := make([]string, len(ns)) for i, n := range ns { s[i] = n.Value } - - var peers etcdhttp.Peers - if err := peers.Set(strings.Join(s, "&")); err != nil { - return nil, err - } - return &peers, nil + return strings.Join(s, ",") } type sortableNodes struct{ client.Nodes } diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go index 416a50b73..aafa9974c 100644 --- a/discovery/discovery_test.go +++ b/discovery/discovery_test.go @@ -10,7 +10,6 @@ import ( "time" "github.com/coreos/etcd/client" - "github.com/coreos/etcd/etcdserver/etcdhttp" ) func TestCheckCluster(t *testing.T) { @@ -216,40 +215,17 @@ func TestCreateSelf(t *testing.T) { } } -func TestNodesToPeers(t *testing.T) { +func TestNodesToCluster(t *testing.T) { nodes := client.Nodes{ {Key: "/1000/1", Value: "1=1.1.1.1", CreatedIndex: 1}, {Key: "/1000/2", Value: "2=2.2.2.2", CreatedIndex: 2}, {Key: "/1000/3", Value: "3=3.3.3.3", CreatedIndex: 3}, } - w := &etcdhttp.Peers{} - w.Set("1=1.1.1.1&2=2.2.2.2&3=3.3.3.3") + w := "1=1.1.1.1,2=2.2.2.2,3=3.3.3.3" - badnodes := client.Nodes{{Key: "1000/1", Value: "1=1.1.1.1&???", CreatedIndex: 1}} - - tests := []struct { - ns client.Nodes - wp *etcdhttp.Peers - we bool - }{ - {nodes, w, false}, - {badnodes, nil, true}, - } - - for i, tt := range tests { - peers, err := nodesToPeers(tt.ns) - if tt.we { - if err == nil { - t.Fatalf("#%d: err = %v, want not nil", i, err) - } - } else { - if err != nil { - t.Fatalf("#%d: err = %v, want nil", i, err) - } - } - if !reflect.DeepEqual(peers, tt.wp) { - t.Errorf("#%d: peers = %v, want %v", i, peers, tt.wp) - } + cluster := nodesToCluster(nodes) + if !reflect.DeepEqual(cluster, w) { + t.Errorf("cluster = %v, want %v", cluster, w) } } diff --git a/etcdserver/server.go b/etcdserver/server.go index 1da827aa1..552e0caff 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -11,6 +11,7 @@ import ( "sync/atomic" "time" + "github.com/coreos/etcd/discovery" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" @@ -84,12 +85,13 @@ type RaftTimer interface { } type ServerConfig struct { - Name string - ClientURLs types.URLs - DataDir string - SnapCount int64 - Cluster *Cluster - Transport *http.Transport + Name string + DiscoveryURL string + ClientURLs types.URLs + DataDir string + SnapCount int64 + Cluster *Cluster + Transport *http.Transport } // NewServer creates a new EtcdServer from the supplied configuration. The @@ -111,6 +113,19 @@ func NewServer(cfg *ServerConfig) *EtcdServer { var err error waldir := path.Join(cfg.DataDir, "wal") if !wal.Exist(waldir) { + if cfg.DiscoveryURL != "" { + d, err := discovery.New(cfg.DiscoveryURL, m.ID, cfg.Cluster.String()) + if err != nil { + log.Fatalf("etcd: cannot init discovery %v", err) + } + s, err := d.Discover() + if err != nil { + log.Fatalf("etcd: %v", err) + } + if err = cfg.Cluster.Set(s); err != nil { + log.Fatalf("etcd: %v", err) + } + } if w, err = wal.Create(waldir); err != nil { log.Fatal(err) } diff --git a/main.go b/main.go index 2ff66f13c..14958b52e 100644 --- a/main.go +++ b/main.go @@ -27,6 +27,7 @@ const ( var ( name = flag.String("name", "default", "Unique human-readable name for this node") dir = flag.String("data-dir", "", "Path to the data directory") + durl = flag.String("discovery", "", "Discovery service used to bootstrap the cluster") snapCount = flag.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot") printVersion = flag.Bool("version", false, "Print the version and exit") @@ -97,6 +98,9 @@ func main() { } pkg.SetFlagsFromEnv(flag.CommandLine) + if err := setClusterForDiscovery(); err != nil { + log.Fatalf("etcd: %v", err) + } if string(*proxyFlag) == flagtypes.ProxyValueOff { startEtcd() @@ -137,12 +141,13 @@ func startEtcd() { log.Fatal(err.Error()) } cfg := &etcdserver.ServerConfig{ - Name: *name, - ClientURLs: acurls, - DataDir: *dir, - SnapCount: int64(*snapCount), - Cluster: cluster, - Transport: pt, + Name: *name, + ClientURLs: acurls, + DataDir: *dir, + SnapCount: int64(*snapCount), + Cluster: cluster, + Transport: pt, + DiscoveryURL: *durl, } s := etcdserver.NewServer(cfg) s.Start() @@ -231,3 +236,27 @@ func startProxy() { }() } } + +// setClusterForDiscovery sets cluster to a temporary value if you are using +// the discovery. +func setClusterForDiscovery() error { + set := make(map[string]bool) + flag.Visit(func(f *flag.Flag) { + set[f.Name] = true + }) + if set["discovery"] && set["bootstrap-config"] { + return fmt.Errorf("both discovery and bootstrap-config are set") + } + if set["discovery"] { + apurls, err := pkg.URLsFromFlags(flag.CommandLine, "advertise-peer-urls", "addr", peerTLSInfo) + if err != nil { + return err + } + addrs := make([]string, len(apurls)) + for i := range apurls { + addrs[i] = apurls[i].String() + } + cluster.Set(fmt.Sprintf("%s=%s", *name, strings.Join(addrs, ","))) + } + return nil +}