forked from vitalif/vitastor
Basic fixes for the Monitor
parent
e051db5a73
commit
cc4714a3a7
|
@ -19,4 +19,4 @@ if (!options.etcd_url)
|
|||
process.exit();
|
||||
}
|
||||
|
||||
new Mon(options).start();
|
||||
new Mon(options).start().catch(e => { console.error(e); process.exit(); });
|
||||
|
|
90
lp/mon.js
90
lp/mon.js
|
@ -209,13 +209,13 @@ class Mon
|
|||
await this.get_lease();
|
||||
await this.become_master();
|
||||
await this.load_cluster_state();
|
||||
await this.start_watcher();
|
||||
await this.start_watcher(this.config.etcd_mon_retries);
|
||||
await this.recheck_pgs();
|
||||
}
|
||||
|
||||
async load_config()
|
||||
{
|
||||
const res = await this.etcd_call('/txn', { success: [
|
||||
const res = await this.etcd_call('/kv/txn', { success: [
|
||||
{ requestRange: { key: b64(this.etcd_prefix+'/config/global') } }
|
||||
] }, this.etcd_start_timeout, -1);
|
||||
this.parse_kv(res.responses[0].response_range.kvs[0]);
|
||||
|
@ -260,7 +260,7 @@ class Mon
|
|||
async start_watcher(retries)
|
||||
{
|
||||
let retry = 0;
|
||||
if (retries >= 0 && retries < 1)
|
||||
if (!retries || retries < 1)
|
||||
{
|
||||
retries = 1;
|
||||
}
|
||||
|
@ -282,10 +282,11 @@ class Mon
|
|||
ok(true);
|
||||
});
|
||||
});
|
||||
if (!ok)
|
||||
if (ok)
|
||||
{
|
||||
this.ws = null;
|
||||
break;
|
||||
}
|
||||
this.ws = null;
|
||||
retry++;
|
||||
}
|
||||
if (!this.ws)
|
||||
|
@ -312,12 +313,16 @@ class Mon
|
|||
}
|
||||
if (!data || !data.result || !data.result.events)
|
||||
{
|
||||
console.error('Garbage received from watch websocket: '+msg);
|
||||
if (!data || !data.result || !data.result.watch_id)
|
||||
{
|
||||
console.error('Garbage received from watch websocket: '+msg);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
let stats_changed = false, changed = false;
|
||||
console.log('Revision '+data.result.header.revision+' events: ');
|
||||
this.etcd_watch_revision = BigInt(data.result.header.revision)+BigInt(1);
|
||||
for (const e of data.result.events)
|
||||
{
|
||||
this.parse_kv(e.kv);
|
||||
|
@ -364,24 +369,27 @@ class Mon
|
|||
const state = { ip: this.local_ips() };
|
||||
while (1)
|
||||
{
|
||||
const res = await this.etcd_call('/txn', {
|
||||
const res = await this.etcd_call('/kv/txn', {
|
||||
compare: [ { target: 'CREATE', create_revision: 0, key: b64(this.etcd_prefix+'/mon/master') } ],
|
||||
success: [ { key: b64(this.etcd_prefix+'/mon/master'), value: b64(JSON.stringify(state)), lease: ''+this.etcd_lease_id } ],
|
||||
success: [ { requestPut: { key: b64(this.etcd_prefix+'/mon/master'), value: b64(JSON.stringify(state)), lease: ''+this.etcd_lease_id } } ],
|
||||
}, this.etcd_start_timeout, 0);
|
||||
if (!res.succeeded)
|
||||
if (res.succeeded)
|
||||
{
|
||||
await new Promise(ok => setTimeout(ok, this.etcd_start_timeout));
|
||||
break;
|
||||
}
|
||||
console.log('Waiting to become master');
|
||||
await new Promise(ok => setTimeout(ok, this.etcd_start_timeout));
|
||||
}
|
||||
console.log('Became master');
|
||||
}
|
||||
|
||||
async load_cluster_state()
|
||||
{
|
||||
const res = await this.etcd_call('/txn', { success: [
|
||||
const res = await this.etcd_call('/kv/txn', { success: [
|
||||
{ requestRange: { key: b64(this.etcd_prefix+'/'), range_end: b64(this.etcd_prefix+'0') } },
|
||||
] }, this.etcd_start_timeout, -1);
|
||||
this.etcd_watch_revision = BigInt(res.header.revision)+BigInt(1);
|
||||
const data = JSON.parse(JSON.stringify(this.constructor.etcd_tree));
|
||||
this.state = JSON.parse(JSON.stringify(this.constructor.etcd_tree));
|
||||
for (const response of res.responses)
|
||||
{
|
||||
for (const kv of response.response_range.kvs)
|
||||
|
@ -389,7 +397,6 @@ class Mon
|
|||
this.parse_kv(kv);
|
||||
}
|
||||
}
|
||||
this.state = data;
|
||||
}
|
||||
|
||||
all_osds()
|
||||
|
@ -481,7 +488,7 @@ class Mon
|
|||
const key = b64(this.etcd_prefix+'/osd/state/'+osd_num);
|
||||
checks.push({ key, target: 'MOD', result: 'LESS', mod_revision: ''+this.etcd_watch_revision });
|
||||
}
|
||||
const res = await this.etcd_call('/txn', {
|
||||
const res = await this.etcd_call('/kv/txn', {
|
||||
compare: [
|
||||
{ key: b64(this.etcd_prefix+'/mon/master'), target: 'LEASE', lease: ''+this.etcd_lease_id },
|
||||
{ key: b64(this.etcd_prefix+'/config/pgs'), target: 'MOD', mod_revision: ''+this.etcd_watch_revision, result: 'LESS' },
|
||||
|
@ -544,10 +551,11 @@ class Mon
|
|||
});
|
||||
}
|
||||
}
|
||||
this.state.config.pgs.items = this.state.config.pgs.items || {};
|
||||
this.state.config.pgs.items[pool_id] = pg_items;
|
||||
}
|
||||
|
||||
validate_pool_cfg(pool_id, pool_cfg)
|
||||
validate_pool_cfg(pool_id, pool_cfg, warn)
|
||||
{
|
||||
pool_cfg.pg_size = Math.floor(pool_cfg.pg_size);
|
||||
pool_cfg.pg_minsize = Math.floor(pool_cfg.pg_minsize);
|
||||
|
@ -556,39 +564,46 @@ class Mon
|
|||
pool_cfg.max_osd_combinations = Math.floor(pool_cfg.max_osd_combinations) || 10000;
|
||||
if (!/^[1-9]\d*$/.exec(''+pool_id))
|
||||
{
|
||||
console.log('Pool ID '+pool_id+' is invalid');
|
||||
if (warn)
|
||||
console.log('Pool ID '+pool_id+' is invalid');
|
||||
return false;
|
||||
}
|
||||
if (!pool_cfg.pg_size || pool_cfg.pg_size < 1 ||
|
||||
pool_cfg.scheme === 'xor' && pool_cfg.pg_size < 3)
|
||||
{
|
||||
console.log('Pool '+pool_id+' has invalid pg_size');
|
||||
if (warn)
|
||||
console.log('Pool '+pool_id+' has invalid pg_size');
|
||||
return false;
|
||||
}
|
||||
if (!pool_cfg.pg_minsize || pool_cfg.pg_minsize < 1 || pool_cfg.pg_minsize > pool_cfg.pg_size ||
|
||||
pool_cfg.scheme === 'xor' && pool_cfg.pg_minsize < (pool_cfg.pg_size - 1))
|
||||
{
|
||||
console.log('Pool '+pool_id+' has invalid pg_minsize');
|
||||
if (warn)
|
||||
console.log('Pool '+pool_id+' has invalid pg_minsize');
|
||||
return false;
|
||||
}
|
||||
if (!pool_cfg.pg_count || pool_cfg.pg_count < 1)
|
||||
{
|
||||
console.log('Pool '+pool_id+' has invalid pg_count');
|
||||
if (warn)
|
||||
console.log('Pool '+pool_id+' has invalid pg_count');
|
||||
return false;
|
||||
}
|
||||
if (!pool_cfg.name)
|
||||
{
|
||||
console.log('Pool '+pool_id+' has empty name');
|
||||
if (warn)
|
||||
console.log('Pool '+pool_id+' has empty name');
|
||||
return false;
|
||||
}
|
||||
if (pool_cfg.scheme !== 'xor' && pool_cfg.scheme !== 'replicated')
|
||||
{
|
||||
console.log('Pool '+pool_id+' has invalid coding scheme (only "xor" and "replicated" are allowed)');
|
||||
if (warn)
|
||||
console.log('Pool '+pool_id+' has invalid coding scheme (only "xor" and "replicated" are allowed)');
|
||||
return false;
|
||||
}
|
||||
if (pool_cfg.max_osd_combinations < 100)
|
||||
{
|
||||
console.log('Pool '+pool_id+' has invalid max_osd_combinations (must be at least 100)');
|
||||
if (warn)
|
||||
console.log('Pool '+pool_id+' has invalid max_osd_combinations (must be at least 100)');
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
@ -611,12 +626,12 @@ class Mon
|
|||
for (const pool_id in this.state.config.pools)
|
||||
{
|
||||
const pool_cfg = this.state.config.pools[pool_id];
|
||||
if (!this.validate_pool_cfg(pool_id, pool_cfg))
|
||||
if (!this.validate_pool_cfg(pool_id, pool_cfg, false))
|
||||
{
|
||||
return;
|
||||
continue;
|
||||
}
|
||||
const prev_pgs = [];
|
||||
for (const pg in (this.state.config.pgs.items[pool_id]||{}).items||{})
|
||||
for (const pg in ((this.state.config.pgs.items||{})[pool_id]||{}).items||{})
|
||||
{
|
||||
prev_pgs[pg-1] = this.state.config.pgs.items[pool_id][pg].osd_set;
|
||||
}
|
||||
|
@ -671,7 +686,7 @@ class Mon
|
|||
etcd_request.success.push(
|
||||
{ requestPut: { key: b64(this.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(this.state.config.pgs)) } },
|
||||
);
|
||||
const res = await this.etcd_call('/txn', etcd_request, this.config.etcd_mon_timeout, 0);
|
||||
const res = await this.etcd_call('/kv/txn', etcd_request, this.config.etcd_mon_timeout, 0);
|
||||
if (!res.succeeded)
|
||||
{
|
||||
console.log('Someone changed PG configuration while we also tried to change it. Retrying in '+this.config.mon_change_timeout+' ms');
|
||||
|
@ -812,7 +827,7 @@ class Mon
|
|||
{
|
||||
ser.object_counts[k] = ''+stats.object_counts[k];
|
||||
}
|
||||
await this.etcd_call('/txn', {
|
||||
await this.etcd_call('/kv/txn', {
|
||||
success: [ { requestPut: { key: b64(this.etcd_prefix+'/stats'), value: b64(JSON.stringify(ser)) } } ],
|
||||
}, this.config.etcd_mon_timeout, 0);
|
||||
}
|
||||
|
@ -839,13 +854,14 @@ class Mon
|
|||
return;
|
||||
}
|
||||
kv.key = de64(kv.key);
|
||||
kv.value = kv.value ? JSON.parse(de64(kv.value)) : null;
|
||||
let key = kv.key.substr(this.etcd_prefix.length);
|
||||
kv.value = kv.value ? de64(kv.value) : null;
|
||||
let key = kv.key.substr(this.etcd_prefix.length+1);
|
||||
if (!this.constructor.etcd_allow.exec(key))
|
||||
{
|
||||
console.log('Bad key in etcd: '+kv.key+' = '+kv.value);
|
||||
return;
|
||||
}
|
||||
kv.value = kv.value ? JSON.parse(kv.value) : null;
|
||||
key = key.split('/');
|
||||
let cur = this.state;
|
||||
for (let i = 0; i < key.length-1; i++)
|
||||
|
@ -859,6 +875,14 @@ class Mon
|
|||
this.config = this.state.config.global;
|
||||
this.check_config();
|
||||
}
|
||||
else if (key.join('/') === 'config/pools')
|
||||
{
|
||||
for (const pool_id in this.state.config.pools)
|
||||
{
|
||||
const pool_cfg = this.state.config.pools[pool_id];
|
||||
this.validate_pool_cfg(pool_id, pool_cfg, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async etcd_call(path, body, timeout, retries)
|
||||
|
@ -874,14 +898,14 @@ class Mon
|
|||
const res = await POST(base+path, body, timeout);
|
||||
if (res.error)
|
||||
{
|
||||
console.log('etcd returned error: '+res.error);
|
||||
console.error('etcd returned error: '+res.error);
|
||||
break;
|
||||
}
|
||||
if (res.json)
|
||||
{
|
||||
if (res.json.error)
|
||||
{
|
||||
console.log('etcd returned error: '+res.json.error);
|
||||
console.error('etcd returned error: '+res.json.error);
|
||||
break;
|
||||
}
|
||||
return res.json;
|
||||
|
@ -894,7 +918,7 @@ class Mon
|
|||
die(err)
|
||||
{
|
||||
// In fact we can just try to rejoin
|
||||
console.fatal(err || 'Cluster connection failed');
|
||||
console.error(new Error(err || 'Cluster connection failed'));
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
|
@ -980,3 +1004,5 @@ function sha1hex(str)
|
|||
hash.update(str);
|
||||
return hash.digest('hex');
|
||||
}
|
||||
|
||||
module.exports = Mon;
|
||||
|
|
Loading…
Reference in New Issue