Compare commits
17 Commits
e58554c298
...
f8397e1a03
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | f8397e1a03 | |
Vitaliy Filippov | b1129c171b | |
Vitaliy Filippov | 8f077bc67c | |
Vitaliy Filippov | 5624a67c67 | |
Vitaliy Filippov | e0f36d47cc | |
Vitaliy Filippov | 5b34ffa9b6 | |
Vitaliy Filippov | 94de5618e6 | |
Vitaliy Filippov | 551160c693 | |
Vitaliy Filippov | 462845c44b | |
Vitaliy Filippov | 7b75a9bf1b | |
Vitaliy Filippov | 08fc67de05 | |
Vitaliy Filippov | c8a2082dc7 | |
Vitaliy Filippov | 871db8fe31 | |
Vitaliy Filippov | 6aed77041f | |
Vitaliy Filippov | 03d5f512bf | |
Vitaliy Filippov | 127ce8c104 | |
Vitaliy Filippov | 654386e447 |
|
@ -267,6 +267,7 @@ Optional parameters:
|
|||
| `--immediate_commit none` | Put pool only on OSDs with this or larger immediate_commit (none < small < all) |
|
||||
| `--primary_affinity_tags tags` | Prefer to put primary copies on OSDs with all specified tags |
|
||||
| `--scrub_interval <time>` | Enable regular scrubbing for this pool. Format: number + unit s/m/h/d/M/y |
|
||||
| `--no_inode_stats 1` | Disable per-inode statistics for this pool (use for VitastorFS pools) |
|
||||
| `--pg_stripe_size <number>` | Increase object grouping stripe |
|
||||
| `--max_osd_combinations 10000` | Maximum number of random combinations for LP solver input |
|
||||
| `--wait` | Wait for the new pool to come online |
|
||||
|
@ -288,7 +289,7 @@ Modify an existing pool. Modifiable parameters:
|
|||
|
||||
```
|
||||
[-s|--pg_size <number>] [--pg_minsize <number>] [-n|--pg_count <count>]
|
||||
[--failure_domain <level>] [--root_node <node>] [--osd_tags <tags>]
|
||||
[--failure_domain <level>] [--root_node <node>] [--osd_tags <tags>] [--no_inode_stats 0|1]
|
||||
[--max_osd_combinations <number>] [--primary_affinity_tags <tags>] [--scrub_interval <time>]
|
||||
```
|
||||
|
||||
|
|
|
@ -37,7 +37,7 @@ const etcd_allow = new RegExp('^'+[
|
|||
'pg/history/[1-9]\\d*/[1-9]\\d*',
|
||||
'pool/stats/[1-9]\\d*',
|
||||
'history/last_clean_pgs',
|
||||
'inode/stats/[1-9]\\d*/[1-9]\\d*',
|
||||
'inode/stats/[1-9]\\d*/\\d+',
|
||||
'pool/stats/[1-9]\\d*',
|
||||
'stats',
|
||||
'index/image/.*',
|
||||
|
@ -1737,8 +1737,11 @@ class Mon
|
|||
for (const inode_num in this.state.osd.space[osd_num][pool_id])
|
||||
{
|
||||
const u = BigInt(this.state.osd.space[osd_num][pool_id][inode_num]||0);
|
||||
if (inode_num)
|
||||
{
|
||||
inode_stats[pool_id][inode_num] = inode_stats[pool_id][inode_num] || inode_stub();
|
||||
inode_stats[pool_id][inode_num].raw_used += u;
|
||||
}
|
||||
this.state.pool.stats[pool_id].used_raw_tb += u;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -82,3 +82,8 @@ uint32_t blockstore_t::get_bitmap_granularity()
|
|||
{
|
||||
return impl->get_bitmap_granularity();
|
||||
}
|
||||
|
||||
void blockstore_t::set_no_inode_stats(const std::vector<uint64_t> & pool_ids)
|
||||
{
|
||||
impl->set_no_inode_stats(pool_ids);
|
||||
}
|
||||
|
|
|
@ -216,6 +216,9 @@ public:
|
|||
// Get per-inode space usage statistics
|
||||
std::map<uint64_t, uint64_t> & get_inode_space_stats();
|
||||
|
||||
// Set per-pool no_inode_stats
|
||||
void set_no_inode_stats(const std::vector<uint64_t> & pool_ids);
|
||||
|
||||
// Print diagnostics to stdout
|
||||
void dump_diagnostics();
|
||||
|
||||
|
|
|
@ -733,3 +733,86 @@ void blockstore_impl_t::disk_error_abort(const char *op, int retval, int expecte
|
|||
fprintf(stderr, "Disk %s failed: result is %d, expected %d. Can't continue, sorry :-(\n", op, retval, expected);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
void blockstore_impl_t::set_no_inode_stats(const std::vector<uint64_t> & pool_ids)
|
||||
{
|
||||
for (auto & np: no_inode_stats)
|
||||
{
|
||||
np.second = 2;
|
||||
}
|
||||
for (auto pool_id: pool_ids)
|
||||
{
|
||||
if (!no_inode_stats[pool_id])
|
||||
recalc_inode_space_stats(pool_id, false);
|
||||
no_inode_stats[pool_id] = 1;
|
||||
}
|
||||
for (auto np_it = no_inode_stats.begin(); np_it != no_inode_stats.end(); )
|
||||
{
|
||||
if (np_it->second == 2)
|
||||
{
|
||||
recalc_inode_space_stats(np_it->first, true);
|
||||
no_inode_stats.erase(np_it++);
|
||||
}
|
||||
else
|
||||
np_it++;
|
||||
}
|
||||
}
|
||||
|
||||
void blockstore_impl_t::recalc_inode_space_stats(uint64_t pool_id, bool per_inode)
|
||||
{
|
||||
auto sp_begin = inode_space_stats.lower_bound((pool_id << (64-POOL_ID_BITS)));
|
||||
auto sp_end = inode_space_stats.lower_bound(((pool_id+1) << (64-POOL_ID_BITS)));
|
||||
inode_space_stats.erase(sp_begin, sp_end);
|
||||
auto sh_it = clean_db_shards.lower_bound((pool_id << (64-POOL_ID_BITS)));
|
||||
while (sh_it != clean_db_shards.end() &&
|
||||
(sh_it->first >> (64-POOL_ID_BITS)) == pool_id)
|
||||
{
|
||||
for (auto & pair: sh_it->second)
|
||||
{
|
||||
uint64_t space_id = per_inode ? pair.first.inode : (pool_id << (64-POOL_ID_BITS));
|
||||
inode_space_stats[space_id] += dsk.data_block_size;
|
||||
}
|
||||
sh_it++;
|
||||
}
|
||||
object_id last_oid = {};
|
||||
bool last_exists = false;
|
||||
auto dirty_it = dirty_db.lower_bound((obj_ver_id){ .oid = { .inode = (pool_id << (64-POOL_ID_BITS)) } });
|
||||
while (dirty_it != dirty_db.end() && (dirty_it->first.oid.inode >> (64-POOL_ID_BITS)) == pool_id)
|
||||
{
|
||||
if (IS_STABLE(dirty_it->second.state) && (IS_BIG_WRITE(dirty_it->second.state) || IS_DELETE(dirty_it->second.state)))
|
||||
{
|
||||
bool exists = false;
|
||||
if (last_oid == dirty_it->first.oid)
|
||||
{
|
||||
exists = last_exists;
|
||||
}
|
||||
else
|
||||
{
|
||||
auto & clean_db = clean_db_shard(dirty_it->first.oid);
|
||||
auto clean_it = clean_db.find(dirty_it->first.oid);
|
||||
exists = clean_it != clean_db.end();
|
||||
}
|
||||
uint64_t space_id = per_inode ? dirty_it->first.oid.inode : (pool_id << (64-POOL_ID_BITS));
|
||||
if (IS_BIG_WRITE(dirty_it->second.state))
|
||||
{
|
||||
if (!exists)
|
||||
inode_space_stats[space_id] += dsk.data_block_size;
|
||||
last_exists = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (exists)
|
||||
{
|
||||
auto & sp = inode_space_stats[space_id];
|
||||
if (sp > dsk.data_block_size)
|
||||
sp -= dsk.data_block_size;
|
||||
else
|
||||
inode_space_stats.erase(space_id);
|
||||
}
|
||||
last_exists = false;
|
||||
}
|
||||
last_oid = dirty_it->first.oid;
|
||||
}
|
||||
dirty_it++;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -272,6 +272,7 @@ class blockstore_impl_t
|
|||
|
||||
std::map<pool_id_t, pool_shard_settings_t> clean_db_settings;
|
||||
std::map<pool_pg_id_t, blockstore_clean_db_t> clean_db_shards;
|
||||
std::map<uint64_t, int> no_inode_stats;
|
||||
uint8_t *clean_bitmaps = NULL;
|
||||
blockstore_dirty_db_t dirty_db;
|
||||
std::vector<blockstore_op_t*> submit_queue;
|
||||
|
@ -318,6 +319,7 @@ class blockstore_impl_t
|
|||
|
||||
blockstore_clean_db_t& clean_db_shard(object_id oid);
|
||||
void reshard_clean_db(pool_id_t pool_id, uint32_t pg_count, uint32_t pg_stripe_size);
|
||||
void recalc_inode_space_stats(uint64_t pool_id, bool per_inode);
|
||||
|
||||
// Journaling
|
||||
void prepare_journal_sector_write(int sector, blockstore_op_t *op);
|
||||
|
@ -428,6 +430,9 @@ public:
|
|||
// Space usage statistics
|
||||
std::map<uint64_t, uint64_t> inode_space_stats;
|
||||
|
||||
// Set per-pool no_inode_stats
|
||||
void set_no_inode_stats(const std::vector<uint64_t> & pool_ids);
|
||||
|
||||
// Print diagnostics to stdout
|
||||
void dump_diagnostics();
|
||||
|
||||
|
|
|
@ -238,6 +238,7 @@ resume_2:
|
|||
data->iov = { bufs[i].buf, (size_t)bufs[i].size };
|
||||
data->callback = [this, i](ring_data_t *data) { handle_event(data, i); };
|
||||
my_uring_prep_writev(sqe, bs->dsk.meta_fd, &data->iov, 1, bs->dsk.meta_offset + bufs[i].offset);
|
||||
bs->ringloop->submit();
|
||||
bufs[i].state = INIT_META_WRITING;
|
||||
submitted++;
|
||||
}
|
||||
|
|
|
@ -487,18 +487,24 @@ void blockstore_impl_t::mark_stable(obj_ver_id v, bool forget_dirty)
|
|||
}
|
||||
if (!exists)
|
||||
{
|
||||
inode_space_stats[dirty_it->first.oid.inode] += dsk.data_block_size;
|
||||
uint64_t space_id = dirty_it->first.oid.inode;
|
||||
if (no_inode_stats[dirty_it->first.oid.inode >> (64-POOL_ID_BITS)])
|
||||
space_id = space_id & ~(((uint64_t)1 << (64-POOL_ID_BITS)) - 1);
|
||||
inode_space_stats[space_id] += dsk.data_block_size;
|
||||
used_blocks++;
|
||||
}
|
||||
big_to_flush++;
|
||||
}
|
||||
else if (IS_DELETE(dirty_it->second.state))
|
||||
{
|
||||
auto & sp = inode_space_stats[dirty_it->first.oid.inode];
|
||||
uint64_t space_id = dirty_it->first.oid.inode;
|
||||
if (no_inode_stats[dirty_it->first.oid.inode >> (64-POOL_ID_BITS)])
|
||||
space_id = space_id & ~(((uint64_t)1 << (64-POOL_ID_BITS)) - 1);
|
||||
auto & sp = inode_space_stats[space_id];
|
||||
if (sp > dsk.data_block_size)
|
||||
sp -= dsk.data_block_size;
|
||||
else
|
||||
inode_space_stats.erase(dirty_it->first.oid.inode);
|
||||
inode_space_stats.erase(space_id);
|
||||
used_blocks--;
|
||||
big_to_flush++;
|
||||
}
|
||||
|
|
|
@ -131,6 +131,7 @@ static const char* help_text =
|
|||
" --immediate_commit none Put pool only on OSDs with this or larger immediate_commit (none < small < all)\n"
|
||||
" --primary_affinity_tags tags Prefer to put primary copies on OSDs with all specified tags\n"
|
||||
" --scrub_interval <time> Enable regular scrubbing for this pool. Format: number + unit s/m/h/d/M/y\n"
|
||||
" --no_inode_stats 1 Disable per-inode statistics for this pool (use for VitastorFS pools)\n"
|
||||
" --pg_stripe_size <number> Increase object grouping stripe\n"
|
||||
" --max_osd_combinations 10000 Maximum number of random combinations for LP solver input\n"
|
||||
" --wait Wait for the new pool to come online\n"
|
||||
|
@ -142,7 +143,7 @@ static const char* help_text =
|
|||
"vitastor-cli modify-pool|pool-modify <id|name> [--name <new_name>] [PARAMETERS...]\n"
|
||||
" Modify an existing pool. Modifiable parameters:\n"
|
||||
" [-s|--pg_size <number>] [--pg_minsize <number>] [-n|--pg_count <count>]\n"
|
||||
" [--failure_domain <level>] [--root_node <node>] [--osd_tags <tags>]\n"
|
||||
" [--failure_domain <level>] [--root_node <node>] [--osd_tags <tags>] [--no_inode_stats 0|1]\n"
|
||||
" [--max_osd_combinations <number>] [--primary_affinity_tags <tags>] [--scrub_interval <time>]\n"
|
||||
" Non-modifiable parameters (changing them WILL lead to data loss):\n"
|
||||
" [--block_size <size>] [--bitmap_granularity <size>]\n"
|
||||
|
|
|
@ -153,6 +153,7 @@ void cli_tool_t::loop_and_wait(std::function<bool(cli_result_t &)> loop_cb, std:
|
|||
ringloop->unregister_consumer(&looper->consumer);
|
||||
looper->loop_cb = NULL;
|
||||
looper->complete_cb(looper->result);
|
||||
ringloop->submit();
|
||||
delete looper;
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -81,6 +81,11 @@ std::string validate_pool_config(json11::Json::object & new_cfg, json11::Json ol
|
|||
}
|
||||
value = value.uint64_value();
|
||||
}
|
||||
else if (key == "no_inode_stats" && value.bool_value())
|
||||
{
|
||||
// Leave true, remove false
|
||||
value = true;
|
||||
}
|
||||
else if (key == "name" || key == "scheme" || key == "immediate_commit" ||
|
||||
key == "failure_domain" || key == "root_node" || key == "scrub_interval")
|
||||
{
|
||||
|
@ -248,7 +253,7 @@ std::string validate_pool_config(json11::Json::object & new_cfg, json11::Json ol
|
|||
// immediate_commit
|
||||
if (!cfg["immediate_commit"].is_null() && !etcd_state_client_t::parse_immediate_commit(cfg["immediate_commit"].string_value()))
|
||||
{
|
||||
return "immediate_commit must be one of \"all\", \"small\", or \"none\", but it is "+cfg["scrub_interval"].as_string();
|
||||
return "immediate_commit must be one of \"all\", \"small\", or \"none\", but it is "+cfg["immediate_commit"].as_string();
|
||||
}
|
||||
|
||||
// scrub_interval
|
||||
|
|
|
@ -529,6 +529,8 @@ resume_3:
|
|||
st["block_size_fmt"] = format_size(st["block_size"].uint64_value());
|
||||
if (st["bitmap_granularity"].uint64_value())
|
||||
st["bitmap_granularity_fmt"] = format_size(st["bitmap_granularity"].uint64_value());
|
||||
if (st["no_inode_stats"].bool_value())
|
||||
st["inode_stats_fmt"] = "disabled";
|
||||
}
|
||||
// All pool parameters are only displayed in the "detailed" mode
|
||||
// because there's too many of them to show them in table
|
||||
|
@ -547,6 +549,7 @@ resume_3:
|
|||
{ "bitmap_granularity_fmt", "Bitmap granularity" },
|
||||
{ "immediate_commit", "Immediate commit" },
|
||||
{ "scrub_interval", "Scrub interval" },
|
||||
{ "inode_stats_fmt", "Per-inode stats" },
|
||||
{ "pg_stripe_size", "PG stripe size" },
|
||||
{ "max_osd_combinations", "Max OSD combinations" },
|
||||
{ "total_fmt", "Total" },
|
||||
|
|
|
@ -1213,6 +1213,10 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
|
|||
op->retry_after = op->retval == -EIO ? client_eio_retry_interval : client_retry_interval;
|
||||
}
|
||||
reset_retry_timer(op->retry_after);
|
||||
if (stop_fd >= 0)
|
||||
{
|
||||
msgr.stop_client(stop_fd);
|
||||
}
|
||||
if (op->inflight_count == 0)
|
||||
{
|
||||
if (op->opcode == OSD_OP_SYNC)
|
||||
|
@ -1220,10 +1224,6 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
|
|||
else
|
||||
continue_rw(op);
|
||||
}
|
||||
if (stop_fd >= 0)
|
||||
{
|
||||
msgr.stop_client(stop_fd);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -132,9 +132,6 @@ void disk_tool_simple_offsets(json11::Json cfg, bool json_output);
|
|||
|
||||
uint64_t sscanf_json(const char *fmt, const json11::Json & str);
|
||||
void fromhexstr(const std::string & from, int bytes, uint8_t *to);
|
||||
std::string realpath_str(std::string path, bool nofail = true);
|
||||
std::string read_all_fd(int fd);
|
||||
std::string read_file(std::string file, bool allow_enoent = false);
|
||||
int disable_cache(std::string dev);
|
||||
std::string get_parent_device(std::string dev);
|
||||
bool json_is_true(const json11::Json & val);
|
||||
|
|
|
@ -42,36 +42,6 @@ void fromhexstr(const std::string & from, int bytes, uint8_t *to)
|
|||
}
|
||||
}
|
||||
|
||||
std::string realpath_str(std::string path, bool nofail)
|
||||
{
|
||||
char *p = realpath((char*)path.c_str(), NULL);
|
||||
if (!p)
|
||||
{
|
||||
fprintf(stderr, "Failed to resolve %s: %s\n", path.c_str(), strerror(errno));
|
||||
return nofail ? path : "";
|
||||
}
|
||||
std::string rp(p);
|
||||
free(p);
|
||||
return rp;
|
||||
}
|
||||
|
||||
std::string read_file(std::string file, bool allow_enoent)
|
||||
{
|
||||
std::string res;
|
||||
int fd = open(file.c_str(), O_RDONLY);
|
||||
if (fd < 0 || (res = read_all_fd(fd)) == "")
|
||||
{
|
||||
int err = errno;
|
||||
if (fd >= 0)
|
||||
close(fd);
|
||||
if (!allow_enoent || err != ENOENT)
|
||||
fprintf(stderr, "Can't read %s: %s\n", file.c_str(), strerror(err));
|
||||
return "";
|
||||
}
|
||||
close(fd);
|
||||
return res;
|
||||
}
|
||||
|
||||
// returns 1 = check error, 0 = write through, -1 = write back
|
||||
// (similar to 1 = warning, -1 = error, 0 = success in disable_cache)
|
||||
static int check_queue_cache(std::string dev, std::string parent_dev)
|
||||
|
|
|
@ -101,7 +101,7 @@ void epoll_manager_t::handle_uring_event()
|
|||
my_uring_prep_poll_add(sqe, epoll_fd, POLLIN);
|
||||
data->callback = [this](ring_data_t *data)
|
||||
{
|
||||
if (data->res < 0)
|
||||
if (data->res < 0 && data->res != -ECANCELED)
|
||||
{
|
||||
throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res));
|
||||
}
|
||||
|
|
|
@ -863,6 +863,8 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
|||
pc.scrub_interval = parse_time(pool_item.second["scrub_interval"].string_value());
|
||||
if (!pc.scrub_interval)
|
||||
pc.scrub_interval = 0;
|
||||
// Disable per-inode stats
|
||||
pc.no_inode_stats = pool_item.second["no_inode_stats"].bool_value();
|
||||
// Immediate Commit Mode
|
||||
pc.immediate_commit = pool_item.second["immediate_commit"].is_string()
|
||||
? parse_immediate_commit(pool_item.second["immediate_commit"].string_value())
|
||||
|
|
|
@ -60,6 +60,7 @@ struct pool_config_t
|
|||
uint64_t pg_stripe_size;
|
||||
std::map<pg_num_t, pg_config_t> pg_config;
|
||||
uint64_t scrub_interval;
|
||||
bool no_inode_stats;
|
||||
};
|
||||
|
||||
struct inode_config_t
|
||||
|
|
|
@ -146,7 +146,7 @@ public:
|
|||
" Note that nbd_timeout, nbd_max_devices and nbd_max_part options may also be specified\n"
|
||||
" in /etc/vitastor/vitastor.conf or in other configuration file specified with --config_file.\n"
|
||||
" --logfile /path/to/log/file.txt\n"
|
||||
" Wite log messages to the specified file instead of dropping them (in background mode)\n"
|
||||
" Write log messages to the specified file instead of dropping them (in background mode)\n"
|
||||
" or printing them to the standard output (in foreground mode).\n"
|
||||
" --dev_num N\n"
|
||||
" Use the specified device /dev/nbdN instead of automatic selection.\n"
|
||||
|
@ -298,7 +298,7 @@ public:
|
|||
}
|
||||
}
|
||||
}
|
||||
if (cfg["logfile"].is_string())
|
||||
if (cfg["logfile"].string_value() != "")
|
||||
{
|
||||
logfile = cfg["logfile"].string_value();
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ static std::string get_inode_name(nfs_client_t *self, diropargs3 & what)
|
|||
std::string name = what.name;
|
||||
return (dir.size()
|
||||
? dir+"/"+name
|
||||
: self->parent->name_prefix+name);
|
||||
: self->parent->blockfs->name_prefix+name);
|
||||
}
|
||||
|
||||
static fattr3 get_dir_attributes(nfs_client_t *self, std::string dir)
|
||||
|
@ -985,7 +985,7 @@ static void block_nfs3_readdir_common(void *opaque, rpc_op_t *rop, bool is_plus)
|
|||
if (dir_it != self->parent->blockfs->dir_by_hash.end())
|
||||
dir = dir_it->second;
|
||||
}
|
||||
std::string prefix = dir.size() ? dir+"/" : self->parent->name_prefix;
|
||||
std::string prefix = dir.size() ? dir+"/" : self->parent->blockfs->name_prefix;
|
||||
std::map<std::string, struct entryplus3> entries;
|
||||
for (auto & ic: self->parent->cli->st_cli.inode_config)
|
||||
{
|
||||
|
@ -1154,8 +1154,20 @@ static int block_nfs3_readdirplus_proc(void *opaque, rpc_op_t *rop)
|
|||
return 0;
|
||||
}
|
||||
|
||||
void block_fs_state_t::init(nfs_proxy_t *proxy)
|
||||
void block_fs_state_t::init(nfs_proxy_t *proxy, json11::Json cfg)
|
||||
{
|
||||
name_prefix = cfg["subdir"].string_value();
|
||||
{
|
||||
int e = name_prefix.size();
|
||||
while (e > 0 && name_prefix[e-1] == '/')
|
||||
e--;
|
||||
int s = 0;
|
||||
while (s < e && name_prefix[s] == '/')
|
||||
s++;
|
||||
name_prefix = name_prefix.substr(s, e-s);
|
||||
if (name_prefix.size())
|
||||
name_prefix += "/";
|
||||
}
|
||||
// We need inode name hashes for NFS handles to remain stateless and <= 64 bytes long
|
||||
dir_info[""] = (nfs_dir_t){
|
||||
.id = 1,
|
||||
|
@ -1172,7 +1184,7 @@ void block_fs_state_t::init(nfs_proxy_t *proxy)
|
|||
}
|
||||
auto & inode_cfg = inode_cfg_it->second;
|
||||
std::string full_name = inode_cfg.name;
|
||||
if (proxy->name_prefix != "" && full_name.substr(0, proxy->name_prefix.size()) != proxy->name_prefix)
|
||||
if (proxy->blockfs->name_prefix != "" && full_name.substr(0, proxy->blockfs->name_prefix.size()) != proxy->blockfs->name_prefix)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
@ -1181,7 +1193,7 @@ void block_fs_state_t::init(nfs_proxy_t *proxy)
|
|||
clock_gettime(CLOCK_REALTIME, &now);
|
||||
dir_info[""].mod_rev = dir_info[""].mod_rev < inode_cfg.mod_revision ? inode_cfg.mod_revision : dir_info[""].mod_rev;
|
||||
dir_info[""].mtime = now;
|
||||
int pos = full_name.find('/', proxy->name_prefix.size());
|
||||
int pos = full_name.find('/', proxy->blockfs->name_prefix.size());
|
||||
while (pos >= 0)
|
||||
{
|
||||
std::string dir = full_name.substr(0, pos);
|
||||
|
|
|
@ -36,6 +36,8 @@ struct extend_inode_t
|
|||
|
||||
struct block_fs_state_t
|
||||
{
|
||||
std::string name_prefix;
|
||||
|
||||
// filehandle = "S"+base64(sha256(full name with prefix)) or "roothandle" for mount root)
|
||||
uint64_t next_dir_id = 2;
|
||||
// filehandle => dir with name_prefix
|
||||
|
@ -51,7 +53,7 @@ struct block_fs_state_t
|
|||
std::map<inode_t, extend_inode_t> extends;
|
||||
std::multimap<extend_size_t, extend_write_t> extend_writes;
|
||||
|
||||
void init(nfs_proxy_t *proxy);
|
||||
void init(nfs_proxy_t *proxy, json11::Json cfg);
|
||||
};
|
||||
|
||||
nfsstat3 vitastor_nfs_map_err(int err);
|
||||
|
|
|
@ -72,7 +72,7 @@ fattr3 get_kv_attributes(nfs_client_t *self, uint64_t ino, json11::Json attrs)
|
|||
auto nlink = attrs["nlink"].uint64_value();
|
||||
nfstime3 mtime = nfstime_from_str(attrs["mtime"].string_value());
|
||||
nfstime3 atime = attrs["atime"].is_null() ? mtime : nfstime_from_str(attrs["atime"].string_value());
|
||||
// FIXME In theory we could store the binary structure itself instead of JSON
|
||||
// In theory we could store the binary structure itself, but JSON is simpler :-)
|
||||
return (fattr3){
|
||||
.type = (type == 0 ? NF3REG : (ftype3)type),
|
||||
.mode = (attrs["mode"].is_null() ? (type == NF3DIR ? 0755 : 0644) : (uint32_t)mode),
|
||||
|
@ -80,7 +80,8 @@ fattr3 get_kv_attributes(nfs_client_t *self, uint64_t ino, json11::Json attrs)
|
|||
.uid = (uint32_t)attrs["uid"].uint64_value(),
|
||||
.gid = (uint32_t)attrs["gid"].uint64_value(),
|
||||
.size = (type == NF3DIR ? 4096 : attrs["size"].uint64_value()),
|
||||
.used = (type == NF3DIR ? 4096 : attrs["alloc"].uint64_value()),
|
||||
// FIXME Counting actual used file size would require reworking statistics
|
||||
.used = (type == NF3DIR ? 4096 : attrs["size"].uint64_value()),
|
||||
.rdev = (type == NF3BLK || type == NF3CHR
|
||||
? (specdata3){ (uint32_t)attrs["major"].uint64_value(), (uint32_t)attrs["minor"].uint64_value() }
|
||||
: (specdata3){}),
|
||||
|
@ -190,3 +191,73 @@ void nfs_kv_procs(nfs_client_t *self)
|
|||
self->proc_table.insert(pt[i]);
|
||||
}
|
||||
}
|
||||
|
||||
void kv_fs_state_t::init(nfs_proxy_t *proxy, json11::Json cfg)
|
||||
{
|
||||
// Check if we're using VitastorFS
|
||||
fs_kv_inode = cfg["fs"].uint64_value();
|
||||
if (fs_kv_inode)
|
||||
{
|
||||
if (!INODE_POOL(fs_kv_inode))
|
||||
{
|
||||
fprintf(stderr, "FS metadata inode number must include pool\n");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (auto & ic: proxy->cli->st_cli.inode_config)
|
||||
{
|
||||
if (ic.second.name == cfg["fs"].string_value())
|
||||
{
|
||||
fs_kv_inode = ic.first;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!fs_kv_inode)
|
||||
{
|
||||
fprintf(stderr, "FS metadata image \"%s\" does not exist\n", cfg["fs"].string_value().c_str());
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
readdir_getattr_parallel = cfg["readdir_getattr_parallel"].uint64_value();
|
||||
if (!readdir_getattr_parallel)
|
||||
readdir_getattr_parallel = 8;
|
||||
id_alloc_batch_size = cfg["id_alloc_batch_size"].uint64_value();
|
||||
if (!id_alloc_batch_size)
|
||||
id_alloc_batch_size = 200;
|
||||
auto & pool_cfg = proxy->cli->st_cli.pool_config.at(proxy->default_pool_id);
|
||||
pool_block_size = pool_cfg.pg_stripe_size;
|
||||
pool_alignment = pool_cfg.bitmap_granularity;
|
||||
// Open DB and wait
|
||||
int open_res = 0;
|
||||
bool open_done = false;
|
||||
proxy->db = new kv_dbw_t(proxy->cli);
|
||||
proxy->db->open(fs_kv_inode, cfg, [&](int res)
|
||||
{
|
||||
open_done = true;
|
||||
open_res = res;
|
||||
});
|
||||
while (!open_done)
|
||||
{
|
||||
proxy->ringloop->loop();
|
||||
if (open_done)
|
||||
break;
|
||||
proxy->ringloop->wait();
|
||||
}
|
||||
if (open_res < 0)
|
||||
{
|
||||
fprintf(stderr, "Failed to open key/value filesystem metadata index: %s (code %d)\n",
|
||||
strerror(-open_res), open_res);
|
||||
exit(1);
|
||||
}
|
||||
fs_base_inode = ((uint64_t)proxy->default_pool_id << (64-POOL_ID_BITS));
|
||||
fs_inode_count = ((uint64_t)1 << (64-POOL_ID_BITS)) - 1;
|
||||
shared_inode_threshold = pool_block_size;
|
||||
if (!cfg["shared_inode_threshold"].is_null())
|
||||
{
|
||||
shared_inode_threshold = cfg["shared_inode_threshold"].uint64_value();
|
||||
}
|
||||
zero_block.resize(pool_block_size < 1048576 ? 1048576 : pool_block_size);
|
||||
scrap_block.resize(pool_block_size < 1048576 ? 1048576 : pool_block_size);
|
||||
}
|
||||
|
|
13
src/nfs_kv.h
13
src/nfs_kv.h
|
@ -33,7 +33,6 @@ struct shared_alloc_queue_t
|
|||
{
|
||||
nfs_kv_write_state *st;
|
||||
int state;
|
||||
uint64_t size;
|
||||
};
|
||||
|
||||
struct kv_inode_extend_t
|
||||
|
@ -45,12 +44,24 @@ struct kv_inode_extend_t
|
|||
|
||||
struct kv_fs_state_t
|
||||
{
|
||||
uint64_t fs_kv_inode = 0;
|
||||
uint64_t fs_base_inode = 0;
|
||||
uint64_t fs_inode_count = 0;
|
||||
int readdir_getattr_parallel = 8, id_alloc_batch_size = 200;
|
||||
uint64_t pool_block_size = 0;
|
||||
uint64_t pool_alignment = 0;
|
||||
uint64_t shared_inode_threshold = 0;
|
||||
|
||||
std::map<list_cookie_t, list_cookie_val_t> list_cookies;
|
||||
uint64_t fs_next_id = 1, fs_allocated_id = 0;
|
||||
std::vector<uint64_t> unallocated_ids;
|
||||
std::vector<shared_alloc_queue_t> allocating_shared;
|
||||
uint64_t cur_shared_inode = 0, cur_shared_offset = 0;
|
||||
std::map<inode_t, kv_inode_extend_t> extends;
|
||||
std::vector<uint8_t> zero_block;
|
||||
std::vector<uint8_t> scrap_block;
|
||||
|
||||
void init(nfs_proxy_t *proxy, json11::Json cfg);
|
||||
};
|
||||
|
||||
struct shared_file_header_t
|
||||
|
|
|
@ -16,7 +16,7 @@ void allocate_new_id(nfs_client_t *self, std::function<void(int res, uint64_t ne
|
|||
cb(0, self->parent->kvfs->fs_next_id++);
|
||||
return;
|
||||
}
|
||||
else if (self->parent->kvfs->fs_next_id > self->parent->fs_inode_count)
|
||||
else if (self->parent->kvfs->fs_next_id > self->parent->kvfs->fs_inode_count)
|
||||
{
|
||||
cb(-ENOSPC, 0);
|
||||
return;
|
||||
|
@ -29,7 +29,7 @@ void allocate_new_id(nfs_client_t *self, std::function<void(int res, uint64_t ne
|
|||
return;
|
||||
}
|
||||
uint64_t prev_val = stoull_full(prev_str);
|
||||
if (prev_val >= self->parent->fs_inode_count)
|
||||
if (prev_val >= self->parent->kvfs->fs_inode_count)
|
||||
{
|
||||
cb(-ENOSPC, 0);
|
||||
return;
|
||||
|
@ -38,10 +38,10 @@ void allocate_new_id(nfs_client_t *self, std::function<void(int res, uint64_t ne
|
|||
{
|
||||
prev_val = 1;
|
||||
}
|
||||
uint64_t new_val = prev_val + self->parent->id_alloc_batch_size;
|
||||
if (new_val >= self->parent->fs_inode_count)
|
||||
uint64_t new_val = prev_val + self->parent->kvfs->id_alloc_batch_size;
|
||||
if (new_val >= self->parent->kvfs->fs_inode_count)
|
||||
{
|
||||
new_val = self->parent->fs_inode_count;
|
||||
new_val = self->parent->kvfs->fs_inode_count;
|
||||
}
|
||||
self->parent->db->set(KV_NEXT_ID_KEY, std::to_string(new_val), [=](int res)
|
||||
{
|
||||
|
@ -76,6 +76,7 @@ struct kv_create_state
|
|||
uint64_t verf = 0;
|
||||
uint64_t dir_ino = 0;
|
||||
std::string filename;
|
||||
int res = 0;
|
||||
uint64_t new_id = 0;
|
||||
json11::Json::object attrobj;
|
||||
json11::Json attrs;
|
||||
|
@ -84,8 +85,14 @@ struct kv_create_state
|
|||
std::function<void(int res)> cb;
|
||||
};
|
||||
|
||||
static void kv_do_create(kv_create_state *st)
|
||||
static void kv_continue_create(kv_create_state *st, int state)
|
||||
{
|
||||
if (state == 0) {}
|
||||
else if (state == 1) goto resume_1;
|
||||
else if (state == 2) goto resume_2;
|
||||
else if (state == 3) goto resume_3;
|
||||
else if (state == 4) goto resume_4;
|
||||
else if (state == 5) goto resume_5;
|
||||
if (st->self->parent->trace)
|
||||
fprintf(stderr, "[%d] CREATE %ju/%s ATTRS %s\n", st->self->nfs_fd, st->dir_ino, st->filename.c_str(), json11::Json(st->attrobj).dump().c_str());
|
||||
if (st->filename == "" || st->filename.find("/") != std::string::npos)
|
||||
|
@ -98,78 +105,58 @@ static void kv_do_create(kv_create_state *st)
|
|||
st->attrobj["mtime"] = nfstime_now_str();
|
||||
if (st->attrobj.find("atime") == st->attrobj.end())
|
||||
st->attrobj["atime"] = st->attrobj["mtime"];
|
||||
st->attrs = std::move(st->attrobj);
|
||||
resume_1:
|
||||
// Generate inode ID
|
||||
allocate_new_id(st->self, [st](int res, uint64_t new_id)
|
||||
{
|
||||
if (res < 0)
|
||||
st->res = res;
|
||||
st->new_id = new_id;
|
||||
kv_continue_create(st, 2);
|
||||
});
|
||||
return;
|
||||
resume_2:
|
||||
if (st->res < 0)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb(res);
|
||||
cb(st->res);
|
||||
return;
|
||||
}
|
||||
st->new_id = new_id;
|
||||
auto direntry = json11::Json::object{ { "ino", st->new_id } };
|
||||
if (st->attrobj.find("type") != st->attrobj.end() &&
|
||||
st->attrobj["type"].string_value() == "dir")
|
||||
{
|
||||
direntry["type"] = "dir";
|
||||
}
|
||||
st->attrs = std::move(st->attrobj);
|
||||
st->direntry_text = json11::Json(direntry).dump().c_str();
|
||||
// Set direntry
|
||||
st->self->parent->db->set(kv_direntry_key(st->dir_ino, st->filename), st->direntry_text, [st](int res)
|
||||
{
|
||||
if (res < 0)
|
||||
{
|
||||
st->self->parent->kvfs->unallocated_ids.push_back(st->new_id);
|
||||
if (res == -EAGAIN)
|
||||
{
|
||||
if (st->dup_ino)
|
||||
{
|
||||
st->new_id = st->dup_ino;
|
||||
res = 0;
|
||||
}
|
||||
else
|
||||
res = -EEXIST;
|
||||
}
|
||||
else
|
||||
fprintf(stderr, "create %ju/%s failed: %s (code %d)\n", st->dir_ino, st->filename.c_str(), strerror(-res), res);
|
||||
auto cb = std::move(st->cb);
|
||||
cb(res);
|
||||
}
|
||||
else
|
||||
{
|
||||
st->self->parent->db->set(kv_inode_key(st->new_id), st->attrs.dump().c_str(), [st](int res)
|
||||
{
|
||||
if (res == -EAGAIN)
|
||||
{
|
||||
res = -EEXIST;
|
||||
}
|
||||
if (res < 0)
|
||||
{
|
||||
st->self->parent->db->del(kv_direntry_key(st->dir_ino, st->filename), [st, res](int del_res)
|
||||
{
|
||||
if (!del_res)
|
||||
{
|
||||
st->self->parent->kvfs->unallocated_ids.push_back(st->new_id);
|
||||
}
|
||||
auto cb = std::move(st->cb);
|
||||
cb(res);
|
||||
}, [st](int res, const std::string & value)
|
||||
{
|
||||
return res != -ENOENT && value == st->direntry_text;
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb(0);
|
||||
}
|
||||
st->res = res;
|
||||
kv_continue_create(st, 3);
|
||||
}, [st](int res, const std::string & value)
|
||||
{
|
||||
return res == -ENOENT;
|
||||
});
|
||||
return;
|
||||
resume_3:
|
||||
if (st->res == -EAGAIN)
|
||||
{
|
||||
// Inode ID generator failure - retry
|
||||
goto resume_1;
|
||||
}
|
||||
if (st->res < 0)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
return;
|
||||
}
|
||||
{
|
||||
auto direntry = json11::Json::object{ { "ino", st->new_id } };
|
||||
if (st->attrs["type"].string_value() == "dir")
|
||||
{
|
||||
direntry["type"] = "dir";
|
||||
}
|
||||
st->direntry_text = json11::Json(direntry).dump().c_str();
|
||||
}
|
||||
// Set direntry
|
||||
st->dup_ino = 0;
|
||||
st->self->parent->db->set(kv_direntry_key(st->dir_ino, st->filename), st->direntry_text, [st](int res)
|
||||
{
|
||||
st->res = res;
|
||||
kv_continue_create(st, 4);
|
||||
}, [st](int res, const std::string & value)
|
||||
{
|
||||
// CAS compare - check that the key doesn't exist
|
||||
|
@ -192,7 +179,34 @@ static void kv_do_create(kv_create_state *st)
|
|||
}
|
||||
return true;
|
||||
});
|
||||
return;
|
||||
resume_4:
|
||||
if (st->res == -EAGAIN)
|
||||
{
|
||||
// Direntry already exists
|
||||
st->self->parent->db->del(kv_inode_key(st->new_id), [st](int res)
|
||||
{
|
||||
st->res = res;
|
||||
kv_continue_create(st, 5);
|
||||
});
|
||||
resume_5:
|
||||
if (st->res < 0)
|
||||
{
|
||||
fprintf(stderr, "failed to delete duplicate inode %ju left from create %s (code %d)\n", st->new_id, strerror(-st->res), st->res);
|
||||
}
|
||||
else
|
||||
{
|
||||
st->self->parent->kvfs->unallocated_ids.push_back(st->new_id);
|
||||
}
|
||||
if (st->dup_ino)
|
||||
{
|
||||
// Successfully created by the previous "exclusive" request
|
||||
st->new_id = st->dup_ino;
|
||||
}
|
||||
st->res = st->dup_ino ? 0 : -EEXIST;
|
||||
}
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
}
|
||||
|
||||
static void kv_create_setattr(json11::Json::object & attrobj, sattr3 & sattr)
|
||||
|
@ -260,7 +274,7 @@ int kv_nfs3_create_proc(void *opaque, rpc_op_t *rop)
|
|||
}
|
||||
}
|
||||
st->cb = [st](int res) { kv_create_reply<CREATE3res, CREATE3resok>(st, res); };
|
||||
kv_do_create(st);
|
||||
kv_continue_create(st, 0);
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
@ -276,7 +290,7 @@ int kv_nfs3_mkdir_proc(void *opaque, rpc_op_t *rop)
|
|||
st->attrobj["parent_ino"] = st->dir_ino;
|
||||
kv_create_setattr(st->attrobj, args->attributes);
|
||||
st->cb = [st](int res) { kv_create_reply<MKDIR3res, MKDIR3resok>(st, res); };
|
||||
kv_do_create(st);
|
||||
kv_continue_create(st, 0);
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
@ -292,7 +306,7 @@ int kv_nfs3_symlink_proc(void *opaque, rpc_op_t *rop)
|
|||
st->attrobj["symlink"] = (std::string)args->symlink.symlink_data;
|
||||
kv_create_setattr(st->attrobj, args->symlink.symlink_attributes);
|
||||
st->cb = [st](int res) { kv_create_reply<SYMLINK3res, SYMLINK3resok>(st, res); };
|
||||
kv_do_create(st);
|
||||
kv_continue_create(st, 0);
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
@ -324,6 +338,6 @@ int kv_nfs3_mknod_proc(void *opaque, rpc_op_t *rop)
|
|||
return 0;
|
||||
}
|
||||
st->cb = [st](int res) { kv_create_reply<MKNOD3res, MKNOD3resok>(st, res); };
|
||||
kv_do_create(st);
|
||||
kv_continue_create(st, 0);
|
||||
return 1;
|
||||
}
|
||||
|
|
|
@ -142,7 +142,6 @@ resume_4:
|
|||
cb(st->res);
|
||||
}
|
||||
|
||||
// FIXME: We'll need some tests for the FS
|
||||
int kv_nfs3_link_proc(void *opaque, rpc_op_t *rop)
|
||||
{
|
||||
auto st = new nfs_kv_link_state;
|
||||
|
|
|
@ -18,6 +18,7 @@ struct nfs_kv_read_state
|
|||
std::function<void(int)> cb;
|
||||
// state
|
||||
int res = 0;
|
||||
int eof = 0;
|
||||
json11::Json ientry;
|
||||
uint64_t aligned_size = 0, aligned_offset = 0;
|
||||
uint8_t *aligned_buf = NULL;
|
||||
|
@ -25,6 +26,9 @@ struct nfs_kv_read_state
|
|||
uint8_t *buf = NULL;
|
||||
};
|
||||
|
||||
#define align_down(size) ((size) & ~(st->self->parent->kvfs->pool_alignment-1))
|
||||
#define align_up(size) (((size) + st->self->parent->kvfs->pool_alignment-1) & ~(st->self->parent->kvfs->pool_alignment-1))
|
||||
|
||||
static void nfs_kv_continue_read(nfs_kv_read_state *st, int state)
|
||||
{
|
||||
if (state == 0) {}
|
||||
|
@ -36,7 +40,8 @@ static void nfs_kv_continue_read(nfs_kv_read_state *st, int state)
|
|||
fprintf(stderr, "BUG: invalid state in nfs_kv_continue_read()");
|
||||
abort();
|
||||
}
|
||||
if (st->offset + sizeof(shared_file_header_t) < st->self->parent->shared_inode_threshold)
|
||||
resume_0:
|
||||
if (st->offset + sizeof(shared_file_header_t) < st->self->parent->kvfs->shared_inode_threshold)
|
||||
{
|
||||
kv_read_inode(st->self, st->ino, [st](int res, const std::string & value, json11::Json attrs)
|
||||
{
|
||||
|
@ -54,21 +59,45 @@ resume_1:
|
|||
}
|
||||
if (st->ientry["shared_ino"].uint64_value() != 0)
|
||||
{
|
||||
st->aligned_size = align_shared_size(st->self, st->offset+st->size);
|
||||
st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size);
|
||||
st->buf = st->aligned_buf + sizeof(shared_file_header_t) + st->offset;
|
||||
st->op = new cluster_op_t;
|
||||
st->op->opcode = OSD_OP_READ;
|
||||
st->op->inode = st->self->parent->fs_base_inode + st->ientry["shared_ino"].uint64_value();
|
||||
st->op->offset = st->ientry["shared_offset"].uint64_value();
|
||||
if (st->offset+st->size > st->ientry["size"].uint64_value())
|
||||
if (st->offset >= st->ientry["size"].uint64_value())
|
||||
{
|
||||
st->op->len = align_shared_size(st->self, st->ientry["size"].uint64_value());
|
||||
memset(st->aligned_buf+st->op->len, 0, st->aligned_size-st->op->len);
|
||||
st->size = 0;
|
||||
st->eof = 1;
|
||||
auto cb = std::move(st->cb);
|
||||
cb(0);
|
||||
return;
|
||||
}
|
||||
st->op = new cluster_op_t;
|
||||
{
|
||||
st->op->opcode = OSD_OP_READ;
|
||||
st->op->inode = st->self->parent->kvfs->fs_base_inode + st->ientry["shared_ino"].uint64_value();
|
||||
// Always read including header to react if the file was possibly moved away
|
||||
auto read_offset = st->ientry["shared_offset"].uint64_value();
|
||||
st->op->offset = align_down(read_offset);
|
||||
if (st->op->offset < read_offset)
|
||||
{
|
||||
st->op->iov.push_back(st->self->parent->kvfs->scrap_block.data(),
|
||||
read_offset-st->op->offset);
|
||||
}
|
||||
auto read_size = st->offset+st->size;
|
||||
if (read_size > st->ientry["size"].uint64_value())
|
||||
{
|
||||
st->eof = 1;
|
||||
st->size = st->ientry["size"].uint64_value()-st->offset;
|
||||
read_size = st->ientry["size"].uint64_value();
|
||||
}
|
||||
read_size += sizeof(shared_file_header_t);
|
||||
assert(!st->aligned_buf);
|
||||
st->aligned_buf = (uint8_t*)malloc_or_die(read_size);
|
||||
st->buf = st->aligned_buf + sizeof(shared_file_header_t) + st->offset;
|
||||
st->op->iov.push_back(st->aligned_buf, read_size);
|
||||
st->op->len = align_up(read_offset+read_size) - st->op->offset;
|
||||
if (read_offset+read_size < st->op->offset+st->op->len)
|
||||
{
|
||||
st->op->iov.push_back(st->self->parent->kvfs->scrap_block.data(),
|
||||
st->op->offset+st->op->len - (read_offset+read_size));
|
||||
}
|
||||
}
|
||||
else
|
||||
st->op->len = st->aligned_size;
|
||||
st->op->iov.push_back(st->aligned_buf, st->op->len);
|
||||
st->op->callback = [st, state](cluster_op_t *op)
|
||||
{
|
||||
st->res = op->retval == op->len ? 0 : op->retval;
|
||||
|
@ -80,6 +109,8 @@ resume_1:
|
|||
resume_2:
|
||||
if (st->res < 0)
|
||||
{
|
||||
free(st->aligned_buf);
|
||||
st->aligned_buf = NULL;
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
return;
|
||||
|
@ -91,22 +122,21 @@ resume_2:
|
|||
free(st->aligned_buf);
|
||||
st->aligned_buf = NULL;
|
||||
st->allow_cache = false;
|
||||
nfs_kv_continue_read(st, 0);
|
||||
return;
|
||||
goto resume_0;
|
||||
}
|
||||
auto cb = std::move(st->cb);
|
||||
cb(0);
|
||||
return;
|
||||
}
|
||||
}
|
||||
st->aligned_offset = (st->offset & ~(st->self->parent->pool_alignment-1));
|
||||
st->aligned_size = ((st->offset + st->size + st->self->parent->pool_alignment-1) &
|
||||
~(st->self->parent->pool_alignment-1)) - st->aligned_offset;
|
||||
st->aligned_offset = align_down(st->offset);
|
||||
st->aligned_size = align_up(st->offset+st->size) - st->aligned_offset;
|
||||
assert(!st->aligned_buf);
|
||||
st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size);
|
||||
st->buf = st->aligned_buf + st->offset - st->aligned_offset;
|
||||
st->op = new cluster_op_t;
|
||||
st->op->opcode = OSD_OP_READ;
|
||||
st->op->inode = st->self->parent->fs_base_inode + st->ino;
|
||||
st->op->inode = st->self->parent->kvfs->fs_base_inode + st->ino;
|
||||
st->op->offset = st->aligned_offset;
|
||||
st->op->len = st->aligned_size;
|
||||
st->op->iov.push_back(st->aligned_buf, st->aligned_size);
|
||||
|
@ -119,6 +149,11 @@ resume_2:
|
|||
st->self->parent->cli->execute(st->op);
|
||||
return;
|
||||
resume_3:
|
||||
if (st->res < 0)
|
||||
{
|
||||
free(st->aligned_buf);
|
||||
st->aligned_buf = NULL;
|
||||
}
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res < 0 ? st->res : 0);
|
||||
return;
|
||||
|
@ -151,7 +186,7 @@ int kv_nfs3_read_proc(void *opaque, rpc_op_t *rop)
|
|||
reply->resok.data.data = (char*)st->buf;
|
||||
reply->resok.data.size = st->size;
|
||||
reply->resok.count = st->size;
|
||||
reply->resok.eof = 0;
|
||||
reply->resok.eof = st->eof;
|
||||
}
|
||||
rpc_queue_reply(st->rop);
|
||||
delete st;
|
||||
|
|
|
@ -46,7 +46,7 @@ static void nfs_kv_continue_readdir(nfs_kv_readdir_state *st, int state);
|
|||
|
||||
static void kv_getattr_next(nfs_kv_readdir_state *st)
|
||||
{
|
||||
while (st->is_plus && st->getattr_cur < st->entries.size() && st->getattr_running < st->self->parent->readdir_getattr_parallel)
|
||||
while (st->is_plus && st->getattr_cur < st->entries.size() && st->getattr_running < st->self->parent->kvfs->readdir_getattr_parallel)
|
||||
{
|
||||
auto idx = st->getattr_cur++;
|
||||
st->getattr_running++;
|
||||
|
|
|
@ -22,6 +22,7 @@ struct kv_del_state
|
|||
int type = 0;
|
||||
bool is_rmdir = false;
|
||||
bool rm_data = false;
|
||||
bool allow_cache = true;
|
||||
int res = 0, res2 = 0;
|
||||
std::function<void(int)> cb;
|
||||
};
|
||||
|
@ -48,12 +49,13 @@ static void nfs_kv_continue_delete(kv_del_state *st, int state)
|
|||
fprintf(stderr, "BUG: invalid state in nfs_kv_continue_delete()");
|
||||
abort();
|
||||
}
|
||||
resume_0:
|
||||
st->self->parent->db->get(kv_direntry_key(st->dir_ino, st->filename), [st](int res, const std::string & value)
|
||||
{
|
||||
st->res = res;
|
||||
st->direntry_text = value;
|
||||
nfs_kv_continue_delete(st, 1);
|
||||
});
|
||||
}, st->allow_cache);
|
||||
return;
|
||||
resume_1:
|
||||
if (st->res < 0)
|
||||
|
@ -82,7 +84,7 @@ resume_1:
|
|||
st->res = res;
|
||||
st->ientry_text = value;
|
||||
nfs_kv_continue_delete(st, 2);
|
||||
});
|
||||
}, st->allow_cache);
|
||||
return;
|
||||
resume_2:
|
||||
if (st->res < 0)
|
||||
|
@ -124,8 +126,8 @@ resume_3:
|
|||
if (st->res == -EAGAIN)
|
||||
{
|
||||
// CAS failure, restart from the beginning
|
||||
nfs_kv_continue_delete(st, 0);
|
||||
return;
|
||||
st->allow_cache = false;
|
||||
goto resume_0;
|
||||
}
|
||||
else if (st->res < 0 && st->res != -ENOENT)
|
||||
{
|
||||
|
@ -229,8 +231,8 @@ resume_6:
|
|||
{
|
||||
// Remove data
|
||||
st->self->parent->cmd->loop_and_wait(st->self->parent->cmd->start_rm_data(json11::Json::object {
|
||||
{ "inode", INODE_NO_POOL(st->self->parent->fs_base_inode + st->ino) },
|
||||
{ "pool", (uint64_t)INODE_POOL(st->self->parent->fs_base_inode + st->ino) },
|
||||
{ "inode", INODE_NO_POOL(st->self->parent->kvfs->fs_base_inode + st->ino) },
|
||||
{ "pool", (uint64_t)INODE_POOL(st->self->parent->kvfs->fs_base_inode + st->ino) },
|
||||
}), [st](const cli_result_t & r)
|
||||
{
|
||||
if (r.err)
|
||||
|
|
|
@ -7,28 +7,46 @@
|
|||
|
||||
#include "nfs_proxy.h"
|
||||
#include "nfs_kv.h"
|
||||
#include "cli.h"
|
||||
|
||||
struct nfs_kv_rename_state
|
||||
{
|
||||
nfs_client_t *self = NULL;
|
||||
rpc_op_t *rop = NULL;
|
||||
// params:
|
||||
uint64_t old_dir_ino = 0, new_dir_ino = 0;
|
||||
std::string old_name, new_name;
|
||||
std::string old_direntry_text;
|
||||
std::string ientry_text;
|
||||
json11::Json direntry, ientry;
|
||||
// state:
|
||||
bool allow_cache = true;
|
||||
std::string old_direntry_text, old_ientry_text, new_direntry_text, new_ientry_text;
|
||||
json11::Json old_direntry, old_ientry, new_direntry, new_ientry;
|
||||
std::string new_dir_prefix;
|
||||
void *list_handle = NULL;
|
||||
bool new_exists = false;
|
||||
bool rm_dest_data = false;
|
||||
int res = 0, res2 = 0;
|
||||
std::function<void(int)> cb;
|
||||
};
|
||||
|
||||
static void nfs_kv_continue_rename(nfs_kv_rename_state *st, int state)
|
||||
{
|
||||
// Simplified algorithm (non-atomic and without ENOTDIR/EISDIR):
|
||||
// 1) Check if the target directory exists
|
||||
// 2) Delete & save (using CAS) the source direntry
|
||||
// 3) Write the target direntry using CAS, fail if it already exists
|
||||
// 4) Restore the source direntry on failure
|
||||
// Atomic version would require something like a journal
|
||||
// Algorithm (non-atomic of course):
|
||||
// 1) Read source direntry
|
||||
// 2) Read destination direntry
|
||||
// 3) If destination exists:
|
||||
// 3.1) Check file/folder compatibility (EISDIR/ENOTDIR)
|
||||
// 3.2) Check if destination is empty if it's a folder
|
||||
// 4) If not:
|
||||
// 4.1) Check that the destination directory is actually a directory
|
||||
// 5) Overwrite destination direntry, restart from beginning if CAS failure
|
||||
// 6) Delete source direntry, restart from beginning if CAS failure
|
||||
// 7) If the moved direntry was a regular file:
|
||||
// 7.1) Read inode
|
||||
// 7.2) Delete inode if its link count <= 1
|
||||
// 7.3) Delete inode data if its link count <= 1 and it's a regular non-shared file
|
||||
// 7.4) Reduce link count by 1 if it's > 1
|
||||
// 8) If the moved direntry is a directory:
|
||||
// 8.1) Change parent_ino reference in its inode
|
||||
if (state == 0) {}
|
||||
else if (state == 1) goto resume_1;
|
||||
else if (state == 2) goto resume_2;
|
||||
|
@ -36,37 +54,27 @@ static void nfs_kv_continue_rename(nfs_kv_rename_state *st, int state)
|
|||
else if (state == 4) goto resume_4;
|
||||
else if (state == 5) goto resume_5;
|
||||
else if (state == 6) goto resume_6;
|
||||
else if (state == 7) goto resume_7;
|
||||
else if (state == 8) goto resume_8;
|
||||
else if (state == 9) goto resume_9;
|
||||
else if (state == 10) goto resume_10;
|
||||
else if (state == 11) goto resume_11;
|
||||
else if (state == 12) goto resume_12;
|
||||
else
|
||||
{
|
||||
fprintf(stderr, "BUG: invalid state in nfs_kv_continue_rename()");
|
||||
abort();
|
||||
}
|
||||
kv_read_inode(st->self, st->new_dir_ino, [st](int res, const std::string & value, json11::Json attrs)
|
||||
resume_0:
|
||||
// Read the old direntry
|
||||
st->self->parent->db->get(kv_direntry_key(st->old_dir_ino, st->old_name), [=](int res, const std::string & value)
|
||||
{
|
||||
st->res = res == 0 ? (attrs["type"].string_value() == "dir" ? 0 : -ENOTDIR) : res;
|
||||
st->res = res;
|
||||
st->old_direntry_text = value;
|
||||
nfs_kv_continue_rename(st, 1);
|
||||
});
|
||||
}, st->allow_cache);
|
||||
return;
|
||||
resume_1:
|
||||
if (st->res < 0)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
return;
|
||||
}
|
||||
// Read & delete the old direntry
|
||||
st->self->parent->db->del(kv_direntry_key(st->old_dir_ino, st->old_name), [st](int res)
|
||||
{
|
||||
st->res = res;
|
||||
nfs_kv_continue_rename(st, 2);
|
||||
}, [=](int res, const std::string & old_value)
|
||||
{
|
||||
st->res = res;
|
||||
st->old_direntry_text = old_value;
|
||||
return true;
|
||||
});
|
||||
return;
|
||||
resume_2:
|
||||
if (st->res < 0)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
|
@ -75,47 +83,85 @@ resume_2:
|
|||
}
|
||||
{
|
||||
std::string err;
|
||||
st->direntry = json11::Json::parse(st->old_direntry_text, err);
|
||||
st->old_direntry = json11::Json::parse(st->old_direntry_text, err);
|
||||
if (err != "")
|
||||
{
|
||||
fprintf(stderr, "Invalid JSON in direntry %s = %s: %s\n",
|
||||
kv_direntry_key(st->old_dir_ino, st->old_name).c_str(),
|
||||
st->old_direntry_text.c_str(), err.c_str());
|
||||
auto cb = std::move(st->cb);
|
||||
cb(-EIO);
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (st->direntry["type"].string_value() == "dir" &&
|
||||
st->direntry["ino"].uint64_value() != 0 &&
|
||||
st->new_dir_ino != st->old_dir_ino)
|
||||
{
|
||||
// Read & check inode
|
||||
kv_read_inode(st->self, st->direntry["ino"].uint64_value(), [st](int res, const std::string & value, json11::Json ientry)
|
||||
// Read the new direntry
|
||||
st->self->parent->db->get(kv_direntry_key(st->new_dir_ino, st->new_name), [=](int res, const std::string & value)
|
||||
{
|
||||
st->res = res;
|
||||
st->ientry_text = value;
|
||||
st->ientry = ientry;
|
||||
nfs_kv_continue_rename(st, 3);
|
||||
});
|
||||
st->new_direntry_text = value;
|
||||
nfs_kv_continue_rename(st, 2);
|
||||
}, st->allow_cache);
|
||||
return;
|
||||
resume_3:
|
||||
if (st->res < 0)
|
||||
resume_2:
|
||||
if (st->res < 0 && st->res != -ENOENT)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
return;
|
||||
}
|
||||
// Change parent reference
|
||||
if (st->res == 0)
|
||||
{
|
||||
auto ientry_new = st->ientry.object_items();
|
||||
ientry_new["parent_ino"] = st->new_dir_ino;
|
||||
st->self->parent->db->set(kv_inode_key(st->direntry["ino"].uint64_value()), json11::Json(ientry_new).dump(), [st](int res)
|
||||
std::string err;
|
||||
st->new_direntry = json11::Json::parse(st->new_direntry_text, err);
|
||||
if (err != "")
|
||||
{
|
||||
fprintf(stderr, "Invalid JSON in direntry %s = %s: %s\n",
|
||||
kv_direntry_key(st->new_dir_ino, st->new_name).c_str(),
|
||||
st->new_direntry_text.c_str(), err.c_str());
|
||||
auto cb = std::move(st->cb);
|
||||
cb(-EIO);
|
||||
return;
|
||||
}
|
||||
}
|
||||
st->new_exists = st->res == 0;
|
||||
if (st->new_exists)
|
||||
{
|
||||
// Check file/folder compatibility (EISDIR/ENOTDIR)
|
||||
if ((st->old_direntry["type"] == "dir") != (st->new_direntry["type"] == "dir"))
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb((st->new_direntry["type"] == "dir") ? -ENOTDIR : -EISDIR);
|
||||
return;
|
||||
}
|
||||
if (st->new_direntry["type"] == "dir")
|
||||
{
|
||||
// Check that the destination directory is empty
|
||||
st->new_dir_prefix = kv_direntry_key(st->new_direntry["ino"].uint64_value(), "");
|
||||
st->list_handle = st->self->parent->db->list_start(st->new_dir_prefix);
|
||||
st->self->parent->db->list_next(st->list_handle, [st](int res, const std::string & key, const std::string & value)
|
||||
{
|
||||
st->res = res;
|
||||
nfs_kv_continue_rename(st, 4);
|
||||
}, [st](int res, const std::string & old_value)
|
||||
{
|
||||
return old_value == st->ientry_text;
|
||||
nfs_kv_continue_rename(st, 3);
|
||||
});
|
||||
return;
|
||||
resume_3:
|
||||
st->self->parent->db->list_close(st->list_handle);
|
||||
if (st->res != -ENOENT)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb(-ENOTEMPTY);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Check that the new directory is actually a directory
|
||||
kv_read_inode(st->self, st->new_dir_ino, [st](int res, const std::string & value, json11::Json attrs)
|
||||
{
|
||||
st->res = res == 0 ? (attrs["type"].string_value() == "dir" ? 0 : -ENOTDIR) : res;
|
||||
nfs_kv_continue_rename(st, 4);
|
||||
});
|
||||
return;
|
||||
resume_4:
|
||||
if (st->res < 0)
|
||||
|
@ -125,43 +171,183 @@ resume_4:
|
|||
return;
|
||||
}
|
||||
}
|
||||
// Write the new direntry
|
||||
st->self->parent->db->set(kv_direntry_key(st->new_dir_ino, st->new_name), st->old_direntry_text, [st](int res)
|
||||
{
|
||||
st->res = res;
|
||||
nfs_kv_continue_rename(st, 5);
|
||||
}, [st](int res, const std::string & old_value)
|
||||
{
|
||||
return res == -ENOENT;
|
||||
return st->new_exists ? (old_value == st->new_direntry_text) : (res == -ENOENT);
|
||||
});
|
||||
return;
|
||||
resume_5:
|
||||
if (st->res == -EAGAIN)
|
||||
{
|
||||
// CAS failure
|
||||
st->allow_cache = false;
|
||||
goto resume_0;
|
||||
}
|
||||
if (st->res < 0)
|
||||
{
|
||||
if (st->res == -EAGAIN)
|
||||
st->res = -EEXIST;
|
||||
st->res2 = st->res;
|
||||
st->self->parent->db->set(kv_direntry_key(st->old_dir_ino, st->old_name), st->old_direntry_text, [st](int res)
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
return;
|
||||
}
|
||||
// Delete the old direntry
|
||||
st->self->parent->db->del(kv_direntry_key(st->old_dir_ino, st->old_name), [st](int res)
|
||||
{
|
||||
st->res = res;
|
||||
nfs_kv_continue_rename(st, 6);
|
||||
}, [st](int res, const std::string & old_value)
|
||||
}, [=](int res, const std::string & old_value)
|
||||
{
|
||||
return res == -ENOENT;
|
||||
return res == 0 && old_value == st->old_direntry_text;
|
||||
});
|
||||
return;
|
||||
resume_6:
|
||||
if (st->res == -EAGAIN)
|
||||
{
|
||||
// CAS failure
|
||||
st->allow_cache = false;
|
||||
goto resume_0;
|
||||
}
|
||||
if (st->res < 0)
|
||||
{
|
||||
if (st->res == -EAGAIN)
|
||||
st->res = -EEXIST;
|
||||
fprintf(stderr, "error restoring %s = %s after failed rename: %s (code %d)\n",
|
||||
kv_direntry_key(st->old_dir_ino, st->old_name).c_str(), st->old_direntry_text.c_str(),
|
||||
strerror(-st->res), st->res);
|
||||
}
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res2);
|
||||
cb(st->res);
|
||||
return;
|
||||
}
|
||||
st->allow_cache = true;
|
||||
resume_7again:
|
||||
if (st->new_exists && st->new_direntry["type"].string_value() != "dir")
|
||||
{
|
||||
// (Maybe) delete old destination file data
|
||||
kv_read_inode(st->self, st->new_direntry["ino"].uint64_value(), [st](int res, const std::string & value, json11::Json attrs)
|
||||
{
|
||||
st->res = res;
|
||||
st->new_ientry_text = value;
|
||||
st->new_ientry = attrs;
|
||||
nfs_kv_continue_rename(st, 7);
|
||||
}, st->allow_cache);
|
||||
return;
|
||||
resume_7:
|
||||
if (st->res == 0)
|
||||
{
|
||||
// (5) Reduce inode refcount by 1 or delete inode
|
||||
if (st->new_ientry["nlink"].uint64_value() > 1)
|
||||
{
|
||||
auto copy = st->new_ientry.object_items();
|
||||
copy["nlink"] = st->new_ientry["nlink"].uint64_value()-1;
|
||||
st->self->parent->db->set(kv_inode_key(st->new_direntry["ino"].uint64_value()), json11::Json(copy).dump(), [st](int res)
|
||||
{
|
||||
st->res = res;
|
||||
nfs_kv_continue_rename(st, 8);
|
||||
}, [st](int res, const std::string & old_value)
|
||||
{
|
||||
return old_value == st->new_ientry_text;
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
st->rm_dest_data = kv_map_type(st->new_ientry["type"].string_value()) == NF3REG
|
||||
&& !st->new_ientry["shared_ino"].uint64_value();
|
||||
st->self->parent->db->del(kv_inode_key(st->new_direntry["ino"].uint64_value()), [st](int res)
|
||||
{
|
||||
st->res = res;
|
||||
nfs_kv_continue_rename(st, 8);
|
||||
}, [st](int res, const std::string & old_value)
|
||||
{
|
||||
return old_value == st->new_ientry_text;
|
||||
});
|
||||
}
|
||||
return;
|
||||
resume_8:
|
||||
if (st->res == -EAGAIN)
|
||||
{
|
||||
// CAS failure - re-read inode
|
||||
st->allow_cache = false;
|
||||
goto resume_7again;
|
||||
}
|
||||
if (st->res < 0)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
return;
|
||||
}
|
||||
// Delete inode data if required
|
||||
if (st->rm_dest_data)
|
||||
{
|
||||
st->self->parent->cmd->loop_and_wait(st->self->parent->cmd->start_rm_data(json11::Json::object {
|
||||
{ "inode", INODE_NO_POOL(st->self->parent->kvfs->fs_base_inode + st->new_direntry["ino"].uint64_value()) },
|
||||
{ "pool", (uint64_t)INODE_POOL(st->self->parent->kvfs->fs_base_inode + st->new_direntry["ino"].uint64_value()) },
|
||||
}), [st](const cli_result_t & r)
|
||||
{
|
||||
if (r.err)
|
||||
{
|
||||
fprintf(stderr, "Failed to remove inode %jx data: %s (code %d)\n",
|
||||
st->new_direntry["ino"].uint64_value(), r.text.c_str(), r.err);
|
||||
}
|
||||
st->res = r.err;
|
||||
nfs_kv_continue_rename(st, 9);
|
||||
});
|
||||
return;
|
||||
resume_9:
|
||||
if (st->res < 0)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (st->old_direntry["type"].string_value() == "dir" && st->new_dir_ino != st->old_dir_ino)
|
||||
{
|
||||
// Change parent_ino in old ientry
|
||||
st->allow_cache = true;
|
||||
resume_10:
|
||||
kv_read_inode(st->self, st->old_direntry["ino"].uint64_value(), [st](int res, const std::string & value, json11::Json ientry)
|
||||
{
|
||||
st->res = res;
|
||||
st->old_ientry_text = value;
|
||||
st->old_ientry = ientry;
|
||||
nfs_kv_continue_rename(st, 11);
|
||||
}, st->allow_cache);
|
||||
return;
|
||||
resume_11:
|
||||
if (st->res < 0)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
return;
|
||||
}
|
||||
{
|
||||
auto ientry_new = st->old_ientry.object_items();
|
||||
ientry_new["parent_ino"] = st->new_dir_ino;
|
||||
st->self->parent->db->set(kv_inode_key(st->old_direntry["ino"].uint64_value()), json11::Json(ientry_new).dump(), [st](int res)
|
||||
{
|
||||
st->res = res;
|
||||
nfs_kv_continue_rename(st, 12);
|
||||
}, [st](int res, const std::string & old_value)
|
||||
{
|
||||
return old_value == st->old_ientry_text;
|
||||
});
|
||||
}
|
||||
return;
|
||||
resume_12:
|
||||
if (st->res == -EAGAIN)
|
||||
{
|
||||
// CAS failure - try again
|
||||
st->allow_cache = false;
|
||||
goto resume_10;
|
||||
}
|
||||
if (st->res < 0)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
return;
|
||||
}
|
||||
}
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ struct nfs_kv_setattr_state
|
|||
|
||||
static void nfs_kv_continue_setattr(nfs_kv_setattr_state *st, int state)
|
||||
{
|
||||
// FIXME: NFS client does a lot of setattr calls, so maybe process them asynchronously
|
||||
if (state == 0) {}
|
||||
else if (state == 1) goto resume_1;
|
||||
else if (state == 2) goto resume_2;
|
||||
|
@ -50,8 +51,7 @@ resume_1:
|
|||
cb(st->res);
|
||||
return;
|
||||
}
|
||||
if (st->ientry["type"].string_value() == "link" ||
|
||||
st->ientry["type"].string_value() != "file" &&
|
||||
if (st->ientry["type"].string_value() != "file" &&
|
||||
st->ientry["type"].string_value() != "" &&
|
||||
!st->set_attrs["size"].is_null())
|
||||
{
|
||||
|
@ -104,8 +104,8 @@ resume_2:
|
|||
{
|
||||
// Delete extra data when downsizing
|
||||
st->self->parent->cmd->loop_and_wait(st->self->parent->cmd->start_rm_data(json11::Json::object {
|
||||
{ "inode", INODE_NO_POOL(st->self->parent->fs_base_inode + st->ino) },
|
||||
{ "pool", (uint64_t)INODE_POOL(st->self->parent->fs_base_inode + st->ino) },
|
||||
{ "inode", INODE_NO_POOL(st->self->parent->kvfs->fs_base_inode + st->ino) },
|
||||
{ "pool", (uint64_t)INODE_POOL(st->self->parent->kvfs->fs_base_inode + st->ino) },
|
||||
{ "min_offset", st->set_attrs["size"].uint64_value() },
|
||||
}), [st](const cli_result_t & r)
|
||||
{
|
||||
|
@ -121,7 +121,7 @@ resume_2:
|
|||
}
|
||||
resume_3:
|
||||
auto cb = std::move(st->cb);
|
||||
cb(0);
|
||||
cb(st->res);
|
||||
}
|
||||
|
||||
int kv_nfs3_setattr_proc(void *opaque, rpc_op_t *rop)
|
||||
|
|
|
@ -8,6 +8,9 @@
|
|||
#include "nfs_proxy.h"
|
||||
#include "nfs_kv.h"
|
||||
|
||||
// FIXME: Implement shared inode defragmentator
|
||||
// FIXME: Implement fsck for vitastor-fs and for vitastor-kv
|
||||
|
||||
struct nfs_rmw_t
|
||||
{
|
||||
nfs_kv_write_state *st = NULL;
|
||||
|
@ -38,9 +41,11 @@ struct nfs_kv_write_state
|
|||
uint64_t new_size = 0;
|
||||
uint64_t aligned_size = 0;
|
||||
uint8_t *aligned_buf = NULL;
|
||||
uint64_t shared_inode = 0, shared_offset = 0;
|
||||
// new shared parameters
|
||||
uint64_t shared_inode = 0, shared_offset = 0, shared_alloc = 0;
|
||||
bool was_immediate = false;
|
||||
nfs_rmw_t rmw[2];
|
||||
shared_file_header_t shdr;
|
||||
kv_inode_extend_t *ext = NULL;
|
||||
|
||||
~nfs_kv_write_state()
|
||||
|
@ -53,30 +58,51 @@ struct nfs_kv_write_state
|
|||
}
|
||||
};
|
||||
|
||||
#define align_down(size) ((size) & ~(st->self->parent->kvfs->pool_alignment-1))
|
||||
#define align_up(size) (((size) + st->self->parent->kvfs->pool_alignment-1) & ~(st->self->parent->kvfs->pool_alignment-1))
|
||||
|
||||
static void nfs_kv_continue_write(nfs_kv_write_state *st, int state);
|
||||
|
||||
static void allocate_shared_space(nfs_kv_write_state *st)
|
||||
{
|
||||
auto kvfs = st->self->parent->kvfs;
|
||||
st->shared_inode = kvfs->cur_shared_inode;
|
||||
if (st->new_size < 3*kvfs->pool_alignment - sizeof(shared_file_header_t))
|
||||
{
|
||||
// Allocate as is, without alignment if file is smaller than 3*4kb - 24
|
||||
st->shared_offset = kvfs->cur_shared_offset;
|
||||
st->shared_alloc = align_up(sizeof(shared_file_header_t) + st->new_size);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Try to skip some space to store data aligned
|
||||
st->shared_offset = align_up(kvfs->cur_shared_offset + sizeof(shared_file_header_t)) - sizeof(shared_file_header_t);
|
||||
st->shared_alloc = sizeof(shared_file_header_t) + align_up(st->new_size);
|
||||
}
|
||||
st->self->parent->kvfs->cur_shared_offset = st->shared_offset + st->shared_alloc;
|
||||
}
|
||||
|
||||
static void finish_allocate_shared(nfs_client_t *self, int res)
|
||||
{
|
||||
std::vector<shared_alloc_queue_t> waiting;
|
||||
waiting.swap(self->parent->kvfs->allocating_shared);
|
||||
for (auto & w: waiting)
|
||||
{
|
||||
w.st->res = res;
|
||||
auto st = w.st;
|
||||
st->res = res;
|
||||
if (res == 0)
|
||||
{
|
||||
w.st->shared_inode = self->parent->kvfs->cur_shared_inode;
|
||||
w.st->shared_offset = self->parent->kvfs->cur_shared_offset;
|
||||
self->parent->kvfs->cur_shared_offset += (w.size + self->parent->pool_alignment-1) & ~(self->parent->pool_alignment-1);
|
||||
allocate_shared_space(st);
|
||||
}
|
||||
nfs_kv_continue_write(w.st, w.state);
|
||||
}
|
||||
}
|
||||
|
||||
static void allocate_shared_inode(nfs_kv_write_state *st, int state, uint64_t size)
|
||||
static void allocate_shared_inode(nfs_kv_write_state *st, int state)
|
||||
{
|
||||
if (st->self->parent->kvfs->cur_shared_inode == 0)
|
||||
{
|
||||
st->self->parent->kvfs->allocating_shared.push_back({ st, state, size });
|
||||
st->self->parent->kvfs->allocating_shared.push_back({ st, state });
|
||||
if (st->self->parent->kvfs->allocating_shared.size() > 1)
|
||||
{
|
||||
return;
|
||||
|
@ -110,27 +136,19 @@ static void allocate_shared_inode(nfs_kv_write_state *st, int state, uint64_t si
|
|||
else
|
||||
{
|
||||
st->res = 0;
|
||||
st->shared_inode = st->self->parent->kvfs->cur_shared_inode;
|
||||
st->shared_offset = st->self->parent->kvfs->cur_shared_offset;
|
||||
st->self->parent->kvfs->cur_shared_offset += (size + st->self->parent->pool_alignment-1) & ~(st->self->parent->pool_alignment-1);
|
||||
allocate_shared_space(st);
|
||||
nfs_kv_continue_write(st, state);
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t align_shared_size(nfs_client_t *self, uint64_t size)
|
||||
{
|
||||
return (size + sizeof(shared_file_header_t) + self->parent->pool_alignment-1)
|
||||
& ~(self->parent->pool_alignment-1);
|
||||
}
|
||||
|
||||
static void nfs_do_write(uint64_t ino, uint64_t offset, uint8_t *buf, uint64_t size, nfs_kv_write_state *st, int state)
|
||||
static void nfs_do_write(uint64_t ino, uint64_t offset, uint64_t size, std::function<void(cluster_op_t *op)> prepare, nfs_kv_write_state *st, int state)
|
||||
{
|
||||
auto op = new cluster_op_t;
|
||||
op->opcode = OSD_OP_WRITE;
|
||||
op->inode = st->self->parent->fs_base_inode + ino;
|
||||
op->inode = st->self->parent->kvfs->fs_base_inode + ino;
|
||||
op->offset = offset;
|
||||
op->len = size;
|
||||
op->iov.push_back(buf, size);
|
||||
prepare(op);
|
||||
st->waiting++;
|
||||
op->callback = [st, state](cluster_op_t *op)
|
||||
{
|
||||
|
@ -148,30 +166,31 @@ static void nfs_do_write(uint64_t ino, uint64_t offset, uint8_t *buf, uint64_t s
|
|||
st->self->parent->cli->execute(op);
|
||||
}
|
||||
|
||||
static void nfs_do_shared_write(nfs_kv_write_state *st, int state)
|
||||
{
|
||||
nfs_do_write(st->shared_inode, st->shared_offset, st->aligned_buf, st->aligned_size, st, state);
|
||||
}
|
||||
|
||||
static void nfs_do_unshare_write(nfs_kv_write_state *st, int state)
|
||||
{
|
||||
nfs_do_write(st->ino, 0, st->aligned_buf + sizeof(shared_file_header_t),
|
||||
st->aligned_size - sizeof(shared_file_header_t), st, state);
|
||||
uint64_t size = st->ientry["size"].uint64_value();
|
||||
uint64_t aligned_size = align_up(size);
|
||||
nfs_do_write(st->ino, 0, aligned_size, [&](cluster_op_t *op)
|
||||
{
|
||||
op->iov.push_back(st->aligned_buf, size);
|
||||
if (aligned_size > size)
|
||||
op->iov.push_back(st->self->parent->kvfs->zero_block.data(), aligned_size-size);
|
||||
}, st, state);
|
||||
}
|
||||
|
||||
static void nfs_do_rmw(nfs_rmw_t *rmw)
|
||||
{
|
||||
auto parent = rmw->st->self->parent;
|
||||
auto align = parent->pool_alignment;
|
||||
auto align = parent->kvfs->pool_alignment;
|
||||
assert(rmw->size < align);
|
||||
assert((rmw->offset/parent->pool_block_size) == ((rmw->offset+rmw->size-1)/parent->pool_block_size));
|
||||
assert((rmw->offset/parent->kvfs->pool_block_size) == ((rmw->offset+rmw->size-1)/parent->kvfs->pool_block_size));
|
||||
if (!rmw->part_buf)
|
||||
{
|
||||
rmw->part_buf = (uint8_t*)malloc_or_die(align);
|
||||
}
|
||||
auto op = new cluster_op_t;
|
||||
op->opcode = OSD_OP_READ;
|
||||
op->inode = parent->fs_base_inode + rmw->ino;
|
||||
op->inode = parent->kvfs->fs_base_inode + rmw->ino;
|
||||
op->offset = rmw->offset & ~(align-1);
|
||||
op->len = align;
|
||||
op->iov.push_back(rmw->part_buf, op->len);
|
||||
|
@ -196,7 +215,7 @@ static void nfs_do_rmw(nfs_rmw_t *rmw)
|
|||
auto st = rmw->st;
|
||||
rmw->version = rd_op->version+1;
|
||||
if (st->rmw[0].st && st->rmw[1].st &&
|
||||
st->rmw[0].offset/st->self->parent->pool_block_size == st->rmw[1].offset/st->self->parent->pool_block_size)
|
||||
st->rmw[0].offset/st->self->parent->kvfs->pool_block_size == st->rmw[1].offset/st->self->parent->kvfs->pool_block_size)
|
||||
{
|
||||
// Same block... RMWs should be sequential
|
||||
int other = rmw == &st->rmw[0] ? 1 : 0;
|
||||
|
@ -204,12 +223,12 @@ static void nfs_do_rmw(nfs_rmw_t *rmw)
|
|||
}
|
||||
}
|
||||
auto parent = rmw->st->self->parent;
|
||||
auto align = parent->pool_alignment;
|
||||
auto align = parent->kvfs->pool_alignment;
|
||||
bool is_begin = (rmw->offset % align);
|
||||
bool is_end = ((rmw->offset+rmw->size) % align);
|
||||
auto op = new cluster_op_t;
|
||||
op->opcode = OSD_OP_WRITE;
|
||||
op->inode = rmw->st->self->parent->fs_base_inode + rmw->ino;
|
||||
op->inode = rmw->st->self->parent->kvfs->fs_base_inode + rmw->ino;
|
||||
op->offset = rmw->offset & ~(align-1);
|
||||
op->len = align;
|
||||
op->version = rmw->version;
|
||||
|
@ -256,17 +275,55 @@ static void nfs_do_rmw(nfs_rmw_t *rmw)
|
|||
|
||||
static void nfs_do_shared_read(nfs_kv_write_state *st, int state)
|
||||
{
|
||||
uint64_t data_size = st->ientry["size"].uint64_value();
|
||||
if (!data_size)
|
||||
{
|
||||
nfs_kv_continue_write(st, state);
|
||||
return;
|
||||
}
|
||||
assert(!st->aligned_buf);
|
||||
st->aligned_buf = (uint8_t*)malloc_or_die(data_size);
|
||||
uint64_t shared_offset = st->ientry["shared_offset"].uint64_value();
|
||||
auto op = new cluster_op_t;
|
||||
op->opcode = OSD_OP_READ;
|
||||
op->inode = st->self->parent->fs_base_inode + st->ientry["shared_ino"].uint64_value();
|
||||
op->offset = st->ientry["shared_offset"].uint64_value();
|
||||
op->len = align_shared_size(st->self, st->ientry["size"].uint64_value());
|
||||
op->iov.push_back(st->aligned_buf, op->len);
|
||||
op->inode = st->self->parent->kvfs->fs_base_inode + st->ientry["shared_ino"].uint64_value();
|
||||
op->offset = align_down(shared_offset);
|
||||
// Allow unaligned shared reads
|
||||
auto pre = shared_offset-align_down(shared_offset);
|
||||
if (pre > 0)
|
||||
{
|
||||
op->iov.push_back(st->self->parent->kvfs->scrap_block.data(), pre);
|
||||
}
|
||||
op->iov.push_back(&st->shdr, sizeof(shared_file_header_t));
|
||||
op->iov.push_back(st->aligned_buf, data_size);
|
||||
auto post = (shared_offset+sizeof(shared_file_header_t)+data_size);
|
||||
post = align_up(post) - post;
|
||||
if (post > 0)
|
||||
{
|
||||
op->iov.push_back(st->self->parent->kvfs->scrap_block.data(), post);
|
||||
}
|
||||
op->len = pre+sizeof(shared_file_header_t)+data_size+post;
|
||||
op->callback = [st, state](cluster_op_t *op)
|
||||
{
|
||||
st->res = op->retval == op->len ? 0 : op->retval;
|
||||
st->res = op->retval == op->len ? 0 : (op->retval > 0 ? -EIO : op->retval);
|
||||
delete op;
|
||||
if (st->shdr.magic != SHARED_FILE_MAGIC_V1 || st->shdr.inode != st->ino)
|
||||
{
|
||||
// Got unrelated data - retry from the beginning
|
||||
st->allow_cache = false;
|
||||
free(st->aligned_buf);
|
||||
st->aligned_buf = NULL;
|
||||
nfs_kv_continue_write(st, 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (st->res < 0)
|
||||
{
|
||||
free(st->aligned_buf);
|
||||
st->aligned_buf = NULL;
|
||||
}
|
||||
nfs_kv_continue_write(st, state);
|
||||
}
|
||||
};
|
||||
st->self->parent->cli->execute(op);
|
||||
}
|
||||
|
@ -287,59 +344,128 @@ static void nfs_do_fsync(nfs_kv_write_state *st, int state)
|
|||
static bool nfs_do_shared_readmodify(nfs_kv_write_state *st, int base_state, int state, bool unshare)
|
||||
{
|
||||
assert(state <= base_state);
|
||||
if (state < base_state) {}
|
||||
else if (state == base_state) goto resume_0;
|
||||
assert(!st->aligned_buf);
|
||||
st->aligned_size = unshare
|
||||
? sizeof(shared_file_header_t) + ((st->new_size + st->self->parent->pool_alignment-1) & ~(st->self->parent->pool_alignment-1))
|
||||
: align_shared_size(st->self, st->new_size);
|
||||
st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size);
|
||||
memset(st->aligned_buf + sizeof(shared_file_header_t), 0, st->offset);
|
||||
if (state < base_state) goto resume_0;
|
||||
else if (state == base_state) goto resume_1;
|
||||
resume_0:
|
||||
if (st->ientry["shared_ino"].uint64_value() != 0 &&
|
||||
st->ientry["size"].uint64_value() != 0)
|
||||
st->ientry["size"].uint64_value() != 0 &&
|
||||
(st->offset > 0 || (st->offset+st->size) < st->new_size))
|
||||
{
|
||||
// Read old data if shared non-empty
|
||||
// Read old data if shared non-empty and not fully overwritten
|
||||
nfs_do_shared_read(st, base_state);
|
||||
return false;
|
||||
resume_0:
|
||||
resume_1:
|
||||
if (st->res < 0)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
return false;
|
||||
}
|
||||
auto hdr = ((shared_file_header_t*)st->aligned_buf);
|
||||
if (hdr->magic != SHARED_FILE_MAGIC_V1 || hdr->inode != st->ino)
|
||||
{
|
||||
// Got unrelated data - retry from the beginning
|
||||
st->allow_cache = false;
|
||||
free(st->aligned_buf);
|
||||
st->aligned_buf = NULL;
|
||||
nfs_kv_continue_write(st, 0);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
*((shared_file_header_t*)st->aligned_buf) = {
|
||||
.magic = SHARED_FILE_MAGIC_V1,
|
||||
.inode = st->ino,
|
||||
.alloc = st->aligned_size,
|
||||
};
|
||||
memcpy(st->aligned_buf + sizeof(shared_file_header_t) + st->offset, st->buf, st->size);
|
||||
memset(st->aligned_buf + sizeof(shared_file_header_t) + st->offset + st->size, 0,
|
||||
st->aligned_size - sizeof(shared_file_header_t) - st->offset - st->size);
|
||||
return true;
|
||||
}
|
||||
|
||||
static void nfs_do_align_write(nfs_kv_write_state *st, uint64_t ino, uint64_t offset, int state)
|
||||
static void add_zero(cluster_op_t *op, uint64_t count, std::vector<uint8_t> & zero_buf)
|
||||
{
|
||||
auto alignment = st->self->parent->pool_alignment;
|
||||
while (count > zero_buf.size())
|
||||
{
|
||||
op->iov.push_back(zero_buf.data(), zero_buf.size());
|
||||
count -= zero_buf.size();
|
||||
}
|
||||
if (count > 0)
|
||||
{
|
||||
op->iov.push_back(zero_buf.data(), count);
|
||||
count = 0;
|
||||
}
|
||||
}
|
||||
|
||||
static void nfs_do_shared_write(nfs_kv_write_state *st, int state)
|
||||
{
|
||||
st->shdr = {
|
||||
.magic = SHARED_FILE_MAGIC_V1,
|
||||
.inode = st->ino,
|
||||
.alloc = st->shared_alloc,
|
||||
};
|
||||
bool unaligned_is_free = true;
|
||||
uint64_t write_offset = st->shared_offset;
|
||||
uint64_t write_size = sizeof(shared_file_header_t) + st->new_size;
|
||||
uint64_t aligned_offset = write_offset;
|
||||
uint64_t aligned_size = write_size;
|
||||
if (unaligned_is_free)
|
||||
{
|
||||
// Zero pad if "unaligned but aligned"
|
||||
aligned_offset = align_down(write_offset);
|
||||
aligned_size = align_up(write_offset+write_size) - aligned_offset;
|
||||
}
|
||||
// FIXME: Do RMW if unaligned_is_free == false i.e. if we want tighter packing
|
||||
bool has_old = st->ientry["shared_ino"].uint64_value() != 0 &&
|
||||
st->ientry["size"].uint64_value() != 0;
|
||||
nfs_do_write(st->shared_inode, aligned_offset, aligned_size, [&](cluster_op_t *op)
|
||||
{
|
||||
if (unaligned_is_free && aligned_offset < write_offset)
|
||||
{
|
||||
// zero padding
|
||||
op->iov.push_back(st->self->parent->kvfs->zero_block.data(), write_offset-aligned_offset);
|
||||
}
|
||||
// header
|
||||
op->iov.push_back(&st->shdr, sizeof(shared_file_header_t));
|
||||
if (st->offset > 0)
|
||||
{
|
||||
if (has_old)
|
||||
{
|
||||
// old data
|
||||
op->iov.push_back(st->aligned_buf, st->offset);
|
||||
}
|
||||
else
|
||||
add_zero(op, st->offset, st->self->parent->kvfs->zero_block);
|
||||
}
|
||||
// new data
|
||||
op->iov.push_back(st->buf, st->size);
|
||||
if (st->offset+st->size < st->new_size)
|
||||
{
|
||||
if (has_old)
|
||||
{
|
||||
// old data
|
||||
op->iov.push_back(st->aligned_buf+st->offset+st->size, st->new_size-(st->offset+st->size));
|
||||
}
|
||||
else
|
||||
add_zero(op, st->offset, st->self->parent->kvfs->zero_block);
|
||||
}
|
||||
if (unaligned_is_free && (aligned_size+aligned_offset) > (write_size+write_offset))
|
||||
{
|
||||
// zero padding
|
||||
op->iov.push_back(st->self->parent->kvfs->zero_block.data(), aligned_size+aligned_offset - (write_size+write_offset));
|
||||
}
|
||||
}, st, state);
|
||||
}
|
||||
|
||||
static void nfs_do_align_write(nfs_kv_write_state *st, uint64_t ino, uint64_t offset, uint64_t shared_alloc, int state)
|
||||
{
|
||||
auto alignment = st->self->parent->kvfs->pool_alignment;
|
||||
uint64_t end = (offset+st->size);
|
||||
uint8_t *good_buf = st->buf;
|
||||
uint64_t good_offset = offset;
|
||||
uint64_t good_size = st->size;
|
||||
bool begin_shdr = false;
|
||||
uint64_t end_pad = 0;
|
||||
st->waiting++;
|
||||
st->rmw[0].st = NULL;
|
||||
st->rmw[1].st = NULL;
|
||||
if (offset % alignment)
|
||||
{
|
||||
if (shared_alloc && st->offset == 0 && (offset % alignment) == sizeof(shared_file_header_t))
|
||||
{
|
||||
// RMW can be skipped at shared beginning
|
||||
st->shdr = {
|
||||
.magic = SHARED_FILE_MAGIC_V1,
|
||||
.inode = st->ino,
|
||||
.alloc = shared_alloc,
|
||||
};
|
||||
begin_shdr = true;
|
||||
good_offset -= sizeof(shared_file_header_t);
|
||||
offset = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Requires read-modify-write in the beginning
|
||||
auto s = (alignment - (offset % alignment));
|
||||
|
@ -360,35 +486,48 @@ static void nfs_do_align_write(nfs_kv_write_state *st, uint64_t ino, uint64_t of
|
|||
.buf = st->buf,
|
||||
.size = s,
|
||||
};
|
||||
// FIXME: skip rmw at shared beginning
|
||||
nfs_do_rmw(&st->rmw[0]);
|
||||
}
|
||||
if ((offset+st->size) % alignment)
|
||||
}
|
||||
if ((end % alignment) &&
|
||||
(offset == 0 || end/alignment > (offset-1)/alignment))
|
||||
{
|
||||
// Requires read-modify-write in the end
|
||||
auto s = ((offset+st->size) % alignment);
|
||||
assert(st->offset+st->size <= st->new_size);
|
||||
if (st->offset+st->size == st->new_size)
|
||||
{
|
||||
// rmw can be skipped at end - we can just zero pad the request
|
||||
end_pad = alignment - (end % alignment);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto s = (end % alignment);
|
||||
if (good_size > s)
|
||||
good_size -= s;
|
||||
else
|
||||
good_size = 0;
|
||||
if ((offset+st->size)/alignment > offset/alignment)
|
||||
{
|
||||
st->rmw[1] = {
|
||||
.st = st,
|
||||
.continue_state = state,
|
||||
.ino = ino,
|
||||
.offset = offset + st->size-s,
|
||||
.buf = st->buf + st->size-s,
|
||||
.offset = end - s,
|
||||
.buf = st->buf + st->size - s,
|
||||
.size = s,
|
||||
};
|
||||
// FIXME: skip rmw at end
|
||||
nfs_do_rmw(&st->rmw[1]);
|
||||
}
|
||||
}
|
||||
if (good_size > 0)
|
||||
if (good_size > 0 || end_pad > 0 || begin_shdr)
|
||||
{
|
||||
// Normal write
|
||||
nfs_do_write(ino, good_offset, good_buf, good_size, st, state);
|
||||
nfs_do_write(ino, good_offset, (begin_shdr ? sizeof(shared_file_header_t) : 0)+good_size+end_pad, [&](cluster_op_t *op)
|
||||
{
|
||||
if (begin_shdr)
|
||||
op->iov.push_back(&st->shdr, sizeof(shared_file_header_t));
|
||||
op->iov.push_back(good_buf, good_size);
|
||||
if (end_pad)
|
||||
op->iov.push_back(st->self->parent->kvfs->zero_block.data(), end_pad);
|
||||
}, st, state);
|
||||
}
|
||||
st->waiting--;
|
||||
if (!st->waiting)
|
||||
|
@ -406,6 +545,7 @@ static std::string new_normal_ientry(nfs_kv_write_state *st)
|
|||
ni.erase("shared_alloc");
|
||||
ni.erase("shared_ver");
|
||||
ni["size"] = st->ext->cur_extend;
|
||||
ni["mtime"] = nfstime_now_str();
|
||||
return json11::Json(ni).dump();
|
||||
}
|
||||
|
||||
|
@ -418,6 +558,7 @@ static std::string new_moved_ientry(nfs_kv_write_state *st)
|
|||
ni["shared_alloc"] = st->aligned_size;
|
||||
ni.erase("shared_ver");
|
||||
ni["size"] = st->new_size;
|
||||
ni["mtime"] = nfstime_now_str();
|
||||
return json11::Json(ni).dump();
|
||||
}
|
||||
|
||||
|
@ -426,6 +567,7 @@ static std::string new_shared_ientry(nfs_kv_write_state *st)
|
|||
auto ni = st->ientry.object_items();
|
||||
ni.erase("empty");
|
||||
ni["size"] = st->new_size;
|
||||
ni["mtime"] = nfstime_now_str();
|
||||
ni["shared_ver"] = ni["shared_ver"].uint64_value()+1;
|
||||
return json11::Json(ni).dump();
|
||||
}
|
||||
|
@ -438,6 +580,7 @@ static std::string new_unshared_ientry(nfs_kv_write_state *st)
|
|||
ni.erase("shared_offset");
|
||||
ni.erase("shared_alloc");
|
||||
ni.erase("shared_ver");
|
||||
ni["mtime"] = nfstime_now_str();
|
||||
return json11::Json(ni).dump();
|
||||
}
|
||||
|
||||
|
@ -580,7 +723,6 @@ static void nfs_kv_continue_write(nfs_kv_write_state *st, int state)
|
|||
else if (state == 3) goto resume_3;
|
||||
else if (state == 4) goto resume_4;
|
||||
else if (state == 5) goto resume_5;
|
||||
else if (state == 6) goto resume_6;
|
||||
else if (state == 7) goto resume_7;
|
||||
else if (state == 8) goto resume_8;
|
||||
else if (state == 9) goto resume_9;
|
||||
|
@ -618,23 +760,23 @@ resume_1:
|
|||
cb(st->res == 0 ? -EINVAL : st->res);
|
||||
return;
|
||||
}
|
||||
st->was_immediate = st->self->parent->cli->get_immediate_commit(st->self->parent->fs_base_inode + st->ino);
|
||||
st->was_immediate = st->self->parent->cli->get_immediate_commit(st->self->parent->kvfs->fs_base_inode + st->ino);
|
||||
st->new_size = st->ientry["size"].uint64_value();
|
||||
if (st->new_size < st->offset + st->size)
|
||||
{
|
||||
st->new_size = st->offset + st->size;
|
||||
}
|
||||
if (st->offset + st->size + sizeof(shared_file_header_t) < st->self->parent->shared_inode_threshold)
|
||||
if (st->offset + st->size + sizeof(shared_file_header_t) < st->self->parent->kvfs->shared_inode_threshold)
|
||||
{
|
||||
if (st->ientry["size"].uint64_value() == 0 &&
|
||||
st->ientry["shared_ino"].uint64_value() == 0 ||
|
||||
st->ientry["empty"].bool_value() &&
|
||||
(st->ientry["size"].uint64_value() + sizeof(shared_file_header_t)) < st->self->parent->shared_inode_threshold ||
|
||||
(st->ientry["size"].uint64_value() + sizeof(shared_file_header_t)) < st->self->parent->kvfs->shared_inode_threshold ||
|
||||
st->ientry["shared_ino"].uint64_value() != 0 &&
|
||||
st->ientry["shared_alloc"].uint64_value() < align_shared_size(st->self, st->offset+st->size))
|
||||
st->ientry["shared_alloc"].uint64_value() < sizeof(shared_file_header_t)+st->offset+st->size)
|
||||
{
|
||||
// Either empty, or shared and requires moving into a larger place (redirect-write)
|
||||
allocate_shared_inode(st, 2, st->new_size);
|
||||
allocate_shared_inode(st, 2);
|
||||
return;
|
||||
resume_2:
|
||||
if (st->res < 0)
|
||||
|
@ -645,10 +787,17 @@ resume_2:
|
|||
}
|
||||
resume_3:
|
||||
if (!nfs_do_shared_readmodify(st, 3, state, false))
|
||||
{
|
||||
return;
|
||||
nfs_do_shared_write(st, 4); // FIXME assemble from parts, do not copy?
|
||||
}
|
||||
nfs_do_shared_write(st, 4);
|
||||
return;
|
||||
resume_4:
|
||||
if (st->aligned_buf)
|
||||
{
|
||||
free(st->aligned_buf);
|
||||
st->aligned_buf = NULL;
|
||||
}
|
||||
if (st->res < 0)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
|
@ -668,12 +817,6 @@ resume_5:
|
|||
if (st->res < 0)
|
||||
{
|
||||
st->res2 = st->res;
|
||||
memset(st->aligned_buf, 0, st->aligned_size);
|
||||
nfs_do_shared_write(st, 6);
|
||||
return;
|
||||
resume_6:
|
||||
free(st->aligned_buf);
|
||||
st->aligned_buf = NULL;
|
||||
if (st->res2 == -EAGAIN)
|
||||
{
|
||||
goto resume_0;
|
||||
|
@ -689,11 +832,12 @@ resume_6:
|
|||
cb(0);
|
||||
return;
|
||||
}
|
||||
else if (st->ientry["shared_ino"].uint64_value() > 0)
|
||||
else if (st->ientry["shared_ino"].uint64_value() != 0)
|
||||
{
|
||||
// Non-empty, shared, can be updated in-place
|
||||
nfs_do_align_write(st, st->ientry["shared_ino"].uint64_value(),
|
||||
st->ientry["shared_offset"].uint64_value() + sizeof(shared_file_header_t) + st->offset, 7);
|
||||
st->ientry["shared_offset"].uint64_value() + sizeof(shared_file_header_t) + st->offset,
|
||||
st->ientry["shared_alloc"].uint64_value(), 7);
|
||||
return;
|
||||
resume_7:
|
||||
if (st->res == 0 && st->stable && !st->was_immediate)
|
||||
|
@ -728,16 +872,23 @@ resume_9:
|
|||
{
|
||||
if (st->ientry["size"].uint64_value() != 0)
|
||||
{
|
||||
assert(!st->aligned_buf);
|
||||
st->aligned_size = align_shared_size(st->self, st->ientry["size"].uint64_value());
|
||||
st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size);
|
||||
nfs_do_shared_read(st, 10);
|
||||
return;
|
||||
resume_10:
|
||||
nfs_do_unshare_write(st, 11);
|
||||
return;
|
||||
resume_11:
|
||||
;
|
||||
if (st->aligned_buf)
|
||||
{
|
||||
free(st->aligned_buf);
|
||||
st->aligned_buf = NULL;
|
||||
}
|
||||
if (st->res < 0)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
return;
|
||||
}
|
||||
}
|
||||
st->self->parent->db->set(kv_inode_key(st->ino), new_unshared_ientry(st), [st](int res)
|
||||
{
|
||||
|
@ -763,7 +914,7 @@ resume_12:
|
|||
st->ientry_text = new_unshared_ientry(st);
|
||||
}
|
||||
// Non-shared write
|
||||
nfs_do_align_write(st, st->ino, st->offset, 13);
|
||||
nfs_do_align_write(st, st->ino, st->offset, 0, 13);
|
||||
return;
|
||||
resume_13:
|
||||
if (st->res == 0 && st->stable && !st->was_immediate)
|
||||
|
|
|
@ -10,11 +10,18 @@
|
|||
|
||||
nfsstat3 vitastor_nfs_map_err(int err)
|
||||
{
|
||||
if (err < 0)
|
||||
{
|
||||
err = -err;
|
||||
}
|
||||
return (err == EINVAL ? NFS3ERR_INVAL
|
||||
: (err == ENOENT ? NFS3ERR_NOENT
|
||||
: (err == ENOSPC ? NFS3ERR_NOSPC
|
||||
: (err == EEXIST ? NFS3ERR_EXIST
|
||||
: (err == EIO ? NFS3ERR_IO : (err ? NFS3ERR_IO : NFS3_OK))))));
|
||||
: (err == EISDIR ? NFS3ERR_ISDIR
|
||||
: (err == ENOTDIR ? NFS3ERR_NOTDIR
|
||||
: (err == ENOTEMPTY ? NFS3ERR_NOTEMPTY
|
||||
: (err == EIO ? NFS3ERR_IO : (err ? NFS3ERR_IO : NFS3_OK)))))))));
|
||||
}
|
||||
|
||||
int nfs3_null_proc(void *opaque, rpc_op_t *rop)
|
||||
|
|
|
@ -10,9 +10,10 @@
|
|||
|
||||
#include <netinet/tcp.h>
|
||||
#include <sys/epoll.h>
|
||||
#include <sys/wait.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
//#include <signal.h>
|
||||
#include <signal.h>
|
||||
|
||||
#include "nfs/nfs.h"
|
||||
#include "nfs/rpc.h"
|
||||
|
@ -34,6 +35,10 @@ const char *exe_name = NULL;
|
|||
|
||||
nfs_proxy_t::~nfs_proxy_t()
|
||||
{
|
||||
if (kvfs)
|
||||
delete kvfs;
|
||||
if (blockfs)
|
||||
delete blockfs;
|
||||
if (db)
|
||||
delete db;
|
||||
if (cmd)
|
||||
|
@ -49,26 +54,30 @@ nfs_proxy_t::~nfs_proxy_t()
|
|||
delete ringloop;
|
||||
}
|
||||
|
||||
json11::Json::object nfs_proxy_t::parse_args(int narg, const char *args[])
|
||||
{
|
||||
json11::Json::object cfg;
|
||||
for (int i = 1; i < narg; i++)
|
||||
{
|
||||
if (!strcmp(args[i], "-h") || !strcmp(args[i], "--help"))
|
||||
{
|
||||
printf(
|
||||
"Vitastor NFS 3.0 proxy\n"
|
||||
"(c) Vitaliy Filippov, 2021-2022 (VNPL-1.1)\n"
|
||||
static const char* help_text =
|
||||
"Vitastor NFS 3.0 proxy " VERSION "\n"
|
||||
"(c) Vitaliy Filippov, 2021+ (VNPL-1.1)\n"
|
||||
"\n"
|
||||
"USAGE:\n"
|
||||
" %s [STANDARD OPTIONS] [OTHER OPTIONS]\n"
|
||||
" --fs <META> mount VitastorFS with metadata in image <META>\n"
|
||||
" --subdir <DIR> export images prefixed <DIR>/ (default empty - export all images)\n"
|
||||
" --portmap 0 do not listen on port 111 (portmap/rpcbind, requires root)\n"
|
||||
"vitastor-nfs (--fs <NAME> | --block) [-o <OPT>] mount <MOUNTPOINT>\n"
|
||||
" Start local filesystem server and mount file system to <MOUNTPOINT>.\n"
|
||||
" Use regular `umount <MOUNTPOINT>` to unmount the FS.\n"
|
||||
" The server will be automatically stopped when the FS is unmounted.\n"
|
||||
" -o|--options <OPT> Pass additional NFS mount options (ex.: -o async).\n"
|
||||
"\n"
|
||||
"vitastor-nfs (--fs <NAME> | --block) start\n"
|
||||
" Start network NFS server. Options:\n"
|
||||
" --bind <IP> bind service to <IP> address (default 0.0.0.0)\n"
|
||||
" --nfspath <PATH> set NFS export path to <PATH> (default is /)\n"
|
||||
" --port <PORT> use port <PORT> for NFS services (default is 2049)\n"
|
||||
" --pool <POOL> use <POOL> as default pool for new files (images)\n"
|
||||
" --portmap 0 do not listen on port 111 (portmap/rpcbind, requires root)\n"
|
||||
"\n"
|
||||
"OPTIONS:\n"
|
||||
" --fs <NAME> use VitastorFS with metadata in image <NAME>\n"
|
||||
" --block use pseudo-FS presenting images as files\n"
|
||||
" --pool <POOL> use <POOL> as default pool for new files\n"
|
||||
" --subdir <DIR> export <DIR> instead of root directory\n"
|
||||
" --nfspath <PATH> set NFS export path to <PATH> (default is /)\n"
|
||||
" --pidfile <FILE> write process ID to the specified file\n"
|
||||
" --logfile <FILE> log to the specified file\n"
|
||||
" --foreground 1 stay in foreground, do not daemonize\n"
|
||||
"\n"
|
||||
"NFS proxy is stateless if you use immediate_commit=all in your cluster and if\n"
|
||||
|
@ -76,17 +85,59 @@ json11::Json::object nfs_proxy_t::parse_args(int narg, const char *args[])
|
|||
"NFS proxies with L3 load balancing in this case.\n"
|
||||
"\n"
|
||||
"Example start and mount commands for a custom NFS port:\n"
|
||||
" %s --etcd_address 192.168.5.10:2379 --portmap 0 --port 2050 --pool testpool\n"
|
||||
" mount localhost:/ /mnt/ -o port=2050,mountport=2050,nfsvers=3,soft,nolock,tcp\n",
|
||||
exe_name, exe_name
|
||||
);
|
||||
" vitastor-nfs start --block --etcd_address 192.168.5.10:2379 --portmap 0 --port 2050 --pool testpool\n"
|
||||
" mount localhost:/ /mnt/ -o port=2050,mountport=2050,nfsvers=3,soft,nolock,tcp\n"
|
||||
"Or just:\n"
|
||||
" vitastor-nfs mount --block --pool testpool /mnt/\n"
|
||||
;
|
||||
|
||||
json11::Json::object nfs_proxy_t::parse_args(int narg, const char *args[])
|
||||
{
|
||||
json11::Json::object cfg;
|
||||
std::vector<std::string> cmd;
|
||||
for (int i = 1; i < narg; i++)
|
||||
{
|
||||
if (!strcmp(args[i], "-h") || !strcmp(args[i], "--help"))
|
||||
{
|
||||
printf("%s", help_text);
|
||||
exit(0);
|
||||
}
|
||||
else if (!strcmp(args[i], "-o") || !strcmp(args[i], "--options"))
|
||||
{
|
||||
if (i >= narg-1)
|
||||
{
|
||||
printf("%s", help_text);
|
||||
exit(0);
|
||||
}
|
||||
const std::string & old = cfg["options"].string_value();
|
||||
cfg["options"] = old != "" ? old+","+args[i+1] : args[i+1];
|
||||
}
|
||||
else if (args[i][0] == '-' && args[i][1] == '-')
|
||||
{
|
||||
const char *opt = args[i]+2;
|
||||
cfg[opt] = !strcmp(opt, "json") || i == narg-1 ? "1" : args[++i];
|
||||
cfg[opt] = !strcmp(opt, "json") || !strcmp(opt, "block") || i == narg-1 ? "1" : args[++i];
|
||||
}
|
||||
else
|
||||
{
|
||||
cmd.push_back(args[i]);
|
||||
}
|
||||
}
|
||||
if (cfg.find("block") == cfg.end() && cfg.find("fs") == cfg.end())
|
||||
{
|
||||
fprintf(stderr, "Specify one of --block or --fs NAME. Use vitastor-nfs --help for details\n");
|
||||
exit(1);
|
||||
}
|
||||
if (cmd.size() >= 2 && cmd[0] == "mount")
|
||||
{
|
||||
cfg["mount"] = cmd[1];
|
||||
}
|
||||
else if (cmd.size() >= 1 && cmd[0] == "start")
|
||||
{
|
||||
}
|
||||
else
|
||||
{
|
||||
printf("%s", help_text);
|
||||
exit(1);
|
||||
}
|
||||
return cfg;
|
||||
}
|
||||
|
@ -98,6 +149,9 @@ void nfs_proxy_t::run(json11::Json cfg)
|
|||
srand48(tv.tv_sec*1000000000 + tv.tv_nsec);
|
||||
server_id = (uint64_t)lrand48() | ((uint64_t)lrand48() << 31) | ((uint64_t)lrand48() << 62);
|
||||
// Parse options
|
||||
if (cfg["logfile"].string_value() != "")
|
||||
logfile = cfg["logfile"].string_value();
|
||||
pidfile = cfg["pidfile"].string_value();
|
||||
trace = cfg["log_level"].uint64_value() > 5 || cfg["trace"].uint64_value() > 0;
|
||||
bind_address = cfg["bind"].string_value();
|
||||
if (bind_address == "")
|
||||
|
@ -110,18 +164,6 @@ void nfs_proxy_t::run(json11::Json cfg)
|
|||
export_root = cfg["nfspath"].string_value();
|
||||
if (!export_root.size())
|
||||
export_root = "/";
|
||||
name_prefix = cfg["subdir"].string_value();
|
||||
{
|
||||
int e = name_prefix.size();
|
||||
while (e > 0 && name_prefix[e-1] == '/')
|
||||
e--;
|
||||
int s = 0;
|
||||
while (s < e && name_prefix[s] == '/')
|
||||
s++;
|
||||
name_prefix = name_prefix.substr(s, e-s);
|
||||
if (name_prefix.size())
|
||||
name_prefix += "/";
|
||||
}
|
||||
if (cfg["client_writeback_allowed"].is_null())
|
||||
{
|
||||
// NFS is always aware of fsync, so we allow write-back cache
|
||||
|
@ -130,6 +172,16 @@ void nfs_proxy_t::run(json11::Json cfg)
|
|||
obj["client_writeback_allowed"] = true;
|
||||
cfg = obj;
|
||||
}
|
||||
mountpoint = cfg["mount"].string_value();
|
||||
if (mountpoint != "")
|
||||
{
|
||||
bind_address = "127.0.0.1";
|
||||
nfs_port = 0;
|
||||
portmap_enabled = false;
|
||||
exit_on_umount = true;
|
||||
}
|
||||
mountopts = cfg["options"].string_value();
|
||||
fsname = cfg["fs"].string_value();
|
||||
// Create client
|
||||
ringloop = new ring_loop_t(RINGLOOP_DEFAULT_SIZE);
|
||||
epmgr = new epoll_manager_t(ringloop);
|
||||
|
@ -139,11 +191,6 @@ void nfs_proxy_t::run(json11::Json cfg)
|
|||
cmd->epmgr = epmgr;
|
||||
cmd->cli = cli;
|
||||
watch_stats();
|
||||
if (!fs_kv_inode)
|
||||
{
|
||||
blockfs = new block_fs_state_t();
|
||||
blockfs->init(this);
|
||||
}
|
||||
// Load image metadata
|
||||
while (!cli->is_ready())
|
||||
{
|
||||
|
@ -155,65 +202,15 @@ void nfs_proxy_t::run(json11::Json cfg)
|
|||
// Check default pool
|
||||
check_default_pool();
|
||||
// Check if we're using VitastorFS
|
||||
fs_kv_inode = cfg["fs"].uint64_value();
|
||||
if (fs_kv_inode)
|
||||
if (fsname == "")
|
||||
{
|
||||
if (!INODE_POOL(fs_kv_inode))
|
||||
{
|
||||
fprintf(stderr, "FS metadata inode number must include pool\n");
|
||||
exit(1);
|
||||
blockfs = new block_fs_state_t();
|
||||
blockfs->init(this, cfg);
|
||||
}
|
||||
}
|
||||
else if (cfg["fs"].is_string())
|
||||
else
|
||||
{
|
||||
for (auto & ic: cli->st_cli.inode_config)
|
||||
{
|
||||
if (ic.second.name == cfg["fs"].string_value())
|
||||
{
|
||||
fs_kv_inode = ic.first;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!fs_kv_inode)
|
||||
{
|
||||
fprintf(stderr, "FS metadata image \"%s\" does not exist\n", cfg["fs"].string_value().c_str());
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
readdir_getattr_parallel = cfg["readdir_getattr_parallel"].uint64_value();
|
||||
if (!readdir_getattr_parallel)
|
||||
readdir_getattr_parallel = 8;
|
||||
id_alloc_batch_size = cfg["id_alloc_batch_size"].uint64_value();
|
||||
if (!id_alloc_batch_size)
|
||||
id_alloc_batch_size = 200;
|
||||
if (fs_kv_inode)
|
||||
{
|
||||
// Open DB and wait
|
||||
int open_res = 0;
|
||||
bool open_done = false;
|
||||
db = new kv_dbw_t(cli);
|
||||
db->open(fs_kv_inode, cfg, [&](int res)
|
||||
{
|
||||
open_done = true;
|
||||
open_res = res;
|
||||
});
|
||||
while (!open_done)
|
||||
{
|
||||
ringloop->loop();
|
||||
if (open_done)
|
||||
break;
|
||||
ringloop->wait();
|
||||
}
|
||||
if (open_res < 0)
|
||||
{
|
||||
fprintf(stderr, "Failed to open key/value filesystem metadata index: %s (code %d)\n",
|
||||
strerror(-open_res), open_res);
|
||||
exit(1);
|
||||
}
|
||||
fs_base_inode = ((uint64_t)default_pool_id << (64-POOL_ID_BITS));
|
||||
fs_inode_count = ((uint64_t)1 << (64-POOL_ID_BITS)) - 1;
|
||||
shared_inode_threshold = pool_block_size;
|
||||
kvfs = new kv_fs_state_t;
|
||||
kvfs = new kv_fs_state_t();
|
||||
kvfs->init(this, cfg);
|
||||
}
|
||||
// Self-register portmap and NFS
|
||||
pmap.reg_ports.insert((portmap_id_t){
|
||||
|
@ -245,7 +242,7 @@ void nfs_proxy_t::run(json11::Json cfg)
|
|||
.addr = "0.0.0.0.0."+std::to_string(nfs_port),
|
||||
});
|
||||
// Create NFS socket and add it to epoll
|
||||
int nfs_socket = create_and_bind_socket(bind_address, nfs_port, 128, NULL);
|
||||
int nfs_socket = create_and_bind_socket(bind_address, nfs_port, 128, &listening_port);
|
||||
fcntl(nfs_socket, F_SETFL, fcntl(nfs_socket, F_GETFL, 0) | O_NONBLOCK);
|
||||
epmgr->tfd->set_fd_handler(nfs_socket, false, [this](int nfs_socket, int epoll_events)
|
||||
{
|
||||
|
@ -277,24 +274,43 @@ void nfs_proxy_t::run(json11::Json cfg)
|
|||
}
|
||||
});
|
||||
}
|
||||
if (mountpoint != "")
|
||||
{
|
||||
mount_fs();
|
||||
}
|
||||
if (cfg["foreground"].is_null())
|
||||
{
|
||||
daemonize();
|
||||
}
|
||||
while (true)
|
||||
if (pidfile != "")
|
||||
{
|
||||
write_pid();
|
||||
}
|
||||
while (!finished)
|
||||
{
|
||||
ringloop->loop();
|
||||
ringloop->wait();
|
||||
}
|
||||
// Destroy the client
|
||||
cli->flush();
|
||||
if (kvfs)
|
||||
{
|
||||
delete kvfs;
|
||||
kvfs = NULL;
|
||||
}
|
||||
if (blockfs)
|
||||
{
|
||||
delete blockfs;
|
||||
blockfs = NULL;
|
||||
}
|
||||
if (db)
|
||||
{
|
||||
delete db;
|
||||
db = NULL;
|
||||
}
|
||||
delete cli;
|
||||
delete epmgr;
|
||||
delete ringloop;
|
||||
kvfs = NULL;
|
||||
db = NULL;
|
||||
cli = NULL;
|
||||
epmgr = NULL;
|
||||
ringloop = NULL;
|
||||
|
@ -368,7 +384,7 @@ void nfs_proxy_t::parse_stats(etcd_kv_t & kv)
|
|||
inode_t inode_num = 0;
|
||||
char null_byte = 0;
|
||||
int scanned = sscanf(key.c_str() + cli->st_cli.etcd_prefix.length()+13, "%u/%ju%c", &pool_id, &inode_num, &null_byte);
|
||||
if (scanned != 2 || !pool_id || pool_id >= POOL_ID_MAX || !inode_num)
|
||||
if (scanned != 2 || !pool_id || pool_id >= POOL_ID_MAX)
|
||||
{
|
||||
fprintf(stderr, "Bad etcd key %s, ignoring\n", key.c_str());
|
||||
}
|
||||
|
@ -402,8 +418,6 @@ void nfs_proxy_t::check_default_pool()
|
|||
auto pool_it = cli->st_cli.pool_config.begin();
|
||||
default_pool_id = pool_it->first;
|
||||
default_pool = pool_it->second.name;
|
||||
pool_block_size = pool_it->second.pg_stripe_size;
|
||||
pool_alignment = pool_it->second.bitmap_granularity;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -418,8 +432,6 @@ void nfs_proxy_t::check_default_pool()
|
|||
if (p.second.name == default_pool)
|
||||
{
|
||||
default_pool_id = p.first;
|
||||
pool_block_size = p.second.pg_stripe_size;
|
||||
pool_alignment = p.second.bitmap_granularity;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -438,12 +450,14 @@ void nfs_proxy_t::do_accept(int listen_fd)
|
|||
int nfs_fd = 0;
|
||||
while ((nfs_fd = accept(listen_fd, (struct sockaddr *)&addr, &addr_size)) >= 0)
|
||||
{
|
||||
if (trace)
|
||||
fprintf(stderr, "New client %d: connection from %s\n", nfs_fd, addr_to_string(addr).c_str());
|
||||
active_connections++;
|
||||
fcntl(nfs_fd, F_SETFL, fcntl(nfs_fd, F_GETFL, 0) | O_NONBLOCK);
|
||||
int one = 1;
|
||||
setsockopt(nfs_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
||||
auto cli = new nfs_client_t();
|
||||
if (fs_kv_inode)
|
||||
if (kvfs)
|
||||
nfs_kv_procs(cli);
|
||||
else
|
||||
nfs_block_procs(cli);
|
||||
|
@ -458,8 +472,12 @@ void nfs_proxy_t::do_accept(int listen_fd)
|
|||
// Handle incoming event
|
||||
if (epoll_events & EPOLLRDHUP)
|
||||
{
|
||||
auto parent = cli->parent;
|
||||
if (parent->trace)
|
||||
fprintf(stderr, "Client %d disconnected\n", nfs_fd);
|
||||
cli->stop();
|
||||
parent->active_connections--;
|
||||
parent->check_exit();
|
||||
return;
|
||||
}
|
||||
cli->epoll_events |= epoll_events;
|
||||
|
@ -570,7 +588,7 @@ void nfs_client_t::handle_read(int result)
|
|||
read_msg.msg_iovlen = 0;
|
||||
if (deref())
|
||||
return;
|
||||
if (result <= 0 && result != -EAGAIN && result != -EINTR)
|
||||
if (result <= 0 && result != -EAGAIN && result != -EINTR && result != -ECANCELED)
|
||||
{
|
||||
printf("Failed read from client %d: %d (%s)\n", nfs_fd, result, strerror(-result));
|
||||
stop();
|
||||
|
@ -665,8 +683,8 @@ void nfs_client_t::handle_read(int result)
|
|||
return;
|
||||
}
|
||||
}
|
||||
submit_read(0);
|
||||
}
|
||||
submit_read(0);
|
||||
}
|
||||
|
||||
void nfs_client_t::submit_send()
|
||||
|
@ -994,8 +1012,159 @@ void nfs_proxy_t::daemonize()
|
|||
close(1);
|
||||
close(2);
|
||||
open("/dev/null", O_RDONLY);
|
||||
open("/dev/null", O_WRONLY);
|
||||
open("/dev/null", O_WRONLY);
|
||||
open(logfile.c_str(), O_WRONLY|O_APPEND|O_CREAT, 0666);
|
||||
open(logfile.c_str(), O_WRONLY|O_APPEND|O_CREAT, 0666);
|
||||
}
|
||||
|
||||
void nfs_proxy_t::write_pid()
|
||||
{
|
||||
int fd = open(pidfile.c_str(), O_WRONLY|O_CREAT|O_TRUNC, 0666);
|
||||
if (fd < 0)
|
||||
{
|
||||
fprintf(stderr, "Failed to create pid file %s: %s (code %d)\n", pidfile.c_str(), strerror(errno), errno);
|
||||
return;
|
||||
}
|
||||
auto pid = std::to_string(getpid());
|
||||
if (write(fd, pid.c_str(), pid.size()) < 0)
|
||||
{
|
||||
fprintf(stderr, "Failed to write pid to %s: %s (code %d)\n", pidfile.c_str(), strerror(errno), errno);
|
||||
}
|
||||
close(fd);
|
||||
}
|
||||
|
||||
static pid_t wanted_pid = 0;
|
||||
static bool child_finished = false;
|
||||
static int child_status = -1;
|
||||
|
||||
void single_child_handler(int signal)
|
||||
{
|
||||
child_finished = true;
|
||||
waitpid(wanted_pid, &child_status, WNOHANG);
|
||||
}
|
||||
|
||||
void nfs_proxy_t::mount_fs()
|
||||
{
|
||||
check_already_mounted();
|
||||
signal(SIGCHLD, single_child_handler);
|
||||
auto pid = fork();
|
||||
if (pid < 0)
|
||||
{
|
||||
fprintf(stderr, "Failed to fork: %s (code %d)\n", strerror(errno), errno);
|
||||
exit(1);
|
||||
}
|
||||
if (pid > 0)
|
||||
{
|
||||
// Parent - loop and wait until child finishes
|
||||
wanted_pid = pid;
|
||||
while (!child_finished)
|
||||
{
|
||||
ringloop->loop();
|
||||
ringloop->wait();
|
||||
}
|
||||
if (!WIFEXITED(child_status) || WEXITSTATUS(child_status) != 0)
|
||||
{
|
||||
// Mounting failed
|
||||
exit(1);
|
||||
}
|
||||
if (fsname != "")
|
||||
fprintf(stderr, "Successfully mounted VitastorFS %s at %s\n", fsname.c_str(), mountpoint.c_str());
|
||||
else
|
||||
fprintf(stderr, "Successfully mounted Vitastor pseudo-FS at %s\n", mountpoint.c_str());
|
||||
finished = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Child
|
||||
std::string src = ("localhost:"+export_root);
|
||||
std::string opts = ("port="+std::to_string(listening_port)+",mountport="+std::to_string(listening_port)+",nfsvers=3,nolock,tcp");
|
||||
bool hard = false, async = false;
|
||||
for (auto & opt: explode(",", mountopts, true))
|
||||
{
|
||||
if (opt == "hard")
|
||||
hard = true;
|
||||
else if (opt == "async")
|
||||
async = true;
|
||||
else if (opt.substr(0, 4) != "port" && opt.substr(0, 9) != "mountport" &&
|
||||
opt.substr(0, 7) != "nfsvers" && opt.substr(0, 5) != "proto" &&
|
||||
opt != "udp" && opt != "tcp" && opt != "rdma")
|
||||
{
|
||||
opts += ","+opt;
|
||||
}
|
||||
}
|
||||
if (!hard)
|
||||
opts += ",soft";
|
||||
if (!async)
|
||||
opts += ",sync";
|
||||
const char *args[] = { "mount", src.c_str(), mountpoint.c_str(), "-o", opts.c_str(), NULL };
|
||||
execvp("mount", (char* const*)args);
|
||||
fprintf(stderr, "Failed to run mount %s %s -o %s: %s (code %d)\n",
|
||||
src.c_str(), mountpoint.c_str(), opts.c_str(), strerror(errno), errno);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
void nfs_proxy_t::check_already_mounted()
|
||||
{
|
||||
std::string realpoint = realpath_str(mountpoint, false);
|
||||
if (realpoint == "")
|
||||
{
|
||||
return;
|
||||
}
|
||||
std::string mountstr = read_file("/proc/mounts");
|
||||
if (mountstr == "")
|
||||
{
|
||||
return;
|
||||
}
|
||||
auto mounts = explode("\n", mountstr, true);
|
||||
for (auto & str: mounts)
|
||||
{
|
||||
auto mnt = explode(" ", str, true);
|
||||
if (mnt.size() >= 2 && mnt[1] == realpoint)
|
||||
{
|
||||
fprintf(stderr, "%s is already mounted\n", mountpoint.c_str());
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void nfs_proxy_t::check_exit()
|
||||
{
|
||||
if (active_connections || !exit_on_umount)
|
||||
{
|
||||
return;
|
||||
}
|
||||
std::string mountstr = read_file("/proc/mounts");
|
||||
if (mountstr == "")
|
||||
{
|
||||
return;
|
||||
}
|
||||
auto port_opt = "port="+std::to_string(listening_port);
|
||||
auto mountport_opt = "mountport="+std::to_string(listening_port);
|
||||
auto mounts = explode("\n", mountstr, true);
|
||||
for (auto & str: mounts)
|
||||
{
|
||||
auto opts = explode(" ", str, true);
|
||||
if (opts[2].size() >= 3 && opts[2].substr(0, 3) == "nfs" && opts.size() >= 4)
|
||||
{
|
||||
opts = explode(",", opts[3], true);
|
||||
bool port_found = false;
|
||||
bool addr_found = false;
|
||||
for (auto & opt: opts)
|
||||
{
|
||||
if (opt == port_opt || opt == mountport_opt)
|
||||
port_found = true;
|
||||
if (opt == "addr=127.0.0.1" || opt == "mountaddr=127.0.0.1")
|
||||
addr_found = true;
|
||||
}
|
||||
if (port_found && addr_found)
|
||||
{
|
||||
// OK, do not unmount
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Not found, unmount
|
||||
finished = true;
|
||||
}
|
||||
|
||||
int main(int narg, const char *args[])
|
||||
|
|
|
@ -21,23 +21,24 @@ class nfs_proxy_t
|
|||
{
|
||||
public:
|
||||
std::string bind_address;
|
||||
std::string name_prefix;
|
||||
uint64_t fsid = 1;
|
||||
uint64_t server_id = 0;
|
||||
std::string default_pool;
|
||||
std::string export_root;
|
||||
bool portmap_enabled;
|
||||
unsigned nfs_port;
|
||||
uint64_t fs_kv_inode = 0;
|
||||
uint64_t fs_base_inode = 0;
|
||||
uint64_t fs_inode_count = 0;
|
||||
int readdir_getattr_parallel = 8, id_alloc_batch_size = 200;
|
||||
int trace = 0;
|
||||
std::string logfile = "/dev/null";
|
||||
std::string pidfile;
|
||||
bool exit_on_umount = false;
|
||||
std::string mountpoint;
|
||||
std::string mountopts;
|
||||
std::string fsname;
|
||||
|
||||
pool_id_t default_pool_id;
|
||||
uint64_t pool_block_size = 0;
|
||||
uint64_t pool_alignment = 0;
|
||||
uint64_t shared_inode_threshold = 0;
|
||||
int active_connections = 0;
|
||||
bool finished = false;
|
||||
int listening_port = 0;
|
||||
pool_id_t default_pool_id = 0;
|
||||
|
||||
portmap_service_t pmap;
|
||||
ring_loop_t *ringloop = NULL;
|
||||
|
@ -64,6 +65,10 @@ public:
|
|||
void check_default_pool();
|
||||
void do_accept(int listen_fd);
|
||||
void daemonize();
|
||||
void write_pid();
|
||||
void mount_fs();
|
||||
void check_already_mounted();
|
||||
void check_exit();
|
||||
};
|
||||
|
||||
struct rpc_cur_buffer_t
|
||||
|
|
|
@ -239,6 +239,7 @@ class osd_t
|
|||
void report_statistics();
|
||||
void report_pg_state(pg_t & pg);
|
||||
void report_pg_states();
|
||||
void apply_no_inode_stats();
|
||||
void apply_pg_count();
|
||||
void apply_pg_config();
|
||||
|
||||
|
|
|
@ -388,9 +388,18 @@ void osd_t::on_change_etcd_state_hook(std::map<std::string, etcd_kv_t> & changes
|
|||
etcd_global_config = changes[st_cli.etcd_prefix+"/config/global"].value.object_items();
|
||||
parse_config(false);
|
||||
}
|
||||
bool pools = changes.find(st_cli.etcd_prefix+"/config/pools") != changes.end();
|
||||
if (pools)
|
||||
{
|
||||
apply_no_inode_stats();
|
||||
}
|
||||
if (run_primary)
|
||||
{
|
||||
bool pgs = changes.find(st_cli.etcd_prefix+"/config/pgs") != changes.end();
|
||||
if (pools || pgs)
|
||||
{
|
||||
apply_pg_count();
|
||||
}
|
||||
apply_pg_config();
|
||||
}
|
||||
}
|
||||
|
@ -414,6 +423,8 @@ void osd_t::on_reload_config_hook(json11::Json::object & global_config)
|
|||
// Acquire lease
|
||||
void osd_t::acquire_lease()
|
||||
{
|
||||
// Apply no_inode_stats before the first statistics report
|
||||
apply_no_inode_stats();
|
||||
// Maximum lease TTL is (report interval) + retries * (timeout + repeat interval)
|
||||
st_cli.etcd_call("/lease/grant", json11::Json::object {
|
||||
{ "TTL", etcd_report_interval+(st_cli.max_etcd_attempts*(2*st_cli.etcd_quick_timeout)+999)/1000 }
|
||||
|
@ -602,9 +613,30 @@ void osd_t::on_load_pgs_hook(bool success)
|
|||
else
|
||||
{
|
||||
peering_state &= ~OSD_LOADING_PGS;
|
||||
apply_no_inode_stats();
|
||||
if (run_primary)
|
||||
{
|
||||
apply_pg_count();
|
||||
apply_pg_config();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::apply_no_inode_stats()
|
||||
{
|
||||
if (!bs)
|
||||
{
|
||||
return;
|
||||
}
|
||||
std::vector<uint64_t> no_inode_stats;
|
||||
for (auto & pool_item: st_cli.pool_config)
|
||||
{
|
||||
if (pool_item.second.no_inode_stats)
|
||||
{
|
||||
no_inode_stats.push_back(pool_item.first);
|
||||
}
|
||||
}
|
||||
bs->set_no_inode_stats(no_inode_stats);
|
||||
}
|
||||
|
||||
void osd_t::apply_pg_count()
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
// Copyright (c) Vitaliy Filippov, 2019+
|
||||
// License: VNPL-1.1 (see README.md for details)
|
||||
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
|
||||
|
||||
#include <assert.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include "str_util.h"
|
||||
|
||||
std::string base64_encode(const std::string &in)
|
||||
|
@ -304,6 +305,23 @@ std::string read_all_fd(int fd)
|
|||
return res;
|
||||
}
|
||||
|
||||
std::string read_file(std::string file, bool allow_enoent)
|
||||
{
|
||||
std::string res;
|
||||
int fd = open(file.c_str(), O_RDONLY);
|
||||
if (fd < 0 || (res = read_all_fd(fd)) == "")
|
||||
{
|
||||
int err = errno;
|
||||
if (fd >= 0)
|
||||
close(fd);
|
||||
if (!allow_enoent || err != ENOENT)
|
||||
fprintf(stderr, "Failed to read %s: %s (code %d)\n", file.c_str(), strerror(err), err);
|
||||
return "";
|
||||
}
|
||||
close(fd);
|
||||
return res;
|
||||
}
|
||||
|
||||
std::string str_repeat(const std::string & str, int times)
|
||||
{
|
||||
std::string r;
|
||||
|
@ -410,3 +428,16 @@ std::string auto_addslashes(const std::string & str)
|
|||
}
|
||||
return res+"\"";
|
||||
}
|
||||
|
||||
std::string realpath_str(std::string path, bool nofail)
|
||||
{
|
||||
char *p = realpath((char*)path.c_str(), NULL);
|
||||
if (!p)
|
||||
{
|
||||
fprintf(stderr, "Failed to resolve %s: %s\n", path.c_str(), strerror(errno));
|
||||
return nofail ? path : "";
|
||||
}
|
||||
std::string rp(p);
|
||||
free(p);
|
||||
return rp;
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
// Copyright (c) Vitaliy Filippov, 2019+
|
||||
// License: VNPL-1.1 (see README.md for details)
|
||||
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
|
||||
|
||||
#pragma once
|
||||
#include <stdint.h>
|
||||
|
@ -18,9 +18,11 @@ std::string format_size(uint64_t size, bool nobytes = false);
|
|||
void print_help(const char *help_text, std::string exe_name, std::string cmd, bool all);
|
||||
uint64_t parse_time(std::string time_str, bool *ok = NULL);
|
||||
std::string read_all_fd(int fd);
|
||||
std::string read_file(std::string file, bool allow_enoent = false);
|
||||
std::string str_repeat(const std::string & str, int times);
|
||||
size_t utf8_length(const std::string & s);
|
||||
size_t utf8_length(const char *s);
|
||||
std::vector<std::string> explode(const std::string & sep, const std::string & value, bool trim);
|
||||
std::string scan_escaped(const std::string & cmd, size_t & pos);
|
||||
std::string auto_addslashes(const std::string & str);
|
||||
std::string realpath_str(std::string path, bool nofail = true);
|
||||
|
|
|
@ -4,7 +4,7 @@ PG_COUNT=16
|
|||
. `dirname $0`/run_3osds.sh
|
||||
|
||||
build/src/vitastor-cli --etcd_address $ETCD_URL create -s 10G fsmeta
|
||||
build/src/vitastor-nfs --fs fsmeta --etcd_address $ETCD_URL --portmap 0 --port 2050 --foreground 1 --trace 1 >>./testdata/nfs.log 2>&1 &
|
||||
build/src/vitastor-nfs start --fs fsmeta --etcd_address $ETCD_URL --portmap 0 --port 2050 --foreground 1 --trace 1 >>./testdata/nfs.log 2>&1 &
|
||||
NFS_PID=$!
|
||||
|
||||
mkdir -p testdata/nfs
|
||||
|
@ -40,7 +40,15 @@ cp ./testdata/nfs/f1 ./testdata/f1_nfs
|
|||
diff ./testdata/f1_90k ./testdata/nfs/f1
|
||||
format_green "90K data ok"
|
||||
|
||||
# move it inplace
|
||||
# test partial shared overwrite
|
||||
dd if=/dev/urandom of=./testdata/f1_90k bs=9317 count=1 seek=5 conv=notrunc
|
||||
dd if=./testdata/f1_90k of=./testdata/nfs/f1 bs=9317 count=1 skip=5 seek=5 conv=notrunc
|
||||
sudo umount ./testdata/nfs/
|
||||
sudo mount localhost:/ ./testdata/nfs -o port=2050,mountport=2050,nfsvers=3,soft,nolock,tcp
|
||||
diff ./testdata/f1_90k ./testdata/nfs/f1
|
||||
format_green "partial inplace shared overwrite ok"
|
||||
|
||||
# move it to a larger shared space
|
||||
dd if=/dev/urandom of=./testdata/f1_110k bs=110k count=1
|
||||
cp testdata/f1_110k ./testdata/nfs/f1
|
||||
sudo umount ./testdata/nfs/
|
||||
|
@ -146,4 +154,14 @@ sudo mount localhost:/ ./testdata/nfs -o port=2050,mountport=2050,nfsvers=3,soft
|
|||
if ls ./testdata/nfs | grep smallfile; then false; fi
|
||||
format_green "rm small ok"
|
||||
|
||||
# rename over existing
|
||||
echo ZXCVBN > ./testdata/nfs/over1
|
||||
mv ./testdata/nfs/over1 ./testdata/nfs/linked2
|
||||
sudo umount ./testdata/nfs/
|
||||
sudo mount localhost:/ ./testdata/nfs -o port=2050,mountport=2050,nfsvers=3,soft,nolock,tcp
|
||||
if ls ./testdata/nfs | grep over1; then false; fi
|
||||
[[ "`cat ./testdata/nfs/linked2`" = "ZXCVBN" ]]
|
||||
[[ "`cat ./testdata/nfs/linked1`" = "BABABA" ]]
|
||||
format_green "rename over existing file ok"
|
||||
|
||||
format_green OK
|
||||
|
|
Loading…
Reference in New Issue