From 153de65ce710ec9668495351edc4f7913bfdc874 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 8 Nov 2019 14:10:24 +0300 Subject: [PATCH] Begin sync implementation --- blockstore.cpp | 22 +++++++++++++++- blockstore.h | 15 +++++++++-- blockstore_write.cpp | 63 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 97 insertions(+), 3 deletions(-) diff --git a/blockstore.cpp b/blockstore.cpp index c4e66afc..8ec4486e 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -115,6 +115,10 @@ void blockstore::handle_event(ring_data_t *data) op->retval = op->len; + unsynced_writes.push_back((obj_ver_id){ + .oid = op->oid, + .version = op->version, + }); op->callback(op); in_process_ops.erase(op); } } else if ((op->flags & OP_TYPE_MASK) == OP_SYNC) { @@ -165,6 +169,7 @@ void blockstore::loop() { // try to submit ops auto cur = submit_queue.begin(); + bool has_writes = false; while (cur != submit_queue.end()) { auto op_ptr = cur; @@ -243,6 +248,7 @@ void blockstore::loop() // ring is full, stop submission break; } + has_writes = true; } else if ((op->flags & OP_TYPE_MASK) == OP_SYNC) { @@ -250,7 +256,21 @@ void blockstore::loop() // wait for all big writes to complete, submit data device fsync // wait for the data device fsync to complete, then submit journal writes for big writes // then submit an fsync operation - + if (has_writes) + { + // Can't submit SYNC before previous writes + continue; + } + int dequeue_op = dequeue_sync(op); + if (dequeue_op) + { + submit_queue.erase(op_ptr); + } + else if (op->wait_for == WAIT_SQE) + { + // ring is full, stop submission + break; + } } else if ((op->flags & OP_TYPE_MASK) == OP_STABLE) { diff --git a/blockstore.h b/blockstore.h index 9705385e..28badf76 100644 --- a/blockstore.h +++ b/blockstore.h @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -53,6 +54,8 @@ #define IS_IN_FLIGHT(st) (st == ST_IN_FLIGHT || st == ST_J_SUBMITTED || st == ST_D_SUBMITTED || st == ST_DEL_SUBMITTED) #define
IS_STABLE(st) (st >= ST_J_STABLE && st <= ST_J_MOVE_SYNCED || st >= ST_D_STABLE && st <= ST_D_META_COMMITTED || st >= ST_DEL_STABLE && st <= ST_DEL_MOVED || st == ST_CURRENT) #define IS_JOURNAL(st) (st >= ST_J_SUBMITTED && st <= ST_J_MOVE_SYNCED) +#define IS_BIG_WRITE(st) (st >= ST_D_SUBMITTED && st <= ST_D_META_COMMITTED) +#define IS_UNSYNCED(st) (st == ST_J_WRITTEN || st >= ST_D_WRITTEN && st <= ST_D_META_WRITTEN || st == ST_DEL_WRITTEN) // Default object size is 128 KB #define DEFAULT_ORDER 17 @@ -182,11 +185,16 @@ struct blockstore_operation uint8_t *buf; int retval; - std::map read_vec; - int pending_ops; + // Wait status int wait_for; uint64_t wait_detail; + int pending_ops; + + // FIXME make all of these pointers and put them into a union + std::map read_vec; uint64_t used_journal_sector; + std::deque sync_writes; + int has_big_writes; }; class blockstore; @@ -197,9 +205,11 @@ class blockstore { struct ring_consumer_t ring_consumer; public: + // Another option is https://github.com/algorithm-ninja/cpp-btree spp::sparse_hash_map object_db; std::map dirty_db; std::list submit_queue; + std::deque unsynced_writes; std::set in_process_ops; uint32_t block_order, block_size; uint64_t block_count; @@ -252,4 +262,5 @@ public: int dequeue_write(blockstore_operation *op); // Sync + int dequeue_sync(blockstore_operation *op); }; diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 463ef09a..25b95ebe 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -142,3 +142,66 @@ int blockstore::dequeue_write(blockstore_operation *op) } return 1; } + +int blockstore::dequeue_sync(blockstore_operation *op) +{ + op->has_big_writes = 0x10000; + op->sync_writes.swap(unsynced_writes); + unsynced_writes.clear(); + auto it = op->sync_writes.begin(); + while (it != op->sync_writes.end()) + { + uint32_t state = dirty_db[*it].state; + if (IS_BIG_WRITE(state)) + { + op->has_big_writes = op->has_big_writes < state ?
op->has_big_writes : state; + } + it++; + } + if (op->has_big_writes == 0x10000 || op->has_big_writes == ST_D_META_WRITTEN) + { + // Just fsync the journal + struct io_uring_sqe *sqe = get_sqe(); + if (!sqe) + { + // Pause until there are more requests available + op->wait_for = WAIT_SQE; + op->wait_detail = 1; + return 0; + } + struct ring_data_t *data = ((ring_data_t*)sqe->user_data); + io_uring_prep_fsync(sqe, journal.fd, 0); + data->op = op; + op->pending_ops = 1; + } + else if (op->has_big_writes == ST_D_WRITTEN) + { + // FIXME: try to remove duplicated get_sqe+!sqe+data code + // 1st step: fsync data + struct io_uring_sqe *sqe = get_sqe(); + if (!sqe) + { + // Pause until there are more requests available + op->wait_for = WAIT_SQE; + op->wait_detail = 1; + return 0; + } + struct ring_data_t *data = ((ring_data_t*)sqe->user_data); + io_uring_prep_fsync(sqe, data_fd, 0); + data->op = op; + op->pending_ops = 1; + } + else if (op->has_big_writes == ST_D_SYNCED) + { + // 2nd step: Data device is synced, prepare & write journal entries + + } + // FIXME: try to remove this duplicated code, too + in_process_ops.insert(op); + int ret = ringloop->submit(); + if (ret < 0) + { + throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret)); + } + return 1; +}