diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp
index d3bb1462..1f72d30e 100644
--- a/blockstore_flush.cpp
+++ b/blockstore_flush.cpp
@@ -4,8 +4,9 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore_impl_t *bs)
 {
     this->bs = bs;
     this->flusher_count = flusher_count;
+    dequeuing = false;
     active_flushers = 0;
-    sync_threshold = flusher_count == 1 ? 1 : flusher_count/2;
+    sync_threshold = bs->journal_block_size / sizeof(journal_entry_stable);
     journal_trim_interval = sync_threshold;
     journal_trim_counter = 0;
     journal_superblock = bs->journal.inmemory ? bs->journal.buffer : memalign(MEM_ALIGNMENT, bs->journal_block_size);
@@ -55,17 +56,13 @@ journal_flusher_t::~journal_flusher_t()
 
 bool journal_flusher_t::is_active()
 {
-    return active_flushers > 0 || start_forced && flush_queue.size() > 0 || flush_queue.size() >= sync_threshold;
+    return active_flushers > 0 || dequeuing;
 }
 
 void journal_flusher_t::loop()
 {
-    for (int i = 0; i < flusher_count; i++)
+    for (int i = 0; (active_flushers > 0 || dequeuing) && i < flusher_count; i++)
     {
-        if (!active_flushers && (start_forced ? !flush_queue.size() : (flush_queue.size() < sync_threshold)))
-        {
-            return;
-        }
         co[i].loop();
     }
 }
@@ -83,6 +80,11 @@ void journal_flusher_t::enqueue_flush(obj_ver_id ov)
         flush_versions[ov.oid] = ov.version;
         flush_queue.push_back(ov.oid);
     }
+    if (!dequeuing && flush_queue.size() >= sync_threshold)
+    {
+        dequeuing = true;
+        bs->ringloop->wakeup();
+    }
 }
 
 void journal_flusher_t::unshift_flush(obj_ver_id ov)
@@ -98,11 +100,16 @@ void journal_flusher_t::unshift_flush(obj_ver_id ov)
         flush_versions[ov.oid] = ov.version;
         flush_queue.push_front(ov.oid);
     }
+    if (!dequeuing && flush_queue.size() >= sync_threshold)
+    {
+        dequeuing = true;
+        bs->ringloop->wakeup();
+    }
 }
 
 void journal_flusher_t::force_start()
 {
-    start_forced = true;
+    dequeuing = true;
     bs->ringloop->wakeup();
 }
 
@@ -155,10 +162,9 @@ bool journal_flusher_co::loop()
     else if (wait_state == 18)
         goto resume_18;
 resume_0:
-    if (!flusher->flush_queue.size() ||
-        !flusher->start_forced && !flusher->active_flushers && flusher->flush_queue.size() < flusher->sync_threshold)
+    if (!flusher->flush_queue.size() || !flusher->dequeuing)
     {
-        flusher->start_forced = false;
+        flusher->dequeuing = false;
         wait_state = 0;
         return true;
     }
@@ -169,6 +175,16 @@ resume_0:
     dirty_end = bs->dirty_db.find(cur);
     if (dirty_end != bs->dirty_db.end())
     {
+        if (dirty_end->second.journal_sector >= bs->journal.dirty_start &&
+            (bs->journal.dirty_start >= bs->journal.used_start ||
+            dirty_end->second.journal_sector < bs->journal.used_start))
+        {
+            // We can't flush journal sectors that are still written to
+            flusher->enqueue_flush(cur);
+            flusher->dequeuing = false;
+            wait_state = 0;
+            return true;
+        }
         repeat_it = flusher->sync_to_repeat.find(cur.oid);
         if (repeat_it != flusher->sync_to_repeat.end())
         {
diff --git a/blockstore_flush.h b/blockstore_flush.h
index 740f3fa8..6451b151 100644
--- a/blockstore_flush.h
+++ b/blockstore_flush.h
@@ -73,7 +73,7 @@ public:
 // Journal flusher itself
 class journal_flusher_t
 {
-    bool start_forced = false;
+    bool dequeuing;
     int flusher_count;
     int sync_threshold;
     journal_flusher_co *co;
diff --git a/blockstore_init.cpp b/blockstore_init.cpp
index 201e99b0..75637eec 100644
--- a/blockstore_init.cpp
+++ b/blockstore_init.cpp
@@ -402,6 +402,7 @@ resume_1:
     }
     // Trim journal on start so we don't stall when all entries are older
     bs->journal.trim();
+    bs->journal.dirty_start = bs->journal.next_free;
     printf(
         "Journal entries loaded: %lu, free journal space: %lu bytes (%lu..%lu is used), free blocks: %lu / %lu\n",
         entries_loaded,
diff --git a/blockstore_journal.cpp b/blockstore_journal.cpp
index d962fd10..1e2f74a4 100644
--- a/blockstore_journal.cpp
+++ b/blockstore_journal.cpp
@@ -92,6 +92,10 @@ journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type,
             // Also select next sector buffer in memory
             journal.cur_sector = ((journal.cur_sector + 1) % journal.sector_count);
         }
+        else
+        {
+            journal.dirty_start = journal.next_free;
+        }
         journal.sector_info[journal.cur_sector].offset = journal.next_free;
         journal.in_sector_pos = 0;
         journal.next_free = (journal.next_free+journal.block_size) < journal.len ? journal.next_free + journal.block_size : journal.block_size;
diff --git a/blockstore_journal.h b/blockstore_journal.h
index 5d14985a..94169847 100644
--- a/blockstore_journal.h
+++ b/blockstore_journal.h
@@ -137,8 +137,12 @@ struct journal_t
 
     uint64_t block_size = 512;
     uint64_t offset, len;
+    // Next free block offset
     uint64_t next_free = 0;
+    // First occupied block offset
     uint64_t used_start = 0;
+    // End of the last block not used for writing anymore
+    uint64_t dirty_start = 0;
     uint32_t crc32_last = 0;
 
     // Current sector(s) used for writing
diff --git a/blockstore_write.cpp b/blockstore_write.cpp
index c587fd15..c0260b4a 100644
--- a/blockstore_write.cpp
+++ b/blockstore_write.cpp
@@ -249,7 +249,7 @@ void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *o
 
 void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op)
 {
-    // Release used journal sectors
+    // Release flushed journal sectors
     if (PRIV(op)->min_flushed_journal_sector > 0 &&
         PRIV(op)->max_flushed_journal_sector > 0)
     {
@@ -257,6 +257,16 @@ void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op)
         while (1)
         {
             journal.sector_info[s-1].usage_count--;
+            if (s != (1+journal.cur_sector) && journal.sector_info[s-1].usage_count == 0)
+            {
+                // We know for sure that we won't write into this sector anymore
+                uint64_t new_ds = journal.sector_info[s-1].offset + journal.block_size;
+                if ((journal.dirty_start + (journal.dirty_start >= journal.used_start ? 0 : journal.len)) <
+                    (new_ds + (new_ds >= journal.used_start ? 0 : journal.len)))
+                {
+                    journal.dirty_start = new_ds;
+                }
+            }
             if (s == PRIV(op)->max_flushed_journal_sector)
                 break;
             s = 1 + s % journal.sector_count;