Make flusher_count adaptive and limit write iodepth

Vitaliy Filippov 2021-02-23 01:39:49 +03:00
parent 00e98f64f3
commit c974cb539c
5 changed files with 36 additions and 4 deletions

View File

@ -7,6 +7,8 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore_impl_t *bs)
{ {
this->bs = bs; this->bs = bs;
this->flusher_count = flusher_count; this->flusher_count = flusher_count;
this->cur_flusher_count = 1;
this->target_flusher_count = 1;
dequeuing = false; dequeuing = false;
trimming = false; trimming = false;
active_flushers = 0; active_flushers = 0;
@ -68,10 +70,24 @@ bool journal_flusher_t::is_active()
void journal_flusher_t::loop() void journal_flusher_t::loop()
{ {
for (int i = 0; (active_flushers > 0 || dequeuing) && i < flusher_count; i++) target_flusher_count = bs->write_iodepth*2;
if (target_flusher_count <= 0)
target_flusher_count = 1;
else if (target_flusher_count > flusher_count)
target_flusher_count = flusher_count;
if (target_flusher_count > cur_flusher_count)
cur_flusher_count = target_flusher_count;
else if (target_flusher_count < cur_flusher_count)
{ {
co[i].loop(); while (target_flusher_count < cur_flusher_count)
{
if (co[cur_flusher_count-1].wait_state)
break;
cur_flusher_count--;
}
} }
for (int i = 0; (active_flushers > 0 || dequeuing) && i < cur_flusher_count; i++)
co[i].loop();
} }
void journal_flusher_t::enqueue_flush(obj_ver_id ov) void journal_flusher_t::enqueue_flush(obj_ver_id ov)

View File

@ -80,7 +80,7 @@ class journal_flusher_t
{ {
int trim_wanted = 0; int trim_wanted = 0;
bool dequeuing; bool dequeuing;
int flusher_count; int flusher_count, cur_flusher_count, target_flusher_count;
int flusher_start_threshold; int flusher_start_threshold;
journal_flusher_co *co; journal_flusher_co *co;
blockstore_impl_t *bs; blockstore_impl_t *bs;

View File

@ -199,7 +199,10 @@ class blockstore_impl_t
// Suitable only for server SSDs with capacitors, requires disabled data and journal fsyncs // Suitable only for server SSDs with capacitors, requires disabled data and journal fsyncs
int immediate_commit = IMMEDIATE_NONE; int immediate_commit = IMMEDIATE_NONE;
bool inmemory_meta = false; bool inmemory_meta = false;
int flusher_count; // Maximum flusher count
unsigned flusher_count;
// Maximum queue depth
unsigned max_write_iodepth = 128;
/******* END OF OPTIONS *******/ /******* END OF OPTIONS *******/
struct ring_consumer_t ring_consumer; struct ring_consumer_t ring_consumer;
@ -226,6 +229,7 @@ class blockstore_impl_t
struct journal_t journal; struct journal_t journal;
journal_flusher_t *flusher; journal_flusher_t *flusher;
int write_iodepth = 0;
bool live = false, queue_stall = false; bool live = false, queue_stall = false;
ring_loop_t *ringloop; ring_loop_t *ringloop;

View File

@ -70,6 +70,7 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config)
meta_block_size = strtoull(config["meta_block_size"].c_str(), NULL, 10); meta_block_size = strtoull(config["meta_block_size"].c_str(), NULL, 10);
bitmap_granularity = strtoull(config["bitmap_granularity"].c_str(), NULL, 10); bitmap_granularity = strtoull(config["bitmap_granularity"].c_str(), NULL, 10);
flusher_count = strtoull(config["flusher_count"].c_str(), NULL, 10); flusher_count = strtoull(config["flusher_count"].c_str(), NULL, 10);
max_write_iodepth = strtoull(config["max_write_iodepth"].c_str(), NULL, 10);
// Validate // Validate
if (!block_size) if (!block_size)
{ {
@ -83,6 +84,10 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config)
{ {
flusher_count = 32; flusher_count = 32;
} }
if (!max_write_iodepth)
{
max_write_iodepth = 128;
}
if (!disk_alignment) if (!disk_alignment)
{ {
disk_alignment = 4096; disk_alignment = 4096;

View File

@ -167,6 +167,10 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
.version = op->version, .version = op->version,
}, e).first; }, e).first;
} }
if (write_iodepth >= max_write_iodepth)
{
return 0;
}
if ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE) if ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE)
{ {
blockstore_journal_check_t space_check(this); blockstore_journal_check_t space_check(this);
@ -191,6 +195,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
FINISH_OP(op); FINISH_OP(op);
return 1; return 1;
} }
write_iodepth++;
BS_SUBMIT_GET_SQE(sqe, data); BS_SUBMIT_GET_SQE(sqe, data);
dirty_it->second.location = loc << block_order; dirty_it->second.location = loc << block_order;
dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SUBMITTED; dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SUBMITTED;
@ -243,6 +248,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
{ {
return 0; return 0;
} }
write_iodepth++;
// There is sufficient space. Get SQE(s) // There is sufficient space. Get SQE(s)
struct io_uring_sqe *sqe1 = NULL; struct io_uring_sqe *sqe1 = NULL;
if (immediate_commit != IMMEDIATE_NONE || if (immediate_commit != IMMEDIATE_NONE ||
@ -432,6 +438,7 @@ resume_4:
} }
// Acknowledge write // Acknowledge write
op->retval = op->len; op->retval = op->len;
write_iodepth--;
FINISH_OP(op); FINISH_OP(op);
return 1; return 1;
} }