Make nfs_do_rmw a library function

master
Vitaliy Filippov 2024-03-16 00:11:02 +03:00
parent bb8ca6184e
commit 0bde28c24a
2 changed files with 53 additions and 43 deletions

View File

@ -79,6 +79,20 @@ struct shared_file_header_t
uint64_t alloc = 0;
};
struct nfs_rmw_t
{
nfs_proxy_t *parent = NULL;
uint64_t ino = 0;
uint64_t offset = 0;
uint8_t *buf = NULL;
uint64_t size = 0;
uint8_t *part_buf = NULL;
uint64_t version = 0;
nfs_rmw_t *other = NULL;
std::function<void(nfs_rmw_t *)> cb;
int res = 0;
};
nfsstat3 vitastor_nfs_map_err(int err);
nfstime3 nfstime_from_str(const std::string & s);
std::string nfstime_to_str(nfstime3 t);
@ -96,6 +110,7 @@ void kv_read_inode(nfs_proxy_t *proxy, uint64_t ino,
std::function<void(int res, const std::string & value, json11::Json ientry)> cb,
bool allow_cache = false);
uint64_t align_shared_size(nfs_client_t *self, uint64_t size);
void nfs_do_rmw(nfs_rmw_t *rmw);
int kv_nfs3_getattr_proc(void *opaque, rpc_op_t *rop);
int kv_nfs3_setattr_proc(void *opaque, rpc_op_t *rop);

View File

@ -11,18 +11,6 @@
// FIXME: Implement shared inode defragmentator
// FIXME: Implement fsck for vitastor-fs and for vitastor-kv
struct nfs_rmw_t
{
nfs_kv_write_state *st = NULL;
int continue_state = 0;
uint64_t ino = 0;
uint64_t offset = 0;
uint8_t *buf = NULL;
uint64_t size = 0;
uint8_t *part_buf = NULL;
uint64_t version = 0;
};
struct nfs_kv_write_state
{
nfs_client_t *self = NULL;
@ -178,9 +166,9 @@ static void nfs_do_unshare_write(nfs_kv_write_state *st, int state)
}, st, state);
}
static void nfs_do_rmw(nfs_rmw_t *rmw)
void nfs_do_rmw(nfs_rmw_t *rmw)
{
auto parent = rmw->st->self->parent;
auto parent = rmw->parent;
auto align = parent->kvfs->pool_alignment;
assert(rmw->size < align);
assert((rmw->offset/parent->kvfs->pool_block_size) == ((rmw->offset+rmw->size-1)/parent->kvfs->pool_block_size));
@ -194,41 +182,34 @@ static void nfs_do_rmw(nfs_rmw_t *rmw)
op->offset = rmw->offset & ~(align-1);
op->len = align;
op->iov.push_back(rmw->part_buf, op->len);
rmw->st->waiting++;
op->callback = [rmw](cluster_op_t *rd_op)
{
if (rd_op->retval != rd_op->len)
{
free(rmw->part_buf);
rmw->part_buf = NULL;
rmw->st->res = rd_op->retval >= 0 ? -EIO : rd_op->retval;
rmw->st->waiting--;
if (!rmw->st->waiting)
{
nfs_kv_continue_write(rmw->st, rmw->continue_state);
}
rmw->res = rd_op->retval >= 0 ? -EIO : rd_op->retval;
auto cb = std::move(rmw->cb);
cb(rmw);
}
else
{
auto parent = rmw->parent;
if (!rmw->version)
{
auto st = rmw->st;
rmw->version = rd_op->version+1;
if (st->rmw[0].st && st->rmw[1].st &&
st->rmw[0].offset/st->self->parent->kvfs->pool_block_size == st->rmw[1].offset/st->self->parent->kvfs->pool_block_size)
if (rmw->other && rmw->other->offset/parent->kvfs->pool_block_size == rmw->offset/parent->kvfs->pool_block_size)
{
// Same block... RMWs should be sequential
int other = rmw == &st->rmw[0] ? 1 : 0;
st->rmw[other].version = rmw->version+1;
rmw->other->version = rmw->version+1;
}
}
auto parent = rmw->st->self->parent;
auto align = parent->kvfs->pool_alignment;
bool is_begin = (rmw->offset % align);
bool is_end = ((rmw->offset+rmw->size) % align);
auto op = new cluster_op_t;
op->opcode = OSD_OP_WRITE;
op->inode = rmw->st->self->parent->kvfs->fs_base_inode + rmw->ino;
op->inode = parent->kvfs->fs_base_inode + rmw->ino;
op->offset = rmw->offset & ~(align-1);
op->len = align;
op->version = rmw->version;
@ -247,7 +228,6 @@ static void nfs_do_rmw(nfs_rmw_t *rmw)
{
// CAS failure - retry
rmw->version = 0;
rmw->st->waiting--;
nfs_do_rmw(rmw);
}
else
@ -255,14 +235,11 @@ static void nfs_do_rmw(nfs_rmw_t *rmw)
free(rmw->part_buf);
rmw->part_buf = NULL;
if (op->retval != op->len)
{
rmw->st->res = (op->retval >= 0 ? -EIO : op->retval);
}
rmw->st->waiting--;
if (!rmw->st->waiting)
{
nfs_kv_continue_write(rmw->st, rmw->continue_state);
}
rmw->res = (op->retval >= 0 ? -EIO : op->retval);
else
rmw->res = 0;
auto cb = std::move(rmw->cb);
cb(rmw);
}
delete op;
};
@ -449,8 +426,19 @@ static void nfs_do_align_write(nfs_kv_write_state *st, uint64_t ino, uint64_t of
bool begin_shdr = false;
uint64_t end_pad = 0;
st->waiting++;
st->rmw[0].st = NULL;
st->rmw[1].st = NULL;
st->rmw[0].buf = st->rmw[1].buf = NULL;
auto make_rmw_cb = [st, state]()
{
return [st, state](nfs_rmw_t *rmw)
{
st->res = rmw->res < 0 ? rmw->res : st->res;
st->waiting--;
if (!st->waiting)
{
nfs_kv_continue_write(st, state);
}
};
};
if (offset % alignment)
{
if (shared_alloc && st->offset == 0 && (offset % alignment) == sizeof(shared_file_header_t))
@ -479,13 +467,14 @@ static void nfs_do_align_write(nfs_kv_write_state *st, uint64_t ino, uint64_t of
good_size = 0;
s = s > st->size ? st->size : s;
st->rmw[0] = {
.st = st,
.continue_state = state,
.parent = st->self->parent,
.ino = ino,
.offset = offset,
.buf = st->buf,
.size = s,
.cb = make_rmw_cb(),
};
st->waiting++;
nfs_do_rmw(&st->rmw[0]);
}
}
@ -507,13 +496,19 @@ static void nfs_do_align_write(nfs_kv_write_state *st, uint64_t ino, uint64_t of
else
good_size = 0;
st->rmw[1] = {
.st = st,
.continue_state = state,
.parent = st->self->parent,
.ino = ino,
.offset = end - s,
.buf = st->buf + st->size - s,
.size = s,
.cb = make_rmw_cb(),
};
if (st->rmw[0].buf)
{
st->rmw[0].other = &st->rmw[1];
st->rmw[1].other = &st->rmw[0];
}
st->waiting++;
nfs_do_rmw(&st->rmw[1]);
}
}