From c974cb539c28486fd8f96599853ac923d16ea4eb Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Tue, 23 Feb 2021 01:39:49 +0300 Subject: [PATCH] Make flusher_count adaptive and limit write iodepth --- src/blockstore_flush.cpp | 20 ++++++++++++++++++-- src/blockstore_flush.h | 2 +- src/blockstore_impl.h | 6 +++++- src/blockstore_open.cpp | 5 +++++ src/blockstore_write.cpp | 7 +++++++ 5 files changed, 36 insertions(+), 4 deletions(-) diff --git a/src/blockstore_flush.cpp b/src/blockstore_flush.cpp index 4bf229a5..d4ef69fd 100644 --- a/src/blockstore_flush.cpp +++ b/src/blockstore_flush.cpp @@ -7,6 +7,8 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore_impl_t *bs) { this->bs = bs; this->flusher_count = flusher_count; + this->cur_flusher_count = 1; + this->target_flusher_count = 1; dequeuing = false; trimming = false; active_flushers = 0; @@ -68,10 +70,24 @@ bool journal_flusher_t::is_active() void journal_flusher_t::loop() { - for (int i = 0; (active_flushers > 0 || dequeuing) && i < flusher_count; i++) + target_flusher_count = bs->write_iodepth*2; + if (target_flusher_count <= 0) + target_flusher_count = 1; + else if (target_flusher_count > flusher_count) + target_flusher_count = flusher_count; + if (target_flusher_count > cur_flusher_count) + cur_flusher_count = target_flusher_count; + else if (target_flusher_count < cur_flusher_count) { - co[i].loop(); + while (target_flusher_count < cur_flusher_count) + { + if (co[cur_flusher_count-1].wait_state) + break; + cur_flusher_count--; + } } + for (int i = 0; (active_flushers > 0 || dequeuing) && i < cur_flusher_count; i++) + co[i].loop(); } void journal_flusher_t::enqueue_flush(obj_ver_id ov) diff --git a/src/blockstore_flush.h b/src/blockstore_flush.h index adf09abc..c76acf6e 100644 --- a/src/blockstore_flush.h +++ b/src/blockstore_flush.h @@ -80,7 +80,7 @@ class journal_flusher_t { int trim_wanted = 0; bool dequeuing; - int flusher_count; + int flusher_count, cur_flusher_count, target_flusher_count; int flusher_start_threshold; journal_flusher_co *co; blockstore_impl_t *bs; diff --git a/src/blockstore_impl.h b/src/blockstore_impl.h index d263d023..2afb085a 100644 --- a/src/blockstore_impl.h +++ b/src/blockstore_impl.h @@ -199,7 +199,10 @@ class blockstore_impl_t // Suitable only for server SSDs with capacitors, requires disabled data and journal fsyncs int immediate_commit = IMMEDIATE_NONE; bool inmemory_meta = false; - int flusher_count; + // Maximum flusher count + unsigned flusher_count; + // Maximum queue depth + unsigned max_write_iodepth = 128; /******* END OF OPTIONS *******/ struct ring_consumer_t ring_consumer; @@ -226,6 +229,7 @@ class blockstore_impl_t struct journal_t journal; journal_flusher_t *flusher; + int write_iodepth = 0; bool live = false, queue_stall = false; ring_loop_t *ringloop; diff --git a/src/blockstore_open.cpp b/src/blockstore_open.cpp index 1659f400..1b90a2f7 100644 --- a/src/blockstore_open.cpp +++ b/src/blockstore_open.cpp @@ -70,6 +70,7 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config) meta_block_size = strtoull(config["meta_block_size"].c_str(), NULL, 10); bitmap_granularity = strtoull(config["bitmap_granularity"].c_str(), NULL, 10); flusher_count = strtoull(config["flusher_count"].c_str(), NULL, 10); + max_write_iodepth = strtoull(config["max_write_iodepth"].c_str(), NULL, 10); // Validate if (!block_size) { @@ -83,6 +84,10 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config) { flusher_count = 32; } + if (!max_write_iodepth) + { + max_write_iodepth = 128; + } if (!disk_alignment) { disk_alignment = 4096; diff --git a/src/blockstore_write.cpp b/src/blockstore_write.cpp index f9af715e..02d450a7 100644 --- a/src/blockstore_write.cpp +++ b/src/blockstore_write.cpp @@ -167,6 +167,10 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) .version = op->version, }, e).first; } + if (write_iodepth >= max_write_iodepth) + { + return 0; + } if ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE) { blockstore_journal_check_t space_check(this); @@ -191,6 +195,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) FINISH_OP(op); return 1; } + write_iodepth++; BS_SUBMIT_GET_SQE(sqe, data); dirty_it->second.location = loc << block_order; dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SUBMITTED; @@ -243,6 +248,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) { return 0; } + write_iodepth++; // There is sufficient space. Get SQE(s) struct io_uring_sqe *sqe1 = NULL; if (immediate_commit != IMMEDIATE_NONE || @@ -432,6 +438,7 @@ resume_4: } // Acknowledge write op->retval = op->len; + write_iodepth--; FINISH_OP(op); return 1; }