From 34451b6e448e5968db7d81d2cf8f839b6f5be5ed Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Tue, 12 Nov 2019 13:52:27 +0300 Subject: [PATCH] Try to implement journal flusher as a FSM --- blockstore.h | 7 +- blockstore_stable.cpp | 201 ++++++++++++++++++++++++++++++------------ test.cpp | 10 +-- 3 files changed, 154 insertions(+), 64 deletions(-) diff --git a/blockstore.h b/blockstore.h index f786daa0..283b77c2 100644 --- a/blockstore.h +++ b/blockstore.h @@ -1,6 +1,8 @@ #pragma once +#ifndef _LARGEFILE64_SOURCE #define _LARGEFILE64_SOURCE +#endif #include #include #include @@ -240,9 +242,9 @@ class blockstore // Another option is https://github.com/algorithm-ninja/cpp-btree spp::sparse_hash_map clean_db; std::map dirty_db; - std::list submit_queue; + std::list submit_queue; // FIXME: funny thing is that vector is better here std::vector unsynced_big_writes, unsynced_small_writes; - std::list in_progress_syncs; + std::list in_progress_syncs; // ...and probably here, too uint32_t block_order, block_size; uint64_t block_count; allocator *data_alloc; @@ -265,6 +267,7 @@ class blockstore friend class blockstore_init_meta; friend class blockstore_init_journal; friend class blockstore_journal_check_t; + friend class journal_flusher_t; void calc_lengths(spp::sparse_hash_map & config); void open_data(spp::sparse_hash_map & config); diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index 15f9ccab..7ce8ae05 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -123,17 +123,12 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op if (op->pending_ops == 0) { // First step: mark dirty_db entries as stable, acknowledge op completion - // FIXME: oops... we seem to have to copy object id/version pairs... - // No, no, no, copying is bad. We don't want copying. obj_ver_id* v; int i; for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) { // Mark all dirty_db entries up to op->version as stable - auto dirty_it = dirty_db.find((obj_ver_id){ - .oid = v->oid, - .version = v->version, - }); + auto dirty_it = dirty_db.find(*v); if (dirty_it != dirty_db.end()) { do @@ -146,8 +141,13 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op { dirty_it->second.state = ST_D_STABLE; } + else if (IS_STABLE(dirty_it->second.state)) + { + break; + } dirty_it--; } while (dirty_it != dirty_db.begin() && dirty_it->first.oid == v->oid); + flusher.flush_queue.push_back(*v); } } // Acknowledge op @@ -161,62 +161,149 @@ struct offset_len uint64_t offset, len; }; -struct journal_flusher_t +class journal_flusher_t { - std::deque flush_queue; + blockstore *bs; + int state; obj_ver_id cur; std::map::iterator dirty_it; std::vector v; + std::vector::iterator it; + uint64_t offset, len; + +public: + journal_flusher_t(); + std::deque flush_queue; + void stabilize_object_loop(); }; -void blockstore::stabilize_object(object_id oid, uint64_t max_ver) +#define F_NEXT_OBJ 0 +#define F_NEXT_VER 1 +#define F_FIND_POS 2 +#define F_SUBMIT_FULL 3 +#define F_SUBMIT_PART 4 +#define F_CUT_OFFSET 5 +#define F_FINISH_VER 6 + +journal_flusher_t::journal_flusher_t() { - auto dirty_it = dirty_db.find((obj_ver_id){ - .oid = oid, - .version = max_ver, - }); - if (dirty_it != dirty_db.end()) - { - std::vector v; - do - { - if (dirty_it->second.state == ST_J_STABLE) - { - uint64_t offset = dirty_it->second.offset, len = dirty_it->second.size; - auto it = v.begin(); - while (1) - { - for (; it != v.end(); it++) - if (it->offset >= offset) - break; - if (it == v.end() || it->offset >= offset+len) - { - v.insert(it, (offset_len){ .offset = offset, .len = len }); - break; - } - else - { - if (it->offset > offset) - v.insert(it, (offset_len){ .offset = offset, .len = it->offset-offset }); - if (offset+len > it->offset+it->len) - { - len = offset+len - (it->offset+it->len); - offset = it->offset+it->len; - } - else - break; - } - } - } - else if (dirty_it->second.state == ST_D_STABLE) - { - - break; - } - else if (IS_STABLE(dirty_it->second.state)) - { - break; - } - } while (dirty_it != dirty_db.begin()); - } + state = F_NEXT_OBJ; +} + +// It would be prettier as a coroutine (maybe https://github.com/hnes/libaco ?) +// Now it's a state machine +void journal_flusher_t::stabilize_object_loop() +{ +begin: + if (state == F_NEXT_OBJ) + { + // Pick next object + if (!flush_queue.size()) + return; + while (1) + { + cur = flush_queue.front(); + flush_queue.pop_front(); + dirty_it = bs->dirty_db.find(cur); + if (dirty_it != bs->dirty_db.end()) + { + state = F_NEXT_VER; + v.clear(); + break; + } + else if (flush_queue.size() == 0) + return; + } + } + if (state == F_NEXT_VER) + { + if (dirty_it->second.state == ST_J_STABLE) + { + offset = dirty_it->second.offset; + len = dirty_it->second.size; + it = v.begin(); + state = F_FIND_POS; + } + else if (dirty_it->second.state == ST_D_STABLE) + { + + state = F_NEXT_OBJ; + } + else if (IS_STABLE(dirty_it->second.state)) + { + state = F_NEXT_OBJ; + } + else + state = F_FINISH_VER; + } + if (state == F_FIND_POS) + { + for (; it != v.end(); it++) + if (it->offset >= offset) + break; + if (it == v.end() || it->offset >= offset+len) + { + state = F_SUBMIT_FULL; + } + else + { + if (it->offset > offset) + state = F_SUBMIT_PART; + else + state = F_CUT_OFFSET; + } + } + if (state == F_SUBMIT_FULL) + { + struct io_uring_sqe *sqe = get_sqe(); + if (!sqe) + return; + struct ring_data_t *data = ((ring_data_t*)sqe->user_data); + data->iov = (struct iovec){ malloc(len), len }; + data->op = op; // FIXME OOPS + io_uring_prep_readv( + sqe, journal_fd, &data->iov, 1, journal_offset + dirty_it->second.location + offset + ); + op->pending_ops = 1; + v.insert(it, (offset_len){ .offset = offset, .len = len }); + state = F_SUBMIT_FULL_WRITE; + return; + } + if (state == F_SUBMIT_FULL_WRITE) + { + struct io_uring_sqe *sqe = get_sqe(); + if (!sqe) + return; + struct ring_data_t *data = ((ring_data_t*)sqe->user_data); + + } + if (state == F_SUBMIT_PART) + { + if (!can_submit) + { + return; + } + v.insert(it, (offset_len){ .offset = offset, .len = it->offset-offset }); + state = F_CUT_OFFSET; + } + if (state == F_CUT_OFFSET) + { + if (offset+len > it->offset+it->len) + { + len = offset+len - (it->offset+it->len); + offset = it->offset+it->len; + state = F_FIND_POS; + } + else + state = F_FINISH_VER; + } + if (state == F_FINISH_VER) + { + dirty_it--; + if (dirty_it == bs->dirty_db.begin() || dirty_it->first.oid != cur.oid) + state = F_NEXT_OBJ; + else + state = F_NEXT_VER; + } + goto begin; } diff --git a/test.cpp b/test.cpp index 2e039f2b..1501cb07 100644 --- a/test.cpp +++ b/test.cpp @@ -134,7 +134,7 @@ int main_vec(int argc, char *argv[]) for (; it != v.end(); it++) if (it->iov_len >= r) break; - v.insert(it, (iovec){ .iov_base = 0, .iov_len = r }); + v.insert(it, (iovec){ .iov_base = 0, .iov_len = (size_t)r }); } } return 0; @@ -150,7 +150,7 @@ int main_map(int argc, char *argv[]) for (int i = 0; i < 2048; i++) { int r = rand(); - v[r] = (iovec){ .iov_base = 0, .iov_len = r }; + v[r] = (iovec){ .iov_base = 0, .iov_len = (size_t)r }; } } return 0; @@ -169,14 +169,14 @@ int main0(int argc, char *argv[]) { dirty_db[(obj_ver_id){ .oid = (object_id){ - .inode = rand(), - .stripe = i, + .inode = (uint64_t)rand(), + .stripe = (uint64_t)i, }, .version = 1, }] = (dirty_entry){ .state = ST_D_META_SYNCED, .flags = 0, - .location = i << 17, + .location = (uint64_t)i << 17, .offset = 0, .size = 1 << 17, };