From 44656fbf67e2f8bf7e4fce90a0448c849b9f2fe7 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 4 Dec 2020 11:37:01 +0300 Subject: [PATCH] Allow writes with low version numbers after a delete --- blockstore_flush.cpp | 2 +- blockstore_impl.h | 15 ++++--- blockstore_rollback.cpp | 34 +++++++++++++- blockstore_write.cpp | 98 +++++++++++++++++++++++++++++++---------- 4 files changed, 117 insertions(+), 32 deletions(-) diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 5868794dd..f94f359a2 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -105,8 +105,8 @@ void journal_flusher_t::unshift_flush(obj_ver_id ov) else { flush_versions[ov.oid] = ov.version; - flush_queue.push_front(ov.oid); } + flush_queue.push_front(ov.oid); if (!dequeuing && (flush_queue.size() >= flusher_start_threshold || trim_wanted > 0)) { dequeuing = true; diff --git a/blockstore_impl.h b/blockstore_impl.h index 3f8b5c506..662987c90 100644 --- a/blockstore_impl.h +++ b/blockstore_impl.h @@ -30,12 +30,13 @@ #define BS_ST_BIG_WRITE 0x02 #define BS_ST_DELETE 0x03 -#define BS_ST_WAIT_BIG 0x10 -#define BS_ST_IN_FLIGHT 0x20 -#define BS_ST_SUBMITTED 0x30 -#define BS_ST_WRITTEN 0x40 -#define BS_ST_SYNCED 0x50 -#define BS_ST_STABLE 0x60 +#define BS_ST_WAIT_DEL 0x10 +#define BS_ST_WAIT_BIG 0x20 +#define BS_ST_IN_FLIGHT 0x30 +#define BS_ST_SUBMITTED 0x40 +#define BS_ST_WRITTEN 0x50 +#define BS_ST_SYNCED 0x60 +#define BS_ST_STABLE 0x70 #define BS_ST_INSTANT 0x100 @@ -153,6 +154,8 @@ struct blockstore_op_private_t // Write struct iovec iov_zerofill[3]; + // Warning: must not have a default value here because it's written to before calling constructor in blockstore_write.cpp O_o + uint64_t real_version; // Sync std::vector sync_big_writes, sync_small_writes; diff --git a/blockstore_rollback.cpp b/blockstore_rollback.cpp index 5dc52b7b7..ae1b5a1c9 100644 --- a/blockstore_rollback.cpp +++ b/blockstore_rollback.cpp @@ -234,10 +234,35 @@ void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t void blockstore_impl_t::erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc) { - auto dirty_it = dirty_end; - while (dirty_it != dirty_start) + if (dirty_end == dirty_start) { + return; + } + auto dirty_it = dirty_end; + dirty_it--; + if (IS_DELETE(dirty_it->second.state)) + { + object_id oid = dirty_it->first.oid; + dirty_it = dirty_end; + // Unblock operations blocked by delete flushing + uint32_t next_state = BS_ST_IN_FLIGHT; + while (dirty_it != dirty_db.end() && dirty_it->first.oid == oid) + { + if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_DEL) + { + dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | next_state; + if (IS_BIG_WRITE(dirty_it->second.state)) + { + next_state = BS_ST_WAIT_BIG; + } + } + dirty_it++; + } + dirty_it = dirty_end; dirty_it--; + } + while (1) + { if (IS_BIG_WRITE(dirty_it->second.state) && dirty_it->second.location != clean_loc) { #ifdef BLOCKSTORE_DEBUG @@ -256,6 +281,11 @@ void blockstore_impl_t::erase_dirty(blockstore_dirty_db_t::iterator dirty_start, { journal.used_sectors.erase(dirty_it->second.journal_sector); } + if (dirty_it == dirty_start) + { + break; + } + dirty_it--; } dirty_db.erase(dirty_start, dirty_end); } diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 53a3322c8..c43ca239f 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -7,7 +7,7 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) { // Check or assign version number bool found = false, deleted = false, is_del = (op->opcode == BS_OP_DELETE); - bool is_inflight_big = false; + bool wait_big = false, wait_del = false; uint64_t version = 1; if (dirty_db.size() > 0) { @@ -21,7 +21,8 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) found = true; version = dirty_it->first.version + 1; deleted = IS_DELETE(dirty_it->second.state); - is_inflight_big = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE + wait_del = ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_DEL); + wait_big = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE ? !IS_SYNCED(dirty_it->second.state) : ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG); } @@ -38,23 +39,40 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) deleted = true; } } - if (op->version == 0) - { - op->version = version; - } - else if (op->version < version) - { - // Invalid version requested - op->retval = -EEXIST; - return false; - } if (deleted && is_del) { // Already deleted op->retval = 0; return false; } - if (is_inflight_big && !is_del && !deleted && op->len < block_size && + PRIV(op)->real_version = 0; + if (op->version == 0) + { + op->version = version; + } + else if (op->version < version) + { + // Implicit operations must be added like that: DEL [FLUSH] BIG [SYNC] SMALL SMALL + if (deleted || wait_del) + { + // It's allowed to write versions with low numbers over deletes + // However, we have to flush those deletes first as we use version number for ordering + wait_del = true; + PRIV(op)->real_version = op->version; + op->version = version; + flusher->unshift_flush((obj_ver_id){ + .oid = op->oid, + .version = version-1, + }); + } + else + { + // Invalid version requested + op->retval = -EEXIST; + return false; + } + } + if (wait_big && !is_del && !deleted && op->len < block_size && immediate_commit != IMMEDIATE_ALL) { // Issue an additional sync so that the previous big write can reach the journal @@ -72,19 +90,28 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) else printf("Write %lx:%lx v%lu offset=%u len=%u\n", op->oid.inode, op->oid.stripe, op->version, op->offset, op->len); #endif - // No strict need to add it into dirty_db here, it's just left + // FIXME No strict need to add it into dirty_db here, it's just left // from the previous implementation where reads waited for writes + uint32_t state; + if (is_del) + state = BS_ST_DELETE | BS_ST_IN_FLIGHT; + else + { + state = (op->len == block_size || deleted ? BS_ST_BIG_WRITE : BS_ST_SMALL_WRITE); + if (wait_del) + state |= BS_ST_WAIT_DEL; + else if (state == BS_ST_SMALL_WRITE && wait_big) + state |= BS_ST_WAIT_BIG; + else + state |= BS_ST_IN_FLIGHT; + if (op->opcode == BS_OP_WRITE_STABLE) + state |= BS_ST_INSTANT; + } dirty_db.emplace((obj_ver_id){ .oid = op->oid, .version = op->version, }, (dirty_entry){ - .state = (uint32_t)( - is_del - ? (BS_ST_DELETE | BS_ST_IN_FLIGHT) - : (op->opcode == BS_OP_WRITE_STABLE ? BS_ST_INSTANT : 0) | (op->len == block_size || deleted - ? (BS_ST_BIG_WRITE | BS_ST_IN_FLIGHT) - : (is_inflight_big ? (BS_ST_SMALL_WRITE | BS_ST_WAIT_BIG) : (BS_ST_SMALL_WRITE | BS_ST_IN_FLIGHT))) - ), + .state = state, .flags = 0, .location = 0, .offset = is_del ? 0 : op->offset, @@ -106,12 +133,35 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) .version = op->version, }); assert(dirty_it != dirty_db.end()); - if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG) + if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) < BS_ST_IN_FLIGHT) { // Don't dequeue return 0; } - else if ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE) + if (PRIV(op)->real_version != 0) + { + // Restore original low version number for unblocked operations + auto prev_it = dirty_it; + prev_it--; + if (prev_it->first.oid == op->oid && prev_it->first.version >= PRIV(op)->real_version) + { + // Original version is still invalid + // FIXME Oops. Successive small writes will currently break in an unexpected way. Fix it + dirty_db.erase(dirty_it); + op->retval = -EEXIST; + FINISH_OP(op); + return 1; + } + op->version = PRIV(op)->real_version; + PRIV(op)->real_version = 0; + dirty_entry e = dirty_it->second; + dirty_db.erase(dirty_it); + dirty_it = dirty_db.emplace((obj_ver_id){ + .oid = op->oid, + .version = op->version, + }, e).first; + } + if ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE) { blockstore_journal_check_t space_check(this); if (!space_check.check_available(op, unsynced_big_writes.size() + 1, sizeof(journal_entry_big_write), JOURNAL_STABILIZE_RESERVATION)) @@ -129,6 +179,8 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) PRIV(op)->wait_for = WAIT_FREE; return 0; } + // FIXME Oops. Successive small writes will currently break in an unexpected way. Fix it + dirty_db.erase(dirty_it); op->retval = -ENOSPC; FINISH_OP(op); return 1;