diff --git a/blockstore.cpp b/blockstore.cpp index ce0a051e..27639bfc 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -208,7 +208,7 @@ bool blockstore::is_safe_to_stop() blockstore_operation *op = new blockstore_operation; op->flags = OP_SYNC; op->buf = NULL; - op->callback = [&](blockstore_operation *op) + op->callback = [](blockstore_operation *op) { delete op; }; diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index d1123932..6c96b0a4 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -141,7 +141,10 @@ void journal_flusher_co::loop() goto resume_13; resume_0: if (!flusher->flush_queue.size()) + { + wait_state = 0; return; + } cur.oid = flusher->flush_queue.front(); cur.version = flusher->flush_versions[cur.oid]; flusher->flush_queue.pop_front(); @@ -161,7 +164,6 @@ resume_0: // Another coroutine will see it and re-queue the object after it finishes if (repeat_it->second < cur.version) repeat_it->second = cur.version; - wait_state = 0; goto resume_0; } else @@ -248,7 +250,6 @@ resume_0: flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second }); } flusher->sync_to_repeat.erase(repeat_it); - wait_state = 0; goto resume_0; } // Find it in clean_db @@ -371,13 +372,13 @@ resume_0: { // And sync everything (in batches - not per each operation!) cur_sync = flusher->syncs.end(); - if (cur_sync == flusher->syncs.begin()) + if (cur_sync == flusher->syncs.begin() || cur_sync->state == 1) cur_sync = flusher->syncs.emplace(flusher->syncs.end(), (flusher_sync_t){ .ready_count = 0, .state = 0 }); else cur_sync--; cur_sync->ready_count++; if (cur_sync->ready_count >= flusher->sync_threshold || - !flusher->active_until_sync && !flusher->flush_queue.size()) + !flusher->active_until_sync && (!flusher->flush_queue.size() || flusher->active_flushers >= flusher->flusher_count)) { // Sync batch is ready. Do it. await_sqe(9); @@ -393,12 +394,15 @@ resume_0: my_uring_prep_fsync(sqe, bs->meta_fd, IORING_FSYNC_DATASYNC); wait_count++; } - wait_state = 11; resume_11: if (wait_count > 0) + { + wait_state = 11; return; + } // Sync completed. All previous coroutines waiting for it must be resumed cur_sync->state = 1; + bs->ringloop->wakeup(bs->ring_consumer); } // Wait until someone else sends and completes a sync. resume_8: @@ -519,7 +523,6 @@ resume_0: #ifdef BLOCKSTORE_DEBUG printf("Flushed %lu:%lu v%lu\n", cur.oid.inode, cur.oid.stripe, cur.version); #endif - wait_state = 0; flusher->active_flushers--; repeat_it = flusher->sync_to_repeat.find(cur.oid); if (repeat_it->second > cur.version) diff --git a/blockstore_init.cpp b/blockstore_init.cpp index e95b456c..491a3941 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -349,7 +349,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) .oid = je->small_write.oid, .version = je->small_write.version, }; +#ifdef BLOCKSTORE_DEBUG printf("je_small_write oid=%lu:%lu ver=%lu offset=%u len=%u\n", ov.oid.inode, ov.oid.stripe, ov.version, je->small_write.offset, je->small_write.len); +#endif bs->dirty_db.emplace(ov, (dirty_entry){ .state = ST_J_SYNCED, .flags = 0, @@ -372,7 +374,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) .oid = je->big_write.oid, .version = je->big_write.version, }; +#ifdef BLOCKSTORE_DEBUG printf("je_big_write oid=%lu:%lu ver=%lu\n", ov.oid.inode, ov.oid.stripe, ov.version); +#endif bs->dirty_db.emplace(ov, (dirty_entry){ .state = ST_D_META_SYNCED, .flags = 0, @@ -401,7 +405,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) } else { +#ifdef BLOCKSTORE_DEBUG printf("je_stable oid=%lu:%lu ver=%lu\n", ov.oid.inode, ov.oid.stripe, ov.version); +#endif while (1) { it->second.state = (it->second.state == ST_D_META_SYNCED diff --git a/fio_engine.cpp b/fio_engine.cpp index fe86563f..8743b927 100644 --- a/fio_engine.cpp +++ b/fio_engine.cpp @@ -89,12 +89,19 @@ static void bs_cleanup(struct thread_data *td) bs_data *bsd = (bs_data*)td->io_ops_data; if (bsd) { - while (!bsd->bs->is_safe_to_stop()) + while (1) { - bsd->ringloop->loop(); + do + { + bsd->ringloop->loop(); + if (bsd->bs->is_safe_to_stop()) + goto safe; + } while (bsd->ringloop->loop_again); bsd->ringloop->wait(); } + safe: delete bsd->bs; + delete bsd->ringloop; delete bsd; } } diff --git a/ringloop.h b/ringloop.h index 8201a1e8..0c573c09 100644 --- a/ringloop.h +++ b/ringloop.h @@ -120,8 +120,8 @@ class ring_loop_t { std::vector consumers; struct ring_data_t *ring_data; - bool loop_again; public: + bool loop_again; struct io_uring ring; ring_loop_t(int qd); ~ring_loop_t();