Handle parallel NFS extending writes without imposing extra load on etcd

hotfix-1.0.0
Vitaliy Filippov 2023-07-27 01:32:59 +00:00
parent ab0ca7c00f
commit 700e0e9bff
2 changed files with 148 additions and 47 deletions

View File

@ -190,7 +190,15 @@ static int nfs3_setattr_proc(void *opaque, rpc_op_t *rop)
{
if (handle == "roothandle" || self->parent->dir_by_hash.find(handle) != self->parent->dir_by_hash.end())
{
*reply = (SETATTR3res){ .status = NFS3ERR_ISDIR };
if (args->new_attributes.size.set_it)
{
*reply = (SETATTR3res){ .status = NFS3ERR_ISDIR };
}
else
{
// Silently ignore mode, uid, gid, atime, mtime changes
*reply = (SETATTR3res){ .status = NFS3_OK };
}
}
else
{
@ -358,7 +366,6 @@ static int nfs3_read_proc(void *opaque, rpc_op_t *rop)
}
static void nfs_resize_write(nfs_client_t *self, rpc_op_t *rop, uint64_t inode, uint64_t new_size, uint64_t offset, uint64_t count, void *buf);
static void nfs_do_write(nfs_client_t *self, rpc_op_t *rop, uint64_t inode, uint64_t offset, uint64_t count, void *buf);
static int nfs3_write_proc(void *opaque, rpc_op_t *rop)
{
@ -435,43 +442,101 @@ static int nfs3_write_proc(void *opaque, rpc_op_t *rop)
return 1;
}
static void nfs_resize_write(nfs_client_t *self, rpc_op_t *rop, uint64_t inode, uint64_t new_size, uint64_t offset, uint64_t count, void *buf)
static void complete_extend_write(nfs_client_t *self, rpc_op_t *rop, inode_t inode, int res)
{
// Check if we have to resize the inode before writing
WRITE3args *args = (WRITE3args*)rop->request;
WRITE3res *reply = (WRITE3res*)rop->reply;
if (res < 0)
{
*reply = (WRITE3res){ .status = vitastor_nfs_map_err(res) };
rpc_queue_reply(rop);
return;
}
bool imm = self->parent->cli->get_immediate_commit(inode);
reply->resok.committed = args->stable != UNSTABLE || imm ? FILE_SYNC : UNSTABLE;
*(uint64_t*)reply->resok.verf = self->parent->server_id;
if (args->stable != UNSTABLE && !imm)
{
// Client requested a stable write. Add an fsync
auto op = new cluster_op_t;
op->opcode = OSD_OP_SYNC;
op->callback = [rop](cluster_op_t *op)
{
if (op->retval != 0)
{
WRITE3res *reply = (WRITE3res*)rop->reply;
*reply = (WRITE3res){ .status = vitastor_nfs_map_err(-op->retval) };
}
delete op;
rpc_queue_reply(rop);
};
self->parent->cli->execute(op);
}
else
{
rpc_queue_reply(rop);
}
}
static void complete_extend_inode(nfs_client_t *self, uint64_t inode, uint64_t new_size, int err)
{
auto ext_it = self->extend_writes.lower_bound((extend_size_t){ .inode = inode, .new_size = 0 });
while (ext_it != self->extend_writes.end() &&
ext_it->first.inode == inode &&
ext_it->first.new_size <= new_size)
{
ext_it->second.resize_res = err;
if (ext_it->second.write_res <= 0)
{
complete_extend_write(self, ext_it->second.rop, inode, ext_it->second.write_res < 0
? ext_it->second.write_res : ext_it->second.resize_res);
self->extend_writes.erase(ext_it++);
}
else
ext_it++;
}
}
static void extend_inode(nfs_client_t *self, uint64_t inode, uint64_t new_size)
{
// Send an extend request
auto & ext = self->extends[inode];
ext.cur_extend = new_size;
auto inode_it = self->parent->cli->st_cli.inode_config.find(inode);
if (inode_it != self->parent->cli->st_cli.inode_config.end() &&
inode_it->second.size < new_size)
{
self->parent->cmd->loop_and_wait(self->parent->cmd->start_modify(json11::Json::object {
// FIXME: Resizing by ID is probably more correct
{ "image", inode_it->second.name },
{ "resize", new_size },
{ "inc_size", true },
{ "force_size", true },
}), [=](const cli_result_t & r)
{
auto & ext = self->extends[inode];
if (r.err)
{
if (r.err == EAGAIN)
{
// Multiple concurrent resize requests received, try to repeat
nfs_resize_write(self, rop, inode, new_size, offset, count, buf);
return;
}
WRITE3res *reply = (WRITE3res*)rop->reply;
*reply = (WRITE3res){ .status = vitastor_nfs_map_err(r.err) };
rpc_queue_reply(rop);
fprintf(stderr, "Error extending inode %lu to %lu bytes: %s\n", inode, new_size, r.text.c_str());
}
if (r.err == EAGAIN || ext.next_extend > ext.cur_extend)
{
// Multiple concurrent resize requests received, try to repeat
extend_inode(self, inode, ext.next_extend > ext.cur_extend ? ext.next_extend : ext.cur_extend);
return;
}
nfs_do_write(self, rop, inode, offset, count, buf);
ext.cur_extend = ext.next_extend = 0;
complete_extend_inode(self, inode, new_size, r.err);
});
}
else
{
nfs_do_write(self, rop, inode, offset, count, buf);
complete_extend_inode(self, inode, new_size, 0);
}
}
static void nfs_do_write(nfs_client_t *self, rpc_op_t *rop, uint64_t inode, uint64_t offset, uint64_t count, void *buf)
static void nfs_do_write(nfs_client_t *self, std::multimap<extend_size_t, extend_write_t>::iterator ewr_it,
rpc_op_t *rop, uint64_t inode, uint64_t offset, uint64_t count, void *buf)
{
cluster_op_t *op = new cluster_op_t;
op->opcode = OSD_OP_WRITE;
@ -479,49 +544,61 @@ static void nfs_do_write(nfs_client_t *self, rpc_op_t *rop, uint64_t inode, uint
op->offset = offset;
op->len = count;
op->iov.push_back(buf, count);
op->callback = [self, rop](cluster_op_t *op)
op->callback = [self, ewr_it, rop](cluster_op_t *op)
{
uint64_t inode = op->inode;
WRITE3args *args = (WRITE3args*)rop->request;
WRITE3res *reply = (WRITE3res*)rop->reply;
if (op->retval != op->len)
auto inode = op->inode;
int write_res = op->retval < 0 ? op->retval : (op->retval != op->len ? -ERANGE : 0);
if (ewr_it == self->extend_writes.end())
{
*reply = (WRITE3res){ .status = vitastor_nfs_map_err(-op->retval) };
delete op;
rpc_queue_reply(rop);
complete_extend_write(self, rop, inode, write_res);
}
else
{
bool imm = self->parent->cli->get_immediate_commit(inode);
reply->resok.committed = args->stable != UNSTABLE || imm ? FILE_SYNC : UNSTABLE;
*(uint64_t*)reply->resok.verf = self->parent->server_id;
delete op;
if (args->stable != UNSTABLE && !imm)
ewr_it->second.write_res = write_res;
if (ewr_it->second.resize_res <= 0)
{
// Client requested a stable write. Add an fsync
op = new cluster_op_t;
op->opcode = OSD_OP_SYNC;
op->callback = [rop](cluster_op_t *op)
{
if (op->retval != 0)
{
WRITE3res *reply = (WRITE3res*)rop->reply;
*reply = (WRITE3res){ .status = vitastor_nfs_map_err(-op->retval) };
}
delete op;
rpc_queue_reply(rop);
};
self->parent->cli->execute(op);
}
else
{
rpc_queue_reply(rop);
complete_extend_write(self, rop, inode, write_res < 0 ? write_res : ewr_it->second.resize_res);
self->extend_writes.erase(ewr_it);
}
}
};
self->parent->cli->execute(op);
}
static void nfs_resize_write(nfs_client_t *self, rpc_op_t *rop, uint64_t inode, uint64_t new_size, uint64_t offset, uint64_t count, void *buf)
{
// Check if we have to resize the inode during write
auto inode_it = self->parent->cli->st_cli.inode_config.find(inode);
if (inode_it != self->parent->cli->st_cli.inode_config.end() &&
inode_it->second.size < new_size)
{
auto ewr_it = self->extend_writes.emplace((extend_size_t){
.inode = inode,
.new_size = new_size,
}, (extend_write_t){
.rop = rop,
.resize_res = 1,
.write_res = 1,
});
auto & ext = self->extends[inode];
if (ext.cur_extend > 0)
{
// Already resizing, just wait
if (ext.next_extend < new_size)
ext.next_extend = new_size;
}
else
{
extend_inode(self, inode, new_size);
}
nfs_do_write(self, ewr_it, rop, inode, offset, count, buf);
}
else
{
nfs_do_write(self, self->extend_writes.end(), rop, inode, offset, count, buf);
}
}
static int nfs3_create_proc(void *opaque, rpc_op_t *rop)
{
nfs_client_t *self = (nfs_client_t*)opaque;

View File

@ -86,6 +86,28 @@ struct rpc_free_buffer_t
unsigned size;
};
struct extend_size_t
{
inode_t inode;
uint64_t new_size;
};
inline bool operator < (const extend_size_t &a, const extend_size_t &b)
{
return a.inode < b.inode || a.inode == b.inode && a.new_size < b.new_size;
}
struct extend_write_t
{
rpc_op_t *rop;
int resize_res, write_res; // 1 = started, 0 = completed OK, -errno = completed with error
};
struct extend_inode_t
{
uint64_t cur_extend = 0, next_extend = 0;
};
class nfs_client_t
{
public:
@ -100,6 +122,8 @@ public:
rpc_cur_buffer_t cur_buffer = { 0 };
std::map<uint8_t*, rpc_used_buffer_t> used_buffers;
std::vector<rpc_free_buffer_t> free_buffers;
std::map<inode_t, extend_inode_t> extends;
std::multimap<extend_size_t, extend_write_t> extend_writes;
iovec read_iov;
msghdr read_msg = { 0 };