Fix safe stop procedure

blocking-uring-test
Vitaliy Filippov 2019-11-28 02:27:17 +03:00
parent d56cb290ee
commit e1ac4dba23
5 changed files with 26 additions and 10 deletions

View File

@ -208,7 +208,7 @@ bool blockstore::is_safe_to_stop()
blockstore_operation *op = new blockstore_operation; blockstore_operation *op = new blockstore_operation;
op->flags = OP_SYNC; op->flags = OP_SYNC;
op->buf = NULL; op->buf = NULL;
op->callback = [&](blockstore_operation *op) op->callback = [](blockstore_operation *op)
{ {
delete op; delete op;
}; };

View File

@ -141,7 +141,10 @@ void journal_flusher_co::loop()
goto resume_13; goto resume_13;
resume_0: resume_0:
if (!flusher->flush_queue.size()) if (!flusher->flush_queue.size())
{
wait_state = 0;
return; return;
}
cur.oid = flusher->flush_queue.front(); cur.oid = flusher->flush_queue.front();
cur.version = flusher->flush_versions[cur.oid]; cur.version = flusher->flush_versions[cur.oid];
flusher->flush_queue.pop_front(); flusher->flush_queue.pop_front();
@ -161,7 +164,6 @@ resume_0:
// Another coroutine will see it and re-queue the object after it finishes // Another coroutine will see it and re-queue the object after it finishes
if (repeat_it->second < cur.version) if (repeat_it->second < cur.version)
repeat_it->second = cur.version; repeat_it->second = cur.version;
wait_state = 0;
goto resume_0; goto resume_0;
} }
else else
@ -248,7 +250,6 @@ resume_0:
flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second }); flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second });
} }
flusher->sync_to_repeat.erase(repeat_it); flusher->sync_to_repeat.erase(repeat_it);
wait_state = 0;
goto resume_0; goto resume_0;
} }
// Find it in clean_db // Find it in clean_db
@ -371,13 +372,13 @@ resume_0:
{ {
// And sync everything (in batches - not per each operation!) // And sync everything (in batches - not per each operation!)
cur_sync = flusher->syncs.end(); cur_sync = flusher->syncs.end();
if (cur_sync == flusher->syncs.begin()) if (cur_sync == flusher->syncs.begin() || cur_sync->state == 1)
cur_sync = flusher->syncs.emplace(flusher->syncs.end(), (flusher_sync_t){ .ready_count = 0, .state = 0 }); cur_sync = flusher->syncs.emplace(flusher->syncs.end(), (flusher_sync_t){ .ready_count = 0, .state = 0 });
else else
cur_sync--; cur_sync--;
cur_sync->ready_count++; cur_sync->ready_count++;
if (cur_sync->ready_count >= flusher->sync_threshold || if (cur_sync->ready_count >= flusher->sync_threshold ||
!flusher->active_until_sync && !flusher->flush_queue.size()) !flusher->active_until_sync && (!flusher->flush_queue.size() || flusher->active_flushers >= flusher->flusher_count))
{ {
// Sync batch is ready. Do it. // Sync batch is ready. Do it.
await_sqe(9); await_sqe(9);
@ -393,12 +394,15 @@ resume_0:
my_uring_prep_fsync(sqe, bs->meta_fd, IORING_FSYNC_DATASYNC); my_uring_prep_fsync(sqe, bs->meta_fd, IORING_FSYNC_DATASYNC);
wait_count++; wait_count++;
} }
wait_state = 11;
resume_11: resume_11:
if (wait_count > 0) if (wait_count > 0)
{
wait_state = 11;
return; return;
}
// Sync completed. All previous coroutines waiting for it must be resumed // Sync completed. All previous coroutines waiting for it must be resumed
cur_sync->state = 1; cur_sync->state = 1;
bs->ringloop->wakeup(bs->ring_consumer);
} }
// Wait until someone else sends and completes a sync. // Wait until someone else sends and completes a sync.
resume_8: resume_8:
@ -519,7 +523,6 @@ resume_0:
#ifdef BLOCKSTORE_DEBUG #ifdef BLOCKSTORE_DEBUG
printf("Flushed %lu:%lu v%lu\n", cur.oid.inode, cur.oid.stripe, cur.version); printf("Flushed %lu:%lu v%lu\n", cur.oid.inode, cur.oid.stripe, cur.version);
#endif #endif
wait_state = 0;
flusher->active_flushers--; flusher->active_flushers--;
repeat_it = flusher->sync_to_repeat.find(cur.oid); repeat_it = flusher->sync_to_repeat.find(cur.oid);
if (repeat_it->second > cur.version) if (repeat_it->second > cur.version)

View File

@ -349,7 +349,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
.oid = je->small_write.oid, .oid = je->small_write.oid,
.version = je->small_write.version, .version = je->small_write.version,
}; };
#ifdef BLOCKSTORE_DEBUG
printf("je_small_write oid=%lu:%lu ver=%lu offset=%u len=%u\n", ov.oid.inode, ov.oid.stripe, ov.version, je->small_write.offset, je->small_write.len); printf("je_small_write oid=%lu:%lu ver=%lu offset=%u len=%u\n", ov.oid.inode, ov.oid.stripe, ov.version, je->small_write.offset, je->small_write.len);
#endif
bs->dirty_db.emplace(ov, (dirty_entry){ bs->dirty_db.emplace(ov, (dirty_entry){
.state = ST_J_SYNCED, .state = ST_J_SYNCED,
.flags = 0, .flags = 0,
@ -372,7 +374,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
.oid = je->big_write.oid, .oid = je->big_write.oid,
.version = je->big_write.version, .version = je->big_write.version,
}; };
#ifdef BLOCKSTORE_DEBUG
printf("je_big_write oid=%lu:%lu ver=%lu\n", ov.oid.inode, ov.oid.stripe, ov.version); printf("je_big_write oid=%lu:%lu ver=%lu\n", ov.oid.inode, ov.oid.stripe, ov.version);
#endif
bs->dirty_db.emplace(ov, (dirty_entry){ bs->dirty_db.emplace(ov, (dirty_entry){
.state = ST_D_META_SYNCED, .state = ST_D_META_SYNCED,
.flags = 0, .flags = 0,
@ -401,7 +405,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
} }
else else
{ {
#ifdef BLOCKSTORE_DEBUG
printf("je_stable oid=%lu:%lu ver=%lu\n", ov.oid.inode, ov.oid.stripe, ov.version); printf("je_stable oid=%lu:%lu ver=%lu\n", ov.oid.inode, ov.oid.stripe, ov.version);
#endif
while (1) while (1)
{ {
it->second.state = (it->second.state == ST_D_META_SYNCED it->second.state = (it->second.state == ST_D_META_SYNCED

View File

@ -89,12 +89,19 @@ static void bs_cleanup(struct thread_data *td)
bs_data *bsd = (bs_data*)td->io_ops_data; bs_data *bsd = (bs_data*)td->io_ops_data;
if (bsd) if (bsd)
{ {
while (!bsd->bs->is_safe_to_stop()) while (1)
{ {
bsd->ringloop->loop(); do
{
bsd->ringloop->loop();
if (bsd->bs->is_safe_to_stop())
goto safe;
} while (bsd->ringloop->loop_again);
bsd->ringloop->wait(); bsd->ringloop->wait();
} }
safe:
delete bsd->bs; delete bsd->bs;
delete bsd->ringloop;
delete bsd; delete bsd;
} }
} }

View File

@ -120,8 +120,8 @@ class ring_loop_t
{ {
std::vector<ring_consumer_t> consumers; std::vector<ring_consumer_t> consumers;
struct ring_data_t *ring_data; struct ring_data_t *ring_data;
bool loop_again;
public: public:
bool loop_again;
struct io_uring ring; struct io_uring ring;
ring_loop_t(int qd); ring_loop_t(int qd);
~ring_loop_t(); ~ring_loop_t();