diff --git a/blockstore.cpp b/blockstore.cpp index 897521932..97cd179ec 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -20,6 +20,11 @@ bool blockstore_t::is_started() return impl->is_started(); } +bool blockstore_t::is_stalled() +{ + return impl->is_stalled(); +} + bool blockstore_t::is_safe_to_stop() { return impl->is_safe_to_stop(); diff --git a/blockstore.h b/blockstore.h index 06d8af04c..96d88b79d 100644 --- a/blockstore.h +++ b/blockstore.h @@ -82,6 +82,9 @@ public: // (Although you're free to enqueue them before that) bool is_started(); + // Returns true when blockstore is stalled + bool is_stalled(); + // Returns true when it's safe to destroy the instance. If destroying the instance // requires to purge some queues, starts that process. Should be called in the event // loop until it returns true. diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 189249c12..0a1c72661 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -22,6 +22,7 @@ journal_flusher_co::journal_flusher_co() wait_state = 0; simple_callback_r = [this](ring_data_t* data) { + bs->live = true; if (data->res != data->iov.iov_len) { throw std::runtime_error( @@ -33,6 +34,7 @@ journal_flusher_co::journal_flusher_co() }; simple_callback_w = [this](ring_data_t* data) { + bs->live = true; if (data->res != data->iov.iov_len) { throw std::runtime_error( diff --git a/blockstore_impl.cpp b/blockstore_impl.cpp index 3c367b931..2c6660452 100644 --- a/blockstore_impl.cpp +++ b/blockstore_impl.cpp @@ -64,6 +64,11 @@ bool blockstore_impl_t::is_started() return initialized == 10; } +bool blockstore_impl_t::is_stalled() +{ + return queue_stall; +} + // main event loop - produce requests void blockstore_impl_t::loop() { @@ -100,6 +105,7 @@ void blockstore_impl_t::loop() else { // try to submit ops + unsigned initial_ring_space = ringloop->space_left(); auto cur_sync = in_progress_syncs.begin(); while (cur_sync != in_progress_syncs.end()) { @@ -190,6 +196,12 @@ void blockstore_impl_t::loop() { throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret)); } + if ((initial_ring_space - ringloop->space_left()) > 0) + { + live = true; + } + queue_stall = !live && !ringloop->get_loop_again(); + live = false; } } diff --git a/blockstore_impl.h b/blockstore_impl.h index 62e876113..b8fe39b41 100644 --- a/blockstore_impl.h +++ b/blockstore_impl.h @@ -207,6 +207,7 @@ class blockstore_impl_t struct journal_t journal; journal_flusher_t *flusher; + bool live = false, queue_stall = false; ring_loop_t *ringloop; bool stop_sync_submitted; @@ -281,6 +282,9 @@ public: // loop until it returns true. bool is_safe_to_stop(); + // Returns true if stalled + bool is_stalled(); + // Submission void enqueue_op(blockstore_op_t *op); diff --git a/blockstore_read.cpp b/blockstore_read.cpp index 5f9413779..d53c6c384 100644 --- a/blockstore_read.cpp +++ b/blockstore_read.cpp @@ -155,6 +155,7 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op) void blockstore_impl_t::handle_read_event(ring_data_t *data, blockstore_op_t *op) { + live = true; PRIV(op)->pending_ops--; if (data->res != data->iov.iov_len) { diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index ace475306..7942801b4 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -123,6 +123,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t *op) { + live = true; if (data->res != data->iov.iov_len) { throw std::runtime_error( diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp index f35227ca0..c5b9319b1 100644 --- a/blockstore_sync.cpp +++ b/blockstore_sync.cpp @@ -133,6 +133,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op) { + live = true; if (data->res != data->iov.iov_len) { throw std::runtime_error( diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 7089519ee..515a5773d 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -212,6 +212,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *op) { + live = true; if (data->res != data->iov.iov_len) { // FIXME: our state becomes corrupted after a write error. maybe do something better than just die