Support unaligned shared_offsets, align shared file data instead of header

master
Vitaliy Filippov 2024-03-09 02:16:42 +03:00
parent f600ce98e2
commit e871de27de
4 changed files with 201 additions and 97 deletions

View File

@ -257,5 +257,6 @@ void kv_fs_state_t::init(nfs_proxy_t *proxy, json11::Json cfg)
{
shared_inode_threshold = cfg["shared_inode_threshold"].uint64_value();
}
zero_block.resize(pool_block_size);
zero_block.resize(pool_block_size < 1048576 ? 1048576 : pool_block_size);
scrap_block.resize(pool_block_size < 1048576 ? 1048576 : pool_block_size);
}

View File

@ -33,7 +33,6 @@ struct shared_alloc_queue_t
{
nfs_kv_write_state *st;
int state;
uint64_t size;
};
struct kv_inode_extend_t
@ -60,6 +59,7 @@ struct kv_fs_state_t
uint64_t cur_shared_inode = 0, cur_shared_offset = 0;
std::map<inode_t, kv_inode_extend_t> extends;
std::vector<uint8_t> zero_block;
std::vector<uint8_t> scrap_block;
void init(nfs_proxy_t *proxy, json11::Json cfg);
};

View File

@ -18,6 +18,7 @@ struct nfs_kv_read_state
std::function<void(int)> cb;
// state
int res = 0;
int eof = 0;
json11::Json ientry;
uint64_t aligned_size = 0, aligned_offset = 0;
uint8_t *aligned_buf = NULL;
@ -25,6 +26,9 @@ struct nfs_kv_read_state
uint8_t *buf = NULL;
};
#define align_down(size) ((size) & ~(st->self->parent->kvfs->pool_alignment-1))
#define align_up(size) (((size) + st->self->parent->kvfs->pool_alignment-1) & ~(st->self->parent->kvfs->pool_alignment-1))
static void nfs_kv_continue_read(nfs_kv_read_state *st, int state)
{
if (state == 0) {}
@ -54,21 +58,44 @@ resume_1:
}
if (st->ientry["shared_ino"].uint64_value() != 0)
{
st->aligned_size = align_shared_size(st->self, st->offset+st->size);
st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size);
st->buf = st->aligned_buf + sizeof(shared_file_header_t) + st->offset;
st->op = new cluster_op_t;
st->op->opcode = OSD_OP_READ;
st->op->inode = st->self->parent->kvfs->fs_base_inode + st->ientry["shared_ino"].uint64_value();
st->op->offset = st->ientry["shared_offset"].uint64_value();
if (st->offset+st->size > st->ientry["size"].uint64_value())
if (st->offset >= st->ientry["size"].uint64_value())
{
st->op->len = align_shared_size(st->self, st->ientry["size"].uint64_value());
memset(st->aligned_buf+st->op->len, 0, st->aligned_size-st->op->len);
st->size = 0;
st->eof = 1;
auto cb = std::move(st->cb);
cb(0);
return;
}
st->op = new cluster_op_t;
{
st->op->opcode = OSD_OP_READ;
st->op->inode = st->self->parent->kvfs->fs_base_inode + st->ientry["shared_ino"].uint64_value();
// Always read including header to react if the file was possibly moved away
auto read_offset = st->ientry["shared_offset"].uint64_value();
st->op->offset = align_down(read_offset);
if (st->op->offset < read_offset)
{
st->op->iov.push_back(st->self->parent->kvfs->scrap_block.data(),
read_offset-st->op->offset);
}
auto read_size = st->offset+st->size;
if (read_size > st->ientry["size"].uint64_value())
{
st->eof = 1;
st->size = st->ientry["size"].uint64_value()-st->offset;
read_size = st->ientry["size"].uint64_value();
}
read_size += sizeof(shared_file_header_t);
st->aligned_buf = (uint8_t*)malloc_or_die(read_size);
st->buf = st->aligned_buf + sizeof(shared_file_header_t) + st->offset;
st->op->iov.push_back(st->aligned_buf, read_size);
st->op->len = align_up(read_offset+read_size) - st->op->offset;
if (read_offset+read_size < st->op->offset+st->op->len)
{
st->op->iov.push_back(st->self->parent->kvfs->scrap_block.data(),
st->op->offset+st->op->len - (read_offset+read_size));
}
}
else
st->op->len = st->aligned_size;
st->op->iov.push_back(st->aligned_buf, st->op->len);
st->op->callback = [st, state](cluster_op_t *op)
{
st->res = op->retval == op->len ? 0 : op->retval;
@ -99,9 +126,8 @@ resume_2:
return;
}
}
st->aligned_offset = (st->offset & ~(st->self->parent->kvfs->pool_alignment-1));
st->aligned_size = ((st->offset + st->size + st->self->parent->kvfs->pool_alignment-1) &
~(st->self->parent->kvfs->pool_alignment-1)) - st->aligned_offset;
st->aligned_offset = align_down(st->offset);
st->aligned_size = align_up(st->offset+st->size) - st->aligned_offset;
st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size);
st->buf = st->aligned_buf + st->offset - st->aligned_offset;
st->op = new cluster_op_t;
@ -151,7 +177,7 @@ int kv_nfs3_read_proc(void *opaque, rpc_op_t *rop)
reply->resok.data.data = (char*)st->buf;
reply->resok.data.size = st->size;
reply->resok.count = st->size;
reply->resok.eof = 0;
reply->resok.eof = st->eof;
}
rpc_queue_reply(st->rop);
delete st;

View File

@ -41,7 +41,8 @@ struct nfs_kv_write_state
uint64_t new_size = 0;
uint64_t aligned_size = 0;
uint8_t *aligned_buf = NULL;
uint64_t shared_inode = 0, shared_offset = 0;
// new shared parameters
uint64_t shared_inode = 0, shared_offset = 0, shared_alloc = 0;
bool was_immediate = false;
nfs_rmw_t rmw[2];
shared_file_header_t shdr;
@ -57,30 +58,51 @@ struct nfs_kv_write_state
}
};
#define align_down(size) ((size) & ~(st->self->parent->kvfs->pool_alignment-1))
#define align_up(size) (((size) + st->self->parent->kvfs->pool_alignment-1) & ~(st->self->parent->kvfs->pool_alignment-1))
static void nfs_kv_continue_write(nfs_kv_write_state *st, int state);
// Reserve a slot for this file's data inside the current shared inode.
// Fills in st->shared_inode, st->shared_offset and st->shared_alloc, then
// advances kvfs->cur_shared_offset past the reservation (bump allocation).
// NOTE: align_up()/align_down() are macros that implicitly dereference the
// local `st` for pool_alignment, so they are only usable in this scope.
static void allocate_shared_space(nfs_kv_write_state *st)
{
auto kvfs = st->self->parent->kvfs;
st->shared_inode = kvfs->cur_shared_inode;
// Small files (under 3 blocks minus the header) are packed as-is: the
// record (header + data) is merely rounded up to pool_alignment, so the
// data itself may start unaligned inside the shared inode.
if (st->new_size < 3*kvfs->pool_alignment - sizeof(shared_file_header_t))
{
// Allocate as is, without alignment if file is smaller than 3*4kb - 24
st->shared_offset = kvfs->cur_shared_offset;
st->shared_alloc = align_up(sizeof(shared_file_header_t) + st->new_size);
}
else
{
// Try to skip some space to store data aligned
// Place the header immediately BEFORE an aligned boundary so that the
// file data (which follows the header) lands block-aligned; this may
// skip up to pool_alignment-1 bytes of the shared inode.
st->shared_offset = align_up(kvfs->cur_shared_offset + sizeof(shared_file_header_t)) - sizeof(shared_file_header_t);
st->shared_alloc = sizeof(shared_file_header_t) + align_up(st->new_size);
}
// Bump the allocation cursor past this reservation for the next caller.
st->self->parent->kvfs->cur_shared_offset = st->shared_offset + st->shared_alloc;
}
static void finish_allocate_shared(nfs_client_t *self, int res)
{
std::vector<shared_alloc_queue_t> waiting;
waiting.swap(self->parent->kvfs->allocating_shared);
for (auto & w: waiting)
{
w.st->res = res;
auto st = w.st;
st->res = res;
if (res == 0)
{
w.st->shared_inode = self->parent->kvfs->cur_shared_inode;
w.st->shared_offset = self->parent->kvfs->cur_shared_offset;
self->parent->kvfs->cur_shared_offset += (w.size + self->parent->kvfs->pool_alignment-1) & ~(self->parent->kvfs->pool_alignment-1);
allocate_shared_space(st);
}
nfs_kv_continue_write(w.st, w.state);
}
}
static void allocate_shared_inode(nfs_kv_write_state *st, int state, uint64_t size)
static void allocate_shared_inode(nfs_kv_write_state *st, int state)
{
if (st->self->parent->kvfs->cur_shared_inode == 0)
{
st->self->parent->kvfs->allocating_shared.push_back({ st, state, size });
st->self->parent->kvfs->allocating_shared.push_back({ st, state });
if (st->self->parent->kvfs->allocating_shared.size() > 1)
{
return;
@ -114,19 +136,11 @@ static void allocate_shared_inode(nfs_kv_write_state *st, int state, uint64_t si
else
{
st->res = 0;
st->shared_inode = st->self->parent->kvfs->cur_shared_inode;
st->shared_offset = st->self->parent->kvfs->cur_shared_offset;
st->self->parent->kvfs->cur_shared_offset += (size + st->self->parent->kvfs->pool_alignment-1) & ~(st->self->parent->kvfs->pool_alignment-1);
allocate_shared_space(st);
nfs_kv_continue_write(st, state);
}
}
uint64_t align_shared_size(nfs_client_t *self, uint64_t size)
{
return (size + sizeof(shared_file_header_t) + self->parent->kvfs->pool_alignment-1)
& ~(self->parent->kvfs->pool_alignment-1);
}
static void nfs_do_write(uint64_t ino, uint64_t offset, uint64_t size, std::function<void(cluster_op_t *op)> prepare, nfs_kv_write_state *st, int state)
{
auto op = new cluster_op_t;
@ -154,11 +168,13 @@ static void nfs_do_write(uint64_t ino, uint64_t offset, uint64_t size, std::func
static void nfs_do_unshare_write(nfs_kv_write_state *st, int state)
{
uint64_t unshare_size = (st->ientry["size"].uint64_value() + st->self->parent->kvfs->pool_alignment-1)
& ~(st->self->parent->kvfs->pool_alignment-1);
nfs_do_write(st->ino, 0, unshare_size, [&](cluster_op_t *op)
uint64_t size = st->ientry["size"].uint64_value();
uint64_t aligned_size = align_up(size);
nfs_do_write(st->ino, 0, aligned_size, [&](cluster_op_t *op)
{
op->iov.push_back(st->aligned_buf + sizeof(shared_file_header_t), unshare_size);
op->iov.push_back(st->aligned_buf, size);
if (aligned_size > size)
op->iov.push_back(st->self->parent->kvfs->zero_block.data(), aligned_size-size);
}, st, state);
}
@ -259,17 +275,48 @@ static void nfs_do_rmw(nfs_rmw_t *rmw)
static void nfs_do_shared_read(nfs_kv_write_state *st, int state)
{
uint64_t data_size = st->ientry["size"].uint64_value();
if (!data_size)
{
nfs_kv_continue_write(st, state);
return;
}
assert(!st->aligned_buf);
st->aligned_buf = (uint8_t*)malloc_or_die(data_size);
uint64_t shared_offset = st->ientry["shared_offset"].uint64_value();
auto op = new cluster_op_t;
op->opcode = OSD_OP_READ;
op->inode = st->self->parent->kvfs->fs_base_inode + st->ientry["shared_ino"].uint64_value();
op->offset = st->ientry["shared_offset"].uint64_value();
op->len = align_shared_size(st->self, st->ientry["size"].uint64_value());
op->iov.push_back(st->aligned_buf, op->len);
op->offset = align_down(shared_offset);
// Allow unaligned shared reads
auto pre = shared_offset-align_down(shared_offset);
if (pre > 0)
{
op->iov.push_back(st->self->parent->kvfs->scrap_block.data(), pre);
}
op->iov.push_back(&st->shdr, sizeof(shared_file_header_t));
op->iov.push_back(st->aligned_buf, data_size);
auto post = (shared_offset+sizeof(shared_file_header_t)+data_size);
post = align_up(post) - post;
if (post > 0)
{
op->iov.push_back(st->self->parent->kvfs->scrap_block.data(), post);
}
op->len = pre+sizeof(shared_file_header_t)+data_size+post;
op->callback = [st, state](cluster_op_t *op)
{
st->res = op->retval == op->len ? 0 : op->retval;
delete op;
nfs_kv_continue_write(st, state);
if (st->shdr.magic != SHARED_FILE_MAGIC_V1 || st->shdr.inode != st->ino)
{
// Got unrelated data - retry from the beginning
st->allow_cache = false;
free(st->aligned_buf);
st->aligned_buf = NULL;
nfs_kv_continue_write(st, 0);
}
else
nfs_kv_continue_write(st, state);
};
st->self->parent->cli->execute(op);
}
@ -290,64 +337,97 @@ static void nfs_do_fsync(nfs_kv_write_state *st, int state)
static bool nfs_do_shared_readmodify(nfs_kv_write_state *st, int base_state, int state, bool unshare)
{
assert(state <= base_state);
if (state < base_state) {}
else if (state == base_state) goto resume_0;
assert(!st->aligned_buf);
st->aligned_size = unshare
? sizeof(shared_file_header_t) + ((st->new_size + st->self->parent->kvfs->pool_alignment-1) & ~(st->self->parent->kvfs->pool_alignment-1))
: align_shared_size(st->self, st->new_size);
st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size);
// FIXME do not allocate zeroes if we only need zeroes
memset(st->aligned_buf + sizeof(shared_file_header_t), 0, st->offset);
memset(st->aligned_buf + sizeof(shared_file_header_t) + st->offset + st->size, 0,
st->aligned_size - sizeof(shared_file_header_t) - st->offset - st->size);
if (state < base_state) goto resume_0;
else if (state == base_state) goto resume_1;
resume_0:
if (st->ientry["shared_ino"].uint64_value() != 0 &&
st->ientry["size"].uint64_value() != 0)
st->ientry["size"].uint64_value() != 0 &&
(st->offset > 0 || (st->offset+st->size) < st->new_size))
{
// Read old data if shared non-empty
// Read old data if shared non-empty and not fully overwritten
nfs_do_shared_read(st, base_state);
return false;
resume_0:
resume_1:
if (st->res < 0)
{
auto cb = std::move(st->cb);
cb(st->res);
return false;
}
auto hdr = ((shared_file_header_t*)st->aligned_buf);
if (hdr->magic != SHARED_FILE_MAGIC_V1 || hdr->inode != st->ino)
{
// Got unrelated data - retry from the beginning
st->allow_cache = false;
free(st->aligned_buf);
st->aligned_buf = NULL;
nfs_kv_continue_write(st, 0);
return false;
}
}
// FIXME put shared_file_header_t after data to not break alignment
*((shared_file_header_t*)st->aligned_buf) = {
.magic = SHARED_FILE_MAGIC_V1,
.inode = st->ino,
.alloc = st->aligned_size,
};
return true;
}
static void nfs_do_shared_write(nfs_kv_write_state *st, int state, bool only_aligned)
static void add_zero(cluster_op_t *op, uint64_t count, std::vector<uint8_t> & zero_buf)
{
nfs_do_write(st->shared_inode, st->shared_offset, st->aligned_size, [&](cluster_op_t *op)
while (count > zero_buf.size())
{
if (only_aligned)
op->iov.push_back(st->aligned_buf, st->aligned_size);
else
op->iov.push_back(zero_buf.data(), zero_buf.size());
count -= zero_buf.size();
}
if (count > 0)
{
op->iov.push_back(zero_buf.data(), count);
count = 0;
}
}
static void nfs_do_shared_write(nfs_kv_write_state *st, int state)
{
st->shdr = {
.magic = SHARED_FILE_MAGIC_V1,
.inode = st->ino,
.alloc = st->shared_alloc,
};
bool unaligned_is_free = true;
uint64_t write_offset = st->shared_offset;
uint64_t write_size = sizeof(shared_file_header_t) + st->new_size;
uint64_t aligned_offset = write_offset;
uint64_t aligned_size = write_size;
if (unaligned_is_free)
{
// Zero pad if "unaligned but aligned"
aligned_offset = align_down(write_offset);
aligned_size = align_up(write_offset+write_size) - aligned_offset;
}
// FIXME: Do RMW if unaligned_is_free == false i.e. if we want tighter packing
bool has_old = st->ientry["shared_ino"].uint64_value() != 0 &&
st->ientry["size"].uint64_value() != 0;
nfs_do_write(st->shared_inode, aligned_offset, aligned_size, [&](cluster_op_t *op)
{
if (unaligned_is_free && aligned_offset < write_offset)
{
op->iov.push_back(st->aligned_buf, sizeof(shared_file_header_t) + st->offset);
op->iov.push_back(st->buf, st->size);
op->iov.push_back(
st->aligned_buf + sizeof(shared_file_header_t) + st->offset + st->size,
st->aligned_size - (sizeof(shared_file_header_t) + st->offset + st->size)
);
// zero padding
op->iov.push_back(st->self->parent->kvfs->zero_block.data(), write_offset-aligned_offset);
}
// header
op->iov.push_back(&st->shdr, sizeof(shared_file_header_t));
if (st->offset > 0)
{
if (has_old)
{
// old data
op->iov.push_back(st->aligned_buf, st->offset);
}
else
add_zero(op, st->offset, st->self->parent->kvfs->zero_block);
}
// new data
op->iov.push_back(st->buf, st->size);
if (st->offset+st->size < st->new_size)
{
if (has_old)
{
// old data
op->iov.push_back(st->aligned_buf+st->offset+st->size, st->new_size-(st->offset+st->size));
}
else
add_zero(op, st->offset, st->self->parent->kvfs->zero_block);
}
if (unaligned_is_free && (aligned_size+aligned_offset) > (write_size+write_offset))
{
// zero padding
op->iov.push_back(st->self->parent->kvfs->zero_block.data(), aligned_size+aligned_offset - (write_size+write_offset));
}
}, st, state);
}
@ -467,7 +547,7 @@ static std::string new_moved_ientry(nfs_kv_write_state *st)
ni.erase("empty");
ni["shared_ino"] = st->shared_inode;
ni["shared_offset"] = st->shared_offset;
ni["shared_alloc"] = st->aligned_size;
ni["shared_alloc"] = st->shared_alloc;
ni.erase("shared_ver");
ni["size"] = st->new_size;
return json11::Json(ni).dump();
@ -632,7 +712,6 @@ static void nfs_kv_continue_write(nfs_kv_write_state *st, int state)
else if (state == 3) goto resume_3;
else if (state == 4) goto resume_4;
else if (state == 5) goto resume_5;
else if (state == 6) goto resume_6;
else if (state == 7) goto resume_7;
else if (state == 8) goto resume_8;
else if (state == 9) goto resume_9;
@ -686,7 +765,7 @@ resume_1:
st->ientry["shared_alloc"].uint64_value() < sizeof(shared_file_header_t)+st->offset+st->size)
{
// Either empty, or shared and requires moving into a larger place (redirect-write)
allocate_shared_inode(st, 2, align_shared_size(st->self, st->new_size));
allocate_shared_inode(st, 2);
return;
resume_2:
if (st->res < 0)
@ -697,10 +776,17 @@ resume_2:
}
resume_3:
if (!nfs_do_shared_readmodify(st, 3, state, false))
{
return;
nfs_do_shared_write(st, 4, false);
}
nfs_do_shared_write(st, 4);
return;
resume_4:
if (st->aligned_buf)
{
free(st->aligned_buf);
st->aligned_buf = NULL;
}
if (st->res < 0)
{
auto cb = std::move(st->cb);
@ -720,12 +806,6 @@ resume_5:
if (st->res < 0)
{
st->res2 = st->res;
memset(st->aligned_buf, 0, st->aligned_size);
nfs_do_shared_write(st, 6, true);
return;
resume_6:
free(st->aligned_buf);
st->aligned_buf = NULL;
if (st->res2 == -EAGAIN)
{
goto resume_0;
@ -781,9 +861,6 @@ resume_9:
{
if (st->ientry["size"].uint64_value() != 0)
{
assert(!st->aligned_buf);
st->aligned_size = align_shared_size(st->self, st->ientry["size"].uint64_value());
st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size);
nfs_do_shared_read(st, 10);
return;
resume_10: