etcdserver: validate new node is not registered before in best effort
parent
1e1535e6f9
commit
ac907d746b
|
@ -214,6 +214,9 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|||
if err := cfg.VerifyBootstrapConfig(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := checkClientURLsEmptyFromPeers(cfg.Cluster, cfg.Name); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m := cfg.Cluster.MemberByName(cfg.Name)
|
||||
if cfg.ShouldDiscover() {
|
||||
s, err := discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.Cluster.String())
|
||||
|
@ -732,31 +735,70 @@ func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
|
|||
}
|
||||
}
|
||||
|
||||
// checkClientURLsEmptyFromPeers does its best to get the cluster from peers,
|
||||
// and if this succeeds, checks that the member of the given id exists in the
|
||||
// cluster, and its ClientURLs is empty.
|
||||
func checkClientURLsEmptyFromPeers(cl *Cluster, name string) error {
|
||||
us := getOtherPeerURLs(cl, name)
|
||||
rcl, err := getClusterFromPeers(us, false)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
id := cl.MemberByName(name).ID
|
||||
m := rcl.Member(id)
|
||||
if m == nil {
|
||||
return nil
|
||||
}
|
||||
if len(m.ClientURLs) > 0 {
|
||||
return fmt.Errorf("etcdserver: member with id %s has started and registered its client urls", id)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetClusterFromPeers takes a set of URLs representing etcd peers, and
|
||||
// attempts to construct a Cluster by accessing the members endpoint on one of
|
||||
// these URLs. The first URL to provide a response is used. If no URLs provide
|
||||
// a response, or a Cluster cannot be successfully created from a received
|
||||
// response, an error is returned.
|
||||
func GetClusterFromPeers(urls []string) (*Cluster, error) {
|
||||
return getClusterFromPeers(urls, true)
|
||||
}
|
||||
|
||||
// If logerr is true, it prints out more error messages.
|
||||
func getClusterFromPeers(urls []string, logerr bool) (*Cluster, error) {
|
||||
cc := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
ResponseHeaderTimeout: 500 * time.Millisecond,
|
||||
},
|
||||
Timeout: time.Second,
|
||||
}
|
||||
for _, u := range urls {
|
||||
resp, err := http.Get(u + "/members")
|
||||
resp, err := cc.Get(u + "/members")
|
||||
if err != nil {
|
||||
log.Printf("etcdserver: could not get cluster response from %s: %v", u, err)
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not get cluster response from %s: %v", u, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Printf("etcdserver: could not read the body of cluster response: %v", err)
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not read the body of cluster response: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
var membs []*Member
|
||||
if err := json.Unmarshal(b, &membs); err != nil {
|
||||
log.Printf("etcdserver: could not unmarshal cluster response: %v", err)
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not unmarshal cluster response: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
id, err := types.IDFromString(resp.Header.Get("X-Etcd-Cluster-ID"))
|
||||
if err != nil {
|
||||
log.Printf("etcdserver: could not parse the cluster ID from cluster res: %v", err)
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not parse the cluster ID from cluster res: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
return NewClusterFromMembers("", id, membs), nil
|
||||
|
|
|
@ -99,6 +99,23 @@ func testDoubleClusterSize(t *testing.T, size int) {
|
|||
clusterMustProgress(t, c)
|
||||
}
|
||||
|
||||
func TestLaunchDuplicateMemberShouldFail(t *testing.T) {
|
||||
size := 3
|
||||
c := NewCluster(t, size)
|
||||
m := c.Members[0].Clone()
|
||||
var err error
|
||||
m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
c.Launch(t)
|
||||
defer c.Terminate(t)
|
||||
|
||||
if err := m.Launch(); err == nil {
|
||||
t.Errorf("unexpect successful launch")
|
||||
}
|
||||
}
|
||||
|
||||
// clusterMustProgress ensures that cluster can make progress. It creates
|
||||
// a key first, and check the new key could be got from all client urls of
|
||||
// the cluster.
|
||||
|
@ -339,6 +356,35 @@ func mustNewMember(t *testing.T, name string) *member {
|
|||
return m
|
||||
}
|
||||
|
||||
// Clone returns a member with the same server configuration. The returned
|
||||
// member will not set PeerListeners and ClientListeners.
|
||||
func (m *member) Clone() *member {
|
||||
mm := &member{}
|
||||
mm.ServerConfig = m.ServerConfig
|
||||
|
||||
var err error
|
||||
clientURLStrs := m.ClientURLs.StringSlice()
|
||||
mm.ClientURLs, err = types.NewURLs(clientURLStrs)
|
||||
if err != nil {
|
||||
// this should never fail
|
||||
panic(err)
|
||||
}
|
||||
peerURLStrs := m.PeerURLs.StringSlice()
|
||||
mm.PeerURLs, err = types.NewURLs(peerURLStrs)
|
||||
if err != nil {
|
||||
// this should never fail
|
||||
panic(err)
|
||||
}
|
||||
clusterStr := m.Cluster.String()
|
||||
mm.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr)
|
||||
if err != nil {
|
||||
// this should never fail
|
||||
panic(err)
|
||||
}
|
||||
mm.Transport = newTransport()
|
||||
return mm
|
||||
}
|
||||
|
||||
// Launch starts a member based on ServerConfig, PeerListeners
|
||||
// and ClientListeners.
|
||||
func (m *member) Launch() error {
|
||||
|
|
Loading…
Reference in New Issue