From 53832d184a0155063ec553c9cd3d9d9417ce67db Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 6 Sep 2020 12:08:44 +0300 Subject: [PATCH] Allow to use lazy sync with replicated pools --- blockstore_write.cpp | 7 -- osd.h | 1 + osd_primary.cpp | 182 ++++++++++++++++++++++++++--------------- osd_primary.h | 2 + osd_primary_subops.cpp | 4 +- 5 files changed, 119 insertions(+), 77 deletions(-) diff --git a/blockstore_write.cpp b/blockstore_write.cpp index f34c5f51..a7ffcd41 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -51,13 +51,6 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) op->retval = 0; return false; } - if (op->opcode == BS_OP_WRITE_STABLE && immediate_commit != IMMEDIATE_ALL && - (op->len == block_size || deleted || immediate_commit != IMMEDIATE_SMALL)) - { - // WRITE_STABLE only works with immediate commit by now - op->retval = -EINVAL; - return false; - } if (is_inflight_big && !is_del && !deleted && op->len < block_size && immediate_commit != IMMEDIATE_ALL) { diff --git a/osd.h b/osd.h index d3d7e4cc..b06e8ddf 100644 --- a/osd.h +++ b/osd.h @@ -91,6 +91,7 @@ class osd_t std::map pg_counts; std::map pgs; std::set dirty_pgs; + std::set dirty_osds; uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0; int peering_state = 0; std::map recovery_ops; diff --git a/osd_primary.cpp b/osd_primary.cpp index 9735d449..92a56c94 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -357,9 +357,7 @@ resume_9: // FIXME: Check for immediate_commit == IMMEDIATE_SMALL resume_6: resume_7: - // FIXME: Replicated writes are always "immediate" - if (op_data->scheme != POOL_SCHEME_REPLICATED && - !remember_unstable_write(cur_op, pg, pg.cur_loc_set, 6)) + if (!remember_unstable_write(cur_op, pg, pg.cur_loc_set, 6)) { return; } @@ -393,55 +391,71 @@ bool osd_t::remember_unstable_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & } if (immediate_commit == IMMEDIATE_ALL) { - op_data->unstable_write_osds = new std::vector(); - op_data->unstable_writes = new obj_ver_id[loc_set.size()]; + if (op_data->scheme != POOL_SCHEME_REPLICATED) { - int last_start = 0; - for (auto & chunk: loc_set) + // Send STABILIZE ops immediately + op_data->unstable_write_osds = new std::vector(); + op_data->unstable_writes = new obj_ver_id[loc_set.size()]; { - op_data->unstable_writes[last_start] = (obj_ver_id){ - .oid = { - .inode = op_data->oid.inode, - .stripe = op_data->oid.stripe | chunk.role, - }, - .version = op_data->fact_ver, - }; - op_data->unstable_write_osds->push_back((unstable_osd_num_t){ - .osd_num = chunk.osd_num, - .start = last_start, - .len = 1, - }); - last_start++; + int last_start = 0; + for (auto & chunk: loc_set) + { + op_data->unstable_writes[last_start] = (obj_ver_id){ + .oid = { + .inode = op_data->oid.inode, + .stripe = op_data->oid.stripe | chunk.role, + }, + .version = op_data->fact_ver, + }; + op_data->unstable_write_osds->push_back((unstable_osd_num_t){ + .osd_num = chunk.osd_num, + .start = last_start, + .len = 1, + }); + last_start++; + } } - } - submit_primary_stab_subops(cur_op); + submit_primary_stab_subops(cur_op); resume_6: - op_data->st = 6; - return false; -resume_7: - // FIXME: Free those in the destructor? - delete op_data->unstable_write_osds; - delete[] op_data->unstable_writes; - op_data->unstable_writes = NULL; - op_data->unstable_write_osds = NULL; - if (op_data->errors > 0) - { - pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); + op_data->st = 6; return false; +resume_7: + // FIXME: Free those in the destructor? + delete op_data->unstable_write_osds; + delete[] op_data->unstable_writes; + op_data->unstable_writes = NULL; + op_data->unstable_write_osds = NULL; + if (op_data->errors > 0) + { + pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); + return false; + } } } else { - // Remember version as unstable - for (auto & chunk: loc_set) + if (op_data->scheme != POOL_SCHEME_REPLICATED) { - this->unstable_writes[(osd_object_id_t){ - .osd_num = chunk.osd_num, - .oid = { - .inode = op_data->oid.inode, - .stripe = op_data->oid.stripe | chunk.role, - }, - }] = op_data->fact_ver; + // Remember version as unstable for EC/XOR + for (auto & chunk: loc_set) + { + this->dirty_osds.insert(chunk.osd_num); + this->unstable_writes[(osd_object_id_t){ + .osd_num = chunk.osd_num, + .oid = { + .inode = op_data->oid.inode, + .stripe = op_data->oid.stripe | chunk.role, + }, + }] = op_data->fact_ver; + } + } + else + { + // Only remember to sync OSDs for replicated pools + for (auto & chunk: loc_set) + { + this->dirty_osds.insert(chunk.osd_num); + } } // Remember PG as dirty to drop the connection when PG goes offline // (this is required because of the "lazy sync") @@ -480,7 +494,7 @@ resume_1: syncs_in_progress.push_back(cur_op); } resume_2: - if (unstable_writes.size() == 0) + if (dirty_osds.size() == 0) { // Nothing to sync goto finish; @@ -488,11 +502,10 @@ resume_2: // Save and clear unstable_writes // In theory it is possible to do in on a per-client basis, but this seems to be an unnecessary complication // It would be cool not to copy these here at all, but someone has to deduplicate them by object IDs anyway + if (unstable_writes.size() > 0) { op_data->unstable_write_osds = new std::vector(); op_data->unstable_writes = new obj_ver_id[this->unstable_writes.size()]; - op_data->dirty_pgs = new pool_pg_num_t[dirty_pgs.size()]; - op_data->dirty_pg_count = dirty_pgs.size(); osd_num_t last_osd = 0; int last_start = 0, last_end = 0; for (auto it = this->unstable_writes.begin(); it != this->unstable_writes.end(); it++) @@ -524,6 +537,14 @@ resume_2: .len = last_end - last_start, }); } + this->unstable_writes.clear(); + } + { + void *dirty_buf = malloc_or_die(sizeof(pool_pg_num_t)*dirty_pgs.size() + sizeof(osd_num_t)*dirty_osds.size()); + op_data->dirty_pgs = (pool_pg_num_t*)dirty_buf; + op_data->dirty_osds = (osd_num_t*)(dirty_buf + sizeof(pool_pg_num_t)*dirty_pgs.size()); + op_data->dirty_pg_count = dirty_pgs.size(); + op_data->dirty_osd_count = dirty_osds.size(); int dpg = 0; for (auto dirty_pg_num: dirty_pgs) { @@ -531,7 +552,12 @@ resume_2: op_data->dirty_pgs[dpg++] = dirty_pg_num; } dirty_pgs.clear(); - this->unstable_writes.clear(); + dpg = 0; + for (auto osd_num: dirty_osds) + { + op_data->dirty_osds[dpg++] = osd_num; + } + dirty_osds.clear(); } if (immediate_commit != IMMEDIATE_ALL) { @@ -546,30 +572,45 @@ resume_4: goto resume_6; } } - // Stabilize version sets - submit_primary_stab_subops(cur_op); + if (op_data->unstable_writes) + { + // Stabilize version sets, if any + submit_primary_stab_subops(cur_op); resume_5: - op_data->st = 5; - return; + op_data->st = 5; + return; + } resume_6: if (op_data->errors > 0) { - // Return objects back into the unstable write set - for (auto unstable_osd: *(op_data->unstable_write_osds)) + // Return PGs and OSDs back into their dirty sets + for (int i = 0; i < op_data->dirty_pg_count; i++) { - for (int i = 0; i < unstable_osd.len; i++) + dirty_pgs.insert(op_data->dirty_pgs[i]); + } + for (int i = 0; i < op_data->dirty_osd_count; i++) + { + dirty_osds.insert(op_data->dirty_osds[i]); + } + if (op_data->unstable_writes) + { + // Return objects back into the unstable write set + for (auto unstable_osd: *(op_data->unstable_write_osds)) { - // Except those from peered PGs - auto & w = op_data->unstable_writes[i]; - pool_pg_num_t wpg = { .pool_id = INODE_POOL(w.oid.inode), .pg_num = map_to_pg(w.oid) }; - if (pgs[wpg].state & PG_ACTIVE) + for (int i = 0; i < unstable_osd.len; i++) { - uint64_t & dest = this->unstable_writes[(osd_object_id_t){ - .osd_num = unstable_osd.osd_num, - .oid = w.oid, - }]; - dest = dest < w.version ? w.version : dest; - dirty_pgs.insert(wpg); + // Except those from peered PGs + auto & w = op_data->unstable_writes[i]; + pool_pg_num_t wpg = { .pool_id = INODE_POOL(w.oid.inode), .pg_num = map_to_pg(w.oid) }; + if (pgs[wpg].state & PG_ACTIVE) + { + uint64_t & dest = this->unstable_writes[(osd_object_id_t){ + .osd_num = unstable_osd.osd_num, + .oid = w.oid, + }]; + dest = dest < w.version ? w.version : dest; + dirty_pgs.insert(wpg); + } } } } @@ -584,11 +625,16 @@ resume_6: } } // FIXME: Free those in the destructor? - delete op_data->dirty_pgs; - delete op_data->unstable_write_osds; - delete[] op_data->unstable_writes; - op_data->unstable_writes = NULL; - op_data->unstable_write_osds = NULL; + free(op_data->dirty_pgs); + op_data->dirty_pgs = NULL; + op_data->dirty_osds = NULL; + if (op_data->unstable_writes) + { + delete op_data->unstable_write_osds; + delete[] op_data->unstable_writes; + op_data->unstable_writes = NULL; + op_data->unstable_write_osds = NULL; + } if (op_data->errors > 0) { finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO); diff --git a/osd_primary.h b/osd_primary.h index c7cbecf9..e4faf6c2 100644 --- a/osd_primary.h +++ b/osd_primary.h @@ -32,5 +32,7 @@ struct osd_primary_op_data_t std::vector *unstable_write_osds = NULL; pool_pg_num_t *dirty_pgs = NULL; int dirty_pg_count = 0; + osd_num_t *dirty_osds = NULL; + int dirty_osd_count = 0; obj_ver_id *unstable_writes = NULL; }; diff --git a/osd_primary_subops.cpp b/osd_primary_subops.cpp index 5b95a3a8..34756f36 100644 --- a/osd_primary_subops.cpp +++ b/osd_primary_subops.cpp @@ -445,14 +445,14 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint void osd_t::submit_primary_sync_subops(osd_op_t *cur_op) { osd_primary_op_data_t *op_data = cur_op->op_data; - int n_osds = op_data->unstable_write_osds->size(); + int n_osds = op_data->dirty_osd_count; osd_op_t *subops = new osd_op_t[n_osds]; op_data->done = op_data->errors = 0; op_data->n_subops = n_osds; op_data->subops = subops; for (int i = 0; i < n_osds; i++) { - osd_num_t sync_osd = (*(op_data->unstable_write_osds))[i].osd_num; + osd_num_t sync_osd = op_data->dirty_osds[i]; if (sync_osd == this->osd_num) { clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);