Implement packing small files into shared inodes
parent
181795d748
commit
e5bb986164
|
@ -13,57 +13,155 @@
|
|||
|
||||
#include "cli.h"
|
||||
|
||||
struct nfs_kv_read_state
|
||||
{
|
||||
nfs_client_t *self = NULL;
|
||||
rpc_op_t *rop = NULL;
|
||||
bool allow_cache = true;
|
||||
inode_t ino = 0;
|
||||
uint64_t offset = 0, size = 0;
|
||||
std::function<void(int)> cb;
|
||||
// state
|
||||
int res = 0;
|
||||
json11::Json ientry;
|
||||
uint64_t aligned_size = 0, aligned_offset = 0;
|
||||
uint8_t *aligned_buf = NULL;
|
||||
cluster_op_t *op = NULL;
|
||||
uint8_t *buf = NULL;
|
||||
};
|
||||
|
||||
static void nfs_kv_continue_read(nfs_kv_read_state *st, int state)
|
||||
{
|
||||
if (state == 0) {}
|
||||
else if (state == 1) goto resume_1;
|
||||
else if (state == 2) goto resume_2;
|
||||
else if (state == 3) goto resume_3;
|
||||
else
|
||||
{
|
||||
fprintf(stderr, "BUG: invalid state in nfs_kv_continue_read()");
|
||||
abort();
|
||||
}
|
||||
if (st->offset + sizeof(shared_file_header_t) < st->self->parent->shared_inode_threshold)
|
||||
{
|
||||
kv_read_inode(st->self, st->ino, [st](int res, const std::string & value, json11::Json attrs)
|
||||
{
|
||||
st->res = res;
|
||||
st->ientry = attrs;
|
||||
nfs_kv_continue_read(st, 1);
|
||||
}, st->allow_cache);
|
||||
return;
|
||||
resume_1:
|
||||
if (st->res < 0 || kv_map_type(st->ientry["type"].string_value()) != NF3REG)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res < 0 ? st->res : -EINVAL);
|
||||
return;
|
||||
}
|
||||
if (st->ientry["shared_ino"].uint64_value() != 0)
|
||||
{
|
||||
st->aligned_size = align_shared_size(st->self, st->offset+st->size);
|
||||
st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size);
|
||||
st->buf = st->aligned_buf + sizeof(shared_file_header_t) + st->offset;
|
||||
st->op = new cluster_op_t;
|
||||
st->op->opcode = OSD_OP_READ;
|
||||
st->op->inode = st->self->parent->fs_base_inode + st->ientry["shared_ino"].uint64_value();
|
||||
st->op->offset = st->ientry["shared_offset"].uint64_value();
|
||||
if (st->offset+st->size > st->ientry["size"].uint64_value())
|
||||
{
|
||||
st->op->len = align_shared_size(st->self, st->ientry["size"].uint64_value());
|
||||
memset(st->aligned_buf+st->op->len, 0, st->aligned_size-st->op->len);
|
||||
}
|
||||
else
|
||||
st->op->len = st->aligned_size;
|
||||
st->op->iov.push_back(st->aligned_buf, st->op->len);
|
||||
st->op->callback = [st, state](cluster_op_t *op)
|
||||
{
|
||||
st->res = op->retval == op->len ? 0 : op->retval;
|
||||
delete op;
|
||||
nfs_kv_continue_read(st, 2);
|
||||
};
|
||||
st->self->parent->cli->execute(st->op);
|
||||
return;
|
||||
resume_2:
|
||||
if (st->res < 0)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
return;
|
||||
}
|
||||
auto hdr = ((shared_file_header_t*)st->aligned_buf);
|
||||
if (hdr->magic != SHARED_FILE_MAGIC_V1 || hdr->inode != st->ino ||
|
||||
align_shared_size(st->self, hdr->size) > align_shared_size(st->self, st->ientry["size"].uint64_value()))
|
||||
{
|
||||
// Got unrelated data - retry from the beginning
|
||||
free(st->aligned_buf);
|
||||
st->aligned_buf = NULL;
|
||||
st->allow_cache = false;
|
||||
nfs_kv_continue_read(st, 0);
|
||||
return;
|
||||
}
|
||||
auto cb = std::move(st->cb);
|
||||
cb(0);
|
||||
return;
|
||||
}
|
||||
}
|
||||
st->aligned_offset = (st->offset & ~(st->self->parent->pool_alignment-1));
|
||||
st->aligned_size = ((st->offset + st->size + st->self->parent->pool_alignment) &
|
||||
~(st->self->parent->pool_alignment-1)) - st->aligned_offset;
|
||||
st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size);
|
||||
st->buf = st->aligned_buf + st->offset - st->aligned_offset;
|
||||
st->op = new cluster_op_t;
|
||||
st->op->opcode = OSD_OP_READ;
|
||||
st->op->inode = st->self->parent->fs_base_inode + st->ino;
|
||||
st->op->offset = st->aligned_offset;
|
||||
st->op->len = st->aligned_size;
|
||||
st->op->iov.push_back(st->aligned_buf, st->aligned_size);
|
||||
st->op->callback = [st](cluster_op_t *op)
|
||||
{
|
||||
st->res = op->retval;
|
||||
delete op;
|
||||
nfs_kv_continue_read(st, 3);
|
||||
};
|
||||
st->self->parent->cli->execute(st->op);
|
||||
return;
|
||||
resume_3:
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
return;
|
||||
}
|
||||
|
||||
int kv_nfs3_read_proc(void *opaque, rpc_op_t *rop)
|
||||
{
|
||||
nfs_client_t *self = (nfs_client_t*)opaque;
|
||||
READ3args *args = (READ3args*)rop->request;
|
||||
READ3res *reply = (READ3res*)rop->reply;
|
||||
inode_t ino = kv_fh_inode(args->file);
|
||||
auto ino = kv_fh_inode(args->file);
|
||||
if (args->count > MAX_REQUEST_SIZE || !ino)
|
||||
{
|
||||
*reply = (READ3res){ .status = NFS3ERR_INVAL };
|
||||
rpc_queue_reply(rop);
|
||||
return 0;
|
||||
}
|
||||
uint64_t alignment = self->parent->cli->st_cli.global_bitmap_granularity;
|
||||
auto pool_cfg = self->parent->cli->st_cli.pool_config.find(INODE_POOL(self->parent->fs_base_inode));
|
||||
if (pool_cfg != self->parent->cli->st_cli.pool_config.end())
|
||||
auto st = new nfs_kv_read_state;
|
||||
st->self = (nfs_client_t*)opaque;
|
||||
st->rop = rop;
|
||||
st->ino = ino;
|
||||
st->offset = args->offset;
|
||||
st->size = args->count;
|
||||
st->cb = [st](int res)
|
||||
{
|
||||
alignment = pool_cfg->second.bitmap_granularity;
|
||||
READ3res *reply = (READ3res*)st->rop->reply;
|
||||
*reply = (READ3res){ .status = vitastor_nfs_map_err(res) };
|
||||
if (res == 0)
|
||||
{
|
||||
xdr_add_malloc(st->rop->xdrs, st->aligned_buf);
|
||||
reply->resok.data.data = (char*)st->buf;
|
||||
reply->resok.data.size = st->size;
|
||||
reply->resok.count = st->size;
|
||||
reply->resok.eof = 0;
|
||||
}
|
||||
uint64_t aligned_offset = args->offset - (args->offset % alignment);
|
||||
uint64_t aligned_count = args->offset + args->count;
|
||||
if (aligned_count % alignment)
|
||||
aligned_count = aligned_count + alignment - (aligned_count % alignment);
|
||||
aligned_count -= aligned_offset;
|
||||
void *buf = malloc_or_die(aligned_count);
|
||||
xdr_add_malloc(rop->xdrs, buf);
|
||||
cluster_op_t *op = new cluster_op_t;
|
||||
op->opcode = OSD_OP_READ;
|
||||
op->inode = self->parent->fs_base_inode + ino;
|
||||
op->offset = aligned_offset;
|
||||
op->len = aligned_count;
|
||||
op->iov.push_back(buf, aligned_count);
|
||||
*reply = (READ3res){ .status = NFS3_OK };
|
||||
reply->resok.data.data = (char*)buf + args->offset - aligned_offset;
|
||||
reply->resok.data.size = args->count;
|
||||
op->callback = [rop](cluster_op_t *op)
|
||||
{
|
||||
READ3res *reply = (READ3res*)rop->reply;
|
||||
if (op->retval != op->len)
|
||||
{
|
||||
*reply = (READ3res){ .status = vitastor_nfs_map_err(-op->retval) };
|
||||
}
|
||||
else
|
||||
{
|
||||
auto & reply_ok = reply->resok;
|
||||
// reply_ok.data.data is already set above
|
||||
reply_ok.count = reply_ok.data.size;
|
||||
reply_ok.eof = 0;
|
||||
}
|
||||
rpc_queue_reply(rop);
|
||||
delete op;
|
||||
rpc_queue_reply(st->rop);
|
||||
delete st;
|
||||
};
|
||||
self->parent->cli->execute(op);
|
||||
nfs_kv_continue_read(st, 0);
|
||||
return 1;
|
||||
}
|
||||
|
|
|
@ -228,7 +228,8 @@ resume_6:
|
|||
return;
|
||||
}
|
||||
// (6) If regular file and inode is deleted: delete data
|
||||
if ((!st->type || st->type == NF3REG) && st->ientry["nlink"].uint64_value() <= 1)
|
||||
if ((!st->type || st->type == NF3REG) && st->ientry["nlink"].uint64_value() <= 1 &&
|
||||
!st->ientry["shared_inode"].uint64_value())
|
||||
{
|
||||
// Remove data
|
||||
st->self->parent->cmd->loop_and_wait(st->self->parent->cmd->start_rm_data(json11::Json::object {
|
||||
|
|
|
@ -13,254 +13,789 @@
|
|||
|
||||
#include "cli.h"
|
||||
|
||||
static void nfs_resize_write(nfs_client_t *self, rpc_op_t *rop, uint64_t inode, uint64_t new_size, uint64_t offset, uint64_t count, void *buf);
|
||||
|
||||
int kv_nfs3_write_proc(void *opaque, rpc_op_t *rop)
|
||||
struct nfs_rmw_t
|
||||
{
|
||||
nfs_client_t *self = (nfs_client_t*)opaque;
|
||||
WRITE3args *args = (WRITE3args*)rop->request;
|
||||
WRITE3res *reply = (WRITE3res*)rop->reply;
|
||||
inode_t ino = kv_fh_inode(args->file);
|
||||
if (!ino)
|
||||
{
|
||||
*reply = (WRITE3res){ .status = NFS3ERR_INVAL };
|
||||
rpc_queue_reply(rop);
|
||||
return 0;
|
||||
}
|
||||
if (args->count > MAX_REQUEST_SIZE)
|
||||
{
|
||||
*reply = (WRITE3res){ .status = NFS3ERR_INVAL };
|
||||
rpc_queue_reply(rop);
|
||||
return 0;
|
||||
}
|
||||
uint64_t count = args->count > args->data.size ? args->data.size : args->count;
|
||||
uint64_t alignment = self->parent->cli->st_cli.global_bitmap_granularity;
|
||||
auto pool_cfg = self->parent->cli->st_cli.pool_config.find(INODE_POOL(self->parent->fs_base_inode));
|
||||
if (pool_cfg != self->parent->cli->st_cli.pool_config.end())
|
||||
{
|
||||
alignment = pool_cfg->second.bitmap_granularity;
|
||||
}
|
||||
// Pre-fill reply
|
||||
*reply = (WRITE3res){
|
||||
.status = NFS3_OK,
|
||||
.resok = (WRITE3resok){
|
||||
//.file_wcc = ...,
|
||||
.count = (unsigned)count,
|
||||
},
|
||||
nfs_kv_write_state *st = NULL;
|
||||
int continue_state = 0;
|
||||
uint64_t ino = 0;
|
||||
uint64_t offset = 0;
|
||||
uint8_t *buf = NULL;
|
||||
uint64_t size = 0;
|
||||
uint8_t *part_buf = NULL;
|
||||
};
|
||||
if ((args->offset % alignment) != 0 || (count % alignment) != 0)
|
||||
|
||||
struct nfs_kv_write_state
|
||||
{
|
||||
// Unaligned write, requires read-modify-write
|
||||
uint64_t aligned_offset = args->offset - (args->offset % alignment);
|
||||
uint64_t aligned_count = args->offset + args->count;
|
||||
if (aligned_count % alignment)
|
||||
aligned_count = aligned_count + alignment - (aligned_count % alignment);
|
||||
aligned_count -= aligned_offset;
|
||||
void *buf = malloc_or_die(aligned_count);
|
||||
xdr_add_malloc(rop->xdrs, buf);
|
||||
// Read
|
||||
cluster_op_t *op = new cluster_op_t;
|
||||
op->opcode = OSD_OP_READ;
|
||||
op->inode = self->parent->fs_base_inode + ino;
|
||||
op->offset = aligned_offset;
|
||||
op->len = aligned_count;
|
||||
op->iov.push_back(buf, aligned_count);
|
||||
op->callback = [self, rop, count](cluster_op_t *op)
|
||||
nfs_client_t *self = NULL;
|
||||
rpc_op_t *rop = NULL;
|
||||
uint64_t ino = 0;
|
||||
uint64_t offset = 0, size = 0;
|
||||
bool stable = false;
|
||||
uint8_t *buf = NULL;
|
||||
std::function<void(int res)> cb;
|
||||
// state
|
||||
bool allow_cache = true;
|
||||
int res = 0, res2 = 0;
|
||||
int waiting = 0;
|
||||
std::string ientry_text;
|
||||
json11::Json ientry;
|
||||
uint64_t new_size = 0;
|
||||
uint64_t aligned_size = 0;
|
||||
uint8_t *aligned_buf = NULL;
|
||||
uint64_t shared_inode = 0, shared_offset = 0;
|
||||
bool was_immediate = false;
|
||||
nfs_rmw_t rmw[2];
|
||||
inode_extend_t *ext = NULL;
|
||||
|
||||
~nfs_kv_write_state()
|
||||
{
|
||||
if (aligned_buf)
|
||||
{
|
||||
free(aligned_buf);
|
||||
aligned_buf = NULL;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
static void nfs_kv_continue_write(nfs_kv_write_state *st, int state);
|
||||
|
||||
static void finish_allocate_shared(nfs_client_t *self, int res)
|
||||
{
|
||||
std::vector<shared_alloc_queue_t> waiting;
|
||||
waiting.swap(self->parent->allocating_shared);
|
||||
for (auto & w: waiting)
|
||||
{
|
||||
w.st->res = res;
|
||||
if (res == 0)
|
||||
{
|
||||
w.st->shared_inode = self->parent->cur_shared_inode;
|
||||
w.st->shared_offset = self->parent->cur_shared_offset;
|
||||
self->parent->cur_shared_offset += (w.size + self->parent->pool_alignment-1) & ~(self->parent->pool_alignment-1);
|
||||
}
|
||||
nfs_kv_continue_write(w.st, w.state);
|
||||
}
|
||||
}
|
||||
|
||||
static void allocate_shared_inode(nfs_kv_write_state *st, int state, uint64_t size)
|
||||
{
|
||||
if (st->self->parent->cur_shared_inode == 0)
|
||||
{
|
||||
st->self->parent->allocating_shared.push_back({ st, state, size });
|
||||
if (st->self->parent->allocating_shared.size() > 1)
|
||||
{
|
||||
return;
|
||||
}
|
||||
allocate_new_id(st->self, [st](int res, uint64_t new_id)
|
||||
{
|
||||
if (res < 0)
|
||||
{
|
||||
finish_allocate_shared(st->self, res);
|
||||
return;
|
||||
}
|
||||
st->self->parent->cur_shared_inode = new_id;
|
||||
st->self->parent->cur_shared_offset = 0;
|
||||
st->self->parent->db->set(
|
||||
kv_inode_key(new_id), json11::Json(json11::Json::object{ { "type", "shared" } }).dump(),
|
||||
[st](int res)
|
||||
{
|
||||
if (res < 0)
|
||||
{
|
||||
st->self->parent->cur_shared_inode = 0;
|
||||
}
|
||||
finish_allocate_shared(st->self, res);
|
||||
},
|
||||
[](int res, const std::string & old_value)
|
||||
{
|
||||
return res == -ENOENT;
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t align_shared_size(nfs_client_t *self, uint64_t size)
|
||||
{
|
||||
return (size + sizeof(shared_file_header_t) + self->parent->pool_alignment-1)
|
||||
& ~(self->parent->pool_alignment-1);
|
||||
}
|
||||
|
||||
static void nfs_do_write(uint64_t ino, uint64_t offset, uint8_t *buf, uint64_t size, nfs_kv_write_state *st, int state)
|
||||
{
|
||||
auto op = new cluster_op_t;
|
||||
op->opcode = OSD_OP_WRITE;
|
||||
op->inode = st->self->parent->fs_base_inode + ino;
|
||||
op->offset = offset;
|
||||
op->len = size;
|
||||
op->iov.push_back(buf, size);
|
||||
st->waiting++;
|
||||
op->callback = [st, state](cluster_op_t *op)
|
||||
{
|
||||
if (op->retval != op->len)
|
||||
{
|
||||
WRITE3res *reply = (WRITE3res*)rop->reply;
|
||||
*reply = (WRITE3res){ .status = vitastor_nfs_map_err(-op->retval) };
|
||||
rpc_queue_reply(rop);
|
||||
return;
|
||||
st->res = op->retval >= 0 ? -EIO : op->retval;
|
||||
}
|
||||
void *buf = op->iov.buf[0].iov_base;
|
||||
WRITE3args *args = (WRITE3args*)rop->request;
|
||||
memcpy((uint8_t*)buf + args->offset - op->offset, args->data.data, count);
|
||||
nfs_resize_write(self, rop, op->inode, args->offset+count, op->offset, op->len, buf);
|
||||
delete op;
|
||||
st->waiting--;
|
||||
if (!st->waiting)
|
||||
{
|
||||
nfs_kv_continue_write(st, state);
|
||||
}
|
||||
};
|
||||
self->parent->cli->execute(op);
|
||||
st->self->parent->cli->execute(op);
|
||||
}
|
||||
|
||||
static void nfs_do_shared_write(nfs_kv_write_state *st, int state)
|
||||
{
|
||||
nfs_do_write(st->shared_inode, st->shared_offset, st->aligned_buf, st->aligned_size, st, state);
|
||||
}
|
||||
|
||||
static void nfs_do_unshare_write(nfs_kv_write_state *st, int state)
|
||||
{
|
||||
nfs_do_write(st->ino, 0, st->aligned_buf + sizeof(shared_file_header_t),
|
||||
st->aligned_size - sizeof(shared_file_header_t), st, state);
|
||||
}
|
||||
|
||||
static void nfs_do_rmw(nfs_rmw_t *rmw)
|
||||
{
|
||||
auto parent = rmw->st->self->parent;
|
||||
auto align = parent->pool_alignment;
|
||||
bool is_begin = (rmw->offset % align);
|
||||
bool is_end = ((rmw->offset+rmw->size) % align);
|
||||
// RMW either only at beginning or only at end and within a single block
|
||||
assert(is_begin != is_end);
|
||||
assert((rmw->offset/parent->pool_block_size) == ((rmw->offset+rmw->size-1)/parent->pool_block_size));
|
||||
if (!rmw->part_buf)
|
||||
{
|
||||
rmw->part_buf = (uint8_t*)malloc_or_die(align);
|
||||
}
|
||||
auto op = new cluster_op_t;
|
||||
op->opcode = OSD_OP_READ;
|
||||
op->inode = parent->fs_base_inode + rmw->ino;
|
||||
op->offset = (rmw->offset + (is_begin ? 0 : rmw->size)) & ~(align-1);
|
||||
op->len = align;
|
||||
op->iov.push_back(rmw->part_buf, op->len);
|
||||
rmw->st->waiting++;
|
||||
op->callback = [rmw](cluster_op_t *rd_op)
|
||||
{
|
||||
if (rd_op->retval != rd_op->len)
|
||||
{
|
||||
free(rmw->part_buf);
|
||||
rmw->part_buf = NULL;
|
||||
rmw->st->res = rd_op->retval >= 0 ? -EIO : rd_op->retval;
|
||||
rmw->st->waiting--;
|
||||
if (!rmw->st->waiting)
|
||||
{
|
||||
nfs_kv_continue_write(rmw->st, rmw->continue_state);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
nfs_resize_write(self, rop, ino, args->offset+count, args->offset, count, args->data.data);
|
||||
auto parent = rmw->st->self->parent;
|
||||
auto align = parent->pool_alignment;
|
||||
bool is_begin = (rmw->offset % align);
|
||||
auto op = new cluster_op_t;
|
||||
op->opcode = OSD_OP_WRITE;
|
||||
op->inode = rmw->st->self->parent->fs_base_inode + rmw->ino;
|
||||
op->offset = rmw->offset & ~(align-1);
|
||||
op->len = (rmw->size + align-1) & ~(align-1);
|
||||
op->version = rd_op->version+1;
|
||||
if (is_begin)
|
||||
{
|
||||
op->iov.push_back(rmw->part_buf, rmw->offset % align);
|
||||
}
|
||||
return 1;
|
||||
op->iov.push_back(rmw->buf, rmw->size);
|
||||
if (!is_begin)
|
||||
{
|
||||
auto tail = ((rmw->offset+rmw->size) % align);
|
||||
op->iov.push_back(rmw->part_buf + tail, align - tail);
|
||||
}
|
||||
op->callback = [rmw](cluster_op_t *op)
|
||||
{
|
||||
if (op->retval == -EAGAIN)
|
||||
{
|
||||
// CAS failure - retry
|
||||
rmw->st->waiting--;
|
||||
nfs_do_rmw(rmw);
|
||||
}
|
||||
else
|
||||
{
|
||||
free(rmw->part_buf);
|
||||
rmw->part_buf = NULL;
|
||||
if (op->retval != op->len)
|
||||
{
|
||||
rmw->st->res = (op->retval >= 0 ? -EIO : op->retval);
|
||||
}
|
||||
rmw->st->waiting--;
|
||||
if (!rmw->st->waiting)
|
||||
{
|
||||
nfs_kv_continue_write(rmw->st, rmw->continue_state);
|
||||
}
|
||||
}
|
||||
delete op;
|
||||
};
|
||||
parent->cli->execute(op);
|
||||
}
|
||||
delete rd_op;
|
||||
};
|
||||
parent->cli->execute(op);
|
||||
}
|
||||
|
||||
static void complete_extend_write(nfs_client_t *self, rpc_op_t *rop, inode_t inode, int res)
|
||||
static void nfs_do_shared_read(nfs_kv_write_state *st, int state)
|
||||
{
|
||||
WRITE3args *args = (WRITE3args*)rop->request;
|
||||
WRITE3res *reply = (WRITE3res*)rop->reply;
|
||||
if (res < 0)
|
||||
auto op = new cluster_op_t;
|
||||
op->opcode = OSD_OP_READ;
|
||||
op->inode = st->self->parent->fs_base_inode + st->ientry["shared_ino"].uint64_value();
|
||||
op->offset = st->ientry["shared_offset"].uint64_value();
|
||||
op->len = align_shared_size(st->self, st->ientry["size"].uint64_value());
|
||||
op->iov.push_back(st->aligned_buf, op->len);
|
||||
op->callback = [st, state](cluster_op_t *op)
|
||||
{
|
||||
*reply = (WRITE3res){ .status = vitastor_nfs_map_err(res) };
|
||||
rpc_queue_reply(rop);
|
||||
return;
|
||||
st->res = op->retval == op->len ? 0 : op->retval;
|
||||
delete op;
|
||||
nfs_kv_continue_write(st, state);
|
||||
};
|
||||
st->self->parent->cli->execute(op);
|
||||
}
|
||||
bool imm = self->parent->cli->get_immediate_commit(inode);
|
||||
reply->resok.committed = args->stable != UNSTABLE || imm ? FILE_SYNC : UNSTABLE;
|
||||
*(uint64_t*)reply->resok.verf = self->parent->server_id;
|
||||
if (args->stable != UNSTABLE && !imm)
|
||||
|
||||
static void nfs_do_fsync(nfs_kv_write_state *st, int state)
|
||||
{
|
||||
// Client requested a stable write. Add an fsync
|
||||
auto op = new cluster_op_t;
|
||||
op->opcode = OSD_OP_SYNC;
|
||||
op->callback = [rop](cluster_op_t *op)
|
||||
op->callback = [st, state](cluster_op_t *op)
|
||||
{
|
||||
if (op->retval != 0)
|
||||
{
|
||||
WRITE3res *reply = (WRITE3res*)rop->reply;
|
||||
*reply = (WRITE3res){ .status = vitastor_nfs_map_err(-op->retval) };
|
||||
}
|
||||
delete op;
|
||||
rpc_queue_reply(rop);
|
||||
nfs_kv_continue_write(st, state);
|
||||
};
|
||||
self->parent->cli->execute(op);
|
||||
st->self->parent->cli->execute(op);
|
||||
}
|
||||
|
||||
static bool nfs_do_shared_readmodify(nfs_kv_write_state *st, int base_state, int state, bool unshare)
|
||||
{
|
||||
assert(state <= base_state);
|
||||
if (state < base_state) {}
|
||||
else if (state == base_state) goto resume_0;
|
||||
assert(!st->aligned_buf);
|
||||
st->aligned_size = unshare
|
||||
? sizeof(shared_file_header_t) + (st->new_size + st->self->parent->pool_alignment-1) & ~(st->self->parent->pool_alignment-1)
|
||||
: align_shared_size(st->self, st->new_size);
|
||||
st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size);
|
||||
memset(st->aligned_buf + sizeof(shared_file_header_t), 0, st->offset);
|
||||
if (st->ientry["shared_ino"].uint64_value() != 0)
|
||||
{
|
||||
// Read old data if shared non-empty
|
||||
nfs_do_shared_read(st, base_state);
|
||||
return false;
|
||||
resume_0:
|
||||
if (st->res < 0)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
return true;
|
||||
}
|
||||
auto hdr = ((shared_file_header_t*)st->aligned_buf);
|
||||
if (hdr->magic != SHARED_FILE_MAGIC_V1 || hdr->inode != st->ino ||
|
||||
align_shared_size(st->self, hdr->size) > align_shared_size(st->self, st->ientry["size"].uint64_value()))
|
||||
{
|
||||
// Got unrelated data - retry from the beginning
|
||||
st->allow_cache = false;
|
||||
nfs_kv_continue_write(st, 0);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
*((shared_file_header_t*)st->aligned_buf) = {
|
||||
.magic = SHARED_FILE_MAGIC_V1,
|
||||
.inode = st->ino,
|
||||
.size = st->new_size,
|
||||
};
|
||||
memcpy(st->aligned_buf + sizeof(shared_file_header_t) + st->offset, st->buf, st->size);
|
||||
memset(st->aligned_buf + sizeof(shared_file_header_t) + st->offset + st->size, 0,
|
||||
st->aligned_size - sizeof(shared_file_header_t) - st->offset - st->size);
|
||||
return true;
|
||||
}
|
||||
|
||||
static void nfs_do_align_write(nfs_kv_write_state *st, uint64_t ino, uint64_t offset, int state)
|
||||
{
|
||||
auto alignment = st->self->parent->pool_alignment;
|
||||
uint8_t *good_buf = st->buf;
|
||||
uint64_t good_offset = offset;
|
||||
uint64_t good_size = st->size;
|
||||
st->waiting++;
|
||||
if (offset % alignment)
|
||||
{
|
||||
// Requires read-modify-write in the beginning
|
||||
auto s = (alignment - (offset % alignment));
|
||||
if (good_size > s)
|
||||
{
|
||||
good_buf += s;
|
||||
good_offset += s;
|
||||
good_size -= s;
|
||||
}
|
||||
else
|
||||
good_size = 0;
|
||||
s = s > st->size ? st->size : s;
|
||||
st->rmw[0] = {
|
||||
.st = st,
|
||||
.continue_state = state,
|
||||
.ino = ino,
|
||||
.offset = offset,
|
||||
.buf = st->buf,
|
||||
.size = s,
|
||||
};
|
||||
nfs_do_rmw(&st->rmw[0]);
|
||||
}
|
||||
if ((offset+st->size-1) % alignment)
|
||||
{
|
||||
rpc_queue_reply(rop);
|
||||
// Requires read-modify-write in the end
|
||||
auto s = ((offset+st->size-1) % alignment);
|
||||
if (good_size > s)
|
||||
good_size -= s;
|
||||
else
|
||||
good_size = 0;
|
||||
if (((offset+st->size-1) / alignment) > (offset / alignment))
|
||||
{
|
||||
st->rmw[1] = {
|
||||
.st = st,
|
||||
.continue_state = state,
|
||||
.ino = ino,
|
||||
.offset = offset + st->size-s,
|
||||
.buf = st->buf + st->size-s,
|
||||
.size = s,
|
||||
};
|
||||
nfs_do_rmw(&st->rmw[1]);
|
||||
}
|
||||
}
|
||||
if (good_size > 0)
|
||||
{
|
||||
// Normal write
|
||||
nfs_do_write(ino, good_offset, good_buf, good_size, st, state);
|
||||
}
|
||||
st->waiting--;
|
||||
if (!st->waiting)
|
||||
{
|
||||
nfs_kv_continue_write(st, state);
|
||||
}
|
||||
}
|
||||
|
||||
static void complete_extend_inode(nfs_client_t *self, uint64_t inode, uint64_t new_size, int err)
|
||||
static std::string new_normal_ientry(nfs_kv_write_state *st)
|
||||
{
|
||||
auto ext_it = self->extend_writes.lower_bound((extend_size_t){ .inode = inode, .new_size = 0 });
|
||||
while (ext_it != self->extend_writes.end() &&
|
||||
ext_it->first.inode == inode &&
|
||||
ext_it->first.new_size <= new_size)
|
||||
{
|
||||
ext_it->second.resize_res = err;
|
||||
if (ext_it->second.write_res <= 0)
|
||||
{
|
||||
complete_extend_write(self, ext_it->second.rop, inode, ext_it->second.write_res < 0
|
||||
? ext_it->second.write_res : ext_it->second.resize_res);
|
||||
self->extend_writes.erase(ext_it++);
|
||||
}
|
||||
else
|
||||
ext_it++;
|
||||
}
|
||||
auto ni = st->ientry.object_items();
|
||||
ni.erase("empty");
|
||||
ni.erase("shared_ino");
|
||||
ni.erase("shared_offset");
|
||||
ni.erase("shared_alloc");
|
||||
ni.erase("shared_ver");
|
||||
ni["size"] = st->ext->cur_extend;
|
||||
return json11::Json(ni).dump();
|
||||
}
|
||||
|
||||
static void extend_inode(nfs_client_t *self, uint64_t inode)
|
||||
static std::string new_moved_ientry(nfs_kv_write_state *st)
|
||||
{
|
||||
// Send an extend request
|
||||
auto ext = &self->extends[inode];
|
||||
self->parent->db->set(kv_inode_key(inode), json11::Json(ext->attrs).dump().c_str(), [=](int res)
|
||||
{
|
||||
if (res < 0 && res != -EAGAIN)
|
||||
{
|
||||
fprintf(stderr, "Error extending inode %ju to %ju bytes: %s (code %d)\n", inode, ext->cur_extend, strerror(-res), res);
|
||||
auto ni = st->ientry.object_items();
|
||||
ni.erase("empty");
|
||||
ni["shared_ino"] = st->shared_inode;
|
||||
ni["shared_offset"] = st->shared_offset;
|
||||
ni["shared_alloc"] = st->aligned_size;
|
||||
ni.erase("shared_ver");
|
||||
ni["size"] = st->new_size;
|
||||
return json11::Json(ni).dump();
|
||||
}
|
||||
if (res == -EAGAIN || ext->next_extend > ext->cur_extend)
|
||||
|
||||
static std::string new_shared_ientry(nfs_kv_write_state *st)
|
||||
{
|
||||
// Multiple concurrent resize requests received, try to repeat
|
||||
kv_read_inode(self, inode, [=](int res, const std::string & value, json11::Json attrs)
|
||||
{
|
||||
auto new_size = ext->next_extend > ext->cur_extend ? ext->next_extend : ext->cur_extend;
|
||||
if (res == 0 && attrs["size"].uint64_value() < new_size)
|
||||
{
|
||||
ext->old_ientry = value;
|
||||
ext->attrs = attrs.object_items();
|
||||
ext->cur_extend = new_size;
|
||||
ext->attrs["size"] = new_size;
|
||||
extend_inode(self, inode);
|
||||
auto ni = st->ientry.object_items();
|
||||
ni.erase("empty");
|
||||
ni["size"] = st->new_size;
|
||||
ni["shared_ver"] = ni["shared_ver"].uint64_value()+1;
|
||||
return json11::Json(ni).dump();
|
||||
}
|
||||
else
|
||||
|
||||
static void nfs_kv_extend_inode(nfs_kv_write_state *st, int state)
|
||||
{
|
||||
if (res < 0)
|
||||
if (state == 1)
|
||||
{
|
||||
fprintf(stderr, "Error extending inode %ju to %ju bytes: %s (code %d)\n", inode, ext->cur_extend, strerror(-res), res);
|
||||
goto resume_1;
|
||||
}
|
||||
ext->cur_extend = ext->next_extend = 0;
|
||||
complete_extend_inode(self, inode, attrs["size"].uint64_value(), res);
|
||||
st->ext->cur_extend = st->ext->next_extend;
|
||||
st->ext->next_extend = 0;
|
||||
st->res2 = -EAGAIN;
|
||||
st->self->parent->db->set(kv_inode_key(st->ino), new_normal_ientry(st), [st](int res)
|
||||
{
|
||||
st->res = res;
|
||||
nfs_kv_continue_write(st, 13);
|
||||
}, [st](int res, const std::string & old_value)
|
||||
{
|
||||
if (res != 0)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (old_value == st->ientry_text)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
std::string err;
|
||||
auto ientry = json11::Json::parse(old_value, err).object_items();
|
||||
if (err != "")
|
||||
{
|
||||
st->res2 = -EINVAL;
|
||||
return false;
|
||||
}
|
||||
else if (ientry.size() == st->ientry.object_items().size())
|
||||
{
|
||||
for (auto & kv: st->ientry.object_items())
|
||||
{
|
||||
if (kv.first != "size" && ientry[kv.first] != kv.second)
|
||||
{
|
||||
// Something except size changed
|
||||
return false;
|
||||
}
|
||||
}
|
||||
// OK, only size changed
|
||||
if (ientry["size"] >= st->new_size)
|
||||
{
|
||||
// Already extended
|
||||
st->res2 = 0;
|
||||
return false;
|
||||
}
|
||||
// size is different but can still be extended, other parameters don't differ
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
return;
|
||||
resume_1:
|
||||
if (st->res == -EAGAIN)
|
||||
{
|
||||
// EAGAIN may be OK in fact (see above)
|
||||
st->res = st->res2;
|
||||
}
|
||||
if (st->res == 0)
|
||||
{
|
||||
st->ext->done_extend = st->ext->cur_extend;
|
||||
}
|
||||
st->ext->cur_extend = 0;
|
||||
// Wake up other extenders anyway
|
||||
auto waiters = std::move(st->ext->waiters);
|
||||
for (auto & cb: waiters)
|
||||
{
|
||||
cb();
|
||||
}
|
||||
auto new_size = ext->cur_extend;
|
||||
ext->cur_extend = ext->next_extend = 0;
|
||||
complete_extend_inode(self, inode, new_size, res);
|
||||
});
|
||||
}
|
||||
|
||||
static void nfs_do_write(nfs_client_t *self, std::multimap<extend_size_t, extend_write_t>::iterator ewr_it,
|
||||
rpc_op_t *rop, uint64_t inode, uint64_t offset, uint64_t count, void *buf)
|
||||
// Packing small files into "shared inodes". Insane algorithm...
|
||||
// Write:
|
||||
// - If (offset+size <= threshold):
|
||||
// - Read inode from cache
|
||||
// - If inode does not exist - stop with -ENOENT
|
||||
// - If inode is not a regular file - stop with -EINVAL
|
||||
// - If it's empty (size == 0 || empty == true):
|
||||
// - If preset size is larger than threshold:
|
||||
// - Write data into non-shared inode
|
||||
// - In parallel: clear empty flag
|
||||
// - If CAS failure: re-read inode and restart
|
||||
// - Otherwise:
|
||||
// - Allocate/take a shared inode
|
||||
// - Allocate space in its end
|
||||
// - Write data into shared inode
|
||||
// - If CAS failure: allocate another shared inode and retry
|
||||
// - Write shared inode reference, set size
|
||||
// - If CAS failure: free allocated shared space, re-read inode and restart
|
||||
// - If it's not empty:
|
||||
// - If non-shared:
|
||||
// - Write data into non-shared inode
|
||||
// - In parallel: check if data fits into inode size and extend if it doesn't
|
||||
// - If CAS failure: re-read inode and retry to extend the size
|
||||
// - If shared:
|
||||
// - Read whole file from shared inode
|
||||
// - If the file header in data doesn't match: re-read inode and restart
|
||||
// - If data doesn't fit into the same shared inode:
|
||||
// - Allocate space in a new shared inode
|
||||
// - Write data into the new shared inode
|
||||
// - If CAS failure: allocate another shared inode and retry
|
||||
// - Update inode metadata (set new size and new shared inode)
|
||||
// - If CAS failure: free allocated shared space, re-read inode and restart
|
||||
// - If it fits:
|
||||
// - Write updated data into the shared inode
|
||||
// - Update inode entry in any case to block parallel non-shared writes
|
||||
// - If CAS failure: re-read inode and restart
|
||||
// - Otherwise:
|
||||
// - Write data into non-shared inode
|
||||
// - Read inode in parallel
|
||||
// - If not a regular file:
|
||||
// - Remove data
|
||||
// - Stop with -EINVAL
|
||||
// - If shared:
|
||||
// - Read whole file from shared inode
|
||||
// - Write data into non-shared inode
|
||||
// - If CAS failure (block should not exist): restart
|
||||
// - Update inode metadata (make non-shared, update size)
|
||||
// - If CAS failure: restart
|
||||
// - Zero out the shared inode header
|
||||
// - If CAS failure: restart
|
||||
// - Check if size fits
|
||||
// - Extend if it doesn't
|
||||
// Read:
|
||||
// - If (offset+size <= threshold):
|
||||
// - Read inode from cache
|
||||
// - If empty: return zeroes
|
||||
// - If shared:
|
||||
// - Read the whole file from shared inode, or at least data and shared inode header
|
||||
// - If the file header in data doesn't match: re-read inode and restart
|
||||
// - If non-shared:
|
||||
// - Read data from non-shared inode
|
||||
// - Otherwise:
|
||||
// - Read data from non-shared inode
|
||||
|
||||
static void nfs_kv_continue_write(nfs_kv_write_state *st, int state)
|
||||
{
|
||||
cluster_op_t *op = new cluster_op_t;
|
||||
op->opcode = OSD_OP_WRITE;
|
||||
op->inode = self->parent->fs_base_inode + inode;
|
||||
op->offset = offset;
|
||||
op->len = count;
|
||||
op->iov.push_back(buf, count);
|
||||
op->callback = [self, ewr_it, rop](cluster_op_t *op)
|
||||
if (state == 0) {}
|
||||
else if (state == 1) goto resume_1;
|
||||
else if (state == 2) goto resume_2;
|
||||
else if (state == 3) goto resume_3;
|
||||
else if (state == 4) goto resume_4;
|
||||
else if (state == 5) goto resume_5;
|
||||
else if (state == 6) goto resume_6;
|
||||
else if (state == 7) goto resume_7;
|
||||
else if (state == 8) goto resume_8;
|
||||
else if (state == 9) goto resume_9;
|
||||
else if (state == 10) goto resume_10;
|
||||
else if (state == 11) goto resume_11;
|
||||
else if (state == 12) goto resume_12;
|
||||
else if (state == 13) goto resume_13;
|
||||
else
|
||||
{
|
||||
auto inode = op->inode;
|
||||
int write_res = op->retval < 0 ? op->retval : (op->retval != op->len ? -ERANGE : 0);
|
||||
if (ewr_it == self->extend_writes.end())
|
||||
fprintf(stderr, "BUG: invalid state in nfs_kv_continue_write()");
|
||||
abort();
|
||||
}
|
||||
resume_0:
|
||||
if (!st->size)
|
||||
{
|
||||
complete_extend_write(self, rop, inode, write_res);
|
||||
auto cb = std::move(st->cb);
|
||||
cb(0);
|
||||
return;
|
||||
}
|
||||
kv_read_inode(st->self, st->ino, [st](int res, const std::string & value, json11::Json attrs)
|
||||
{
|
||||
st->res = res;
|
||||
st->ientry_text = value;
|
||||
st->ientry = attrs;
|
||||
nfs_kv_continue_write(st, 1);
|
||||
}, st->allow_cache);
|
||||
return;
|
||||
resume_1:
|
||||
if (st->res < 0 ||
|
||||
st->ientry["type"].uint64_value() != 0 &&
|
||||
st->ientry["type"].uint64_value() != NF3REG)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res == 0 ? -EINVAL : st->res);
|
||||
return;
|
||||
}
|
||||
st->was_immediate = st->self->parent->cli->get_immediate_commit(st->self->parent->fs_base_inode + st->ino);
|
||||
st->new_size = st->ientry["size"].uint64_value();
|
||||
if (st->new_size < st->offset + st->size)
|
||||
{
|
||||
st->new_size = st->offset + st->size;
|
||||
}
|
||||
if (st->offset + st->size + sizeof(shared_file_header_t) < st->self->parent->shared_inode_threshold)
|
||||
{
|
||||
if (st->ientry["size"].uint64_value() == 0 ||
|
||||
st->ientry["empty"].bool_value() &&
|
||||
st->ientry["size"].uint64_value() + sizeof(shared_file_header_t) < st->self->parent->shared_inode_threshold ||
|
||||
st->ientry["shared_ino"].uint64_value() != 0 &&
|
||||
st->ientry["size"].uint64_value() < st->offset+st->size &&
|
||||
st->ientry["shared_alloc"].uint64_value() < align_shared_size(st->self, st->offset+st->size))
|
||||
{
|
||||
// Either empty, or shared and requires moving into a larger place (redirect-write)
|
||||
allocate_shared_inode(st, 2, st->new_size);
|
||||
return;
|
||||
resume_2:
|
||||
if (st->res < 0)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
return;
|
||||
}
|
||||
resume_3:
|
||||
if (!nfs_do_shared_readmodify(st, 3, state, false))
|
||||
return;
|
||||
nfs_do_shared_write(st, 4); // FIXME assemble from parts, do not copy?
|
||||
return;
|
||||
resume_4:
|
||||
if (st->res < 0)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
return;
|
||||
}
|
||||
st->self->parent->db->set(kv_inode_key(st->ino), new_moved_ientry(st), [st](int res)
|
||||
{
|
||||
st->res = res;
|
||||
nfs_kv_continue_write(st, 5);
|
||||
}, [st](int res, const std::string & old_value)
|
||||
{
|
||||
return res == 0 && old_value == st->ientry_text;
|
||||
});
|
||||
return;
|
||||
resume_5:
|
||||
if (st->res < 0)
|
||||
{
|
||||
st->res2 = st->res;
|
||||
memset(st->aligned_buf, 0, st->aligned_size);
|
||||
nfs_do_shared_write(st, 6);
|
||||
return;
|
||||
resume_6:
|
||||
free(st->aligned_buf);
|
||||
st->aligned_buf = NULL;
|
||||
if (st->res2 == -EAGAIN)
|
||||
{
|
||||
goto resume_0;
|
||||
}
|
||||
else
|
||||
{
|
||||
ewr_it->second.write_res = write_res;
|
||||
if (ewr_it->second.resize_res <= 0)
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res2);
|
||||
return;
|
||||
}
|
||||
}
|
||||
auto cb = std::move(st->cb);
|
||||
cb(0);
|
||||
return;
|
||||
}
|
||||
else if (st->ientry["shared_ino"].uint64_value() > 0)
|
||||
{
|
||||
complete_extend_write(self, rop, inode, write_res < 0 ? write_res : ewr_it->second.resize_res);
|
||||
self->extend_writes.erase(ewr_it);
|
||||
// Non-empty, shared, can be updated in-place
|
||||
nfs_do_align_write(st, st->ientry["shared_ino"].uint64_value(),
|
||||
st->ientry["shared_offset"].uint64_value() + sizeof(shared_file_header_t) + st->offset, 7);
|
||||
return;
|
||||
resume_7:
|
||||
if (st->res == 0 && st->stable && !st->was_immediate)
|
||||
{
|
||||
nfs_do_fsync(st, 8);
|
||||
return;
|
||||
}
|
||||
// We always have to change inode entry on shared writes
|
||||
st->self->parent->db->set(kv_inode_key(st->ino), new_shared_ientry(st), [st](int res)
|
||||
{
|
||||
st->res = res;
|
||||
nfs_kv_continue_write(st, 8);
|
||||
}, [st](int res, const std::string & old_value)
|
||||
{
|
||||
return res == 0 && old_value == st->ientry_text;
|
||||
});
|
||||
return;
|
||||
resume_8:
|
||||
if (st->res == -EAGAIN)
|
||||
{
|
||||
goto resume_0;
|
||||
}
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
return;
|
||||
}
|
||||
// Fall through for non-shared
|
||||
}
|
||||
// Non-shared write
|
||||
if (st->ientry["shared_ino"].uint64_value() != 0)
|
||||
{
|
||||
// Unshare
|
||||
resume_9:
|
||||
if (!nfs_do_shared_readmodify(st, 9, state, true))
|
||||
return;
|
||||
nfs_do_unshare_write(st, 10);
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Just write
|
||||
nfs_do_align_write(st, st->ino, st->offset, 10);
|
||||
}
|
||||
resume_10:
|
||||
if (st->res == 0 && st->stable && !st->was_immediate)
|
||||
{
|
||||
nfs_do_fsync(st, 11);
|
||||
return;
|
||||
}
|
||||
resume_11:
|
||||
if (st->res < 0)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
return;
|
||||
}
|
||||
if (st->ientry["empty"].bool_value() ||
|
||||
st->ientry["size"].uint64_value() < st->new_size ||
|
||||
st->ientry["shared_ino"].uint64_value() != 0)
|
||||
{
|
||||
st->ext = &st->self->parent->extends[st->ino];
|
||||
st->ext->refcnt++;
|
||||
resume_12:
|
||||
if (st->ext->next_extend < st->new_size)
|
||||
{
|
||||
// Aggregate inode extension requests
|
||||
st->ext->next_extend = st->new_size;
|
||||
}
|
||||
if (st->ext->cur_extend > 0)
|
||||
{
|
||||
// Wait for current extend which is already in progress
|
||||
st->ext->waiters.push_back([st](){ nfs_kv_continue_write(st, 12); });
|
||||
return;
|
||||
}
|
||||
if (st->ext->done_extend < st->new_size)
|
||||
{
|
||||
nfs_kv_extend_inode(st, 0);
|
||||
return;
|
||||
resume_13:
|
||||
nfs_kv_extend_inode(st, 1);
|
||||
}
|
||||
st->ext->refcnt--;
|
||||
assert(st->ext->refcnt >= 0);
|
||||
if (st->ext->refcnt == 0)
|
||||
{
|
||||
st->self->parent->extends.erase(st->ino);
|
||||
}
|
||||
}
|
||||
if (st->res == -EAGAIN)
|
||||
{
|
||||
// Restart
|
||||
goto resume_0;
|
||||
}
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
}
|
||||
|
||||
int kv_nfs3_write_proc(void *opaque, rpc_op_t *rop)
|
||||
{
|
||||
nfs_kv_write_state *st = new nfs_kv_write_state;
|
||||
st->self = (nfs_client_t*)opaque;
|
||||
st->rop = rop;
|
||||
WRITE3args *args = (WRITE3args*)rop->request;
|
||||
WRITE3res *reply = (WRITE3res*)rop->reply;
|
||||
st->ino = kv_fh_inode(args->file);
|
||||
st->offset = args->offset;
|
||||
st->size = (args->count > args->data.size ? args->data.size : args->count);
|
||||
if (!st->ino || st->size > MAX_REQUEST_SIZE)
|
||||
{
|
||||
*reply = (WRITE3res){ .status = NFS3ERR_INVAL };
|
||||
rpc_queue_reply(rop);
|
||||
delete st;
|
||||
return 0;
|
||||
}
|
||||
st->buf = (uint8_t*)args->data.data;
|
||||
st->stable = (args->stable != UNSTABLE);
|
||||
st->cb = [st](int res)
|
||||
{
|
||||
WRITE3res *reply = (WRITE3res*)st->rop->reply;
|
||||
*reply = (WRITE3res){ .status = vitastor_nfs_map_err(res) };
|
||||
if (res == 0)
|
||||
{
|
||||
reply->resok.count = (unsigned)st->size;
|
||||
reply->resok.committed = st->stable || st->was_immediate ? FILE_SYNC : UNSTABLE;
|
||||
*(uint64_t*)reply->resok.verf = st->self->parent->server_id;
|
||||
}
|
||||
rpc_queue_reply(st->rop);
|
||||
delete st;
|
||||
};
|
||||
self->parent->cli->execute(op);
|
||||
}
|
||||
|
||||
static void nfs_resize_write(nfs_client_t *self, rpc_op_t *rop, uint64_t inode, uint64_t new_size, uint64_t offset, uint64_t count, void *buf)
|
||||
{
|
||||
// Check if we have to resize the inode during write
|
||||
kv_read_inode(self, inode, [=](int res, const std::string & value, json11::Json attrs)
|
||||
{
|
||||
if (res < 0)
|
||||
{
|
||||
*(WRITE3res*)rop->reply = (WRITE3res){ .status = vitastor_nfs_map_err(-res) };
|
||||
rpc_queue_reply(rop);
|
||||
}
|
||||
else if (kv_map_type(attrs["type"].string_value()) != NF3REG)
|
||||
{
|
||||
*(WRITE3res*)rop->reply = (WRITE3res){ .status = NFS3ERR_INVAL };
|
||||
rpc_queue_reply(rop);
|
||||
}
|
||||
else if (attrs["size"].uint64_value() < new_size)
|
||||
{
|
||||
auto ewr_it = self->extend_writes.emplace((extend_size_t){
|
||||
.inode = inode,
|
||||
.new_size = new_size,
|
||||
}, (extend_write_t){
|
||||
.rop = rop,
|
||||
.resize_res = 1,
|
||||
.write_res = 1,
|
||||
});
|
||||
auto ext = &self->extends[inode];
|
||||
if (ext->cur_extend > 0)
|
||||
{
|
||||
// Already resizing, just wait
|
||||
if (ext->next_extend < new_size)
|
||||
ext->next_extend = new_size;
|
||||
}
|
||||
else
|
||||
{
|
||||
ext->old_ientry = value;
|
||||
ext->attrs = attrs.object_items();
|
||||
ext->cur_extend = new_size;
|
||||
ext->attrs["size"] = new_size;
|
||||
extend_inode(self, inode);
|
||||
}
|
||||
nfs_do_write(self, ewr_it, rop, inode, offset, count, buf);
|
||||
}
|
||||
else
|
||||
{
|
||||
nfs_do_write(self, self->extend_writes.end(), rop, inode, offset, count, buf);
|
||||
}
|
||||
});
|
||||
nfs_kv_continue_write(st, 0);
|
||||
return 1;
|
||||
}
|
||||
|
|
|
@ -191,6 +191,7 @@ void nfs_proxy_t::run(json11::Json cfg)
|
|||
}
|
||||
fs_base_inode = ((uint64_t)default_pool_id << (64-POOL_ID_BITS));
|
||||
fs_inode_count = ((uint64_t)1 << (64-POOL_ID_BITS)) - 1;
|
||||
shared_inode_threshold = pool_block_size;
|
||||
}
|
||||
// Self-register portmap and NFS
|
||||
pmap.reg_ports.insert((portmap_id_t){
|
||||
|
@ -372,8 +373,11 @@ void nfs_proxy_t::check_default_pool()
|
|||
{
|
||||
if (cli->st_cli.pool_config.size() == 1)
|
||||
{
|
||||
default_pool = cli->st_cli.pool_config.begin()->second.name;
|
||||
default_pool_id = cli->st_cli.pool_config.begin()->first;
|
||||
auto pool_it = cli->st_cli.pool_config.begin();
|
||||
default_pool_id = pool_it->first;
|
||||
default_pool = pool_it->second.name;
|
||||
pool_block_size = pool_it->second.pg_stripe_size;
|
||||
pool_alignment = pool_it->second.bitmap_granularity;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -388,6 +392,8 @@ void nfs_proxy_t::check_default_pool()
|
|||
if (p.second.name == default_pool)
|
||||
{
|
||||
default_pool_id = p.first;
|
||||
pool_block_size = p.second.pg_stripe_size;
|
||||
pool_alignment = p.second.bitmap_granularity;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,22 @@ struct list_cookie_val_t
|
|||
std::string key;
|
||||
};
|
||||
|
||||
struct nfs_kv_write_state;
|
||||
|
||||
struct shared_alloc_queue_t
|
||||
{
|
||||
nfs_kv_write_state *st;
|
||||
int state;
|
||||
uint64_t size;
|
||||
};
|
||||
|
||||
struct inode_extend_t
|
||||
{
|
||||
int refcnt = 0;
|
||||
uint64_t cur_extend = 0, next_extend = 0, done_extend = 0;
|
||||
std::vector<std::function<void()>> waiters;
|
||||
};
|
||||
|
||||
class nfs_proxy_t
|
||||
{
|
||||
public:
|
||||
|
@ -47,6 +63,9 @@ public:
|
|||
int trace = 0;
|
||||
|
||||
pool_id_t default_pool_id;
|
||||
uint64_t pool_block_size = 0;
|
||||
uint64_t pool_alignment = 0;
|
||||
uint64_t shared_inode_threshold = 0;
|
||||
|
||||
portmap_service_t pmap;
|
||||
ring_loop_t *ringloop = NULL;
|
||||
|
@ -57,6 +76,9 @@ public:
|
|||
std::map<list_cookie_t, list_cookie_val_t> list_cookies;
|
||||
uint64_t fs_next_id = 0, fs_allocated_id = 0;
|
||||
std::vector<uint64_t> unallocated_ids;
|
||||
std::vector<shared_alloc_queue_t> allocating_shared;
|
||||
uint64_t cur_shared_inode = 0, cur_shared_offset = 0;
|
||||
std::map<inode_t, inode_extend_t> extends;
|
||||
|
||||
std::vector<XDR*> xdr_pool;
|
||||
|
||||
|
@ -76,6 +98,7 @@ public:
|
|||
void daemonize();
|
||||
};
|
||||
|
||||
// FIXME: Move to "proto"
|
||||
struct rpc_cur_buffer_t
|
||||
{
|
||||
uint8_t *buf;
|
||||
|
@ -97,30 +120,6 @@ struct rpc_free_buffer_t
|
|||
unsigned size;
|
||||
};
|
||||
|
||||
struct extend_size_t
|
||||
{
|
||||
inode_t inode;
|
||||
uint64_t new_size;
|
||||
};
|
||||
|
||||
inline bool operator < (const extend_size_t &a, const extend_size_t &b)
|
||||
{
|
||||
return a.inode < b.inode || a.inode == b.inode && a.new_size < b.new_size;
|
||||
}
|
||||
|
||||
struct extend_write_t
|
||||
{
|
||||
rpc_op_t *rop;
|
||||
int resize_res, write_res; // 1 = started, 0 = completed OK, -errno = completed with error
|
||||
};
|
||||
|
||||
struct extend_inode_t
|
||||
{
|
||||
uint64_t cur_extend = 0, next_extend = 0;
|
||||
std::string old_ientry;
|
||||
json11::Json::object attrs;
|
||||
};
|
||||
|
||||
class nfs_client_t
|
||||
{
|
||||
public:
|
||||
|
@ -135,8 +134,6 @@ public:
|
|||
rpc_cur_buffer_t cur_buffer = { 0 };
|
||||
std::map<uint8_t*, rpc_used_buffer_t> used_buffers;
|
||||
std::vector<rpc_free_buffer_t> free_buffers;
|
||||
std::map<inode_t, extend_inode_t> extends;
|
||||
std::multimap<extend_size_t, extend_write_t> extend_writes;
|
||||
|
||||
iovec read_iov;
|
||||
msghdr read_msg = { 0 };
|
||||
|
@ -166,6 +163,14 @@ public:
|
|||
#define KV_ROOT_INODE 1
|
||||
#define KV_NEXT_ID_KEY "id"
|
||||
#define KV_ROOT_HANDLE "R"
|
||||
#define SHARED_FILE_MAGIC_V1 0x711A5158A6EDF17E
|
||||
|
||||
struct shared_file_header_t
|
||||
{
|
||||
uint64_t magic = 0;
|
||||
uint64_t inode = 0;
|
||||
uint64_t size = 0;
|
||||
};
|
||||
|
||||
nfsstat3 vitastor_nfs_map_err(int err);
|
||||
nfstime3 nfstime_from_str(const std::string & s);
|
||||
|
@ -182,6 +187,7 @@ void allocate_new_id(nfs_client_t *self, std::function<void(int res, uint64_t ne
|
|||
void kv_read_inode(nfs_client_t *self, uint64_t ino,
|
||||
std::function<void(int res, const std::string & value, json11::Json ientry)> cb,
|
||||
bool allow_cache = false);
|
||||
uint64_t align_shared_size(nfs_client_t *self, uint64_t size);
|
||||
|
||||
int kv_nfs3_getattr_proc(void *opaque, rpc_op_t *rop);
|
||||
int kv_nfs3_setattr_proc(void *opaque, rpc_op_t *rop);
|
||||
|
|
Loading…
Reference in New Issue