forked from vitalif/vitastor
Implement per-pool PG calculation, fix some lint warnings
parent
fe0d78bf8e
commit
a8b3cbd6af
|
@ -55,7 +55,7 @@ async function optimize_initial({ osd_tree, pg_count, pg_size = 3, pg_minsize =
|
|||
}
|
||||
const all_weights = Object.assign({}, ...Object.values(osd_tree));
|
||||
const total_weight = Object.values(all_weights).reduce((a, c) => Number(a) + Number(c), 0);
|
||||
all_pgs = Object.values(random_combinations(osd_tree, pg_size, max_combinations));
|
||||
const all_pgs = Object.values(random_combinations(osd_tree, pg_size, max_combinations));
|
||||
const pg_per_osd = {};
|
||||
for (const pg of all_pgs)
|
||||
{
|
||||
|
@ -246,7 +246,7 @@ async function optimize_change({ prev_pgs: prev_int_pgs, osd_tree, pg_size = 3,
|
|||
}
|
||||
}
|
||||
// Get all combinations
|
||||
all_pgs = random_combinations(osd_tree, pg_size, max_combinations);
|
||||
let all_pgs = random_combinations(osd_tree, pg_size, max_combinations);
|
||||
add_valid_previous(osd_tree, prev_weights, all_pgs);
|
||||
all_pgs = Object.values(all_pgs);
|
||||
const pg_per_osd = {};
|
||||
|
|
289
lp/mon.js
289
lp/mon.js
|
@ -1,4 +1,5 @@
|
|||
const http = require('http');
|
||||
const crypto = require('crypto');
|
||||
const os = require('os');
|
||||
const WebSocket = require('ws');
|
||||
const LPOptimizer = require('./lp-optimizer.js');
|
||||
|
@ -13,14 +14,15 @@ class Mon
|
|||
'config/global',
|
||||
'config/node_placement',
|
||||
'config/pools',
|
||||
'config/osd/\d+',
|
||||
'config/osd/[1-9]\d*',
|
||||
'config/pgs',
|
||||
'osd/state/\d+',
|
||||
'osd/stats/\d+',
|
||||
'osd/state/[1-9]\d*',
|
||||
'osd/stats/[1-9]\d*',
|
||||
'mon/master',
|
||||
'pg/state/\d+/\d+',
|
||||
'pg/stats/\d+/\d+',
|
||||
'pg/history/\d+/\d+',
|
||||
'pg/state/[1-9]\d*/[1-9]\d*',
|
||||
'pg/stats/[1-9]\d*/[1-9]\d*',
|
||||
'pg/history/[1-9]\d*/[1-9]\d*',
|
||||
'stats',
|
||||
].join('$|^')+'$')
|
||||
|
||||
static etcd_tree = {
|
||||
|
@ -33,7 +35,6 @@ class Mon
|
|||
mon_change_timeout: 1000, // min: 100
|
||||
mon_stats_timeout: 1000, // min: 100
|
||||
osd_out_time: 1800, // min: 0
|
||||
max_osd_combinations: 10000, // min: 100
|
||||
placement_levels: { datacenter: 1, rack: 2, host: 3, osd: 4, ... },
|
||||
// client and osd
|
||||
use_sync_send_recv: false,
|
||||
|
@ -64,10 +65,15 @@ class Mon
|
|||
}, */
|
||||
node_placement: {},
|
||||
/* pools: {
|
||||
<name>: {
|
||||
id: 1,
|
||||
<id>: {
|
||||
name: 'testpool',
|
||||
scheme: 'xor',
|
||||
pg_size: 3,
|
||||
pg_minsize: 2,
|
||||
pg_count: 100,
|
||||
failure_domain: 'host',
|
||||
max_osd_combinations: 10000,
|
||||
// FIXME add device classes/tags
|
||||
},
|
||||
...
|
||||
}, */
|
||||
|
@ -76,11 +82,14 @@ class Mon
|
|||
/* <id>: { reweight: 1 }, ... */
|
||||
},
|
||||
/* pgs: {
|
||||
<pool_id>: {
|
||||
<pg_id>: {
|
||||
osd_set: [ 1, 2, 3 ],
|
||||
primary: 1,
|
||||
pause: false,
|
||||
hash: string,
|
||||
items: {
|
||||
<pool_id>: {
|
||||
<pg_id>: {
|
||||
osd_set: [ 1, 2, 3 ],
|
||||
primary: 1,
|
||||
pause: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
}, */
|
||||
|
@ -118,7 +127,9 @@ class Mon
|
|||
},
|
||||
},
|
||||
mon: {
|
||||
master: {},
|
||||
master: {
|
||||
/* ip: [ string ], */
|
||||
},
|
||||
},
|
||||
pg: {
|
||||
state: {
|
||||
|
@ -132,11 +143,11 @@ class Mon
|
|||
stats: {
|
||||
/* <pool_id>: {
|
||||
<pg_id>: {
|
||||
object_count: int,
|
||||
clean_count: int,
|
||||
misplaced_count: int,
|
||||
degraded_count: int,
|
||||
incomplete_count: int,
|
||||
object_count: uint64_t,
|
||||
clean_count: uint64_t,
|
||||
misplaced_count: uint64_t,
|
||||
degraded_count: uint64_t,
|
||||
incomplete_count: uint64_t,
|
||||
write_osd_set: osd_num_t[],
|
||||
},
|
||||
}, */
|
||||
|
@ -146,11 +157,30 @@ class Mon
|
|||
<pg_id>: {
|
||||
osd_sets: osd_num_t[][],
|
||||
all_peers: osd_num_t[],
|
||||
epoch: int,
|
||||
epoch: uint32_t,
|
||||
},
|
||||
}, */
|
||||
},
|
||||
},
|
||||
stats: {
|
||||
/* op_stats: {
|
||||
<string>: { count: uint64_t, usec: uint64_t, bytes: uint64_t },
|
||||
},
|
||||
subop_stats: {
|
||||
<string>: { count: uint64_t, usec: uint64_t },
|
||||
},
|
||||
recovery_stats: {
|
||||
degraded: { count: uint64_t, bytes: uint64_t },
|
||||
misplaced: { count: uint64_t, bytes: uint64_t },
|
||||
},
|
||||
object_counts: {
|
||||
object: uint64_t,
|
||||
clean: uint64_t,
|
||||
misplaced: uint64_t,
|
||||
degraded: uint64_t,
|
||||
incomplete: uint64_t,
|
||||
}, */
|
||||
},
|
||||
}
|
||||
|
||||
constructor(config)
|
||||
|
@ -223,11 +253,6 @@ class Mon
|
|||
{
|
||||
this.config.osd_out_time = 30*60; // 30 minutes by default
|
||||
}
|
||||
this.config.max_osd_combinations = Number(this.config.max_osd_combinations) || 10000;
|
||||
if (this.config.max_osd_combinations < 100)
|
||||
{
|
||||
this.config.max_osd_combinations = 100;
|
||||
}
|
||||
}
|
||||
|
||||
async start_watcher(retries)
|
||||
|
@ -246,7 +271,7 @@ class Mon
|
|||
{
|
||||
this.ws.close();
|
||||
ok(false);
|
||||
}, timeout);
|
||||
}, this.config.etcd_mon_timeout);
|
||||
this.ws = new WebSocket(base+'/watch');
|
||||
this.ws.on('open', () =>
|
||||
{
|
||||
|
@ -329,7 +354,7 @@ class Mon
|
|||
{
|
||||
this.die('Lease expired');
|
||||
}
|
||||
}, config.etcd_mon_timeout);
|
||||
}, this.config.etcd_mon_timeout);
|
||||
}
|
||||
|
||||
async become_master()
|
||||
|
@ -421,28 +446,29 @@ class Mon
|
|||
return LPOptimizer.flatten_tree(tree[''].children, levels, this.config.failure_domain, 'osd');
|
||||
}
|
||||
|
||||
async stop_all_pgs()
|
||||
async stop_all_pgs(pool_id)
|
||||
{
|
||||
let has_online = false, paused = true;
|
||||
for (const pg in this.state.config.pgs.items||{})
|
||||
for (const pg in this.state.config.pgs.items[pool_id]||{})
|
||||
{
|
||||
const cur_state = ((this.state.pg.state[pg]||{}).state||[]).join(',');
|
||||
// FIXME: Change all (||{}) to ?. (optional chaining) at some point
|
||||
const cur_state = (((this.state.pg.state[pool_id]||{})[pg]||{}).state||[]).join(',');
|
||||
if (cur_state != '' && cur_state != 'offline')
|
||||
{
|
||||
has_online = true;
|
||||
}
|
||||
if (!this.state.config.pgs.items[pg].pause)
|
||||
if (!this.state.config.pgs.items[pool_id][pg].pause)
|
||||
{
|
||||
paused = false;
|
||||
}
|
||||
}
|
||||
if (!paused)
|
||||
{
|
||||
console.log('Stopping all PGs before changing PG count');
|
||||
console.log('Stopping all PGs for pool '+pool_id+' before changing PG count');
|
||||
const new_cfg = JSON.parse(JSON.stringify(this.state.config.pgs));
|
||||
for (const pg in new_cfg.items)
|
||||
for (const pg in new_cfg.items[pool_id])
|
||||
{
|
||||
new_cfg.items[pg].pause = true;
|
||||
new_cfg.items[pool_id][pg].pause = true;
|
||||
}
|
||||
// Check that no OSDs change their state before we pause PGs
|
||||
// Doing this we make sure that OSDs don't wake up in the middle of our "transaction"
|
||||
|
@ -472,9 +498,8 @@ class Mon
|
|||
return !has_online;
|
||||
}
|
||||
|
||||
async save_new_pgs(prev_pgs, new_pgs, pg_history, tree_hash)
|
||||
save_new_pgs_txn(request, pool_id, prev_pgs, new_pgs, pg_history)
|
||||
{
|
||||
const txn = [], checks = [];
|
||||
const pg_items = {};
|
||||
new_pgs.map((osd_set, i) =>
|
||||
{
|
||||
|
@ -493,109 +518,158 @@ class Mon
|
|||
});
|
||||
for (let i = 0; i < new_pgs.length || i < prev_pgs.length; i++)
|
||||
{
|
||||
checks.push({
|
||||
key: b64(this.etcd_prefix+'/pg/history/'+(i+1)),
|
||||
request.compare.push({
|
||||
key: b64(this.etcd_prefix+'/pg/history/'+pool_id+'/'+(i+1)),
|
||||
target: 'MOD',
|
||||
mod_revision: ''+this.etcd_watch_revision,
|
||||
result: 'LESS',
|
||||
});
|
||||
if (pg_history[i])
|
||||
{
|
||||
txn.push({
|
||||
request.success.push({
|
||||
requestPut: {
|
||||
key: b64(this.etcd_prefix+'/pg/history/'+(i+1)),
|
||||
key: b64(this.etcd_prefix+'/pg/history/'+pool_id+'/'+(i+1)),
|
||||
value: b64(JSON.stringify(pg_history[i])),
|
||||
},
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
txn.push({
|
||||
request.success.push({
|
||||
requestDeleteRange: {
|
||||
key: b64(this.etcd_prefix+'/pg/history/'+(i+1)),
|
||||
key: b64(this.etcd_prefix+'/pg/history/'+pool_id+'/'+(i+1)),
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
this.state.config.pgs = {
|
||||
hash: tree_hash,
|
||||
items: pg_items,
|
||||
};
|
||||
const res = await this.etcd_call('/txn', {
|
||||
compare: [
|
||||
{ key: b64(this.etcd_prefix+'/mon/master'), target: 'LEASE', lease: ''+this.etcd_lease_id },
|
||||
{ key: b64(this.etcd_prefix+'/config/pgs'), target: 'MOD', mod_revision: ''+this.etcd_watch_revision, result: 'LESS' },
|
||||
...checks,
|
||||
],
|
||||
success: [
|
||||
{ requestPut: { key: b64(this.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(this.state.config.pgs)) } },
|
||||
...txn,
|
||||
],
|
||||
}, this.config.etcd_mon_timeout, 0);
|
||||
return res.succeeded;
|
||||
this.state.config.pgs.items[pool_id] = pg_items;
|
||||
}
|
||||
|
||||
validate_pool_cfg(pool_id, pool_cfg)
|
||||
{
|
||||
pool_cfg.pg_size = Math.floor(pool_cfg.pg_size);
|
||||
pool_cfg.pg_minsize = Math.floor(pool_cfg.pg_minsize);
|
||||
pool_cfg.pg_count = Math.floor(pool_cfg.pg_count);
|
||||
pool_cfg.failure_domain = pool_cfg.failure_domain || 'host';
|
||||
pool_cfg.max_osd_combinations = Math.floor(pool_cfg.max_osd_combinations) || 10000;
|
||||
if (!/^[1-9]\d*$/.exec(''+pool_id))
|
||||
{
|
||||
console.log('Pool ID '+pool_id+' is invalid');
|
||||
return false;
|
||||
}
|
||||
if (!pool_cfg.pg_size || pool_cfg.pg_size < 1)
|
||||
{
|
||||
console.log('Pool '+pool_id+' has invalid pg_size');
|
||||
return false;
|
||||
}
|
||||
if (!pool_cfg.pg_minsize || pool_cfg.pg_minsize < 1 || pool_cfg.pg_minsize > pool_cfg.pg_size)
|
||||
{
|
||||
console.log('Pool '+pool_id+' has invalid pg_minsize');
|
||||
return false;
|
||||
}
|
||||
if (!pool_cfg.pg_count || pool_cfg.pg_count < 1)
|
||||
{
|
||||
console.log('Pool '+pool_id+' has invalid pg_count');
|
||||
return false;
|
||||
}
|
||||
if (!pool_cfg.name)
|
||||
{
|
||||
console.log('Pool '+pool_id+' has invalid pg_count');
|
||||
return false;
|
||||
}
|
||||
if (pool_cfg.max_osd_combinations < 100)
|
||||
{
|
||||
console.log('Pool '+pool_id+' has invalid max_osd_combinations');
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
async recheck_pgs()
|
||||
{
|
||||
// Take configuration and state, check it against the stored configuration hash
|
||||
// Recalculate PGs and save them to etcd if the configuration is changed
|
||||
// FIXME: Also do not change anything if the distribution is good enough and no PGs are degraded
|
||||
const tree_cfg = {
|
||||
osd_tree: this.get_osd_tree(),
|
||||
pg_count: this.config.pg_count || Object.keys(this.state.config.pgs.items||{}).length || 128,
|
||||
max_osd_combinations: this.config.max_osd_combinations,
|
||||
pools: this.state.config.pools,
|
||||
};
|
||||
const tree_hash = sha1hex(stableStringify(tree_cfg));
|
||||
if (this.state.config.pgs.hash != tree_hash)
|
||||
{
|
||||
// Something has changed
|
||||
const prev_pgs = [];
|
||||
for (const pg in this.state.config.pgs.items||{})
|
||||
const etcd_request = { compare: [], success: [] };
|
||||
for (const pool_id in this.state.config.pools)
|
||||
{
|
||||
prev_pgs[pg-1] = this.state.config.pgs.items[pg].osd_set;
|
||||
}
|
||||
const pg_history = [];
|
||||
const old_pg_count = prev_pgs.length;
|
||||
let optimize_result;
|
||||
if (old_pg_count > 0)
|
||||
{
|
||||
if (old_pg_count != tree_cfg.pg_count)
|
||||
const pool_cfg = this.state.config.pools[pool_id];
|
||||
if (!this.validate_pool_cfg(pool_id, pool_cfg))
|
||||
{
|
||||
// PG count changed. Need to bring all PGs down.
|
||||
if (!await this.stop_all_pgs())
|
||||
{
|
||||
this.schedule_recheck();
|
||||
return;
|
||||
}
|
||||
PGUtil.scale_pg_count(prev_pgs, this.state.pg.history, pg_history, new_pg_count);
|
||||
return;
|
||||
}
|
||||
optimize_result = await LPOptimizer.optimize_change({
|
||||
prev_pgs,
|
||||
osd_tree: tree_cfg.osd_tree,
|
||||
pg_size: 3,
|
||||
max_combinations: tree_cfg.max_osd_combinations,
|
||||
});
|
||||
const prev_pgs = [];
|
||||
for (const pg in (this.state.config.pgs.items[pool_id]||{}).items||{})
|
||||
{
|
||||
prev_pgs[pg-1] = this.state.config.pgs.items[pool_id][pg].osd_set;
|
||||
}
|
||||
const pg_history = [];
|
||||
const old_pg_count = prev_pgs.length;
|
||||
let optimize_result;
|
||||
if (old_pg_count > 0)
|
||||
{
|
||||
if (old_pg_count != pool_cfg.pg_count)
|
||||
{
|
||||
// PG count changed. Need to bring all PGs down.
|
||||
if (!await this.stop_all_pgs(pool_id))
|
||||
{
|
||||
this.schedule_recheck();
|
||||
return;
|
||||
}
|
||||
PGUtil.scale_pg_count(prev_pgs, this.state.pg.history[pool_id]||{}, pg_history, pool_cfg.pg_count);
|
||||
}
|
||||
optimize_result = await LPOptimizer.optimize_change({
|
||||
prev_pgs,
|
||||
osd_tree: tree_cfg.osd_tree,
|
||||
pg_size: pool_cfg.pg_size,
|
||||
pg_minsize: pool_cfg.pg_minsize,
|
||||
max_combinations: pool_cfg.max_osd_combinations,
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
optimize_result = await LPOptimizer.optimize_initial({
|
||||
osd_tree: tree_cfg.osd_tree,
|
||||
pg_count: pool_cfg.pg_count,
|
||||
pg_size: pool_cfg.pg_size,
|
||||
pg_minsize: pool_cfg.pg_minsize,
|
||||
max_combinations: pool_cfg.max_osd_combinations,
|
||||
});
|
||||
}
|
||||
if (old_pg_count != optimize_result.int_pgs.length)
|
||||
{
|
||||
console.log(
|
||||
`PG count for pool ${pool_id} (${pool_cfg.name || 'unnamed'}) `+
|
||||
`changed from: ${old_pg_count} to ${optimize_result.int_pgs.length}`
|
||||
);
|
||||
}
|
||||
LPOptimizer.print_change_stats(optimize_result);
|
||||
this.save_new_pgs_txn(etcd_request, pool_id, prev_pgs, optimize_result.int_pgs, pg_history);
|
||||
}
|
||||
else
|
||||
{
|
||||
optimize_result = await LPOptimizer.optimize_initial({
|
||||
osd_tree: tree_cfg.osd_tree,
|
||||
pg_size: 3,
|
||||
pg_count: tree_cfg.pg_count,
|
||||
max_combinations: tree_cfg.max_osd_combinations,
|
||||
});
|
||||
}
|
||||
if (!await this.save_new_pgs(prev_pgs, optimize_result.int_pgs, pg_history, tree_hash))
|
||||
this.state.config.pgs.hash = tree_hash;
|
||||
etcd_request.compare.push(
|
||||
{ key: b64(this.etcd_prefix+'/mon/master'), target: 'LEASE', lease: ''+this.etcd_lease_id },
|
||||
{ key: b64(this.etcd_prefix+'/config/pgs'), target: 'MOD', mod_revision: ''+this.etcd_watch_revision, result: 'LESS' },
|
||||
);
|
||||
etcd_request.success.push(
|
||||
{ requestPut: { key: b64(this.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(this.state.config.pgs)) } },
|
||||
);
|
||||
const res = await this.etcd_call('/txn', etcd_request, this.config.etcd_mon_timeout, 0);
|
||||
if (!res.succeeded)
|
||||
{
|
||||
console.log('Someone changed PG configuration while we also tried to change it. Retrying in '+this.config.mon_change_timeout+' ms');
|
||||
this.schedule_recheck();
|
||||
return;
|
||||
}
|
||||
console.log('PG configuration successfully changed');
|
||||
if (old_pg_count != optimize_result.int_pgs.length)
|
||||
{
|
||||
console.log(`PG count changed from: ${old_pg_count} to ${optimize_result.int_pgs.length}`);
|
||||
}
|
||||
LPOptimizer.print_change_stats(optimize_result);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -688,14 +762,17 @@ class Mon
|
|||
}
|
||||
}
|
||||
const object_counts = { object: 0n, clean: 0n, misplaced: 0n, degraded: 0n, incomplete: 0n };
|
||||
for (const pg_num in this.state.pg.stats)
|
||||
for (const pool_id in this.state.pg.stats)
|
||||
{
|
||||
const st = this.state.pg.stats[pg_num];
|
||||
for (const k in object_counts)
|
||||
for (const pg_num in this.state.pg.stats[pool_id])
|
||||
{
|
||||
if (st[k+'_count'])
|
||||
const st = this.state.pg.stats[pool_id][pg_num];
|
||||
for (const k in object_counts)
|
||||
{
|
||||
object_counts[k] += BigInt(st[k+'_count']);
|
||||
if (st[k+'_count'])
|
||||
{
|
||||
object_counts[k] += BigInt(st[k+'_count']);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -761,7 +838,7 @@ class Mon
|
|||
return;
|
||||
}
|
||||
key = key.split('/');
|
||||
const cur = this.state;
|
||||
let cur = this.state;
|
||||
for (let i = 0; i < key.length-1; i++)
|
||||
{
|
||||
cur = (cur[key[i]] = cur[key[i]] || {});
|
||||
|
@ -854,7 +931,7 @@ function POST(url, body, timeout)
|
|||
clearTimeout(timer_id);
|
||||
let res_body = '';
|
||||
res.setEncoding('utf8');
|
||||
res.on('data', chunk => { res_body += chunk });
|
||||
res.on('data', chunk => { res_body += chunk; });
|
||||
res.on('end', () =>
|
||||
{
|
||||
if (res.statusCode != 200)
|
||||
|
|
Loading…
Reference in New Issue