Support inmemory journal

blocking-uring-test
Vitaliy Filippov 2019-11-28 14:41:03 +03:00
parent e1ac4dba23
commit 9fa0d3325f
11 changed files with 110 additions and 29 deletions

View File

@@ -6,7 +6,7 @@ clean:
crc32c.o: crc32c.c
g++ -g -O3 -fPIC -c -o $@ $<
%.o: %.cpp allocator.h blockstore_flush.h blockstore.h blockstore_init.h blockstore_journal.h crc32c.h ringloop.h xor.h timerfd_interval.h
g++ -g -O3 -Wall -Wno-sign-compare -Wno-parentheses -fPIC -c -o $@ $<
g++ -g -O3 -Wall -Wno-sign-compare -Wno-parentheses -Wno-pointer-arith -fPIC -c -o $@ $<
test: test.cpp
g++ -g -O3 -o test -luring test.cpp
test_blockstore: $(BLOCKSTORE_OBJS) test_blockstore.cpp
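
Note on the new -Wno-pointer-arith flag: the in-memory journal code in this commit does arithmetic directly on journal.buffer, which is a void*, and GCC warns about that unless the warning is suppressed. A minimal illustration of the pattern involved (not part of the patch, names are hypothetical):

    #include <cstdint>

    // journal.buffer is a void*; expressions like journal.buffer + offset
    // rely on a GCC/Clang extension and trigger -Wpointer-arith:
    void *gnu_style(void *buffer, uint64_t offset) { return buffer + offset; }
    // the portable equivalent would be an explicit byte-pointer cast:
    void *portable(void *buffer, uint64_t offset) { return (uint8_t*)buffer + offset; }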

View File

@@ -54,8 +54,6 @@ blockstore::~blockstore()
close(meta_fd);
if (journal.fd >= 0 && journal.fd != meta_fd)
close(journal.fd);
free(journal.sector_buf);
free(journal.sector_info);
}
bool blockstore::is_started()

View File

@@ -10,7 +10,7 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore *bs)
sync_threshold = flusher_count == 1 ? 1 : flusher_count/2;
journal_trim_interval = sync_threshold;
journal_trim_counter = 0;
journal_superblock = (uint8_t*)memalign(512, 512);
journal_superblock = bs->journal.inmemory ? bs->journal.buffer : memalign(512, 512);
co = new journal_flusher_co[flusher_count];
for (int i = 0; i < flusher_count; i++)
{
@@ -48,7 +48,8 @@ journal_flusher_co::journal_flusher_co()
journal_flusher_t::~journal_flusher_t()
{
free(journal_superblock);
if (!bs->journal.inmemory)
free(journal_superblock);
delete[] co;
}
@@ -176,6 +177,7 @@ resume_0:
flusher->active_until_sync++;
v.clear();
wait_count = 0;
copy_count = 0;
clean_loc = UINT64_MAX;
skip_copy = false;
while (1)
@@ -183,7 +185,6 @@ resume_0:
if (dirty_it->second.state == ST_J_STABLE && !skip_copy)
{
// First we submit all reads
// FIXME: Introduce a (default) mode where we'll keep the whole journal in memory instead of re-reading data during flush
offset = dirty_it->second.offset;
len = dirty_it->second.len;
it = v.begin();
@@ -194,15 +195,26 @@ resume_0:
break;
if (it == v.end() || it->offset > offset)
{
submit_offset = dirty_it->second.location + offset - dirty_it->second.offset;
submit_len = it == v.end() || it->offset >= offset+len ? len : it->offset-offset;
await_sqe(1);
it = v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) });
data->iov = (struct iovec){ v.back().buf, (size_t)submit_len };
data->callback = simple_callback_r;
my_uring_prep_readv(
sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + dirty_it->second.location + offset - dirty_it->second.offset
);
wait_count++;
copy_count++;
if (bs->journal.inmemory)
{
// Take it from memory
memcpy(v.back().buf, bs->journal.buffer + submit_offset, submit_len);
}
else
{
// Read it from disk
await_sqe(1);
data->iov = (struct iovec){ v.back().buf, (size_t)submit_len };
data->callback = simple_callback_r;
my_uring_prep_readv(
sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + submit_offset
);
wait_count++;
}
}
if (it == v.end() || it->offset+it->len >= offset+len)
{
@@ -238,7 +250,7 @@ resume_0:
break;
}
}
if (wait_count == 0 && clean_loc == UINT64_MAX)
if (copy_count == 0 && clean_loc == UINT64_MAX)
{
// Nothing to flush
flusher->active_flushers--;
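
The subtle part of the flusher change: when the journal is kept in memory the data is copied with memcpy instead of being re-read through io_uring, so wait_count no longer tells whether anything was gathered, and the new copy_count counter takes over the "nothing to flush" check. A condensed sketch of that branch (hypothetical helper, not the actual flusher code):

    #include <cstring>
    #include <cstdint>

    // Returns true if the fragment still needs an asynchronous journal read.
    // journal_buffer stands in for bs->journal.buffer; submit_offset is the
    // fragment's position inside the journal area.
    bool fill_copy_buffer(bool inmemory, void *journal_buffer,
        void *dst, uint64_t submit_offset, uint64_t submit_len)
    {
        if (inmemory)
        {
            // journal contents are already resident - copy synchronously
            memcpy(dst, (uint8_t*)journal_buffer + submit_offset, submit_len);
            return false; // no SQE submitted, wait_count stays unchanged
        }
        // otherwise the caller submits a readv() against journal.fd at
        // journal.offset + submit_offset and increments wait_count
        return true;
    }

In either case copy_count is incremented, which is why the "wait_count == 0 && clean_loc == UINT64_MAX" test had to become "copy_count == 0 && clean_loc == UINT64_MAX".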

View File

@@ -33,7 +33,8 @@ class journal_flusher_co
std::map<obj_ver_id, dirty_entry>::iterator dirty_it, dirty_start, dirty_end;
std::vector<copy_buffer_t> v;
std::vector<copy_buffer_t>::iterator it;
uint64_t offset, len, submit_len, clean_loc, old_clean_loc, meta_sector, meta_pos;
int copy_count;
uint64_t offset, len, submit_offset, submit_len, clean_loc, old_clean_loc, meta_sector, meta_pos;
std::map<uint64_t, meta_sector_t>::iterator meta_it;
std::map<object_id, uint64_t>::iterator repeat_it;
std::map<uint64_t, uint64_t>::iterator journal_used_it;
@@ -56,7 +57,7 @@ class journal_flusher_t
friend class journal_flusher_co;
int journal_trim_counter, journal_trim_interval;
uint8_t* journal_superblock;
void* journal_superblock;
int active_flushers, active_until_sync;
std::list<flusher_sync_t> syncs;

View File

@@ -167,7 +167,14 @@ int blockstore_init_journal::loop()
goto resume_3;
else if (wait_state == 4)
goto resume_4;
journal_buffer = (uint8_t*)memalign(DISK_ALIGNMENT, 2*JOURNAL_BUFFER_SIZE);
if (!bs->journal.inmemory)
{
journal_buffer = (uint8_t*)memalign(DISK_ALIGNMENT, 2*JOURNAL_BUFFER_SIZE);
if (!journal_buffer)
throw std::bad_alloc();
}
else
journal_buffer = bs->journal.buffer;
// Read first block of the journal
sqe = bs->get_sqe();
if (!sqe)
@@ -254,7 +261,7 @@ resume_1:
end = bs->journal.used_start;
}
data->iov = {
journal_buffer + (done_buf == 1 ? JOURNAL_BUFFER_SIZE : 0),
journal_buffer + (bs->journal.inmemory ? journal_pos : (done_buf == 1 ? JOURNAL_BUFFER_SIZE : 0)),
end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE,
};
data->callback = [this](ring_data_t *data1) { handle_event(data1); };
@@ -262,7 +269,9 @@ resume_1:
bs->ringloop->submit();
submitted = done_buf == 1 ? 2 : 1;
}
if (done_buf && handle_journal_part(journal_buffer + (done_buf == 1 ? 0 : JOURNAL_BUFFER_SIZE), done_len) == 0)
if (done_buf && handle_journal_part(journal_buffer + (bs->journal.inmemory
? done_pos
: (done_buf == 1 ? 0 : JOURNAL_BUFFER_SIZE)), done_len) == 0)
{
// journal ended. wait for the next read to complete, then stop
resume_3:
@@ -279,7 +288,10 @@ resume_1:
}
}
printf("Journal entries loaded: %d\n", entries_loaded);
free(journal_buffer);
if (!bs->journal.inmemory)
{
free(journal_buffer);
}
bs->journal.crc32_last = crc32_last;
journal_buffer = NULL;
return 0;
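
During journal replay the read target now depends on the mode: with an in-memory journal each chunk is read at its own position inside bs->journal.buffer, so the whole journal is resident once replay finishes, while the on-disk mode keeps the old pair of alternating JOURNAL_BUFFER_SIZE chunks that are freed afterwards. A rough condensation of the address selection above (hypothetical helper; buffer_size stands for JOURNAL_BUFFER_SIZE):

    #include <cstdint>

    void* read_target(bool inmemory, void *journal_buffer,
        uint64_t journal_pos, int done_buf, uint64_t buffer_size)
    {
        if (inmemory)
            // read straight into the persistent in-memory copy of the journal
            return (uint8_t*)journal_buffer + journal_pos;
        // double-buffered temporary reads (buffer 1 or buffer 2)
        return (uint8_t*)journal_buffer + (done_buf == 1 ? buffer_size : 0);
    }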

View File

@@ -22,7 +22,7 @@ class blockstore_init_journal
blockstore *bs;
int wait_state = 0, wait_count = 0;
int entries_loaded = 0;
uint8_t *journal_buffer = NULL;
void *journal_buffer = NULL;
uint32_t crc32_last = 0;
bool started = false;
uint64_t done_pos = 0, journal_pos = 0;

View File

@@ -76,10 +76,14 @@ journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type,
journal.sector_info[journal.cur_sector].offset = journal.next_free;
journal.in_sector_pos = 0;
journal.next_free = (journal.next_free+512) < journal.len ? journal.next_free + 512 : 512;
memset(journal.sector_buf + 512*journal.cur_sector, 0, 512);
memset(journal.inmemory
? journal.buffer + journal.sector_info[journal.cur_sector].offset
: journal.sector_buf + 512*journal.cur_sector, 0, 512);
}
journal_entry *je = (struct journal_entry*)(
journal.sector_buf + 512*journal.cur_sector + journal.in_sector_pos
(journal.inmemory
? journal.buffer + journal.sector_info[journal.cur_sector].offset
: journal.sector_buf + 512*journal.cur_sector) + journal.in_sector_pos
);
journal.in_sector_pos += size;
je->magic = JOURNAL_MAGIC;
@@ -93,9 +97,27 @@ void prepare_journal_sector_write(journal_t & journal, io_uring_sqe *sqe, std::f
{
journal.sector_info[journal.cur_sector].usage_count++;
ring_data_t *data = ((ring_data_t*)sqe->user_data);
data->iov = (struct iovec){ journal.sector_buf + 512*journal.cur_sector, 512 };
data->iov = (struct iovec){
(journal.inmemory
? journal.buffer + journal.sector_info[journal.cur_sector].offset
: journal.sector_buf + 512*journal.cur_sector),
512
};
data->callback = cb;
my_uring_prep_writev(
sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[journal.cur_sector].offset
);
}
journal_t::~journal_t()
{
if (sector_buf)
free(sector_buf);
if (sector_info)
free(sector_info);
if (buffer)
free(buffer);
sector_buf = NULL;
sector_info = NULL;
buffer = NULL;
}

View File

@@ -114,6 +114,8 @@ struct journal_t
{
int fd;
uint64_t device_size;
bool inmemory = false;
void *buffer = NULL;
uint64_t offset, len;
uint64_t next_free = 512;
@@ -121,8 +123,8 @@
uint32_t crc32_last = 0;
// Current sector(s) used for writing
uint8_t *sector_buf;
journal_sector_info_t *sector_info;
void *sector_buf = NULL;
journal_sector_info_t *sector_info = NULL;
uint64_t sector_count;
int cur_sector = 0;
int in_sector_pos = 512; // no free space because sector is initially unmapped
@@ -130,6 +132,8 @@
// Used sector map
// May use ~ 80 MB per 1 GB of used journal space in the worst case
std::map<uint64_t, uint64_t> used_sectors;
~journal_t();
};
struct blockstore_journal_check_t

View File

@@ -61,6 +61,12 @@ void blockstore::calc_lengths(blockstore_config_t & config)
{
throw std::runtime_error("Journal is too small");
}
if (journal.inmemory)
{
journal.buffer = memalign(512, journal.len);
if (!journal.buffer)
throw std::bad_alloc();
}
}
void check_size(int fd, uint64_t *size, std::string name)
@@ -171,10 +177,20 @@ void blockstore::open_journal(blockstore_config_t & config)
{
journal.sector_count = 32;
}
journal.sector_buf = (uint8_t*)memalign(512, journal.sector_count * 512);
journal.sector_info = (journal_sector_info_t*)calloc(journal.sector_count, sizeof(journal_sector_info_t));
if (!journal.sector_buf || !journal.sector_info)
if (!journal.sector_info)
{
throw std::bad_alloc();
}
if (config["journal_inmemory"] == "false")
{
journal.inmemory = false;
journal.sector_buf = (uint8_t*)memalign(512, journal.sector_count * 512);
if (!journal.sector_buf)
throw std::bad_alloc();
}
else
{
journal.inmemory = true;
}
}
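
With this change the in-memory journal becomes the default: journal.inmemory is only set to false when the configuration contains the literal string "false" for journal_inmemory, and in that case the old sector_buf staging buffers are still allocated. A hypothetical usage sketch, assuming blockstore_config_t behaves like a std::map<std::string, std::string> (as the indexing above suggests):

    blockstore_config_t config;
    config["journal_inmemory"] = "false"; // opt out of keeping the journal in RAM
    // Any other value, or omitting the key entirely, leaves journal.inmemory == true;
    // calc_lengths() then allocates a 512-byte aligned buffer of journal.len bytes.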

View File

@@ -18,6 +18,16 @@ int blockstore::fulfill_read_push(blockstore_operation *op, uint64_t &fulfilled,
memset((uint8_t*)op->buf + cur_start - op->offset, 0, cur_end - cur_start);
return 1;
}
if (journal.inmemory && IS_JOURNAL(item_state))
{
iovec v = {
(uint8_t*)op->buf + cur_start - op->offset,
cur_end - cur_start
};
op->read_vec[cur_start] = v;
memcpy(v.iov_base, journal.buffer + item_location + cur_start - item_start, v.iov_len);
return 1;
}
BS_SUBMIT_GET_SQE(sqe, data);
data->iov = (struct iovec){
(uint8_t*)op->buf + cur_start - op->offset,
@@ -25,6 +35,7 @@
};
// FIXME: use simple std::vector instead of map for read_vec
op->read_vec[cur_start] = data->iov;
op->pending_ops++;
my_uring_prep_readv(
sqe,
IS_JOURNAL(item_state) ? journal.fd : data_fd,
@@ -90,6 +101,7 @@ int blockstore::dequeue_read(blockstore_operation *read_op)
return 1;
}
uint64_t fulfilled = 0;
read_op->pending_ops = 0;
if (dirty_found)
{
while (dirty_it->first.oid == read_op->oid)
@@ -137,7 +149,6 @@ int blockstore::dequeue_read(blockstore_operation *read_op)
return 1;
}
read_op->retval = 0;
read_op->pending_ops = read_op->read_vec.size();
return 1;
}
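
Two related adjustments in the read path: journal-backed fragments can now be served synchronously by copying from journal.buffer, and because such fragments land in read_vec without a submitted SQE, pending_ops is incremented per actually submitted read instead of being derived from read_vec.size() at the end. A small sketch of the synchronous branch (hypothetical helper mirroring fulfill_read_push):

    #include <cstring>
    #include <cstdint>
    #include <sys/uio.h>

    // Copy a journal-backed fragment straight from the in-memory journal.
    // dst/len describe the slice of the caller's buffer, src_offset is the
    // fragment's position inside the journal area.
    void fulfill_from_memory(void *journal_buffer, void *dst, uint64_t src_offset, size_t len)
    {
        iovec v = { dst, len };
        memcpy(v.iov_base, (uint8_t*)journal_buffer + src_offset, v.iov_len);
        // no io_uring submission here, so pending_ops is not incremented
    }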

View File

@@ -137,6 +137,11 @@ int blockstore::dequeue_write(blockstore_operation *op)
prepare_journal_sector_write(journal, sqe1, cb);
op->min_used_journal_sector = op->max_used_journal_sector = 1 + journal.cur_sector;
// Prepare journal data write
if (journal.inmemory)
{
// Copy data
memcpy(journal.buffer + journal.next_free, op->buf, op->len);
}
data2->iov = (struct iovec){ op->buf, op->len };
data2->callback = cb;
my_uring_prep_writev(