From 4b05bde3a2efde3b0b7594b90f68ea4377bbd777 Mon Sep 17 00:00:00 2001
From: Vitaliy Filippov
Date: Fri, 10 Jan 2020 20:05:17 +0300
Subject: [PATCH] Block writes earlier than sync/stabilize would be blocked,
 too

---
 blockstore.cpp       |  7 ++++++-
 blockstore.h         |  4 ++++
 blockstore_impl.cpp  | 22 +++++++++++++++++-----
 blockstore_impl.h    |  2 +-
 blockstore_journal.h |  3 +++
 blockstore_write.cpp | 12 +++++++-----
 6 files changed, 38 insertions(+), 12 deletions(-)

diff --git a/blockstore.cpp b/blockstore.cpp
index 97cd179ec..e9f2c6e47 100644
--- a/blockstore.cpp
+++ b/blockstore.cpp
@@ -32,7 +32,12 @@ bool blockstore_t::is_safe_to_stop()
 
 void blockstore_t::enqueue_op(blockstore_op_t *op)
 {
-    impl->enqueue_op(op);
+    impl->enqueue_op(op, false);
+}
+
+void blockstore_t::enqueue_op_first(blockstore_op_t *op)
+{
+    impl->enqueue_op(op, true);
 }
 
 std::map<object_id, uint64_t> & blockstore_t::get_unstable_writes()
diff --git a/blockstore.h b/blockstore.h
index 96d88b79d..8f5f76196 100644
--- a/blockstore.h
+++ b/blockstore.h
@@ -93,6 +93,10 @@ public:
     // Submission
     void enqueue_op(blockstore_op_t *op);
 
+    // Insert operation into the beginning of the queue
+    // Intended for the OSD syncer "thread" to be able to stabilize something when the journal is full
+    void enqueue_op_first(blockstore_op_t *op);
+
     // Unstable writes are added here (map of object_id -> version)
     std::map<object_id, uint64_t> & get_unstable_writes();
 
diff --git a/blockstore_impl.cpp b/blockstore_impl.cpp
index 2c6660452..e1d994289 100644
--- a/blockstore_impl.cpp
+++ b/blockstore_impl.cpp
@@ -289,12 +289,17 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op)
     }
 }
 
-void blockstore_impl_t::enqueue_op(blockstore_op_t *op)
+void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
 {
     int type = op->opcode & BS_OP_TYPE_MASK;
-    if (type < BS_OP_MIN || type > BS_OP_MAX || (type == BS_OP_READ || type == BS_OP_WRITE) &&
-        (op->offset >= block_size || op->len > block_size-op->offset || (op->len % DISK_ALIGNMENT)) ||
-        readonly && type != BS_OP_READ)
+    if (type < BS_OP_MIN || type > BS_OP_MAX ||
+        ((type == BS_OP_READ || type == BS_OP_WRITE) && (
+            op->offset >= block_size ||
+            op->len > block_size-op->offset ||
+            (op->len % DISK_ALIGNMENT)
+        )) ||
+        readonly && type != BS_OP_READ ||
+        first && type == BS_OP_WRITE)
     {
         // Basic verification not passed
         op->retval = -EINVAL;
@@ -313,7 +318,14 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op)
     PRIV(op)->wait_for = 0;
     PRIV(op)->sync_state = 0;
     PRIV(op)->pending_ops = 0;
-    submit_queue.push_back(op);
+    if (!first)
+    {
+        submit_queue.push_back(op);
+    }
+    else
+    {
+        submit_queue.push_front(op);
+    }
     if (type == BS_OP_WRITE)
     {
         enqueue_write(op);
diff --git a/blockstore_impl.h b/blockstore_impl.h
index b8fe39b41..ce9367c1c 100644
--- a/blockstore_impl.h
+++ b/blockstore_impl.h
@@ -286,7 +286,7 @@ public:
     bool is_stalled();
 
     // Submission
-    void enqueue_op(blockstore_op_t *op);
+    void enqueue_op(blockstore_op_t *op, bool first = false);
 
     // Unstable writes are added here (map of object_id -> version)
     std::map<object_id, uint64_t> unstable_writes;
diff --git a/blockstore_journal.h b/blockstore_journal.h
index f4b01dd60..b181a546e 100644
--- a/blockstore_journal.h
+++ b/blockstore_journal.h
@@ -6,6 +6,9 @@
 #define JOURNAL_MAGIC 0x4A33
 #define JOURNAL_BUFFER_SIZE 4*1024*1024
 
+// We reserve some extra space for future stabilize requests during writes
+#define JOURNAL_STABILIZE_RESERVATION 65536
+
 // Journal entries
 // Journal entries are linked to each other by their crc32 value
 // The journal is almost a blockchain, because object versions constantly increase
diff --git a/blockstore_write.cpp b/blockstore_write.cpp
index 515a5773d..c14e85a66 100644
--- a/blockstore_write.cpp
+++ b/blockstore_write.cpp
@@ -79,6 +79,11 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
     });
     if (dirty_it->second.state == ST_D_IN_FLIGHT)
     {
+        blockstore_journal_check_t space_check(this);
+        if (!space_check.check_available(op, unsynced_big_writes.size() + 1, sizeof(journal_entry_big_write), JOURNAL_STABILIZE_RESERVATION))
+        {
+            return 0;
+        }
         // Big (redirect) write
         uint64_t loc = data_alloc->find_free();
         if (loc == UINT64_MAX)
@@ -137,12 +142,9 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
         // Small (journaled) write
         // First check if the journal has sufficient space
         // FIXME Always two SQEs for now. Although it's possible to send 1 sometimes
-        //two_sqes = (JOURNAL_BLOCK_SIZE - journal.in_sector_pos < sizeof(struct journal_entry_small_write)
-        //    ? (journal.len - next_pos < op->len)
-        //    : (journal.sector_info[journal.cur_sector].offset + JOURNAL_BLOCK_SIZE != journal.next_free ||
-        //       journal.len - next_pos < op->len);
         blockstore_journal_check_t space_check(this);
-        if (!space_check.check_available(op, 1, sizeof(journal_entry_small_write), op->len))
+        if (unsynced_big_writes.size() && !space_check.check_available(op, unsynced_big_writes.size(), sizeof(journal_entry_big_write), 0)
+            || !space_check.check_available(op, 1, sizeof(journal_entry_small_write), op->len + JOURNAL_STABILIZE_RESERVATION))
        {
             return 0;
         }
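
For context, a minimal sketch of how the OSD syncer mentioned in the blockstore.h comment might use the new entry point. This is not part of the patch: BS_OP_STABLE, obj_ver_id, and the buf/len/callback fields of blockstore_op_t are assumed from the surrounding blockstore API, which this diff does not show.

    #include "blockstore.h"

    // Hypothetical helper: push a stabilize request to the front of the queue so it
    // can free journal space even while writes are blocked or rejected.
    // BS_OP_STABLE, obj_ver_id and the buf/len/callback fields are assumptions here.
    static void stabilize_now(blockstore_t *bs, obj_ver_id *versions, unsigned count)
    {
        blockstore_op_t *op = new blockstore_op_t;
        op->opcode = BS_OP_STABLE;
        op->buf = versions;   // assumed payload: array of versions to stabilize
        op->len = count;      // assumed: number of entries in that array
        op->callback = [](blockstore_op_t *op) { delete op; };
        // enqueue_op_first() places the op at the head of submit_queue; note that the
        // patch makes enqueue_op(op, true) reject BS_OP_WRITE with -EINVAL, so only
        // sync/stabilize-style operations may jump the queue this way.
        bs->enqueue_op_first(op);
    }

The write-side changes pair with this: dequeue_write() now refuses to start a write unless the journal still has room for the pending big-write entries plus JOURNAL_STABILIZE_RESERVATION, so a later stabilize submitted via enqueue_op_first() should not be starved of journal space.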