forked from vitalif/vitastor
Track osd_set history and all_peers separately
parent
00cf24fbd7
commit
6355b968f4
1
osd.h
1
osd.h
|
@ -192,6 +192,7 @@ struct pg_config_t
|
|||
osd_num_t primary;
|
||||
std::vector<osd_num_t> target_set;
|
||||
std::vector<std::vector<osd_num_t>> target_history;
|
||||
std::vector<osd_num_t> all_peers;
|
||||
bool pause;
|
||||
osd_num_t cur_primary;
|
||||
int cur_state;
|
||||
|
|
|
@ -614,15 +614,23 @@ void osd_t::parse_pg_state(const std::string & key, const json11::Json & value)
|
|||
}
|
||||
else
|
||||
{
|
||||
this->pg_config[pg_num].target_history.clear();
|
||||
for (auto hist_item: value.array_items())
|
||||
auto & pg_cfg = this->pg_config[pg_num];
|
||||
pg_cfg.target_history.clear();
|
||||
pg_cfg.all_peers.clear();
|
||||
// Refuse to start PG if any set of the <osd_sets> has no live OSDs
|
||||
for (auto hist_item: value["osd_sets"].array_items())
|
||||
{
|
||||
std::vector<osd_num_t> history_set;
|
||||
for (auto pg_osd: hist_item["osd_set"].array_items())
|
||||
for (auto pg_osd: hist_item.array_items())
|
||||
{
|
||||
history_set.push_back(pg_osd.uint64_value());
|
||||
}
|
||||
this->pg_config[pg_num].target_history.push_back(history_set);
|
||||
pg_cfg.target_history.push_back(history_set);
|
||||
}
|
||||
// Include these additional OSDs when peering the PG
|
||||
for (auto pg_osd: value["all_peers"].array_items())
|
||||
{
|
||||
pg_cfg.all_peers.push_back(pg_osd.uint64_value());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -736,6 +744,13 @@ void osd_t::apply_pg_config()
|
|||
all_peers.insert(pg_osd);
|
||||
}
|
||||
}
|
||||
for (osd_num_t pg_osd: pg_cfg.all_peers)
|
||||
{
|
||||
if (pg_osd != 0)
|
||||
{
|
||||
all_peers.insert(pg_osd);
|
||||
}
|
||||
}
|
||||
for (auto & hist_item: pg_cfg.target_history)
|
||||
{
|
||||
for (auto pg_osd: hist_item)
|
||||
|
@ -791,8 +806,8 @@ void osd_t::apply_pg_config()
|
|||
.state = pg_cfg.cur_primary == this->osd_num ? PG_PEERING : PG_STARTING,
|
||||
.pg_cursize = 0,
|
||||
.pg_num = pg_num,
|
||||
.all_peers = std::vector<osd_num_t>(all_peers.begin(), all_peers.end()),
|
||||
.target_history = pg_cfg.target_history,
|
||||
.all_peers = std::vector<osd_num_t>(all_peers.begin(), all_peers.end()),
|
||||
.target_set = pg_cfg.target_set,
|
||||
};
|
||||
this->pg_state_dirty.insert(pg_num);
|
||||
|
@ -831,7 +846,7 @@ void osd_t::report_pg_states()
|
|||
return;
|
||||
}
|
||||
etcd_reporting_pg_state = true;
|
||||
std::vector<pg_num_t> reporting_pgs;
|
||||
std::vector<std::pair<pg_num_t,bool>> reporting_pgs;
|
||||
json11::Json::array checks;
|
||||
json11::Json::array success;
|
||||
json11::Json::array failure;
|
||||
|
@ -843,7 +858,7 @@ void osd_t::report_pg_states()
|
|||
continue;
|
||||
}
|
||||
auto & pg = pg_it->second;
|
||||
reporting_pgs.push_back(pg.pg_num);
|
||||
reporting_pgs.push_back({ pg.pg_num, pg.history_changed });
|
||||
std::string state_key_base64 = base64_encode(etcd_prefix+"/pg/state/"+std::to_string(pg.pg_num));
|
||||
if (pg.state == PG_STARTING)
|
||||
{
|
||||
|
@ -890,18 +905,33 @@ void osd_t::report_pg_states()
|
|||
{ "value", base64_encode(json11::Json(json11::Json::object {
|
||||
{ "primary", this->osd_num },
|
||||
{ "state", pg_state_keywords },
|
||||
{ "peers", pg.cur_peers },
|
||||
}).dump()) },
|
||||
{ "lease", etcd_lease_id },
|
||||
} }
|
||||
});
|
||||
if (pg.state == PG_ACTIVE && pg.target_history.size() > 0)
|
||||
if (pg.history_changed)
|
||||
{
|
||||
// Clear history of active+clean PGs
|
||||
success.push_back(json11::Json::object {
|
||||
{ "request_delete_range", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/pg/history/"+std::to_string(pg.pg_num)) },
|
||||
} }
|
||||
});
|
||||
pg.history_changed = false;
|
||||
if (pg.state == PG_ACTIVE)
|
||||
{
|
||||
success.push_back(json11::Json::object {
|
||||
{ "request_delete_range", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/pg/history/"+std::to_string(pg.pg_num)) },
|
||||
} }
|
||||
});
|
||||
}
|
||||
else if (pg.state == (PG_ACTIVE|PG_LEFT_ON_DEAD))
|
||||
{
|
||||
success.push_back(json11::Json::object {
|
||||
{ "request_put", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/pg/history/"+std::to_string(pg.pg_num)) },
|
||||
{ "value", base64_encode(json11::Json(json11::Json::object {
|
||||
{ "all_peers", pg.all_peers },
|
||||
}).dump()) },
|
||||
} }
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
failure.push_back(json11::Json::object {
|
||||
|
@ -918,10 +948,18 @@ void osd_t::report_pg_states()
|
|||
etcd_reporting_pg_state = false;
|
||||
if (!data["succeeded"].bool_value())
|
||||
{
|
||||
// One of PG state updates failed
|
||||
for (auto pg_num: reporting_pgs)
|
||||
// One of PG state updates failed, put dirty flags back
|
||||
for (auto pp: reporting_pgs)
|
||||
{
|
||||
this->pg_state_dirty.insert(pg_num);
|
||||
this->pg_state_dirty.insert(pp.first);
|
||||
if (pp.second)
|
||||
{
|
||||
auto pg_it = this->pgs.find(pp.first);
|
||||
if (pg_it != this->pgs.end())
|
||||
{
|
||||
pg_it->second.history_changed = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
for (auto & res: data["responses"].array_items())
|
||||
{
|
||||
|
@ -942,9 +980,9 @@ void osd_t::report_pg_states()
|
|||
else
|
||||
{
|
||||
// Success. We'll get our changes back via the watcher and react to them
|
||||
for (auto pg_num: reporting_pgs)
|
||||
for (auto pp: reporting_pgs)
|
||||
{
|
||||
auto pg_it = this->pgs.find(pg_num);
|
||||
auto pg_it = this->pgs.find(pp.first);
|
||||
if (pg_it != this->pgs.end())
|
||||
{
|
||||
if (pg_it->second.state == PG_OFFLINE)
|
||||
|
@ -952,12 +990,6 @@ void osd_t::report_pg_states()
|
|||
// Remove offline PGs after reporting their state
|
||||
this->pgs.erase(pg_it);
|
||||
}
|
||||
else if (pg_it->second.state == PG_ACTIVE && pg_it->second.target_history.size() > 0)
|
||||
{
|
||||
// Clear history of active+clean PGs
|
||||
pg_it->second.target_history.clear();
|
||||
pg_it->second.all_peers = pg_it->second.target_set;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Push other PG state updates, if any
|
||||
|
@ -1051,12 +1083,12 @@ void osd_t::load_and_connect_peers()
|
|||
{
|
||||
etcd_txn(json11::Json::object { { "success", load_peer_txn } }, ETCD_QUICK_TIMEOUT, [this](std::string err, json11::Json data)
|
||||
{
|
||||
// Ugly, but required to wake up the loop
|
||||
// Ugly, but required to wake up the loop and retry connecting after <peer_connect_interval> seconds
|
||||
tfd->set_timer(peer_connect_interval*1000, false, [](int timer_id){});
|
||||
loading_peer_config = false;
|
||||
if (err != "")
|
||||
{
|
||||
printf("Failed to load peer configuration from etcd: %s\n", err.c_str());
|
||||
printf("Failed to load peer states from etcd: %s\n", err.c_str());
|
||||
return;
|
||||
}
|
||||
for (auto & res: data["responses"].array_items())
|
||||
|
|
|
@ -253,6 +253,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
|
|||
this->peering_state |= OSD_PEERING_PGS;
|
||||
report_pg_state(pg);
|
||||
// Reset PG state
|
||||
pg.cur_peers.clear();
|
||||
pg.state_dict.clear();
|
||||
incomplete_objects -= pg.incomplete_objects.size();
|
||||
misplaced_objects -= pg.misplaced_objects.size();
|
||||
|
@ -335,6 +336,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
|
|||
peering_state |= OSD_CONNECTING_PEERS;
|
||||
}
|
||||
}
|
||||
pg.cur_peers.insert(pg.cur_peers.begin(), cur_peers.begin(), cur_peers.end());
|
||||
if (pg.peering_state)
|
||||
{
|
||||
// Adjust the peering operation that's still in progress - discard unneeded results
|
||||
|
@ -614,6 +616,46 @@ void osd_t::report_pg_state(pg_t & pg)
|
|||
{
|
||||
pg.print_state();
|
||||
this->pg_state_dirty.insert(pg.pg_num);
|
||||
if (pg.state == PG_ACTIVE && (pg.target_history.size() > 0 || pg.all_peers.size() > pg.target_set.size()))
|
||||
{
|
||||
// Clear history of active+clean PGs
|
||||
pg.history_changed = true;
|
||||
pg.target_history.clear();
|
||||
pg.all_peers = pg.target_set;
|
||||
pg.cur_peers = pg.target_set;
|
||||
}
|
||||
else if (pg.state == (PG_ACTIVE|PG_LEFT_ON_DEAD))
|
||||
{
|
||||
// Clear history of active+left_on_dead PGs, but leave dead OSDs in all_peers
|
||||
pg.history_changed = true;
|
||||
pg.target_history.clear();
|
||||
std::set<osd_num_t> dead_peers;
|
||||
for (auto pg_osd: pg.all_peers)
|
||||
{
|
||||
dead_peers.insert(pg_osd);
|
||||
}
|
||||
for (auto pg_osd: pg.cur_peers)
|
||||
{
|
||||
dead_peers.erase(pg_osd);
|
||||
}
|
||||
for (auto pg_osd: pg.target_set)
|
||||
{
|
||||
if (pg_osd)
|
||||
{
|
||||
dead_peers.insert(pg_osd);
|
||||
}
|
||||
}
|
||||
pg.all_peers.clear();
|
||||
pg.all_peers.insert(pg.all_peers.begin(), dead_peers.begin(), dead_peers.end());
|
||||
pg.cur_peers.clear();
|
||||
for (auto pg_osd: pg.target_set)
|
||||
{
|
||||
if (pg_osd)
|
||||
{
|
||||
pg.cur_peers.push_back(pg_osd);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (pg.state == PG_OFFLINE && !this->pg_config_applied)
|
||||
{
|
||||
apply_pg_config();
|
||||
|
|
|
@ -75,9 +75,13 @@ void pg_obj_state_check_t::walk()
|
|||
}
|
||||
if (pg->pg_cursize < pg->pg_size)
|
||||
{
|
||||
pg->state = pg->state | PG_DEGRADED;
|
||||
pg->state |= PG_DEGRADED;
|
||||
}
|
||||
pg->state |= PG_ACTIVE;
|
||||
if (pg->state == PG_ACTIVE && pg->cur_peers.size() < pg->all_peers.size())
|
||||
{
|
||||
pg->state |= PG_LEFT_ON_DEAD;
|
||||
}
|
||||
pg->state = pg->state | PG_ACTIVE;
|
||||
}
|
||||
|
||||
void pg_obj_state_check_t::start_object()
|
||||
|
@ -380,9 +384,9 @@ void pg_t::print_state()
|
|||
);
|
||||
}
|
||||
|
||||
const int pg_state_bit_count = 12;
|
||||
const int pg_state_bit_count = 13;
|
||||
|
||||
const int pg_state_bits[12] = {
|
||||
const int pg_state_bits[13] = {
|
||||
PG_STARTING,
|
||||
PG_PEERING,
|
||||
PG_INCOMPLETE,
|
||||
|
@ -394,9 +398,10 @@ const int pg_state_bits[12] = {
|
|||
PG_HAS_DEGRADED,
|
||||
PG_HAS_MISPLACED,
|
||||
PG_HAS_UNCLEAN,
|
||||
PG_LEFT_ON_DEAD,
|
||||
};
|
||||
|
||||
const char *pg_state_names[12] = {
|
||||
const char *pg_state_names[13] = {
|
||||
"starting",
|
||||
"peering",
|
||||
"incomplete",
|
||||
|
@ -408,4 +413,5 @@ const char *pg_state_names[12] = {
|
|||
"has_degraded",
|
||||
"has_misplaced",
|
||||
"has_unclean",
|
||||
"left_on_dead",
|
||||
};
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#define PG_HAS_DEGRADED (1<<8)
|
||||
#define PG_HAS_MISPLACED (1<<9)
|
||||
#define PG_HAS_UNCLEAN (1<<10)
|
||||
#define PG_LEFT_ON_DEAD (1<<11)
|
||||
|
||||
// FIXME: Safe default that doesn't depend on pg_stripe_size or pg_block_size
|
||||
#define STRIPE_MASK ((uint64_t)4096 - 1)
|
||||
|
@ -102,9 +103,12 @@ struct pg_t
|
|||
uint64_t pg_cursize = 3, pg_size = 3, pg_minsize = 2;
|
||||
pg_num_t pg_num;
|
||||
uint64_t clean_count = 0, total_count = 0;
|
||||
// all possible peers
|
||||
std::vector<osd_num_t> all_peers;
|
||||
// target history and all potential peers
|
||||
std::vector<std::vector<osd_num_t>> target_history;
|
||||
std::vector<osd_num_t> all_peers;
|
||||
bool history_changed = false;
|
||||
// peer list from the last peering event
|
||||
std::vector<osd_num_t> cur_peers;
|
||||
// target_set is the "correct" peer OSD set for this PG
|
||||
std::vector<osd_num_t> target_set;
|
||||
// cur_set is the current set of connected peer OSDs for this PG
|
||||
|
|
Loading…
Reference in New Issue