forked from vitalif/vitastor
Finish journal reader
parent
e1c92d2227
commit
c959948c82
|
@ -35,8 +35,12 @@
|
||||||
#define ST_D_STABLE 20
|
#define ST_D_STABLE 20
|
||||||
#define ST_D_META_MOVED 21
|
#define ST_D_META_MOVED 21
|
||||||
#define ST_D_META_COMMITTED 22
|
#define ST_D_META_COMMITTED 22
|
||||||
|
#define ST_DEL_WRITTEN 23
|
||||||
|
#define ST_DEL_SYNCED 24
|
||||||
|
#define ST_DEL_STABLE 25
|
||||||
|
#define ST_DEL_MOVED 26
|
||||||
#define ST_CURRENT 32
|
#define ST_CURRENT 32
|
||||||
#define IS_STABLE(st) ((st) == 4 || (st) == 5 || (st) == 6 || (st) == 20 || (st) == 21 || (st) == 22 || (st) == 32)
|
#define IS_STABLE(st) ((st) == 4 || (st) == 5 || (st) == 6 || (st) == 20 || (st) == 21 || (st) == 22 || (st) == 32 || (st) == 24 || (st) == 25)
|
||||||
#define IS_JOURNAL(st) (st >= 2 && st <= 6)
|
#define IS_JOURNAL(st) (st >= 2 && st <= 6)
|
||||||
|
|
||||||
// Default object size is 128 KB
|
// Default object size is 128 KB
|
||||||
|
@ -183,4 +187,6 @@ public:
|
||||||
int read(blockstore_operation *read_op);
|
int read(blockstore_operation *read_op);
|
||||||
int fulfill_read(blockstore_operation & read_op, uint32_t item_start, uint32_t item_end,
|
int fulfill_read(blockstore_operation & read_op, uint32_t item_start, uint32_t item_end,
|
||||||
uint32_t item_state, uint64_t item_version, uint64_t item_location);
|
uint32_t item_state, uint64_t item_version, uint64_t item_location);
|
||||||
|
int fulfill_read_push(blockstore_operation & read_op, uint32_t item_start,
|
||||||
|
uint32_t item_state, uint64_t item_version, uint64_t item_location, uint32_t cur_start, uint32_t cur_end);
|
||||||
};
|
};
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
#include "blockstore.h"
|
#include "blockstore.h"
|
||||||
#include "crc32c.h"
|
|
||||||
|
|
||||||
blockstore_init_meta::blockstore_init_meta(blockstore *bs)
|
blockstore_init_meta::blockstore_init_meta(blockstore *bs)
|
||||||
{
|
{
|
||||||
|
@ -101,13 +100,6 @@ bool iszero(uint64_t *buf, int len)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline uint32_t je_crc32(journal_entry *je)
|
|
||||||
{
|
|
||||||
return crc32c_zero4(((uint8_t*)je)+4, je->size-4);
|
|
||||||
}
|
|
||||||
|
|
||||||
#define JOURNAL_BUFFER_SIZE 4*1024*1024
|
|
||||||
|
|
||||||
int blockstore_init_journal::read_loop()
|
int blockstore_init_journal::read_loop()
|
||||||
{
|
{
|
||||||
if (step == 100)
|
if (step == 100)
|
||||||
|
@ -148,8 +140,8 @@ int blockstore_init_journal::read_loop()
|
||||||
if (iszero((uint64_t*)journal_buffer, 3))
|
if (iszero((uint64_t*)journal_buffer, 3))
|
||||||
{
|
{
|
||||||
// Journal is empty
|
// Journal is empty
|
||||||
bs->journal_start = 0;
|
bs->journal_start = 512;
|
||||||
bs->journal_end = 0;
|
bs->journal_end = 512;
|
||||||
step = 99;
|
step = 99;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -194,11 +186,20 @@ int blockstore_init_journal::read_loop()
|
||||||
{
|
{
|
||||||
// Continue from the beginning
|
// Continue from the beginning
|
||||||
journal_pos = 512;
|
journal_pos = 512;
|
||||||
|
wrapped = true;
|
||||||
}
|
}
|
||||||
submitted = 0;
|
submitted = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!submitted && step != 3)
|
if (!submitted)
|
||||||
|
{
|
||||||
|
if (step != 3)
|
||||||
|
{
|
||||||
|
if (journal_pos == bs->journal_start && wrapped)
|
||||||
|
{
|
||||||
|
step = 3;
|
||||||
|
}
|
||||||
|
else
|
||||||
{
|
{
|
||||||
struct io_uring_sqe *sqe = io_uring_get_sqe(bs->ring);
|
struct io_uring_sqe *sqe = io_uring_get_sqe(bs->ring);
|
||||||
if (!sqe)
|
if (!sqe)
|
||||||
|
@ -218,12 +219,18 @@ int blockstore_init_journal::read_loop()
|
||||||
io_uring_submit(bs->ring);
|
io_uring_submit(bs->ring);
|
||||||
submitted = done_buf == 1 ? 2 : 1;
|
submitted = done_buf == 1 ? 2 : 1;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
step = 99;
|
||||||
|
}
|
||||||
|
}
|
||||||
if (done_buf && step != 3)
|
if (done_buf && step != 3)
|
||||||
{
|
{
|
||||||
// handle journal entries
|
// handle journal entries
|
||||||
if (handle_journal(journal_buffer + (done_buf == 1 ? 0 : JOURNAL_BUFFER_SIZE), done_len) == 0)
|
if (handle_journal_part(journal_buffer + (done_buf == 1 ? 0 : JOURNAL_BUFFER_SIZE), done_len) == 0)
|
||||||
{
|
{
|
||||||
// finish
|
// journal ended. wait for the next read to complete, then stop
|
||||||
step = 3;
|
step = 3;
|
||||||
}
|
}
|
||||||
done_buf = 0;
|
done_buf = 0;
|
||||||
|
@ -232,43 +239,69 @@ int blockstore_init_journal::read_loop()
|
||||||
if (step == 99)
|
if (step == 99)
|
||||||
{
|
{
|
||||||
free(journal_buffer);
|
free(journal_buffer);
|
||||||
|
bs->journal_crc32_last = crc32_last;
|
||||||
journal_buffer = NULL;
|
journal_buffer = NULL;
|
||||||
step = 100;
|
step = 100;
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int blockstore_init_journal::handle_journal(void *buf, int len)
|
int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
|
||||||
{
|
{
|
||||||
int total_pos = 0;
|
uint64_t total_pos = 0;
|
||||||
|
if (cur_skip >= 0)
|
||||||
|
{
|
||||||
|
total_pos = cur_skip;
|
||||||
|
cur_skip = 0;
|
||||||
|
}
|
||||||
while (total_pos < len)
|
while (total_pos < len)
|
||||||
{
|
{
|
||||||
int pos = 0, skip = 0;
|
total_pos += 512;
|
||||||
|
uint64_t pos = 0;
|
||||||
while (pos < 512)
|
while (pos < 512)
|
||||||
{
|
{
|
||||||
journal_entry *je = (journal_entry*)((uint8_t*)buf + total_pos + pos);
|
journal_entry *je = (journal_entry*)((uint8_t*)buf + total_pos + pos);
|
||||||
if (je->magic != JOURNAL_MAGIC || je_crc32(je) != je->crc32 ||
|
if (je->magic != JOURNAL_MAGIC || je_crc32(je) != je->crc32 ||
|
||||||
je->type < JE_SMALL_WRITE || je->type > JE_DELETE || je->crc32_prev != crc32_last)
|
je->type < JE_SMALL_WRITE || je->type > JE_DELETE || je->crc32_prev != crc32_last)
|
||||||
{
|
{
|
||||||
// Invalid entry - end of the journal
|
if (pos == 0)
|
||||||
|
{
|
||||||
|
// invalid entry in the beginning, this is definitely the end of the journal
|
||||||
bs->journal_end = done_pos + total_pos + pos;
|
bs->journal_end = done_pos + total_pos + pos;
|
||||||
// FIXME: save <skip>
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// allow partially filled sectors
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
pos += je->size;
|
pos += je->size;
|
||||||
|
crc32_last = je->crc32;
|
||||||
if (je->type == JE_SMALL_WRITE)
|
if (je->type == JE_SMALL_WRITE)
|
||||||
{
|
{
|
||||||
// oid, version, offset, len
|
// oid, version, offset, len
|
||||||
|
uint64_t location;
|
||||||
|
if (cur_skip > 0 || done_pos + total_pos + je->small_write.len > bs->journal_len)
|
||||||
|
{
|
||||||
|
// data continues from the beginning of the journal
|
||||||
|
location = 512 + cur_skip;
|
||||||
|
cur_skip += je->small_write.len;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// data is right next
|
||||||
|
location = done_pos + total_pos;
|
||||||
|
total_pos += je->small_write.len;
|
||||||
|
}
|
||||||
bs->dirty_queue[je->small_write.oid].push_back((dirty_entry){
|
bs->dirty_queue[je->small_write.oid].push_back((dirty_entry){
|
||||||
.version = je->small_write.version,
|
.version = je->small_write.version,
|
||||||
.state = ST_J_SYNCED,
|
.state = ST_J_SYNCED,
|
||||||
.flags = 0,
|
.flags = 0,
|
||||||
// FIXME: data in journal may never be non-contiguous
|
.location = location,
|
||||||
.location = done_pos + total_pos + 512 + skip,
|
|
||||||
.offset = je->small_write.offset,
|
.offset = je->small_write.offset,
|
||||||
.size = je->small_write.len,
|
.size = je->small_write.len,
|
||||||
});
|
});
|
||||||
skip += je->small_write.len;
|
|
||||||
}
|
}
|
||||||
else if (je->type == JE_BIG_WRITE)
|
else if (je->type == JE_BIG_WRITE)
|
||||||
{
|
{
|
||||||
|
@ -288,28 +321,47 @@ int blockstore_init_journal::handle_journal(void *buf, int len)
|
||||||
auto it = bs->dirty_queue.find(je->stable.oid);
|
auto it = bs->dirty_queue.find(je->stable.oid);
|
||||||
if (it == bs->dirty_queue.end())
|
if (it == bs->dirty_queue.end())
|
||||||
{
|
{
|
||||||
// FIXME ignore entry, but warn
|
// journal contains a legitimate STABLE entry for a non-existing dirty write
|
||||||
|
// this probably means that journal was trimmed between WRITTEN and STABLE entries
|
||||||
|
// skip for now. but FIXME: maybe warn about it in the future
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
auto & lst = it->second;
|
auto & lst = it->second;
|
||||||
for (int i = 0; i < lst.size(); i++)
|
int i;
|
||||||
|
for (i = 0; i < lst.size(); i++)
|
||||||
{
|
{
|
||||||
if (lst[i].version == je->stable.version)
|
if (lst[i].version == je->stable.version)
|
||||||
{
|
{
|
||||||
lst[i].state = (lst[i].state == ST_D_META_SYNCED ? ST_D_STABLE : ST_J_STABLE);
|
lst[i].state = (lst[i].state == ST_D_META_SYNCED
|
||||||
|
? ST_D_STABLE
|
||||||
|
: (lst[i].state == ST_DEL_SYNCED ? ST_DEL_STABLE : ST_J_STABLE));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (i >= lst.size())
|
||||||
|
{
|
||||||
|
// same. STABLE entry for a missing object version
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (je->type == JE_DELETE)
|
else if (je->type == JE_DELETE)
|
||||||
{
|
{
|
||||||
// oid, version
|
// oid, version
|
||||||
// FIXME
|
bs->dirty_queue[je->small_write.oid].push_back((dirty_entry){
|
||||||
|
.version = je->small_write.version,
|
||||||
|
.state = ST_DEL_SYNCED,
|
||||||
|
.flags = 0,
|
||||||
|
.location = 0,
|
||||||
|
.offset = 0,
|
||||||
|
.size = 0,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
total_pos += 512 + skip;
|
}
|
||||||
|
if (cur_skip == 0 && total_pos > len)
|
||||||
|
{
|
||||||
|
cur_skip = total_pos - len;
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,8 +22,9 @@ class blockstore_init_journal
|
||||||
struct iovec submit_iov;
|
struct iovec submit_iov;
|
||||||
uint64_t done_pos = 0, journal_pos = 0;
|
uint64_t done_pos = 0, journal_pos = 0;
|
||||||
uint64_t cur_skip = 0;
|
uint64_t cur_skip = 0;
|
||||||
|
bool wrapped = false;
|
||||||
int submitted = 0, done_buf = 0, done_len = 0;
|
int submitted = 0, done_buf = 0, done_len = 0;
|
||||||
int handle_journal(void *buf, int len);
|
int handle_journal_part(void *buf, uint64_t len);
|
||||||
public:
|
public:
|
||||||
blockstore_init_journal(blockstore* bs);
|
blockstore_init_journal(blockstore* bs);
|
||||||
int read_loop();
|
int read_loop();
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include "crc32c.h"
|
||||||
|
|
||||||
#define MIN_JOURNAL_SIZE 4*1024*1024
|
#define MIN_JOURNAL_SIZE 4*1024*1024
|
||||||
#define JOURNAL_MAGIC 0x4A33
|
#define JOURNAL_MAGIC 0x4A33
|
||||||
|
#define JOURNAL_BUFFER_SIZE 4*1024*1024
|
||||||
|
|
||||||
// Journal entries
|
// Journal entries
|
||||||
// Journal entries are linked to each other by their crc32 value
|
// Journal entries are linked to each other by their crc32 value
|
||||||
|
@ -90,3 +93,8 @@ struct __attribute__((__packed__)) journal_entry
|
||||||
journal_entry_del del;
|
journal_entry_del del;
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
inline uint32_t je_crc32(journal_entry *je)
|
||||||
|
{
|
||||||
|
return crc32c_zero4(((uint8_t*)je)+4, je->size-4);
|
||||||
|
}
|
||||||
|
|
|
@ -1,5 +1,46 @@
|
||||||
#include "blockstore.h"
|
#include "blockstore.h"
|
||||||
|
|
||||||
|
int blockstore::fulfill_read_push(blockstore_operation & read_op, uint32_t item_start,
|
||||||
|
uint32_t item_state, uint64_t item_version, uint64_t item_location, uint32_t cur_start, uint32_t cur_end)
|
||||||
|
{
|
||||||
|
if (cur_end > cur_start)
|
||||||
|
{
|
||||||
|
if (item_state == ST_IN_FLIGHT)
|
||||||
|
{
|
||||||
|
// Pause until it's written somewhere
|
||||||
|
read_op.wait_for = WAIT_IN_FLIGHT;
|
||||||
|
read_op.wait_version = item_version;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
else if (item_state == ST_DEL_WRITTEN || item_state == ST_DEL_SYNCED || item_state == ST_DEL_MOVED)
|
||||||
|
{
|
||||||
|
// item is unallocated - return zeroes
|
||||||
|
memset(read_op.buf + cur_start - read_op.offset, 0, cur_end - cur_start);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
|
||||||
|
if (!sqe)
|
||||||
|
{
|
||||||
|
// Pause until there are more requests available
|
||||||
|
read_op.wait_for = WAIT_SQE;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
read_op.read_vec[cur_start] = (struct iovec){
|
||||||
|
read_op.buf + cur_start - read_op.offset,
|
||||||
|
cur_end - cur_start
|
||||||
|
};
|
||||||
|
io_uring_prep_readv(
|
||||||
|
sqe,
|
||||||
|
IS_JOURNAL(item_state) ? journal_fd : data_fd,
|
||||||
|
// FIXME: &read_op.read_vec is forbidden
|
||||||
|
&read_op.read_vec[cur_start], 1,
|
||||||
|
(IS_JOURNAL(item_state) ? journal_offset : data_offset) + item_location + cur_start - item_start
|
||||||
|
);
|
||||||
|
io_uring_sqe_set_data(sqe, 0/*read op link*/);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int blockstore::fulfill_read(blockstore_operation & read_op, uint32_t item_start, uint32_t item_end,
|
int blockstore::fulfill_read(blockstore_operation & read_op, uint32_t item_start, uint32_t item_end,
|
||||||
uint32_t item_state, uint64_t item_version, uint64_t item_location)
|
uint32_t item_state, uint64_t item_version, uint64_t item_location)
|
||||||
{
|
{
|
||||||
|
@ -19,66 +60,17 @@ int blockstore::fulfill_read(blockstore_operation & read_op, uint32_t item_start
|
||||||
}
|
}
|
||||||
while (fulfill_near != read_op.read_vec.end() && fulfill_near->first < item_end)
|
while (fulfill_near != read_op.read_vec.end() && fulfill_near->first < item_end)
|
||||||
{
|
{
|
||||||
if (fulfill_near->first > cur_start)
|
if (fulfill_read_push(read_op, item_start, item_state, item_version, item_location, cur_start, fulfill_near->first) < 0)
|
||||||
{
|
{
|
||||||
if (item_state == ST_IN_FLIGHT)
|
|
||||||
{
|
|
||||||
// Pause until it's written somewhere
|
|
||||||
read_op.wait_for = WAIT_IN_FLIGHT;
|
|
||||||
read_op.wait_version = item_version;
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
|
|
||||||
if (!sqe)
|
|
||||||
{
|
|
||||||
// Pause until there are more requests available
|
|
||||||
read_op.wait_for = WAIT_SQE;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
read_op.read_vec[cur_start] = (struct iovec){
|
|
||||||
read_op.buf + cur_start - read_op.offset,
|
|
||||||
fulfill_near->first - cur_start
|
|
||||||
};
|
|
||||||
io_uring_prep_readv(
|
|
||||||
sqe,
|
|
||||||
IS_JOURNAL(item_state) ? journal_fd : data_fd,
|
|
||||||
// FIXME: &read_op.read_vec is forbidden
|
|
||||||
&read_op.read_vec[cur_start], 1,
|
|
||||||
(IS_JOURNAL(item_state) ? journal_offset : data_offset) + item_location + cur_start - item_start
|
|
||||||
);
|
|
||||||
io_uring_sqe_set_data(sqe, 0/*read op link*/);
|
|
||||||
}
|
|
||||||
cur_start = fulfill_near->first + fulfill_near->second.iov_len;
|
cur_start = fulfill_near->first + fulfill_near->second.iov_len;
|
||||||
fulfill_near++;
|
fulfill_near++;
|
||||||
}
|
}
|
||||||
if (cur_start < item_end)
|
if (fulfill_read_push(read_op, item_start, item_state, item_version, item_location, cur_start, item_end) < 0)
|
||||||
{
|
{
|
||||||
if (item_state == ST_IN_FLIGHT)
|
|
||||||
{
|
|
||||||
// Pause until it's written somewhere
|
|
||||||
read_op.wait_for = WAIT_IN_FLIGHT;
|
|
||||||
read_op.wait_version = item_version;
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
|
|
||||||
if (!sqe)
|
|
||||||
{
|
|
||||||
// Pause until there are more requests available
|
|
||||||
read_op.wait_for = WAIT_SQE;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
read_op.read_vec[cur_start] = (struct iovec){
|
|
||||||
read_op.buf + cur_start - read_op.offset,
|
|
||||||
item_end - cur_start
|
|
||||||
};
|
|
||||||
io_uring_prep_readv(
|
|
||||||
sqe,
|
|
||||||
IS_JOURNAL(item_state) ? journal_fd : data_fd,
|
|
||||||
&read_op.read_vec[cur_start], 1,
|
|
||||||
(IS_JOURNAL(item_state) ? journal_offset : data_offset) + item_location + cur_start - item_start
|
|
||||||
);
|
|
||||||
io_uring_sqe_set_data(sqe, 0/*read op link*/);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -121,6 +113,7 @@ int blockstore::read(blockstore_operation *read_op)
|
||||||
// need to wait for something, undo added requests and requeue op
|
// need to wait for something, undo added requests and requeue op
|
||||||
ring->sq.sqe_tail = prev_sqe_pos;
|
ring->sq.sqe_tail = prev_sqe_pos;
|
||||||
read_op->read_vec.clear();
|
read_op->read_vec.clear();
|
||||||
|
// FIXME: bad implementation
|
||||||
submit_queue.push_front(read_op);
|
submit_queue.push_front(read_op);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -128,11 +121,11 @@ int blockstore::read(blockstore_operation *read_op)
|
||||||
if (!read_op->read_vec.size())
|
if (!read_op->read_vec.size())
|
||||||
{
|
{
|
||||||
// region is not allocated - return zeroes
|
// region is not allocated - return zeroes
|
||||||
free(read_op);
|
|
||||||
memset(read_op->buf, 0, read_op->len);
|
memset(read_op->buf, 0, read_op->len);
|
||||||
read_op->callback(read_op);
|
read_op->callback(read_op);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
// FIXME reap events!
|
||||||
int ret = io_uring_submit(ring);
|
int ret = io_uring_submit(ring);
|
||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue