diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index cec3df25..6b18befc 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -22,7 +22,18 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore *bs) journal_flusher_co::journal_flusher_co() { wait_state = 0; - simple_callback = [this](ring_data_t* data) + simple_callback_r = [this](ring_data_t* data) + { + if (data->res != data->iov.iov_len) + { + throw std::runtime_error( + "data read operation failed during flush ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+ + "). can't continue, sorry :-(" + ); + } + wait_count--; + }; + simple_callback_w = [this](ring_data_t* data) { if (data->res != data->iov.iov_len) { @@ -184,7 +195,7 @@ resume_0: await_sqe(1); it = v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) }); data->iov = (struct iovec){ v.back().buf, (size_t)submit_len }; - data->callback = simple_callback; + data->callback = simple_callback_r; my_uring_prep_readv( sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + dirty_it->second.location + offset - dirty_it->second.offset ); @@ -285,7 +296,7 @@ resume_0: { throw std::runtime_error( "metadata read operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+ - "). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111" + "). can't continue, sorry :-(" ); } meta_it->second.state = 1; @@ -309,7 +320,7 @@ resume_0: { await_sqe(4); data->iov = (struct iovec){ it->buf, (size_t)it->len }; - data->callback = simple_callback; + data->callback = simple_callback_w; my_uring_prep_writev( sqe, bs->data_fd, &data->iov, 1, bs->data_offset + clean_loc + it->offset ); @@ -331,7 +342,7 @@ resume_0: // BUT it requires that journal entries even older than clean_db are replayed after restart await_sqe(6); data->iov = (struct iovec){ meta_it->second.buf, 512 }; - data->callback = simple_callback; + data->callback = simple_callback_w; my_uring_prep_writev( sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector ); @@ -369,14 +380,14 @@ resume_0: { // Sync batch is ready. Do it. await_sqe(9); - data->callback = simple_callback; + data->callback = simple_callback_w; data->iov = { 0 }; my_uring_prep_fsync(sqe, bs->data_fd, IORING_FSYNC_DATASYNC); wait_count++; if (bs->meta_fd != bs->data_fd) { await_sqe(10); - data->callback = simple_callback; + data->callback = simple_callback_w; data->iov = { 0 }; my_uring_prep_fsync(sqe, bs->meta_fd, IORING_FSYNC_DATASYNC); wait_count++; @@ -482,7 +493,7 @@ resume_0: #endif // Update journal "superblock" await_sqe(12); - data->callback = simple_callback; + data->callback = simple_callback_w; *((journal_entry_start*)flusher->journal_superblock) = { .crc32 = 0, .magic = JOURNAL_MAGIC, diff --git a/blockstore_flush.h b/blockstore_flush.h index aa6f86ef..93f1aed2 100644 --- a/blockstore_flush.h +++ b/blockstore_flush.h @@ -37,7 +37,7 @@ class journal_flusher_co std::map::iterator meta_it; std::map::iterator repeat_it; std::map::iterator journal_used_it; - std::function simple_callback; + std::function simple_callback_r, simple_callback_w; std::list::iterator cur_sync; friend class journal_flusher_t; public: