diff --git a/blockstore.h b/blockstore.h index baf43abb..bc0cd069 100644 --- a/blockstore.h +++ b/blockstore.h @@ -272,6 +272,7 @@ class blockstore uint64_t data_offset, data_size, data_len; bool readonly = false; + // FIXME: separate flags for data, metadata and journal bool disable_fsync = false; bool inmemory_meta = false; void *metadata_buffer = NULL; diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 1c63ccfb..8d536afd 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -5,7 +5,6 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore *bs) this->bs = bs; this->flusher_count = flusher_count; active_flushers = 0; - active_until_sync = 0; sync_threshold = flusher_count == 1 ? 1 : flusher_count/2; journal_trim_interval = sync_threshold; journal_trim_counter = 0; @@ -133,8 +132,6 @@ bool journal_flusher_co::loop() goto resume_9; else if (wait_state == 10) goto resume_10; - else if (wait_state == 11) - goto resume_11; else if (wait_state == 12) goto resume_12; else if (wait_state == 13) @@ -143,6 +140,12 @@ bool journal_flusher_co::loop() goto resume_14; else if (wait_state == 15) goto resume_15; + else if (wait_state == 16) + goto resume_16; + else if (wait_state == 17) + goto resume_17; + else if (wait_state == 18) + goto resume_18; resume_0: if (!flusher->flush_queue.size()) { @@ -177,7 +180,6 @@ resume_0: #endif dirty_it = dirty_end; flusher->active_flushers++; - flusher->active_until_sync++; v.clear(); wait_count = 0; copy_count = 0; @@ -261,7 +263,6 @@ resume_0: { // Nothing to flush flusher->active_flushers--; - flusher->active_until_sync--; repeat_it = flusher->sync_to_repeat.find(cur.oid); if (repeat_it->second > cur.version) { @@ -339,11 +340,17 @@ resume_0: ); wait_count++; } + // Sync data before writing metadata + resume_16: + resume_17: + resume_18: + if (copy_count && !fsync_batch(false, 16)) + { + wait_state += 16; + return false; + } resume_5: // And metadata writes, but only after data writes complete - // FIXME: Oops. We must sync data before writing metadata. - // Because this will result in more fsyncs we should reduce flushers activity by postponing - // flushes until there is sufficient amount of work to do (for example, at least ~32 operations in the queue). if (!bs->inmemory_meta && meta_new.it->second.state == 0 || wait_count > 0) { // metadata sector is still being read or data is still being written, wait for it @@ -409,14 +416,13 @@ resume_0: free(it->buf); } v.clear(); - // And sync everything (in batches - not per each operation!) - flusher->active_until_sync--; + // And sync metadata (in batches - not per each operation!) resume_8: resume_9: resume_10: - resume_11: - if (!fsync_batch()) + if (!fsync_batch(true, 8)) { + wait_state += 8; return false; } // Update clean_db and dirty_db, free old data locations @@ -568,56 +574,54 @@ void journal_flusher_co::update_clean_db() bs->dirty_db.erase(dirty_it, std::next(dirty_end)); } -bool journal_flusher_co::fsync_batch() +bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base) { - if (wait_state == 8) - goto resume_8; - else if (wait_state == 9) - goto resume_9; - else if (wait_state == 10) - goto resume_10; - else if (wait_state == 11) - goto resume_11; + if (wait_state == wait_base) + goto resume_0; + else if (wait_state == wait_base+1) + goto resume_1; + else if (wait_state == wait_base+2) + goto resume_2; if (!bs->disable_fsync) { cur_sync = flusher->syncs.end(); - if (cur_sync == flusher->syncs.begin() || cur_sync->state == 1) - cur_sync = flusher->syncs.emplace(flusher->syncs.end(), (flusher_sync_t){ .ready_count = 0, .state = 0 }); - else + while (cur_sync != flusher->syncs.begin()) + { cur_sync--; + if (cur_sync->fsync_meta == fsync_meta && cur_sync->state == 0) + goto sync_found; + } + cur_sync = flusher->syncs.emplace(flusher->syncs.end(), (flusher_sync_t){ + .fsync_meta = fsync_meta, + .ready_count = 0, + .state = 0, + }); + sync_found: cur_sync->ready_count++; - if (cur_sync->ready_count >= flusher->sync_threshold || - !flusher->active_until_sync && (!flusher->flush_queue.size() || flusher->active_flushers >= flusher->flusher_count)) + if (cur_sync->ready_count >= flusher->sync_threshold || !flusher->flush_queue.size()) { // Sync batch is ready. Do it. - await_sqe(9); + await_sqe(0); data->callback = simple_callback_w; data->iov = { 0 }; - my_uring_prep_fsync(sqe, bs->data_fd, IORING_FSYNC_DATASYNC); + my_uring_prep_fsync(sqe, fsync_meta ? bs->meta_fd : bs->data_fd, IORING_FSYNC_DATASYNC); + cur_sync->state = 1; wait_count++; - if (bs->meta_fd != bs->data_fd) - { - await_sqe(10); - data->callback = simple_callback_w; - data->iov = { 0 }; - my_uring_prep_fsync(sqe, bs->meta_fd, IORING_FSYNC_DATASYNC); - wait_count++; - } - resume_11: + resume_1: if (wait_count > 0) { - wait_state = 11; + wait_state = 1; return false; } // Sync completed. All previous coroutines waiting for it must be resumed - cur_sync->state = 1; + cur_sync->state = 2; bs->ringloop->wakeup(bs->ring_consumer); } // Wait until someone else sends and completes a sync. - resume_8: + resume_2: if (!cur_sync->state) { - wait_state = 8; + wait_state = 2; return false; } cur_sync->ready_count--; diff --git a/blockstore_flush.h b/blockstore_flush.h index eba6c61e..3f5929ad 100644 --- a/blockstore_flush.h +++ b/blockstore_flush.h @@ -14,6 +14,7 @@ struct meta_sector_t struct flusher_sync_t { + bool fsync_meta; int ready_count; int state; }; @@ -50,7 +51,7 @@ class journal_flusher_co friend class journal_flusher_t; bool modify_meta_read(uint64_t meta_loc, flusher_meta_write_t &wr, int wait_base); void update_clean_db(); - bool fsync_batch(); + bool fsync_batch(bool fsync_meta, int wait_base); public: journal_flusher_co(); bool loop(); @@ -68,7 +69,7 @@ class journal_flusher_t int journal_trim_counter, journal_trim_interval; void* journal_superblock; - int active_flushers, active_until_sync; + int active_flushers; std::list syncs; std::map sync_to_repeat;