diff --git a/blockstore_init.cpp b/blockstore_init.cpp index 9067db55f..7cdc4bbea 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -111,152 +111,139 @@ bool iszero(uint64_t *buf, int len) return true; } -void blockstore_init_journal::handle_event(ring_data_t *data) +void blockstore_init_journal::handle_event(ring_data_t *data1) { - if (step == 1) + // Step 3: Read journal + if (data1->res < 0) { - // Step 1: Read first block of the journal - if (data->res < 0) - { - throw std::runtime_error( - std::string("read journal failed at offset ") + std::to_string(0) + - std::string(": ") + strerror(-data->res) - ); - } - if (iszero((uint64_t*)journal_buffer, 3)) - { - // Journal is empty - // FIXME handle this wrapping to 512 better - bs->journal.used_start = 512; - bs->journal.next_free = 512; - step = 99; - } - else - { - // First block always contains a single JE_START entry - journal_entry_start *je = (journal_entry_start*)journal_buffer; - if (je->magic != JOURNAL_MAGIC || - je->type != JE_START || - je->size != sizeof(journal_entry_start) || - je_crc32((journal_entry*)je) != je->crc32) - { - // Entry is corrupt - throw std::runtime_error("first entry of the journal is corrupt"); - } - journal_pos = bs->journal.used_start = je->journal_start; - crc32_last = 0; - step = 2; - started = false; - } + throw std::runtime_error( + std::string("read journal failed at offset ") + std::to_string(journal_pos) + + std::string(": ") + strerror(-data1->res) + ); } - else if (step == 2 || step == 3) + done_pos = journal_pos; + done_buf = submitted; + done_len = data1->res; + journal_pos += data1->res; + if (journal_pos >= bs->journal.len) { - // Step 3: Read journal - if (data->res < 0) - { - throw std::runtime_error( - std::string("read journal failed at offset ") + std::to_string(journal_pos) + - std::string(": ") + strerror(-data->res) - ); - } - done_pos = journal_pos; - done_buf = submitted; - done_len = data->res; - journal_pos += data->res; - if (journal_pos >= bs->journal.len) - { - // Continue from the beginning - journal_pos = 512; - wrapped = true; - } - submitted = 0; + // Continue from the beginning + journal_pos = 512; + wrapped = true; } + submitted = 0; } int blockstore_init_journal::loop() { - if (step == 100) + if (wait_state == 1) + goto resume_1; + else if (wait_state == 2) + goto resume_2; + else if (wait_state == 3) + goto resume_3; + journal_buffer = (uint8_t*)memalign(DISK_ALIGNMENT, 2*JOURNAL_BUFFER_SIZE); + // Read first block of the journal + sqe = bs->get_sqe(); + if (!sqe) { - return 0; + throw std::runtime_error("io_uring is full while trying to read journal"); } - if (!journal_buffer) + data = ((ring_data_t*)sqe->user_data); + data->iov = { journal_buffer, 512 }; + data->callback = [this](ring_data_t *data1) { - journal_buffer = (uint8_t*)memalign(DISK_ALIGNMENT, 2*JOURNAL_BUFFER_SIZE); - } - if (step == 0) - { - // Step 1: Read first block of the journal - struct io_uring_sqe *sqe = bs->get_sqe(); - if (!sqe) + if (data1->res != 512) { - throw std::runtime_error("io_uring is full while trying to read journal"); + throw std::runtime_error( + std::string("read journal failed at offset ") + std::to_string(0) + + std::string(": ") + strerror(-data1->res) + ); } - struct ring_data_t *data = ((ring_data_t*)sqe->user_data); - data->iov = { journal_buffer, 512 }; - data->callback = [this](ring_data_t *data) { handle_event(data); }; - my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset); - bs->ringloop->submit(); - step = 1; - } - if (step == 2 || step == 3) + done_buf = 1; + }; + my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset); + bs->ringloop->submit(); + done_buf = 0; +resume_1: + if (done_buf == 0) { - // Step 3: Read journal - if (!submitted) + wait_state = 1; + return 1; + } + done_buf = 0; + if (iszero((uint64_t*)journal_buffer, 3)) + { + // Journal is empty + // FIXME handle this wrapping to 512 better + bs->journal.used_start = 512; + bs->journal.next_free = 512; + } + else + { + // First block always contains a single JE_START entry + je_start = (journal_entry_start*)journal_buffer; + if (je_start->magic != JOURNAL_MAGIC || + je_start->type != JE_START || + je_start->size != sizeof(journal_entry_start) || + je_crc32((journal_entry*)je_start) != je_start->crc32) { - if (step != 3) + // Entry is corrupt + throw std::runtime_error("first entry of the journal is corrupt"); + } + journal_pos = bs->journal.used_start = je_start->journal_start; + crc32_last = 0; + // Read journal + while (true) + { + resume_2: + if (submitted) { - if (journal_pos == bs->journal.used_start && wrapped) - { - step = 3; - } - else - { - struct io_uring_sqe *sqe = bs->get_sqe(); - if (!sqe) - { - throw std::runtime_error("io_uring is full while trying to read journal"); - } - struct ring_data_t *data = ((ring_data_t*)sqe->user_data); - uint64_t end = bs->journal.len; - if (journal_pos < bs->journal.used_start) - { - end = bs->journal.used_start; - } - data->iov = { - journal_buffer + (done_buf == 1 ? JOURNAL_BUFFER_SIZE : 0), - end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE, - }; - data->callback = [this](ring_data_t *data) { handle_event(data); }; - my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + journal_pos); - bs->ringloop->submit(); - submitted = done_buf == 1 ? 2 : 1; - } + wait_state = 2; + return 1; + } + if (wrapped && journal_pos >= bs->journal.used_start) + { + break; } else { - step = 99; + sqe = bs->get_sqe(); + if (!sqe) + { + throw std::runtime_error("io_uring is full while trying to read journal"); + } + data = ((ring_data_t*)sqe->user_data); + uint64_t end = bs->journal.len; + if (journal_pos < bs->journal.used_start) + { + end = bs->journal.used_start; + } + data->iov = { + journal_buffer + (done_buf == 1 ? JOURNAL_BUFFER_SIZE : 0), + end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE, + }; + data->callback = [this](ring_data_t *data1) { handle_event(data1); }; + my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + journal_pos); + bs->ringloop->submit(); + submitted = done_buf == 1 ? 2 : 1; } - } - if (done_buf && step != 3) - { - // handle journal entries - if (handle_journal_part(journal_buffer + (done_buf == 1 ? 0 : JOURNAL_BUFFER_SIZE), done_len) == 0) + if (done_buf && handle_journal_part(journal_buffer + (done_buf == 1 ? 0 : JOURNAL_BUFFER_SIZE), done_len) == 0) { // journal ended. wait for the next read to complete, then stop - step = 3; + resume_3: + if (submitted) + { + wait_state = 3; + return 1; + } } - done_buf = 0; } } - if (step == 99) - { - free(journal_buffer); - bs->journal.crc32_last = crc32_last; - journal_buffer = NULL; - step = 100; - return 0; - } - return 1; + free(journal_buffer); + bs->journal.crc32_last = crc32_last; + journal_buffer = NULL; + return 0; } int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) diff --git a/blockstore_init.h b/blockstore_init.h index 2a0d1ab77..683a0d2c2 100644 --- a/blockstore_init.h +++ b/blockstore_init.h @@ -16,14 +16,17 @@ public: class blockstore_init_journal { blockstore *bs; + int wait_state = 0; uint8_t *journal_buffer = NULL; - int step = 0; uint32_t crc32_last = 0; bool started = false; uint64_t done_pos = 0, journal_pos = 0; uint64_t cur_skip = 0; bool wrapped = false; int submitted = 0, done_buf = 0, done_len = 0; + struct io_uring_sqe *sqe; + struct ring_data_t *data; + journal_entry_start *je_start; int handle_journal_part(void *buf, uint64_t len); void handle_event(ring_data_t *data); public: