1
0
Fork 0

Compare commits

...

2 Commits

13 changed files with 365 additions and 79 deletions

191
mon/antietcd_adapter.js Normal file
View File

@ -0,0 +1,191 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
const fs = require('fs');
const AntiEtcd = require('antietcd');
const vitastor_persist_filter = require('./vitastor_persist_filter.js');
const { b64, local_ips } = require('./utils.js');
class AntiEtcdAdapter
{
static start_antietcd(config)
{
let antietcd;
if (config.use_antietcd)
{
let fileConfig = {};
if (fs.existsSync(config.config_path||'/etc/vitastor/vitastor.conf'))
{
fileConfig = JSON.parse(fs.readFileSync(config.config_path||'/etc/vitastor/vitastor.conf', { encoding: 'utf-8' }));
}
let mergedConfig = { ...fileConfig, ...config };
let cluster = mergedConfig.etcd_address;
if (!(cluster instanceof Array))
cluster = cluster ? (''+(cluster||'')).split(/,+/) : [];
cluster = Object.keys(cluster.reduce((a, url) =>
{
a[url.toLowerCase().replace(/^https?:\/\//, '').replace(/\/.*$/, '')] = true;
return a;
}, {}));
const cfg_port = mergedConfig.antietcd_port;
const is_local = local_ips(true).reduce((a, c) => { a[c] = true; return a; }, {});
const selected = cluster.map(s => s.split(':', 2)).filter(ip => is_local[ip[0]] && (!cfg_port || ip[1] == cfg_port));
if (selected.length > 1)
{
console.error('More than 1 etcd_address matches local IPs, please specify port');
process.exit(1);
}
else if (selected.length == 1)
{
const antietcd_config = {
ip: selected[0][0],
port: selected[0][1],
data: mergedConfig.antietcd_data_file || ((mergedConfig.antietcd_data_dir || '/var/lib/vitastor') + '/mon_'+selected[0][1]+'.json.gz'),
persist_filter: vitastor_persist_filter(mergedConfig.etcd_prefix || '/vitastor'),
node_id: selected[0][0]+':'+selected[0][1], // node_id = ip:port
cluster: (cluster.length == 1 ? null : cluster),
cluster_key: (mergedConfig.etcd_prefix || '/vitastor'),
stale_read: 1,
};
for (const key in config)
{
if (key.substr(0, 9) === 'antietcd_')
{
const noprefix = key.substr(9);
if (!(noprefix in antietcd_config))
{
antietcd_config[noprefix] = config[key];
}
}
}
antietcd = new AntiEtcd(antietcd_config);
antietcd.start();
}
else
{
console.log('Antietcd is enabled, but etcd_address does not contain local IPs, proceeding without it');
}
}
return antietcd;
}
constructor(mon, antietcd)
{
this.mon = mon;
this.antietcd = antietcd;
this.on_leader = [];
this.on_change = (st) =>
{
if (st.state === 'leader')
{
for (const cb of this.on_leader)
{
cb();
}
this.on_leader = [];
}
};
this.antietcd.on('raftchange', this.on_change);
}
parse_config(/*config*/)
{
}
stop_watcher()
{
this.antietcd.off('raftchange', this.on_change);
const watch_id = this.watch_id;
if (watch_id)
{
this.watch_id = null;
this.antietcd.cancel_watch(watch_id).catch(console.error);
}
}
async start_watcher()
{
if (this.watch_id)
{
await this.antietcd.cancel_watch(this.watch_id);
this.watch_id = null;
}
const watch_id = await this.antietcd.create_watch({
key: b64(this.mon.config.etcd_prefix+'/'),
range_end: b64(this.mon.config.etcd_prefix+'0'),
start_revision: ''+this.mon.etcd_watch_revision,
watch_id: 1,
progress_notify: true,
}, (message) =>
{
this.mon.on_message(message.result);
});
console.log('Successfully subscribed to antietcd revision '+this.antietcd.etctree.mod_revision);
this.watch_id = watch_id;
}
async become_master()
{
if (!this.antietcd.raft)
{
console.log('Running in non-clustered mode');
}
else
{
console.log('Waiting to become master');
await new Promise(ok => this.on_leader.push(ok));
}
const state = { ...this.mon.get_mon_state(), id: ''+this.mon.etcd_lease_id };
await this.etcd_call('/kv/txn', {
success: [ { requestPut: { key: b64(this.mon.config.etcd_prefix+'/mon/master'), value: b64(JSON.stringify(state)), lease: ''+this.mon.etcd_lease_id } } ],
}, this.mon.config.etcd_start_timeout, 0);
if (this.antietcd.raft)
{
console.log('Became master');
}
}
async etcd_call(path, body, timeout, retries)
{
let retry = 0;
if (retries >= 0 && retries < 1)
{
retries = 1;
}
let prev = 0;
while (retries < 0 || retry < retries)
{
retry++;
if (this.mon.stopped)
{
throw new Error('Monitor instance is stopped');
}
try
{
if (Date.now()-prev < timeout)
{
await new Promise(ok => setTimeout(ok, timeout-(Date.now()-prev)));
}
prev = Date.now();
const res = await this.antietcd.api(path.replace(/^\/+/, '').replace(/\/+$/, '').replace(/\/+/g, '_'), body);
if (res.error)
{
console.error('Failed to query antietcd '+path+' (retry '+retry+'/'+retries+'): '+res.error);
}
else
{
return res;
}
}
catch (e)
{
console.error('Failed to query antietcd '+path+' (retry '+retry+'/'+retries+'): '+e.stack);
}
}
throw new Error('Failed to query antietcd ('+retries+' retries)');
}
}
module.exports = AntiEtcdAdapter;

View File

@ -3,6 +3,7 @@
const http = require('http');
const WebSocket = require('ws');
const { b64, local_ips } = require('./utils.js');
const MON_STOPPED = 'Monitor instance is stopped';
@ -23,7 +24,7 @@ class EtcdAdapter
parse_etcd_addresses(addrs)
{
const is_local_ip = this.mon.local_ips(true).reduce((a, c) => { a[c] = true; return a; }, {});
const is_local_ip = local_ips(true).reduce((a, c) => { a[c] = true; return a; }, {});
this.etcd_local = [];
this.etcd_urls = [];
this.selected_etcd_url = null;
@ -348,9 +349,4 @@ function POST(url, body, timeout)
});
}
function b64(str)
{
return Buffer.from(str).toString('base64');
}
module.exports = EtcdAdapter;

View File

@ -4,6 +4,7 @@
const fs = require('fs');
const crypto = require('crypto');
const os = require('os');
const AntiEtcdAdapter = require('./antietcd_adapter.js');
const EtcdAdapter = require('./etcd_adapter.js');
const { etcd_tree, etcd_allow, etcd_nonempty_keys } = require('./etcd_schema.js');
const { validate_pool_cfg } = require('./pool_config.js');
@ -11,17 +12,23 @@ const { sum_op_stats, sum_object_counts, sum_inode_stats, serialize_bigints } =
const stableStringify = require('./stable-stringify.js');
const { scale_pg_history } = require('./pg_utils.js');
const { get_osd_tree } = require('./osd_tree.js');
const { b64, de64, local_ips } = require('./utils.js');
const { recheck_primary, save_new_pgs_txn, generate_pool_pgs } = require('./pg_gen.js');
class Mon
{
static run_forever(config)
{
let antietcd = AntiEtcdAdapter.start_antietcd(config);
let mon;
const run = () =>
{
console.log('Starting Monitor');
const my_mon = new Mon(config);
my_mon.etcd = antietcd
? new AntiEtcdAdapter(my_mon, antietcd)
: new EtcdAdapter(my_mon);
my_mon.etcd.parse_config(my_mon.config);
mon = my_mon;
my_mon.on_die = () =>
{
@ -58,8 +65,6 @@ class Mon
this.state = JSON.parse(JSON.stringify(etcd_tree));
this.prev_stats = { osd_stats: {}, osd_diff: {} };
this.recheck_pgs_active = false;
this.etcd = new EtcdAdapter(this);
this.etcd.parse_config(this.config);
}
async start()
@ -147,8 +152,8 @@ class Mon
this.etcd_watch_revision = BigInt(msg.header.revision)+BigInt(1);
for (const e of msg.events||[])
{
this.parse_kv(e.kv);
const key = e.kv.key.substr(this.config.etcd_prefix.length);
const kv = this.parse_kv(e.kv);
const key = kv.key.substr(this.config.etcd_prefix.length);
if (key.substr(0, 11) == '/osd/state/')
{
stats_changed = true;
@ -168,7 +173,7 @@ class Mon
}
if (this.config.verbose)
{
console.log(JSON.stringify(e));
console.log(JSON.stringify({ ...e, kv: kv || undefined }));
}
}
if (pg_states_changed)
@ -252,7 +257,7 @@ class Mon
get_mon_state()
{
return { ip: this.local_ips(), hostname: os.hostname() };
return { ip: local_ips(), hostname: os.hostname() };
}
async get_lease()
@ -690,15 +695,16 @@ class Mon
{
if (!kv || !kv.key)
{
return;
return kv;
}
kv = { ...kv };
kv.key = de64(kv.key);
kv.value = kv.value ? de64(kv.value) : null;
let key = kv.key.substr(this.config.etcd_prefix.length+1);
if (!etcd_allow.exec(key))
{
console.log('Bad key in etcd: '+kv.key+' = '+kv.value);
return;
return kv;
}
try
{
@ -707,7 +713,7 @@ class Mon
catch (e)
{
console.log('Bad value in etcd: '+kv.key+' = '+kv.value);
return;
return kv;
}
let key_parts = key.split('/');
let cur = this.state;
@ -757,6 +763,7 @@ class Mon
!this.state.osd.stats[osd_num] ? 0 : this.state.osd.stats[osd_num].time+this.config.osd_out_time
);
}
return kv;
}
_die(err)
@ -766,33 +773,6 @@ class Mon
this.on_stop().catch(console.error);
this.on_die();
}
local_ips(all)
{
const ips = [];
const ifaces = os.networkInterfaces();
for (const ifname in ifaces)
{
for (const iface of ifaces[ifname])
{
if (iface.family == 'IPv4' && !iface.internal || all)
{
ips.push(iface.address);
}
}
}
return ips;
}
}
function b64(str)
{
return Buffer.from(str).toString('base64');
}
function de64(str)
{
return Buffer.from(str, 'base64').toString();
}
function sha1hex(str)

View File

@ -9,6 +9,7 @@
"author": "Vitaliy Filippov",
"license": "UNLICENSED",
"dependencies": {
"antietcd": "^1.0.1",
"sprintf-js": "^1.1.2",
"ws": "^7.2.5"
},

37
mon/utils.js Normal file
View File

@ -0,0 +1,37 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
const os = require('os');
function local_ips(all)
{
const ips = [];
const ifaces = os.networkInterfaces();
for (const ifname in ifaces)
{
for (const iface of ifaces[ifname])
{
if (iface.family == 'IPv4' && !iface.internal || all)
{
ips.push(iface.address);
}
}
}
return ips;
}
function b64(str)
{
return Buffer.from(str).toString('base64');
}
function de64(str)
{
return Buffer.from(str, 'base64').toString();
}
module.exports = {
b64,
de64,
local_ips,
};

View File

@ -0,0 +1,48 @@
// AntiEtcd persistence filter for Vitastor
// (c) Vitaliy Filippov, 2024
// License: Mozilla Public License 2.0 or Vitastor Network Public License 1.1
function vitastor_persist_filter(cfg)
{
const prefix = cfg.vitastor_prefix || '/vitastor';
return (key, value) =>
{
if (key.substr(0, prefix.length+'/osd/stats/'.length) == prefix+'/osd/stats/')
{
if (value)
{
try
{
value = JSON.parse(value);
value = JSON.stringify({
bitmap_granularity: value.bitmap_granularity || undefined,
data_block_size: value.data_block_size || undefined,
host: value.host || undefined,
immediate_commit: value.immediate_commit || undefined,
});
}
catch (e)
{
console.error('invalid JSON in '+key+' = '+value+': '+e);
value = {};
}
}
else
{
value = undefined;
}
return value;
}
else if (key.substr(0, prefix.length+'/osd/'.length) == prefix+'/osd/' ||
key.substr(0, prefix.length+'/inode/stats/'.length) == prefix+'/inode/stats/' ||
key.substr(0, prefix.length+'/pg/stats/'.length) == prefix+'/pg/stats/' ||
key.substr(0, prefix.length+'/pool/stats/'.length) == prefix+'/pool/stats/' ||
key == prefix+'/stats')
{
return undefined;
}
return value;
};
}
module.exports = vitastor_persist_filter;

View File

@ -23,7 +23,7 @@ trap 'kill -9 $(jobs -p)' EXIT
ETCD=${ETCD:-etcd}
ETCD_IP=${ETCD_IP:-127.0.0.1}
ETCD_PORT=${ETCD_PORT:-12379}
ETCD_COUNT=${ETCD_COUNT:-1}
ETCD_COUNT=${ETCD_COUNT:-0}
if [ "$KEEP_DATA" = "" ]; then
rm -rf ./testdata
@ -32,12 +32,9 @@ if [ "$KEEP_DATA" = "" ]; then
fi
ETCD_URL="http://$ETCD_IP:$ETCD_PORT"
ETCD_CLUSTER="etcd1=http://$ETCD_IP:$((ETCD_PORT+1))"
for i in $(seq 2 $ETCD_COUNT); do
ETCD_URL="$ETCD_URL,http://$ETCD_IP:$((ETCD_PORT+2*i-2))"
ETCD_CLUSTER="$ETCD_CLUSTER,etcd$i=http://$ETCD_IP:$((ETCD_PORT+2*i-1))"
done
ETCDCTL="${ETCD}ctl --endpoints=$ETCD_URL --dial-timeout=5s --command-timeout=10s"
start_etcd()
{
@ -53,15 +50,43 @@ start_etcd()
eval ETCD${i}_PID=$!
}
for i in $(seq 1 $ETCD_COUNT); do
start_etcd $i
done
for i in {1..30}; do
${ETCD}ctl --endpoints=$ETCD_URL --dial-timeout=1s --command-timeout=1s member list >/dev/null && break
if [[ $i = 30 ]]; then
format_error "Failed to start etcd"
fi
done
start_etcd_cluster()
{
ETCD_CLUSTER="etcd1=http://$ETCD_IP:$((ETCD_PORT+1))"
for i in $(seq 2 $ETCD_COUNT); do
ETCD_CLUSTER="$ETCD_CLUSTER,etcd$i=http://$ETCD_IP:$((ETCD_PORT+2*i-1))"
done
for i in $(seq 1 $ETCD_COUNT); do
start_etcd $i
done
ETCDCTL="${ETCD}ctl --endpoints=$ETCD_URL --dial-timeout=5s --command-timeout=10s"
for i in {1..30}; do
${ETCD}ctl --endpoints=$ETCD_URL --dial-timeout=1s --command-timeout=1s member list >/dev/null && break
if [[ $i = 30 ]]; then
format_error "Failed to start etcd"
fi
done
}
wait_etcd()
{
for i in {1..30}; do
$ETCDCTL --dial-timeout=1s --command-timeout=1s get --prefix / && break
if [[ $i = 30 ]]; then
format_error "Failed to start etcd"
fi
sleep 1
done
}
if [[ "$ETCD_COUNT" -lt 1 ]]; then
ETCDCTL="node mon/node_modules/.bin/anticli -e $ETCD_URL"
MON_PARAMS="--use_antietcd 1 --antietcd_data_dir ./testdata --antietcd_persist_interval 500"
else
ETCDCTL="${ETCD}ctl --endpoints=$ETCD_URL --dial-timeout=5s --command-timeout=10s"
MON_PARAMS=""
start_etcd_cluster
fi
echo leak:fio >> testdata/lsan-suppress.txt
echo leak:tcmalloc >> testdata/lsan-suppress.txt

View File

@ -18,6 +18,11 @@ else
OSD_COUNT=${OSD_COUNT:-3}
fi
node mon/mon-main.js $MON_PARAMS --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 >>./testdata/mon.log 2>&1 &
MON_PID=$!
sleep 3
if [ "$IMMEDIATE_COMMIT" != "" ]; then
NO_SAME="--journal_no_same_sector_overwrites true --journal_sector_buffer_count 1024 --disable_data_fsync 1 --immediate_commit all --log_level 10 --etcd_stats_interval 5"
$ETCDCTL put /vitastor/config/global '{"recovery_queue_depth":1,"recovery_tune_util_low":1,"immediate_commit":"all","client_enable_writeback":true,"client_max_writeback_iodepth":32'$GLOBAL_CONFIG'}'
@ -54,9 +59,6 @@ for i in $(seq 1 $OSD_COUNT); do
start_osd $i
done
node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 >>./testdata/mon.log 2>&1 &
MON_PID=$!
if [ "$SCHEME" = "ec" ]; then
PG_SIZE=${PG_SIZE:-5}
PG_MINSIZE=${PG_MINSIZE:-4}

View File

@ -2,6 +2,10 @@
. `dirname $0`/common.sh
node mon/mon-main.js $MON_PARAMS --etcd_address $ETCD_URL --etcd_prefix "/vitastor" >>./testdata/mon.log 2>&1 &
MON_PID=$!
wait_etcd
TIME=$(date '+%s')
$ETCDCTL put /vitastor/config/global '{"placement_levels":{"rack":1,"host":2,"osd":3}}'
$ETCDCTL put /vitastor/config/node_placement '{"rack1":{"level":"rack"},"rack2":{"level":"rack"},"host1":{"level":"host","parent":"rack1"},"host2":{"level":"host","parent":"rack1"},"host3":{"level":"host","parent":"rack2"},"host4":{"level":"host","parent":"rack2"}}'
@ -22,12 +26,9 @@ $ETCDCTL get --print-value-only /vitastor/config/pools | jq -s -e '. == [{}]'
build/src/cmd/vitastor-cli --etcd_address $ETCD_URL create-pool testpool -s 2 -n 4 --failure_domain rack --force
$ETCDCTL get --print-value-only /vitastor/config/pools | jq -s -e '. == [{"1":{"name":"testpool","scheme":"replicated","pg_size":2,"pg_minsize":1,"pg_count":4,"failure_domain":"rack"}}]'
node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" >>./testdata/mon.log 2>&1 &
MON_PID=$!
sleep 2
etcdctl --endpoints=http://localhost:12379 get --prefix /vitastor/config/pgs --print-value-only | \
$ETCDCTL get --prefix /vitastor/config/pgs --print-value-only | \
jq -s -e '([ .[0].items["1"] | .[].osd_set | map_values(. | tonumber) | select((.[0] <= 4) != (.[1] <= 4)) ] | length) == 4'
format_green OK

View File

@ -1,5 +1,7 @@
#!/bin/bash -ex
ETCD_COUNT=1
. `dirname $0`/common.sh
OSD_SIZE=1024

View File

@ -2,6 +2,10 @@
. `dirname $0`/common.sh
node mon/mon-main.js $MON_PARAMS --etcd_address $ETCD_URL --etcd_prefix "/vitastor" >>./testdata/mon.log 2>&1 &
MON_PID=$!
wait_etcd
TIME=$(date '+%s')
$ETCDCTL put /vitastor/config/osd/1 '{"tags":["a"]}'
$ETCDCTL put /vitastor/config/osd/2 '{"tags":["a"]}'
@ -21,15 +25,12 @@ $ETCDCTL put /vitastor/osd/stats/7 '{"host":"stor4","size":1073741824,"time":"'$
$ETCDCTL put /vitastor/osd/stats/8 '{"host":"stor4","size":1073741824,"time":"'$TIME'"}'
$ETCDCTL put /vitastor/config/pools '{"1":{"name":"testpool","scheme":"replicated","pg_size":2,"pg_minsize":2,"pg_count":16,"failure_domain":"host","osd_tags":["a"]}}'
node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" >>./testdata/mon.log 2>&1 &
MON_PID=$!
sleep 2
etcdctl --endpoints=http://localhost:12379 get --prefix /vitastor/config/pgs --print-value-only
$ETCDCTL get --prefix /vitastor/config/pgs --print-value-only
if ! (etcdctl --endpoints=http://localhost:12379 get --prefix /vitastor/config/pgs --print-value-only | \
jq -s -e '[ [ .[0].items["1"] | .[].osd_set | map(. | select(. != "" and (.|tonumber) < 5)) ][] | select((. | length) == 2) ] | length == 16'); then
if ! ($ETCDCTL get --prefix /vitastor/config/pgs --print-value-only | \
jq -s -e '[ [ .[] | select(has("items")) | .items["1"] | .[].osd_set | map(. | select(. != "" and (.|tonumber) < 5)) ][] | select((. | length) == 2) ] | length == 16'); then
format_error "Some PGs missing replicas"
fi

View File

@ -2,6 +2,10 @@
. `dirname $0`/common.sh
node mon/mon-main.js $MON_PARAMS --etcd_address $ETCD_URL --etcd_prefix "/vitastor" >>./testdata/mon.log 2>&1 &
MON_PID=$!
wait_etcd
TIME=$(date '+%s')
$ETCDCTL put /vitastor/config/global '{"placement_levels":{"rack":100,"host":101,"osd":102}}'
$ETCDCTL put /vitastor/config/node_placement '{"rack1":{"level":"rack"},"rack2":{"level":"rack"},"stor1":{"level":"host","parent":"rack1"},"stor2":{"level":"host","parent":"rack1"},"stor3":{"level":"host","parent":"rack2"},"stor4":{"level":"host","parent":"rack2"}}'
@ -15,14 +19,11 @@ $ETCDCTL put /vitastor/osd/stats/7 '{"host":"stor4","size":1073741824,"time":"'$
$ETCDCTL put /vitastor/osd/stats/8 '{"host":"stor4","size":1073741824,"time":"'$TIME'"}'
$ETCDCTL put /vitastor/config/pools '{"1":{"name":"testpool","scheme":"replicated","pg_size":2,"pg_minsize":2,"pg_count":16,"failure_domain":"host","root_node":"rack1"}}'
node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" >>./testdata/mon.log 2>&1 &
MON_PID=$!
sleep 2
etcdctl --endpoints=http://localhost:12379 get --prefix /vitastor/config/pgs --print-value-only
$ETCDCTL get --prefix /vitastor/config/pgs --print-value-only
if ! (etcdctl --endpoints=http://localhost:12379 get --prefix /vitastor/config/pgs --print-value-only | \
if ! ($ETCDCTL get --prefix /vitastor/config/pgs --print-value-only | \
jq -s -e '[ [ .[0].items["1"] | .[].osd_set | map(. | select(. != "" and (.|tonumber) < 5)) ][] | select((. | length) == 2) ] | length == 16'); then
format_error "Some PGs missing replicas"
fi

View File

@ -3,9 +3,13 @@
export KEEP_DATA=1
. `dirname $0`/common.sh
etcdctl --endpoints=http://127.0.0.1:12379/v3 del --prefix /vitastor/mon/master
etcdctl --endpoints=http://127.0.0.1:12379/v3 del --prefix /vitastor/pg/state
etcdctl --endpoints=http://127.0.0.1:12379/v3 del --prefix /vitastor/osd/state
node mon/mon-main.js $MON_PARAMS --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 >>./testdata/mon.log 2>&1 &
MON_PID=$!
wait_etcd
$ETCDCTL del --prefix /vitastor/mon/master
$ETCDCTL del --prefix /vitastor/pg/state
$ETCDCTL del --prefix /vitastor/osd/state
OSD_COUNT=3
OSD_ARGS="$OSD_ARGS"
@ -15,9 +19,6 @@ for i in $(seq 1 $OSD_COUNT); do
eval OSD${i}_PID=$!
done
node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 >>./testdata/mon.log 2>&1 &
MON_PID=$!
sleep 3
if ! ($ETCDCTL get /vitastor/pg/state/1/1 --print-value-only | jq -s -e '(. | length) != 0 and .[0].state == ["active"]'); then