diff --git a/blockstore.cpp b/blockstore.cpp index c0f176d3..5aaad1b9 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -91,7 +91,6 @@ void blockstore::loop() delete journal_init_reader; journal_init_reader = NULL; initialized = 10; - printf("journal read\n"); } } } diff --git a/blockstore.h b/blockstore.h index 8101398a..827730df 100644 --- a/blockstore.h +++ b/blockstore.h @@ -270,6 +270,8 @@ class blockstore uint64_t meta_offset, meta_size, meta_area, meta_len; uint64_t data_offset, data_size, data_len; + // FIXME: add readonly option + struct journal_t journal; journal_flusher_t *flusher; diff --git a/blockstore_init.cpp b/blockstore_init.cpp index 7f761b3b..0ab6e244 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -137,17 +137,18 @@ bool iszero(uint64_t *buf, int len) void blockstore_init_journal::handle_event(ring_data_t *data1) { - // Step 3: Read journal - if (data1->res < 0) + if (data1->res <= 0) { throw std::runtime_error( std::string("read journal failed at offset ") + std::to_string(journal_pos) + std::string(": ") + strerror(-data1->res) ); } - done_pos = journal_pos; - done_buf = submitted; - done_len = data1->res; + done.push_back({ + .buf = submitted_buf, + .pos = journal_pos, + .len = (uint64_t)data1->res, + }); journal_pos += data1->res; if (journal_pos >= bs->journal.len) { @@ -155,7 +156,7 @@ void blockstore_init_journal::handle_event(ring_data_t *data1) journal_pos = 512; wrapped = true; } - submitted = 0; + submitted_buf = NULL; } #define GET_SQE() \ @@ -174,21 +175,23 @@ int blockstore_init_journal::loop() goto resume_3; else if (wait_state == 4) goto resume_4; + else if (wait_state == 5) + goto resume_5; printf("Reading blockstore journal\n"); if (!bs->journal.inmemory) { - journal_buffer = (uint8_t*)memalign(DISK_ALIGNMENT, 2*JOURNAL_BUFFER_SIZE); - if (!journal_buffer) + submitted_buf = memalign(512, 1024); + if (!submitted_buf) throw std::bad_alloc(); } else - journal_buffer = bs->journal.buffer; + submitted_buf = bs->journal.buffer; // Read first block of the journal sqe = bs->get_sqe(); if (!sqe) throw std::runtime_error("io_uring is full while trying to read journal"); data = ((ring_data_t*)sqe->user_data); - data->iov = { journal_buffer, 512 }; + data->iov = { submitted_buf, 512 }; data->callback = simple_callback; my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset); bs->ringloop->submit(); @@ -199,7 +202,7 @@ resume_1: wait_state = 1; return 1; } - if (iszero((uint64_t*)journal_buffer, 3)) + if (iszero((uint64_t*)submitted_buf, 3)) { // Journal is empty // FIXME handle this wrapping to 512 better @@ -209,8 +212,8 @@ resume_1: // Cool effect. Same operations result in journal replay. // FIXME: Randomize initial crc32. Track crc32 when trimming. GET_SQE(); - memset(journal_buffer, 0, 1024); - *((journal_entry_start*)journal_buffer) = { + memset(submitted_buf, 0, 1024); + *((journal_entry_start*)submitted_buf) = { .crc32 = 0, .magic = JOURNAL_MAGIC, .type = JE_START, @@ -218,8 +221,8 @@ resume_1: .reserved = 0, .journal_start = 512, }; - ((journal_entry_start*)journal_buffer)->crc32 = je_crc32((journal_entry*)journal_buffer); - data->iov = (struct iovec){ journal_buffer, 1024 }; + ((journal_entry_start*)submitted_buf)->crc32 = je_crc32((journal_entry*)submitted_buf); + data->iov = (struct iovec){ submitted_buf, 1024 }; data->callback = simple_callback; my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset); wait_count++; @@ -236,11 +239,13 @@ resume_1: wait_state = 4; return 1; } + if (!bs->journal.inmemory) + free(submitted_buf); } else { // First block always contains a single JE_START entry - je_start = (journal_entry_start*)journal_buffer; + je_start = (journal_entry_start*)submitted_buf; if (je_start->magic != JOURNAL_MAGIC || je_start->type != JE_START || je_start->size != sizeof(journal_entry_start) || @@ -250,12 +255,15 @@ resume_1: throw std::runtime_error("first entry of the journal is corrupt"); } next_free = journal_pos = bs->journal.used_start = je_start->journal_start; + if (!bs->journal.inmemory) + free(submitted_buf); + submitted_buf = NULL; crc32_last = 0; // Read journal while (1) { resume_2: - if (submitted) + if (submitted_buf) { wait_state = 2; return 1; @@ -265,31 +273,71 @@ resume_1: GET_SQE(); uint64_t end = bs->journal.len; if (journal_pos < bs->journal.used_start) - { end = bs->journal.used_start; - } + if (!bs->journal.inmemory) + submitted_buf = memalign(512, JOURNAL_BUFFER_SIZE); + else + submitted_buf = bs->journal.buffer + journal_pos; data->iov = { - journal_buffer + (bs->journal.inmemory ? journal_pos : (done_buf == 1 ? JOURNAL_BUFFER_SIZE : 0)), + submitted_buf, end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE, }; data->callback = [this](ring_data_t *data1) { handle_event(data1); }; my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + journal_pos); bs->ringloop->submit(); - submitted = done_buf == 1 ? 2 : 1; } - if (done_buf && handle_journal_part(journal_buffer + (bs->journal.inmemory - ? done_pos - : (done_buf == 1 ? 0 : JOURNAL_BUFFER_SIZE)), done_len) == 0) + while (done.size() > 0) { - // journal ended. wait for the next read to complete, then stop - resume_3: - if (submitted) + handle_res = handle_journal_part(done[0].buf, done[0].pos, done[0].len); + if (handle_res == 0) { - wait_state = 3; - return 1; + // journal ended + // zero out corrupted entry, if required + if (init_write_buf) + { + GET_SQE(); + data->iov = { init_write_buf, 512 }; + data->callback = simple_callback; + wait_count++; + my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + init_write_sector); + bs->ringloop->submit(); + resume_5: + if (wait_count > 0) + { + wait_state = 5; + return 1; + } + } + // wait for the next read to complete, then stop + resume_3: + if (submitted_buf) + { + wait_state = 3; + return 1; + } + // free buffers + if (!bs->journal.inmemory) + for (auto & e: done) + free(e.buf); + done.clear(); + break; + } + else if (handle_res == 1) + { + // OK, remove it + if (!bs->journal.inmemory) + { + free(done[0].buf); + } + done.erase(done.begin()); + } + else if (handle_res == 2) + { + // Need to wait for more reads + break; } } - if (!submitted) + if (!submitted_buf) { break; } @@ -298,25 +346,30 @@ resume_1: // Trim journal on start so we don't stall when all entries are older bs->journal.trim(); printf("Journal entries loaded: %lu, free blocks: %lu / %lu\n", entries_loaded, bs->data_alloc->get_free_count(), bs->block_count); - if (!bs->journal.inmemory) - { - free(journal_buffer); - } bs->journal.crc32_last = crc32_last; - journal_buffer = NULL; return 0; } -int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) +int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, uint64_t len) { + uint64_t proc_pos, pos; + if (continue_pos != 0) + { + proc_pos = (continue_pos / 512) * 512; + pos = continue_pos % 512; + continue_pos = 0; + goto resume; + } while (next_free >= done_pos && next_free < done_pos+len) { - uint64_t proc_pos = next_free, pos = 0; + proc_pos = next_free; + pos = 0; next_free += 512; if (next_free >= bs->journal.len) { next_free = 512; } + resume: while (pos < 512) { journal_entry *je = (journal_entry*)((uint8_t*)buf + proc_pos - done_pos + pos); @@ -335,12 +388,13 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) break; } } - started = true; - pos += je->size; - crc32_last = je->crc32; if (je->type == JE_SMALL_WRITE) { +#ifdef BLOCKSTORE_DEBUG + printf("je_small_write oid=%lu:%lu ver=%lu offset=%u len=%u\n", je->small_write.oid.inode, je->small_write.oid.stripe, je->small_write.version, je->small_write.offset, je->small_write.len); +#endif // oid, version, offset, len + uint64_t prev_free = next_free; if (next_free + je->small_write.len > bs->journal.len) { // data continues from the beginning of the journal @@ -358,6 +412,45 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) snprintf(err, 1024, "BUG: calculated journal data offset (%lu) != stored journal data offset (%lu)", location, je->small_write.data_offset); throw std::runtime_error(err); } + uint32_t data_crc32 = 0; + if (location >= done_pos && location+je->small_write.len <= done_pos+len) + { + // data is within this buffer + data_crc32 = crc32c(0, buf + location - done_pos, je->small_write.len); + } + else + { + // this case is even more interesting because we must carry data crc32 check to next buffer(s) + uint64_t covered = 0; + for (int i = 0; i < done.size(); i++) + { + if (location+je->small_write.len > done[i].pos && + location < done[i].pos+done[i].len) + { + uint64_t part_end = (location+je->small_write.len < done[i].pos+done[i].len + ? location+je->small_write.len : done[i].pos+done[i].len); + uint64_t part_begin = (location < done[i].pos ? done[i].pos : location); + covered += part_end - part_begin; + data_crc32 = crc32c(data_crc32, done[i].buf + part_begin - done[i].pos, part_end - part_begin); + } + } + if (covered < je->small_write.len) + { + continue_pos = proc_pos+pos; + next_free = prev_free; + return 2; + } + } + if (data_crc32 != je->small_write.crc32_data) + { + // journal entry is corrupt, stop here + // interesting thing is that we must clear the corrupt entry if we're not readonly + memset(buf + proc_pos - done_pos + pos, 0, 512 - pos); + bs->journal.next_free = prev_free; + init_write_buf = buf + proc_pos - done_pos; + init_write_sector = proc_pos; + return 0; + } auto clean_it = bs->clean_db.find(je->small_write.oid); if (clean_it == bs->clean_db.end() || clean_it->second.version < je->big_write.version) @@ -366,9 +459,6 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) .oid = je->small_write.oid, .version = je->small_write.version, }; -#ifdef BLOCKSTORE_DEBUG - printf("je_small_write oid=%lu:%lu ver=%lu offset=%u len=%u\n", ov.oid.inode, ov.oid.stripe, ov.version, je->small_write.offset, je->small_write.len); -#endif bs->dirty_db.emplace(ov, (dirty_entry){ .state = ST_J_SYNCED, .flags = 0, @@ -387,6 +477,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) } else if (je->type == JE_BIG_WRITE) { +#ifdef BLOCKSTORE_DEBUG + printf("je_big_write oid=%lu:%lu ver=%lu\n", je->big_write.oid.inode, je->big_write.oid.stripe, je->big_write.version); +#endif auto clean_it = bs->clean_db.find(je->big_write.oid); if (clean_it == bs->clean_db.end() || clean_it->second.version < je->big_write.version) @@ -396,9 +489,6 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) .oid = je->big_write.oid, .version = je->big_write.version, }; -#ifdef BLOCKSTORE_DEBUG - printf("je_big_write oid=%lu:%lu ver=%lu\n", ov.oid.inode, ov.oid.stripe, ov.version); -#endif bs->dirty_db.emplace(ov, (dirty_entry){ .state = ST_D_META_SYNCED, .flags = 0, @@ -418,6 +508,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) } else if (je->type == JE_STABLE) { +#ifdef BLOCKSTORE_DEBUG + printf("je_stable oid=%lu:%lu ver=%lu\n", je->stable.oid.inode, je->stable.oid.stripe, je->stable.version); +#endif // oid, version obj_ver_id ov = { .oid = je->stable.oid, @@ -428,13 +521,10 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) { // journal contains a legitimate STABLE entry for a non-existing dirty write // this probably means that journal was trimmed between WRITTEN and STABLE entries - // skip for now. but FIXME: maybe warn about it in the future + // skip it } else { -#ifdef BLOCKSTORE_DEBUG - printf("je_stable oid=%lu:%lu ver=%lu\n", ov.oid.inode, ov.oid.stripe, ov.version); -#endif while (1) { it->second.state = (it->second.state == ST_D_META_SYNCED @@ -456,6 +546,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) } else if (je->type == JE_DELETE) { +#ifdef BLOCKSTORE_DEBUG + printf("je_delete oid=%lu:%lu ver=%lu\n", je->del.oid.inode, je->del.oid.stripe, je->del.version); +#endif // oid, version obj_ver_id ov = { .oid = je->del.oid, @@ -471,6 +564,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) }); bs->journal.used_sectors[proc_pos]++; } + started = true; + pos += je->size; + crc32_last = je->crc32; entries_loaded++; } } diff --git a/blockstore_init.h b/blockstore_init.h index deabd466..5d213889 100644 --- a/blockstore_init.h +++ b/blockstore_init.h @@ -18,23 +18,32 @@ public: int loop(); }; +struct bs_init_journal_done +{ + void *buf; + uint64_t pos, len; +}; + class blockstore_init_journal { blockstore *bs; - int wait_state = 0, wait_count = 0; + int wait_state = 0, wait_count = 0, handle_res = 0; uint64_t entries_loaded = 0; - void *journal_buffer = NULL; uint32_t crc32_last = 0; bool started = false; - uint64_t done_pos = 0, journal_pos = 0; uint64_t next_free = 512; + std::vector done; + uint64_t journal_pos = 0; + uint64_t continue_pos = 0; + void *init_write_buf = NULL; + uint64_t init_write_sector = 0; bool wrapped = false; - int submitted = 0, done_buf = 0, done_len = 0; + void *submitted_buf; struct io_uring_sqe *sqe; struct ring_data_t *data; journal_entry_start *je_start; std::function simple_callback; - int handle_journal_part(void *buf, uint64_t len); + int handle_journal_part(void *buf, uint64_t done_pos, uint64_t len); void handle_event(ring_data_t *data); public: blockstore_init_journal(blockstore* bs); diff --git a/blockstore_journal.h b/blockstore_journal.h index cd8c60c4..7e2d82c4 100644 --- a/blockstore_journal.h +++ b/blockstore_journal.h @@ -41,7 +41,6 @@ struct __attribute__((__packed__)) journal_entry_small_write // data_offset is its offset within journal uint64_t data_offset; uint32_t crc32_data; - // FIXME verify data crc32c }; struct __attribute__((__packed__)) journal_entry_big_write