*: extract types.Cluster from etcdserver.Cluster
The PR extracts types.Cluster from etcdserver.Cluster. types.Cluster is used for flag parsing and etcdserver config. There is no need to expose etcdserver.Cluster public, which contains lots of etcdserver internal details and methods. This is the first step for it.release-2.1
parent
197437316f
commit
032db5e396
|
@ -23,7 +23,6 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/discovery"
|
"github.com/coreos/etcd/discovery"
|
||||||
|
@ -71,6 +70,7 @@ func Main() {
|
||||||
|
|
||||||
var stopped <-chan struct{}
|
var stopped <-chan struct{}
|
||||||
|
|
||||||
|
// TODO: check whether fields are set instead of whether fields have default value
|
||||||
if cfg.name != defaultName && cfg.initialCluster == initialClusterFromName(defaultName) {
|
if cfg.name != defaultName && cfg.initialCluster == initialClusterFromName(defaultName) {
|
||||||
cfg.initialCluster = initialClusterFromName(cfg.name)
|
cfg.initialCluster = initialClusterFromName(cfg.name)
|
||||||
}
|
}
|
||||||
|
@ -116,7 +116,7 @@ func Main() {
|
||||||
|
|
||||||
// startEtcd launches the etcd server and HTTP handlers for client/server communication.
|
// startEtcd launches the etcd server and HTTP handlers for client/server communication.
|
||||||
func startEtcd(cfg *config) (<-chan struct{}, error) {
|
func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||||
cls, err := setupCluster(cfg)
|
urlsmap, token, err := getPeerURLsMapAndToken(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error setting up initial cluster: %v", err)
|
return nil, fmt.Errorf("error setting up initial cluster: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -171,21 +171,22 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
srvcfg := &etcdserver.ServerConfig{
|
srvcfg := &etcdserver.ServerConfig{
|
||||||
Name: cfg.name,
|
Name: cfg.name,
|
||||||
ClientURLs: cfg.acurls,
|
ClientURLs: cfg.acurls,
|
||||||
PeerURLs: cfg.apurls,
|
PeerURLs: cfg.apurls,
|
||||||
DataDir: cfg.dir,
|
DataDir: cfg.dir,
|
||||||
SnapCount: cfg.snapCount,
|
SnapCount: cfg.snapCount,
|
||||||
MaxSnapFiles: cfg.maxSnapFiles,
|
MaxSnapFiles: cfg.maxSnapFiles,
|
||||||
MaxWALFiles: cfg.maxWalFiles,
|
MaxWALFiles: cfg.maxWalFiles,
|
||||||
Cluster: cls,
|
InitialPeerURLsMap: urlsmap,
|
||||||
DiscoveryURL: cfg.durl,
|
InitialClusterToken: token,
|
||||||
DiscoveryProxy: cfg.dproxy,
|
DiscoveryURL: cfg.durl,
|
||||||
NewCluster: cfg.isNewCluster(),
|
DiscoveryProxy: cfg.dproxy,
|
||||||
ForceNewCluster: cfg.forceNewCluster,
|
NewCluster: cfg.isNewCluster(),
|
||||||
Transport: pt,
|
ForceNewCluster: cfg.forceNewCluster,
|
||||||
TickMs: cfg.TickMs,
|
Transport: pt,
|
||||||
ElectionTicks: cfg.electionTicks(),
|
TickMs: cfg.TickMs,
|
||||||
|
ElectionTicks: cfg.electionTicks(),
|
||||||
}
|
}
|
||||||
var s *etcdserver.EtcdServer
|
var s *etcdserver.EtcdServer
|
||||||
s, err = etcdserver.NewServer(srvcfg)
|
s, err = etcdserver.NewServer(srvcfg)
|
||||||
|
@ -222,7 +223,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||||
|
|
||||||
// startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.
|
// startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.
|
||||||
func startProxy(cfg *config) error {
|
func startProxy(cfg *config) error {
|
||||||
cls, err := setupCluster(cfg)
|
urlsmap, _, err := getPeerURLsMapAndToken(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error setting up initial cluster: %v", err)
|
return fmt.Errorf("error setting up initial cluster: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -232,7 +233,7 @@ func startProxy(cfg *config) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if cls, err = etcdserver.NewClusterFromString(cfg.durl, s); err != nil {
|
if urlsmap, err = types.NewURLsMap(s); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -267,12 +268,13 @@ func startProxy(cfg *config) error {
|
||||||
peerURLs = urls.PeerURLs
|
peerURLs = urls.PeerURLs
|
||||||
log.Printf("proxy: using peer urls %v from cluster file ./%s", peerURLs, clusterfile)
|
log.Printf("proxy: using peer urls %v from cluster file ./%s", peerURLs, clusterfile)
|
||||||
case os.IsNotExist(err):
|
case os.IsNotExist(err):
|
||||||
peerURLs = cls.PeerURLs()
|
peerURLs = urlsmap.URLs()
|
||||||
log.Printf("proxy: using peer urls %v ", peerURLs)
|
log.Printf("proxy: using peer urls %v ", peerURLs)
|
||||||
default:
|
default:
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
clientURLs := []string{}
|
||||||
uf := func() []string {
|
uf := func() []string {
|
||||||
gcls, err := etcdserver.GetClusterFromRemotePeers(peerURLs, tr)
|
gcls, err := etcdserver.GetClusterFromRemotePeers(peerURLs, tr)
|
||||||
// TODO: remove the 2nd check when we fix GetClusterFromPeers
|
// TODO: remove the 2nd check when we fix GetClusterFromPeers
|
||||||
|
@ -282,33 +284,33 @@ func startProxy(cfg *config) error {
|
||||||
return []string{}
|
return []string{}
|
||||||
}
|
}
|
||||||
if len(gcls.Members()) == 0 {
|
if len(gcls.Members()) == 0 {
|
||||||
return cls.ClientURLs()
|
return clientURLs
|
||||||
}
|
}
|
||||||
cls = gcls
|
clientURLs = gcls.ClientURLs()
|
||||||
|
|
||||||
urls := struct{ PeerURLs []string }{cls.PeerURLs()}
|
urls := struct{ PeerURLs []string }{gcls.PeerURLs()}
|
||||||
b, err := json.Marshal(urls)
|
b, err := json.Marshal(urls)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("proxy: error on marshal peer urls %s", err)
|
log.Printf("proxy: error on marshal peer urls %s", err)
|
||||||
return cls.ClientURLs()
|
return clientURLs
|
||||||
}
|
}
|
||||||
|
|
||||||
err = ioutil.WriteFile(clusterfile+".bak", b, 0600)
|
err = ioutil.WriteFile(clusterfile+".bak", b, 0600)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("proxy: error on writing urls %s", err)
|
log.Printf("proxy: error on writing urls %s", err)
|
||||||
return cls.ClientURLs()
|
return clientURLs
|
||||||
}
|
}
|
||||||
err = os.Rename(clusterfile+".bak", clusterfile)
|
err = os.Rename(clusterfile+".bak", clusterfile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("proxy: error on updating clusterfile %s", err)
|
log.Printf("proxy: error on updating clusterfile %s", err)
|
||||||
return cls.ClientURLs()
|
return clientURLs
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(cls.PeerURLs(), peerURLs) {
|
if !reflect.DeepEqual(gcls.PeerURLs(), peerURLs) {
|
||||||
log.Printf("proxy: updated peer urls in cluster file from %v to %v", peerURLs, cls.PeerURLs())
|
log.Printf("proxy: updated peer urls in cluster file from %v to %v", peerURLs, gcls.PeerURLs())
|
||||||
}
|
}
|
||||||
peerURLs = cls.PeerURLs()
|
peerURLs = gcls.PeerURLs()
|
||||||
|
|
||||||
return cls.ClientURLs()
|
return clientURLs
|
||||||
}
|
}
|
||||||
ph := proxy.NewHandler(pt, uf)
|
ph := proxy.NewHandler(pt, uf)
|
||||||
ph = &cors.CORSHandler{
|
ph = &cors.CORSHandler{
|
||||||
|
@ -335,35 +337,28 @@ func startProxy(cfg *config) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// setupCluster sets up an initial cluster definition for bootstrap or discovery.
|
// getPeerURLsMapAndToken sets up an initial peer URLsMap and cluster token for bootstrap or discovery.
|
||||||
func setupCluster(cfg *config) (*etcdserver.Cluster, error) {
|
func getPeerURLsMapAndToken(cfg *config) (urlsmap types.URLsMap, token string, err error) {
|
||||||
var cls *etcdserver.Cluster
|
|
||||||
var err error
|
|
||||||
switch {
|
switch {
|
||||||
case cfg.durl != "":
|
case cfg.durl != "":
|
||||||
|
urlsmap = types.URLsMap{}
|
||||||
// If using discovery, generate a temporary cluster based on
|
// If using discovery, generate a temporary cluster based on
|
||||||
// self's advertised peer URLs
|
// self's advertised peer URLs
|
||||||
clusterStr := genClusterString(cfg.name, cfg.apurls)
|
urlsmap[cfg.name] = cfg.apurls
|
||||||
cls, err = etcdserver.NewClusterFromString(cfg.durl, clusterStr)
|
token = cfg.durl
|
||||||
case cfg.dnsCluster != "":
|
case cfg.dnsCluster != "":
|
||||||
clusterStr, clusterToken, err := discovery.SRVGetCluster(cfg.name, cfg.dnsCluster, cfg.initialClusterToken, cfg.apurls)
|
var clusterStr string
|
||||||
|
clusterStr, token, err = discovery.SRVGetCluster(cfg.name, cfg.dnsCluster, cfg.initialClusterToken, cfg.apurls)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, "", err
|
||||||
}
|
}
|
||||||
cls, err = etcdserver.NewClusterFromString(clusterToken, clusterStr)
|
urlsmap, err = types.NewURLsMap(clusterStr)
|
||||||
default:
|
default:
|
||||||
// We're statically configured, and cluster has appropriately been set.
|
// We're statically configured, and cluster has appropriately been set.
|
||||||
cls, err = etcdserver.NewClusterFromString(cfg.initialClusterToken, cfg.initialCluster)
|
urlsmap, err = types.NewURLsMap(cfg.initialCluster)
|
||||||
|
token = cfg.initialClusterToken
|
||||||
}
|
}
|
||||||
return cls, err
|
return urlsmap, token, err
|
||||||
}
|
|
||||||
|
|
||||||
func genClusterString(name string, urls types.URLs) string {
|
|
||||||
addrs := make([]string, 0)
|
|
||||||
for _, u := range urls {
|
|
||||||
addrs = append(addrs, fmt.Sprintf("%v=%v", name, u.String()))
|
|
||||||
}
|
|
||||||
return strings.Join(addrs, ",")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// identifyDataDirOrDie returns the type of the data dir.
|
// identifyDataDirOrDie returns the type of the data dir.
|
||||||
|
|
|
@ -1,45 +0,0 @@
|
||||||
// Copyright 2015 CoreOS, Inc.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package etcdmain
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/testutil"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestGenClusterString(t *testing.T) {
|
|
||||||
tests := []struct {
|
|
||||||
token string
|
|
||||||
urls []string
|
|
||||||
wstr string
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
"default", []string{"http://127.0.0.1:2379"},
|
|
||||||
"default=http://127.0.0.1:2379",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"node1", []string{"http://0.0.0.0:2379", "http://1.1.1.1:2379"},
|
|
||||||
"node1=http://0.0.0.0:2379,node1=http://1.1.1.1:2379",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
for i, tt := range tests {
|
|
||||||
urls := testutil.MustNewURLs(t, tt.urls)
|
|
||||||
str := genClusterString(tt.token, urls)
|
|
||||||
if str != tt.wstr {
|
|
||||||
t.Errorf("#%d: cluster = %s, want %s", i, str, tt.wstr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -15,21 +15,21 @@
|
||||||
package etcdserver
|
package etcdserver
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"crypto/sha1"
|
"crypto/sha1"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"net/url"
|
|
||||||
"path"
|
"path"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
|
||||||
"github.com/coreos/etcd/pkg/flags"
|
|
||||||
"github.com/coreos/etcd/pkg/netutil"
|
"github.com/coreos/etcd/pkg/netutil"
|
||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
|
"github.com/coreos/etcd/raft"
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
"github.com/coreos/etcd/store"
|
"github.com/coreos/etcd/store"
|
||||||
)
|
)
|
||||||
|
@ -69,28 +69,15 @@ type Cluster struct {
|
||||||
removed map[types.ID]bool
|
removed map[types.ID]bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClusterFromString returns a Cluster instantiated from the given cluster token
|
func NewCluster(token string, initial types.URLsMap) (*Cluster, error) {
|
||||||
// and cluster string, by parsing members from a set of discovery-formatted
|
|
||||||
// names-to-IPs, like:
|
|
||||||
// mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach1=http://3.3.3.3,mach2=http://4.4.4.4
|
|
||||||
func NewClusterFromString(token string, cluster string) (*Cluster, error) {
|
|
||||||
c := newCluster(token)
|
c := newCluster(token)
|
||||||
|
for name, urls := range initial {
|
||||||
v, err := url.ParseQuery(strings.Replace(cluster, ",", "&", -1))
|
m := NewMember(name, urls, token, nil)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
for name, urls := range v {
|
|
||||||
if len(urls) == 0 || urls[0] == "" {
|
|
||||||
return nil, fmt.Errorf("Empty URL given for %q", name)
|
|
||||||
}
|
|
||||||
purls := &flags.URLsValue{}
|
|
||||||
if err := purls.Set(strings.Join(urls, ",")); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
m := NewMember(name, types.URLs(*purls), c.token, nil)
|
|
||||||
if _, ok := c.members[m.ID]; ok {
|
if _, ok := c.members[m.ID]; ok {
|
||||||
return nil, fmt.Errorf("Member exists with identical ID %v", m)
|
return nil, fmt.Errorf("member exists with identical ID %v", m)
|
||||||
|
}
|
||||||
|
if uint64(m.ID) == raft.None {
|
||||||
|
return nil, fmt.Errorf("cannot use %x as member id", raft.None)
|
||||||
}
|
}
|
||||||
c.members[m.ID] = m
|
c.members[m.ID] = m
|
||||||
}
|
}
|
||||||
|
@ -98,14 +85,6 @@ func NewClusterFromString(token string, cluster string) (*Cluster, error) {
|
||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClusterFromStore(token string, st store.Store) *Cluster {
|
|
||||||
c := newCluster(token)
|
|
||||||
c.store = st
|
|
||||||
c.members, c.removed = membersFromStore(c.store)
|
|
||||||
c.version = clusterVersionFromStore(c.store)
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewClusterFromMembers(token string, id types.ID, membs []*Member) *Cluster {
|
func NewClusterFromMembers(token string, id types.ID, membs []*Member) *Cluster {
|
||||||
c := newCluster(token)
|
c := newCluster(token)
|
||||||
c.id = id
|
c.id = id
|
||||||
|
@ -209,14 +188,19 @@ func (c *Cluster) ClientURLs() []string {
|
||||||
func (c *Cluster) String() string {
|
func (c *Cluster) String() string {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
sl := []string{}
|
b := &bytes.Buffer{}
|
||||||
|
fmt.Fprintf(b, "{ClusterID:%s ", c.id)
|
||||||
|
var ms []string
|
||||||
for _, m := range c.members {
|
for _, m := range c.members {
|
||||||
for _, u := range m.PeerURLs {
|
ms = append(ms, fmt.Sprintf("%+v", m))
|
||||||
sl = append(sl, fmt.Sprintf("%s=%s", m.Name, u))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
sort.Strings(sl)
|
fmt.Fprintf(b, "Members:[%s] ", strings.Join(ms, " "))
|
||||||
return strings.Join(sl, ",")
|
var ids []string
|
||||||
|
for id, _ := range c.removed {
|
||||||
|
ids = append(ids, fmt.Sprintf("%s", id))
|
||||||
|
}
|
||||||
|
fmt.Fprintf(b, "RemovedMemberIDs:[%s]}", strings.Join(ids, " "))
|
||||||
|
return b.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) genID() {
|
func (c *Cluster) genID() {
|
||||||
|
@ -371,20 +355,6 @@ func (c *Cluster) SetVersion(ver *semver.Version) {
|
||||||
c.version = ver
|
c.version = ver
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate ensures that there is no identical urls in the cluster peer list
|
|
||||||
func (c *Cluster) Validate() error {
|
|
||||||
urlMap := make(map[string]bool)
|
|
||||||
for _, m := range c.Members() {
|
|
||||||
for _, url := range m.PeerURLs {
|
|
||||||
if urlMap[url] {
|
|
||||||
return fmt.Errorf("duplicate url %v in cluster config", url)
|
|
||||||
}
|
|
||||||
urlMap[url] = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) {
|
func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) {
|
||||||
members := make(map[types.ID]*Member)
|
members := make(map[types.ID]*Member)
|
||||||
removed := make(map[types.ID]bool)
|
removed := make(map[types.ID]bool)
|
||||||
|
|
|
@ -21,110 +21,12 @@ import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
|
|
||||||
"github.com/coreos/etcd/pkg/testutil"
|
"github.com/coreos/etcd/pkg/testutil"
|
||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
"github.com/coreos/etcd/store"
|
"github.com/coreos/etcd/store"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestClusterFromString(t *testing.T) {
|
|
||||||
tests := []struct {
|
|
||||||
f string
|
|
||||||
mems []*Member
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
"mem1=http://10.0.0.1:2379,mem1=http://128.193.4.20:2379,mem2=http://10.0.0.2:2379,default=http://127.0.0.1:2379",
|
|
||||||
[]*Member{
|
|
||||||
newTestMember(3141198903430435750, []string{"http://10.0.0.2:2379"}, "mem2", nil),
|
|
||||||
newTestMember(4322322643958477905, []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}, "mem1", nil),
|
|
||||||
newTestMember(12762790032478827328, []string{"http://127.0.0.1:2379"}, "default", nil),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
for i, tt := range tests {
|
|
||||||
c, err := NewClusterFromString("abc", tt.f)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("#%d: unexpected new error: %v", i, err)
|
|
||||||
}
|
|
||||||
if c.token != "abc" {
|
|
||||||
t.Errorf("#%d: token = %v, want abc", i, c.token)
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(c.Members(), tt.mems) {
|
|
||||||
t.Errorf("#%d: members = %+v, want %+v", i, c.Members(), tt.mems)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestClusterFromStringBad(t *testing.T) {
|
|
||||||
tests := []string{
|
|
||||||
// invalid URL
|
|
||||||
"%^",
|
|
||||||
// no URL defined for member
|
|
||||||
"mem1=,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379",
|
|
||||||
"mem1,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379",
|
|
||||||
// bad URL for member
|
|
||||||
"default=http://localhost/",
|
|
||||||
// TODO(philips): anyone know of a 64 bit sha1 hash collision
|
|
||||||
// "06b2f82fd81b2c20=http://128.193.4.20:2379,02c60cb75083ceef=http://128.193.4.20:2379",
|
|
||||||
// the same url for two members
|
|
||||||
"mem1=http://128.193.4.20:2379,mem2=http://128.193.4.20:2379",
|
|
||||||
}
|
|
||||||
for i, tt := range tests {
|
|
||||||
if _, err := NewClusterFromString("abc", tt); err == nil {
|
|
||||||
t.Errorf("#%d: unexpected successful new, want err", i)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestClusterFromStore(t *testing.T) {
|
|
||||||
tests := []struct {
|
|
||||||
mems []*Member
|
|
||||||
ver *semver.Version
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
[]*Member{newTestMember(1, nil, "", nil)},
|
|
||||||
semver.Must(semver.NewVersion("2.0.0")),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
nil,
|
|
||||||
nil,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
[]*Member{
|
|
||||||
newTestMember(1, nil, "", nil),
|
|
||||||
newTestMember(2, nil, "", nil),
|
|
||||||
},
|
|
||||||
semver.Must(semver.NewVersion("2.0.0")),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
for i, tt := range tests {
|
|
||||||
st := store.New()
|
|
||||||
hc := newTestCluster(nil)
|
|
||||||
hc.SetStore(st)
|
|
||||||
for _, m := range tt.mems {
|
|
||||||
hc.AddMember(m)
|
|
||||||
}
|
|
||||||
if tt.ver != nil {
|
|
||||||
_, err := st.Set(path.Join(StoreClusterPrefix, "version"), false, tt.ver.String(), store.Permanent)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
c := NewClusterFromStore("abc", st)
|
|
||||||
if c.token != "abc" {
|
|
||||||
t.Errorf("#%d: token = %v, want %v", i, c.token, "abc")
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(c.Members(), tt.mems) {
|
|
||||||
t.Errorf("#%d: members = %v, want %v", i, c.Members(), tt.mems)
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(c.Version(), tt.ver) {
|
|
||||||
t.Errorf("#%d: ver = %v, want %v", i, c.Version(), tt.ver)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestClusterMember(t *testing.T) {
|
func TestClusterMember(t *testing.T) {
|
||||||
membs := []*Member{
|
membs := []*Member{
|
||||||
newTestMember(1, nil, "node1", nil),
|
newTestMember(1, nil, "node1", nil),
|
||||||
|
@ -589,49 +491,6 @@ func TestClusterMembers(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestClusterString(t *testing.T) {
|
|
||||||
cls := &Cluster{
|
|
||||||
members: map[types.ID]*Member{
|
|
||||||
1: newTestMember(
|
|
||||||
1,
|
|
||||||
[]string{"http://1.1.1.1:1111", "http://0.0.0.0:0000"},
|
|
||||||
"abc",
|
|
||||||
nil,
|
|
||||||
),
|
|
||||||
2: newTestMember(
|
|
||||||
2,
|
|
||||||
[]string{"http://2.2.2.2:2222"},
|
|
||||||
"def",
|
|
||||||
nil,
|
|
||||||
),
|
|
||||||
3: newTestMember(
|
|
||||||
3,
|
|
||||||
[]string{"http://3.3.3.3:1234", "http://127.0.0.1:2380"},
|
|
||||||
"ghi",
|
|
||||||
nil,
|
|
||||||
),
|
|
||||||
// no PeerURLs = not included
|
|
||||||
4: newTestMember(
|
|
||||||
4,
|
|
||||||
[]string{},
|
|
||||||
"four",
|
|
||||||
nil,
|
|
||||||
),
|
|
||||||
5: newTestMember(
|
|
||||||
5,
|
|
||||||
nil,
|
|
||||||
"five",
|
|
||||||
nil,
|
|
||||||
),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
w := "abc=http://0.0.0.0:0000,abc=http://1.1.1.1:1111,def=http://2.2.2.2:2222,ghi=http://127.0.0.1:2380,ghi=http://3.3.3.3:1234"
|
|
||||||
if g := cls.String(); g != w {
|
|
||||||
t.Fatalf("Cluster.String():\ngot %#v\nwant %#v", g, w)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestClusterRemoveMember(t *testing.T) {
|
func TestClusterRemoveMember(t *testing.T) {
|
||||||
st := &storeRecorder{}
|
st := &storeRecorder{}
|
||||||
c := newTestCluster(nil)
|
c := newTestCluster(nil)
|
||||||
|
|
|
@ -23,24 +23,24 @@ import (
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/netutil"
|
"github.com/coreos/etcd/pkg/netutil"
|
||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
"github.com/coreos/etcd/raft"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ServerConfig holds the configuration of etcd as taken from the command line or discovery.
|
// ServerConfig holds the configuration of etcd as taken from the command line or discovery.
|
||||||
type ServerConfig struct {
|
type ServerConfig struct {
|
||||||
Name string
|
Name string
|
||||||
DiscoveryURL string
|
DiscoveryURL string
|
||||||
DiscoveryProxy string
|
DiscoveryProxy string
|
||||||
ClientURLs types.URLs
|
ClientURLs types.URLs
|
||||||
PeerURLs types.URLs
|
PeerURLs types.URLs
|
||||||
DataDir string
|
DataDir string
|
||||||
SnapCount uint64
|
SnapCount uint64
|
||||||
MaxSnapFiles uint
|
MaxSnapFiles uint
|
||||||
MaxWALFiles uint
|
MaxWALFiles uint
|
||||||
Cluster *Cluster
|
InitialPeerURLsMap types.URLsMap
|
||||||
NewCluster bool
|
InitialClusterToken string
|
||||||
ForceNewCluster bool
|
NewCluster bool
|
||||||
Transport *http.Transport
|
ForceNewCluster bool
|
||||||
|
Transport *http.Transport
|
||||||
|
|
||||||
TickMs uint
|
TickMs uint
|
||||||
ElectionTicks int
|
ElectionTicks int
|
||||||
|
@ -52,10 +52,10 @@ func (c *ServerConfig) VerifyBootstrap() error {
|
||||||
if err := c.verifyLocalMember(true); err != nil {
|
if err := c.verifyLocalMember(true); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := c.Cluster.Validate(); err != nil {
|
if checkDuplicateURL(c.InitialPeerURLsMap) {
|
||||||
return err
|
return fmt.Errorf("initial cluster %s has duplicate url", c.InitialPeerURLsMap)
|
||||||
}
|
}
|
||||||
if c.Cluster.String() == "" && c.DiscoveryURL == "" {
|
if c.InitialPeerURLsMap.String() == "" && c.DiscoveryURL == "" {
|
||||||
return fmt.Errorf("initial cluster unset and no discovery URL found")
|
return fmt.Errorf("initial cluster unset and no discovery URL found")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -70,8 +70,8 @@ func (c *ServerConfig) VerifyJoinExisting() error {
|
||||||
if err := c.verifyLocalMember(false); err != nil {
|
if err := c.verifyLocalMember(false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := c.Cluster.Validate(); err != nil {
|
if checkDuplicateURL(c.InitialPeerURLsMap) {
|
||||||
return err
|
return fmt.Errorf("initial cluster %s has duplicate url", c.InitialPeerURLsMap)
|
||||||
}
|
}
|
||||||
if c.DiscoveryURL != "" {
|
if c.DiscoveryURL != "" {
|
||||||
return fmt.Errorf("discovery URL should not be set when joining existing initial cluster")
|
return fmt.Errorf("discovery URL should not be set when joining existing initial cluster")
|
||||||
|
@ -83,21 +83,19 @@ func (c *ServerConfig) VerifyJoinExisting() error {
|
||||||
// cluster. If strict is set, it also verifies the configured member
|
// cluster. If strict is set, it also verifies the configured member
|
||||||
// has the same peer urls as configured advertised peer urls.
|
// has the same peer urls as configured advertised peer urls.
|
||||||
func (c *ServerConfig) verifyLocalMember(strict bool) error {
|
func (c *ServerConfig) verifyLocalMember(strict bool) error {
|
||||||
m := c.Cluster.MemberByName(c.Name)
|
urls := c.InitialPeerURLsMap[c.Name]
|
||||||
// Make sure the cluster at least contains the local server.
|
// Make sure the cluster at least contains the local server.
|
||||||
if m == nil {
|
if urls == nil {
|
||||||
return fmt.Errorf("couldn't find local name %q in the initial cluster configuration", c.Name)
|
return fmt.Errorf("couldn't find local name %q in the initial cluster configuration", c.Name)
|
||||||
}
|
}
|
||||||
if uint64(m.ID) == raft.None {
|
|
||||||
return fmt.Errorf("cannot use %x as member id", raft.None)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Advertised peer URLs must match those in the cluster peer list
|
// Advertised peer URLs must match those in the cluster peer list
|
||||||
// TODO: Remove URLStringsEqual after improvement of using hostnames #2150 #2123
|
// TODO: Remove URLStringsEqual after improvement of using hostnames #2150 #2123
|
||||||
apurls := c.PeerURLs.StringSlice()
|
apurls := c.PeerURLs.StringSlice()
|
||||||
sort.Strings(apurls)
|
sort.Strings(apurls)
|
||||||
|
urls.Sort()
|
||||||
if strict {
|
if strict {
|
||||||
if !netutil.URLStringsEqual(apurls, m.PeerURLs) {
|
if !netutil.URLStringsEqual(apurls, urls.StringSlice()) {
|
||||||
return fmt.Errorf("%s has different advertised URLs in the cluster and advertised peer URLs list", c.Name)
|
return fmt.Errorf("%s has different advertised URLs in the cluster and advertised peer URLs list", c.Name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -135,6 +133,20 @@ func (c *ServerConfig) print(initial bool) {
|
||||||
log.Printf("etcdserver: advertise client URLs = %s", c.ClientURLs)
|
log.Printf("etcdserver: advertise client URLs = %s", c.ClientURLs)
|
||||||
if initial {
|
if initial {
|
||||||
log.Printf("etcdserver: initial advertise peer URLs = %s", c.PeerURLs)
|
log.Printf("etcdserver: initial advertise peer URLs = %s", c.PeerURLs)
|
||||||
log.Printf("etcdserver: initial cluster = %s", c.Cluster)
|
log.Printf("etcdserver: initial cluster = %s", c.InitialPeerURLsMap)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func checkDuplicateURL(urlsmap types.URLsMap) bool {
|
||||||
|
um := make(map[string]bool)
|
||||||
|
for _, urls := range urlsmap {
|
||||||
|
for _, url := range urls {
|
||||||
|
u := url.String()
|
||||||
|
if um[u] {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
um[u] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
|
@ -33,14 +33,10 @@ func mustNewURLs(t *testing.T, urls []string) []url.URL {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConfigVerifyBootstrapWithoutClusterAndDiscoveryURLFail(t *testing.T) {
|
func TestConfigVerifyBootstrapWithoutClusterAndDiscoveryURLFail(t *testing.T) {
|
||||||
cluster, err := NewClusterFromString("", "")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("NewClusterFromString error: %v", err)
|
|
||||||
}
|
|
||||||
c := &ServerConfig{
|
c := &ServerConfig{
|
||||||
Name: "node1",
|
Name: "node1",
|
||||||
DiscoveryURL: "",
|
DiscoveryURL: "",
|
||||||
Cluster: cluster,
|
InitialPeerURLsMap: types.URLsMap{},
|
||||||
}
|
}
|
||||||
if err := c.VerifyBootstrap(); err == nil {
|
if err := c.VerifyBootstrap(); err == nil {
|
||||||
t.Errorf("err = nil, want not nil")
|
t.Errorf("err = nil, want not nil")
|
||||||
|
@ -48,16 +44,16 @@ func TestConfigVerifyBootstrapWithoutClusterAndDiscoveryURLFail(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConfigVerifyExistingWithDiscoveryURLFail(t *testing.T) {
|
func TestConfigVerifyExistingWithDiscoveryURLFail(t *testing.T) {
|
||||||
cluster, err := NewClusterFromString("", "node1=http://127.0.0.1:2380")
|
cluster, err := types.NewURLsMap("node1=http://127.0.0.1:2380")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("NewClusterFromString error: %v", err)
|
t.Fatalf("NewCluster error: %v", err)
|
||||||
}
|
}
|
||||||
c := &ServerConfig{
|
c := &ServerConfig{
|
||||||
Name: "node1",
|
Name: "node1",
|
||||||
DiscoveryURL: "http://127.0.0.1:2379/abcdefg",
|
DiscoveryURL: "http://127.0.0.1:2379/abcdefg",
|
||||||
PeerURLs: mustNewURLs(t, []string{"http://127.0.0.1:2380"}),
|
PeerURLs: mustNewURLs(t, []string{"http://127.0.0.1:2380"}),
|
||||||
Cluster: cluster,
|
InitialPeerURLsMap: cluster,
|
||||||
NewCluster: false,
|
NewCluster: false,
|
||||||
}
|
}
|
||||||
if err := c.VerifyJoinExisting(); err == nil {
|
if err := c.VerifyJoinExisting(); err == nil {
|
||||||
t.Errorf("err = nil, want not nil")
|
t.Errorf("err = nil, want not nil")
|
||||||
|
@ -130,20 +126,19 @@ func TestConfigVerifyLocalMember(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
cluster, err := NewClusterFromString("", tt.clusterSetting)
|
cluster, err := types.NewURLsMap(tt.clusterSetting)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("#%d: Got unexpected error: %v", i, err)
|
t.Fatalf("#%d: Got unexpected error: %v", i, err)
|
||||||
}
|
}
|
||||||
cfg := ServerConfig{
|
cfg := ServerConfig{
|
||||||
Name: "node1",
|
Name: "node1",
|
||||||
Cluster: cluster,
|
InitialPeerURLsMap: cluster,
|
||||||
}
|
}
|
||||||
if tt.apurls != nil {
|
if tt.apurls != nil {
|
||||||
cfg.PeerURLs = mustNewURLs(t, tt.apurls)
|
cfg.PeerURLs = mustNewURLs(t, tt.apurls)
|
||||||
}
|
}
|
||||||
err = cfg.verifyLocalMember(tt.strict)
|
err = cfg.verifyLocalMember(tt.strict)
|
||||||
if (err == nil) && tt.shouldError {
|
if (err == nil) && tt.shouldError {
|
||||||
t.Errorf("%#v", *cluster)
|
|
||||||
t.Errorf("#%d: Got no error where one was expected", i)
|
t.Errorf("#%d: Got no error where one was expected", i)
|
||||||
}
|
}
|
||||||
if (err != nil) && !tt.shouldError {
|
if (err != nil) && !tt.shouldError {
|
||||||
|
|
|
@ -192,13 +192,13 @@ func (r *raftNode) resumeSending() {
|
||||||
p.Resume()
|
p.Resume()
|
||||||
}
|
}
|
||||||
|
|
||||||
func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
|
func startNode(cfg *ServerConfig, cl *Cluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
|
||||||
var err error
|
var err error
|
||||||
member := cfg.Cluster.MemberByName(cfg.Name)
|
member := cl.MemberByName(cfg.Name)
|
||||||
metadata := pbutil.MustMarshal(
|
metadata := pbutil.MustMarshal(
|
||||||
&pb.Metadata{
|
&pb.Metadata{
|
||||||
NodeID: uint64(member.ID),
|
NodeID: uint64(member.ID),
|
||||||
ClusterID: uint64(cfg.Cluster.ID()),
|
ClusterID: uint64(cl.ID()),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
|
if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
|
||||||
|
@ -209,14 +209,14 @@ func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *
|
||||||
}
|
}
|
||||||
peers := make([]raft.Peer, len(ids))
|
peers := make([]raft.Peer, len(ids))
|
||||||
for i, id := range ids {
|
for i, id := range ids {
|
||||||
ctx, err := json.Marshal((*cfg.Cluster).Member(id))
|
ctx, err := json.Marshal((*cl).Member(id))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("marshal member should never fail: %v", err)
|
log.Panicf("marshal member should never fail: %v", err)
|
||||||
}
|
}
|
||||||
peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
|
peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
|
||||||
}
|
}
|
||||||
id = member.ID
|
id = member.ID
|
||||||
log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID())
|
log.Printf("etcdserver: start member %s in cluster %s", id, cl.ID())
|
||||||
s = raft.NewMemoryStorage()
|
s = raft.NewMemoryStorage()
|
||||||
c := &raft.Config{
|
c := &raft.Config{
|
||||||
ID: uint64(id),
|
ID: uint64(id),
|
||||||
|
@ -231,15 +231,16 @@ func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
|
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *Cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
|
||||||
var walsnap walpb.Snapshot
|
var walsnap walpb.Snapshot
|
||||||
if snapshot != nil {
|
if snapshot != nil {
|
||||||
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||||
}
|
}
|
||||||
w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
|
w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
|
||||||
cfg.Cluster.SetID(cid)
|
|
||||||
|
|
||||||
log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
|
log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cid, st.Commit)
|
||||||
|
cl := newCluster("")
|
||||||
|
cl.SetID(cid)
|
||||||
s := raft.NewMemoryStorage()
|
s := raft.NewMemoryStorage()
|
||||||
if snapshot != nil {
|
if snapshot != nil {
|
||||||
s.ApplySnapshot(*snapshot)
|
s.ApplySnapshot(*snapshot)
|
||||||
|
@ -256,16 +257,15 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.N
|
||||||
}
|
}
|
||||||
n := raft.RestartNode(c)
|
n := raft.RestartNode(c)
|
||||||
raftStatus = n.Status
|
raftStatus = n.Status
|
||||||
return id, n, s, w
|
return id, cl, n, s, w
|
||||||
}
|
}
|
||||||
|
|
||||||
func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
|
func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *Cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
|
||||||
var walsnap walpb.Snapshot
|
var walsnap walpb.Snapshot
|
||||||
if snapshot != nil {
|
if snapshot != nil {
|
||||||
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||||
}
|
}
|
||||||
w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
|
w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
|
||||||
cfg.Cluster.SetID(cid)
|
|
||||||
|
|
||||||
// discard the previously uncommitted entries
|
// discard the previously uncommitted entries
|
||||||
for i, ent := range ents {
|
for i, ent := range ents {
|
||||||
|
@ -289,7 +289,9 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type
|
||||||
st.Commit = ents[len(ents)-1].Index
|
st.Commit = ents[len(ents)-1].Index
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("etcdserver: forcing restart of member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
|
log.Printf("etcdserver: forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
|
||||||
|
cl := newCluster("")
|
||||||
|
cl.SetID(cid)
|
||||||
s := raft.NewMemoryStorage()
|
s := raft.NewMemoryStorage()
|
||||||
if snapshot != nil {
|
if snapshot != nil {
|
||||||
s.ApplySnapshot(*snapshot)
|
s.ApplySnapshot(*snapshot)
|
||||||
|
@ -306,7 +308,7 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type
|
||||||
}
|
}
|
||||||
n := raft.RestartNode(c)
|
n := raft.RestartNode(c)
|
||||||
raftStatus = n.Status
|
raftStatus = n.Status
|
||||||
return id, n, s, w
|
return id, cl, n, s, w
|
||||||
}
|
}
|
||||||
|
|
||||||
// getIDs returns an ordered set of IDs included in the given snapshot and
|
// getIDs returns an ordered set of IDs included in the given snapshot and
|
||||||
|
|
|
@ -178,6 +178,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||||
var n raft.Node
|
var n raft.Node
|
||||||
var s *raft.MemoryStorage
|
var s *raft.MemoryStorage
|
||||||
var id types.ID
|
var id types.ID
|
||||||
|
var cl *Cluster
|
||||||
|
|
||||||
// Run the migrations.
|
// Run the migrations.
|
||||||
dataVer, err := version.DetectDataDir(cfg.DataDir)
|
dataVer, err := version.DetectDataDir(cfg.DataDir)
|
||||||
|
@ -197,41 +198,53 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||||
if err := cfg.VerifyJoinExisting(); err != nil {
|
if err := cfg.VerifyJoinExisting(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cfg.Cluster, cfg.Name), cfg.Transport)
|
cl, err = NewCluster(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), cfg.Transport)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err)
|
return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err)
|
||||||
}
|
}
|
||||||
if err := ValidateClusterAndAssignIDs(cfg.Cluster, existingCluster); err != nil {
|
if err := ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
|
||||||
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
|
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
|
||||||
}
|
}
|
||||||
remotes = existingCluster.Members()
|
remotes = existingCluster.Members()
|
||||||
cfg.Cluster.SetID(existingCluster.id)
|
cl.SetID(existingCluster.id)
|
||||||
cfg.Cluster.SetStore(st)
|
cl.SetStore(st)
|
||||||
cfg.Print()
|
cfg.Print()
|
||||||
id, n, s, w = startNode(cfg, nil)
|
id, n, s, w = startNode(cfg, cl, nil)
|
||||||
case !haveWAL && cfg.NewCluster:
|
case !haveWAL && cfg.NewCluster:
|
||||||
if err := cfg.VerifyBootstrap(); err != nil {
|
if err := cfg.VerifyBootstrap(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
m := cfg.Cluster.MemberByName(cfg.Name)
|
cl, err = NewCluster(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
|
||||||
if isMemberBootstrapped(cfg.Cluster, cfg.Name, cfg.Transport) {
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
m := cl.MemberByName(cfg.Name)
|
||||||
|
if isMemberBootstrapped(cl, cfg.Name, cfg.Transport) {
|
||||||
return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
|
return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
|
||||||
}
|
}
|
||||||
if cfg.ShouldDiscover() {
|
if cfg.ShouldDiscover() {
|
||||||
str, err := discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.Cluster.String())
|
str, err := discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if cfg.Cluster, err = NewClusterFromString(cfg.Cluster.token, str); err != nil {
|
urlsmap, err := types.NewURLsMap(str)
|
||||||
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := cfg.Cluster.Validate(); err != nil {
|
if checkDuplicateURL(urlsmap) {
|
||||||
return nil, fmt.Errorf("bad discovery cluster: %v", err)
|
return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
|
||||||
|
}
|
||||||
|
if cl, err = NewCluster(cfg.InitialClusterToken, urlsmap); err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cfg.Cluster.SetStore(st)
|
cl.SetStore(st)
|
||||||
cfg.PrintWithInitial()
|
cfg.PrintWithInitial()
|
||||||
id, n, s, w = startNode(cfg, cfg.Cluster.MemberIDs())
|
id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
|
||||||
case haveWAL:
|
case haveWAL:
|
||||||
if err := fileutil.IsDirWriteable(cfg.DataDir); err != nil {
|
if err := fileutil.IsDirWriteable(cfg.DataDir); err != nil {
|
||||||
return nil, fmt.Errorf("cannot write to data directory: %v", err)
|
return nil, fmt.Errorf("cannot write to data directory: %v", err)
|
||||||
|
@ -254,16 +267,17 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||||
}
|
}
|
||||||
log.Printf("etcdserver: recovered store from snapshot at index %d", snapshot.Metadata.Index)
|
log.Printf("etcdserver: recovered store from snapshot at index %d", snapshot.Metadata.Index)
|
||||||
}
|
}
|
||||||
cfg.Cluster = NewClusterFromStore(cfg.Cluster.token, st)
|
|
||||||
cfg.Print()
|
cfg.Print()
|
||||||
if snapshot != nil {
|
if snapshot != nil {
|
||||||
log.Printf("etcdserver: loaded cluster information from store: %s", cfg.Cluster)
|
log.Printf("etcdserver: loaded cluster information from store: %s", cl)
|
||||||
}
|
}
|
||||||
if !cfg.ForceNewCluster {
|
if !cfg.ForceNewCluster {
|
||||||
id, n, s, w = restartNode(cfg, snapshot)
|
id, cl, n, s, w = restartNode(cfg, snapshot)
|
||||||
} else {
|
} else {
|
||||||
id, n, s, w = restartAsStandaloneNode(cfg, snapshot)
|
id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
|
||||||
}
|
}
|
||||||
|
cl.SetStore(st)
|
||||||
|
cl.Recover()
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unsupported bootstrap config")
|
return nil, fmt.Errorf("unsupported bootstrap config")
|
||||||
}
|
}
|
||||||
|
@ -288,7 +302,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||||
},
|
},
|
||||||
id: id,
|
id: id,
|
||||||
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
|
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
|
||||||
Cluster: cfg.Cluster,
|
Cluster: cl,
|
||||||
stats: sstats,
|
stats: sstats,
|
||||||
lstats: lstats,
|
lstats: lstats,
|
||||||
SyncTicker: time.Tick(500 * time.Millisecond),
|
SyncTicker: time.Tick(500 * time.Millisecond),
|
||||||
|
@ -297,14 +311,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: move transport initialization near the definition of remote
|
// TODO: move transport initialization near the definition of remote
|
||||||
tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats)
|
tr := rafthttp.NewTransporter(cfg.Transport, id, cl.ID(), srv, srv.errorc, sstats, lstats)
|
||||||
// add all remotes into transport
|
// add all remotes into transport
|
||||||
for _, m := range remotes {
|
for _, m := range remotes {
|
||||||
if m.ID != id {
|
if m.ID != id {
|
||||||
tr.AddRemote(m.ID, m.PeerURLs)
|
tr.AddRemote(m.ID, m.PeerURLs)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, m := range cfg.Cluster.Members() {
|
for _, m := range cl.Members() {
|
||||||
if m.ID != id {
|
if m.ID != id {
|
||||||
tr.AddPeer(m.ID, m.PeerURLs)
|
tr.AddPeer(m.ID, m.PeerURLs)
|
||||||
}
|
}
|
||||||
|
|
|
@ -301,7 +301,7 @@ type cluster struct {
|
||||||
Members []*member
|
Members []*member
|
||||||
}
|
}
|
||||||
|
|
||||||
func fillClusterForMembers(ms []*member, cName string) error {
|
func fillClusterForMembers(ms []*member) error {
|
||||||
addrs := make([]string, 0)
|
addrs := make([]string, 0)
|
||||||
for _, m := range ms {
|
for _, m := range ms {
|
||||||
scheme := "http"
|
scheme := "http"
|
||||||
|
@ -315,7 +315,7 @@ func fillClusterForMembers(ms []*member, cName string) error {
|
||||||
clusterStr := strings.Join(addrs, ",")
|
clusterStr := strings.Join(addrs, ",")
|
||||||
var err error
|
var err error
|
||||||
for _, m := range ms {
|
for _, m := range ms {
|
||||||
m.Cluster, err = etcdserver.NewClusterFromString(cName, clusterStr)
|
m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -330,7 +330,7 @@ func newCluster(t *testing.T, size int, usePeerTLS bool) *cluster {
|
||||||
ms[i] = mustNewMember(t, c.name(i), usePeerTLS)
|
ms[i] = mustNewMember(t, c.name(i), usePeerTLS)
|
||||||
}
|
}
|
||||||
c.Members = ms
|
c.Members = ms
|
||||||
if err := fillClusterForMembers(c.Members, clusterName); err != nil {
|
if err := fillClusterForMembers(c.Members); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -420,7 +420,6 @@ func (c *cluster) HTTPMembers() []client.Member {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cluster) addMember(t *testing.T, usePeerTLS bool) {
|
func (c *cluster) addMember(t *testing.T, usePeerTLS bool) {
|
||||||
clusterStr := c.Members[0].Cluster.String()
|
|
||||||
m := mustNewMember(t, c.name(rand.Int()), usePeerTLS)
|
m := mustNewMember(t, c.name(rand.Int()), usePeerTLS)
|
||||||
scheme := "http"
|
scheme := "http"
|
||||||
if usePeerTLS {
|
if usePeerTLS {
|
||||||
|
@ -441,14 +440,11 @@ func (c *cluster) addMember(t *testing.T, usePeerTLS bool) {
|
||||||
members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
|
members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
|
||||||
c.waitMembersMatch(t, members)
|
c.waitMembersMatch(t, members)
|
||||||
|
|
||||||
for _, ln := range m.PeerListeners {
|
m.InitialPeerURLsMap = types.URLsMap{}
|
||||||
clusterStr += fmt.Sprintf(",%s=%s://%s", m.Name, scheme, ln.Addr().String())
|
for _, mm := range c.Members {
|
||||||
}
|
m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
|
||||||
var err error
|
|
||||||
m.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
}
|
||||||
|
m.InitialPeerURLsMap[m.Name] = m.PeerURLs
|
||||||
m.NewCluster = false
|
m.NewCluster = false
|
||||||
if err := m.Launch(); err != nil {
|
if err := m.Launch(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -645,10 +641,11 @@ func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
clusterStr := fmt.Sprintf("%s=%s://%s", name, peerScheme, pln.Addr().String())
|
clusterStr := fmt.Sprintf("%s=%s://%s", name, peerScheme, pln.Addr().String())
|
||||||
m.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr)
|
m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
m.InitialClusterToken = clusterName
|
||||||
m.NewCluster = true
|
m.NewCluster = true
|
||||||
m.Transport = mustNewTransport(t, m.PeerTLSInfo)
|
m.Transport = mustNewTransport(t, m.PeerTLSInfo)
|
||||||
m.ElectionTicks = electionTicks
|
m.ElectionTicks = electionTicks
|
||||||
|
@ -675,12 +672,13 @@ func (m *member) Clone(t *testing.T) *member {
|
||||||
// this should never fail
|
// this should never fail
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
clusterStr := m.Cluster.String()
|
clusterStr := m.InitialPeerURLsMap.String()
|
||||||
mm.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr)
|
mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// this should never fail
|
// this should never fail
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
mm.InitialClusterToken = m.InitialClusterToken
|
||||||
mm.Transport = mustNewTransport(t, m.PeerTLSInfo)
|
mm.Transport = mustNewTransport(t, m.PeerTLSInfo)
|
||||||
mm.ElectionTicks = m.ElectionTicks
|
mm.ElectionTicks = m.ElectionTicks
|
||||||
mm.PeerTLSInfo = m.PeerTLSInfo
|
mm.PeerTLSInfo = m.PeerTLSInfo
|
||||||
|
|
|
@ -0,0 +1,71 @@
|
||||||
|
// Copyright 2015 CoreOS, Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package types
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/url"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type URLsMap map[string]URLs
|
||||||
|
|
||||||
|
// NewURLsMap returns a URLsMap instantiated from the given string,
|
||||||
|
// which consists of discovery-formatted names-to-URLs, like:
|
||||||
|
// mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach1=http://3.3.3.3,mach2=http://4.4.4.4
|
||||||
|
func NewURLsMap(s string) (URLsMap, error) {
|
||||||
|
cl := URLsMap{}
|
||||||
|
v, err := url.ParseQuery(strings.Replace(s, ",", "&", -1))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for name, urls := range v {
|
||||||
|
if len(urls) == 0 || urls[0] == "" {
|
||||||
|
return nil, fmt.Errorf("empty URL given for %q", name)
|
||||||
|
}
|
||||||
|
us, err := NewURLs(urls)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
cl[name] = us
|
||||||
|
}
|
||||||
|
return cl, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// String returns NameURLPairs into discovery-formatted name-to-URLs sorted by name.
|
||||||
|
func (c URLsMap) String() string {
|
||||||
|
pairs := make([]string, 0)
|
||||||
|
for name, urls := range c {
|
||||||
|
for _, url := range urls {
|
||||||
|
pairs = append(pairs, fmt.Sprintf("%s=%s", name, url.String()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sort.Strings(pairs)
|
||||||
|
return strings.Join(pairs, ",")
|
||||||
|
}
|
||||||
|
|
||||||
|
// URLs returns a list of all URLs.
|
||||||
|
// The returned list is sorted in ascending lexicographical order.
|
||||||
|
func (c URLsMap) URLs() []string {
|
||||||
|
urls := make([]string, 0)
|
||||||
|
for _, us := range c {
|
||||||
|
for _, u := range us {
|
||||||
|
urls = append(urls, u.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sort.Strings(urls)
|
||||||
|
return urls
|
||||||
|
}
|
|
@ -0,0 +1,69 @@
|
||||||
|
// Copyright 2015 CoreOS, Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package types
|
||||||
|
|
||||||
|
import (
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/coreos/etcd/pkg/testutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestParseInitialCluster(t *testing.T) {
|
||||||
|
c, err := NewURLsMap("mem1=http://10.0.0.1:2379,mem1=http://128.193.4.20:2379,mem2=http://10.0.0.2:2379,default=http://127.0.0.1:2379")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected parse error: %v", err)
|
||||||
|
}
|
||||||
|
wc := URLsMap(map[string]URLs{
|
||||||
|
"mem1": testutil.MustNewURLs(t, []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}),
|
||||||
|
"mem2": testutil.MustNewURLs(t, []string{"http://10.0.0.2:2379"}),
|
||||||
|
"default": testutil.MustNewURLs(t, []string{"http://127.0.0.1:2379"}),
|
||||||
|
})
|
||||||
|
if !reflect.DeepEqual(c, wc) {
|
||||||
|
t.Errorf("cluster = %+v, want %+v", c, wc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParseInitialClusterBad(t *testing.T) {
|
||||||
|
tests := []string{
|
||||||
|
// invalid URL
|
||||||
|
"%^",
|
||||||
|
// no URL defined for member
|
||||||
|
"mem1=,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379",
|
||||||
|
"mem1,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379",
|
||||||
|
// bad URL for member
|
||||||
|
"default=http://localhost/",
|
||||||
|
}
|
||||||
|
for i, tt := range tests {
|
||||||
|
if _, err := NewURLsMap(tt); err == nil {
|
||||||
|
t.Errorf("#%d: unexpected successful parse, want err", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNameURLPairsString(t *testing.T) {
|
||||||
|
cls := URLsMap(map[string]URLs{
|
||||||
|
"abc": testutil.MustNewURLs(t, []string{"http://1.1.1.1:1111", "http://0.0.0.0:0000"}),
|
||||||
|
"def": testutil.MustNewURLs(t, []string{"http://2.2.2.2:2222"}),
|
||||||
|
"ghi": testutil.MustNewURLs(t, []string{"http://3.3.3.3:1234", "http://127.0.0.1:2380"}),
|
||||||
|
// no PeerURLs = not included
|
||||||
|
"four": testutil.MustNewURLs(t, []string{}),
|
||||||
|
"five": testutil.MustNewURLs(t, nil),
|
||||||
|
})
|
||||||
|
w := "abc=http://0.0.0.0:0000,abc=http://1.1.1.1:1111,def=http://2.2.2.2:2222,ghi=http://127.0.0.1:2380,ghi=http://3.3.3.3:1234"
|
||||||
|
if g := cls.String(); g != w {
|
||||||
|
t.Fatalf("NameURLPairs.String():\ngot %#v\nwant %#v", g, w)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue