forked from vitalif/vitastor
Prevent reenterability side effects during PG history operation resume
parent
73940adf07
commit
02e7be7dc9
|
@ -382,30 +382,6 @@ void osd_t::on_change_etcd_state_hook(std::map<std::string, etcd_kv_t> & changes
|
|||
}
|
||||
}
|
||||
|
||||
void osd_t::on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num)
|
||||
{
|
||||
auto pg_it = pgs.find({
|
||||
.pool_id = pool_id,
|
||||
.pg_num = pg_num,
|
||||
});
|
||||
if (pg_it != pgs.end() && pg_it->second.epoch > pg_it->second.reported_epoch &&
|
||||
st_cli.pool_config[pool_id].pg_config[pg_num].epoch >= pg_it->second.epoch)
|
||||
{
|
||||
pg_it->second.reported_epoch = st_cli.pool_config[pool_id].pg_config[pg_num].epoch;
|
||||
object_id oid = { 0 };
|
||||
bool first = true;
|
||||
for (auto op: pg_it->second.write_queue)
|
||||
{
|
||||
if (first || oid != op.first)
|
||||
{
|
||||
oid = op.first;
|
||||
first = false;
|
||||
continue_primary_write(op.second);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::on_load_config_hook(json11::Json::object & global_config)
|
||||
{
|
||||
json11::Json::object osd_config = this->config;
|
||||
|
|
|
@ -174,7 +174,8 @@ resume_3:
|
|||
resume_10:
|
||||
if (pg.epoch > pg.reported_epoch)
|
||||
{
|
||||
op_data->st = 10;
|
||||
#define PG_EPOCH_WAIT_STATE 10
|
||||
op_data->st = PG_EPOCH_WAIT_STATE;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -305,6 +306,50 @@ continue_others:
|
|||
}
|
||||
}
|
||||
|
||||
void osd_t::on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num)
|
||||
{
|
||||
auto pg_it = pgs.find({
|
||||
.pool_id = pool_id,
|
||||
.pg_num = pg_num,
|
||||
});
|
||||
if (pg_it == pgs.end())
|
||||
{
|
||||
return;
|
||||
}
|
||||
auto & pg = pg_it->second;
|
||||
if (pg.epoch > pg.reported_epoch &&
|
||||
st_cli.pool_config[pool_id].pg_config[pg_num].epoch >= pg.epoch)
|
||||
{
|
||||
pg.reported_epoch = st_cli.pool_config[pool_id].pg_config[pg_num].epoch;
|
||||
std::vector<object_id> resume_oids;
|
||||
for (auto & op: pg.write_queue)
|
||||
{
|
||||
if (op.second->op_data->st == PG_EPOCH_WAIT_STATE)
|
||||
{
|
||||
// Run separately to prevent side effects
|
||||
resume_oids.push_back(op.first);
|
||||
}
|
||||
}
|
||||
for (auto & oid: resume_oids)
|
||||
{
|
||||
auto pg_it = pgs.find({
|
||||
.pool_id = pool_id,
|
||||
.pg_num = pg_num,
|
||||
});
|
||||
if (pg_it != pgs.end())
|
||||
{
|
||||
auto & pg = pg_it->second;
|
||||
auto op_it = pg.write_queue.find(oid);
|
||||
if (op_it != pg.write_queue.end() &&
|
||||
op_it->second->op_data->st == PG_EPOCH_WAIT_STATE)
|
||||
{
|
||||
continue_primary_write(op_it->second);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool osd_t::remember_unstable_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state)
|
||||
{
|
||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||
|
|
Loading…
Reference in New Issue