Rewrite journal_init to the "goto-coroutine" style

blocking-uring-test
Vitaliy Filippov 2019-11-19 19:49:52 +03:00
parent 3bfa2f5f39
commit b5f04c58ff
2 changed files with 111 additions and 121 deletions

View File

@ -111,152 +111,139 @@ bool iszero(uint64_t *buf, int len)
return true;
}
void blockstore_init_journal::handle_event(ring_data_t *data)
void blockstore_init_journal::handle_event(ring_data_t *data1)
{
if (step == 1)
// Step 3: Read journal
if (data1->res < 0)
{
// Step 1: Read first block of the journal
if (data->res < 0)
{
throw std::runtime_error(
std::string("read journal failed at offset ") + std::to_string(0) +
std::string(": ") + strerror(-data->res)
);
}
if (iszero((uint64_t*)journal_buffer, 3))
{
// Journal is empty
// FIXME handle this wrapping to 512 better
bs->journal.used_start = 512;
bs->journal.next_free = 512;
step = 99;
}
else
{
// First block always contains a single JE_START entry
journal_entry_start *je = (journal_entry_start*)journal_buffer;
if (je->magic != JOURNAL_MAGIC ||
je->type != JE_START ||
je->size != sizeof(journal_entry_start) ||
je_crc32((journal_entry*)je) != je->crc32)
{
// Entry is corrupt
throw std::runtime_error("first entry of the journal is corrupt");
}
journal_pos = bs->journal.used_start = je->journal_start;
crc32_last = 0;
step = 2;
started = false;
}
throw std::runtime_error(
std::string("read journal failed at offset ") + std::to_string(journal_pos) +
std::string(": ") + strerror(-data1->res)
);
}
else if (step == 2 || step == 3)
done_pos = journal_pos;
done_buf = submitted;
done_len = data1->res;
journal_pos += data1->res;
if (journal_pos >= bs->journal.len)
{
// Step 3: Read journal
if (data->res < 0)
{
throw std::runtime_error(
std::string("read journal failed at offset ") + std::to_string(journal_pos) +
std::string(": ") + strerror(-data->res)
);
}
done_pos = journal_pos;
done_buf = submitted;
done_len = data->res;
journal_pos += data->res;
if (journal_pos >= bs->journal.len)
{
// Continue from the beginning
journal_pos = 512;
wrapped = true;
}
submitted = 0;
// Continue from the beginning
journal_pos = 512;
wrapped = true;
}
submitted = 0;
}
int blockstore_init_journal::loop()
{
if (step == 100)
if (wait_state == 1)
goto resume_1;
else if (wait_state == 2)
goto resume_2;
else if (wait_state == 3)
goto resume_3;
journal_buffer = (uint8_t*)memalign(DISK_ALIGNMENT, 2*JOURNAL_BUFFER_SIZE);
// Read first block of the journal
sqe = bs->get_sqe();
if (!sqe)
{
return 0;
throw std::runtime_error("io_uring is full while trying to read journal");
}
if (!journal_buffer)
data = ((ring_data_t*)sqe->user_data);
data->iov = { journal_buffer, 512 };
data->callback = [this](ring_data_t *data1)
{
journal_buffer = (uint8_t*)memalign(DISK_ALIGNMENT, 2*JOURNAL_BUFFER_SIZE);
}
if (step == 0)
{
// Step 1: Read first block of the journal
struct io_uring_sqe *sqe = bs->get_sqe();
if (!sqe)
if (data1->res != 512)
{
throw std::runtime_error("io_uring is full while trying to read journal");
throw std::runtime_error(
std::string("read journal failed at offset ") + std::to_string(0) +
std::string(": ") + strerror(-data1->res)
);
}
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
data->iov = { journal_buffer, 512 };
data->callback = [this](ring_data_t *data) { handle_event(data); };
my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
bs->ringloop->submit();
step = 1;
}
if (step == 2 || step == 3)
done_buf = 1;
};
my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
bs->ringloop->submit();
done_buf = 0;
resume_1:
if (done_buf == 0)
{
// Step 3: Read journal
if (!submitted)
wait_state = 1;
return 1;
}
done_buf = 0;
if (iszero((uint64_t*)journal_buffer, 3))
{
// Journal is empty
// FIXME handle this wrapping to 512 better
bs->journal.used_start = 512;
bs->journal.next_free = 512;
}
else
{
// First block always contains a single JE_START entry
je_start = (journal_entry_start*)journal_buffer;
if (je_start->magic != JOURNAL_MAGIC ||
je_start->type != JE_START ||
je_start->size != sizeof(journal_entry_start) ||
je_crc32((journal_entry*)je_start) != je_start->crc32)
{
if (step != 3)
// Entry is corrupt
throw std::runtime_error("first entry of the journal is corrupt");
}
journal_pos = bs->journal.used_start = je_start->journal_start;
crc32_last = 0;
// Read journal
while (true)
{
resume_2:
if (submitted)
{
if (journal_pos == bs->journal.used_start && wrapped)
{
step = 3;
}
else
{
struct io_uring_sqe *sqe = bs->get_sqe();
if (!sqe)
{
throw std::runtime_error("io_uring is full while trying to read journal");
}
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
uint64_t end = bs->journal.len;
if (journal_pos < bs->journal.used_start)
{
end = bs->journal.used_start;
}
data->iov = {
journal_buffer + (done_buf == 1 ? JOURNAL_BUFFER_SIZE : 0),
end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE,
};
data->callback = [this](ring_data_t *data) { handle_event(data); };
my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + journal_pos);
bs->ringloop->submit();
submitted = done_buf == 1 ? 2 : 1;
}
wait_state = 2;
return 1;
}
if (wrapped && journal_pos >= bs->journal.used_start)
{
break;
}
else
{
step = 99;
sqe = bs->get_sqe();
if (!sqe)
{
throw std::runtime_error("io_uring is full while trying to read journal");
}
data = ((ring_data_t*)sqe->user_data);
uint64_t end = bs->journal.len;
if (journal_pos < bs->journal.used_start)
{
end = bs->journal.used_start;
}
data->iov = {
journal_buffer + (done_buf == 1 ? JOURNAL_BUFFER_SIZE : 0),
end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE,
};
data->callback = [this](ring_data_t *data1) { handle_event(data1); };
my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + journal_pos);
bs->ringloop->submit();
submitted = done_buf == 1 ? 2 : 1;
}
}
if (done_buf && step != 3)
{
// handle journal entries
if (handle_journal_part(journal_buffer + (done_buf == 1 ? 0 : JOURNAL_BUFFER_SIZE), done_len) == 0)
if (done_buf && handle_journal_part(journal_buffer + (done_buf == 1 ? 0 : JOURNAL_BUFFER_SIZE), done_len) == 0)
{
// journal ended. wait for the next read to complete, then stop
step = 3;
resume_3:
if (submitted)
{
wait_state = 3;
return 1;
}
}
done_buf = 0;
}
}
if (step == 99)
{
free(journal_buffer);
bs->journal.crc32_last = crc32_last;
journal_buffer = NULL;
step = 100;
return 0;
}
return 1;
free(journal_buffer);
bs->journal.crc32_last = crc32_last;
journal_buffer = NULL;
return 0;
}
int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)

View File

@ -16,14 +16,17 @@ public:
class blockstore_init_journal
{
blockstore *bs;
int wait_state = 0;
uint8_t *journal_buffer = NULL;
int step = 0;
uint32_t crc32_last = 0;
bool started = false;
uint64_t done_pos = 0, journal_pos = 0;
uint64_t cur_skip = 0;
bool wrapped = false;
int submitted = 0, done_buf = 0, done_len = 0;
struct io_uring_sqe *sqe;
struct ring_data_t *data;
journal_entry_start *je_start;
int handle_journal_part(void *buf, uint64_t len);
void handle_event(ring_data_t *data);
public: