diff --git a/src/osd.h b/src/osd.h index 02af70e9..208a2042 100644 --- a/src/osd.h +++ b/src/osd.h @@ -301,8 +301,12 @@ class osd_t pg_osd_set_state_t* add_object_to_set(pg_t & pg, const object_id oid, const pg_osd_set_t & osd_set, uint64_t old_pg_state, int log_at_level); void remove_object_from_state(object_id & oid, pg_osd_set_state_t **object_state, pg_t &pg, bool report = true); + pg_osd_set_state_t *mark_object(pg_t & pg, object_id oid, pg_osd_set_state_t *prev_object_state, bool ref, + std::function<int(pg_osd_set_t & new_set)> calc_set); pg_osd_set_state_t *mark_object_corrupted(pg_t & pg, object_id oid, pg_osd_set_state_t *prev_object_state, osd_rmw_stripe_t *stripes, bool ref, bool inconsistent); + pg_osd_set_state_t *mark_partial_write(pg_t & pg, object_id oid, pg_osd_set_state_t *prev_object_state, + osd_rmw_stripe_t *stripes, bool ref); void deref_object_state(pg_t & pg, pg_osd_set_state_t **object_state, bool deref); bool remember_unstable_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state); void handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op); @@ -317,6 +321,7 @@ class osd_t void submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_delete, int chunks_to_delete_count); int submit_primary_sync_subops(osd_op_t *cur_op); void submit_primary_stab_subops(osd_op_t *cur_op); + void submit_primary_rollback_subops(osd_op_t *cur_op, const uint64_t* osd_set); uint64_t* get_object_osd_set(pg_t &pg, object_id &oid, pg_osd_set_state_t **object_state); diff --git a/src/osd_primary.cpp b/src/osd_primary.cpp index 9bad1e2f..c5a11325 100644 --- a/src/osd_primary.cpp +++ b/src/osd_primary.cpp @@ -299,8 +299,8 @@ resume_2: finish_op(cur_op, cur_op->req.rw.len); } -pg_osd_set_state_t *osd_t::mark_object_corrupted(pg_t & pg, object_id oid, pg_osd_set_state_t *prev_object_state, - osd_rmw_stripe_t *stripes, bool ref, bool inconsistent) +pg_osd_set_state_t *osd_t::mark_object(pg_t & pg, object_id oid, pg_osd_set_state_t *prev_object_state, bool ref, + 
std::function<int(pg_osd_set_t & new_set)> calc_set) { pg_osd_set_state_t *object_state = NULL; get_object_osd_set(pg, oid, &object_state); @@ -315,58 +315,22 @@ pg_osd_set_state_t *osd_t::mark_object_corrupted(pg_t & pg, object_id oid, pg_os } return object_state; } - pg_osd_set_t corrupted_set; + pg_osd_set_t new_set; if (object_state) { - corrupted_set = object_state->osd_set; + new_set = object_state->osd_set; } else { for (int i = 0; i < pg.cur_set.size(); i++) { - corrupted_set.push_back((pg_obj_loc_t){ + new_set.push_back((pg_obj_loc_t){ .role = (pg.scheme == POOL_SCHEME_REPLICATED ? 0 : (uint64_t)i), .osd_num = pg.cur_set[i], }); } } - // Mark object chunk(s) as corrupted - int changes = 0; - for (auto chunk_it = corrupted_set.begin(); chunk_it != corrupted_set.end(); ) - { - auto & chunk = *chunk_it; - if (stripes[chunk.role].osd_num == chunk.osd_num) - { - if (stripes[chunk.role].not_exists) - { - changes++; - corrupted_set.erase(chunk_it, chunk_it+1); - continue; - } - if (stripes[chunk.role].read_error && chunk.loc_bad != LOC_CORRUPTED) - { - changes++; - chunk.loc_bad = LOC_CORRUPTED; - } - else if (stripes[chunk.role].read_end > 0 && !stripes[chunk.role].missing && - (chunk.loc_bad & LOC_CORRUPTED)) - { - changes++; - chunk.loc_bad &= ~LOC_CORRUPTED; - } - } - if (inconsistent && !chunk.loc_bad) - { - changes++; - chunk.loc_bad |= LOC_INCONSISTENT; - } - else if (!inconsistent && (chunk.loc_bad & LOC_INCONSISTENT)) - { - changes++; - chunk.loc_bad &= ~LOC_INCONSISTENT; - } - chunk_it++; - } + int changes = calc_set(new_set); if (!changes) { // No chunks newly marked as corrupted - object is already marked or moved @@ -379,7 +343,7 @@ pg_osd_set_state_t *osd_t::mark_object_corrupted(pg_t & pg, object_id oid, pg_os deref_object_state(pg, &object_state, ref); } // Insert object into the new state and retry - object_state = add_object_to_set(pg, oid, new_set, old_pg_state, 2); + object_state = add_object_to_set(pg, oid, new_set, old_pg_state, 2); if (ref) { 
object_state->ref_count++; @@ -387,6 +351,76 @@ pg_osd_set_state_t *osd_t::mark_object_corrupted(pg_t & pg, object_id oid, pg_os return object_state; } +pg_osd_set_state_t *osd_t::mark_object_corrupted(pg_t & pg, object_id oid, pg_osd_set_state_t *prev_object_state, + osd_rmw_stripe_t *stripes, bool ref, bool inconsistent) +{ + return mark_object(pg, oid, prev_object_state, ref, [stripes, inconsistent](pg_osd_set_t & new_set) + { + // Mark object chunk(s) as corrupted + int changes = 0; + for (auto chunk_it = new_set.begin(); chunk_it != new_set.end(); ) + { + auto & chunk = *chunk_it; + if (stripes[chunk.role].osd_num == chunk.osd_num) + { + if (stripes[chunk.role].not_exists) + { + changes++; + new_set.erase(chunk_it, chunk_it+1); + continue; + } + if (stripes[chunk.role].read_error && chunk.loc_bad != LOC_CORRUPTED) + { + changes++; + chunk.loc_bad = LOC_CORRUPTED; + } + else if (stripes[chunk.role].read_end > 0 && !stripes[chunk.role].missing && + (chunk.loc_bad & LOC_CORRUPTED)) + { + changes++; + chunk.loc_bad &= ~LOC_CORRUPTED; + } + } + if (inconsistent && !chunk.loc_bad) + { + changes++; + chunk.loc_bad |= LOC_INCONSISTENT; + } + else if (!inconsistent && (chunk.loc_bad & LOC_INCONSISTENT)) + { + changes++; + chunk.loc_bad &= ~LOC_INCONSISTENT; + } + chunk_it++; + } + return changes; + }); +} + +// Mark the object as partially updated (probably due to a ENOSPC) +pg_osd_set_state_t *osd_t::mark_partial_write(pg_t & pg, object_id oid, pg_osd_set_state_t *prev_object_state, + osd_rmw_stripe_t *stripes, bool ref) +{ + return mark_object(pg, oid, prev_object_state, ref, [stripes](pg_osd_set_t & new_set) + { + // Mark object chunk(s) as outdated + int changes = 0; + for (auto chunk_it = new_set.begin(); chunk_it != new_set.end(); ) + { + auto & chunk = *chunk_it; + if (stripes[chunk.role].osd_num == chunk.osd_num && + stripes[chunk.role].read_error && + chunk.loc_bad != LOC_OUTDATED) + { + changes++; + chunk.loc_bad = LOC_OUTDATED; + } + chunk_it++; + } + return 
changes; + }); +} + pg_osd_set_state_t* osd_t::add_object_to_set(pg_t & pg, const object_id oid, const pg_osd_set_t & osd_set, uint64_t old_pg_state, int log_at_level) { diff --git a/src/osd_primary.h b/src/osd_primary.h index 34f3063b..ae8b7c48 100644 --- a/src/osd_primary.h +++ b/src/osd_primary.h @@ -25,7 +25,7 @@ struct osd_primary_op_data_t uint64_t target_ver; uint64_t orig_ver = 0, fact_ver = 0; uint64_t scheme = 0; - int n_subops = 0, done = 0, errors = 0, errcode = 0; + int n_subops = 0, done = 0, errors = 0, drops = 0, errcode = 0; int degraded = 0, pg_size, pg_data_size; osd_rmw_stripe_t *stripes; osd_op_t *subops = NULL; diff --git a/src/osd_primary_subops.cpp b/src/osd_primary_subops.cpp index 5839727d..d4d562fb 100644 --- a/src/osd_primary_subops.cpp +++ b/src/osd_primary_subops.cpp @@ -133,7 +133,7 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, const ui zero_read = -1; osd_op_t *subops = new osd_op_t[n_subops]; op_data->fact_ver = 0; - op_data->done = op_data->errors = op_data->errcode = 0; + op_data->done = op_data->errors = op_data->drops = op_data->errcode = 0; op_data->n_subops = n_subops; op_data->subops = subops; int sent = submit_primary_subop_batch(submit_type, op_data->oid.inode, op_version, op_data->stripes, osd_set, cur_op, 0, zero_read); @@ -363,6 +363,13 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op) memset(((osd_rmw_stripe_t*)subop->rmw_buf)->read_buf, 0, expected); ((osd_rmw_stripe_t*)subop->rmw_buf)->not_exists = true; } + if (opcode == OSD_OP_SEC_READ && (retval == -EIO || retval == -EDOM) || + opcode == OSD_OP_SEC_WRITE && retval != expected) + { + // We'll retry reads from other replica(s) on EIO/EDOM and mark object as corrupted + // And we'll mark write as failed + ((osd_rmw_stripe_t*)subop->rmw_buf)->read_error = true; + } if (retval == expected && (opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE || opcode == OSD_OP_SEC_WRITE_STABLE)) { uint64_t version = 
subop->reply.sec_rw.version; @@ -404,14 +411,10 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op) osd_op_names[opcode], subop->peer_fd, retval, expected ); } - if (opcode == OSD_OP_SEC_READ && (retval == -EIO || retval == -EDOM)) - { - // We'll retry reads from other replica(s) on EIO/EDOM and mark object as corrupted - ((osd_rmw_stripe_t*)subop->rmw_buf)->read_error = true; - } subop->rmw_buf = NULL; - // Error priority: ENOSPC and others > EIO > EDOM > EPIPE + // Error priority: ENOSPC > others > EIO > EDOM > EPIPE if (op_data->errcode == 0 || + retval == -ENOSPC && op_data->errcode != -ENOSPC || retval == -EIO && (op_data->errcode == -EDOM || op_data->errcode == -EPIPE) || retval == -EDOM && (op_data->errcode == -EPIPE) || retval != -EIO && retval != -EDOM && retval != -EPIPE) @@ -424,6 +427,7 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op) (retval != -EIO || opcode != OSD_OP_SEC_READ)) { // Drop connection on unexpected errors + op_data->drops++; msgr.stop_client(subop->peer_fd); } } @@ -705,6 +709,96 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op) } } +void osd_t::submit_primary_rollback_subops(osd_op_t *cur_op, const uint64_t* osd_set) +{ + osd_primary_op_data_t *op_data = cur_op->op_data; + osd_rmw_stripe_t *stripes = op_data->stripes; + assert(op_data->scheme != POOL_SCHEME_REPLICATED); + // Allocate subops + int n_subops = 0; + for (int role = 0; role < op_data->pg_size; role++) + { + if (osd_set[role] != 0 && !stripes[role].read_error && + msgr.osd_peer_fds.find(osd_set[role]) != msgr.osd_peer_fds.end()) + { + n_subops++; + } + } + op_data->n_subops = n_subops; + op_data->done = op_data->errors = 0; + if (!op_data->n_subops) + { + return; + } + op_data->subops = new osd_op_t[n_subops]; + op_data->unstable_writes = new obj_ver_id[n_subops]; + int i = 0; + for (int role = 0; role < op_data->pg_size; role++) + { + if (osd_set[role] != 0 && !stripes[role].read_error && + 
msgr.osd_peer_fds.find(osd_set[role]) != msgr.osd_peer_fds.end()) + { + osd_op_t *subop = &op_data->subops[i]; + op_data->unstable_writes[i] = (obj_ver_id){ + .oid = { + .inode = op_data->oid.inode, + .stripe = op_data->oid.stripe | role, + }, + .version = op_data->target_ver-1, + }; + if (osd_set[role] == this->osd_num) + { + clock_gettime(CLOCK_REALTIME, &subop->tv_begin); + subop->op_type = (uint64_t)cur_op; + subop->bs_op = new blockstore_op_t((blockstore_op_t){ + .opcode = BS_OP_ROLLBACK, + .callback = [subop, this](blockstore_op_t *bs_subop) + { + handle_primary_bs_subop(subop); + }, + { + .len = 1, + }, + .buf = (void*)(op_data->unstable_writes + i), + }); +#ifdef OSD_DEBUG + printf( + "Submit rollback to local: %jx:%jx v%ju\n", + op_data->oid.inode, op_data->oid.stripe | role, op_data->target_ver-1 + ); +#endif + bs->enqueue_op(subop->bs_op); + } + else + { + subop->op_type = OSD_OP_OUT; + subop->req = (osd_any_op_t){ .sec_stab = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = msgr.next_subop_id++, + .opcode = OSD_OP_SEC_ROLLBACK, + }, + .len = sizeof(obj_ver_id), + } }; + subop->iov.push_back(op_data->unstable_writes + i, sizeof(obj_ver_id)); + subop->callback = [cur_op, this](osd_op_t *subop) + { + handle_primary_subop(subop, cur_op); + }; +#ifdef OSD_DEBUG + printf( + "Submit rollback to osd %ju: %jx:%jx v%ju\n", osd_set[role], + op_data->oid.inode, op_data->oid.stripe | role, op_data->target_ver-1 + ); +#endif + subop->peer_fd = msgr.osd_peer_fds.at(osd_set[role]); + msgr.outbox_push(subop); + } + i++; + } + } +} + void osd_t::pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval) { auto st_it = pg.write_queue.find(oid), it = st_it; diff --git a/src/osd_primary_write.cpp b/src/osd_primary_write.cpp index dee2da01..a8efffcf 100644 --- a/src/osd_primary_write.cpp +++ b/src/osd_primary_write.cpp @@ -49,6 +49,8 @@ void osd_t::continue_primary_write(osd_op_t *cur_op) else if (op_data->st == 8) goto resume_8; else if 
(op_data->st == 9) goto resume_9; else if (op_data->st == 10) goto resume_10; + else if (op_data->st == 11) goto resume_11; + else if (op_data->st == 12) goto resume_12; assert(op_data->st == 0); if (!check_write_queue(cur_op, pg)) { @@ -259,11 +261,31 @@ resume_5: } if (op_data->errors > 0) { - // FIXME: Handle ENOSPC. If one of the subops fail with ENOSPC here, + // Handle ENOSPC/EDOM/ERANGE/EIO. If some subops fail, but others succeed, // next writes to the same object will also fail because they'll try // to overwrite the same version number which will result in EEXIST. // To fix it, we should mark the object as degraded for replicas, // and rollback successful part updates in case of EC. + if (op_data->done > 0 && !op_data->drops) + { + if (op_data->scheme != POOL_SCHEME_REPLICATED) + { + submit_primary_rollback_subops(cur_op, op_data->prev_set); +resume_11: + op_data->st = 11; + return; +resume_12: + // Ignore ROLLBACK errors - submit_primary_subops will drop the connection if it fails + delete[] op_data->unstable_writes; + op_data->unstable_writes = NULL; + } + else + { + mark_partial_write(pg, op_data->oid, op_data->object_state, op_data->stripes, true); + pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode); + return; + } + } deref_object_state(pg, &op_data->object_state, true); pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode); return;