From dfb6e15eaa615e4b99703e7ca7c2f8186ebdc06b Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 3 Apr 2020 13:03:42 +0300 Subject: [PATCH] Implement graceful stopping of PGs --- osd.cpp | 2 +- osd.h | 3 +++ osd_peering.cpp | 31 ++++++++++++++++++++++- osd_peering_pg.h | 3 ++- osd_primary.cpp | 64 ++++++++++++++++++++++++++++++++++++----------- osd_secondary.cpp | 1 - 6 files changed, 86 insertions(+), 18 deletions(-) diff --git a/osd.cpp b/osd.cpp index 4b509b92..175a03cf 100644 --- a/osd.cpp +++ b/osd.cpp @@ -370,6 +370,7 @@ void osd_t::exec_op(osd_op_t *cur_op) delete cur_op; return; } + inflight_ops++; cur_op->send_list.push_back(cur_op->reply.buf, OSD_PACKET_SIZE); if (cur_op->req.hdr.magic != SECONDARY_OSD_OP_MAGIC || cur_op->req.hdr.opcode < OSD_OP_MIN || cur_op->req.hdr.opcode > OSD_OP_MAX || @@ -382,7 +383,6 @@ void osd_t::exec_op(osd_op_t *cur_op) finish_op(cur_op, -EINVAL); return; } - inflight_ops++; if (cur_op->req.hdr.opcode == OSD_OP_TEST_SYNC_STAB_ALL) { exec_sync_stab_all(cur_op); diff --git a/osd.h b/osd.h index b9f86385..2a6e6bcd 100644 --- a/osd.h +++ b/osd.h @@ -200,6 +200,7 @@ class osd_t std::map osd_peer_fds; std::map pgs; + std::set dirty_pgs; uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0; int peering_state = 0; unsigned pg_count = 0; @@ -265,6 +266,8 @@ class osd_t void handle_peers(); void repeer_pgs(osd_num_t osd_num, bool is_connected); void start_pg_peering(pg_num_t pg_num); + bool stop_pg(pg_num_t pg_num); + void finish_stop_pg(pg_t & pg); // flushing, recovery and backfill void submit_pg_flush_ops(pg_num_t pg_num); diff --git a/osd_peering.cpp b/osd_peering.cpp index f662dad5..ade12687 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -265,6 +265,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num) auto & pg = pgs[pg_num]; pg.state = PG_PEERING; pg.print_state(); + // Reset PG state pg.state_dict.clear(); incomplete_objects -= pg.incomplete_objects.size(); misplaced_objects -= 
pg.misplaced_objects.size(); @@ -284,15 +285,18 @@ void osd_t::start_pg_peering(pg_num_t pg_num) cancel_op(p.second); } pg.write_queue.clear(); - // Forget this PG's unstable writes for (auto it = unstable_writes.begin(); it != unstable_writes.end(); ) { + // Forget this PG's unstable writes pg_num_t n = (it->first.oid.inode + it->first.oid.stripe / pg_stripe_size) % pg_count + 1; if (n == pg.pg_num) unstable_writes.erase(it++); else it++; } + pg.inflight = 0; + dirty_pgs.erase(pg.pg_num); + // Start peering pg.pg_cursize = 0; for (int role = 0; role < pg.cur_set.size(); role++) { @@ -472,3 +476,28 @@ void osd_t::start_pg_peering(pg_num_t pg_num) } ringloop->wakeup(); } + +bool osd_t::stop_pg(pg_num_t pg_num) +{ + auto pg_it = pgs.find(pg_num); + if (pg_it == pgs.end()) + { + return false; + } + auto & pg = pg_it->second; + if (!(pg.state & PG_ACTIVE)) + { + return false; + } + pg.state = pg.state & ~PG_ACTIVE | PG_STOPPING; + if (pg.inflight == 0) + { + finish_stop_pg(pg); + } + return true; +} + +void osd_t::finish_stop_pg(pg_t & pg) +{ + pg.state = PG_OFFLINE; +} diff --git a/osd_peering_pg.h b/osd_peering_pg.h index ddf16715..61b92ade 100644 --- a/osd_peering_pg.h +++ b/osd_peering_pg.h @@ -92,7 +92,7 @@ struct pg_flush_batch_t struct pg_t { - int state; + int state = PG_OFFLINE; uint64_t pg_cursize = 3, pg_size = 3, pg_minsize = 2; pg_num_t pg_num; uint64_t clean_count = 0, total_count = 0; @@ -112,6 +112,7 @@ struct pg_t pg_peering_state_t *peering_state = NULL; pg_flush_batch_t *flush_batch = NULL; + int inflight = 0; // including write_queue std::multimap write_queue; void calc_object_states(); diff --git a/osd_primary.cpp b/osd_primary.cpp index 2ecc3341..138fcff2 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -35,11 +35,24 @@ struct osd_primary_op_data_t uint64_t *prev_set = NULL; // for sync. 
oops, requires freeing std::vector *unstable_write_osds = NULL; + pg_num_t *dirty_pgs = NULL; + int dirty_pg_count = 0; obj_ver_id *unstable_writes = NULL; }; void osd_t::finish_op(osd_op_t *cur_op, int retval) { + inflight_ops--; + if (cur_op->op_data && cur_op->op_data->pg_num > 0) + { + auto & pg = pgs[cur_op->op_data->pg_num]; + int n = --pg.inflight; + assert(n >= 0); + if (n == 0 && (pg.state & PG_STOPPING)) + { + finish_stop_pg(pg); + } + } if (!cur_op->peer_fd) { // Copy lambda to be unaffected by `delete op` @@ -71,12 +84,14 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op) // But we must not use K in the process of calculating the PG number // So we calculate the PG number using a separate setting which should be per-inode (FIXME) pg_num_t pg_num = (cur_op->req.rw.inode + cur_op->req.rw.offset / pg_stripe_size) % pg_count + 1; - if (pgs.find(pg_num) == pgs.end() || !(pgs[pg_num].state & PG_ACTIVE)) + auto pg_it = pgs.find(pg_num); + if (pg_it == pgs.end() || !(pg_it->second.state & PG_ACTIVE)) { + // This OSD is not primary for this PG or the PG is inactive finish_op(cur_op, -EPIPE); return false; } - uint64_t pg_block_size = bs_block_size * pgs[pg_num].pg_minsize; + uint64_t pg_block_size = bs_block_size * pg_it->second.pg_minsize; object_id oid = { .inode = cur_op->req.rw.inode, // oid.stripe = starting offset of the parity stripe, so it can be mapped back to the PG @@ -91,13 +106,14 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op) return false; } osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)calloc( - sizeof(osd_primary_op_data_t) + sizeof(osd_rmw_stripe_t) * pgs[pg_num].pg_size, 1 + sizeof(osd_primary_op_data_t) + sizeof(osd_rmw_stripe_t) * pg_it->second.pg_size, 1 ); op_data->pg_num = pg_num; op_data->oid = oid; op_data->stripes = ((osd_rmw_stripe_t*)(op_data+1)); cur_op->op_data = op_data; - split_stripes(pgs[pg_num].pg_minsize, bs_block_size, (uint32_t)(cur_op->req.rw.offset - oid.stripe), cur_op->req.rw.len, op_data->stripes); + 
split_stripes(pg_it->second.pg_minsize, bs_block_size, (uint32_t)(cur_op->req.rw.offset - oid.stripe), cur_op->req.rw.len, op_data->stripes); + pg_it->second.inflight++; return true; } @@ -601,6 +617,7 @@ resume_7: // Remember PG as dirty to drop the connection when PG goes offline // (this is required because of the "lazy sync") this->clients[cur_op->peer_fd].dirty_pgs.insert(op_data->pg_num); + dirty_pgs.insert(op_data->pg_num); } // Remove version override object_id oid = op_data->oid; @@ -683,11 +700,13 @@ resume_2: goto finish; } // Save and clear unstable_writes - // FIXME: This is possible to do it on a per-client basis - // It would be cool not to copy them here at all, but someone has to deduplicate them by object IDs anyway - op_data->unstable_write_osds = new std::vector(); - op_data->unstable_writes = new obj_ver_id[this->unstable_writes.size()]; + // In theory it is possible to do it on a per-client basis, but this seems to be an unnecessary complication + // It would be cool not to copy these here at all, but someone has to deduplicate them by object IDs anyway { + op_data->unstable_write_osds = new std::vector(); + op_data->unstable_writes = new obj_ver_id[this->unstable_writes.size()]; + op_data->dirty_pgs = new pg_num_t[dirty_pgs.size()]; + op_data->dirty_pg_count = dirty_pgs.size(); osd_num_t last_osd = 0; int last_start = 0, last_end = 0; for (auto it = this->unstable_writes.begin(); it != this->unstable_writes.end(); it++) @@ -719,8 +738,15 @@ resume_2: .len = last_end - last_start, }); } + int dpg = 0; + for (auto dirty_pg_num: dirty_pgs) + { + pgs[dirty_pg_num].inflight++; + op_data->dirty_pgs[dpg++] = dirty_pg_num; + } + dirty_pgs.clear(); + this->unstable_writes.clear(); } - this->unstable_writes.clear(); if (immediate_commit != IMMEDIATE_ALL) { // SYNC @@ -740,6 +766,10 @@ resume_5: op_data->st = 5; return; resume_6: + for (int i = 0; i < op_data->dirty_pg_count; i++) + { + pgs[op_data->dirty_pgs[i]].inflight--; + } if (op_data->errors > 0) 
{ // Return objects back into the unstable write set @@ -747,20 +777,23 @@ resume_6: { for (int i = 0; i < unstable_osd.len; i++) { - // Expect those from peered PGs + // Except those from peered PGs auto & w = op_data->unstable_writes[i]; - if (pgs[map_to_pg(w.oid)].state & PG_ACTIVE) + pg_num_t wpg = map_to_pg(w.oid); + if (pgs[wpg].state & PG_ACTIVE) { uint64_t & dest = this->unstable_writes[(osd_object_id_t){ .osd_num = unstable_osd.osd_num, .oid = w.oid, }]; dest = dest < w.version ? w.version : dest; + dirty_pgs.insert(wpg); } } } } // FIXME: Free those in the destructor? + delete[] op_data->dirty_pgs; delete op_data->unstable_write_osds; delete[] op_data->unstable_writes; op_data->unstable_writes = NULL; @@ -772,9 +805,12 @@ resume_6: else { finish: - auto it = clients.find(cur_op->peer_fd); - if (it != clients.end()) - it->second.dirty_pgs.clear(); + if (cur_op->peer_fd) + { + auto it = clients.find(cur_op->peer_fd); + if (it != clients.end()) + it->second.dirty_pgs.clear(); + } finish_op(cur_op, 0); } assert(syncs_in_progress.front() == cur_op); diff --git a/osd_secondary.cpp b/osd_secondary.cpp index ba5564d1..c6cc513f 100644 --- a/osd_secondary.cpp +++ b/osd_secondary.cpp @@ -4,7 +4,6 @@ void osd_t::secondary_op_callback(osd_op_t *op) { - inflight_ops--; if (op->req.hdr.opcode == OSD_OP_SECONDARY_READ || op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE) {