diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index ea8ec885..9f98c060 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -7,8 +7,8 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore_impl_t *bs) dequeuing = false; active_flushers = 0; syncing_flushers = 0; - sync_threshold = bs->journal_block_size / sizeof(journal_entry_stable); - journal_trim_interval = sync_threshold; + flusher_start_threshold = bs->journal_block_size / sizeof(journal_entry_stable); + journal_trim_interval = flusher_start_threshold; journal_trim_counter = 0; journal_superblock = bs->journal.inmemory ? bs->journal.buffer : memalign(MEM_ALIGNMENT, bs->journal_block_size); co = new journal_flusher_co[flusher_count]; @@ -81,7 +81,7 @@ void journal_flusher_t::enqueue_flush(obj_ver_id ov) flush_versions[ov.oid] = ov.version; flush_queue.push_back(ov.oid); } - if (!dequeuing && flush_queue.size() >= sync_threshold) + if (!dequeuing && flush_queue.size() >= flusher_start_threshold) { dequeuing = true; bs->ringloop->wakeup(); @@ -101,19 +101,25 @@ void journal_flusher_t::unshift_flush(obj_ver_id ov) flush_versions[ov.oid] = ov.version; flush_queue.push_front(ov.oid); } - if (!dequeuing && flush_queue.size() >= sync_threshold) + if (!dequeuing && flush_queue.size() >= flusher_start_threshold) { dequeuing = true; bs->ringloop->wakeup(); } } -void journal_flusher_t::force_start() +void journal_flusher_t::request_trim() { dequeuing = true; + trim_wanted++; bs->ringloop->wakeup(); } +void journal_flusher_t::release_trim() +{ + trim_wanted--; +} + #define await_sqe(label) \ resume_##label:\ sqe = bs->get_sqe();\ @@ -181,14 +187,71 @@ resume_0: (bs->journal.dirty_start >= bs->journal.used_start || dirty_end->second.journal_sector < bs->journal.used_start)) { - // We can't flush journal sectors that are still written to -#ifdef BLOCKSTORE_DEBUG - printf("Flusher overran writers - stopping\n"); -#endif flusher->enqueue_flush(cur); - flusher->dequeuing = false; - wait_state = 0; - return true; + // We can't flush journal sectors that are still written to + // However, as we group flushes by oid, current oid may have older writes to flush! + // And it may even block writes if we don't flush the older version + // (if it's in the beginning of the journal)... + // So first try to find an older version of the same object to flush. + bool found = false; + while (dirty_end != bs->dirty_db.begin()) + { + dirty_end--; + if (dirty_end->first.oid != cur.oid) + { + break; + } + if (!(dirty_end->second.journal_sector >= bs->journal.dirty_start && + (bs->journal.dirty_start >= bs->journal.used_start || + dirty_end->second.journal_sector < bs->journal.used_start))) + { + found = true; + cur.version = dirty_end->first.version; + break; + } + } + if (!found) + { + // Try other objects + int search_left = flusher->flush_queue.size() - 1; +#ifdef BLOCKSTORE_DEBUG + printf("Flusher overran writers (dirty_start=%08lx) - searching for older flushes (%d left)\n", bs->journal.dirty_start, search_left); +#endif + while (search_left > 0) + { + cur.oid = flusher->flush_queue.front(); + cur.version = flusher->flush_versions[cur.oid]; + flusher->flush_queue.pop_front(); + flusher->flush_versions.erase(cur.oid); + dirty_end = bs->dirty_db.find(cur); + if (dirty_end != bs->dirty_db.end()) + { + if (dirty_end->second.journal_sector >= bs->journal.dirty_start && + (bs->journal.dirty_start >= bs->journal.used_start || + dirty_end->second.journal_sector < bs->journal.used_start)) + { +#ifdef BLOCKSTORE_DEBUG + printf("Write %lu:%lu v%lu is too new: offset=%08lx\n", cur.oid.inode, cur.oid.stripe, cur.version, dirty_end->second.journal_sector); +#endif + flusher->enqueue_flush(cur); + } + else + { + break; + } + } + search_left--; + } + if (search_left <= 0) + { +#ifdef BLOCKSTORE_DEBUG + printf("No older flushes, stopping\n"); +#endif + flusher->dequeuing = false; + wait_state = 0; + return true; + } + } } repeat_it = flusher->sync_to_repeat.find(cur.oid); if (repeat_it != flusher->sync_to_repeat.end()) @@ -409,7 +472,7 @@ resume_1: // Update clean_db and dirty_db, free old data locations update_clean_db(); // Clear unused part of the journal every flushes - if (!((++flusher->journal_trim_counter) % flusher->journal_trim_interval)) + if (!((++flusher->journal_trim_counter) % flusher->journal_trim_interval) || flusher->trim_wanted > 0) { flusher->journal_trim_counter = 0; if (bs->journal.trim()) diff --git a/blockstore_flush.h b/blockstore_flush.h index a09bc6f4..8705567f 100644 --- a/blockstore_flush.h +++ b/blockstore_flush.h @@ -73,9 +73,10 @@ public: // Journal flusher itself class journal_flusher_t { + int trim_wanted = 0; bool dequeuing; int flusher_count; - int sync_threshold; + int flusher_start_threshold; journal_flusher_co *co; blockstore_impl_t *bs; friend class journal_flusher_co; @@ -96,7 +97,8 @@ public: ~journal_flusher_t(); void loop(); bool is_active(); - void force_start(); + void request_trim(); + void release_trim(); void enqueue_flush(obj_ver_id oid); void unshift_flush(obj_ver_id oid); }; diff --git a/blockstore_impl.cpp b/blockstore_impl.cpp index bff05d94..06f86e2b 100644 --- a/blockstore_impl.cpp +++ b/blockstore_impl.cpp @@ -282,6 +282,7 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op) #endif return; } + flusher->release_trim(); PRIV(op)->wait_for = 0; } else if (PRIV(op)->wait_for == WAIT_JOURNAL_BUFFER) diff --git a/blockstore_journal.cpp b/blockstore_journal.cpp index feab013c..41d1c423 100644 --- a/blockstore_journal.cpp +++ b/blockstore_journal.cpp @@ -101,7 +101,7 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int entries : bs->journal.used_start - bs->journal.next_free) ); PRIV(op)->wait_for = WAIT_JOURNAL; - bs->flusher->force_start(); + bs->flusher->request_trim(); PRIV(op)->wait_detail = bs->journal.used_start; return 0; } diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 0b085184..bcbd6da5 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -411,6 +411,10 @@ void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op) { // We know for sure that we won't write into this sector anymore uint64_t new_ds = journal.sector_info[s-1].offset + journal.block_size; + if (new_ds >= journal.len) + { + new_ds = journal.block_size; + } if ((journal.dirty_start + (journal.dirty_start >= journal.used_start ? 0 : journal.len)) < (new_ds + (new_ds >= journal.used_start ? 0 : journal.len))) {