Merge pull request #1383 from unihorn/183

etcdserver: support newly-join member bootstrap
release-2.0
Yicheng Qin 2014-10-24 12:43:25 -07:00
commit d1b57b448d
5 changed files with 164 additions and 6 deletions

View File

@ -24,6 +24,7 @@ import (
"log"
"net/url"
"path"
"reflect"
"sort"
"strings"
@ -118,6 +119,15 @@ func NewClusterFromStore(name string, st store.Store) *Cluster {
return c
}
func NewClusterFromMembers(name string, id uint64, membs []*Member) *Cluster {
c := newCluster(name)
c.id = id
for _, m := range membs {
c.members[m.ID] = m
}
return c
}
func newCluster(name string) *Cluster {
return &Cluster{
name: name,
@ -214,6 +224,33 @@ func (c Cluster) String() string {
return strings.Join(sl, ",")
}
// ValidateAndAssignIDs validates the given members by matching their PeerURLs
// with the existing members in the cluster. If the validation succeeds, it
// assigns the IDs from the given members to the existing members in the
// cluster. If the validation fails, an error will be returned.
func (c *Cluster) ValidateAndAssignIDs(membs []*Member) error {
if len(c.members) != len(membs) {
return fmt.Errorf("cannot update %v from %v because the member count is unequal", c.members, membs)
}
omembs := make([]*Member, 0)
for _, m := range c.members {
omembs = append(omembs, m)
}
sort.Sort(SortableMemberSliceByPeerURLs(omembs))
sort.Sort(SortableMemberSliceByPeerURLs(membs))
for i := range omembs {
if !reflect.DeepEqual(omembs[i].PeerURLs, membs[i].PeerURLs) {
return fmt.Errorf("unmatched member while checking PeerURLs")
}
omembs[i].ID = membs[i].ID
}
c.members = make(map[uint64]*Member)
for _, m := range omembs {
c.members[m.ID] = m
}
return nil
}
func (c *Cluster) genID() {
mIDs := c.MemberIDs()
b := make([]byte, 8*len(mIDs))

View File

@ -21,12 +21,14 @@ import (
)
const (
ClusterStateValueNew = "new"
ClusterStateValueNew = "new"
ClusterStateValueExisting = "existing"
)
var (
ClusterStateValues = []string{
ClusterStateValueNew,
ClusterStateValueExisting,
}
)

View File

@ -282,6 +282,76 @@ func TestClusterClientURLs(t *testing.T) {
}
}
func TestClusterValidateAndAssignIDsBad(t *testing.T) {
tests := []struct {
clmembs []Member
membs []*Member
}{
{
// unmatched length
[]Member{
newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil),
},
[]*Member{},
},
{
// unmatched peer urls
[]Member{
newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil),
},
[]*Member{
newTestMemberp(1, []string{"http://127.0.0.1:4001"}, "", nil),
},
},
{
// unmatched peer urls
[]Member{
newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil),
newTestMember(2, []string{"http://127.0.0.2:2379"}, "", nil),
},
[]*Member{
newTestMemberp(1, []string{"http://127.0.0.1:2379"}, "", nil),
newTestMemberp(2, []string{"http://127.0.0.2:4001"}, "", nil),
},
},
}
for i, tt := range tests {
cl := newTestCluster(tt.clmembs)
if err := cl.ValidateAndAssignIDs(tt.membs); err == nil {
t.Errorf("#%d: unexpected update success", i)
}
}
}
func TestClusterValidateAndAssignIDs(t *testing.T) {
tests := []struct {
clmembs []Member
membs []*Member
wids []uint64
}{
{
[]Member{
newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil),
newTestMember(2, []string{"http://127.0.0.2:2379"}, "", nil),
},
[]*Member{
newTestMemberp(3, []string{"http://127.0.0.1:2379"}, "", nil),
newTestMemberp(4, []string{"http://127.0.0.2:2379"}, "", nil),
},
[]uint64{3, 4},
},
}
for i, tt := range tests {
cl := newTestCluster(tt.clmembs)
if err := cl.ValidateAndAssignIDs(tt.membs); err != nil {
t.Errorf("#%d: unexpect update error: %v", i, err)
}
if !reflect.DeepEqual(cl.MemberIDs(), tt.wids) {
t.Errorf("#%d: ids = %v, want %v", i, cl.MemberIDs(), tt.wids)
}
}
}
func TestClusterGenID(t *testing.T) {
cs := newTestCluster([]Member{
newTestMember(1, nil, "", nil),

View File

@ -96,3 +96,11 @@ func parseMemberID(key string) uint64 {
func removedMemberStoreKey(id uint64) string {
return path.Join(storeRemovedMembersPrefix, idAsHex(id))
}
type SortableMemberSliceByPeerURLs []*Member
func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) }
func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool {
return p[i].PeerURLs[0] < p[j].PeerURLs[0]
}
func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

View File

@ -19,8 +19,10 @@ package etcdserver
import (
"encoding/json"
"errors"
"io/ioutil"
"log"
"math/rand"
"net/http"
"os"
"path"
"strconv"
@ -176,7 +178,17 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
var w *wal.WAL
var n raft.Node
var id uint64
if !wal.Exist(cfg.WALDir()) {
haveWAL := wal.Exist(cfg.WALDir())
switch {
case !haveWAL && cfg.ClusterState == ClusterStateValueExisting:
cl := getClusterFromPeers(cfg.Cluster.PeerURLs())
if err := cfg.Cluster.ValidateAndAssignIDs(cl.Members()); err != nil {
log.Fatalf("etcdserver: %v", err)
}
cfg.Cluster.SetID(cl.id)
cfg.Cluster.SetStore(st)
id, n, w = startNode(cfg, nil)
case !haveWAL && cfg.ClusterState == ClusterStateValueNew:
if err := cfg.VerifyBootstrapConfig(); err != nil {
log.Fatalf("etcdserver: %v", err)
}
@ -195,8 +207,8 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
}
}
cfg.Cluster.SetStore(st)
id, n, w = startNode(cfg)
} else {
id, n, w = startNode(cfg, cfg.Cluster.MemberIDs())
case haveWAL:
if cfg.ShouldDiscover() {
log.Printf("etcdserver: warn: ignoring discovery: etcd has already been initialized and has a valid log in %q", cfg.WALDir())
}
@ -212,6 +224,8 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
}
cfg.Cluster = NewClusterFromStore(cfg.Cluster.name, st)
id, n, w = restartNode(cfg, index, snapshot)
default:
log.Fatalf("etcdserver: unsupported bootstrap config")
}
sstats := &stats.ServerStats{
@ -642,7 +656,35 @@ func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
s.storage.Cut()
}
func startNode(cfg *ServerConfig) (id uint64, n raft.Node, w *wal.WAL) {
func getClusterFromPeers(urls []string) *Cluster {
for _, u := range urls {
resp, err := http.Get(u + "/members")
if err != nil {
log.Printf("etcdserver: get /members on %s: %v", u, err)
continue
}
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("etcdserver: read body error: %v", err)
continue
}
var membs []*Member
if err := json.Unmarshal(b, &membs); err != nil {
log.Printf("etcdserver: unmarshal body error: %v", err)
continue
}
id, err := strconv.ParseUint(resp.Header.Get("X-Etcd-Cluster-ID"), 16, 64)
if err != nil {
log.Printf("etcdserver: parse uint error: %v", err)
continue
}
return NewClusterFromMembers("", id, membs)
}
log.Fatalf("etcdserver: could not retrieve cluster information from the given urls")
return nil
}
func startNode(cfg *ServerConfig, ids []uint64) (id uint64, n raft.Node, w *wal.WAL) {
var err error
// TODO: remove the discoveryURL when it becomes part of the source for
// generating nodeID.
@ -651,7 +693,6 @@ func startNode(cfg *ServerConfig) (id uint64, n raft.Node, w *wal.WAL) {
if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
log.Fatal(err)
}
ids := cfg.Cluster.MemberIDs()
peers := make([]raft.Peer, len(ids))
for i, id := range ids {
ctx, err := json.Marshal((*cfg.Cluster).Member(id))