From e6a4b634f827ddd466faae8e5b8888625a608861 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 1 Jun 2020 16:12:26 +0300 Subject: [PATCH] Fix possible write stall The stall occurred during fio Q=128 random write tests with low flusher_count (4). It was caused by flushers being unable to flush the beginning of the journal because it contained older writes to an object that also had writes in the very end of the journal, after dirty_start. --- blockstore_flush.cpp | 89 ++++++++++++++++++++++++++++++++++++------ blockstore_flush.h | 6 ++- blockstore_impl.cpp | 1 + blockstore_journal.cpp | 2 +- blockstore_write.cpp | 4 ++ 5 files changed, 86 insertions(+), 16 deletions(-) diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index ea8ec885..9f98c060 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -7,8 +7,8 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore_impl_t *bs) dequeuing = false; active_flushers = 0; syncing_flushers = 0; - sync_threshold = bs->journal_block_size / sizeof(journal_entry_stable); - journal_trim_interval = sync_threshold; + flusher_start_threshold = bs->journal_block_size / sizeof(journal_entry_stable); + journal_trim_interval = flusher_start_threshold; journal_trim_counter = 0; journal_superblock = bs->journal.inmemory ? bs->journal.buffer : memalign(MEM_ALIGNMENT, bs->journal_block_size); co = new journal_flusher_co[flusher_count]; @@ -81,7 +81,7 @@ void journal_flusher_t::enqueue_flush(obj_ver_id ov) flush_versions[ov.oid] = ov.version; flush_queue.push_back(ov.oid); } - if (!dequeuing && flush_queue.size() >= sync_threshold) + if (!dequeuing && flush_queue.size() >= flusher_start_threshold) { dequeuing = true; bs->ringloop->wakeup(); @@ -101,19 +101,25 @@ void journal_flusher_t::unshift_flush(obj_ver_id ov) flush_versions[ov.oid] = ov.version; flush_queue.push_front(ov.oid); } - if (!dequeuing && flush_queue.size() >= sync_threshold) + if (!dequeuing && flush_queue.size() >= flusher_start_threshold) { dequeuing = true; bs->ringloop->wakeup(); } } -void journal_flusher_t::force_start() +void journal_flusher_t::request_trim() { dequeuing = true; + trim_wanted++; bs->ringloop->wakeup(); } +void journal_flusher_t::release_trim() +{ + trim_wanted--; +} + #define await_sqe(label) \ resume_##label:\ sqe = bs->get_sqe();\ @@ -181,14 +187,71 @@ resume_0: (bs->journal.dirty_start >= bs->journal.used_start || dirty_end->second.journal_sector < bs->journal.used_start)) { - // We can't flush journal sectors that are still written to -#ifdef BLOCKSTORE_DEBUG - printf("Flusher overran writers - stopping\n"); -#endif flusher->enqueue_flush(cur); - flusher->dequeuing = false; - wait_state = 0; - return true; + // We can't flush journal sectors that are still written to + // However, as we group flushes by oid, current oid may have older writes to flush! + // And it may even block writes if we don't flush the older version + // (if it's in the beginning of the journal)... + // So first try to find an older version of the same object to flush. + bool found = false; + while (dirty_end != bs->dirty_db.begin()) + { + dirty_end--; + if (dirty_end->first.oid != cur.oid) + { + break; + } + if (!(dirty_end->second.journal_sector >= bs->journal.dirty_start && + (bs->journal.dirty_start >= bs->journal.used_start || + dirty_end->second.journal_sector < bs->journal.used_start))) + { + found = true; + cur.version = dirty_end->first.version; + break; + } + } + if (!found) + { + // Try other objects + int search_left = flusher->flush_queue.size() - 1; +#ifdef BLOCKSTORE_DEBUG + printf("Flusher overran writers (dirty_start=%08lx) - searching for older flushes (%d left)\n", bs->journal.dirty_start, search_left); +#endif + while (search_left > 0) + { + cur.oid = flusher->flush_queue.front(); + cur.version = flusher->flush_versions[cur.oid]; + flusher->flush_queue.pop_front(); + flusher->flush_versions.erase(cur.oid); + dirty_end = bs->dirty_db.find(cur); + if (dirty_end != bs->dirty_db.end()) + { + if (dirty_end->second.journal_sector >= bs->journal.dirty_start && + (bs->journal.dirty_start >= bs->journal.used_start || + dirty_end->second.journal_sector < bs->journal.used_start)) + { +#ifdef BLOCKSTORE_DEBUG + printf("Write %lu:%lu v%lu is too new: offset=%08lx\n", cur.oid.inode, cur.oid.stripe, cur.version, dirty_end->second.journal_sector); +#endif + flusher->enqueue_flush(cur); + } + else + { + break; + } + } + search_left--; + } + if (search_left <= 0) + { +#ifdef BLOCKSTORE_DEBUG + printf("No older flushes, stopping\n"); +#endif + flusher->dequeuing = false; + wait_state = 0; + return true; + } + } } repeat_it = flusher->sync_to_repeat.find(cur.oid); if (repeat_it != flusher->sync_to_repeat.end()) @@ -409,7 +472,7 @@ resume_1: // Update clean_db and dirty_db, free old data locations update_clean_db(); // Clear unused part of the journal every flushes - if (!((++flusher->journal_trim_counter) % flusher->journal_trim_interval)) + if (!((++flusher->journal_trim_counter) % flusher->journal_trim_interval) || flusher->trim_wanted > 0) { flusher->journal_trim_counter = 0; if (bs->journal.trim()) diff --git a/blockstore_flush.h b/blockstore_flush.h index a09bc6f4..8705567f 100644 --- a/blockstore_flush.h +++ b/blockstore_flush.h @@ -73,9 +73,10 @@ public: // Journal flusher itself class journal_flusher_t { + int trim_wanted = 0; bool dequeuing; int flusher_count; - int sync_threshold; + int flusher_start_threshold; journal_flusher_co *co; blockstore_impl_t *bs; friend class journal_flusher_co; @@ -96,7 +97,8 @@ public: ~journal_flusher_t(); void loop(); bool is_active(); - void force_start(); + void request_trim(); + void release_trim(); void enqueue_flush(obj_ver_id oid); void unshift_flush(obj_ver_id oid); }; diff --git a/blockstore_impl.cpp b/blockstore_impl.cpp index bff05d94..06f86e2b 100644 --- a/blockstore_impl.cpp +++ b/blockstore_impl.cpp @@ -282,6 +282,7 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op) #endif return; } + flusher->release_trim(); PRIV(op)->wait_for = 0; } else if (PRIV(op)->wait_for == WAIT_JOURNAL_BUFFER) diff --git a/blockstore_journal.cpp b/blockstore_journal.cpp index feab013c..41d1c423 100644 --- a/blockstore_journal.cpp +++ b/blockstore_journal.cpp @@ -101,7 +101,7 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int entries : bs->journal.used_start - bs->journal.next_free) ); PRIV(op)->wait_for = WAIT_JOURNAL; - bs->flusher->force_start(); + bs->flusher->request_trim(); PRIV(op)->wait_detail = bs->journal.used_start; return 0; } diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 0b085184..bcbd6da5 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -411,6 +411,10 @@ void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op) { // We know for sure that we won't write into this sector anymore uint64_t new_ds = journal.sector_info[s-1].offset + journal.block_size; + if (new_ds >= journal.len) + { + new_ds = journal.block_size; + } if ((journal.dirty_start + (journal.dirty_start >= journal.used_start ? 0 : journal.len)) < (new_ds + (new_ds >= journal.used_start ? 0 : journal.len))) {