diff --git a/osd.cpp b/osd.cpp index 412f91fb..a048e8fa 100644 --- a/osd.cpp +++ b/osd.cpp @@ -22,6 +22,7 @@ const char* osd_op_names[] = { "primary_read", "primary_write", "primary_sync", + "primary_delete", }; osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop) @@ -446,7 +447,7 @@ void osd_t::exec_op(osd_op_t *cur_op) cur_op->req.hdr.opcode < OSD_OP_MIN || cur_op->req.hdr.opcode > OSD_OP_MAX || (cur_op->req.hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE) && (cur_op->req.sec_rw.len > OSD_RW_MAX || cur_op->req.sec_rw.len % bs_disk_alignment || cur_op->req.sec_rw.offset % bs_disk_alignment) || - (cur_op->req.hdr.opcode == OSD_OP_READ || cur_op->req.hdr.opcode == OSD_OP_WRITE) && + (cur_op->req.hdr.opcode == OSD_OP_READ || cur_op->req.hdr.opcode == OSD_OP_WRITE || cur_op->req.hdr.opcode == OSD_OP_DELETE) && (cur_op->req.rw.len > OSD_RW_MAX || cur_op->req.rw.len % bs_disk_alignment || cur_op->req.rw.offset % bs_disk_alignment)) { // Bad command @@ -484,6 +485,10 @@ void osd_t::exec_op(osd_op_t *cur_op) { continue_primary_sync(cur_op); } + else if (cur_op->req.hdr.opcode == OSD_OP_DELETE) + { + continue_primary_del(cur_op); + } else { exec_secondary(cur_op); diff --git a/osd.h b/osd.h index 57a7c18c..4f5c880c 100644 --- a/osd.h +++ b/osd.h @@ -361,6 +361,7 @@ class osd_t // op execution void exec_op(osd_op_t *cur_op); + void finish_op(osd_op_t *cur_op, int retval); // secondary ops void exec_sync_stab_all(osd_op_t *cur_op); @@ -374,11 +375,14 @@ class osd_t void continue_primary_read(osd_op_t *cur_op); void continue_primary_write(osd_op_t *cur_op); void continue_primary_sync(osd_op_t *cur_op); - void finish_op(osd_op_t *cur_op, int retval); + void continue_primary_del(osd_op_t *cur_op); + bool check_write_queue(osd_op_t *cur_op, pg_t & pg); + void remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t &pg); + bool finalize_primary_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state); void handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version); void pg_cancel_write_queue(pg_t & pg, object_id oid, int retval); void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op); - void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_state_t *object_state); + void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_t & loc_set); void submit_primary_sync_subops(osd_op_t *cur_op); void submit_primary_stab_subops(osd_op_t *cur_op); diff --git a/osd_ops.h b/osd_ops.h index 4e2829e2..2b5d6817 100644 --- a/osd_ops.h +++ b/osd_ops.h @@ -22,7 +22,8 @@ #define OSD_OP_READ 10 #define OSD_OP_WRITE 11 #define OSD_OP_SYNC 12 -#define OSD_OP_MAX 12 +#define OSD_OP_DELETE 13 +#define OSD_OP_MAX 13 // Alignment & limit for read/write operations #ifndef MEM_ALIGNMENT #define MEM_ALIGNMENT 512 @@ -59,6 +60,7 @@ struct __attribute__((__packed__)) osd_op_secondary_rw_t // object object_id oid; // read/write version (automatic or specific) + // FIXME deny values close to UINT64_MAX uint64_t version; // offset uint32_t offset; diff --git a/osd_peering.cpp b/osd_peering.cpp index b323e55e..37e8963b 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -287,6 +287,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num) // Calculate current write OSD set pg.pg_cursize = 0; pg.cur_set.resize(pg.target_set.size()); + pg.cur_loc_set.clear(); for (int role = 0; role < pg.target_set.size(); role++) { pg.cur_set[role] = pg.target_set[role] == this->osd_num || @@ -294,6 +295,11 @@ void osd_t::start_pg_peering(pg_num_t pg_num) if (pg.cur_set[role] != 0) { pg.pg_cursize++; + pg.cur_loc_set.push_back({ + .role = (uint64_t)role, + .osd_num = pg.cur_set[role], + .outdated = false, + }); } } if (pg.target_history.size()) diff --git a/osd_peering_pg.h b/osd_peering_pg.h index a0e18f95..54a44dcd 100644 --- a/osd_peering_pg.h +++ b/osd_peering_pg.h @@ -114,6 +114,8 @@ struct pg_t // cur_set is the current set of connected peer OSDs for this PG // cur_set = (role => osd_num or UINT64_MAX if missing). role numbers begin with zero std::vector cur_set; + // same thing in state_dict-like format + pg_osd_set_t cur_loc_set; // moved object map. by default, each object is considered to reside on the cur_set. // this map stores all objects that differ. // it may consume up to ~ (raw storage / object size) * 24 bytes in the worst case scenario diff --git a/osd_primary.cpp b/osd_primary.cpp index 6ed74024..b04362d2 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -155,6 +155,33 @@ resume_2: finish_op(cur_op, cur_op->req.rw.len); } +bool osd_t::check_write_queue(osd_op_t *cur_op, pg_t & pg) +{ + osd_primary_op_data_t *op_data = cur_op->op_data; + // Check if actions are pending for this object + auto act_it = pg.flush_actions.lower_bound((obj_piece_id_t){ + .oid = op_data->oid, + .osd_num = 0, + }); + if (act_it != pg.flush_actions.end() && + act_it->first.oid.inode == op_data->oid.inode && + (act_it->first.oid.stripe & ~STRIPE_MASK) == op_data->oid.stripe) + { + pg.write_queue.emplace(op_data->oid, cur_op); + return false; + } + // Check if there are other write requests to the same object + auto vo_it = pg.write_queue.find(op_data->oid); + if (vo_it != pg.write_queue.end()) + { + op_data->st = 1; + pg.write_queue.emplace(op_data->oid, cur_op); + return false; + } + pg.write_queue.emplace(op_data->oid, cur_op); + return true; +} + void osd_t::continue_primary_write(osd_op_t *cur_op) { if (!cur_op->op_data && !prepare_primary_rw(cur_op)) @@ -172,30 +199,9 @@ void osd_t::continue_primary_write(osd_op_t *cur_op) else if (op_data->st == 7) goto resume_7; else if (op_data->st == 8) goto resume_8; assert(op_data->st == 0); - // Check if actions are pending for this object + if (!check_write_queue(cur_op, pg)) { - auto act_it = pg.flush_actions.lower_bound((obj_piece_id_t){ - .oid = op_data->oid, - .osd_num = 0, - }); - if (act_it != pg.flush_actions.end() && - act_it->first.oid.inode == op_data->oid.inode && - (act_it->first.oid.stripe & ~STRIPE_MASK) == op_data->oid.stripe) - { - pg.write_queue.emplace(op_data->oid, cur_op); - return; - } - } - // Check if there are other write requests to the same object - { - auto vo_it = pg.write_queue.find(op_data->oid); - if (vo_it != pg.write_queue.end()) - { - op_data->st = 1; - pg.write_queue.emplace(op_data->oid, cur_op); - return; - } - pg.write_queue.emplace(op_data->oid, cur_op); + return; } resume_1: // Determine blocks to read and write @@ -241,7 +247,7 @@ resume_5: if (op_data->object_state->state & OBJ_MISPLACED) { // Remove extra chunks - submit_primary_del_subops(cur_op, pg.cur_set.data(), op_data->object_state); + submit_primary_del_subops(cur_op, pg.cur_set.data(), op_data->object_state->osd_set); if (op_data->n_subops > 0) { op_data->st = 8; @@ -255,118 +261,19 @@ resume_8: } } // Clear object state - if (op_data->object_state->state & OBJ_INCOMPLETE) - { - // Successful write means that object is not incomplete anymore - incomplete_objects--; - pg.incomplete_objects.erase(op_data->oid); - if (!pg.incomplete_objects.size()) - { - pg.state = pg.state & ~PG_HAS_INCOMPLETE; - report_pg_state(pg); - } - } - else if (op_data->object_state->state & OBJ_DEGRADED) - { - degraded_objects--; - pg.degraded_objects.erase(op_data->oid); - if (!pg.degraded_objects.size()) - { - pg.state = pg.state & ~PG_HAS_DEGRADED; - report_pg_state(pg); - } - } - else if (op_data->object_state->state & OBJ_MISPLACED) - { - misplaced_objects--; - pg.misplaced_objects.erase(op_data->oid); - if (!pg.misplaced_objects.size()) - { - pg.state = pg.state & ~PG_HAS_MISPLACED; - report_pg_state(pg); - } - } - else - { - throw std::runtime_error("Invalid object state during recovery: "+std::to_string(op_data->object_state->state)); - } + remove_object_from_state(op_data->oid, op_data->object_state, pg); pg.clean_count++; - op_data->object_state->object_count--; - if (!op_data->object_state->object_count) - { - pg.state_dict.erase(op_data->object_state->osd_set); - } - } - // FIXME: Check for immediate_commit == IMMEDIATE_SMALL - if (immediate_commit == IMMEDIATE_ALL) - { - op_data->unstable_write_osds = new std::vector(); - op_data->unstable_writes = new obj_ver_id[pg.pg_cursize]; - { - int last_start = 0; - osd_num_t *osd_set = pg.cur_set.data(); - for (int role = 0; role < pg.pg_size; role++) - { - if (osd_set[role] != 0) - { - op_data->unstable_writes[last_start] = (obj_ver_id){ - .oid = { - .inode = op_data->oid.inode, - .stripe = op_data->oid.stripe | role, - }, - .version = op_data->fact_ver, - }; - op_data->unstable_write_osds->push_back((unstable_osd_num_t){ - .osd_num = osd_set[role], - .start = last_start, - .len = 1, - }); - last_start++; - } - } - } - // Stabilize version sets - submit_primary_stab_subops(cur_op); -resume_6: - op_data->st = 6; - return; -resume_7: - // FIXME: Free those in the destructor? - delete op_data->unstable_write_osds; - delete[] op_data->unstable_writes; - op_data->unstable_writes = NULL; - op_data->unstable_write_osds = NULL; - if (op_data->errors > 0) - { - pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); - return; - } - } - else - { - // Remember version as unstable - osd_num_t *osd_set = pg.cur_set.data(); - for (int role = 0; role < pg.pg_size; role++) - { - if (osd_set[role] != 0) - { - this->unstable_writes[(osd_object_id_t){ - .osd_num = osd_set[role], - .oid = { - .inode = op_data->oid.inode, - .stripe = op_data->oid.stripe | role, - }, - }] = op_data->fact_ver; - } - } - // Remember PG as dirty to drop the connection when PG goes offline - // (this is required because of the "lazy sync") - this->clients[cur_op->peer_fd].dirty_pgs.insert(op_data->pg_num); - dirty_pgs.insert(op_data->pg_num); } // Remove version override + pg.ver_override.erase(op_data->oid); + // FIXME: Check for immediate_commit == IMMEDIATE_SMALL +resume_6: +resume_7: + if (!finalize_primary_write(cur_op, pg, pg.cur_loc_set, 6)) + { + return; + } object_id oid = op_data->oid; - pg.ver_override.erase(oid); finish_op(cur_op, cur_op->req.rw.len); // Continue other write operations to the same object auto next_it = pg.write_queue.find(oid); @@ -381,6 +288,77 @@ resume_7: } } +bool osd_t::finalize_primary_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state) +{ + osd_primary_op_data_t *op_data = cur_op->op_data; + if (op_data->st == base_state) + { + goto resume_6; + } + else if (op_data->st == base_state+1) + { + goto resume_7; + } + if (immediate_commit == IMMEDIATE_ALL) + { + op_data->unstable_write_osds = new std::vector(); + op_data->unstable_writes = new obj_ver_id[loc_set.size()]; + { + int last_start = 0; + for (auto & chunk: loc_set) + { + op_data->unstable_writes[last_start] = (obj_ver_id){ + .oid = { + .inode = op_data->oid.inode, + .stripe = op_data->oid.stripe | chunk.role, + }, + .version = op_data->fact_ver, + }; + op_data->unstable_write_osds->push_back((unstable_osd_num_t){ + .osd_num = chunk.osd_num, + .start = last_start, + .len = 1, + }); + last_start++; + } + } + submit_primary_stab_subops(cur_op); +resume_6: + op_data->st = 6; + return false; +resume_7: + // FIXME: Free those in the destructor? + delete op_data->unstable_write_osds; + delete[] op_data->unstable_writes; + op_data->unstable_writes = NULL; + op_data->unstable_write_osds = NULL; + if (op_data->errors > 0) + { + pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); + return false; + } + } + else + { + // Remember version as unstable + for (auto & chunk: loc_set) + { + this->unstable_writes[(osd_object_id_t){ + .osd_num = chunk.osd_num, + .oid = { + .inode = op_data->oid.inode, + .stripe = op_data->oid.stripe | chunk.role, + }, + }] = op_data->fact_ver; + } + // Remember PG as dirty to drop the connection when PG goes offline + // (this is required because of the "lazy sync") + this->clients[cur_op->peer_fd].dirty_pgs.insert(op_data->pg_num); + dirty_pgs.insert(op_data->pg_num); + } + return true; +} + // Save and clear unstable_writes -> SYNC all -> STABLE all void osd_t::continue_primary_sync(osd_op_t *cur_op) { @@ -543,3 +521,135 @@ finish: goto resume_2; } } + +// Decrement pg_osd_set_state_t's object_count and change PG state accordingly +void osd_t::remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t & pg) +{ + if (object_state->state & OBJ_INCOMPLETE) + { + // Successful write means that object is not incomplete anymore + this->incomplete_objects--; + pg.incomplete_objects.erase(oid); + if (!pg.incomplete_objects.size()) + { + pg.state = pg.state & ~PG_HAS_INCOMPLETE; + report_pg_state(pg); + } + } + else if (object_state->state & OBJ_DEGRADED) + { + this->degraded_objects--; + pg.degraded_objects.erase(oid); + if (!pg.degraded_objects.size()) + { + pg.state = pg.state & ~PG_HAS_DEGRADED; + report_pg_state(pg); + } + } + else if (object_state->state & OBJ_MISPLACED) + { + this->misplaced_objects--; + pg.misplaced_objects.erase(oid); + if (!pg.misplaced_objects.size()) + { + pg.state = pg.state & ~PG_HAS_MISPLACED; + report_pg_state(pg); + } + } + else + { + throw std::runtime_error("BUG: Invalid object state: "+std::to_string(object_state->state)); + } + object_state->object_count--; + if (!object_state->object_count) + { + pg.state_dict.erase(object_state->osd_set); + } +} + +void osd_t::continue_primary_del(osd_op_t *cur_op) +{ + if (!cur_op->op_data && !prepare_primary_rw(cur_op)) + { + return; + } + osd_primary_op_data_t *op_data = cur_op->op_data; + auto & pg = pgs[op_data->pg_num]; + if (op_data->st == 1) goto resume_1; + else if (op_data->st == 2) goto resume_2; + else if (op_data->st == 3) goto resume_3; + else if (op_data->st == 4) goto resume_4; + else if (op_data->st == 5) goto resume_5; + else if (op_data->st == 6) goto resume_6; + else if (op_data->st == 7) goto resume_7; + assert(op_data->st == 0); + // Delete is forbidden even in active PGs if they're also degraded or have previous dead OSDs + if (pg.state & (PG_DEGRADED | PG_LEFT_ON_DEAD)) + { + finish_op(cur_op, -EBUSY); + return; + } + if (!check_write_queue(cur_op, pg)) + { + return; + } +resume_1: + // Determine which OSDs contain this object and delete it + op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state); + // Submit 1 read to determine the actual version number + submit_primary_subops(SUBMIT_RMW_READ, pg.pg_size, op_data->prev_set, cur_op); +resume_2: + op_data->st = 2; + return; +resume_3: + if (op_data->errors > 0) + { + pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); + return; + } + // Save version override for parallel reads + pg.ver_override[op_data->oid] = op_data->fact_ver; + // Submit deletes + op_data->fact_ver++; + submit_primary_del_subops(cur_op, NULL, op_data->object_state ? op_data->object_state->osd_set : pg.cur_loc_set); +resume_4: + op_data->st = 4; + return; +resume_5: + if (op_data->errors > 0) + { + pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); + return; + } + // Remove version override + pg.ver_override.erase(op_data->oid); +resume_6: +resume_7: + if (!finalize_primary_write(cur_op, pg, op_data->object_state ? op_data->object_state->osd_set : pg.cur_loc_set, 6)) + { + return; + } + // Adjust PG stats after "instant stabilize", because we need object_state above + if (!op_data->object_state) + { + pg.clean_count--; + } + else + { + remove_object_from_state(op_data->oid, op_data->object_state, pg); + } + pg.total_count--; + object_id oid = op_data->oid; + finish_op(cur_op, cur_op->req.rw.len); + // Continue other write operations to the same object + auto next_it = pg.write_queue.find(oid); + auto this_it = next_it; + next_it++; + pg.write_queue.erase(this_it); + if (next_it != pg.write_queue.end() && + next_it->first == oid) + { + osd_op_t *next_op = next_it->second; + continue_primary_write(next_op); + } +} diff --git a/osd_primary_subops.cpp b/osd_primary_subops.cpp index 9b704858..eea5e7fc 100644 --- a/osd_primary_subops.cpp +++ b/osd_primary_subops.cpp @@ -1,9 +1,5 @@ #include "osd_primary.h" -#define SUBMIT_READ 0 -#define SUBMIT_RMW_READ 1 -#define SUBMIT_WRITE 2 - void osd_t::autosync() { if (immediate_commit != IMMEDIATE_ALL && !autosync_op) @@ -227,6 +223,10 @@ void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, { continue_primary_sync(cur_op); } + else if (cur_op->req.hdr.opcode == OSD_OP_DELETE) + { + continue_primary_del(cur_op); + } else { throw std::runtime_error("BUG: unknown opcode"); @@ -234,13 +234,13 @@ void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, } } -void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_state_t *object_state) +void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_t & loc_set) { osd_primary_op_data_t *op_data = cur_op->op_data; int extra_chunks = 0; - for (auto chunk: object_state->osd_set) + for (auto & chunk: loc_set) { - if (chunk.osd_num != cur_set[chunk.role]) + if (!cur_set || chunk.osd_num != cur_set[chunk.role]) { extra_chunks++; } @@ -254,9 +254,9 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_os osd_op_t *subops = new osd_op_t[extra_chunks]; op_data->subops = subops; int i = 0; - for (auto chunk: object_state->osd_set) + for (auto & chunk: loc_set) { - if (chunk.osd_num != cur_set[chunk.role]) + if (!cur_set || chunk.osd_num != cur_set[chunk.role]) { if (chunk.osd_num == this->osd_num) {