etcdserver: cleanup server.go
parent
60d25635c4
commit
69444b6bba
|
@ -198,10 +198,10 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|||
if err := cfg.VerifyBootstrapConfig(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := checkClientURLsEmptyFromPeers(cfg.Cluster, cfg.Name); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m := cfg.Cluster.MemberByName(cfg.Name)
|
||||
if isBootstrapped(cfg.Cluster, cfg.Name) {
|
||||
return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
|
||||
}
|
||||
if cfg.ShouldDiscover() {
|
||||
s, err := discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.Cluster.String())
|
||||
if err != nil {
|
||||
|
@ -216,7 +216,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|||
id, n, s, w = startNode(cfg, cfg.Cluster.MemberIDs())
|
||||
case haveWAL:
|
||||
if walVersion != wal.WALv0_5 {
|
||||
if err := UpgradeWAL(cfg, walVersion); err != nil {
|
||||
if err := upgradeWAL(cfg, walVersion); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
@ -837,24 +837,68 @@ func (s *EtcdServer) ResumeSending() {
|
|||
hub.resume()
|
||||
}
|
||||
|
||||
// checkClientURLsEmptyFromPeers does its best to get the cluster from peers,
|
||||
// and if this succeeds, checks that the member of the given id exists in the
|
||||
// cluster, and its ClientURLs is empty.
|
||||
func checkClientURLsEmptyFromPeers(cl *Cluster, name string) error {
|
||||
us := getOtherPeerURLs(cl, name)
|
||||
func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
|
||||
var err error
|
||||
member := cfg.Cluster.MemberByName(cfg.Name)
|
||||
metadata := pbutil.MustMarshal(
|
||||
&pb.Metadata{
|
||||
NodeID: uint64(member.ID),
|
||||
ClusterID: uint64(cfg.Cluster.ID()),
|
||||
},
|
||||
)
|
||||
if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
|
||||
log.Fatalf("etcdserver create snapshot directory error: %v", err)
|
||||
}
|
||||
if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
|
||||
log.Fatalf("etcdserver: create wal error: %v", err)
|
||||
}
|
||||
peers := make([]raft.Peer, len(ids))
|
||||
for i, id := range ids {
|
||||
ctx, err := json.Marshal((*cfg.Cluster).Member(id))
|
||||
if err != nil {
|
||||
log.Panicf("marshal member should never fail: %v", err)
|
||||
}
|
||||
peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
|
||||
}
|
||||
id = member.ID
|
||||
log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID())
|
||||
s = raft.NewMemoryStorage()
|
||||
n = raft.StartNode(uint64(id), peers, 10, 1, s)
|
||||
return
|
||||
}
|
||||
|
||||
func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
|
||||
w, id, cid, st, ents := readWAL(cfg.WALDir(), index)
|
||||
cfg.Cluster.SetID(cid)
|
||||
|
||||
log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
|
||||
s := raft.NewMemoryStorage()
|
||||
if snapshot != nil {
|
||||
s.ApplySnapshot(*snapshot)
|
||||
}
|
||||
s.SetHardState(st)
|
||||
s.Append(ents)
|
||||
n := raft.RestartNode(uint64(id), 10, 1, s)
|
||||
return id, n, s, w
|
||||
}
|
||||
|
||||
// isBootstrapped tries to check if the given member has been bootstrapped
|
||||
// in the given cluster.
|
||||
func isBootstrapped(cl *Cluster, member string) bool {
|
||||
us := getOtherPeerURLs(cl, member)
|
||||
rcl, err := getClusterFromPeers(us, false)
|
||||
if err != nil {
|
||||
return nil
|
||||
return false
|
||||
}
|
||||
id := cl.MemberByName(name).ID
|
||||
id := cl.MemberByName(member).ID
|
||||
m := rcl.Member(id)
|
||||
if m == nil {
|
||||
return nil
|
||||
return false
|
||||
}
|
||||
if len(m.ClientURLs) > 0 {
|
||||
return fmt.Errorf("etcdserver: member with id %s has started and registered its client urls", id)
|
||||
return true
|
||||
}
|
||||
return nil
|
||||
return false
|
||||
}
|
||||
|
||||
// GetClusterFromPeers takes a set of URLs representing etcd peers, and
|
||||
|
@ -908,36 +952,6 @@ func getClusterFromPeers(urls []string, logerr bool) (*Cluster, error) {
|
|||
return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
|
||||
}
|
||||
|
||||
func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
|
||||
var err error
|
||||
member := cfg.Cluster.MemberByName(cfg.Name)
|
||||
metadata := pbutil.MustMarshal(
|
||||
&pb.Metadata{
|
||||
NodeID: uint64(member.ID),
|
||||
ClusterID: uint64(cfg.Cluster.ID()),
|
||||
},
|
||||
)
|
||||
if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
|
||||
log.Fatalf("etcdserver create snapshot directory error: %v", err)
|
||||
}
|
||||
if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
|
||||
log.Fatalf("etcdserver: create wal error: %v", err)
|
||||
}
|
||||
peers := make([]raft.Peer, len(ids))
|
||||
for i, id := range ids {
|
||||
ctx, err := json.Marshal((*cfg.Cluster).Member(id))
|
||||
if err != nil {
|
||||
log.Panicf("marshal member should never fail: %v", err)
|
||||
}
|
||||
peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
|
||||
}
|
||||
id = member.ID
|
||||
log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID())
|
||||
s = raft.NewMemoryStorage()
|
||||
n = raft.StartNode(uint64(id), peers, 10, 1, s)
|
||||
return
|
||||
}
|
||||
|
||||
func getOtherMembers(cl ClusterInfo, self string) []*Member {
|
||||
var ms []*Member
|
||||
for _, m := range cl.Members() {
|
||||
|
@ -962,37 +976,6 @@ func getOtherPeerURLs(cl ClusterInfo, self string) []string {
|
|||
return us
|
||||
}
|
||||
|
||||
func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
|
||||
w, id, cid, st, ents := readWAL(cfg.WALDir(), index)
|
||||
cfg.Cluster.SetID(cid)
|
||||
|
||||
log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
|
||||
s := raft.NewMemoryStorage()
|
||||
if snapshot != nil {
|
||||
s.ApplySnapshot(*snapshot)
|
||||
}
|
||||
s.SetHardState(st)
|
||||
s.Append(ents)
|
||||
n := raft.RestartNode(uint64(id), 10, 1, s)
|
||||
return id, n, s, w
|
||||
}
|
||||
|
||||
func readWAL(waldir string, index uint64) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
|
||||
var err error
|
||||
if w, err = wal.Open(waldir, index); err != nil {
|
||||
log.Fatalf("etcdserver: open wal error: %v", err)
|
||||
}
|
||||
var wmetadata []byte
|
||||
if wmetadata, st, ents, err = w.ReadAll(); err != nil {
|
||||
log.Fatalf("etcdserver: read wal error: %v", err)
|
||||
}
|
||||
var metadata pb.Metadata
|
||||
pbutil.MustUnmarshal(&metadata, wmetadata)
|
||||
id = types.ID(metadata.NodeID)
|
||||
cid = types.ID(metadata.ClusterID)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: move the function to /id pkg maybe?
|
||||
// GenID generates a random id that is not equal to 0.
|
||||
func GenID() (n uint64) {
|
||||
|
|
|
@ -3,7 +3,10 @@ package etcdserver
|
|||
import (
|
||||
"log"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/migrate"
|
||||
"github.com/coreos/etcd/pkg/pbutil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/snap"
|
||||
"github.com/coreos/etcd/wal"
|
||||
|
@ -47,9 +50,25 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// UpgradeWAL converts an older version of the etcdServer data to the newest version.
|
||||
func readWAL(waldir string, index uint64) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
|
||||
var err error
|
||||
if w, err = wal.Open(waldir, index); err != nil {
|
||||
log.Fatalf("etcdserver: open wal error: %v", err)
|
||||
}
|
||||
var wmetadata []byte
|
||||
if wmetadata, st, ents, err = w.ReadAll(); err != nil {
|
||||
log.Fatalf("etcdserver: read wal error: %v", err)
|
||||
}
|
||||
var metadata pb.Metadata
|
||||
pbutil.MustUnmarshal(&metadata, wmetadata)
|
||||
id = types.ID(metadata.NodeID)
|
||||
cid = types.ID(metadata.ClusterID)
|
||||
return
|
||||
}
|
||||
|
||||
// upgradeWAL converts an older version of the etcdServer data to the newest version.
|
||||
// It must ensure that, after upgrading, the most recent version is present.
|
||||
func UpgradeWAL(cfg *ServerConfig, ver wal.WalVersion) error {
|
||||
func upgradeWAL(cfg *ServerConfig, ver wal.WalVersion) error {
|
||||
if ver == wal.WALv0_4 {
|
||||
log.Print("etcdserver: converting v0.4 log to v2.0")
|
||||
err := migrate.Migrate4To2(cfg.DataDir, cfg.Name)
|
||||
|
|
Loading…
Reference in New Issue