WIP Packing small files into shared inodes
Test / buildenv (push) Successful in 12s Details
Test / build (push) Successful in 2m58s Details
Test / test_cas (push) Successful in 11s Details
Test / make_test (push) Successful in 36s Details
Test / test_change_pg_size (push) Successful in 10s Details
Test / test_change_pg_count (push) Successful in 44s Details
Test / test_create_nomaxid (push) Successful in 8s Details
Test / test_change_pg_count_ec (push) Successful in 40s Details
Test / test_etcd_fail (push) Successful in 51s Details
Test / test_interrupted_rebalance_imm (push) Successful in 1m35s Details
Test / test_add_osd (push) Successful in 2m34s Details
Test / test_failure_domain (push) Successful in 38s Details
Test / test_snapshot (push) Successful in 25s Details
Test / test_interrupted_rebalance_ec_imm (push) Successful in 1m22s Details
Test / test_snapshot_ec (push) Successful in 22s Details
Test / test_minsize_1 (push) Successful in 14s Details
Test / test_move_reappear (push) Successful in 19s Details
Test / test_rm (push) Successful in 13s Details
Test / test_interrupted_rebalance_ec (push) Successful in 4m6s Details
Test / test_snapshot_down (push) Successful in 26s Details
Test / test_interrupted_rebalance (push) Successful in 5m26s Details
Test / test_snapshot_chain (push) Successful in 2m8s Details
Test / test_snapshot_down_ec (push) Successful in 26s Details
Test / test_splitbrain (push) Successful in 28s Details
Test / test_snapshot_chain_ec (push) Failing after 3m7s Details
Test / test_rebalance_verify_imm (push) Successful in 2m19s Details
Test / test_rebalance_verify_ec (push) Successful in 2m39s Details
Test / test_switch_primary (push) Successful in 37s Details
Test / test_rebalance_verify_ec_imm (push) Successful in 2m9s Details
Test / test_write_no_same (push) Successful in 18s Details
Test / test_write (push) Successful in 39s Details
Test / test_rebalance_verify (push) Successful in 6m8s Details
Test / test_write_xor (push) Failing after 3m14s Details
Test / test_heal_pg_size_2 (push) Successful in 3m48s Details
Test / test_heal_ec (push) Successful in 4m40s Details
Test / test_heal_csum_32k_dmj (push) Successful in 5m35s Details
Test / test_heal_csum_32k_dj (push) Successful in 5m28s Details
Test / test_heal_csum_32k (push) Successful in 6m26s Details
Test / test_scrub (push) Successful in 54s Details
Test / test_heal_csum_4k_dmj (push) Successful in 6m23s Details
Test / test_scrub_zero_osd_2 (push) Successful in 37s Details
Test / test_scrub_xor (push) Successful in 42s Details
Test / test_scrub_pg_size_6_pg_minsize_4_osd_count_6_ec (push) Successful in 1m26s Details
Test / test_heal_csum_4k (push) Successful in 5m25s Details
Test / test_scrub_pg_size_3 (push) Failing after 2m5s Details
Test / test_scrub_ec (push) Successful in 1m6s Details
Test / test_heal_csum_4k_dj (push) Successful in 6m37s Details

Vitaliy Filippov 2024-01-25 02:23:46 +03:00
parent 7802d85d12
commit 22fd44f4e6
3 changed files with 545 additions and 188 deletions

View File

@ -171,7 +171,8 @@ static bool kv_fh_valid(const std::string & fh)
// Attributes are always stored in the inode
static void kv_read_inode(nfs_client_t *self, uint64_t ino,
std::function<void(int res, const std::string & value, json11::Json ientry)> cb)
std::function<void(int res, const std::string & value, json11::Json ientry)> cb,
bool allow_cache = false)
{
auto key = kv_inode_key(ino);
self->parent->db->get(key, [=](int res, const std::string & value)
@ -197,7 +198,7 @@ static void kv_read_inode(nfs_client_t *self, uint64_t ino,
res = -EIO;
}
cb(res, value, attrs);
});
}, allow_cache);
}
static int nfs3_getattr_proc(void *opaque, rpc_op_t *rop)
@ -573,104 +574,537 @@ static int nfs3_read_proc(void *opaque, rpc_op_t *rop)
return 1;
}
static void nfs_resize_write(nfs_client_t *self, rpc_op_t *rop, uint64_t inode, uint64_t new_size, uint64_t offset, uint64_t count, void *buf);
struct nfs_kv_write_state
{
nfs_client_t *self = NULL;
rpc_op_t *rop = NULL;
uint64_t ino = 0;
uint64_t offset = 0, size = 0;
bool stable = false;
uint8_t *buf = NULL;
std::function<void(int res)> cb;
// state
bool allow_cache = true;
int res = 0, res2 = 0;
std::string ientry_text;
json11::Json ientry, new_ientry;
uint64_t new_size = 0;
uint64_t aligned_size = 0;
void *aligned_buf = NULL;
uint64_t shared_inode = 0, shared_offset = 0;
bool was_immediate = false;
~nfs_kv_write_state()
{
if (aligned_buf)
{
free(aligned_buf);
aligned_buf = NULL;
}
}
};
struct shared_file_header_t
{
uint64_t magic = 0;
uint64_t inode = 0;
uint64_t size = 0;
};
#define SHARED_FILE_MAGIC_V1 0x711A5158A6EDF17E
static void nfs_kv_continue_write(nfs_kv_write_state *st, int state);
static void finish_allocate_shared(nfs_client_t *self, int res)
{
std::vector<shared_alloc_queue_t> waiting;
waiting.swap(self->parent->allocating_shared);
for (auto & w: waiting)
{
w.st->res = res;
if (res == 0)
{
w.st->shared_inode = self->parent->cur_shared_inode;
w.st->shared_offset = self->parent->cur_shared_offset;
self->parent->cur_shared_offset += (w.size + self->parent->pool_alignment-1) & ~(self->parent->pool_alignment-1);
}
nfs_kv_continue_write(w.st, w.state);
}
}
static void allocate_new_id(nfs_client_t *self, std::function<void(int res, uint64_t new_id)> cb);
static void allocate_shared_inode(nfs_kv_write_state *st, int state, uint64_t size)
{
if (st->self->parent->cur_shared_inode == 0)
{
st->self->parent->allocating_shared.push_back({ st, state, size });
if (st->self->parent->allocating_shared.size() > 1)
{
return;
}
allocate_new_id(st->self, [st](int res, uint64_t new_id)
{
if (res < 0)
{
finish_allocate_shared(st->self, res);
return;
}
st->self->parent->cur_shared_inode = new_id;
st->self->parent->cur_shared_offset = 0;
st->self->parent->db->set(
kv_inode_key(new_id), json11::Json(json11::Json::object{ { "type", "shared" } }).dump(),
[st](int res)
{
if (res < 0)
{
st->self->parent->cur_shared_inode = 0;
}
finish_allocate_shared(st->self, res);
},
[](int res, const std::string & old_value)
{
return res == -ENOENT;
}
);
});
}
}
static uint64_t align_shared_size(nfs_client_t *self, uint64_t size)
{
return (size + sizeof(shared_file_header_t) + self->parent->pool_alignment-1)
& ~(self->parent->pool_alignment-1);
}
static void nfs_do_unshare_write(nfs_kv_write_state *st, int state)
{
auto op = new cluster_op_t;
op->opcode = OSD_OP_WRITE;
op->inode = st->self->parent->fs_base_inode + st->ino;
op->offset = 0;
op->len = st->aligned_size - sizeof(shared_file_header_t);
op->iov.push_back(st->aligned_buf + sizeof(shared_file_header_t), op->len);
op->callback = [st, state](cluster_op_t *op)
{
st->res = op->retval == op->len ? 0 : op->retval;
delete op;
nfs_kv_continue_write(st, state);
};
st->self->parent->cli->execute(op);
}
static void nfs_do_shared_write(nfs_kv_write_state *st, int state)
{
auto op = new cluster_op_t;
op->opcode = OSD_OP_WRITE;
op->inode = st->self->parent->fs_base_inode + st->shared_inode;
op->offset = st->shared_offset;
op->len = st->aligned_size;
op->iov.push_back(st->aligned_buf, st->aligned_size); // FIXME assemble from parts, do not copy?
op->callback = [st, state](cluster_op_t *op)
{
st->res = op->retval == op->len ? 0 : op->retval;
delete op;
nfs_kv_continue_write(st, state);
};
st->self->parent->cli->execute(op);
}
static void nfs_do_shared_read(nfs_kv_write_state *st, int state)
{
auto op = new cluster_op_t;
op->opcode = OSD_OP_READ;
op->inode = st->self->parent->fs_base_inode + st->ientry["shared_ino"].uint64_value();
op->offset = st->ientry["shared_offset"].uint64_value();
op->len = align_shared_size(st->self, st->ientry["size"].uint64_value());
op->iov.push_back(st->aligned_buf, op->len);
op->callback = [st, state](cluster_op_t *op)
{
st->res = op->retval == op->len ? 0 : op->retval;
delete op;
nfs_kv_continue_write(st, state);
};
st->self->parent->cli->execute(op);
}
static void nfs_do_fsync(nfs_kv_write_state *st, int state)
{
// Client requested a stable write. Add an fsync
auto op = new cluster_op_t;
op->opcode = OSD_OP_SYNC;
op->callback = [st, state](cluster_op_t *op)
{
delete op;
nfs_kv_continue_write(st, state);
};
st->self->parent->cli->execute(op);
}
static bool nfs_do_shared_readmodify(nfs_kv_write_state *st, int base_state, int state, bool unshare)
{
assert(state <= base_state);
if (state < base_state) {}
else if (state == base_state) goto resume_0;
assert(!st->aligned_buf);
st->aligned_size = unshare
? sizeof(shared_file_header_t) + (st->new_size + st->self->parent->pool_alignment-1) & ~(st->self->parent->pool_alignment-1)
: align_shared_size(st->self, st->new_size);
st->aligned_buf = malloc_or_die(st->aligned_size);
memset(st->aligned_buf + sizeof(shared_file_header_t), 0, st->offset);
if (st->ientry["shared_ino"].uint64_value() != 0)
{
// Read old data if shared non-empty
nfs_do_shared_read(st, base_state);
return false;
resume_0:
if (st->res < 0)
{
auto cb = std::move(st->cb);
cb(st->res);
return true;
}
}
*((shared_file_header_t*)st->aligned_buf) = {
.magic = SHARED_FILE_MAGIC_V1,
.inode = st->ino,
.size = st->new_size,
};
memcpy(st->aligned_buf + sizeof(shared_file_header_t) + st->offset, st->buf, st->size);
memset(st->aligned_buf + sizeof(shared_file_header_t) + st->offset + st->size, 0,
st->aligned_size - sizeof(shared_file_header_t) - st->offset - st->size);
return true;
}
static void nfs_do_align_write(nfs_kv_write_state *st, uint64_t ino, uint64_t offset, int state)
{
}
// Packing small files into "shared inodes". Insane algorithm...
// Write:
// - If (offset+size <= threshold):
// - Read inode from cache
// - If inode does not exist - stop with -ENOENT
// - If inode is not a regular file - stop with -EINVAL
// - If it's empty (size == 0 || empty == true):
// - If preset size is larger than threshold:
// - Write data into non-shared inode
// - In parallel: clear empty flag
// - If CAS failure: re-read inode and restart
// - Otherwise:
// - Allocate/take a shared inode
// - Allocate space in its end
// - Write data into shared inode
// - If CAS failure: allocate another shared inode and retry
// - Write shared inode reference, set size
// - If CAS failure: free allocated shared space, re-read inode and restart
// - If it's not empty:
// - If non-shared:
// - Write data into non-shared inode
// - In parallel: check if data fits into inode size and extend if it doesn't
// - If CAS failure: re-read inode and retry to extend the size
// - If shared:
// - Read whole file from shared inode
// - If the file header in data doesn't match: re-read inode and restart
// - If data doesn't fit into the same shared inode:
// - Allocate space in a new shared inode
// - Write data into the new shared inode
// - If CAS failure: allocate another shared inode and retry
// - Update inode metadata (set new size and new shared inode)
// - If CAS failure: free allocated shared space, re-read inode and restart
// - If it fits:
// - Write updated data into the shared inode
// - Update shared inode header
// - If CAS failure: re-read inode and restart
// - Otherwise:
// - Write data into non-shared inode
// - Read inode in parallel
// - If not a regular file:
// - Remove data
// - Stop with -EINVAL
// - If shared:
// - Read whole file from shared inode
// - Write data into non-shared inode
// - If CAS failure (block should not exist): restart
// - Update inode metadata (make non-shared, update size)
// - If CAS failure: restart
// - Zero out the shared inode header
// - If CAS failure: restart
// - Check if size fits
// - Extend if it doesn't
// Read:
// - If (offset+size <= threshold):
// - Read inode from cache
// - If empty: return zeroes
// - If shared:
// - Read the whole file from shared inode, or at least data and shared inode header
// - If the file header in data doesn't match: re-read inode and restart
// - If non-shared:
// - Read data from non-shared inode
// - Otherwise:
// - Read data from non-shared inode
//
// Plus parallelism... Write algorithm variants:
// 1) non-shared, inplace, no extend
// 2) non-shared, inplace, extend
// 3) shared, inplace
// 4) shared, redirect
// 5) unshare
// Compatible: 1&(1|2) or 3&3. So:
// 1) Read inode
// 2) Make a decision about how to write and enqueue it to the inode's queue
// 3) Wait for all previous incompatible writes to complete
// 4) Re-read inode if there were incompatible writes
// 5) Make a decision again and proceed
static void nfs_kv_continue_write(nfs_kv_write_state *st, int state)
{
if (state == 0) {}
else if (state == 1) goto resume_1;
else if (state == 2) goto resume_2;
else if (state == 3) goto resume_3;
else if (state == 4) goto resume_4;
else if (state == 5) goto resume_5;
else if (state == 6) goto resume_6;
else if (state == 7) goto resume_7;
else if (state == 8) goto resume_8;
else if (state == 9) goto resume_9;
else if (state == 10) goto resume_10;
else if (state == 11) goto resume_11;
else if (state == 12) goto resume_12;
else
{
fprintf(stderr, "BUG: invalid state in nfs_kv_continue_write()");
abort();
}
resume_0:
if (!st->size)
{
auto cb = std::move(st->cb);
cb(0);
return;
}
st->was_immediate = st->self->parent->cli->get_immediate_commit(st->self->parent->fs_base_inode + st->ino);
kv_read_inode(st->self, st->ino, [st](int res, const std::string & value, json11::Json attrs)
{
st->res = res;
st->ientry_text = value;
st->ientry = attrs;
nfs_kv_continue_write(st, 1);
}, st->allow_cache);
return;
resume_1:
if (st->res < 0 ||
st->ientry["type"].uint64_value() != 0 &&
st->ientry["type"].uint64_value() != NF3REG)
{
auto cb = std::move(st->cb);
cb(st->res == 0 ? -EINVAL : st->res);
return;
}
st->new_size = st->ientry["size"].uint64_value();
if (st->new_size < st->offset + st->size)
{
st->new_size = st->offset + st->size;
}
if (st->offset + st->size + sizeof(shared_file_header_t) < st->self->parent->shared_inode_threshold)
{
if (st->ientry["size"].uint64_value() == 0 ||
st->ientry["empty"].bool_value() &&
st->ientry["size"].uint64_value() + sizeof(shared_file_header_t) < st->self->parent->shared_inode_threshold ||
st->ientry["shared_ino"].uint64_value() != 0 &&
st->ientry["size"].uint64_value() < st->offset+st->size &&
st->ientry["shared_alloc"].uint64_value() < align_shared_size(st->self, st->offset+st->size))
{
// Either empty or shared and requires moving into a larger place (redirect-write)
allocate_shared_inode(st, 2, st->new_size);
return;
resume_2:
if (st->res < 0)
{
auto cb = std::move(st->cb);
cb(st->res);
return;
}
resume_3:
if (!nfs_do_shared_readmodify(st, 3, state, false))
return;
nfs_do_shared_write(st, 4);
return;
resume_4:
if (st->res < 0)
{
auto cb = std::move(st->cb);
cb(st->res);
return;
}
{
auto ni = st->ientry.object_items();
ni.erase("empty");
ni["size"] = st->new_size;
ni["shared_ino"] = st->shared_inode;
ni["shared_offset"] = st->shared_offset;
ni["shared_alloc"] = st->aligned_size;
st->new_ientry = ni;
}
st->self->parent->db->set(kv_inode_key(st->ino), st->new_ientry.dump(), [st](int res)
{
st->res = res;
nfs_kv_continue_write(st, 5);
}, [st](int res, const std::string & old_value)
{
return res == 0 && old_value == st->ientry_text;
});
return;
resume_5:
if (st->res < 0)
{
st->res2 = st->res;
memset(st->aligned_buf, 0, st->aligned_size);
nfs_do_shared_write(st, 6);
return;
resume_6:
free(st->aligned_buf);
st->aligned_buf = NULL;
if (st->res2 == -EAGAIN)
{
goto resume_0;
}
else
{
auto cb = std::move(st->cb);
cb(st->res2);
return;
}
}
auto cb = std::move(st->cb);
cb(0);
return;
}
else if (st->ientry["shared_ino"].uint64_value() > 0)
{
// Non-empty, shared, can be updated in-place
nfs_do_align_write(st, st->ientry["shared_ino"].uint64_value(),
st->ientry["shared_offset"].uint64_value() + sizeof(shared_file_header_t) + st->offset, 7);
return;
resume_7:
if (st->res == 0 && st->stable && !st->was_immediate)
{
nfs_do_fsync(st, 8);
return;
}
resume_8:
auto cb = std::move(st->cb);
cb(st->res);
return;
}
// Fall through for non-shared
}
// Non-shared write
if (st->ientry["shared_ino"].uint64_value() != 0)
{
// Unshare
resume_9:
if (!nfs_do_shared_readmodify(st, 9, state, true))
return;
nfs_do_unshare_write(st, 10);
return;
}
else
{
// Just write
nfs_do_align_write(st, st->ino, st->offset, 10);
}
resume_10:
if (st->res == 0 && st->stable && !st->was_immediate)
{
nfs_do_fsync(st, 11);
return;
}
resume_11:
if (st->res < 0)
{
auto cb = std::move(st->cb);
cb(st->res);
return;
}
if (st->ientry["empty"].bool_value() ||
st->ientry["size"].uint64_value() < st->new_size ||
st->ientry["shared_ino"].uint64_value() != 0)
{
{
auto ni = st->ientry.object_items();
ni.erase("empty");
ni.erase("shared_ino");
ni.erase("shared_offset");
ni.erase("shared_alloc");
ni["size"] = st->new_size;
st->new_ientry = ni;
}
// FIXME: If already extending - wait for extend
st->self->parent->db->set(kv_inode_key(st->ino), st->new_ientry.dump(), [st](int res)
{
st->res = res;
nfs_kv_continue_write(st, 12);
}, [st](int res, const std::string & old_value)
{
return res == 0 && old_value == st->ientry_text;
});
return;
resume_12:
if (st->res == -EAGAIN)
{
goto resume_0;
}
else if (st->res < 0)
{
auto cb = std::move(st->cb);
cb(st->res);
return;
}
}
auto cb = std::move(st->cb);
cb(0);
}
static int nfs3_write_proc(void *opaque, rpc_op_t *rop)
{
// FIXME: Pack small files into "shared inodes"
// Insane algorithm...
// Threshold should be equal to one allocation block of the pool
// Write:
// - If (offset+size <= threshold):
// - Read inode from cache
// - If inode does not exist - stop with -ENOENT
// - If it's empty (size == 0 || empty == true):
// - If preset size is larger than threshold:
// - Write data into non-shared inode
// - In parallel: clear empty flag
// - If CAS failure: re-read inode and restart
// - Otherwise:
// - Allocate/take a shared inode
// - Allocate space in its end
// - Write data into shared inode
// - If CAS failure: allocate another shared inode and retry
// - Write shared inode reference, set size
// - If CAS failure: free allocated shared space, re-read inode and restart
// - If it's not empty:
// - If non-shared:
// - Write data into non-shared inode
// - In parallel: check if data fits into inode size and extend if it doesn't
// - If CAS failure: re-read inode and retry to extend the size
// - If shared:
// - Read whole file from shared inode
// - If the file header in data doesn't match: re-read inode and restart
// - If data doesn't fit into the same shared inode:
// - Allocate space in a new shared inode
// - Write data into the new shared inode
// - If CAS failure: allocate another shared inode and retry
// - Update inode metadata (set new size and new shared inode)
// - If CAS failure: free allocated shared space, re-read inode and restart
// - If it fits:
// - Write updated data into the shared inode
// - Update shared inode header
// - If CAS failure: re-read inode and restart
// - Otherwise:
// - Write data into non-shared inode
// - Read inode in parallel
// - If shared:
// - Read whole file from shared inode
// - Write data into non-shared inode
// - If CAS failure (block should not exist): restart
// - Update inode metadata (make non-shared, update size)
// - If CAS failure: restart
// - Zero out the shared inode header
// - If CAS failure: restart
// - Check if size fits
// - Extend if it doesn't
// Read:
// - If (offset+size <= threshold):
// - Read inode from cache
// - If empty: return zeroes
// - If shared:
// - Read the whole file from shared inode, or at least data and shared inode header
// - If the file header in data doesn't match: re-read inode and restart
// - If non-shared:
// - Read data from non-shared inode
// - Otherwise:
// - Read data from non-shared inode
nfs_client_t *self = (nfs_client_t*)opaque;
nfs_kv_write_state *st = new nfs_kv_write_state;
st->self = (nfs_client_t*)opaque;
st->rop = rop;
WRITE3args *args = (WRITE3args*)rop->request;
WRITE3res *reply = (WRITE3res*)rop->reply;
inode_t ino = kv_fh_inode(args->file);
if (!ino)
st->ino = kv_fh_inode(args->file);
st->offset = args->offset;
st->size = (args->count > args->data.size ? args->data.size : args->count);
if (!st->ino || st->size > MAX_REQUEST_SIZE)
{
*reply = (WRITE3res){ .status = NFS3ERR_INVAL };
rpc_queue_reply(rop);
delete st;
return 0;
}
if (args->count > MAX_REQUEST_SIZE)
st->buf = (uint8_t*)args->data.data;
st->stable = (args->stable != UNSTABLE);
st->cb = [st](int res)
{
*reply = (WRITE3res){ .status = NFS3ERR_INVAL };
rpc_queue_reply(rop);
return 0;
}
uint64_t count = args->count > args->data.size ? args->data.size : args->count;
uint64_t alignment = self->parent->cli->st_cli.global_bitmap_granularity;
auto pool_cfg = self->parent->cli->st_cli.pool_config.find(INODE_POOL(self->parent->fs_base_inode));
if (pool_cfg != self->parent->cli->st_cli.pool_config.end())
{
alignment = pool_cfg->second.bitmap_granularity;
}
// Pre-fill reply
*reply = (WRITE3res){
.status = NFS3_OK,
.resok = (WRITE3resok){
//.file_wcc = ...,
.count = (unsigned)count,
},
WRITE3res *reply = (WRITE3res*)st->rop->reply;
*reply = (WRITE3res){ .status = vitastor_nfs_map_err(res) };
if (res == 0)
{
reply->resok.count = (unsigned)st->size;
reply->resok.committed = st->stable || st->was_immediate ? FILE_SYNC : UNSTABLE;
*(uint64_t*)reply->resok.verf = st->self->parent->server_id;
}
rpc_queue_reply(st->rop);
delete st;
};
if ((args->offset % alignment) != 0 || (count % alignment) != 0)
{
nfs_kv_continue_write(st, 0);
return 1;
}
/* {
// Unaligned write, requires read-modify-write
uint64_t aligned_offset = args->offset - (args->offset % alignment);
uint64_t aligned_count = args->offset + args->count;
@ -703,49 +1137,6 @@ static int nfs3_write_proc(void *opaque, rpc_op_t *rop)
};
self->parent->cli->execute(op);
}
else
{
nfs_resize_write(self, rop, ino, args->offset+count, args->offset, count, args->data.data);
}
return 1;
}
static void complete_extend_write(nfs_client_t *self, rpc_op_t *rop, inode_t inode, int res)
{
WRITE3args *args = (WRITE3args*)rop->request;
WRITE3res *reply = (WRITE3res*)rop->reply;
if (res < 0)
{
*reply = (WRITE3res){ .status = vitastor_nfs_map_err(res) };
rpc_queue_reply(rop);
return;
}
bool imm = self->parent->cli->get_immediate_commit(inode);
reply->resok.committed = args->stable != UNSTABLE || imm ? FILE_SYNC : UNSTABLE;
*(uint64_t*)reply->resok.verf = self->parent->server_id;
if (args->stable != UNSTABLE && !imm)
{
// Client requested a stable write. Add an fsync
auto op = new cluster_op_t;
op->opcode = OSD_OP_SYNC;
op->callback = [rop](cluster_op_t *op)
{
if (op->retval != 0)
{
WRITE3res *reply = (WRITE3res*)rop->reply;
*reply = (WRITE3res){ .status = vitastor_nfs_map_err(-op->retval) };
}
delete op;
rpc_queue_reply(rop);
};
self->parent->cli->execute(op);
}
else
{
rpc_queue_reply(rop);
}
}
static void complete_extend_inode(nfs_client_t *self, uint64_t inode, uint64_t new_size, int err)
{
auto ext_it = self->extend_writes.lower_bound((extend_size_t){ .inode = inode, .new_size = 0 });
@ -807,53 +1198,6 @@ static void extend_inode(nfs_client_t *self, uint64_t inode)
});
}
static void nfs_do_write(nfs_client_t *self, std::multimap<extend_size_t, extend_write_t>::iterator ewr_it,
rpc_op_t *rop, uint64_t inode, uint64_t offset, uint64_t count, void *buf)
{
cluster_op_t *op = new cluster_op_t;
op->opcode = OSD_OP_WRITE;
op->inode = self->parent->fs_base_inode + inode;
op->offset = offset;
op->len = count;
op->iov.push_back(buf, count);
op->callback = [self, ewr_it, rop](cluster_op_t *op)
{
auto inode = op->inode;
int write_res = op->retval < 0 ? op->retval : (op->retval != op->len ? -ERANGE : 0);
if (ewr_it == self->extend_writes.end())
{
complete_extend_write(self, rop, inode, write_res);
}
else
{
ewr_it->second.write_res = write_res;
if (ewr_it->second.resize_res <= 0)
{
complete_extend_write(self, rop, inode, write_res < 0 ? write_res : ewr_it->second.resize_res);
self->extend_writes.erase(ewr_it);
}
}
};
self->parent->cli->execute(op);
}
static void nfs_resize_write(nfs_client_t *self, rpc_op_t *rop, uint64_t inode, uint64_t new_size, uint64_t offset, uint64_t count, void *buf)
{
// Check if we have to resize the inode during write
kv_read_inode(self, inode, [=](int res, const std::string & value, json11::Json attrs)
{
if (res < 0)
{
*(WRITE3res*)rop->reply = (WRITE3res){ .status = vitastor_nfs_map_err(-res) };
rpc_queue_reply(rop);
}
else if (kv_map_type(attrs["type"].string_value()) != NF3REG)
{
*(WRITE3res*)rop->reply = (WRITE3res){ .status = NFS3ERR_INVAL };
rpc_queue_reply(rop);
}
else if (attrs["size"].uint64_value() < new_size)
{
auto ewr_it = self->extend_writes.emplace((extend_size_t){
.inode = inode,
.new_size = new_size,
@ -877,14 +1221,7 @@ static void nfs_resize_write(nfs_client_t *self, rpc_op_t *rop, uint64_t inode,
ext->attrs["size"] = new_size;
extend_inode(self, inode);
}
nfs_do_write(self, ewr_it, rop, inode, offset, count, buf);
}
else
{
nfs_do_write(self, self->extend_writes.end(), rop, inode, offset, count, buf);
}
});
}
*/
static void allocate_new_id(nfs_client_t *self, std::function<void(int res, uint64_t new_id)> cb)
{

View File

@ -251,6 +251,7 @@ void nfs_proxy_t::run(json11::Json cfg)
}
fs_base_inode = ((uint64_t)default_pool_id << (64-POOL_ID_BITS));
fs_inode_count = ((uint64_t)1 << (64-POOL_ID_BITS)) - 1;
shared_inode_threshold = pool_block_size;
}
// Self-register portmap and NFS
pmap.reg_ports.insert((portmap_id_t){
@ -432,8 +433,11 @@ void nfs_proxy_t::check_default_pool()
{
if (cli->st_cli.pool_config.size() == 1)
{
default_pool = cli->st_cli.pool_config.begin()->second.name;
default_pool_id = cli->st_cli.pool_config.begin()->first;
auto pool_it = cli->st_cli.pool_config.begin();
default_pool_id = pool_it->first;
default_pool = pool_it->second.name;
pool_block_size = pool_it->second.pg_stripe_size;
pool_alignment = pool_it->second.bitmap_granularity;
}
else
{
@ -448,6 +452,8 @@ void nfs_proxy_t::check_default_pool()
if (p.second.name == default_pool)
{
default_pool_id = p.first;
pool_block_size = p.second.pg_stripe_size;
pool_alignment = p.second.bitmap_granularity;
break;
}
}

View File

@ -33,6 +33,15 @@ struct list_cookie_val_t
std::string key;
};
struct nfs_kv_write_state;
struct shared_alloc_queue_t
{
nfs_kv_write_state *st;
int state;
uint64_t size;
};
class nfs_proxy_t
{
public:
@ -51,6 +60,9 @@ public:
int trace = 0;
pool_id_t default_pool_id;
uint64_t pool_block_size = 0;
uint64_t pool_alignment = 0;
uint64_t shared_inode_threshold = 0;
portmap_service_t pmap;
ring_loop_t *ringloop = NULL;
@ -62,6 +74,8 @@ public:
std::map<list_cookie_t, list_cookie_val_t> list_cookies;
uint64_t fs_next_id = 0, fs_allocated_id = 0;
std::vector<uint64_t> unallocated_ids;
std::vector<shared_alloc_queue_t> allocating_shared;
uint64_t cur_shared_inode = 0, cur_shared_offset = 0;
std::vector<XDR*> xdr_pool;