diff --git a/mon/PGUtil.js b/mon/PGUtil.js index c3512beb..d233d4b3 100644 --- a/mon/PGUtil.js +++ b/mon/PGUtil.js @@ -3,6 +3,7 @@ module.exports = { scale_pg_count, + scale_pg_history, }; function add_pg_history(new_pg_history, new_pg, prev_pgs, prev_pg_history, old_pg) @@ -43,16 +44,18 @@ function finish_pg_history(merged_history) merged_history.all_peers = Object.values(merged_history.all_peers); } -function scale_pg_count(prev_pgs, real_prev_pgs, prev_pg_history, new_pg_history, new_pg_count) +function scale_pg_history(prev_pg_history, prev_pgs, new_pgs) { - const old_pg_count = real_prev_pgs.length; + const new_pg_history = []; + const old_pg_count = prev_pgs.length; + const new_pg_count = new_pgs.length; // Add all possibly intersecting PGs to the history of new PGs if (!(new_pg_count % old_pg_count)) { // New PG count is a multiple of old PG count for (let i = 0; i < new_pg_count; i++) { - add_pg_history(new_pg_history, i, real_prev_pgs, prev_pg_history, i % old_pg_count); + add_pg_history(new_pg_history, i, prev_pgs, prev_pg_history, i % old_pg_count); finish_pg_history(new_pg_history[i]); } } @@ -64,7 +67,7 @@ function scale_pg_count(prev_pgs, real_prev_pgs, prev_pg_history, new_pg_history { for (let j = 0; j < mul; j++) { - add_pg_history(new_pg_history, i, real_prev_pgs, prev_pg_history, i+j*new_pg_count); + add_pg_history(new_pg_history, i, prev_pgs, prev_pg_history, i+j*new_pg_count); } finish_pg_history(new_pg_history[i]); } @@ -76,7 +79,7 @@ function scale_pg_count(prev_pgs, real_prev_pgs, prev_pg_history, new_pg_history let merged_history = {}; for (let i = 0; i < old_pg_count; i++) { - add_pg_history(merged_history, 1, real_prev_pgs, prev_pg_history, i); + add_pg_history(merged_history, 1, prev_pgs, prev_pg_history, i); } finish_pg_history(merged_history[1]); for (let i = 0; i < new_pg_count; i++) @@ -89,6 +92,12 @@ function scale_pg_count(prev_pgs, real_prev_pgs, prev_pg_history, new_pg_history { new_pg_history[i] = null; } + return new_pg_history; +} + +function scale_pg_count(prev_pgs, new_pg_count) +{ + const old_pg_count = prev_pgs.length; // Just for the lp_solve optimizer - pick a "previous" PG for each "new" one if (prev_pgs.length < new_pg_count) { diff --git a/mon/mon.js b/mon/mon.js index 3a2a5fa9..a7ed66a6 100644 --- a/mon/mon.js +++ b/mon/mon.js @@ -59,6 +59,7 @@ const etcd_tree = { etcd_mon_timeout: 1000, // ms. min: 0 etcd_mon_retries: 5, // min: 0 mon_change_timeout: 1000, // ms. min: 100 + mon_retry_change_timeout: 50, // ms. min: 10 mon_stats_timeout: 1000, // ms. min: 100 osd_out_time: 600, // seconds. min: 0 placement_levels: { datacenter: 1, rack: 2, host: 3, osd: 4, ... }, @@ -498,6 +499,11 @@ class Mon { this.config.mon_change_timeout = 100; } + this.config.mon_retry_change_timeout = Number(this.config.mon_retry_change_timeout) || 50; + if (this.config.mon_retry_change_timeout < 50) + { + this.config.mon_retry_change_timeout = 50; + } this.config.mon_stats_timeout = Number(this.config.mon_stats_timeout) || 1000; if (this.config.mon_stats_timeout < 100) { @@ -1230,6 +1236,89 @@ class Mon return aff_osds; } + async generate_pool_pgs(pool_id, osd_tree, levels) + { + const pool_cfg = this.state.config.pools[pool_id]; + if (!this.validate_pool_cfg(pool_id, pool_cfg, false)) + { + return null; + } + let pool_tree = osd_tree[pool_cfg.root_node || '']; + pool_tree = pool_tree ? pool_tree.children : []; + pool_tree = LPOptimizer.flatten_tree(pool_tree, levels, pool_cfg.failure_domain, 'osd'); + this.filter_osds_by_tags(osd_tree, pool_tree, pool_cfg.osd_tags); + this.filter_osds_by_block_layout( + pool_tree, + pool_cfg.block_size || this.config.block_size || 131072, + pool_cfg.bitmap_granularity || this.config.bitmap_granularity || 4096, + pool_cfg.immediate_commit || this.config.immediate_commit || 'none' + ); + // First try last_clean_pgs to minimize data movement + let prev_pgs = []; + for (const pg in ((this.state.history.last_clean_pgs.items||{})[pool_id]||{})) + { + prev_pgs[pg-1] = [ ...this.state.history.last_clean_pgs.items[pool_id][pg].osd_set ]; + } + if (!prev_pgs.length) + { + // Fall back to config/pgs if it's empty + for (const pg in ((this.state.config.pgs.items||{})[pool_id]||{})) + { + prev_pgs[pg-1] = [ ...this.state.config.pgs.items[pool_id][pg].osd_set ]; + } + } + const old_pg_count = prev_pgs.length; + const optimize_cfg = { + osd_tree: pool_tree, + pg_count: pool_cfg.pg_count, + pg_size: pool_cfg.pg_size, + pg_minsize: pool_cfg.pg_minsize, + max_combinations: pool_cfg.max_osd_combinations, + ordered: pool_cfg.scheme != 'replicated', + }; + let optimize_result; + // Re-shuffle PGs if config/pgs.hash is empty + if (old_pg_count > 0 && this.state.config.pgs.hash) + { + if (prev_pgs.length != pool_cfg.pg_count) + { + // Scale PG count + // Do it even if old_pg_count is already equal to pool_cfg.pg_count, + // because last_clean_pgs may still contain the old number of PGs + PGUtil.scale_pg_count(prev_pgs, pool_cfg.pg_count); + } + for (const pg of prev_pgs) + { + while (pg.length < pool_cfg.pg_size) + { + pg.push(0); + } + } + optimize_result = await LPOptimizer.optimize_change({ + prev_pgs, + ...optimize_cfg, + }); + } + else + { + optimize_result = await LPOptimizer.optimize_initial(optimize_cfg); + } + console.log(`Pool ${pool_id} (${pool_cfg.name || 'unnamed'}):`); + LPOptimizer.print_change_stats(optimize_result); + const pg_effsize = Math.min(pool_cfg.pg_size, Object.keys(pool_tree).length); + return { + pool_id, + pgs: optimize_result.int_pgs, + stats: { + total_raw_tb: optimize_result.space, + pg_real_size: pg_effsize || pool_cfg.pg_size, + raw_to_usable: (pg_effsize || pool_cfg.pg_size) / (pool_cfg.scheme === 'replicated' + ? 1 : (pool_cfg.pg_size - (pool_cfg.parity_chunks||0))), + space_efficiency: optimize_result.space/(optimize_result.total_space||1), + }, + }; + } + async recheck_pgs() { if (this.recheck_pgs_active) @@ -1244,158 +1333,47 @@ class Mon const { up_osds, levels, osd_tree } = this.get_osd_tree(); const tree_cfg = { osd_tree, + levels, pools: this.state.config.pools, }; const tree_hash = sha1hex(stableStringify(tree_cfg)); if (this.state.config.pgs.hash != tree_hash) { // Something has changed - const new_config_pgs = JSON.parse(JSON.stringify(this.state.config.pgs)); - const etcd_request = { compare: [], success: [] }; - for (const pool_id in (this.state.config.pgs||{}).items||{}) + console.log('Pool configuration or OSD tree changed, re-optimizing'); + // First re-optimize PGs, but don't look at history yet + const optimize_results = await Promise.all(Object.keys(this.state.config.pools) + .map(pool_id => this.generate_pool_pgs(pool_id, osd_tree, levels))); + // Then apply the modification in the form of an optimistic transaction, + // each time considering new pg/history modifications (OSDs modify it during rebalance) + while (!await this.apply_pool_pgs(optimize_results, up_osds, osd_tree, tree_hash)) { - if (!this.state.config.pools[pool_id]) - { - // Pool deleted. Delete all PGs, but first stop them. - if (!await this.stop_all_pgs(pool_id)) - { - this.recheck_pgs_active = false; - this.schedule_recheck(); - return; - } - const prev_pgs = []; - for (const pg in this.state.config.pgs.items[pool_id]||{}) - { - prev_pgs[pg-1] = this.state.config.pgs.items[pool_id][pg].osd_set; - } - // Also delete pool statistics - etcd_request.success.push({ requestDeleteRange: { - key: b64(this.etcd_prefix+'/pool/stats/'+pool_id), - } }); - this.save_new_pgs_txn(new_config_pgs, etcd_request, pool_id, up_osds, osd_tree, prev_pgs, [], []); - } - } - for (const pool_id in this.state.config.pools) - { - const pool_cfg = this.state.config.pools[pool_id]; - if (!this.validate_pool_cfg(pool_id, pool_cfg, false)) - { - continue; - } - let pool_tree = osd_tree[pool_cfg.root_node || '']; - pool_tree = pool_tree ? pool_tree.children : []; - pool_tree = LPOptimizer.flatten_tree(pool_tree, levels, pool_cfg.failure_domain, 'osd'); - this.filter_osds_by_tags(osd_tree, pool_tree, pool_cfg.osd_tags); - this.filter_osds_by_block_layout( - pool_tree, - pool_cfg.block_size || this.config.block_size || 131072, - pool_cfg.bitmap_granularity || this.config.bitmap_granularity || 4096, - pool_cfg.immediate_commit || this.config.immediate_commit || 'none' + console.log( + 'Someone changed PG configuration while we also tried to change it.'+ + ' Retrying in '+this.config.mon_retry_change_timeout+' ms' ); - // These are for the purpose of building history.osd_sets - const real_prev_pgs = []; - let pg_history = []; - for (const pg in ((this.state.config.pgs.items||{})[pool_id]||{})) + // Failed to apply - parallel change detected. Wait a bit and retry + const old_rev = this.etcd_watch_revision; + while (this.etcd_watch_revision === old_rev) { - real_prev_pgs[pg-1] = this.state.config.pgs.items[pool_id][pg].osd_set; - if (this.state.pg.history[pool_id] && - this.state.pg.history[pool_id][pg]) - { - pg_history[pg-1] = this.state.pg.history[pool_id][pg]; - } + await new Promise(ok => setTimeout(ok, this.config.mon_retry_change_timeout)); } - // And these are for the purpose of minimizing data movement - let prev_pgs = []; - for (const pg in ((this.state.history.last_clean_pgs.items||{})[pool_id]||{})) - { - prev_pgs[pg-1] = this.state.history.last_clean_pgs.items[pool_id][pg].osd_set; - } - prev_pgs = JSON.parse(JSON.stringify(prev_pgs.length ? prev_pgs : real_prev_pgs)); - const old_pg_count = real_prev_pgs.length; - const optimize_cfg = { - osd_tree: pool_tree, - pg_count: pool_cfg.pg_count, - pg_size: pool_cfg.pg_size, - pg_minsize: pool_cfg.pg_minsize, - max_combinations: pool_cfg.max_osd_combinations, - ordered: pool_cfg.scheme != 'replicated', + const new_ot = this.get_osd_tree(); + const new_tcfg = { + osd_tree: new_ot.osd_tree, + levels: new_ot.levels, + pools: this.state.config.pools, }; - let optimize_result; - if (old_pg_count > 0) + if (sha1hex(stableStringify(new_tcfg)) !== tree_hash) { - if (old_pg_count != pool_cfg.pg_count) - { - // PG count changed. Need to bring all PGs down. - if (!await this.stop_all_pgs(pool_id)) - { - this.recheck_pgs_active = false; - this.schedule_recheck(); - return; - } - } - if (prev_pgs.length != pool_cfg.pg_count) - { - // Scale PG count - // Do it even if old_pg_count is already equal to pool_cfg.pg_count, - // because last_clean_pgs may still contain the old number of PGs - const new_pg_history = []; - PGUtil.scale_pg_count(prev_pgs, real_prev_pgs, pg_history, new_pg_history, pool_cfg.pg_count); - pg_history = new_pg_history; - } - for (const pg of prev_pgs) - { - while (pg.length < pool_cfg.pg_size) - { - pg.push(0); - } - } - if (!this.state.config.pgs.hash) - { - // Re-shuffle PGs - optimize_result = await LPOptimizer.optimize_initial(optimize_cfg); - } - else - { - optimize_result = await LPOptimizer.optimize_change({ - prev_pgs, - ...optimize_cfg, - }); - } + // Configuration actually changed, restart from the beginning + this.recheck_pgs_active = false; + setImmediate(() => this.recheck_pgs().catch(this.die)); + return; } - else - { - optimize_result = await LPOptimizer.optimize_initial(optimize_cfg); - } - if (old_pg_count != optimize_result.int_pgs.length) - { - console.log( - `PG count for pool ${pool_id} (${pool_cfg.name || 'unnamed'})`+ - ` changed from: ${old_pg_count} to ${optimize_result.int_pgs.length}` - ); - // Drop stats - etcd_request.success.push({ requestDeleteRange: { - key: b64(this.etcd_prefix+'/pg/stats/'+pool_id+'/'), - range_end: b64(this.etcd_prefix+'/pg/stats/'+pool_id+'0'), - } }); - } - LPOptimizer.print_change_stats(optimize_result); - const pg_effsize = Math.min(pool_cfg.pg_size, Object.keys(pool_tree).length); - this.state.pool.stats[pool_id] = { - used_raw_tb: (this.state.pool.stats[pool_id]||{}).used_raw_tb || 0, - total_raw_tb: optimize_result.space, - pg_real_size: pg_effsize || pool_cfg.pg_size, - raw_to_usable: (pg_effsize || pool_cfg.pg_size) / (pool_cfg.scheme === 'replicated' - ? 1 : (pool_cfg.pg_size - (pool_cfg.parity_chunks||0))), - space_efficiency: optimize_result.space/(optimize_result.total_space||1), - }; - etcd_request.success.push({ requestPut: { - key: b64(this.etcd_prefix+'/pool/stats/'+pool_id), - value: b64(JSON.stringify(this.state.pool.stats[pool_id])), - } }); - this.save_new_pgs_txn(new_config_pgs, etcd_request, pool_id, up_osds, osd_tree, real_prev_pgs, optimize_result.int_pgs, pg_history); + // Configuration didn't change, PG history probably changed, so just retry } - new_config_pgs.hash = tree_hash; - await this.save_pg_config(new_config_pgs, etcd_request); + console.log('PG configuration successfully changed'); } else { @@ -1442,8 +1420,81 @@ class Mon this.recheck_pgs_active = false; } - async save_pg_config(new_config_pgs, etcd_request = { compare: [], success: [] }) + async apply_pool_pgs(results, up_osds, osd_tree, tree_hash) { + for (const pool_id in (this.state.config.pgs||{}).items||{}) + { + // We should stop all PGs when deleting a pool or changing its PG count + if (!this.state.config.pools[pool_id] || + this.state.config.pgs.items[pool_id] && this.state.config.pools[pool_id].pg_count != + Object.keys(this.state.config.pgs.items[pool_id]).reduce((a, c) => (a < (0|c) ? (0|c) : a), 0)) + { + if (!await this.stop_all_pgs(pool_id)) + { + return false; + } + } + } + const new_config_pgs = JSON.parse(JSON.stringify(this.state.config.pgs)); + const etcd_request = { compare: [], success: [] }; + for (const pool_id in (new_config_pgs||{}).items||{}) + { + if (!this.state.config.pools[pool_id]) + { + const prev_pgs = []; + for (const pg in new_config_pgs.items[pool_id]||{}) + { + prev_pgs[pg-1] = new_config_pgs.items[pool_id][pg].osd_set; + } + // Also delete pool statistics + etcd_request.success.push({ requestDeleteRange: { + key: b64(this.etcd_prefix+'/pool/stats/'+pool_id), + } }); + this.save_new_pgs_txn(new_config_pgs, etcd_request, pool_id, up_osds, osd_tree, prev_pgs, [], []); + } + } + for (const pool_res of results) + { + const pool_id = pool_res.pool_id; + const pool_cfg = this.state.config.pools[pool_id]; + let pg_history = []; + for (const pg in ((this.state.config.pgs.items||{})[pool_id]||{})) + { + if (this.state.pg.history[pool_id] && + this.state.pg.history[pool_id][pg]) + { + pg_history[pg-1] = this.state.pg.history[pool_id][pg]; + } + } + const real_prev_pgs = []; + for (const pg in ((this.state.config.pgs.items||{})[pool_id]||{})) + { + real_prev_pgs[pg-1] = [ ...this.state.config.pgs.items[pool_id][pg].osd_set ]; + } + if (real_prev_pgs.length > 0 && real_prev_pgs.length != pool_res.pgs.length) + { + console.log( + `Changing PG count for pool ${pool_id} (${pool_cfg.name || 'unnamed'})`+ + ` from: ${real_prev_pgs.length} to ${pool_res.pgs.length}` + ); + pg_history = PGUtil.scale_pg_history(pg_history, real_prev_pgs, pool_res.pgs); + // Drop stats + etcd_request.success.push({ requestDeleteRange: { + key: b64(this.etcd_prefix+'/pg/stats/'+pool_id+'/'), + range_end: b64(this.etcd_prefix+'/pg/stats/'+pool_id+'0'), + } }); + } + const stats = { + used_raw_tb: (this.state.pool.stats[pool_id]||{}).used_raw_tb || 0, + ...pool_res.stats, + }; + etcd_request.success.push({ requestPut: { + key: b64(this.etcd_prefix+'/pool/stats/'+pool_id), + value: b64(JSON.stringify(stats)), + } }); + this.save_new_pgs_txn(new_config_pgs, etcd_request, pool_id, up_osds, osd_tree, real_prev_pgs, pool_res.pgs, pg_history); + } + new_config_pgs.hash = tree_hash; etcd_request.compare.push( { key: b64(this.etcd_prefix+'/mon/master'), target: 'LEASE', lease: ''+this.etcd_lease_id }, { key: b64(this.etcd_prefix+'/config/pgs'), target: 'MOD', mod_revision: ''+this.etcd_watch_revision, result: 'LESS' }, @@ -1451,14 +1502,8 @@ class Mon etcd_request.success.push( { requestPut: { key: b64(this.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(new_config_pgs)) } }, ); - const res = await this.etcd_call('/kv/txn', etcd_request, this.config.etcd_mon_timeout, 0); - if (!res.succeeded) - { - console.log('Someone changed PG configuration while we also tried to change it. Retrying in '+this.config.mon_change_timeout+' ms'); - this.schedule_recheck(); - return; - } - console.log('PG configuration successfully changed'); + const txn_res = await this.etcd_call('/kv/txn', etcd_request, this.config.etcd_mon_timeout, 0); + return txn_res.succeeded; } // Schedule next recheck at least at diff --git a/tests/run_3osds.sh b/tests/run_3osds.sh index affa6fa9..eb243aa3 100644 --- a/tests/run_3osds.sh +++ b/tests/run_3osds.sh @@ -53,7 +53,7 @@ for i in $(seq 1 $OSD_COUNT); do start_osd $i done -(while true; do node mon/mon-main.js --etcd_url $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 || true; done) &>./testdata/mon.log & +(while true; do node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 || true; done) >>./testdata/mon.log 2>&1 & MON_PID=$! if [ "$SCHEME" = "ec" ]; then diff --git a/tests/test_change_pg_count.sh b/tests/test_change_pg_count.sh index 5492983c..2a1ce213 100755 --- a/tests/test_change_pg_count.sh +++ b/tests/test_change_pg_count.sh @@ -18,6 +18,7 @@ try_change() for i in {1..6}; do echo --- Change PG count to $n --- >>testdata/osd$i.log done + echo --- Change PG count to $n --- >>testdata/mon.log $ETCDCTL put /vitastor/config/pools '{"1":{'$POOLCFG',"pg_size":'$PG_SIZE',"pg_minsize":'$PG_MINSIZE',"pg_count":'$n'}}' diff --git a/tests/test_failure_domain.sh b/tests/test_failure_domain.sh index 99f58c4a..8296bbf1 100755 --- a/tests/test_failure_domain.sh +++ b/tests/test_failure_domain.sh @@ -15,7 +15,7 @@ $ETCDCTL put /vitastor/osd/stats/7 '{"host":"host4","size":1073741824,"time":"'$ $ETCDCTL put /vitastor/osd/stats/8 '{"host":"host4","size":1073741824,"time":"'$TIME'"}' $ETCDCTL put /vitastor/config/pools '{"1":{"name":"testpool","scheme":"replicated","pg_size":2,"pg_minsize":1,"pg_count":4,"failure_domain":"rack"}}' -node mon/mon-main.js --etcd_url $ETCD_URL --etcd_prefix "/vitastor" &>./testdata/mon.log & +node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" >>./testdata/mon.log 2>&1 & MON_PID=$! sleep 2 diff --git a/tests/test_vm_cont.sh b/tests/test_vm_cont.sh index 2116cd31..fb778c43 100755 --- a/tests/test_vm_cont.sh +++ b/tests/test_vm_cont.sh @@ -15,7 +15,7 @@ for i in $(seq 1 $OSD_COUNT); do eval OSD${i}_PID=$! done -(while true; do node mon/mon-main.js --etcd_url $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 || true; done) &>./testdata/mon.log & +(while true; do node mon/mon-main.js --etcd_address $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 || true; done) >>./testdata/mon.log 2>&1 & MON_PID=$! sleep 3