diff --git a/etcdserver/server.go b/etcdserver/server.go index 7fcc58dc1..cb4b93e6f 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -198,10 +198,10 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if err := cfg.VerifyBootstrapConfig(); err != nil { return nil, err } - if err := checkClientURLsEmptyFromPeers(cfg.Cluster, cfg.Name); err != nil { - return nil, err - } m := cfg.Cluster.MemberByName(cfg.Name) + if isBootstrapped(cfg.Cluster, cfg.Name) { + return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID) + } if cfg.ShouldDiscover() { s, err := discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.Cluster.String()) if err != nil { @@ -216,7 +216,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { id, n, s, w = startNode(cfg, cfg.Cluster.MemberIDs()) case haveWAL: if walVersion != wal.WALv0_5 { - if err := UpgradeWAL(cfg, walVersion); err != nil { + if err := upgradeWAL(cfg, walVersion); err != nil { return nil, err } } @@ -837,24 +837,68 @@ func (s *EtcdServer) ResumeSending() { hub.resume() } -// checkClientURLsEmptyFromPeers does its best to get the cluster from peers, -// and if this succeeds, checks that the member of the given id exists in the -// cluster, and its ClientURLs is empty. -func checkClientURLsEmptyFromPeers(cl *Cluster, name string) error { - us := getOtherPeerURLs(cl, name) +func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { + var err error + member := cfg.Cluster.MemberByName(cfg.Name) + metadata := pbutil.MustMarshal( + &pb.Metadata{ + NodeID: uint64(member.ID), + ClusterID: uint64(cfg.Cluster.ID()), + }, + ) + if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil { + log.Fatalf("etcdserver create snapshot directory error: %v", err) + } + if w, err = wal.Create(cfg.WALDir(), metadata); err != nil { + log.Fatalf("etcdserver: create wal error: %v", err) + } + peers := make([]raft.Peer, len(ids)) + for i, id := range ids { + ctx, err := json.Marshal((*cfg.Cluster).Member(id)) + if err != nil { + log.Panicf("marshal member should never fail: %v", err) + } + peers[i] = raft.Peer{ID: uint64(id), Context: ctx} + } + id = member.ID + log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID()) + s = raft.NewMemoryStorage() + n = raft.StartNode(uint64(id), peers, 10, 1, s) + return +} + +func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) { + w, id, cid, st, ents := readWAL(cfg.WALDir(), index) + cfg.Cluster.SetID(cid) + + log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit) + s := raft.NewMemoryStorage() + if snapshot != nil { + s.ApplySnapshot(*snapshot) + } + s.SetHardState(st) + s.Append(ents) + n := raft.RestartNode(uint64(id), 10, 1, s) + return id, n, s, w +} + +// isBootstrapped tries to check if the given member has been bootstrapped +// in the given cluster. +func isBootstrapped(cl *Cluster, member string) bool { + us := getOtherPeerURLs(cl, member) rcl, err := getClusterFromPeers(us, false) if err != nil { - return nil + return false } - id := cl.MemberByName(name).ID + id := cl.MemberByName(member).ID m := rcl.Member(id) if m == nil { - return nil + return false } if len(m.ClientURLs) > 0 { - return fmt.Errorf("etcdserver: member with id %s has started and registered its client urls", id) + return true } - return nil + return false } // GetClusterFromPeers takes a set of URLs representing etcd peers, and @@ -908,36 +952,6 @@ func getClusterFromPeers(urls []string, logerr bool) (*Cluster, error) { return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls") } -func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { - var err error - member := cfg.Cluster.MemberByName(cfg.Name) - metadata := pbutil.MustMarshal( - &pb.Metadata{ - NodeID: uint64(member.ID), - ClusterID: uint64(cfg.Cluster.ID()), - }, - ) - if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil { - log.Fatalf("etcdserver create snapshot directory error: %v", err) - } - if w, err = wal.Create(cfg.WALDir(), metadata); err != nil { - log.Fatalf("etcdserver: create wal error: %v", err) - } - peers := make([]raft.Peer, len(ids)) - for i, id := range ids { - ctx, err := json.Marshal((*cfg.Cluster).Member(id)) - if err != nil { - log.Panicf("marshal member should never fail: %v", err) - } - peers[i] = raft.Peer{ID: uint64(id), Context: ctx} - } - id = member.ID - log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID()) - s = raft.NewMemoryStorage() - n = raft.StartNode(uint64(id), peers, 10, 1, s) - return -} - func getOtherMembers(cl ClusterInfo, self string) []*Member { var ms []*Member for _, m := range cl.Members() { @@ -962,37 +976,6 @@ func getOtherPeerURLs(cl ClusterInfo, self string) []string { return us } -func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) { - w, id, cid, st, ents := readWAL(cfg.WALDir(), index) - cfg.Cluster.SetID(cid) - - log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit) - s := raft.NewMemoryStorage() - if snapshot != nil { - s.ApplySnapshot(*snapshot) - } - s.SetHardState(st) - s.Append(ents) - n := raft.RestartNode(uint64(id), 10, 1, s) - return id, n, s, w -} - -func readWAL(waldir string, index uint64) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) { - var err error - if w, err = wal.Open(waldir, index); err != nil { - log.Fatalf("etcdserver: open wal error: %v", err) - } - var wmetadata []byte - if wmetadata, st, ents, err = w.ReadAll(); err != nil { - log.Fatalf("etcdserver: read wal error: %v", err) - } - var metadata pb.Metadata - pbutil.MustUnmarshal(&metadata, wmetadata) - id = types.ID(metadata.NodeID) - cid = types.ID(metadata.ClusterID) - return -} - // TODO: move the function to /id pkg maybe? // GenID generates a random id that is not equal to 0. func GenID() (n uint64) { diff --git a/etcdserver/storage.go b/etcdserver/storage.go index 61f68fbd8..9f1f5cc6d 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -3,7 +3,10 @@ package etcdserver import ( "log" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/migrate" + "github.com/coreos/etcd/pkg/pbutil" + "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap" "github.com/coreos/etcd/wal" @@ -47,9 +50,25 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error { return nil } -// UpgradeWAL converts an older version of the etcdServer data to the newest version. +func readWAL(waldir string, index uint64) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) { + var err error + if w, err = wal.Open(waldir, index); err != nil { + log.Fatalf("etcdserver: open wal error: %v", err) + } + var wmetadata []byte + if wmetadata, st, ents, err = w.ReadAll(); err != nil { + log.Fatalf("etcdserver: read wal error: %v", err) + } + var metadata pb.Metadata + pbutil.MustUnmarshal(&metadata, wmetadata) + id = types.ID(metadata.NodeID) + cid = types.ID(metadata.ClusterID) + return +} + +// upgradeWAL converts an older version of the etcdServer data to the newest version. // It must ensure that, after upgrading, the most recent version is present. -func UpgradeWAL(cfg *ServerConfig, ver wal.WalVersion) error { +func upgradeWAL(cfg *ServerConfig, ver wal.WalVersion) error { if ver == wal.WALv0_4 { log.Print("etcdserver: converting v0.4 log to v2.0") err := migrate.Migrate4To2(cfg.DataDir, cfg.Name)