Compare commits
No commits in common. "6501abc0603c23e99d4042a42c612943ea001434" and "cf60b6818c31a891b32ffd2c60972fe04b7b3e32" have entirely different histories.
6501abc060
...
cf60b6818c
|
@ -248,7 +248,7 @@ etcd_report_interval to guarantee that keepalive actually works.
|
||||||
## etcd_ws_keepalive_interval
|
## etcd_ws_keepalive_interval
|
||||||
|
|
||||||
- Type: seconds
|
- Type: seconds
|
||||||
- Default: 5
|
- Default: 30
|
||||||
- Can be changed online: yes
|
- Can be changed online: yes
|
||||||
|
|
||||||
etcd websocket ping interval required to keep the connection alive and
|
etcd websocket ping interval required to keep the connection alive and
|
||||||
|
|
|
@ -259,7 +259,7 @@ etcd_report_interval, чтобы keepalive гарантированно рабо
|
||||||
## etcd_ws_keepalive_interval
|
## etcd_ws_keepalive_interval
|
||||||
|
|
||||||
- Тип: секунды
|
- Тип: секунды
|
||||||
- Значение по умолчанию: 5
|
- Значение по умолчанию: 30
|
||||||
- Можно менять на лету: да
|
- Можно менять на лету: да
|
||||||
|
|
||||||
Интервал проверки живости вебсокет-подключений к etcd.
|
Интервал проверки живости вебсокет-подключений к etcd.
|
||||||
|
|
|
@ -282,7 +282,7 @@
|
||||||
etcd_report_interval, чтобы keepalive гарантированно работал.
|
etcd_report_interval, чтобы keepalive гарантированно работал.
|
||||||
- name: etcd_ws_keepalive_interval
|
- name: etcd_ws_keepalive_interval
|
||||||
type: sec
|
type: sec
|
||||||
default: 5
|
default: 30
|
||||||
online: true
|
online: true
|
||||||
info: |
|
info: |
|
||||||
etcd websocket ping interval required to keep the connection alive and
|
etcd websocket ping interval required to keep the connection alive and
|
||||||
|
|
|
@ -4,8 +4,6 @@
|
||||||
const http = require('http');
|
const http = require('http');
|
||||||
const WebSocket = require('ws');
|
const WebSocket = require('ws');
|
||||||
|
|
||||||
const MON_STOPPED = 'Monitor instance is stopped';
|
|
||||||
|
|
||||||
class EtcdAdapter
|
class EtcdAdapter
|
||||||
{
|
{
|
||||||
constructor(mon)
|
constructor(mon)
|
||||||
|
@ -68,12 +66,10 @@ class EtcdAdapter
|
||||||
return this.selected_etcd_url;
|
return this.selected_etcd_url;
|
||||||
}
|
}
|
||||||
|
|
||||||
stop_watcher(cur_addr)
|
restart_watcher(cur_addr)
|
||||||
{
|
{
|
||||||
cur_addr = cur_addr || this.selected_etcd_url;
|
|
||||||
if (this.ws)
|
if (this.ws)
|
||||||
{
|
{
|
||||||
console.log('Disconnected from etcd at '+this.ws_used_url);
|
|
||||||
this.ws.close();
|
this.ws.close();
|
||||||
this.ws = null;
|
this.ws = null;
|
||||||
}
|
}
|
||||||
|
@ -86,11 +82,6 @@ class EtcdAdapter
|
||||||
{
|
{
|
||||||
this.selected_etcd_url = null;
|
this.selected_etcd_url = null;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
restart_watcher(cur_addr)
|
|
||||||
{
|
|
||||||
this.stop_watcher(cur_addr);
|
|
||||||
this.start_watcher(this.mon.config.etcd_mon_retries).catch(this.mon.die);
|
this.start_watcher(this.mon.config.etcd_mon_retries).catch(this.mon.die);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,30 +98,21 @@ class EtcdAdapter
|
||||||
const cur_addr = this.pick_next_etcd();
|
const cur_addr = this.pick_next_etcd();
|
||||||
const base = 'ws'+cur_addr.substr(4);
|
const base = 'ws'+cur_addr.substr(4);
|
||||||
let now = Date.now();
|
let now = Date.now();
|
||||||
if (tried[base] && now-tried[base] < this.mon.config.etcd_start_timeout)
|
if (tried[base] && now-tried[base] < this.mon.etcd_start_timeout)
|
||||||
{
|
{
|
||||||
await new Promise(ok => setTimeout(ok, this.mon.config.etcd_start_timeout-(now-tried[base])));
|
await new Promise(ok => setTimeout(ok, this.mon.etcd_start_timeout-(now-tried[base])));
|
||||||
now = Date.now();
|
now = Date.now();
|
||||||
}
|
}
|
||||||
tried[base] = now;
|
tried[base] = now;
|
||||||
if (this.mon.stopped)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
const ok = await new Promise(ok =>
|
const ok = await new Promise(ok =>
|
||||||
{
|
{
|
||||||
const timer_id = setTimeout(() =>
|
const timer_id = setTimeout(() =>
|
||||||
{
|
{
|
||||||
if (this.ws)
|
|
||||||
{
|
|
||||||
console.log('Disconnected from etcd at '+this.ws_used_url);
|
|
||||||
this.ws.close();
|
this.ws.close();
|
||||||
this.ws = null;
|
this.ws = null;
|
||||||
}
|
|
||||||
ok(false);
|
ok(false);
|
||||||
}, this.mon.config.etcd_mon_timeout);
|
}, this.mon.config.etcd_mon_timeout);
|
||||||
this.ws = new WebSocket(base+'/watch');
|
this.ws = new WebSocket(base+'/watch');
|
||||||
this.ws_used_url = cur_addr;
|
|
||||||
const fail = () =>
|
const fail = () =>
|
||||||
{
|
{
|
||||||
ok(false);
|
ok(false);
|
||||||
|
@ -153,19 +135,13 @@ class EtcdAdapter
|
||||||
}
|
}
|
||||||
if (!this.ws)
|
if (!this.ws)
|
||||||
{
|
{
|
||||||
this.mon.die('Failed to open etcd watch websocket');
|
this.mon.failconnect('Failed to open etcd watch websocket');
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (this.mon.stopped)
|
|
||||||
{
|
|
||||||
this.stop_watcher();
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
const cur_addr = this.selected_etcd_url;
|
const cur_addr = this.selected_etcd_url;
|
||||||
this.ws_alive = true;
|
this.ws_alive = true;
|
||||||
this.ws_keepalive_timer = setInterval(() =>
|
this.ws_keepalive_timer = setInterval(() =>
|
||||||
{
|
{
|
||||||
if (this.ws_alive && this.ws)
|
if (this.ws_alive)
|
||||||
{
|
{
|
||||||
this.ws_alive = false;
|
this.ws_alive = false;
|
||||||
this.ws.send(JSON.stringify({ progress_request: {} }));
|
this.ws.send(JSON.stringify({ progress_request: {} }));
|
||||||
|
@ -175,12 +151,12 @@ class EtcdAdapter
|
||||||
console.log('etcd websocket timed out, restarting it');
|
console.log('etcd websocket timed out, restarting it');
|
||||||
this.restart_watcher(cur_addr);
|
this.restart_watcher(cur_addr);
|
||||||
}
|
}
|
||||||
}, (Number(this.mon.config.etcd_ws_keepalive_interval) || 5)*1000);
|
}, (Number(this.mon.config.etcd_ws_keepalive_interval) || 30)*1000);
|
||||||
this.ws.on('error', () => this.restart_watcher(cur_addr));
|
this.ws.on('error', () => this.restart_watcher(cur_addr));
|
||||||
this.ws.send(JSON.stringify({
|
this.ws.send(JSON.stringify({
|
||||||
create_request: {
|
create_request: {
|
||||||
key: b64(this.mon.config.etcd_prefix+'/'),
|
key: b64(this.mon.etcd_prefix+'/'),
|
||||||
range_end: b64(this.mon.config.etcd_prefix+'0'),
|
range_end: b64(this.mon.etcd_prefix+'0'),
|
||||||
start_revision: ''+this.mon.etcd_watch_revision,
|
start_revision: ''+this.mon.etcd_watch_revision,
|
||||||
watch_id: 1,
|
watch_id: 1,
|
||||||
progress_notify: true,
|
progress_notify: true,
|
||||||
|
@ -188,11 +164,6 @@ class EtcdAdapter
|
||||||
}));
|
}));
|
||||||
this.ws.on('message', (msg) =>
|
this.ws.on('message', (msg) =>
|
||||||
{
|
{
|
||||||
if (this.mon.stopped)
|
|
||||||
{
|
|
||||||
this.stop_watcher();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
this.ws_alive = true;
|
this.ws_alive = true;
|
||||||
let data;
|
let data;
|
||||||
try
|
try
|
||||||
|
@ -212,14 +183,15 @@ class EtcdAdapter
|
||||||
if (data.result.compact_revision)
|
if (data.result.compact_revision)
|
||||||
{
|
{
|
||||||
// we may miss events if we proceed
|
// we may miss events if we proceed
|
||||||
this.mon.die('Revisions before '+data.result.compact_revision+' were compacted by etcd, exiting');
|
console.error('Revisions before '+data.result.compact_revision+' were compacted by etcd, exiting');
|
||||||
|
this.mon.on_stop(1);
|
||||||
}
|
}
|
||||||
this.mon.die('Watch canceled by etcd, reason: '+data.result.cancel_reason+', exiting');
|
console.error('Watch canceled by etcd, reason: '+data.result.cancel_reason+', exiting');
|
||||||
|
this.mon.on_stop(1);
|
||||||
}
|
}
|
||||||
else if (data.result.created)
|
else if (data.result.created)
|
||||||
{
|
{
|
||||||
// etcd watch created
|
// etcd watch created
|
||||||
console.log('Successfully subscribed to etcd at '+this.selected_etcd_url+', revision '+data.result.header.revision);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -235,15 +207,15 @@ class EtcdAdapter
|
||||||
while (1)
|
while (1)
|
||||||
{
|
{
|
||||||
const res = await this.etcd_call('/kv/txn', {
|
const res = await this.etcd_call('/kv/txn', {
|
||||||
compare: [ { target: 'CREATE', create_revision: 0, key: b64(this.mon.config.etcd_prefix+'/mon/master') } ],
|
compare: [ { target: 'CREATE', create_revision: 0, key: b64(this.mon.etcd_prefix+'/mon/master') } ],
|
||||||
success: [ { requestPut: { key: b64(this.mon.config.etcd_prefix+'/mon/master'), value: b64(JSON.stringify(state)), lease: ''+this.mon.etcd_lease_id } } ],
|
success: [ { requestPut: { key: b64(this.mon.etcd_prefix+'/mon/master'), value: b64(JSON.stringify(state)), lease: ''+this.mon.etcd_lease_id } } ],
|
||||||
}, this.mon.config.etcd_start_timeout, 0);
|
}, this.mon.etcd_start_timeout, 0);
|
||||||
if (res.succeeded)
|
if (res.succeeded)
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
console.log('Waiting to become master');
|
console.log('Waiting to become master');
|
||||||
await new Promise(ok => setTimeout(ok, this.mon.config.etcd_start_timeout));
|
await new Promise(ok => setTimeout(ok, this.mon.etcd_start_timeout));
|
||||||
}
|
}
|
||||||
console.log('Became master');
|
console.log('Became master');
|
||||||
}
|
}
|
||||||
|
@ -267,33 +239,25 @@ class EtcdAdapter
|
||||||
now = Date.now();
|
now = Date.now();
|
||||||
}
|
}
|
||||||
tried[base] = now;
|
tried[base] = now;
|
||||||
if (this.mon.stopped)
|
|
||||||
{
|
|
||||||
throw new Error(MON_STOPPED);
|
|
||||||
}
|
|
||||||
const res = await POST(base+path, body, timeout);
|
const res = await POST(base+path, body, timeout);
|
||||||
if (this.mon.stopped)
|
|
||||||
{
|
|
||||||
throw new Error(MON_STOPPED);
|
|
||||||
}
|
|
||||||
if (res.error)
|
if (res.error)
|
||||||
{
|
{
|
||||||
if (this.selected_etcd_url == base)
|
if (this.selected_etcd_url == base)
|
||||||
this.selected_etcd_url = null;
|
this.selected_etcd_url = null;
|
||||||
console.error('Failed to query etcd '+path+' (retry '+retry+'/'+retries+'): '+res.error);
|
console.error('failed to query etcd: '+res.error);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (res.json)
|
if (res.json)
|
||||||
{
|
{
|
||||||
if (res.json.error)
|
if (res.json.error)
|
||||||
{
|
{
|
||||||
console.error(path+': etcd returned error: '+res.json.error);
|
console.error('etcd returned error: '+res.json.error);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
return res.json;
|
return res.json;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
throw new Error('Failed to query etcd ('+retries+' retries)');
|
this.mon.failconnect();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -91,7 +91,7 @@ const etcd_tree = {
|
||||||
etcd_quick_timeout: 1000, // ms
|
etcd_quick_timeout: 1000, // ms
|
||||||
etcd_slow_timeout: 5000, // ms
|
etcd_slow_timeout: 5000, // ms
|
||||||
etcd_keepalive_timeout: 30, // seconds, default is max(30, etcd_report_interval*2)
|
etcd_keepalive_timeout: 30, // seconds, default is max(30, etcd_report_interval*2)
|
||||||
etcd_ws_keepalive_interval: 5, // seconds
|
etcd_ws_keepalive_interval: 30, // seconds
|
||||||
// osd
|
// osd
|
||||||
etcd_report_interval: 5, // seconds
|
etcd_report_interval: 5, // seconds
|
||||||
etcd_stats_interval: 30, // seconds
|
etcd_stats_interval: 30, // seconds
|
||||||
|
|
|
@ -23,4 +23,4 @@ for (let i = 2; i < process.argv.length; i++)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Mon.run_forever(options);
|
new Mon(options).start().catch(e => { console.error(e); process.exit(1); });
|
||||||
|
|
202
mon/mon.js
202
mon/mon.js
|
@ -15,48 +15,27 @@ const { recheck_primary, save_new_pgs_txn, generate_pool_pgs } = require('./pg_g
|
||||||
|
|
||||||
class Mon
|
class Mon
|
||||||
{
|
{
|
||||||
static run_forever(config)
|
|
||||||
{
|
|
||||||
let mon;
|
|
||||||
const run = () =>
|
|
||||||
{
|
|
||||||
console.log('Starting Monitor');
|
|
||||||
const my_mon = new Mon(config);
|
|
||||||
mon = my_mon;
|
|
||||||
my_mon.on_die = () =>
|
|
||||||
{
|
|
||||||
if (mon == my_mon)
|
|
||||||
{
|
|
||||||
// Start a new instance
|
|
||||||
run();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
my_mon.start().catch(my_mon.die);
|
|
||||||
};
|
|
||||||
run();
|
|
||||||
const on_stop_cb = () => mon.on_stop().then(() => process.exit(0)).catch(err =>
|
|
||||||
{
|
|
||||||
console.error(err);
|
|
||||||
process.exit(0);
|
|
||||||
});
|
|
||||||
process.on('SIGINT', on_stop_cb);
|
|
||||||
process.on('SIGTERM', on_stop_cb);
|
|
||||||
}
|
|
||||||
|
|
||||||
constructor(config)
|
constructor(config)
|
||||||
{
|
{
|
||||||
this.stopped = false;
|
this.failconnect = (e) => this._die(e, 2);
|
||||||
this.die = (e) => this._die(e);
|
this.die = (e) => this._die(e, 1);
|
||||||
this.fileConfig = {};
|
|
||||||
if (fs.existsSync(config.config_path||'/etc/vitastor/vitastor.conf'))
|
if (fs.existsSync(config.config_path||'/etc/vitastor/vitastor.conf'))
|
||||||
{
|
{
|
||||||
this.fileConfig = JSON.parse(fs.readFileSync(config.config_path||'/etc/vitastor/vitastor.conf', { encoding: 'utf-8' }));
|
config = {
|
||||||
|
...JSON.parse(fs.readFileSync(config.config_path||'/etc/vitastor/vitastor.conf', { encoding: 'utf-8' })),
|
||||||
|
...config,
|
||||||
|
};
|
||||||
}
|
}
|
||||||
this.cliConfig = config;
|
this.verbose = config.verbose || 0;
|
||||||
this.config = { ...this.fileConfig, ...this.cliConfig };
|
this.initConfig = config;
|
||||||
this.check_config();
|
this.config = { ...config };
|
||||||
|
this.etcd_prefix = config.etcd_prefix || '/vitastor';
|
||||||
|
this.etcd_prefix = this.etcd_prefix.replace(/\/\/+/g, '/').replace(/^\/?(.*[^\/])\/?$/, '/$1');
|
||||||
|
this.etcd_start_timeout = (config.etcd_start_timeout || 5) * 1000;
|
||||||
this.state = JSON.parse(JSON.stringify(etcd_tree));
|
this.state = JSON.parse(JSON.stringify(etcd_tree));
|
||||||
this.prev_stats = { osd_stats: {}, osd_diff: {} };
|
this.prev_stats = { osd_stats: {}, osd_diff: {} };
|
||||||
|
this.signals_set = false;
|
||||||
|
this.on_stop_cb = () => this.on_stop(0).catch(console.error);
|
||||||
this.recheck_pgs_active = false;
|
this.recheck_pgs_active = false;
|
||||||
this.etcd = new EtcdAdapter(this);
|
this.etcd = new EtcdAdapter(this);
|
||||||
this.etcd.parse_config(this.config);
|
this.etcd.parse_config(this.config);
|
||||||
|
@ -86,19 +65,17 @@ class Mon
|
||||||
async load_config()
|
async load_config()
|
||||||
{
|
{
|
||||||
const res = await this.etcd.etcd_call('/kv/txn', { success: [
|
const res = await this.etcd.etcd_call('/kv/txn', { success: [
|
||||||
{ requestRange: { key: b64(this.config.etcd_prefix+'/config/global') } }
|
{ requestRange: { key: b64(this.etcd_prefix+'/config/global') } }
|
||||||
] }, this.config.etcd_start_timeout, -1);
|
] }, this.etcd_start_timeout, -1);
|
||||||
if (res.responses[0].response_range.kvs)
|
if (res.responses[0].response_range.kvs)
|
||||||
{
|
{
|
||||||
this.parse_kv(res.responses[0].response_range.kvs[0]);
|
this.parse_kv(res.responses[0].response_range.kvs[0]);
|
||||||
}
|
}
|
||||||
|
this.check_config();
|
||||||
}
|
}
|
||||||
|
|
||||||
check_config()
|
check_config()
|
||||||
{
|
{
|
||||||
this.config.etcd_prefix = this.config.etcd_prefix || '/vitastor';
|
|
||||||
this.config.etcd_prefix = this.config.etcd_prefix.replace(/\/\/+/g, '/').replace(/^\/?(.*[^\/])\/?$/, '/$1');
|
|
||||||
this.config.etcd_start_timeout = (this.config.etcd_start_timeout || 5) * 1000;
|
|
||||||
this.config.etcd_mon_ttl = Number(this.config.etcd_mon_ttl) || 5;
|
this.config.etcd_mon_ttl = Number(this.config.etcd_mon_ttl) || 5;
|
||||||
if (this.config.etcd_mon_ttl < 1)
|
if (this.config.etcd_mon_ttl < 1)
|
||||||
{
|
{
|
||||||
|
@ -140,7 +117,7 @@ class Mon
|
||||||
on_message(msg)
|
on_message(msg)
|
||||||
{
|
{
|
||||||
let stats_changed = false, changed = false, pg_states_changed = false;
|
let stats_changed = false, changed = false, pg_states_changed = false;
|
||||||
if (this.config.verbose)
|
if (this.verbose)
|
||||||
{
|
{
|
||||||
console.log('Revision '+msg.header.revision+' events: ');
|
console.log('Revision '+msg.header.revision+' events: ');
|
||||||
}
|
}
|
||||||
|
@ -148,7 +125,7 @@ class Mon
|
||||||
for (const e of msg.events||[])
|
for (const e of msg.events||[])
|
||||||
{
|
{
|
||||||
this.parse_kv(e.kv);
|
this.parse_kv(e.kv);
|
||||||
const key = e.kv.key.substr(this.config.etcd_prefix.length);
|
const key = e.kv.key.substr(this.etcd_prefix.length);
|
||||||
if (key.substr(0, 11) == '/osd/state/')
|
if (key.substr(0, 11) == '/osd/state/')
|
||||||
{
|
{
|
||||||
stats_changed = true;
|
stats_changed = true;
|
||||||
|
@ -166,7 +143,7 @@ class Mon
|
||||||
{
|
{
|
||||||
changed = true;
|
changed = true;
|
||||||
}
|
}
|
||||||
if (this.config.verbose)
|
if (this.verbose)
|
||||||
{
|
{
|
||||||
console.log(JSON.stringify(e));
|
console.log(JSON.stringify(e));
|
||||||
}
|
}
|
||||||
|
@ -188,10 +165,6 @@ class Mon
|
||||||
// Schedule save_last_clean() to run after a small timeout (1s) (to not spam etcd)
|
// Schedule save_last_clean() to run after a small timeout (1s) (to not spam etcd)
|
||||||
schedule_save_last_clean()
|
schedule_save_last_clean()
|
||||||
{
|
{
|
||||||
if (this.stopped)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (!this.save_last_clean_timer)
|
if (!this.save_last_clean_timer)
|
||||||
{
|
{
|
||||||
this.save_last_clean_timer = setTimeout(() =>
|
this.save_last_clean_timer = setTimeout(() =>
|
||||||
|
@ -243,10 +216,10 @@ class Mon
|
||||||
this.state.history.last_clean_pgs = new_clean_pgs;
|
this.state.history.last_clean_pgs = new_clean_pgs;
|
||||||
await this.etcd.etcd_call('/kv/txn', {
|
await this.etcd.etcd_call('/kv/txn', {
|
||||||
success: [ { requestPut: {
|
success: [ { requestPut: {
|
||||||
key: b64(this.config.etcd_prefix+'/history/last_clean_pgs'),
|
key: b64(this.etcd_prefix+'/history/last_clean_pgs'),
|
||||||
value: b64(JSON.stringify(this.state.history.last_clean_pgs))
|
value: b64(JSON.stringify(this.state.history.last_clean_pgs))
|
||||||
} } ],
|
} } ],
|
||||||
}, this.config.etcd_start_timeout, 0);
|
}, this.etcd_start_timeout, 0);
|
||||||
this.save_last_clean_running = false;
|
this.save_last_clean_running = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -264,72 +237,39 @@ class Mon
|
||||||
// Register in /mon/member, just for the information
|
// Register in /mon/member, just for the information
|
||||||
const state = this.get_mon_state();
|
const state = this.get_mon_state();
|
||||||
res = await this.etcd.etcd_call('/kv/put', {
|
res = await this.etcd.etcd_call('/kv/put', {
|
||||||
key: b64(this.config.etcd_prefix+'/mon/member/'+this.etcd_lease_id),
|
key: b64(this.etcd_prefix+'/mon/member/'+this.etcd_lease_id),
|
||||||
value: b64(JSON.stringify(state)),
|
value: b64(JSON.stringify(state)),
|
||||||
lease: ''+this.etcd_lease_id
|
lease: ''+this.etcd_lease_id
|
||||||
}, this.config.etcd_start_timeout, 0);
|
}, this.etcd_start_timeout, 0);
|
||||||
// Set refresh timer
|
// Set refresh timer
|
||||||
this.lease_timer = setInterval(() =>
|
this.lease_timer = setInterval(async () =>
|
||||||
{
|
|
||||||
this.etcd.etcd_call('/lease/keepalive', { ID: this.etcd_lease_id }, this.config.etcd_mon_timeout, this.config.etcd_mon_retries)
|
|
||||||
.then(res =>
|
|
||||||
{
|
{
|
||||||
|
const res = await this.etcd.etcd_call('/lease/keepalive', { ID: this.etcd_lease_id }, this.config.etcd_mon_timeout, this.config.etcd_mon_retries);
|
||||||
if (!res.result.TTL)
|
if (!res.result.TTL)
|
||||||
this.die('Lease expired');
|
{
|
||||||
})
|
this.failconnect('Lease expired');
|
||||||
.catch(this.die);
|
}
|
||||||
}, this.config.etcd_mon_ttl*1000);
|
}, this.config.etcd_mon_ttl*1000);
|
||||||
|
if (!this.signals_set)
|
||||||
|
{
|
||||||
|
process.on('SIGINT', this.on_stop_cb);
|
||||||
|
process.on('SIGTERM', this.on_stop_cb);
|
||||||
|
this.signals_set = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async on_stop()
|
async on_stop(status)
|
||||||
{
|
|
||||||
console.log('Stopping Monitor');
|
|
||||||
this.etcd.stop_watcher();
|
|
||||||
if (this.save_last_clean_timer)
|
|
||||||
{
|
|
||||||
clearTimeout(this.save_last_clean_timer);
|
|
||||||
this.save_last_clean_timer = null;
|
|
||||||
}
|
|
||||||
if (this.next_recheck_timer)
|
|
||||||
{
|
|
||||||
clearTimeout(this.next_recheck_timer);
|
|
||||||
this.next_recheck_timer = null;
|
|
||||||
}
|
|
||||||
if (this.recheck_timer)
|
|
||||||
{
|
|
||||||
clearTimeout(this.recheck_timer);
|
|
||||||
this.recheck_timer = null;
|
|
||||||
}
|
|
||||||
if (this.stats_timer)
|
|
||||||
{
|
|
||||||
clearTimeout(this.stats_timer);
|
|
||||||
this.stats_timer = null;
|
|
||||||
}
|
|
||||||
if (this.lease_timer)
|
|
||||||
{
|
{
|
||||||
clearInterval(this.lease_timer);
|
clearInterval(this.lease_timer);
|
||||||
this.lease_timer = null;
|
await this.etcd.etcd_call('/lease/revoke', { ID: this.etcd_lease_id }, this.config.etcd_mon_timeout, this.config.etcd_mon_retries);
|
||||||
}
|
process.exit(status);
|
||||||
let p = null;
|
|
||||||
if (this.etcd_lease_id)
|
|
||||||
{
|
|
||||||
const lease_id = this.etcd_lease_id;
|
|
||||||
this.etcd_lease_id = null;
|
|
||||||
p = this.etcd.etcd_call('/lease/revoke', { ID: lease_id }, this.config.etcd_mon_timeout, this.config.etcd_mon_retries);
|
|
||||||
}
|
|
||||||
// 'stopped' flag prevents all further etcd communications of this instance
|
|
||||||
this.stopped = true;
|
|
||||||
if (p)
|
|
||||||
{
|
|
||||||
await p;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async load_cluster_state()
|
async load_cluster_state()
|
||||||
{
|
{
|
||||||
const res = await this.etcd.etcd_call('/kv/txn', { success: [
|
const res = await this.etcd.etcd_call('/kv/txn', { success: [
|
||||||
{ requestRange: { key: b64(this.config.etcd_prefix+'/'), range_end: b64(this.config.etcd_prefix+'0') } },
|
{ requestRange: { key: b64(this.etcd_prefix+'/'), range_end: b64(this.etcd_prefix+'0') } },
|
||||||
] }, this.config.etcd_start_timeout, -1);
|
] }, this.etcd_start_timeout, -1);
|
||||||
this.etcd_watch_revision = BigInt(res.header.revision)+BigInt(1);
|
this.etcd_watch_revision = BigInt(res.header.revision)+BigInt(1);
|
||||||
this.state = JSON.parse(JSON.stringify(etcd_tree));
|
this.state = JSON.parse(JSON.stringify(etcd_tree));
|
||||||
for (const response of res.responses)
|
for (const response of res.responses)
|
||||||
|
@ -376,17 +316,17 @@ class Mon
|
||||||
const checks = [];
|
const checks = [];
|
||||||
for (const osd_num of this.all_osds())
|
for (const osd_num of this.all_osds())
|
||||||
{
|
{
|
||||||
const key = b64(this.config.etcd_prefix+'/osd/state/'+osd_num);
|
const key = b64(this.etcd_prefix+'/osd/state/'+osd_num);
|
||||||
checks.push({ key, target: 'MOD', result: 'LESS', mod_revision: ''+this.etcd_watch_revision });
|
checks.push({ key, target: 'MOD', result: 'LESS', mod_revision: ''+this.etcd_watch_revision });
|
||||||
}
|
}
|
||||||
await this.etcd.etcd_call('/kv/txn', {
|
await this.etcd.etcd_call('/kv/txn', {
|
||||||
compare: [
|
compare: [
|
||||||
{ key: b64(this.config.etcd_prefix+'/mon/master'), target: 'LEASE', lease: ''+this.etcd_lease_id },
|
{ key: b64(this.etcd_prefix+'/mon/master'), target: 'LEASE', lease: ''+this.etcd_lease_id },
|
||||||
{ key: b64(this.config.etcd_prefix+'/config/pgs'), target: 'MOD', mod_revision: ''+this.etcd_watch_revision, result: 'LESS' },
|
{ key: b64(this.etcd_prefix+'/config/pgs'), target: 'MOD', mod_revision: ''+this.etcd_watch_revision, result: 'LESS' },
|
||||||
...checks,
|
...checks,
|
||||||
],
|
],
|
||||||
success: [
|
success: [
|
||||||
{ requestPut: { key: b64(this.config.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(new_cfg)) } },
|
{ requestPut: { key: b64(this.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(new_cfg)) } },
|
||||||
],
|
],
|
||||||
}, this.config.etcd_mon_timeout, 0);
|
}, this.config.etcd_mon_timeout, 0);
|
||||||
return false;
|
return false;
|
||||||
|
@ -396,10 +336,6 @@ class Mon
|
||||||
|
|
||||||
async recheck_pgs()
|
async recheck_pgs()
|
||||||
{
|
{
|
||||||
if (this.stopped)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (this.recheck_pgs_active)
|
if (this.recheck_pgs_active)
|
||||||
{
|
{
|
||||||
this.schedule_recheck();
|
this.schedule_recheck();
|
||||||
|
@ -501,9 +437,9 @@ class Mon
|
||||||
}
|
}
|
||||||
// Also delete pool statistics
|
// Also delete pool statistics
|
||||||
etcd_request.success.push({ requestDeleteRange: {
|
etcd_request.success.push({ requestDeleteRange: {
|
||||||
key: b64(this.config.etcd_prefix+'/pool/stats/'+pool_id),
|
key: b64(this.etcd_prefix+'/pool/stats/'+pool_id),
|
||||||
} });
|
} });
|
||||||
save_new_pgs_txn(new_config_pgs, etcd_request, this.state, this.config.etcd_prefix,
|
save_new_pgs_txn(new_config_pgs, etcd_request, this.state, this.etcd_prefix,
|
||||||
this.etcd_watch_revision, pool_id, up_osds, osd_tree, prev_pgs, [], []);
|
this.etcd_watch_revision, pool_id, up_osds, osd_tree, prev_pgs, [], []);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -534,8 +470,8 @@ class Mon
|
||||||
pg_history = scale_pg_history(pg_history, real_prev_pgs, pool_res.pgs);
|
pg_history = scale_pg_history(pg_history, real_prev_pgs, pool_res.pgs);
|
||||||
// Drop stats
|
// Drop stats
|
||||||
etcd_request.success.push({ requestDeleteRange: {
|
etcd_request.success.push({ requestDeleteRange: {
|
||||||
key: b64(this.config.etcd_prefix+'/pg/stats/'+pool_id+'/'),
|
key: b64(this.etcd_prefix+'/pg/stats/'+pool_id+'/'),
|
||||||
range_end: b64(this.config.etcd_prefix+'/pg/stats/'+pool_id+'0'),
|
range_end: b64(this.etcd_prefix+'/pg/stats/'+pool_id+'0'),
|
||||||
} });
|
} });
|
||||||
}
|
}
|
||||||
const stats = {
|
const stats = {
|
||||||
|
@ -543,10 +479,10 @@ class Mon
|
||||||
...pool_res.stats,
|
...pool_res.stats,
|
||||||
};
|
};
|
||||||
etcd_request.success.push({ requestPut: {
|
etcd_request.success.push({ requestPut: {
|
||||||
key: b64(this.config.etcd_prefix+'/pool/stats/'+pool_id),
|
key: b64(this.etcd_prefix+'/pool/stats/'+pool_id),
|
||||||
value: b64(JSON.stringify(stats)),
|
value: b64(JSON.stringify(stats)),
|
||||||
} });
|
} });
|
||||||
save_new_pgs_txn(new_config_pgs, etcd_request, this.state, this.config.etcd_prefix,
|
save_new_pgs_txn(new_config_pgs, etcd_request, this.state, this.etcd_prefix,
|
||||||
this.etcd_watch_revision, pool_id, up_osds, osd_tree, real_prev_pgs, pool_res.pgs, pg_history);
|
this.etcd_watch_revision, pool_id, up_osds, osd_tree, real_prev_pgs, pool_res.pgs, pg_history);
|
||||||
}
|
}
|
||||||
new_config_pgs.hash = tree_hash;
|
new_config_pgs.hash = tree_hash;
|
||||||
|
@ -556,11 +492,11 @@ class Mon
|
||||||
async save_pg_config(new_config_pgs, etcd_request = { compare: [], success: [] })
|
async save_pg_config(new_config_pgs, etcd_request = { compare: [], success: [] })
|
||||||
{
|
{
|
||||||
etcd_request.compare.push(
|
etcd_request.compare.push(
|
||||||
{ key: b64(this.config.etcd_prefix+'/mon/master'), target: 'LEASE', lease: ''+this.etcd_lease_id },
|
{ key: b64(this.etcd_prefix+'/mon/master'), target: 'LEASE', lease: ''+this.etcd_lease_id },
|
||||||
{ key: b64(this.config.etcd_prefix+'/config/pgs'), target: 'MOD', mod_revision: ''+this.etcd_watch_revision, result: 'LESS' },
|
{ key: b64(this.etcd_prefix+'/config/pgs'), target: 'MOD', mod_revision: ''+this.etcd_watch_revision, result: 'LESS' },
|
||||||
);
|
);
|
||||||
etcd_request.success.push(
|
etcd_request.success.push(
|
||||||
{ requestPut: { key: b64(this.config.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(new_config_pgs)) } },
|
{ requestPut: { key: b64(this.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(new_config_pgs)) } },
|
||||||
);
|
);
|
||||||
const txn_res = await this.etcd.etcd_call('/kv/txn', etcd_request, this.config.etcd_mon_timeout, 0);
|
const txn_res = await this.etcd.etcd_call('/kv/txn', etcd_request, this.config.etcd_mon_timeout, 0);
|
||||||
return txn_res.succeeded;
|
return txn_res.succeeded;
|
||||||
|
@ -569,10 +505,6 @@ class Mon
|
||||||
// Schedule next recheck at least at <unixtime>
|
// Schedule next recheck at least at <unixtime>
|
||||||
schedule_next_recheck_at(unixtime)
|
schedule_next_recheck_at(unixtime)
|
||||||
{
|
{
|
||||||
if (this.stopped)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
this.next_recheck_at = !this.next_recheck_at || this.next_recheck_at > unixtime
|
this.next_recheck_at = !this.next_recheck_at || this.next_recheck_at > unixtime
|
||||||
? unixtime : this.next_recheck_at;
|
? unixtime : this.next_recheck_at;
|
||||||
const now = Date.now()/1000;
|
const now = Date.now()/1000;
|
||||||
|
@ -601,10 +533,6 @@ class Mon
|
||||||
// This is required for multiple change events to trigger at most 1 recheck in 1s
|
// This is required for multiple change events to trigger at most 1 recheck in 1s
|
||||||
schedule_recheck()
|
schedule_recheck()
|
||||||
{
|
{
|
||||||
if (this.stopped)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (!this.recheck_timer)
|
if (!this.recheck_timer)
|
||||||
{
|
{
|
||||||
this.recheck_timer = setTimeout(() =>
|
this.recheck_timer = setTimeout(() =>
|
||||||
|
@ -625,13 +553,13 @@ class Mon
|
||||||
stats.object_bytes = object_bytes;
|
stats.object_bytes = object_bytes;
|
||||||
stats = serialize_bigints(stats);
|
stats = serialize_bigints(stats);
|
||||||
inode_stats = serialize_bigints(inode_stats);
|
inode_stats = serialize_bigints(inode_stats);
|
||||||
txn.push({ requestPut: { key: b64(this.config.etcd_prefix+'/stats'), value: b64(JSON.stringify(stats)) } });
|
txn.push({ requestPut: { key: b64(this.etcd_prefix+'/stats'), value: b64(JSON.stringify(stats)) } });
|
||||||
for (const pool_id in inode_stats)
|
for (const pool_id in inode_stats)
|
||||||
{
|
{
|
||||||
for (const inode_num in inode_stats[pool_id])
|
for (const inode_num in inode_stats[pool_id])
|
||||||
{
|
{
|
||||||
txn.push({ requestPut: {
|
txn.push({ requestPut: {
|
||||||
key: b64(this.config.etcd_prefix+'/inode/stats/'+pool_id+'/'+inode_num),
|
key: b64(this.etcd_prefix+'/inode/stats/'+pool_id+'/'+inode_num),
|
||||||
value: b64(JSON.stringify(inode_stats[pool_id][inode_num])),
|
value: b64(JSON.stringify(inode_stats[pool_id][inode_num])),
|
||||||
} });
|
} });
|
||||||
}
|
}
|
||||||
|
@ -643,7 +571,7 @@ class Mon
|
||||||
if (!inode_stats[pool_id] || !inode_stats[pool_id][inode_num])
|
if (!inode_stats[pool_id] || !inode_stats[pool_id][inode_num])
|
||||||
{
|
{
|
||||||
txn.push({ requestDeleteRange: {
|
txn.push({ requestDeleteRange: {
|
||||||
key: b64(this.config.etcd_prefix+'/inode/stats/'+pool_id+'/'+inode_num),
|
key: b64(this.etcd_prefix+'/inode/stats/'+pool_id+'/'+inode_num),
|
||||||
} });
|
} });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -653,7 +581,7 @@ class Mon
|
||||||
if (!seen_pools[pool_id])
|
if (!seen_pools[pool_id])
|
||||||
{
|
{
|
||||||
txn.push({ requestDeleteRange: {
|
txn.push({ requestDeleteRange: {
|
||||||
key: b64(this.config.etcd_prefix+'/pool/stats/'+pool_id),
|
key: b64(this.etcd_prefix+'/pool/stats/'+pool_id),
|
||||||
} });
|
} });
|
||||||
delete this.state.pool.stats[pool_id];
|
delete this.state.pool.stats[pool_id];
|
||||||
}
|
}
|
||||||
|
@ -662,7 +590,7 @@ class Mon
|
||||||
const pool_stats = { ...this.state.pool.stats[pool_id] };
|
const pool_stats = { ...this.state.pool.stats[pool_id] };
|
||||||
serialize_bigints(pool_stats);
|
serialize_bigints(pool_stats);
|
||||||
txn.push({ requestPut: {
|
txn.push({ requestPut: {
|
||||||
key: b64(this.config.etcd_prefix+'/pool/stats/'+pool_id),
|
key: b64(this.etcd_prefix+'/pool/stats/'+pool_id),
|
||||||
value: b64(JSON.stringify(pool_stats)),
|
value: b64(JSON.stringify(pool_stats)),
|
||||||
} });
|
} });
|
||||||
}
|
}
|
||||||
|
@ -675,7 +603,7 @@ class Mon
|
||||||
|
|
||||||
schedule_update_stats()
|
schedule_update_stats()
|
||||||
{
|
{
|
||||||
if (this.stopped || this.stats_timer)
|
if (this.stats_timer)
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -694,7 +622,7 @@ class Mon
|
||||||
}
|
}
|
||||||
kv.key = de64(kv.key);
|
kv.key = de64(kv.key);
|
||||||
kv.value = kv.value ? de64(kv.value) : null;
|
kv.value = kv.value ? de64(kv.value) : null;
|
||||||
let key = kv.key.substr(this.config.etcd_prefix.length+1);
|
let key = kv.key.substr(this.etcd_prefix.length+1);
|
||||||
if (!etcd_allow.exec(key))
|
if (!etcd_allow.exec(key))
|
||||||
{
|
{
|
||||||
console.log('Bad key in etcd: '+kv.key+' = '+kv.value);
|
console.log('Bad key in etcd: '+kv.key+' = '+kv.value);
|
||||||
|
@ -724,9 +652,8 @@ class Mon
|
||||||
cur[key_parts[key_parts.length-1]] = kv.value;
|
cur[key_parts[key_parts.length-1]] = kv.value;
|
||||||
if (key === 'config/global')
|
if (key === 'config/global')
|
||||||
{
|
{
|
||||||
this.config = { ...this.fileConfig, ...this.state.config.global, ...this.cliConfig };
|
this.config = { ...this.initConfig, ...this.state.config.global };
|
||||||
this.check_config();
|
this.check_config();
|
||||||
this.etcd.parse_config(this.config);
|
|
||||||
for (const osd_num in this.state.osd.stats)
|
for (const osd_num in this.state.osd.stats)
|
||||||
{
|
{
|
||||||
// Recheck PGs <osd_out_time> later
|
// Recheck PGs <osd_out_time> later
|
||||||
|
@ -759,12 +686,11 @@ class Mon
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_die(err)
|
_die(err, code)
|
||||||
{
|
{
|
||||||
// Stop this instance of Monitor so we can restart
|
// In fact we can just try to rejoin
|
||||||
console.error(err instanceof Error ? err : new Error(err || 'Cluster connection failed'));
|
console.error(err instanceof Error ? err : new Error(err || 'Cluster connection failed'));
|
||||||
this.on_stop().catch(console.error);
|
process.exit(code || 2);
|
||||||
this.on_die();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
local_ips(all)
|
local_ips(all)
|
||||||
|
|
|
@ -253,7 +253,7 @@ void etcd_state_client_t::parse_config(const json11::Json & config)
|
||||||
this->etcd_ws_keepalive_interval = config["etcd_ws_keepalive_interval"].uint64_value();
|
this->etcd_ws_keepalive_interval = config["etcd_ws_keepalive_interval"].uint64_value();
|
||||||
if (this->etcd_ws_keepalive_interval <= 0)
|
if (this->etcd_ws_keepalive_interval <= 0)
|
||||||
{
|
{
|
||||||
this->etcd_ws_keepalive_interval = 5;
|
this->etcd_ws_keepalive_interval = 30;
|
||||||
}
|
}
|
||||||
this->max_etcd_attempts = config["max_etcd_attempts"].uint64_value();
|
this->max_etcd_attempts = config["max_etcd_attempts"].uint64_value();
|
||||||
if (this->max_etcd_attempts <= 0)
|
if (this->max_etcd_attempts <= 0)
|
||||||
|
|
|
@ -103,7 +103,7 @@ protected:
|
||||||
void pick_next_etcd();
|
void pick_next_etcd();
|
||||||
public:
|
public:
|
||||||
int etcd_keepalive_timeout = 30;
|
int etcd_keepalive_timeout = 30;
|
||||||
int etcd_ws_keepalive_interval = 5;
|
int etcd_ws_keepalive_interval = 30;
|
||||||
int max_etcd_attempts = 5;
|
int max_etcd_attempts = 5;
|
||||||
int etcd_quick_timeout = 1000;
|
int etcd_quick_timeout = 1000;
|
||||||
int etcd_slow_timeout = 5000;
|
int etcd_slow_timeout = 5000;
|
||||||
|
|
|
@ -54,7 +54,7 @@ for i in $(seq 1 $OSD_COUNT); do
|
||||||
start_osd $i
|
start_osd $i
|
||||||
done
|
done
|
||||||
|
|
||||||
node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 >>./testdata/mon.log 2>&1 &
|
(while true; do set +e; node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1; if [[ $? -ne 2 ]]; then break; fi; done) >>./testdata/mon.log 2>&1 &
|
||||||
MON_PID=$!
|
MON_PID=$!
|
||||||
|
|
||||||
if [ "$SCHEME" = "ec" ]; then
|
if [ "$SCHEME" = "ec" ]; then
|
||||||
|
|
|
@ -15,7 +15,7 @@ for i in $(seq 1 $OSD_COUNT); do
|
||||||
eval OSD${i}_PID=$!
|
eval OSD${i}_PID=$!
|
||||||
done
|
done
|
||||||
|
|
||||||
node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 >>./testdata/mon.log 2>&1 &
|
(while true; do node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 || true; done) >>./testdata/mon.log 2>&1 &
|
||||||
MON_PID=$!
|
MON_PID=$!
|
||||||
|
|
||||||
sleep 3
|
sleep 3
|
||||||
|
|
Loading…
Reference in New Issue