From 299b7288d546b12ef807215799700fffcc8973d9 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Wed, 20 Nov 2019 00:46:44 +0300 Subject: [PATCH] Fix journal loading --- Makefile | 2 +- blockstore_flush.cpp | 2 +- blockstore_init.cpp | 63 ++++++++++++++++++++++++-------------------- blockstore_init.h | 1 + blockstore_journal.h | 4 ++- blockstore_write.cpp | 4 ++- 6 files changed, 44 insertions(+), 32 deletions(-) diff --git a/Makefile b/Makefile index 398abcf39..4fe2ecd8c 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ clean: rm -f *.o crc32c.o: crc32c.c g++ -c -o $@ $< -%.o: %.cpp blockstore.h +%.o: %.cpp allocator.h blockstore_flush.h blockstore.h blockstore_init.h blockstore_journal.h crc32c.h ringloop.h xor.h g++ -g -Wall -Wno-sign-compare -Wno-parentheses -c -o $@ $< test: test.cpp g++ -g -O3 -o test -luring test.cpp diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 80dc5096c..94b66b4bd 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -184,7 +184,7 @@ resume_0: } else if (dirty_it->second.state == ST_D_STABLE) { - // There is an unflushed big write. Overwrite it with small writes + // There is an unflushed big write. Copy small writes in its position if (!skip_copy) { clean_loc = dirty_it->second.location; diff --git a/blockstore_init.cpp b/blockstore_init.cpp index 96856ce52..07b82c908 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -232,11 +232,7 @@ resume_1: wait_state = 2; return 1; } - if (wrapped && journal_pos >= bs->journal.used_start) - { - break; - } - else + if (!wrapped || journal_pos < bs->journal.used_start) { GET_SQE(); uint64_t end = bs->journal.len; @@ -263,8 +259,13 @@ resume_1: return 1; } } + if (!submitted) + { + break; + } } } + printf("Journal entries loaded: %d\n", entries_loaded); free(journal_buffer); bs->journal.crc32_last = crc32_last; journal_buffer = NULL; @@ -273,27 +274,26 @@ resume_1: int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) { - uint64_t total_pos = 0; + uint64_t buf_pos = 0; if (cur_skip >= 0) { - total_pos = cur_skip; + buf_pos = cur_skip; cur_skip = 0; } - while (total_pos < len) + while (buf_pos < len) { - total_pos += 512; - uint64_t pos = 0; + uint64_t proc_pos = buf_pos, pos = 0; + buf_pos += 512; while (pos < 512) { - journal_entry *je = (journal_entry*)((uint8_t*)buf + total_pos + pos); + journal_entry *je = (journal_entry*)((uint8_t*)buf + proc_pos + pos); if (je->magic != JOURNAL_MAGIC || je_crc32(je) != je->crc32 || je->type < JE_SMALL_WRITE || je->type > JE_DELETE || started && je->crc32_prev != crc32_last) { if (pos == 0) { // invalid entry in the beginning, this is definitely the end of the journal - // FIXME handle the edge case when the journal is full - bs->journal.next_free = done_pos + total_pos; + bs->journal.next_free = done_pos + proc_pos; return 0; } else @@ -309,18 +309,26 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) { // oid, version, offset, len uint64_t location; - if (cur_skip > 0 || done_pos + total_pos + je->small_write.len > bs->journal.len) + if (cur_skip > 0 || done_pos + buf_pos + je->small_write.len > bs->journal.len) { // data continues from the beginning of the journal + if (buf_pos > len) + { + // if something is already skipped, skip everything until the end of the journal + buf_pos = bs->journal.len-done_pos; + } location = 512 + cur_skip; cur_skip += je->small_write.len; } else { // data is right next - location = done_pos + total_pos; - // FIXME: OOPS. Please don't modify total_pos here - total_pos += je->small_write.len; + location = done_pos + buf_pos; + buf_pos += je->small_write.len; + } + if (location != je->small_write.data_offset) + { + throw new std::runtime_error("BUG: calculated journal data offset != stored journal data offset"); } obj_ver_id ov = { .oid = je->small_write.oid, @@ -332,10 +340,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) .location = location, .offset = je->small_write.offset, .len = je->small_write.len, - .journal_sector = total_pos, + .journal_sector = proc_pos, }); - bs->journal.used_sectors[total_pos]++; - bs->flusher->queue_flush(ov); + bs->journal.used_sectors[proc_pos]++; } else if (je->type == JE_BIG_WRITE) { @@ -350,10 +357,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) .location = je->big_write.location, .offset = 0, .len = bs->block_size, - .journal_sector = total_pos, + .journal_sector = proc_pos, }); - bs->journal.used_sectors[total_pos]++; - bs->flusher->queue_flush(ov); + bs->journal.used_sectors[proc_pos]++; } else if (je->type == JE_STABLE) { @@ -374,6 +380,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) it->second.state = (it->second.state == ST_D_META_SYNCED ? ST_D_STABLE : (it->second.state == ST_DEL_SYNCED ? ST_DEL_STABLE : ST_J_STABLE)); + bs->flusher->queue_flush(ov); } } else if (je->type == JE_DELETE) @@ -389,16 +396,16 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) .location = 0, .offset = 0, .len = 0, - .journal_sector = total_pos, + .journal_sector = proc_pos, }); - bs->journal.used_sectors[total_pos]++; - bs->flusher->queue_flush(ov); + bs->journal.used_sectors[proc_pos]++; } + entries_loaded++; } } - if (cur_skip == 0 && total_pos > len) + if (buf_pos > len) { - cur_skip = total_pos - len; + cur_skip = buf_pos - len; } return 1; } diff --git a/blockstore_init.h b/blockstore_init.h index 90f2a0cab..a9ac495a4 100644 --- a/blockstore_init.h +++ b/blockstore_init.h @@ -17,6 +17,7 @@ class blockstore_init_journal { blockstore *bs; int wait_state = 0, wait_count = 0; + int entries_loaded = 0; uint8_t *journal_buffer = NULL; uint32_t crc32_last = 0; bool started = false; diff --git a/blockstore_journal.h b/blockstore_journal.h index 81933ec92..a68de6d59 100644 --- a/blockstore_journal.h +++ b/blockstore_journal.h @@ -37,7 +37,9 @@ struct __attribute__((__packed__)) journal_entry_small_write uint64_t version; uint32_t offset; uint32_t len; - // small_write entries contain bytes of data, but data is stored in the next journal sector + // small_write entries contain bytes of data which is stored in next sectors + // data_offset is its offset within journal + uint64_t data_offset; }; struct __attribute__((__packed__)) journal_entry_big_write diff --git a/blockstore_write.cpp b/blockstore_write.cpp index b08b6c455..2a61d6683 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -132,16 +132,18 @@ int blockstore::dequeue_write(blockstore_operation *op) prefill_single_journal_entry(journal, JE_SMALL_WRITE, sizeof(struct journal_entry_small_write)); dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset; journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++; + // Figure out where data will be + journal.next_free = (journal.next_free + op->len) < journal.len ? journal.next_free : 512; je->oid = op->oid; je->version = op->version; je->offset = op->offset; je->len = op->len; + je->data_offset = journal.next_free; je->crc32 = je_crc32((journal_entry*)je); journal.crc32_last = je->crc32; prepare_journal_sector_write(journal, sqe1, cb); op->min_used_journal_sector = op->max_used_journal_sector = 1 + journal.cur_sector; // Prepare journal data write - journal.next_free = (journal.next_free + op->len) < journal.len ? journal.next_free : 512; data2->iov = (struct iovec){ op->buf, op->len }; data2->callback = cb; my_uring_prep_writev(