forked from vitalif/vitastor
Initialize sector 0 of the journal
parent
b5f04c58ff
commit
eb55b2fe20
|
@ -101,6 +101,14 @@ void blockstore_init_meta::handle_entries(struct clean_disk_entry* entries, int
|
||||||
blockstore_init_journal::blockstore_init_journal(blockstore *bs)
|
blockstore_init_journal::blockstore_init_journal(blockstore *bs)
|
||||||
{
|
{
|
||||||
this->bs = bs;
|
this->bs = bs;
|
||||||
|
simple_callback = [this](ring_data_t *data1)
|
||||||
|
{
|
||||||
|
if (data1->res != data1->iov.iov_len)
|
||||||
|
{
|
||||||
|
throw std::runtime_error(std::string("I/O operation failed while reading journal: ") + strerror(-data1->res));
|
||||||
|
}
|
||||||
|
wait_count--;
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
bool iszero(uint64_t *buf, int len)
|
bool iszero(uint64_t *buf, int len)
|
||||||
|
@ -134,6 +142,12 @@ void blockstore_init_journal::handle_event(ring_data_t *data1)
|
||||||
submitted = 0;
|
submitted = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#define GET_SQE() \
|
||||||
|
sqe = bs->get_sqe();\
|
||||||
|
if (!sqe)\
|
||||||
|
throw std::runtime_error("io_uring is full while trying to read journal");\
|
||||||
|
data = ((ring_data_t*)sqe->user_data)
|
||||||
|
|
||||||
int blockstore_init_journal::loop()
|
int blockstore_init_journal::loop()
|
||||||
{
|
{
|
||||||
if (wait_state == 1)
|
if (wait_state == 1)
|
||||||
|
@ -142,42 +156,58 @@ int blockstore_init_journal::loop()
|
||||||
goto resume_2;
|
goto resume_2;
|
||||||
else if (wait_state == 3)
|
else if (wait_state == 3)
|
||||||
goto resume_3;
|
goto resume_3;
|
||||||
|
else if (wait_state == 4)
|
||||||
|
goto resume_4;
|
||||||
journal_buffer = (uint8_t*)memalign(DISK_ALIGNMENT, 2*JOURNAL_BUFFER_SIZE);
|
journal_buffer = (uint8_t*)memalign(DISK_ALIGNMENT, 2*JOURNAL_BUFFER_SIZE);
|
||||||
// Read first block of the journal
|
// Read first block of the journal
|
||||||
sqe = bs->get_sqe();
|
sqe = bs->get_sqe();
|
||||||
if (!sqe)
|
if (!sqe)
|
||||||
{
|
|
||||||
throw std::runtime_error("io_uring is full while trying to read journal");
|
throw std::runtime_error("io_uring is full while trying to read journal");
|
||||||
}
|
|
||||||
data = ((ring_data_t*)sqe->user_data);
|
data = ((ring_data_t*)sqe->user_data);
|
||||||
data->iov = { journal_buffer, 512 };
|
data->iov = { journal_buffer, 512 };
|
||||||
data->callback = [this](ring_data_t *data1)
|
data->callback = simple_callback;
|
||||||
{
|
|
||||||
if (data1->res != 512)
|
|
||||||
{
|
|
||||||
throw std::runtime_error(
|
|
||||||
std::string("read journal failed at offset ") + std::to_string(0) +
|
|
||||||
std::string(": ") + strerror(-data1->res)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
done_buf = 1;
|
|
||||||
};
|
|
||||||
my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
|
my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
|
||||||
bs->ringloop->submit();
|
bs->ringloop->submit();
|
||||||
done_buf = 0;
|
wait_count = 1;
|
||||||
resume_1:
|
resume_1:
|
||||||
if (done_buf == 0)
|
if (wait_count > 0)
|
||||||
{
|
{
|
||||||
wait_state = 1;
|
wait_state = 1;
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
done_buf = 0;
|
|
||||||
if (iszero((uint64_t*)journal_buffer, 3))
|
if (iszero((uint64_t*)journal_buffer, 3))
|
||||||
{
|
{
|
||||||
// Journal is empty
|
// Journal is empty
|
||||||
// FIXME handle this wrapping to 512 better
|
// FIXME handle this wrapping to 512 better ... and align it to 4096
|
||||||
bs->journal.used_start = 512;
|
bs->journal.used_start = 512;
|
||||||
bs->journal.next_free = 512;
|
bs->journal.next_free = 512;
|
||||||
|
// Initialize journal "superblock"
|
||||||
|
GET_SQE();
|
||||||
|
memset(journal_buffer, 0, 512);
|
||||||
|
*((journal_entry_start*)journal_buffer) = {
|
||||||
|
.crc32 = 0,
|
||||||
|
.magic = JOURNAL_MAGIC,
|
||||||
|
.type = JE_START,
|
||||||
|
.size = sizeof(journal_entry_start),
|
||||||
|
.reserved = 0,
|
||||||
|
.journal_start = bs->journal.used_start,
|
||||||
|
};
|
||||||
|
((journal_entry_start*)journal_buffer)->crc32 = je_crc32((journal_entry*)journal_buffer);
|
||||||
|
data->iov = (struct iovec){ journal_buffer, 512 };
|
||||||
|
data->callback = simple_callback;
|
||||||
|
my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
|
||||||
|
wait_count++;
|
||||||
|
GET_SQE();
|
||||||
|
my_uring_prep_fsync(sqe, bs->journal.fd, 0);
|
||||||
|
data->callback = simple_callback;
|
||||||
|
wait_count++;
|
||||||
|
bs->ringloop->submit();
|
||||||
|
resume_4:
|
||||||
|
if (wait_count > 0)
|
||||||
|
{
|
||||||
|
wait_state = 4;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -208,12 +238,7 @@ resume_1:
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
sqe = bs->get_sqe();
|
GET_SQE();
|
||||||
if (!sqe)
|
|
||||||
{
|
|
||||||
throw std::runtime_error("io_uring is full while trying to read journal");
|
|
||||||
}
|
|
||||||
data = ((ring_data_t*)sqe->user_data);
|
|
||||||
uint64_t end = bs->journal.len;
|
uint64_t end = bs->journal.len;
|
||||||
if (journal_pos < bs->journal.used_start)
|
if (journal_pos < bs->journal.used_start)
|
||||||
{
|
{
|
||||||
|
|
|
@ -16,7 +16,7 @@ public:
|
||||||
class blockstore_init_journal
|
class blockstore_init_journal
|
||||||
{
|
{
|
||||||
blockstore *bs;
|
blockstore *bs;
|
||||||
int wait_state = 0;
|
int wait_state = 0, wait_count = 0;
|
||||||
uint8_t *journal_buffer = NULL;
|
uint8_t *journal_buffer = NULL;
|
||||||
uint32_t crc32_last = 0;
|
uint32_t crc32_last = 0;
|
||||||
bool started = false;
|
bool started = false;
|
||||||
|
@ -27,6 +27,7 @@ class blockstore_init_journal
|
||||||
struct io_uring_sqe *sqe;
|
struct io_uring_sqe *sqe;
|
||||||
struct ring_data_t *data;
|
struct ring_data_t *data;
|
||||||
journal_entry_start *je_start;
|
journal_entry_start *je_start;
|
||||||
|
std::function<void(ring_data_t*)> simple_callback;
|
||||||
int handle_journal_part(void *buf, uint64_t len);
|
int handle_journal_part(void *buf, uint64_t len);
|
||||||
void handle_event(ring_data_t *data);
|
void handle_event(ring_data_t *data);
|
||||||
public:
|
public:
|
||||||
|
|
Loading…
Reference in New Issue