Inmemory metadata mode
parent
f4d06ba102
commit
76caecf7c7
|
@ -12,7 +12,7 @@ blockstore::blockstore(blockstore_config_t & config, ring_loop_t *ringloop)
|
|||
block_order = DEFAULT_ORDER;
|
||||
}
|
||||
block_size = 1 << block_order;
|
||||
if (block_size <= 1 || block_size >= MAX_BLOCK_SIZE)
|
||||
if (block_size < MIN_BLOCK_SIZE || block_size >= MAX_BLOCK_SIZE)
|
||||
{
|
||||
throw std::runtime_error("Bad block size");
|
||||
}
|
||||
|
@ -54,6 +54,8 @@ blockstore::~blockstore()
|
|||
close(meta_fd);
|
||||
if (journal.fd >= 0 && journal.fd != meta_fd)
|
||||
close(journal.fd);
|
||||
if (metadata_buffer)
|
||||
free(metadata_buffer);
|
||||
}
|
||||
|
||||
bool blockstore::is_started()
|
||||
|
|
|
@ -58,8 +58,9 @@
|
|||
// True when st lies in the inclusive range [ST_DEL_SUBMITTED, ST_DEL_STABLE].
// The argument is parenthesized so expression arguments expand correctly; note it is evaluated twice.
#define IS_DELETE(st) ((st) >= ST_DEL_SUBMITTED && (st) <= ST_DEL_STABLE)
|
||||
// True when st falls into any of three inclusive ranges:
// [ST_J_SUBMITTED, ST_J_WRITTEN], [ST_D_SUBMITTED, ST_D_META_WRITTEN] or [ST_DEL_SUBMITTED, ST_DEL_WRITTEN].
// The argument and each range test are parenthesized explicitly; note the argument is evaluated several times.
#define IS_UNSYNCED(st) (((st) >= ST_J_SUBMITTED && (st) <= ST_J_WRITTEN) || ((st) >= ST_D_SUBMITTED && (st) <= ST_D_META_WRITTEN) || ((st) >= ST_DEL_SUBMITTED && (st) <= ST_DEL_WRITTEN))
|
||||
|
||||
// Default object size is 128 KB
|
||||
// Default block size is 128 KB, current allowed range is 4K - 128M
|
||||
#define DEFAULT_ORDER 17
|
||||
// Smallest accepted block size in bytes (4 KiB). Parenthesized so it is safe inside larger expressions.
#define MIN_BLOCK_SIZE (4*1024)
|
||||
// Upper bound for the block size in bytes (128 MiB). Parenthesized so it is safe inside larger expressions.
#define MAX_BLOCK_SIZE (128*1024*1024)
|
||||
#define DISK_ALIGNMENT 512
|
||||
|
||||
|
@ -272,6 +273,8 @@ class blockstore
|
|||
|
||||
bool readonly = false;
|
||||
bool disable_fsync = false;
|
||||
bool inmemory_meta = false;
|
||||
void *metadata_buffer = NULL;
|
||||
|
||||
struct journal_t journal;
|
||||
journal_flusher_t *flusher;
|
||||
|
|
|
@ -341,7 +341,7 @@ resume_0:
|
|||
}
|
||||
resume_5:
|
||||
// And metadata writes, but only after data writes complete
|
||||
if (meta_new.it->second.state == 0 || wait_count > 0)
|
||||
if (!bs->inmemory_meta && meta_new.it->second.state == 0 || wait_count > 0)
|
||||
{
|
||||
// metadata sector is still being read or data is still being written, wait for it
|
||||
wait_state = 5;
|
||||
|
@ -349,28 +349,28 @@ resume_0:
|
|||
}
|
||||
if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
|
||||
{
|
||||
if (meta_old.it->second.state == 0)
|
||||
if (!bs->inmemory_meta && meta_old.it->second.state == 0)
|
||||
{
|
||||
wait_state = 5;
|
||||
return false;
|
||||
}
|
||||
((clean_disk_entry*)meta_old.it->second.buf)[meta_old.pos] = { 0 };
|
||||
((clean_disk_entry*)meta_old.buf)[meta_old.pos] = { 0 };
|
||||
await_sqe(15);
|
||||
data->iov = (struct iovec){ meta_old.it->second.buf, 512 };
|
||||
data->iov = (struct iovec){ meta_old.buf, 512 };
|
||||
data->callback = simple_callback_w;
|
||||
my_uring_prep_writev(
|
||||
sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_old.sector
|
||||
);
|
||||
wait_count++;
|
||||
}
|
||||
((clean_disk_entry*)meta_new.it->second.buf)[meta_new.pos] = has_delete
|
||||
((clean_disk_entry*)meta_new.buf)[meta_new.pos] = has_delete
|
||||
? (clean_disk_entry){ 0 }
|
||||
: (clean_disk_entry){
|
||||
.oid = cur.oid,
|
||||
.version = cur.version,
|
||||
};
|
||||
await_sqe(6);
|
||||
data->iov = (struct iovec){ meta_new.it->second.buf, 512 };
|
||||
data->iov = (struct iovec){ meta_new.buf, 512 };
|
||||
data->callback = simple_callback_w;
|
||||
my_uring_prep_writev(
|
||||
sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_new.sector
|
||||
|
@ -383,6 +383,8 @@ resume_0:
|
|||
return false;
|
||||
}
|
||||
// Done, free all buffers
|
||||
if (!bs->inmemory_meta)
|
||||
{
|
||||
meta_new.it->second.usage_count--;
|
||||
if (meta_new.it->second.usage_count == 0)
|
||||
{
|
||||
|
@ -398,6 +400,7 @@ resume_0:
|
|||
flusher->meta_sectors.erase(meta_old.it);
|
||||
}
|
||||
}
|
||||
}
|
||||
for (it = v.begin(); it != v.end(); it++)
|
||||
{
|
||||
free(it->buf);
|
||||
|
@ -465,21 +468,26 @@ bool journal_flusher_co::modify_meta_read(uint64_t meta_loc, flusher_meta_write_
|
|||
{
|
||||
if (wait_state == wait_base)
|
||||
goto resume_0;
|
||||
// But we must check if the same sector is already in memory.
|
||||
// Another option is to keep all raw metadata in memory all the time. FIXME: Maybe add this mode.
|
||||
// We must check if the same sector is already in memory if we don't keep all metadata in memory all the time.
|
||||
// And yet another option is to use LSM trees for metadata, but it complicates everything a lot,
|
||||
// so I'll avoid it as long as I can.
|
||||
wr.sector = ((meta_loc >> bs->block_order) / (512 / sizeof(clean_disk_entry))) * 512;
|
||||
wr.pos = ((meta_loc >> bs->block_order) % (512 / sizeof(clean_disk_entry)));
|
||||
if (bs->inmemory_meta)
|
||||
{
|
||||
wr.buf = bs->metadata_buffer + wr.sector;
|
||||
return true;
|
||||
}
|
||||
wr.it = flusher->meta_sectors.find(wr.sector);
|
||||
if (wr.it == flusher->meta_sectors.end())
|
||||
{
|
||||
// Not in memory yet, read it
|
||||
wr.buf = memalign(512, 512);
|
||||
wr.it = flusher->meta_sectors.emplace(wr.sector, (meta_sector_t){
|
||||
.offset = wr.sector,
|
||||
.len = 512,
|
||||
.state = 0, // 0 = not read yet
|
||||
.buf = memalign(512, 512),
|
||||
.buf = wr.buf,
|
||||
.usage_count = 1,
|
||||
}).first;
|
||||
await_sqe(0);
|
||||
|
@ -494,6 +502,7 @@ bool journal_flusher_co::modify_meta_read(uint64_t meta_loc, flusher_meta_write_
|
|||
else
|
||||
{
|
||||
wr.submitted = false;
|
||||
wr.buf = wr.it->second.buf;
|
||||
wr.it->second.usage_count++;
|
||||
}
|
||||
return true;
|
||||
|
|
|
@ -22,6 +22,7 @@ struct flusher_meta_write_t
|
|||
{
|
||||
uint64_t sector, pos;
|
||||
bool submitted;
|
||||
void *buf;
|
||||
std::map<uint64_t, meta_sector_t>::iterator it;
|
||||
};
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@ void blockstore_init_meta::handle_event(ring_data_t *data)
|
|||
}
|
||||
prev_done = data->res > 0 ? submitted : 0;
|
||||
done_len = data->res;
|
||||
done_pos = metadata_read;
|
||||
metadata_read += data->res;
|
||||
submitted = 0;
|
||||
}
|
||||
|
@ -25,9 +26,12 @@ int blockstore_init_meta::loop()
|
|||
if (wait_state == 1)
|
||||
goto resume_1;
|
||||
printf("Reading blockstore metadata\n");
|
||||
metadata_buffer = (uint8_t*)memalign(512, 2*bs->metadata_buf_size);
|
||||
if (bs->inmemory_meta)
|
||||
metadata_buffer = bs->metadata_buffer;
|
||||
else
|
||||
metadata_buffer = memalign(512, 2*bs->metadata_buf_size);
|
||||
if (!metadata_buffer)
|
||||
throw std::bad_alloc();
|
||||
throw std::runtime_error("Failed to allocate metadata read buffer");
|
||||
while (1)
|
||||
{
|
||||
resume_1:
|
||||
|
@ -45,7 +49,9 @@ int blockstore_init_meta::loop()
|
|||
}
|
||||
data = ((ring_data_t*)sqe->user_data);
|
||||
data->iov = {
|
||||
metadata_buffer + (prev == 1 ? bs->metadata_buf_size : 0),
|
||||
metadata_buffer + (bs->inmemory_meta
|
||||
? metadata_read
|
||||
: (prev == 1 ? bs->metadata_buf_size : 0)),
|
||||
bs->meta_len - metadata_read > bs->metadata_buf_size ? bs->metadata_buf_size : bs->meta_len - metadata_read,
|
||||
};
|
||||
data->callback = [this](ring_data_t *data) { handle_event(data); };
|
||||
|
@ -56,10 +62,13 @@ int blockstore_init_meta::loop()
|
|||
}
|
||||
if (prev_done)
|
||||
{
|
||||
void *done_buf = bs->inmemory_meta
|
||||
? (metadata_buffer + done_pos)
|
||||
: (metadata_buffer + (prev_done == 2 ? bs->metadata_buf_size : 0));
|
||||
unsigned count = 512 / sizeof(clean_disk_entry);
|
||||
for (int sector = 0; sector < done_len; sector += 512)
|
||||
{
|
||||
clean_disk_entry *entries = (clean_disk_entry*)(metadata_buffer + (prev_done == 2 ? bs->metadata_buf_size : 0) + sector);
|
||||
clean_disk_entry *entries = (clean_disk_entry*)(done_buf + sector);
|
||||
// handle <count> entries
|
||||
handle_entries(entries, count, bs->block_order);
|
||||
done_cnt += count;
|
||||
|
@ -74,8 +83,11 @@ int blockstore_init_meta::loop()
|
|||
}
|
||||
// metadata read finished
|
||||
printf("Metadata entries loaded: %lu, free blocks: %lu / %lu\n", entries_loaded, bs->data_alloc->get_free_count(), bs->block_count);
|
||||
if (!bs->inmemory_meta)
|
||||
{
|
||||
free(metadata_buffer);
|
||||
metadata_buffer = NULL;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -394,7 +406,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
|
|||
resume:
|
||||
while (pos < 512)
|
||||
{
|
||||
journal_entry *je = (journal_entry*)((uint8_t*)buf + proc_pos - done_pos + pos);
|
||||
journal_entry *je = (journal_entry*)(buf + proc_pos - done_pos + pos);
|
||||
if (je->magic != JOURNAL_MAGIC || je_crc32(je) != je->crc32 ||
|
||||
je->type < JE_SMALL_WRITE || je->type > JE_DELETE || started && je->crc32_prev != crc32_last)
|
||||
{
|
||||
|
|
|
@ -4,10 +4,10 @@ class blockstore_init_meta
|
|||
{
|
||||
blockstore *bs;
|
||||
int wait_state = 0, wait_count = 0;
|
||||
uint8_t *metadata_buffer = NULL;
|
||||
void *metadata_buffer = NULL;
|
||||
uint64_t metadata_read = 0;
|
||||
int prev = 0, prev_done = 0, done_len = 0, submitted = 0;
|
||||
uint64_t done_cnt = 0;
|
||||
uint64_t done_cnt = 0, done_pos = 0;
|
||||
uint64_t entries_loaded = 0;
|
||||
struct io_uring_sqe *sqe;
|
||||
struct ring_data_t *data;
|
||||
|
|
|
@ -55,6 +55,13 @@ void blockstore::calc_lengths(blockstore_config_t & config)
|
|||
{
|
||||
metadata_buf_size = 4*1024*1024;
|
||||
}
|
||||
inmemory_meta = config["inmemory_metadata"] != "false";
|
||||
if (inmemory_meta)
|
||||
{
|
||||
metadata_buffer = memalign(512, meta_len);
|
||||
if (!metadata_buffer)
|
||||
throw std::runtime_error("Failed to allocate memory for metadata");
|
||||
}
|
||||
// requested journal size
|
||||
uint64_t journal_wanted = strtoull(config["journal_size"].c_str(), NULL, 10);
|
||||
if (journal_wanted > journal.len)
|
||||
|
@ -73,7 +80,7 @@ void blockstore::calc_lengths(blockstore_config_t & config)
|
|||
{
|
||||
journal.buffer = memalign(512, journal.len);
|
||||
if (!journal.buffer)
|
||||
throw std::bad_alloc();
|
||||
throw std::runtime_error("Failed to allocate memory for journal");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -190,7 +197,7 @@ void blockstore::open_journal(blockstore_config_t & config)
|
|||
{
|
||||
throw std::bad_alloc();
|
||||
}
|
||||
if (config["journal_inmemory"] == "false")
|
||||
if (config["inmemory_journal"] == "false")
|
||||
{
|
||||
journal.inmemory = false;
|
||||
journal.sector_buf = (uint8_t*)memalign(512, journal.sector_count * 512);
|
||||
|
|
Loading…
Reference in New Issue