From 621219544030f78b3f55fbc1f862a98e51d8106d Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sat, 4 Apr 2020 02:18:29 +0300 Subject: [PATCH] Implement parallel recovery --- osd.cpp | 3 + osd.h | 13 ++- osd_flush.cpp | 242 ++++++++++++++++++++++++++++-------------------- osd_peering.cpp | 1 + 4 files changed, 154 insertions(+), 105 deletions(-) diff --git a/osd.cpp b/osd.cpp index 175a03cf..4def2b6d 100644 --- a/osd.cpp +++ b/osd.cpp @@ -103,6 +103,9 @@ void osd_t::parse_config(blockstore_config_t & config) autosync_interval = strtoull(config["autosync_interval"].c_str(), NULL, 10); if (autosync_interval < 0 || autosync_interval > MAX_AUTOSYNC_INTERVAL) autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; + recovery_queue_depth = strtoull(config["recovery_queue_depth"].c_str(), NULL, 10); + if (recovery_queue_depth < 1 || recovery_queue_depth > MAX_RECOVERY_QUEUE) + recovery_queue_depth = DEFAULT_RECOVERY_QUEUE; } void osd_t::bind_socket() diff --git a/osd.h b/osd.h index 2a6e6bcd..21aeb861 100644 --- a/osd.h +++ b/osd.h @@ -44,6 +44,8 @@ #define MAX_AUTOSYNC_INTERVAL 3600 #define DEFAULT_AUTOSYNC_INTERVAL 5 +#define MAX_RECOVERY_QUEUE 2048 +#define DEFAULT_RECOVERY_QUEUE 4 //#define OSD_STUB @@ -172,12 +174,13 @@ struct osd_object_id_t object_id oid; }; -struct osd_recovery_state_t +struct osd_recovery_op_t { int st = 0; + bool degraded = false; pg_num_t pg_num = 0; object_id oid = { 0 }; - osd_op_t *op = NULL; + osd_op_t *osd_op = NULL; }; class osd_t @@ -195,6 +198,7 @@ class osd_t int receive_buffer_size = 9000; int immediate_commit = IMMEDIATE_NONE; int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds + int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE; // peer OSDs @@ -205,7 +209,7 @@ class osd_t int peering_state = 0; unsigned pg_count = 0; uint64_t next_subop_id = 1; - osd_recovery_state_t recovery_state; + std::map recovery_ops; osd_op_t *autosync_op = NULL; // Unstable writes @@ -273,7 +277,10 @@ class osd_t void submit_pg_flush_ops(pg_num_t pg_num); void handle_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t osd_num, bool ok); void submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t osd_num, int count, obj_ver_id *data); + bool pick_next_recovery(osd_recovery_op_t &op); + void submit_recovery_op(osd_recovery_op_t *op); bool continue_recovery(); + pg_osd_set_state_t* change_osd_set(pg_osd_set_state_t *st, pg_t *pg); // op execution void exec_op(osd_op_t *cur_op); diff --git a/osd_flush.cpp b/osd_flush.cpp index 711f2eec..657ff5eb 100644 --- a/osd_flush.cpp +++ b/osd_flush.cpp @@ -186,139 +186,177 @@ void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback } } -// Just trigger write requests for degraded objects. They'll be recovered during writing -bool osd_t::continue_recovery() +bool osd_t::pick_next_recovery(osd_recovery_op_t &op) { - pg_t *pg = NULL; - if (recovery_state.st == 0) goto resume_0; - else if (recovery_state.st == 1) goto resume_1; - else if (recovery_state.st == 2) goto resume_2; - else if (recovery_state.st == 3) goto resume_3; - else if (recovery_state.st == 4) goto resume_4; -resume_0: - for (auto p: pgs) + for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++) { - if (p.second.state & PG_HAS_DEGRADED) + if ((pg_it->second.state & (PG_ACTIVE | PG_HAS_DEGRADED)) == (PG_ACTIVE | PG_HAS_DEGRADED)) { - recovery_state.pg_num = p.first; - goto resume_1; + for (auto obj_it = pg_it->second.degraded_objects.begin(); obj_it != pg_it->second.degraded_objects.end(); obj_it++) + { + if (recovery_ops.find(obj_it->first) == recovery_ops.end()) + { + op.degraded = true; + op.pg_num = pg_it->first; + op.oid = obj_it->first; + return true; + } + } } } - recovery_state.st = 0; return false; -resume_1: - pg = &pgs[recovery_state.pg_num]; - if (!pg->degraded_objects.size()) - { - pg->state = pg->state & ~PG_HAS_DEGRADED; - pg->print_state(); - goto resume_0; - } - recovery_state.oid = pg->degraded_objects.begin()->first; - recovery_state.op = new osd_op_t(); - recovery_state.op->op_type = OSD_OP_OUT; - recovery_state.op->req = { +} + +void osd_t::submit_recovery_op(osd_recovery_op_t *op) +{ + op->osd_op = new osd_op_t(); + op->osd_op->op_type = OSD_OP_OUT; + op->osd_op->req = { .rw = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, .id = 1, .opcode = OSD_OP_WRITE, }, - .inode = recovery_state.oid.inode, - .offset = recovery_state.oid.stripe, + .inode = op->oid.inode, + .offset = op->oid.stripe, .len = 0, }, }; - recovery_state.op->callback = [this](osd_op_t *op) + op->osd_op->callback = [this, op](osd_op_t *osd_op) { - if (op->reply.hdr.retval < 0) - recovery_state.st += 1; // error + // Don't sync the write, it will be synced by our regular sync coroutine + if (osd_op->reply.hdr.retval < 0) + { + // Error recovering object + if (osd_op->reply.hdr.retval == -EPIPE) + { + // PG is stopped or one of the OSDs is gone, error is harmless + } + else + { + throw std::runtime_error("Failed to recover an object"); + } + } else - recovery_state.st += 2; // ok + { + pg_t *pg = &pgs[op->pg_num]; + pg_osd_set_state_t *st; + if (op->degraded) + { + auto st_it = pg->degraded_objects.find(op->oid); + st = st_it->second; + pg->degraded_objects.erase(st_it); + degraded_objects--; + if (!pg->degraded_objects.size()) + { + pg->state = pg->state & ~PG_HAS_DEGRADED; + pg->print_state(); + } + } + else + { + auto st_it = pg->misplaced_objects.find(op->oid); + st = st_it->second; + pg->misplaced_objects.erase(st_it); + misplaced_objects--; + if (!pg->misplaced_objects.size()) + { + pg->state = pg->state & ~PG_HAS_MISPLACED; + pg->print_state(); + } + } + if (st->state == OBJ_DEGRADED) + { + pg->clean_count++; + } + else + { + assert(st->state == (OBJ_DEGRADED|OBJ_MISPLACED)); + pg->misplaced_objects[op->oid] = change_osd_set(st, pg); + } + st->object_count--; + if (!st->object_count) + { + pg->state_dict.erase(st->osd_set); + } + } + recovery_ops.erase(op->oid); + delete osd_op; + op->osd_op = NULL; continue_recovery(); }; - exec_op(recovery_state.op); - recovery_state.st = 2; -resume_2: - return true; -resume_3: - // FIXME handle error - throw std::runtime_error("failed to recover an object"); -resume_4: - delete recovery_state.op; - recovery_state.op = NULL; - // Don't sync the write, it will be synced by our regular sync coroutine - pg = &pgs[recovery_state.pg_num]; - pg_osd_set_state_t *st; + exec_op(op->osd_op); +} + +// Just trigger write requests for degraded objects. They'll be recovered during writing +bool osd_t::continue_recovery() +{ + while (recovery_ops.size() < recovery_queue_depth) { - auto st_it = pg->degraded_objects.find(recovery_state.oid); - st = st_it->second; - pg->degraded_objects.erase(st_it); - degraded_objects--; - } - st->object_count--; - if (st->state == OBJ_DEGRADED) - { - pg->clean_count++; - } - else - { - assert(st->state == (OBJ_DEGRADED|OBJ_MISPLACED)); - pg_osd_set_state_t *new_st; - pg_osd_set_t new_set(st->osd_set); - for (uint64_t role = 0; role < pg->pg_size; role++) + osd_recovery_op_t op; + if (pick_next_recovery(op)) { - if (pg->cur_set[role] != 0) + recovery_ops[op.oid] = op; + submit_recovery_op(&recovery_ops[op.oid]); + } + else + return false; + } + return true; +} + +// This is likely not needed at all, because we'll always recover objects to the clean state +pg_osd_set_state_t* osd_t::change_osd_set(pg_osd_set_state_t *st, pg_t *pg) +{ + pg_osd_set_state_t *new_st; + pg_osd_set_t new_set(st->osd_set); + for (uint64_t role = 0; role < pg->pg_size; role++) + { + if (pg->cur_set[role] != 0) + { + // Maintain order (outdated -> role -> osd_num) + int added = 0; + for (int j = 0; j < new_set.size(); j++) { - // Maintain order (outdated -> role -> osd_num) - int added = 0; - for (int j = 0; j < new_set.size(); j++) + if (new_set[j].role == role && new_set[j].osd_num == pg->cur_set[role]) { - if (new_set[j].role == role && new_set[j].osd_num == pg->cur_set[role]) + if (new_set[j].outdated) { - if (new_set[j].outdated) + if (!added) + new_set[j].outdated = false; + else { - if (!added) - new_set[j].outdated = false; - else - { - new_set.erase(new_set.begin()+j); - j--; - } + new_set.erase(new_set.begin()+j); + j--; } - break; - } - else if (!added && (new_set[j].outdated || new_set[j].role > role || - new_set[j].role == role && new_set[j].osd_num > pg->cur_set[role])) - { - new_set.insert(new_set.begin()+j, (pg_obj_loc_t){ - .role = role, - .osd_num = pg->cur_set[role], - .outdated = false, - }); - added = 1; } + break; + } + else if (!added && (new_set[j].outdated || new_set[j].role > role || + new_set[j].role == role && new_set[j].osd_num > pg->cur_set[role])) + { + new_set.insert(new_set.begin()+j, (pg_obj_loc_t){ + .role = role, + .osd_num = pg->cur_set[role], + .outdated = false, + }); + added = 1; } } } - auto st_it = pg->state_dict.find(new_set); - if (st_it != pg->state_dict.end()) - { - st_it = pg->state_dict.emplace(new_set, (pg_osd_set_state_t){ - .read_target = pg->cur_set, - .osd_set = new_set, - .state = OBJ_MISPLACED, - .object_count = 0, - }).first; - } - new_st = &st_it->second; - new_st->object_count++; - pg->misplaced_objects[recovery_state.oid] = new_st; } - if (!st->object_count) + auto st_it = pg->state_dict.find(new_set); + if (st_it != pg->state_dict.end()) { - pg->state_dict.erase(st->osd_set); + st_it = pg->state_dict.emplace(new_set, (pg_osd_set_state_t){ + .read_target = pg->cur_set, + .osd_set = new_set, + .state = OBJ_MISPLACED, + .object_count = 0, + }).first; } - recovery_state.st = 0; - goto resume_0; + new_st = &st_it->second; + new_st->object_count++; + return new_st; } diff --git a/osd_peering.cpp b/osd_peering.cpp index ade12687..a0ea98d3 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -185,6 +185,7 @@ void osd_t::handle_peers() p.second.calc_object_states(); incomplete_objects += p.second.incomplete_objects.size(); misplaced_objects += p.second.misplaced_objects.size(); + // FIXME: degraded objects may currently include misplaced, too! Report them separately? degraded_objects += p.second.degraded_objects.size(); if (p.second.state & PG_HAS_UNCLEAN) peering_state = peering_state | OSD_FLUSHING_PGS;