From f93491bc6c854f00d8cc14e4e3d9a5217ce18bf6 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Wed, 15 Dec 2021 02:43:12 +0300 Subject: [PATCH] Implement journal write batching and slightly refactor journal writes Slightly reduces WA. For example, in 4K T1Q128 replicated randwrite tests WA is reduced from ~3.6 to ~3.1, in T1Q64 from ~3.8 to ~3.4. Only effective without no_same_sector_overwrites. --- src/blockstore_impl.cpp | 6 +++ src/blockstore_impl.h | 17 ++++++-- src/blockstore_journal.cpp | 79 ++++++++++++++++++++++++++++++------- src/blockstore_journal.h | 19 ++++++++- src/blockstore_rollback.cpp | 61 ++++++---------------------- src/blockstore_stable.cpp | 61 ++++++---------------------- src/blockstore_sync.cpp | 69 ++++++-------------------------- src/blockstore_write.cpp | 70 ++++++++++++-------------------- 8 files changed, 163 insertions(+), 219 deletions(-) diff --git a/src/blockstore_impl.cpp b/src/blockstore_impl.cpp index 474cdecb..299e77ec 100644 --- a/src/blockstore_impl.cpp +++ b/src/blockstore_impl.cpp @@ -235,6 +235,12 @@ void blockstore_impl_t::loop() { throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret)); } + for (auto s: journal.submitting_sectors) + { + // Mark journal sector writes as submitted + journal.sector_info[s].submit_id = 0; + } + journal.submitting_sectors.clear(); if ((initial_ring_space - ringloop->space_left()) > 0) { live = true; diff --git a/src/blockstore_impl.h b/src/blockstore_impl.h index 4a1edfe4..f1bd6c32 100644 --- a/src/blockstore_impl.h +++ b/src/blockstore_impl.h @@ -54,6 +54,14 @@ #define IS_BIG_WRITE(st) (((st) & 0x0F) == BS_ST_BIG_WRITE) #define IS_DELETE(st) (((st) & 0x0F) == BS_ST_DELETE) +#define BS_SUBMIT_CHECK_SQES(n) \ + if (ringloop->space_left() < (n))\ + {\ + /* Pause until there are more requests available */\ + PRIV(op)->wait_for = WAIT_SQE;\ + return 0;\ + } + #define BS_SUBMIT_GET_SQE(sqe, data) \ BS_SUBMIT_GET_ONLY_SQE(sqe); \ struct ring_data_t *data = 
((ring_data_t*)sqe->user_data) @@ -170,7 +178,7 @@ struct blockstore_op_private_t std::vector read_vec; // Sync, write - uint64_t min_flushed_journal_sector, max_flushed_journal_sector; + int min_flushed_journal_sector, max_flushed_journal_sector; // Write struct iovec iov_zerofill[3]; @@ -283,6 +291,10 @@ class blockstore_impl_t void open_journal(); uint8_t* get_clean_entry_bitmap(uint64_t block_loc, int offset); + // Journaling + void prepare_journal_sector_write(int sector, blockstore_op_t *op); + void handle_journal_write(ring_data_t *data, uint64_t flush_id); + // Asynchronous init int initialized; int metadata_buf_size; @@ -310,21 +322,18 @@ class blockstore_impl_t // Sync int continue_sync(blockstore_op_t *op, bool queue_has_in_progress_sync); - void handle_sync_event(ring_data_t *data, blockstore_op_t *op); void ack_sync(blockstore_op_t *op); // Stabilize int dequeue_stable(blockstore_op_t *op); int continue_stable(blockstore_op_t *op); void mark_stable(const obj_ver_id & ov, bool forget_dirty = false); - void handle_stable_event(ring_data_t *data, blockstore_op_t *op); void stabilize_object(object_id oid, uint64_t max_ver); // Rollback int dequeue_rollback(blockstore_op_t *op); int continue_rollback(blockstore_op_t *op); void mark_rolled_back(const obj_ver_id & ov); - void handle_rollback_event(ring_data_t *data, blockstore_op_t *op); void erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc); // List diff --git a/src/blockstore_journal.cpp b/src/blockstore_journal.cpp index 7ea2adc3..abe07da3 100644 --- a/src/blockstore_journal.cpp +++ b/src/blockstore_journal.cpp @@ -153,22 +153,73 @@ journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, return je; } -void prepare_journal_sector_write(journal_t & journal, int cur_sector, io_uring_sqe *sqe, std::function cb) +void blockstore_impl_t::prepare_journal_sector_write(int cur_sector, blockstore_op_t *op) { + // Don't 
submit the same sector twice in the same batch + if (!journal.sector_info[cur_sector].submit_id) + { + io_uring_sqe *sqe = get_sqe(); + // Caller must ensure availability of an SQE + assert(sqe != NULL); + ring_data_t *data = ((ring_data_t*)sqe->user_data); + journal.sector_info[cur_sector].written = true; + journal.sector_info[cur_sector].submit_id = ++journal.submit_id; + journal.submitting_sectors.push_back(cur_sector); + journal.sector_info[cur_sector].flush_count++; + data->iov = (struct iovec){ + (journal.inmemory + ? journal.buffer + journal.sector_info[cur_sector].offset + : journal.sector_buf + journal.block_size*cur_sector), + journal.block_size + }; + data->callback = [this, flush_id = journal.submit_id](ring_data_t *data) { handle_journal_write(data, flush_id); }; + my_uring_prep_writev( + sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[cur_sector].offset + ); + } journal.sector_info[cur_sector].dirty = false; - journal.sector_info[cur_sector].written = true; - journal.sector_info[cur_sector].flush_count++; - ring_data_t *data = ((ring_data_t*)sqe->user_data); - data->iov = (struct iovec){ - (journal.inmemory - ? 
journal.buffer + journal.sector_info[cur_sector].offset - : journal.sector_buf + journal.block_size*cur_sector), - journal.block_size - }; - data->callback = cb; - my_uring_prep_writev( - sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[cur_sector].offset - ); + // But always remember that this operation has to wait until this exact journal write is finished + journal.flushing_ops.insert((pending_journaling_t){ + .flush_id = journal.sector_info[cur_sector].submit_id, + .sector = cur_sector, + .op = op, + }); + auto priv = PRIV(op); + priv->pending_ops++; + if (!priv->min_flushed_journal_sector) + priv->min_flushed_journal_sector = 1+cur_sector; + priv->max_flushed_journal_sector = 1+cur_sector; +} + +void blockstore_impl_t::handle_journal_write(ring_data_t *data, uint64_t flush_id) +{ + live = true; + if (data->res != data->iov.iov_len) + { + // FIXME: our state becomes corrupted after a write error. maybe do something better than just die + throw std::runtime_error( + "journal write failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+ + "). in-memory state is corrupted. 
AAAAAAAaaaaaaaaa!!!111" + ); + } + auto fl_it = journal.flushing_ops.upper_bound((pending_journaling_t){ .flush_id = flush_id }); + if (fl_it != journal.flushing_ops.end() && fl_it->flush_id == flush_id) + { + journal.sector_info[fl_it->sector].flush_count--; + } + while (fl_it != journal.flushing_ops.end() && fl_it->flush_id == flush_id) + { + auto priv = PRIV(fl_it->op); + priv->pending_ops--; + assert(priv->pending_ops >= 0); + if (priv->pending_ops == 0) + { + release_journal_sectors(fl_it->op); + priv->op_state++; + ringloop->wakeup(); + } + journal.flushing_ops.erase(fl_it++); + } } journal_t::~journal_t() diff --git a/src/blockstore_journal.h b/src/blockstore_journal.h index 927ead75..0f401ff1 100644 --- a/src/blockstore_journal.h +++ b/src/blockstore_journal.h @@ -4,6 +4,7 @@ #pragma once #include "crc32c.h" +#include <set> #define MIN_JOURNAL_SIZE 4*1024*1024 #define JOURNAL_MAGIC 0x4A33 @@ -145,8 +146,21 @@ struct journal_sector_info_t uint64_t flush_count; bool written; bool dirty; + uint64_t submit_id; }; +struct pending_journaling_t +{ + uint64_t flush_id; + int sector; + blockstore_op_t *op; +}; + +inline bool operator < (const pending_journaling_t & a, const pending_journaling_t & b) +{ + return a.flush_id < b.flush_id || a.flush_id == b.flush_id && a.op < b.op; +} + struct journal_t { int fd; @@ -172,6 +186,9 @@ struct journal_t bool no_same_sector_overwrites = false; int cur_sector = 0; int in_sector_pos = 0; + std::vector<int> submitting_sectors; + std::set<pending_journaling_t> flushing_ops; + uint64_t submit_id = 0; // Used sector map // May use ~ 80 MB per 1 GB of used journal space in the worst case @@ -200,5 +217,3 @@ struct blockstore_journal_check_t }; journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size); - -void prepare_journal_sector_write(journal_t & journal, int sector, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb); diff --git a/src/blockstore_rollback.cpp b/src/blockstore_rollback.cpp index ed17a1d4..3349c761 100644 --- 
a/src/blockstore_rollback.cpp +++ b/src/blockstore_rollback.cpp @@ -74,24 +74,17 @@ skip_ov: { return 0; } - // There is sufficient space. Get SQEs - struct io_uring_sqe *sqe[space_check.sectors_to_write]; - for (i = 0; i < space_check.sectors_to_write; i++) - { - BS_SUBMIT_GET_SQE_DECL(sqe[i]); - } + // There is sufficient space. Check SQEs + BS_SUBMIT_CHECK_SQES(space_check.sectors_to_write); // Prepare and submit journal entries - auto cb = [this, op](ring_data_t *data) { handle_rollback_event(data, op); }; - int s = 0, cur_sector = -1; + int s = 0; for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) { if (!journal.entry_fits(sizeof(journal_entry_rollback)) && journal.sector_info[journal.cur_sector].dirty) { - if (cur_sector == -1) - PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector; - prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb); - cur_sector = journal.cur_sector; + prepare_journal_sector_write(journal.cur_sector, op); + s++; } journal_entry_rollback *je = (journal_entry_rollback*) prefill_single_journal_entry(journal, JE_ROLLBACK, sizeof(journal_entry_rollback)); @@ -100,12 +93,9 @@ skip_ov: je->crc32 = je_crc32((journal_entry*)je); journal.crc32_last = je->crc32; } - prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb); + prepare_journal_sector_write(journal.cur_sector, op); + s++; assert(s == space_check.sectors_to_write); - if (cur_sector == -1) - PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector; - PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; - PRIV(op)->pending_ops = s; PRIV(op)->op_state = 1; return 1; } @@ -114,30 +104,23 @@ int blockstore_impl_t::continue_rollback(blockstore_op_t *op) { if (PRIV(op)->op_state == 2) goto resume_2; - else if (PRIV(op)->op_state == 3) - goto resume_3; - else if (PRIV(op)->op_state == 5) - goto resume_5; + else if (PRIV(op)->op_state == 4) + goto resume_4; else return 1; resume_2: - // Release used journal sectors - 
release_journal_sectors(op); -resume_3: if (!disable_journal_fsync) { - io_uring_sqe *sqe; - BS_SUBMIT_GET_SQE_DECL(sqe); - ring_data_t *data = ((ring_data_t*)sqe->user_data); + BS_SUBMIT_GET_SQE(sqe, data); my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC); data->iov = { 0 }; - data->callback = [this, op](ring_data_t *data) { handle_rollback_event(data, op); }; + data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); }; PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0; PRIV(op)->pending_ops = 1; - PRIV(op)->op_state = 4; + PRIV(op)->op_state = 3; return 1; } -resume_5: +resume_4: obj_ver_id* v; int i; for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) @@ -196,24 +179,6 @@ void blockstore_impl_t::mark_rolled_back(const obj_ver_id & ov) } } -void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t *op) -{ - live = true; - if (data->res != data->iov.iov_len) - { - throw std::runtime_error( - "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+ - "). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111" - ); - } - PRIV(op)->pending_ops--; - if (PRIV(op)->pending_ops == 0) - { - PRIV(op)->op_state++; - ringloop->wakeup(); - } -} - void blockstore_impl_t::erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc) { if (dirty_end == dirty_start) diff --git a/src/blockstore_stable.cpp b/src/blockstore_stable.cpp index ba0c552a..230023bc 100644 --- a/src/blockstore_stable.cpp +++ b/src/blockstore_stable.cpp @@ -97,25 +97,18 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) { return 0; } - // There is sufficient space. Get SQEs - struct io_uring_sqe *sqe[space_check.sectors_to_write]; - for (i = 0; i < space_check.sectors_to_write; i++) - { - BS_SUBMIT_GET_SQE_DECL(sqe[i]); - } + // There is sufficient space. 
Check SQEs + BS_SUBMIT_CHECK_SQES(space_check.sectors_to_write); // Prepare and submit journal entries - auto cb = [this, op](ring_data_t *data) { handle_stable_event(data, op); }; - int s = 0, cur_sector = -1; + int s = 0; for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) { // FIXME: Only stabilize versions that aren't stable yet if (!journal.entry_fits(sizeof(journal_entry_stable)) && journal.sector_info[journal.cur_sector].dirty) { - if (cur_sector == -1) - PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector; - prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb); - cur_sector = journal.cur_sector; + prepare_journal_sector_write(journal.cur_sector, op); + s++; } journal_entry_stable *je = (journal_entry_stable*) prefill_single_journal_entry(journal, JE_STABLE, sizeof(journal_entry_stable)); @@ -124,12 +117,9 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) je->crc32 = je_crc32((journal_entry*)je); journal.crc32_last = je->crc32; } - prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb); + prepare_journal_sector_write(journal.cur_sector, op); + s++; assert(s == space_check.sectors_to_write); - if (cur_sector == -1) - PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector; - PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; - PRIV(op)->pending_ops = s; PRIV(op)->op_state = 1; return 1; } @@ -138,30 +128,23 @@ int blockstore_impl_t::continue_stable(blockstore_op_t *op) { if (PRIV(op)->op_state == 2) goto resume_2; - else if (PRIV(op)->op_state == 3) - goto resume_3; - else if (PRIV(op)->op_state == 5) - goto resume_5; + else if (PRIV(op)->op_state == 4) + goto resume_4; else return 1; resume_2: - // Release used journal sectors - release_journal_sectors(op); -resume_3: if (!disable_journal_fsync) { - io_uring_sqe *sqe; - BS_SUBMIT_GET_SQE_DECL(sqe); - ring_data_t *data = ((ring_data_t*)sqe->user_data); + BS_SUBMIT_GET_SQE(sqe, data); my_uring_prep_fsync(sqe, journal.fd, 
IORING_FSYNC_DATASYNC); data->iov = { 0 }; - data->callback = [this, op](ring_data_t *data) { handle_stable_event(data, op); }; + data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); }; PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0; PRIV(op)->pending_ops = 1; - PRIV(op)->op_state = 4; + PRIV(op)->op_state = 3; return 1; } -resume_5: +resume_4: // Mark dirty_db entries as stable, acknowledge op completion obj_ver_id* v; int i; @@ -257,21 +240,3 @@ void blockstore_impl_t::mark_stable(const obj_ver_id & v, bool forget_dirty) unstable_writes.erase(unstab_it); } } - -void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t *op) -{ - live = true; - if (data->res != data->iov.iov_len) - { - throw std::runtime_error( - "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+ - "). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111" - ); - } - PRIV(op)->pending_ops--; - if (PRIV(op)->pending_ops == 0) - { - PRIV(op)->op_state++; - ringloop->wakeup(); - } -} diff --git a/src/blockstore_sync.cpp b/src/blockstore_sync.cpp index 5c939b08..ce238780 100644 --- a/src/blockstore_sync.cpp +++ b/src/blockstore_sync.cpp @@ -44,10 +44,8 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog if (journal.sector_info[journal.cur_sector].dirty) { // Write out the last journal sector if it happens to be dirty - BS_SUBMIT_GET_ONLY_SQE(sqe); - prepare_journal_sector_write(journal, journal.cur_sector, sqe, [this, op](ring_data_t *data) { handle_sync_event(data, op); }); - PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; - PRIV(op)->pending_ops = 1; + BS_SUBMIT_CHECK_SQES(1); + prepare_journal_sector_write(journal.cur_sector, op); PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT; return 1; } @@ -64,7 +62,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog 
BS_SUBMIT_GET_SQE(sqe, data); my_uring_prep_fsync(sqe, data_fd, IORING_FSYNC_DATASYNC); data->iov = { 0 }; - data->callback = [this, op](ring_data_t *data) { handle_sync_event(data, op); }; + data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); }; PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0; PRIV(op)->pending_ops = 1; PRIV(op)->op_state = SYNC_DATA_SYNC_SENT; @@ -85,24 +83,18 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog { return 0; } - // Get SQEs. Don't bother about merging, submit each journal sector as a separate request - struct io_uring_sqe *sqe[space_check.sectors_to_write]; - for (int i = 0; i < space_check.sectors_to_write; i++) - { - BS_SUBMIT_GET_SQE_DECL(sqe[i]); - } + // Check SQEs. Don't bother about merging, submit each journal sector as a separate request + BS_SUBMIT_CHECK_SQES(space_check.sectors_to_write); // Prepare and submit journal entries auto it = PRIV(op)->sync_big_writes.begin(); - int s = 0, cur_sector = -1; + int s = 0; while (it != PRIV(op)->sync_big_writes.end()) { if (!journal.entry_fits(sizeof(journal_entry_big_write) + clean_entry_bitmap_size) && journal.sector_info[journal.cur_sector].dirty) { - if (cur_sector == -1) - PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector; - prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], [this, op](ring_data_t *data) { handle_sync_event(data, op); }); - cur_sector = journal.cur_sector; + prepare_journal_sector_write(journal.cur_sector, op); + s++; } auto & dirty_entry = dirty_db.at(*it); journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry( @@ -129,12 +121,9 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog journal.crc32_last = je->crc32; it++; } - prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], [this, op](ring_data_t *data) { handle_sync_event(data, op); }); + 
prepare_journal_sector_write(journal.cur_sector, op); + s++; assert(s == space_check.sectors_to_write); - if (cur_sector == -1) - PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector; - PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; - PRIV(op)->pending_ops = s; PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT; return 1; } @@ -145,7 +134,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog BS_SUBMIT_GET_SQE(sqe, data); my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC); data->iov = { 0 }; - data->callback = [this, op](ring_data_t *data) { handle_sync_event(data, op); }; + data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); }; PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0; PRIV(op)->pending_ops = 1; PRIV(op)->op_state = SYNC_JOURNAL_SYNC_SENT; @@ -164,42 +153,6 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog return 1; } -void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op) -{ - live = true; - if (data->res != data->iov.iov_len) - { - throw std::runtime_error( - "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+ - "). in-memory state is corrupted. 
AAAAAAAaaaaaaaaa!!!111" - ); - } - PRIV(op)->pending_ops--; - if (PRIV(op)->pending_ops == 0) - { - // Release used journal sectors - release_journal_sectors(op); - // Handle states - if (PRIV(op)->op_state == SYNC_DATA_SYNC_SENT) - { - PRIV(op)->op_state = SYNC_DATA_SYNC_DONE; - } - else if (PRIV(op)->op_state == SYNC_JOURNAL_WRITE_SENT) - { - PRIV(op)->op_state = SYNC_JOURNAL_WRITE_DONE; - } - else if (PRIV(op)->op_state == SYNC_JOURNAL_SYNC_SENT) - { - PRIV(op)->op_state = SYNC_DONE; - } - else - { - throw std::runtime_error("BUG: unexpected sync op state"); - } - ringloop->wakeup(); - } -} - void blockstore_impl_t::ack_sync(blockstore_op_t *op) { // Handle states diff --git a/src/blockstore_write.cpp b/src/blockstore_write.cpp index 07a3b61f..dea0beac 100644 --- a/src/blockstore_write.cpp +++ b/src/blockstore_write.cpp @@ -268,8 +268,8 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) cancel_all_writes(op, dirty_it, -ENOSPC); return 2; } - write_iodepth++; BS_SUBMIT_GET_SQE(sqe, data); + write_iodepth++; dirty_it->second.location = loc << block_order; dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SUBMITTED; #ifdef BLOCKSTORE_DEBUG @@ -324,29 +324,21 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) { return 0; } - write_iodepth++; - // There is sufficient space. Get SQE(s) - struct io_uring_sqe *sqe1 = NULL; - if (immediate_commit != IMMEDIATE_NONE || - !journal.entry_fits(sizeof(journal_entry_small_write) + clean_entry_bitmap_size)) - { + // There is sufficient space. Check SQE(s) + BS_SUBMIT_CHECK_SQES( // Write current journal sector only if it's dirty and full, or in the immediate_commit mode - BS_SUBMIT_GET_SQE_DECL(sqe1); - } - struct io_uring_sqe *sqe2 = NULL; - if (op->len > 0) - { - BS_SUBMIT_GET_SQE_DECL(sqe2); - } + (immediate_commit != IMMEDIATE_NONE || + !journal.entry_fits(sizeof(journal_entry_small_write) + clean_entry_bitmap_size) ? 1 : 0) + + (op->len > 0 ? 
1 : 0) + ); + write_iodepth++; // Got SQEs. Prepare previous journal sector write if required auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); }; if (immediate_commit == IMMEDIATE_NONE) { - if (sqe1) + if (!journal.entry_fits(sizeof(journal_entry_small_write) + clean_entry_bitmap_size)) { - prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb); - PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; - PRIV(op)->pending_ops++; + prepare_journal_sector_write(journal.cur_sector, op); } else { @@ -380,9 +372,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) journal.crc32_last = je->crc32; if (immediate_commit != IMMEDIATE_NONE) { - prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb); - PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; - PRIV(op)->pending_ops++; + prepare_journal_sector_write(journal.cur_sector, op); } if (op->len > 0) { @@ -392,7 +382,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) // Copy data memcpy(journal.buffer + journal.next_free, op->buf, op->len); } - ring_data_t *data2 = ((ring_data_t*)sqe2->user_data); + BS_SUBMIT_GET_SQE(sqe2, data2); data2->iov = (struct iovec){ op->buf, op->len }; data2->callback = cb; my_uring_prep_writev( @@ -441,13 +431,12 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op) resume_2: // Only for the immediate_commit mode: prepare and submit big_write journal entry { + BS_SUBMIT_CHECK_SQES(1); auto dirty_it = dirty_db.find((obj_ver_id){ .oid = op->oid, .version = op->version, }); assert(dirty_it != dirty_db.end()); - io_uring_sqe *sqe = NULL; - BS_SUBMIT_GET_SQE_DECL(sqe); journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry( journal, op->opcode == BS_OP_WRITE_STABLE ? 
JE_BIG_WRITE_INSTANT : JE_BIG_WRITE, sizeof(journal_entry_big_write) + clean_entry_bitmap_size @@ -469,10 +458,7 @@ resume_2: memcpy((void*)(je+1), (clean_entry_bitmap_size > sizeof(void*) ? dirty_it->second.bitmap : &dirty_it->second.bitmap), clean_entry_bitmap_size); je->crc32 = je_crc32((journal_entry*)je); journal.crc32_last = je->crc32; - prepare_journal_sector_write(journal, journal.cur_sector, sqe, - [this, op](ring_data_t *data) { handle_write_event(data, op); }); - PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; - PRIV(op)->pending_ops = 1; + prepare_journal_sector_write(journal.cur_sector, op); PRIV(op)->op_state = 3; return 1; } @@ -587,6 +573,7 @@ void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *o ); } PRIV(op)->pending_ops--; + assert(PRIV(op)->pending_ops >= 0); if (PRIV(op)->pending_ops == 0) { release_journal_sectors(op); @@ -604,7 +591,6 @@ void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op) uint64_t s = PRIV(op)->min_flushed_journal_sector; while (1) { - journal.sector_info[s-1].flush_count--; if (s != (1+journal.cur_sector) && journal.sector_info[s-1].flush_count == 0) { // We know for sure that we won't write into this sector anymore @@ -644,23 +630,19 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op) return 0; } write_iodepth++; - io_uring_sqe *sqe = NULL; - if (immediate_commit != IMMEDIATE_NONE || - (journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_del) && - journal.sector_info[journal.cur_sector].dirty) - { - // Write current journal sector only if it's dirty and full, or in the immediate_commit mode - BS_SUBMIT_GET_SQE_DECL(sqe); - } - auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); }; + // Write current journal sector only if it's dirty and full, or in the immediate_commit mode + BS_SUBMIT_CHECK_SQES( + (immediate_commit != IMMEDIATE_NONE || + (journal_block_size - journal.in_sector_pos) < 
sizeof(journal_entry_del) && + journal.sector_info[journal.cur_sector].dirty) ? 1 : 0 + ); // Prepare journal sector write if (immediate_commit == IMMEDIATE_NONE) { - if (sqe) + if ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_del) && + journal.sector_info[journal.cur_sector].dirty) { - prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb); - PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; - PRIV(op)->pending_ops++; + prepare_journal_sector_write(journal.cur_sector, op); } else { @@ -687,9 +669,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op) dirty_it->second.state = BS_ST_DELETE | BS_ST_SUBMITTED; if (immediate_commit != IMMEDIATE_NONE) { - prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb); - PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; - PRIV(op)->pending_ops++; + prepare_journal_sector_write(journal.cur_sector, op); } if (!PRIV(op)->pending_ops) {