Refcount object_states
parent
a7f63f7c29
commit
e307dd13ed
|
@ -239,8 +239,8 @@ class osd_t
|
|||
void continue_primary_sync(osd_op_t *cur_op);
|
||||
void continue_primary_del(osd_op_t *cur_op);
|
||||
bool check_write_queue(osd_op_t *cur_op, pg_t & pg);
|
||||
void remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t &pg);
|
||||
void free_object_state(pg_t & pg, pg_osd_set_state_t **object_state);
|
||||
void remove_object_from_state(object_id & oid, pg_osd_set_state_t **object_state, pg_t &pg);
|
||||
void deref_object_state(pg_t & pg, pg_osd_set_state_t **object_state, bool deref);
|
||||
bool remember_unstable_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state);
|
||||
void handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op);
|
||||
void handle_primary_bs_subop(osd_op_t *subop);
|
||||
|
|
|
@ -322,67 +322,73 @@ void pg_obj_state_check_t::finish_object()
|
|||
}
|
||||
else
|
||||
{
|
||||
auto it = pg->state_dict.find(osd_set);
|
||||
if (it == pg->state_dict.end())
|
||||
{
|
||||
std::vector<uint64_t> read_target;
|
||||
if (replicated)
|
||||
{
|
||||
for (auto & o: osd_set)
|
||||
{
|
||||
if (!(o.loc_bad & LOC_OUTDATED))
|
||||
{
|
||||
read_target.push_back(o.osd_num);
|
||||
}
|
||||
}
|
||||
while (read_target.size() < pg->pg_size)
|
||||
{
|
||||
// FIXME: This is because we then use .data() and assume it's at least <pg_size> long
|
||||
read_target.push_back(0);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
read_target.resize(pg->pg_size);
|
||||
for (int i = 0; i < pg->pg_size; i++)
|
||||
{
|
||||
read_target[i] = 0;
|
||||
}
|
||||
for (auto & o: osd_set)
|
||||
{
|
||||
if (!(o.loc_bad & LOC_OUTDATED))
|
||||
{
|
||||
read_target[o.role] = o.osd_num;
|
||||
}
|
||||
}
|
||||
}
|
||||
pg->state_dict[osd_set] = {
|
||||
.read_target = read_target,
|
||||
.osd_set = osd_set,
|
||||
.state = state,
|
||||
.object_count = 1,
|
||||
};
|
||||
it = pg->state_dict.find(osd_set);
|
||||
}
|
||||
else
|
||||
{
|
||||
it->second.object_count++;
|
||||
}
|
||||
if (state & OBJ_INCOMPLETE)
|
||||
{
|
||||
pg->incomplete_objects[oid] = &it->second;
|
||||
}
|
||||
else if (state & OBJ_DEGRADED)
|
||||
{
|
||||
pg->degraded_objects[oid] = &it->second;
|
||||
}
|
||||
else
|
||||
{
|
||||
pg->misplaced_objects[oid] = &it->second;
|
||||
}
|
||||
pg->add_object_to_state(oid, state, osd_set);
|
||||
}
|
||||
}
|
||||
|
||||
pg_osd_set_state_t* pg_t::add_object_to_state(const object_id oid, const uint64_t state, const pg_osd_set_t & osd_set)
|
||||
{
|
||||
auto it = state_dict.find(osd_set);
|
||||
if (it == state_dict.end())
|
||||
{
|
||||
std::vector<osd_num_t> read_target;
|
||||
if (scheme == POOL_SCHEME_REPLICATED)
|
||||
{
|
||||
for (auto & o: osd_set)
|
||||
{
|
||||
if (!(o.loc_bad & LOC_OUTDATED))
|
||||
{
|
||||
read_target.push_back(o.osd_num);
|
||||
}
|
||||
}
|
||||
while (read_target.size() < pg_size)
|
||||
{
|
||||
// FIXME: This is because we then use .data() and assume it's at least <pg_size> long
|
||||
read_target.push_back(0);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
read_target.resize(pg_size);
|
||||
for (int i = 0; i < pg_size; i++)
|
||||
{
|
||||
read_target[i] = 0;
|
||||
}
|
||||
for (auto & o: osd_set)
|
||||
{
|
||||
if (!o.loc_bad)
|
||||
{
|
||||
read_target[o.role] = o.osd_num;
|
||||
}
|
||||
}
|
||||
}
|
||||
state_dict[osd_set] = {
|
||||
.read_target = read_target,
|
||||
.osd_set = osd_set,
|
||||
.state = state,
|
||||
.object_count = 1,
|
||||
};
|
||||
it = state_dict.find(osd_set);
|
||||
}
|
||||
else
|
||||
{
|
||||
it->second.object_count++;
|
||||
}
|
||||
if (state & OBJ_INCOMPLETE)
|
||||
{
|
||||
incomplete_objects[oid] = &it->second;
|
||||
}
|
||||
else if (state & OBJ_DEGRADED)
|
||||
{
|
||||
degraded_objects[oid] = &it->second;
|
||||
}
|
||||
else
|
||||
{
|
||||
misplaced_objects[oid] = &it->second;
|
||||
}
|
||||
return &it->second;
|
||||
}
|
||||
|
||||
// FIXME: Write at least some tests for this function
|
||||
void pg_t::calc_object_states(int log_level)
|
||||
{
|
||||
|
|
|
@ -33,6 +33,7 @@ struct pg_osd_set_state_t
|
|||
pg_osd_set_t osd_set;
|
||||
uint64_t state = 0;
|
||||
uint64_t object_count = 0;
|
||||
uint64_t ref_count = 0;
|
||||
};
|
||||
|
||||
struct pg_list_result_t
|
||||
|
@ -120,6 +121,7 @@ struct pg_t
|
|||
int inflight = 0; // including write_queue
|
||||
std::multimap<object_id, osd_op_t*> write_queue;
|
||||
|
||||
pg_osd_set_state_t* add_object_to_state(const object_id oid, const uint64_t state, const pg_osd_set_t & osd_set);
|
||||
void calc_object_states(int log_level);
|
||||
void print_state();
|
||||
};
|
||||
|
|
|
@ -267,9 +267,23 @@ resume_2:
|
|||
}
|
||||
|
||||
// Decrement pg_osd_set_state_t's object_count and change PG state accordingly
|
||||
void osd_t::remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t & pg)
|
||||
void osd_t::remove_object_from_state(object_id & oid, pg_osd_set_state_t **object_state, pg_t & pg)
|
||||
{
|
||||
if (object_state->state & OBJ_INCOMPLETE)
|
||||
if (!*object_state)
|
||||
{
|
||||
return;
|
||||
}
|
||||
pg_osd_set_state_t *recheck_state = NULL;
|
||||
get_object_osd_set(pg, oid, NULL, &recheck_state);
|
||||
if (recheck_state != *object_state)
|
||||
{
|
||||
recheck_state->ref_count++;
|
||||
(*object_state)->ref_count--;
|
||||
*object_state = recheck_state;
|
||||
return;
|
||||
}
|
||||
(*object_state)->object_count--;
|
||||
if ((*object_state)->state & OBJ_INCOMPLETE)
|
||||
{
|
||||
// Successful write means that object is not incomplete anymore
|
||||
this->incomplete_objects--;
|
||||
|
@ -280,7 +294,7 @@ void osd_t::remove_object_from_state(object_id & oid, pg_osd_set_state_t *object
|
|||
report_pg_state(pg);
|
||||
}
|
||||
}
|
||||
else if (object_state->state & OBJ_DEGRADED)
|
||||
else if ((*object_state)->state & OBJ_DEGRADED)
|
||||
{
|
||||
this->degraded_objects--;
|
||||
pg.degraded_objects.erase(oid);
|
||||
|
@ -290,7 +304,7 @@ void osd_t::remove_object_from_state(object_id & oid, pg_osd_set_state_t *object
|
|||
report_pg_state(pg);
|
||||
}
|
||||
}
|
||||
else if (object_state->state & OBJ_MISPLACED)
|
||||
else if ((*object_state)->state & OBJ_MISPLACED)
|
||||
{
|
||||
this->misplaced_objects--;
|
||||
pg.misplaced_objects.erase(oid);
|
||||
|
@ -302,16 +316,23 @@ void osd_t::remove_object_from_state(object_id & oid, pg_osd_set_state_t *object
|
|||
}
|
||||
else
|
||||
{
|
||||
throw std::runtime_error("BUG: Invalid object state: "+std::to_string(object_state->state));
|
||||
throw std::runtime_error("BUG: Invalid object state: "+std::to_string((*object_state)->state));
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::free_object_state(pg_t & pg, pg_osd_set_state_t **object_state)
|
||||
void osd_t::deref_object_state(pg_t & pg, pg_osd_set_state_t **object_state, bool deref)
|
||||
{
|
||||
if (*object_state && !(--(*object_state)->object_count))
|
||||
if (*object_state)
|
||||
{
|
||||
pg.state_dict.erase((*object_state)->osd_set);
|
||||
*object_state = NULL;
|
||||
if (deref)
|
||||
{
|
||||
(*object_state)->ref_count--;
|
||||
}
|
||||
if (!(*object_state)->object_count && !(*object_state)->ref_count)
|
||||
{
|
||||
pg.state_dict.erase((*object_state)->osd_set);
|
||||
*object_state = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -342,6 +363,10 @@ void osd_t::continue_primary_del(osd_op_t *cur_op)
|
|||
resume_1:
|
||||
// Determine which OSDs contain this object and delete it
|
||||
op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
|
||||
if (op_data->object_state)
|
||||
{
|
||||
op_data->object_state->ref_count++;
|
||||
}
|
||||
// Submit 1 read to determine the actual version number
|
||||
submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, op_data->prev_set, cur_op);
|
||||
resume_2:
|
||||
|
@ -350,12 +375,14 @@ resume_2:
|
|||
resume_3:
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
deref_object_state(pg, &op_data->object_state, true);
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode);
|
||||
return;
|
||||
}
|
||||
// Check CAS version
|
||||
if (cur_op->req.rw.version && op_data->fact_ver != (cur_op->req.rw.version-1))
|
||||
{
|
||||
deref_object_state(pg, &op_data->object_state, true);
|
||||
cur_op->reply.hdr.retval = -EINTR;
|
||||
cur_op->reply.rw.version = op_data->fact_ver;
|
||||
goto continue_others;
|
||||
|
@ -371,6 +398,7 @@ resume_4:
|
|||
resume_5:
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
deref_object_state(pg, &op_data->object_state, true);
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode);
|
||||
return;
|
||||
}
|
||||
|
@ -383,8 +411,8 @@ resume_5:
|
|||
}
|
||||
else
|
||||
{
|
||||
remove_object_from_state(op_data->oid, op_data->object_state, pg);
|
||||
free_object_state(pg, &op_data->object_state);
|
||||
remove_object_from_state(op_data->oid, &op_data->object_state, pg);
|
||||
deref_object_state(pg, &op_data->object_state, true);
|
||||
}
|
||||
pg.total_count--;
|
||||
cur_op->reply.hdr.retval = 0;
|
||||
|
|
|
@ -59,6 +59,11 @@ resume_1:
|
|||
// Missing chunks are allowed to be overwritten even in incomplete objects
|
||||
// FIXME: Allow to do small writes to the old (degraded/misplaced) OSD set for lower performance impact
|
||||
op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
|
||||
if (op_data->object_state)
|
||||
{
|
||||
// Protect object_state from being freed by a parallel read operation changing it
|
||||
op_data->object_state->ref_count++;
|
||||
}
|
||||
if (op_data->scheme == POOL_SCHEME_REPLICATED)
|
||||
{
|
||||
// Simplified algorithm
|
||||
|
@ -93,12 +98,14 @@ resume_2:
|
|||
resume_3:
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
deref_object_state(pg, &op_data->object_state, true);
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode);
|
||||
return;
|
||||
}
|
||||
// Check CAS version
|
||||
if (cur_op->req.rw.version && op_data->fact_ver != (cur_op->req.rw.version-1))
|
||||
{
|
||||
deref_object_state(pg, &op_data->object_state, true);
|
||||
cur_op->reply.hdr.retval = -EINTR;
|
||||
cur_op->reply.rw.version = op_data->fact_ver;
|
||||
goto continue_others;
|
||||
|
@ -182,6 +189,7 @@ resume_10:
|
|||
// Recheck PG state after reporting history - maybe it's already stopping/restarting
|
||||
if (pg.state & (PG_STOPPING|PG_REPEERING))
|
||||
{
|
||||
deref_object_state(pg, &op_data->object_state, true);
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, -EPIPE);
|
||||
return;
|
||||
}
|
||||
|
@ -197,6 +205,7 @@ resume_5:
|
|||
}
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
deref_object_state(pg, &op_data->object_state, true);
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode);
|
||||
return;
|
||||
}
|
||||
|
@ -205,7 +214,7 @@ resume_5:
|
|||
// We must forget the unclean state of the object before deleting it
|
||||
// so the next reads don't accidentally read a deleted version
|
||||
// And it should be done at the same time as the removal of the version override
|
||||
remove_object_from_state(op_data->oid, op_data->object_state, pg);
|
||||
remove_object_from_state(op_data->oid, &op_data->object_state, pg);
|
||||
pg.clean_count++;
|
||||
}
|
||||
resume_6:
|
||||
|
@ -260,12 +269,12 @@ resume_7:
|
|||
copies_to_delete_after_sync_count++;
|
||||
}
|
||||
}
|
||||
free_object_state(pg, &op_data->object_state);
|
||||
deref_object_state(pg, &op_data->object_state, true);
|
||||
}
|
||||
else
|
||||
{
|
||||
submit_primary_del_subops(cur_op, pg.cur_set.data(), pg.pg_size, op_data->object_state->osd_set);
|
||||
free_object_state(pg, &op_data->object_state);
|
||||
deref_object_state(pg, &op_data->object_state, true);
|
||||
if (op_data->n_subops > 0)
|
||||
{
|
||||
resume_8:
|
||||
|
|
Loading…
Reference in New Issue