From aa5b6cdc9e5e6ba07b6dec411c2b167e49e79527 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Tue, 30 Sep 2014 14:57:04 -0700 Subject: [PATCH 1/2] etcdserver: have newMember take an optional time field This will be used by members joining an existing cluster or joining using discovery. --- etcdserver/cluster.go | 2 +- etcdserver/member.go | 8 +++++++- etcdserver/member_test.go | 29 +++++++++++++++++++++++++++++ 3 files changed, 37 insertions(+), 2 deletions(-) create mode 100644 etcdserver/member_test.go diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index bd8e00c14..0e04c9a48 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -71,7 +71,7 @@ func (c *Cluster) Set(s string) error { if len(urls) == 0 || urls[0] == "" { return fmt.Errorf("Empty URL given for %q", name) } - m := newMember(name, urls) + m := newMember(name, urls, nil) err := c.Add(*m) if err != nil { return err diff --git a/etcdserver/member.go b/etcdserver/member.go index f54820c00..6a65adc29 100644 --- a/etcdserver/member.go +++ b/etcdserver/member.go @@ -3,9 +3,11 @@ package etcdserver import ( "crypto/sha1" "encoding/binary" + "fmt" "path" "sort" "strconv" + "time" ) const machineKVPrefix = "/_etcd/machines/" @@ -20,7 +22,7 @@ type Member struct { // newMember creates a Member without an ID and generates one based on the // name, peer URLs. This is used for bootstrapping. -func newMember(name string, peerURLs []string) *Member { +func newMember(name string, peerURLs []string, now *time.Time) *Member { sort.Strings(peerURLs) m := &Member{Name: name, PeerURLs: peerURLs} @@ -29,6 +31,10 @@ func newMember(name string, peerURLs []string) *Member { b = append(b, []byte(p)...) } + if now != nil { + b = append(b, []byte(fmt.Sprintf("%d", now.Unix()))...) + } + hash := sha1.Sum(b) m.ID = int64(binary.BigEndian.Uint64(hash[:8])) if m.ID < 0 { diff --git a/etcdserver/member_test.go b/etcdserver/member_test.go new file mode 100644 index 000000000..9a655269f --- /dev/null +++ b/etcdserver/member_test.go @@ -0,0 +1,29 @@ +package etcdserver + +import ( + "testing" + "time" +) + +func timeParse(value string) (*time.Time) { + t, err := time.Parse(time.RFC3339, value) + if err != nil { + panic(err) + } + return &t +} + +func TestMemberTime(t *testing.T) { + tests := []struct { + mem *Member + id int64 + }{ + {newMember("mem1", []string{"http://10.0.0.8:2379"}, nil), 7206348984215161146}, + {newMember("mem1", []string{"http://10.0.0.1:2379"}, timeParse("1984-12-23T15:04:05Z")), 5483967913615174889}, + } + for i, tt := range tests { + if tt.mem.ID != tt.id { + t.Errorf("#%d: mem.ID = %v, want %v", i, tt.mem.ID, tt.id) + } + } +} From c2f96631d3f5e7dff46180bde7ca44ff65e1dcdd Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Tue, 30 Sep 2014 15:49:13 -0700 Subject: [PATCH 2/2] etcdserver: stop using addScheme This standardizes the flags to use a list of URLs everywhere. The next step is to enforce the scheme based on TLS settings and support compat flags. --- etcdserver/cluster.go | 6 ++-- etcdserver/cluster_store.go | 6 ---- etcdserver/cluster_test.go | 34 ++++++++++---------- etcdserver/etcdhttp/http_test.go | 6 ++-- etcdserver/member_test.go | 2 +- main.go | 55 +++++++++++++++++++------------- pkg/flags/urls.go | 46 ++++++++++++++++++++++++++ pkg/flags/urls_test.go | 45 ++++++++++++++++++++++++++ 8 files changed, 148 insertions(+), 52 deletions(-) create mode 100644 pkg/flags/urls.go create mode 100644 pkg/flags/urls_test.go diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index 0e04c9a48..228709355 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -59,7 +59,7 @@ func (c Cluster) Pick(id int64) string { } // Set parses command line sets of names to IPs formatted like: -// mach0=1.1.1.1,mach0=2.2.2.2,mach0=1.1.1.1,mach1=2.2.2.2,mach1=3.3.3.3 +// mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach0=http://1.1.1.1,mach1=http://2.2.2.2,mach1=http://3.3.3.3 func (c *Cluster) Set(s string) error { *c = Cluster{} v, err := url.ParseQuery(strings.Replace(s, ",", "&", -1)) @@ -106,7 +106,7 @@ func (c Cluster) PeerURLs() []string { endpoints := make([]string, 0) for _, p := range c { for _, addr := range p.PeerURLs { - endpoints = append(endpoints, addScheme(addr)) + endpoints = append(endpoints, addr) } } sort.Strings(endpoints) @@ -120,7 +120,7 @@ func (c Cluster) ClientURLs() []string { urls := make([]string, 0) for _, p := range c { for _, url := range p.ClientURLs { - urls = append(urls, addScheme(url)) + urls = append(urls, url) } } sort.Strings(urls) diff --git a/etcdserver/cluster_store.go b/etcdserver/cluster_store.go index 12904d1dd..d21fd0887 100644 --- a/etcdserver/cluster_store.go +++ b/etcdserver/cluster_store.go @@ -75,12 +75,6 @@ func (s *clusterStore) Delete(id int64) { } } -// addScheme adds the protocol prefix to a string; currently only HTTP -// TODO: improve this when implementing TLS -func addScheme(addr string) string { - return fmt.Sprintf("http://%s", addr) -} - func Sender(t *http.Transport, cls ClusterStore) func(msgs []raftpb.Message) { c := &http.Client{Transport: t} diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index ebae4fbcf..90a42f32d 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -77,11 +77,11 @@ func TestClusterSet(t *testing.T) { parse bool }{ { - "mem1=10.0.0.1:2379,mem1=128.193.4.20:2379,mem2=10.0.0.2:2379,default=127.0.0.1:2379", + "mem1=http://10.0.0.1:2379,mem1=http://128.193.4.20:2379,mem2=http://10.0.0.2:2379,default=http://127.0.0.1:2379", []Member{ - {ID: 3736794188555456841, Name: "mem1", PeerURLs: []string{"10.0.0.1:2379", "128.193.4.20:2379"}}, - {ID: 5674507346857578431, Name: "mem2", PeerURLs: []string{"10.0.0.2:2379"}}, - {ID: 2676999861503984872, Name: "default", PeerURLs: []string{"127.0.0.1:2379"}}, + {ID: 3736794188555456841, Name: "mem1", PeerURLs: []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}}, + {ID: 5674507346857578431, Name: "mem2", PeerURLs: []string{"http://10.0.0.2:2379"}}, + {ID: 2676999861503984872, Name: "default", PeerURLs: []string{"http://127.0.0.1:2379"}}, }, true, }, @@ -104,10 +104,10 @@ func TestClusterSet(t *testing.T) { func TestClusterSetBad(t *testing.T) { tests := []string{ - "mem1=,mem2=128.193.4.20:2379,mem3=10.0.0.2:2379", - "mem1,mem2=128.193.4.20:2379,mem3=10.0.0.2:2379", + "mem1=,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379", + "mem1,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379", // TODO(philips): anyone know of a 64 bit sha1 hash collision - // "06b2f82fd81b2c20=128.193.4.20:2379,02c60cb75083ceef=128.193.4.20:2379", + // "06b2f82fd81b2c20=http://128.193.4.20:2379,02c60cb75083ceef=http://128.193.4.20:2379", } for i, tt := range tests { g := Cluster{} @@ -151,7 +151,7 @@ func TestClusterPeerURLs(t *testing.T) { // single peer with a single address { mems: []Member{ - {ID: 1, PeerURLs: []string{"192.0.2.1"}}, + {ID: 1, PeerURLs: []string{"http://192.0.2.1"}}, }, wurls: []string{"http://192.0.2.1"}, }, @@ -159,7 +159,7 @@ func TestClusterPeerURLs(t *testing.T) { // single peer with a single address with a port { mems: []Member{ - {ID: 1, PeerURLs: []string{"192.0.2.1:8001"}}, + {ID: 1, PeerURLs: []string{"http://192.0.2.1:8001"}}, }, wurls: []string{"http://192.0.2.1:8001"}, }, @@ -167,9 +167,9 @@ func TestClusterPeerURLs(t *testing.T) { // several members explicitly unsorted { mems: []Member{ - {ID: 2, PeerURLs: []string{"192.0.2.3", "192.0.2.4"}}, - {ID: 3, PeerURLs: []string{"192.0.2.5", "192.0.2.6"}}, - {ID: 1, PeerURLs: []string{"192.0.2.1", "192.0.2.2"}}, + {ID: 2, PeerURLs: []string{"http://192.0.2.3", "http://192.0.2.4"}}, + {ID: 3, PeerURLs: []string{"http://192.0.2.5", "http://192.0.2.6"}}, + {ID: 1, PeerURLs: []string{"http://192.0.2.1", "http://192.0.2.2"}}, }, wurls: []string{"http://192.0.2.1", "http://192.0.2.2", "http://192.0.2.3", "http://192.0.2.4", "http://192.0.2.5", "http://192.0.2.6"}, }, @@ -210,7 +210,7 @@ func TestClusterClientURLs(t *testing.T) { // single peer with a single address { mems: []Member{ - {ID: 1, ClientURLs: []string{"192.0.2.1"}}, + {ID: 1, ClientURLs: []string{"http://192.0.2.1"}}, }, wurls: []string{"http://192.0.2.1"}, }, @@ -218,7 +218,7 @@ func TestClusterClientURLs(t *testing.T) { // single peer with a single address with a port { mems: []Member{ - {ID: 1, ClientURLs: []string{"192.0.2.1:8001"}}, + {ID: 1, ClientURLs: []string{"http://192.0.2.1:8001"}}, }, wurls: []string{"http://192.0.2.1:8001"}, }, @@ -226,9 +226,9 @@ func TestClusterClientURLs(t *testing.T) { // several members explicitly unsorted { mems: []Member{ - {ID: 2, ClientURLs: []string{"192.0.2.3", "192.0.2.4"}}, - {ID: 3, ClientURLs: []string{"192.0.2.5", "192.0.2.6"}}, - {ID: 1, ClientURLs: []string{"192.0.2.1", "192.0.2.2"}}, + {ID: 2, ClientURLs: []string{"http://192.0.2.3", "http://192.0.2.4"}}, + {ID: 3, ClientURLs: []string{"http://192.0.2.5", "http://192.0.2.6"}}, + {ID: 1, ClientURLs: []string{"http://192.0.2.1", "http://192.0.2.2"}}, }, wurls: []string{"http://192.0.2.1", "http://192.0.2.2", "http://192.0.2.3", "http://192.0.2.4", "http://192.0.2.5", "http://192.0.2.6"}, }, diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index b09c74603..46ebfe7b4 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -612,9 +612,9 @@ func TestV2MachinesEndpoint(t *testing.T) { func TestServeMachines(t *testing.T) { cluster := &fakeCluster{ members: []etcdserver.Member{ - {ID: 0xBEEF0, ClientURLs: []string{"localhost:8080"}}, - {ID: 0xBEEF1, ClientURLs: []string{"localhost:8081"}}, - {ID: 0xBEEF2, ClientURLs: []string{"localhost:8082"}}, + {ID: 0xBEEF0, ClientURLs: []string{"http://localhost:8080"}}, + {ID: 0xBEEF1, ClientURLs: []string{"http://localhost:8081"}}, + {ID: 0xBEEF2, ClientURLs: []string{"http://localhost:8082"}}, }, } diff --git a/etcdserver/member_test.go b/etcdserver/member_test.go index 9a655269f..cc077d25a 100644 --- a/etcdserver/member_test.go +++ b/etcdserver/member_test.go @@ -5,7 +5,7 @@ import ( "time" ) -func timeParse(value string) (*time.Time) { +func timeParse(value string) *time.Time { t, err := time.Parse(time.RFC3339, value) if err != nil { panic(err) diff --git a/main.go b/main.go index 3a828d1a2..fe2f335eb 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "net/http" + "net/url" "os" "path" "strings" @@ -32,13 +33,15 @@ const ( var ( name = flag.String("name", "default", "Unique human-readable name for this node") timeout = flag.Duration("timeout", 10*time.Second, "Request Timeout") - paddr = flag.String("peer-bind-addr", ":7001", "Peer service address (e.g., ':7001')") dir = flag.String("data-dir", "", "Path to the data directory") snapCount = flag.Int64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot") printVersion = flag.Bool("version", false, "Print the version and exit") cluster = &etcdserver.Cluster{} - addrs = &flagtypes.Addrs{} + lcurls = &flagtypes.URLs{} + acurls = &flagtypes.URLs{} + lpurls = &flagtypes.URLs{} + apurls = &flagtypes.URLs{} cors = &pkg.CORSInfo{} proxyFlag = new(flagtypes.Proxy) @@ -66,11 +69,19 @@ var ( func init() { flag.Var(cluster, "bootstrap-config", "Initial cluster configuration for bootstrapping") - flag.Var(addrs, "bind-addr", "List of HTTP service addresses (e.g., '127.0.0.1:4001,10.0.0.1:8080')") + flag.Var(apurls, "advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster") + flag.Var(acurls, "advertise-client-urls", "List of this member's client URLs to advertise to the rest of the cluster") + flag.Var(lpurls, "listen-peer-urls", "List of this URLs to listen on for peer traffic") + flag.Var(lcurls, "listen-client-urls", "List of this URLs to listen on for client traffic") flag.Var(cors, "cors", "Comma-separated white list of origins for CORS (cross-origin resource sharing).") flag.Var(proxyFlag, "proxy", fmt.Sprintf("Valid values include %s", strings.Join(flagtypes.ProxyValues, ", "))) - cluster.Set("default=localhost:8080") - addrs.Set("127.0.0.1:4001") + + cluster.Set("default=http://localhost:2380,default=http://localhost:7001") + lcurls.Set("http://localhost:2379,http://localhost:4001") + acurls.Set("http://localhost:2379,http://localhost:4001") + lpurls.Set("http://localhost:2380,http://localhost:7001") + apurls.Set("http://localhost:2380,http://localhost:7001") + proxyFlag.Set(flagtypes.ProxyValueOff) flag.StringVar(&clientTLSInfo.CAFile, "ca-file", "", "Path to the client server TLS CA file.") @@ -202,27 +213,28 @@ func startEtcd() { } ph := etcdhttp.NewPeerHandler(s) - l, err := transport.NewListener(*paddr, peerTLSInfo) - if err != nil { - log.Fatal(err) + for _, u := range []url.URL(*lpurls) { + l, err := transport.NewListener(u.Host, peerTLSInfo) + if err != nil { + log.Fatal(err) + } + + // Start the peer server in a goroutine + go func() { + log.Print("Listening for peers on ", u.String()) + log.Fatal(http.Serve(l, ph)) + }() } - // Start the peer server in a goroutine - go func() { - log.Print("Listening for peers on ", *paddr) - log.Fatal(http.Serve(l, ph)) - }() - // Start a client server goroutine for each listen address - for _, addr := range *addrs { - addr := addr - l, err := transport.NewListener(addr, clientTLSInfo) + for _, u := range []url.URL(*lcurls) { + l, err := transport.NewListener(u.Host, clientTLSInfo) if err != nil { log.Fatal(err) } go func() { - log.Print("Listening for client requests on ", addr) + log.Print("Listening for client requests on ", u.String()) log.Fatal(http.Serve(l, ch)) }() } @@ -250,15 +262,14 @@ func startProxy() { } // Start a proxy server goroutine for each listen address - for _, addr := range *addrs { - addr := addr - l, err := transport.NewListener(addr, clientTLSInfo) + for _, u := range []url.URL(*lcurls) { + l, err := transport.NewListener(u.Host, clientTLSInfo) if err != nil { log.Fatal(err) } go func() { - log.Print("Listening for client requests on ", addr) + log.Print("Listening for client requests on ", u.Host) log.Fatal(http.Serve(l, ph)) }() } diff --git a/pkg/flags/urls.go b/pkg/flags/urls.go new file mode 100644 index 000000000..34808d862 --- /dev/null +++ b/pkg/flags/urls.go @@ -0,0 +1,46 @@ +package flags + +import ( + "errors" + "fmt" + "net/url" + "strings" +) + +// URLs implements the flag.Value interface to allow users to define multiple +// URLs on the command-line +type URLs []url.URL + +// Set parses a command line set of URLs formatted like: +// http://127.0.0.1:7001,http://10.1.1.2:80 +func (us *URLs) Set(s string) error { + strs := strings.Split(s, ",") + all := make([]url.URL, len(strs)) + if len(all) == 0 { + return errors.New("no valid URLs given") + } + for _, in := range strs { + in = strings.TrimSpace(in) + u, err := url.Parse(in) + if err != nil { + return err + } + if u.Scheme != "http" && u.Scheme != "https" { + return fmt.Errorf("URL scheme must be http or https: %s", s) + } + if u.Path != "" { + return fmt.Errorf("URL must not contain a path: %s", s) + } + all = append(all, *u) + } + *us = all + return nil +} + +func (us *URLs) String() string { + all := make([]string, len(*us)) + for i, u := range *us { + all[i] = u.String() + } + return strings.Join(all, ",") +} diff --git a/pkg/flags/urls_test.go b/pkg/flags/urls_test.go new file mode 100644 index 000000000..49aa3e47d --- /dev/null +++ b/pkg/flags/urls_test.go @@ -0,0 +1,45 @@ +package flags + +import ( + "testing" +) + +func TestValidateURLsBad(t *testing.T) { + tests := []string{ + // bad IP specification + ":4001", + "127.0:8080", + "123:456", + // bad port specification + "127.0.0.1:foo", + "127.0.0.1:", + // unix sockets not supported + "unix://", + "unix://tmp/etcd.sock", + // bad strings + "somewhere", + "234#$", + "file://foo/bar", + "http://hello/asdf", + } + for i, in := range tests { + u := URLs{} + if err := u.Set(in); err == nil { + t.Errorf(`#%d: unexpected nil error for in=%q`, i, in) + } + } +} + +func TestValidateURLsGood(t *testing.T) { + tests := []string{ + "https://1.2.3.4:8080", + "http://10.1.1.1:80", + "http://10.1.1.1", + } + for i, in := range tests { + u := URLs{} + if err := u.Set(in); err != nil { + t.Errorf("#%d: err=%v, want nil for in=%q", i, err, in) + } + } +}