diff --git a/osd.h b/osd.h
index 21aeb861..3f81f6e1 100644
--- a/osd.h
+++ b/osd.h
@@ -301,6 +301,7 @@ class osd_t
     void handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version);
     void pg_cancel_write_queue(pg_t & pg, object_id oid, int retval);
     void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
+    void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_state_t *object_state);
     void submit_primary_sync_subops(osd_op_t *cur_op);
     void submit_primary_stab_subops(osd_op_t *cur_op);
 
diff --git a/osd_flush.cpp b/osd_flush.cpp
index 9b16f0bb..0264ceb8 100644
--- a/osd_flush.cpp
+++ b/osd_flush.cpp
@@ -254,41 +254,6 @@ void osd_t::submit_recovery_op(osd_recovery_op_t *op)
                 throw std::runtime_error("Failed to recover an object");
             }
         }
-        else
-        {
-            pg_t *pg = &pgs[op->pg_num];
-            pg_osd_set_state_t *st;
-            if (op->degraded)
-            {
-                auto st_it = pg->degraded_objects.find(op->oid);
-                st = st_it->second;
-                pg->degraded_objects.erase(st_it);
-                degraded_objects--;
-                if (!pg->degraded_objects.size())
-                {
-                    pg->state = pg->state & ~PG_HAS_DEGRADED;
-                    pg->print_state();
-                }
-            }
-            else
-            {
-                auto st_it = pg->misplaced_objects.find(op->oid);
-                st = st_it->second;
-                pg->misplaced_objects.erase(st_it);
-                misplaced_objects--;
-                if (!pg->misplaced_objects.size())
-                {
-                    pg->state = pg->state & ~PG_HAS_MISPLACED;
-                    pg->print_state();
-                }
-            }
-            pg->clean_count++;
-            st->object_count--;
-            if (!st->object_count)
-            {
-                pg->state_dict.erase(st->osd_set);
-            }
-        }
         recovery_ops.erase(op->oid);
         delete osd_op;
         op->osd_op = NULL;
diff --git a/osd_primary.cpp b/osd_primary.cpp
index 090acd89..aa2c60ee 100644
--- a/osd_primary.cpp
+++ b/osd_primary.cpp
@@ -1,18 +1,20 @@
 #include "osd.h"
 #include "osd_rmw.h"
 
+// FIXME: Split into more files
+
 #define SUBMIT_READ 0
 #define SUBMIT_RMW_READ 1
 #define SUBMIT_WRITE 2
 
 // read: read directly or read paired stripe(s), reconstruct, return
-// write: read paired stripe(s), modify, write
+// write: read paired stripe(s), reconstruct, modify, calculate parity, write
 //
 // nuance: take care to read the same version from paired stripes!
 // to do so, we remember "last readable" version until a write request completes
 // and we postpone other write requests to the same stripe until completion of previous ones
 //
-// sync: sync peers, get unstable versions from somewhere, stabilize them
+// sync: sync peers, get unstable versions, stabilize them
 
 struct unstable_osd_num_t
 {
@@ -32,7 +34,7 @@ struct osd_primary_op_data_t
     osd_rmw_stripe_t *stripes;
     osd_op_t *subops = NULL;
     uint64_t *prev_set = NULL;
-    uint64_t object_state = 0;
+    pg_osd_set_state_t *object_state = NULL;
 
     // for sync. oops, requires freeing
     std::vector *unstable_write_osds = NULL;
@@ -118,32 +120,32 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
     return true;
 }
 
-uint64_t* get_object_osd_set(pg_t &pg, object_id &oid, uint64_t *def, uint64_t &object_state)
+uint64_t* get_object_osd_set(pg_t &pg, object_id &oid, uint64_t *def, pg_osd_set_state_t **object_state)
 {
     if (!(pg.state & (PG_HAS_INCOMPLETE | PG_HAS_DEGRADED | PG_HAS_MISPLACED)))
     {
-        object_state = 0;
+        *object_state = NULL;
         return def;
     }
     auto st_it = pg.incomplete_objects.find(oid);
     if (st_it != pg.incomplete_objects.end())
     {
-        object_state = st_it->second->state;
+        *object_state = st_it->second;
         return st_it->second->read_target.data();
     }
     st_it = pg.degraded_objects.find(oid);
     if (st_it != pg.degraded_objects.end())
     {
-        object_state = st_it->second->state;
+        *object_state = st_it->second;
         return st_it->second->read_target.data();
     }
     st_it = pg.misplaced_objects.find(oid);
     if (st_it != pg.misplaced_objects.end())
     {
-        object_state = st_it->second->state;
+        *object_state = st_it->second;
         return st_it->second->read_target.data();
     }
-    object_state = 0;
+    *object_state = NULL;
     return def;
 }
 
@@ -177,7 +179,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op)
     else
     {
         // PG may be degraded or have misplaced objects
-        uint64_t* cur_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), op_data->object_state);
+        uint64_t* cur_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
         if (extend_missing_stripes(op_data->stripes, cur_set, pg.pg_minsize, pg.pg_size) < 0)
         {
             finish_op(cur_op, -EIO);
@@ -418,6 +420,7 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
     else if (op_data->st == 5) goto resume_5;
     else if (op_data->st == 6) goto resume_6;
     else if (op_data->st == 7) goto resume_7;
+    else if (op_data->st == 8) goto resume_8;
     assert(op_data->st == 0);
     // Check if actions are pending for this object
     {
@@ -447,7 +450,7 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
 resume_1:
     // Determine blocks to read and write
     // Missing chunks are allowed to be overwritten even in incomplete objects
-    op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), op_data->object_state);
+    op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
     cur_op->rmw_buf = calc_rmw(cur_op->buf, op_data->stripes, op_data->prev_set,
         pg.pg_size, pg.pg_minsize, pg.pg_cursize, pg.cur_set.data(), bs_block_size);
     // Read required blocks
@@ -476,6 +479,68 @@ resume_5:
         pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
         return;
     }
+    if (op_data->object_state)
+    {
+        if (op_data->object_state->state & OBJ_MISPLACED)
+        {
+            // Remove extra chunks
+            submit_primary_del_subops(cur_op, pg.cur_set.data(), op_data->object_state);
+            if (op_data->n_subops > 0)
+            {
+                op_data->st = 8;
+                return;
+resume_8:
+                if (op_data->errors > 0)
+                {
+                    pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
+                    return;
+                }
+            }
+        }
+        // Clear object state
+        if (op_data->object_state->state & OBJ_INCOMPLETE)
+        {
+            // Successful write means that object is not incomplete anymore
+            incomplete_objects--;
+            pg.incomplete_objects.erase(op_data->oid);
+            if (!pg.incomplete_objects.size())
+            {
+                pg.state = pg.state & ~PG_HAS_INCOMPLETE;
+                pg.print_state();
+            }
+        }
+        else if (op_data->object_state->state & OBJ_DEGRADED)
+        {
+            degraded_objects--;
+            pg.degraded_objects.erase(op_data->oid);
+            if (!pg.degraded_objects.size())
+            {
+                pg.state = pg.state & ~PG_HAS_DEGRADED;
+                pg.print_state();
+            }
+        }
+        else if (op_data->object_state->state & OBJ_MISPLACED)
+        {
+            misplaced_objects--;
+            pg.misplaced_objects.erase(op_data->oid);
+            if (!pg.misplaced_objects.size())
+            {
+                pg.state = pg.state & ~PG_HAS_MISPLACED;
+                pg.print_state();
+            }
+        }
+        else
+        {
+            throw std::runtime_error("Invalid object state during recovery: "+std::to_string(op_data->object_state->state));
+        }
+        // FIXME: Track object count during normal writes, too
+        pg.clean_count++;
+        op_data->object_state->object_count--;
+        if (!op_data->object_state->object_count)
+        {
+            pg.state_dict.erase(op_data->object_state->osd_set);
+        }
+    }
     // FIXME: Check for immediate_commit == IMMEDIATE_SMALL
     if (immediate_commit == IMMEDIATE_ALL)
     {
@@ -746,6 +811,96 @@ finish:
     }
 }
 
+// Submit delete sub-operations for every chunk listed in object_state->osd_set
+// that is stored on an OSD other than the one cur_set assigns to its role.
+// Sets op_data->n_subops to the number of submitted deletes (0 means nothing
+// was submitted) and resets op_data->done and op_data->errors.
+// Chunks held by this OSD are deleted through the local blockstore, all other
+// chunks through OSD_OP_SECONDARY_DELETE requests; every completion is reported
+// to handle_primary_subop().
+void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_state_t *object_state)
+{
+    osd_primary_op_data_t *op_data = cur_op->op_data;
+    int extra_chunks = 0;
+    for (auto chunk: object_state->osd_set)
+    {
+        if (chunk.osd_num != cur_set[chunk.role])
+        {
+            extra_chunks++;
+        }
+    }
+    op_data->n_subops = extra_chunks;
+    op_data->done = op_data->errors = 0;
+    if (!extra_chunks)
+    {
+        return;
+    }
+    osd_op_t *subops = new osd_op_t[extra_chunks];
+    op_data->subops = subops;
+    int i = 0;
+    for (auto chunk: object_state->osd_set)
+    {
+        if (chunk.osd_num != cur_set[chunk.role])
+        {
+            if (chunk.osd_num == this->osd_num)
+            {
+                subops[i].bs_op = new blockstore_op_t({
+                    .opcode = BS_OP_DELETE,
+                    .callback = [cur_op, this](blockstore_op_t *subop)
+                    {
+                        if (subop->retval != 0)
+                        {
+                            // die
+                            throw std::runtime_error("local delete operation failed");
+                        }
+                        handle_primary_subop(OSD_OP_SECONDARY_DELETE, cur_op, subop->retval, 0, 0);
+                    },
+                    .oid = {
+                        .inode = op_data->oid.inode,
+                        .stripe = op_data->oid.stripe | chunk.role,
+                    },
+                    // Same version as write
+                    .version = op_data->fact_ver,
+                });
+                bs->enqueue_op(subops[i].bs_op);
+            }
+            else
+            {
+                subops[i].op_type = OSD_OP_OUT;
+                subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE);
+                subops[i].peer_fd = osd_peer_fds.at(chunk.osd_num);
+                subops[i].req.sec_del = {
+                    .header = {
+                        .magic = SECONDARY_OSD_OP_MAGIC,
+                        .id = this->next_subop_id++,
+                        .opcode = OSD_OP_SECONDARY_DELETE,
+                    },
+                    .oid = {
+                        .inode = op_data->oid.inode,
+                        .stripe = op_data->oid.stripe | chunk.role,
+                    },
+                    // Same version as write
+                    .version = op_data->fact_ver,
+                };
+                subops[i].callback = [cur_op, this](osd_op_t *subop)
+                {
+                    // -1 marks success: 0 would satisfy the fail_fd >= 0 check below
+                    // and stop_client(0) would run after every successful delete
+                    int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1;
+                    handle_primary_subop(OSD_OP_SECONDARY_DELETE, cur_op, subop->reply.hdr.retval, 0, 0);
+                    if (fail_fd >= 0)
+                    {
+                        // delete operation failed, drop the connection
+                        stop_client(fail_fd);
+                    }
+                };
+                outbox_push(clients[subops[i].peer_fd], &subops[i]);
+            }
+            i++;
+        }
+    }
+}
+
 void osd_t::submit_primary_sync_subops(osd_op_t *cur_op)
 {
     osd_primary_op_data_t *op_data = cur_op->op_data;