diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 518ffaa47..68ceb6af6 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -23,7 +23,6 @@ import ( "os" "path" "reflect" - "strings" "time" "github.com/coreos/etcd/discovery" @@ -71,6 +70,7 @@ func Main() { var stopped <-chan struct{} + // TODO: check whether fields are set instead of whether fields have default value if cfg.name != defaultName && cfg.initialCluster == initialClusterFromName(defaultName) { cfg.initialCluster = initialClusterFromName(cfg.name) } @@ -116,7 +116,7 @@ func Main() { // startEtcd launches the etcd server and HTTP handlers for client/server communication. func startEtcd(cfg *config) (<-chan struct{}, error) { - cls, err := setupCluster(cfg) + urlsmap, token, err := getPeerURLsMapAndToken(cfg) if err != nil { return nil, fmt.Errorf("error setting up initial cluster: %v", err) } @@ -171,21 +171,22 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { } srvcfg := &etcdserver.ServerConfig{ - Name: cfg.name, - ClientURLs: cfg.acurls, - PeerURLs: cfg.apurls, - DataDir: cfg.dir, - SnapCount: cfg.snapCount, - MaxSnapFiles: cfg.maxSnapFiles, - MaxWALFiles: cfg.maxWalFiles, - Cluster: cls, - DiscoveryURL: cfg.durl, - DiscoveryProxy: cfg.dproxy, - NewCluster: cfg.isNewCluster(), - ForceNewCluster: cfg.forceNewCluster, - Transport: pt, - TickMs: cfg.TickMs, - ElectionTicks: cfg.electionTicks(), + Name: cfg.name, + ClientURLs: cfg.acurls, + PeerURLs: cfg.apurls, + DataDir: cfg.dir, + SnapCount: cfg.snapCount, + MaxSnapFiles: cfg.maxSnapFiles, + MaxWALFiles: cfg.maxWalFiles, + InitialPeerURLsMap: urlsmap, + InitialClusterToken: token, + DiscoveryURL: cfg.durl, + DiscoveryProxy: cfg.dproxy, + NewCluster: cfg.isNewCluster(), + ForceNewCluster: cfg.forceNewCluster, + Transport: pt, + TickMs: cfg.TickMs, + ElectionTicks: cfg.electionTicks(), } var s *etcdserver.EtcdServer s, err = etcdserver.NewServer(srvcfg) @@ -222,7 +223,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes. func startProxy(cfg *config) error { - cls, err := setupCluster(cfg) + urlsmap, _, err := getPeerURLsMapAndToken(cfg) if err != nil { return fmt.Errorf("error setting up initial cluster: %v", err) } @@ -232,7 +233,7 @@ func startProxy(cfg *config) error { if err != nil { return err } - if cls, err = etcdserver.NewClusterFromString(cfg.durl, s); err != nil { + if urlsmap, err = types.NewURLsMap(s); err != nil { return err } } @@ -267,12 +268,13 @@ func startProxy(cfg *config) error { peerURLs = urls.PeerURLs log.Printf("proxy: using peer urls %v from cluster file ./%s", peerURLs, clusterfile) case os.IsNotExist(err): - peerURLs = cls.PeerURLs() + peerURLs = urlsmap.URLs() log.Printf("proxy: using peer urls %v ", peerURLs) default: return err } + clientURLs := []string{} uf := func() []string { gcls, err := etcdserver.GetClusterFromRemotePeers(peerURLs, tr) // TODO: remove the 2nd check when we fix GetClusterFromPeers @@ -282,33 +284,33 @@ func startProxy(cfg *config) error { return []string{} } if len(gcls.Members()) == 0 { - return cls.ClientURLs() + return clientURLs } - cls = gcls + clientURLs = gcls.ClientURLs() - urls := struct{ PeerURLs []string }{cls.PeerURLs()} + urls := struct{ PeerURLs []string }{gcls.PeerURLs()} b, err := json.Marshal(urls) if err != nil { log.Printf("proxy: error on marshal peer urls %s", err) - return cls.ClientURLs() + return clientURLs } err = ioutil.WriteFile(clusterfile+".bak", b, 0600) if err != nil { log.Printf("proxy: error on writing urls %s", err) - return cls.ClientURLs() + return clientURLs } err = os.Rename(clusterfile+".bak", clusterfile) if err != nil { log.Printf("proxy: error on updating clusterfile %s", err) - return cls.ClientURLs() + return clientURLs } - if !reflect.DeepEqual(cls.PeerURLs(), peerURLs) { - log.Printf("proxy: updated peer urls in cluster file from %v to %v", peerURLs, cls.PeerURLs()) + if !reflect.DeepEqual(gcls.PeerURLs(), peerURLs) { + log.Printf("proxy: updated peer urls in cluster file from %v to %v", peerURLs, gcls.PeerURLs()) } - peerURLs = cls.PeerURLs() + peerURLs = gcls.PeerURLs() - return cls.ClientURLs() + return clientURLs } ph := proxy.NewHandler(pt, uf) ph = &cors.CORSHandler{ @@ -335,35 +337,28 @@ func startProxy(cfg *config) error { return nil } -// setupCluster sets up an initial cluster definition for bootstrap or discovery. -func setupCluster(cfg *config) (*etcdserver.Cluster, error) { - var cls *etcdserver.Cluster - var err error +// getPeerURLsMapAndToken sets up an initial peer URLsMap and cluster token for bootstrap or discovery. +func getPeerURLsMapAndToken(cfg *config) (urlsmap types.URLsMap, token string, err error) { switch { case cfg.durl != "": + urlsmap = types.URLsMap{} // If using discovery, generate a temporary cluster based on // self's advertised peer URLs - clusterStr := genClusterString(cfg.name, cfg.apurls) - cls, err = etcdserver.NewClusterFromString(cfg.durl, clusterStr) + urlsmap[cfg.name] = cfg.apurls + token = cfg.durl case cfg.dnsCluster != "": - clusterStr, clusterToken, err := discovery.SRVGetCluster(cfg.name, cfg.dnsCluster, cfg.initialClusterToken, cfg.apurls) + var clusterStr string + clusterStr, token, err = discovery.SRVGetCluster(cfg.name, cfg.dnsCluster, cfg.initialClusterToken, cfg.apurls) if err != nil { - return nil, err + return nil, "", err } - cls, err = etcdserver.NewClusterFromString(clusterToken, clusterStr) + urlsmap, err = types.NewURLsMap(clusterStr) default: // We're statically configured, and cluster has appropriately been set. - cls, err = etcdserver.NewClusterFromString(cfg.initialClusterToken, cfg.initialCluster) + urlsmap, err = types.NewURLsMap(cfg.initialCluster) + token = cfg.initialClusterToken } - return cls, err -} - -func genClusterString(name string, urls types.URLs) string { - addrs := make([]string, 0) - for _, u := range urls { - addrs = append(addrs, fmt.Sprintf("%v=%v", name, u.String())) - } - return strings.Join(addrs, ",") + return urlsmap, token, err } // identifyDataDirOrDie returns the type of the data dir. diff --git a/etcdmain/etcd_test.go b/etcdmain/etcd_test.go deleted file mode 100644 index 700b61544..000000000 --- a/etcdmain/etcd_test.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2015 CoreOS, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package etcdmain - -import ( - "testing" - - "github.com/coreos/etcd/pkg/testutil" -) - -func TestGenClusterString(t *testing.T) { - tests := []struct { - token string - urls []string - wstr string - }{ - { - "default", []string{"http://127.0.0.1:2379"}, - "default=http://127.0.0.1:2379", - }, - { - "node1", []string{"http://0.0.0.0:2379", "http://1.1.1.1:2379"}, - "node1=http://0.0.0.0:2379,node1=http://1.1.1.1:2379", - }, - } - for i, tt := range tests { - urls := testutil.MustNewURLs(t, tt.urls) - str := genClusterString(tt.token, urls) - if str != tt.wstr { - t.Errorf("#%d: cluster = %s, want %s", i, str, tt.wstr) - } - } -} diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index f9c60fe1e..90868cc1d 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -15,21 +15,21 @@ package etcdserver import ( + "bytes" "crypto/sha1" "encoding/binary" "encoding/json" "fmt" "log" - "net/url" "path" "sort" "strings" "sync" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver" - "github.com/coreos/etcd/pkg/flags" "github.com/coreos/etcd/pkg/netutil" "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/store" ) @@ -69,28 +69,15 @@ type Cluster struct { removed map[types.ID]bool } -// NewClusterFromString returns a Cluster instantiated from the given cluster token -// and cluster string, by parsing members from a set of discovery-formatted -// names-to-IPs, like: -// mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach1=http://3.3.3.3,mach2=http://4.4.4.4 -func NewClusterFromString(token string, cluster string) (*Cluster, error) { +func NewCluster(token string, initial types.URLsMap) (*Cluster, error) { c := newCluster(token) - - v, err := url.ParseQuery(strings.Replace(cluster, ",", "&", -1)) - if err != nil { - return nil, err - } - for name, urls := range v { - if len(urls) == 0 || urls[0] == "" { - return nil, fmt.Errorf("Empty URL given for %q", name) - } - purls := &flags.URLsValue{} - if err := purls.Set(strings.Join(urls, ",")); err != nil { - return nil, err - } - m := NewMember(name, types.URLs(*purls), c.token, nil) + for name, urls := range initial { + m := NewMember(name, urls, token, nil) if _, ok := c.members[m.ID]; ok { - return nil, fmt.Errorf("Member exists with identical ID %v", m) + return nil, fmt.Errorf("member exists with identical ID %v", m) + } + if uint64(m.ID) == raft.None { + return nil, fmt.Errorf("cannot use %x as member id", raft.None) } c.members[m.ID] = m } @@ -98,14 +85,6 @@ func NewClusterFromString(token string, cluster string) (*Cluster, error) { return c, nil } -func NewClusterFromStore(token string, st store.Store) *Cluster { - c := newCluster(token) - c.store = st - c.members, c.removed = membersFromStore(c.store) - c.version = clusterVersionFromStore(c.store) - return c -} - func NewClusterFromMembers(token string, id types.ID, membs []*Member) *Cluster { c := newCluster(token) c.id = id @@ -209,14 +188,19 @@ func (c *Cluster) ClientURLs() []string { func (c *Cluster) String() string { c.Lock() defer c.Unlock() - sl := []string{} + b := &bytes.Buffer{} + fmt.Fprintf(b, "{ClusterID:%s ", c.id) + var ms []string for _, m := range c.members { - for _, u := range m.PeerURLs { - sl = append(sl, fmt.Sprintf("%s=%s", m.Name, u)) - } + ms = append(ms, fmt.Sprintf("%+v", m)) } - sort.Strings(sl) - return strings.Join(sl, ",") + fmt.Fprintf(b, "Members:[%s] ", strings.Join(ms, " ")) + var ids []string + for id, _ := range c.removed { + ids = append(ids, fmt.Sprintf("%s", id)) + } + fmt.Fprintf(b, "RemovedMemberIDs:[%s]}", strings.Join(ids, " ")) + return b.String() } func (c *Cluster) genID() { @@ -371,20 +355,6 @@ func (c *Cluster) SetVersion(ver *semver.Version) { c.version = ver } -// Validate ensures that there is no identical urls in the cluster peer list -func (c *Cluster) Validate() error { - urlMap := make(map[string]bool) - for _, m := range c.Members() { - for _, url := range m.PeerURLs { - if urlMap[url] { - return fmt.Errorf("duplicate url %v in cluster config", url) - } - urlMap[url] = true - } - } - return nil -} - func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) { members := make(map[types.ID]*Member) removed := make(map[types.ID]bool) diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index 529890819..a18f06a78 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -21,110 +21,12 @@ import ( "reflect" "testing" - "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/store" ) -func TestClusterFromString(t *testing.T) { - tests := []struct { - f string - mems []*Member - }{ - { - "mem1=http://10.0.0.1:2379,mem1=http://128.193.4.20:2379,mem2=http://10.0.0.2:2379,default=http://127.0.0.1:2379", - []*Member{ - newTestMember(3141198903430435750, []string{"http://10.0.0.2:2379"}, "mem2", nil), - newTestMember(4322322643958477905, []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}, "mem1", nil), - newTestMember(12762790032478827328, []string{"http://127.0.0.1:2379"}, "default", nil), - }, - }, - } - for i, tt := range tests { - c, err := NewClusterFromString("abc", tt.f) - if err != nil { - t.Fatalf("#%d: unexpected new error: %v", i, err) - } - if c.token != "abc" { - t.Errorf("#%d: token = %v, want abc", i, c.token) - } - if !reflect.DeepEqual(c.Members(), tt.mems) { - t.Errorf("#%d: members = %+v, want %+v", i, c.Members(), tt.mems) - } - } -} - -func TestClusterFromStringBad(t *testing.T) { - tests := []string{ - // invalid URL - "%^", - // no URL defined for member - "mem1=,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379", - "mem1,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379", - // bad URL for member - "default=http://localhost/", - // TODO(philips): anyone know of a 64 bit sha1 hash collision - // "06b2f82fd81b2c20=http://128.193.4.20:2379,02c60cb75083ceef=http://128.193.4.20:2379", - // the same url for two members - "mem1=http://128.193.4.20:2379,mem2=http://128.193.4.20:2379", - } - for i, tt := range tests { - if _, err := NewClusterFromString("abc", tt); err == nil { - t.Errorf("#%d: unexpected successful new, want err", i) - } - } -} - -func TestClusterFromStore(t *testing.T) { - tests := []struct { - mems []*Member - ver *semver.Version - }{ - { - []*Member{newTestMember(1, nil, "", nil)}, - semver.Must(semver.NewVersion("2.0.0")), - }, - { - nil, - nil, - }, - { - []*Member{ - newTestMember(1, nil, "", nil), - newTestMember(2, nil, "", nil), - }, - semver.Must(semver.NewVersion("2.0.0")), - }, - } - for i, tt := range tests { - st := store.New() - hc := newTestCluster(nil) - hc.SetStore(st) - for _, m := range tt.mems { - hc.AddMember(m) - } - if tt.ver != nil { - _, err := st.Set(path.Join(StoreClusterPrefix, "version"), false, tt.ver.String(), store.Permanent) - if err != nil { - t.Fatal(err) - } - } - - c := NewClusterFromStore("abc", st) - if c.token != "abc" { - t.Errorf("#%d: token = %v, want %v", i, c.token, "abc") - } - if !reflect.DeepEqual(c.Members(), tt.mems) { - t.Errorf("#%d: members = %v, want %v", i, c.Members(), tt.mems) - } - if !reflect.DeepEqual(c.Version(), tt.ver) { - t.Errorf("#%d: ver = %v, want %v", i, c.Version(), tt.ver) - } - } -} - func TestClusterMember(t *testing.T) { membs := []*Member{ newTestMember(1, nil, "node1", nil), @@ -589,49 +491,6 @@ func TestClusterMembers(t *testing.T) { } } -func TestClusterString(t *testing.T) { - cls := &Cluster{ - members: map[types.ID]*Member{ - 1: newTestMember( - 1, - []string{"http://1.1.1.1:1111", "http://0.0.0.0:0000"}, - "abc", - nil, - ), - 2: newTestMember( - 2, - []string{"http://2.2.2.2:2222"}, - "def", - nil, - ), - 3: newTestMember( - 3, - []string{"http://3.3.3.3:1234", "http://127.0.0.1:2380"}, - "ghi", - nil, - ), - // no PeerURLs = not included - 4: newTestMember( - 4, - []string{}, - "four", - nil, - ), - 5: newTestMember( - 5, - nil, - "five", - nil, - ), - }, - } - w := "abc=http://0.0.0.0:0000,abc=http://1.1.1.1:1111,def=http://2.2.2.2:2222,ghi=http://127.0.0.1:2380,ghi=http://3.3.3.3:1234" - if g := cls.String(); g != w { - t.Fatalf("Cluster.String():\ngot %#v\nwant %#v", g, w) - } - -} - func TestClusterRemoveMember(t *testing.T) { st := &storeRecorder{} c := newTestCluster(nil) diff --git a/etcdserver/config.go b/etcdserver/config.go index c90692150..273782305 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -23,24 +23,24 @@ import ( "github.com/coreos/etcd/pkg/netutil" "github.com/coreos/etcd/pkg/types" - "github.com/coreos/etcd/raft" ) // ServerConfig holds the configuration of etcd as taken from the command line or discovery. type ServerConfig struct { - Name string - DiscoveryURL string - DiscoveryProxy string - ClientURLs types.URLs - PeerURLs types.URLs - DataDir string - SnapCount uint64 - MaxSnapFiles uint - MaxWALFiles uint - Cluster *Cluster - NewCluster bool - ForceNewCluster bool - Transport *http.Transport + Name string + DiscoveryURL string + DiscoveryProxy string + ClientURLs types.URLs + PeerURLs types.URLs + DataDir string + SnapCount uint64 + MaxSnapFiles uint + MaxWALFiles uint + InitialPeerURLsMap types.URLsMap + InitialClusterToken string + NewCluster bool + ForceNewCluster bool + Transport *http.Transport TickMs uint ElectionTicks int @@ -52,10 +52,10 @@ func (c *ServerConfig) VerifyBootstrap() error { if err := c.verifyLocalMember(true); err != nil { return err } - if err := c.Cluster.Validate(); err != nil { - return err + if checkDuplicateURL(c.InitialPeerURLsMap) { + return fmt.Errorf("initial cluster %s has duplicate url", c.InitialPeerURLsMap) } - if c.Cluster.String() == "" && c.DiscoveryURL == "" { + if c.InitialPeerURLsMap.String() == "" && c.DiscoveryURL == "" { return fmt.Errorf("initial cluster unset and no discovery URL found") } return nil @@ -70,8 +70,8 @@ func (c *ServerConfig) VerifyJoinExisting() error { if err := c.verifyLocalMember(false); err != nil { return err } - if err := c.Cluster.Validate(); err != nil { - return err + if checkDuplicateURL(c.InitialPeerURLsMap) { + return fmt.Errorf("initial cluster %s has duplicate url", c.InitialPeerURLsMap) } if c.DiscoveryURL != "" { return fmt.Errorf("discovery URL should not be set when joining existing initial cluster") @@ -83,21 +83,19 @@ func (c *ServerConfig) VerifyJoinExisting() error { // cluster. If strict is set, it also verifies the configured member // has the same peer urls as configured advertised peer urls. func (c *ServerConfig) verifyLocalMember(strict bool) error { - m := c.Cluster.MemberByName(c.Name) + urls := c.InitialPeerURLsMap[c.Name] // Make sure the cluster at least contains the local server. - if m == nil { + if urls == nil { return fmt.Errorf("couldn't find local name %q in the initial cluster configuration", c.Name) } - if uint64(m.ID) == raft.None { - return fmt.Errorf("cannot use %x as member id", raft.None) - } // Advertised peer URLs must match those in the cluster peer list // TODO: Remove URLStringsEqual after improvement of using hostnames #2150 #2123 apurls := c.PeerURLs.StringSlice() sort.Strings(apurls) + urls.Sort() if strict { - if !netutil.URLStringsEqual(apurls, m.PeerURLs) { + if !netutil.URLStringsEqual(apurls, urls.StringSlice()) { return fmt.Errorf("%s has different advertised URLs in the cluster and advertised peer URLs list", c.Name) } } @@ -135,6 +133,20 @@ func (c *ServerConfig) print(initial bool) { log.Printf("etcdserver: advertise client URLs = %s", c.ClientURLs) if initial { log.Printf("etcdserver: initial advertise peer URLs = %s", c.PeerURLs) - log.Printf("etcdserver: initial cluster = %s", c.Cluster) + log.Printf("etcdserver: initial cluster = %s", c.InitialPeerURLsMap) } } + +func checkDuplicateURL(urlsmap types.URLsMap) bool { + um := make(map[string]bool) + for _, urls := range urlsmap { + for _, url := range urls { + u := url.String() + if um[u] { + return true + } + um[u] = true + } + } + return false +} diff --git a/etcdserver/config_test.go b/etcdserver/config_test.go index d6bf2f279..b04252dfc 100644 --- a/etcdserver/config_test.go +++ b/etcdserver/config_test.go @@ -33,14 +33,10 @@ func mustNewURLs(t *testing.T, urls []string) []url.URL { } func TestConfigVerifyBootstrapWithoutClusterAndDiscoveryURLFail(t *testing.T) { - cluster, err := NewClusterFromString("", "") - if err != nil { - t.Fatalf("NewClusterFromString error: %v", err) - } c := &ServerConfig{ - Name: "node1", - DiscoveryURL: "", - Cluster: cluster, + Name: "node1", + DiscoveryURL: "", + InitialPeerURLsMap: types.URLsMap{}, } if err := c.VerifyBootstrap(); err == nil { t.Errorf("err = nil, want not nil") @@ -48,16 +44,16 @@ func TestConfigVerifyBootstrapWithoutClusterAndDiscoveryURLFail(t *testing.T) { } func TestConfigVerifyExistingWithDiscoveryURLFail(t *testing.T) { - cluster, err := NewClusterFromString("", "node1=http://127.0.0.1:2380") + cluster, err := types.NewURLsMap("node1=http://127.0.0.1:2380") if err != nil { - t.Fatalf("NewClusterFromString error: %v", err) + t.Fatalf("NewCluster error: %v", err) } c := &ServerConfig{ - Name: "node1", - DiscoveryURL: "http://127.0.0.1:2379/abcdefg", - PeerURLs: mustNewURLs(t, []string{"http://127.0.0.1:2380"}), - Cluster: cluster, - NewCluster: false, + Name: "node1", + DiscoveryURL: "http://127.0.0.1:2379/abcdefg", + PeerURLs: mustNewURLs(t, []string{"http://127.0.0.1:2380"}), + InitialPeerURLsMap: cluster, + NewCluster: false, } if err := c.VerifyJoinExisting(); err == nil { t.Errorf("err = nil, want not nil") @@ -130,20 +126,19 @@ func TestConfigVerifyLocalMember(t *testing.T) { } for i, tt := range tests { - cluster, err := NewClusterFromString("", tt.clusterSetting) + cluster, err := types.NewURLsMap(tt.clusterSetting) if err != nil { t.Fatalf("#%d: Got unexpected error: %v", i, err) } cfg := ServerConfig{ - Name: "node1", - Cluster: cluster, + Name: "node1", + InitialPeerURLsMap: cluster, } if tt.apurls != nil { cfg.PeerURLs = mustNewURLs(t, tt.apurls) } err = cfg.verifyLocalMember(tt.strict) if (err == nil) && tt.shouldError { - t.Errorf("%#v", *cluster) t.Errorf("#%d: Got no error where one was expected", i) } if (err != nil) && !tt.shouldError { diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 11e63bf86..2b6b8960b 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -192,13 +192,13 @@ func (r *raftNode) resumeSending() { p.Resume() } -func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { +func startNode(cfg *ServerConfig, cl *Cluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { var err error - member := cfg.Cluster.MemberByName(cfg.Name) + member := cl.MemberByName(cfg.Name) metadata := pbutil.MustMarshal( &pb.Metadata{ NodeID: uint64(member.ID), - ClusterID: uint64(cfg.Cluster.ID()), + ClusterID: uint64(cl.ID()), }, ) if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil { @@ -209,14 +209,14 @@ func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s * } peers := make([]raft.Peer, len(ids)) for i, id := range ids { - ctx, err := json.Marshal((*cfg.Cluster).Member(id)) + ctx, err := json.Marshal((*cl).Member(id)) if err != nil { log.Panicf("marshal member should never fail: %v", err) } peers[i] = raft.Peer{ID: uint64(id), Context: ctx} } id = member.ID - log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID()) + log.Printf("etcdserver: start member %s in cluster %s", id, cl.ID()) s = raft.NewMemoryStorage() c := &raft.Config{ ID: uint64(id), @@ -231,15 +231,16 @@ func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s * return } -func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) { +func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *Cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term } w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap) - cfg.Cluster.SetID(cid) - log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit) + log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cid, st.Commit) + cl := newCluster("") + cl.SetID(cid) s := raft.NewMemoryStorage() if snapshot != nil { s.ApplySnapshot(*snapshot) @@ -256,16 +257,15 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.N } n := raft.RestartNode(c) raftStatus = n.Status - return id, n, s, w + return id, cl, n, s, w } -func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) { +func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *Cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term } w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap) - cfg.Cluster.SetID(cid) // discard the previously uncommitted entries for i, ent := range ents { @@ -289,7 +289,9 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type st.Commit = ents[len(ents)-1].Index } - log.Printf("etcdserver: forcing restart of member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit) + log.Printf("etcdserver: forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit) + cl := newCluster("") + cl.SetID(cid) s := raft.NewMemoryStorage() if snapshot != nil { s.ApplySnapshot(*snapshot) @@ -306,7 +308,7 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type } n := raft.RestartNode(c) raftStatus = n.Status - return id, n, s, w + return id, cl, n, s, w } // getIDs returns an ordered set of IDs included in the given snapshot and diff --git a/etcdserver/server.go b/etcdserver/server.go index c3a31e8d0..e298b672b 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -178,6 +178,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { var n raft.Node var s *raft.MemoryStorage var id types.ID + var cl *Cluster // Run the migrations. dataVer, err := version.DetectDataDir(cfg.DataDir) @@ -197,41 +198,53 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if err := cfg.VerifyJoinExisting(); err != nil { return nil, err } - existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cfg.Cluster, cfg.Name), cfg.Transport) + cl, err = NewCluster(cfg.InitialClusterToken, cfg.InitialPeerURLsMap) + if err != nil { + return nil, err + } + existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), cfg.Transport) if err != nil { return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err) } - if err := ValidateClusterAndAssignIDs(cfg.Cluster, existingCluster); err != nil { + if err := ValidateClusterAndAssignIDs(cl, existingCluster); err != nil { return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err) } remotes = existingCluster.Members() - cfg.Cluster.SetID(existingCluster.id) - cfg.Cluster.SetStore(st) + cl.SetID(existingCluster.id) + cl.SetStore(st) cfg.Print() - id, n, s, w = startNode(cfg, nil) + id, n, s, w = startNode(cfg, cl, nil) case !haveWAL && cfg.NewCluster: if err := cfg.VerifyBootstrap(); err != nil { return nil, err } - m := cfg.Cluster.MemberByName(cfg.Name) - if isMemberBootstrapped(cfg.Cluster, cfg.Name, cfg.Transport) { + cl, err = NewCluster(cfg.InitialClusterToken, cfg.InitialPeerURLsMap) + if err != nil { + return nil, err + } + m := cl.MemberByName(cfg.Name) + if isMemberBootstrapped(cl, cfg.Name, cfg.Transport) { return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID) } if cfg.ShouldDiscover() { - str, err := discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.Cluster.String()) + str, err := discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String()) if err != nil { return nil, err } - if cfg.Cluster, err = NewClusterFromString(cfg.Cluster.token, str); err != nil { + urlsmap, err := types.NewURLsMap(str) + if err != nil { return nil, err } - if err := cfg.Cluster.Validate(); err != nil { - return nil, fmt.Errorf("bad discovery cluster: %v", err) + if checkDuplicateURL(urlsmap) { + return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap) + } + if cl, err = NewCluster(cfg.InitialClusterToken, urlsmap); err != nil { + return nil, err } } - cfg.Cluster.SetStore(st) + cl.SetStore(st) cfg.PrintWithInitial() - id, n, s, w = startNode(cfg, cfg.Cluster.MemberIDs()) + id, n, s, w = startNode(cfg, cl, cl.MemberIDs()) case haveWAL: if err := fileutil.IsDirWriteable(cfg.DataDir); err != nil { return nil, fmt.Errorf("cannot write to data directory: %v", err) @@ -254,16 +267,17 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { } log.Printf("etcdserver: recovered store from snapshot at index %d", snapshot.Metadata.Index) } - cfg.Cluster = NewClusterFromStore(cfg.Cluster.token, st) cfg.Print() if snapshot != nil { - log.Printf("etcdserver: loaded cluster information from store: %s", cfg.Cluster) + log.Printf("etcdserver: loaded cluster information from store: %s", cl) } if !cfg.ForceNewCluster { - id, n, s, w = restartNode(cfg, snapshot) + id, cl, n, s, w = restartNode(cfg, snapshot) } else { - id, n, s, w = restartAsStandaloneNode(cfg, snapshot) + id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot) } + cl.SetStore(st) + cl.Recover() default: return nil, fmt.Errorf("unsupported bootstrap config") } @@ -288,7 +302,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { }, id: id, attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, - Cluster: cfg.Cluster, + Cluster: cl, stats: sstats, lstats: lstats, SyncTicker: time.Tick(500 * time.Millisecond), @@ -297,14 +311,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { } // TODO: move transport initialization near the definition of remote - tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats) + tr := rafthttp.NewTransporter(cfg.Transport, id, cl.ID(), srv, srv.errorc, sstats, lstats) // add all remotes into transport for _, m := range remotes { if m.ID != id { tr.AddRemote(m.ID, m.PeerURLs) } } - for _, m := range cfg.Cluster.Members() { + for _, m := range cl.Members() { if m.ID != id { tr.AddPeer(m.ID, m.PeerURLs) } diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 7c93da0bd..453b27407 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -301,7 +301,7 @@ type cluster struct { Members []*member } -func fillClusterForMembers(ms []*member, cName string) error { +func fillClusterForMembers(ms []*member) error { addrs := make([]string, 0) for _, m := range ms { scheme := "http" @@ -315,7 +315,7 @@ func fillClusterForMembers(ms []*member, cName string) error { clusterStr := strings.Join(addrs, ",") var err error for _, m := range ms { - m.Cluster, err = etcdserver.NewClusterFromString(cName, clusterStr) + m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr) if err != nil { return err } @@ -330,7 +330,7 @@ func newCluster(t *testing.T, size int, usePeerTLS bool) *cluster { ms[i] = mustNewMember(t, c.name(i), usePeerTLS) } c.Members = ms - if err := fillClusterForMembers(c.Members, clusterName); err != nil { + if err := fillClusterForMembers(c.Members); err != nil { t.Fatal(err) } @@ -420,7 +420,6 @@ func (c *cluster) HTTPMembers() []client.Member { } func (c *cluster) addMember(t *testing.T, usePeerTLS bool) { - clusterStr := c.Members[0].Cluster.String() m := mustNewMember(t, c.name(rand.Int()), usePeerTLS) scheme := "http" if usePeerTLS { @@ -441,14 +440,11 @@ func (c *cluster) addMember(t *testing.T, usePeerTLS bool) { members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}}) c.waitMembersMatch(t, members) - for _, ln := range m.PeerListeners { - clusterStr += fmt.Sprintf(",%s=%s://%s", m.Name, scheme, ln.Addr().String()) - } - var err error - m.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr) - if err != nil { - t.Fatal(err) + m.InitialPeerURLsMap = types.URLsMap{} + for _, mm := range c.Members { + m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs } + m.InitialPeerURLsMap[m.Name] = m.PeerURLs m.NewCluster = false if err := m.Launch(); err != nil { t.Fatal(err) @@ -645,10 +641,11 @@ func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member { t.Fatal(err) } clusterStr := fmt.Sprintf("%s=%s://%s", name, peerScheme, pln.Addr().String()) - m.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr) + m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr) if err != nil { t.Fatal(err) } + m.InitialClusterToken = clusterName m.NewCluster = true m.Transport = mustNewTransport(t, m.PeerTLSInfo) m.ElectionTicks = electionTicks @@ -675,12 +672,13 @@ func (m *member) Clone(t *testing.T) *member { // this should never fail panic(err) } - clusterStr := m.Cluster.String() - mm.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr) + clusterStr := m.InitialPeerURLsMap.String() + mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr) if err != nil { // this should never fail panic(err) } + mm.InitialClusterToken = m.InitialClusterToken mm.Transport = mustNewTransport(t, m.PeerTLSInfo) mm.ElectionTicks = m.ElectionTicks mm.PeerTLSInfo = m.PeerTLSInfo diff --git a/pkg/types/urlsmap.go b/pkg/types/urlsmap.go new file mode 100644 index 000000000..5f664ddc5 --- /dev/null +++ b/pkg/types/urlsmap.go @@ -0,0 +1,71 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +import ( + "fmt" + "net/url" + "sort" + "strings" +) + +type URLsMap map[string]URLs + +// NewURLsMap returns a URLsMap instantiated from the given string, +// which consists of discovery-formatted names-to-URLs, like: +// mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach1=http://3.3.3.3,mach2=http://4.4.4.4 +func NewURLsMap(s string) (URLsMap, error) { + cl := URLsMap{} + v, err := url.ParseQuery(strings.Replace(s, ",", "&", -1)) + if err != nil { + return nil, err + } + for name, urls := range v { + if len(urls) == 0 || urls[0] == "" { + return nil, fmt.Errorf("empty URL given for %q", name) + } + us, err := NewURLs(urls) + if err != nil { + return nil, err + } + cl[name] = us + } + return cl, nil +} + +// String returns NameURLPairs into discovery-formatted name-to-URLs sorted by name. +func (c URLsMap) String() string { + pairs := make([]string, 0) + for name, urls := range c { + for _, url := range urls { + pairs = append(pairs, fmt.Sprintf("%s=%s", name, url.String())) + } + } + sort.Strings(pairs) + return strings.Join(pairs, ",") +} + +// URLs returns a list of all URLs. +// The returned list is sorted in ascending lexicographical order. +func (c URLsMap) URLs() []string { + urls := make([]string, 0) + for _, us := range c { + for _, u := range us { + urls = append(urls, u.String()) + } + } + sort.Strings(urls) + return urls +} diff --git a/pkg/types/urlsmap_test.go b/pkg/types/urlsmap_test.go new file mode 100644 index 000000000..8b52dc17b --- /dev/null +++ b/pkg/types/urlsmap_test.go @@ -0,0 +1,69 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +import ( + "reflect" + "testing" + + "github.com/coreos/etcd/pkg/testutil" +) + +func TestParseInitialCluster(t *testing.T) { + c, err := NewURLsMap("mem1=http://10.0.0.1:2379,mem1=http://128.193.4.20:2379,mem2=http://10.0.0.2:2379,default=http://127.0.0.1:2379") + if err != nil { + t.Fatalf("unexpected parse error: %v", err) + } + wc := URLsMap(map[string]URLs{ + "mem1": testutil.MustNewURLs(t, []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}), + "mem2": testutil.MustNewURLs(t, []string{"http://10.0.0.2:2379"}), + "default": testutil.MustNewURLs(t, []string{"http://127.0.0.1:2379"}), + }) + if !reflect.DeepEqual(c, wc) { + t.Errorf("cluster = %+v, want %+v", c, wc) + } +} + +func TestParseInitialClusterBad(t *testing.T) { + tests := []string{ + // invalid URL + "%^", + // no URL defined for member + "mem1=,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379", + "mem1,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379", + // bad URL for member + "default=http://localhost/", + } + for i, tt := range tests { + if _, err := NewURLsMap(tt); err == nil { + t.Errorf("#%d: unexpected successful parse, want err", i) + } + } +} + +func TestNameURLPairsString(t *testing.T) { + cls := URLsMap(map[string]URLs{ + "abc": testutil.MustNewURLs(t, []string{"http://1.1.1.1:1111", "http://0.0.0.0:0000"}), + "def": testutil.MustNewURLs(t, []string{"http://2.2.2.2:2222"}), + "ghi": testutil.MustNewURLs(t, []string{"http://3.3.3.3:1234", "http://127.0.0.1:2380"}), + // no PeerURLs = not included + "four": testutil.MustNewURLs(t, []string{}), + "five": testutil.MustNewURLs(t, nil), + }) + w := "abc=http://0.0.0.0:0000,abc=http://1.1.1.1:1111,def=http://2.2.2.2:2222,ghi=http://127.0.0.1:2380,ghi=http://3.3.3.3:1234" + if g := cls.String(); g != w { + t.Fatalf("NameURLPairs.String():\ngot %#v\nwant %#v", g, w) + } +}