forked from vitalif/vitastor

Implement graceful stopping of PGs

parent afe2e76c87
commit dfb6e15eaa

osd.cpp (2 changes)
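In short, this commit lets a PG drain before it is switched off: stop_pg() marks an ACTIVE PG as PG_STOPPING, primary operations and syncs pin the PGs they touch via a new per-PG inflight counter, and the last operation to complete calls finish_stop_pg(), which sets PG_OFFLINE. The sketch below is a stripped-down model of that state machine, not the actual vitastor code; the flag values and the pg_model type are placeholders.

// Stripped-down model of the stop sequence (placeholder flag values).
#include <cassert>
#include <cstdio>

enum { PG_OFFLINE = 1, PG_ACTIVE = 4, PG_STOPPING = 8 };

struct pg_model
{
    int state = PG_OFFLINE;
    int inflight = 0;               // operations currently touching this PG
};

// Analogue of osd_t::stop_pg(): only an ACTIVE PG can be stopped; if nothing
// is in flight it goes OFFLINE at once, otherwise it waits in PG_STOPPING.
bool stop_pg(pg_model & pg)
{
    if (!(pg.state & PG_ACTIVE))
        return false;
    pg.state = (pg.state & ~PG_ACTIVE) | PG_STOPPING;
    if (pg.inflight == 0)
        pg.state = PG_OFFLINE;      // finish_stop_pg()
    return true;
}

// Analogue of the new code in osd_t::finish_op(): the operation that
// completes last flips a stopping PG to OFFLINE.
void finish_op(pg_model & pg)
{
    int n = --pg.inflight;
    assert(n >= 0);
    if (n == 0 && (pg.state & PG_STOPPING))
        pg.state = PG_OFFLINE;      // finish_stop_pg()
}

int main()
{
    pg_model pg;
    pg.state = PG_ACTIVE;
    pg.inflight = 2;                // two operations still running
    stop_pg(pg);                    // PG_STOPPING, not yet offline
    finish_op(pg);
    finish_op(pg);                  // last one completes -> PG_OFFLINE
    printf("offline: %d\n", pg.state == PG_OFFLINE);
    return 0;
}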
@@ -370,6 +370,7 @@ void osd_t::exec_op(osd_op_t *cur_op)
         delete cur_op;
         return;
     }
+    inflight_ops++;
     cur_op->send_list.push_back(cur_op->reply.buf, OSD_PACKET_SIZE);
     if (cur_op->req.hdr.magic != SECONDARY_OSD_OP_MAGIC ||
         cur_op->req.hdr.opcode < OSD_OP_MIN || cur_op->req.hdr.opcode > OSD_OP_MAX ||
@@ -382,7 +383,6 @@ void osd_t::exec_op(osd_op_t *cur_op)
         finish_op(cur_op, -EINVAL);
         return;
     }
-    inflight_ops++;
     if (cur_op->req.hdr.opcode == OSD_OP_TEST_SYNC_STAB_ALL)
     {
         exec_sync_stab_all(cur_op);
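The only change in osd.cpp is the placement of the inflight_ops increment: it now happens as soon as a request enters exec_op(), before the header is validated, so the early finish_op(cur_op, -EINVAL) rejection path decrements a counter that was actually incremented. Previously the increment only happened after the checks.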
osd.h (3 changes)
@@ -200,6 +200,7 @@ class osd_t
 
     std::map<uint64_t, int> osd_peer_fds;
     std::map<pg_num_t, pg_t> pgs;
+    std::set<pg_num_t> dirty_pgs;
     uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0;
     int peering_state = 0;
     unsigned pg_count = 0;
@@ -265,6 +266,8 @@ class osd_t
     void handle_peers();
     void repeer_pgs(osd_num_t osd_num, bool is_connected);
     void start_pg_peering(pg_num_t pg_num);
+    bool stop_pg(pg_num_t pg_num);
+    void finish_stop_pg(pg_t & pg);
 
     // flushing, recovery and backfill
     void submit_pg_flush_ops(pg_num_t pg_num);
@@ -265,6 +265,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
     auto & pg = pgs[pg_num];
     pg.state = PG_PEERING;
+    pg.print_state();
     // Reset PG state
     pg.state_dict.clear();
     incomplete_objects -= pg.incomplete_objects.size();
     misplaced_objects -= pg.misplaced_objects.size();
@@ -284,15 +285,18 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
         cancel_op(p.second);
     }
     pg.write_queue.clear();
+    // Forget this PG's unstable writes
     for (auto it = unstable_writes.begin(); it != unstable_writes.end(); )
     {
-        // Forget this PG's unstable writes
         pg_num_t n = (it->first.oid.inode + it->first.oid.stripe / pg_stripe_size) % pg_count + 1;
         if (n == pg.pg_num)
             unstable_writes.erase(it++);
         else
             it++;
     }
+    pg.inflight = 0;
+    dirty_pgs.erase(pg.pg_num);
     // Start peering
     pg.pg_cursize = 0;
     for (int role = 0; role < pg.cur_set.size(); role++)
     {
@@ -472,3 +476,28 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
     }
     ringloop->wakeup();
 }
+
+bool osd_t::stop_pg(pg_num_t pg_num)
+{
+    auto pg_it = pgs.find(pg_num);
+    if (pg_it == pgs.end())
+    {
+        return false;
+    }
+    auto & pg = pg_it->second;
+    if (!(pg.state & PG_ACTIVE))
+    {
+        return false;
+    }
+    pg.state = pg.state & ~PG_ACTIVE | PG_STOPPING;
+    if (pg.inflight == 0)
+    {
+        finish_stop_pg(pg);
+    }
+    return true;
+}
+
+void osd_t::finish_stop_pg(pg_t & pg)
+{
+    pg.state = PG_OFFLINE;
+}
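stop_pg() returns false when the PG is unknown or not ACTIVE; otherwise it swaps PG_ACTIVE for PG_STOPPING and, if nothing is in flight, goes straight to finish_stop_pg(), which currently does nothing beyond setting PG_OFFLINE. The state assignment relies on bitwise AND binding tighter than bitwise OR. A tiny check of that precedence, using placeholder flag values rather than the real PG_* definitions:

// Precedence check for "pg.state = pg.state & ~PG_ACTIVE | PG_STOPPING":
// '&' binds tighter than '|', so PG_ACTIVE is cleared first, then PG_STOPPING
// is set. Flag values here are placeholders, not the real PG_* definitions.
#include <cstdio>
enum { PG_ACTIVE = 4, PG_STOPPING = 8, PG_SOME_OTHER_FLAG = 16 };
int main()
{
    int state = PG_ACTIVE | PG_SOME_OTHER_FLAG;
    state = state & ~PG_ACTIVE | PG_STOPPING;
    printf("%d\n", state == (PG_SOME_OTHER_FLAG | PG_STOPPING)); // prints 1
    return 0;
}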
@@ -92,7 +92,7 @@ struct pg_flush_batch_t
 
 struct pg_t
 {
-    int state;
+    int state = PG_OFFLINE;
     uint64_t pg_cursize = 3, pg_size = 3, pg_minsize = 2;
     pg_num_t pg_num;
     uint64_t clean_count = 0, total_count = 0;
@@ -112,6 +112,7 @@ struct pg_t
     pg_peering_state_t *peering_state = NULL;
     pg_flush_batch_t *flush_batch = NULL;
 
+    int inflight = 0; // including write_queue
     std::multimap<object_id, osd_op_t*> write_queue;
 
     void calc_object_states();
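pg_t now starts out as PG_OFFLINE instead of carrying an uninitialized state, and gains the inflight counter that stop_pg() and finish_op() use; as the comment notes, it also counts operations still sitting in write_queue, and start_pg_peering() resets it to zero.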
@@ -35,11 +35,24 @@ struct osd_primary_op_data_t
     uint64_t *prev_set = NULL;
     // for sync. oops, requires freeing
     std::vector<unstable_osd_num_t> *unstable_write_osds = NULL;
+    pg_num_t *dirty_pgs = NULL;
+    int dirty_pg_count = 0;
     obj_ver_id *unstable_writes = NULL;
 };
 
 void osd_t::finish_op(osd_op_t *cur_op, int retval)
 {
+    inflight_ops--;
+    if (cur_op->op_data && cur_op->op_data->pg_num > 0)
+    {
+        auto & pg = pgs[cur_op->op_data->pg_num];
+        int n = --pg.inflight;
+        assert(n >= 0);
+        if (n == 0 && (pg.state & PG_STOPPING))
+        {
+            finish_stop_pg(pg);
+        }
+    }
     if (!cur_op->peer_fd)
     {
         // Copy lambda to be unaffected by `delete op`
@@ -71,12 +84,14 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
     // But we must not use K in the process of calculating the PG number
     // So we calculate the PG number using a separate setting which should be per-inode (FIXME)
     pg_num_t pg_num = (cur_op->req.rw.inode + cur_op->req.rw.offset / pg_stripe_size) % pg_count + 1;
-    if (pgs.find(pg_num) == pgs.end() || !(pgs[pg_num].state & PG_ACTIVE))
+    auto pg_it = pgs.find(pg_num);
+    if (pg_it == pgs.end() || !(pg_it->second.state & PG_ACTIVE))
     {
+        // This OSD is not primary for this PG or the PG is inactive
         finish_op(cur_op, -EPIPE);
         return false;
     }
-    uint64_t pg_block_size = bs_block_size * pgs[pg_num].pg_minsize;
+    uint64_t pg_block_size = bs_block_size * pg_it->second.pg_minsize;
     object_id oid = {
         .inode = cur_op->req.rw.inode,
         // oid.stripe = starting offset of the parity stripe, so it can be mapped back to the PG
@@ -91,13 +106,14 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
         return false;
     }
     osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)calloc(
-        sizeof(osd_primary_op_data_t) + sizeof(osd_rmw_stripe_t) * pgs[pg_num].pg_size, 1
+        sizeof(osd_primary_op_data_t) + sizeof(osd_rmw_stripe_t) * pg_it->second.pg_size, 1
     );
     op_data->pg_num = pg_num;
     op_data->oid = oid;
     op_data->stripes = ((osd_rmw_stripe_t*)(op_data+1));
     cur_op->op_data = op_data;
-    split_stripes(pgs[pg_num].pg_minsize, bs_block_size, (uint32_t)(cur_op->req.rw.offset - oid.stripe), cur_op->req.rw.len, op_data->stripes);
+    split_stripes(pg_it->second.pg_minsize, bs_block_size, (uint32_t)(cur_op->req.rw.offset - oid.stripe), cur_op->req.rw.len, op_data->stripes);
+    pg_it->second.inflight++;
     return true;
 }
 
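Two things happen in prepare_primary_rw(): the repeated pgs[pg_num] map lookups are collapsed into the single pg_it iterator, and every primary read or write now pins its PG by incrementing pg_it->second.inflight; the matching decrement is the new per-PG bookkeeping in finish_op() above, keyed by op_data->pg_num.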
@@ -601,6 +617,7 @@ resume_7:
         // Remember PG as dirty to drop the connection when PG goes offline
         // (this is required because of the "lazy sync")
         this->clients[cur_op->peer_fd].dirty_pgs.insert(op_data->pg_num);
+        dirty_pgs.insert(op_data->pg_num);
     }
     // Remove version override
     object_id oid = op_data->oid;
@@ -683,11 +700,13 @@ resume_2:
         goto finish;
     }
     // Save and clear unstable_writes
-    // FIXME: This is possible to do it on a per-client basis
-    // It would be cool not to copy them here at all, but someone has to deduplicate them by object IDs anyway
-    op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
-    op_data->unstable_writes = new obj_ver_id[this->unstable_writes.size()];
+    // In theory it is possible to do in on a per-client basis, but this seems to be an unnecessary complication
+    // It would be cool not to copy these here at all, but someone has to deduplicate them by object IDs anyway
     {
+        op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
+        op_data->unstable_writes = new obj_ver_id[this->unstable_writes.size()];
+        op_data->dirty_pgs = new pg_num_t[dirty_pgs.size()];
+        op_data->dirty_pg_count = dirty_pgs.size();
         osd_num_t last_osd = 0;
         int last_start = 0, last_end = 0;
         for (auto it = this->unstable_writes.begin(); it != this->unstable_writes.end(); it++)
@@ -719,8 +738,15 @@ resume_2:
                 .len = last_end - last_start,
             });
         }
+        int dpg = 0;
+        for (auto dirty_pg_num: dirty_pgs)
+        {
+            pgs[dirty_pg_num].inflight++;
+            op_data->dirty_pgs[dpg++] = dirty_pg_num;
+        }
+        dirty_pgs.clear();
+        this->unstable_writes.clear();
     }
-    this->unstable_writes.clear();
     if (immediate_commit != IMMEDIATE_ALL)
     {
         // SYNC
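A sync touches many PGs at once, so instead of a single op_data->pg_num it snapshots the OSD-wide dirty_pgs set into op_data->dirty_pgs and pins each of those PGs with inflight++; the pins are released at resume_6 in the next hunk. A simplified, self-contained model of that pin/unpin pattern (the types and names here are placeholders, not the vitastor ones):

// Simplified model of how a sync pins every dirty PG until it completes.
#include <cstdio>
#include <map>
#include <set>
#include <vector>

struct pg_model { int inflight = 0; };

int main()
{
    std::map<int, pg_model> pgs = {{1, {}}, {2, {}}, {3, {}}};
    std::set<int> dirty_pgs = {1, 3};            // PGs with unstable writes

    // Sync submission: snapshot and pin the dirty PGs (op_data->dirty_pgs).
    std::vector<int> op_dirty(dirty_pgs.begin(), dirty_pgs.end());
    for (int pg_num: op_dirty)
        pgs[pg_num].inflight++;
    dirty_pgs.clear();

    // ... SYNC / STABILIZE subops to peer OSDs would run here ...

    // Sync completion (resume_6): release the pins again.
    for (int pg_num: op_dirty)
        pgs[pg_num].inflight--;

    for (auto & p: pgs)
        printf("pg %d inflight=%d\n", p.first, p.second.inflight);
    return 0;
}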
@@ -740,6 +766,10 @@ resume_5:
     op_data->st = 5;
     return;
 resume_6:
+    for (int i = 0; i < op_data->dirty_pg_count; i++)
+    {
+        pgs[op_data->dirty_pgs[i]].inflight--;
+    }
     if (op_data->errors > 0)
     {
         // Return objects back into the unstable write set
@@ -747,20 +777,23 @@ resume_6:
         {
             for (int i = 0; i < unstable_osd.len; i++)
             {
-                // Expect those from peered PGs
+                // Except those from peered PGs
                 auto & w = op_data->unstable_writes[i];
-                if (pgs[map_to_pg(w.oid)].state & PG_ACTIVE)
+                pg_num_t wpg = map_to_pg(w.oid);
+                if (pgs[wpg].state & PG_ACTIVE)
                 {
                     uint64_t & dest = this->unstable_writes[(osd_object_id_t){
                         .osd_num = unstable_osd.osd_num,
                         .oid = w.oid,
                     }];
                     dest = dest < w.version ? w.version : dest;
+                    dirty_pgs.insert(wpg);
                 }
             }
         }
     }
     // FIXME: Free those in the destructor?
+    delete op_data->dirty_pgs;
     delete op_data->unstable_write_osds;
     delete[] op_data->unstable_writes;
     op_data->unstable_writes = NULL;
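On errors, unstable writes that belong to still-ACTIVE PGs are put back into the OSD-wide set and their PGs re-marked dirty via the new wpg variable; writes from PGs that were re-peered or stopped in the meantime are skipped, which is what the corrected "Except" comment refers to. Note that op_data->dirty_pgs is allocated with new[] a few hunks earlier but released here with plain delete; delete[] would match the neighbouring unstable_writes cleanup.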
@@ -772,9 +805,12 @@ resume_6:
     else
     {
     finish:
-        auto it = clients.find(cur_op->peer_fd);
-        if (it != clients.end())
-            it->second.dirty_pgs.clear();
+        if (cur_op->peer_fd)
+        {
+            auto it = clients.find(cur_op->peer_fd);
+            if (it != clients.end())
+                it->second.dirty_pgs.clear();
+        }
         finish_op(cur_op, 0);
     }
     assert(syncs_in_progress.front() == cur_op);
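Clearing the per-client dirty_pgs list is now guarded by if (cur_op->peer_fd), so a sync without an associated client connection (peer_fd == 0, the same condition finish_op() already checks) no longer touches the clients map.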
@@ -4,7 +4,6 @@
 
 void osd_t::secondary_op_callback(osd_op_t *op)
 {
-    inflight_ops--;
     if (op->req.hdr.opcode == OSD_OP_SECONDARY_READ ||
         op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE)
     {
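The decrement in secondary_op_callback() goes away; together with the exec_op() and finish_op() changes above, inflight_ops accounting is concentrated in those two functions.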