diff --git a/blockstore.h b/blockstore.h
index 09b71284..7673334a 100644
--- a/blockstore.h
+++ b/blockstore.h
@@ -279,6 +279,7 @@ class blockstore
     friend class blockstore_init_journal;
     friend class blockstore_journal_check_t;
    friend class journal_flusher_t;
+    friend class journal_flusher_co;
 
     void calc_lengths(spp::sparse_hash_map & config);
     void open_data(spp::sparse_hash_map & config);
diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp
index eabf8f6f..3904fd36 100644
--- a/blockstore_stable.cpp
+++ b/blockstore_stable.cpp
@@ -12,7 +12,7 @@
 // 512b+4K (journal) + sync + 512b (journal) + sync + 4K (data) [+ sync?] + 512b (metadata) + sync.
 // WA = 2.375. It's not the best, SSD FTL-like redirect-write with defragmentation
 // could probably be lower even with defragmentation. But it's fixed and it's still
-// better than in Ceph. :)
+// better than in Ceph. :) except for HDD-only clusters, because each write results in 3 seeks.
 // Stabilize big write:
 // 1) Copy metadata from the journal to the metadata device
@@ -29,6 +29,7 @@
 // just before submitting fsync
 // 3) it submits syncs to blockstore and peers
 // 4) after everyone acks sync it takes the object list and sends stabilize requests to everyone
+// 5) after everyone acks stabilize requests it acks the client's sync operation
 
 int blockstore::dequeue_stable(blockstore_operation *op)
 {
@@ -170,9 +171,13 @@ struct meta_sector_t
     int usage_count;
 };
 
-class journal_flusher_t
+class journal_flusher_t;
+
+// Journal flusher coroutine
+class journal_flusher_co
 {
     blockstore *bs;
+    journal_flusher_t *flusher;
     int wait_state, wait_count;
     struct io_uring_sqe *sqe;
     struct ring_data_t *data;
@@ -183,19 +188,59 @@ class journal_flusher_t
     std::vector::iterator it;
     uint64_t offset, len, submit_len, clean_loc, meta_sector, meta_pos;
     std::map<uint64_t, meta_sector_t>::iterator meta_it;
-
+    friend class journal_flusher_t;
 public:
-    journal_flusher_t(int flush_count);
-    std::map<uint64_t, meta_sector_t> meta_sectors;
-    std::deque flush_queue;
     void loop();
 };
 
-journal_flusher_t::journal_flusher_t(int flusher_count)
+// Journal flusher itself
+class journal_flusher_t
 {
+    int flusher_count;
+    int active_flushers;
+    journal_flusher_co *co;
+    blockstore *bs;
+    friend class journal_flusher_co;
+public:
+    std::map<uint64_t, meta_sector_t> meta_sectors;
+    std::deque flush_queue;
+    journal_flusher_t(int flusher_count, blockstore *bs);
+    ~journal_flusher_t();
+    void loop();
+};
+
+journal_flusher_t::journal_flusher_t(int flusher_count, blockstore *bs)
+{
+    this->bs = bs;
+    this->flusher_count = flusher_count;
+    this->active_flushers = 0;
+    co = new journal_flusher_co[flusher_count];
+    for (int i = 0; i < flusher_count; i++)
+    {
+        co[i].bs = bs;
+        co[i].wait_state = 0;
+        co[i].flusher = this;
+    }
+}
+
+journal_flusher_t::~journal_flusher_t()
+{
+    delete[] co;
 }
 
 void journal_flusher_t::loop()
+{
+    if (!active_flushers && !flush_queue.size())
+    {
+        return;
+    }
+    for (int i = 0; i < flusher_count; i++)
+    {
+        co[i].loop();
+    }
+}
+
+void journal_flusher_co::loop()
 {
     // This is much better than implementing the whole function as an FSM
     // Maybe I should consider a coroutine library like https://github.com/hnes/libaco ...
@@ -213,13 +258,14 @@ void journal_flusher_t::loop()
         goto resume_6;
     else if (wait_state == 7)
         goto resume_7;
-    if (!flush_queue.size())
+    if (!flusher->flush_queue.size())
         return;
-    cur = flush_queue.front();
-    flush_queue.pop_front();
+    cur = flusher->flush_queue.front();
+    flusher->flush_queue.pop_front();
     dirty_it = bs->dirty_db.find(cur);
     if (dirty_it != bs->dirty_db.end())
     {
+        flusher->active_flushers++;
         v.clear();
         wait_count = 0;
         clean_loc = UINT64_MAX;
@@ -299,11 +345,11 @@ void journal_flusher_t::loop()
             // so I'll avoid it as long as I can.
             meta_sector = (clean_loc / (512 / sizeof(clean_disk_entry))) * 512;
             meta_pos = (clean_loc % (512 / sizeof(clean_disk_entry)));
-            meta_it = meta_sectors.find(meta_sector);
-            if (meta_it == meta_sectors.end())
+            meta_it = flusher->meta_sectors.find(meta_sector);
+            if (meta_it == flusher->meta_sectors.end())
             {
                 // Not in memory yet, read it
-                meta_it = meta_sectors.emplace(meta_sector, (meta_sector_t){
+                meta_it = flusher->meta_sectors.emplace(meta_sector, (meta_sector_t){
                     .offset = meta_sector,
                     .len = 512,
                     .state = 0, // 0 = not read yet
@@ -387,7 +433,7 @@ void journal_flusher_t::loop()
         if (meta_it->second.usage_count == 0)
         {
             free(meta_it->second.buf);
-            meta_sectors.erase(meta_it);
+            flusher->meta_sectors.erase(meta_it);
         }
         for (it = v.begin(); it != v.end(); it++)
         {
@@ -395,6 +441,7 @@ void journal_flusher_t::loop()
         }
         v.clear();
         wait_state = 0;
+        flusher->active_flushers--;
     }
     // FIXME Now sync everything
 }
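
The split introduced by this diff is worth spelling out: journal_flusher_t owns the shared state (flush_queue, meta_sectors, active_flushers) and simply drives flusher_count journal_flusher_co objects, each of which is a hand-rolled stackless coroutine whose loop() jumps back to the step recorded in wait_state every time the event loop calls it (hence the note about libaco). The following is a minimal standalone sketch of that pattern only, not the blockstore code; the names flush_co and flush_pool and the plain int work items are hypothetical stand-ins:

#include <cstdio>
#include <deque>

// A flusher "coroutine": a plain object, resumed by calling loop() again.
struct flush_co
{
    std::deque<int> *queue = nullptr; // shared with the pool
    int wait_state = 0;               // 0 = start from the top
    int cur = 0;

    void loop()
    {
        if (wait_state == 1)
            goto resume_1;
        if (!queue->size())
            return;
        cur = queue->front();
        queue->pop_front();
        printf("flusher: submitted item %d\n", cur);
        wait_state = 1; // "suspend" until the event loop calls us again
        return;
    resume_1:
        printf("flusher: completed item %d\n", cur);
        wait_state = 0; // done, pick up the next item on the next call
    }
};

// The pool: owns the shared queue and drives a fixed number of coroutines.
struct flush_pool
{
    std::deque<int> queue;
    flush_co co[4];
    flush_pool()
    {
        for (auto & c : co)
            c.queue = &queue;
    }
    void loop()
    {
        for (auto & c : co)
            c.loop();
    }
};

int main()
{
    flush_pool pool;
    pool.queue = {1, 2, 3};
    for (int round = 0; round < 3; round++) // emulate several event loop iterations
        pool.loop();
    return 0;
}

Because all per-flush state lives in members rather than on a stack, resuming a suspended flusher is just a cheap dispatch on wait_state, which is what lets several flushers share one io_uring event loop without any real coroutine machinery.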
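
The second half of the diff routes all shared state through flusher->, including the meta_sectors cache, so concurrent flushers touching the same 512-byte metadata sector share one buffer and free it only when its usage_count drops to zero. The sector itself is located by the division/modulo pair shown in the hunk: with 512-byte metadata sectors each holding 512 / sizeof(clean_disk_entry) entries, entry number clean_loc lives at byte offset (clean_loc / entries_per_sector) * 512, in slot clean_loc % entries_per_sector. A small sketch of that arithmetic, using an illustrative 24-byte clean_disk_entry layout that is only an assumption (the real struct is not shown in this diff):

#include <cinttypes>
#include <cstdint>
#include <cstdio>

// Hypothetical entry layout, used here only so that sizeof() has a value (24 bytes).
struct clean_disk_entry
{
    uint64_t oid_inode, oid_stripe, version;
};

int main()
{
    uint64_t clean_loc = 12345; // index of the clean data block being flushed
    // 512-byte metadata sectors, each holding 512 / sizeof(clean_disk_entry) entries
    uint64_t entries_per_sector = 512 / sizeof(clean_disk_entry);  // 21 with a 24-byte entry
    uint64_t meta_sector = (clean_loc / entries_per_sector) * 512; // byte offset of the sector to read
    uint64_t meta_pos = clean_loc % entries_per_sector;            // slot of the entry inside that sector
    printf("entry %" PRIu64 ": metadata sector at byte %" PRIu64 ", slot %" PRIu64 "\n",
        clean_loc, meta_sector, meta_pos);
    return 0;
}

Since each flush has to read a whole 512-byte sector to patch one slot, the shared cache with usage_count pays off whenever two flushers hit objects in the same sector: they reuse one read buffer instead of each issuing their own.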