From 76caecf7c7ada1d0bed2bda1268a503005ffa0e0 Mon Sep 17 00:00:00 2001
From: Vitaliy Filippov
Date: Mon, 2 Dec 2019 02:44:56 +0300
Subject: [PATCH] Inmemory metadata mode

---
 blockstore.cpp       |  4 +++-
 blockstore.h         |  5 ++++-
 blockstore_flush.cpp | 49 ++++++++++++++++++++++++++------------------
 blockstore_flush.h   |  1 +
 blockstore_init.cpp  | 26 ++++++++++++++++-------
 blockstore_init.h    |  4 ++--
 blockstore_open.cpp  | 11 ++++++++--
 7 files changed, 67 insertions(+), 33 deletions(-)

diff --git a/blockstore.cpp b/blockstore.cpp
index 86caee6b..29c44525 100644
--- a/blockstore.cpp
+++ b/blockstore.cpp
@@ -12,7 +12,7 @@ blockstore::blockstore(blockstore_config_t & config, ring_loop_t *ringloop)
         block_order = DEFAULT_ORDER;
     }
     block_size = 1 << block_order;
-    if (block_size <= 1 || block_size >= MAX_BLOCK_SIZE)
+    if (block_size < MIN_BLOCK_SIZE || block_size >= MAX_BLOCK_SIZE)
     {
         throw std::runtime_error("Bad block size");
     }
@@ -54,6 +54,8 @@ blockstore::~blockstore()
         close(meta_fd);
     if (journal.fd >= 0 && journal.fd != meta_fd)
         close(journal.fd);
+    if (metadata_buffer)
+        free(metadata_buffer);
 }
 
 bool blockstore::is_started()
diff --git a/blockstore.h b/blockstore.h
index 39fe3389..baf43abb 100644
--- a/blockstore.h
+++ b/blockstore.h
@@ -58,8 +58,9 @@
 #define IS_DELETE(st) (st >= ST_DEL_SUBMITTED && st <= ST_DEL_STABLE)
 #define IS_UNSYNCED(st) (st >= ST_J_SUBMITTED && st <= ST_J_WRITTEN || st >= ST_D_SUBMITTED && st <= ST_D_META_WRITTEN || st >= ST_DEL_SUBMITTED && st <= ST_DEL_WRITTEN)
 
-// Default object size is 128 KB
+// Default block size is 128 KB, current allowed range is 4K - 128M
 #define DEFAULT_ORDER 17
+#define MIN_BLOCK_SIZE 4*1024
 #define MAX_BLOCK_SIZE 128*1024*1024
 
 #define DISK_ALIGNMENT 512
@@ -272,6 +273,8 @@ class blockstore
     bool readonly = false;
     bool disable_fsync = false;
+    bool inmemory_meta = false;
+    void *metadata_buffer = NULL;
 
     struct journal_t journal;
     journal_flusher_t *flusher;
 
diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp
index d4973097..574350aa 100644
--- a/blockstore_flush.cpp
+++ b/blockstore_flush.cpp
@@ -341,7 +341,7 @@ resume_0:
     }
 resume_5:
     // And metadata writes, but only after data writes complete
-    if (meta_new.it->second.state == 0 || wait_count > 0)
+    if (!bs->inmemory_meta && meta_new.it->second.state == 0 || wait_count > 0)
     {
         // metadata sector is still being read or data is still being written, wait for it
         wait_state = 5;
@@ -349,28 +349,28 @@ resume_0:
     }
     if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
     {
-        if (meta_old.it->second.state == 0)
+        if (!bs->inmemory_meta && meta_old.it->second.state == 0)
         {
             wait_state = 5;
             return false;
         }
-        ((clean_disk_entry*)meta_old.it->second.buf)[meta_old.pos] = { 0 };
+        ((clean_disk_entry*)meta_old.buf)[meta_old.pos] = { 0 };
         await_sqe(15);
-        data->iov = (struct iovec){ meta_old.it->second.buf, 512 };
+        data->iov = (struct iovec){ meta_old.buf, 512 };
         data->callback = simple_callback_w;
         my_uring_prep_writev(
             sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_old.sector
         );
         wait_count++;
     }
-    ((clean_disk_entry*)meta_new.it->second.buf)[meta_new.pos] = has_delete
+    ((clean_disk_entry*)meta_new.buf)[meta_new.pos] = has_delete
         ? (clean_disk_entry){ 0 }
         : (clean_disk_entry){
             .oid = cur.oid,
             .version = cur.version,
         };
     await_sqe(6);
-    data->iov = (struct iovec){ meta_new.it->second.buf, 512 };
+    data->iov = (struct iovec){ meta_new.buf, 512 };
     data->callback = simple_callback_w;
     my_uring_prep_writev(
         sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_new.sector
@@ -383,19 +383,22 @@ resume_0:
         return false;
     }
     // Done, free all buffers
-    meta_new.it->second.usage_count--;
-    if (meta_new.it->second.usage_count == 0)
+    if (!bs->inmemory_meta)
     {
-        free(meta_new.it->second.buf);
-        flusher->meta_sectors.erase(meta_new.it);
-    }
-    if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
-    {
-        meta_old.it->second.usage_count--;
-        if (meta_old.it->second.usage_count == 0)
+        meta_new.it->second.usage_count--;
+        if (meta_new.it->second.usage_count == 0)
         {
-            free(meta_old.it->second.buf);
-            flusher->meta_sectors.erase(meta_old.it);
+            free(meta_new.it->second.buf);
+            flusher->meta_sectors.erase(meta_new.it);
+        }
+        if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
+        {
+            meta_old.it->second.usage_count--;
+            if (meta_old.it->second.usage_count == 0)
+            {
+                free(meta_old.it->second.buf);
+                flusher->meta_sectors.erase(meta_old.it);
+            }
         }
     }
     for (it = v.begin(); it != v.end(); it++)
@@ -465,21 +468,26 @@ bool journal_flusher_co::modify_meta_read(uint64_t meta_loc, flusher_meta_write_
 {
     if (wait_state == wait_base)
         goto resume_0;
-    // But we must check if the same sector is already in memory.
-    // Another option is to keep all raw metadata in memory all the time. FIXME: Maybe add this mode.
+    // We must check if the same sector is already in memory if we don't keep all metadata in memory all the time.
     // And yet another option is to use LSM trees for metadata, but it sophisticates everything a lot,
     // so I'll avoid it as long as I can.
     wr.sector = ((meta_loc >> bs->block_order) / (512 / sizeof(clean_disk_entry))) * 512;
     wr.pos = ((meta_loc >> bs->block_order) % (512 / sizeof(clean_disk_entry)));
+    if (bs->inmemory_meta)
+    {
+        wr.buf = bs->metadata_buffer + wr.sector;
+        return true;
+    }
     wr.it = flusher->meta_sectors.find(wr.sector);
     if (wr.it == flusher->meta_sectors.end())
     {
         // Not in memory yet, read it
+        wr.buf = memalign(512, 512);
         wr.it = flusher->meta_sectors.emplace(wr.sector, (meta_sector_t){
             .offset = wr.sector,
             .len = 512,
             .state = 0, // 0 = not read yet
-            .buf = memalign(512, 512),
+            .buf = wr.buf,
             .usage_count = 1,
         }).first;
         await_sqe(0);
@@ -494,6 +502,7 @@
     else
     {
         wr.submitted = false;
+        wr.buf = wr.it->second.buf;
         wr.it->second.usage_count++;
     }
     return true;
diff --git a/blockstore_flush.h b/blockstore_flush.h
index e1432a0c..eba6c61e 100644
--- a/blockstore_flush.h
+++ b/blockstore_flush.h
@@ -22,6 +22,7 @@ struct flusher_meta_write_t
 {
     uint64_t sector, pos;
     bool submitted;
+    void *buf;
     std::map<uint64_t, meta_sector_t>::iterator it;
 };
 
diff --git a/blockstore_init.cpp b/blockstore_init.cpp
index ca7bf1d9..38c846c3 100644
--- a/blockstore_init.cpp
+++ b/blockstore_init.cpp
@@ -16,6 +16,7 @@ void blockstore_init_meta::handle_event(ring_data_t *data)
     }
     prev_done = data->res > 0 ? submitted : 0;
     done_len = data->res;
+    done_pos = metadata_read;
     metadata_read += data->res;
     submitted = 0;
 }
@@ -25,9 +26,12 @@ int blockstore_init_meta::loop()
     if (wait_state == 1)
         goto resume_1;
     printf("Reading blockstore metadata\n");
-    metadata_buffer = (uint8_t*)memalign(512, 2*bs->metadata_buf_size);
+    if (bs->inmemory_meta)
+        metadata_buffer = bs->metadata_buffer;
+    else
+        metadata_buffer = memalign(512, 2*bs->metadata_buf_size);
     if (!metadata_buffer)
-        throw std::bad_alloc();
+        throw std::runtime_error("Failed to allocate metadata read buffer");
     while (1)
     {
     resume_1:
@@ -45,7 +49,9 @@
         }
         data = ((ring_data_t*)sqe->user_data);
         data->iov = {
-            metadata_buffer + (prev == 1 ? bs->metadata_buf_size : 0),
+            metadata_buffer + (bs->inmemory_meta
+                ? metadata_read
+                : (prev == 1 ? bs->metadata_buf_size : 0)),
             bs->meta_len - metadata_read > bs->metadata_buf_size ? bs->metadata_buf_size : bs->meta_len - metadata_read,
         };
         data->callback = [this](ring_data_t *data) { handle_event(data); };
@@ -56,10 +62,13 @@
         }
         if (prev_done)
         {
+            void *done_buf = bs->inmemory_meta
+                ? (metadata_buffer + done_pos)
+                : (metadata_buffer + (prev_done == 2 ? bs->metadata_buf_size : 0));
             unsigned count = 512 / sizeof(clean_disk_entry);
             for (int sector = 0; sector < done_len; sector += 512)
             {
-                clean_disk_entry *entries = (clean_disk_entry*)(metadata_buffer + (prev_done == 2 ? bs->metadata_buf_size : 0) + sector);
+                clean_disk_entry *entries = (clean_disk_entry*)(done_buf + sector);
                 // handle entries
                 handle_entries(entries, count, bs->block_order);
                 done_cnt += count;
@@ -74,8 +83,11 @@
     }
     // metadata read finished
    printf("Metadata entries loaded: %lu, free blocks: %lu / %lu\n", entries_loaded, bs->data_alloc->get_free_count(), bs->block_count);
-    free(metadata_buffer);
-    metadata_buffer = NULL;
+    if (!bs->inmemory_meta)
+    {
+        free(metadata_buffer);
+        metadata_buffer = NULL;
+    }
     return 0;
 }
 
@@ -394,7 +406,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
 resume:
     while (pos < 512)
     {
-        journal_entry *je = (journal_entry*)((uint8_t*)buf + proc_pos - done_pos + pos);
+        journal_entry *je = (journal_entry*)(buf + proc_pos - done_pos + pos);
         if (je->magic != JOURNAL_MAGIC || je_crc32(je) != je->crc32 || je->type < JE_SMALL_WRITE ||
             je->type > JE_DELETE || started && je->crc32_prev != crc32_last)
         {
diff --git a/blockstore_init.h b/blockstore_init.h
index e0666d24..395ac58b 100644
--- a/blockstore_init.h
+++ b/blockstore_init.h
@@ -4,10 +4,10 @@ class blockstore_init_meta
 {
     blockstore *bs;
     int wait_state = 0, wait_count = 0;
-    uint8_t *metadata_buffer = NULL;
+    void *metadata_buffer = NULL;
     uint64_t metadata_read = 0;
     int prev = 0, prev_done = 0, done_len = 0, submitted = 0;
-    uint64_t done_cnt = 0;
+    uint64_t done_cnt = 0, done_pos = 0;
     uint64_t entries_loaded = 0;
     struct io_uring_sqe *sqe;
     struct ring_data_t *data;
diff --git a/blockstore_open.cpp b/blockstore_open.cpp
index 957ea78a..ae2e9438 100644
--- a/blockstore_open.cpp
+++ b/blockstore_open.cpp
@@ -55,6 +55,13 @@ void blockstore::calc_lengths(blockstore_config_t & config)
     {
         metadata_buf_size = 4*1024*1024;
     }
+    inmemory_meta = config["inmemory_metadata"] != "false";
+    if (inmemory_meta)
+    {
+        metadata_buffer = memalign(512, meta_len);
+        if (!metadata_buffer)
+            throw std::runtime_error("Failed to allocate memory for metadata");
+    }
     // requested journal size
     uint64_t journal_wanted = strtoull(config["journal_size"].c_str(), NULL, 10);
     if (journal_wanted > journal.len)
@@ -73,7 +80,7 @@ void blockstore::calc_lengths(blockstore_config_t & config)
     {
         journal.buffer = memalign(512, journal.len);
         if (!journal.buffer)
-            throw std::bad_alloc();
+            throw std::runtime_error("Failed to allocate memory for journal");
     }
 }
 
@@ -190,7 +197,7 @@ void blockstore::open_journal(blockstore_config_t & config)
         {
            throw std::bad_alloc();
         }
-    if (config["journal_inmemory"] == "false")
+    if (config["inmemory_journal"] == "false")
     {
         journal.inmemory = false;
         journal.sector_buf = (uint8_t*)memalign(512, journal.sector_count * 512);
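
-- 
Usage sketch, kept below the mail signature marker so it stays out of the
commit itself: how the new mode is toggled from code embedding the
blockstore. The two config keys are the ones this patch actually reads
("inmemory_metadata" in blockstore::calc_lengths(), "inmemory_journal" in
blockstore::open_journal()); the device path, the ring_loop_t queue depth
and the driver around them are hypothetical placeholders, only the
constructor signature and is_started() come from blockstore.cpp.

    #include "blockstore.h"

    int main()
    {
        ring_loop_t *ringloop = new ring_loop_t(512); // assumed queue depth
        blockstore_config_t config;
        config["data_device"] = "/dev/sdb"; // hypothetical device path
        // Any value except "false" (thus also the default) keeps the whole
        // metadata area in a single 512-aligned RAM buffer of meta_len bytes;
        // the flusher then patches entries in it directly and skips the
        // read-modify-write of metadata sectors via flusher->meta_sectors.
        config["inmemory_metadata"] = "true";
        // Note the key rename in this patch: "journal_inmemory" became
        // "inmemory_journal".
        config["inmemory_journal"] = "true";
        blockstore *bs = new blockstore(config, ringloop);
        while (!bs->is_started())
            ringloop->loop(); // assumed: runs one event-loop iteration
        delete bs;
        delete ringloop;
        return 0;
    }

The trade-off is memory: inmemory metadata costs meta_len bytes of RAM, but
journal_flusher_co::modify_meta_read() then returns a pointer into
bs->metadata_buffer right away instead of reading the 512-byte metadata
sector from disk first.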