diff --git a/src/blockstore_journal.cpp b/src/blockstore_journal.cpp index 687bee82..78cad803 100644 --- a/src/blockstore_journal.cpp +++ b/src/blockstore_journal.cpp @@ -177,6 +177,7 @@ void blockstore_impl_t::prepare_journal_sector_write(int cur_sector, blockstore_ ring_data_t *data = ((ring_data_t*)sqe->user_data); journal.sector_info[cur_sector].written = true; journal.sector_info[cur_sector].submit_id = ++journal.submit_id; + assert(journal.submit_id != 0); // check overflow journal.submitting_sectors.push_back(cur_sector); journal.sector_info[cur_sector].flush_count++; data->iov = (struct iovec){ @@ -192,8 +193,8 @@ void blockstore_impl_t::prepare_journal_sector_write(int cur_sector, blockstore_ } journal.sector_info[cur_sector].dirty = false; // But always remember that this operation has to wait until this exact journal write is finished - journal.flushing_ops.insert((pending_journaling_t){ - .flush_id = journal.sector_info[cur_sector].submit_id, + journal.flushing_ops.emplace(journal.sector_info[cur_sector].submit_id, (pending_journaling_t){ + .pending = 1, .sector = cur_sector, .op = op, }); @@ -213,23 +214,43 @@ void blockstore_impl_t::handle_journal_write(ring_data_t *data, uint64_t flush_i // FIXME: our state becomes corrupted after a write error. maybe do something better than just die disk_error_abort("journal write", data->res, data->iov.iov_len); } - auto fl_it = journal.flushing_ops.upper_bound((pending_journaling_t){ .flush_id = flush_id }); - if (fl_it != journal.flushing_ops.end() && fl_it->flush_id == flush_id) + auto fl_it = journal.flushing_ops.lower_bound(flush_id); + if (fl_it != journal.flushing_ops.end() && fl_it->first == flush_id && fl_it->second.sector >= 0) { - journal.sector_info[fl_it->sector].flush_count--; + journal.sector_info[fl_it->second.sector].flush_count--; } - while (fl_it != journal.flushing_ops.end() && fl_it->flush_id == flush_id) + auto is_first = fl_it == journal.flushing_ops.begin(); + while (fl_it != journal.flushing_ops.end()) { - auto priv = PRIV(fl_it->op); - priv->pending_ops--; - assert(priv->pending_ops >= 0); - if (priv->pending_ops == 0) + bool del = false; + if (fl_it->first == flush_id) { - release_journal_sectors(fl_it->op); - priv->op_state++; - ringloop->wakeup(); + fl_it->second.pending = 0; + del = is_first; + } + else + { + del = !fl_it->second.pending; + } + if (del) + { + // Do not complete this operation if previous writes are unfinished + // Otherwise also complete following operations waiting for this one + auto priv = PRIV(fl_it->second.op); + priv->pending_ops--; + assert(priv->pending_ops >= 0); + if (priv->pending_ops == 0) + { + release_journal_sectors(fl_it->second.op); + priv->op_state++; + ringloop->wakeup(); + } + journal.flushing_ops.erase(fl_it++); + } + else + { + fl_it++; } - journal.flushing_ops.erase(fl_it++); } } diff --git a/src/blockstore_journal.h b/src/blockstore_journal.h index 60476304..4fea4d5a 100644 --- a/src/blockstore_journal.h +++ b/src/blockstore_journal.h @@ -156,16 +156,11 @@ struct journal_sector_info_t struct pending_journaling_t { - uint64_t flush_id; + int pending; int sector; blockstore_op_t *op; }; -inline bool operator < (const pending_journaling_t & a, const pending_journaling_t & b) -{ - return a.flush_id < b.flush_id || a.flush_id == b.flush_id && a.op < b.op; -} - struct journal_t { int fd; @@ -191,7 +186,7 @@ struct journal_t int cur_sector = 0; int in_sector_pos = 0; std::vector submitting_sectors; - std::set flushing_ops; + std::multimap flushing_ops; uint64_t submit_id = 0; // Used sector map diff --git a/src/blockstore_write.cpp b/src/blockstore_write.cpp index f393ed05..b77d3602 100644 --- a/src/blockstore_write.cpp +++ b/src/blockstore_write.cpp @@ -427,7 +427,6 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) ); write_iodepth++; // Got SQEs. Prepare previous journal sector write if required - auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); }; if (immediate_commit == IMMEDIATE_NONE && !journal.entry_fits(sizeof(journal_entry_small_write) + dyn_size)) { @@ -503,7 +502,15 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) } BS_SUBMIT_GET_SQE(sqe2, data2); data2->iov = (struct iovec){ op->buf, op->len }; - data2->callback = cb; + ++journal.submit_id; + assert(journal.submit_id != 0); // check overflow + // Make subsequent journal writes wait for our data write + journal.flushing_ops.emplace(journal.submit_id, (pending_journaling_t){ + .pending = 1, + .sector = -1, + .op = op, + }); + data2->callback = [this, flush_id = journal.submit_id](ring_data_t *data) { handle_journal_write(data, flush_id); }; my_uring_prep_writev( sqe2, dsk.journal_fd, &data2->iov, 1, journal.offset + journal.next_free );