server: Move clusterID and nodeID up the bootstrap hierarchy
parent
db06a4ab28
commit
6a4ea70aef
|
@ -99,9 +99,10 @@ type bootstrappedStorage struct {
|
|||
}
|
||||
|
||||
type bootstrapedCluster struct {
|
||||
raft *bootstrappedRaft
|
||||
remotes []*membership.Member
|
||||
wal *bootstrappedWAL
|
||||
raft *bootstrappedRaft
|
||||
remotes []*membership.Member
|
||||
wal *bootstrappedWAL
|
||||
clusterID, nodeID types.ID
|
||||
}
|
||||
|
||||
func bootstrapStorage(cfg config.ServerConfig, ss *snap.Snapshotter, prt http.RoundTripper) (b *bootstrappedStorage, err error) {
|
||||
|
@ -235,13 +236,16 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe
|
|||
cl.SetID(types.ID(0), existingCluster.ID())
|
||||
cl.SetStore(st)
|
||||
cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
|
||||
bwal := bootstrapNewWAL(cfg, cl.MemberByName(cfg.Name).ID, cl.ID())
|
||||
member := cl.MemberByName(cfg.Name)
|
||||
bwal := bootstrapNewWAL(cfg, member, cl)
|
||||
br := bootstrapRaftFromCluster(cfg, cl, nil, bwal)
|
||||
cl.SetID(bwal.id, existingCluster.ID())
|
||||
cl.SetID(member.ID, existingCluster.ID())
|
||||
return &bootstrapedCluster{
|
||||
raft: br,
|
||||
remotes: remotes,
|
||||
wal: bwal,
|
||||
raft: br,
|
||||
remotes: remotes,
|
||||
wal: bwal,
|
||||
clusterID: cl.ID(),
|
||||
nodeID: member.ID,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -277,13 +281,16 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st
|
|||
}
|
||||
cl.SetStore(st)
|
||||
cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
|
||||
bwal := bootstrapNewWAL(cfg, cl.MemberByName(cfg.Name).ID, cl.ID())
|
||||
member := cl.MemberByName(cfg.Name)
|
||||
bwal := bootstrapNewWAL(cfg, member, cl)
|
||||
br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs(), bwal)
|
||||
cl.SetID(bwal.id, cl.ID())
|
||||
cl.SetID(member.ID, cl.ID())
|
||||
return &bootstrapedCluster{
|
||||
remotes: nil,
|
||||
raft: br,
|
||||
wal: bwal,
|
||||
remotes: nil,
|
||||
raft: br,
|
||||
wal: bwal,
|
||||
clusterID: cl.ID(),
|
||||
nodeID: member.ID,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -360,31 +367,34 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, st v2store.Store, be backe
|
|||
cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!")
|
||||
}
|
||||
|
||||
bwal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync)
|
||||
bwal, meta := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync)
|
||||
|
||||
b := &bootstrapedCluster{
|
||||
wal: bwal,
|
||||
wal: bwal,
|
||||
clusterID: meta.clusterID,
|
||||
nodeID: meta.nodeID,
|
||||
}
|
||||
if cfg.ForceNewCluster {
|
||||
// discard the previously uncommitted entries
|
||||
bwal.ents = bwal.CommitedEntries()
|
||||
entries := bwal.ConfigChangeEntries()
|
||||
entries := bwal.ConfigChangeEntries(meta)
|
||||
// force commit config change entries
|
||||
bwal.AppendAndCommitEntries(entries)
|
||||
cfg.Logger.Info(
|
||||
"forcing restart member",
|
||||
zap.String("cluster-id", bwal.cid.String()),
|
||||
zap.String("local-member-id", bwal.id.String()),
|
||||
zap.String("cluster-id", meta.clusterID.String()),
|
||||
zap.String("local-member-id", meta.nodeID.String()),
|
||||
zap.Uint64("commit-index", bwal.st.Commit),
|
||||
)
|
||||
} else {
|
||||
cfg.Logger.Info(
|
||||
"restarting local member",
|
||||
zap.String("cluster-id", bwal.cid.String()),
|
||||
zap.String("local-member-id", bwal.id.String()),
|
||||
zap.String("cluster-id", meta.clusterID.String()),
|
||||
zap.String("local-member-id", meta.nodeID.String()),
|
||||
zap.Uint64("commit-index", bwal.st.Commit),
|
||||
)
|
||||
}
|
||||
b.raft = bootstrapRaftFromWal(cfg, bwal)
|
||||
b.raft = bootstrapRaftFromSnapshot(cfg, bwal, meta)
|
||||
|
||||
b.raft.cl.SetStore(st)
|
||||
b.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
|
||||
|
@ -424,15 +434,15 @@ func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluste
|
|||
}
|
||||
}
|
||||
|
||||
func bootstrapRaftFromWal(cfg config.ServerConfig, bwal *bootstrappedWAL) *bootstrappedRaft {
|
||||
func bootstrapRaftFromSnapshot(cfg config.ServerConfig, bwal *bootstrappedWAL, meta *snapshotMetadata) *bootstrappedRaft {
|
||||
cl := membership.NewCluster(cfg.Logger)
|
||||
cl.SetID(bwal.id, bwal.cid)
|
||||
cl.SetID(meta.nodeID, meta.clusterID)
|
||||
s := bwal.MemoryStorage()
|
||||
return &bootstrappedRaft{
|
||||
lg: cfg.Logger,
|
||||
heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
|
||||
cl: cl,
|
||||
config: raftConfig(cfg, uint64(bwal.id), s),
|
||||
config: raftConfig(cfg, uint64(meta.nodeID), s),
|
||||
storage: s,
|
||||
}
|
||||
}
|
||||
|
@ -486,7 +496,7 @@ func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL) *raft
|
|||
// bootstrapWALFromSnapshot reads the WAL at the given snap and returns the bootstrapped WAL (the wal, its latest HardState, and all entries that appear
|
||||
// after the position of the given snap in the WAL) together with the node ID and cluster ID as snapshotMetadata.
|
||||
// The snap must have been previously saved to the WAL, or this call will panic.
|
||||
func bootstrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot, unsafeNoFsync bool) *bootstrappedWAL {
|
||||
func bootstrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot, unsafeNoFsync bool) (*bootstrappedWAL, *snapshotMetadata) {
|
||||
var walsnap walpb.Snapshot
|
||||
if snapshot != nil {
|
||||
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||
|
@ -519,23 +529,26 @@ func bootstrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Sn
|
|||
pbutil.MustUnmarshal(&metadata, wmetadata)
|
||||
id := types.ID(metadata.NodeID)
|
||||
cid := types.ID(metadata.ClusterID)
|
||||
meta := &snapshotMetadata{clusterID: cid, nodeID: id}
|
||||
return &bootstrappedWAL{
|
||||
lg: lg,
|
||||
w: w,
|
||||
id: id,
|
||||
cid: cid,
|
||||
st: &st,
|
||||
ents: ents,
|
||||
snapshot: snapshot,
|
||||
}
|
||||
}, meta
|
||||
}
|
||||
}
|
||||
|
||||
func bootstrapNewWAL(cfg config.ServerConfig, nodeID, clusterID types.ID) *bootstrappedWAL {
|
||||
type snapshotMetadata struct {
|
||||
nodeID, clusterID types.ID
|
||||
}
|
||||
|
||||
func bootstrapNewWAL(cfg config.ServerConfig, m *membership.Member, cl *membership.RaftCluster) *bootstrappedWAL {
|
||||
metadata := pbutil.MustMarshal(
|
||||
&etcdserverpb.Metadata{
|
||||
NodeID: uint64(nodeID),
|
||||
ClusterID: uint64(clusterID),
|
||||
NodeID: uint64(m.ID),
|
||||
ClusterID: uint64(cl.ID()),
|
||||
},
|
||||
)
|
||||
w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata)
|
||||
|
@ -546,10 +559,8 @@ func bootstrapNewWAL(cfg config.ServerConfig, nodeID, clusterID types.ID) *boots
|
|||
w.SetUnsafeNoFsync()
|
||||
}
|
||||
return &bootstrappedWAL{
|
||||
lg: cfg.Logger,
|
||||
w: w,
|
||||
id: nodeID,
|
||||
cid: clusterID,
|
||||
lg: cfg.Logger,
|
||||
w: w,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -557,7 +568,6 @@ type bootstrappedWAL struct {
|
|||
lg *zap.Logger
|
||||
|
||||
w *wal.WAL
|
||||
id, cid types.ID
|
||||
st *raftpb.HardState
|
||||
ents []raftpb.Entry
|
||||
snapshot *raftpb.Snapshot
|
||||
|
@ -592,11 +602,11 @@ func (wal *bootstrappedWAL) CommitedEntries() []raftpb.Entry {
|
|||
return wal.ents
|
||||
}
|
||||
|
||||
func (wal *bootstrappedWAL) ConfigChangeEntries() []raftpb.Entry {
|
||||
func (wal *bootstrappedWAL) ConfigChangeEntries(meta *snapshotMetadata) []raftpb.Entry {
|
||||
return serverstorage.CreateConfigChangeEnts(
|
||||
wal.lg,
|
||||
serverstorage.GetIDs(wal.lg, wal.snapshot, wal.ents),
|
||||
uint64(wal.id),
|
||||
uint64(meta.nodeID),
|
||||
wal.st.Term,
|
||||
wal.st.Commit,
|
||||
)
|
||||
|
|
|
@ -308,8 +308,8 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
|||
}
|
||||
}()
|
||||
|
||||
sstats := stats.NewServerStats(cfg.Name, b.storage.cluster.wal.id.String())
|
||||
lstats := stats.NewLeaderStats(cfg.Logger, b.storage.cluster.wal.id.String())
|
||||
sstats := stats.NewServerStats(cfg.Name, b.storage.cluster.nodeID.String())
|
||||
lstats := stats.NewLeaderStats(cfg.Logger, b.storage.cluster.nodeID.String())
|
||||
|
||||
heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
|
||||
srv = &EtcdServer{
|
||||
|
@ -321,20 +321,20 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
|||
v2store: b.storage.st,
|
||||
snapshotter: b.ss,
|
||||
r: *b.storage.cluster.raft.newRaftNode(b.ss, b.storage.cluster.wal.w),
|
||||
id: b.storage.cluster.wal.id,
|
||||
id: b.storage.cluster.nodeID,
|
||||
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
|
||||
cluster: b.storage.cluster.raft.cl,
|
||||
stats: sstats,
|
||||
lstats: lstats,
|
||||
SyncTicker: time.NewTicker(500 * time.Millisecond),
|
||||
peerRt: b.prt,
|
||||
reqIDGen: idutil.NewGenerator(uint16(b.storage.cluster.wal.id), time.Now()),
|
||||
reqIDGen: idutil.NewGenerator(uint16(b.storage.cluster.nodeID), time.Now()),
|
||||
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
|
||||
consistIndex: b.storage.ci,
|
||||
firstCommitInTerm: notify.NewNotifier(),
|
||||
clusterVersionChanged: notify.NewNotifier(),
|
||||
}
|
||||
serverID.With(prometheus.Labels{"server_id": b.storage.cluster.wal.id.String()}).Set(1)
|
||||
serverID.With(prometheus.Labels{"server_id": b.storage.cluster.nodeID.String()}).Set(1)
|
||||
srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
|
||||
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
|
||||
|
||||
|
@ -403,7 +403,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
|||
Logger: cfg.Logger,
|
||||
TLSInfo: cfg.PeerTLSInfo,
|
||||
DialTimeout: cfg.PeerDialTimeout(),
|
||||
ID: b.storage.cluster.wal.id,
|
||||
ID: b.storage.cluster.nodeID,
|
||||
URLs: cfg.PeerURLs,
|
||||
ClusterID: b.storage.cluster.raft.cl.ID(),
|
||||
Raft: srv,
|
||||
|
@ -417,12 +417,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
|||
}
|
||||
// add all remotes into transport
|
||||
for _, m := range b.storage.cluster.remotes {
|
||||
if m.ID != b.storage.cluster.wal.id {
|
||||
if m.ID != b.storage.cluster.nodeID {
|
||||
tr.AddRemote(m.ID, m.PeerURLs)
|
||||
}
|
||||
}
|
||||
for _, m := range b.storage.cluster.raft.cl.Members() {
|
||||
if m.ID != b.storage.cluster.wal.id {
|
||||
if m.ID != b.storage.cluster.nodeID {
|
||||
tr.AddPeer(m.ID, m.PeerURLs)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue