diff --git a/src/osd.h b/src/osd.h index 75661ad2..0f7be0a0 100644 --- a/src/osd.h +++ b/src/osd.h @@ -239,8 +239,8 @@ class osd_t void continue_primary_sync(osd_op_t *cur_op); void continue_primary_del(osd_op_t *cur_op); bool check_write_queue(osd_op_t *cur_op, pg_t & pg); - void remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t &pg); - void free_object_state(pg_t & pg, pg_osd_set_state_t **object_state); + void remove_object_from_state(object_id & oid, pg_osd_set_state_t **object_state, pg_t &pg); + void deref_object_state(pg_t & pg, pg_osd_set_state_t **object_state, bool deref); bool remember_unstable_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state); void handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op); void handle_primary_bs_subop(osd_op_t *subop); diff --git a/src/osd_peering_pg.cpp b/src/osd_peering_pg.cpp index c744b071..97002084 100644 --- a/src/osd_peering_pg.cpp +++ b/src/osd_peering_pg.cpp @@ -322,67 +322,73 @@ void pg_obj_state_check_t::finish_object() } else { - auto it = pg->state_dict.find(osd_set); - if (it == pg->state_dict.end()) - { - std::vector read_target; - if (replicated) - { - for (auto & o: osd_set) - { - if (!(o.loc_bad & LOC_OUTDATED)) - { - read_target.push_back(o.osd_num); - } - } - while (read_target.size() < pg->pg_size) - { - // FIXME: This is because we then use .data() and assume it's at least long - read_target.push_back(0); - } - } - else - { - read_target.resize(pg->pg_size); - for (int i = 0; i < pg->pg_size; i++) - { - read_target[i] = 0; - } - for (auto & o: osd_set) - { - if (!(o.loc_bad & LOC_OUTDATED)) - { - read_target[o.role] = o.osd_num; - } - } - } - pg->state_dict[osd_set] = { - .read_target = read_target, - .osd_set = osd_set, - .state = state, - .object_count = 1, - }; - it = pg->state_dict.find(osd_set); - } - else - { - it->second.object_count++; - } - if (state & OBJ_INCOMPLETE) - { - pg->incomplete_objects[oid] = &it->second; - } - else if (state & OBJ_DEGRADED) - { - pg->degraded_objects[oid] = &it->second; - } - else - { - pg->misplaced_objects[oid] = &it->second; - } + pg->add_object_to_state(oid, state, osd_set); } } +pg_osd_set_state_t* pg_t::add_object_to_state(const object_id oid, const uint64_t state, const pg_osd_set_t & osd_set) +{ + auto it = state_dict.find(osd_set); + if (it == state_dict.end()) + { + std::vector read_target; + if (scheme == POOL_SCHEME_REPLICATED) + { + for (auto & o: osd_set) + { + if (!(o.loc_bad & LOC_OUTDATED)) + { + read_target.push_back(o.osd_num); + } + } + while (read_target.size() < pg_size) + { + // FIXME: This is because we then use .data() and assume it's at least long + read_target.push_back(0); + } + } + else + { + read_target.resize(pg_size); + for (int i = 0; i < pg_size; i++) + { + read_target[i] = 0; + } + for (auto & o: osd_set) + { + if (!o.loc_bad) + { + read_target[o.role] = o.osd_num; + } + } + } + state_dict[osd_set] = { + .read_target = read_target, + .osd_set = osd_set, + .state = state, + .object_count = 1, + }; + it = state_dict.find(osd_set); + } + else + { + it->second.object_count++; + } + if (state & OBJ_INCOMPLETE) + { + incomplete_objects[oid] = &it->second; + } + else if (state & OBJ_DEGRADED) + { + degraded_objects[oid] = &it->second; + } + else + { + misplaced_objects[oid] = &it->second; + } + return &it->second; +} + // FIXME: Write at least some tests for this function void pg_t::calc_object_states(int log_level) { diff --git a/src/osd_peering_pg.h b/src/osd_peering_pg.h index 3fbd0c55..a4d7c617 100644 --- a/src/osd_peering_pg.h +++ b/src/osd_peering_pg.h @@ -33,6 +33,7 @@ struct pg_osd_set_state_t pg_osd_set_t osd_set; uint64_t state = 0; uint64_t object_count = 0; + uint64_t ref_count = 0; }; struct pg_list_result_t @@ -120,6 +121,7 @@ struct pg_t int inflight = 0; // including write_queue std::multimap write_queue; + pg_osd_set_state_t* add_object_to_state(const object_id oid, const uint64_t state, const pg_osd_set_t & osd_set); void calc_object_states(int log_level); void print_state(); }; diff --git a/src/osd_primary.cpp b/src/osd_primary.cpp index 95d947c3..aeb0d3a1 100644 --- a/src/osd_primary.cpp +++ b/src/osd_primary.cpp @@ -267,9 +267,23 @@ resume_2: } // Decrement pg_osd_set_state_t's object_count and change PG state accordingly -void osd_t::remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t & pg) +void osd_t::remove_object_from_state(object_id & oid, pg_osd_set_state_t **object_state, pg_t & pg) { - if (object_state->state & OBJ_INCOMPLETE) + if (!*object_state) + { + return; + } + pg_osd_set_state_t *recheck_state = NULL; + get_object_osd_set(pg, oid, NULL, &recheck_state); + if (recheck_state != *object_state) + { + recheck_state->ref_count++; + (*object_state)->ref_count--; + *object_state = recheck_state; + return; + } + (*object_state)->object_count--; + if ((*object_state)->state & OBJ_INCOMPLETE) { // Successful write means that object is not incomplete anymore this->incomplete_objects--; @@ -280,7 +294,7 @@ void osd_t::remove_object_from_state(object_id & oid, pg_osd_set_state_t *object report_pg_state(pg); } } - else if (object_state->state & OBJ_DEGRADED) + else if ((*object_state)->state & OBJ_DEGRADED) { this->degraded_objects--; pg.degraded_objects.erase(oid); @@ -290,7 +304,7 @@ void osd_t::remove_object_from_state(object_id & oid, pg_osd_set_state_t *object report_pg_state(pg); } } - else if (object_state->state & OBJ_MISPLACED) + else if ((*object_state)->state & OBJ_MISPLACED) { this->misplaced_objects--; pg.misplaced_objects.erase(oid); @@ -302,16 +316,23 @@ void osd_t::remove_object_from_state(object_id & oid, pg_osd_set_state_t *object } else { - throw std::runtime_error("BUG: Invalid object state: "+std::to_string(object_state->state)); + throw std::runtime_error("BUG: Invalid object state: "+std::to_string((*object_state)->state)); } } -void osd_t::free_object_state(pg_t & pg, pg_osd_set_state_t **object_state) +void osd_t::deref_object_state(pg_t & pg, pg_osd_set_state_t **object_state, bool deref) { - if (*object_state && !(--(*object_state)->object_count)) + if (*object_state) { - pg.state_dict.erase((*object_state)->osd_set); - *object_state = NULL; + if (deref) + { + (*object_state)->ref_count--; + } + if (!(*object_state)->object_count && !(*object_state)->ref_count) + { + pg.state_dict.erase((*object_state)->osd_set); + *object_state = NULL; + } } } @@ -342,6 +363,10 @@ void osd_t::continue_primary_del(osd_op_t *cur_op) resume_1: // Determine which OSDs contain this object and delete it op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state); + if (op_data->object_state) + { + op_data->object_state->ref_count++; + } // Submit 1 read to determine the actual version number submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, op_data->prev_set, cur_op); resume_2: @@ -350,12 +375,14 @@ resume_2: resume_3: if (op_data->errors > 0) { + deref_object_state(pg, &op_data->object_state, true); pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode); return; } // Check CAS version if (cur_op->req.rw.version && op_data->fact_ver != (cur_op->req.rw.version-1)) { + deref_object_state(pg, &op_data->object_state, true); cur_op->reply.hdr.retval = -EINTR; cur_op->reply.rw.version = op_data->fact_ver; goto continue_others; @@ -371,6 +398,7 @@ resume_4: resume_5: if (op_data->errors > 0) { + deref_object_state(pg, &op_data->object_state, true); pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode); return; } @@ -383,8 +411,8 @@ resume_5: } else { - remove_object_from_state(op_data->oid, op_data->object_state, pg); - free_object_state(pg, &op_data->object_state); + remove_object_from_state(op_data->oid, &op_data->object_state, pg); + deref_object_state(pg, &op_data->object_state, true); } pg.total_count--; cur_op->reply.hdr.retval = 0; diff --git a/src/osd_primary_write.cpp b/src/osd_primary_write.cpp index a2dee57a..c1c08d9e 100644 --- a/src/osd_primary_write.cpp +++ b/src/osd_primary_write.cpp @@ -59,6 +59,11 @@ resume_1: // Missing chunks are allowed to be overwritten even in incomplete objects // FIXME: Allow to do small writes to the old (degraded/misplaced) OSD set for lower performance impact op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state); + if (op_data->object_state) + { + // Protect object_state from being freed by a parallel read operation changing it + op_data->object_state->ref_count++; + } if (op_data->scheme == POOL_SCHEME_REPLICATED) { // Simplified algorithm @@ -93,12 +98,14 @@ resume_2: resume_3: if (op_data->errors > 0) { + deref_object_state(pg, &op_data->object_state, true); pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode); return; } // Check CAS version if (cur_op->req.rw.version && op_data->fact_ver != (cur_op->req.rw.version-1)) { + deref_object_state(pg, &op_data->object_state, true); cur_op->reply.hdr.retval = -EINTR; cur_op->reply.rw.version = op_data->fact_ver; goto continue_others; @@ -182,6 +189,7 @@ resume_10: // Recheck PG state after reporting history - maybe it's already stopping/restarting if (pg.state & (PG_STOPPING|PG_REPEERING)) { + deref_object_state(pg, &op_data->object_state, true); pg_cancel_write_queue(pg, cur_op, op_data->oid, -EPIPE); return; } @@ -197,6 +205,7 @@ resume_5: } if (op_data->errors > 0) { + deref_object_state(pg, &op_data->object_state, true); pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode); return; } @@ -205,7 +214,7 @@ resume_5: // We must forget the unclean state of the object before deleting it // so the next reads don't accidentally read a deleted version // And it should be done at the same time as the removal of the version override - remove_object_from_state(op_data->oid, op_data->object_state, pg); + remove_object_from_state(op_data->oid, &op_data->object_state, pg); pg.clean_count++; } resume_6: @@ -260,12 +269,12 @@ resume_7: copies_to_delete_after_sync_count++; } } - free_object_state(pg, &op_data->object_state); + deref_object_state(pg, &op_data->object_state, true); } else { submit_primary_del_subops(cur_op, pg.cur_set.data(), pg.pg_size, op_data->object_state->osd_set); - free_object_state(pg, &op_data->object_state); + deref_object_state(pg, &op_data->object_state, true); if (op_data->n_subops > 0) { resume_8: