From 31f9445030f7076b681d86124a185b711f96369d Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Tue, 10 Mar 2020 02:05:32 +0300 Subject: [PATCH] Use immediate_commit to benefit the primary OSD --- osd.cpp | 4 +++ osd.h | 5 +++ osd_primary.cpp | 84 ++++++++++++++++++++++++++++++++++++++----------- 3 files changed, 75 insertions(+), 18 deletions(-) diff --git a/osd.cpp b/osd.cpp index 9d0ce646..720aca6d 100644 --- a/osd.cpp +++ b/osd.cpp @@ -68,6 +68,10 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo osd_num = strtoull(config["osd_num"].c_str(), NULL, 10); if (!osd_num) throw std::runtime_error("osd_num is required in the configuration"); + if (config["immediate_commit"] == "all") + immediate_commit = IMMEDIATE_ALL; + else if (config["immediate_commit"] == "small") + immediate_commit = IMMEDIATE_SMALL; run_primary = config["run_primary"] == "true" || config["run_primary"] == "1" || config["run_primary"] == "yes"; if (run_primary) init_primary(); diff --git a/osd.h b/osd.h index 5cf78482..a640c2ac 100644 --- a/osd.h +++ b/osd.h @@ -35,6 +35,10 @@ #define OSD_PEERING_PEERS 1 #define OSD_PEERING_PGS 2 +#define IMMEDIATE_NONE 0 +#define IMMEDIATE_SMALL 1 +#define IMMEDIATE_ALL 2 + //#define OSD_STUB struct osd_op_buf_list_t @@ -173,6 +177,7 @@ class osd_t int client_queue_depth = 128; bool allow_test_ops = true; int receive_buffer_size = 9000; + int immediate_commit = IMMEDIATE_NONE; // peer OSDs diff --git a/osd_primary.cpp b/osd_primary.cpp index f973ce7f..deb1e967 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -326,6 +326,8 @@ void osd_t::continue_primary_write(osd_op_t *cur_op) else if (op_data->st == 3) goto resume_3; else if (op_data->st == 4) goto resume_4; else if (op_data->st == 5) goto resume_5; + else if (op_data->st == 6) goto resume_6; + else if (op_data->st == 7) goto resume_7; assert(op_data->st == 0); // Check if actions are pending for this object { @@ -373,24 +375,67 @@ resume_4: op_data->st = 4; return; resume_5: - // Remember version as unstable - osd_num_t *osd_set = pg.cur_set.data(); - for (int role = 0; role < pg.pg_size; role++) + // FIXME: Check for immediate_commit == IMMEDIATE_SMALL + if (immediate_commit == IMMEDIATE_ALL) { - if (osd_set[role] != 0) + op_data->unstable_write_osds = new std::vector(); + op_data->unstable_writes = new obj_ver_id[pg.pg_cursize]; { - this->unstable_writes[(osd_object_id_t){ - .osd_num = osd_set[role], - .oid = { - .inode = op_data->oid.inode, - .stripe = op_data->oid.stripe | role, - }, - }] = op_data->fact_ver; + int last_start = 0; + osd_num_t *osd_set = pg.cur_set.data(); + for (int role = 0; role < pg.pg_size; role++) + { + if (osd_set[role] != 0) + { + op_data->unstable_writes[last_start] = (obj_ver_id){ + .oid = { + .inode = op_data->oid.inode, + .stripe = op_data->oid.stripe | role, + }, + .version = op_data->fact_ver, + }; + op_data->unstable_write_osds->push_back((unstable_osd_num_t){ + .osd_num = osd_set[role], + .start = last_start, + .len = 1, + }); + last_start++; + } + } } + // Stabilize version sets + submit_primary_stab_subops(cur_op); +resume_6: + op_data->st = 6; + return; +resume_7: + // FIXME: Free them correctly (via a destructor or so) + delete op_data->unstable_write_osds; + delete[] op_data->unstable_writes; + op_data->unstable_writes = NULL; + op_data->unstable_write_osds = NULL; + } + else + { + // Remember version as unstable + osd_num_t *osd_set = pg.cur_set.data(); + for (int role = 0; role < pg.pg_size; role++) + { + if (osd_set[role] != 0) + { + this->unstable_writes[(osd_object_id_t){ + .osd_num = osd_set[role], + .oid = { + .inode = op_data->oid.inode, + .stripe = op_data->oid.stripe | role, + }, + }] = op_data->fact_ver; + } + } + // Remember PG as dirty to drop the connection when PG goes offline + // (this is required because of the "lazy sync") + this->clients[cur_op->peer_fd].dirty_pgs.insert(op_data->pg_num); } - // Remember PG as dirty to drop the connection when PG goes offline - // (this is required because of the "lazy sync") - this->clients[cur_op->peer_fd].dirty_pgs.insert(op_data->pg_num); // Remove version override pg.ver_override.erase(op_data->oid); finish_primary_op(cur_op, cur_op->req.rw.len); @@ -483,11 +528,14 @@ resume_2: } } unstable_writes.clear(); - // SYNC - submit_primary_sync_subops(cur_op); + if (immediate_commit != IMMEDIATE_ALL) + { + // SYNC + submit_primary_sync_subops(cur_op); resume_3: - cur_op->op_data->st = 3; - return; + cur_op->op_data->st = 3; + return; + } resume_4: // Stabilize version sets submit_primary_stab_subops(cur_op);