diff --git a/blockstore.cpp b/blockstore.cpp index a262c2b5..87f73b80 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -160,6 +160,7 @@ void blockstore::loop() } } } + flusher->loop(); } } diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index ba1f193b..7e48c580 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -4,10 +4,13 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore *bs) { this->bs = bs; this->flusher_count = flusher_count; - this->active_flushers = 0; - this->active_until_sync = 0; - this->sync_required = true; - this->sync_threshold = flusher_count == 1 ? 1 : flusher_count/2; + active_flushers = 0; + active_until_sync = 0; + sync_required = true; + sync_threshold = flusher_count == 1 ? 1 : flusher_count/2; + journal_trim_interval = sync_threshold; + journal_trim_counter = 0; + journal_superblock = (uint8_t*)memalign(512, 512); co = new journal_flusher_co[flusher_count]; for (int i = 0; i < flusher_count; i++) { @@ -31,6 +34,7 @@ journal_flusher_co::journal_flusher_co() journal_flusher_t::~journal_flusher_t() { + free(journal_superblock); delete[] co; } @@ -110,6 +114,10 @@ void journal_flusher_co::loop() goto resume_10; else if (wait_state == 11) goto resume_11; + else if (wait_state == 12) + goto resume_12; + else if (wait_state == 13) + goto resume_13; resume_0: if (!flusher->flush_queue.size()) return; @@ -276,8 +284,8 @@ resume_0: .oid = cur.oid, .version = cur.version, }; - // I consider unordered writes to data & metadata safe here, because - // "dirty" entries always override "clean" entries in our case + // I consider unordered writes to data & metadata safe here + // BUT it requires that journal entries even older than clean_db should be replayed after restart await_sqe(6); data->iov = (struct iovec){ meta_it->second.buf, 512 }; data->callback = simple_callback; @@ -373,7 +381,57 @@ resume_0: if (dirty_it->first.oid != cur.oid) dirty_it++; bs->dirty_db.erase(dirty_it, std::next(dirty_end)); - // FIXME: ...and clear unused part of the journal (with some interval, not for every flushed op) + // Clear unused part of the journal every flushes + if (!((++flusher->journal_trim_counter) % flusher->journal_trim_interval)) + { + flusher->journal_trim_counter = 0; + journal_used_it = bs->journal.used_sectors.lower_bound(bs->journal.used_start); + if (journal_used_it == bs->journal.used_sectors.end()) + { + // Journal is cleared to its end, restart from the beginning + journal_used_it = bs->journal.used_sectors.begin(); + if (journal_used_it == bs->journal.used_sectors.end()) + { + // Journal is empty + bs->journal.used_start = bs->journal.next_free; + } + else + { + bs->journal.used_start = journal_used_it->first; + } + } + else if (journal_used_it->first > bs->journal.used_start) + { + // Journal is cleared up to + bs->journal.used_start = journal_used_it->first; + } + else + { + // Can't trim journal + goto do_not_trim; + } + // Update journal "superblock" + await_sqe(12); + data->callback = simple_callback; + *((journal_entry_start*)flusher->journal_superblock) = { + .crc32 = 0, + .magic = JOURNAL_MAGIC, + .type = JE_START, + .size = sizeof(journal_entry_start), + .reserved = 0, + .journal_start = bs->journal.used_start, + }; + ((journal_entry_start*)flusher->journal_superblock)->crc32 = je_crc32((journal_entry*)flusher->journal_superblock); + data->iov = (struct iovec){ flusher->journal_superblock, 512 }; + io_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset); + wait_count++; + wait_state = 13; + resume_13: + if (wait_count > 0) + return; + } + do_not_trim: + // All done wait_state = 0; flusher->active_flushers--; repeat_it = flusher->sync_to_repeat.find(cur.oid); diff --git a/blockstore_flush.h b/blockstore_flush.h index 18142a98..bde09ee6 100644 --- a/blockstore_flush.h +++ b/blockstore_flush.h @@ -37,6 +37,7 @@ class journal_flusher_co uint64_t offset, len, submit_len, clean_loc, meta_sector, meta_pos; std::map::iterator meta_it; std::map::iterator repeat_it; + std::map::iterator journal_used_it; std::function simple_callback; std::list::iterator cur_sync; friend class journal_flusher_t; @@ -55,6 +56,9 @@ class journal_flusher_t blockstore *bs; friend class journal_flusher_co; + int journal_trim_counter, journal_trim_interval; + uint8_t* journal_superblock; + int active_flushers, active_until_sync; std::list syncs; std::map sync_to_repeat; diff --git a/blockstore_init.cpp b/blockstore_init.cpp index f50bf9ff..9de3b1cc 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -142,8 +142,9 @@ void blockstore_init_journal::handle_event(ring_data_t *data) throw new std::runtime_error("first entry of the journal is corrupt"); } journal_pos = bs->journal.used_start = je->journal_start; - crc32_last = je->crc32_replaced; + crc32_last = 0; step = 2; + started = false; } } else if (step == 2 || step == 3) @@ -271,7 +272,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) { journal_entry *je = (journal_entry*)((uint8_t*)buf + total_pos + pos); if (je->magic != JOURNAL_MAGIC || je_crc32(je) != je->crc32 || - je->type < JE_SMALL_WRITE || je->type > JE_DELETE || je->crc32_prev != crc32_last) + je->type < JE_SMALL_WRITE || je->type > JE_DELETE || started && je->crc32_prev != crc32_last) { if (pos == 0) { @@ -286,7 +287,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) break; } } - bs->journal.used_sectors[total_pos]++; + started = true; pos += je->size; crc32_last = je->crc32; if (je->type == JE_SMALL_WRITE) @@ -306,10 +307,11 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) // FIXME: OOPS. Please don't modify total_pos here total_pos += je->small_write.len; } - bs->dirty_db.emplace((obj_ver_id){ + obj_ver_id ov = { .oid = je->small_write.oid, .version = je->small_write.version, - }, (dirty_entry){ + }; + bs->dirty_db.emplace(ov, (dirty_entry){ .state = ST_J_SYNCED, .flags = 0, .location = location, @@ -317,14 +319,17 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) .len = je->small_write.len, .journal_sector = total_pos, }); + bs->journal.used_sectors[total_pos]++; + bs->flusher->queue_flush(ov); } else if (je->type == JE_BIG_WRITE) { // oid, version, block - bs->dirty_db.emplace((obj_ver_id){ + obj_ver_id ov = { .oid = je->big_write.oid, .version = je->big_write.version, - }, (dirty_entry){ + }; + bs->dirty_db.emplace(ov, (dirty_entry){ .state = ST_D_META_SYNCED, .flags = 0, .location = je->big_write.location, @@ -332,14 +337,17 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) .len = bs->block_size, .journal_sector = total_pos, }); + bs->journal.used_sectors[total_pos]++; + bs->flusher->queue_flush(ov); } else if (je->type == JE_STABLE) { // oid, version - auto it = bs->dirty_db.find((obj_ver_id){ + obj_ver_id ov = { .oid = je->stable.oid, .version = je->stable.version, - }); + }; + auto it = bs->dirty_db.find(ov); if (it == bs->dirty_db.end()) { // journal contains a legitimate STABLE entry for a non-existing dirty write @@ -356,10 +364,11 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) else if (je->type == JE_DELETE) { // oid, version - bs->dirty_db.emplace((obj_ver_id){ + obj_ver_id ov = { .oid = je->del.oid, .version = je->del.version, - }, (dirty_entry){ + }; + bs->dirty_db.emplace(ov, (dirty_entry){ .state = ST_DEL_SYNCED, .flags = 0, .location = 0, @@ -367,6 +376,8 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) .len = 0, .journal_sector = total_pos, }); + bs->journal.used_sectors[total_pos]++; + bs->flusher->queue_flush(ov); } } } diff --git a/blockstore_init.h b/blockstore_init.h index ef2423c1..2a0d1ab7 100644 --- a/blockstore_init.h +++ b/blockstore_init.h @@ -19,6 +19,7 @@ class blockstore_init_journal uint8_t *journal_buffer = NULL; int step = 0; uint32_t crc32_last = 0; + bool started = false; uint64_t done_pos = 0, journal_pos = 0; uint64_t cur_skip = 0; bool wrapped = false; diff --git a/blockstore_journal.h b/blockstore_journal.h index 0413b63e..9a21ada4 100644 --- a/blockstore_journal.h +++ b/blockstore_journal.h @@ -22,7 +22,7 @@ struct __attribute__((__packed__)) journal_entry_start uint16_t magic; uint16_t type; uint32_t size; - uint32_t crc32_replaced; + uint32_t reserved; uint64_t journal_start; };