Compare commits

...

17 Commits

Author SHA1 Message Date
Vitaliy Filippov f8397e1a03 Allow to specify additional NFS mount options
Test / test_interrupted_rebalance_ec (push) Successful in 1m58s Details
Test / test_rm (push) Successful in 13s Details
Test / test_snapshot_down (push) Successful in 27s Details
Test / test_snapshot_down_ec (push) Successful in 30s Details
Test / test_splitbrain (push) Successful in 23s Details
Test / test_snapshot_chain (push) Successful in 2m22s Details
Test / test_snapshot_chain_ec (push) Successful in 2m53s Details
Test / test_rebalance_verify_imm (push) Successful in 4m7s Details
Test / test_rebalance_verify (push) Successful in 4m37s Details
Test / test_switch_primary (push) Successful in 36s Details
Test / test_write (push) Successful in 41s Details
Test / test_write_no_same (push) Successful in 17s Details
Test / test_write_xor (push) Successful in 1m5s Details
Test / test_rebalance_verify_ec_imm (push) Successful in 4m17s Details
Test / test_rebalance_verify_ec (push) Successful in 7m13s Details
Test / test_heal_pg_size_2 (push) Successful in 3m18s Details
Test / test_heal_ec (push) Successful in 5m4s Details
Test / test_heal_csum_32k_dmj (push) Successful in 4m40s Details
Test / test_heal_csum_32k_dj (push) Successful in 6m14s Details
Test / test_heal_csum_32k (push) Successful in 6m13s Details
Test / test_scrub (push) Successful in 1m34s Details
Test / test_heal_csum_4k_dmj (push) Successful in 6m43s Details
Test / test_heal_csum_4k_dj (push) Successful in 6m42s Details
Test / test_scrub_zero_osd_2 (push) Successful in 1m28s Details
Test / test_scrub_xor (push) Successful in 50s Details
Test / test_scrub_pg_size_6_pg_minsize_4_osd_count_6_ec (push) Successful in 1m10s Details
Test / test_scrub_ec (push) Successful in 57s Details
Test / test_scrub_pg_size_3 (push) Successful in 1m46s Details
Test / test_nfs (push) Successful in 24s Details
Test / test_heal_csum_4k (push) Successful in 5m35s Details
2024-03-11 23:30:42 +03:00
Vitaliy Filippov b1129c171b Stop then retry, not retry then stop 2024-03-11 14:17:16 +03:00
Vitaliy Filippov 8f077bc67c Fix malloc/free in nfs_kv_read/write 2024-03-11 14:17:16 +03:00
Vitaliy Filippov 5624a67c67 Fix another rare OSD hang on zeroing out entries on start 2024-03-11 14:17:16 +03:00
Vitaliy Filippov e0f36d47cc Fix "bad key in etcd" in mon for FS pools 2024-03-11 14:17:16 +03:00
Vitaliy Filippov 5b34ffa9b6 Check if already mounted before mounting 2024-03-11 14:17:16 +03:00
Vitaliy Filippov 94de5618e6 Fix zero used space, update mtime when moving/changing inode 2024-03-11 14:17:16 +03:00
Vitaliy Filippov 551160c693 Ignore ECANCELED in nfs-proxy (happens in io_uring on fork) 2024-03-11 14:17:16 +03:00
Vitaliy Filippov 462845c44b Support unaligned shared_offsets, align shared file data instead of header 2024-03-11 14:17:16 +03:00
Vitaliy Filippov 7b75a9bf1b Implement auto-unmount local NFS server mode for vitastor-nfs 2024-03-11 14:17:16 +03:00
Vitaliy Filippov 08fc67de05 Return error on failed shrink 2024-03-11 14:17:16 +03:00
Vitaliy Filippov c8a2082dc7 Implement rename over an existing file/directory 2024-03-11 14:17:16 +03:00
Vitaliy Filippov 871db8fe31 Support --logfile in nfs-proxy 2024-03-11 14:17:16 +03:00
Vitaliy Filippov 6aed77041f Fix shared file overlap, add FIXMEs 2024-03-11 14:17:16 +03:00
Vitaliy Filippov 03d5f512bf Create inode, then direntry, not direntry, then inode; retry ID collisions 2024-03-11 14:17:16 +03:00
Vitaliy Filippov 127ce8c104 Fix NFS shared/aligned write FIXMEs 2024-03-11 14:17:16 +03:00
Vitaliy Filippov 654386e447 Allow to disable per-inode stats for VitastorFS pools 2024-03-11 14:17:16 +03:00
39 changed files with 1346 additions and 511 deletions

View File

@ -267,6 +267,7 @@ Optional parameters:
| `--immediate_commit none` | Put pool only on OSDs with this or larger immediate_commit (none < small < all) |
| `--primary_affinity_tags tags` | Prefer to put primary copies on OSDs with all specified tags |
| `--scrub_interval <time>` | Enable regular scrubbing for this pool. Format: number + unit s/m/h/d/M/y |
| `--no_inode_stats 1` | Disable per-inode statistics for this pool (use for VitastorFS pools) |
| `--pg_stripe_size <number>` | Increase object grouping stripe |
| `--max_osd_combinations 10000` | Maximum number of random combinations for LP solver input |
| `--wait` | Wait for the new pool to come online |
@ -288,7 +289,7 @@ Modify an existing pool. Modifiable parameters:
```
[-s|--pg_size <number>] [--pg_minsize <number>] [-n|--pg_count <count>]
[--failure_domain <level>] [--root_node <node>] [--osd_tags <tags>]
[--failure_domain <level>] [--root_node <node>] [--osd_tags <tags>] [--no_inode_stats 0|1]
[--max_osd_combinations <number>] [--primary_affinity_tags <tags>] [--scrub_interval <time>]
```

View File

@ -37,7 +37,7 @@ const etcd_allow = new RegExp('^'+[
'pg/history/[1-9]\\d*/[1-9]\\d*',
'pool/stats/[1-9]\\d*',
'history/last_clean_pgs',
'inode/stats/[1-9]\\d*/[1-9]\\d*',
'inode/stats/[1-9]\\d*/\\d+',
'pool/stats/[1-9]\\d*',
'stats',
'index/image/.*',
@ -1737,8 +1737,11 @@ class Mon
for (const inode_num in this.state.osd.space[osd_num][pool_id])
{
const u = BigInt(this.state.osd.space[osd_num][pool_id][inode_num]||0);
if (inode_num)
{
inode_stats[pool_id][inode_num] = inode_stats[pool_id][inode_num] || inode_stub();
inode_stats[pool_id][inode_num].raw_used += u;
}
this.state.pool.stats[pool_id].used_raw_tb += u;
}
}

View File

@ -82,3 +82,8 @@ uint32_t blockstore_t::get_bitmap_granularity()
{
return impl->get_bitmap_granularity();
}
// Set per-pool no_inode_stats: passes the list of pool IDs to the
// blockstore implementation object unchanged.
void blockstore_t::set_no_inode_stats(const std::vector<uint64_t> & pool_ids)
{
impl->set_no_inode_stats(pool_ids);
}

View File

@ -216,6 +216,9 @@ public:
// Get per-inode space usage statistics
std::map<uint64_t, uint64_t> & get_inode_space_stats();
// Set per-pool no_inode_stats
void set_no_inode_stats(const std::vector<uint64_t> & pool_ids);
// Print diagnostics to stdout
void dump_diagnostics();

View File

@ -733,3 +733,86 @@ void blockstore_impl_t::disk_error_abort(const char *op, int retval, int expecte
fprintf(stderr, "Disk %s failed: result is %d, expected %d. Can't continue, sorry :-(\n", op, retval, expected);
exit(1);
}
// Replace the set of pools for which per-inode space statistics are disabled.
//
// pool_ids - complete new list of pools that must use a single aggregated
//            space counter instead of one counter per inode.
//
// Values stored in no_inode_stats:
//   1 - aggregated (per-pool) statistics are active for the pool
//   2 - transient mark used only inside this function: the pool was in
//       aggregated mode before the call and has not (yet) been seen in pool_ids
//   0 - placeholder left behind by operator[] lookups on the map; such a pool
//       still has per-inode statistics
//
// Pools that enter the set get their counters recalculated in aggregated form,
// pools that leave it get them recalculated per inode again.
void blockstore_impl_t::set_no_inode_stats(const std::vector<uint64_t> & pool_ids)
{
    for (auto & np: no_inode_stats)
    {
        // Only mark pools that really are in aggregated mode. Zero placeholders
        // must stay zero, otherwise the check below would take them for
        // already-aggregated pools and skip the recalculation
        if (np.second)
            np.second = 2;
    }
    for (auto pool_id: pool_ids)
    {
        auto & mode = no_inode_stats[pool_id];
        if (!mode)
        {
            // Pool switches from per-inode to aggregated statistics
            recalc_inode_space_stats(pool_id, false);
        }
        mode = 1;
    }
    for (auto np_it = no_inode_stats.begin(); np_it != no_inode_stats.end(); )
    {
        if (np_it->second == 2)
        {
            // Pool is no longer listed - switch it back to per-inode statistics
            recalc_inode_space_stats(np_it->first, true);
            np_it = no_inode_stats.erase(np_it);
        }
        else if (!np_it->second)
        {
            // Stale placeholder, counters are already per-inode
            np_it = no_inode_stats.erase(np_it);
        }
        else
            ++np_it;
    }
}
void blockstore_impl_t::recalc_inode_space_stats(uint64_t pool_id, bool per_inode)
{
auto sp_begin = inode_space_stats.lower_bound((pool_id << (64-POOL_ID_BITS)));
auto sp_end = inode_space_stats.lower_bound(((pool_id+1) << (64-POOL_ID_BITS)));
inode_space_stats.erase(sp_begin, sp_end);
auto sh_it = clean_db_shards.lower_bound((pool_id << (64-POOL_ID_BITS)));
while (sh_it != clean_db_shards.end() &&
(sh_it->first >> (64-POOL_ID_BITS)) == pool_id)
{
for (auto & pair: sh_it->second)
{
uint64_t space_id = per_inode ? pair.first.inode : (pool_id << (64-POOL_ID_BITS));
inode_space_stats[space_id] += dsk.data_block_size;
}
sh_it++;
}
object_id last_oid = {};
bool last_exists = false;
auto dirty_it = dirty_db.lower_bound((obj_ver_id){ .oid = { .inode = (pool_id << (64-POOL_ID_BITS)) } });
while (dirty_it != dirty_db.end() && (dirty_it->first.oid.inode >> (64-POOL_ID_BITS)) == pool_id)
{
if (IS_STABLE(dirty_it->second.state) && (IS_BIG_WRITE(dirty_it->second.state) || IS_DELETE(dirty_it->second.state)))
{
bool exists = false;
if (last_oid == dirty_it->first.oid)
{
exists = last_exists;
}
else
{
auto & clean_db = clean_db_shard(dirty_it->first.oid);
auto clean_it = clean_db.find(dirty_it->first.oid);
exists = clean_it != clean_db.end();
}
uint64_t space_id = per_inode ? dirty_it->first.oid.inode : (pool_id << (64-POOL_ID_BITS));
if (IS_BIG_WRITE(dirty_it->second.state))
{
if (!exists)
inode_space_stats[space_id] += dsk.data_block_size;
last_exists = true;
}
else
{
if (exists)
{
auto & sp = inode_space_stats[space_id];
if (sp > dsk.data_block_size)
sp -= dsk.data_block_size;
else
inode_space_stats.erase(space_id);
}
last_exists = false;
}
last_oid = dirty_it->first.oid;
}
dirty_it++;
}
}

View File

@ -272,6 +272,7 @@ class blockstore_impl_t
std::map<pool_id_t, pool_shard_settings_t> clean_db_settings;
std::map<pool_pg_id_t, blockstore_clean_db_t> clean_db_shards;
std::map<uint64_t, int> no_inode_stats;
uint8_t *clean_bitmaps = NULL;
blockstore_dirty_db_t dirty_db;
std::vector<blockstore_op_t*> submit_queue;
@ -318,6 +319,7 @@ class blockstore_impl_t
blockstore_clean_db_t& clean_db_shard(object_id oid);
void reshard_clean_db(pool_id_t pool_id, uint32_t pg_count, uint32_t pg_stripe_size);
void recalc_inode_space_stats(uint64_t pool_id, bool per_inode);
// Journaling
void prepare_journal_sector_write(int sector, blockstore_op_t *op);
@ -428,6 +430,9 @@ public:
// Space usage statistics
std::map<uint64_t, uint64_t> inode_space_stats;
// Set per-pool no_inode_stats
void set_no_inode_stats(const std::vector<uint64_t> & pool_ids);
// Print diagnostics to stdout
void dump_diagnostics();

View File

@ -238,6 +238,7 @@ resume_2:
data->iov = { bufs[i].buf, (size_t)bufs[i].size };
data->callback = [this, i](ring_data_t *data) { handle_event(data, i); };
my_uring_prep_writev(sqe, bs->dsk.meta_fd, &data->iov, 1, bs->dsk.meta_offset + bufs[i].offset);
bs->ringloop->submit();
bufs[i].state = INIT_META_WRITING;
submitted++;
}

View File

@ -487,18 +487,24 @@ void blockstore_impl_t::mark_stable(obj_ver_id v, bool forget_dirty)
}
if (!exists)
{
inode_space_stats[dirty_it->first.oid.inode] += dsk.data_block_size;
uint64_t space_id = dirty_it->first.oid.inode;
if (no_inode_stats[dirty_it->first.oid.inode >> (64-POOL_ID_BITS)])
space_id = space_id & ~(((uint64_t)1 << (64-POOL_ID_BITS)) - 1);
inode_space_stats[space_id] += dsk.data_block_size;
used_blocks++;
}
big_to_flush++;
}
else if (IS_DELETE(dirty_it->second.state))
{
auto & sp = inode_space_stats[dirty_it->first.oid.inode];
uint64_t space_id = dirty_it->first.oid.inode;
if (no_inode_stats[dirty_it->first.oid.inode >> (64-POOL_ID_BITS)])
space_id = space_id & ~(((uint64_t)1 << (64-POOL_ID_BITS)) - 1);
auto & sp = inode_space_stats[space_id];
if (sp > dsk.data_block_size)
sp -= dsk.data_block_size;
else
inode_space_stats.erase(dirty_it->first.oid.inode);
inode_space_stats.erase(space_id);
used_blocks--;
big_to_flush++;
}

View File

@ -131,6 +131,7 @@ static const char* help_text =
" --immediate_commit none Put pool only on OSDs with this or larger immediate_commit (none < small < all)\n"
" --primary_affinity_tags tags Prefer to put primary copies on OSDs with all specified tags\n"
" --scrub_interval <time> Enable regular scrubbing for this pool. Format: number + unit s/m/h/d/M/y\n"
" --no_inode_stats 1 Disable per-inode statistics for this pool (use for VitastorFS pools)\n"
" --pg_stripe_size <number> Increase object grouping stripe\n"
" --max_osd_combinations 10000 Maximum number of random combinations for LP solver input\n"
" --wait Wait for the new pool to come online\n"
@ -142,7 +143,7 @@ static const char* help_text =
"vitastor-cli modify-pool|pool-modify <id|name> [--name <new_name>] [PARAMETERS...]\n"
" Modify an existing pool. Modifiable parameters:\n"
" [-s|--pg_size <number>] [--pg_minsize <number>] [-n|--pg_count <count>]\n"
" [--failure_domain <level>] [--root_node <node>] [--osd_tags <tags>]\n"
" [--failure_domain <level>] [--root_node <node>] [--osd_tags <tags>] [--no_inode_stats 0|1]\n"
" [--max_osd_combinations <number>] [--primary_affinity_tags <tags>] [--scrub_interval <time>]\n"
" Non-modifiable parameters (changing them WILL lead to data loss):\n"
" [--block_size <size>] [--bitmap_granularity <size>]\n"

View File

@ -153,6 +153,7 @@ void cli_tool_t::loop_and_wait(std::function<bool(cli_result_t &)> loop_cb, std:
ringloop->unregister_consumer(&looper->consumer);
looper->loop_cb = NULL;
looper->complete_cb(looper->result);
ringloop->submit();
delete looper;
return;
}

View File

@ -81,6 +81,11 @@ std::string validate_pool_config(json11::Json::object & new_cfg, json11::Json ol
}
value = value.uint64_value();
}
else if (key == "no_inode_stats" && value.bool_value())
{
// Leave true, remove false
value = true;
}
else if (key == "name" || key == "scheme" || key == "immediate_commit" ||
key == "failure_domain" || key == "root_node" || key == "scrub_interval")
{
@ -248,7 +253,7 @@ std::string validate_pool_config(json11::Json::object & new_cfg, json11::Json ol
// immediate_commit
if (!cfg["immediate_commit"].is_null() && !etcd_state_client_t::parse_immediate_commit(cfg["immediate_commit"].string_value()))
{
return "immediate_commit must be one of \"all\", \"small\", or \"none\", but it is "+cfg["scrub_interval"].as_string();
return "immediate_commit must be one of \"all\", \"small\", or \"none\", but it is "+cfg["immediate_commit"].as_string();
}
// scrub_interval

View File

@ -529,6 +529,8 @@ resume_3:
st["block_size_fmt"] = format_size(st["block_size"].uint64_value());
if (st["bitmap_granularity"].uint64_value())
st["bitmap_granularity_fmt"] = format_size(st["bitmap_granularity"].uint64_value());
if (st["no_inode_stats"].bool_value())
st["inode_stats_fmt"] = "disabled";
}
// All pool parameters are only displayed in the "detailed" mode
// because there's too many of them to show them in table
@ -547,6 +549,7 @@ resume_3:
{ "bitmap_granularity_fmt", "Bitmap granularity" },
{ "immediate_commit", "Immediate commit" },
{ "scrub_interval", "Scrub interval" },
{ "inode_stats_fmt", "Per-inode stats" },
{ "pg_stripe_size", "PG stripe size" },
{ "max_osd_combinations", "Max OSD combinations" },
{ "total_fmt", "Total" },

View File

@ -1213,6 +1213,10 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
op->retry_after = op->retval == -EIO ? client_eio_retry_interval : client_retry_interval;
}
reset_retry_timer(op->retry_after);
if (stop_fd >= 0)
{
msgr.stop_client(stop_fd);
}
if (op->inflight_count == 0)
{
if (op->opcode == OSD_OP_SYNC)
@ -1220,10 +1224,6 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
else
continue_rw(op);
}
if (stop_fd >= 0)
{
msgr.stop_client(stop_fd);
}
}
else
{

View File

@ -132,9 +132,6 @@ void disk_tool_simple_offsets(json11::Json cfg, bool json_output);
uint64_t sscanf_json(const char *fmt, const json11::Json & str);
void fromhexstr(const std::string & from, int bytes, uint8_t *to);
std::string realpath_str(std::string path, bool nofail = true);
std::string read_all_fd(int fd);
std::string read_file(std::string file, bool allow_enoent = false);
int disable_cache(std::string dev);
std::string get_parent_device(std::string dev);
bool json_is_true(const json11::Json & val);

View File

@ -42,36 +42,6 @@ void fromhexstr(const std::string & from, int bytes, uint8_t *to)
}
}
// Resolve <path> into a canonical absolute path with realpath(3).
// When resolution fails, a message is printed to stderr and the result is
// the unmodified input path if <nofail> is true or an empty string otherwise.
std::string realpath_str(std::string path, bool nofail)
{
    char *resolved = realpath((char*)path.c_str(), NULL);
    if (resolved)
    {
        // realpath() allocated the buffer for us - copy it and release it
        std::string result(resolved);
        free(resolved);
        return result;
    }
    fprintf(stderr, "Failed to resolve %s: %s\n", path.c_str(), strerror(errno));
    if (!nofail)
    {
        return "";
    }
    return path;
}
// Read the whole content of <file> through read_all_fd().
// Returns an empty string when the file can't be opened or when
// read_all_fd() yields an empty string; in that case the errno-based message
// is printed to stderr unless <allow_enoent> is set and errno is ENOENT.
std::string read_file(std::string file, bool allow_enoent)
{
    std::string content;
    int fd = open(file.c_str(), O_RDONLY);
    if (fd >= 0 && (content = read_all_fd(fd)) != "")
    {
        close(fd);
        return content;
    }
    // Remember errno before close() gets a chance to change it
    int saved_errno = errno;
    if (fd >= 0)
    {
        close(fd);
    }
    bool silent = allow_enoent && saved_errno == ENOENT;
    if (!silent)
    {
        fprintf(stderr, "Can't read %s: %s\n", file.c_str(), strerror(saved_errno));
    }
    return "";
}
// returns 1 = check error, 0 = write through, -1 = write back
// (similar to 1 = warning, -1 = error, 0 = success in disable_cache)
static int check_queue_cache(std::string dev, std::string parent_dev)

View File

@ -101,7 +101,7 @@ void epoll_manager_t::handle_uring_event()
my_uring_prep_poll_add(sqe, epoll_fd, POLLIN);
data->callback = [this](ring_data_t *data)
{
if (data->res < 0)
if (data->res < 0 && data->res != -ECANCELED)
{
throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res));
}

View File

@ -863,6 +863,8 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
pc.scrub_interval = parse_time(pool_item.second["scrub_interval"].string_value());
if (!pc.scrub_interval)
pc.scrub_interval = 0;
// Disable per-inode stats
pc.no_inode_stats = pool_item.second["no_inode_stats"].bool_value();
// Immediate Commit Mode
pc.immediate_commit = pool_item.second["immediate_commit"].is_string()
? parse_immediate_commit(pool_item.second["immediate_commit"].string_value())

View File

@ -60,6 +60,7 @@ struct pool_config_t
uint64_t pg_stripe_size;
std::map<pg_num_t, pg_config_t> pg_config;
uint64_t scrub_interval;
bool no_inode_stats;
};
struct inode_config_t

View File

@ -146,7 +146,7 @@ public:
" Note that nbd_timeout, nbd_max_devices and nbd_max_part options may also be specified\n"
" in /etc/vitastor/vitastor.conf or in other configuration file specified with --config_file.\n"
" --logfile /path/to/log/file.txt\n"
" Wite log messages to the specified file instead of dropping them (in background mode)\n"
" Write log messages to the specified file instead of dropping them (in background mode)\n"
" or printing them to the standard output (in foreground mode).\n"
" --dev_num N\n"
" Use the specified device /dev/nbdN instead of automatic selection.\n"
@ -298,7 +298,7 @@ public:
}
}
}
if (cfg["logfile"].is_string())
if (cfg["logfile"].string_value() != "")
{
logfile = cfg["logfile"].string_value();
}

View File

@ -34,7 +34,7 @@ static std::string get_inode_name(nfs_client_t *self, diropargs3 & what)
std::string name = what.name;
return (dir.size()
? dir+"/"+name
: self->parent->name_prefix+name);
: self->parent->blockfs->name_prefix+name);
}
static fattr3 get_dir_attributes(nfs_client_t *self, std::string dir)
@ -985,7 +985,7 @@ static void block_nfs3_readdir_common(void *opaque, rpc_op_t *rop, bool is_plus)
if (dir_it != self->parent->blockfs->dir_by_hash.end())
dir = dir_it->second;
}
std::string prefix = dir.size() ? dir+"/" : self->parent->name_prefix;
std::string prefix = dir.size() ? dir+"/" : self->parent->blockfs->name_prefix;
std::map<std::string, struct entryplus3> entries;
for (auto & ic: self->parent->cli->st_cli.inode_config)
{
@ -1154,8 +1154,20 @@ static int block_nfs3_readdirplus_proc(void *opaque, rpc_op_t *rop)
return 0;
}
void block_fs_state_t::init(nfs_proxy_t *proxy)
void block_fs_state_t::init(nfs_proxy_t *proxy, json11::Json cfg)
{
name_prefix = cfg["subdir"].string_value();
{
int e = name_prefix.size();
while (e > 0 && name_prefix[e-1] == '/')
e--;
int s = 0;
while (s < e && name_prefix[s] == '/')
s++;
name_prefix = name_prefix.substr(s, e-s);
if (name_prefix.size())
name_prefix += "/";
}
// We need inode name hashes for NFS handles to remain stateless and <= 64 bytes long
dir_info[""] = (nfs_dir_t){
.id = 1,
@ -1172,7 +1184,7 @@ void block_fs_state_t::init(nfs_proxy_t *proxy)
}
auto & inode_cfg = inode_cfg_it->second;
std::string full_name = inode_cfg.name;
if (proxy->name_prefix != "" && full_name.substr(0, proxy->name_prefix.size()) != proxy->name_prefix)
if (proxy->blockfs->name_prefix != "" && full_name.substr(0, proxy->blockfs->name_prefix.size()) != proxy->blockfs->name_prefix)
{
return;
}
@ -1181,7 +1193,7 @@ void block_fs_state_t::init(nfs_proxy_t *proxy)
clock_gettime(CLOCK_REALTIME, &now);
dir_info[""].mod_rev = dir_info[""].mod_rev < inode_cfg.mod_revision ? inode_cfg.mod_revision : dir_info[""].mod_rev;
dir_info[""].mtime = now;
int pos = full_name.find('/', proxy->name_prefix.size());
int pos = full_name.find('/', proxy->blockfs->name_prefix.size());
while (pos >= 0)
{
std::string dir = full_name.substr(0, pos);

View File

@ -36,6 +36,8 @@ struct extend_inode_t
struct block_fs_state_t
{
std::string name_prefix;
// filehandle = "S"+base64(sha256(full name with prefix)) or "roothandle" for mount root)
uint64_t next_dir_id = 2;
// filehandle => dir with name_prefix
@ -51,7 +53,7 @@ struct block_fs_state_t
std::map<inode_t, extend_inode_t> extends;
std::multimap<extend_size_t, extend_write_t> extend_writes;
void init(nfs_proxy_t *proxy);
void init(nfs_proxy_t *proxy, json11::Json cfg);
};
nfsstat3 vitastor_nfs_map_err(int err);

View File

@ -72,7 +72,7 @@ fattr3 get_kv_attributes(nfs_client_t *self, uint64_t ino, json11::Json attrs)
auto nlink = attrs["nlink"].uint64_value();
nfstime3 mtime = nfstime_from_str(attrs["mtime"].string_value());
nfstime3 atime = attrs["atime"].is_null() ? mtime : nfstime_from_str(attrs["atime"].string_value());
// FIXME In theory we could store the binary structure itself instead of JSON
// In theory we could store the binary structure itself, but JSON is simpler :-)
return (fattr3){
.type = (type == 0 ? NF3REG : (ftype3)type),
.mode = (attrs["mode"].is_null() ? (type == NF3DIR ? 0755 : 0644) : (uint32_t)mode),
@ -80,7 +80,8 @@ fattr3 get_kv_attributes(nfs_client_t *self, uint64_t ino, json11::Json attrs)
.uid = (uint32_t)attrs["uid"].uint64_value(),
.gid = (uint32_t)attrs["gid"].uint64_value(),
.size = (type == NF3DIR ? 4096 : attrs["size"].uint64_value()),
.used = (type == NF3DIR ? 4096 : attrs["alloc"].uint64_value()),
// FIXME Counting actual used file size would require reworking statistics
.used = (type == NF3DIR ? 4096 : attrs["size"].uint64_value()),
.rdev = (type == NF3BLK || type == NF3CHR
? (specdata3){ (uint32_t)attrs["major"].uint64_value(), (uint32_t)attrs["minor"].uint64_value() }
: (specdata3){}),
@ -190,3 +191,73 @@ void nfs_kv_procs(nfs_client_t *self)
self->proc_table.insert(pt[i]);
}
}
// Initialise the key/value filesystem state from the configuration <cfg>:
// resolve the metadata inode, read tunables, open the metadata key/value
// index (running the ring loop until the open callback fires) and size the
// helper buffers. Any failure is reported to stderr and ends with exit(1).
void kv_fs_state_t::init(nfs_proxy_t *proxy, json11::Json cfg)
{
    // "fs" is either an inode number (which must include the pool) or an image name
    fs_kv_inode = cfg["fs"].uint64_value();
    if (!fs_kv_inode)
    {
        const std::string & fs_name = cfg["fs"].string_value();
        for (auto & ic: proxy->cli->st_cli.inode_config)
        {
            if (ic.second.name == fs_name)
            {
                fs_kv_inode = ic.first;
                break;
            }
        }
        if (!fs_kv_inode)
        {
            fprintf(stderr, "FS metadata image \"%s\" does not exist\n", fs_name.c_str());
            exit(1);
        }
    }
    else if (!INODE_POOL(fs_kv_inode))
    {
        fprintf(stderr, "FS metadata inode number must include pool\n");
        exit(1);
    }
    // Tunables, zero means "use the default"
    readdir_getattr_parallel = cfg["readdir_getattr_parallel"].uint64_value();
    if (readdir_getattr_parallel == 0)
    {
        readdir_getattr_parallel = 8;
    }
    id_alloc_batch_size = cfg["id_alloc_batch_size"].uint64_value();
    if (id_alloc_batch_size == 0)
    {
        id_alloc_batch_size = 200;
    }
    // Geometry of the default pool
    const auto & fs_pool = proxy->cli->st_cli.pool_config.at(proxy->default_pool_id);
    pool_block_size = fs_pool.pg_stripe_size;
    pool_alignment = fs_pool.bitmap_granularity;
    // Open the metadata DB and block until the callback reports the result
    bool open_done = false;
    int open_res = 0;
    proxy->db = new kv_dbw_t(proxy->cli);
    proxy->db->open(fs_kv_inode, cfg, [&open_done, &open_res](int res)
    {
        open_res = res;
        open_done = true;
    });
    for (;;)
    {
        if (open_done)
            break;
        proxy->ringloop->loop();
        if (!open_done)
            proxy->ringloop->wait();
    }
    if (open_res < 0)
    {
        fprintf(stderr, "Failed to open key/value filesystem metadata index: %s (code %d)\n",
            strerror(-open_res), open_res);
        exit(1);
    }
    // Inode number range of the pool
    fs_base_inode = ((uint64_t)proxy->default_pool_id << (64-POOL_ID_BITS));
    fs_inode_count = ((uint64_t)1 << (64-POOL_ID_BITS)) - 1;
    shared_inode_threshold = cfg["shared_inode_threshold"].is_null()
        ? pool_block_size
        : cfg["shared_inode_threshold"].uint64_value();
    // Helper buffers are at least 1 MiB and at least one pool block long
    const uint64_t helper_buf_size = pool_block_size < 1048576 ? 1048576 : pool_block_size;
    zero_block.resize(helper_buf_size);
    scrap_block.resize(helper_buf_size);
}

View File

@ -33,7 +33,6 @@ struct shared_alloc_queue_t
{
nfs_kv_write_state *st;
int state;
uint64_t size;
};
struct kv_inode_extend_t
@ -45,12 +44,24 @@ struct kv_inode_extend_t
// State of the key/value based filesystem backend of the NFS proxy.
// Filled by init() from the configuration and the default pool settings.
struct kv_fs_state_t
{
// Inode of the FS metadata key/value index, taken from the "fs" option
// (either given as a number or looked up by image name)
uint64_t fs_kv_inode = 0;
// default_pool_id << (64-POOL_ID_BITS); added to "shared_ino" to get the real inode number
uint64_t fs_base_inode = 0;
// (1 << (64-POOL_ID_BITS)) - 1; upper bound checked by the inode ID allocator
uint64_t fs_inode_count = 0;
// "readdir_getattr_parallel" and "id_alloc_batch_size" options; zero values fall back to these defaults
int readdir_getattr_parallel = 8, id_alloc_batch_size = 200;
// pg_stripe_size of the default pool
uint64_t pool_block_size = 0;
// bitmap_granularity of the default pool; used by the align_down()/align_up() macros
uint64_t pool_alignment = 0;
// "shared_inode_threshold" option, pool_block_size when the option is not set
uint64_t shared_inode_threshold = 0;
std::map<list_cookie_t, list_cookie_val_t> list_cookies;
// Inode ID allocator state
uint64_t fs_next_id = 1, fs_allocated_id = 0;
// IDs handed back for reuse, e.g. after a create that found an existing direntry
std::vector<uint64_t> unallocated_ids;
std::vector<shared_alloc_queue_t> allocating_shared;
uint64_t cur_shared_inode = 0, cur_shared_offset = 0;
std::map<inode_t, kv_inode_extend_t> extends;
// Both buffers are resized in init() to max(1048576, pool_block_size) bytes
std::vector<uint8_t> zero_block;
// Throwaway target for the alignment padding of shared file reads
std::vector<uint8_t> scrap_block;
// Read settings from <cfg> and open the metadata index
void init(nfs_proxy_t *proxy, json11::Json cfg);
};
struct shared_file_header_t

View File

@ -16,7 +16,7 @@ void allocate_new_id(nfs_client_t *self, std::function<void(int res, uint64_t ne
cb(0, self->parent->kvfs->fs_next_id++);
return;
}
else if (self->parent->kvfs->fs_next_id > self->parent->fs_inode_count)
else if (self->parent->kvfs->fs_next_id > self->parent->kvfs->fs_inode_count)
{
cb(-ENOSPC, 0);
return;
@ -29,7 +29,7 @@ void allocate_new_id(nfs_client_t *self, std::function<void(int res, uint64_t ne
return;
}
uint64_t prev_val = stoull_full(prev_str);
if (prev_val >= self->parent->fs_inode_count)
if (prev_val >= self->parent->kvfs->fs_inode_count)
{
cb(-ENOSPC, 0);
return;
@ -38,10 +38,10 @@ void allocate_new_id(nfs_client_t *self, std::function<void(int res, uint64_t ne
{
prev_val = 1;
}
uint64_t new_val = prev_val + self->parent->id_alloc_batch_size;
if (new_val >= self->parent->fs_inode_count)
uint64_t new_val = prev_val + self->parent->kvfs->id_alloc_batch_size;
if (new_val >= self->parent->kvfs->fs_inode_count)
{
new_val = self->parent->fs_inode_count;
new_val = self->parent->kvfs->fs_inode_count;
}
self->parent->db->set(KV_NEXT_ID_KEY, std::to_string(new_val), [=](int res)
{
@ -76,6 +76,7 @@ struct kv_create_state
uint64_t verf = 0;
uint64_t dir_ino = 0;
std::string filename;
int res = 0;
uint64_t new_id = 0;
json11::Json::object attrobj;
json11::Json attrs;
@ -84,8 +85,14 @@ struct kv_create_state
std::function<void(int res)> cb;
};
static void kv_do_create(kv_create_state *st)
static void kv_continue_create(kv_create_state *st, int state)
{
if (state == 0) {}
else if (state == 1) goto resume_1;
else if (state == 2) goto resume_2;
else if (state == 3) goto resume_3;
else if (state == 4) goto resume_4;
else if (state == 5) goto resume_5;
if (st->self->parent->trace)
fprintf(stderr, "[%d] CREATE %ju/%s ATTRS %s\n", st->self->nfs_fd, st->dir_ino, st->filename.c_str(), json11::Json(st->attrobj).dump().c_str());
if (st->filename == "" || st->filename.find("/") != std::string::npos)
@ -98,78 +105,58 @@ static void kv_do_create(kv_create_state *st)
st->attrobj["mtime"] = nfstime_now_str();
if (st->attrobj.find("atime") == st->attrobj.end())
st->attrobj["atime"] = st->attrobj["mtime"];
st->attrs = std::move(st->attrobj);
resume_1:
// Generate inode ID
allocate_new_id(st->self, [st](int res, uint64_t new_id)
{
if (res < 0)
st->res = res;
st->new_id = new_id;
kv_continue_create(st, 2);
});
return;
resume_2:
if (st->res < 0)
{
auto cb = std::move(st->cb);
cb(res);
cb(st->res);
return;
}
st->new_id = new_id;
auto direntry = json11::Json::object{ { "ino", st->new_id } };
if (st->attrobj.find("type") != st->attrobj.end() &&
st->attrobj["type"].string_value() == "dir")
{
direntry["type"] = "dir";
}
st->attrs = std::move(st->attrobj);
st->direntry_text = json11::Json(direntry).dump().c_str();
// Set direntry
st->self->parent->db->set(kv_direntry_key(st->dir_ino, st->filename), st->direntry_text, [st](int res)
{
if (res < 0)
{
st->self->parent->kvfs->unallocated_ids.push_back(st->new_id);
if (res == -EAGAIN)
{
if (st->dup_ino)
{
st->new_id = st->dup_ino;
res = 0;
}
else
res = -EEXIST;
}
else
fprintf(stderr, "create %ju/%s failed: %s (code %d)\n", st->dir_ino, st->filename.c_str(), strerror(-res), res);
auto cb = std::move(st->cb);
cb(res);
}
else
{
st->self->parent->db->set(kv_inode_key(st->new_id), st->attrs.dump().c_str(), [st](int res)
{
if (res == -EAGAIN)
{
res = -EEXIST;
}
if (res < 0)
{
st->self->parent->db->del(kv_direntry_key(st->dir_ino, st->filename), [st, res](int del_res)
{
if (!del_res)
{
st->self->parent->kvfs->unallocated_ids.push_back(st->new_id);
}
auto cb = std::move(st->cb);
cb(res);
}, [st](int res, const std::string & value)
{
return res != -ENOENT && value == st->direntry_text;
});
}
else
{
auto cb = std::move(st->cb);
cb(0);
}
st->res = res;
kv_continue_create(st, 3);
}, [st](int res, const std::string & value)
{
return res == -ENOENT;
});
return;
resume_3:
if (st->res == -EAGAIN)
{
// Inode ID generator failure - retry
goto resume_1;
}
if (st->res < 0)
{
auto cb = std::move(st->cb);
cb(st->res);
return;
}
{
auto direntry = json11::Json::object{ { "ino", st->new_id } };
if (st->attrs["type"].string_value() == "dir")
{
direntry["type"] = "dir";
}
st->direntry_text = json11::Json(direntry).dump().c_str();
}
// Set direntry
st->dup_ino = 0;
st->self->parent->db->set(kv_direntry_key(st->dir_ino, st->filename), st->direntry_text, [st](int res)
{
st->res = res;
kv_continue_create(st, 4);
}, [st](int res, const std::string & value)
{
// CAS compare - check that the key doesn't exist
@ -192,7 +179,34 @@ static void kv_do_create(kv_create_state *st)
}
return true;
});
return;
resume_4:
if (st->res == -EAGAIN)
{
// Direntry already exists
st->self->parent->db->del(kv_inode_key(st->new_id), [st](int res)
{
st->res = res;
kv_continue_create(st, 5);
});
resume_5:
if (st->res < 0)
{
fprintf(stderr, "failed to delete duplicate inode %ju left from create %s (code %d)\n", st->new_id, strerror(-st->res), st->res);
}
else
{
st->self->parent->kvfs->unallocated_ids.push_back(st->new_id);
}
if (st->dup_ino)
{
// Successfully created by the previous "exclusive" request
st->new_id = st->dup_ino;
}
st->res = st->dup_ino ? 0 : -EEXIST;
}
auto cb = std::move(st->cb);
cb(st->res);
}
static void kv_create_setattr(json11::Json::object & attrobj, sattr3 & sattr)
@ -260,7 +274,7 @@ int kv_nfs3_create_proc(void *opaque, rpc_op_t *rop)
}
}
st->cb = [st](int res) { kv_create_reply<CREATE3res, CREATE3resok>(st, res); };
kv_do_create(st);
kv_continue_create(st, 0);
return 1;
}
@ -276,7 +290,7 @@ int kv_nfs3_mkdir_proc(void *opaque, rpc_op_t *rop)
st->attrobj["parent_ino"] = st->dir_ino;
kv_create_setattr(st->attrobj, args->attributes);
st->cb = [st](int res) { kv_create_reply<MKDIR3res, MKDIR3resok>(st, res); };
kv_do_create(st);
kv_continue_create(st, 0);
return 1;
}
@ -292,7 +306,7 @@ int kv_nfs3_symlink_proc(void *opaque, rpc_op_t *rop)
st->attrobj["symlink"] = (std::string)args->symlink.symlink_data;
kv_create_setattr(st->attrobj, args->symlink.symlink_attributes);
st->cb = [st](int res) { kv_create_reply<SYMLINK3res, SYMLINK3resok>(st, res); };
kv_do_create(st);
kv_continue_create(st, 0);
return 1;
}
@ -324,6 +338,6 @@ int kv_nfs3_mknod_proc(void *opaque, rpc_op_t *rop)
return 0;
}
st->cb = [st](int res) { kv_create_reply<MKNOD3res, MKNOD3resok>(st, res); };
kv_do_create(st);
kv_continue_create(st, 0);
return 1;
}

View File

@ -142,7 +142,6 @@ resume_4:
cb(st->res);
}
// FIXME: We'll need some tests for the FS
int kv_nfs3_link_proc(void *opaque, rpc_op_t *rop)
{
auto st = new nfs_kv_link_state;

View File

@ -18,6 +18,7 @@ struct nfs_kv_read_state
std::function<void(int)> cb;
// state
int res = 0;
int eof = 0;
json11::Json ientry;
uint64_t aligned_size = 0, aligned_offset = 0;
uint8_t *aligned_buf = NULL;
@ -25,6 +26,9 @@ struct nfs_kv_read_state
uint8_t *buf = NULL;
};
#define align_down(size) ((size) & ~(st->self->parent->kvfs->pool_alignment-1))
#define align_up(size) (((size) + st->self->parent->kvfs->pool_alignment-1) & ~(st->self->parent->kvfs->pool_alignment-1))
static void nfs_kv_continue_read(nfs_kv_read_state *st, int state)
{
if (state == 0) {}
@ -36,7 +40,8 @@ static void nfs_kv_continue_read(nfs_kv_read_state *st, int state)
fprintf(stderr, "BUG: invalid state in nfs_kv_continue_read()");
abort();
}
if (st->offset + sizeof(shared_file_header_t) < st->self->parent->shared_inode_threshold)
resume_0:
if (st->offset + sizeof(shared_file_header_t) < st->self->parent->kvfs->shared_inode_threshold)
{
kv_read_inode(st->self, st->ino, [st](int res, const std::string & value, json11::Json attrs)
{
@ -54,21 +59,45 @@ resume_1:
}
if (st->ientry["shared_ino"].uint64_value() != 0)
{
st->aligned_size = align_shared_size(st->self, st->offset+st->size);
st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size);
st->buf = st->aligned_buf + sizeof(shared_file_header_t) + st->offset;
st->op = new cluster_op_t;
st->op->opcode = OSD_OP_READ;
st->op->inode = st->self->parent->fs_base_inode + st->ientry["shared_ino"].uint64_value();
st->op->offset = st->ientry["shared_offset"].uint64_value();
if (st->offset+st->size > st->ientry["size"].uint64_value())
if (st->offset >= st->ientry["size"].uint64_value())
{
st->op->len = align_shared_size(st->self, st->ientry["size"].uint64_value());
memset(st->aligned_buf+st->op->len, 0, st->aligned_size-st->op->len);
st->size = 0;
st->eof = 1;
auto cb = std::move(st->cb);
cb(0);
return;
}
st->op = new cluster_op_t;
{
st->op->opcode = OSD_OP_READ;
st->op->inode = st->self->parent->kvfs->fs_base_inode + st->ientry["shared_ino"].uint64_value();
// Always read including header to react if the file was possibly moved away
auto read_offset = st->ientry["shared_offset"].uint64_value();
st->op->offset = align_down(read_offset);
if (st->op->offset < read_offset)
{
st->op->iov.push_back(st->self->parent->kvfs->scrap_block.data(),
read_offset-st->op->offset);
}
auto read_size = st->offset+st->size;
if (read_size > st->ientry["size"].uint64_value())
{
st->eof = 1;
st->size = st->ientry["size"].uint64_value()-st->offset;
read_size = st->ientry["size"].uint64_value();
}
read_size += sizeof(shared_file_header_t);
assert(!st->aligned_buf);
st->aligned_buf = (uint8_t*)malloc_or_die(read_size);
st->buf = st->aligned_buf + sizeof(shared_file_header_t) + st->offset;
st->op->iov.push_back(st->aligned_buf, read_size);
st->op->len = align_up(read_offset+read_size) - st->op->offset;
if (read_offset+read_size < st->op->offset+st->op->len)
{
st->op->iov.push_back(st->self->parent->kvfs->scrap_block.data(),
st->op->offset+st->op->len - (read_offset+read_size));
}
}
else
st->op->len = st->aligned_size;
st->op->iov.push_back(st->aligned_buf, st->op->len);
st->op->callback = [st, state](cluster_op_t *op)
{
st->res = op->retval == op->len ? 0 : op->retval;
@ -80,6 +109,8 @@ resume_1:
resume_2:
if (st->res < 0)
{
free(st->aligned_buf);
st->aligned_buf = NULL;
auto cb = std::move(st->cb);
cb(st->res);
return;
@ -91,22 +122,21 @@ resume_2:
free(st->aligned_buf);
st->aligned_buf = NULL;
st->allow_cache = false;
nfs_kv_continue_read(st, 0);
return;
goto resume_0;
}
auto cb = std::move(st->cb);
cb(0);
return;
}
}
st->aligned_offset = (st->offset & ~(st->self->parent->pool_alignment-1));
st->aligned_size = ((st->offset + st->size + st->self->parent->pool_alignment-1) &
~(st->self->parent->pool_alignment-1)) - st->aligned_offset;
st->aligned_offset = align_down(st->offset);
st->aligned_size = align_up(st->offset+st->size) - st->aligned_offset;
assert(!st->aligned_buf);
st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size);
st->buf = st->aligned_buf + st->offset - st->aligned_offset;
st->op = new cluster_op_t;
st->op->opcode = OSD_OP_READ;
st->op->inode = st->self->parent->fs_base_inode + st->ino;
st->op->inode = st->self->parent->kvfs->fs_base_inode + st->ino;
st->op->offset = st->aligned_offset;
st->op->len = st->aligned_size;
st->op->iov.push_back(st->aligned_buf, st->aligned_size);
@ -119,6 +149,11 @@ resume_2:
st->self->parent->cli->execute(st->op);
return;
resume_3:
if (st->res < 0)
{
free(st->aligned_buf);
st->aligned_buf = NULL;
}
auto cb = std::move(st->cb);
cb(st->res < 0 ? st->res : 0);
return;
@ -151,7 +186,7 @@ int kv_nfs3_read_proc(void *opaque, rpc_op_t *rop)
reply->resok.data.data = (char*)st->buf;
reply->resok.data.size = st->size;
reply->resok.count = st->size;
reply->resok.eof = 0;
reply->resok.eof = st->eof;
}
rpc_queue_reply(st->rop);
delete st;

View File

@ -46,7 +46,7 @@ static void nfs_kv_continue_readdir(nfs_kv_readdir_state *st, int state);
static void kv_getattr_next(nfs_kv_readdir_state *st)
{
while (st->is_plus && st->getattr_cur < st->entries.size() && st->getattr_running < st->self->parent->readdir_getattr_parallel)
while (st->is_plus && st->getattr_cur < st->entries.size() && st->getattr_running < st->self->parent->kvfs->readdir_getattr_parallel)
{
auto idx = st->getattr_cur++;
st->getattr_running++;

View File

@ -22,6 +22,7 @@ struct kv_del_state
int type = 0;
bool is_rmdir = false;
bool rm_data = false;
bool allow_cache = true;
int res = 0, res2 = 0;
std::function<void(int)> cb;
};
@ -48,12 +49,13 @@ static void nfs_kv_continue_delete(kv_del_state *st, int state)
fprintf(stderr, "BUG: invalid state in nfs_kv_continue_delete()");
abort();
}
resume_0:
st->self->parent->db->get(kv_direntry_key(st->dir_ino, st->filename), [st](int res, const std::string & value)
{
st->res = res;
st->direntry_text = value;
nfs_kv_continue_delete(st, 1);
});
}, st->allow_cache);
return;
resume_1:
if (st->res < 0)
@ -82,7 +84,7 @@ resume_1:
st->res = res;
st->ientry_text = value;
nfs_kv_continue_delete(st, 2);
});
}, st->allow_cache);
return;
resume_2:
if (st->res < 0)
@ -124,8 +126,8 @@ resume_3:
if (st->res == -EAGAIN)
{
// CAS failure, restart from the beginning
nfs_kv_continue_delete(st, 0);
return;
st->allow_cache = false;
goto resume_0;
}
else if (st->res < 0 && st->res != -ENOENT)
{
@ -229,8 +231,8 @@ resume_6:
{
// Remove data
st->self->parent->cmd->loop_and_wait(st->self->parent->cmd->start_rm_data(json11::Json::object {
{ "inode", INODE_NO_POOL(st->self->parent->fs_base_inode + st->ino) },
{ "pool", (uint64_t)INODE_POOL(st->self->parent->fs_base_inode + st->ino) },
{ "inode", INODE_NO_POOL(st->self->parent->kvfs->fs_base_inode + st->ino) },
{ "pool", (uint64_t)INODE_POOL(st->self->parent->kvfs->fs_base_inode + st->ino) },
}), [st](const cli_result_t & r)
{
if (r.err)

View File

@ -7,28 +7,46 @@
#include "nfs_proxy.h"
#include "nfs_kv.h"
#include "cli.h"
// Per-request state of the resumable rename state machine (nfs_kv_continue_rename).
// NOTE(review): this listing looks like a diff with its +/- markers stripped, so fields of
// both the previous and the new layout appear below (old_direntry_text is declared twice).
struct nfs_kv_rename_state
{
// Client connection and RPC operation this rename belongs to
nfs_client_t *self = NULL;
rpc_op_t *rop = NULL;
// params:
// Source and destination directory inodes and entry names
uint64_t old_dir_ino = 0, new_dir_ino = 0;
std::string old_name, new_name;
// NOTE(review): the next three declarations look like the pre-change fields - confirm
std::string old_direntry_text;
std::string ientry_text;
json11::Json direntry, ientry;
// state:
// Passed as the last argument of db->get() / kv_read_inode(); cleared before a restart after a CAS failure (-EAGAIN)
bool allow_cache = true;
// Raw texts of the source/destination direntries and inodes; the *_text values are compared in CAS callbacks
std::string old_direntry_text, old_ientry_text, new_direntry_text, new_ientry_text;
// Parsed JSON counterparts of the texts above
json11::Json old_direntry, old_ientry, new_direntry, new_ientry;
// Key prefix and listing handle used to check that a destination directory is empty
std::string new_dir_prefix;
void *list_handle = NULL;
// True when reading the destination direntry returned 0
bool new_exists = false;
// Set when the replaced destination inode is a regular non-shared file whose data has to be removed
bool rm_dest_data = false;
// Result of the last asynchronous step
// NOTE(review): res2 is only referenced by what look like pre-change lines - confirm it is still needed
int res = 0, res2 = 0;
// Completion callback, invoked once with the final result code
std::function<void(int)> cb;
};
static void nfs_kv_continue_rename(nfs_kv_rename_state *st, int state)
{
// Simplified algorithm (non-atomic and without ENOTDIR/EISDIR):
// 1) Check if the target directory exists
// 2) Delete & save (using CAS) the source direntry
// 3) Write the target direntry using CAS, fail if it already exists
// 4) Restore the source direntry on failure
// Atomic version would require something like a journal
// Algorithm (non-atomic of course):
// 1) Read source direntry
// 2) Read destination direntry
// 3) If destination exists:
// 3.1) Check file/folder compatibility (EISDIR/ENOTDIR)
// 3.2) Check if destination is empty if it's a folder
// 4) If not:
// 4.1) Check that the destination directory is actually a directory
// 5) Overwrite destination direntry, restart from beginning if CAS failure
// 6) Delete source direntry, restart from beginning if CAS failure
// 7) If the moved direntry was a regular file:
// 7.1) Read inode
// 7.2) Delete inode if its link count <= 1
// 7.3) Delete inode data if its link count <= 1 and it's a regular non-shared file
// 7.4) Reduce link count by 1 if it's > 1
// 8) If the moved direntry is a directory:
// 8.1) Change parent_ino reference in its inode
if (state == 0) {}
else if (state == 1) goto resume_1;
else if (state == 2) goto resume_2;
@ -36,37 +54,27 @@ static void nfs_kv_continue_rename(nfs_kv_rename_state *st, int state)
else if (state == 4) goto resume_4;
else if (state == 5) goto resume_5;
else if (state == 6) goto resume_6;
else if (state == 7) goto resume_7;
else if (state == 8) goto resume_8;
else if (state == 9) goto resume_9;
else if (state == 10) goto resume_10;
else if (state == 11) goto resume_11;
else if (state == 12) goto resume_12;
else
{
fprintf(stderr, "BUG: invalid state in nfs_kv_continue_rename()");
abort();
}
kv_read_inode(st->self, st->new_dir_ino, [st](int res, const std::string & value, json11::Json attrs)
resume_0:
// Read the old direntry
st->self->parent->db->get(kv_direntry_key(st->old_dir_ino, st->old_name), [=](int res, const std::string & value)
{
st->res = res == 0 ? (attrs["type"].string_value() == "dir" ? 0 : -ENOTDIR) : res;
st->res = res;
st->old_direntry_text = value;
nfs_kv_continue_rename(st, 1);
});
}, st->allow_cache);
return;
resume_1:
if (st->res < 0)
{
auto cb = std::move(st->cb);
cb(st->res);
return;
}
// Read & delete the old direntry
st->self->parent->db->del(kv_direntry_key(st->old_dir_ino, st->old_name), [st](int res)
{
st->res = res;
nfs_kv_continue_rename(st, 2);
}, [=](int res, const std::string & old_value)
{
st->res = res;
st->old_direntry_text = old_value;
return true;
});
return;
resume_2:
if (st->res < 0)
{
auto cb = std::move(st->cb);
@ -75,47 +83,85 @@ resume_2:
}
{
std::string err;
st->direntry = json11::Json::parse(st->old_direntry_text, err);
st->old_direntry = json11::Json::parse(st->old_direntry_text, err);
if (err != "")
{
fprintf(stderr, "Invalid JSON in direntry %s = %s: %s\n",
kv_direntry_key(st->old_dir_ino, st->old_name).c_str(),
st->old_direntry_text.c_str(), err.c_str());
auto cb = std::move(st->cb);
cb(-EIO);
return;
}
}
if (st->direntry["type"].string_value() == "dir" &&
st->direntry["ino"].uint64_value() != 0 &&
st->new_dir_ino != st->old_dir_ino)
{
// Read & check inode
kv_read_inode(st->self, st->direntry["ino"].uint64_value(), [st](int res, const std::string & value, json11::Json ientry)
// Read the new direntry
st->self->parent->db->get(kv_direntry_key(st->new_dir_ino, st->new_name), [=](int res, const std::string & value)
{
st->res = res;
st->ientry_text = value;
st->ientry = ientry;
nfs_kv_continue_rename(st, 3);
});
st->new_direntry_text = value;
nfs_kv_continue_rename(st, 2);
}, st->allow_cache);
return;
resume_3:
if (st->res < 0)
resume_2:
if (st->res < 0 && st->res != -ENOENT)
{
auto cb = std::move(st->cb);
cb(st->res);
return;
}
// Change parent reference
if (st->res == 0)
{
auto ientry_new = st->ientry.object_items();
ientry_new["parent_ino"] = st->new_dir_ino;
st->self->parent->db->set(kv_inode_key(st->direntry["ino"].uint64_value()), json11::Json(ientry_new).dump(), [st](int res)
std::string err;
st->new_direntry = json11::Json::parse(st->new_direntry_text, err);
if (err != "")
{
fprintf(stderr, "Invalid JSON in direntry %s = %s: %s\n",
kv_direntry_key(st->new_dir_ino, st->new_name).c_str(),
st->new_direntry_text.c_str(), err.c_str());
auto cb = std::move(st->cb);
cb(-EIO);
return;
}
}
st->new_exists = st->res == 0;
if (st->new_exists)
{
// Check file/folder compatibility (EISDIR/ENOTDIR)
if ((st->old_direntry["type"] == "dir") != (st->new_direntry["type"] == "dir"))
{
auto cb = std::move(st->cb);
cb((st->new_direntry["type"] == "dir") ? -ENOTDIR : -EISDIR);
return;
}
if (st->new_direntry["type"] == "dir")
{
// Check that the destination directory is empty
st->new_dir_prefix = kv_direntry_key(st->new_direntry["ino"].uint64_value(), "");
st->list_handle = st->self->parent->db->list_start(st->new_dir_prefix);
st->self->parent->db->list_next(st->list_handle, [st](int res, const std::string & key, const std::string & value)
{
st->res = res;
nfs_kv_continue_rename(st, 4);
}, [st](int res, const std::string & old_value)
{
return old_value == st->ientry_text;
nfs_kv_continue_rename(st, 3);
});
return;
resume_3:
st->self->parent->db->list_close(st->list_handle);
if (st->res != -ENOENT)
{
auto cb = std::move(st->cb);
cb(-ENOTEMPTY);
return;
}
}
}
else
{
// Check that the new directory is actually a directory
kv_read_inode(st->self, st->new_dir_ino, [st](int res, const std::string & value, json11::Json attrs)
{
st->res = res == 0 ? (attrs["type"].string_value() == "dir" ? 0 : -ENOTDIR) : res;
nfs_kv_continue_rename(st, 4);
});
return;
resume_4:
if (st->res < 0)
@ -125,43 +171,183 @@ resume_4:
return;
}
}
// Write the new direntry
st->self->parent->db->set(kv_direntry_key(st->new_dir_ino, st->new_name), st->old_direntry_text, [st](int res)
{
st->res = res;
nfs_kv_continue_rename(st, 5);
}, [st](int res, const std::string & old_value)
{
return res == -ENOENT;
return st->new_exists ? (old_value == st->new_direntry_text) : (res == -ENOENT);
});
return;
resume_5:
if (st->res == -EAGAIN)
{
// CAS failure
st->allow_cache = false;
goto resume_0;
}
if (st->res < 0)
{
if (st->res == -EAGAIN)
st->res = -EEXIST;
st->res2 = st->res;
st->self->parent->db->set(kv_direntry_key(st->old_dir_ino, st->old_name), st->old_direntry_text, [st](int res)
auto cb = std::move(st->cb);
cb(st->res);
return;
}
// Delete the old direntry
st->self->parent->db->del(kv_direntry_key(st->old_dir_ino, st->old_name), [st](int res)
{
st->res = res;
nfs_kv_continue_rename(st, 6);
}, [st](int res, const std::string & old_value)
}, [=](int res, const std::string & old_value)
{
return res == -ENOENT;
return res == 0 && old_value == st->old_direntry_text;
});
return;
resume_6:
if (st->res == -EAGAIN)
{
// CAS failure
st->allow_cache = false;
goto resume_0;
}
if (st->res < 0)
{
if (st->res == -EAGAIN)
st->res = -EEXIST;
fprintf(stderr, "error restoring %s = %s after failed rename: %s (code %d)\n",
kv_direntry_key(st->old_dir_ino, st->old_name).c_str(), st->old_direntry_text.c_str(),
strerror(-st->res), st->res);
}
auto cb = std::move(st->cb);
cb(st->res2);
cb(st->res);
return;
}
st->allow_cache = true;
resume_7again:
if (st->new_exists && st->new_direntry["type"].string_value() != "dir")
{
// (Maybe) delete old destination file data
kv_read_inode(st->self, st->new_direntry["ino"].uint64_value(), [st](int res, const std::string & value, json11::Json attrs)
{
st->res = res;
st->new_ientry_text = value;
st->new_ientry = attrs;
nfs_kv_continue_rename(st, 7);
}, st->allow_cache);
return;
resume_7:
if (st->res == 0)
{
// (5) Reduce inode refcount by 1 or delete inode
if (st->new_ientry["nlink"].uint64_value() > 1)
{
auto copy = st->new_ientry.object_items();
copy["nlink"] = st->new_ientry["nlink"].uint64_value()-1;
st->self->parent->db->set(kv_inode_key(st->new_direntry["ino"].uint64_value()), json11::Json(copy).dump(), [st](int res)
{
st->res = res;
nfs_kv_continue_rename(st, 8);
}, [st](int res, const std::string & old_value)
{
return old_value == st->new_ientry_text;
});
}
else
{
st->rm_dest_data = kv_map_type(st->new_ientry["type"].string_value()) == NF3REG
&& !st->new_ientry["shared_ino"].uint64_value();
st->self->parent->db->del(kv_inode_key(st->new_direntry["ino"].uint64_value()), [st](int res)
{
st->res = res;
nfs_kv_continue_rename(st, 8);
}, [st](int res, const std::string & old_value)
{
return old_value == st->new_ientry_text;
});
}
return;
resume_8:
if (st->res == -EAGAIN)
{
// CAS failure - re-read inode
st->allow_cache = false;
goto resume_7again;
}
if (st->res < 0)
{
auto cb = std::move(st->cb);
cb(st->res);
return;
}
// Delete inode data if required
if (st->rm_dest_data)
{
st->self->parent->cmd->loop_and_wait(st->self->parent->cmd->start_rm_data(json11::Json::object {
{ "inode", INODE_NO_POOL(st->self->parent->kvfs->fs_base_inode + st->new_direntry["ino"].uint64_value()) },
{ "pool", (uint64_t)INODE_POOL(st->self->parent->kvfs->fs_base_inode + st->new_direntry["ino"].uint64_value()) },
}), [st](const cli_result_t & r)
{
if (r.err)
{
fprintf(stderr, "Failed to remove inode %jx data: %s (code %d)\n",
st->new_direntry["ino"].uint64_value(), r.text.c_str(), r.err);
}
st->res = r.err;
nfs_kv_continue_rename(st, 9);
});
return;
resume_9:
if (st->res < 0)
{
auto cb = std::move(st->cb);
cb(st->res);
return;
}
}
}
}
if (st->old_direntry["type"].string_value() == "dir" && st->new_dir_ino != st->old_dir_ino)
{
// Change parent_ino in old ientry
st->allow_cache = true;
resume_10:
kv_read_inode(st->self, st->old_direntry["ino"].uint64_value(), [st](int res, const std::string & value, json11::Json ientry)
{
st->res = res;
st->old_ientry_text = value;
st->old_ientry = ientry;
nfs_kv_continue_rename(st, 11);
}, st->allow_cache);
return;
resume_11:
if (st->res < 0)
{
auto cb = std::move(st->cb);
cb(st->res);
return;
}
{
auto ientry_new = st->old_ientry.object_items();
ientry_new["parent_ino"] = st->new_dir_ino;
st->self->parent->db->set(kv_inode_key(st->old_direntry["ino"].uint64_value()), json11::Json(ientry_new).dump(), [st](int res)
{
st->res = res;
nfs_kv_continue_rename(st, 12);
}, [st](int res, const std::string & old_value)
{
return old_value == st->old_ientry_text;
});
}
return;
resume_12:
if (st->res == -EAGAIN)
{
// CAS failure - try again
st->allow_cache = false;
goto resume_10;
}
if (st->res < 0)
{
auto cb = std::move(st->cb);
cb(st->res);
return;
}
}
auto cb = std::move(st->cb);
cb(st->res);
}

View File

@ -25,6 +25,7 @@ struct nfs_kv_setattr_state
static void nfs_kv_continue_setattr(nfs_kv_setattr_state *st, int state)
{
// FIXME: NFS client does a lot of setattr calls, so maybe process them asynchronously
if (state == 0) {}
else if (state == 1) goto resume_1;
else if (state == 2) goto resume_2;
@ -50,8 +51,7 @@ resume_1:
cb(st->res);
return;
}
if (st->ientry["type"].string_value() == "link" ||
st->ientry["type"].string_value() != "file" &&
if (st->ientry["type"].string_value() != "file" &&
st->ientry["type"].string_value() != "" &&
!st->set_attrs["size"].is_null())
{
@ -104,8 +104,8 @@ resume_2:
{
// Delete extra data when downsizing
st->self->parent->cmd->loop_and_wait(st->self->parent->cmd->start_rm_data(json11::Json::object {
{ "inode", INODE_NO_POOL(st->self->parent->fs_base_inode + st->ino) },
{ "pool", (uint64_t)INODE_POOL(st->self->parent->fs_base_inode + st->ino) },
{ "inode", INODE_NO_POOL(st->self->parent->kvfs->fs_base_inode + st->ino) },
{ "pool", (uint64_t)INODE_POOL(st->self->parent->kvfs->fs_base_inode + st->ino) },
{ "min_offset", st->set_attrs["size"].uint64_value() },
}), [st](const cli_result_t & r)
{
@ -121,7 +121,7 @@ resume_2:
}
resume_3:
auto cb = std::move(st->cb);
cb(0);
cb(st->res);
}
int kv_nfs3_setattr_proc(void *opaque, rpc_op_t *rop)

View File

@ -8,6 +8,9 @@
#include "nfs_proxy.h"
#include "nfs_kv.h"
// FIXME: Implement shared inode defragmentator
// FIXME: Implement fsck for vitastor-fs and for vitastor-kv
struct nfs_rmw_t
{
nfs_kv_write_state *st = NULL;
@ -38,9 +41,11 @@ struct nfs_kv_write_state
uint64_t new_size = 0;
uint64_t aligned_size = 0;
uint8_t *aligned_buf = NULL;
uint64_t shared_inode = 0, shared_offset = 0;
// new shared parameters
uint64_t shared_inode = 0, shared_offset = 0, shared_alloc = 0;
bool was_immediate = false;
nfs_rmw_t rmw[2];
shared_file_header_t shdr;
kv_inode_extend_t *ext = NULL;
~nfs_kv_write_state()
@ -53,30 +58,51 @@ struct nfs_kv_write_state
}
};
// Round `size` down to a multiple of the pool alignment.
// Both macros expand to an expression that reads st->self->parent->kvfs->pool_alignment,
// so a variable named `st` must be in scope wherever they are used.
// NOTE(review): the mask arithmetic is only correct if pool_alignment is a power of two - confirm.
#define align_down(size) ((size) & ~(st->self->parent->kvfs->pool_alignment-1))
// Round `size` up to a multiple of the pool alignment (same requirements as align_down).
#define align_up(size) (((size) + st->self->parent->kvfs->pool_alignment-1) & ~(st->self->parent->kvfs->pool_alignment-1))
static void nfs_kv_continue_write(nfs_kv_write_state *st, int state);
// Reserve room for st's file inside the current shared inode.
// Fills st->shared_inode, st->shared_offset and st->shared_alloc and advances
// kvfs->cur_shared_offset past the reserved region.
static void allocate_shared_space(nfs_kv_write_state *st)
{
    auto & fs = st->self->parent->kvfs;
    const auto hdr_size = sizeof(shared_file_header_t);
    uint64_t place = fs->cur_shared_offset;
    uint64_t reserve = 0;
    if (st->new_size < 3*fs->pool_alignment - hdr_size)
    {
        // Small file: put header+data right at the current position,
        // only the total reservation is rounded up to the alignment
        reserve = align_up(hdr_size + st->new_size);
    }
    else
    {
        // Larger file: skip forward so that the data following the header
        // starts on an aligned boundary
        place = align_up(place + hdr_size) - hdr_size;
        reserve = hdr_size + align_up(st->new_size);
    }
    st->shared_inode = fs->cur_shared_inode;
    st->shared_offset = place;
    st->shared_alloc = reserve;
    fs->cur_shared_offset = place + reserve;
}
// Resume every write queued in kvfs->allocating_shared.
// Each queued write gets `res` stored in its state; when res == 0 it is also
// assigned space in the shared inode before being continued at its saved state.
// NOTE(review): this listing looks like a diff with its +/- markers stripped, so
// statements of both the previous and the new implementation appear below.
static void finish_allocate_shared(nfs_client_t *self, int res)
{
// Take the whole queue first so the loop iterates over a private copy
std::vector<shared_alloc_queue_t> waiting;
waiting.swap(self->parent->kvfs->allocating_shared);
for (auto & w: waiting)
{
w.st->res = res;
auto st = w.st;
st->res = res;
if (res == 0)
{
// NOTE(review): the next three statements look like the pre-change inline allocation - confirm
w.st->shared_inode = self->parent->kvfs->cur_shared_inode;
w.st->shared_offset = self->parent->kvfs->cur_shared_offset;
self->parent->kvfs->cur_shared_offset += (w.size + self->parent->pool_alignment-1) & ~(self->parent->pool_alignment-1);
allocate_shared_space(st);
}
// Continue the write from the state it was queued with
nfs_kv_continue_write(w.st, w.state);
}
}
static void allocate_shared_inode(nfs_kv_write_state *st, int state, uint64_t size)
static void allocate_shared_inode(nfs_kv_write_state *st, int state)
{
if (st->self->parent->kvfs->cur_shared_inode == 0)
{
st->self->parent->kvfs->allocating_shared.push_back({ st, state, size });
st->self->parent->kvfs->allocating_shared.push_back({ st, state });
if (st->self->parent->kvfs->allocating_shared.size() > 1)
{
return;
@ -110,27 +136,19 @@ static void allocate_shared_inode(nfs_kv_write_state *st, int state, uint64_t si
else
{
st->res = 0;
st->shared_inode = st->self->parent->kvfs->cur_shared_inode;
st->shared_offset = st->self->parent->kvfs->cur_shared_offset;
st->self->parent->kvfs->cur_shared_offset += (size + st->self->parent->pool_alignment-1) & ~(st->self->parent->pool_alignment-1);
allocate_shared_space(st);
nfs_kv_continue_write(st, state);
}
}
// Returns `size` plus sizeof(shared_file_header_t), rounded up to a multiple of
// self->parent->pool_alignment.
// NOTE(review): the mask arithmetic assumes pool_alignment is a power of two - confirm.
// NOTE(review): this function reads pool_alignment from `parent` rather than `parent->kvfs`
// and looks like pre-change code in this listing - confirm.
uint64_t align_shared_size(nfs_client_t *self, uint64_t size)
{
return (size + sizeof(shared_file_header_t) + self->parent->pool_alignment-1)
& ~(self->parent->pool_alignment-1);
}
static void nfs_do_write(uint64_t ino, uint64_t offset, uint8_t *buf, uint64_t size, nfs_kv_write_state *st, int state)
static void nfs_do_write(uint64_t ino, uint64_t offset, uint64_t size, std::function<void(cluster_op_t *op)> prepare, nfs_kv_write_state *st, int state)
{
auto op = new cluster_op_t;
op->opcode = OSD_OP_WRITE;
op->inode = st->self->parent->fs_base_inode + ino;
op->inode = st->self->parent->kvfs->fs_base_inode + ino;
op->offset = offset;
op->len = size;
op->iov.push_back(buf, size);
prepare(op);
st->waiting++;
op->callback = [st, state](cluster_op_t *op)
{
@ -148,30 +166,31 @@ static void nfs_do_write(uint64_t ino, uint64_t offset, uint8_t *buf, uint64_t s
st->self->parent->cli->execute(op);
}
// Writes st->aligned_buf (st->aligned_size bytes) into st->shared_inode at
// st->shared_offset and continues the write state machine at `state` afterwards.
// NOTE(review): this call passes a buffer pointer to nfs_do_write(), i.e. it matches the
// buffer-based signature and looks like the pre-change variant of this function - confirm.
static void nfs_do_shared_write(nfs_kv_write_state *st, int state)
{
nfs_do_write(st->shared_inode, st->shared_offset, st->aligned_buf, st->aligned_size, st, state);
}
// Writes the file contents held in st->aligned_buf to the file's own inode (st->ino)
// starting at offset 0, then continues the write state machine at `state`.
// NOTE(review): this listing looks like a diff with its +/- markers stripped; the first
// nfs_do_write() call (buffer-based arguments) appears to be the pre-change statement.
static void nfs_do_unshare_write(nfs_kv_write_state *st, int state)
{
nfs_do_write(st->ino, 0, st->aligned_buf + sizeof(shared_file_header_t),
st->aligned_size - sizeof(shared_file_header_t), st, state);
// Current file size from the inode entry, and the same size rounded up to the pool alignment
uint64_t size = st->ientry["size"].uint64_value();
uint64_t aligned_size = align_up(size);
nfs_do_write(st->ino, 0, aligned_size, [&](cluster_op_t *op)
{
// File data first, then zeroes up to the aligned length
op->iov.push_back(st->aligned_buf, size);
if (aligned_size > size)
op->iov.push_back(st->self->parent->kvfs->zero_block.data(), aligned_size-size);
}, st, state);
}
static void nfs_do_rmw(nfs_rmw_t *rmw)
{
auto parent = rmw->st->self->parent;
auto align = parent->pool_alignment;
auto align = parent->kvfs->pool_alignment;
assert(rmw->size < align);
assert((rmw->offset/parent->pool_block_size) == ((rmw->offset+rmw->size-1)/parent->pool_block_size));
assert((rmw->offset/parent->kvfs->pool_block_size) == ((rmw->offset+rmw->size-1)/parent->kvfs->pool_block_size));
if (!rmw->part_buf)
{
rmw->part_buf = (uint8_t*)malloc_or_die(align);
}
auto op = new cluster_op_t;
op->opcode = OSD_OP_READ;
op->inode = parent->fs_base_inode + rmw->ino;
op->inode = parent->kvfs->fs_base_inode + rmw->ino;
op->offset = rmw->offset & ~(align-1);
op->len = align;
op->iov.push_back(rmw->part_buf, op->len);
@ -196,7 +215,7 @@ static void nfs_do_rmw(nfs_rmw_t *rmw)
auto st = rmw->st;
rmw->version = rd_op->version+1;
if (st->rmw[0].st && st->rmw[1].st &&
st->rmw[0].offset/st->self->parent->pool_block_size == st->rmw[1].offset/st->self->parent->pool_block_size)
st->rmw[0].offset/st->self->parent->kvfs->pool_block_size == st->rmw[1].offset/st->self->parent->kvfs->pool_block_size)
{
// Same block... RMWs should be sequential
int other = rmw == &st->rmw[0] ? 1 : 0;
@ -204,12 +223,12 @@ static void nfs_do_rmw(nfs_rmw_t *rmw)
}
}
auto parent = rmw->st->self->parent;
auto align = parent->pool_alignment;
auto align = parent->kvfs->pool_alignment;
bool is_begin = (rmw->offset % align);
bool is_end = ((rmw->offset+rmw->size) % align);
auto op = new cluster_op_t;
op->opcode = OSD_OP_WRITE;
op->inode = rmw->st->self->parent->fs_base_inode + rmw->ino;
op->inode = rmw->st->self->parent->kvfs->fs_base_inode + rmw->ino;
op->offset = rmw->offset & ~(align-1);
op->len = align;
op->version = rmw->version;
@ -256,17 +275,55 @@ static void nfs_do_rmw(nfs_rmw_t *rmw)
// Reads the current contents of a shared file into st->aligned_buf (data only) and its
// header into st->shdr, then continues the write state machine at `state`.
// If the header does not belong to this inode, the write is restarted from state 0
// with caching disabled.
// NOTE(review): this listing looks like a diff with its +/- markers stripped, so
// statements of both the previous and the new implementation appear below.
static void nfs_do_shared_read(nfs_kv_write_state *st, int state)
{
uint64_t data_size = st->ientry["size"].uint64_value();
if (!data_size)
{
// Nothing to read for an empty file
nfs_kv_continue_write(st, state);
return;
}
assert(!st->aligned_buf);
// The buffer holds exactly data_size bytes of file data (no header, no padding)
st->aligned_buf = (uint8_t*)malloc_or_die(data_size);
uint64_t shared_offset = st->ientry["shared_offset"].uint64_value();
auto op = new cluster_op_t;
op->opcode = OSD_OP_READ;
// NOTE(review): the next four statements look like the pre-change request setup - confirm
op->inode = st->self->parent->fs_base_inode + st->ientry["shared_ino"].uint64_value();
op->offset = st->ientry["shared_offset"].uint64_value();
op->len = align_shared_size(st->self, st->ientry["size"].uint64_value());
op->iov.push_back(st->aligned_buf, op->len);
op->inode = st->self->parent->kvfs->fs_base_inode + st->ientry["shared_ino"].uint64_value();
op->offset = align_down(shared_offset);
// Allow unaligned shared reads
// Bytes between the aligned start and the header are read into the scrap block
auto pre = shared_offset-align_down(shared_offset);
if (pre > 0)
{
op->iov.push_back(st->self->parent->kvfs->scrap_block.data(), pre);
}
op->iov.push_back(&st->shdr, sizeof(shared_file_header_t));
op->iov.push_back(st->aligned_buf, data_size);
// Bytes between the end of the data and the next aligned boundary also go to the scrap block
auto post = (shared_offset+sizeof(shared_file_header_t)+data_size);
post = align_up(post) - post;
if (post > 0)
{
op->iov.push_back(st->self->parent->kvfs->scrap_block.data(), post);
}
op->len = pre+sizeof(shared_file_header_t)+data_size+post;
op->callback = [st, state](cluster_op_t *op)
{
// NOTE(review): the first assignment looks like the pre-change line; the second one
// additionally maps a short positive retval to -EIO
st->res = op->retval == op->len ? 0 : op->retval;
st->res = op->retval == op->len ? 0 : (op->retval > 0 ? -EIO : op->retval);
delete op;
// NOTE(review): the header is checked before st->res; after a failed read st->shdr may not
// contain fresh data, so an error could lead to a restart instead of being reported - confirm
if (st->shdr.magic != SHARED_FILE_MAGIC_V1 || st->shdr.inode != st->ino)
{
// Got unrelated data - retry from the beginning
st->allow_cache = false;
free(st->aligned_buf);
st->aligned_buf = NULL;
nfs_kv_continue_write(st, 0);
}
else
{
if (st->res < 0)
{
// Drop the buffer on error; the error code stays in st->res for the continuation
free(st->aligned_buf);
st->aligned_buf = NULL;
}
nfs_kv_continue_write(st, state);
}
};
st->self->parent->cli->execute(op);
}
@ -287,59 +344,128 @@ static void nfs_do_fsync(nfs_kv_write_state *st, int state)
// Resumable step of the write state machine: reads the old contents of a shared file
// when they are needed for the new image.
// Returns true when the caller may proceed immediately; returns false when an
// asynchronous read was started (execution resumes with state == base_state), when the
// completion callback was already invoked with an error, or when a restart was requested.
// NOTE(review): this listing looks like a diff with its +/- markers stripped, so
// statements and labels of both the previous and the new implementation appear below
// (e.g. resume_0 is declared twice).
static bool nfs_do_shared_readmodify(nfs_kv_write_state *st, int base_state, int state, bool unshare)
{
assert(state <= base_state);
// NOTE(review): the statements up to the memset look like the pre-change prologue,
// which allocated and zero-filled a header-prefixed buffer - confirm
if (state < base_state) {}
else if (state == base_state) goto resume_0;
assert(!st->aligned_buf);
st->aligned_size = unshare
? sizeof(shared_file_header_t) + ((st->new_size + st->self->parent->pool_alignment-1) & ~(st->self->parent->pool_alignment-1))
: align_shared_size(st->self, st->new_size);
st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size);
memset(st->aligned_buf + sizeof(shared_file_header_t), 0, st->offset);
// First entry starts at resume_0, re-entry after the read completes continues at resume_1
if (state < base_state) goto resume_0;
else if (state == base_state) goto resume_1;
resume_0:
if (st->ientry["shared_ino"].uint64_value() != 0 &&
st->ientry["size"].uint64_value() != 0)
st->ientry["size"].uint64_value() != 0 &&
(st->offset > 0 || (st->offset+st->size) < st->new_size))
{
// Read old data if shared non-empty
// Read old data if shared non-empty and not fully overwritten
nfs_do_shared_read(st, base_state);
return false;
resume_0:
resume_1:
if (st->res < 0)
{
// Report the read error to the caller of the write
auto cb = std::move(st->cb);
cb(st->res);
return false;
}
// NOTE(review): the header check below reads the header from the start of aligned_buf and
// looks like pre-change code - confirm
auto hdr = ((shared_file_header_t*)st->aligned_buf);
if (hdr->magic != SHARED_FILE_MAGIC_V1 || hdr->inode != st->ino)
{
// Got unrelated data - retry from the beginning
st->allow_cache = false;
free(st->aligned_buf);
st->aligned_buf = NULL;
nfs_kv_continue_write(st, 0);
return false;
}
}
// NOTE(review): the header fill, memcpy and memset below build the new image inside
// aligned_buf and look like pre-change code - confirm
*((shared_file_header_t*)st->aligned_buf) = {
.magic = SHARED_FILE_MAGIC_V1,
.inode = st->ino,
.alloc = st->aligned_size,
};
memcpy(st->aligned_buf + sizeof(shared_file_header_t) + st->offset, st->buf, st->size);
memset(st->aligned_buf + sizeof(shared_file_header_t) + st->offset + st->size, 0,
st->aligned_size - sizeof(shared_file_header_t) - st->offset - st->size);
return true;
}
static void nfs_do_align_write(nfs_kv_write_state *st, uint64_t ino, uint64_t offset, int state)
// Appends `count` zero bytes to op->iov by referencing zero_buf repeatedly:
// whole zero_buf-sized pieces while more than zero_buf.size() bytes remain, then the rest.
// NOTE(review): with an empty zero_buf and count > 0 the loop below would never
// terminate - confirm callers always pass a non-empty buffer.
static void add_zero(cluster_op_t *op, uint64_t count, std::vector<uint8_t> & zero_buf)
{
// NOTE(review): the next statement refers to `st`, which is not a parameter here; it looks
// like a pre-change line of another function left in place by the diff rendering - confirm
auto alignment = st->self->parent->pool_alignment;
while (count > zero_buf.size())
{
op->iov.push_back(zero_buf.data(), zero_buf.size());
count -= zero_buf.size();
}
if (count > 0)
{
// Remaining tail, at most zero_buf.size() bytes
op->iov.push_back(zero_buf.data(), count);
count = 0;
}
}
// Writes the complete new image of a small file into the shared inode at the
// location stored in st->shared_offset / st->shared_alloc, then continues the
// write state machine at `state`.
//
// The request is assembled from separate iov pieces, in this order:
//   [zero padding down to the alignment] [header] [file bytes before st->offset]
//   [new data] [file bytes after the new data up to st->new_size]
//   [zero padding up to the alignment]
// File bytes outside the new data come from the old contents in st->aligned_buf
// where the old file had them, and are zero-filled otherwise. The sum of all
// pieces must equal the aligned request length passed to nfs_do_write().
static void nfs_do_shared_write(nfs_kv_write_state *st, int state)
{
    st->shdr = {
        .magic = SHARED_FILE_MAGIC_V1,
        .inode = st->ino,
        .alloc = st->shared_alloc,
    };
    bool unaligned_is_free = true;
    uint64_t write_offset = st->shared_offset;
    uint64_t write_size = sizeof(shared_file_header_t) + st->new_size;
    uint64_t aligned_offset = write_offset;
    uint64_t aligned_size = write_size;
    if (unaligned_is_free)
    {
        // Zero pad if "unaligned but aligned"
        aligned_offset = align_down(write_offset);
        aligned_size = align_up(write_offset+write_size) - aligned_offset;
    }
    // FIXME: Do RMW if unaligned_is_free == false i.e. if we want tighter packing
    // Old contents exist only if the file was already shared; st->aligned_buf then
    // holds exactly old_size bytes of data, so never reference more than that
    uint64_t old_size = st->ientry["shared_ino"].uint64_value() != 0
        ? st->ientry["size"].uint64_value() : 0;
    uint64_t new_end = st->offset + st->size;
    nfs_do_write(st->shared_inode, aligned_offset, aligned_size, [&](cluster_op_t *op)
    {
        auto & zero_block = st->self->parent->kvfs->zero_block;
        // Emit file bytes [from, to): old data where the old file had it, zeroes past its end
        auto add_old_or_zero = [&](uint64_t from, uint64_t to)
        {
            if (from < old_size)
            {
                uint64_t old_end = to < old_size ? to : old_size;
                op->iov.push_back(st->aligned_buf+from, old_end-from);
                from = old_end;
            }
            if (from < to)
            {
                add_zero(op, to-from, zero_block);
            }
        };
        if (unaligned_is_free && aligned_offset < write_offset)
        {
            // zero padding
            op->iov.push_back(zero_block.data(), write_offset-aligned_offset);
        }
        // header
        op->iov.push_back(&st->shdr, sizeof(shared_file_header_t));
        if (st->offset > 0)
        {
            // old data (or a zero-filled hole) before the written range
            add_old_or_zero(0, st->offset);
        }
        // new data
        op->iov.push_back(st->buf, st->size);
        if (new_end < st->new_size)
        {
            // old data (or zeroes) after the written range; the length is the
            // remaining tail, not st->offset
            add_old_or_zero(new_end, st->new_size);
        }
        if (unaligned_is_free && (aligned_size+aligned_offset) > (write_size+write_offset))
        {
            // zero padding
            op->iov.push_back(zero_block.data(), aligned_size+aligned_offset - (write_size+write_offset));
        }
    }, st, state);
}
static void nfs_do_align_write(nfs_kv_write_state *st, uint64_t ino, uint64_t offset, uint64_t shared_alloc, int state)
{
auto alignment = st->self->parent->kvfs->pool_alignment;
uint64_t end = (offset+st->size);
uint8_t *good_buf = st->buf;
uint64_t good_offset = offset;
uint64_t good_size = st->size;
bool begin_shdr = false;
uint64_t end_pad = 0;
st->waiting++;
st->rmw[0].st = NULL;
st->rmw[1].st = NULL;
if (offset % alignment)
{
if (shared_alloc && st->offset == 0 && (offset % alignment) == sizeof(shared_file_header_t))
{
// RMW can be skipped at shared beginning
st->shdr = {
.magic = SHARED_FILE_MAGIC_V1,
.inode = st->ino,
.alloc = shared_alloc,
};
begin_shdr = true;
good_offset -= sizeof(shared_file_header_t);
offset = 0;
}
else
{
// Requires read-modify-write in the beginning
auto s = (alignment - (offset % alignment));
@ -360,35 +486,48 @@ static void nfs_do_align_write(nfs_kv_write_state *st, uint64_t ino, uint64_t of
.buf = st->buf,
.size = s,
};
// FIXME: skip rmw at shared beginning
nfs_do_rmw(&st->rmw[0]);
}
if ((offset+st->size) % alignment)
}
if ((end % alignment) &&
(offset == 0 || end/alignment > (offset-1)/alignment))
{
// Requires read-modify-write in the end
auto s = ((offset+st->size) % alignment);
assert(st->offset+st->size <= st->new_size);
if (st->offset+st->size == st->new_size)
{
// rmw can be skipped at end - we can just zero pad the request
end_pad = alignment - (end % alignment);
}
else
{
auto s = (end % alignment);
if (good_size > s)
good_size -= s;
else
good_size = 0;
if ((offset+st->size)/alignment > offset/alignment)
{
st->rmw[1] = {
.st = st,
.continue_state = state,
.ino = ino,
.offset = offset + st->size-s,
.offset = end - s,
.buf = st->buf + st->size - s,
.size = s,
};
// FIXME: skip rmw at end
nfs_do_rmw(&st->rmw[1]);
}
}
if (good_size > 0)
if (good_size > 0 || end_pad > 0 || begin_shdr)
{
// Normal write
nfs_do_write(ino, good_offset, good_buf, good_size, st, state);
nfs_do_write(ino, good_offset, (begin_shdr ? sizeof(shared_file_header_t) : 0)+good_size+end_pad, [&](cluster_op_t *op)
{
if (begin_shdr)
op->iov.push_back(&st->shdr, sizeof(shared_file_header_t));
op->iov.push_back(good_buf, good_size);
if (end_pad)
op->iov.push_back(st->self->parent->kvfs->zero_block.data(), end_pad);
}, st, state);
}
st->waiting--;
if (!st->waiting)
@ -406,6 +545,7 @@ static std::string new_normal_ientry(nfs_kv_write_state *st)
ni.erase("shared_alloc");
ni.erase("shared_ver");
ni["size"] = st->ext->cur_extend;
ni["mtime"] = nfstime_now_str();
return json11::Json(ni).dump();
}
@ -418,6 +558,7 @@ static std::string new_moved_ientry(nfs_kv_write_state *st)
ni["shared_alloc"] = st->aligned_size;
ni.erase("shared_ver");
ni["size"] = st->new_size;
ni["mtime"] = nfstime_now_str();
return json11::Json(ni).dump();
}
@ -426,6 +567,7 @@ static std::string new_shared_ientry(nfs_kv_write_state *st)
auto ni = st->ientry.object_items();
ni.erase("empty");
ni["size"] = st->new_size;
ni["mtime"] = nfstime_now_str();
ni["shared_ver"] = ni["shared_ver"].uint64_value()+1;
return json11::Json(ni).dump();
}
@ -438,6 +580,7 @@ static std::string new_unshared_ientry(nfs_kv_write_state *st)
ni.erase("shared_offset");
ni.erase("shared_alloc");
ni.erase("shared_ver");
ni["mtime"] = nfstime_now_str();
return json11::Json(ni).dump();
}
@ -580,7 +723,6 @@ static void nfs_kv_continue_write(nfs_kv_write_state *st, int state)
else if (state == 3) goto resume_3;
else if (state == 4) goto resume_4;
else if (state == 5) goto resume_5;
else if (state == 6) goto resume_6;
else if (state == 7) goto resume_7;
else if (state == 8) goto resume_8;
else if (state == 9) goto resume_9;
@ -618,23 +760,23 @@ resume_1:
cb(st->res == 0 ? -EINVAL : st->res);
return;
}
st->was_immediate = st->self->parent->cli->get_immediate_commit(st->self->parent->fs_base_inode + st->ino);
st->was_immediate = st->self->parent->cli->get_immediate_commit(st->self->parent->kvfs->fs_base_inode + st->ino);
st->new_size = st->ientry["size"].uint64_value();
if (st->new_size < st->offset + st->size)
{
st->new_size = st->offset + st->size;
}
if (st->offset + st->size + sizeof(shared_file_header_t) < st->self->parent->shared_inode_threshold)
if (st->offset + st->size + sizeof(shared_file_header_t) < st->self->parent->kvfs->shared_inode_threshold)
{
if (st->ientry["size"].uint64_value() == 0 &&
st->ientry["shared_ino"].uint64_value() == 0 ||
st->ientry["empty"].bool_value() &&
(st->ientry["size"].uint64_value() + sizeof(shared_file_header_t)) < st->self->parent->shared_inode_threshold ||
(st->ientry["size"].uint64_value() + sizeof(shared_file_header_t)) < st->self->parent->kvfs->shared_inode_threshold ||
st->ientry["shared_ino"].uint64_value() != 0 &&
st->ientry["shared_alloc"].uint64_value() < align_shared_size(st->self, st->offset+st->size))
st->ientry["shared_alloc"].uint64_value() < sizeof(shared_file_header_t)+st->offset+st->size)
{
// Either empty, or shared and requires moving into a larger place (redirect-write)
allocate_shared_inode(st, 2, st->new_size);
allocate_shared_inode(st, 2);
return;
resume_2:
if (st->res < 0)
@ -645,10 +787,17 @@ resume_2:
}
resume_3:
if (!nfs_do_shared_readmodify(st, 3, state, false))
{
return;
nfs_do_shared_write(st, 4); // FIXME assemble from parts, do not copy?
}
nfs_do_shared_write(st, 4);
return;
resume_4:
if (st->aligned_buf)
{
free(st->aligned_buf);
st->aligned_buf = NULL;
}
if (st->res < 0)
{
auto cb = std::move(st->cb);
@ -668,12 +817,6 @@ resume_5:
if (st->res < 0)
{
st->res2 = st->res;
memset(st->aligned_buf, 0, st->aligned_size);
nfs_do_shared_write(st, 6);
return;
resume_6:
free(st->aligned_buf);
st->aligned_buf = NULL;
if (st->res2 == -EAGAIN)
{
goto resume_0;
@ -689,11 +832,12 @@ resume_6:
cb(0);
return;
}
else if (st->ientry["shared_ino"].uint64_value() > 0)
else if (st->ientry["shared_ino"].uint64_value() != 0)
{
// Non-empty, shared, can be updated in-place
nfs_do_align_write(st, st->ientry["shared_ino"].uint64_value(),
st->ientry["shared_offset"].uint64_value() + sizeof(shared_file_header_t) + st->offset, 7);
st->ientry["shared_offset"].uint64_value() + sizeof(shared_file_header_t) + st->offset,
st->ientry["shared_alloc"].uint64_value(), 7);
return;
resume_7:
if (st->res == 0 && st->stable && !st->was_immediate)
@ -728,16 +872,23 @@ resume_9:
{
if (st->ientry["size"].uint64_value() != 0)
{
assert(!st->aligned_buf);
st->aligned_size = align_shared_size(st->self, st->ientry["size"].uint64_value());
st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size);
nfs_do_shared_read(st, 10);
return;
resume_10:
nfs_do_unshare_write(st, 11);
return;
resume_11:
;
if (st->aligned_buf)
{
free(st->aligned_buf);
st->aligned_buf = NULL;
}
if (st->res < 0)
{
auto cb = std::move(st->cb);
cb(st->res);
return;
}
}
st->self->parent->db->set(kv_inode_key(st->ino), new_unshared_ientry(st), [st](int res)
{
@ -763,7 +914,7 @@ resume_12:
st->ientry_text = new_unshared_ientry(st);
}
// Non-shared write
nfs_do_align_write(st, st->ino, st->offset, 13);
nfs_do_align_write(st, st->ino, st->offset, 0, 13);
return;
resume_13:
if (st->res == 0 && st->stable && !st->was_immediate)

View File

@ -10,11 +10,18 @@
// Translate an errno-style code into an NFSv3 status.
// Both positive (EINVAL) and negated (-EINVAL) values are accepted.
// Zero maps to NFS3_OK; any error without a dedicated NFS3 code
// (including EIO itself) is reported as NFS3ERR_IO.
nfsstat3 vitastor_nfs_map_err(int err)
{
    if (err < 0)
    {
        err = -err;
    }
    switch (err)
    {
    case 0:
        return NFS3_OK;
    case EINVAL:
        return NFS3ERR_INVAL;
    case ENOENT:
        return NFS3ERR_NOENT;
    case ENOSPC:
        return NFS3ERR_NOSPC;
    case EEXIST:
        return NFS3ERR_EXIST;
    case EISDIR:
        return NFS3ERR_ISDIR;
    case ENOTDIR:
        return NFS3ERR_NOTDIR;
    case ENOTEMPTY:
        return NFS3ERR_NOTEMPTY;
    default:
        // EIO and everything unknown
        return NFS3ERR_IO;
    }
}
int nfs3_null_proc(void *opaque, rpc_op_t *rop)

View File

@ -10,9 +10,10 @@
#include <netinet/tcp.h>
#include <sys/epoll.h>
#include <sys/wait.h>
#include <unistd.h>
#include <fcntl.h>
//#include <signal.h>
#include <signal.h>
#include "nfs/nfs.h"
#include "nfs/rpc.h"
@ -34,6 +35,10 @@ const char *exe_name = NULL;
nfs_proxy_t::~nfs_proxy_t()
{
if (kvfs)
delete kvfs;
if (blockfs)
delete blockfs;
if (db)
delete db;
if (cmd)
@ -49,26 +54,30 @@ nfs_proxy_t::~nfs_proxy_t()
delete ringloop;
}
json11::Json::object nfs_proxy_t::parse_args(int narg, const char *args[])
{
json11::Json::object cfg;
for (int i = 1; i < narg; i++)
{
if (!strcmp(args[i], "-h") || !strcmp(args[i], "--help"))
{
printf(
"Vitastor NFS 3.0 proxy\n"
"(c) Vitaliy Filippov, 2021-2022 (VNPL-1.1)\n"
static const char* help_text =
"Vitastor NFS 3.0 proxy " VERSION "\n"
"(c) Vitaliy Filippov, 2021+ (VNPL-1.1)\n"
"\n"
"USAGE:\n"
" %s [STANDARD OPTIONS] [OTHER OPTIONS]\n"
" --fs <META> mount VitastorFS with metadata in image <META>\n"
" --subdir <DIR> export images prefixed <DIR>/ (default empty - export all images)\n"
" --portmap 0 do not listen on port 111 (portmap/rpcbind, requires root)\n"
"vitastor-nfs (--fs <NAME> | --block) [-o <OPT>] mount <MOUNTPOINT>\n"
" Start local filesystem server and mount file system to <MOUNTPOINT>.\n"
" Use regular `umount <MOUNTPOINT>` to unmount the FS.\n"
" The server will be automatically stopped when the FS is unmounted.\n"
" -o|--options <OPT> Pass additional NFS mount options (ex.: -o async).\n"
"\n"
"vitastor-nfs (--fs <NAME> | --block) start\n"
" Start network NFS server. Options:\n"
" --bind <IP> bind service to <IP> address (default 0.0.0.0)\n"
" --nfspath <PATH> set NFS export path to <PATH> (default is /)\n"
" --port <PORT> use port <PORT> for NFS services (default is 2049)\n"
" --pool <POOL> use <POOL> as default pool for new files (images)\n"
" --portmap 0 do not listen on port 111 (portmap/rpcbind, requires root)\n"
"\n"
"OPTIONS:\n"
" --fs <NAME> use VitastorFS with metadata in image <NAME>\n"
" --block use pseudo-FS presenting images as files\n"
" --pool <POOL> use <POOL> as default pool for new files\n"
" --subdir <DIR> export <DIR> instead of root directory\n"
" --nfspath <PATH> set NFS export path to <PATH> (default is /)\n"
" --pidfile <FILE> write process ID to the specified file\n"
" --logfile <FILE> log to the specified file\n"
" --foreground 1 stay in foreground, do not daemonize\n"
"\n"
"NFS proxy is stateless if you use immediate_commit=all in your cluster and if\n"
@ -76,17 +85,59 @@ json11::Json::object nfs_proxy_t::parse_args(int narg, const char *args[])
"NFS proxies with L3 load balancing in this case.\n"
"\n"
"Example start and mount commands for a custom NFS port:\n"
" %s --etcd_address 192.168.5.10:2379 --portmap 0 --port 2050 --pool testpool\n"
" mount localhost:/ /mnt/ -o port=2050,mountport=2050,nfsvers=3,soft,nolock,tcp\n",
exe_name, exe_name
);
" vitastor-nfs start --block --etcd_address 192.168.5.10:2379 --portmap 0 --port 2050 --pool testpool\n"
" mount localhost:/ /mnt/ -o port=2050,mountport=2050,nfsvers=3,soft,nolock,tcp\n"
"Or just:\n"
" vitastor-nfs mount --block --pool testpool /mnt/\n"
;
// Parse the command line into a configuration object.
//
// Recognized forms:
//   -h | --help          print help_text and exit(0)
//   -o | --options OPT   append OPT to the comma-separated "options" entry
//   --name value         stored as cfg["name"] = value
//   --json, --block      flags without a value, stored as "1"
//   anything else        positional command word ("mount <MOUNTPOINT>" or "start")
//
// Exits with an error unless one of --block / --fs is given and the
// positional words form a known command. For "mount", the mountpoint is
// stored in cfg["mount"].
json11::Json::object nfs_proxy_t::parse_args(int narg, const char *args[])
{
    json11::Json::object cfg;
    std::vector<std::string> cmd;
    for (int i = 1; i < narg; i++)
    {
        if (!strcmp(args[i], "-h") || !strcmp(args[i], "--help"))
        {
            printf("%s", help_text);
            exit(0);
        }
        else if (!strcmp(args[i], "-o") || !strcmp(args[i], "--options"))
        {
            if (i >= narg-1)
            {
                printf("%s", help_text);
                exit(0);
            }
            // Consume the option value so it is not parsed again as a
            // positional command word on the next iteration
            const std::string value = args[++i];
            const std::string old = cfg["options"].string_value();
            cfg["options"] = old != "" ? old+","+value : value;
        }
        else if (args[i][0] == '-' && args[i][1] == '-')
        {
            const char *opt = args[i]+2;
            // Assign exactly once: the value expression advances <i>
            cfg[opt] = !strcmp(opt, "json") || !strcmp(opt, "block") || i == narg-1 ? "1" : args[++i];
        }
        else
        {
            cmd.push_back(args[i]);
        }
    }
    if (cfg.find("block") == cfg.end() && cfg.find("fs") == cfg.end())
    {
        fprintf(stderr, "Specify one of --block or --fs NAME. Use vitastor-nfs --help for details\n");
        exit(1);
    }
    if (cmd.size() >= 2 && cmd[0] == "mount")
    {
        cfg["mount"] = cmd[1];
    }
    else if (cmd.size() >= 1 && cmd[0] == "start")
    {
        // Network server mode: nothing extra to record
    }
    else
    {
        printf("%s", help_text);
        exit(1);
    }
    return cfg;
}
@ -98,6 +149,9 @@ void nfs_proxy_t::run(json11::Json cfg)
srand48(tv.tv_sec*1000000000 + tv.tv_nsec);
server_id = (uint64_t)lrand48() | ((uint64_t)lrand48() << 31) | ((uint64_t)lrand48() << 62);
// Parse options
if (cfg["logfile"].string_value() != "")
logfile = cfg["logfile"].string_value();
pidfile = cfg["pidfile"].string_value();
trace = cfg["log_level"].uint64_value() > 5 || cfg["trace"].uint64_value() > 0;
bind_address = cfg["bind"].string_value();
if (bind_address == "")
@ -110,18 +164,6 @@ void nfs_proxy_t::run(json11::Json cfg)
export_root = cfg["nfspath"].string_value();
if (!export_root.size())
export_root = "/";
name_prefix = cfg["subdir"].string_value();
{
int e = name_prefix.size();
while (e > 0 && name_prefix[e-1] == '/')
e--;
int s = 0;
while (s < e && name_prefix[s] == '/')
s++;
name_prefix = name_prefix.substr(s, e-s);
if (name_prefix.size())
name_prefix += "/";
}
if (cfg["client_writeback_allowed"].is_null())
{
// NFS is always aware of fsync, so we allow write-back cache
@ -130,6 +172,16 @@ void nfs_proxy_t::run(json11::Json cfg)
obj["client_writeback_allowed"] = true;
cfg = obj;
}
mountpoint = cfg["mount"].string_value();
if (mountpoint != "")
{
bind_address = "127.0.0.1";
nfs_port = 0;
portmap_enabled = false;
exit_on_umount = true;
}
mountopts = cfg["options"].string_value();
fsname = cfg["fs"].string_value();
// Create client
ringloop = new ring_loop_t(RINGLOOP_DEFAULT_SIZE);
epmgr = new epoll_manager_t(ringloop);
@ -139,11 +191,6 @@ void nfs_proxy_t::run(json11::Json cfg)
cmd->epmgr = epmgr;
cmd->cli = cli;
watch_stats();
if (!fs_kv_inode)
{
blockfs = new block_fs_state_t();
blockfs->init(this);
}
// Load image metadata
while (!cli->is_ready())
{
@ -155,65 +202,15 @@ void nfs_proxy_t::run(json11::Json cfg)
// Check default pool
check_default_pool();
// Check if we're using VitastorFS
fs_kv_inode = cfg["fs"].uint64_value();
if (fs_kv_inode)
if (fsname == "")
{
if (!INODE_POOL(fs_kv_inode))
{
fprintf(stderr, "FS metadata inode number must include pool\n");
exit(1);
blockfs = new block_fs_state_t();
blockfs->init(this, cfg);
}
}
else if (cfg["fs"].is_string())
else
{
for (auto & ic: cli->st_cli.inode_config)
{
if (ic.second.name == cfg["fs"].string_value())
{
fs_kv_inode = ic.first;
break;
}
}
if (!fs_kv_inode)
{
fprintf(stderr, "FS metadata image \"%s\" does not exist\n", cfg["fs"].string_value().c_str());
exit(1);
}
}
readdir_getattr_parallel = cfg["readdir_getattr_parallel"].uint64_value();
if (!readdir_getattr_parallel)
readdir_getattr_parallel = 8;
id_alloc_batch_size = cfg["id_alloc_batch_size"].uint64_value();
if (!id_alloc_batch_size)
id_alloc_batch_size = 200;
if (fs_kv_inode)
{
// Open DB and wait
int open_res = 0;
bool open_done = false;
db = new kv_dbw_t(cli);
db->open(fs_kv_inode, cfg, [&](int res)
{
open_done = true;
open_res = res;
});
while (!open_done)
{
ringloop->loop();
if (open_done)
break;
ringloop->wait();
}
if (open_res < 0)
{
fprintf(stderr, "Failed to open key/value filesystem metadata index: %s (code %d)\n",
strerror(-open_res), open_res);
exit(1);
}
fs_base_inode = ((uint64_t)default_pool_id << (64-POOL_ID_BITS));
fs_inode_count = ((uint64_t)1 << (64-POOL_ID_BITS)) - 1;
shared_inode_threshold = pool_block_size;
kvfs = new kv_fs_state_t;
kvfs = new kv_fs_state_t();
kvfs->init(this, cfg);
}
// Self-register portmap and NFS
pmap.reg_ports.insert((portmap_id_t){
@ -245,7 +242,7 @@ void nfs_proxy_t::run(json11::Json cfg)
.addr = "0.0.0.0.0."+std::to_string(nfs_port),
});
// Create NFS socket and add it to epoll
int nfs_socket = create_and_bind_socket(bind_address, nfs_port, 128, NULL);
int nfs_socket = create_and_bind_socket(bind_address, nfs_port, 128, &listening_port);
fcntl(nfs_socket, F_SETFL, fcntl(nfs_socket, F_GETFL, 0) | O_NONBLOCK);
epmgr->tfd->set_fd_handler(nfs_socket, false, [this](int nfs_socket, int epoll_events)
{
@ -277,24 +274,43 @@ void nfs_proxy_t::run(json11::Json cfg)
}
});
}
if (mountpoint != "")
{
mount_fs();
}
if (cfg["foreground"].is_null())
{
daemonize();
}
while (true)
if (pidfile != "")
{
write_pid();
}
while (!finished)
{
ringloop->loop();
ringloop->wait();
}
// Destroy the client
cli->flush();
if (kvfs)
{
delete kvfs;
kvfs = NULL;
}
if (blockfs)
{
delete blockfs;
blockfs = NULL;
}
if (db)
{
delete db;
db = NULL;
}
delete cli;
delete epmgr;
delete ringloop;
kvfs = NULL;
db = NULL;
cli = NULL;
epmgr = NULL;
ringloop = NULL;
@ -368,7 +384,7 @@ void nfs_proxy_t::parse_stats(etcd_kv_t & kv)
inode_t inode_num = 0;
char null_byte = 0;
int scanned = sscanf(key.c_str() + cli->st_cli.etcd_prefix.length()+13, "%u/%ju%c", &pool_id, &inode_num, &null_byte);
if (scanned != 2 || !pool_id || pool_id >= POOL_ID_MAX || !inode_num)
if (scanned != 2 || !pool_id || pool_id >= POOL_ID_MAX)
{
fprintf(stderr, "Bad etcd key %s, ignoring\n", key.c_str());
}
@ -402,8 +418,6 @@ void nfs_proxy_t::check_default_pool()
auto pool_it = cli->st_cli.pool_config.begin();
default_pool_id = pool_it->first;
default_pool = pool_it->second.name;
pool_block_size = pool_it->second.pg_stripe_size;
pool_alignment = pool_it->second.bitmap_granularity;
}
else
{
@ -418,8 +432,6 @@ void nfs_proxy_t::check_default_pool()
if (p.second.name == default_pool)
{
default_pool_id = p.first;
pool_block_size = p.second.pg_stripe_size;
pool_alignment = p.second.bitmap_granularity;
break;
}
}
@ -438,12 +450,14 @@ void nfs_proxy_t::do_accept(int listen_fd)
int nfs_fd = 0;
while ((nfs_fd = accept(listen_fd, (struct sockaddr *)&addr, &addr_size)) >= 0)
{
if (trace)
fprintf(stderr, "New client %d: connection from %s\n", nfs_fd, addr_to_string(addr).c_str());
active_connections++;
fcntl(nfs_fd, F_SETFL, fcntl(nfs_fd, F_GETFL, 0) | O_NONBLOCK);
int one = 1;
setsockopt(nfs_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
auto cli = new nfs_client_t();
if (fs_kv_inode)
if (kvfs)
nfs_kv_procs(cli);
else
nfs_block_procs(cli);
@ -458,8 +472,12 @@ void nfs_proxy_t::do_accept(int listen_fd)
// Handle incoming event
if (epoll_events & EPOLLRDHUP)
{
auto parent = cli->parent;
if (parent->trace)
fprintf(stderr, "Client %d disconnected\n", nfs_fd);
cli->stop();
parent->active_connections--;
parent->check_exit();
return;
}
cli->epoll_events |= epoll_events;
@ -570,7 +588,7 @@ void nfs_client_t::handle_read(int result)
read_msg.msg_iovlen = 0;
if (deref())
return;
if (result <= 0 && result != -EAGAIN && result != -EINTR)
if (result <= 0 && result != -EAGAIN && result != -EINTR && result != -ECANCELED)
{
printf("Failed read from client %d: %d (%s)\n", nfs_fd, result, strerror(-result));
stop();
@ -665,8 +683,8 @@ void nfs_client_t::handle_read(int result)
return;
}
}
submit_read(0);
}
submit_read(0);
}
void nfs_client_t::submit_send()
@ -994,8 +1012,159 @@ void nfs_proxy_t::daemonize()
close(1);
close(2);
open("/dev/null", O_RDONLY);
open("/dev/null", O_WRONLY);
open("/dev/null", O_WRONLY);
open(logfile.c_str(), O_WRONLY|O_APPEND|O_CREAT, 0666);
open(logfile.c_str(), O_WRONLY|O_APPEND|O_CREAT, 0666);
}
// Store the current process ID as decimal text in <pidfile>.
// The file is created or truncated. Failures are only reported to stderr.
void nfs_proxy_t::write_pid()
{
    const char *path = pidfile.c_str();
    int fd = open(path, O_WRONLY|O_CREAT|O_TRUNC, 0666);
    if (fd >= 0)
    {
        const std::string text = std::to_string(getpid());
        const bool failed = write(fd, text.c_str(), text.size()) < 0;
        if (failed)
        {
            fprintf(stderr, "Failed to write pid to %s: %s (code %d)\n", path, strerror(errno), errno);
        }
        close(fd);
        return;
    }
    fprintf(stderr, "Failed to create pid file %s: %s (code %d)\n", path, strerror(errno), errno);
}
// State shared between the SIGCHLD handler and the code waiting for the child.
// PID of the child to reap (0 until the parent stores it)
static volatile pid_t wanted_pid = 0;
// Set by the handler; volatile sig_atomic_t is the type that may be
// safely written from a signal handler and polled from the main flow
static volatile sig_atomic_t child_finished = false;
// Wait status of the reaped child, -1 if nothing was reaped
static int child_status = -1;

// SIGCHLD handler: reaps <wanted_pid> without blocking and raises
// <child_finished>. <child_status> is only updated when a child was
// actually reaped.
void single_child_handler(int signal)
{
    // waitpid() may modify errno; restore it so the interrupted code
    // does not observe a changed value
    const int saved_errno = errno;
    int status = -1;
    if (waitpid(wanted_pid, &status, WNOHANG) > 0)
    {
        child_status = status;
    }
    child_finished = true;
    errno = saved_errno;
}
// Mount the exported file system at <mountpoint> by running mount(8)
// in a forked child, while the parent keeps the event loop running
// until the child terminates. Exits the process if fork or mount fails.
void nfs_proxy_t::mount_fs()
{
    check_already_mounted();
    signal(SIGCHLD, single_child_handler);
    const auto child = fork();
    if (child < 0)
    {
        fprintf(stderr, "Failed to fork: %s (code %d)\n", strerror(errno), errno);
        exit(1);
    }
    if (child == 0)
    {
        // Child: build the option string and replace ourselves with mount
        const std::string port_str = std::to_string(listening_port);
        std::string source = "localhost:"+export_root;
        std::string options = "port="+port_str+",mountport="+port_str+",nfsvers=3,nolock,tcp";
        auto begins_with = [](const std::string & s, const char *prefix)
        {
            return s.compare(0, strlen(prefix), prefix) == 0;
        };
        bool want_hard = false;
        bool want_async = false;
        for (auto & item: explode(",", mountopts, true))
        {
            if (item == "hard")
            {
                want_hard = true;
                continue;
            }
            if (item == "async")
            {
                want_async = true;
                continue;
            }
            // Options that are fixed above may not be overridden by the user
            const bool reserved =
                begins_with(item, "port") || begins_with(item, "mountport") ||
                begins_with(item, "nfsvers") || begins_with(item, "proto") ||
                item == "udp" || item == "tcp" || item == "rdma";
            if (!reserved)
            {
                options += ","+item;
            }
        }
        // soft and sync are appended unless hard / async were requested
        if (!want_hard)
        {
            options += ",soft";
        }
        if (!want_async)
        {
            options += ",sync";
        }
        const char *argv[] = { "mount", source.c_str(), mountpoint.c_str(), "-o", options.c_str(), NULL };
        execvp("mount", (char* const*)argv);
        // Only reached when exec failed
        fprintf(stderr, "Failed to run mount %s %s -o %s: %s (code %d)\n",
            source.c_str(), mountpoint.c_str(), options.c_str(), strerror(errno), errno);
        exit(1);
    }
    // Parent: keep serving requests until the SIGCHLD handler reports completion
    wanted_pid = child;
    while (!child_finished)
    {
        ringloop->loop();
        ringloop->wait();
    }
    const bool mounted = WIFEXITED(child_status) && WEXITSTATUS(child_status) == 0;
    if (!mounted)
    {
        // Mounting failed
        exit(1);
    }
    if (fsname == "")
    {
        fprintf(stderr, "Successfully mounted Vitastor pseudo-FS at %s\n", mountpoint.c_str());
    }
    else
    {
        fprintf(stderr, "Successfully mounted VitastorFS %s at %s\n", fsname.c_str(), mountpoint.c_str());
    }
    finished = false;
}
// Abort with exit(1) if the resolved <mountpoint> already appears as a
// mount target (second field) in /proc/mounts. Does nothing when the
// path can't be resolved or the mount table can't be read.
void nfs_proxy_t::check_already_mounted()
{
    const std::string resolved = realpath_str(mountpoint, false);
    if (resolved.empty())
    {
        return;
    }
    const std::string table = read_file("/proc/mounts");
    if (table.empty())
    {
        return;
    }
    for (auto & line: explode("\n", table, true))
    {
        const auto fields = explode(" ", line, true);
        if (fields.size() < 2 || fields[1] != resolved)
        {
            continue;
        }
        fprintf(stderr, "%s is already mounted\n", mountpoint.c_str());
        exit(1);
    }
}
// Request termination of the main loop (finished = true) when running
// in exit-on-umount mode, no client is connected and /proc/mounts no
// longer lists an NFS mount that points to 127.0.0.1 on our listening port.
void nfs_proxy_t::check_exit()
{
    if (active_connections || !exit_on_umount)
    {
        return;
    }
    std::string mountstr = read_file("/proc/mounts");
    if (mountstr == "")
    {
        // Mount table is unreadable - stay alive rather than exit by mistake
        return;
    }
    const auto port_opt = "port="+std::to_string(listening_port);
    const auto mountport_opt = "mountport="+std::to_string(listening_port);
    for (auto & line: explode("\n", mountstr, true))
    {
        // Fields: device, mountpoint, fs type, options, ...
        const auto fields = explode(" ", line, true);
        // Check the field count BEFORE indexing: short or empty lines
        // must be skipped instead of being read out of bounds
        if (fields.size() < 4 || fields[2].compare(0, 3, "nfs") != 0)
        {
            continue;
        }
        bool port_found = false;
        bool addr_found = false;
        for (auto & opt: explode(",", fields[3], true))
        {
            if (opt == port_opt || opt == mountport_opt)
                port_found = true;
            if (opt == "addr=127.0.0.1" || opt == "mountaddr=127.0.0.1")
                addr_found = true;
        }
        if (port_found && addr_found)
        {
            // Our mount is still there, keep running
            return;
        }
    }
    // Mount not found - the FS was unmounted, stop the server
    finished = true;
}
int main(int narg, const char *args[])

View File

@ -21,23 +21,24 @@ class nfs_proxy_t
{
public:
std::string bind_address;
std::string name_prefix;
uint64_t fsid = 1;
uint64_t server_id = 0;
std::string default_pool;
std::string export_root;
bool portmap_enabled;
unsigned nfs_port;
uint64_t fs_kv_inode = 0;
uint64_t fs_base_inode = 0;
uint64_t fs_inode_count = 0;
int readdir_getattr_parallel = 8, id_alloc_batch_size = 200;
int trace = 0;
std::string logfile = "/dev/null";
std::string pidfile;
bool exit_on_umount = false;
std::string mountpoint;
std::string mountopts;
std::string fsname;
pool_id_t default_pool_id;
uint64_t pool_block_size = 0;
uint64_t pool_alignment = 0;
uint64_t shared_inode_threshold = 0;
int active_connections = 0;
bool finished = false;
int listening_port = 0;
pool_id_t default_pool_id = 0;
portmap_service_t pmap;
ring_loop_t *ringloop = NULL;
@ -64,6 +65,10 @@ public:
void check_default_pool();
void do_accept(int listen_fd);
void daemonize();
void write_pid();
void mount_fs();
void check_already_mounted();
void check_exit();
};
struct rpc_cur_buffer_t

View File

@ -239,6 +239,7 @@ class osd_t
void report_statistics();
void report_pg_state(pg_t & pg);
void report_pg_states();
void apply_no_inode_stats();
void apply_pg_count();
void apply_pg_config();

View File

@ -388,9 +388,18 @@ void osd_t::on_change_etcd_state_hook(std::map<std::string, etcd_kv_t> & changes
etcd_global_config = changes[st_cli.etcd_prefix+"/config/global"].value.object_items();
parse_config(false);
}
bool pools = changes.find(st_cli.etcd_prefix+"/config/pools") != changes.end();
if (pools)
{
apply_no_inode_stats();
}
if (run_primary)
{
bool pgs = changes.find(st_cli.etcd_prefix+"/config/pgs") != changes.end();
if (pools || pgs)
{
apply_pg_count();
}
apply_pg_config();
}
}
@ -414,6 +423,8 @@ void osd_t::on_reload_config_hook(json11::Json::object & global_config)
// Acquire lease
void osd_t::acquire_lease()
{
// Apply no_inode_stats before the first statistics report
apply_no_inode_stats();
// Maximum lease TTL is (report interval) + retries * (timeout + repeat interval)
st_cli.etcd_call("/lease/grant", json11::Json::object {
{ "TTL", etcd_report_interval+(st_cli.max_etcd_attempts*(2*st_cli.etcd_quick_timeout)+999)/1000 }
@ -602,10 +613,31 @@ void osd_t::on_load_pgs_hook(bool success)
else
{
peering_state &= ~OSD_LOADING_PGS;
apply_no_inode_stats();
if (run_primary)
{
apply_pg_count();
apply_pg_config();
}
}
}
// Collect IDs of all pools configured with no_inode_stats and pass
// the list to the blockstore. Does nothing while <bs> is not set.
void osd_t::apply_no_inode_stats()
{
    if (bs)
    {
        std::vector<uint64_t> excluded_pools;
        for (auto it = st_cli.pool_config.begin(); it != st_cli.pool_config.end(); ++it)
        {
            if (!it->second.no_inode_stats)
            {
                continue;
            }
            excluded_pools.push_back(it->first);
        }
        bs->set_no_inode_stats(excluded_pools);
    }
}
void osd_t::apply_pg_count()
{

View File

@ -1,9 +1,10 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include "str_util.h"
std::string base64_encode(const std::string &in)
@ -304,6 +305,23 @@ std::string read_all_fd(int fd)
return res;
}
// Read the whole file <file> and return its content.
// Returns an empty string if the file can't be opened, can't be read
// or is empty. Errors are reported to stderr, except for ENOENT when
// <allow_enoent> is true.
std::string read_file(std::string file, bool allow_enoent)
{
    int fd = open(file.c_str(), O_RDONLY);
    if (fd < 0)
    {
        int err = errno;
        if (!allow_enoent || err != ENOENT)
            fprintf(stderr, "Failed to read %s: %s (code %d)\n", file.c_str(), strerror(err), err);
        return "";
    }
    // Reset errno so that an empty result is not reported with a stale
    // error code left by some earlier, unrelated call
    // NOTE(review): assumes read_all_fd() leaves errno set when reading fails - confirm
    errno = 0;
    std::string res = read_all_fd(fd);
    int err = errno;
    close(fd);
    if (res == "" && err != 0 && (!allow_enoent || err != ENOENT))
    {
        fprintf(stderr, "Failed to read %s: %s (code %d)\n", file.c_str(), strerror(err), err);
    }
    return res;
}
std::string str_repeat(const std::string & str, int times)
{
std::string r;
@ -410,3 +428,16 @@ std::string auto_addslashes(const std::string & str)
}
return res+"\"";
}
// Resolve <path> to a canonical absolute path using realpath().
// On failure, prints a message to stderr and returns either the
// original <path> (nofail == true) or an empty string (nofail == false).
std::string realpath_str(std::string path, bool nofail)
{
    char *resolved = realpath(path.c_str(), NULL);
    if (resolved)
    {
        // realpath() allocated the buffer, so copy it out and release it
        std::string result(resolved);
        free(resolved);
        return result;
    }
    fprintf(stderr, "Failed to resolve %s: %s\n", path.c_str(), strerror(errno));
    if (nofail)
    {
        return path;
    }
    return "";
}

View File

@ -1,5 +1,5 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#pragma once
#include <stdint.h>
@ -18,9 +18,11 @@ std::string format_size(uint64_t size, bool nobytes = false);
void print_help(const char *help_text, std::string exe_name, std::string cmd, bool all);
uint64_t parse_time(std::string time_str, bool *ok = NULL);
std::string read_all_fd(int fd);
std::string read_file(std::string file, bool allow_enoent = false);
std::string str_repeat(const std::string & str, int times);
size_t utf8_length(const std::string & s);
size_t utf8_length(const char *s);
std::vector<std::string> explode(const std::string & sep, const std::string & value, bool trim);
std::string scan_escaped(const std::string & cmd, size_t & pos);
std::string auto_addslashes(const std::string & str);
std::string realpath_str(std::string path, bool nofail = true);

View File

@ -4,7 +4,7 @@ PG_COUNT=16
. `dirname $0`/run_3osds.sh
build/src/vitastor-cli --etcd_address $ETCD_URL create -s 10G fsmeta
build/src/vitastor-nfs --fs fsmeta --etcd_address $ETCD_URL --portmap 0 --port 2050 --foreground 1 --trace 1 >>./testdata/nfs.log 2>&1 &
build/src/vitastor-nfs start --fs fsmeta --etcd_address $ETCD_URL --portmap 0 --port 2050 --foreground 1 --trace 1 >>./testdata/nfs.log 2>&1 &
NFS_PID=$!
mkdir -p testdata/nfs
@ -40,7 +40,15 @@ cp ./testdata/nfs/f1 ./testdata/f1_nfs
diff ./testdata/f1_90k ./testdata/nfs/f1
format_green "90K data ok"
# move it inplace
# test partial shared overwrite
dd if=/dev/urandom of=./testdata/f1_90k bs=9317 count=1 seek=5 conv=notrunc
dd if=./testdata/f1_90k of=./testdata/nfs/f1 bs=9317 count=1 skip=5 seek=5 conv=notrunc
sudo umount ./testdata/nfs/
sudo mount localhost:/ ./testdata/nfs -o port=2050,mountport=2050,nfsvers=3,soft,nolock,tcp
diff ./testdata/f1_90k ./testdata/nfs/f1
format_green "partial inplace shared overwrite ok"
# move it to a larger shared space
dd if=/dev/urandom of=./testdata/f1_110k bs=110k count=1
cp testdata/f1_110k ./testdata/nfs/f1
sudo umount ./testdata/nfs/
@ -146,4 +154,14 @@ sudo mount localhost:/ ./testdata/nfs -o port=2050,mountport=2050,nfsvers=3,soft
if ls ./testdata/nfs | grep smallfile; then false; fi
format_green "rm small ok"
# rename over existing
echo ZXCVBN > ./testdata/nfs/over1
mv ./testdata/nfs/over1 ./testdata/nfs/linked2
sudo umount ./testdata/nfs/
sudo mount localhost:/ ./testdata/nfs -o port=2050,mountport=2050,nfsvers=3,soft,nolock,tcp
if ls ./testdata/nfs | grep over1; then false; fi
[[ "`cat ./testdata/nfs/linked2`" = "ZXCVBN" ]]
[[ "`cat ./testdata/nfs/linked1`" = "BABABA" ]]
format_green "rename over existing file ok"
format_green OK