diff --git a/lp/mon.js b/lp/mon.js index ffabde18..14944c2f 100644 --- a/lp/mon.js +++ b/lp/mon.js @@ -46,18 +46,13 @@ class Mon this.etcd_urls.push(scheme+'://'+url); } this.etcd_prefix = config.etcd_prefix || '/rage'; - if (!/^\/+/.exec(this.etcd_prefix)) - { - this.etcd_prefix = '/' + this.etcd_prefix; - } + this.etcd_prefix = this.etcd_prefix.replace(/\/\/+/g, '/').replace(/^\/?(.*[^\/])\/?$/, '/$1'); this.etcd_start_timeout = (config.etcd_start_timeout || 5) * 1000; - this.data = JSON.parse(JSON.stringify(Mon.etcd_tree)); + this.state = JSON.parse(JSON.stringify(Mon.etcd_tree)); } async start() { - await this.load_cluster_state(); - return; await this.load_config(); await this.get_lease(); await this.become_master(); @@ -71,11 +66,12 @@ class Mon const res = await this.etcd_call('/txn', { success: [ { requestRange: { key: b64(this.etcd_prefix+'/config/global') } } ] }, this.etcd_start_timeout, -1); - this.config = this.parse_kv(res.responses[0].response_range.kvs[0]).value || {}; - if (!(this.config instanceof Object) || this.config instanceof Array) - { - throw new Error(this.etcd_prefix+'/config/global is not a JSON object'); - } + this.parse_kv(res.responses[0].response_range.kvs[0]); + this.check_config(); + } + + check_config() + { this.config.etcd_mon_timeout = Number(this.config.etcd_mon_timeout) || 0; if (this.config.etcd_mon_timeout <= 0) { @@ -97,6 +93,11 @@ class Mon { this.config.osd_out_time = 30*60; // 30 minutes by default } + this.config.max_osd_combinations = Number(this.config.max_osd_combinations) || 10000; + if (this.config.max_osd_combinations < 100) + { + this.config.max_osd_combinations = 100; + } } async start_watcher(retries) @@ -138,7 +139,7 @@ class Mon create_request: { key: b64(this.etcd_prefix+'/'), range_end: b64(this.etcd_prefix+'0'), - start_revision: this.etcd_watch_revision, + start_revision: ''+this.etcd_watch_revision, watch_id: 1, }, })); @@ -158,22 +159,22 @@ class Mon } else { + let changed = false; console.log('Revision '+data.result.header.revision+' events: '); for (const e of data.result.events) { this.parse_kv(e.kv); + const key = e.kv.key.substr(this.etcd_prefix.length); + if (key.substr(0, 11) != '/osd/stats/' && key.substr(0, 10) != '/pg/stats/') + { + changed = true; + } console.log(e); } - if (this.changeTimer) + if (changed) { - clearTimeout(this.changeTimer); - this.changeTimer = null; + this.schedule_recheck(); } - this.changeTimer = setTimeout(() => - { - this.changeTimer = null; - this.recheck_pgs().catch(console.error); - }, this.config.mon_change_timeout || 1000); } }); } @@ -223,19 +224,24 @@ class Mon this.parse_kv(kv); } } - this.data = data; - this.data.config.placement_tree = this.data.config.placement_tree || {}; + this.state = data; + } + + all_osds() + { + return Object.keys(this.state.osd.stats); } get_osd_tree() { - const levels = this.data.config.placement_tree.levels || {}; + this.state.config.placement_tree = this.state.config.placement_tree||{}; + const levels = this.state.config.placement_tree.levels||{}; levels.host = levels.host || 100; levels.osd = levels.osd || 101; const tree = { '': { children: [] } }; - for (const node_id in this.data.config.placement_tree) + for (const node_id in this.state.config.placement_tree.nodes||{}) { - const node_cfg = this.data.config.placement_tree[node_id]; + const node_cfg = this.state.config.placement_tree.nodes[node_id]; if (!node_id || /^\d/.exec(node_id) || !node_cfg.level || !levels[node_cfg.level]) { @@ -244,11 +250,12 @@ class Mon } tree[node_id] = { id: node_id, level: node_cfg.level, parent: node_cfg.parent, children: [] }; } + // This requires monitor system time to be in sync with OSD system times (at least to some extent) const down_time = Date.now()/1000 - this.config.osd_out_time; - for (const osd_num of Object.keys(this.data.osd.stats).sort((a, b) => a - b)) + for (const osd_num of this.all_osds().sort((a, b) => a - b)) { - const stat = this.data.osd.stats[osd_num]; - if (stat.size && (this.data.osd.state[osd_num] || Number(stat.time) >= down_time)) + const stat = this.state.osd.stats[osd_num]; + if (stat.size && (this.state.osd.state[osd_num] || Number(stat.time) >= down_time)) { // Numeric IDs are reserved for OSDs tree[osd_num] = tree[osd_num] || { id: osd_num, parent: stat.host }; @@ -273,7 +280,217 @@ class Mon tree[parent].children.push(tree[node_id]); delete node_cfg.parent; } - return LPOptimizer.flatten_tree(tree[''].children, levels, this.data.config.failure_domain, 'osd'); + return LPOptimizer.flatten_tree(tree[''].children, levels, this.state.config.failure_domain, 'osd'); + } + + async stop_all_pgs() + { + let has_online = false, paused = true; + for (const pg in this.state.config.pgs.items||{}) + { + const cur_state = ((this.state.pg.state[pg]||{}).state||[]).join(','); + if (cur_state != '' && cur_state != 'offline') + { + has_online = true; + } + if (!this.state.config.pgs.items[pg].pause) + { + paused = false; + } + } + if (!paused) + { + console.log('Stopping all PGs before changing PG count'); + const new_cfg = JSON.parse(JSON.stringify(this.state.config.pgs)); + for (const pg in new_cfg.items) + { + new_cfg.items[pg].pause = true; + } + // Check that no OSDs change their state before we pause PGs + // Doing this we make sure that OSDs don't wake up in the middle of our "transaction" + // and can't see the old PG configuration + const checks = []; + for (const osd_num of this.all_osds()) + { + const key = b64(this.etcd_prefix+'/osd/state/'+osd_num); + checks.push({ key, target: 'MOD', result: 'LESS', mod_revision: ''+this.etcd_watch_revision }); + } + const res = await this.etcd_call('/txn', { + compare: [ + { key: b64(this.etcd_prefix+'/mon/master'), target: 'LEASE', lease: ''+this.etcd_lease_id }, + { key: b64(this.etcd_prefix+'/config/pgs'), target: 'MOD', mod_revision: ''+this.etcd_watch_revision, result: 'LESS' }, + ...checks, + ], + success: [ + { requestPut: { key: b64(this.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(new_cfg)) } }, + ], + }, this.config.etcd_mon_timeout, 0); + if (!res.succeeded) + { + return false; + } + this.state.config.pgs = new_cfg; + } + return !has_online; + } + + scale_pg_count(prev_pgs, pg_history, new_pg_count) + { + const old_pg_count = prev_pgs.length; + // Add all possibly intersecting PGs into the history of new PGs + if (!(new_pg_count % old_pg_count)) + { + // New PG count is a multiple of the old PG count + const mul = (new_pg_count / old_pg_count); + for (let i = 0; i < new_pg_count; i++) + { + const old_i = Math.floor(new_pg_count / mul); + pg_history[i] = JSON.parse(JSON.stringify(this.state.pg.history[1+old_i])); + } + } + else if (!(old_pg_count % new_pg_count)) + { + // Old PG count is a multiple of the new PG count + const mul = (old_pg_count / new_pg_count); + for (let i = 0; i < new_pg_count; i++) + { + pg_history[i] = { + osd_sets: [], + all_peers: [], + }; + for (let j = 0; j < mul; j++) + { + pg_history[i].osd_sets.push(prev_pgs[i*mul]); + const hist = this.state.pg.history[1+i*mul+j]; + if (hist && hist.osd_sets && hist.osd_sets.length) + { + Array.prototype.push.apply(pg_history[i].osd_sets, hist.osd_sets); + } + if (hist && hist.all_peers && hist.all_peers.length) + { + Array.prototype.push.apply(pg_history[i].all_peers, hist.all_peers); + } + } + } + } + else + { + // Any PG may intersect with any PG after non-multiple PG count change + // So, merge ALL PGs history + let all_sets = {}; + let all_peers = {}; + for (const pg of prev_pgs) + { + all_sets[pg.join(' ')] = pg; + } + for (const pg in this.state.pg.history) + { + const hist = this.state.pg.history[pg]; + if (hist && hist.osd_sets) + { + for (const pg of hist.osd_sets) + { + all_sets[pg.join(' ')] = pg; + } + } + if (hist && hist.all_peers) + { + for (const osd_num of hist.all_peers) + { + all_peers[osd_num] = Number(osd_num); + } + } + } + all_sets = Object.values(all_sets); + all_peers = Object.values(all_peers); + for (let i = 0; i < new_pg_count; i++) + { + pg_history[i] = { osd_sets: all_sets, all_peers }; + } + } + // Mark history keys for removed PGs as removed + for (let i = new_pg_count; i < old_pg_count; i++) + { + pg_history[i] = null; + } + if (old_pg_count < new_pg_count) + { + for (let i = new_pg_count-1; i >= 0; i--) + { + prev_pgs[i] = prev_pgs[Math.floor(i/new_pg_count*old_pg_count)]; + } + } + else if (old_pg_count > new_pg_count) + { + for (let i = 0; i < new_pg_count; i++) + { + prev_pgs[i] = prev_pgs[Math.round(i/new_pg_count*old_pg_count)]; + } + prev_pgs.splice(new_pg_count, old_pg_count-new_pg_count); + } + } + + async save_new_pgs(prev_pgs, new_pgs, pg_history, tree_hash) + { + const txn = [], checks = []; + const pg_items = {}; + new_pgs.map((osd_set, i) => + { + osd_set = osd_set.map(osd_num => osd_num === LPOptimizer.NO_OSD ? 0 : osd_num); + const alive_set = osd_set.filter(osd_num => osd_num); + pg_items[i+1] = { + osd_set, + primary: alive_set.length ? alive_set[Math.floor(Math.random()*alive_set.length)] : 0, + }; + if (prev_pgs[i] && prev_pgs[i].join(' ') != osd_set.join(' ')) + { + pg_history[i] = pg_history[i] || {}; + pg_history[i].osd_sets = pg_history[i].osd_sets || []; + pg_history[i].osd_sets.push(prev_pgs[i]); + } + }); + for (let i = 0; i < new_pgs.length || i < prev_pgs.length; i++) + { + checks.push({ + key: b64(this.etcd_prefix+'/pg/history/'+(i+1)), + target: 'MOD', + mod_revision: ''+this.etcd_watch_revision, + result: 'LESS', + }); + if (pg_history[i]) + { + txn.push({ + requestPut: { + key: b64(this.etcd_prefix+'/pg/history/'+(i+1)), + value: b64(JSON.stringify(pg_history[i])), + }, + }); + } + else + { + txn.push({ + requestDeleteRange: { + key: b64(this.etcd_prefix+'/pg/history/'+(i+1)), + }, + }); + } + } + this.state.config.pgs = { + hash: tree_hash, + items: pg_items, + }; + const res = await this.etcd_call('/txn', { + compare: [ + { key: b64(this.etcd_prefix+'/mon/master'), target: 'LEASE', lease: ''+this.etcd_lease_id }, + { key: b64(this.etcd_prefix+'/config/pgs'), target: 'MOD', mod_revision: ''+this.etcd_watch_revision, result: 'LESS' }, + ...checks, + ], + success: [ + { requestPut: { key: b64(this.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(this.state.config.pgs)) } }, + ...txn, + ], + }, this.config.etcd_mon_timeout, 0); + return res.succeeded; } async recheck_pgs() @@ -282,190 +499,68 @@ class Mon // Recalculate PGs and save them to etcd if the configuration is changed const tree_cfg = { osd_tree: this.get_osd_tree(), - pg_count: this.data.config.global.pg_count || Object.keys(this.data.config.pgs.items||{}).length || 128, - max_osd_combinations: this.data.config.global.max_osd_combinations, + pg_count: this.config.pg_count || Object.keys(this.state.config.pgs.items||{}).length || 128, + max_osd_combinations: this.config.max_osd_combinations, }; const tree_hash = sha1hex(stableStringify(tree_cfg)); - if (this.data.config.pgs.hash != tree_hash) + if (this.state.config.pgs.hash != tree_hash) { // Something has changed - const pg_history = []; const prev_pgs = []; - for (const pg in this.data.config.pgs.items||{}) + for (const pg in this.state.config.pgs.items||{}) { - prev_pgs[pg-1] = this.data.config.pgs.items[pg].osd_set; + prev_pgs[pg-1] = this.state.config.pgs.items[pg].osd_set; } - let pgs; - if (prev_pgs.length > 0) + const pg_history = []; + const old_pg_count = prev_pgs.length; + let optimize_result; + if (old_pg_count > 0) { - if (prev_pgs.length != tree_cfg.pg_count) + if (old_pg_count != tree_cfg.pg_count) { // PG count changed. Need to bring all PGs down. - for (const pg in this.data.config.pgs.items) + if (!await this.stop_all_pgs()) { - const cur_state = ((this.data.pg.state[pg]||{}).state||[]).join(','); - if (cur_state != '' && cur_state != 'offline') - { - await this.stop_all_pgs(); - return; - } - } - all_osds = Object.keys(all_osds); - // ...and add all possibly intersecting PGs into the history of new PGs - if (!(tree_cfg.pg_count % prev_pgs.length)) - { - // New PG count is a multiple of the old PG count - const mul = (tree_cfg.pg_count / prev_pgs.length); - for (let i = 0; i < tree_cfg.pg_count; i++) - { - const old_i = Math.floor(tree_cfg.pg_count / mul); - pg_history[i] = JSON.parse(JSON.stringify(this.data.pg.history[1+old_i])); - } - } - else if (!(prev_pgs.length % tree_cfg.pg_count)) - { - // Old PG count is a multiple of the new PG count - const mul = (prev_pgs.length / tree_cfg.pg_count); - for (let i = 0; i < tree_cfg.pg_count; i++) - { - pg_history[i] = { - osd_sets: [], - all_peers: [], - }; - for (let j = 0; j < mul; j++) - { - pg_history[i].osd_sets.push(prev_pgs[i*mul]); - const hist = this.data.pg.history[1+i*mul+j]; - if (hist && hist.osd_sets && hist.osd_sets.length) - { - Array.prototype.push.apply(pg_history[i].osd_sets, hist.osd_sets); - } - if (hist && hist.all_peers && hist.all_peers.length) - { - Array.prototype.push.apply(pg_history[i].all_peers, hist.all_peers); - } - } - } - } - else - { - // Any PG may intersect with any PG after non-multiple PG count change - // So, merge ALL PGs history - let all_sets = {}; - let all_peers = {}; - for (const pg of prev_pgs) - { - all_sets[pg.join(' ')] = pg; - } - for (const pg in this.data.pg.history) - { - const hist = this.data.pg.history[pg]; - if (hist && hist.osd_sets) - { - for (const pg of hist.osd_sets) - { - all_sets[pg.join(' ')] = pg; - } - } - if (hist && hist.all_peers) - { - for (const osd_num of hist.all_peers) - { - all_peers[osd_num] = Number(osd_num); - } - } - } - all_sets = Object.values(all_sets); - all_peers = Object.values(all_peers); - for (let i = 0; i < tree_cfg.pg_count; i++) - { - pg_history[i] = { osd_sets: all_sets, all_peers }; - } - } - // Mark history keys for removed PGs as removed - for (let i = tree_cfg.pg_count; i < prev_pgs.length; i++) - { - pg_history[i] = null; + this.schedule_recheck(); + return; } + this.scale_pg_count(prev_pgs, pg_history, new_pg_count); } - if (prev_pgs.length < tree_cfg.pg_count) - { - for (let i = tree_cfg.pg_count-1; i >= 0; i--) - { - prev_pgs[i] = prev_pgs[Math.floor(i/tree_cfg.pg_count*prev_pgs.length)]; - } - } - else if (prev_pgs.length > tree_cfg.pg_count) - { - for (let i = 0; i < tree_cfg.pg_count; i++) - { - prev_pgs[i] = prev_pgs[Math.round(i/tree_cfg.pg_count*prev_pgs.length)]; - } - prev_pgs.splice(tree_cfg.pg_count, prev_pgs.length-tree_cfg.pg_count); - } - pgs = LPOptimizer.optimize_change(prev_pgs, tree_cfg.osd_tree, tree_cfg.max_osd_combinations); + optimize_result = await LPOptimizer.optimize_change(prev_pgs, tree_cfg.osd_tree, tree_cfg.max_osd_combinations); } else { - pgs = LPOptimizer.optimize_initial(tree_cfg.osd_tree, tree_cfg.pg_count, tree_cfg.max_osd_combinations); + optimize_result = await LPOptimizer.optimize_initial(tree_cfg.osd_tree, tree_cfg.pg_count, tree_cfg.max_osd_combinations); } - // FIXME: Handle insufficient failure domain count - const txn = []; - const pg_items = {}; - pgs.map((osd_set, i) => + if (!await this.save_new_pgs(prev_pgs, optimize_result.int_pgs, pg_history, tree_hash)) { - const alive_set = osd_set.filter(osd_num => osd_num); - pg_items[i+1] = { - osd_set, - primary: alive_set.length ? alive_set[Math.floor(Math.random()*alive_set.length)] : 0, - }; - if (prev_pgs[i] && prev_pgs[i].join(' ') != osd_set.join(' ')) - { - pg_history[i] = pg_history[i] || {}; - pg_history[i].osd_sets = pg_history[i].osd_sets || []; - pg_history[i].osd_sets.push(prev_pgs[i]); - } - }); - for (let i = 0; i < tree_cfg.pg_count || i < prev_pgs.length; i++) - { - if (pg_history[i]) - { - txn.push({ - requestPut: { - key: b64(this.etcd_prefix+'/pg/history/'+(i+1)), - value: b64(JSON.stringify(pg_history[i])), - }, - }); - } - else - { - txn.push({ - requestDeleteRange: { - key: b64(this.etcd_prefix+'/pg/history/'+(i+1)), - }, - }); - } + console.log('Someone changed PG configuration while we also tried to change it. Retrying in '+this.config.mon_change_timeout+' ms'); + this.schedule_recheck(); + return; } - this.data.config.pgs = { - hash: tree_hash, - count: tree_cfg.pg_count, - items: pg_items, - }; - const res = await this.etcd_call('/txn', { - compare: [ - { key: b64(this.etcd_prefix+'/mon/master'), target: 'LEASE', lease: ''+this.etcd_lease_id }, - { key: b64(this.etcd_prefix+'/config/pgs'), target: 'MOD', mod_revision: this.etcd_watch_revision, result: 'LESS' }, - { key: b64(this.etcd_prefix+'/pg/change_stamp'), target: 'MOD', mod_revision: this.etcd_watch_revision, result: 'LESS' }, - ], - success: [ - { requestPut: { key: b64(this.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(this.data.config.pgs)) } }, - ...txn, - ], - }, this.config.etcd_mon_timeout, 0); - + console.log('PG configuration successfully changed'); + if (old_pg_count != optimize_result.int_pgs.length) + { + console.log(`PG count changed from: ${old_pg_count} to ${optimize_result.int_pgs.length}`); + } + LPOptimizer.print_change_stats(optimize_result); } } + schedule_recheck() + { + if (this.changeTimer) + { + clearTimeout(this.changeTimer); + this.changeTimer = null; + } + this.changeTimer = setTimeout(() => + { + this.changeTimer = null; + this.recheck_pgs().catch(console.error); + }, this.config.mon_change_timeout || 1000); + } + parse_kv(kv) { if (!kv || !kv.key) @@ -475,7 +570,7 @@ class Mon kv.key = de64(kv.key); kv.value = kv.value ? JSON.parse(de64(kv.value)) : null; const key = kv.key.substr(this.etcd_prefix.length).replace(/^\/+/, '').split('/'); - const cur = this.data, orig = Mon.etcd_tree; + const cur = this.state, orig = Mon.etcd_tree; for (let i = 0; i < key.length-1; i++) { if (!orig[key[i]]) @@ -494,12 +589,9 @@ class Mon cur[key[key.length-1]] = kv.value; if (key.join('/') === 'config/global') { - this.data.config.global = this.data.config.global || {}; - this.data.config.global.max_osd_combinations = Number(this.data.config.global.max_osd_combinations) || 10000; - if (this.data.config.global.max_osd_combinations < 100) - { - this.data.config.global.max_osd_combinations = 100; - } + this.state.config.global = this.state.config.global || {}; + this.config = this.state.config.global; + this.check_config(); } }