Compare commits

..

12 Commits

Author SHA1 Message Date
118a1cd521 Experiment v2 2024-06-27 00:38:09 +03:00
6da49d38fe Experimental: support multiple io_uring-based I/O threads in client 2024-06-27 00:38:09 +03:00
ca63cd507d Fix possible infinite loop in flusher (surprisingly reproduced in test_write.sh with iothreads) 2024-06-27 00:38:01 +03:00
ea0d72289c Treat copied buffers as written only after completing the write in client
SYNC operation fsyncs only completed operations, so treating writes as "eligible
for fsync" before actually completing them is incorrect

It affected SCHEME=ec test_heal.sh (with immediate_commit=none) test - it was
flapping with lost writes - some non-fsynced writes were legitimately lost by
the OSD, but weren't repeated by the client
2024-06-20 02:11:53 +03:00
e400a851f4 Repeat dirty buffer flushes on any PG primary change because the new primary may not know about unfinished operations of the old primary 2024-06-19 00:28:26 +03:00
0fec7a9fea Drop dirty peer connections also when stopping PG to guarantee that clients do not miss fsync 2024-06-19 00:28:26 +03:00
b9de2a92a9 Print OSD performance stats 2024-06-17 13:02:58 +03:00
5360a70853 Make OSD also report derived stats 2024-06-17 13:02:52 +03:00
4c2328eb13 Implement ls-osd command 2024-06-17 02:22:14 +03:00
313daef12d Slightly decopypaste etcd key parsing 2024-06-17 01:38:42 +03:00
ad9c12e1b9 Fix Pseudo-FS initialization leading to ENOENTs some time after start 2024-06-16 23:43:09 +03:00
4473eb5512 Fix slow & failing CAS layer merge 2024-06-14 02:15:49 +03:00
42 changed files with 937 additions and 538 deletions

View File

@@ -1,191 +0,0 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
const fs = require('fs');
const AntiEtcd = require('antietcd');
const vitastor_persist_filter = require('./vitastor_persist_filter.js');
const { b64, local_ips } = require('./utils.js');
class AntiEtcdAdapter
{
static start_antietcd(config)
{
let antietcd;
if (config.use_antietcd)
{
let fileConfig = {};
if (fs.existsSync(config.config_path||'/etc/vitastor/vitastor.conf'))
{
fileConfig = JSON.parse(fs.readFileSync(config.config_path||'/etc/vitastor/vitastor.conf', { encoding: 'utf-8' }));
}
let mergedConfig = { ...fileConfig, ...config };
let cluster = mergedConfig.etcd_address;
if (!(cluster instanceof Array))
cluster = cluster ? (''+(cluster||'')).split(/,+/) : [];
cluster = Object.keys(cluster.reduce((a, url) =>
{
a[url.toLowerCase().replace(/^https?:\/\//, '').replace(/\/.*$/, '')] = true;
return a;
}, {}));
const cfg_port = mergedConfig.antietcd_port;
const is_local = local_ips(true).reduce((a, c) => { a[c] = true; return a; }, {});
const selected = cluster.map(s => s.split(':', 2)).filter(ip => is_local[ip[0]] && (!cfg_port || ip[1] == cfg_port));
if (selected.length > 1)
{
console.error('More than 1 etcd_address matches local IPs, please specify port');
process.exit(1);
}
else if (selected.length == 1)
{
const antietcd_config = {
ip: selected[0][0],
port: selected[0][1],
data: mergedConfig.antietcd_data_file || ((mergedConfig.antietcd_data_dir || '/var/lib/vitastor') + '/mon_'+selected[0][1]+'.json.gz'),
persist_filter: vitastor_persist_filter(mergedConfig.etcd_prefix || '/vitastor'),
node_id: selected[0][0]+':'+selected[0][1], // node_id = ip:port
cluster: (cluster.length == 1 ? null : cluster),
cluster_key: (mergedConfig.etcd_prefix || '/vitastor'),
stale_read: 1,
};
for (const key in config)
{
if (key.substr(0, 9) === 'antietcd_')
{
const noprefix = key.substr(9);
if (!(noprefix in antietcd_config))
{
antietcd_config[noprefix] = config[key];
}
}
}
antietcd = new AntiEtcd(antietcd_config);
antietcd.start();
}
else
{
console.log('Antietcd is enabled, but etcd_address does not contain local IPs, proceeding without it');
}
}
return antietcd;
}
constructor(mon, antietcd)
{
this.mon = mon;
this.antietcd = antietcd;
this.on_leader = [];
this.on_change = (st) =>
{
if (st.state === 'leader')
{
for (const cb of this.on_leader)
{
cb();
}
this.on_leader = [];
}
};
this.antietcd.on('raftchange', this.on_change);
}
parse_config(/*config*/)
{
}
stop_watcher()
{
this.antietcd.off('raftchange', this.on_change);
const watch_id = this.watch_id;
if (watch_id)
{
this.watch_id = null;
this.antietcd.cancel_watch(watch_id).catch(console.error);
}
}
async start_watcher()
{
if (this.watch_id)
{
await this.antietcd.cancel_watch(this.watch_id);
this.watch_id = null;
}
const watch_id = await this.antietcd.create_watch({
key: b64(this.mon.config.etcd_prefix+'/'),
range_end: b64(this.mon.config.etcd_prefix+'0'),
start_revision: ''+this.mon.etcd_watch_revision,
watch_id: 1,
progress_notify: true,
}, (message) =>
{
this.mon.on_message(message.result);
});
console.log('Successfully subscribed to antietcd revision '+this.antietcd.etctree.mod_revision);
this.watch_id = watch_id;
}
async become_master()
{
if (!this.antietcd.raft)
{
console.log('Running in non-clustered mode');
}
else
{
console.log('Waiting to become master');
await new Promise(ok => this.on_leader.push(ok));
}
const state = { ...this.mon.get_mon_state(), id: ''+this.mon.etcd_lease_id };
await this.etcd_call('/kv/txn', {
success: [ { requestPut: { key: b64(this.mon.config.etcd_prefix+'/mon/master'), value: b64(JSON.stringify(state)), lease: ''+this.mon.etcd_lease_id } } ],
}, this.mon.config.etcd_start_timeout, 0);
if (this.antietcd.raft)
{
console.log('Became master');
}
}
async etcd_call(path, body, timeout, retries)
{
let retry = 0;
if (retries >= 0 && retries < 1)
{
retries = 1;
}
let prev = 0;
while (retries < 0 || retry < retries)
{
retry++;
if (this.mon.stopped)
{
throw new Error('Monitor instance is stopped');
}
try
{
if (Date.now()-prev < timeout)
{
await new Promise(ok => setTimeout(ok, timeout-(Date.now()-prev)));
}
prev = Date.now();
const res = await this.antietcd.api(path.replace(/^\/+/, '').replace(/\/+$/, '').replace(/\/+/g, '_'), body);
if (res.error)
{
console.error('Failed to query antietcd '+path+' (retry '+retry+'/'+retries+'): '+res.error);
}
else
{
return res;
}
}
catch (e)
{
console.error('Failed to query antietcd '+path+' (retry '+retry+'/'+retries+'): '+e.stack);
}
}
throw new Error('Failed to query antietcd ('+retries+' retries)');
}
}
module.exports = AntiEtcdAdapter;

View File

@@ -3,7 +3,6 @@
const http = require('http');
const WebSocket = require('ws');
const { b64, local_ips } = require('./utils.js');
const MON_STOPPED = 'Monitor instance is stopped';
@@ -24,7 +23,7 @@ class EtcdAdapter
parse_etcd_addresses(addrs)
{
const is_local_ip = local_ips(true).reduce((a, c) => { a[c] = true; return a; }, {});
const is_local_ip = this.mon.local_ips(true).reduce((a, c) => { a[c] = true; return a; }, {});
this.etcd_local = [];
this.etcd_urls = [];
this.selected_etcd_url = null;
@@ -349,4 +348,9 @@ function POST(url, body, timeout)
});
}
function b64(str)
{
return Buffer.from(str).toString('base64');
}
module.exports = EtcdAdapter;

View File

@@ -4,7 +4,6 @@
const fs = require('fs');
const crypto = require('crypto');
const os = require('os');
const AntiEtcdAdapter = require('./antietcd_adapter.js');
const EtcdAdapter = require('./etcd_adapter.js');
const { etcd_tree, etcd_allow, etcd_nonempty_keys } = require('./etcd_schema.js');
const { validate_pool_cfg } = require('./pool_config.js');
@@ -12,23 +11,17 @@ const { sum_op_stats, sum_object_counts, sum_inode_stats, serialize_bigints } =
const stableStringify = require('./stable-stringify.js');
const { scale_pg_history } = require('./pg_utils.js');
const { get_osd_tree } = require('./osd_tree.js');
const { b64, de64, local_ips } = require('./utils.js');
const { recheck_primary, save_new_pgs_txn, generate_pool_pgs } = require('./pg_gen.js');
class Mon
{
static run_forever(config)
{
let antietcd = AntiEtcdAdapter.start_antietcd(config);
let mon;
const run = () =>
{
console.log('Starting Monitor');
const my_mon = new Mon(config);
my_mon.etcd = antietcd
? new AntiEtcdAdapter(my_mon, antietcd)
: new EtcdAdapter(my_mon);
my_mon.etcd.parse_config(my_mon.config);
mon = my_mon;
my_mon.on_die = () =>
{
@@ -65,6 +58,8 @@ class Mon
this.state = JSON.parse(JSON.stringify(etcd_tree));
this.prev_stats = { osd_stats: {}, osd_diff: {} };
this.recheck_pgs_active = false;
this.etcd = new EtcdAdapter(this);
this.etcd.parse_config(this.config);
}
async start()
@@ -152,8 +147,8 @@ class Mon
this.etcd_watch_revision = BigInt(msg.header.revision)+BigInt(1);
for (const e of msg.events||[])
{
const kv = this.parse_kv(e.kv);
const key = kv.key.substr(this.config.etcd_prefix.length);
this.parse_kv(e.kv);
const key = e.kv.key.substr(this.config.etcd_prefix.length);
if (key.substr(0, 11) == '/osd/state/')
{
stats_changed = true;
@@ -173,7 +168,7 @@ class Mon
}
if (this.config.verbose)
{
console.log(JSON.stringify({ ...e, kv: kv || undefined }));
console.log(JSON.stringify(e));
}
}
if (pg_states_changed)
@@ -257,7 +252,7 @@ class Mon
get_mon_state()
{
return { ip: local_ips(), hostname: os.hostname() };
return { ip: this.local_ips(), hostname: os.hostname() };
}
async get_lease()
@@ -695,16 +690,15 @@ class Mon
{
if (!kv || !kv.key)
{
return kv;
return;
}
kv = { ...kv };
kv.key = de64(kv.key);
kv.value = kv.value ? de64(kv.value) : null;
let key = kv.key.substr(this.config.etcd_prefix.length+1);
if (!etcd_allow.exec(key))
{
console.log('Bad key in etcd: '+kv.key+' = '+kv.value);
return kv;
return;
}
try
{
@@ -713,7 +707,7 @@ class Mon
catch (e)
{
console.log('Bad value in etcd: '+kv.key+' = '+kv.value);
return kv;
return;
}
let key_parts = key.split('/');
let cur = this.state;
@@ -763,7 +757,6 @@ class Mon
!this.state.osd.stats[osd_num] ? 0 : this.state.osd.stats[osd_num].time+this.config.osd_out_time
);
}
return kv;
}
_die(err)
@@ -773,6 +766,33 @@ class Mon
this.on_stop().catch(console.error);
this.on_die();
}
local_ips(all)
{
const ips = [];
const ifaces = os.networkInterfaces();
for (const ifname in ifaces)
{
for (const iface of ifaces[ifname])
{
if (iface.family == 'IPv4' && !iface.internal || all)
{
ips.push(iface.address);
}
}
}
return ips;
}
}
function b64(str)
{
return Buffer.from(str).toString('base64');
}
function de64(str)
{
return Buffer.from(str, 'base64').toString();
}
function sha1hex(str)

View File

@@ -9,7 +9,6 @@
"author": "Vitaliy Filippov",
"license": "UNLICENSED",
"dependencies": {
"antietcd": "^1.0.1",
"sprintf-js": "^1.1.2",
"ws": "^7.2.5"
},

View File

@@ -1,37 +0,0 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
const os = require('os');
function local_ips(all)
{
const ips = [];
const ifaces = os.networkInterfaces();
for (const ifname in ifaces)
{
for (const iface of ifaces[ifname])
{
if (iface.family == 'IPv4' && !iface.internal || all)
{
ips.push(iface.address);
}
}
}
return ips;
}
function b64(str)
{
return Buffer.from(str).toString('base64');
}
function de64(str)
{
return Buffer.from(str, 'base64').toString();
}
module.exports = {
b64,
de64,
local_ips,
};

View File

@@ -1,48 +0,0 @@
// AntiEtcd persistence filter for Vitastor
// (c) Vitaliy Filippov, 2024
// License: Mozilla Public License 2.0 or Vitastor Network Public License 1.1
function vitastor_persist_filter(cfg)
{
const prefix = cfg.vitastor_prefix || '/vitastor';
return (key, value) =>
{
if (key.substr(0, prefix.length+'/osd/stats/'.length) == prefix+'/osd/stats/')
{
if (value)
{
try
{
value = JSON.parse(value);
value = JSON.stringify({
bitmap_granularity: value.bitmap_granularity || undefined,
data_block_size: value.data_block_size || undefined,
host: value.host || undefined,
immediate_commit: value.immediate_commit || undefined,
});
}
catch (e)
{
console.error('invalid JSON in '+key+' = '+value+': '+e);
value = {};
}
}
else
{
value = undefined;
}
return value;
}
else if (key.substr(0, prefix.length+'/osd/'.length) == prefix+'/osd/' ||
key.substr(0, prefix.length+'/inode/stats/'.length) == prefix+'/inode/stats/' ||
key.substr(0, prefix.length+'/pg/stats/'.length) == prefix+'/pg/stats/' ||
key.substr(0, prefix.length+'/pool/stats/'.length) == prefix+'/pool/stats/' ||
key == prefix+'/stats')
{
return undefined;
}
return value;
};
}
module.exports = vitastor_persist_filter;

View File

@@ -366,6 +366,7 @@ resume_0:
!flusher->flush_queue.size() || !flusher->dequeuing)
{
stop_flusher:
flusher->dequeuing = false;
if (flusher->trim_wanted > 0 && try_trim)
{
// Attempt forced trim
@@ -373,7 +374,6 @@ stop_flusher:
flusher->active_flushers++;
goto trim_journal;
}
flusher->dequeuing = false;
wait_state = 0;
return true;
}

View File

@@ -34,7 +34,7 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
{
// peer_osd just dropped connection
// determine WHICH dirty_buffers are now obsolete and repeat them
if (wb->repeat_ops_for(this, peer_osd) > 0)
if (wb->repeat_ops_for(this, peer_osd, 0, 0) > 0)
{
continue_ops();
}
@@ -52,7 +52,8 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
st_cli.tfd = tfd;
st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };
st_cli.on_change_osd_state_hook = [this](uint64_t peer_osd) { on_change_osd_state_hook(peer_osd); };
st_cli.on_change_hook = [this](std::map<std::string, etcd_kv_t> & changes) { on_change_hook(changes); };
st_cli.on_change_pool_config_hook = [this]() { on_change_pool_config_hook(); };
st_cli.on_change_pg_state_hook = [this](pool_id_t pool_id, pg_num_t pg_num, osd_num_t prev_primary) { on_change_pg_state_hook(pool_id, pg_num, prev_primary); };
st_cli.on_load_pgs_hook = [this](bool success) { on_load_pgs_hook(success); };
st_cli.on_reload_hook = [this]() { st_cli.load_global_config(); };
@@ -77,11 +78,6 @@ cluster_client_t::~cluster_client_t()
cluster_op_t::~cluster_op_t()
{
if (buf)
{
free(buf);
buf = NULL;
}
if (bitmap_buf)
{
free(bitmap_buf);
@@ -427,7 +423,7 @@ void cluster_client_t::on_load_pgs_hook(bool success)
continue_ops();
}
void cluster_client_t::on_change_hook(std::map<std::string, etcd_kv_t> & changes)
void cluster_client_t::on_change_pool_config_hook()
{
for (auto pool_item: st_cli.pool_config)
{
@@ -450,6 +446,19 @@ void cluster_client_t::on_change_hook(std::map<std::string, etcd_kv_t> & changes
continue_ops();
}
void cluster_client_t::on_change_pg_state_hook(pool_id_t pool_id, pg_num_t pg_num, osd_num_t prev_primary)
{
auto & pg_cfg = st_cli.pool_config[pool_id].pg_config[pg_num];
if (pg_cfg.cur_primary != prev_primary)
{
// Repeat this PG operations because an OSD which stopped being primary may not fsync operations
if (wb->repeat_ops_for(this, 0, pool_id, pg_num) > 0)
{
continue_ops();
}
}
}
bool cluster_client_t::get_immediate_commit(uint64_t inode)
{
if (enable_writeback)
@@ -570,6 +579,14 @@ void cluster_client_t::execute_internal(cluster_op_t *op)
{
op->cur_inode = op->inode;
op->retval = 0;
op->state = 0;
op->retry_after = 0;
op->inflight_count = 0;
op->done_count = 0;
op->part_bitmaps = NULL;
op->bitmap_buf_size = 0;
op->prev_wait = 0;
assert(!op->prev && !op->next);
// check alignment, readonly flag and so on
if (!check_rw(op))
{
@@ -600,7 +617,9 @@ void cluster_client_t::execute_internal(cluster_op_t *op)
{
if (!(op->flags & OP_FLUSH_BUFFER) && !op->version /* no CAS write-repeat */)
{
wb->copy_write(op, CACHE_WRITTEN);
uint64_t flush_id = ++wb->last_flush_id;
wb->copy_write(op, CACHE_REPEATING, flush_id);
op->flush_id = flush_id;
}
if (dirty_bytes >= client_max_dirty_bytes || dirty_ops >= client_max_dirty_ops)
{
@@ -816,6 +835,10 @@ resume_2:
auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->inode));
op->retval = op->len / pool_cfg.bitmap_granularity;
}
if (op->flush_id)
{
wb->mark_flush_written(op->inode, op->offset, op->len, op->flush_id);
}
erase_op(op);
return 1;
}
@@ -988,6 +1011,29 @@ void cluster_client_t::slice_rw(cluster_op_t *op)
}
}
bool cluster_client_t::affects_pg(uint64_t inode, uint64_t offset, uint64_t len, pool_id_t pool_id, pg_num_t pg_num)
{
if (INODE_POOL(inode) != pool_id)
{
return false;
}
auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(inode));
uint32_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks);
uint64_t pg_block_size = pool_cfg.data_block_size * pg_data_size;
uint64_t first_stripe = (offset / pg_block_size) * pg_block_size;
uint64_t last_stripe = len > 0 ? ((offset + len - 1) / pg_block_size) * pg_block_size : first_stripe;
if ((last_stripe/pool_cfg.pg_stripe_size) - (first_stripe/pool_cfg.pg_stripe_size) + 1 >= pool_cfg.real_pg_count)
{
// All PGs are affected
return true;
}
pg_num_t first_pg_num = (first_stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
pg_num_t last_pg_num = (last_stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
return (first_pg_num <= last_pg_num
? (pg_num >= first_pg_num && pg_num <= last_pg_num)
: (pg_num >= first_pg_num || pg_num <= last_pg_num));
}
bool cluster_client_t::affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd)
{
auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(inode));
@@ -1210,7 +1256,9 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
// So do all these things after modifying operation state, otherwise we may hit reenterability bugs
// FIXME postpone such things to set_immediate here to avoid bugs
// Set op->retry_after to retry operation after a short pause (not immediately)
if (!op->retry_after)
if (!op->retry_after && (op->retval == -EPIPE ||
op->retval == -EIO && client_eio_retry_interval ||
op->retval == -ENOSPC && client_retry_enospc))
{
op->retry_after = op->retval != -EPIPE ? client_eio_retry_interval : client_retry_interval;
}

View File

@@ -56,8 +56,6 @@ struct cluster_op_t
protected:
int state = 0;
uint64_t cur_inode; // for snapshot reads
void *buf = NULL;
cluster_op_t *orig_op = NULL;
bool needs_reslice = false;
int retry_after = 0;
int inflight_count = 0, done_count = 0;
@@ -66,6 +64,7 @@ protected:
unsigned bitmap_buf_size = 0;
cluster_op_t *prev = NULL, *next = NULL;
int prev_wait = 0;
uint64_t flush_id = 0;
friend class cluster_client_t;
friend class writeback_cache_t;
};
@@ -81,6 +80,7 @@ class cluster_client_t
ring_loop_t *ringloop;
std::map<pool_id_t, uint64_t> pg_counts;
std::map<pool_pg_num_t, osd_num_t> pg_primary;
// client_max_dirty_* is actually "max unsynced", for the case when immediate_commit is off
uint64_t client_max_dirty_bytes = 0;
uint64_t client_max_dirty_ops = 0;
@@ -146,9 +146,11 @@ public:
protected:
bool affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd);
bool affects_pg(uint64_t inode, uint64_t offset, uint64_t len, pool_id_t pool_id, pg_num_t pg_num);
void on_load_config_hook(json11::Json::object & config);
void on_load_pgs_hook(bool success);
void on_change_hook(std::map<std::string, etcd_kv_t> & changes);
void on_change_pool_config_hook();
void on_change_pg_state_hook(pool_id_t pool_id, pg_num_t pg_num, osd_num_t prev_primary);
void on_change_osd_state_hook(uint64_t peer_osd);
void execute_internal(cluster_op_t *op);
void unshift_op(cluster_op_t *op);

View File

@@ -46,11 +46,12 @@ public:
bool is_left_merged(dirty_buf_it_t dirty_it);
bool is_right_merged(dirty_buf_it_t dirty_it);
bool is_merged(const dirty_buf_it_t & dirty_it);
void copy_write(cluster_op_t *op, int state);
int repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd);
void copy_write(cluster_op_t *op, int state, uint64_t new_flush_id = 0);
int repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd, pool_id_t pool_id, pg_num_t pg_num);
void start_writebacks(cluster_client_t *cli, int count);
bool read_from_cache(cluster_op_t *op, uint32_t bitmap_granularity);
void flush_buffers(cluster_client_t *cli, dirty_buf_it_t from_it, dirty_buf_it_t to_it);
void mark_flush_written(uint64_t inode, uint64_t offset, uint64_t len, uint64_t flush_id);
void fsync_start();
void fsync_error();
void fsync_ok();

View File

@@ -71,7 +71,7 @@ bool writeback_cache_t::is_merged(const dirty_buf_it_t & dirty_it)
return is_left_merged(dirty_it) || is_right_merged(dirty_it);
}
void writeback_cache_t::copy_write(cluster_op_t *op, int state)
void writeback_cache_t::copy_write(cluster_op_t *op, int state, uint64_t new_flush_id)
{
// Save operation for replay when one of PGs goes out of sync
// (primary OSD drops our connection in this case)
@@ -180,6 +180,7 @@ void writeback_cache_t::copy_write(cluster_op_t *op, int state)
.buf = buf,
.len = op->len,
.state = state,
.flush_id = new_flush_id,
.refcnt = refcnt,
});
if (state == CACHE_DIRTY)
@@ -208,7 +209,7 @@ void writeback_cache_t::copy_write(cluster_op_t *op, int state)
}
}
int writeback_cache_t::repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd)
int writeback_cache_t::repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd, pool_id_t pool_id, pg_num_t pg_num)
{
int repeated = 0;
if (dirty_buffers.size())
@@ -218,8 +219,11 @@ int writeback_cache_t::repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd)
for (auto wr_it = dirty_buffers.begin(), flush_it = wr_it, last_it = wr_it; ; )
{
bool end = wr_it == dirty_buffers.end();
bool flush_this = !end && wr_it->second.state != CACHE_REPEATING &&
cli->affects_osd(wr_it->first.inode, wr_it->first.stripe, wr_it->second.len, peer_osd);
bool flush_this = !end && wr_it->second.state != CACHE_REPEATING;
if (peer_osd)
flush_this = flush_this && cli->affects_osd(wr_it->first.inode, wr_it->first.stripe, wr_it->second.len, peer_osd);
if (pool_id && pg_num)
flush_this = flush_this && cli->affects_pg(wr_it->first.inode, wr_it->first.stripe, wr_it->second.len, pool_id, pg_num);
if (flush_it != wr_it && (end || !flush_this ||
wr_it->first.inode != flush_it->first.inode ||
wr_it->first.stripe != last_it->first.stripe+last_it->second.len))
@@ -265,7 +269,7 @@ void writeback_cache_t::flush_buffers(cluster_client_t *cli, dirty_buf_it_t from
writebacks_active++;
op->callback = [this, flush_id](cluster_op_t* op)
{
// Buffer flushes should be always retried, regardless of the error,
// Buffer flushes are always retried, regardless of the error,
// so they should never result in an error here
assert(op->retval == op->len);
for (auto fl_it = flushed_buffers.find(flush_id);
@@ -277,16 +281,7 @@ void writeback_cache_t::flush_buffers(cluster_client_t *cli, dirty_buf_it_t from
}
flushed_buffers.erase(fl_it++);
}
for (auto dirty_it = find_dirty(op->inode, op->offset);
dirty_it != dirty_buffers.end() && dirty_it->first.inode == op->inode &&
dirty_it->first.stripe < op->offset+op->len; dirty_it++)
{
if (dirty_it->second.flush_id == flush_id && dirty_it->second.state == CACHE_REPEATING)
{
dirty_it->second.flush_id = 0;
dirty_it->second.state = CACHE_WRITTEN;
}
}
mark_flush_written(op->inode, op->offset, op->len, flush_id);
delete op;
writebacks_active--;
// We can't call execute_internal because it affects an invalid copy of the list here
@@ -304,6 +299,20 @@ void writeback_cache_t::flush_buffers(cluster_client_t *cli, dirty_buf_it_t from
}
}
void writeback_cache_t::mark_flush_written(uint64_t inode, uint64_t offset, uint64_t len, uint64_t flush_id)
{
for (auto dirty_it = find_dirty(inode, offset);
dirty_it != dirty_buffers.end() && dirty_it->first.inode == inode &&
dirty_it->first.stripe < offset+len; dirty_it++)
{
if (dirty_it->second.flush_id == flush_id && dirty_it->second.state == CACHE_REPEATING)
{
dirty_it->second.flush_id = 0;
dirty_it->second.state = CACHE_WRITTEN;
}
}
}
void writeback_cache_t::start_writebacks(cluster_client_t *cli, int count)
{
if (!writeback_queue.size())

View File

@@ -890,6 +890,10 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
}
}
}
if (on_change_pool_config_hook)
{
on_change_pool_config_hook();
}
}
else if (key == etcd_prefix+"/config/pgs")
{
@@ -1028,13 +1032,19 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
else if (value.is_null())
{
auto & pg_cfg = this->pool_config[pool_id].pg_config[pg_num];
auto prev_primary = pg_cfg.cur_primary;
pg_cfg.state_exists = false;
pg_cfg.cur_primary = 0;
pg_cfg.cur_state = 0;
if (on_change_pg_state_hook)
{
on_change_pg_state_hook(pool_id, pg_num, prev_primary);
}
}
else
{
auto & pg_cfg = this->pool_config[pool_id].pg_config[pg_num];
auto prev_primary = pg_cfg.cur_primary;
pg_cfg.state_exists = true;
osd_num_t cur_primary = value["primary"].uint64_value();
int state = 0;
@@ -1065,6 +1075,10 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
}
pg_cfg.cur_primary = cur_primary;
pg_cfg.cur_state = state;
if (on_change_pg_state_hook)
{
on_change_pg_state_hook(pool_id, pg_num, prev_primary);
}
}
}
else if (key.substr(0, etcd_prefix.length()+11) == etcd_prefix+"/osd/state/")

View File

@@ -127,6 +127,8 @@ public:
std::function<void(json11::Json::object &)> on_load_config_hook;
std::function<json11::Json()> load_pgs_checks_hook;
std::function<void(bool)> on_load_pgs_hook;
std::function<void()> on_change_pool_config_hook;
std::function<void(pool_id_t, pg_num_t, osd_num_t)> on_change_pg_state_hook;
std::function<void(pool_id_t, pg_num_t)> on_change_pg_history_hook;
std::function<void(osd_num_t)> on_change_osd_state_hook;
std::function<void()> on_reload_hook;

View File

@@ -15,6 +15,106 @@
#include "msgr_rdma.h"
#endif
#include <sys/poll.h>
msgr_iothread_t::msgr_iothread_t():
ring(RINGLOOP_DEFAULT_SIZE),
thread(&msgr_iothread_t::run, this)
{
eventfd = ring.register_eventfd();
if (eventfd < 0)
{
throw std::runtime_error(std::string("failed to register eventfd: ") + strerror(-eventfd));
}
}
msgr_iothread_t::~msgr_iothread_t()
{
stop();
}
void msgr_iothread_t::add_sqe(io_uring_sqe & sqe)
{
mu.lock();
queue.push_back((iothread_sqe_t){ .sqe = sqe, .data = std::move(*(ring_data_t*)sqe.user_data) });
if (queue.size() == 1)
{
cond.notify_all();
}
mu.unlock();
}
void msgr_iothread_t::stop()
{
mu.lock();
if (stopped)
{
mu.unlock();
return;
}
stopped = true;
if (outer_loop_data)
{
outer_loop_data->callback = [](ring_data_t*){};
}
cond.notify_all();
close(eventfd);
mu.unlock();
thread.join();
}
void msgr_iothread_t::add_to_ringloop(ring_loop_t *outer_loop)
{
assert(!this->outer_loop || this->outer_loop == outer_loop);
io_uring_sqe *sqe = outer_loop->get_sqe();
assert(sqe != NULL);
this->outer_loop = outer_loop;
this->outer_loop_data = ((ring_data_t*)sqe->user_data);
my_uring_prep_poll_add(sqe, eventfd, POLLIN);
outer_loop_data->callback = [this](ring_data_t *data)
{
if (data->res < 0)
{
throw std::runtime_error(std::string("eventfd poll failed: ") + strerror(-data->res));
}
outer_loop_data = NULL;
if (stopped)
{
return;
}
add_to_ringloop(this->outer_loop);
ring.loop();
};
}
void msgr_iothread_t::run()
{
while (true)
{
{
std::unique_lock<std::mutex> lk(mu);
while (!stopped && !queue.size())
cond.wait(lk);
if (stopped)
return;
int i = 0;
for (; i < queue.size(); i++)
{
io_uring_sqe *sqe = ring.get_sqe();
if (!sqe)
break;
ring_data_t *data = ((ring_data_t*)sqe->user_data);
*data = std::move(queue[i].data);
*sqe = queue[i].sqe;
sqe->user_data = (uint64_t)data;
}
queue.erase(queue.begin(), queue.begin()+i);
}
// We only want to offload sendmsg/recvmsg. Callbacks will be called in main thread
ring.submit();
}
}
void osd_messenger_t::init()
{
#ifdef WITH_RDMA
@@ -43,6 +143,15 @@ void osd_messenger_t::init()
}
}
#endif
if (ringloop && iothread_count > 0)
{
for (int i = 0; i < iothread_count; i++)
{
auto iot = new msgr_iothread_t();
iothreads.push_back(iot);
iot->add_to_ringloop(ringloop);
}
}
keepalive_timer_id = tfd->set_timer(1000, true, [this](int)
{
auto cl_it = clients.begin();
@@ -129,6 +238,14 @@ osd_messenger_t::~osd_messenger_t()
{
stop_client(clients.begin()->first, true, true);
}
if (iothreads.size())
{
for (auto iot: iothreads)
{
delete iot;
}
iothreads.clear();
}
#ifdef WITH_RDMA
if (rdma_context)
{

View File

@@ -111,6 +111,44 @@ struct osd_op_stats_t
uint64_t subop_stat_count[OSD_OP_MAX+1] = { 0 };
};
#include <mutex>
#include <condition_variable>
#include <thread>
#ifdef __MOCK__
class msgr_iothread_t;
#else
struct iothread_sqe_t
{
io_uring_sqe sqe;
ring_data_t data;
};
class msgr_iothread_t
{
protected:
ring_loop_t ring;
ring_loop_t *outer_loop = NULL;
ring_data_t *outer_loop_data = NULL;
int eventfd = -1;
bool stopped = false;
std::mutex mu;
std::condition_variable cond;
std::vector<iothread_sqe_t> queue;
std::thread thread;
void run();
public:
msgr_iothread_t();
~msgr_iothread_t();
void add_sqe(io_uring_sqe & sqe);
void stop();
void add_to_ringloop(ring_loop_t *outer_loop);
};
#endif
struct osd_messenger_t
{
protected:
@@ -123,6 +161,7 @@ protected:
int osd_ping_timeout = 0;
int log_level = 0;
bool use_sync_send_recv = false;
int iothread_count = 4;
#ifdef WITH_RDMA
bool use_rdma = true;
@@ -134,6 +173,7 @@ protected:
bool rdma_odp = false;
#endif
std::vector<msgr_iothread_t*> iothreads;
std::vector<int> read_ready_clients;
std::vector<int> write_ready_clients;
// We don't use ringloop->set_immediate here because we may have no ringloop in client :)

View File

@@ -30,7 +30,11 @@ void osd_messenger_t::read_requests()
cl->refs++;
if (ringloop && !use_sync_send_recv)
{
io_uring_sqe* sqe = ringloop->get_sqe();
auto iothread = iothreads.size() ? iothreads[peer_fd % iothreads.size()] : NULL;
io_uring_sqe sqe_local;
ring_data_t data_local;
sqe_local.user_data = (uint64_t)&data_local;
io_uring_sqe* sqe = (iothread ? &sqe_local : ringloop->get_sqe());
if (!sqe)
{
cl->read_msg.msg_iovlen = 0;
@@ -40,6 +44,10 @@ void osd_messenger_t::read_requests()
ring_data_t* data = ((ring_data_t*)sqe->user_data);
data->callback = [this, cl](ring_data_t *data) { handle_read(data->res, cl); };
my_uring_prep_recvmsg(sqe, peer_fd, &cl->read_msg, 0);
if (iothread)
{
iothread->add_sqe(sqe_local);
}
}
else
{

View File

@@ -189,7 +189,11 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
}
if (ringloop && !use_sync_send_recv)
{
io_uring_sqe* sqe = ringloop->get_sqe();
auto iothread = iothreads.size() ? iothreads[peer_fd % iothreads.size()] : NULL;
io_uring_sqe sqe_local;
ring_data_t data_local;
sqe_local.user_data = (uint64_t)&data_local;
io_uring_sqe* sqe = (iothread ? &sqe_local : ringloop->get_sqe());
if (!sqe)
{
return false;
@@ -200,6 +204,10 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
ring_data_t* data = ((ring_data_t*)sqe->user_data);
data->callback = [this, cl](ring_data_t *data) { handle_send(data->res, cl); };
my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, 0);
if (iothread)
{
iothread->add_sqe(sqe_local);
}
}
else
{

View File

@@ -12,6 +12,7 @@ add_library(vitastor_cli STATIC
cli_ls.cpp
cli_create.cpp
cli_modify.cpp
cli_osd_tree.cpp
cli_flatten.cpp
cli_merge.cpp
cli_rm_data.cpp

View File

@@ -118,6 +118,12 @@ static const char* help_text =
" With --dry-run only checks if deletion is possible without data loss and\n"
" redundancy degradation.\n"
"\n"
"vitastor-cli osd-tree\n"
" Show current OSD tree.\n"
"\n"
"vitastor-cli osds|ls-osd|osd-ls\n"
" Show current OSDs as list.\n"
"\n"
"vitastor-cli create-pool|pool-create <name> (-s <pg_size>|--ec <N>+<K>) -n <pg_count> [OPTIONS]\n"
" Create a pool. Required parameters:\n"
" -s|--pg_size R Number of replicas for replicated pools\n"
@@ -389,6 +395,17 @@ static int run(cli_tool_t *p, json11::Json::object cfg)
// Allocate a new OSD number
action_cb = p->start_alloc_osd(cfg);
}
else if (cmd[0] == "osd-tree")
{
// Print OSD tree
action_cb = p->start_osd_tree(cfg);
}
else if (cmd[0] == "osds" || cmd[0] == "ls-osds" || cmd[0] == "ls-osd" || cmd[0] == "osd-ls")
{
// Print OSD list
cfg["flat"] = true;
action_cb = p->start_osd_tree(cfg);
}
else if (cmd[0] == "create-pool" || cmd[0] == "pool-create")
{
// Create a new pool

View File

@@ -7,6 +7,7 @@
#include "json11/json11.hpp"
#include "object_id.h"
#include "osd_id.h"
#include "ringloop.h"
#include <functional>
@@ -56,27 +57,31 @@ public:
friend struct snap_flattener_t;
friend struct snap_remover_t;
std::function<bool(cli_result_t &)> start_status(json11::Json);
std::function<bool(cli_result_t &)> start_alloc_osd(json11::Json);
std::function<bool(cli_result_t &)> start_create(json11::Json);
std::function<bool(cli_result_t &)> start_describe(json11::Json);
std::function<bool(cli_result_t &)> start_fix(json11::Json);
std::function<bool(cli_result_t &)> start_ls(json11::Json);
std::function<bool(cli_result_t &)> start_create(json11::Json);
std::function<bool(cli_result_t &)> start_modify(json11::Json);
std::function<bool(cli_result_t &)> start_rm_data(json11::Json);
std::function<bool(cli_result_t &)> start_merge(json11::Json);
std::function<bool(cli_result_t &)> start_flatten(json11::Json);
std::function<bool(cli_result_t &)> start_rm(json11::Json);
std::function<bool(cli_result_t &)> start_rm_osd(json11::Json cfg);
std::function<bool(cli_result_t &)> start_alloc_osd(json11::Json cfg);
std::function<bool(cli_result_t &)> start_ls(json11::Json);
std::function<bool(cli_result_t &)> start_merge(json11::Json);
std::function<bool(cli_result_t &)> start_modify(json11::Json);
std::function<bool(cli_result_t &)> start_osd_tree(json11::Json);
std::function<bool(cli_result_t &)> start_pool_create(json11::Json);
std::function<bool(cli_result_t &)> start_pool_modify(json11::Json);
std::function<bool(cli_result_t &)> start_pool_rm(json11::Json);
std::function<bool(cli_result_t &)> start_pool_ls(json11::Json);
std::function<bool(cli_result_t &)> start_rm(json11::Json);
std::function<bool(cli_result_t &)> start_rm_data(json11::Json);
std::function<bool(cli_result_t &)> start_rm_osd(json11::Json);
std::function<bool(cli_result_t &)> start_status(json11::Json);
// Should be called like loop_and_wait(start_status(), <completion callback>)
void loop_and_wait(std::function<bool(cli_result_t &)> loop_cb, std::function<void(const cli_result_t &)> complete_cb);
void etcd_txn(json11::Json txn);
void iterate_kvs_1(json11::Json kvs, const std::string & prefix, std::function<void(uint64_t num, json11::Json)> cb);
void iterate_kvs_2(json11::Json kvs, const std::string & prefix, std::function<void(pool_id_t pool_id, uint64_t num, json11::Json)> cb);
};
std::string print_table(json11::Json items, json11::Json header, bool use_esc);

View File

@@ -72,19 +72,10 @@ struct alloc_osd_t
if (!parent->etcd_result["succeeded"].bool_value())
{
std::vector<osd_num_t> used;
for (auto kv: parent->etcd_result["responses"][0]["response_range"]["kvs"].array_items())
parent->iterate_kvs_1(parent->etcd_result["responses"][0]["response_range"]["kvs"], "/osd/stats/", [&](uint64_t cur_osd, json11::Json value)
{
std::string key = base64_decode(kv["key"].string_value());
osd_num_t cur_osd;
char null_byte = 0;
int scanned = sscanf(key.c_str() + parent->cli->st_cli.etcd_prefix.length(), "/osd/stats/%ju%c", &cur_osd, &null_byte);
if (scanned != 1 || !cur_osd)
{
fprintf(stderr, "Invalid key in etcd: %s\n", key.c_str());
continue;
}
used.push_back(cur_osd);
}
});
std::sort(used.begin(), used.end());
if (used[used.size()-1] == used.size())
{

View File

@@ -165,3 +165,43 @@ void cli_tool_t::loop_and_wait(std::function<bool(cli_result_t &)> loop_cb, std:
ringloop->wakeup();
});
}
void cli_tool_t::iterate_kvs_1(json11::Json kvs, const std::string & prefix, std::function<void(uint64_t, json11::Json)> cb)
{
bool is_pool = prefix == "/pool/stats/";
for (auto & kv_item: kvs.array_items())
{
auto kv = cli->st_cli.parse_etcd_kv(kv_item);
uint64_t num = 0;
char null_byte = 0;
// OSD or pool number
int scanned = sscanf(kv.key.substr(cli->st_cli.etcd_prefix.size() + prefix.size()).c_str(), "%ju%c", &num, &null_byte);
if (scanned != 1 || !num || is_pool && num >= POOL_ID_MAX)
{
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
continue;
}
cb(num, kv.value);
}
}
void cli_tool_t::iterate_kvs_2(json11::Json kvs, const std::string & prefix, std::function<void(pool_id_t pool_id, uint64_t num, json11::Json)> cb)
{
bool is_inode = prefix == "/config/inode/" || prefix == "/inode/stats/";
for (auto & kv_item: kvs.array_items())
{
auto kv = cli->st_cli.parse_etcd_kv(kv_item);
pool_id_t pool_id = 0;
uint64_t num = 0;
char null_byte = 0;
// pool+pg or pool+inode
int scanned = sscanf(kv.key.substr(cli->st_cli.etcd_prefix.size() + prefix.size()).c_str(),
"%u/%ju%c", &pool_id, &num, &null_byte);
if (scanned != 2 || !pool_id || is_inode && INODE_POOL(num) || !is_inode && num >= UINT32_MAX)
{
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
continue;
}
cb(pool_id, num, kv.value);
}
}

View File

@@ -479,10 +479,14 @@ struct snap_merger_t
{
if (op->retval != op->len)
{
rwo->error_code = -op->retval;
rwo->error_code = op->retval;
rwo->error_offset = op->offset;
rwo->error_read = true;
}
else
{
rwo->error_code = 0;
}
continue_rwo.push_back(rwo);
parent->ringloop->wakeup();
};
@@ -553,12 +557,15 @@ struct snap_merger_t
if (use_cas && subop->retval == -EINTR)
{
// CAS failure - reread and repeat optimistically
assert(rwo->todo == 1); // initial refcount from read_and_write
rwo->error_code = -EINTR;
rwo->start = rwo->end = 0;
rwo->op.version = 0;
rwo_read(rwo);
delete subop;
return;
}
rwo->error_code = -subop->retval;
rwo->error_code = subop->retval;
rwo->error_offset = subop->offset;
rwo->error_read = false;
}
@@ -633,7 +640,7 @@ struct snap_merger_t
{
char buf[1024];
snprintf(buf, 1024, "Error %s target at offset %jx: %s",
rwo->error_read ? "reading" : "writing", rwo->error_offset, strerror(rwo->error_code));
rwo->error_read ? "reading" : "writing", rwo->error_offset, strerror(-rwo->error_code));
rwo_error = std::string(buf);
}
delete rwo;

377
src/cmd/cli_osd_tree.cpp Normal file
View File

@@ -0,0 +1,377 @@
// Copyright (c) Vitaliy Filippov, 2024
// License: VNPL-1.1 (see README.md for details)
#include <ctype.h>
#include "cli.h"
#include "cluster_client.h"
#include "epoll_manager.h"
#include "pg_states.h"
#include "str_util.h"
struct placement_osd_t
{
osd_num_t num;
std::string parent;
std::vector<std::string> tags;
uint64_t size;
uint64_t free;
bool up;
double reweight;
uint32_t block_size, bitmap_granularity, immediate_commit;
};
struct placement_node_t
{
std::string name;
std::string parent;
std::string level;
std::vector<std::string> child_nodes;
std::vector<osd_num_t> child_osds;
};
struct placement_tree_t
{
std::map<std::string, placement_node_t> nodes;
std::map<osd_num_t, placement_osd_t> osds;
};
struct osd_tree_printer_t
{
cli_tool_t *parent;
json11::Json cfg;
bool flat = false;
bool show_stats = false;
int state = 0;
cli_result_t result;
json11::Json node_placement;
std::map<uint64_t, json11::Json> osd_config;
std::map<uint64_t, json11::Json> osd_stats;
std::shared_ptr<placement_tree_t> placement_tree;
bool is_done() { return state == 100; }
void load_osd_tree()
{
if (state == 1)
goto resume_1;
parent->etcd_txn(json11::Json::object {
{ "success", json11::Json::array {
json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/config/node_placement") },
} },
},
json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/config/osd/") },
{ "range_end", base64_encode(parent->cli->st_cli.etcd_prefix+"/config/osd0") },
} },
},
json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/osd/stats/") },
{ "range_end", base64_encode(parent->cli->st_cli.etcd_prefix+"/osd/stats0") },
} },
},
} },
});
state = 1;
resume_1:
if (parent->waiting > 0)
return;
if (parent->etcd_err.err)
{
result = parent->etcd_err;
state = 100;
return;
}
for (auto & item: parent->etcd_result["responses"][0]["response_range"]["kvs"].array_items())
{
node_placement = parent->cli->st_cli.parse_etcd_kv(item).value;
}
parent->iterate_kvs_1(parent->etcd_result["responses"][1]["response_range"]["kvs"], "/config/osd/", [&](uint64_t cur_osd, json11::Json value)
{
osd_config[cur_osd] = value;
});
parent->iterate_kvs_1(parent->etcd_result["responses"][2]["response_range"]["kvs"], "/osd/stats/", [&](uint64_t cur_osd, json11::Json value)
{
osd_stats[cur_osd] = value;
});
placement_tree = make_osd_tree(node_placement, osd_config, osd_stats);
}
std::shared_ptr<placement_tree_t> make_osd_tree(json11::Json node_placement_json,
std::map<uint64_t, json11::Json> osd_config, std::map<uint64_t, json11::Json> osd_stats)
{
auto node_placement = node_placement_json.object_items();
auto tree = std::make_shared<placement_tree_t>();
tree->nodes[""] = (placement_node_t){};
// Add non-OSD items
for (auto & kv: node_placement)
{
auto osd_num = stoull_full(kv.first);
if (!osd_num)
{
auto level = kv.second["level"].string_value();
tree->nodes[kv.first] = (placement_node_t){
.name = kv.first,
.parent = kv.second["parent"].string_value(),
.level = level == "" ? "unknown" : level,
};
}
}
// Add OSDs
for (auto & kv: osd_stats)
{
auto & osd = tree->osds[kv.first] = (placement_osd_t){
.num = kv.first,
.parent = kv.second["host"].string_value(),
.size = kv.second["size"].uint64_value(),
.free = kv.second["free"].uint64_value(),
.up = parent->cli->st_cli.peer_states.find(kv.first) != parent->cli->st_cli.peer_states.end(),
.reweight = 1,
.block_size = (uint32_t)kv.second["data_block_size"].uint64_value(),
.bitmap_granularity = (uint32_t)kv.second["bitmap_granularity"].uint64_value(),
.immediate_commit = etcd_state_client_t::parse_immediate_commit(kv.second["immediate_commit"].string_value()),
};
if (tree->nodes.find(osd.parent) == tree->nodes.end())
{
// Autocreate all hosts
tree->nodes[osd.parent] = (placement_node_t){
.name = osd.parent,
.level = "host",
};
}
auto cfg_it = osd_config.find(osd.num);
if (cfg_it != osd_config.end())
{
auto & osd_cfg = cfg_it->second;
osd.reweight = osd_cfg["reweight"].is_number() ? osd_cfg["reweight"].number_value() : 1;
if (osd_cfg["tags"].is_array())
{
for (auto & jtag: osd_cfg["tags"].array_items())
osd.tags.push_back(jtag.string_value());
}
}
auto np_it = node_placement.find(std::to_string(osd.num));
if (np_it != node_placement.end())
{
osd.parent = np_it->second["parent"].string_value();
}
tree->nodes[osd.parent].child_osds.push_back(osd.num);
}
// Fill child_nodes
for (auto & ip: tree->nodes)
{
if (tree->nodes.find(ip.second.parent) == tree->nodes.end())
{
ip.second.parent = "";
}
if (ip.first != "")
{
tree->nodes[ip.second.parent].child_nodes.push_back(ip.first);
}
}
// FIXME: Maybe filter out loops here
return tree;
}
std::string format_tree()
{
std::vector<std::string> node_seq = { "" };
std::vector<int> indents = { -1 };
std::map<std::string, bool> seen;
for (int i = 0; i < node_seq.size(); i++)
{
if (seen[node_seq[i]])
{
continue;
}
seen[node_seq[i]] = true;
auto & child_nodes = placement_tree->nodes.at(node_seq[i]).child_nodes;
if (child_nodes.size())
{
node_seq.insert(node_seq.begin()+i+1, child_nodes.begin(), child_nodes.end());
indents.insert(indents.begin()+i+1, child_nodes.size(), indents[i]+1);
}
}
json11::Json::array fmt_items;
for (int i = 1; i < node_seq.size(); i++)
{
auto & node = placement_tree->nodes.at(node_seq[i]);
if (!flat)
{
fmt_items.push_back(json11::Json::object{
{ "type", str_repeat(" ", indents[i]) + node.level },
{ "name", node.name },
});
}
std::string parent = node.name;
if (flat)
{
auto cur = &placement_tree->nodes.at(node.name);
while (cur->parent != "" && cur->parent != node.name)
{
parent = cur->parent+"/"+parent;
cur = &placement_tree->nodes.at(cur->parent);
}
}
for (uint64_t osd_num: node.child_osds)
{
auto & osd = placement_tree->osds.at(osd_num);
auto fmt = json11::Json::object{
{ "type", (flat ? "osd" : str_repeat(" ", indents[i]+1) + "osd") },
{ "name", osd.num },
{ "parent", parent },
{ "up", osd.up ? "up" : "down" },
{ "size", format_size(osd.size, false, true) },
{ "used", format_q(100.0*(osd.size - osd.free)/osd.size)+" %" },
{ "reweight", format_q(osd.reweight) },
{ "tags", implode(",", osd.tags) },
{ "block", format_size(osd.block_size, false, true) },
{ "bitmap", format_size(osd.bitmap_granularity, false, true) },
{ "commit", osd.immediate_commit == IMMEDIATE_NONE ? "none" : (osd.immediate_commit == IMMEDIATE_ALL ? "all" : "small") },
};
if (show_stats)
{
auto op_stat = osd_stats[osd_num]["op_stats"];
fmt["read_bw"] = format_size(op_stat["primary_read"]["bps"].uint64_value())+"/s";
fmt["write_bw"] = format_size(op_stat["primary_write"]["bps"].uint64_value())+"/s";
fmt["delete_bw"] = format_size(op_stat["primary_delete"]["bps"].uint64_value())+"/s";
fmt["read_iops"] = format_q(op_stat["primary_read"]["iops"].uint64_value());
fmt["write_iops"] = format_q(op_stat["primary_write"]["iops"].uint64_value());
fmt["delete_iops"] = format_q(op_stat["primary_delete"]["iops"].uint64_value());
fmt["read_lat"] = format_lat(op_stat["primary_read"]["lat"].uint64_value());
fmt["write_lat"] = format_lat(op_stat["primary_write"]["lat"].uint64_value());
fmt["delete_lat"] = format_lat(op_stat["primary_delete"]["lat"].uint64_value());
}
fmt_items.push_back(std::move(fmt));
}
}
json11::Json::array cols;
if (!flat)
{
cols.push_back(json11::Json::object{
{ "key", "type" },
{ "title", "TYPE" },
});
}
cols.push_back(json11::Json::object{
{ "key", "name" },
{ "title", flat ? "OSD" : "NAME" },
});
if (flat)
{
cols.push_back(json11::Json::object{
{ "key", "parent" },
{ "title", "PARENT" },
});
}
cols.push_back(json11::Json::object{
{ "key", "up" },
{ "title", "UP" },
});
cols.push_back(json11::Json::object{
{ "key", "size" },
{ "title", "SIZE" },
});
cols.push_back(json11::Json::object{
{ "key", "used" },
{ "title", "USED%" },
});
cols.push_back(json11::Json::object{
{ "key", "tags" },
{ "title", "TAGS" },
});
cols.push_back(json11::Json::object{
{ "key", "reweight" },
{ "title", "WEIGHT" },
});
cols.push_back(json11::Json::object{
{ "key", "block" },
{ "title", "BLOCK" },
});
cols.push_back(json11::Json::object{
{ "key", "bitmap" },
{ "title", "BITMAP" },
});
cols.push_back(json11::Json::object{
{ "key", "commit" },
{ "title", "IMM" },
});
if (show_stats)
{
cols.push_back(json11::Json::object{
{ "key", "read_bw" },
{ "title", "READ" },
});
cols.push_back(json11::Json::object{
{ "key", "read_iops" },
{ "title", "IOPS" },
});
cols.push_back(json11::Json::object{
{ "key", "read_lat" },
{ "title", "LAT" },
});
cols.push_back(json11::Json::object{
{ "key", "write_bw" },
{ "title", "WRITE" },
});
cols.push_back(json11::Json::object{
{ "key", "write_iops" },
{ "title", "IOPS" },
});
cols.push_back(json11::Json::object{
{ "key", "write_lat" },
{ "title", "LAT" },
});
cols.push_back(json11::Json::object{
{ "key", "delete_bw" },
{ "title", "DEL" },
});
cols.push_back(json11::Json::object{
{ "key", "delete_iops" },
{ "title", "IOPS" },
});
cols.push_back(json11::Json::object{
{ "key", "delete_lat" },
{ "title", "LAT" },
});
}
return print_table(fmt_items, cols, parent->color);
}
void loop()
{
if (state == 1)
goto resume_1;
resume_1:
load_osd_tree();
if (parent->waiting > 0)
return;
result.text = format_tree();
state = 100;
}
};
std::function<bool(cli_result_t &)> cli_tool_t::start_osd_tree(json11::Json cfg)
{
auto osd_tree_printer = new osd_tree_printer_t();
osd_tree_printer->parent = this;
osd_tree_printer->cfg = cfg;
osd_tree_printer->flat = cfg["flat"].bool_value();
osd_tree_printer->show_stats = cfg["long"].bool_value();
return [osd_tree_printer](cli_result_t & result)
{
osd_tree_printer->loop();
if (osd_tree_printer->is_done())
{
result = osd_tree_printer->result;
delete osd_tree_printer;
return true;
}
return false;
};
}

View File

@@ -104,37 +104,16 @@ resume_1:
{
config_pools = parent->cli->st_cli.parse_etcd_kv(config_pools).value;
}
for (auto & kv_item: space_info["responses"][0]["response_range"]["kvs"].array_items())
parent->iterate_kvs_1(space_info["responses"][0]["response_range"]["kvs"], "/pool/stats/", [&](uint64_t pool_id, json11::Json value)
{
auto kv = parent->cli->st_cli.parse_etcd_kv(kv_item);
// pool ID
pool_id_t pool_id;
char null_byte = 0;
int scanned = sscanf(kv.key.substr(parent->cli->st_cli.etcd_prefix.length()).c_str(), "/pool/stats/%u%c", &pool_id, &null_byte);
if (scanned != 1 || !pool_id || pool_id >= POOL_ID_MAX)
{
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
continue;
}
// pool/stats/<N>
pool_stats[pool_id] = kv.value.object_items();
}
pool_stats[pool_id] = value.object_items();
});
std::map<pool_id_t, uint64_t> osd_free;
for (auto & kv_item: space_info["responses"][1]["response_range"]["kvs"].array_items())
parent->iterate_kvs_1(space_info["responses"][1]["response_range"]["kvs"], "/osd/stats/", [&](uint64_t osd_num, json11::Json value)
{
auto kv = parent->cli->st_cli.parse_etcd_kv(kv_item);
// osd ID
osd_num_t osd_num;
char null_byte = 0;
int scanned = sscanf(kv.key.substr(parent->cli->st_cli.etcd_prefix.length()).c_str(), "/osd/stats/%ju%c", &osd_num, &null_byte);
if (scanned != 1 || !osd_num || osd_num >= POOL_ID_MAX)
{
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
continue;
}
// osd/stats/<N>::free
osd_free[osd_num] = kv.value["free"].uint64_value();
}
osd_free[osd_num] = value["free"].uint64_value();
});
// Calculate max_avail for each pool
for (auto & pp: parent->cli->st_cli.pool_config)
{
@@ -254,29 +233,17 @@ resume_1:
state = 100;
return;
}
auto pg_stats = parent->etcd_result["responses"][0]["response_range"]["kvs"];
// Calculate recovery percent
std::map<pool_id_t, object_counts_t> counts;
for (auto & kv_item: pg_stats.array_items())
parent->iterate_kvs_2(parent->etcd_result["responses"][0]["response_range"]["kvs"], "/pg/stats/",
[&](pool_id_t pool_id, uint64_t pg_num, json11::Json value)
{
auto kv = parent->cli->st_cli.parse_etcd_kv(kv_item);
// pool ID & pg number
pool_id_t pool_id;
pg_num_t pg_num = 0;
char null_byte = 0;
int scanned = sscanf(kv.key.substr(parent->cli->st_cli.etcd_prefix.length()).c_str(),
"/pg/stats/%u/%u%c", &pool_id, &pg_num, &null_byte);
if (scanned != 2 || !pool_id || pool_id >= POOL_ID_MAX)
{
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
continue;
}
auto & cnt = counts[pool_id];
cnt.object_count += kv.value["object_count"].uint64_value();
cnt.misplaced_count += kv.value["misplaced_count"].uint64_value();
cnt.degraded_count += kv.value["degraded_count"].uint64_value();
cnt.incomplete_count += kv.value["incomplete_count"].uint64_value();
}
cnt.object_count += value["object_count"].uint64_value();
cnt.misplaced_count += value["misplaced_count"].uint64_value();
cnt.degraded_count += value["degraded_count"].uint64_value();
cnt.incomplete_count += value["incomplete_count"].uint64_value();
});
for (auto & pp: pool_stats)
{
auto & cnt = counts[pp.first];
@@ -317,35 +284,23 @@ resume_1:
state = 100;
return;
}
auto inode_stats = parent->etcd_result["responses"][0]["response_range"]["kvs"];
// Performance statistics
std::map<pool_id_t, io_stats_t> pool_io;
for (auto & kv_item: inode_stats.array_items())
parent->iterate_kvs_2(parent->etcd_result["responses"][0]["response_range"]["kvs"], "/inode/stats/",
[&](pool_id_t pool_id, uint64_t inode_num, json11::Json value)
{
auto kv = parent->cli->st_cli.parse_etcd_kv(kv_item);
// pool ID & inode number
pool_id_t pool_id;
inode_t only_inode_num;
char null_byte = 0;
int scanned = sscanf(kv.key.substr(parent->cli->st_cli.etcd_prefix.length()).c_str(),
"/inode/stats/%u/%ju%c", &pool_id, &only_inode_num, &null_byte);
if (scanned != 2 || !pool_id || pool_id >= POOL_ID_MAX || INODE_POOL(only_inode_num) != 0)
{
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
continue;
}
auto & io = pool_io[pool_id];
io.read_iops += kv.value["read"]["iops"].uint64_value();
io.read_bps += kv.value["read"]["bps"].uint64_value();
io.read_lat += kv.value["read"]["lat"].uint64_value();
io.write_iops += kv.value["write"]["iops"].uint64_value();
io.write_bps += kv.value["write"]["bps"].uint64_value();
io.write_lat += kv.value["write"]["lat"].uint64_value();
io.delete_iops += kv.value["delete"]["iops"].uint64_value();
io.delete_bps += kv.value["delete"]["bps"].uint64_value();
io.delete_lat += kv.value["delete"]["lat"].uint64_value();
io.read_iops += value["read"]["iops"].uint64_value();
io.read_bps += value["read"]["bps"].uint64_value();
io.read_lat += value["read"]["lat"].uint64_value();
io.write_iops += value["write"]["iops"].uint64_value();
io.write_bps += value["write"]["bps"].uint64_value();
io.write_lat += value["write"]["lat"].uint64_value();
io.delete_iops += value["delete"]["iops"].uint64_value();
io.delete_bps += value["delete"]["bps"].uint64_value();
io.delete_lat += value["delete"]["lat"].uint64_value();
io.count++;
}
});
for (auto & pp: pool_stats)
{
auto & io = pool_io[pp.first];

View File

@@ -18,7 +18,7 @@ struct status_printer_t
cli_tool_t *parent;
int state = 0;
json11::Json::array mon_members, osd_stats;
json11::Json::array mon_members;
json11::Json agg_stats;
std::map<pool_id_t, json11::Json::object> pool_stats;
json11::Json::array etcd_states;
@@ -93,7 +93,7 @@ resume_2:
return;
}
mon_members = parent->etcd_result["responses"][0]["response_range"]["kvs"].array_items();
osd_stats = parent->etcd_result["responses"][1]["response_range"]["kvs"].array_items();
auto osd_stats = parent->etcd_result["responses"][1]["response_range"]["kvs"];
if (parent->etcd_result["responses"][2]["response_range"]["kvs"].array_items().size() > 0)
{
agg_stats = parent->cli->st_cli.parse_etcd_kv(parent->etcd_result["responses"][2]["response_range"]["kvs"][0]).value;
@@ -133,20 +133,11 @@ resume_2:
}
int osd_count = 0, osd_up = 0;
uint64_t total_raw = 0, free_raw = 0, free_down_raw = 0, down_raw = 0;
for (int i = 0; i < osd_stats.size(); i++)
parent->iterate_kvs_1(osd_stats, "/osd/stats", [&](uint64_t stat_osd_num, json11::Json value)
{
auto kv = parent->cli->st_cli.parse_etcd_kv(osd_stats[i]);
osd_num_t stat_osd_num = 0;
char null_byte = 0;
int scanned = sscanf(kv.key.c_str() + parent->cli->st_cli.etcd_prefix.size(), "/osd/stats/%ju%c", &stat_osd_num, &null_byte);
if (scanned != 1 || !stat_osd_num)
{
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
continue;
}
osd_count++;
auto osd_size = kv.value["size"].uint64_value();
auto osd_free = kv.value["free"].uint64_value();
auto osd_size = value["size"].uint64_value();
auto osd_free = value["free"].uint64_value();
total_raw += osd_size;
free_raw += osd_free;
if (!osd_free)
@@ -164,10 +155,10 @@ resume_2:
}
else
{
down_raw += kv.value["size"].uint64_value();
free_down_raw += kv.value["free"].uint64_value();
down_raw += value["size"].uint64_value();
free_down_raw += value["free"].uint64_value();
}
}
});
int pool_count = 0, pools_active = 0;
std::map<std::string, int> pgs_by_state;
std::string pgs_by_state_str;

View File

@@ -189,6 +189,12 @@ void nfs_proxy_t::run(json11::Json cfg)
cmd->epmgr = epmgr;
cmd->cli = cli;
watch_stats();
// Init Pseudo-FS before starting client because it depends on inode_change_hook
if (fsname == "")
{
blockfs = new block_fs_state_t();
blockfs->init(this, cfg);
}
// Load image metadata
while (!cli->is_ready())
{
@@ -199,13 +205,8 @@ void nfs_proxy_t::run(json11::Json cfg)
}
// Check default pool
check_default_pool();
// Check if we're using VitastorFS
if (fsname == "")
{
blockfs = new block_fs_state_t();
blockfs->init(this, cfg);
}
else
// Init VitastorFS after starting client because it depends on loaded inode configuration
if (fsname != "")
{
kvfs = new kv_fs_state_t();
kvfs->init(this, cfg);

View File

@@ -199,12 +199,14 @@ class osd_t
ring_consumer_t consumer;
// op statistics
osd_op_stats_t prev_stats;
osd_op_stats_t prev_stats, prev_report_stats;
timespec report_stats_ts;
std::map<uint64_t, inode_stats_t> inode_stats;
std::map<uint64_t, timespec> vanishing_inodes;
const char* recovery_stat_names[2] = { "degraded", "misplaced" };
recovery_stat_t recovery_stat[2];
recovery_stat_t recovery_print_prev[2];
recovery_stat_t recovery_report_prev[2];
// recovery auto-tuning
int rtune_timer_id = -1;
@@ -252,6 +254,7 @@ class osd_t
bool check_peer_config(osd_client_t *cl, json11::Json conf);
void repeer_pgs(osd_num_t osd_num);
void start_pg_peering(pg_t & pg);
void drop_dirty_pg_connections(pool_pg_num_t pg);
void submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps);
void discard_list_subop(osd_op_t *list_op);
bool stop_pg(pg_t & pg);

View File

@@ -180,6 +180,12 @@ json11::Json osd_t::get_statistics()
json11::Json::object st;
timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
uint64_t ts_diff = 0;
if (report_stats_ts.tv_sec != 0)
ts_diff = (ts.tv_sec - report_stats_ts.tv_sec + (ts.tv_nsec - report_stats_ts.tv_nsec) / 1000000000);
if (!ts_diff)
ts_diff = 1;
report_stats_ts = ts;
char time_str[50] = { 0 };
sprintf(time_str, "%jd.%03ld", (uint64_t)ts.tv_sec, ts.tv_nsec/1000000);
st["time"] = time_str;
@@ -196,33 +202,50 @@ json11::Json osd_t::get_statistics()
json11::Json::object op_stats, subop_stats;
for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++)
{
auto n = (msgr.stats.op_stat_count[i] - prev_report_stats.op_stat_count[i]);
op_stats[osd_op_names[i]] = json11::Json::object {
{ "count", msgr.stats.op_stat_count[i] },
{ "usec", msgr.stats.op_stat_sum[i] },
{ "bytes", msgr.stats.op_stat_bytes[i] },
{ "lat", (msgr.stats.op_stat_sum[i] - prev_report_stats.op_stat_sum[i]) / (n < 1 ? 1 : n) },
{ "bps", (msgr.stats.op_stat_bytes[i] - prev_report_stats.op_stat_bytes[i]) / ts_diff },
{ "iops", n / ts_diff },
};
}
for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++)
{
auto n = (msgr.stats.subop_stat_count[i] - prev_report_stats.subop_stat_count[i]);
subop_stats[osd_op_names[i]] = json11::Json::object {
{ "count", msgr.stats.subop_stat_count[i] },
{ "usec", msgr.stats.subop_stat_sum[i] },
{ "lat", (msgr.stats.subop_stat_sum[i] - prev_report_stats.subop_stat_sum[i]) / (n < 1 ? 1 : n) },
{ "iops", n / ts_diff },
};
}
st["op_stats"] = op_stats;
st["subop_stats"] = subop_stats;
auto n0 = recovery_stat[0].count - recovery_report_prev[0].count;
auto n1 = recovery_stat[1].count - recovery_report_prev[1].count;
st["recovery_stats"] = json11::Json::object {
{ recovery_stat_names[0], json11::Json::object {
{ "count", recovery_stat[0].count },
{ "bytes", recovery_stat[0].bytes },
{ "usec", recovery_stat[0].usec },
{ "lat", (recovery_stat[0].usec - recovery_report_prev[0].usec) / (n0 < 1 ? 1 : n0) },
{ "bps", (recovery_stat[0].bytes - recovery_report_prev[0].bytes) / ts_diff },
{ "iops", n0 / ts_diff },
} },
{ recovery_stat_names[1], json11::Json::object {
{ "count", recovery_stat[1].count },
{ "bytes", recovery_stat[1].bytes },
{ "usec", recovery_stat[1].usec },
{ "lat", (recovery_stat[1].usec - recovery_report_prev[1].usec) / (n1 < 1 ? 1 : n1) },
{ "bps", (recovery_stat[1].bytes - recovery_report_prev[1].bytes) / ts_diff },
{ "iops", n1 / ts_diff },
} },
};
prev_report_stats = msgr.stats;
memcpy(recovery_report_prev, recovery_stat, sizeof(recovery_stat));
return st;
}

View File

@@ -168,20 +168,15 @@ void osd_t::reset_pg(pg_t & pg)
dirty_pgs.erase({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
}
// Repeer on each connect/disconnect peer event
void osd_t::start_pg_peering(pg_t & pg)
// Drop connections of clients who have this PG in dirty_pgs
void osd_t::drop_dirty_pg_connections(pool_pg_num_t pg)
{
pg.state = PG_PEERING;
this->peering_state |= OSD_PEERING_PGS;
reset_pg(pg);
report_pg_state(pg);
// Drop connections of clients who have this PG in dirty_pgs
if (immediate_commit != IMMEDIATE_ALL)
{
std::vector<int> to_stop;
for (auto & cp: msgr.clients)
{
if (cp.second->dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) != cp.second->dirty_pgs.end())
if (cp.second->dirty_pgs.find(pg) != cp.second->dirty_pgs.end())
{
to_stop.push_back(cp.first);
}
@@ -191,6 +186,16 @@ void osd_t::start_pg_peering(pg_t & pg)
msgr.stop_client(peer_fd);
}
}
}
// Repeer on each connect/disconnect peer event
void osd_t::start_pg_peering(pg_t & pg)
{
pg.state = PG_PEERING;
this->peering_state |= OSD_PEERING_PGS;
reset_pg(pg);
report_pg_state(pg);
drop_dirty_pg_connections({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
// Try to connect with current peers if they're up, but we don't have connections to them
// Otherwise we may erroneously decide that the pg is incomplete :-)
for (auto pg_osd: pg.all_peers)
@@ -460,6 +465,7 @@ bool osd_t::stop_pg(pg_t & pg)
{
return false;
}
drop_dirty_pg_connections({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
if (!(pg.state & (PG_ACTIVE | PG_REPEERING)))
{
finish_stop_pg(pg);

View File

@@ -43,8 +43,7 @@ void configure_single_pg_pool(cluster_client_t *cli)
},
});
cli->st_cli.on_load_pgs_hook(true);
std::map<std::string, etcd_kv_t> changes;
cli->st_cli.on_change_hook(changes);
cli->st_cli.on_change_pool_config_hook();
}
int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c, std::function<void()> cb = NULL, bool instant = false)
@@ -281,7 +280,8 @@ void test1()
uint8_t c = offset < 0xE000 ? 0x56 : (offset < 0x10000 ? 0x57 : 0x58);
if (((uint8_t*)op->iov.buf[buf_idx].iov_base)[i] != c)
{
printf("Write replay: mismatch at %ju\n", offset-op->req.rw.offset);
printf("Write replay: mismatch at %ju (expected %02x, have %02x)\n", offset-op->req.rw.offset,
c, ((uint8_t*)op->iov.buf[buf_idx].iov_base)[i]);
goto fail;
}
}
@@ -290,9 +290,9 @@ void test1()
assert(offset == op->req.rw.offset+op->req.rw.len);
replay_ops.push_back(op);
}
if (replay_start != 0 || replay_end != 0x14000)
if (replay_start != 0 || replay_end != 0x10000)
{
printf("Write replay: range mismatch: %jx-%jx\n", replay_start, replay_end);
printf("Write replay: range mismatch: 0x%jx-0x%jx (expected 0-0x10000)\n", replay_start, replay_end);
assert(0);
}
for (auto op: replay_ops)
@@ -320,8 +320,6 @@ void test1()
check_disconnected(cli, 1);
pretend_connected(cli, 1);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0);
check_op_count(cli, 1, 1);
can_complete(r1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0);
check_completed(r1);
@@ -341,7 +339,7 @@ void test1()
pretend_connected(cli, 1);
cli->continue_ops(true);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x2000), 0);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0);
check_op_count(cli, 1, 1);
can_complete(r2);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0x1000, 0x1000), 0);

View File

@@ -79,6 +79,7 @@ void ring_loop_t::loop()
struct io_uring_cqe *cqe;
while (!io_uring_peek_cqe(&ring, &cqe))
{
mu.lock();
struct ring_data_t *d = (struct ring_data_t*)cqe->user_data;
if (d->callback)
{
@@ -90,12 +91,14 @@ void ring_loop_t::loop()
dl.res = cqe->res;
dl.callback.swap(d->callback);
free_ring_data[free_ring_data_ptr++] = d - ring_datas;
mu.unlock();
dl.callback(&dl);
}
else
{
fprintf(stderr, "Warning: empty callback in SQE\n");
free_ring_data[free_ring_data_ptr++] = d - ring_datas;
mu.unlock();
}
io_uring_cqe_seen(&ring, cqe);
}

View File

@@ -14,6 +14,7 @@
#include <string>
#include <functional>
#include <vector>
#include <mutex>
#define RINGLOOP_DEFAULT_SIZE 1024
@@ -124,6 +125,7 @@ class ring_loop_t
std::vector<std::function<void()>> immediate_queue, immediate_queue2;
std::vector<ring_consumer_t*> consumers;
struct ring_data_t *ring_datas;
std::mutex mu;
int *free_ring_data;
unsigned free_ring_data_ptr;
bool loop_again;
@@ -138,12 +140,17 @@ public:
inline struct io_uring_sqe* get_sqe()
{
mu.lock();
if (free_ring_data_ptr == 0)
{
mu.unlock();
return NULL;
}
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
assert(sqe);
*sqe = { 0 };
io_uring_sqe_set_data(sqe, ring_datas + free_ring_data[--free_ring_data_ptr]);
mu.unlock();
return sqe;
}
inline void set_immediate(const std::function<void()> cb)

View File

@@ -151,10 +151,11 @@ static uint64_t size_thresh[] = { (uint64_t)1024*1024*1024*1024, (uint64_t)1024*
static uint64_t size_thresh_d[] = { (uint64_t)1000000000000, (uint64_t)1000000000, (uint64_t)1000000, (uint64_t)1000, 0 };
static const int size_thresh_n = sizeof(size_thresh)/sizeof(size_thresh[0]);
static const char *size_unit = "TGMKB";
static const char *size_unit_ns = "TGMk ";
std::string format_size(uint64_t size, bool nobytes)
std::string format_size(uint64_t size, bool nobytes, bool nospace)
{
uint64_t *thr = nobytes ? size_thresh_d : size_thresh;
uint64_t *thr = (nobytes ? size_thresh_d : size_thresh);
char buf[256];
for (int i = 0; i < size_thresh_n; i++)
{
@@ -165,9 +166,19 @@ std::string format_size(uint64_t size, bool nobytes)
assert(l < sizeof(buf)-2);
if (buf[l-1] == '0')
l -= 2;
buf[l] = i == size_thresh_n-1 && nobytes ? 0 : ' ';
buf[l+1] = i == size_thresh_n-1 && nobytes ? 0 : size_unit[i];
buf[l+2] = 0;
if (i == size_thresh_n-1 && nobytes)
buf[l] = 0;
else if (nospace)
{
buf[l] = size_unit_ns[i];
buf[l+1] = 0;
}
else
{
buf[l] = ' ';
buf[l+1] = size_unit[i];
buf[l+2] = 0;
}
break;
}
}

View File

@@ -16,7 +16,7 @@ std::string strtolower(const std::string & in);
std::string trim(const std::string & in, const char *rm_chars = " \n\r\t");
std::string str_replace(const std::string & in, const std::string & needle, const std::string & replacement);
uint64_t stoull_full(const std::string & str, int base = 0);
std::string format_size(uint64_t size, bool nobytes = false);
std::string format_size(uint64_t size, bool nobytes = false, bool nospace = false);
void print_help(const char *help_text, std::string exe_name, std::string cmd, bool all);
uint64_t parse_time(std::string time_str, bool *ok = NULL);
std::string read_all_fd(int fd);

View File

@@ -23,7 +23,7 @@ trap 'kill -9 $(jobs -p)' EXIT
ETCD=${ETCD:-etcd}
ETCD_IP=${ETCD_IP:-127.0.0.1}
ETCD_PORT=${ETCD_PORT:-12379}
ETCD_COUNT=${ETCD_COUNT:-0}
ETCD_COUNT=${ETCD_COUNT:-1}
if [ "$KEEP_DATA" = "" ]; then
rm -rf ./testdata
@@ -32,9 +32,12 @@ if [ "$KEEP_DATA" = "" ]; then
fi
ETCD_URL="http://$ETCD_IP:$ETCD_PORT"
ETCD_CLUSTER="etcd1=http://$ETCD_IP:$((ETCD_PORT+1))"
for i in $(seq 2 $ETCD_COUNT); do
ETCD_URL="$ETCD_URL,http://$ETCD_IP:$((ETCD_PORT+2*i-2))"
ETCD_CLUSTER="$ETCD_CLUSTER,etcd$i=http://$ETCD_IP:$((ETCD_PORT+2*i-1))"
done
ETCDCTL="${ETCD}ctl --endpoints=$ETCD_URL --dial-timeout=5s --command-timeout=10s"
start_etcd()
{
@@ -50,43 +53,15 @@ start_etcd()
eval ETCD${i}_PID=$!
}
start_etcd_cluster()
{
ETCD_CLUSTER="etcd1=http://$ETCD_IP:$((ETCD_PORT+1))"
for i in $(seq 2 $ETCD_COUNT); do
ETCD_CLUSTER="$ETCD_CLUSTER,etcd$i=http://$ETCD_IP:$((ETCD_PORT+2*i-1))"
done
for i in $(seq 1 $ETCD_COUNT); do
start_etcd $i
done
ETCDCTL="${ETCD}ctl --endpoints=$ETCD_URL --dial-timeout=5s --command-timeout=10s"
for i in {1..30}; do
${ETCD}ctl --endpoints=$ETCD_URL --dial-timeout=1s --command-timeout=1s member list >/dev/null && break
if [[ $i = 30 ]]; then
format_error "Failed to start etcd"
fi
done
}
wait_etcd()
{
for i in {1..30}; do
$ETCDCTL --dial-timeout=1s --command-timeout=1s get --prefix / && break
if [[ $i = 30 ]]; then
format_error "Failed to start etcd"
fi
sleep 1
done
}
if [[ "$ETCD_COUNT" -lt 1 ]]; then
ETCDCTL="node mon/node_modules/.bin/anticli -e $ETCD_URL"
MON_PARAMS="--use_antietcd 1 --antietcd_data_dir ./testdata --antietcd_persist_interval 500"
else
ETCDCTL="${ETCD}ctl --endpoints=$ETCD_URL --dial-timeout=5s --command-timeout=10s"
MON_PARAMS=""
start_etcd_cluster
fi
for i in $(seq 1 $ETCD_COUNT); do
start_etcd $i
done
for i in {1..30}; do
${ETCD}ctl --endpoints=$ETCD_URL --dial-timeout=1s --command-timeout=1s member list >/dev/null && break
if [[ $i = 30 ]]; then
format_error "Failed to start etcd"
fi
done
echo leak:fio >> testdata/lsan-suppress.txt
echo leak:tcmalloc >> testdata/lsan-suppress.txt

View File

@@ -18,11 +18,6 @@ else
OSD_COUNT=${OSD_COUNT:-3}
fi
node mon/mon-main.js $MON_PARAMS --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 >>./testdata/mon.log 2>&1 &
MON_PID=$!
sleep 3
if [ "$IMMEDIATE_COMMIT" != "" ]; then
NO_SAME="--journal_no_same_sector_overwrites true --journal_sector_buffer_count 1024 --disable_data_fsync 1 --immediate_commit all --log_level 10 --etcd_stats_interval 5"
$ETCDCTL put /vitastor/config/global '{"recovery_queue_depth":1,"recovery_tune_util_low":1,"immediate_commit":"all","client_enable_writeback":true,"client_max_writeback_iodepth":32'$GLOBAL_CONFIG'}'
@@ -59,6 +54,9 @@ for i in $(seq 1 $OSD_COUNT); do
start_osd $i
done
node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 >>./testdata/mon.log 2>&1 &
MON_PID=$!
if [ "$SCHEME" = "ec" ]; then
PG_SIZE=${PG_SIZE:-5}
PG_MINSIZE=${PG_MINSIZE:-4}

View File

@@ -2,10 +2,6 @@
. `dirname $0`/common.sh
node mon/mon-main.js $MON_PARAMS --etcd_address $ETCD_URL --etcd_prefix "/vitastor" >>./testdata/mon.log 2>&1 &
MON_PID=$!
wait_etcd
TIME=$(date '+%s')
$ETCDCTL put /vitastor/config/global '{"placement_levels":{"rack":1,"host":2,"osd":3}}'
$ETCDCTL put /vitastor/config/node_placement '{"rack1":{"level":"rack"},"rack2":{"level":"rack"},"host1":{"level":"host","parent":"rack1"},"host2":{"level":"host","parent":"rack1"},"host3":{"level":"host","parent":"rack2"},"host4":{"level":"host","parent":"rack2"}}'
@@ -26,9 +22,12 @@ $ETCDCTL get --print-value-only /vitastor/config/pools | jq -s -e '. == [{}]'
build/src/cmd/vitastor-cli --etcd_address $ETCD_URL create-pool testpool -s 2 -n 4 --failure_domain rack --force
$ETCDCTL get --print-value-only /vitastor/config/pools | jq -s -e '. == [{"1":{"name":"testpool","scheme":"replicated","pg_size":2,"pg_minsize":1,"pg_count":4,"failure_domain":"rack"}}]'
node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" >>./testdata/mon.log 2>&1 &
MON_PID=$!
sleep 2
$ETCDCTL get --prefix /vitastor/config/pgs --print-value-only | \
etcdctl --endpoints=http://localhost:12379 get --prefix /vitastor/config/pgs --print-value-only | \
jq -s -e '([ .[0].items["1"] | .[].osd_set | map_values(. | tonumber) | select((.[0] <= 4) != (.[1] <= 4)) ] | length) == 4'
format_green OK

View File

@@ -1,7 +1,5 @@
#!/bin/bash -ex
ETCD_COUNT=1
. `dirname $0`/common.sh
OSD_SIZE=1024

View File

@@ -2,10 +2,6 @@
. `dirname $0`/common.sh
node mon/mon-main.js $MON_PARAMS --etcd_address $ETCD_URL --etcd_prefix "/vitastor" >>./testdata/mon.log 2>&1 &
MON_PID=$!
wait_etcd
TIME=$(date '+%s')
$ETCDCTL put /vitastor/config/osd/1 '{"tags":["a"]}'
$ETCDCTL put /vitastor/config/osd/2 '{"tags":["a"]}'
@@ -25,12 +21,15 @@ $ETCDCTL put /vitastor/osd/stats/7 '{"host":"stor4","size":1073741824,"time":"'$
$ETCDCTL put /vitastor/osd/stats/8 '{"host":"stor4","size":1073741824,"time":"'$TIME'"}'
$ETCDCTL put /vitastor/config/pools '{"1":{"name":"testpool","scheme":"replicated","pg_size":2,"pg_minsize":2,"pg_count":16,"failure_domain":"host","osd_tags":["a"]}}'
node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" >>./testdata/mon.log 2>&1 &
MON_PID=$!
sleep 2
$ETCDCTL get --prefix /vitastor/config/pgs --print-value-only
etcdctl --endpoints=http://localhost:12379 get --prefix /vitastor/config/pgs --print-value-only
if ! ($ETCDCTL get --prefix /vitastor/config/pgs --print-value-only | \
jq -s -e '[ [ .[] | select(has("items")) | .items["1"] | .[].osd_set | map(. | select(. != "" and (.|tonumber) < 5)) ][] | select((. | length) == 2) ] | length == 16'); then
if ! (etcdctl --endpoints=http://localhost:12379 get --prefix /vitastor/config/pgs --print-value-only | \
jq -s -e '[ [ .[0].items["1"] | .[].osd_set | map(. | select(. != "" and (.|tonumber) < 5)) ][] | select((. | length) == 2) ] | length == 16'); then
format_error "Some PGs missing replicas"
fi

View File

@@ -2,10 +2,6 @@
. `dirname $0`/common.sh
node mon/mon-main.js $MON_PARAMS --etcd_address $ETCD_URL --etcd_prefix "/vitastor" >>./testdata/mon.log 2>&1 &
MON_PID=$!
wait_etcd
TIME=$(date '+%s')
$ETCDCTL put /vitastor/config/global '{"placement_levels":{"rack":100,"host":101,"osd":102}}'
$ETCDCTL put /vitastor/config/node_placement '{"rack1":{"level":"rack"},"rack2":{"level":"rack"},"stor1":{"level":"host","parent":"rack1"},"stor2":{"level":"host","parent":"rack1"},"stor3":{"level":"host","parent":"rack2"},"stor4":{"level":"host","parent":"rack2"}}'
@@ -19,11 +15,14 @@ $ETCDCTL put /vitastor/osd/stats/7 '{"host":"stor4","size":1073741824,"time":"'$
$ETCDCTL put /vitastor/osd/stats/8 '{"host":"stor4","size":1073741824,"time":"'$TIME'"}'
$ETCDCTL put /vitastor/config/pools '{"1":{"name":"testpool","scheme":"replicated","pg_size":2,"pg_minsize":2,"pg_count":16,"failure_domain":"host","root_node":"rack1"}}'
node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" >>./testdata/mon.log 2>&1 &
MON_PID=$!
sleep 2
$ETCDCTL get --prefix /vitastor/config/pgs --print-value-only
etcdctl --endpoints=http://localhost:12379 get --prefix /vitastor/config/pgs --print-value-only
if ! ($ETCDCTL get --prefix /vitastor/config/pgs --print-value-only | \
if ! (etcdctl --endpoints=http://localhost:12379 get --prefix /vitastor/config/pgs --print-value-only | \
jq -s -e '[ [ .[0].items["1"] | .[].osd_set | map(. | select(. != "" and (.|tonumber) < 5)) ][] | select((. | length) == 2) ] | length == 16'); then
format_error "Some PGs missing replicas"
fi

View File

@@ -3,13 +3,9 @@
export KEEP_DATA=1
. `dirname $0`/common.sh
node mon/mon-main.js $MON_PARAMS --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 >>./testdata/mon.log 2>&1 &
MON_PID=$!
wait_etcd
$ETCDCTL del --prefix /vitastor/mon/master
$ETCDCTL del --prefix /vitastor/pg/state
$ETCDCTL del --prefix /vitastor/osd/state
etcdctl --endpoints=http://127.0.0.1:12379/v3 del --prefix /vitastor/mon/master
etcdctl --endpoints=http://127.0.0.1:12379/v3 del --prefix /vitastor/pg/state
etcdctl --endpoints=http://127.0.0.1:12379/v3 del --prefix /vitastor/osd/state
OSD_COUNT=3
OSD_ARGS="$OSD_ARGS"
@@ -19,6 +15,9 @@ for i in $(seq 1 $OSD_COUNT); do
eval OSD${i}_PID=$!
done
node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 >>./testdata/mon.log 2>&1 &
MON_PID=$!
sleep 3
if ! ($ETCDCTL get /vitastor/pg/state/1/1 --print-value-only | jq -s -e '(. | length) != 0 and .[0].state == ["active"]'); then