From a406c62a716606933c598974884544a6078a037a Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Tue, 25 Feb 2020 20:10:17 +0300 Subject: [PATCH] Implement basic primary-sync-stabilize --- blockstore.h | 89 +++++++++++++++++---- osd.cpp | 4 + osd.h | 25 +++++- osd_ops.h | 2 +- osd_primary.cpp | 208 ++++++++++++++++++++++++++++++++++++++++++++++-- 5 files changed, 305 insertions(+), 23 deletions(-) diff --git a/blockstore.h b/blockstore.h index 3c2637de4..48377a410 100644 --- a/blockstore.h +++ b/blockstore.h @@ -35,17 +35,88 @@ #define BS_OP_PRIVATE_DATA_SIZE 256 -/* BS_OP_LIST: +/* + +Blockstore opcode documentation: + +## BS_OP_READ / BS_OP_WRITE + +Read or write object data. Input: -- oid.stripe = parity block size +- oid = requested object +- version = requested version. + For reads: + - version == 0: read the last stable version, + - version == UINT64_MAX: read the last version, + - otherwise: read the newest version that is <= the specified version + For writes: + - if version == 0, a new version is assigned automatically + - if version != 0, it is assigned for the new write if possible, otherwise -EINVAL is returned +- offset, len = offset and length within object. length may be zero, in that case + read operation only returns the version / write operation only bumps the version +- buf = pre-allocated buffer for data (read) / with data (write). may be NULL if len == 0. + +Output: +- retval = number of bytes actually read/written or negative error number (-EINVAL or -ENOSPC) +- version = the version actually read or written + +## BS_OP_DELETE + +Delete an object. + +Input: +- oid = requested object +- version = requested version. Treated the same as with BS_OP_WRITE + +Output: +- retval = 0 or negative error number (-EINVAL) +- version = the version actually written (delete is initially written as an object version) + +## BS_OP_SYNC + +Make sure all previously issued modifications reach physical media. + +Input: Nothing except opcode +Output: +- retval = 0 or negative error number (-EINVAL) + +## BS_OP_STABLE / BS_OP_ROLLBACK + +Mark objects as stable / rollback previous unstable writes. + +Input: +- len = count of obj_ver_id's to stabilize or rollback + - stabilize: all object versions up to the requested version of each object are marked as stable + - rollback: all objects are rolled back to the requested stable versions +- buf = pre-allocated obj_ver_id array units long + +Output: +- retval = 0 or negative error number (-EINVAL) + +## BS_OP_SYNC_STAB_ALL + +ONLY FOR TESTS! Sync and mark all unstable object versions as stable, at once. + +Input: Nothing except opcode +Output: +- retval = 0 or negative error number (-EINVAL) + +## BS_OP_LIST + +Get a list of all objects in this Blockstore. + +Input: +- oid.stripe = PG alignment - len = PG count or 0 to list all objects - offset = PG number Output: - retval = total obj_ver_id count - version = stable obj_ver_id count -- buf = obj_ver_id array allocated by the blockstore. stable versions come first +- buf = obj_ver_id array allocated by the blockstore. Stable versions come first. + You must free it yourself after usage with free(). + Output includes all objects for which (((inode + stripe / ) % ) == ). */ @@ -55,21 +126,9 @@ struct blockstore_op_t uint64_t opcode; // finish callback std::function callback; - // For reads, writes & deletes: oid is the requested object object_id oid; - // For reads: - // version == 0 -> read the last stable version, - // version == UINT64_MAX -> read the last version, - // otherwise -> read the newest version that is <= the specified version - // after execution, version is equal to the version that was read from the blockstore - // For writes & deletes: - // if version == 0, a new version is assigned automatically - // if version != 0, it is assigned for the new write if possible, otherwise -EINVAL is returned - // after execution, version is equal to the version that was written to the blockstore uint64_t version; - // For reads & writes: offset & len are the requested part of the object, buf is the buffer uint32_t offset; - // For stabilize requests: buf contains obj_ver_id's to stabilize uint32_t len; void *buf; int retval; diff --git a/osd.cpp b/osd.cpp index 9ac57f51e..6062eed83 100644 --- a/osd.cpp +++ b/osd.cpp @@ -340,6 +340,10 @@ void osd_t::exec_op(osd_op_t *cur_op) { continue_primary_write(cur_op); } + else if (cur_op->req.hdr.opcode == OSD_OP_SYNC) + { + continue_primary_sync(cur_op); + } else { exec_secondary(cur_op); diff --git a/osd.h b/osd.h index 038c9c877..2f296c130 100644 --- a/osd.h +++ b/osd.h @@ -159,6 +159,12 @@ struct osd_client_t struct osd_rmw_stripe_t; +struct osd_object_id_t +{ + osd_num_t osd_num; + object_id oid; +}; + class osd_t { // config @@ -181,7 +187,8 @@ class osd_t uint64_t next_subop_id = 1; // Unstable writes - spp::sparse_hash_map> unstable_writes; + std::map unstable_writes; + std::deque syncs_in_progress; // client & peer I/O @@ -240,12 +247,26 @@ class osd_t bool prepare_primary_rw(osd_op_t *cur_op); void continue_primary_read(osd_op_t *cur_op); void continue_primary_write(osd_op_t *cur_op); - void exec_primary_sync(osd_op_t *cur_op); + void continue_primary_sync(osd_op_t *cur_op); void finish_primary_op(osd_op_t *cur_op, int retval); void handle_primary_subop(osd_op_t *cur_op, int ok, uint64_t version); void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op); + void submit_primary_sync_subops(osd_op_t *cur_op); + void submit_primary_stab_subops(osd_op_t *cur_op); public: osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop); ~osd_t(); bool shutdown(); }; + +inline bool operator == (const osd_object_id_t & a, const osd_object_id_t & b) +{ + return a.osd_num == b.osd_num && a.oid.inode == b.oid.inode && a.oid.stripe == b.oid.stripe; +} + +inline bool operator < (const osd_object_id_t & a, const osd_object_id_t & b) +{ + return a.osd_num < b.osd_num || a.osd_num == b.osd_num && ( + a.oid.inode < b.oid.inode || a.oid.inode == b.oid.inode && a.oid.stripe < b.oid.stripe + ); +} diff --git a/osd_ops.h b/osd_ops.h index cb7282ff4..7155fda41 100644 --- a/osd_ops.h +++ b/osd_ops.h @@ -103,7 +103,7 @@ struct __attribute__((__packed__)) osd_op_secondary_stabilize_t { osd_op_header_t header; // obj_ver_id array length in bytes - uint32_t len; + uint64_t len; }; typedef osd_op_secondary_stabilize_t osd_op_secondary_rollback_t; diff --git a/osd_primary.cpp b/osd_primary.cpp index e157dcb46..db281cfac 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -14,6 +14,12 @@ // // sync: sync peers, get unstable versions from somewhere, stabilize them +struct unstable_osd_num_t +{ + osd_num_t osd_num; + int start, len; +}; + struct osd_primary_op_data_t { int st = 0; @@ -25,6 +31,9 @@ struct osd_primary_op_data_t int degraded = 0, pg_size, pg_minsize; osd_rmw_stripe_t *stripes; osd_op_t *subops = NULL; + // for sync. oops, requires freeing + std::vector *unstable_write_osds = NULL; + obj_ver_id *unstable_writes = NULL; }; void osd_t::finish_primary_op(osd_op_t *cur_op, int retval) @@ -350,9 +359,12 @@ resume_5: { if (osd_set[role] != 0) { - this->unstable_writes[osd_set[role]][(object_id){ - .inode = op_data->oid.inode, - .stripe = op_data->oid.stripe | role, + this->unstable_writes[(osd_object_id_t){ + .osd_num = osd_set[role], + .oid = { + .inode = op_data->oid.inode, + .stripe = op_data->oid.stripe | role, + }, }] = op_data->fact_ver; } } @@ -374,7 +386,193 @@ resume_5: } } -void osd_t::exec_primary_sync(osd_op_t *cur_op) +// Save and clear unstable_writes -> SYNC all -> STABLE all +// FIXME: Run regular automatic syncs based on the number of unstable writes and/or system time +void osd_t::continue_primary_sync(osd_op_t *cur_op) { - + if (!cur_op->op_data) + { + cur_op->op_data = (osd_primary_op_data_t*)calloc(sizeof(osd_primary_op_data_t), 1); + } + if (cur_op->op_data->st == 1) goto resume_1; + else if (cur_op->op_data->st == 2) goto resume_2; + else if (cur_op->op_data->st == 3) goto resume_3; + else if (cur_op->op_data->st == 4) goto resume_4; + else if (cur_op->op_data->st == 5) goto resume_5; + else if (cur_op->op_data->st == 6) goto resume_6; + if (syncs_in_progress.size() > 0) + { + // Wait for previous syncs, if any + // FIXME: We may try to execute the current one in parallel, like in Blockstore, but I'm not sure if it matters at all + syncs_in_progress.push_back(cur_op); + cur_op->op_data->st = 1; +resume_1: + return; + } + else + { + syncs_in_progress.push_back(cur_op); + } +resume_2: + // FIXME: Handle operation cancel + if (unstable_writes.size() == 0) + { + // Nothing to sync + goto finish; + } + // Save and clear unstable_writes + // FIXME: This is possible to do it on a per-client basis + // It would be cool not to copy them here at all, but someone has to deduplicate them by object IDs anyway + cur_op->op_data->unstable_write_osds = new std::vector(); + cur_op->op_data->unstable_writes = new obj_ver_id[unstable_writes.size()]; + { + osd_num_t last_osd = 0; + int last_start = 0, last_end = 0; + for (auto it = unstable_writes.begin(); it != unstable_writes.end(); it++) + { + if (last_osd != it->first.osd_num) + { + if (last_osd != 0) + { + cur_op->op_data->unstable_write_osds->push_back((unstable_osd_num_t){ + .osd_num = last_osd, + .start = last_start, + .len = last_end - last_start, + }); + } + last_osd = it->first.osd_num; + last_start = last_end; + } + cur_op->op_data->unstable_writes[last_end] = (obj_ver_id){ + .oid = it->first.oid, + .version = it->second, + }; + last_start++; + last_end++; + } + if (last_osd != 0) + { + cur_op->op_data->unstable_write_osds->push_back((unstable_osd_num_t){ + .osd_num = last_osd, + .start = last_start, + .len = last_end - last_start, + }); + } + } + unstable_writes.clear(); + // SYNC + submit_primary_sync_subops(cur_op); +resume_3: + cur_op->op_data->st = 3; + return; +resume_4: + // Stabilize version sets + submit_primary_stab_subops(cur_op); +resume_5: + cur_op->op_data->st = 5; + return; +resume_6: + // FIXME: Free them correctly (via a destructor or so) + delete cur_op->op_data->unstable_write_osds; + delete cur_op->op_data->unstable_writes; + cur_op->op_data->unstable_writes = NULL; + cur_op->op_data->unstable_write_osds = NULL; +finish: + assert(syncs_in_progress.front() == cur_op); + syncs_in_progress.pop_front(); + finish_primary_op(cur_op, 0); + if (syncs_in_progress.size() > 0) + { + osd_op_t *next_op = syncs_in_progress.front(); + next_op->op_data->st++; + continue_primary_sync(next_op); + } +} + +void osd_t::submit_primary_sync_subops(osd_op_t *cur_op) +{ + osd_primary_op_data_t *op_data = cur_op->op_data; + int n_osds = op_data->unstable_write_osds->size(); + osd_op_t *subops = new osd_op_t[n_osds]; + op_data->done = op_data->errors = 0; + op_data->n_subops = n_osds; + op_data->subops = subops; + for (int i = 0; i < n_osds; i++) + { + osd_num_t sync_osd = (*(op_data->unstable_write_osds))[i].osd_num; + if (sync_osd == this->osd_num) + { + subops[i].bs_op = new blockstore_op_t({ + .opcode = BS_OP_SYNC, + .callback = [cur_op, this](blockstore_op_t *subop) + { + handle_primary_subop(cur_op, subop->retval == 0, 0); + }, + }); + bs->enqueue_op(subops[i].bs_op); + } + else + { + subops[i].op_type = OSD_OP_OUT; + subops[i].peer_fd = osd_peer_fds.at(sync_osd); + subops[i].req.sec_sync = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = this->next_subop_id++, + .opcode = OSD_OP_SECONDARY_SYNC, + }, + }; + subops[i].callback = [cur_op, this](osd_op_t *subop) + { + handle_primary_subop(cur_op, subop->reply.hdr.retval == 0, 0); + }; + outbox_push(clients[subops[i].peer_fd], &subops[i]); + } + } +} + +void osd_t::submit_primary_stab_subops(osd_op_t *cur_op) +{ + osd_primary_op_data_t *op_data = cur_op->op_data; + int n_osds = op_data->unstable_write_osds->size(); + osd_op_t *subops = new osd_op_t[n_osds]; + op_data->done = op_data->errors = 0; + op_data->n_subops = n_osds; + op_data->subops = subops; + for (int i = 0; i < n_osds; i++) + { + auto & stab_osd = (*(op_data->unstable_write_osds))[i]; + if (stab_osd.osd_num == this->osd_num) + { + subops[i].bs_op = new blockstore_op_t({ + .opcode = BS_OP_STABLE, + .callback = [cur_op, this](blockstore_op_t *subop) + { + handle_primary_subop(cur_op, subop->retval == 0, 0); + }, + .len = (uint32_t)stab_osd.len, + .buf = (void*)(op_data->unstable_writes + stab_osd.start), + }); + bs->enqueue_op(subops[i].bs_op); + } + else + { + subops[i].op_type = OSD_OP_OUT; + subops[i].peer_fd = osd_peer_fds.at(stab_osd.osd_num); + subops[i].req.sec_stab = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = this->next_subop_id++, + .opcode = OSD_OP_SECONDARY_STABILIZE, + }, + .len = (uint64_t)(stab_osd.len * sizeof(obj_ver_id)), + }; + subops[i].send_list.push_back(op_data->unstable_writes + stab_osd.start, stab_osd.len * sizeof(obj_ver_id)); + subops[i].callback = [cur_op, this](osd_op_t *subop) + { + handle_primary_subop(cur_op, subop->reply.hdr.retval == 0, 0); + }; + outbox_push(clients[subops[i].peer_fd], &subops[i]); + } + } }