From cf36445359ccb169ba3202ee38245321190fc241 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 20 Nov 2023 03:01:38 +0300 Subject: [PATCH] Reserve journal space for stabilize requests dynamically to prevent stalls --- src/blockstore_impl.h | 2 +- src/blockstore_journal.cpp | 1 + src/blockstore_journal.h | 6 ------ src/blockstore_sync.cpp | 15 +++++++++++++-- src/blockstore_write.cpp | 25 ++++++++++++++++++++----- 5 files changed, 35 insertions(+), 14 deletions(-) diff --git a/src/blockstore_impl.h b/src/blockstore_impl.h index d833b8a8..ba79e16a 100644 --- a/src/blockstore_impl.h +++ b/src/blockstore_impl.h @@ -274,7 +274,7 @@ class blockstore_impl_t blockstore_dirty_db_t dirty_db; std::vector submit_queue; std::vector unsynced_big_writes, unsynced_small_writes; - int unsynced_big_write_count = 0; + int unsynced_big_write_count = 0, unstable_unsynced = 0; int unsynced_queued_ops = 0; allocator *data_alloc = NULL; uint8_t *zero_object; diff --git a/src/blockstore_journal.cpp b/src/blockstore_journal.cpp index b1a661a7..74344722 100644 --- a/src/blockstore_journal.cpp +++ b/src/blockstore_journal.cpp @@ -145,6 +145,7 @@ journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, journal.sector_info[journal.cur_sector].offset = journal.next_free; journal.in_sector_pos = 0; journal.next_free = (journal.next_free+journal.block_size) < journal.len ? journal.next_free + journal.block_size : journal.block_size; + assert(journal.next_free != journal.used_start); memset(journal.inmemory ? 
(uint8_t*)journal.buffer + journal.sector_info[journal.cur_sector].offset : (uint8_t*)journal.sector_buf + journal.block_size*journal.cur_sector, 0, journal.block_size); diff --git a/src/blockstore_journal.h b/src/blockstore_journal.h index dcfffacb..60476304 100644 --- a/src/blockstore_journal.h +++ b/src/blockstore_journal.h @@ -13,12 +13,6 @@ #define JOURNAL_BUFFER_SIZE 4*1024*1024 #define JOURNAL_ENTRY_HEADER_SIZE 16 -// We reserve some extra space for future stabilize requests during writes -// FIXME: This value should be dynamic i.e. Blockstore ideally shouldn't allow -// writing more than can be stabilized afterwards -#define JOURNAL_STABILIZE_RESERVATION 65536 -#define JOURNAL_INSTANT_RESERVATION 131072 - // Journal entries // Journal entries are linked to each other by their crc32 value // The journal is almost a blockchain, because object versions constantly increase diff --git a/src/blockstore_sync.cpp b/src/blockstore_sync.cpp index 918eb90c..1d64a4f6 100644 --- a/src/blockstore_sync.cpp +++ b/src/blockstore_sync.cpp @@ -86,14 +86,15 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) auto & dirty_entry = dirty_db.at(sbw); uint64_t dyn_size = dsk.dirty_dyn_size(dirty_entry.offset, dirty_entry.len); if (!space_check.check_available(op, 1, sizeof(journal_entry_big_write) + dyn_size, - left == 0 ? 
JOURNAL_STABILIZE_RESERVATION : 0)) + (unstable_writes.size()+unstable_unsynced)*journal.block_size)) { return 0; } } } else if (!space_check.check_available(op, PRIV(op)->sync_big_writes.size(), - sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size, JOURNAL_STABILIZE_RESERVATION)) + sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size, + (unstable_writes.size()+unstable_unsynced)*journal.block_size)) { return 0; } @@ -184,6 +185,11 @@ void blockstore_impl_t::ack_sync(blockstore_op_t *op) { mark_stable(dirty_it->first); } + else + { + unstable_unsynced--; + assert(unstable_unsynced >= 0); + } dirty_it++; while (dirty_it != dirty_db.end() && dirty_it->first.oid == it->oid) { @@ -214,6 +220,11 @@ void blockstore_impl_t::ack_sync(blockstore_op_t *op) { mark_stable(*it); } + else + { + unstable_unsynced--; + assert(unstable_unsynced >= 0); + } } } op->retval = 0; diff --git a/src/blockstore_write.cpp b/src/blockstore_write.cpp index 61a9a0a0..1eda2d43 100644 --- a/src/blockstore_write.cpp +++ b/src/blockstore_write.cpp @@ -320,7 +320,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) blockstore_journal_check_t space_check(this); if (!space_check.check_available(op, unsynced_big_write_count + 1, sizeof(journal_entry_big_write) + dsk.clean_dyn_size, - (dirty_it->second.state & BS_ST_INSTANT) ? 
JOURNAL_INSTANT_RESERVATION : JOURNAL_STABILIZE_RESERVATION)) + (unstable_writes.size()+unstable_unsynced)*journal.block_size)) { return 0; } @@ -386,6 +386,10 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) sqe, dsk.data_fd, PRIV(op)->iov_zerofill, vcnt, dsk.data_offset + (loc << dsk.block_order) + op->offset - stripe_offset ); PRIV(op)->pending_ops = 1; + if (immediate_commit != IMMEDIATE_ALL && !(dirty_it->second.state & BS_ST_INSTANT)) + { + unstable_unsynced++; + } if (immediate_commit != IMMEDIATE_ALL) { // Increase the counter, but don't save into unsynced_writes yet (can't sync until the write is finished) @@ -408,7 +412,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) sizeof(journal_entry_big_write) + dsk.clean_dyn_size, 0) || !space_check.check_available(op, 1, sizeof(journal_entry_small_write) + dyn_size, - op->len + ((dirty_it->second.state & BS_ST_INSTANT) ? JOURNAL_INSTANT_RESERVATION : JOURNAL_STABILIZE_RESERVATION))) + op->len + (unstable_writes.size()+unstable_unsynced)*journal.block_size)) { return 0; } @@ -499,6 +503,11 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) if (journal.next_free >= journal.len) { journal.next_free = dsk.journal_block_size; + assert(journal.next_free != journal.used_start); + } + if (immediate_commit == IMMEDIATE_NONE && !(dirty_it->second.state & BS_ST_INSTANT)) + { + unstable_unsynced++; } if (!PRIV(op)->pending_ops) { @@ -538,7 +547,7 @@ resume_2: uint64_t dyn_size = dsk.dirty_dyn_size(op->offset, op->len); blockstore_journal_check_t space_check(this); if (!space_check.check_available(op, 1, sizeof(journal_entry_big_write) + dyn_size, - ((dirty_it->second.state & BS_ST_INSTANT) ? JOURNAL_INSTANT_RESERVATION : JOURNAL_STABILIZE_RESERVATION))) + (unstable_writes.size()+unstable_unsynced)*journal.block_size)) { return 0; } @@ -582,14 +591,20 @@ resume_4: #endif bool is_big = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE; bool imm = is_big ? 
(immediate_commit == IMMEDIATE_ALL) : (immediate_commit != IMMEDIATE_NONE); + bool is_instant = ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE || (dirty_it->second.state & BS_ST_INSTANT)); if (imm) { auto & unstab = unstable_writes[op->oid]; unstab = unstab < op->version ? op->version : unstab; } + else if (!is_instant) + { + unstable_unsynced--; + assert(unstable_unsynced >= 0); + } dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | (imm ? BS_ST_SYNCED : BS_ST_WRITTEN); - if (imm && ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE || (dirty_it->second.state & BS_ST_INSTANT))) + if (imm && is_instant) { // Deletions and 'instant' operations are treated as immediately stable mark_stable(dirty_it->first); @@ -735,7 +750,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op) }); assert(dirty_it != dirty_db.end()); blockstore_journal_check_t space_check(this); - if (!space_check.check_available(op, 1, sizeof(journal_entry_del), JOURNAL_INSTANT_RESERVATION)) + if (!space_check.check_available(op, 1, sizeof(journal_entry_del), (unstable_writes.size()+unstable_unsynced)*journal.block_size)) { return 0; }