diff --git a/osd.h b/osd.h index 7e13428b..af1bcf8c 100644 --- a/osd.h +++ b/osd.h @@ -36,6 +36,7 @@ #define OSD_CONNECTING_PEERS 1 #define OSD_PEERING_PGS 2 #define OSD_FLUSHING_PGS 4 +#define OSD_RECOVERING 8 #define IMMEDIATE_NONE 0 #define IMMEDIATE_SMALL 1 @@ -166,6 +167,14 @@ struct osd_object_id_t object_id oid; }; +struct osd_recovery_state_t +{ + int st; + pg_num_t pg_num; + object_id oid; + osd_op_t *op; +}; + class osd_t { // config @@ -188,6 +197,7 @@ class osd_t int peering_state = 0; unsigned pg_count = 0; uint64_t next_subop_id = 1; + osd_recovery_state_t *recovery_state; // Unstable writes std::map unstable_writes; @@ -244,9 +254,12 @@ class osd_t void handle_peers(); void repeer_pgs(osd_num_t osd_num, bool is_connected); void start_pg_peering(pg_num_t pg_num); + + // flushing, recovery and backfill void submit_pg_flush_ops(pg_num_t pg_num); void handle_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t osd_num, bool ok); void submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t osd_num, int count, obj_ver_id *data); + bool continue_recovery(); // op execution void exec_op(osd_op_t *cur_op); diff --git a/osd_flush.cpp b/osd_flush.cpp index 241c47c4..c63120e4 100644 --- a/osd_flush.cpp +++ b/osd_flush.cpp @@ -185,3 +185,138 @@ void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback outbox_push(clients[peer_fd], op); } } + +// Just trigger write requests for degraded objects. They'll be recovered during writing +bool osd_t::continue_recovery() +{ + pg_t *pg = NULL; + if (recovery_state->st == 0) goto resume_0; + else if (recovery_state->st == 1) goto resume_1; + else if (recovery_state->st == 2) goto resume_2; + else if (recovery_state->st == 3) goto resume_3; + else if (recovery_state->st == 4) goto resume_4; +resume_0: + for (auto p: pgs) + { + if (p.second.state & PG_HAS_DEGRADED) + { + recovery_state->pg_num = p.first; + goto resume_1; + } + } + recovery_state->st = 0; + return false; +resume_1: + pg = &pgs[recovery_state->pg_num]; + if (!pg->degraded_objects.size()) + { + pg->state = pg->state & ~PG_HAS_DEGRADED; + goto resume_0; + } + recovery_state->oid = pg->degraded_objects.begin()->first; + recovery_state->op = new osd_op_t(); + recovery_state->op->op_type = OSD_OP_OUT; + recovery_state->op->req = { + .rw = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = 0, + .opcode = OSD_OP_WRITE, + }, + .inode = recovery_state->oid.inode, + .offset = recovery_state->oid.stripe, + .len = 0, + }, + }; + recovery_state->op->callback = [this](osd_op_t *op) + { + if (op->reply.hdr.retval < 0) + recovery_state->st += 1; // error + else + recovery_state->st += 2; // ok + continue_recovery(); + }; + exec_op(recovery_state->op); + recovery_state->st = 2; +resume_2: + return true; +resume_3: + // FIXME handle error + throw std::runtime_error("failed to recover an object"); +resume_4: + delete recovery_state->op; + recovery_state->op = NULL; + // Don't sync the write, it will be synced by our regular sync coroutine + pg = &pgs[recovery_state->pg_num]; + pg_osd_set_state_t *st; + { + auto st_it = pg->degraded_objects.find(recovery_state->oid); + st = st_it->second; + pg->degraded_objects.erase(st_it); + } + st->object_count--; + if (st->state == OBJ_DEGRADED) + { + pg->clean_count++; + } + else + { + assert(st->state == (OBJ_DEGRADED|OBJ_MISPLACED)); + pg_osd_set_state_t *new_st; + pg_osd_set_t new_set(st->osd_set); + for (uint64_t role = 0; role < pg->pg_size; role++) + { + if (pg->cur_set[role] != 0) + { + // Maintain order (outdated -> role -> osd_num) + int added = 0; + for (int j = 0; j < new_set.size(); j++) + { + if (new_set[j].role == role && new_set[j].osd_num == pg->cur_set[role]) + { + if (new_set[j].outdated) + { + if (!added) + new_set[j].outdated = false; + else + { + new_set.erase(new_set.begin()+j); + j--; + } + } + break; + } + else if (!added && (new_set[j].outdated || new_set[j].role > role || + new_set[j].role == role && new_set[j].osd_num > pg->cur_set[role])) + { + new_set.insert(new_set.begin()+j, (pg_obj_loc_t){ + .role = role, + .osd_num = pg->cur_set[role], + .outdated = false, + }); + added = 1; + } + } + } + } + auto st_it = pg->state_dict.find(new_set); + if (st_it != pg->state_dict.end()) + { + st_it = pg->state_dict.emplace(new_set, (pg_osd_set_state_t){ + .read_target = pg->cur_set, + .osd_set = new_set, + .state = OBJ_MISPLACED, + .object_count = 0, + }).first; + } + new_st = &st_it->second; + new_st->object_count++; + pg->misplaced_objects[recovery_state->oid] = new_st; + } + if (!st->object_count) + { + pg->state_dict.erase(st->osd_set); + } + recovery_state->st = 0; + goto resume_0; +} diff --git a/osd_peering.cpp b/osd_peering.cpp index 1076fdf0..30c83597 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -209,7 +209,14 @@ void osd_t::handle_peers() } if (!still) { - peering_state = peering_state & ~OSD_FLUSHING_PGS; + peering_state = peering_state & ~OSD_FLUSHING_PGS | OSD_RECOVERING; + } + } + if (peering_state & OSD_RECOVERING) + { + if (!continue_recovery()) + { + peering_state = peering_state & ~OSD_RECOVERING; } } } @@ -249,7 +256,8 @@ void osd_t::start_pg_peering(pg_num_t pg_num) pg.state = PG_PEERING; pg.print_state(); pg.state_dict.clear(); - pg.obj_states.clear(); + pg.misplaced_objects.clear(); + pg.degraded_objects.clear(); pg.ver_override.clear(); pg.flush_actions.clear(); if (pg.flush_batch) diff --git a/osd_peering_pg.cpp b/osd_peering_pg.cpp index 48ccfd66..5d1289aa 100644 --- a/osd_peering_pg.cpp +++ b/osd_peering_pg.cpp @@ -10,11 +10,15 @@ struct obj_ver_role inline bool operator < (const obj_ver_role & a, const obj_ver_role & b) { - // ORDER BY inode ASC, stripe & ~STRIPE_MASK ASC, version DESC, osd_num ASC + // ORDER BY inode ASC, stripe & ~STRIPE_MASK ASC, version DESC, role ASC, osd_num ASC return a.oid.inode < b.oid.inode || a.oid.inode == b.oid.inode && ( (a.oid.stripe & ~STRIPE_MASK) < (b.oid.stripe & ~STRIPE_MASK) || (a.oid.stripe & ~STRIPE_MASK) == (b.oid.stripe & ~STRIPE_MASK) && ( - a.version > b.version || a.version == b.version && a.osd_num < b.osd_num + a.version > b.version || + a.version == b.version && ( + a.oid.stripe < b.oid.stripe || + a.oid.stripe == b.oid.stripe && a.osd_num < b.osd_num + ) ) ); } @@ -142,42 +146,13 @@ void pg_obj_state_check_t::finish_object() } obj_end = list_pos; // Remember the decision - uint64_t state = OBJ_CLEAN; + uint64_t state = 0; if (n_buggy > 0) { state = OBJ_BUGGY; // FIXME: bring pg offline throw std::runtime_error("buggy object state"); } - if (target_ver > 0) - { - if (n_roles < pg->pg_minsize) - { - printf("Object is incomplete: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver); - for (int i = ver_start; i < ver_end; i++) - { - printf("Present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : ""); - } - state = OBJ_INCOMPLETE; - pg->state = pg->state | PG_HAS_INCOMPLETE; - } - else if (n_roles < pg->pg_cursize) - { - printf("Object is degraded: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver); - for (int i = ver_start; i < ver_end; i++) - { - printf("Present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : ""); - } - state = OBJ_DEGRADED; - pg->state = pg->state | PG_HAS_DEGRADED; - } - if (n_mismatched > 0) - { - state |= OBJ_MISPLACED; - pg->state = pg->state | PG_HAS_MISPLACED; - } - pg->total_count++; - } if (n_unstable > 0) { pg->state |= PG_HAS_UNCLEAN; @@ -207,20 +182,48 @@ void pg_obj_state_check_t::finish_object() // osd_set doesn't include rollback/stable states, so don't include them in the state code either if (pcs.max_ver > target_ver) { - //state |= OBJ_NEEDS_ROLLBACK; act.rollback = true; act.rollback_to = pcs.max_target; } if (pcs.stable_ver < (pcs.max_ver > target_ver ? pcs.max_target : pcs.max_ver)) { - //state |= OBJ_NEEDS_STABLE; act.make_stable = true; act.stable_to = pcs.max_ver > target_ver ? pcs.max_target : pcs.max_ver; } } } } - if (state != OBJ_CLEAN || ver_end < obj_end) + if (!target_ver) + { + return; + } + if (n_roles < pg->pg_minsize) + { + printf("Object is incomplete: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver); + for (int i = ver_start; i < ver_end; i++) + { + printf("Present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : ""); + } + state = OBJ_INCOMPLETE; + pg->state = pg->state | PG_HAS_INCOMPLETE; + } + else if (n_roles < pg->pg_cursize) + { + printf("Object is degraded: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver); + for (int i = ver_start; i < ver_end; i++) + { + printf("Present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : ""); + } + state = OBJ_DEGRADED; + pg->state = pg->state | PG_HAS_DEGRADED; + } + if (n_mismatched > 0) + { + state |= OBJ_MISPLACED; + pg->state = pg->state | PG_HAS_MISPLACED; + } + pg->total_count++; + if (state != 0 || ver_end < obj_end) { osd_set.clear(); for (int i = ver_start; i < ver_end; i++) @@ -261,7 +264,7 @@ void pg_obj_state_check_t::finish_object() { pg->ver_override[oid] = target_ver; } - if (state == OBJ_CLEAN) + if (state == 0) { pg->clean_count++; } @@ -295,7 +298,18 @@ void pg_obj_state_check_t::finish_object() { it->second.object_count++; } - pg->obj_states[oid] = &it->second; + if (state & OBJ_INCOMPLETE) + { + pg->incomplete_objects[oid] = &it->second; + } + else if (state & OBJ_DEGRADED) + { + pg->degraded_objects[oid] = &it->second; + } + else + { + pg->misplaced_objects[oid] = &it->second; + } } } diff --git a/osd_peering_pg.h b/osd_peering_pg.h index b713966d..60ed31a3 100644 --- a/osd_peering_pg.h +++ b/osd_peering_pg.h @@ -25,7 +25,6 @@ #define STRIPE_MASK ((uint64_t)4096 - 1) // OSD object states -#define OBJ_CLEAN 0x01 #define OBJ_DEGRADED 0x02 #define OBJ_INCOMPLETE 0x04 #define OBJ_MISPLACED 0x08 @@ -106,7 +105,7 @@ struct pg_t // it may consume up to ~ (raw storage / object size) * 24 bytes in the worst case scenario // which is up to ~192 MB per 1 TB in the worst case scenario std::map state_dict; - btree::btree_map obj_states; + btree::btree_map incomplete_objects, misplaced_objects, degraded_objects; std::map flush_actions; btree::btree_map ver_override; pg_peering_state_t *peering_state = NULL; @@ -121,9 +120,9 @@ struct pg_t inline bool operator < (const pg_obj_loc_t &a, const pg_obj_loc_t &b) { - return a.role < b.role || - a.role == b.role && a.outdated < b.outdated || - a.role == b.role && a.outdated == b.outdated && a.osd_num < b.osd_num; + return a.outdated < b.outdated || + a.outdated == b.outdated && a.role < b.role || + a.outdated == b.outdated && a.role == b.role && a.osd_num < b.osd_num; } inline bool operator == (const obj_piece_id_t & a, const obj_piece_id_t & b) diff --git a/osd_primary.cpp b/osd_primary.cpp index b3cd58b8..5abca5a9 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -31,6 +31,7 @@ struct osd_primary_op_data_t int degraded = 0, pg_size, pg_minsize; osd_rmw_stripe_t *stripes; osd_op_t *subops = NULL; + void *recovery_buf = NULL; // for sync. oops, requires freeing std::vector *unstable_write_osds = NULL; obj_ver_id *unstable_writes = NULL; @@ -100,6 +101,30 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op) return true; } +uint64_t* get_object_osd_set(pg_t &pg, object_id &oid, uint64_t *def) +{ + if (!(pg.state & (PG_HAS_INCOMPLETE | PG_HAS_DEGRADED | PG_HAS_MISPLACED))) + { + return def; + } + auto st_it = pg.incomplete_objects.find(oid); + if (st_it != pg.incomplete_objects.end()) + { + return st_it->second->read_target.data(); + } + st_it = pg.degraded_objects.find(oid); + if (st_it != pg.degraded_objects.end()) + { + return st_it->second->read_target.data(); + } + st_it = pg.misplaced_objects.find(oid); + if (st_it != pg.misplaced_objects.end()) + { + return st_it->second->read_target.data(); + } + return def; +} + void osd_t::continue_primary_read(osd_op_t *cur_op) { if (!cur_op->op_data && !prepare_primary_rw(cur_op)) @@ -130,10 +155,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op) else { // PG may be degraded or have misplaced objects - auto st_it = pg.obj_states.find(op_data->oid); - uint64_t* cur_set = (st_it != pg.obj_states.end() - ? st_it->second->read_target.data() - : pg.cur_set.data()); + uint64_t* cur_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data()); if (extend_missing_stripes(op_data->stripes, cur_set, pg.pg_minsize, pg.pg_size) < 0) { free(op_data); @@ -335,6 +357,8 @@ void osd_t::continue_primary_write(osd_op_t *cur_op) else if (op_data->st == 5) goto resume_5; else if (op_data->st == 6) goto resume_6; else if (op_data->st == 7) goto resume_7; + else if (op_data->st == 8) goto resume_8; + else if (op_data->st == 9) goto resume_9; assert(op_data->st == 0); // Check if actions are pending for this object { @@ -361,6 +385,41 @@ void osd_t::continue_primary_write(osd_op_t *cur_op) } pg.write_queue.emplace(op_data->oid, cur_op); } + if (pg.state != PG_ACTIVE) + { + // If the object is degraded, read and write the whole object + auto st_it = pg.degraded_objects.find(op_data->oid); + if (st_it != pg.degraded_objects.end()) + { + uint64_t* cur_set = st_it->second->read_target.data(); + for (int i = 0; i < pg.pg_minsize; i++) + { + op_data->stripes[i] = { + .req_start = 0, + .req_end = bs_block_size, + .read_start = 0, + .read_end = bs_block_size, + }; + } + assert(extend_missing_stripes(op_data->stripes, cur_set, pg.pg_minsize, pg.pg_size) >= 0); + op_data->degraded = 1; + op_data->recovery_buf = alloc_read_buffer(op_data->stripes, pg.pg_size, 0); + submit_primary_subops(SUBMIT_READ, pg.pg_size, cur_set, cur_op); + op_data->st = 8; + return; + } + } + goto resume_1; +resume_8: + return; +resume_9: + memcpy( + op_data->recovery_buf + cur_op->req.rw.offset - op_data->oid.stripe, + cur_op->buf, cur_op->req.rw.len + ); + free(cur_op->buf); + cur_op->buf = op_data->recovery_buf; + op_data->recovery_buf = NULL; resume_1: // Determine blocks to read cur_op->rmw_buf = calc_rmw_reads(cur_op->buf, op_data->stripes, pg.cur_set.data(), pg.pg_size, pg.pg_minsize, pg.pg_cursize);