Compare commits

..

3 Commits

Author SHA1 Message Date
Vitaliy Filippov 6501abc060 Set default etcd_ws_keepalive_interval to 5
Test / test_snapshot_chain_ec (push) Successful in 2m52s Details
Test / test_rebalance_verify_imm (push) Successful in 3m4s Details
Test / test_root_node (push) Successful in 13s Details
Test / test_rebalance_verify (push) Successful in 3m46s Details
Test / test_switch_primary (push) Successful in 36s Details
Test / test_write (push) Successful in 44s Details
Test / test_write_no_same (push) Successful in 20s Details
Test / test_write_xor (push) Successful in 1m7s Details
Test / test_rebalance_verify_ec_imm (push) Successful in 4m8s Details
Test / test_rebalance_verify_ec (push) Successful in 5m25s Details
Test / test_heal_pg_size_2 (push) Successful in 3m27s Details
Test / test_heal_csum_32k_dmj (push) Successful in 5m42s Details
Test / test_heal_csum_32k_dj (push) Successful in 6m28s Details
Test / test_heal_csum_32k (push) Successful in 6m40s Details
Test / test_osd_tags (push) Successful in 25s Details
Test / test_heal_csum_4k_dmj (push) Successful in 7m16s Details
Test / test_enospc (push) Successful in 2m12s Details
Test / test_heal_csum_4k_dj (push) Successful in 6m26s Details
Test / test_enospc_imm (push) Successful in 1m41s Details
Test / test_enospc_xor (push) Successful in 2m21s Details
Test / test_heal_csum_4k (push) Successful in 6m19s Details
Test / test_enospc_imm_xor (push) Successful in 1m32s Details
Test / test_scrub (push) Successful in 49s Details
Test / test_scrub_zero_osd_2 (push) Successful in 34s Details
Test / test_scrub_xor (push) Successful in 31s Details
Test / test_scrub_pg_size_6_pg_minsize_4_osd_count_6_ec (push) Successful in 40s Details
Test / test_nfs (push) Successful in 18s Details
Test / test_scrub_pg_size_3 (push) Successful in 46s Details
Test / test_scrub_ec (push) Successful in 27s Details
Test / test_heal_ec (push) Successful in 2m57s Details
2024-06-08 00:38:48 +03:00
Vitaliy Filippov 1228403e74 Implement internal restart / run_forever in monitor
Test / test_rebalance_verify_imm (push) Successful in 1m55s Details
Test / test_rebalance_verify (push) Successful in 2m42s Details
Test / test_root_node (push) Successful in 1m19s Details
Test / test_switch_primary (push) Successful in 33s Details
Test / test_rebalance_verify_ec (push) Successful in 3m21s Details
Test / test_etcd_fail (push) Failing after 10m8s Details
Test / test_write (push) Successful in 53s Details
Test / test_write_no_same (push) Successful in 16s Details
Test / test_write_xor (push) Successful in 1m0s Details
Test / test_rebalance_verify_ec_imm (push) Successful in 3m59s Details
Test / test_heal_pg_size_2 (push) Successful in 4m37s Details
Test / test_heal_csum_32k_dmj (push) Failing after 4m45s Details
Test / test_heal_ec (push) Successful in 5m48s Details
Test / test_heal_csum_32k_dj (push) Successful in 6m12s Details
Test / test_heal_csum_32k (push) Successful in 6m30s Details
Test / test_heal_csum_4k_dmj (push) Successful in 6m16s Details
Test / test_osd_tags (push) Successful in 34s Details
Test / test_heal_csum_4k_dj (push) Successful in 6m50s Details
Test / test_enospc (push) Successful in 1m34s Details
Test / test_enospc_imm (push) Successful in 1m4s Details
Test / test_enospc_xor (push) Successful in 2m6s Details
Test / test_heal_csum_4k (push) Successful in 6m47s Details
Test / test_enospc_imm_xor (push) Successful in 1m26s Details
Test / test_scrub (push) Successful in 37s Details
Test / test_scrub_zero_osd_2 (push) Successful in 35s Details
Test / test_scrub_xor (push) Successful in 28s Details
Test / test_nfs (push) Successful in 19s Details
Test / test_scrub_pg_size_6_pg_minsize_4_osd_count_6_ec (push) Successful in 31s Details
Test / test_scrub_pg_size_3 (push) Successful in 45s Details
Test / test_scrub_ec (push) Successful in 29s Details
2024-06-08 00:35:18 +03:00
Vitaliy Filippov 4eabebd245 Put all configuration to Mon.config
Test / test_snapshot_chain_ec (push) Successful in 2m50s Details
Test / test_rebalance_verify_imm (push) Successful in 2m48s Details
Test / test_rebalance_verify (push) Successful in 3m18s Details
Test / test_root_node (push) Successful in 11s Details
Test / test_switch_primary (push) Successful in 35s Details
Test / test_write (push) Successful in 39s Details
Test / test_write_no_same (push) Successful in 19s Details
Test / test_write_xor (push) Successful in 1m2s Details
Test / test_rebalance_verify_ec (push) Successful in 4m7s Details
Test / test_rebalance_verify_ec_imm (push) Successful in 3m49s Details
Test / test_heal_pg_size_2 (push) Successful in 3m24s Details
Test / test_heal_ec (push) Successful in 5m29s Details
Test / test_heal_csum_32k_dmj (push) Successful in 6m5s Details
Test / test_heal_csum_32k_dj (push) Successful in 5m52s Details
Test / test_heal_csum_32k (push) Successful in 6m44s Details
Test / test_osd_tags (push) Successful in 21s Details
Test / test_enospc (push) Successful in 2m16s Details
Test / test_heal_csum_4k_dmj (push) Successful in 6m44s Details
Test / test_heal_csum_4k (push) Successful in 6m31s Details
Test / test_heal_csum_4k_dj (push) Successful in 6m45s Details
Test / test_enospc_imm (push) Successful in 1m21s Details
Test / test_enospc_xor (push) Successful in 1m28s Details
Test / test_scrub_zero_osd_2 (push) Successful in 35s Details
Test / test_scrub (push) Successful in 39s Details
Test / test_scrub_xor (push) Successful in 36s Details
Test / test_enospc_imm_xor (push) Successful in 49s Details
Test / test_nfs (push) Successful in 24s Details
Test / test_scrub_pg_size_6_pg_minsize_4_osd_count_6_ec (push) Successful in 35s Details
Test / test_scrub_ec (push) Successful in 33s Details
Test / test_scrub_pg_size_3 (push) Successful in 41s Details
2024-06-07 00:20:38 +03:00
11 changed files with 206 additions and 96 deletions

View File

@ -248,7 +248,7 @@ etcd_report_interval to guarantee that keepalive actually works.
## etcd_ws_keepalive_interval ## etcd_ws_keepalive_interval
- Type: seconds - Type: seconds
- Default: 30 - Default: 5
- Can be changed online: yes - Can be changed online: yes
etcd websocket ping interval required to keep the connection alive and etcd websocket ping interval required to keep the connection alive and

View File

@ -259,7 +259,7 @@ etcd_report_interval, чтобы keepalive гарантированно рабо
## etcd_ws_keepalive_interval ## etcd_ws_keepalive_interval
- Тип: секунды - Тип: секунды
- Значение по умолчанию: 30 - Значение по умолчанию: 5
- Можно менять на лету: да - Можно менять на лету: да
Интервал проверки живости вебсокет-подключений к etcd. Интервал проверки живости вебсокет-подключений к etcd.

View File

@ -282,7 +282,7 @@
etcd_report_interval, чтобы keepalive гарантированно работал. etcd_report_interval, чтобы keepalive гарантированно работал.
- name: etcd_ws_keepalive_interval - name: etcd_ws_keepalive_interval
type: sec type: sec
default: 30 default: 5
online: true online: true
info: | info: |
etcd websocket ping interval required to keep the connection alive and etcd websocket ping interval required to keep the connection alive and

View File

@ -4,6 +4,8 @@
const http = require('http'); const http = require('http');
const WebSocket = require('ws'); const WebSocket = require('ws');
const MON_STOPPED = 'Monitor instance is stopped';
class EtcdAdapter class EtcdAdapter
{ {
constructor(mon) constructor(mon)
@ -66,10 +68,12 @@ class EtcdAdapter
return this.selected_etcd_url; return this.selected_etcd_url;
} }
restart_watcher(cur_addr) stop_watcher(cur_addr)
{ {
cur_addr = cur_addr || this.selected_etcd_url;
if (this.ws) if (this.ws)
{ {
console.log('Disconnected from etcd at '+this.ws_used_url);
this.ws.close(); this.ws.close();
this.ws = null; this.ws = null;
} }
@ -82,6 +86,11 @@ class EtcdAdapter
{ {
this.selected_etcd_url = null; this.selected_etcd_url = null;
} }
}
restart_watcher(cur_addr)
{
this.stop_watcher(cur_addr);
this.start_watcher(this.mon.config.etcd_mon_retries).catch(this.mon.die); this.start_watcher(this.mon.config.etcd_mon_retries).catch(this.mon.die);
} }
@ -98,21 +107,30 @@ class EtcdAdapter
const cur_addr = this.pick_next_etcd(); const cur_addr = this.pick_next_etcd();
const base = 'ws'+cur_addr.substr(4); const base = 'ws'+cur_addr.substr(4);
let now = Date.now(); let now = Date.now();
if (tried[base] && now-tried[base] < this.mon.etcd_start_timeout) if (tried[base] && now-tried[base] < this.mon.config.etcd_start_timeout)
{ {
await new Promise(ok => setTimeout(ok, this.mon.etcd_start_timeout-(now-tried[base]))); await new Promise(ok => setTimeout(ok, this.mon.config.etcd_start_timeout-(now-tried[base])));
now = Date.now(); now = Date.now();
} }
tried[base] = now; tried[base] = now;
if (this.mon.stopped)
{
return;
}
const ok = await new Promise(ok => const ok = await new Promise(ok =>
{ {
const timer_id = setTimeout(() => const timer_id = setTimeout(() =>
{ {
if (this.ws)
{
console.log('Disconnected from etcd at '+this.ws_used_url);
this.ws.close(); this.ws.close();
this.ws = null; this.ws = null;
}
ok(false); ok(false);
}, this.mon.config.etcd_mon_timeout); }, this.mon.config.etcd_mon_timeout);
this.ws = new WebSocket(base+'/watch'); this.ws = new WebSocket(base+'/watch');
this.ws_used_url = cur_addr;
const fail = () => const fail = () =>
{ {
ok(false); ok(false);
@ -135,13 +153,19 @@ class EtcdAdapter
} }
if (!this.ws) if (!this.ws)
{ {
this.mon.failconnect('Failed to open etcd watch websocket'); this.mon.die('Failed to open etcd watch websocket');
return;
}
if (this.mon.stopped)
{
this.stop_watcher();
return;
} }
const cur_addr = this.selected_etcd_url; const cur_addr = this.selected_etcd_url;
this.ws_alive = true; this.ws_alive = true;
this.ws_keepalive_timer = setInterval(() => this.ws_keepalive_timer = setInterval(() =>
{ {
if (this.ws_alive) if (this.ws_alive && this.ws)
{ {
this.ws_alive = false; this.ws_alive = false;
this.ws.send(JSON.stringify({ progress_request: {} })); this.ws.send(JSON.stringify({ progress_request: {} }));
@ -151,12 +175,12 @@ class EtcdAdapter
console.log('etcd websocket timed out, restarting it'); console.log('etcd websocket timed out, restarting it');
this.restart_watcher(cur_addr); this.restart_watcher(cur_addr);
} }
}, (Number(this.mon.config.etcd_ws_keepalive_interval) || 30)*1000); }, (Number(this.mon.config.etcd_ws_keepalive_interval) || 5)*1000);
this.ws.on('error', () => this.restart_watcher(cur_addr)); this.ws.on('error', () => this.restart_watcher(cur_addr));
this.ws.send(JSON.stringify({ this.ws.send(JSON.stringify({
create_request: { create_request: {
key: b64(this.mon.etcd_prefix+'/'), key: b64(this.mon.config.etcd_prefix+'/'),
range_end: b64(this.mon.etcd_prefix+'0'), range_end: b64(this.mon.config.etcd_prefix+'0'),
start_revision: ''+this.mon.etcd_watch_revision, start_revision: ''+this.mon.etcd_watch_revision,
watch_id: 1, watch_id: 1,
progress_notify: true, progress_notify: true,
@ -164,6 +188,11 @@ class EtcdAdapter
})); }));
this.ws.on('message', (msg) => this.ws.on('message', (msg) =>
{ {
if (this.mon.stopped)
{
this.stop_watcher();
return;
}
this.ws_alive = true; this.ws_alive = true;
let data; let data;
try try
@ -183,15 +212,14 @@ class EtcdAdapter
if (data.result.compact_revision) if (data.result.compact_revision)
{ {
// we may miss events if we proceed // we may miss events if we proceed
console.error('Revisions before '+data.result.compact_revision+' were compacted by etcd, exiting'); this.mon.die('Revisions before '+data.result.compact_revision+' were compacted by etcd, exiting');
this.mon.on_stop(1);
} }
console.error('Watch canceled by etcd, reason: '+data.result.cancel_reason+', exiting'); this.mon.die('Watch canceled by etcd, reason: '+data.result.cancel_reason+', exiting');
this.mon.on_stop(1);
} }
else if (data.result.created) else if (data.result.created)
{ {
// etcd watch created // etcd watch created
console.log('Successfully subscribed to etcd at '+this.selected_etcd_url+', revision '+data.result.header.revision);
} }
else else
{ {
@ -207,15 +235,15 @@ class EtcdAdapter
while (1) while (1)
{ {
const res = await this.etcd_call('/kv/txn', { const res = await this.etcd_call('/kv/txn', {
compare: [ { target: 'CREATE', create_revision: 0, key: b64(this.mon.etcd_prefix+'/mon/master') } ], compare: [ { target: 'CREATE', create_revision: 0, key: b64(this.mon.config.etcd_prefix+'/mon/master') } ],
success: [ { requestPut: { key: b64(this.mon.etcd_prefix+'/mon/master'), value: b64(JSON.stringify(state)), lease: ''+this.mon.etcd_lease_id } } ], success: [ { requestPut: { key: b64(this.mon.config.etcd_prefix+'/mon/master'), value: b64(JSON.stringify(state)), lease: ''+this.mon.etcd_lease_id } } ],
}, this.mon.etcd_start_timeout, 0); }, this.mon.config.etcd_start_timeout, 0);
if (res.succeeded) if (res.succeeded)
{ {
break; break;
} }
console.log('Waiting to become master'); console.log('Waiting to become master');
await new Promise(ok => setTimeout(ok, this.mon.etcd_start_timeout)); await new Promise(ok => setTimeout(ok, this.mon.config.etcd_start_timeout));
} }
console.log('Became master'); console.log('Became master');
} }
@ -239,25 +267,33 @@ class EtcdAdapter
now = Date.now(); now = Date.now();
} }
tried[base] = now; tried[base] = now;
if (this.mon.stopped)
{
throw new Error(MON_STOPPED);
}
const res = await POST(base+path, body, timeout); const res = await POST(base+path, body, timeout);
if (this.mon.stopped)
{
throw new Error(MON_STOPPED);
}
if (res.error) if (res.error)
{ {
if (this.selected_etcd_url == base) if (this.selected_etcd_url == base)
this.selected_etcd_url = null; this.selected_etcd_url = null;
console.error('failed to query etcd: '+res.error); console.error('Failed to query etcd '+path+' (retry '+retry+'/'+retries+'): '+res.error);
continue; continue;
} }
if (res.json) if (res.json)
{ {
if (res.json.error) if (res.json.error)
{ {
console.error('etcd returned error: '+res.json.error); console.error(path+': etcd returned error: '+res.json.error);
break; break;
} }
return res.json; return res.json;
} }
} }
this.mon.failconnect(); throw new Error('Failed to query etcd ('+retries+' retries)');
} }
} }

View File

@ -91,7 +91,7 @@ const etcd_tree = {
etcd_quick_timeout: 1000, // ms etcd_quick_timeout: 1000, // ms
etcd_slow_timeout: 5000, // ms etcd_slow_timeout: 5000, // ms
etcd_keepalive_timeout: 30, // seconds, default is max(30, etcd_report_interval*2) etcd_keepalive_timeout: 30, // seconds, default is max(30, etcd_report_interval*2)
etcd_ws_keepalive_interval: 30, // seconds etcd_ws_keepalive_interval: 5, // seconds
// osd // osd
etcd_report_interval: 5, // seconds etcd_report_interval: 5, // seconds
etcd_stats_interval: 30, // seconds etcd_stats_interval: 30, // seconds

View File

@ -23,4 +23,4 @@ for (let i = 2; i < process.argv.length; i++)
} }
} }
new Mon(options).start().catch(e => { console.error(e); process.exit(1); }); Mon.run_forever(options);

View File

@ -15,27 +15,48 @@ const { recheck_primary, save_new_pgs_txn, generate_pool_pgs } = require('./pg_g
class Mon class Mon
{ {
static run_forever(config)
{
let mon;
const run = () =>
{
console.log('Starting Monitor');
const my_mon = new Mon(config);
mon = my_mon;
my_mon.on_die = () =>
{
if (mon == my_mon)
{
// Start a new instance
run();
}
};
my_mon.start().catch(my_mon.die);
};
run();
const on_stop_cb = () => mon.on_stop().then(() => process.exit(0)).catch(err =>
{
console.error(err);
process.exit(0);
});
process.on('SIGINT', on_stop_cb);
process.on('SIGTERM', on_stop_cb);
}
constructor(config) constructor(config)
{ {
this.failconnect = (e) => this._die(e, 2); this.stopped = false;
this.die = (e) => this._die(e, 1); this.die = (e) => this._die(e);
this.fileConfig = {};
if (fs.existsSync(config.config_path||'/etc/vitastor/vitastor.conf')) if (fs.existsSync(config.config_path||'/etc/vitastor/vitastor.conf'))
{ {
config = { this.fileConfig = JSON.parse(fs.readFileSync(config.config_path||'/etc/vitastor/vitastor.conf', { encoding: 'utf-8' }));
...JSON.parse(fs.readFileSync(config.config_path||'/etc/vitastor/vitastor.conf', { encoding: 'utf-8' })),
...config,
};
} }
this.verbose = config.verbose || 0; this.cliConfig = config;
this.initConfig = config; this.config = { ...this.fileConfig, ...this.cliConfig };
this.config = { ...config }; this.check_config();
this.etcd_prefix = config.etcd_prefix || '/vitastor';
this.etcd_prefix = this.etcd_prefix.replace(/\/\/+/g, '/').replace(/^\/?(.*[^\/])\/?$/, '/$1');
this.etcd_start_timeout = (config.etcd_start_timeout || 5) * 1000;
this.state = JSON.parse(JSON.stringify(etcd_tree)); this.state = JSON.parse(JSON.stringify(etcd_tree));
this.prev_stats = { osd_stats: {}, osd_diff: {} }; this.prev_stats = { osd_stats: {}, osd_diff: {} };
this.signals_set = false;
this.on_stop_cb = () => this.on_stop(0).catch(console.error);
this.recheck_pgs_active = false; this.recheck_pgs_active = false;
this.etcd = new EtcdAdapter(this); this.etcd = new EtcdAdapter(this);
this.etcd.parse_config(this.config); this.etcd.parse_config(this.config);
@ -65,17 +86,19 @@ class Mon
async load_config() async load_config()
{ {
const res = await this.etcd.etcd_call('/kv/txn', { success: [ const res = await this.etcd.etcd_call('/kv/txn', { success: [
{ requestRange: { key: b64(this.etcd_prefix+'/config/global') } } { requestRange: { key: b64(this.config.etcd_prefix+'/config/global') } }
] }, this.etcd_start_timeout, -1); ] }, this.config.etcd_start_timeout, -1);
if (res.responses[0].response_range.kvs) if (res.responses[0].response_range.kvs)
{ {
this.parse_kv(res.responses[0].response_range.kvs[0]); this.parse_kv(res.responses[0].response_range.kvs[0]);
} }
this.check_config();
} }
check_config() check_config()
{ {
this.config.etcd_prefix = this.config.etcd_prefix || '/vitastor';
this.config.etcd_prefix = this.config.etcd_prefix.replace(/\/\/+/g, '/').replace(/^\/?(.*[^\/])\/?$/, '/$1');
this.config.etcd_start_timeout = (this.config.etcd_start_timeout || 5) * 1000;
this.config.etcd_mon_ttl = Number(this.config.etcd_mon_ttl) || 5; this.config.etcd_mon_ttl = Number(this.config.etcd_mon_ttl) || 5;
if (this.config.etcd_mon_ttl < 1) if (this.config.etcd_mon_ttl < 1)
{ {
@ -117,7 +140,7 @@ class Mon
on_message(msg) on_message(msg)
{ {
let stats_changed = false, changed = false, pg_states_changed = false; let stats_changed = false, changed = false, pg_states_changed = false;
if (this.verbose) if (this.config.verbose)
{ {
console.log('Revision '+msg.header.revision+' events: '); console.log('Revision '+msg.header.revision+' events: ');
} }
@ -125,7 +148,7 @@ class Mon
for (const e of msg.events||[]) for (const e of msg.events||[])
{ {
this.parse_kv(e.kv); this.parse_kv(e.kv);
const key = e.kv.key.substr(this.etcd_prefix.length); const key = e.kv.key.substr(this.config.etcd_prefix.length);
if (key.substr(0, 11) == '/osd/state/') if (key.substr(0, 11) == '/osd/state/')
{ {
stats_changed = true; stats_changed = true;
@ -143,7 +166,7 @@ class Mon
{ {
changed = true; changed = true;
} }
if (this.verbose) if (this.config.verbose)
{ {
console.log(JSON.stringify(e)); console.log(JSON.stringify(e));
} }
@ -165,6 +188,10 @@ class Mon
// Schedule save_last_clean() to to run after a small timeout (1s) (to not spam etcd) // Schedule save_last_clean() to to run after a small timeout (1s) (to not spam etcd)
schedule_save_last_clean() schedule_save_last_clean()
{ {
if (this.stopped)
{
return;
}
if (!this.save_last_clean_timer) if (!this.save_last_clean_timer)
{ {
this.save_last_clean_timer = setTimeout(() => this.save_last_clean_timer = setTimeout(() =>
@ -216,10 +243,10 @@ class Mon
this.state.history.last_clean_pgs = new_clean_pgs; this.state.history.last_clean_pgs = new_clean_pgs;
await this.etcd.etcd_call('/kv/txn', { await this.etcd.etcd_call('/kv/txn', {
success: [ { requestPut: { success: [ { requestPut: {
key: b64(this.etcd_prefix+'/history/last_clean_pgs'), key: b64(this.config.etcd_prefix+'/history/last_clean_pgs'),
value: b64(JSON.stringify(this.state.history.last_clean_pgs)) value: b64(JSON.stringify(this.state.history.last_clean_pgs))
} } ], } } ],
}, this.etcd_start_timeout, 0); }, this.config.etcd_start_timeout, 0);
this.save_last_clean_running = false; this.save_last_clean_running = false;
} }
@ -237,39 +264,72 @@ class Mon
// Register in /mon/member, just for the information // Register in /mon/member, just for the information
const state = this.get_mon_state(); const state = this.get_mon_state();
res = await this.etcd.etcd_call('/kv/put', { res = await this.etcd.etcd_call('/kv/put', {
key: b64(this.etcd_prefix+'/mon/member/'+this.etcd_lease_id), key: b64(this.config.etcd_prefix+'/mon/member/'+this.etcd_lease_id),
value: b64(JSON.stringify(state)), value: b64(JSON.stringify(state)),
lease: ''+this.etcd_lease_id lease: ''+this.etcd_lease_id
}, this.etcd_start_timeout, 0); }, this.config.etcd_start_timeout, 0);
// Set refresh timer // Set refresh timer
this.lease_timer = setInterval(async () => this.lease_timer = setInterval(() =>
{
this.etcd.etcd_call('/lease/keepalive', { ID: this.etcd_lease_id }, this.config.etcd_mon_timeout, this.config.etcd_mon_retries)
.then(res =>
{ {
const res = await this.etcd.etcd_call('/lease/keepalive', { ID: this.etcd_lease_id }, this.config.etcd_mon_timeout, this.config.etcd_mon_retries);
if (!res.result.TTL) if (!res.result.TTL)
{ this.die('Lease expired');
this.failconnect('Lease expired'); })
} .catch(this.die);
}, this.config.etcd_mon_ttl*1000); }, this.config.etcd_mon_ttl*1000);
if (!this.signals_set)
{
process.on('SIGINT', this.on_stop_cb);
process.on('SIGTERM', this.on_stop_cb);
this.signals_set = true;
}
} }
async on_stop(status) async on_stop()
{
console.log('Stopping Monitor');
this.etcd.stop_watcher();
if (this.save_last_clean_timer)
{
clearTimeout(this.save_last_clean_timer);
this.save_last_clean_timer = null;
}
if (this.next_recheck_timer)
{
clearTimeout(this.next_recheck_timer);
this.next_recheck_timer = null;
}
if (this.recheck_timer)
{
clearTimeout(this.recheck_timer);
this.recheck_timer = null;
}
if (this.stats_timer)
{
clearTimeout(this.stats_timer);
this.stats_timer = null;
}
if (this.lease_timer)
{ {
clearInterval(this.lease_timer); clearInterval(this.lease_timer);
await this.etcd.etcd_call('/lease/revoke', { ID: this.etcd_lease_id }, this.config.etcd_mon_timeout, this.config.etcd_mon_retries); this.lease_timer = null;
process.exit(status); }
let p = null;
if (this.etcd_lease_id)
{
const lease_id = this.etcd_lease_id;
this.etcd_lease_id = null;
p = this.etcd.etcd_call('/lease/revoke', { ID: lease_id }, this.config.etcd_mon_timeout, this.config.etcd_mon_retries);
}
// 'stopped' flag prevents all further etcd communications of this instance
this.stopped = true;
if (p)
{
await p;
}
} }
async load_cluster_state() async load_cluster_state()
{ {
const res = await this.etcd.etcd_call('/kv/txn', { success: [ const res = await this.etcd.etcd_call('/kv/txn', { success: [
{ requestRange: { key: b64(this.etcd_prefix+'/'), range_end: b64(this.etcd_prefix+'0') } }, { requestRange: { key: b64(this.config.etcd_prefix+'/'), range_end: b64(this.config.etcd_prefix+'0') } },
] }, this.etcd_start_timeout, -1); ] }, this.config.etcd_start_timeout, -1);
this.etcd_watch_revision = BigInt(res.header.revision)+BigInt(1); this.etcd_watch_revision = BigInt(res.header.revision)+BigInt(1);
this.state = JSON.parse(JSON.stringify(etcd_tree)); this.state = JSON.parse(JSON.stringify(etcd_tree));
for (const response of res.responses) for (const response of res.responses)
@ -316,17 +376,17 @@ class Mon
const checks = []; const checks = [];
for (const osd_num of this.all_osds()) for (const osd_num of this.all_osds())
{ {
const key = b64(this.etcd_prefix+'/osd/state/'+osd_num); const key = b64(this.config.etcd_prefix+'/osd/state/'+osd_num);
checks.push({ key, target: 'MOD', result: 'LESS', mod_revision: ''+this.etcd_watch_revision }); checks.push({ key, target: 'MOD', result: 'LESS', mod_revision: ''+this.etcd_watch_revision });
} }
await this.etcd.etcd_call('/kv/txn', { await this.etcd.etcd_call('/kv/txn', {
compare: [ compare: [
{ key: b64(this.etcd_prefix+'/mon/master'), target: 'LEASE', lease: ''+this.etcd_lease_id }, { key: b64(this.config.etcd_prefix+'/mon/master'), target: 'LEASE', lease: ''+this.etcd_lease_id },
{ key: b64(this.etcd_prefix+'/config/pgs'), target: 'MOD', mod_revision: ''+this.etcd_watch_revision, result: 'LESS' }, { key: b64(this.config.etcd_prefix+'/config/pgs'), target: 'MOD', mod_revision: ''+this.etcd_watch_revision, result: 'LESS' },
...checks, ...checks,
], ],
success: [ success: [
{ requestPut: { key: b64(this.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(new_cfg)) } }, { requestPut: { key: b64(this.config.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(new_cfg)) } },
], ],
}, this.config.etcd_mon_timeout, 0); }, this.config.etcd_mon_timeout, 0);
return false; return false;
@ -336,6 +396,10 @@ class Mon
async recheck_pgs() async recheck_pgs()
{ {
if (this.stopped)
{
return;
}
if (this.recheck_pgs_active) if (this.recheck_pgs_active)
{ {
this.schedule_recheck(); this.schedule_recheck();
@ -437,9 +501,9 @@ class Mon
} }
// Also delete pool statistics // Also delete pool statistics
etcd_request.success.push({ requestDeleteRange: { etcd_request.success.push({ requestDeleteRange: {
key: b64(this.etcd_prefix+'/pool/stats/'+pool_id), key: b64(this.config.etcd_prefix+'/pool/stats/'+pool_id),
} }); } });
save_new_pgs_txn(new_config_pgs, etcd_request, this.state, this.etcd_prefix, save_new_pgs_txn(new_config_pgs, etcd_request, this.state, this.config.etcd_prefix,
this.etcd_watch_revision, pool_id, up_osds, osd_tree, prev_pgs, [], []); this.etcd_watch_revision, pool_id, up_osds, osd_tree, prev_pgs, [], []);
} }
} }
@ -470,8 +534,8 @@ class Mon
pg_history = scale_pg_history(pg_history, real_prev_pgs, pool_res.pgs); pg_history = scale_pg_history(pg_history, real_prev_pgs, pool_res.pgs);
// Drop stats // Drop stats
etcd_request.success.push({ requestDeleteRange: { etcd_request.success.push({ requestDeleteRange: {
key: b64(this.etcd_prefix+'/pg/stats/'+pool_id+'/'), key: b64(this.config.etcd_prefix+'/pg/stats/'+pool_id+'/'),
range_end: b64(this.etcd_prefix+'/pg/stats/'+pool_id+'0'), range_end: b64(this.config.etcd_prefix+'/pg/stats/'+pool_id+'0'),
} }); } });
} }
const stats = { const stats = {
@ -479,10 +543,10 @@ class Mon
...pool_res.stats, ...pool_res.stats,
}; };
etcd_request.success.push({ requestPut: { etcd_request.success.push({ requestPut: {
key: b64(this.etcd_prefix+'/pool/stats/'+pool_id), key: b64(this.config.etcd_prefix+'/pool/stats/'+pool_id),
value: b64(JSON.stringify(stats)), value: b64(JSON.stringify(stats)),
} }); } });
save_new_pgs_txn(new_config_pgs, etcd_request, this.state, this.etcd_prefix, save_new_pgs_txn(new_config_pgs, etcd_request, this.state, this.config.etcd_prefix,
this.etcd_watch_revision, pool_id, up_osds, osd_tree, real_prev_pgs, pool_res.pgs, pg_history); this.etcd_watch_revision, pool_id, up_osds, osd_tree, real_prev_pgs, pool_res.pgs, pg_history);
} }
new_config_pgs.hash = tree_hash; new_config_pgs.hash = tree_hash;
@ -492,11 +556,11 @@ class Mon
async save_pg_config(new_config_pgs, etcd_request = { compare: [], success: [] }) async save_pg_config(new_config_pgs, etcd_request = { compare: [], success: [] })
{ {
etcd_request.compare.push( etcd_request.compare.push(
{ key: b64(this.etcd_prefix+'/mon/master'), target: 'LEASE', lease: ''+this.etcd_lease_id }, { key: b64(this.config.etcd_prefix+'/mon/master'), target: 'LEASE', lease: ''+this.etcd_lease_id },
{ key: b64(this.etcd_prefix+'/config/pgs'), target: 'MOD', mod_revision: ''+this.etcd_watch_revision, result: 'LESS' }, { key: b64(this.config.etcd_prefix+'/config/pgs'), target: 'MOD', mod_revision: ''+this.etcd_watch_revision, result: 'LESS' },
); );
etcd_request.success.push( etcd_request.success.push(
{ requestPut: { key: b64(this.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(new_config_pgs)) } }, { requestPut: { key: b64(this.config.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(new_config_pgs)) } },
); );
const txn_res = await this.etcd.etcd_call('/kv/txn', etcd_request, this.config.etcd_mon_timeout, 0); const txn_res = await this.etcd.etcd_call('/kv/txn', etcd_request, this.config.etcd_mon_timeout, 0);
return txn_res.succeeded; return txn_res.succeeded;
@ -505,6 +569,10 @@ class Mon
// Schedule next recheck at least at <unixtime> // Schedule next recheck at least at <unixtime>
schedule_next_recheck_at(unixtime) schedule_next_recheck_at(unixtime)
{ {
if (this.stopped)
{
return;
}
this.next_recheck_at = !this.next_recheck_at || this.next_recheck_at > unixtime this.next_recheck_at = !this.next_recheck_at || this.next_recheck_at > unixtime
? unixtime : this.next_recheck_at; ? unixtime : this.next_recheck_at;
const now = Date.now()/1000; const now = Date.now()/1000;
@ -533,6 +601,10 @@ class Mon
// This is required for multiple change events to trigger at most 1 recheck in 1s // This is required for multiple change events to trigger at most 1 recheck in 1s
schedule_recheck() schedule_recheck()
{ {
if (this.stopped)
{
return;
}
if (!this.recheck_timer) if (!this.recheck_timer)
{ {
this.recheck_timer = setTimeout(() => this.recheck_timer = setTimeout(() =>
@ -553,13 +625,13 @@ class Mon
stats.object_bytes = object_bytes; stats.object_bytes = object_bytes;
stats = serialize_bigints(stats); stats = serialize_bigints(stats);
inode_stats = serialize_bigints(inode_stats); inode_stats = serialize_bigints(inode_stats);
txn.push({ requestPut: { key: b64(this.etcd_prefix+'/stats'), value: b64(JSON.stringify(stats)) } }); txn.push({ requestPut: { key: b64(this.config.etcd_prefix+'/stats'), value: b64(JSON.stringify(stats)) } });
for (const pool_id in inode_stats) for (const pool_id in inode_stats)
{ {
for (const inode_num in inode_stats[pool_id]) for (const inode_num in inode_stats[pool_id])
{ {
txn.push({ requestPut: { txn.push({ requestPut: {
key: b64(this.etcd_prefix+'/inode/stats/'+pool_id+'/'+inode_num), key: b64(this.config.etcd_prefix+'/inode/stats/'+pool_id+'/'+inode_num),
value: b64(JSON.stringify(inode_stats[pool_id][inode_num])), value: b64(JSON.stringify(inode_stats[pool_id][inode_num])),
} }); } });
} }
@ -571,7 +643,7 @@ class Mon
if (!inode_stats[pool_id] || !inode_stats[pool_id][inode_num]) if (!inode_stats[pool_id] || !inode_stats[pool_id][inode_num])
{ {
txn.push({ requestDeleteRange: { txn.push({ requestDeleteRange: {
key: b64(this.etcd_prefix+'/inode/stats/'+pool_id+'/'+inode_num), key: b64(this.config.etcd_prefix+'/inode/stats/'+pool_id+'/'+inode_num),
} }); } });
} }
} }
@ -581,7 +653,7 @@ class Mon
if (!seen_pools[pool_id]) if (!seen_pools[pool_id])
{ {
txn.push({ requestDeleteRange: { txn.push({ requestDeleteRange: {
key: b64(this.etcd_prefix+'/pool/stats/'+pool_id), key: b64(this.config.etcd_prefix+'/pool/stats/'+pool_id),
} }); } });
delete this.state.pool.stats[pool_id]; delete this.state.pool.stats[pool_id];
} }
@ -590,7 +662,7 @@ class Mon
const pool_stats = { ...this.state.pool.stats[pool_id] }; const pool_stats = { ...this.state.pool.stats[pool_id] };
serialize_bigints(pool_stats); serialize_bigints(pool_stats);
txn.push({ requestPut: { txn.push({ requestPut: {
key: b64(this.etcd_prefix+'/pool/stats/'+pool_id), key: b64(this.config.etcd_prefix+'/pool/stats/'+pool_id),
value: b64(JSON.stringify(pool_stats)), value: b64(JSON.stringify(pool_stats)),
} }); } });
} }
@ -603,7 +675,7 @@ class Mon
schedule_update_stats() schedule_update_stats()
{ {
if (this.stats_timer) if (this.stopped || this.stats_timer)
{ {
return; return;
} }
@ -622,7 +694,7 @@ class Mon
} }
kv.key = de64(kv.key); kv.key = de64(kv.key);
kv.value = kv.value ? de64(kv.value) : null; kv.value = kv.value ? de64(kv.value) : null;
let key = kv.key.substr(this.etcd_prefix.length+1); let key = kv.key.substr(this.config.etcd_prefix.length+1);
if (!etcd_allow.exec(key)) if (!etcd_allow.exec(key))
{ {
console.log('Bad key in etcd: '+kv.key+' = '+kv.value); console.log('Bad key in etcd: '+kv.key+' = '+kv.value);
@ -652,8 +724,9 @@ class Mon
cur[key_parts[key_parts.length-1]] = kv.value; cur[key_parts[key_parts.length-1]] = kv.value;
if (key === 'config/global') if (key === 'config/global')
{ {
this.config = { ...this.initConfig, ...this.state.config.global }; this.config = { ...this.fileConfig, ...this.state.config.global, ...this.cliConfig };
this.check_config(); this.check_config();
this.etcd.parse_config(this.config);
for (const osd_num in this.state.osd.stats) for (const osd_num in this.state.osd.stats)
{ {
// Recheck PGs <osd_out_time> later // Recheck PGs <osd_out_time> later
@ -686,11 +759,12 @@ class Mon
} }
} }
_die(err, code) _die(err)
{ {
// In fact we can just try to rejoin // Stop this instance of Monitor so we can restart
console.error(err instanceof Error ? err : new Error(err || 'Cluster connection failed')); console.error(err instanceof Error ? err : new Error(err || 'Cluster connection failed'));
process.exit(code || 2); this.on_stop().catch(console.error);
this.on_die();
} }
local_ips(all) local_ips(all)

View File

@ -253,7 +253,7 @@ void etcd_state_client_t::parse_config(const json11::Json & config)
this->etcd_ws_keepalive_interval = config["etcd_ws_keepalive_interval"].uint64_value(); this->etcd_ws_keepalive_interval = config["etcd_ws_keepalive_interval"].uint64_value();
if (this->etcd_ws_keepalive_interval <= 0) if (this->etcd_ws_keepalive_interval <= 0)
{ {
this->etcd_ws_keepalive_interval = 30; this->etcd_ws_keepalive_interval = 5;
} }
this->max_etcd_attempts = config["max_etcd_attempts"].uint64_value(); this->max_etcd_attempts = config["max_etcd_attempts"].uint64_value();
if (this->max_etcd_attempts <= 0) if (this->max_etcd_attempts <= 0)

View File

@ -103,7 +103,7 @@ protected:
void pick_next_etcd(); void pick_next_etcd();
public: public:
int etcd_keepalive_timeout = 30; int etcd_keepalive_timeout = 30;
int etcd_ws_keepalive_interval = 30; int etcd_ws_keepalive_interval = 5;
int max_etcd_attempts = 5; int max_etcd_attempts = 5;
int etcd_quick_timeout = 1000; int etcd_quick_timeout = 1000;
int etcd_slow_timeout = 5000; int etcd_slow_timeout = 5000;

View File

@ -54,7 +54,7 @@ for i in $(seq 1 $OSD_COUNT); do
start_osd $i start_osd $i
done done
(while true; do set +e; node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1; if [[ $? -ne 2 ]]; then break; fi; done) >>./testdata/mon.log 2>&1 & node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 >>./testdata/mon.log 2>&1 &
MON_PID=$! MON_PID=$!
if [ "$SCHEME" = "ec" ]; then if [ "$SCHEME" = "ec" ]; then

View File

@ -15,7 +15,7 @@ for i in $(seq 1 $OSD_COUNT); do
eval OSD${i}_PID=$! eval OSD${i}_PID=$!
done done
(while true; do node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 || true; done) >>./testdata/mon.log 2>&1 & node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 >>./testdata/mon.log 2>&1 &
MON_PID=$! MON_PID=$!
sleep 3 sleep 3