From 1e286eed087e83cc5c0535db0ac6ce54cef89269 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Wed, 29 Jan 2020 16:40:11 +0300 Subject: [PATCH] Wait for writes to complete before issuing an fsync in blockstore_sync. Also fix a dormant bug (OP_SYNC could clear unsynced_*_writes and not be added into in_progress_syncs) --- blockstore_impl.h | 1 + blockstore_sync.cpp | 114 ++++++++++++++++++++++++-------------------- 2 files changed, 64 insertions(+), 51 deletions(-) diff --git a/blockstore_impl.h b/blockstore_impl.h index 721c4d76f..b9cd175dd 100644 --- a/blockstore_impl.h +++ b/blockstore_impl.h @@ -163,6 +163,7 @@ struct blockstore_op_private_t // Sync std::vector sync_big_writes, sync_small_writes; + int sync_small_checked, sync_big_checked; std::list::iterator in_progress_ptr; int sync_state, prev_sync_count; }; diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp index 1832de11b..87ed5faa0 100644 --- a/blockstore_sync.cpp +++ b/blockstore_sync.cpp @@ -4,8 +4,10 @@ #define SYNC_HAS_BIG 2 #define SYNC_DATA_SYNC_SENT 3 #define SYNC_DATA_SYNC_DONE 4 -#define SYNC_JOURNAL_SYNC_SENT 5 -#define SYNC_DONE 6 +#define SYNC_JOURNAL_WRITE_SENT 5 +#define SYNC_JOURNAL_WRITE_DONE 6 +#define SYNC_JOURNAL_SYNC_SENT 7 +#define SYNC_DONE 8 int blockstore_impl_t::dequeue_sync(blockstore_op_t *op) { @@ -14,23 +16,25 @@ int blockstore_impl_t::dequeue_sync(blockstore_op_t *op) stop_sync_submitted = false; PRIV(op)->sync_big_writes.swap(unsynced_big_writes); PRIV(op)->sync_small_writes.swap(unsynced_small_writes); + unsynced_big_writes.clear(); + unsynced_small_writes.clear(); if (PRIV(op)->sync_big_writes.size() > 0) PRIV(op)->sync_state = SYNC_HAS_BIG; else if (PRIV(op)->sync_small_writes.size() > 0) PRIV(op)->sync_state = SYNC_HAS_SMALL; else PRIV(op)->sync_state = SYNC_DONE; - unsynced_big_writes.clear(); - unsynced_small_writes.clear(); + // Always add sync to in_progress_syncs because we clear unsynced_big_writes and unsynced_small_writes + PRIV(op)->prev_sync_count =
in_progress_syncs.size(); + PRIV(op)->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op); } int r = continue_sync(op); if (r) { - PRIV(op)->prev_sync_count = in_progress_syncs.size(); - PRIV(op)->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op); ack_sync(op); } - return r; + // Always dequeue because we always add syncs to in_progress_syncs + return 1; } int blockstore_impl_t::continue_sync(blockstore_op_t *op) @@ -39,51 +43,42 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) if (PRIV(op)->sync_state == SYNC_HAS_SMALL) { // No big writes, just fsync the journal - int n_sqes = disable_journal_fsync ? 0 : 1; + for (; PRIV(op)->sync_small_checked < PRIV(op)->sync_small_writes.size(); PRIV(op)->sync_small_checked++) + { + if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_small_writes[PRIV(op)->sync_small_checked]].state)) + { + // Wait for small inflight writes to complete + return 0; + } + } if (journal.sector_info[journal.cur_sector].dirty) { - n_sqes++; - } - if (n_sqes > 0) - { - io_uring_sqe* sqes[n_sqes]; - for (int i = 0; i < n_sqes; i++) - { - BS_SUBMIT_GET_SQE_DECL(sqes[i]); - } - int s = 0; - if (journal.sector_info[journal.cur_sector].dirty) - { - prepare_journal_sector_write(journal, journal.cur_sector, sqes[s++], cb); - PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector; - } - else - { - PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0; - } - if (!disable_journal_fsync) - { - // FIXME: Wait for completion of writes before issuing an fsync - // Fsync and write requests posted at the same time can be reordered - ring_data_t *data = ((ring_data_t*)sqes[s]->user_data); - my_uring_prep_fsync(sqes[s++], journal.fd, IORING_FSYNC_DATASYNC); - data->iov = { 0 }; - data->callback = cb; - } - PRIV(op)->pending_ops = s; - PRIV(op)->sync_state = SYNC_JOURNAL_SYNC_SENT; + // Write out the last journal sector if it happens to be dirty + 
BS_SUBMIT_GET_ONLY_SQE(sqe); + prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb); + PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector; + PRIV(op)->pending_ops = 1; + PRIV(op)->sync_state = SYNC_JOURNAL_WRITE_SENT; + return 1; } else { - PRIV(op)->sync_state = SYNC_DONE; + PRIV(op)->sync_state = SYNC_JOURNAL_WRITE_DONE; } } - else if (PRIV(op)->sync_state == SYNC_HAS_BIG) + if (PRIV(op)->sync_state == SYNC_HAS_BIG) { + for (; PRIV(op)->sync_big_checked < PRIV(op)->sync_big_writes.size(); PRIV(op)->sync_big_checked++) + { + if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_big_writes[PRIV(op)->sync_big_checked]].state)) + { + // Wait for big inflight writes to complete + return 0; + } + } // 1st step: fsync data if (!disable_data_fsync) { - // FIXME Wait for completion of writes before issuing an fsync BS_SUBMIT_GET_SQE(sqe, data); my_uring_prep_fsync(sqe, data_fd, IORING_FSYNC_DATASYNC); data->iov = { 0 }; @@ -91,6 +86,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0; PRIV(op)->pending_ops = 1; PRIV(op)->sync_state = SYNC_DATA_SYNC_SENT; + return 1; } else { @@ -99,6 +95,14 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) } if (PRIV(op)->sync_state == SYNC_DATA_SYNC_DONE) { + for (; PRIV(op)->sync_small_checked < PRIV(op)->sync_small_writes.size(); PRIV(op)->sync_small_checked++) + { + if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_small_writes[PRIV(op)->sync_small_checked]].state)) + { + // Wait for small inflight writes to complete + return 0; + } + } // 2nd step: Data device is synced, prepare & write journal entries // Check space in the journal and journal memory buffers blockstore_journal_check_t space_check(this); @@ -107,8 +111,8 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) return 0; } // Get SQEs. 
Don't bother about merging, submit each journal sector as a separate request - struct io_uring_sqe *sqe[space_check.sectors_required + (disable_journal_fsync ? 0 : 1)]; - for (int i = 0; i < space_check.sectors_required + (disable_journal_fsync ? 0 : 1); i++) + struct io_uring_sqe *sqe[space_check.sectors_required]; + for (int i = 0; i < space_check.sectors_required; i++) { BS_SUBMIT_GET_SQE_DECL(sqe[i]); } @@ -150,22 +154,26 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) } } PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector; - // ... And a journal fsync + PRIV(op)->pending_ops = s; + PRIV(op)->sync_state = SYNC_JOURNAL_WRITE_SENT; + return 1; + } + if (PRIV(op)->sync_state == SYNC_JOURNAL_WRITE_DONE) + { if (!disable_journal_fsync) { - // FIXME Wait for completion of writes before issuing an fsync - my_uring_prep_fsync(sqe[s], journal.fd, IORING_FSYNC_DATASYNC); - struct ring_data_t *data = ((ring_data_t*)sqe[s]->user_data); + BS_SUBMIT_GET_SQE(sqe, data); + my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC); data->iov = { 0 }; data->callback = cb; - PRIV(op)->pending_ops = 1 + s; + PRIV(op)->pending_ops = 1; + PRIV(op)->sync_state = SYNC_JOURNAL_SYNC_SENT; + return 1; } else { - PRIV(op)->pending_ops = s; + PRIV(op)->sync_state = SYNC_DONE; } - PRIV(op)->sync_state = SYNC_JOURNAL_SYNC_SENT; - ringloop->submit(); } return 1; } @@ -190,6 +198,10 @@ void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op { PRIV(op)->sync_state = SYNC_DATA_SYNC_DONE; } + else if (PRIV(op)->sync_state == SYNC_JOURNAL_WRITE_SENT) + { + PRIV(op)->sync_state = SYNC_JOURNAL_WRITE_DONE; + } else if (PRIV(op)->sync_state == SYNC_JOURNAL_SYNC_SENT) { PRIV(op)->sync_state = SYNC_DONE;