Compare commits

..

16 Commits

Author SHA1 Message Date
9b3bdb13aa Run most tests with internal antietcd 2024-06-12 15:33:45 +03:00
bf9dbfc52f Implement experimental antietcd-based version of monitor 2024-06-11 12:53:46 +03:00
6501abc060 Set default etcd_ws_keepalive_interval to 5 2024-06-08 00:38:48 +03:00
1228403e74 Implement internal restart / run_forever in monitor 2024-06-08 00:35:18 +03:00
4eabebd245 Put all configuration to Mon.config 2024-06-07 00:20:38 +03:00
cf60b6818c Extract PG generation into pg_gen.js 2024-06-05 11:22:06 +03:00
1a4a7cdc37 Extract OSD Tree generation functions to osd_tree.js 2024-06-05 11:19:35 +03:00
1b48085e21 Extract remote etcd interaction to etcd_adapter.js 2024-06-05 11:19:35 +03:00
a71847244e Rename PGUtil.js to pg_utils.js 2024-06-05 10:51:20 +03:00
848c2d2722 Move LPOptimizer, DSL and tests to lp_optimizer/ 2024-06-05 10:51:20 +03:00
86832dc43f Add eslint import/no-unresolved 2024-06-05 10:51:20 +03:00
1f6da79463 Extract stats calculation into a separate file 2024-06-05 10:51:20 +03:00
9bf57c3760 Mention generic Toshiba MG instead of specific MGxx, fix russian vitastorfs link 2024-06-05 02:08:09 +03:00
a0305b5b4a Extract pool configuration validation into a separate file 2024-06-05 02:08:08 +03:00
1546f8e447 Extract etcd data "schema" into a separate file 2024-06-05 02:07:53 +03:00
8ce962b312 Move scripts 2024-06-05 02:07:53 +03:00
65 changed files with 2304 additions and 2953 deletions

View File

@@ -1,2 +1,3 @@
mon usr/lib/vitastor
mon/vitastor-mon.service /lib/systemd/system
mon usr/lib/vitastor/mon
mon/scripts/make-etcd usr/lib/vitastor/mon
mon/scripts/vitastor-mon.service /lib/systemd/system

View File

@@ -1,6 +1,6 @@
usr/bin/vitastor-osd
usr/bin/vitastor-disk
usr/bin/vitastor-dump-journal
mon/vitastor-osd@.service /lib/systemd/system
mon/vitastor.target /lib/systemd/system
mon/90-vitastor.rules /lib/udev/rules.d
mon/scripts/vitastor-osd@.service /lib/systemd/system
mon/scripts/vitastor.target /lib/systemd/system
mon/scripts/90-vitastor.rules /lib/udev/rules.d

View File

@@ -248,7 +248,7 @@ etcd_report_interval to guarantee that keepalive actually works.
## etcd_ws_keepalive_interval
- Type: seconds
- Default: 30
- Default: 5
- Can be changed online: yes
etcd websocket ping interval required to keep the connection alive and

View File

@@ -259,7 +259,7 @@ etcd_report_interval, чтобы keepalive гарантированно рабо
## etcd_ws_keepalive_interval
- Тип: секунды
- Значение по умолчанию: 30
- Значение по умолчанию: 5
- Можно менять на лету: да
Интервал проверки живости вебсокет-подключений к etcd.

View File

@@ -282,7 +282,7 @@
etcd_report_interval, чтобы keepalive гарантированно работал.
- name: etcd_ws_keepalive_interval
type: sec
default: 30
default: 5
online: true
info: |
etcd websocket ping interval required to keep the connection alive and

View File

@@ -22,7 +22,7 @@
with lazy fsync, but prepare for inferior single-thread latency. Read more about capacitors
[here](../config/layout-cluster.en.md#immediate_commit).
- If you want to use HDDs, get modern HDDs with Media Cache or SSD Cache: HGST Ultrastar,
Toshiba MG08, Seagate EXOS or something similar. If your drives don't have such cache then
Toshiba MG, Seagate EXOS or something similar. If your drives don't have such cache then
you also need small SSDs for journal and metadata (even 2 GB per 1 TB of HDD space is enough).
- Get a fast network (at least 10 Gbit/s). Something like Mellanox ConnectX-4 with RoCEv2 is ideal.
- Disable CPU powersaving: `cpupower idle-set -D 0 && cpupower frequency-set -g performance`.
@@ -33,7 +33,7 @@
- SATA SSD: Micron 5100/5200/5300/5400, Samsung PM863/PM883/PM893, Intel D3-S4510/4520/4610/4620, Kingston DC500M
- NVMe: Micron 9100/9200/9300/9400, Micron 7300/7450, Samsung PM983/PM9A3, Samsung PM1723/1735/1743,
Intel DC-P3700/P4500/P4600, Intel D7-P5500/P5600, Intel Optane, Kingston DC1000B/DC1500M
- HDD: HGST Ultrastar, Toshiba MG06/MG07/MG08, Seagate EXOS
- HDD: HGST Ultrastar, Toshiba MG, Seagate EXOS
## Configure monitors

View File

@@ -123,4 +123,4 @@ vitastor-cli create -s 10G testimg
Если вы хотите использовать не только блочные образы виртуальных машин или контейнеров,
а также кластерную файловую систему, то:
- [Следуйте инструкциям](../usage/nfs.en.md#vitastorfs)
- [Следуйте инструкциям](../usage/nfs.ru.md#vitastorfs)

View File

@@ -11,6 +11,7 @@ module.exports = {
"ecmaVersion": 2020
},
"plugins": [
"import"
],
"rules": {
"indent": [
@@ -44,6 +45,10 @@ module.exports = {
],
"node/shebang": [
"off"
],
"import/no-unresolved": [
2,
{ "commonjs": true }
]
}
};

191
mon/antietcd_adapter.js Normal file
View File

@@ -0,0 +1,191 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
const fs = require('fs');
const AntiEtcd = require('antietcd');
const vitastor_persist_filter = require('./vitastor_persist_filter.js');
const { b64, local_ips } = require('./utils.js');
class AntiEtcdAdapter
{
static start_antietcd(config)
{
let antietcd;
if (config.use_antietcd)
{
let fileConfig = {};
if (fs.existsSync(config.config_path||'/etc/vitastor/vitastor.conf'))
{
fileConfig = JSON.parse(fs.readFileSync(config.config_path||'/etc/vitastor/vitastor.conf', { encoding: 'utf-8' }));
}
let mergedConfig = { ...fileConfig, ...config };
let cluster = mergedConfig.etcd_address;
if (!(cluster instanceof Array))
cluster = cluster ? (''+(cluster||'')).split(/,+/) : [];
cluster = Object.keys(cluster.reduce((a, url) =>
{
a[url.toLowerCase().replace(/^https?:\/\//, '').replace(/\/.*$/, '')] = true;
return a;
}, {}));
const cfg_port = mergedConfig.antietcd_port;
const is_local = local_ips(true).reduce((a, c) => { a[c] = true; return a; }, {});
const selected = cluster.map(s => s.split(':', 2)).filter(ip => is_local[ip[0]] && (!cfg_port || ip[1] == cfg_port));
if (selected.length > 1)
{
console.error('More than 1 etcd_address matches local IPs, please specify port');
process.exit(1);
}
else if (selected.length == 1)
{
const antietcd_config = {
ip: selected[0][0],
port: selected[0][1],
data: mergedConfig.antietcd_data_file || ((mergedConfig.antietcd_data_dir || '/var/lib/vitastor') + '/mon_'+selected[0][1]+'.json.gz'),
persist_filter: vitastor_persist_filter(mergedConfig.etcd_prefix || '/vitastor'),
node_id: selected[0][0]+':'+selected[0][1], // node_id = ip:port
cluster: (cluster.length == 1 ? null : cluster),
cluster_key: (mergedConfig.etcd_prefix || '/vitastor'),
stale_read: 1,
};
for (const key in config)
{
if (key.substr(0, 9) === 'antietcd_')
{
const noprefix = key.substr(9);
if (!(noprefix in antietcd_config))
{
antietcd_config[noprefix] = config[key];
}
}
}
antietcd = new AntiEtcd(antietcd_config);
antietcd.start();
}
else
{
console.log('Antietcd is enabled, but etcd_address does not contain local IPs, proceeding without it');
}
}
return antietcd;
}
constructor(mon, antietcd)
{
this.mon = mon;
this.antietcd = antietcd;
this.on_leader = [];
this.on_change = (st) =>
{
if (st.state === 'leader')
{
for (const cb of this.on_leader)
{
cb();
}
this.on_leader = [];
}
};
this.antietcd.on('raftchange', this.on_change);
}
parse_config(/*config*/)
{
}
stop_watcher()
{
this.antietcd.off('raftchange', this.on_change);
const watch_id = this.watch_id;
if (watch_id)
{
this.watch_id = null;
this.antietcd.cancel_watch(watch_id).catch(console.error);
}
}
async start_watcher()
{
if (this.watch_id)
{
await this.antietcd.cancel_watch(this.watch_id);
this.watch_id = null;
}
const watch_id = await this.antietcd.create_watch({
key: b64(this.mon.config.etcd_prefix+'/'),
range_end: b64(this.mon.config.etcd_prefix+'0'),
start_revision: ''+this.mon.etcd_watch_revision,
watch_id: 1,
progress_notify: true,
}, (message) =>
{
this.mon.on_message(message.result);
});
console.log('Successfully subscribed to antietcd revision '+this.antietcd.etctree.mod_revision);
this.watch_id = watch_id;
}
async become_master()
{
if (!this.antietcd.raft)
{
console.log('Running in non-clustered mode');
}
else
{
console.log('Waiting to become master');
await new Promise(ok => this.on_leader.push(ok));
}
const state = { ...this.mon.get_mon_state(), id: ''+this.mon.etcd_lease_id };
await this.etcd_call('/kv/txn', {
success: [ { requestPut: { key: b64(this.mon.config.etcd_prefix+'/mon/master'), value: b64(JSON.stringify(state)), lease: ''+this.mon.etcd_lease_id } } ],
}, this.mon.config.etcd_start_timeout, 0);
if (this.antietcd.raft)
{
console.log('Became master');
}
}
async etcd_call(path, body, timeout, retries)
{
let retry = 0;
if (retries >= 0 && retries < 1)
{
retries = 1;
}
let prev = 0;
while (retries < 0 || retry < retries)
{
retry++;
if (this.mon.stopped)
{
throw new Error('Monitor instance is stopped');
}
try
{
if (Date.now()-prev < timeout)
{
await new Promise(ok => setTimeout(ok, timeout-(Date.now()-prev)));
}
prev = Date.now();
const res = await this.antietcd.api(path.replace(/^\/+/, '').replace(/\/+$/, '').replace(/\/+/g, '_'), body);
if (res.error)
{
console.error('Failed to query antietcd '+path+' (retry '+retry+'/'+retries+'): '+res.error);
}
else
{
return res;
}
}
catch (e)
{
console.error('Failed to query antietcd '+path+' (retry '+retry+'/'+retries+'): '+e.stack);
}
}
throw new Error('Failed to query antietcd ('+retries+' retries)');
}
}
module.exports = AntiEtcdAdapter;

352
mon/etcd_adapter.js Normal file
View File

@@ -0,0 +1,352 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
const http = require('http');
const WebSocket = require('ws');
const { b64, local_ips } = require('./utils.js');
const MON_STOPPED = 'Monitor instance is stopped';
class EtcdAdapter
{
constructor(mon)
{
this.mon = mon;
this.ws = null;
this.ws_alive = false;
this.ws_keepalive_timer = null;
}
parse_config(config)
{
this.parse_etcd_addresses(config.etcd_address||config.etcd_url);
}
parse_etcd_addresses(addrs)
{
const is_local_ip = local_ips(true).reduce((a, c) => { a[c] = true; return a; }, {});
this.etcd_local = [];
this.etcd_urls = [];
this.selected_etcd_url = null;
this.etcd_urls_to_try = [];
if (!(addrs instanceof Array))
addrs = addrs ? (''+(addrs||'')).split(/,/) : [];
if (!addrs.length)
{
console.error('Vitastor etcd address(es) not specified. Please set on the command line or in the config file');
process.exit(1);
}
for (let url of addrs)
{
let scheme = 'http';
url = url.trim().replace(/^(https?):\/\//, (m, m1) => { scheme = m1; return ''; });
const slash = url.indexOf('/');
const colon = url.indexOf(':');
const is_local = is_local_ip[colon >= 0 ? url.substr(0, colon) : (slash >= 0 ? url.substr(0, slash) : url)];
url = scheme+'://'+(slash >= 0 ? url : url+'/v3');
if (is_local)
this.etcd_local.push(url);
else
this.etcd_urls.push(url);
}
}
pick_next_etcd()
{
if (this.selected_etcd_url)
return this.selected_etcd_url;
if (!this.etcd_urls_to_try || !this.etcd_urls_to_try.length)
{
this.etcd_urls_to_try = [ ...this.etcd_local ];
const others = [ ...this.etcd_urls ];
while (others.length)
{
const url = others.splice(0|(others.length*Math.random()), 1);
this.etcd_urls_to_try.push(url[0]);
}
}
this.selected_etcd_url = this.etcd_urls_to_try.shift();
return this.selected_etcd_url;
}
stop_watcher(cur_addr)
{
cur_addr = cur_addr || this.selected_etcd_url;
if (this.ws)
{
console.log('Disconnected from etcd at '+this.ws_used_url);
this.ws.close();
this.ws = null;
}
if (this.ws_keepalive_timer)
{
clearInterval(this.ws_keepalive_timer);
this.ws_keepalive_timer = null;
}
if (this.selected_etcd_url == cur_addr)
{
this.selected_etcd_url = null;
}
}
restart_watcher(cur_addr)
{
this.stop_watcher(cur_addr);
this.start_watcher(this.mon.config.etcd_mon_retries).catch(this.mon.die);
}
async start_watcher(retries)
{
let retry = 0;
if (!retries || retries < 1)
{
retries = 1;
}
const tried = {};
while (retries < 0 || retry < retries)
{
const cur_addr = this.pick_next_etcd();
const base = 'ws'+cur_addr.substr(4);
let now = Date.now();
if (tried[base] && now-tried[base] < this.mon.config.etcd_start_timeout)
{
await new Promise(ok => setTimeout(ok, this.mon.config.etcd_start_timeout-(now-tried[base])));
now = Date.now();
}
tried[base] = now;
if (this.mon.stopped)
{
return;
}
const ok = await new Promise(ok =>
{
const timer_id = setTimeout(() =>
{
if (this.ws)
{
console.log('Disconnected from etcd at '+this.ws_used_url);
this.ws.close();
this.ws = null;
}
ok(false);
}, this.mon.config.etcd_mon_timeout);
this.ws = new WebSocket(base+'/watch');
this.ws_used_url = cur_addr;
const fail = () =>
{
ok(false);
};
this.ws.on('error', fail);
this.ws.on('open', () =>
{
this.ws.removeListener('error', fail);
if (timer_id)
clearTimeout(timer_id);
ok(true);
});
});
if (ok)
break;
if (this.selected_etcd_url == cur_addr)
this.selected_etcd_url = null;
this.ws = null;
retry++;
}
if (!this.ws)
{
this.mon.die('Failed to open etcd watch websocket');
return;
}
if (this.mon.stopped)
{
this.stop_watcher();
return;
}
const cur_addr = this.selected_etcd_url;
this.ws_alive = true;
this.ws_keepalive_timer = setInterval(() =>
{
if (this.ws_alive && this.ws)
{
this.ws_alive = false;
this.ws.send(JSON.stringify({ progress_request: {} }));
}
else
{
console.log('etcd websocket timed out, restarting it');
this.restart_watcher(cur_addr);
}
}, (Number(this.mon.config.etcd_ws_keepalive_interval) || 5)*1000);
this.ws.on('error', () => this.restart_watcher(cur_addr));
this.ws.send(JSON.stringify({
create_request: {
key: b64(this.mon.config.etcd_prefix+'/'),
range_end: b64(this.mon.config.etcd_prefix+'0'),
start_revision: ''+this.mon.etcd_watch_revision,
watch_id: 1,
progress_notify: true,
},
}));
this.ws.on('message', (msg) =>
{
if (this.mon.stopped)
{
this.stop_watcher();
return;
}
this.ws_alive = true;
let data;
try
{
data = JSON.parse(msg);
}
catch (e)
{
}
if (!data || !data.result)
{
console.error('Unknown message received from watch websocket: '+msg);
}
else if (data.result.canceled)
{
// etcd watch canceled
if (data.result.compact_revision)
{
// we may miss events if we proceed
this.mon.die('Revisions before '+data.result.compact_revision+' were compacted by etcd, exiting');
}
this.mon.die('Watch canceled by etcd, reason: '+data.result.cancel_reason+', exiting');
}
else if (data.result.created)
{
// etcd watch created
console.log('Successfully subscribed to etcd at '+this.selected_etcd_url+', revision '+data.result.header.revision);
}
else
{
this.mon.on_message(data.result);
}
});
}
async become_master()
{
const state = { ...this.mon.get_mon_state(), id: ''+this.mon.etcd_lease_id };
// eslint-disable-next-line no-constant-condition
while (1)
{
const res = await this.etcd_call('/kv/txn', {
compare: [ { target: 'CREATE', create_revision: 0, key: b64(this.mon.config.etcd_prefix+'/mon/master') } ],
success: [ { requestPut: { key: b64(this.mon.config.etcd_prefix+'/mon/master'), value: b64(JSON.stringify(state)), lease: ''+this.mon.etcd_lease_id } } ],
}, this.mon.config.etcd_start_timeout, 0);
if (res.succeeded)
{
break;
}
console.log('Waiting to become master');
await new Promise(ok => setTimeout(ok, this.mon.config.etcd_start_timeout));
}
console.log('Became master');
}
async etcd_call(path, body, timeout, retries)
{
let retry = 0;
if (retries >= 0 && retries < 1)
{
retries = 1;
}
const tried = {};
while (retries < 0 || retry < retries)
{
retry++;
const base = this.pick_next_etcd();
let now = Date.now();
if (tried[base] && now-tried[base] < timeout)
{
await new Promise(ok => setTimeout(ok, timeout-(now-tried[base])));
now = Date.now();
}
tried[base] = now;
if (this.mon.stopped)
{
throw new Error(MON_STOPPED);
}
const res = await POST(base+path, body, timeout);
if (this.mon.stopped)
{
throw new Error(MON_STOPPED);
}
if (res.error)
{
if (this.selected_etcd_url == base)
this.selected_etcd_url = null;
console.error('Failed to query etcd '+path+' (retry '+retry+'/'+retries+'): '+res.error);
continue;
}
if (res.json)
{
if (res.json.error)
{
console.error(path+': etcd returned error: '+res.json.error);
break;
}
return res.json;
}
}
throw new Error('Failed to query etcd ('+retries+' retries)');
}
}
function POST(url, body, timeout)
{
return new Promise(ok =>
{
const body_text = Buffer.from(JSON.stringify(body));
let timer_id = timeout > 0 ? setTimeout(() =>
{
if (req)
req.abort();
req = null;
ok({ error: 'timeout' });
}, timeout) : null;
let req = http.request(url, { method: 'POST', headers: {
'Content-Type': 'application/json',
'Content-Length': body_text.length,
} }, (res) =>
{
if (!req)
{
return;
}
clearTimeout(timer_id);
let res_body = '';
res.setEncoding('utf8');
res.on('error', (error) => ok({ error }));
res.on('data', chunk => { res_body += chunk; });
res.on('end', () =>
{
if (res.statusCode != 200)
{
ok({ error: res_body, code: res.statusCode });
return;
}
try
{
res_body = JSON.parse(res_body);
ok({ response: res, json: res_body });
}
catch (e)
{
ok({ error: e, response: res, body: res_body });
}
});
});
req.on('error', (error) => ok({ error }));
req.on('close', () => ok({ error: new Error('Connection closed prematurely') }));
req.write(body_text);
req.end();
});
}
module.exports = EtcdAdapter;

391
mon/etcd_schema.js Normal file
View File

@@ -0,0 +1,391 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
// FIXME document all etcd keys and config variables in the form of JSON schema or similar
const etcd_nonempty_keys = {
'config/global': 1,
'config/node_placement': 1,
'config/pools': 1,
'config/pgs': 1,
'history/last_clean_pgs': 1,
'stats': 1,
};
const etcd_allow = new RegExp('^'+[
'config/global',
'config/node_placement',
'config/pools',
'config/osd/[1-9]\\d*',
'config/pgs',
'config/inode/[1-9]\\d*/[1-9]\\d*',
'osd/state/[1-9]\\d*',
'osd/stats/[1-9]\\d*',
'osd/inodestats/[1-9]\\d*',
'osd/space/[1-9]\\d*',
'mon/master',
'mon/member/[a-f0-9]+',
'pg/state/[1-9]\\d*/[1-9]\\d*',
'pg/stats/[1-9]\\d*/[1-9]\\d*',
'pg/history/[1-9]\\d*/[1-9]\\d*',
'history/last_clean_pgs',
'inode/stats/[1-9]\\d*/\\d+',
'pool/stats/[1-9]\\d*',
'stats',
'index/image/.*',
'index/maxid/[1-9]\\d*',
].join('$|^')+'$');
const etcd_tree = {
config: {
/* global: {
// WARNING: NOT ALL OF THESE ARE ACTUALLY CONFIGURABLE HERE
// THIS IS JUST A POOR MAN'S CONFIG DOCUMENTATION
// etcd connection
config_path: "/etc/vitastor/vitastor.conf",
etcd_prefix: "/vitastor",
// etcd connection - configurable online
etcd_address: "10.0.115.10:2379/v3",
// mon
etcd_mon_ttl: 5, // min: 1
etcd_mon_timeout: 1000, // ms. min: 0
etcd_mon_retries: 5, // min: 0
mon_change_timeout: 1000, // ms. min: 100
mon_retry_change_timeout: 50, // ms. min: 10
mon_stats_timeout: 1000, // ms. min: 100
osd_out_time: 600, // seconds. min: 0
placement_levels: { datacenter: 1, rack: 2, host: 3, osd: 4, ... },
use_old_pg_combinator: false,
// client and osd
tcp_header_buffer_size: 65536,
use_sync_send_recv: false,
use_rdma: true,
rdma_device: null, // for example, "rocep5s0f0"
rdma_port_num: 1,
rdma_gid_index: 0,
rdma_mtu: 4096,
rdma_max_sge: 128,
rdma_max_send: 8,
rdma_max_recv: 16,
rdma_max_msg: 132096,
block_size: 131072,
disk_alignment: 4096,
bitmap_granularity: 4096,
immediate_commit: false, // 'all' or 'small'
// client - configurable online
client_max_dirty_bytes: 33554432,
client_max_dirty_ops: 1024,
client_enable_writeback: false,
client_max_buffered_bytes: 33554432,
client_max_buffered_ops: 1024,
client_max_writeback_iodepth: 256,
client_retry_interval: 50, // ms. min: 10
client_eio_retry_interval: 1000, // ms
client_retry_enospc: true,
osd_nearfull_ratio: 0.95,
// client and osd - configurable online
log_level: 0,
peer_connect_interval: 5, // seconds. min: 1
peer_connect_timeout: 5, // seconds. min: 1
osd_idle_timeout: 5, // seconds. min: 1
osd_ping_timeout: 5, // seconds. min: 1
max_etcd_attempts: 5,
etcd_quick_timeout: 1000, // ms
etcd_slow_timeout: 5000, // ms
etcd_keepalive_timeout: 30, // seconds, default is max(30, etcd_report_interval*2)
etcd_ws_keepalive_interval: 5, // seconds
// osd
etcd_report_interval: 5, // seconds
etcd_stats_interval: 30, // seconds
run_primary: true,
osd_network: null, // "192.168.7.0/24" or an array of masks
bind_address: "0.0.0.0",
bind_port: 0,
readonly: false,
osd_memlock: false,
// osd - configurable online
autosync_interval: 5,
autosync_writes: 128,
client_queue_depth: 128, // unused
recovery_queue_depth: 1,
recovery_sleep_us: 0,
recovery_tune_util_low: 0.1,
recovery_tune_client_util_low: 0,
recovery_tune_util_high: 1.0,
recovery_tune_client_util_high: 0.5,
recovery_tune_interval: 1,
recovery_tune_agg_interval: 10, // 10 times recovery_tune_interval
recovery_tune_sleep_min_us: 10, // 10 microseconds
recovery_pg_switch: 128,
recovery_sync_batch: 16,
no_recovery: false,
no_rebalance: false,
print_stats_interval: 3,
slow_log_interval: 10,
inode_vanish_time: 60,
auto_scrub: false,
no_scrub: false,
scrub_interval: '30d', // 1s/1m/1h/1d
scrub_queue_depth: 1,
scrub_sleep: 0, // milliseconds
scrub_list_limit: 1000, // objects to list on one scrub iteration
scrub_find_best: true,
scrub_ec_max_bruteforce: 100, // maximum EC error locator brute-force iterators
// blockstore - fixed in superblock
block_size,
disk_alignment,
journal_block_size,
meta_block_size,
bitmap_granularity,
journal_device,
journal_offset,
journal_size,
disable_journal_fsync,
data_device,
data_offset,
data_size,
disable_data_fsync,
meta_device,
meta_offset,
disable_meta_fsync,
disable_device_lock,
// blockstore - configurable offline
inmemory_metadata,
inmemory_journal,
journal_sector_buffer_count,
journal_no_same_sector_overwrites,
// blockstore - configurable online
max_write_iodepth,
min_flusher_count: 1,
max_flusher_count: 256,
throttle_small_writes: false,
throttle_target_iops: 100,
throttle_target_mbs: 100,
throttle_target_parallelism: 1,
throttle_threshold_us: 50,
}, */
global: {},
/* node_placement: {
host1: { level: 'host', parent: 'rack1' },
...
}, */
node_placement: {},
/* pools: {
<id>: {
name: 'testpool',
// 'ec' uses Reed-Solomon-Vandermonde codes, 'jerasure' is an alias for 'ec'
scheme: 'replicated' | 'xor' | 'ec' | 'jerasure',
pg_size: 3,
pg_minsize: 2,
// number of parity chunks, required for EC
parity_chunks?: 1,
pg_count: 100,
// default is failure_domain=host
failure_domain?: 'host',
// additional failure domain rules; failure_domain=x is equivalent to x=123..N
level_placement?: 'dc=112233 host=123456',
raw_placement?: 'any, dc=1 host!=1, dc=1 host!=(1,2)',
old_combinator: false,
max_osd_combinations: 10000,
// block_size, bitmap_granularity, immediate_commit must match all OSDs used in that pool
block_size: 131072,
bitmap_granularity: 4096,
// 'all'/'small'/'none', same as in OSD options
immediate_commit: 'none',
pg_stripe_size: 0,
root_node?: 'rack1',
// restrict pool to OSDs having all of these tags
osd_tags?: 'nvme' | [ 'nvme', ... ],
// prefer to put primary on OSD with these tags
primary_affinity_tags?: 'nvme' | [ 'nvme', ... ],
// scrub interval
scrub_interval?: '30d',
},
...
}, */
pools: {},
osd: {
/* <id>: { reweight?: 1, tags?: [ 'nvme', ... ], noout?: true }, ... */
},
/* pgs: {
hash: string,
items: {
<pool_id>: {
<pg_id>: {
osd_set: [ 1, 2, 3 ],
primary: 1,
pause: false,
}
}
}
}, */
pgs: {},
/* inode: {
<pool_id>: {
<inode_t>: {
name: string,
size?: uint64_t, // bytes
parent_pool?: <pool_id>,
parent_id?: <inode_t>,
readonly?: boolean,
}
}
}, */
inode: {},
},
osd: {
state: {
/* <osd_num_t>: {
state: "up",
addresses: string[],
host: string,
port: uint16_t,
primary_enabled: boolean,
blockstore_enabled: boolean,
}, */
},
stats: {
/* <osd_num_t>: {
time: number, // unix time
blockstore_ready: boolean,
size: uint64_t, // bytes
free: uint64_t, // bytes
host: string,
op_stats: {
<string>: { count: uint64_t, usec: uint64_t, bytes: uint64_t },
},
subop_stats: {
<string>: { count: uint64_t, usec: uint64_t },
},
recovery_stats: {
degraded: { count: uint64_t, bytes: uint64_t },
misplaced: { count: uint64_t, bytes: uint64_t },
},
}, */
},
inodestats: {
/* <pool_id>: {
<inode_t>: {
read: { count: uint64_t, usec: uint64_t, bytes: uint64_t },
write: { count: uint64_t, usec: uint64_t, bytes: uint64_t },
delete: { count: uint64_t, usec: uint64_t, bytes: uint64_t },
},
}, */
},
space: {
/* <osd_num_t>: {
<pool_id>: {
<inode_t>: uint64_t, // bytes
},
}, */
},
},
mon: {
master: {
/* ip: [ string ], id: uint64_t */
},
standby: {
/* <uint64_t>: { ip: [ string ] }, */
},
},
pg: {
state: {
/* <pool_id>: {
<pg_id>: {
primary: osd_num_t,
state: ("starting"|"peering"|"incomplete"|"active"|"repeering"|"stopping"|"offline"|
"degraded"|"has_incomplete"|"has_degraded"|"has_misplaced"|"has_unclean"|
"has_invalid"|"has_inconsistent"|"has_corrupted"|"left_on_dead"|"scrubbing")[],
}
}, */
},
stats: {
/* <pool_id>: {
<pg_id>: {
object_count: uint64_t,
clean_count: uint64_t,
misplaced_count: uint64_t,
degraded_count: uint64_t,
incomplete_count: uint64_t,
write_osd_set: osd_num_t[],
},
}, */
},
history: {
/* <pool_id>: {
<pg_id>: {
osd_sets: osd_num_t[][],
all_peers: osd_num_t[],
epoch: uint64_t,
next_scrub: uint64_t,
},
}, */
},
},
inode: {
stats: {
/* <pool_id>: {
<inode_t>: {
raw_used: uint64_t, // raw used bytes on OSDs
read: { count: uint64_t, usec: uint64_t, bytes: uint64_t, bps: uint64_t, iops: uint64_t, lat: uint64_t },
write: { count: uint64_t, usec: uint64_t, bytes: uint64_t, bps: uint64_t, iops: uint64_t, lat: uint64_t },
delete: { count: uint64_t, usec: uint64_t, bytes: uint64_t, bps: uint64_t, iops: uint64_t, lat: uint64_t },
},
}, */
},
},
pool: {
stats: {
/* <pool_id>: {
used_raw_tb: float, // used raw space in the pool
total_raw_tb: float, // maximum amount of space in the pool
raw_to_usable: float, // raw to usable ratio
space_efficiency: float, // 0..1
} */
},
},
stats: {
/* op_stats: {
<string>: { count: uint64_t, usec: uint64_t, bytes: uint64_t, bps: uint64_t, iops: uint64_t, lat: uint64_t },
},
subop_stats: {
<string>: { count: uint64_t, usec: uint64_t, iops: uint64_t, lat: uint64_t },
},
recovery_stats: {
degraded: { count: uint64_t, bytes: uint64_t, bps: uint64_t, iops: uint64_t },
misplaced: { count: uint64_t, bytes: uint64_t, bps: uint64_t, iops: uint64_t },
},
object_counts: {
object: uint64_t,
clean: uint64_t,
misplaced: uint64_t,
degraded: uint64_t,
incomplete: uint64_t,
},
object_bytes: {
total: uint64_t,
clean: uint64_t,
misplaced: uint64_t,
degraded: uint64_t,
incomplete: uint64_t,
}, */
},
history: {
last_clean_pgs: {},
},
index: {
image: {
/* <name>: {
id: uint64_t,
pool_id: uint64_t,
}, */
},
maxid: {
/* <pool_id>: uint64_t, */
},
},
};
module.exports = {
etcd_nonempty_keys,
etcd_allow,
etcd_tree,
};

View File

@@ -8,7 +8,7 @@
// But we support this case with the "parity_space" parameter in optimize_initial()/optimize_change().
const { SimpleCombinator } = require('./simple_pgs.js');
const LPOptimizer = require('./lp-optimizer.js');
const LPOptimizer = require('./lp_optimizer.js');
const osd_tree = {
ripper5: {

View File

@@ -2,7 +2,7 @@
// License: VNPL-1.1 (see README.md for details)
const { compat } = require('./simple_pgs.js');
const LPOptimizer = require('./lp-optimizer.js');
const LPOptimizer = require('./lp_optimizer.js');
async function run()
{

View File

@@ -2,7 +2,7 @@
// License: VNPL-1.1 (see README.md for details)
const { compat, flatten_tree } = require('./simple_pgs.js');
const LPOptimizer = require('./lp-optimizer.js');
const LPOptimizer = require('./lp_optimizer.js');
const crush_tree = [
{ level: 1, children: [

View File

@@ -2,7 +2,7 @@
// License: VNPL-1.1 (see README.md for details)
const { compat } = require('./simple_pgs.js');
const LPOptimizer = require('./lp-optimizer.js');
const LPOptimizer = require('./lp_optimizer.js');
const osd_tree = {
100: {

View File

@@ -2,7 +2,7 @@
// License: VNPL-1.1 (see README.md for details)
const { compat, flatten_tree } = require('./simple_pgs.js');
const LPOptimizer = require('./lp-optimizer.js');
const LPOptimizer = require('./lp_optimizer.js');
const osd_tree = {
100: {

View File

@@ -23,4 +23,4 @@ for (let i = 2; i < process.argv.length; i++)
}
}
new Mon(options).start().catch(e => { console.error(e); process.exit(1); });
Mon.run_forever(options);

1930
mon/mon.js

File diff suppressed because it is too large Load Diff

215
mon/osd_tree.js Normal file
View File

@@ -0,0 +1,215 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
function get_osd_tree(global_config, state)
{
const levels = global_config.placement_levels||{};
levels.host = levels.host || 100;
levels.osd = levels.osd || 101;
const tree = {};
let up_osds = {};
// This requires monitor system time to be in sync with OSD system times (at least to some extent)
const down_time = Date.now()/1000 - global_config.osd_out_time;
for (const osd_num of Object.keys(state.osd.stats).sort((a, b) => a - b))
{
const stat = state.osd.stats[osd_num];
const osd_cfg = state.config.osd[osd_num];
let reweight = osd_cfg == null ? 1 : Number(osd_cfg.reweight);
if (reweight < 0 || isNaN(reweight))
reweight = 1;
if (stat && stat.size && reweight && (state.osd.state[osd_num] || Number(stat.time) >= down_time ||
osd_cfg && osd_cfg.noout))
{
// Numeric IDs are reserved for OSDs
if (state.osd.state[osd_num] && reweight > 0)
{
// React to down OSDs immediately
up_osds[osd_num] = true;
}
tree[osd_num] = tree[osd_num] || {};
tree[osd_num].id = osd_num;
tree[osd_num].parent = tree[osd_num].parent || stat.host;
tree[osd_num].level = 'osd';
tree[osd_num].size = reweight * stat.size / 1024 / 1024 / 1024 / 1024; // terabytes
if (osd_cfg && osd_cfg.tags)
{
tree[osd_num].tags = (osd_cfg.tags instanceof Array ? [ ...osd_cfg.tags ] : [ osd_cfg.tags ])
.reduce((a, c) => { a[c] = true; return a; }, {});
}
delete tree[osd_num].children;
if (!tree[stat.host])
{
tree[stat.host] = {
id: stat.host,
level: 'host',
parent: null,
children: [],
};
}
}
}
for (const node_id in state.config.node_placement||{})
{
const node_cfg = state.config.node_placement[node_id];
if (/^\d+$/.exec(node_id))
{
node_cfg.level = 'osd';
}
if (!node_id || !node_cfg.level || !levels[node_cfg.level] ||
node_cfg.level === 'osd' && !tree[node_id])
{
// All nodes must have non-empty IDs and valid levels
// OSDs have to actually exist
continue;
}
tree[node_id] = tree[node_id] || {};
tree[node_id].id = node_id;
tree[node_id].level = node_cfg.level;
tree[node_id].parent = node_cfg.parent;
if (node_cfg.level !== 'osd')
{
tree[node_id].children = [];
}
}
return { up_osds, levels, osd_tree: tree };
}
function make_hier_tree(global_config, tree)
{
const levels = global_config.placement_levels||{};
levels.host = levels.host || 100;
levels.osd = levels.osd || 101;
tree = { ...tree };
for (const node_id in tree)
{
tree[node_id] = { ...tree[node_id], children: [] };
}
tree[''] = { children: [] };
for (const node_id in tree)
{
if (node_id === '' || tree[node_id].level === 'osd' && (!tree[node_id].size || tree[node_id].size <= 0))
{
continue;
}
const node_cfg = tree[node_id];
const node_level = levels[node_cfg.level] || node_cfg.level;
let parent_level = node_cfg.parent && tree[node_cfg.parent] && tree[node_cfg.parent].children
&& tree[node_cfg.parent].level;
parent_level = parent_level ? (levels[parent_level] || parent_level) : null;
// Parent's level must be less than child's; OSDs must be leaves
const parent = parent_level && parent_level < node_level ? node_cfg.parent : '';
tree[parent].children.push(tree[node_id]);
}
// Delete empty nodes
let deleted = 0;
do
{
deleted = 0;
for (const node_id in tree)
{
if (tree[node_id].level !== 'osd' && (!tree[node_id].children || !tree[node_id].children.length))
{
const parent = tree[node_id].parent;
if (parent)
{
tree[parent].children = tree[parent].children.filter(c => c != tree[node_id]);
}
deleted++;
delete tree[node_id];
}
}
} while (deleted > 0);
return tree;
}
function filter_osds_by_root_node(global_config, pool_tree, root_node)
{
if (!root_node)
{
return;
}
let hier_tree = make_hier_tree(global_config, pool_tree);
let included = [ ...(hier_tree[root_node] || {}).children||[] ];
for (let i = 0; i < included.length; i++)
{
if (included[i].children)
{
included.splice(i+1, 0, ...included[i].children);
}
}
let cur = pool_tree[root_node] || {};
while (cur && cur.id)
{
included.unshift(cur);
cur = pool_tree[cur.parent||''];
}
included = included.reduce((a, c) => { a[c.id||''] = true; return a; }, {});
for (const item in pool_tree)
{
if (!included[item])
{
delete pool_tree[item];
}
}
}
function filter_osds_by_tags(orig_tree, tags)
{
if (!tags)
{
return;
}
for (const tag of (tags instanceof Array ? tags : [ tags ]))
{
for (const osd in orig_tree)
{
if (orig_tree[osd].level === 'osd' &&
(!orig_tree[osd].tags || !orig_tree[osd].tags[tag]))
{
delete orig_tree[osd];
}
}
}
}
function filter_osds_by_block_layout(orig_tree, osd_stats, block_size, bitmap_granularity, immediate_commit)
{
for (const osd in orig_tree)
{
if (orig_tree[osd].level === 'osd')
{
const osd_stat = osd_stats[osd];
if (osd_stat && (osd_stat.bs_block_size && osd_stat.bs_block_size != block_size ||
osd_stat.bitmap_granularity && osd_stat.bitmap_granularity != bitmap_granularity ||
osd_stat.immediate_commit == 'small' && immediate_commit == 'all' ||
osd_stat.immediate_commit == 'none' && immediate_commit != 'none'))
{
delete orig_tree[osd];
}
}
}
}
function get_affinity_osds(pool_cfg, up_osds, osd_tree)
{
let aff_osds = up_osds;
if (pool_cfg.primary_affinity_tags)
{
aff_osds = Object.keys(up_osds).reduce((a, c) => { a[c] = osd_tree[c]; return a; }, {});
filter_osds_by_tags(aff_osds, pool_cfg.primary_affinity_tags);
for (const osd in aff_osds)
{
aff_osds[osd] = true;
}
}
return aff_osds;
}
module.exports = {
get_osd_tree,
make_hier_tree,
filter_osds_by_root_node,
filter_osds_by_tags,
filter_osds_by_block_layout,
get_affinity_osds,
};

View File

@@ -4,22 +4,21 @@
"description": "Vitastor SDS monitor service",
"main": "mon-main.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
"lint": "eslint *.js lp_optimizer/*.js scripts/*.js"
},
"author": "Vitaliy Filippov",
"license": "UNLICENSED",
"dependencies": {
"antietcd": "^1.0.1",
"sprintf-js": "^1.1.2",
"ws": "^7.2.5"
},
"devDependencies": {
"eslint": "^8.0.0",
"eslint-plugin-import": "^2.29.1",
"eslint-plugin-node": "^11.1.0"
},
"engines": {
"node": ">=12.0.0"
},
"scripts": {
"lint": "eslint *.js"
}
}

267
mon/pg_gen.js Normal file
View File

@@ -0,0 +1,267 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
const { RuleCombinator } = require('./lp_optimizer/dsl_pgs.js');
const { SimpleCombinator, flatten_tree } = require('./lp_optimizer/simple_pgs.js');
const { validate_pool_cfg, get_pg_rules } = require('./pool_config.js');
const LPOptimizer = require('./lp_optimizer/lp_optimizer.js');
const { scale_pg_count } = require('./pg_utils.js');
const { make_hier_tree, filter_osds_by_root_node,
filter_osds_by_tags, filter_osds_by_block_layout, get_affinity_osds } = require('./osd_tree.js');
let seed;
function reset_rng()
{
seed = 0x5f020e43;
}
function rng()
{
seed ^= seed << 13;
seed ^= seed >> 17;
seed ^= seed << 5;
return seed + 2147483648;
}
function pick_primary(pool_config, osd_set, up_osds, aff_osds)
{
let alive_set;
if (pool_config.scheme === 'replicated')
{
// Prefer "affinity" OSDs
alive_set = osd_set.filter(osd_num => osd_num && aff_osds[osd_num]);
if (!alive_set.length)
alive_set = osd_set.filter(osd_num => osd_num && up_osds[osd_num]);
}
else
{
// Prefer data OSDs for EC because they can actually read something without an additional network hop
const pg_data_size = (pool_config.pg_size||0) - (pool_config.parity_chunks||0);
alive_set = osd_set.slice(0, pg_data_size).filter(osd_num => osd_num && aff_osds[osd_num]);
if (!alive_set.length)
alive_set = osd_set.filter(osd_num => osd_num && aff_osds[osd_num]);
if (!alive_set.length)
{
alive_set = osd_set.slice(0, pg_data_size).filter(osd_num => osd_num && up_osds[osd_num]);
if (!alive_set.length)
alive_set = osd_set.filter(osd_num => osd_num && up_osds[osd_num]);
}
}
if (!alive_set.length)
{
return 0;
}
return alive_set[rng() % alive_set.length];
}
function recheck_primary(state, global_config, up_osds, osd_tree)
{
let new_config_pgs;
for (const pool_id in state.config.pools)
{
const pool_cfg = state.config.pools[pool_id];
if (!validate_pool_cfg(pool_id, pool_cfg, global_config.placement_levels, false))
{
continue;
}
const aff_osds = get_affinity_osds(pool_cfg, up_osds, osd_tree);
reset_rng();
for (let pg_num = 1; pg_num <= pool_cfg.pg_count; pg_num++)
{
if (!state.config.pgs.items[pool_id])
{
continue;
}
const pg_cfg = state.config.pgs.items[pool_id][pg_num];
if (pg_cfg)
{
const new_primary = pick_primary(state.config.pools[pool_id], pg_cfg.osd_set, up_osds, aff_osds);
if (pg_cfg.primary != new_primary)
{
if (!new_config_pgs)
{
new_config_pgs = JSON.parse(JSON.stringify(state.config.pgs));
}
console.log(
`Moving pool ${pool_id} (${pool_cfg.name || 'unnamed'}) PG ${pg_num}`+
` primary OSD from ${pg_cfg.primary} to ${new_primary}`
);
new_config_pgs.items[pool_id][pg_num].primary = new_primary;
}
}
}
}
return new_config_pgs;
}
function save_new_pgs_txn(save_to, request, state, etcd_prefix, etcd_watch_revision, pool_id, up_osds, osd_tree, prev_pgs, new_pgs, pg_history)
{
const aff_osds = get_affinity_osds(state.config.pools[pool_id] || {}, up_osds, osd_tree);
const pg_items = {};
reset_rng();
new_pgs.map((osd_set, i) =>
{
osd_set = osd_set.map(osd_num => osd_num === LPOptimizer.NO_OSD ? 0 : osd_num);
pg_items[i+1] = {
osd_set,
primary: pick_primary(state.config.pools[pool_id], osd_set, up_osds, aff_osds),
};
if (prev_pgs[i] && prev_pgs[i].join(' ') != osd_set.join(' ') &&
prev_pgs[i].filter(osd_num => osd_num).length > 0)
{
pg_history[i] = pg_history[i] || {};
pg_history[i].osd_sets = pg_history[i].osd_sets || [];
pg_history[i].osd_sets.push(prev_pgs[i]);
}
if (pg_history[i] && pg_history[i].osd_sets)
{
pg_history[i].osd_sets = Object.values(pg_history[i].osd_sets
.reduce((a, c) => { a[c.join(' ')] = c; return a; }, {}));
}
});
for (let i = 0; i < new_pgs.length || i < prev_pgs.length; i++)
{
// FIXME: etcd has max_txn_ops limit, and it's 128 by default
// Sooo we probably want to change our storage scheme for PG histories...
request.compare.push({
key: b64(etcd_prefix+'/pg/history/'+pool_id+'/'+(i+1)),
target: 'MOD',
mod_revision: ''+etcd_watch_revision,
result: 'LESS',
});
if (pg_history[i])
{
request.success.push({
requestPut: {
key: b64(etcd_prefix+'/pg/history/'+pool_id+'/'+(i+1)),
value: b64(JSON.stringify(pg_history[i])),
},
});
}
else
{
request.success.push({
requestDeleteRange: {
key: b64(etcd_prefix+'/pg/history/'+pool_id+'/'+(i+1)),
},
});
}
}
save_to.items = save_to.items || {};
if (!new_pgs.length)
{
delete save_to.items[pool_id];
}
else
{
save_to.items[pool_id] = pg_items;
}
}
async function generate_pool_pgs(state, global_config, pool_id, osd_tree, levels)
{
const pool_cfg = state.config.pools[pool_id];
if (!validate_pool_cfg(pool_id, pool_cfg, global_config.placement_levels, false))
{
return null;
}
let pool_tree = { ...osd_tree };
filter_osds_by_root_node(global_config, pool_tree, pool_cfg.root_node);
filter_osds_by_tags(pool_tree, pool_cfg.osd_tags);
filter_osds_by_block_layout(
pool_tree,
state.osd.stats,
pool_cfg.block_size || global_config.block_size || 131072,
pool_cfg.bitmap_granularity || global_config.bitmap_granularity || 4096,
pool_cfg.immediate_commit || global_config.immediate_commit || 'none'
);
pool_tree = make_hier_tree(global_config, pool_tree);
// First try last_clean_pgs to minimize data movement
let prev_pgs = [];
for (const pg in ((state.history.last_clean_pgs.items||{})[pool_id]||{}))
{
prev_pgs[pg-1] = [ ...state.history.last_clean_pgs.items[pool_id][pg].osd_set ];
}
if (!prev_pgs.length)
{
// Fall back to config/pgs if it's empty
for (const pg in ((state.config.pgs.items||{})[pool_id]||{}))
{
prev_pgs[pg-1] = [ ...state.config.pgs.items[pool_id][pg].osd_set ];
}
}
const old_pg_count = prev_pgs.length;
const optimize_cfg = {
osd_weights: Object.values(pool_tree).filter(item => item.level === 'osd').reduce((a, c) => { a[c.id] = c.size; return a; }, {}),
combinator: !global_config.use_old_pg_combinator || pool_cfg.level_placement || pool_cfg.raw_placement
// new algorithm:
? new RuleCombinator(pool_tree, get_pg_rules(pool_id, pool_cfg, global_config.placement_levels), pool_cfg.max_osd_combinations)
// old algorithm:
: new SimpleCombinator(flatten_tree(pool_tree[''].children, levels, pool_cfg.failure_domain, 'osd'), pool_cfg.pg_size, pool_cfg.max_osd_combinations),
pg_count: pool_cfg.pg_count,
pg_size: pool_cfg.pg_size,
pg_minsize: pool_cfg.pg_minsize,
ordered: pool_cfg.scheme != 'replicated',
};
let optimize_result;
// Re-shuffle PGs if config/pgs.hash is empty
if (old_pg_count > 0 && state.config.pgs.hash)
{
if (prev_pgs.length != pool_cfg.pg_count)
{
// Scale PG count
// Do it even if old_pg_count is already equal to pool_cfg.pg_count,
// because last_clean_pgs may still contain the old number of PGs
scale_pg_count(prev_pgs, pool_cfg.pg_count);
}
for (const pg of prev_pgs)
{
while (pg.length < pool_cfg.pg_size)
{
pg.push(0);
}
}
optimize_result = await LPOptimizer.optimize_change({
prev_pgs,
...optimize_cfg,
});
}
else
{
optimize_result = await LPOptimizer.optimize_initial(optimize_cfg);
}
console.log(`Pool ${pool_id} (${pool_cfg.name || 'unnamed'}):`);
LPOptimizer.print_change_stats(optimize_result);
let pg_effsize = pool_cfg.pg_size;
for (const pg of optimize_result.int_pgs)
{
const this_pg_size = pg.filter(osd => osd != LPOptimizer.NO_OSD).length;
if (this_pg_size && this_pg_size < pg_effsize)
{
pg_effsize = this_pg_size;
}
}
return {
pool_id,
pgs: optimize_result.int_pgs,
stats: {
total_raw_tb: optimize_result.space,
pg_real_size: pg_effsize || pool_cfg.pg_size,
raw_to_usable: (pg_effsize || pool_cfg.pg_size) / (pool_cfg.scheme === 'replicated'
? 1 : (pool_cfg.pg_size - (pool_cfg.parity_chunks||0))),
space_efficiency: optimize_result.space/(optimize_result.total_space||1),
},
};
}
function b64(str)
{
return Buffer.from(str).toString('base64');
}
module.exports = {
recheck_primary,
save_new_pgs_txn,
generate_pool_pgs,
};

169
mon/pool_config.js Normal file
View File

@@ -0,0 +1,169 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
const { parse_level_indexes, parse_pg_dsl } = require('./lp_optimizer/dsl_pgs.js');
function validate_pool_cfg(pool_id, pool_cfg, placement_levels, warn)
{
pool_cfg.pg_size = Math.floor(pool_cfg.pg_size);
pool_cfg.pg_minsize = Math.floor(pool_cfg.pg_minsize);
pool_cfg.parity_chunks = Math.floor(pool_cfg.parity_chunks) || undefined;
pool_cfg.pg_count = Math.floor(pool_cfg.pg_count);
pool_cfg.max_osd_combinations = Math.floor(pool_cfg.max_osd_combinations) || 10000;
if (!/^[1-9]\d*$/.exec(''+pool_id))
{
if (warn)
console.log('Pool ID '+pool_id+' is invalid');
return false;
}
if (pool_cfg.scheme !== 'xor' && pool_cfg.scheme !== 'replicated' &&
pool_cfg.scheme !== 'ec' && pool_cfg.scheme !== 'jerasure')
{
if (warn)
console.log('Pool '+pool_id+' has invalid coding scheme (one of "xor", "replicated", "ec" and "jerasure" required)');
return false;
}
if (!pool_cfg.pg_size || pool_cfg.pg_size < 1 || pool_cfg.pg_size > 256 ||
pool_cfg.scheme !== 'replicated' && pool_cfg.pg_size < 3)
{
if (warn)
console.log('Pool '+pool_id+' has invalid pg_size');
return false;
}
if (!pool_cfg.pg_minsize || pool_cfg.pg_minsize < 1 || pool_cfg.pg_minsize > pool_cfg.pg_size ||
pool_cfg.scheme === 'xor' && pool_cfg.pg_minsize < (pool_cfg.pg_size - 1))
{
if (warn)
console.log('Pool '+pool_id+' has invalid pg_minsize');
return false;
}
if (pool_cfg.scheme === 'xor' && pool_cfg.parity_chunks != 0 && pool_cfg.parity_chunks != 1)
{
if (warn)
console.log('Pool '+pool_id+' has invalid parity_chunks (must be 1)');
return false;
}
if ((pool_cfg.scheme === 'ec' || pool_cfg.scheme === 'jerasure') &&
(pool_cfg.parity_chunks < 1 || pool_cfg.parity_chunks > pool_cfg.pg_size-2))
{
if (warn)
console.log('Pool '+pool_id+' has invalid parity_chunks (must be between 1 and pg_size-2)');
return false;
}
if (!pool_cfg.pg_count || pool_cfg.pg_count < 1)
{
if (warn)
console.log('Pool '+pool_id+' has invalid pg_count');
return false;
}
if (!pool_cfg.name)
{
if (warn)
console.log('Pool '+pool_id+' has empty name');
return false;
}
if (pool_cfg.max_osd_combinations < 100)
{
if (warn)
console.log('Pool '+pool_id+' has invalid max_osd_combinations (must be at least 100)');
return false;
}
if (pool_cfg.root_node && typeof(pool_cfg.root_node) != 'string')
{
if (warn)
console.log('Pool '+pool_id+' has invalid root_node (must be a string)');
return false;
}
if (pool_cfg.osd_tags && typeof(pool_cfg.osd_tags) != 'string' &&
(!(pool_cfg.osd_tags instanceof Array) || pool_cfg.osd_tags.filter(t => typeof t != 'string').length > 0))
{
if (warn)
console.log('Pool '+pool_id+' has invalid osd_tags (must be a string or array of strings)');
return false;
}
if (pool_cfg.primary_affinity_tags && typeof(pool_cfg.primary_affinity_tags) != 'string' &&
(!(pool_cfg.primary_affinity_tags instanceof Array) || pool_cfg.primary_affinity_tags.filter(t => typeof t != 'string').length > 0))
{
if (warn)
console.log('Pool '+pool_id+' has invalid primary_affinity_tags (must be a string or array of strings)');
return false;
}
if (!get_pg_rules(pool_id, pool_cfg, placement_levels, true))
{
return false;
}
return true;
}
function get_pg_rules(pool_id, pool_cfg, placement_levels, warn)
{
if (pool_cfg.level_placement)
{
const pg_size = (0|pool_cfg.pg_size);
let rules = pool_cfg.level_placement;
if (typeof rules === 'string')
{
rules = rules.split(/\s+/).map(s => s.split(/=/, 2)).reduce((a, c) => { a[c[0]] = c[1]; return a; }, {});
}
else
{
rules = { ...rules };
}
// Always add failure_domain to prevent rules from being totally incorrect
const all_diff = [];
for (let i = 1; i <= pg_size; i++)
{
all_diff.push(i);
}
rules[pool_cfg.failure_domain || 'host'] = all_diff;
placement_levels = placement_levels||{};
placement_levels.host = placement_levels.host || 100;
placement_levels.osd = placement_levels.osd || 101;
for (const k in rules)
{
if (!placement_levels[k] || typeof rules[k] !== 'string' &&
(!(rules[k] instanceof Array) ||
rules[k].filter(s => typeof s !== 'string' && typeof s !== 'number').length > 0))
{
if (warn)
console.log('Pool '+pool_id+' configuration is invalid: level_placement should be { [level]: string | (string|number)[] }');
return null;
}
else if (rules[k].length != pg_size)
{
if (warn)
console.log('Pool '+pool_id+' configuration is invalid: values in level_placement should contain exactly pg_size ('+pg_size+') items');
return null;
}
}
return parse_level_indexes(rules);
}
else if (typeof pool_cfg.raw_placement === 'string')
{
try
{
return parse_pg_dsl(pool_cfg.raw_placement);
}
catch (e)
{
if (warn)
console.log('Pool '+pool_id+' configuration is invalid: invalid raw_placement: '+e.message);
}
}
else
{
let rules = [ [] ];
let prev = [ 1 ];
for (let i = 1; i < pool_cfg.pg_size; i++)
{
rules.push([ [ pool_cfg.failure_domain||'host', '!=', prev ] ]);
prev = [ ...prev, i+1 ];
}
return rules;
}
}
module.exports = {
validate_pool_cfg,
get_pg_rules,
};

286
mon/stats.js Normal file
View File

@@ -0,0 +1,286 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
function derive_osd_stats(st, prev, prev_diff)
{
const diff = { op_stats: {}, subop_stats: {}, recovery_stats: {}, inode_stats: {} };
if (!st || !st.time || !prev || !prev.time || prev.time >= st.time)
{
return prev_diff || diff;
}
const timediff = BigInt(st.time*1000 - prev.time*1000);
for (const op in st.op_stats||{})
{
const pr = prev && prev.op_stats && prev.op_stats[op];
let c = st.op_stats[op];
c = { bytes: BigInt(c.bytes||0), usec: BigInt(c.usec||0), count: BigInt(c.count||0) };
const b = c.bytes - BigInt(pr && pr.bytes||0);
const us = c.usec - BigInt(pr && pr.usec||0);
const n = c.count - BigInt(pr && pr.count||0);
if (n > 0)
diff.op_stats[op] = { ...c, bps: b*1000n/timediff, iops: n*1000n/timediff, lat: us/n };
}
for (const op in st.subop_stats||{})
{
const pr = prev && prev.subop_stats && prev.subop_stats[op];
let c = st.subop_stats[op];
c = { usec: BigInt(c.usec||0), count: BigInt(c.count||0) };
const us = c.usec - BigInt(pr && pr.usec||0);
const n = c.count - BigInt(pr && pr.count||0);
if (n > 0)
diff.subop_stats[op] = { ...c, iops: n*1000n/timediff, lat: us/n };
}
for (const op in st.recovery_stats||{})
{
const pr = prev && prev.recovery_stats && prev.recovery_stats[op];
let c = st.recovery_stats[op];
c = { bytes: BigInt(c.bytes||0), count: BigInt(c.count||0) };
const b = c.bytes - BigInt(pr && pr.bytes||0);
const n = c.count - BigInt(pr && pr.count||0);
if (n > 0)
diff.recovery_stats[op] = { ...c, bps: b*1000n/timediff, iops: n*1000n/timediff };
}
for (const pool_id in st.inode_stats||{})
{
diff.inode_stats[pool_id] = {};
for (const inode_num in st.inode_stats[pool_id])
{
const inode_diff = diff.inode_stats[pool_id][inode_num] = {};
for (const op of [ 'read', 'write', 'delete' ])
{
const c = st.inode_stats[pool_id][inode_num][op];
const pr = prev && prev.inode_stats && prev.inode_stats[pool_id] &&
prev.inode_stats[pool_id][inode_num] && prev.inode_stats[pool_id][inode_num][op];
const n = BigInt(c.count||0) - BigInt(pr && pr.count||0);
inode_diff[op] = {
bps: (BigInt(c.bytes||0) - BigInt(pr && pr.bytes||0))*1000n/timediff,
iops: n*1000n/timediff,
lat: (BigInt(c.usec||0) - BigInt(pr && pr.usec||0))/(n || 1n),
};
}
}
}
return diff;
}
// sum_op_stats(this.state.osd, this.prev_stats)
function sum_op_stats(all_osd, prev_stats)
{
for (const osd in all_osd.stats)
{
const cur = { ...all_osd.stats[osd], inode_stats: all_osd.inodestats[osd]||{} };
prev_stats.osd_diff[osd] = derive_osd_stats(
cur, prev_stats.osd_stats[osd], prev_stats.osd_diff[osd]
);
prev_stats.osd_stats[osd] = cur;
}
const sum_diff = { op_stats: {}, subop_stats: {}, recovery_stats: {} };
// Sum derived values instead of deriving summed
for (const osd in all_osd.state)
{
const derived = prev_stats.osd_diff[osd];
if (!all_osd.state[osd] || !derived)
{
continue;
}
for (const type in sum_diff)
{
for (const op in derived[type]||{})
{
for (const k in derived[type][op])
{
sum_diff[type][op] = sum_diff[type][op] || {};
sum_diff[type][op][k] = (sum_diff[type][op][k] || 0n) + derived[type][op][k];
}
}
}
}
return sum_diff;
}
// sum_object_counts(this.state, this.config)
function sum_object_counts(state, global_config)
{
const object_counts = { object: 0n, clean: 0n, misplaced: 0n, degraded: 0n, incomplete: 0n };
const object_bytes = { object: 0n, clean: 0n, misplaced: 0n, degraded: 0n, incomplete: 0n };
for (const pool_id in state.pg.stats)
{
let object_size = 0;
for (const osd_num of state.pg.stats[pool_id].write_osd_set||[])
{
if (osd_num && state.osd.stats[osd_num] && state.osd.stats[osd_num].block_size)
{
object_size = state.osd.stats[osd_num].block_size;
break;
}
}
const pool_cfg = (state.config.pools[pool_id]||{});
if (!object_size)
{
object_size = pool_cfg.block_size || global_config.block_size || 131072;
}
if (pool_cfg.scheme !== 'replicated')
{
object_size *= ((pool_cfg.pg_size||0) - (pool_cfg.parity_chunks||0));
}
object_size = BigInt(object_size);
for (const pg_num in state.pg.stats[pool_id])
{
const st = state.pg.stats[pool_id][pg_num];
if (st)
{
for (const k in object_counts)
{
if (st[k+'_count'])
{
object_counts[k] += BigInt(st[k+'_count']);
object_bytes[k] += BigInt(st[k+'_count']) * object_size;
}
}
}
}
}
return { object_counts, object_bytes };
}
// sum_inode_stats(this.state, this.prev_stats)
function sum_inode_stats(state, prev_stats)
{
const inode_stats = {};
const inode_stub = () => ({
raw_used: 0n,
read: { count: 0n, usec: 0n, bytes: 0n, bps: 0n, iops: 0n, lat: 0n },
write: { count: 0n, usec: 0n, bytes: 0n, bps: 0n, iops: 0n, lat: 0n },
delete: { count: 0n, usec: 0n, bytes: 0n, bps: 0n, iops: 0n, lat: 0n },
});
const seen_pools = {};
for (const pool_id in state.config.pools)
{
seen_pools[pool_id] = true;
state.pool.stats[pool_id] = state.pool.stats[pool_id] || {};
state.pool.stats[pool_id].used_raw_tb = 0n;
}
for (const osd_num in state.osd.space)
{
for (const pool_id in state.osd.space[osd_num])
{
state.pool.stats[pool_id] = state.pool.stats[pool_id] || {};
if (!seen_pools[pool_id])
{
state.pool.stats[pool_id].used_raw_tb = 0n;
seen_pools[pool_id] = true;
}
inode_stats[pool_id] = inode_stats[pool_id] || {};
for (const inode_num in state.osd.space[osd_num][pool_id])
{
const u = BigInt(state.osd.space[osd_num][pool_id][inode_num]||0);
if (inode_num)
{
inode_stats[pool_id][inode_num] = inode_stats[pool_id][inode_num] || inode_stub();
inode_stats[pool_id][inode_num].raw_used += u;
}
state.pool.stats[pool_id].used_raw_tb += u;
}
}
}
for (const pool_id in seen_pools)
{
const used = state.pool.stats[pool_id].used_raw_tb;
state.pool.stats[pool_id].used_raw_tb = Number(used)/1024/1024/1024/1024;
}
for (const osd_num in state.osd.state)
{
const ist = state.osd.inodestats[osd_num];
if (!ist || !state.osd.state[osd_num])
{
continue;
}
for (const pool_id in ist)
{
inode_stats[pool_id] = inode_stats[pool_id] || {};
for (const inode_num in ist[pool_id])
{
inode_stats[pool_id][inode_num] = inode_stats[pool_id][inode_num] || inode_stub();
for (const op of [ 'read', 'write', 'delete' ])
{
inode_stats[pool_id][inode_num][op].count += BigInt(ist[pool_id][inode_num][op].count||0);
inode_stats[pool_id][inode_num][op].usec += BigInt(ist[pool_id][inode_num][op].usec||0);
inode_stats[pool_id][inode_num][op].bytes += BigInt(ist[pool_id][inode_num][op].bytes||0);
}
}
}
}
for (const osd in state.osd.state)
{
const osd_diff = prev_stats.osd_diff[osd];
if (!osd_diff || !state.osd.state[osd])
{
continue;
}
for (const pool_id in osd_diff.inode_stats)
{
for (const inode_num in prev_stats.osd_diff[osd].inode_stats[pool_id])
{
inode_stats[pool_id][inode_num] = inode_stats[pool_id][inode_num] || inode_stub();
for (const op of [ 'read', 'write', 'delete' ])
{
const op_diff = prev_stats.osd_diff[osd].inode_stats[pool_id][inode_num][op] || {};
const op_st = inode_stats[pool_id][inode_num][op];
op_st.bps += op_diff.bps;
op_st.iops += op_diff.iops;
op_st.lat += op_diff.lat;
op_st.n_osd = (op_st.n_osd || 0) + 1;
}
}
}
}
for (const pool_id in inode_stats)
{
for (const inode_num in inode_stats[pool_id])
{
let nonzero = inode_stats[pool_id][inode_num].raw_used > 0;
for (const op of [ 'read', 'write', 'delete' ])
{
const op_st = inode_stats[pool_id][inode_num][op];
if (op_st.n_osd)
{
op_st.lat /= BigInt(op_st.n_osd);
delete op_st.n_osd;
}
if (op_st.bps > 0 || op_st.iops > 0)
nonzero = true;
}
if (!nonzero && (!state.config.inode[pool_id] || !state.config.inode[pool_id][inode_num]))
{
// Deleted inode (no data, no I/O, no config)
delete inode_stats[pool_id][inode_num];
}
}
}
return { inode_stats, seen_pools };
}
function serialize_bigints(obj)
{
obj = { ...obj };
for (const k in obj)
{
if (typeof obj[k] == 'bigint')
{
obj[k] = ''+obj[k];
}
else if (typeof obj[k] == 'object')
{
obj[k] = serialize_bigints(obj[k]);
}
}
return obj;
}
module.exports = {
derive_osd_stats,
sum_op_stats,
sum_object_counts,
sum_inode_stats,
serialize_bigints,
};

37
mon/utils.js Normal file
View File

@@ -0,0 +1,37 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
const os = require('os');
function local_ips(all)
{
const ips = [];
const ifaces = os.networkInterfaces();
for (const ifname in ifaces)
{
for (const iface of ifaces[ifname])
{
if (iface.family == 'IPv4' && !iface.internal || all)
{
ips.push(iface.address);
}
}
}
return ips;
}
function b64(str)
{
return Buffer.from(str).toString('base64');
}
function de64(str)
{
return Buffer.from(str, 'base64').toString();
}
module.exports = {
b64,
de64,
local_ips,
};

View File

@@ -0,0 +1,48 @@
// AntiEtcd persistence filter for Vitastor
// (c) Vitaliy Filippov, 2024
// License: Mozilla Public License 2.0 or Vitastor Network Public License 1.1
function vitastor_persist_filter(cfg)
{
const prefix = cfg.vitastor_prefix || '/vitastor';
return (key, value) =>
{
if (key.substr(0, prefix.length+'/osd/stats/'.length) == prefix+'/osd/stats/')
{
if (value)
{
try
{
value = JSON.parse(value);
value = JSON.stringify({
bitmap_granularity: value.bitmap_granularity || undefined,
data_block_size: value.data_block_size || undefined,
host: value.host || undefined,
immediate_commit: value.immediate_commit || undefined,
});
}
catch (e)
{
console.error('invalid JSON in '+key+' = '+value+': '+e);
value = {};
}
}
else
{
value = undefined;
}
return value;
}
else if (key.substr(0, prefix.length+'/osd/'.length) == prefix+'/osd/' ||
key.substr(0, prefix.length+'/inode/stats/'.length) == prefix+'/inode/stats/' ||
key.substr(0, prefix.length+'/pg/stats/'.length) == prefix+'/pg/stats/' ||
key.substr(0, prefix.length+'/pool/stats/'.length) == prefix+'/pool/stats/' ||
key == prefix+'/stats')
{
return undefined;
}
return value;
};
}
module.exports = vitastor_persist_filter;

View File

@@ -1,77 +0,0 @@
#include "addon.h"
// Initialize the node addon
NAN_MODULE_INIT(InitAddon)
{
// vitastor.Client
v8::Local<v8::FunctionTemplate> tpl = Nan::New<v8::FunctionTemplate>(NodeVitastor::Create);
tpl->SetClassName(Nan::New("Client").ToLocalChecked());
tpl->InstanceTemplate()->SetInternalFieldCount(1);
Nan::SetPrototypeMethod(tpl, "read", NodeVitastor::Read);
Nan::SetPrototypeMethod(tpl, "write", NodeVitastor::Write);
Nan::SetPrototypeMethod(tpl, "sync", NodeVitastor::Sync);
Nan::SetPrototypeMethod(tpl, "read_bitmap", NodeVitastor::ReadBitmap);
//Nan::SetPrototypeMethod(tpl, "destroy", NodeVitastor::Destroy);
Nan::Set(target, Nan::New("Client").ToLocalChecked(), Nan::GetFunction(tpl).ToLocalChecked());
// vitastor.Image (opened image)
tpl = Nan::New<v8::FunctionTemplate>(NodeVitastorImage::Create);
tpl->SetClassName(Nan::New("Image").ToLocalChecked());
tpl->InstanceTemplate()->SetInternalFieldCount(2);
Nan::SetPrototypeMethod(tpl, "read", NodeVitastorImage::Read);
Nan::SetPrototypeMethod(tpl, "write", NodeVitastorImage::Write);
Nan::SetPrototypeMethod(tpl, "sync", NodeVitastorImage::Sync);
Nan::SetPrototypeMethod(tpl, "get_info", NodeVitastorImage::GetInfo);
Nan::SetPrototypeMethod(tpl, "read_bitmap", NodeVitastorImage::ReadBitmap);
Nan::Set(target, Nan::New("Image").ToLocalChecked(), Nan::GetFunction(tpl).ToLocalChecked());
// vitastor.KV
tpl = Nan::New<v8::FunctionTemplate>(NodeVitastorKV::Create);
tpl->SetClassName(Nan::New("KV").ToLocalChecked());
tpl->InstanceTemplate()->SetInternalFieldCount(1);
Nan::SetPrototypeMethod(tpl, "open", NodeVitastorKV::Open);
Nan::SetPrototypeMethod(tpl, "set_config", NodeVitastorKV::SetConfig);
Nan::SetPrototypeMethod(tpl, "close", NodeVitastorKV::Close);
Nan::SetPrototypeMethod(tpl, "get_size", NodeVitastorKV::GetSize);
Nan::SetPrototypeMethod(tpl, "get", NodeVitastorKV::Get);
Nan::SetPrototypeMethod(tpl, "get_cached", NodeVitastorKV::GetCached);
Nan::SetPrototypeMethod(tpl, "set", NodeVitastorKV::Set);
Nan::SetPrototypeMethod(tpl, "del", NodeVitastorKV::Del);
Nan::SetPrototypeMethod(tpl, "list", NodeVitastorKV::List);
Nan::Set(target, Nan::New("KV").ToLocalChecked(), Nan::GetFunction(tpl).ToLocalChecked());
Nan::Set(target, Nan::New("ENOENT").ToLocalChecked(), Nan::New<v8::Int32>(-ENOENT));
Nan::Set(target, Nan::New("EIO").ToLocalChecked(), Nan::New<v8::Int32>(-EIO));
Nan::Set(target, Nan::New("EINVAL").ToLocalChecked(), Nan::New<v8::Int32>(-EINVAL));
Nan::Set(target, Nan::New("EROFS").ToLocalChecked(), Nan::New<v8::Int32>(-EROFS));
Nan::Set(target, Nan::New("ENOSPC").ToLocalChecked(), Nan::New<v8::Int32>(-ENOSPC));
Nan::Set(target, Nan::New("EINTR").ToLocalChecked(), Nan::New<v8::Int32>(-EINTR));
Nan::Set(target, Nan::New("EILSEQ").ToLocalChecked(), Nan::New<v8::Int32>(-EILSEQ));
Nan::Set(target, Nan::New("ENOTBLK").ToLocalChecked(), Nan::New<v8::Int32>(-ENOTBLK));
Nan::Set(target, Nan::New("ENOSYS").ToLocalChecked(), Nan::New<v8::Int32>(-ENOSYS));
Nan::Set(target, Nan::New("EAGAIN").ToLocalChecked(), Nan::New<v8::Int32>(-EAGAIN));
// Listing handle
tpl = Nan::New<v8::FunctionTemplate>(NodeVitastorKVListing::Create);
tpl->SetClassName(Nan::New("KVListing").ToLocalChecked());
tpl->InstanceTemplate()->SetInternalFieldCount(2);
Nan::SetPrototypeMethod(tpl, "next", NodeVitastorKVListing::Next);
Nan::SetPrototypeMethod(tpl, "close", NodeVitastorKVListing::Close);
Nan::Set(target, Nan::New("KVListing").ToLocalChecked(), Nan::GetFunction(tpl).ToLocalChecked());
NodeVitastorKV::listing_class.Reset(Nan::GetFunction(tpl).ToLocalChecked());
}
NODE_MODULE(addon, (void*)InitAddon)

View File

@@ -1,17 +0,0 @@
#ifndef NODE_VITASTOR_ADDON_H
#define NODE_VITASTOR_ADDON_H
#include <nan.h>
#include <vitastor_c.h>
#include "client.h"
#define ERRORF(format, ...) fprintf(stderr, format "\n", __VA_ARGS__);
#define TRACEF(format, ...) fprintf(stderr, format "\n", __VA_ARGS__);
#define TRACE(msg) fprintf(stderr, "%s\n", msg);
//#define TRACEF(format, arg) ;
//#define TRACE(msg) ;
#endif

View File

@@ -1,20 +0,0 @@
{
'targets': [
{
'target_name': 'addon',
'sources': [
'client.cc',
'addon.cc'
],
'include_dirs': [
'<!(node -e "require(\'nan\')")'
],
'cflags': [
'<!(pkg-config --cflags vitastor)'
],
'libraries': [
'<!(pkg-config --libs vitastor)'
]
}
]
}

View File

@@ -1,850 +0,0 @@
#include "addon.h"
#define NODE_VITASTOR_READ 1
#define NODE_VITASTOR_WRITE 2
#define NODE_VITASTOR_SYNC 3
#define NODE_VITASTOR_READ_BITMAP 4
#define NODE_VITASTOR_GET_INFO 5
#ifndef INODE_POOL
#define INODE_POOL(inode) (uint32_t)((inode) >> (64 - POOL_ID_BITS))
#define INODE_NO_POOL(inode) (uint64_t)((inode) & (((uint64_t)1 << (64-POOL_ID_BITS)) - 1))
#define INODE_WITH_POOL(pool_id, inode) (((uint64_t)(pool_id) << (64-POOL_ID_BITS)) | INODE_NO_POOL(inode))
#endif
class NodeVitastorRequest: public Nan::AsyncResource
{
public:
NodeVitastorRequest(v8::Local<v8::Function> cb): Nan::AsyncResource("NodeVitastorRequest")
{
callback.Reset(cb);
}
iovec iov;
NodeVitastorImage *img = NULL;
int op = 0;
uint64_t offset = 0, len = 0, version = 0;
bool with_parents = false;
Nan::Persistent<v8::Function> callback;
};
//////////////////////////////////////////////////
// NodeVitastor
//////////////////////////////////////////////////
NodeVitastor::NodeVitastor(): Nan::ObjectWrap()
{
TRACE("NodeVitastor: constructor");
poll_watcher.data = this;
}
NodeVitastor::~NodeVitastor()
{
uv_poll_stop(&poll_watcher);
vitastor_c_destroy(c);
c = NULL;
}
NAN_METHOD(NodeVitastor::Create)
{
TRACE("NodeVitastor::Create");
v8::Local<v8::Object> jsParams = info[0].As<v8::Object>();
v8::Local<v8::Array> keys = Nan::GetOwnPropertyNames(jsParams).ToLocalChecked();
std::vector<std::string> cfg;
for (uint32_t i = 0; i < keys->Length(); i++)
{
auto key = Nan::Get(keys, i).ToLocalChecked();
cfg.push_back(std::string(*Nan::Utf8String(key)));
cfg.push_back(std::string(*Nan::Utf8String(Nan::Get(jsParams, key).ToLocalChecked())));
}
const char **c_cfg = new const char*[cfg.size()];
for (size_t i = 0; i < cfg.size(); i++)
{
c_cfg[i] = cfg[i].c_str();
}
NodeVitastor* cli = new NodeVitastor();
cli->c = vitastor_c_create_uring_json(c_cfg, cfg.size());
delete[] c_cfg;
int res = vitastor_c_uring_register_eventfd(cli->c);
if (res >= 0)
{
cli->eventfd = res;
res = uv_poll_init_socket(uv_default_loop(), &cli->poll_watcher, cli->eventfd);
if (res >= 0)
res = uv_poll_start(&cli->poll_watcher, UV_READABLE, on_io_readable);
}
if (res < 0)
{
ERRORF("NodeVitastor: failed to create and register io_uring eventfd in libuv: %s", strerror(-cli->eventfd));
vitastor_c_destroy(cli->c);
cli->c = NULL;
Nan::ThrowError("failed to create and register io_uring eventfd");
return;
}
cli->Wrap(info.This());
info.GetReturnValue().Set(info.This());
}
void NodeVitastor::on_io_readable(uv_poll_t* handle, int status, int revents)
{
TRACEF("NodeVitastor::on_io_readable status/revents %d %d", status, revents);
if (revents & UV_READABLE)
{
NodeVitastor* self = (NodeVitastor*)handle->data;
std::unique_lock<std::mutex> lock(self->mu);
vitastor_c_uring_handle_events(self->c);
}
}
static NodeVitastorRequest* getReadRequest(const Nan::FunctionCallbackInfo<v8::Value> & info, int argpos)
{
uint64_t offset = Nan::To<int64_t>(info[argpos+0]).FromJust();
uint64_t len = Nan::To<int64_t>(info[argpos+1]).FromJust();
uint8_t *buf = (uint8_t*)malloc(len);
if (!buf)
{
Nan::ThrowError("failed to allocate memory");
return NULL;
}
v8::Local<v8::Function> callback = info[argpos+2].As<v8::Function>();
auto req = new NodeVitastorRequest(callback);
req->offset = offset;
req->len = len;
req->iov = { .iov_base = buf, .iov_len = len };
return req;
}
// read(pool, inode, offset, len, callback(err, buffer, version))
NAN_METHOD(NodeVitastor::Read)
{
TRACE("NodeVitastor::Read");
NodeVitastor* self = Nan::ObjectWrap::Unwrap<NodeVitastor>(info.This());
uint64_t pool = Nan::To<int64_t>(info[0]).FromJust();
uint64_t inode = Nan::To<int64_t>(info[1]).FromJust();
auto req = getReadRequest(info, 2);
std::unique_lock<std::mutex> lock(self->mu);
vitastor_c_read(self->c, ((pool << (64-POOL_ID_BITS)) | inode), req->offset, req->len, &req->iov, 1, on_read_finish, req);
}
static NodeVitastorRequest* getWriteRequest(const Nan::FunctionCallbackInfo<v8::Value> & info, int argpos)
{
uint64_t offset = Nan::To<int64_t>(info[argpos+0]).FromJust();
char *buf = node::Buffer::Data(info[argpos+1]);
uint64_t len = node::Buffer::Length(info[argpos+1]);
uint64_t version = 0;
if (!info[argpos+2].IsEmpty() && info[argpos+2]->IsObject())
{
auto key = Nan::New<v8::String>("version").ToLocalChecked();
auto params = info[argpos+2].As<v8::Object>();
auto versionObj = Nan::Get(params, key).ToLocalChecked();
if (!versionObj.IsEmpty())
version = Nan::To<int64_t>(versionObj).FromJust();
argpos++;
}
v8::Local<v8::Function> callback = info[argpos+2].As<v8::Function>();
auto req = new NodeVitastorRequest(callback);
req->offset = offset;
req->len = len;
req->version = version;
req->iov = { .iov_base = buf, .iov_len = req->len };
return req;
}
// write(pool, inode, offset, buffer, { version }?, callback(err))
NAN_METHOD(NodeVitastor::Write)
{
TRACE("NodeVitastor::Write");
NodeVitastor* self = Nan::ObjectWrap::Unwrap<NodeVitastor>(info.This());
uint64_t pool = Nan::To<int64_t>(info[0]).FromJust();
uint64_t inode = Nan::To<int64_t>(info[1]).FromJust();
auto req = getWriteRequest(info, 2);
std::unique_lock<std::mutex> lock(self->mu);
vitastor_c_write(self->c, ((pool << (64-POOL_ID_BITS)) | inode), req->offset, req->len, req->version, &req->iov, 1, on_write_finish, req);
}
// sync(callback(err))
NAN_METHOD(NodeVitastor::Sync)
{
TRACE("NodeVitastor::Sync");
NodeVitastor* self = Nan::ObjectWrap::Unwrap<NodeVitastor>(info.This());
v8::Local<v8::Function> callback = info[0].As<v8::Function>();
auto req = new NodeVitastorRequest(callback);
std::unique_lock<std::mutex> lock(self->mu);
vitastor_c_sync(self->c, on_write_finish, req);
}
// read_bitmap(pool, inode, offset, len, with_parents, callback(err, bitmap_buffer))
NAN_METHOD(NodeVitastor::ReadBitmap)
{
TRACE("NodeVitastor::ReadBitmap");
NodeVitastor* self = Nan::ObjectWrap::Unwrap<NodeVitastor>(info.This());
uint64_t pool = Nan::To<int64_t>(info[0]).FromJust();
uint64_t inode = Nan::To<int64_t>(info[1]).FromJust();
uint64_t offset = Nan::To<int64_t>(info[2]).FromJust();
uint64_t len = Nan::To<int64_t>(info[3]).FromJust();
bool with_parents = Nan::To<bool>(info[4]).FromJust();
v8::Local<v8::Function> callback = info[5].As<v8::Function>();
auto req = new NodeVitastorRequest(callback);
vitastor_c_read_bitmap(self->c, ((pool << (64-POOL_ID_BITS)) | inode), offset, len, with_parents, on_read_bitmap_finish, req);
}
static void on_error(NodeVitastorRequest *req, Nan::Callback & nanCallback, long retval)
{
// Legal errors: EINVAL, EIO, EROFS, ENOSPC, EINTR, ENOENT
v8::Local<v8::Value> args[1];
if (!retval)
args[0] = Nan::Null();
else
args[0] = Nan::New<v8::Int32>((int32_t)retval);
nanCallback.Call(1, args, req);
}
void NodeVitastor::on_read_finish(void *opaque, long retval, uint64_t version)
{
Nan::HandleScope scope;
NodeVitastorRequest *req = (NodeVitastorRequest *)opaque;
Nan::Callback nanCallback(Nan::New(req->callback));
if (retval == -ENOENT)
{
free(req->iov.iov_base);
nanCallback.Call(0, NULL, req);
}
else if (retval < 0)
{
free(req->iov.iov_base);
on_error(req, nanCallback, retval);
}
else
{
v8::Local<v8::Value> args[3];
args[0] = Nan::Null();
args[1] = Nan::NewBuffer((char*)req->iov.iov_base, req->iov.iov_len).ToLocalChecked();
args[2] = v8::BigInt::NewFromUnsigned(v8::Isolate::GetCurrent(), version);
nanCallback.Call(3, args, req);
}
delete req;
}
void NodeVitastor::on_write_finish(void *opaque, long retval)
{
Nan::HandleScope scope;
NodeVitastorRequest *req = (NodeVitastorRequest *)opaque;
Nan::Callback nanCallback(Nan::New(req->callback));
on_error(req, nanCallback, retval);
delete req;
}
void NodeVitastor::on_read_bitmap_finish(void *opaque, long retval, uint8_t *bitmap)
{
Nan::HandleScope scope;
NodeVitastorRequest *req = (NodeVitastorRequest *)opaque;
Nan::Callback nanCallback(Nan::New(req->callback));
if (retval == -ENOENT)
nanCallback.Call(0, NULL, req);
else if (retval < 0)
on_error(req, nanCallback, retval);
else
{
v8::Local<v8::Value> args[2];
args[0] = Nan::Null();
args[1] = Nan::NewBuffer((char*)bitmap, (retval+7)/8).ToLocalChecked();
nanCallback.Call(2, args, req);
}
delete req;
}
//NAN_METHOD(NodeVitastor::Destroy)
//{
// TRACE("NodeVitastor::Destroy");
//}
//////////////////////////////////////////////////
// NodeVitastorImage
//////////////////////////////////////////////////
NAN_METHOD(NodeVitastorImage::Create)
{
TRACE("NodeVitastorImage::Create");
v8::Local<v8::Object> parent = info[0].As<v8::Object>();
std::string name = std::string(*Nan::Utf8String(info[1].As<v8::String>()));
NodeVitastor *cli = Nan::ObjectWrap::Unwrap<NodeVitastor>(parent);
NodeVitastorImage *img = new NodeVitastorImage();
img->cli = cli;
img->name = name;
img->Ref();
cli->Ref();
std::unique_lock<std::mutex> lock(cli->mu);
vitastor_c_watch_inode(cli->c, (char*)img->name.c_str(), on_watch_start, img);
img->Wrap(info.This());
info.GetReturnValue().Set(info.This());
}
NodeVitastorImage::~NodeVitastorImage()
{
if (watch)
{
vitastor_c_close_watch(cli->c, watch);
watch = NULL;
}
cli->Unref();
}
// read(offset, len, callback(err, buffer, version))
NAN_METHOD(NodeVitastorImage::Read)
{
TRACE("NodeVitastorImage::Read");
NodeVitastorImage* img = Nan::ObjectWrap::Unwrap<NodeVitastorImage>(info.This());
auto req = getReadRequest(info, 0);
req->img = img;
req->op = NODE_VITASTOR_READ;
img->exec_or_wait(req);
}
// write(offset, buffer, { version }?, callback(err))
NAN_METHOD(NodeVitastorImage::Write)
{
TRACE("NodeVitastorImage::Write");
NodeVitastorImage* img = Nan::ObjectWrap::Unwrap<NodeVitastorImage>(info.This());
auto req = getWriteRequest(info, 0);
req->img = img;
req->op = NODE_VITASTOR_WRITE;
img->exec_or_wait(req);
}
NAN_METHOD(NodeVitastorImage::Sync)
{
TRACE("NodeVitastorImage::Sync");
NodeVitastorImage* img = Nan::ObjectWrap::Unwrap<NodeVitastorImage>(info.This());
v8::Local<v8::Function> callback = info[0].As<v8::Function>();
auto req = new NodeVitastorRequest(callback);
req->img = img;
req->op = NODE_VITASTOR_SYNC;
img->exec_or_wait(req);
}
// read_bitmap(offset, len, with_parents, callback(err, bitmap_buffer))
NAN_METHOD(NodeVitastorImage::ReadBitmap)
{
TRACE("NodeVitastorImage::ReadBitmap");
NodeVitastorImage* img = Nan::ObjectWrap::Unwrap<NodeVitastorImage>(info.This());
uint64_t offset = Nan::To<int64_t>(info[0]).FromJust();
uint64_t len = Nan::To<int64_t>(info[1]).FromJust();
bool with_parents = Nan::To<bool>(info[2]).FromJust();
v8::Local<v8::Function> callback = info[3].As<v8::Function>();
auto req = new NodeVitastorRequest(callback);
req->img = img;
req->op = NODE_VITASTOR_READ_BITMAP;
req->offset = offset;
req->len = len;
req->with_parents = with_parents;
img->exec_or_wait(req);
}
NAN_METHOD(NodeVitastorImage::GetInfo)
{
TRACE("NodeVitastorImage::Sync");
NodeVitastorImage* img = Nan::ObjectWrap::Unwrap<NodeVitastorImage>(info.This());
v8::Local<v8::Function> callback = info[0].As<v8::Function>();
auto req = new NodeVitastorRequest(callback);
req->img = img;
req->op = NODE_VITASTOR_GET_INFO;
img->exec_or_wait(req);
}
void NodeVitastorImage::exec_or_wait(NodeVitastorRequest *req)
{
if (!watch)
{
// Need to wait for initialisation
on_init.push_back(req);
}
else
{
exec_request(req);
}
}
void NodeVitastorImage::exec_request(NodeVitastorRequest *req)
{
std::unique_lock<std::mutex> lock(cli->mu);
if (req->op == NODE_VITASTOR_READ)
{
uint64_t ino = vitastor_c_inode_get_num(watch);
vitastor_c_read(cli->c, ino, req->offset, req->len, &req->iov, 1, NodeVitastor::on_read_finish, req);
}
else if (req->op == NODE_VITASTOR_WRITE)
{
uint64_t ino = vitastor_c_inode_get_num(watch);
vitastor_c_write(cli->c, ino, req->offset, req->len, req->version, &req->iov, 1, NodeVitastor::on_write_finish, req);
}
else if (req->op == NODE_VITASTOR_SYNC)
{
uint64_t ino = vitastor_c_inode_get_num(watch);
uint32_t imm = vitastor_c_inode_get_immediate_commit(cli->c, ino);
if (imm != IMMEDIATE_ALL)
{
vitastor_c_sync(cli->c, NodeVitastor::on_write_finish, req);
}
else
{
NodeVitastor::on_write_finish(req, 0);
}
}
else if (req->op == NODE_VITASTOR_READ_BITMAP)
{
uint64_t ino = vitastor_c_inode_get_num(watch);
vitastor_c_read_bitmap(cli->c, ino, req->offset, req->len, req->with_parents, NodeVitastor::on_read_bitmap_finish, req);
}
else if (req->op == NODE_VITASTOR_GET_INFO)
{
uint64_t size = vitastor_c_inode_get_size(watch);
uint64_t num = vitastor_c_inode_get_num(watch);
uint32_t block_size = vitastor_c_inode_get_block_size(cli->c, num);
uint32_t bitmap_granularity = vitastor_c_inode_get_bitmap_granularity(cli->c, num);
int readonly = vitastor_c_inode_get_readonly(watch);
uint32_t immediate_commit = vitastor_c_inode_get_immediate_commit(cli->c, num);
uint64_t parent_id = vitastor_c_inode_get_parent_id(watch);
char *meta = vitastor_c_inode_get_meta(watch);
uint64_t mod_revision = vitastor_c_inode_get_mod_revision(watch);
Nan::HandleScope scope;
v8::Local<v8::Object> res = Nan::New<v8::Object>();
Nan::Set(res, Nan::New<v8::String>("pool_id").ToLocalChecked(), Nan::New<v8::Number>(INODE_POOL(num)));
Nan::Set(res, Nan::New<v8::String>("inode_num").ToLocalChecked(), Nan::New<v8::Number>(INODE_NO_POOL(num)));
if (size < ((uint64_t)1<<53))
Nan::Set(res, Nan::New<v8::String>("size").ToLocalChecked(), Nan::New<v8::Number>(size));
else
Nan::Set(res, Nan::New<v8::String>("size").ToLocalChecked(), v8::BigInt::NewFromUnsigned(v8::Isolate::GetCurrent(), size));
if (parent_id)
{
Nan::Set(res, Nan::New<v8::String>("parent_pool_id").ToLocalChecked(), Nan::New<v8::Number>(INODE_POOL(parent_id)));
Nan::Set(res, Nan::New<v8::String>("parent_inode_num").ToLocalChecked(), Nan::New<v8::Number>(INODE_NO_POOL(parent_id)));
}
Nan::Set(res, Nan::New<v8::String>("readonly").ToLocalChecked(), Nan::New((bool)readonly));
if (meta)
{
Nan::JSON nanJSON;
Nan::Set(res, Nan::New<v8::String>("meta").ToLocalChecked(), nanJSON.Parse(Nan::New<v8::String>(meta).ToLocalChecked()).ToLocalChecked());
}
if (mod_revision < ((uint64_t)1<<53))
Nan::Set(res, Nan::New<v8::String>("mod_revision").ToLocalChecked(), Nan::New<v8::Number>(mod_revision));
else
Nan::Set(res, Nan::New<v8::String>("mod_revision").ToLocalChecked(), v8::BigInt::NewFromUnsigned(v8::Isolate::GetCurrent(), mod_revision));
Nan::Set(res, Nan::New<v8::String>("block_size").ToLocalChecked(), Nan::New(block_size));
Nan::Set(res, Nan::New<v8::String>("bitmap_granularity").ToLocalChecked(), Nan::New(bitmap_granularity));
Nan::Set(res, Nan::New<v8::String>("immediate_commit").ToLocalChecked(), Nan::New(immediate_commit));
Nan::Callback nanCallback(Nan::New(req->callback));
v8::Local<v8::Value> args[1];
args[0] = res;
nanCallback.Call(1, args, req);
delete req;
}
}
void NodeVitastorImage::on_watch_start(void *opaque, long retval)
{
NodeVitastorImage *img = (NodeVitastorImage *)opaque;
{
img->watch = (void*)retval;
auto on_init = std::move(img->on_init);
for (auto req: on_init)
{
img->exec_request(req);
}
}
img->Unref();
}
//////////////////////////////////////////////////
// NodeVitastorKV
//////////////////////////////////////////////////
// constructor(node_vitastor)
NAN_METHOD(NodeVitastorKV::Create)
{
TRACE("NodeVitastorKV::Create");
v8::Local<v8::Object> parent = info[0].As<v8::Object>();
NodeVitastor *cli = Nan::ObjectWrap::Unwrap<NodeVitastor>(parent);
NodeVitastorKV *kv = new NodeVitastorKV();
kv->cli = cli;
{
std::unique_lock<std::mutex> lock(cli->mu);
kv->dbw = new vitastorkv_dbw_t((cluster_client_t*)vitastor_c_get_internal_client(cli->c));
}
kv->Wrap(info.This());
info.GetReturnValue().Set(info.This());
}
NodeVitastorKV::~NodeVitastorKV()
{
delete dbw;
}
// open(inode_id, { ...config }, callback(err))
NAN_METHOD(NodeVitastorKV::Open)
{
TRACE("NodeVitastorKV::Open");
NodeVitastorKV* kv = Nan::ObjectWrap::Unwrap<NodeVitastorKV>(info.This());
uint64_t inode_id = Nan::To<int64_t>(info[0]).FromJust();
v8::Local<v8::Object> jsParams = info[1].As<v8::Object>();
v8::Local<v8::Array> keys = Nan::GetOwnPropertyNames(jsParams).ToLocalChecked();
std::map<std::string, std::string> cfg;
for (uint32_t i = 0; i < keys->Length(); i++)
{
auto key = Nan::Get(keys, i).ToLocalChecked();
cfg[std::string(*Nan::Utf8String(key))] = std::string(*Nan::Utf8String(Nan::Get(jsParams, key).ToLocalChecked()));
}
v8::Local<v8::Function> callback = info[2].As<v8::Function>();
auto req = new NodeVitastorRequest(callback);
kv->Ref();
kv->dbw->open(inode_id, cfg, [kv, req](int res)
{
Nan::HandleScope scope;
Nan::Callback nanCallback(Nan::New(req->callback));
v8::Local<v8::Value> args[1];
args[0] = !res ? v8::Local<v8::Value>(Nan::Null()) : v8::Local<v8::Value>(Nan::New<v8::Int32>(res));
nanCallback.Call(1, args, req);
delete req;
kv->Unref();
});
}
// close(callback(err))
NAN_METHOD(NodeVitastorKV::Close)
{
TRACE("NodeVitastorKV::Close");
NodeVitastorKV* kv = Nan::ObjectWrap::Unwrap<NodeVitastorKV>(info.This());
v8::Local<v8::Function> callback = info[0].As<v8::Function>();
auto req = new NodeVitastorRequest(callback);
kv->Ref();
kv->dbw->close([kv, req]()
{
Nan::HandleScope scope;
Nan::Callback nanCallback(Nan::New(req->callback));
nanCallback.Call(0, NULL, req);
delete req;
kv->Unref();
});
}
// set_config({ ...config })
NAN_METHOD(NodeVitastorKV::SetConfig)
{
TRACE("NodeVitastorKV::SetConfig");
NodeVitastorKV* kv = Nan::ObjectWrap::Unwrap<NodeVitastorKV>(info.This());
v8::Local<v8::Object> jsParams = info[0].As<v8::Object>();
v8::Local<v8::Array> keys = Nan::GetOwnPropertyNames(jsParams).ToLocalChecked();
std::map<std::string, std::string> cfg;
for (uint32_t i = 0; i < keys->Length(); i++)
{
auto key = Nan::Get(keys, i).ToLocalChecked();
cfg[std::string(*Nan::Utf8String(key))] = std::string(*Nan::Utf8String(Nan::Get(jsParams, key).ToLocalChecked()));
}
kv->dbw->set_config(cfg);
}
// get_size()
NAN_METHOD(NodeVitastorKV::GetSize)
{
TRACE("NodeVitastorKV::GetSize");
NodeVitastorKV* kv = Nan::ObjectWrap::Unwrap<NodeVitastorKV>(info.This());
auto size = kv->dbw->get_size();
info.GetReturnValue().Set((size < ((uint64_t)1<<53))
? v8::Local<v8::Value>(Nan::New<v8::Number>(size))
: v8::Local<v8::Value>(v8::BigInt::NewFromUnsigned(info.GetIsolate(), size)));
}
void NodeVitastorKV::get_impl(const Nan::FunctionCallbackInfo<v8::Value> & info, bool allow_cache)
{
NodeVitastorKV* kv = Nan::ObjectWrap::Unwrap<NodeVitastorKV>(info.This());
// FIXME: Handle Buffer too
std::string key(*Nan::Utf8String(info[0].As<v8::String>()));
v8::Local<v8::Function> callback = info[1].As<v8::Function>();
auto req = new NodeVitastorRequest(callback);
kv->Ref();
kv->dbw->get(key, [kv, req](int res, const std::string & value)
{
Nan::HandleScope scope;
Nan::Callback nanCallback(Nan::New(req->callback));
v8::Local<v8::Value> args[2];
args[0] = !res ? v8::Local<v8::Value>(Nan::Null()) : v8::Local<v8::Value>(Nan::New<v8::Int32>(res));
args[1] = !res ? v8::Local<v8::Value>(Nan::New<v8::String>(value).ToLocalChecked()) : v8::Local<v8::Value>(Nan::Null());
nanCallback.Call(2, args, req);
delete req;
kv->Unref();
}, allow_cache);
}
// get(key, callback(err, value))
NAN_METHOD(NodeVitastorKV::Get)
{
TRACE("NodeVitastorKV::Get");
get_impl(info, false);
}
// get_cached(key, callback(err, value))
NAN_METHOD(NodeVitastorKV::GetCached)
{
TRACE("NodeVitastorKV::GetCached");
get_impl(info, true);
}
static std::function<bool(int, const std::string &)> make_cas_callback(NodeVitastorRequest *cas_req)
{
return [cas_req](int res, const std::string & value)
{
Nan::HandleScope scope;
Nan::Callback nanCallback(Nan::New(cas_req->callback));
v8::Local<v8::Value> args[1];
args[0] = !res ? v8::Local<v8::Value>(Nan::New<v8::String>(value).ToLocalChecked()) : v8::Local<v8::Value>(Nan::Null());
Nan::MaybeLocal<v8::Value> ret = nanCallback.Call(1, args, cas_req);
if (ret.IsEmpty())
return false;
return Nan::To<bool>(ret.ToLocalChecked()).FromJust();
};
}
// set(key, value, callback(err), cas_compare(old_value))
NAN_METHOD(NodeVitastorKV::Set)
{
TRACE("NodeVitastorKV::Set");
NodeVitastorKV* kv = Nan::ObjectWrap::Unwrap<NodeVitastorKV>(info.This());
// FIXME: Handle Buffer too
std::string key(*Nan::Utf8String(info[0].As<v8::String>()));
std::string value(*Nan::Utf8String(info[1].As<v8::String>()));
v8::Local<v8::Function> callback = info[2].As<v8::Function>();
NodeVitastorRequest *req = new NodeVitastorRequest(callback), *cas_req = NULL;
std::function<bool(int, const std::string &)> cas_cb;
if (info.Length() > 3 && info[3]->IsObject())
{
v8::Local<v8::Function> cas_callback = info[3].As<v8::Function>();
cas_req = new NodeVitastorRequest(cas_callback);
cas_cb = make_cas_callback(cas_req);
}
kv->Ref();
kv->dbw->set(key, value, [kv, req, cas_req](int res)
{
Nan::HandleScope scope;
Nan::Callback nanCallback(Nan::New(req->callback));
v8::Local<v8::Value> args[1];
args[0] = !res ? v8::Local<v8::Value>(Nan::Null()) : v8::Local<v8::Value>(Nan::New<v8::Int32>(res));
nanCallback.Call(1, args, req);
delete req;
if (cas_req)
delete cas_req;
kv->Unref();
}, cas_cb);
}
// del(key, callback(err), cas_compare(old_value))
NAN_METHOD(NodeVitastorKV::Del)
{
TRACE("NodeVitastorKV::Del");
NodeVitastorKV* kv = Nan::ObjectWrap::Unwrap<NodeVitastorKV>(info.This());
// FIXME: Handle Buffer too
std::string key(*Nan::Utf8String(info[0].As<v8::String>()));
v8::Local<v8::Function> callback = info[1].As<v8::Function>();
NodeVitastorRequest *req = new NodeVitastorRequest(callback), *cas_req = NULL;
std::function<bool(int, const std::string &)> cas_cb;
if (info.Length() > 2 && info[2]->IsObject())
{
v8::Local<v8::Function> cas_callback = info[2].As<v8::Function>();
cas_req = new NodeVitastorRequest(cas_callback);
cas_cb = make_cas_callback(cas_req);
}
kv->Ref();
kv->dbw->del(key, [kv, req, cas_req](int res)
{
Nan::HandleScope scope;
Nan::Callback nanCallback(Nan::New(req->callback));
v8::Local<v8::Value> args[1];
args[0] = !res ? v8::Local<v8::Value>(Nan::Null()) : v8::Local<v8::Value>(Nan::New<v8::Int32>(res));
nanCallback.Call(1, args, req);
delete req;
if (cas_req)
delete cas_req;
kv->Unref();
}, cas_cb);
}
// list(start_key?)
NAN_METHOD(NodeVitastorKV::List)
{
TRACE("NodeVitastorKV::List");
v8::Local<v8::Function> cons = Nan::New(listing_class);
v8::Local<v8::Value> args[2];
args[0] = info.This();
int narg = 1;
if (info.Length() > 1 && info[1]->IsString())
{
args[1] = info[1];
narg = 2;
}
info.GetReturnValue().Set(Nan::NewInstance(cons, narg, args).ToLocalChecked());
}
//////////////////////////////////////////////////
// NodeVitastorKVListing
//////////////////////////////////////////////////
// constructor(node_vitastor_kv, start_key?)
NAN_METHOD(NodeVitastorKVListing::Create)
{
TRACE("NodeVitastorKVListing::Create");
v8::Local<v8::Object> parent = info[0].As<v8::Object>();
NodeVitastorKV *kv = Nan::ObjectWrap::Unwrap<NodeVitastorKV>(parent);
std::string start_key;
// FIXME: Handle Buffer too
if (info.Length() > 1 && info[1]->IsString())
{
start_key = std::string(*Nan::Utf8String(info[1].As<v8::String>()));
}
NodeVitastorKVListing *list = new NodeVitastorKVListing();
list->kv = kv;
{
std::unique_lock<std::mutex> lock(kv->cli->mu);
list->handle = list->kv->dbw->list_start(start_key);
}
list->Wrap(info.This());
info.GetReturnValue().Set(info.This());
}
NodeVitastorKVListing::~NodeVitastorKVListing()
{
if (handle)
{
std::unique_lock<std::mutex> lock(kv->cli->mu);
kv->dbw->list_close(handle);
handle = NULL;
}
}
// next(callback(err, value))
NAN_METHOD(NodeVitastorKVListing::Next)
{
TRACE("NodeVitastorKVListing::Next");
NodeVitastorKVListing* list = Nan::ObjectWrap::Unwrap<NodeVitastorKVListing>(info.This());
v8::Local<v8::Function> callback = info[0].As<v8::Function>();
auto req = new NodeVitastorRequest(callback);
if (!list->handle)
{
// Already closed
Nan::Callback nanCallback(Nan::New(req->callback));
v8::Local<v8::Value> args[1];
args[0] = Nan::New<v8::Int32>(-EINVAL);
nanCallback.Call(1, args, req);
delete req;
return;
}
list->kv->Ref();
list->kv->dbw->list_next(list->handle, [list, req](int res, const std::string & key, const std::string & value)
{
Nan::HandleScope scope;
Nan::Callback nanCallback(Nan::New(req->callback));
v8::Local<v8::Value> args[3];
args[0] = Nan::New<v8::Int32>(res);
args[1] = !res ? v8::Local<v8::Value>(Nan::New<v8::String>(key).ToLocalChecked()) : v8::Local<v8::Value>(Nan::Null());
args[2] = !res ? v8::Local<v8::Value>(Nan::New<v8::String>(value).ToLocalChecked()) : v8::Local<v8::Value>(Nan::Null());
nanCallback.Call(3, args, req);
delete req;
list->kv->Unref();
});
}
// close()
NAN_METHOD(NodeVitastorKVListing::Close)
{
TRACE("NodeVitastorKVListing::Close");
NodeVitastorKVListing* list = Nan::ObjectWrap::Unwrap<NodeVitastorKVListing>(info.This());
if (list->handle)
{
std::unique_lock<std::mutex> lock(list->kv->cli->mu);
list->kv->dbw->list_close(list->handle);
list->handle = NULL;
}
}

View File

@@ -1,139 +0,0 @@
#ifndef NODE_VITASTOR_CLIENT_H
#define NODE_VITASTOR_CLIENT_H
#include <mutex>
#include <nan.h>
#include <vitastor_c.h>
#include <vitastor_kv.h>
class NodeVitastorRequest;
class NodeVitastor: public Nan::ObjectWrap
{
public:
// constructor({ ...config })
static NAN_METHOD(Create);
// read(pool, inode, offset, len, callback(err, buffer, version))
static NAN_METHOD(Read);
// write(pool, inode, offset, buffer, { version }?, callback(err))
static NAN_METHOD(Write);
// sync(callback(err))
static NAN_METHOD(Sync);
// read_bitmap(pool, inode, offset, len, with_parents, callback(err, bitmap_buffer))
static NAN_METHOD(ReadBitmap);
// // destroy()
// static NAN_METHOD(Destroy);
~NodeVitastor();
private:
vitastor_c *c = NULL;
int eventfd = -1;
uv_poll_t poll_watcher;
// FIXME: Is it really needed?
std::mutex mu;
NodeVitastor();
static void on_io_readable(uv_poll_t* handle, int status, int revents);
static void on_read_finish(void *opaque, long retval, uint64_t version);
static void on_write_finish(void *opaque, long retval);
static void on_read_bitmap_finish(void *opaque, long retval, uint8_t *bitmap);
friend class NodeVitastorImage;
friend class NodeVitastorKV;
friend class NodeVitastorKVListing;
};
class NodeVitastorImage: public Nan::ObjectWrap
{
public:
// constructor(node_vitastor, name)
static NAN_METHOD(Create);
// read(offset, len, callback(err, buffer, version))
static NAN_METHOD(Read);
// write(offset, buffer, { version }?, callback(err))
static NAN_METHOD(Write);
// sync(callback(err))
static NAN_METHOD(Sync);
// read_bitmap(offset, len, with_parents, callback(err, bitmap_buffer))
static NAN_METHOD(ReadBitmap);
// get_info(callback({ num, name, size, parent_id?, readonly?, meta?, mod_revision, block_size, bitmap_granularity, immediate_commit }))
static NAN_METHOD(GetInfo);
~NodeVitastorImage();
private:
NodeVitastor *cli = NULL;
std::string name;
void *watch = NULL;
std::vector<NodeVitastorRequest*> on_init;
Nan::Persistent<v8::Object> cliObj;
NodeVitastorImage();
static void on_watch_start(void *opaque, long retval);
void exec_request(NodeVitastorRequest *req);
void exec_or_wait(NodeVitastorRequest *req);
};
class NodeVitastorKV: public Nan::ObjectWrap
{
public:
// constructor(node_vitastor)
static NAN_METHOD(Create);
// open(inode_id, { ...config }, callback(err))
static NAN_METHOD(Open);
// set_config({ ...config })
static NAN_METHOD(SetConfig);
// close(callback())
static NAN_METHOD(Close);
// get_size()
static NAN_METHOD(GetSize);
// get(key, callback(err, value))
static NAN_METHOD(Get);
// get_cached(key, callback(err, value))
static NAN_METHOD(GetCached);
// set(key, value, callback(err), cas_compare(old_value))
static NAN_METHOD(Set);
// del(key, callback(err), cas_compare(old_value))
static NAN_METHOD(Del);
// list(start_key?)
static NAN_METHOD(List);
~NodeVitastorKV();
static Nan::Persistent<v8::Function> listing_class;
private:
NodeVitastor *cli = NULL;
vitastorkv_dbw_t *dbw = NULL;
NodeVitastorKV();
static void get_impl(const Nan::FunctionCallbackInfo<v8::Value> & info, bool allow_cache);
friend class NodeVitastorKVListing;
};
class NodeVitastorKVListing: public Nan::ObjectWrap
{
public:
// constructor(node_vitastor_kv, start_key?)
static NAN_METHOD(Create);
// next(callback(err, value))
static NAN_METHOD(Next);
// close()
static NAN_METHOD(Close);
~NodeVitastorKVListing();
private:
NodeVitastorKV *kv = NULL;
void *handle = NULL;
NodeVitastorKVListing();
};
#endif

View File

@@ -1,24 +0,0 @@
{
"name": "vitastor",
"version": "1.7.0",
"description": "Low-level native bindings to Vitastor client library",
"main": "index.js",
"keywords": [
"storage",
"sds",
"vitastor"
],
"repository": {
"type": "git",
"url": "git://git.yourcmc.ru/vitalif/vitastor.git"
},
"scripts": {
"build": "node-gyp rebuild"
},
"author": "Vitaliy Filippov",
"license": "GPL-2.0-or-later",
"dependencies": {
"bindings": "1.5.0",
"nan": "^2.19.0"
}
}

View File

@@ -108,10 +108,11 @@ npm install --production
cd ..
mkdir -p %buildroot/usr/lib/vitastor
cp -r mon %buildroot/usr/lib/vitastor
mv %buildroot/usr/lib/vitastor/mon/scripts/make-etcd %buildroot/usr/lib/vitastor/mon/
mkdir -p %buildroot/lib/systemd/system
cp mon/vitastor.target mon/vitastor-mon.service mon/vitastor-osd@.service %buildroot/lib/systemd/system
cp mon/scripts/vitastor.target mon/scripts/vitastor-mon.service mon/scripts/vitastor-osd@.service %buildroot/lib/systemd/system
mkdir -p %buildroot/lib/udev/rules.d
cp mon/90-vitastor.rules %buildroot/lib/udev/rules.d
cp mon/scripts/90-vitastor.rules %buildroot/lib/udev/rules.d
%files

View File

@@ -105,10 +105,11 @@ npm install --production
cd ..
mkdir -p %buildroot/usr/lib/vitastor
cp -r mon %buildroot/usr/lib/vitastor
mv %buildroot/usr/lib/vitastor/mon/scripts/make-etcd %buildroot/usr/lib/vitastor/mon/
mkdir -p %buildroot/lib/systemd/system
cp mon/vitastor.target mon/vitastor-mon.service mon/vitastor-osd@.service %buildroot/lib/systemd/system
cp mon/scripts/vitastor.target mon/scripts/vitastor-mon.service mon/scripts/vitastor-osd@.service %buildroot/lib/systemd/system
mkdir -p %buildroot/lib/udev/rules.d
cp mon/90-vitastor.rules %buildroot/lib/udev/rules.d
cp mon/scripts/90-vitastor.rules %buildroot/lib/udev/rules.d
%files

View File

@@ -98,10 +98,11 @@ npm install --production
cd ..
mkdir -p %buildroot/usr/lib/vitastor
cp -r mon %buildroot/usr/lib/vitastor
mv %buildroot/usr/lib/vitastor/mon/scripts/make-etcd %buildroot/usr/lib/vitastor/mon/
mkdir -p %buildroot/lib/systemd/system
cp mon/vitastor.target mon/vitastor-mon.service mon/vitastor-osd@.service %buildroot/lib/systemd/system
cp mon/scripts/vitastor.target mon/scripts/vitastor-mon.service mon/scripts/vitastor-osd@.service %buildroot/lib/systemd/system
mkdir -p %buildroot/lib/udev/rules.d
cp mon/90-vitastor.rules %buildroot/lib/udev/rules.d
cp mon/scripts/90-vitastor.rules %buildroot/lib/udev/rules.d
%files

View File

@@ -253,7 +253,7 @@ void etcd_state_client_t::parse_config(const json11::Json & config)
this->etcd_ws_keepalive_interval = config["etcd_ws_keepalive_interval"].uint64_value();
if (this->etcd_ws_keepalive_interval <= 0)
{
this->etcd_ws_keepalive_interval = 30;
this->etcd_ws_keepalive_interval = 5;
}
this->max_etcd_attempts = config["max_etcd_attempts"].uint64_value();
if (this->max_etcd_attempts <= 0)

View File

@@ -103,7 +103,7 @@ protected:
void pick_next_etcd();
public:
int etcd_keepalive_timeout = 30;
int etcd_ws_keepalive_interval = 30;
int etcd_ws_keepalive_interval = 5;
int max_etcd_attempts = 5;
int etcd_quick_timeout = 1000;
int etcd_slow_timeout = 5000;

View File

@@ -384,28 +384,6 @@ int vitastor_c_inode_get_readonly(void *handle)
return watch->cfg.readonly;
}
uint64_t vitastor_c_inode_get_parent_id(void *handle)
{
inode_watch_t *watch = (inode_watch_t*)handle;
return watch->cfg.parent_id;
}
char* vitastor_c_inode_get_meta(void *handle)
{
inode_watch_t *watch = (inode_watch_t*)handle;
if (watch->cfg.meta.is_null())
{
return NULL;
}
return strdup(watch->cfg.meta.dump().c_str());
}
uint64_t vitastor_c_inode_get_mod_revision(void *handle)
{
inode_watch_t *watch = (inode_watch_t*)handle;
return watch->cfg.mod_revision;
}
uint32_t vitastor_c_inode_get_immediate_commit(vitastor_c *client, uint64_t inode_num)
{
auto pool_it = client->cli->st_cli.pool_config.find(INODE_POOL(inode_num));

View File

@@ -69,9 +69,6 @@ void vitastor_c_watch_inode(vitastor_c *client, char *image, VitastorIOHandler c
void vitastor_c_close_watch(vitastor_c *client, void *handle);
uint64_t vitastor_c_inode_get_size(void *handle);
uint64_t vitastor_c_inode_get_num(void *handle);
uint64_t vitastor_c_inode_get_parent_id(void *handle);
char* vitastor_c_inode_get_meta(void *handle);
uint64_t vitastor_c_inode_get_mod_revision(void *handle);
uint32_t vitastor_c_inode_get_block_size(vitastor_c *client, uint64_t inode_num);
uint32_t vitastor_c_inode_get_bitmap_granularity(vitastor_c *client, uint64_t inode_num);
int vitastor_c_inode_get_readonly(void *handle);

View File

@@ -25,7 +25,7 @@ public:
std::map<std::string, std::string> cfg;
std::vector<std::string> cli_cmd;
vitastorkv_dbw_t *db = NULL;
kv_dbw_t *db = NULL;
ring_loop_t *ringloop = NULL;
epoll_manager_t *epmgr = NULL;
cluster_client_t *cli = NULL;
@@ -144,7 +144,7 @@ void kv_cli_t::run()
ringloop = new ring_loop_t(512);
epmgr = new epoll_manager_t(ringloop);
cli = new cluster_client_t(ringloop, epmgr->tfd, cfg);
db = new vitastorkv_dbw_t(cli);
db = new kv_dbw_t(cli);
// Load image metadata
while (!cli->is_ready())
{
@@ -289,7 +289,7 @@ void kv_cli_t::next_cmd()
struct kv_cli_list_t
{
vitastorkv_dbw_t *db = NULL;
kv_dbw_t *db = NULL;
void *handle = NULL;
int format = 0;
int n = 0;

View File

@@ -501,7 +501,7 @@ void kv_block_t::dump(int base_level)
void kv_db_t::open(inode_t inode_id, json11::Json cfg, std::function<void(int)> cb)
{
if (block_cache.size() > 0 || inode_id)
if (block_cache.size() > 0)
{
cb(-EINVAL);
return;
@@ -1958,38 +1958,38 @@ void kv_op_t::next_go_up()
}
}
vitastorkv_dbw_t::vitastorkv_dbw_t(cluster_client_t *cli)
kv_dbw_t::kv_dbw_t(cluster_client_t *cli)
{
db = new kv_db_t();
db->cli = cli;
}
vitastorkv_dbw_t::~vitastorkv_dbw_t()
kv_dbw_t::~kv_dbw_t()
{
delete db;
}
void vitastorkv_dbw_t::open(uint64_t inode_id, std::map<std::string, std::string> cfg, std::function<void(int)> cb)
void kv_dbw_t::open(uint64_t inode_id, std::map<std::string, std::string> cfg, std::function<void(int)> cb)
{
db->open(inode_id, cfg, cb);
}
void vitastorkv_dbw_t::set_config(std::map<std::string, std::string> cfg)
void kv_dbw_t::set_config(std::map<std::string, std::string> cfg)
{
db->set_config(cfg);
}
uint64_t vitastorkv_dbw_t::get_size()
uint64_t kv_dbw_t::get_size()
{
return db->next_free;
}
void vitastorkv_dbw_t::close(std::function<void()> cb)
void kv_dbw_t::close(std::function<void()> cb)
{
db->close(cb);
}
void vitastorkv_dbw_t::get(const std::string & key, std::function<void(int res, const std::string & value)> cb, bool cached)
void kv_dbw_t::get(const std::string & key, std::function<void(int res, const std::string & value)> cb, bool cached)
{
auto *op = new kv_op_t;
op->db = db;
@@ -2003,7 +2003,7 @@ void vitastorkv_dbw_t::get(const std::string & key, std::function<void(int res,
op->exec();
}
void vitastorkv_dbw_t::set(const std::string & key, const std::string & value, std::function<void(int res)> cb,
void kv_dbw_t::set(const std::string & key, const std::string & value, std::function<void(int res)> cb,
std::function<bool(int res, const std::string & value)> cas_compare)
{
auto *op = new kv_op_t;
@@ -2023,7 +2023,7 @@ void vitastorkv_dbw_t::set(const std::string & key, const std::string & value, s
op->exec();
}
void vitastorkv_dbw_t::del(const std::string & key, std::function<void(int res)> cb,
void kv_dbw_t::del(const std::string & key, std::function<void(int res)> cb,
std::function<bool(int res, const std::string & value)> cas_compare)
{
auto *op = new kv_op_t;
@@ -2042,7 +2042,7 @@ void vitastorkv_dbw_t::del(const std::string & key, std::function<void(int res)>
op->exec();
}
void* vitastorkv_dbw_t::list_start(const std::string & start)
void* kv_dbw_t::list_start(const std::string & start)
{
if (!db->inode_id || db->closing)
return NULL;
@@ -2055,7 +2055,7 @@ void* vitastorkv_dbw_t::list_start(const std::string & start)
return op;
}
void vitastorkv_dbw_t::list_next(void *handle, std::function<void(int res, const std::string & key, const std::string & value)> cb)
void kv_dbw_t::list_next(void *handle, std::function<void(int res, const std::string & key, const std::string & value)> cb)
{
kv_op_t *op = (kv_op_t*)handle;
if (cb)
@@ -2068,7 +2068,7 @@ void vitastorkv_dbw_t::list_next(void *handle, std::function<void(int res, const
op->next();
}
void vitastorkv_dbw_t::list_close(void *handle)
void kv_dbw_t::list_close(void *handle)
{
kv_op_t *op = (kv_op_t*)handle;
delete op;

View File

@@ -73,7 +73,7 @@ public:
timespec prev_stat_time, start_stat_time;
// State
vitastorkv_dbw_t *db = NULL;
kv_dbw_t *db = NULL;
ring_loop_t *ringloop = NULL;
epoll_manager_t *epmgr = NULL;
cluster_client_t *cli = NULL;
@@ -272,7 +272,7 @@ void kv_test_t::run(json11::Json cfg)
ringloop = new ring_loop_t(512);
epmgr = new epoll_manager_t(ringloop);
cli = new cluster_client_t(ringloop, epmgr->tfd, cfg);
db = new vitastorkv_dbw_t(cli);
db = new kv_dbw_t(cli);
// Load image metadata
while (!cli->is_ready())
{

View File

@@ -19,11 +19,11 @@ class cluster_client_t;
struct kv_db_t;
struct vitastorkv_dbw_t
struct kv_dbw_t
{
// cli = vitastor_c_get_internal_client(client)
vitastorkv_dbw_t(cluster_client_t *cli);
~vitastorkv_dbw_t();
kv_dbw_t(cluster_client_t *cli);
~kv_dbw_t();
void open(uint64_t inode_id, std::map<std::string, std::string> cfg, std::function<void(int)> cb);
void set_config(std::map<std::string, std::string> cfg);

View File

@@ -255,7 +255,7 @@ void kv_fs_state_t::init(nfs_proxy_t *proxy, json11::Json cfg)
// Open DB and wait
int open_res = 0;
bool open_done = false;
proxy->db = new vitastorkv_dbw_t(proxy->cli);
proxy->db = new kv_dbw_t(proxy->cli);
std::map<std::string, std::string> kv_cfg;
for (auto & kv: cfg.object_items())
{

View File

@@ -51,7 +51,7 @@ public:
epoll_manager_t *epmgr = NULL;
cluster_client_t *cli = NULL;
cli_tool_t *cmd = NULL;
vitastorkv_dbw_t *db = NULL;
kv_dbw_t *db = NULL;
kv_fs_state_t *kvfs = NULL;
block_fs_state_t *blockfs = NULL;

View File

@@ -23,7 +23,7 @@ trap 'kill -9 $(jobs -p)' EXIT
ETCD=${ETCD:-etcd}
ETCD_IP=${ETCD_IP:-127.0.0.1}
ETCD_PORT=${ETCD_PORT:-12379}
ETCD_COUNT=${ETCD_COUNT:-1}
ETCD_COUNT=${ETCD_COUNT:-0}
if [ "$KEEP_DATA" = "" ]; then
rm -rf ./testdata
@@ -32,12 +32,9 @@ if [ "$KEEP_DATA" = "" ]; then
fi
ETCD_URL="http://$ETCD_IP:$ETCD_PORT"
ETCD_CLUSTER="etcd1=http://$ETCD_IP:$((ETCD_PORT+1))"
for i in $(seq 2 $ETCD_COUNT); do
ETCD_URL="$ETCD_URL,http://$ETCD_IP:$((ETCD_PORT+2*i-2))"
ETCD_CLUSTER="$ETCD_CLUSTER,etcd$i=http://$ETCD_IP:$((ETCD_PORT+2*i-1))"
done
ETCDCTL="${ETCD}ctl --endpoints=$ETCD_URL --dial-timeout=5s --command-timeout=10s"
start_etcd()
{
@@ -53,15 +50,43 @@ start_etcd()
eval ETCD${i}_PID=$!
}
for i in $(seq 1 $ETCD_COUNT); do
start_etcd $i
done
for i in {1..30}; do
${ETCD}ctl --endpoints=$ETCD_URL --dial-timeout=1s --command-timeout=1s member list >/dev/null && break
if [[ $i = 30 ]]; then
format_error "Failed to start etcd"
fi
done
start_etcd_cluster()
{
ETCD_CLUSTER="etcd1=http://$ETCD_IP:$((ETCD_PORT+1))"
for i in $(seq 2 $ETCD_COUNT); do
ETCD_CLUSTER="$ETCD_CLUSTER,etcd$i=http://$ETCD_IP:$((ETCD_PORT+2*i-1))"
done
for i in $(seq 1 $ETCD_COUNT); do
start_etcd $i
done
ETCDCTL="${ETCD}ctl --endpoints=$ETCD_URL --dial-timeout=5s --command-timeout=10s"
for i in {1..30}; do
${ETCD}ctl --endpoints=$ETCD_URL --dial-timeout=1s --command-timeout=1s member list >/dev/null && break
if [[ $i = 30 ]]; then
format_error "Failed to start etcd"
fi
done
}
wait_etcd()
{
for i in {1..30}; do
$ETCDCTL --dial-timeout=1s --command-timeout=1s get --prefix / && break
if [[ $i = 30 ]]; then
format_error "Failed to start etcd"
fi
sleep 1
done
}
if [[ "$ETCD_COUNT" -lt 1 ]]; then
ETCDCTL="node mon/node_modules/.bin/anticli -e $ETCD_URL"
MON_PARAMS="--use_antietcd 1 --antietcd_data_dir ./testdata --antietcd_persist_interval 500"
else
ETCDCTL="${ETCD}ctl --endpoints=$ETCD_URL --dial-timeout=5s --command-timeout=10s"
MON_PARAMS=""
start_etcd_cluster
fi
echo leak:fio >> testdata/lsan-suppress.txt
echo leak:tcmalloc >> testdata/lsan-suppress.txt

View File

@@ -18,6 +18,11 @@ else
OSD_COUNT=${OSD_COUNT:-3}
fi
node mon/mon-main.js $MON_PARAMS --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 >>./testdata/mon.log 2>&1 &
MON_PID=$!
sleep 3
if [ "$IMMEDIATE_COMMIT" != "" ]; then
NO_SAME="--journal_no_same_sector_overwrites true --journal_sector_buffer_count 1024 --disable_data_fsync 1 --immediate_commit all --log_level 10 --etcd_stats_interval 5"
$ETCDCTL put /vitastor/config/global '{"recovery_queue_depth":1,"recovery_tune_util_low":1,"immediate_commit":"all","client_enable_writeback":true,"client_max_writeback_iodepth":32'$GLOBAL_CONFIG'}'
@@ -54,9 +59,6 @@ for i in $(seq 1 $OSD_COUNT); do
start_osd $i
done
(while true; do set +e; node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1; if [[ $? -ne 2 ]]; then break; fi; done) >>./testdata/mon.log 2>&1 &
MON_PID=$!
if [ "$SCHEME" = "ec" ]; then
PG_SIZE=${PG_SIZE:-5}
PG_MINSIZE=${PG_MINSIZE:-4}

View File

@@ -2,6 +2,10 @@
. `dirname $0`/common.sh
node mon/mon-main.js $MON_PARAMS --etcd_address $ETCD_URL --etcd_prefix "/vitastor" >>./testdata/mon.log 2>&1 &
MON_PID=$!
wait_etcd
TIME=$(date '+%s')
$ETCDCTL put /vitastor/config/global '{"placement_levels":{"rack":1,"host":2,"osd":3}}'
$ETCDCTL put /vitastor/config/node_placement '{"rack1":{"level":"rack"},"rack2":{"level":"rack"},"host1":{"level":"host","parent":"rack1"},"host2":{"level":"host","parent":"rack1"},"host3":{"level":"host","parent":"rack2"},"host4":{"level":"host","parent":"rack2"}}'
@@ -22,12 +26,9 @@ $ETCDCTL get --print-value-only /vitastor/config/pools | jq -s -e '. == [{}]'
build/src/cmd/vitastor-cli --etcd_address $ETCD_URL create-pool testpool -s 2 -n 4 --failure_domain rack --force
$ETCDCTL get --print-value-only /vitastor/config/pools | jq -s -e '. == [{"1":{"name":"testpool","scheme":"replicated","pg_size":2,"pg_minsize":1,"pg_count":4,"failure_domain":"rack"}}]'
node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" >>./testdata/mon.log 2>&1 &
MON_PID=$!
sleep 2
etcdctl --endpoints=http://localhost:12379 get --prefix /vitastor/config/pgs --print-value-only | \
$ETCDCTL get --prefix /vitastor/config/pgs --print-value-only | \
jq -s -e '([ .[0].items["1"] | .[].osd_set | map_values(. | tonumber) | select((.[0] <= 4) != (.[1] <= 4)) ] | length) == 4'
format_green OK

View File

@@ -1,5 +1,7 @@
#!/bin/bash -ex
ETCD_COUNT=1
. `dirname $0`/common.sh
OSD_SIZE=1024

View File

@@ -2,6 +2,10 @@
. `dirname $0`/common.sh
node mon/mon-main.js $MON_PARAMS --etcd_address $ETCD_URL --etcd_prefix "/vitastor" >>./testdata/mon.log 2>&1 &
MON_PID=$!
wait_etcd
TIME=$(date '+%s')
$ETCDCTL put /vitastor/config/osd/1 '{"tags":["a"]}'
$ETCDCTL put /vitastor/config/osd/2 '{"tags":["a"]}'
@@ -21,15 +25,12 @@ $ETCDCTL put /vitastor/osd/stats/7 '{"host":"stor4","size":1073741824,"time":"'$
$ETCDCTL put /vitastor/osd/stats/8 '{"host":"stor4","size":1073741824,"time":"'$TIME'"}'
$ETCDCTL put /vitastor/config/pools '{"1":{"name":"testpool","scheme":"replicated","pg_size":2,"pg_minsize":2,"pg_count":16,"failure_domain":"host","osd_tags":["a"]}}'
node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" >>./testdata/mon.log 2>&1 &
MON_PID=$!
sleep 2
etcdctl --endpoints=http://localhost:12379 get --prefix /vitastor/config/pgs --print-value-only
$ETCDCTL get --prefix /vitastor/config/pgs --print-value-only
if ! (etcdctl --endpoints=http://localhost:12379 get --prefix /vitastor/config/pgs --print-value-only | \
jq -s -e '[ [ .[0].items["1"] | .[].osd_set | map(. | select(. != "" and (.|tonumber) < 5)) ][] | select((. | length) == 2) ] | length == 16'); then
if ! ($ETCDCTL get --prefix /vitastor/config/pgs --print-value-only | \
jq -s -e '[ [ .[] | select(has("items")) | .items["1"] | .[].osd_set | map(. | select(. != "" and (.|tonumber) < 5)) ][] | select((. | length) == 2) ] | length == 16'); then
format_error "Some PGs missing replicas"
fi

View File

@@ -2,6 +2,10 @@
. `dirname $0`/common.sh
node mon/mon-main.js $MON_PARAMS --etcd_address $ETCD_URL --etcd_prefix "/vitastor" >>./testdata/mon.log 2>&1 &
MON_PID=$!
wait_etcd
TIME=$(date '+%s')
$ETCDCTL put /vitastor/config/global '{"placement_levels":{"rack":100,"host":101,"osd":102}}'
$ETCDCTL put /vitastor/config/node_placement '{"rack1":{"level":"rack"},"rack2":{"level":"rack"},"stor1":{"level":"host","parent":"rack1"},"stor2":{"level":"host","parent":"rack1"},"stor3":{"level":"host","parent":"rack2"},"stor4":{"level":"host","parent":"rack2"}}'
@@ -15,14 +19,11 @@ $ETCDCTL put /vitastor/osd/stats/7 '{"host":"stor4","size":1073741824,"time":"'$
$ETCDCTL put /vitastor/osd/stats/8 '{"host":"stor4","size":1073741824,"time":"'$TIME'"}'
$ETCDCTL put /vitastor/config/pools '{"1":{"name":"testpool","scheme":"replicated","pg_size":2,"pg_minsize":2,"pg_count":16,"failure_domain":"host","root_node":"rack1"}}'
node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" >>./testdata/mon.log 2>&1 &
MON_PID=$!
sleep 2
etcdctl --endpoints=http://localhost:12379 get --prefix /vitastor/config/pgs --print-value-only
$ETCDCTL get --prefix /vitastor/config/pgs --print-value-only
if ! (etcdctl --endpoints=http://localhost:12379 get --prefix /vitastor/config/pgs --print-value-only | \
if ! ($ETCDCTL get --prefix /vitastor/config/pgs --print-value-only | \
jq -s -e '[ [ .[0].items["1"] | .[].osd_set | map(. | select(. != "" and (.|tonumber) < 5)) ][] | select((. | length) == 2) ] | length == 16'); then
format_error "Some PGs missing replicas"
fi

View File

@@ -3,9 +3,13 @@
export KEEP_DATA=1
. `dirname $0`/common.sh
etcdctl --endpoints=http://127.0.0.1:12379/v3 del --prefix /vitastor/mon/master
etcdctl --endpoints=http://127.0.0.1:12379/v3 del --prefix /vitastor/pg/state
etcdctl --endpoints=http://127.0.0.1:12379/v3 del --prefix /vitastor/osd/state
node mon/mon-main.js $MON_PARAMS --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 >>./testdata/mon.log 2>&1 &
MON_PID=$!
wait_etcd
$ETCDCTL del --prefix /vitastor/mon/master
$ETCDCTL del --prefix /vitastor/pg/state
$ETCDCTL del --prefix /vitastor/osd/state
OSD_COUNT=3
OSD_ARGS="$OSD_ARGS"
@@ -15,9 +19,6 @@ for i in $(seq 1 $OSD_COUNT); do
eval OSD${i}_PID=$!
done
(while true; do node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 || true; done) >>./testdata/mon.log 2>&1 &
MON_PID=$!
sleep 3
if ! ($ETCDCTL get /vitastor/pg/state/1/1 --print-value-only | jq -s -e '(. | length) != 0 and .[0].state == ["active"]'); then