From 1cec62d25dd1b7aef6d71cd5719d9679ff0f4094 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 31 Dec 2023 00:41:16 +0300 Subject: [PATCH] Sync only completed writes Should be a final remaining fix to EC + non-capacitor (non-immediate-commit) write hangs :). First it was breaking non-EC ("instantly stable") writes because they sometimes complete out of order which was leading to the following error: terminate called after throwing an instance of 'std::runtime_error' what(): BUG: Unexpected dirty_entry 1000000000001:29480000 v65540 unstable state during flush: 0x151 But it is easily fixed by scanning previous and next dirty_entries in mark_stable. --- src/blockstore_impl.cpp | 16 ++++------------ src/blockstore_impl.h | 4 +++- src/blockstore_rollback.cpp | 1 + src/blockstore_stable.cpp | 32 +++++++++++++++++++++++++++++++- src/blockstore_sync.cpp | 3 +-- src/blockstore_write.cpp | 2 +- 6 files changed, 41 insertions(+), 17 deletions(-) diff --git a/src/blockstore_impl.cpp b/src/blockstore_impl.cpp index 2178395d..55d00960 100644 --- a/src/blockstore_impl.cpp +++ b/src/blockstore_impl.cpp @@ -163,20 +163,10 @@ void blockstore_impl_t::loop() } else if (op->opcode == BS_OP_SYNC) { - // wait for all small writes to be submitted - // wait for all big writes to complete, submit data device fsync + // sync only completed writes? // wait for the data device fsync to complete, then submit journal writes for big writes // then submit an fsync operation - if (has_writes) - { - // Can't submit SYNC before previous writes - continue; - } wr_st = continue_sync(op); - if (wr_st != 2) - { - has_writes = wr_st > 0 ? 1 : 2; - } } else if (op->opcode == BS_OP_STABLE) { @@ -283,7 +273,7 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op) } else if (PRIV(op)->wait_for == WAIT_JOURNAL) { - if (journal.used_start == PRIV(op)->wait_detail) + if (journal.used_start == PRIV(op)->wait_detail && !unstable_count_changed) { // do not submit #ifdef BLOCKSTORE_DEBUG @@ -291,6 +281,7 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op) #endif return; } + unstable_count_changed = false; flusher->release_trim(); PRIV(op)->wait_for = 0; } @@ -362,6 +353,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op) }; } unstable_writes.clear(); + unstable_count_changed = true; op->callback = [old_callback](blockstore_op_t *op) { obj_ver_id *vers = (obj_ver_id*)op->buf; diff --git a/src/blockstore_impl.h b/src/blockstore_impl.h index e20e956c..99f01cef 100644 --- a/src/blockstore_impl.h +++ b/src/blockstore_impl.h @@ -55,6 +55,7 @@ #define IS_JOURNAL(st) (((st) & 0x0F) == BS_ST_SMALL_WRITE) #define IS_BIG_WRITE(st) (((st) & 0x0F) == BS_ST_BIG_WRITE) #define IS_DELETE(st) (((st) & 0x0F) == BS_ST_DELETE) +#define IS_INSTANT(st) (((st) & BS_ST_TYPE_MASK) == BS_ST_DELETE || ((st) & BS_ST_INSTANT)) #define BS_SUBMIT_CHECK_SQES(n) \ if (ringloop->sqes_left() < (n))\ @@ -275,6 +276,7 @@ class blockstore_impl_t std::vector submit_queue; std::vector unsynced_big_writes, unsynced_small_writes; int unsynced_big_write_count = 0, unstable_unsynced = 0; + bool unstable_count_changed = false; int unsynced_queued_ops = 0; allocator *data_alloc = NULL; uint64_t used_blocks = 0; @@ -377,7 +379,7 @@ class blockstore_impl_t // Stabilize int dequeue_stable(blockstore_op_t *op); int continue_stable(blockstore_op_t *op); - void mark_stable(const obj_ver_id & ov, bool forget_dirty = false); + void mark_stable(obj_ver_id ov, bool forget_dirty = false); void stabilize_object(object_id oid, uint64_t max_ver); blockstore_op_t* selective_sync(blockstore_op_t *op); int split_stab_op(blockstore_op_t *op, std::function decider); diff --git a/src/blockstore_rollback.cpp b/src/blockstore_rollback.cpp index 50b6eb88..cc686112 100644 --- a/src/blockstore_rollback.cpp +++ b/src/blockstore_rollback.cpp @@ -162,6 +162,7 @@ void blockstore_impl_t::mark_rolled_back(const obj_ver_id & ov) unstable_writes.erase(unstab_it); else unstab_it->second = max_unstable; + unstable_count_changed = true; } } } diff --git a/src/blockstore_stable.cpp b/src/blockstore_stable.cpp index 10648ad9..f3d4dc27 100644 --- a/src/blockstore_stable.cpp +++ b/src/blockstore_stable.cpp @@ -412,11 +412,40 @@ resume_4: return 2; } -void blockstore_impl_t::mark_stable(const obj_ver_id & v, bool forget_dirty) +void blockstore_impl_t::mark_stable(obj_ver_id v, bool forget_dirty) { auto dirty_it = dirty_db.find(v); if (dirty_it != dirty_db.end()) { + if (IS_INSTANT(dirty_it->second.state)) + { + // 'Instant' (non-EC) operations may complete and try to become stable out of order. Prevent it. + auto back_it = dirty_it; + while (back_it != dirty_db.begin()) + { + back_it--; + if (back_it->first.oid != v.oid) + { + break; + } + if (!IS_STABLE(back_it->second.state)) + { + // There are preceding unstable versions, can't flush + return; + } + } + while (true) + { + dirty_it++; + if (dirty_it == dirty_db.end() || dirty_it->first.oid != v.oid || + !IS_SYNCED(dirty_it->second.state)) + { + dirty_it--; + break; + } + v.version = dirty_it->first.version; + } + } while (1) { bool was_stable = IS_STABLE(dirty_it->second.state); @@ -508,5 +537,6 @@ void blockstore_impl_t::mark_stable(const obj_ver_id & v, bool forget_dirty) unstab_it->second <= v.version) { unstable_writes.erase(unstab_it); + unstable_count_changed = true; } } diff --git a/src/blockstore_sync.cpp b/src/blockstore_sync.cpp index 1d64a4f6..50891a13 100644 --- a/src/blockstore_sync.cpp +++ b/src/blockstore_sync.cpp @@ -85,8 +85,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) left--; auto & dirty_entry = dirty_db.at(sbw); uint64_t dyn_size = dsk.dirty_dyn_size(dirty_entry.offset, dirty_entry.len); - if (!space_check.check_available(op, 1, sizeof(journal_entry_big_write) + dyn_size, - (unstable_writes.size()+unstable_unsynced)*journal.block_size)) + if (!space_check.check_available(op, 1, sizeof(journal_entry_big_write) + dyn_size, 0)) { return 0; } diff --git a/src/blockstore_write.cpp b/src/blockstore_write.cpp index 36828abc..355e4f38 100644 --- a/src/blockstore_write.cpp +++ b/src/blockstore_write.cpp @@ -593,7 +593,7 @@ resume_4: #endif bool is_big = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE; bool imm = is_big ? (immediate_commit == IMMEDIATE_ALL) : (immediate_commit != IMMEDIATE_NONE); - bool is_instant = ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE || (dirty_it->second.state & BS_ST_INSTANT)); + bool is_instant = IS_INSTANT(dirty_it->second.state); if (imm) { auto & unstab = unstable_writes[op->oid];