diff --git a/blockstore_impl.cpp b/blockstore_impl.cpp index 90711c9d..c3ceb9ae 100644 --- a/blockstore_impl.cpp +++ b/blockstore_impl.cpp @@ -145,7 +145,7 @@ void blockstore_impl_t::loop() } unsigned ring_space = ringloop->space_left(); unsigned prev_sqe_pos = ringloop->save(); - int dequeue_op = 0; + bool dequeue_op = false; if (op->opcode == BS_OP_READ) { dequeue_op = dequeue_read(op); @@ -175,16 +175,33 @@ void blockstore_impl_t::loop() } else if (op->opcode == BS_OP_STABLE) { + if (has_writes == 2) + { + // Don't submit additional flushes before completing previous LISTs + break; + } dequeue_op = dequeue_stable(op); } else if (op->opcode == BS_OP_ROLLBACK) { + if (has_writes == 2) + { + // Don't submit additional flushes before completing previous LISTs + break; + } dequeue_op = dequeue_rollback(op); } else if (op->opcode == BS_OP_LIST) { - process_list(op); - dequeue_op = true; + // Block LIST operation by previous modifications, + // so it always returns a consistent state snapshot + if (has_writes == 2 || inflight_writes > 0) + has_writes = 2; + else + { + process_list(op); + dequeue_op = true; + } } if (dequeue_op) { diff --git a/blockstore_impl.h b/blockstore_impl.h index 151b65d1..ff6c915c 100644 --- a/blockstore_impl.h +++ b/blockstore_impl.h @@ -230,6 +230,7 @@ class blockstore_impl_t bool live = false, queue_stall = false; ring_loop_t *ringloop; + int inflight_writes = 0; bool stop_sync_submitted; diff --git a/blockstore_rollback.cpp b/blockstore_rollback.cpp index 78aa47b6..bdb8b812 100644 --- a/blockstore_rollback.cpp +++ b/blockstore_rollback.cpp @@ -18,6 +18,10 @@ int blockstore_impl_t::dequeue_rollback(blockstore_op_t *op) }); if (dirty_it == dirty_db.begin()) { + if (v->version == 0) + { + // Already rolled back + } bad_op: op->retval = -EINVAL; FINISH_OP(op); @@ -115,6 +119,7 @@ int blockstore_impl_t::dequeue_rollback(blockstore_op_t *op) PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; PRIV(op)->pending_ops = s; PRIV(op)->op_state = 1; + inflight_writes++; return 1; } @@ -175,6 +180,7 @@ resume_5: erase_dirty(rm_start, rm_end, UINT64_MAX); } journal.trim(); + inflight_writes--; // Acknowledge op op->retval = 0; FINISH_OP(op); @@ -186,6 +192,7 @@ void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t live = true; if (data->res != data->iov.iov_len) { + inflight_writes--; throw std::runtime_error( "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+ "). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111" diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index d7c04ec4..4fe78c5e 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -108,6 +108,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) } for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) { + // FIXME: Only stabilize versions that aren't stable yet auto unstab_it = unstable_writes.find(v->oid); if (unstab_it != unstable_writes.end() && unstab_it->second <= v->version) @@ -132,6 +133,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; PRIV(op)->pending_ops = s; PRIV(op)->op_state = 1; + inflight_writes++; return 1; } @@ -209,6 +211,7 @@ resume_5: flusher->enqueue_flush(*v); } } + inflight_writes--; // Acknowledge op op->retval = 0; FINISH_OP(op); @@ -220,6 +223,7 @@ void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t * live = true; if (data->res != data->iov.iov_len) { + inflight_writes--; throw std::runtime_error( "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+ "). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111" diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 9a98fe7b..d9fd2b37 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -276,6 +276,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) PRIV(op)->op_state = 3; } } + inflight_writes++; return 1; } @@ -357,6 +358,7 @@ resume_4: dirty_it++; } } + inflight_writes--; // Acknowledge write op->retval = op->len; FINISH_OP(op); @@ -368,6 +370,7 @@ void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *o live = true; if (data->res != data->iov.iov_len) { + inflight_writes--; // FIXME: our state becomes corrupted after a write error. maybe do something better than just die throw std::runtime_error( "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+