diff --git a/osd.cpp b/osd.cpp index d925aadff..c39918fea 100644 --- a/osd.cpp +++ b/osd.cpp @@ -332,11 +332,11 @@ void osd_t::exec_op(osd_op_t *cur_op) } else if (cur_op->req.hdr.opcode == OSD_OP_READ) { - exec_primary_read(cur_op); + continue_primary_read(cur_op); } else if (cur_op->req.hdr.opcode == OSD_OP_WRITE) { - exec_primary_write(cur_op); + continue_primary_write(cur_op); } else { diff --git a/osd.h b/osd.h index a8e91532e..264b3dff0 100644 --- a/osd.h +++ b/osd.h @@ -236,12 +236,12 @@ class osd_t // primary ops bool prepare_primary_rw(osd_op_t *cur_op); - void exec_primary_read(osd_op_t *cur_op); - void exec_primary_write(osd_op_t *cur_op); + void continue_primary_read(osd_op_t *cur_op); + void continue_primary_write(osd_op_t *cur_op); void exec_primary_sync(osd_op_t *cur_op); void finish_primary_op(osd_op_t *cur_op, int retval); - void handle_primary_read_subop(osd_op_t *cur_op, int ok); - void submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op); + void handle_primary_subop(osd_op_t *cur_op, int ok, uint64_t version); + void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op); public: osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop); ~osd_t(); diff --git a/osd_primary.cpp b/osd_primary.cpp index 799780888..912549f11 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -1,6 +1,10 @@ #include "osd.h" #include "osd_rmw.h" +#define SUBMIT_READ 0 +#define SUBMIT_RMW_READ 1 +#define SUBMIT_WRITE 2 /* submit_type values for submit_primary_subops(): plain read, read phase of a read-modify-write, write */ + // read: read directly or read paired stripe(s), reconstruct, return // write: read paired stripe(s), modify, write // @@ -12,14 +16,15 @@ struct osd_primary_op_data_t { + int st = 0; /* resume state: tested by the goto dispatch in continue_primary_read and continue_primary_write, incremented in handle_primary_subop */ pg_num_t pg_num; object_id oid; uint64_t target_ver; + uint64_t fact_ver = 0; /* version passed to the latest handle_primary_subop() call */ int n_subops = 0, done = 0, errors = 0; int degraded = 0, pg_size, pg_minsize; osd_rmw_stripe_t *stripes; osd_op_t *subops = NULL; - bool should_read_version = false; }; void 
osd_t::finish_primary_op(osd_op_t *cur_op, int retval) @@ -71,127 +76,126 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op) return true; } -void osd_t::exec_primary_read(osd_op_t *cur_op) +void osd_t::continue_primary_read(osd_op_t *cur_op) /* Re-entrant read handler. First call: prepares the op if op_data is not set yet, submits read subops, sets st = 1 and returns. handle_primary_subop() increments st and calls this again once every subop has reported, which lands at resume_2. */ { - if (!prepare_primary_rw(cur_op)) + if (!cur_op->op_data && !prepare_primary_rw(cur_op)) { return; } - osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)cur_op->op_data; - auto & pg = pgs[op_data->pg_num]; - for (int role = 0; role < pg.pg_minsize; role++) + osd_primary_op_data_t *op_data = cur_op->op_data; + if (op_data->st == 1) goto resume_1; + else if (op_data->st == 2) goto resume_2; { - op_data->stripes[role].read_start = op_data->stripes[role].req_start; - op_data->stripes[role].read_end = op_data->stripes[role].req_end; - } - // Determine version - auto vo_it = pg.ver_override.find(op_data->oid); - op_data->target_ver = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX; - if (pg.state == PG_ACTIVE) - { - // Fast happy-path - cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_minsize, 0); - submit_read_subops(pg.pg_minsize, pg.cur_set.data(), cur_op); - cur_op->send_list.push_back(cur_op->buf, cur_op->req.rw.len); - } - else - { - // PG may be degraded or have misplaced objects - auto st_it = pg.obj_states.find(op_data->oid); - uint64_t* cur_set = (st_it != pg.obj_states.end() - ? 
st_it->second->read_target.data() - : pg.cur_set.data()); - if (extend_missing_stripes(op_data->stripes, cur_set, pg.pg_minsize, pg.pg_size) < 0) + auto & pg = pgs[op_data->pg_num]; + for (int role = 0; role < pg.pg_minsize; role++) { - free(op_data); - finish_primary_op(cur_op, -EIO); - return; + op_data->stripes[role].read_start = op_data->stripes[role].req_start; + op_data->stripes[role].read_end = op_data->stripes[role].req_end; } - // Submit reads - cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_size, 0); - submit_read_subops(pg.pg_size, cur_set, cur_op); - op_data->pg_minsize = pg.pg_minsize; - op_data->pg_size = pg.pg_size; - op_data->degraded = 1; - } -} - -void osd_t::handle_primary_read_subop(osd_op_t *cur_op, int ok) -{ - osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)cur_op->op_data; - if (!ok) - op_data->errors++; - else - op_data->done++; - if ((op_data->errors + op_data->done) >= op_data->n_subops) - { - delete[] op_data->subops; - op_data->subops = NULL; - if (op_data->errors > 0) + // Determine version + auto vo_it = pg.ver_override.find(op_data->oid); + op_data->target_ver = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX; + if (pg.state == PG_ACTIVE) { - free(op_data); - cur_op->op_data = NULL; - finish_primary_op(cur_op, -EIO); - return; + // Fast happy-path + cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_minsize, 0); + submit_primary_subops(SUBMIT_READ, pg.pg_minsize, pg.cur_set.data(), cur_op); + cur_op->send_list.push_back(cur_op->buf, cur_op->req.rw.len); + op_data->st = 1; } - if (op_data->degraded) + else { - // Reconstruct missing stripes - // FIXME: Always EC(k+1) by now. Add different coding schemes - osd_rmw_stripe_t *stripes = op_data->stripes; - for (int role = 0; role < op_data->pg_minsize; role++) + // PG may be degraded or have misplaced objects + auto st_it = pg.obj_states.find(op_data->oid); + uint64_t* cur_set = (st_it != pg.obj_states.end() + ? 
st_it->second->read_target.data() + : pg.cur_set.data()); + if (extend_missing_stripes(op_data->stripes, cur_set, pg.pg_minsize, pg.pg_size) < 0) { - if (stripes[role].read_end != 0 && stripes[role].missing) - { - reconstruct_stripe(stripes, op_data->pg_size, role); - } - if (stripes[role].req_end != 0) - { - // Send buffer in parts to avoid copying - cur_op->send_list.push_back( - stripes[role].read_buf + (stripes[role].read_start - stripes[role].req_start), stripes[role].req_end - ); - } + free(op_data); /* NOTE(review): cur_op->op_data is not reset to NULL on this path, unlike the error path after resume_2 */ + finish_primary_op(cur_op, -EIO); + return; } + // Submit reads + op_data->pg_minsize = pg.pg_minsize; + op_data->pg_size = pg.pg_size; + op_data->degraded = 1; + cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_size, 0); + submit_primary_subops(SUBMIT_READ, pg.pg_size, cur_set, cur_op); + op_data->st = 1; } + } +resume_1: + return; +resume_2: /* reached after all read subops have reported: fail with -EIO on any error, otherwise reconstruct missing stripes if degraded and finish the op */ + if (op_data->errors > 0) + { free(op_data); cur_op->op_data = NULL; - finish_primary_op(cur_op, cur_op->req.rw.len); + finish_primary_op(cur_op, -EIO); + return; } + if (op_data->degraded) + { + // Reconstruct missing stripes + // FIXME: Always EC(k+1) by now. 
Add different coding schemes + osd_rmw_stripe_t *stripes = op_data->stripes; + for (int role = 0; role < op_data->pg_minsize; role++) + { + if (stripes[role].read_end != 0 && stripes[role].missing) + { + reconstruct_stripe(stripes, op_data->pg_size, role); + } + if (stripes[role].req_end != 0) + { + // Send buffer in parts to avoid copying + cur_op->send_list.push_back( + stripes[role].read_buf + (stripes[role].req_start - stripes[role].read_start), + stripes[role].req_end - stripes[role].req_start + ); + } + } + } + free(op_data); + cur_op->op_data = NULL; + finish_primary_op(cur_op, cur_op->req.rw.len); } -void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op) +void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op) /* Allocates op_data->subops and issues one subop per selected role; submit_type (SUBMIT_*) chooses between the read_* and write_* fields of each stripe. Completion is reported through handle_primary_subop(). */ { - osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)cur_op->op_data; + bool w = submit_type == SUBMIT_WRITE; + osd_primary_op_data_t *op_data = cur_op->op_data; osd_rmw_stripe_t *stripes = op_data->stripes; // Allocate subops - int n_subops = 0, force_read = -1; - for (int role = 0; role < read_pg_size; role++) + int n_subops = 0, zero_read = -1; + for (int role = 0; role < pg_size; role++) { - if (osd_set[role] == this->osd_num || osd_set[role] != 0 && force_read == -1) + if (osd_set[role] == this->osd_num || osd_set[role] != 0 && zero_read == -1) { - force_read = role; + zero_read = role; } - if (osd_set[role] != 0 && stripes[role].read_end != 0) + if (osd_set[role] != 0 && + (w ? 
stripes[role].write_end : stripes[role].read_end) != 0) { n_subops++; } } - if (!n_subops && op_data->should_read_version) + if (!n_subops && submit_type == SUBMIT_RMW_READ) /* an RMW read with no ranges to read still sends one subop, to role zero_read; presumably so that a version is still reported back */ { n_subops = 1; } else { - force_read = -1; + zero_read = -1; } osd_op_t *subops = new osd_op_t[n_subops]; + op_data->done = op_data->errors = 0; /* counters are reset because the write path submits two rounds of subops on the same op_data */ op_data->n_subops = n_subops; op_data->subops = subops; int subop = 0; - for (int role = 0; role < read_pg_size; role++) + for (int role = 0; role < pg_size; role++) { - if (stripes[role].read_end == 0 && force_read != role) + if ((submit_type == SUBMIT_WRITE ? stripes[role].write_end : stripes[role].read_end) == 0 && zero_read != role) { continue; } @@ -201,19 +205,19 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op if (role_osd_num == this->osd_num) { subops[subop].bs_op = new blockstore_op_t({ - .opcode = BS_OP_READ, + .opcode = (uint64_t)(w ? BS_OP_WRITE : BS_OP_READ), .callback = [cur_op, this](blockstore_op_t *subop) { - handle_primary_read_subop(cur_op, subop->retval == subop->len); + handle_primary_subop(cur_op, subop->retval == subop->len, subop->version); }, .oid = { .inode = op_data->oid.inode, .stripe = op_data->oid.stripe | role, }, - .version = op_data->target_ver, - .offset = stripes[role].read_start, - .len = stripes[role].read_end - stripes[role].read_start, - .buf = stripes[role].read_buf, + .version = w ? 0 : (submit_type == SUBMIT_RMW_READ ? UINT64_MAX : op_data->target_ver), + .offset = w ? stripes[role].write_start : stripes[role].read_start, + .len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start, + .buf = w ? 
stripes[role].write_buf : stripes[role].read_buf, }); bs->enqueue_op(subops[subop].bs_op); } @@ -225,22 +229,22 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op .header = { .magic = SECONDARY_OSD_OP_MAGIC, .id = this->next_subop_id++, - .opcode = OSD_OP_SECONDARY_READ, + .opcode = (uint64_t)(w ? OSD_OP_SECONDARY_WRITE : OSD_OP_SECONDARY_READ), }, .oid = { .inode = op_data->oid.inode, .stripe = op_data->oid.stripe | role, }, - .version = op_data->target_ver, - .offset = stripes[role].read_start, - .len = stripes[role].read_end - stripes[role].read_start, + .version = w ? 0 : (submit_type == SUBMIT_RMW_READ ? UINT64_MAX : op_data->target_ver), + .offset = w ? stripes[role].write_start : stripes[role].read_start, + .len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start, }; - subops[subop].buf = stripes[role].read_buf; + subops[subop].buf = w ? stripes[role].write_buf : stripes[role].read_buf; subops[subop].callback = [cur_op, this](osd_op_t *subop) { // so it doesn't get freed subop->buf = NULL; - handle_primary_read_subop(cur_op, subop->reply.hdr.retval == subop->req.sec_rw.len); + handle_primary_subop(cur_op, subop->reply.hdr.retval == subop->req.sec_rw.len, subop->reply.sec_rw.version); }; outbox_push(clients[subops[subop].peer_fd], &subops[subop]); } @@ -249,47 +253,98 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op } } -void osd_t::exec_primary_write(osd_op_t *cur_op) +void osd_t::handle_primary_subop(osd_op_t *cur_op, int ok, uint64_t version) /* Completion callback shared by both subop kinds issued from submit_primary_subops(). Counts ok and failed subops; once all n_subops have reported it frees the subop array, increments st and re-enters continue_primary_read or continue_primary_write depending on the request opcode. */ { - if (!prepare_primary_rw(cur_op)) + osd_primary_op_data_t *op_data = cur_op->op_data; + op_data->fact_ver = version; /* NOTE(review): overwritten on every callback, so the value from the last subop to complete wins */ + if (!ok) + { + op_data->errors++; + } + else + { + op_data->done++; + } + if ((op_data->errors + op_data->done) >= op_data->n_subops) + { + delete[] op_data->subops; + op_data->subops = NULL; + op_data->st++; + if (cur_op->req.hdr.opcode == OSD_OP_READ) + { + 
continue_primary_read(cur_op); + } + else + { + continue_primary_write(cur_op); + } + } +} + +void osd_t::continue_primary_write(osd_op_t *cur_op) /* Re-entrant write handler driven by op_data->st: submits the RMW read subops (st = 2), is resumed at resume_3 to submit the write subops (st = 4), and finishes the op at resume_5. */ +{ + if (!cur_op->op_data && !prepare_primary_rw(cur_op)) { return; } - osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)cur_op->op_data; + osd_primary_op_data_t *op_data = cur_op->op_data; + // FIXME: Handle operation cancel auto & pg = pgs[op_data->pg_num]; + if (op_data->st == 1) goto resume_1; + else if (op_data->st == 2) goto resume_2; + else if (op_data->st == 3) goto resume_3; + else if (op_data->st == 4) goto resume_4; + else if (op_data->st == 5) goto resume_5; // Check if actions are pending for this object - auto act_it = pg.obj_stab_actions.lower_bound((obj_piece_id_t){ - .oid = op_data->oid, - .osd_num = 0, - }); - if (act_it != pg.obj_stab_actions.end() && - act_it->first.oid.inode == op_data->oid.inode && - (act_it->first.oid.stripe & ~STRIPE_MASK) == op_data->oid.stripe) { - // FIXME postpone the request until actions are done - free(op_data); - finish_primary_op(cur_op, -EIO); - return; + auto act_it = pg.obj_stab_actions.lower_bound((obj_piece_id_t){ + .oid = op_data->oid, + .osd_num = 0, + }); + if (act_it != pg.obj_stab_actions.end() && + act_it->first.oid.inode == op_data->oid.inode && + (act_it->first.oid.stripe & ~STRIPE_MASK) == op_data->oid.stripe) + { + // FIXME postpone the request until actions are done + free(op_data); + finish_primary_op(cur_op, -EIO); + return; + } } +resume_1: // Check if there are other write requests to the same object - + { + auto vo_it = pg.ver_override.find(op_data->oid); + if (vo_it != pg.ver_override.end()) + { + op_data->st = 1; + //pg.write_queue.push_back(cur_op); /* NOTE(review): with the queueing commented out, nothing in this function re-runs the op after this early return */ + return; + } + } // Determine blocks to read cur_op->rmw_buf = calc_rmw_reads(cur_op->buf, op_data->stripes, pg.cur_set.data(), pg.pg_size, pg.pg_minsize, pg.pg_cursize); - op_data->should_read_version = true; // Read required blocks - submit_read_subops(pg.pg_size, pg.cur_set.data(), cur_op); 
- // ->>>>> Continue from the callback + submit_primary_subops(SUBMIT_RMW_READ, pg.pg_size, pg.cur_set.data(), cur_op); + op_data->st = 2; +resume_2: + return; +resume_3: + // Save version override + pg.ver_override[op_data->oid] = op_data->fact_ver; // Calculate parity calc_rmw_parity(op_data->stripes, op_data->pg_size); - // Save version override if degraded - // Send writes - - // ->>>>> Continue from the callback + submit_primary_subops(SUBMIT_WRITE, pg.pg_size, pg.cur_set.data(), cur_op); /* NOTE(review): calc_rmw_parity() above is given op_data->pg_size, which the visible code assigns only on the degraded read path; confirm it is initialized for writes */ + op_data->st = 4; +resume_4: + return; +resume_5: // Remember version as unstable // Remove version override if degraded + finish_primary_op(cur_op, cur_op->req.rw.len); /* NOTE(review): op_data is not freed before finishing here, unlike continue_primary_read; confirm finish_primary_op() releases it */ } void osd_t::exec_primary_sync(osd_op_t *cur_op)