From e871de27de58661ba322f7ea5c00c8d6cefafcd2 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sat, 9 Mar 2024 02:16:42 +0300 Subject: [PATCH] Support unaligned shared_offsets, align shared file data instead of header --- src/nfs_kv.cpp | 3 +- src/nfs_kv.h | 2 +- src/nfs_kv_read.cpp | 60 +++++++---- src/nfs_kv_write.cpp | 233 ++++++++++++++++++++++++++++--------------- 4 files changed, 201 insertions(+), 97 deletions(-) diff --git a/src/nfs_kv.cpp b/src/nfs_kv.cpp index 60d74600..89a0b98f 100644 --- a/src/nfs_kv.cpp +++ b/src/nfs_kv.cpp @@ -257,5 +257,6 @@ void kv_fs_state_t::init(nfs_proxy_t *proxy, json11::Json cfg) { shared_inode_threshold = cfg["shared_inode_threshold"].uint64_value(); } - zero_block.resize(pool_block_size); + zero_block.resize(pool_block_size < 1048576 ? 1048576 : pool_block_size); + scrap_block.resize(pool_block_size < 1048576 ? 1048576 : pool_block_size); } diff --git a/src/nfs_kv.h b/src/nfs_kv.h index e52e7ef3..d01c5e03 100644 --- a/src/nfs_kv.h +++ b/src/nfs_kv.h @@ -33,7 +33,6 @@ struct shared_alloc_queue_t { nfs_kv_write_state *st; int state; - uint64_t size; }; struct kv_inode_extend_t @@ -60,6 +59,7 @@ struct kv_fs_state_t uint64_t cur_shared_inode = 0, cur_shared_offset = 0; std::map extends; std::vector zero_block; + std::vector scrap_block; void init(nfs_proxy_t *proxy, json11::Json cfg); }; diff --git a/src/nfs_kv_read.cpp b/src/nfs_kv_read.cpp index a6be72f3..7937d0ba 100644 --- a/src/nfs_kv_read.cpp +++ b/src/nfs_kv_read.cpp @@ -18,6 +18,7 @@ struct nfs_kv_read_state std::function cb; // state int res = 0; + int eof = 0; json11::Json ientry; uint64_t aligned_size = 0, aligned_offset = 0; uint8_t *aligned_buf = NULL; @@ -25,6 +26,9 @@ struct nfs_kv_read_state uint8_t *buf = NULL; }; +#define align_down(size) ((size) & ~(st->self->parent->kvfs->pool_alignment-1)) +#define align_up(size) (((size) + st->self->parent->kvfs->pool_alignment-1) & ~(st->self->parent->kvfs->pool_alignment-1)) + static void nfs_kv_continue_read(nfs_kv_read_state *st, int state) { if (state == 0) {} @@ -54,21 +58,44 @@ resume_1: } if (st->ientry["shared_ino"].uint64_value() != 0) { - st->aligned_size = align_shared_size(st->self, st->offset+st->size); - st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size); - st->buf = st->aligned_buf + sizeof(shared_file_header_t) + st->offset; - st->op = new cluster_op_t; - st->op->opcode = OSD_OP_READ; - st->op->inode = st->self->parent->kvfs->fs_base_inode + st->ientry["shared_ino"].uint64_value(); - st->op->offset = st->ientry["shared_offset"].uint64_value(); - if (st->offset+st->size > st->ientry["size"].uint64_value()) + if (st->offset >= st->ientry["size"].uint64_value()) { - st->op->len = align_shared_size(st->self, st->ientry["size"].uint64_value()); - memset(st->aligned_buf+st->op->len, 0, st->aligned_size-st->op->len); + st->size = 0; + st->eof = 1; + auto cb = std::move(st->cb); + cb(0); + return; + } + st->op = new cluster_op_t; + { + st->op->opcode = OSD_OP_READ; + st->op->inode = st->self->parent->kvfs->fs_base_inode + st->ientry["shared_ino"].uint64_value(); + // Always read including header to react if the file was possibly moved away + auto read_offset = st->ientry["shared_offset"].uint64_value(); + st->op->offset = align_down(read_offset); + if (st->op->offset < read_offset) + { + st->op->iov.push_back(st->self->parent->kvfs->scrap_block.data(), + read_offset-st->op->offset); + } + auto read_size = st->offset+st->size; + if (read_size > st->ientry["size"].uint64_value()) + { + st->eof = 1; + st->size = st->ientry["size"].uint64_value()-st->offset; + read_size = st->ientry["size"].uint64_value(); + } + read_size += sizeof(shared_file_header_t); + st->aligned_buf = (uint8_t*)malloc_or_die(read_size); + st->buf = st->aligned_buf + sizeof(shared_file_header_t) + st->offset; + st->op->iov.push_back(st->aligned_buf, read_size); + st->op->len = align_up(read_offset+read_size) - st->op->offset; + if (read_offset+read_size < st->op->offset+st->op->len) + { + st->op->iov.push_back(st->self->parent->kvfs->scrap_block.data(), + st->op->offset+st->op->len - (read_offset+read_size)); + } } - else - st->op->len = st->aligned_size; - st->op->iov.push_back(st->aligned_buf, st->op->len); st->op->callback = [st, state](cluster_op_t *op) { st->res = op->retval == op->len ? 0 : op->retval; @@ -99,9 +126,8 @@ resume_2: return; } } - st->aligned_offset = (st->offset & ~(st->self->parent->kvfs->pool_alignment-1)); - st->aligned_size = ((st->offset + st->size + st->self->parent->kvfs->pool_alignment-1) & - ~(st->self->parent->kvfs->pool_alignment-1)) - st->aligned_offset; + st->aligned_offset = align_down(st->offset); + st->aligned_size = align_up(st->offset+st->size) - st->aligned_offset; st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size); st->buf = st->aligned_buf + st->offset - st->aligned_offset; st->op = new cluster_op_t; @@ -151,7 +177,7 @@ int kv_nfs3_read_proc(void *opaque, rpc_op_t *rop) reply->resok.data.data = (char*)st->buf; reply->resok.data.size = st->size; reply->resok.count = st->size; - reply->resok.eof = 0; + reply->resok.eof = st->eof; } rpc_queue_reply(st->rop); delete st; diff --git a/src/nfs_kv_write.cpp b/src/nfs_kv_write.cpp index 1d3485bd..5e53b7ea 100644 --- a/src/nfs_kv_write.cpp +++ b/src/nfs_kv_write.cpp @@ -41,7 +41,8 @@ struct nfs_kv_write_state uint64_t new_size = 0; uint64_t aligned_size = 0; uint8_t *aligned_buf = NULL; - uint64_t shared_inode = 0, shared_offset = 0; + // new shared parameters + uint64_t shared_inode = 0, shared_offset = 0, shared_alloc = 0; bool was_immediate = false; nfs_rmw_t rmw[2]; shared_file_header_t shdr; @@ -57,30 +58,51 @@ struct nfs_kv_write_state } }; +#define align_down(size) ((size) & ~(st->self->parent->kvfs->pool_alignment-1)) +#define align_up(size) (((size) + st->self->parent->kvfs->pool_alignment-1) & ~(st->self->parent->kvfs->pool_alignment-1)) + static void nfs_kv_continue_write(nfs_kv_write_state *st, int state); +static void allocate_shared_space(nfs_kv_write_state *st) +{ + auto kvfs = st->self->parent->kvfs; + st->shared_inode = kvfs->cur_shared_inode; + if (st->new_size < 3*kvfs->pool_alignment - sizeof(shared_file_header_t)) + { + // Allocate as is, without alignment if file is smaller than 3*4kb - 24 + st->shared_offset = kvfs->cur_shared_offset; + st->shared_alloc = align_up(sizeof(shared_file_header_t) + st->new_size); + } + else + { + // Try to skip some space to store data aligned + st->shared_offset = align_up(kvfs->cur_shared_offset + sizeof(shared_file_header_t)) - sizeof(shared_file_header_t); + st->shared_alloc = sizeof(shared_file_header_t) + align_up(st->new_size); + } + st->self->parent->kvfs->cur_shared_offset = st->shared_offset + st->shared_alloc; +} + static void finish_allocate_shared(nfs_client_t *self, int res) { std::vector waiting; waiting.swap(self->parent->kvfs->allocating_shared); for (auto & w: waiting) { - w.st->res = res; + auto st = w.st; + st->res = res; if (res == 0) { - w.st->shared_inode = self->parent->kvfs->cur_shared_inode; - w.st->shared_offset = self->parent->kvfs->cur_shared_offset; - self->parent->kvfs->cur_shared_offset += (w.size + self->parent->kvfs->pool_alignment-1) & ~(self->parent->kvfs->pool_alignment-1); + allocate_shared_space(st); } nfs_kv_continue_write(w.st, w.state); } } -static void allocate_shared_inode(nfs_kv_write_state *st, int state, uint64_t size) +static void allocate_shared_inode(nfs_kv_write_state *st, int state) { if (st->self->parent->kvfs->cur_shared_inode == 0) { - st->self->parent->kvfs->allocating_shared.push_back({ st, state, size }); + st->self->parent->kvfs->allocating_shared.push_back({ st, state }); if (st->self->parent->kvfs->allocating_shared.size() > 1) { return; @@ -114,19 +136,11 @@ static void allocate_shared_inode(nfs_kv_write_state *st, int state, uint64_t si else { st->res = 0; - st->shared_inode = st->self->parent->kvfs->cur_shared_inode; - st->shared_offset = st->self->parent->kvfs->cur_shared_offset; - st->self->parent->kvfs->cur_shared_offset += (size + st->self->parent->kvfs->pool_alignment-1) & ~(st->self->parent->kvfs->pool_alignment-1); + allocate_shared_space(st); nfs_kv_continue_write(st, state); } } -uint64_t align_shared_size(nfs_client_t *self, uint64_t size) -{ - return (size + sizeof(shared_file_header_t) + self->parent->kvfs->pool_alignment-1) - & ~(self->parent->kvfs->pool_alignment-1); -} - static void nfs_do_write(uint64_t ino, uint64_t offset, uint64_t size, std::function prepare, nfs_kv_write_state *st, int state) { auto op = new cluster_op_t; @@ -154,11 +168,13 @@ static void nfs_do_write(uint64_t ino, uint64_t offset, uint64_t size, std::func static void nfs_do_unshare_write(nfs_kv_write_state *st, int state) { - uint64_t unshare_size = (st->ientry["size"].uint64_value() + st->self->parent->kvfs->pool_alignment-1) - & ~(st->self->parent->kvfs->pool_alignment-1); - nfs_do_write(st->ino, 0, unshare_size, [&](cluster_op_t *op) + uint64_t size = st->ientry["size"].uint64_value(); + uint64_t aligned_size = align_up(size); + nfs_do_write(st->ino, 0, aligned_size, [&](cluster_op_t *op) { - op->iov.push_back(st->aligned_buf + sizeof(shared_file_header_t), unshare_size); + op->iov.push_back(st->aligned_buf, size); + if (aligned_size > size) + op->iov.push_back(st->self->parent->kvfs->zero_block.data(), aligned_size-size); }, st, state); } @@ -259,17 +275,48 @@ static void nfs_do_rmw(nfs_rmw_t *rmw) static void nfs_do_shared_read(nfs_kv_write_state *st, int state) { + uint64_t data_size = st->ientry["size"].uint64_value(); + if (!data_size) + { + nfs_kv_continue_write(st, state); + return; + } + assert(!st->aligned_buf); + st->aligned_buf = (uint8_t*)malloc_or_die(data_size); + uint64_t shared_offset = st->ientry["shared_offset"].uint64_value(); auto op = new cluster_op_t; op->opcode = OSD_OP_READ; op->inode = st->self->parent->kvfs->fs_base_inode + st->ientry["shared_ino"].uint64_value(); - op->offset = st->ientry["shared_offset"].uint64_value(); - op->len = align_shared_size(st->self, st->ientry["size"].uint64_value()); - op->iov.push_back(st->aligned_buf, op->len); + op->offset = align_down(shared_offset); + // Allow unaligned shared reads + auto pre = shared_offset-align_down(shared_offset); + if (pre > 0) + { + op->iov.push_back(st->self->parent->kvfs->scrap_block.data(), pre); + } + op->iov.push_back(&st->shdr, sizeof(shared_file_header_t)); + op->iov.push_back(st->aligned_buf, data_size); + auto post = (shared_offset+sizeof(shared_file_header_t)+data_size); + post = align_up(post) - post; + if (post > 0) + { + op->iov.push_back(st->self->parent->kvfs->scrap_block.data(), post); + } + op->len = pre+sizeof(shared_file_header_t)+data_size+post; op->callback = [st, state](cluster_op_t *op) { st->res = op->retval == op->len ? 0 : op->retval; delete op; - nfs_kv_continue_write(st, state); + if (st->shdr.magic != SHARED_FILE_MAGIC_V1 || st->shdr.inode != st->ino) + { + // Got unrelated data - retry from the beginning + st->allow_cache = false; + free(st->aligned_buf); + st->aligned_buf = NULL; + nfs_kv_continue_write(st, 0); + } + else + nfs_kv_continue_write(st, state); }; st->self->parent->cli->execute(op); } @@ -290,64 +337,97 @@ static void nfs_do_fsync(nfs_kv_write_state *st, int state) static bool nfs_do_shared_readmodify(nfs_kv_write_state *st, int base_state, int state, bool unshare) { assert(state <= base_state); - if (state < base_state) {} - else if (state == base_state) goto resume_0; - assert(!st->aligned_buf); - st->aligned_size = unshare - ? sizeof(shared_file_header_t) + ((st->new_size + st->self->parent->kvfs->pool_alignment-1) & ~(st->self->parent->kvfs->pool_alignment-1)) - : align_shared_size(st->self, st->new_size); - st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size); - // FIXME do not allocate zeroes if we only need zeroes - memset(st->aligned_buf + sizeof(shared_file_header_t), 0, st->offset); - memset(st->aligned_buf + sizeof(shared_file_header_t) + st->offset + st->size, 0, - st->aligned_size - sizeof(shared_file_header_t) - st->offset - st->size); + if (state < base_state) goto resume_0; + else if (state == base_state) goto resume_1; +resume_0: if (st->ientry["shared_ino"].uint64_value() != 0 && - st->ientry["size"].uint64_value() != 0) + st->ientry["size"].uint64_value() != 0 && + (st->offset > 0 || (st->offset+st->size) < st->new_size)) { - // Read old data if shared non-empty + // Read old data if shared non-empty and not fully overwritten nfs_do_shared_read(st, base_state); return false; -resume_0: +resume_1: if (st->res < 0) { auto cb = std::move(st->cb); cb(st->res); return false; } - auto hdr = ((shared_file_header_t*)st->aligned_buf); - if (hdr->magic != SHARED_FILE_MAGIC_V1 || hdr->inode != st->ino) - { - // Got unrelated data - retry from the beginning - st->allow_cache = false; - free(st->aligned_buf); - st->aligned_buf = NULL; - nfs_kv_continue_write(st, 0); - return false; - } } - // FIXME put shared_file_header_t after data to not break alignment - *((shared_file_header_t*)st->aligned_buf) = { - .magic = SHARED_FILE_MAGIC_V1, - .inode = st->ino, - .alloc = st->aligned_size, - }; return true; } -static void nfs_do_shared_write(nfs_kv_write_state *st, int state, bool only_aligned) +static void add_zero(cluster_op_t *op, uint64_t count, std::vector & zero_buf) { - nfs_do_write(st->shared_inode, st->shared_offset, st->aligned_size, [&](cluster_op_t *op) + while (count > zero_buf.size()) { - if (only_aligned) - op->iov.push_back(st->aligned_buf, st->aligned_size); - else + op->iov.push_back(zero_buf.data(), zero_buf.size()); + count -= zero_buf.size(); + } + if (count > 0) + { + op->iov.push_back(zero_buf.data(), count); + count = 0; + } +} + +static void nfs_do_shared_write(nfs_kv_write_state *st, int state) +{ + st->shdr = { + .magic = SHARED_FILE_MAGIC_V1, + .inode = st->ino, + .alloc = st->shared_alloc, + }; + bool unaligned_is_free = true; + uint64_t write_offset = st->shared_offset; + uint64_t write_size = sizeof(shared_file_header_t) + st->new_size; + uint64_t aligned_offset = write_offset; + uint64_t aligned_size = write_size; + if (unaligned_is_free) + { + // Zero pad if "unaligned but aligned" + aligned_offset = align_down(write_offset); + aligned_size = align_up(write_offset+write_size) - aligned_offset; + } + // FIXME: Do RMW if unaligned_is_free == false i.e. if we want tighter packing + bool has_old = st->ientry["shared_ino"].uint64_value() != 0 && + st->ientry["size"].uint64_value() != 0; + nfs_do_write(st->shared_inode, aligned_offset, aligned_size, [&](cluster_op_t *op) + { + if (unaligned_is_free && aligned_offset < write_offset) { - op->iov.push_back(st->aligned_buf, sizeof(shared_file_header_t) + st->offset); - op->iov.push_back(st->buf, st->size); - op->iov.push_back( - st->aligned_buf + sizeof(shared_file_header_t) + st->offset + st->size, - st->aligned_size - (sizeof(shared_file_header_t) + st->offset + st->size) - ); + // zero padding + op->iov.push_back(st->self->parent->kvfs->zero_block.data(), write_offset-aligned_offset); + } + // header + op->iov.push_back(&st->shdr, sizeof(shared_file_header_t)); + if (st->offset > 0) + { + if (has_old) + { + // old data + op->iov.push_back(st->aligned_buf, st->offset); + } + else + add_zero(op, st->offset, st->self->parent->kvfs->zero_block); + } + // new data + op->iov.push_back(st->buf, st->size); + if (st->offset+st->size < st->new_size) + { + if (has_old) + { + // old data + op->iov.push_back(st->aligned_buf+st->offset+st->size, st->new_size-(st->offset+st->size)); + } + else + add_zero(op, st->offset, st->self->parent->kvfs->zero_block); + } + if (unaligned_is_free && (aligned_size+aligned_offset) > (write_size+write_offset)) + { + // zero padding + op->iov.push_back(st->self->parent->kvfs->zero_block.data(), aligned_size+aligned_offset - (write_size+write_offset)); } }, st, state); } @@ -467,7 +547,7 @@ static std::string new_moved_ientry(nfs_kv_write_state *st) ni.erase("empty"); ni["shared_ino"] = st->shared_inode; ni["shared_offset"] = st->shared_offset; - ni["shared_alloc"] = st->aligned_size; + ni["shared_alloc"] = st->shared_alloc; ni.erase("shared_ver"); ni["size"] = st->new_size; return json11::Json(ni).dump(); @@ -632,7 +712,6 @@ static void nfs_kv_continue_write(nfs_kv_write_state *st, int state) else if (state == 3) goto resume_3; else if (state == 4) goto resume_4; else if (state == 5) goto resume_5; - else if (state == 6) goto resume_6; else if (state == 7) goto resume_7; else if (state == 8) goto resume_8; else if (state == 9) goto resume_9; @@ -686,7 +765,7 @@ resume_1: st->ientry["shared_alloc"].uint64_value() < sizeof(shared_file_header_t)+st->offset+st->size) { // Either empty, or shared and requires moving into a larger place (redirect-write) - allocate_shared_inode(st, 2, align_shared_size(st->self, st->new_size)); + allocate_shared_inode(st, 2); return; resume_2: if (st->res < 0) @@ -697,10 +776,17 @@ resume_2: } resume_3: if (!nfs_do_shared_readmodify(st, 3, state, false)) + { return; - nfs_do_shared_write(st, 4, false); + } + nfs_do_shared_write(st, 4); return; resume_4: + if (st->aligned_buf) + { + free(st->aligned_buf); + st->aligned_buf = NULL; + } if (st->res < 0) { auto cb = std::move(st->cb); @@ -720,12 +806,6 @@ resume_5: if (st->res < 0) { st->res2 = st->res; - memset(st->aligned_buf, 0, st->aligned_size); - nfs_do_shared_write(st, 6, true); - return; -resume_6: - free(st->aligned_buf); - st->aligned_buf = NULL; if (st->res2 == -EAGAIN) { goto resume_0; @@ -781,9 +861,6 @@ resume_9: { if (st->ientry["size"].uint64_value() != 0) { - assert(!st->aligned_buf); - st->aligned_size = align_shared_size(st->self, st->ientry["size"].uint64_value()); - st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size); nfs_do_shared_read(st, 10); return; resume_10: