Implement basic degraded object recovery (integrated into primary_write)
parent
dbd8418798
commit
250f22c0b6
13
osd.h
13
osd.h
|
@ -36,6 +36,7 @@
|
|||
#define OSD_CONNECTING_PEERS 1
|
||||
#define OSD_PEERING_PGS 2
|
||||
#define OSD_FLUSHING_PGS 4
|
||||
#define OSD_RECOVERING 8
|
||||
|
||||
#define IMMEDIATE_NONE 0
|
||||
#define IMMEDIATE_SMALL 1
|
||||
|
@ -166,6 +167,14 @@ struct osd_object_id_t
|
|||
object_id oid;
|
||||
};
|
||||
|
||||
struct osd_recovery_state_t
|
||||
{
|
||||
int st;
|
||||
pg_num_t pg_num;
|
||||
object_id oid;
|
||||
osd_op_t *op;
|
||||
};
|
||||
|
||||
class osd_t
|
||||
{
|
||||
// config
|
||||
|
@ -188,6 +197,7 @@ class osd_t
|
|||
int peering_state = 0;
|
||||
unsigned pg_count = 0;
|
||||
uint64_t next_subop_id = 1;
|
||||
osd_recovery_state_t *recovery_state;
|
||||
|
||||
// Unstable writes
|
||||
std::map<osd_object_id_t, uint64_t> unstable_writes;
|
||||
|
@ -244,9 +254,12 @@ class osd_t
|
|||
void handle_peers();
|
||||
void repeer_pgs(osd_num_t osd_num, bool is_connected);
|
||||
void start_pg_peering(pg_num_t pg_num);
|
||||
|
||||
// flushing, recovery and backfill
|
||||
void submit_pg_flush_ops(pg_num_t pg_num);
|
||||
void handle_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t osd_num, bool ok);
|
||||
void submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t osd_num, int count, obj_ver_id *data);
|
||||
bool continue_recovery();
|
||||
|
||||
// op execution
|
||||
void exec_op(osd_op_t *cur_op);
|
||||
|
|
135
osd_flush.cpp
135
osd_flush.cpp
|
@ -185,3 +185,138 @@ void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback
|
|||
outbox_push(clients[peer_fd], op);
|
||||
}
|
||||
}
|
||||
|
||||
// Just trigger write requests for degraded objects. They'll be recovered during writing
|
||||
bool osd_t::continue_recovery()
|
||||
{
|
||||
pg_t *pg = NULL;
|
||||
if (recovery_state->st == 0) goto resume_0;
|
||||
else if (recovery_state->st == 1) goto resume_1;
|
||||
else if (recovery_state->st == 2) goto resume_2;
|
||||
else if (recovery_state->st == 3) goto resume_3;
|
||||
else if (recovery_state->st == 4) goto resume_4;
|
||||
resume_0:
|
||||
for (auto p: pgs)
|
||||
{
|
||||
if (p.second.state & PG_HAS_DEGRADED)
|
||||
{
|
||||
recovery_state->pg_num = p.first;
|
||||
goto resume_1;
|
||||
}
|
||||
}
|
||||
recovery_state->st = 0;
|
||||
return false;
|
||||
resume_1:
|
||||
pg = &pgs[recovery_state->pg_num];
|
||||
if (!pg->degraded_objects.size())
|
||||
{
|
||||
pg->state = pg->state & ~PG_HAS_DEGRADED;
|
||||
goto resume_0;
|
||||
}
|
||||
recovery_state->oid = pg->degraded_objects.begin()->first;
|
||||
recovery_state->op = new osd_op_t();
|
||||
recovery_state->op->op_type = OSD_OP_OUT;
|
||||
recovery_state->op->req = {
|
||||
.rw = {
|
||||
.header = {
|
||||
.magic = SECONDARY_OSD_OP_MAGIC,
|
||||
.id = 0,
|
||||
.opcode = OSD_OP_WRITE,
|
||||
},
|
||||
.inode = recovery_state->oid.inode,
|
||||
.offset = recovery_state->oid.stripe,
|
||||
.len = 0,
|
||||
},
|
||||
};
|
||||
recovery_state->op->callback = [this](osd_op_t *op)
|
||||
{
|
||||
if (op->reply.hdr.retval < 0)
|
||||
recovery_state->st += 1; // error
|
||||
else
|
||||
recovery_state->st += 2; // ok
|
||||
continue_recovery();
|
||||
};
|
||||
exec_op(recovery_state->op);
|
||||
recovery_state->st = 2;
|
||||
resume_2:
|
||||
return true;
|
||||
resume_3:
|
||||
// FIXME handle error
|
||||
throw std::runtime_error("failed to recover an object");
|
||||
resume_4:
|
||||
delete recovery_state->op;
|
||||
recovery_state->op = NULL;
|
||||
// Don't sync the write, it will be synced by our regular sync coroutine
|
||||
pg = &pgs[recovery_state->pg_num];
|
||||
pg_osd_set_state_t *st;
|
||||
{
|
||||
auto st_it = pg->degraded_objects.find(recovery_state->oid);
|
||||
st = st_it->second;
|
||||
pg->degraded_objects.erase(st_it);
|
||||
}
|
||||
st->object_count--;
|
||||
if (st->state == OBJ_DEGRADED)
|
||||
{
|
||||
pg->clean_count++;
|
||||
}
|
||||
else
|
||||
{
|
||||
assert(st->state == (OBJ_DEGRADED|OBJ_MISPLACED));
|
||||
pg_osd_set_state_t *new_st;
|
||||
pg_osd_set_t new_set(st->osd_set);
|
||||
for (uint64_t role = 0; role < pg->pg_size; role++)
|
||||
{
|
||||
if (pg->cur_set[role] != 0)
|
||||
{
|
||||
// Maintain order (outdated -> role -> osd_num)
|
||||
int added = 0;
|
||||
for (int j = 0; j < new_set.size(); j++)
|
||||
{
|
||||
if (new_set[j].role == role && new_set[j].osd_num == pg->cur_set[role])
|
||||
{
|
||||
if (new_set[j].outdated)
|
||||
{
|
||||
if (!added)
|
||||
new_set[j].outdated = false;
|
||||
else
|
||||
{
|
||||
new_set.erase(new_set.begin()+j);
|
||||
j--;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
else if (!added && (new_set[j].outdated || new_set[j].role > role ||
|
||||
new_set[j].role == role && new_set[j].osd_num > pg->cur_set[role]))
|
||||
{
|
||||
new_set.insert(new_set.begin()+j, (pg_obj_loc_t){
|
||||
.role = role,
|
||||
.osd_num = pg->cur_set[role],
|
||||
.outdated = false,
|
||||
});
|
||||
added = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
auto st_it = pg->state_dict.find(new_set);
|
||||
if (st_it != pg->state_dict.end())
|
||||
{
|
||||
st_it = pg->state_dict.emplace(new_set, (pg_osd_set_state_t){
|
||||
.read_target = pg->cur_set,
|
||||
.osd_set = new_set,
|
||||
.state = OBJ_MISPLACED,
|
||||
.object_count = 0,
|
||||
}).first;
|
||||
}
|
||||
new_st = &st_it->second;
|
||||
new_st->object_count++;
|
||||
pg->misplaced_objects[recovery_state->oid] = new_st;
|
||||
}
|
||||
if (!st->object_count)
|
||||
{
|
||||
pg->state_dict.erase(st->osd_set);
|
||||
}
|
||||
recovery_state->st = 0;
|
||||
goto resume_0;
|
||||
}
|
||||
|
|
|
@ -209,7 +209,14 @@ void osd_t::handle_peers()
|
|||
}
|
||||
if (!still)
|
||||
{
|
||||
peering_state = peering_state & ~OSD_FLUSHING_PGS;
|
||||
peering_state = peering_state & ~OSD_FLUSHING_PGS | OSD_RECOVERING;
|
||||
}
|
||||
}
|
||||
if (peering_state & OSD_RECOVERING)
|
||||
{
|
||||
if (!continue_recovery())
|
||||
{
|
||||
peering_state = peering_state & ~OSD_RECOVERING;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -249,7 +256,8 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
|
|||
pg.state = PG_PEERING;
|
||||
pg.print_state();
|
||||
pg.state_dict.clear();
|
||||
pg.obj_states.clear();
|
||||
pg.misplaced_objects.clear();
|
||||
pg.degraded_objects.clear();
|
||||
pg.ver_override.clear();
|
||||
pg.flush_actions.clear();
|
||||
if (pg.flush_batch)
|
||||
|
|
|
@ -10,11 +10,15 @@ struct obj_ver_role
|
|||
|
||||
inline bool operator < (const obj_ver_role & a, const obj_ver_role & b)
|
||||
{
|
||||
// ORDER BY inode ASC, stripe & ~STRIPE_MASK ASC, version DESC, osd_num ASC
|
||||
// ORDER BY inode ASC, stripe & ~STRIPE_MASK ASC, version DESC, role ASC, osd_num ASC
|
||||
return a.oid.inode < b.oid.inode || a.oid.inode == b.oid.inode && (
|
||||
(a.oid.stripe & ~STRIPE_MASK) < (b.oid.stripe & ~STRIPE_MASK) ||
|
||||
(a.oid.stripe & ~STRIPE_MASK) == (b.oid.stripe & ~STRIPE_MASK) && (
|
||||
a.version > b.version || a.version == b.version && a.osd_num < b.osd_num
|
||||
a.version > b.version ||
|
||||
a.version == b.version && (
|
||||
a.oid.stripe < b.oid.stripe ||
|
||||
a.oid.stripe == b.oid.stripe && a.osd_num < b.osd_num
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
@ -142,42 +146,13 @@ void pg_obj_state_check_t::finish_object()
|
|||
}
|
||||
obj_end = list_pos;
|
||||
// Remember the decision
|
||||
uint64_t state = OBJ_CLEAN;
|
||||
uint64_t state = 0;
|
||||
if (n_buggy > 0)
|
||||
{
|
||||
state = OBJ_BUGGY;
|
||||
// FIXME: bring pg offline
|
||||
throw std::runtime_error("buggy object state");
|
||||
}
|
||||
if (target_ver > 0)
|
||||
{
|
||||
if (n_roles < pg->pg_minsize)
|
||||
{
|
||||
printf("Object is incomplete: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver);
|
||||
for (int i = ver_start; i < ver_end; i++)
|
||||
{
|
||||
printf("Present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : "");
|
||||
}
|
||||
state = OBJ_INCOMPLETE;
|
||||
pg->state = pg->state | PG_HAS_INCOMPLETE;
|
||||
}
|
||||
else if (n_roles < pg->pg_cursize)
|
||||
{
|
||||
printf("Object is degraded: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver);
|
||||
for (int i = ver_start; i < ver_end; i++)
|
||||
{
|
||||
printf("Present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : "");
|
||||
}
|
||||
state = OBJ_DEGRADED;
|
||||
pg->state = pg->state | PG_HAS_DEGRADED;
|
||||
}
|
||||
if (n_mismatched > 0)
|
||||
{
|
||||
state |= OBJ_MISPLACED;
|
||||
pg->state = pg->state | PG_HAS_MISPLACED;
|
||||
}
|
||||
pg->total_count++;
|
||||
}
|
||||
if (n_unstable > 0)
|
||||
{
|
||||
pg->state |= PG_HAS_UNCLEAN;
|
||||
|
@ -207,20 +182,48 @@ void pg_obj_state_check_t::finish_object()
|
|||
// osd_set doesn't include rollback/stable states, so don't include them in the state code either
|
||||
if (pcs.max_ver > target_ver)
|
||||
{
|
||||
//state |= OBJ_NEEDS_ROLLBACK;
|
||||
act.rollback = true;
|
||||
act.rollback_to = pcs.max_target;
|
||||
}
|
||||
if (pcs.stable_ver < (pcs.max_ver > target_ver ? pcs.max_target : pcs.max_ver))
|
||||
{
|
||||
//state |= OBJ_NEEDS_STABLE;
|
||||
act.make_stable = true;
|
||||
act.stable_to = pcs.max_ver > target_ver ? pcs.max_target : pcs.max_ver;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (state != OBJ_CLEAN || ver_end < obj_end)
|
||||
if (!target_ver)
|
||||
{
|
||||
return;
|
||||
}
|
||||
if (n_roles < pg->pg_minsize)
|
||||
{
|
||||
printf("Object is incomplete: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver);
|
||||
for (int i = ver_start; i < ver_end; i++)
|
||||
{
|
||||
printf("Present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : "");
|
||||
}
|
||||
state = OBJ_INCOMPLETE;
|
||||
pg->state = pg->state | PG_HAS_INCOMPLETE;
|
||||
}
|
||||
else if (n_roles < pg->pg_cursize)
|
||||
{
|
||||
printf("Object is degraded: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver);
|
||||
for (int i = ver_start; i < ver_end; i++)
|
||||
{
|
||||
printf("Present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : "");
|
||||
}
|
||||
state = OBJ_DEGRADED;
|
||||
pg->state = pg->state | PG_HAS_DEGRADED;
|
||||
}
|
||||
if (n_mismatched > 0)
|
||||
{
|
||||
state |= OBJ_MISPLACED;
|
||||
pg->state = pg->state | PG_HAS_MISPLACED;
|
||||
}
|
||||
pg->total_count++;
|
||||
if (state != 0 || ver_end < obj_end)
|
||||
{
|
||||
osd_set.clear();
|
||||
for (int i = ver_start; i < ver_end; i++)
|
||||
|
@ -261,7 +264,7 @@ void pg_obj_state_check_t::finish_object()
|
|||
{
|
||||
pg->ver_override[oid] = target_ver;
|
||||
}
|
||||
if (state == OBJ_CLEAN)
|
||||
if (state == 0)
|
||||
{
|
||||
pg->clean_count++;
|
||||
}
|
||||
|
@ -295,7 +298,18 @@ void pg_obj_state_check_t::finish_object()
|
|||
{
|
||||
it->second.object_count++;
|
||||
}
|
||||
pg->obj_states[oid] = &it->second;
|
||||
if (state & OBJ_INCOMPLETE)
|
||||
{
|
||||
pg->incomplete_objects[oid] = &it->second;
|
||||
}
|
||||
else if (state & OBJ_DEGRADED)
|
||||
{
|
||||
pg->degraded_objects[oid] = &it->second;
|
||||
}
|
||||
else
|
||||
{
|
||||
pg->misplaced_objects[oid] = &it->second;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,6 @@
|
|||
#define STRIPE_MASK ((uint64_t)4096 - 1)
|
||||
|
||||
// OSD object states
|
||||
#define OBJ_CLEAN 0x01
|
||||
#define OBJ_DEGRADED 0x02
|
||||
#define OBJ_INCOMPLETE 0x04
|
||||
#define OBJ_MISPLACED 0x08
|
||||
|
@ -106,7 +105,7 @@ struct pg_t
|
|||
// it may consume up to ~ (raw storage / object size) * 24 bytes in the worst case scenario
|
||||
// which is up to ~192 MB per 1 TB in the worst case scenario
|
||||
std::map<pg_osd_set_t, pg_osd_set_state_t> state_dict;
|
||||
btree::btree_map<object_id, pg_osd_set_state_t*> obj_states;
|
||||
btree::btree_map<object_id, pg_osd_set_state_t*> incomplete_objects, misplaced_objects, degraded_objects;
|
||||
std::map<obj_piece_id_t, flush_action_t> flush_actions;
|
||||
btree::btree_map<object_id, uint64_t> ver_override;
|
||||
pg_peering_state_t *peering_state = NULL;
|
||||
|
@ -121,9 +120,9 @@ struct pg_t
|
|||
|
||||
inline bool operator < (const pg_obj_loc_t &a, const pg_obj_loc_t &b)
|
||||
{
|
||||
return a.role < b.role ||
|
||||
a.role == b.role && a.outdated < b.outdated ||
|
||||
a.role == b.role && a.outdated == b.outdated && a.osd_num < b.osd_num;
|
||||
return a.outdated < b.outdated ||
|
||||
a.outdated == b.outdated && a.role < b.role ||
|
||||
a.outdated == b.outdated && a.role == b.role && a.osd_num < b.osd_num;
|
||||
}
|
||||
|
||||
inline bool operator == (const obj_piece_id_t & a, const obj_piece_id_t & b)
|
||||
|
|
|
@ -31,6 +31,7 @@ struct osd_primary_op_data_t
|
|||
int degraded = 0, pg_size, pg_minsize;
|
||||
osd_rmw_stripe_t *stripes;
|
||||
osd_op_t *subops = NULL;
|
||||
void *recovery_buf = NULL;
|
||||
// for sync. oops, requires freeing
|
||||
std::vector<unstable_osd_num_t> *unstable_write_osds = NULL;
|
||||
obj_ver_id *unstable_writes = NULL;
|
||||
|
@ -100,6 +101,30 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
|
|||
return true;
|
||||
}
|
||||
|
||||
// Select the OSD set for object <oid> in <pg>.
// Returns <def> when the PG has none of the incomplete/degraded/misplaced flags set or when
// the object is not listed in any of the corresponding maps. Otherwise returns the
// read_target of the object's state; the maps are checked in the order
// incomplete -> degraded -> misplaced and the first match wins.
uint64_t* get_object_osd_set(pg_t &pg, object_id &oid, uint64_t *def)
{
    const auto unclean_flags = PG_HAS_INCOMPLETE | PG_HAS_DEGRADED | PG_HAS_MISPLACED;
    if ((pg.state & unclean_flags) == 0)
    {
        return def;
    }
    decltype(pg.incomplete_objects) *lookup_order[] = {
        &pg.incomplete_objects,
        &pg.degraded_objects,
        &pg.misplaced_objects,
    };
    for (auto *objects: lookup_order)
    {
        auto found = objects->find(oid);
        if (found != objects->end())
        {
            return found->second->read_target.data();
        }
    }
    return def;
}
|
||||
|
||||
void osd_t::continue_primary_read(osd_op_t *cur_op)
|
||||
{
|
||||
if (!cur_op->op_data && !prepare_primary_rw(cur_op))
|
||||
|
@ -130,10 +155,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op)
|
|||
else
|
||||
{
|
||||
// PG may be degraded or have misplaced objects
|
||||
auto st_it = pg.obj_states.find(op_data->oid);
|
||||
uint64_t* cur_set = (st_it != pg.obj_states.end()
|
||||
? st_it->second->read_target.data()
|
||||
: pg.cur_set.data());
|
||||
uint64_t* cur_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data());
|
||||
if (extend_missing_stripes(op_data->stripes, cur_set, pg.pg_minsize, pg.pg_size) < 0)
|
||||
{
|
||||
free(op_data);
|
||||
|
@ -335,6 +357,8 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
|
|||
else if (op_data->st == 5) goto resume_5;
|
||||
else if (op_data->st == 6) goto resume_6;
|
||||
else if (op_data->st == 7) goto resume_7;
|
||||
else if (op_data->st == 8) goto resume_8;
|
||||
else if (op_data->st == 9) goto resume_9;
|
||||
assert(op_data->st == 0);
|
||||
// Check if actions are pending for this object
|
||||
{
|
||||
|
@ -361,6 +385,41 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
|
|||
}
|
||||
pg.write_queue.emplace(op_data->oid, cur_op);
|
||||
}
|
||||
if (pg.state != PG_ACTIVE)
|
||||
{
|
||||
// If the object is degraded, read and write the whole object
|
||||
auto st_it = pg.degraded_objects.find(op_data->oid);
|
||||
if (st_it != pg.degraded_objects.end())
|
||||
{
|
||||
uint64_t* cur_set = st_it->second->read_target.data();
|
||||
for (int i = 0; i < pg.pg_minsize; i++)
|
||||
{
|
||||
op_data->stripes[i] = {
|
||||
.req_start = 0,
|
||||
.req_end = bs_block_size,
|
||||
.read_start = 0,
|
||||
.read_end = bs_block_size,
|
||||
};
|
||||
}
|
||||
assert(extend_missing_stripes(op_data->stripes, cur_set, pg.pg_minsize, pg.pg_size) >= 0);
|
||||
op_data->degraded = 1;
|
||||
op_data->recovery_buf = alloc_read_buffer(op_data->stripes, pg.pg_size, 0);
|
||||
submit_primary_subops(SUBMIT_READ, pg.pg_size, cur_set, cur_op);
|
||||
op_data->st = 8;
|
||||
return;
|
||||
}
|
||||
}
|
||||
goto resume_1;
|
||||
resume_8:
|
||||
return;
|
||||
resume_9:
|
||||
memcpy(
|
||||
op_data->recovery_buf + cur_op->req.rw.offset - op_data->oid.stripe,
|
||||
cur_op->buf, cur_op->req.rw.len
|
||||
);
|
||||
free(cur_op->buf);
|
||||
cur_op->buf = op_data->recovery_buf;
|
||||
op_data->recovery_buf = NULL;
|
||||
resume_1:
|
||||
// Determine blocks to read
|
||||
cur_op->rmw_buf = calc_rmw_reads(cur_op->buf, op_data->stripes, pg.cur_set.data(), pg.pg_size, pg.pg_minsize, pg.pg_cursize);
|
||||
|
|
Loading…
Reference in New Issue