From 839ec9e6e01854c3f297ffd5b3a941c4539caaa0 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 20 Feb 2022 00:02:02 +0300 Subject: [PATCH] Shard clean_db by PGs to speedup listings --- src/blockstore_flush.cpp | 14 +++-- src/blockstore_flush.h | 1 - src/blockstore_impl.cpp | 126 ++++++++++++++++++++++++++++++++------ src/blockstore_impl.h | 17 ++++- src/blockstore_init.cpp | 29 +++++---- src/blockstore_read.cpp | 2 + src/blockstore_stable.cpp | 3 + src/blockstore_write.cpp | 1 + 8 files changed, 155 insertions(+), 38 deletions(-) diff --git a/src/blockstore_flush.cpp b/src/blockstore_flush.cpp index 31004ed4e..7fcbc745e 100644 --- a/src/blockstore_flush.cpp +++ b/src/blockstore_flush.cpp @@ -415,8 +415,11 @@ stop_flusher: flusher->active_flushers++; resume_1: // Find it in clean_db - clean_it = bs->clean_db.find(cur.oid); - old_clean_loc = (clean_it != bs->clean_db.end() ? clean_it->second.location : UINT64_MAX); + { + auto & clean_db = bs->clean_db_shard(cur.oid); + auto clean_it = clean_db.find(cur.oid); + old_clean_loc = (clean_it != clean_db.end() ? clean_it->second.location : UINT64_MAX); + } // Scan dirty versions of the object if (!scan_dirty(1)) { @@ -870,10 +873,11 @@ void journal_flusher_co::update_clean_db() #endif bs->data_alloc->set(old_clean_loc >> bs->block_order, false); } + auto & clean_db = bs->clean_db_shard(cur.oid); if (has_delete) { - auto clean_it = bs->clean_db.find(cur.oid); - bs->clean_db.erase(clean_it); + auto clean_it = clean_db.find(cur.oid); + clean_db.erase(clean_it); #ifdef BLOCKSTORE_DEBUG printf("Free block %lu from %lx:%lx v%lu (delete)\n", clean_loc >> bs->block_order, @@ -884,7 +888,7 @@ void journal_flusher_co::update_clean_db() } else { - bs->clean_db[cur.oid] = { + clean_db[cur.oid] = { .version = cur.version, .location = clean_loc, }; diff --git a/src/blockstore_flush.h b/src/blockstore_flush.h index f698f1884..7ff2f871c 100644 --- a/src/blockstore_flush.h +++ b/src/blockstore_flush.h @@ -49,7 +49,6 @@ class journal_flusher_co std::function simple_callback_r, simple_callback_w; bool skip_copy, has_delete, has_writes; - blockstore_clean_db_t::iterator clean_it; std::vector v; std::vector::iterator it; int copy_count; diff --git a/src/blockstore_impl.cpp b/src/blockstore_impl.cpp index 8bb343915..2ddc50c91 100644 --- a/src/blockstore_impl.cpp +++ b/src/blockstore_impl.cpp @@ -428,22 +428,104 @@ static bool replace_stable(object_id oid, uint64_t version, int search_start, in return false; } +blockstore_clean_db_t& blockstore_impl_t::clean_db_shard(object_id oid) +{ + uint64_t pg_num = 0; + uint64_t pool_id = (oid.inode >> (64-POOL_ID_BITS)); + auto sh_it = clean_db_settings.find(pool_id); + if (sh_it != clean_db_settings.end()) + { + // like map_to_pg() + pg_num = (oid.stripe / sh_it->second.pg_stripe_size) % sh_it->second.pg_count + 1; + } + return clean_db_shards[(pool_id << (64-POOL_ID_BITS)) | pg_num]; +} + +void blockstore_impl_t::reshard_clean_db(pool_id_t pool, uint32_t pg_count, uint32_t pg_stripe_size) +{ + uint64_t pool_id = (uint64_t)pool; + std::map new_shards; + auto sh_it = clean_db_shards.lower_bound((pool_id << (64-POOL_ID_BITS))); + while (sh_it != clean_db_shards.end() && + (sh_it->first >> (64-POOL_ID_BITS)) == pool_id) + { + for (auto & pair: sh_it->second) + { + // like map_to_pg() + uint64_t pg_num = (pair.first.stripe / pg_stripe_size) % pg_count + 1; + uint64_t shard_id = (pool_id << (64-POOL_ID_BITS)) | pg_num; + new_shards[shard_id][pair.first] = pair.second; + } + clean_db_shards.erase(sh_it++); + } + for (sh_it = new_shards.begin(); sh_it != new_shards.end(); sh_it++) + { + auto & to = clean_db_shards[sh_it->first]; + to.swap(sh_it->second); + } + clean_db_settings[pool_id] = (pool_shard_settings_t){ + .pg_count = pg_count, + .pg_stripe_size = pg_stripe_size, + }; +} + void blockstore_impl_t::process_list(blockstore_op_t *op) { - uint32_t list_pg = op->offset; + uint32_t list_pg = op->offset+1; uint32_t pg_count = op->len; uint64_t pg_stripe_size = op->oid.stripe; uint64_t min_inode = op->oid.inode; uint64_t max_inode = op->version; // Check PG - if (pg_count != 0 && (pg_stripe_size < MIN_BLOCK_SIZE || list_pg >= pg_count)) + if (pg_count != 0 && (pg_stripe_size < MIN_BLOCK_SIZE || list_pg > pg_count)) { op->retval = -EINVAL; FINISH_OP(op); return; } - // Copy clean_db entries (sorted) - int stable_count = 0, stable_alloc = clean_db.size() / (pg_count ? pg_count : 1); + // Check if the DB needs resharding + // (we don't know about PGs from the beginning, we only create "shards" here) + uint64_t first_shard = 0, last_shard = UINT64_MAX; + if (min_inode != 0 && + // Check if min_inode == max_inode == pool_id<> (64-POOL_ID_BITS)) == (max_inode >> (64-POOL_ID_BITS))) + { + pool_id_t pool_id = (min_inode >> (64-POOL_ID_BITS)); + if (pg_count > 1) + { + // Per-pg listing + auto sh_it = clean_db_settings.find(pool_id); + if (sh_it == clean_db_settings.end() || + sh_it->second.pg_count != pg_count || + sh_it->second.pg_stripe_size != pg_stripe_size) + { + reshard_clean_db(pool_id, pg_count, pg_stripe_size); + } + first_shard = last_shard = ((uint64_t)pool_id << (64-POOL_ID_BITS)) | list_pg; + } + else + { + // Per-pool listing + first_shard = ((uint64_t)pool_id << (64-POOL_ID_BITS)); + last_shard = ((uint64_t)(pool_id+1) << (64-POOL_ID_BITS)) - 1; + } + } + // Copy clean_db entries + int stable_count = 0, stable_alloc = 0; + if (min_inode != max_inode) + { + for (auto shard_it = clean_db_shards.lower_bound(first_shard); + shard_it != clean_db_shards.end() && shard_it->first <= last_shard; + shard_it++) + { + auto & clean_db = shard_it->second; + stable_alloc += clean_db.size(); + } + } + else + { + stable_alloc = 32768; + } obj_ver_id *stable = (obj_ver_id*)malloc(sizeof(obj_ver_id) * stable_alloc); if (!stable) { @@ -451,7 +533,11 @@ void blockstore_impl_t::process_list(blockstore_op_t *op) FINISH_OP(op); return; } + for (auto shard_it = clean_db_shards.lower_bound(first_shard); + shard_it != clean_db_shards.end() && shard_it->first <= last_shard; + shard_it++) { + auto & clean_db = shard_it->second; auto clean_it = clean_db.begin(), clean_end = clean_db.end(); if ((min_inode != 0 || max_inode != 0) && min_inode <= max_inode) { @@ -466,26 +552,28 @@ void blockstore_impl_t::process_list(blockstore_op_t *op) } for (; clean_it != clean_end; clean_it++) { - if (!pg_count || ((clean_it->first.stripe / pg_stripe_size) % pg_count) == list_pg) // like map_to_pg() + if (stable_count >= stable_alloc) { - if (stable_count >= stable_alloc) + stable_alloc *= 2; + stable = (obj_ver_id*)realloc(stable, sizeof(obj_ver_id) * stable_alloc); + if (!stable) { - stable_alloc += 32768; - stable = (obj_ver_id*)realloc(stable, sizeof(obj_ver_id) * stable_alloc); - if (!stable) - { - op->retval = -ENOMEM; - FINISH_OP(op); - return; - } + op->retval = -ENOMEM; + FINISH_OP(op); + return; } - stable[stable_count++] = { - .oid = clean_it->first, - .version = clean_it->second.version, - }; } + stable[stable_count++] = { + .oid = clean_it->first, + .version = clean_it->second.version, + }; } } + if (first_shard != last_shard) + { + // If that's not a per-PG listing, sort clean entries + std::sort(stable, stable+stable_count); + } int clean_stable_count = stable_count; // Copy dirty_db entries (sorted, too) int unstable_count = 0, unstable_alloc = 0; @@ -511,7 +599,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op) } for (; dirty_it != dirty_end; dirty_it++) { - if (!pg_count || ((dirty_it->first.oid.stripe / pg_stripe_size) % pg_count) == list_pg) // like map_to_pg() + if (!pg_count || ((dirty_it->first.oid.stripe / pg_stripe_size) % pg_count + 1) == list_pg) // like map_to_pg() { if (IS_DELETE(dirty_it->second.state)) { diff --git a/src/blockstore_impl.h b/src/blockstore_impl.h index 2b10ffe83..73be4f466 100644 --- a/src/blockstore_impl.h +++ b/src/blockstore_impl.h @@ -204,6 +204,17 @@ typedef std::map blockstore_dirty_db_t; #include "blockstore_flush.h" +typedef uint32_t pool_id_t; +typedef uint64_t pool_pg_id_t; + +#define POOL_ID_BITS 16 + +struct pool_shard_settings_t +{ + uint32_t pg_count; + uint32_t pg_stripe_size; +}; + class blockstore_impl_t { /******* OPTIONS *******/ @@ -247,7 +258,8 @@ class blockstore_impl_t struct ring_consumer_t ring_consumer; - blockstore_clean_db_t clean_db; + std::map clean_db_settings; + std::map clean_db_shards; uint8_t *clean_bitmap = NULL; blockstore_dirty_db_t dirty_db; std::vector submit_queue; @@ -296,6 +308,9 @@ class blockstore_impl_t void open_journal(); uint8_t* get_clean_entry_bitmap(uint64_t block_loc, int offset); + blockstore_clean_db_t& clean_db_shard(object_id oid); + void reshard_clean_db(pool_id_t pool_id, uint32_t pg_count, uint32_t pg_stripe_size); + // Journaling void prepare_journal_sector_write(int sector, blockstore_op_t *op); void handle_journal_write(ring_data_t *data, uint64_t flush_id); diff --git a/src/blockstore_init.cpp b/src/blockstore_init.cpp index f649f8a06..56108c846 100644 --- a/src/blockstore_init.cpp +++ b/src/blockstore_init.cpp @@ -222,10 +222,11 @@ void blockstore_init_meta::handle_entries(void* entries, unsigned count, int blo } if (entry->oid.inode > 0) { - auto clean_it = bs->clean_db.find(entry->oid); - if (clean_it == bs->clean_db.end() || clean_it->second.version < entry->version) + auto & clean_db = bs->clean_db_shard(entry->oid); + auto clean_it = clean_db.find(entry->oid); + if (clean_it == clean_db.end() || clean_it->second.version < entry->version) { - if (clean_it != bs->clean_db.end()) + if (clean_it != clean_db.end()) { // free the previous block #ifdef BLOCKSTORE_DEBUG @@ -245,7 +246,7 @@ void blockstore_init_meta::handle_entries(void* entries, unsigned count, int blo printf("Allocate block (clean entry) %lu: %lx:%lx v%lu\n", done_cnt+i, entry->oid.inode, entry->oid.stripe, entry->version); #endif bs->data_alloc->set(done_cnt+i, true); - bs->clean_db[entry->oid] = (struct clean_entry){ + clean_db[entry->oid] = (struct clean_entry){ .version = entry->version, .location = (done_cnt+i) << block_order, }; @@ -656,8 +657,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u init_write_sector = proc_pos; return 0; } - auto clean_it = bs->clean_db.find(je->small_write.oid); - if (clean_it == bs->clean_db.end() || + auto & clean_db = bs->clean_db_shard(je->small_write.oid); + auto clean_it = clean_db.find(je->small_write.oid); + if (clean_it == clean_db.end() || clean_it->second.version < je->small_write.version) { obj_ver_id ov = { @@ -735,8 +737,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u erase_dirty_object(dirty_it); } } - auto clean_it = bs->clean_db.find(je->big_write.oid); - if (clean_it == bs->clean_db.end() || + auto & clean_db = bs->clean_db_shard(je->big_write.oid); + auto clean_it = clean_db.find(je->big_write.oid); + if (clean_it == clean_db.end() || clean_it->second.version < je->big_write.version) { // oid, version, block @@ -841,8 +844,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u dirty_it--; dirty_exists = dirty_it->first.oid == je->del.oid; } - auto clean_it = bs->clean_db.find(je->del.oid); - bool clean_exists = (clean_it != bs->clean_db.end() && + auto & clean_db = bs->clean_db_shard(je->del.oid); + auto clean_it = clean_db.find(je->del.oid); + bool clean_exists = (clean_it != clean_db.end() && clean_it->second.version < je->del.version); if (!clean_exists && dirty_exists) { @@ -901,8 +905,9 @@ void blockstore_init_journal::erase_dirty_object(blockstore_dirty_db_t::iterator break; } } - auto clean_it = bs->clean_db.find(oid); - uint64_t clean_loc = clean_it != bs->clean_db.end() + auto & clean_db = bs->clean_db_shard(oid); + auto clean_it = clean_db.find(oid); + uint64_t clean_loc = clean_it != clean_db.end() ? clean_it->second.location : UINT64_MAX; if (exists && clean_loc == UINT64_MAX) { diff --git a/src/blockstore_read.cpp b/src/blockstore_read.cpp index a4b47f910..780c8b314 100644 --- a/src/blockstore_read.cpp +++ b/src/blockstore_read.cpp @@ -111,6 +111,7 @@ uint8_t* blockstore_impl_t::get_clean_entry_bitmap(uint64_t block_loc, int offse int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op) { + auto & clean_db = clean_db_shard(read_op->oid); auto clean_it = clean_db.find(read_op->oid); auto dirty_it = dirty_db.upper_bound((obj_ver_id){ .oid = read_op->oid, @@ -297,6 +298,7 @@ int blockstore_impl_t::read_bitmap(object_id oid, uint64_t target_version, void dirty_it--; } } + auto & clean_db = clean_db_shard(oid); auto clean_it = clean_db.find(oid); if (clean_it != clean_db.end()) { diff --git a/src/blockstore_stable.cpp b/src/blockstore_stable.cpp index 230023bc1..ca2ba8dbe 100644 --- a/src/blockstore_stable.cpp +++ b/src/blockstore_stable.cpp @@ -54,6 +54,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) auto dirty_it = dirty_db.find(*v); if (dirty_it == dirty_db.end()) { + auto & clean_db = clean_db_shard(v->oid); auto clean_it = clean_db.find(v->oid); if (clean_it == clean_db.end() || clean_it->second.version < v->version) { @@ -188,6 +189,7 @@ void blockstore_impl_t::mark_stable(const obj_ver_id & v, bool forget_dirty) } if (exists == -1) { + auto & clean_db = clean_db_shard(v.oid); auto clean_it = clean_db.find(v.oid); exists = clean_it != clean_db.end() ? 1 : 0; } @@ -215,6 +217,7 @@ void blockstore_impl_t::mark_stable(const obj_ver_id & v, bool forget_dirty) break; } } + auto & clean_db = clean_db_shard(v.oid); auto clean_it = clean_db.find(v.oid); uint64_t clean_loc = clean_it != clean_db.end() ? clean_it->second.location : UINT64_MAX; diff --git a/src/blockstore_write.cpp b/src/blockstore_write.cpp index 1fd9e9ab3..8915f4228 100644 --- a/src/blockstore_write.cpp +++ b/src/blockstore_write.cpp @@ -41,6 +41,7 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) } if (!found) { + auto & clean_db = clean_db_shard(op->oid); auto clean_it = clean_db.find(op->oid); if (clean_it != clean_db.end()) {