forked from vitalif/vitastor
Compare commits
1 Commits
mon-self-r
...
epoch-dele
Author | SHA1 | Date | |
---|---|---|---|
4c9bf6727b |
@@ -10,18 +10,25 @@ function add_pg_history(new_pg_history, new_pg, prev_pgs, prev_pg_history, old_p
|
||||
if (!new_pg_history[new_pg])
|
||||
{
|
||||
new_pg_history[new_pg] = {
|
||||
osd_sets: {},
|
||||
osd_set_epochs: {},
|
||||
all_peers: {},
|
||||
epoch: 0,
|
||||
};
|
||||
}
|
||||
const nh = new_pg_history[new_pg], oh = prev_pg_history[old_pg];
|
||||
nh.osd_sets[prev_pgs[old_pg].join(' ')] = prev_pgs[old_pg];
|
||||
nh.osd_set_epochs[prev_pgs[old_pg].join(' ')] = { osd_set: prev_pgs[old_pg] };
|
||||
if (oh && oh.osd_sets && oh.osd_sets.length)
|
||||
{
|
||||
for (const pg of oh.osd_sets)
|
||||
{
|
||||
nh.osd_sets[pg.join(' ')] = pg.map(osd_num => Number(osd_num));
|
||||
nh.osd_set_epochs[pg.join(' ')] = { osd_set: pg.map(osd_num => Number(osd_num)) };
|
||||
}
|
||||
}
|
||||
if (oh && oh.osd_set_epochs && oh.osd_set_epochs.length)
|
||||
{
|
||||
for (const pg of oh.osd_set_epochs)
|
||||
{
|
||||
nh.osd_set_epochs[pg.osd_set.join(' ')] = { osd_set: pg.osd_set.map(osd_num => Number(osd_num)) };
|
||||
}
|
||||
}
|
||||
if (oh && oh.all_peers && oh.all_peers.length)
|
||||
@@ -39,20 +46,20 @@ function add_pg_history(new_pg_history, new_pg, prev_pgs, prev_pg_history, old_p
|
||||
|
||||
function finish_pg_history(merged_history)
|
||||
{
|
||||
merged_history.osd_sets = Object.values(merged_history.osd_sets);
|
||||
merged_history.osd_set_epochs = Object.values(merged_history.osd_set_epochs);
|
||||
merged_history.all_peers = Object.values(merged_history.all_peers);
|
||||
}
|
||||
|
||||
function scale_pg_count(prev_pgs, real_prev_pgs, prev_pg_history, new_pg_history, new_pg_count)
|
||||
function scale_pg_count(prev_pgs, prev_pg_history, new_pg_history, new_pg_count)
|
||||
{
|
||||
const old_pg_count = real_prev_pgs.length;
|
||||
const old_pg_count = prev_pgs.length;
|
||||
// Add all possibly intersecting PGs to the history of new PGs
|
||||
if (!(new_pg_count % old_pg_count))
|
||||
{
|
||||
// New PG count is a multiple of old PG count
|
||||
for (let i = 0; i < new_pg_count; i++)
|
||||
{
|
||||
add_pg_history(new_pg_history, i, real_prev_pgs, prev_pg_history, i % old_pg_count);
|
||||
add_pg_history(new_pg_history, i, prev_pgs, prev_pg_history, i % old_pg_count);
|
||||
finish_pg_history(new_pg_history[i]);
|
||||
}
|
||||
}
|
||||
@@ -64,7 +71,7 @@ function scale_pg_count(prev_pgs, real_prev_pgs, prev_pg_history, new_pg_history
|
||||
{
|
||||
for (let j = 0; j < mul; j++)
|
||||
{
|
||||
add_pg_history(new_pg_history, i, real_prev_pgs, prev_pg_history, i+j*new_pg_count);
|
||||
add_pg_history(new_pg_history, i, prev_pgs, prev_pg_history, i+j*new_pg_count);
|
||||
}
|
||||
finish_pg_history(new_pg_history[i]);
|
||||
}
|
||||
@@ -76,7 +83,7 @@ function scale_pg_count(prev_pgs, real_prev_pgs, prev_pg_history, new_pg_history
|
||||
let merged_history = {};
|
||||
for (let i = 0; i < old_pg_count; i++)
|
||||
{
|
||||
add_pg_history(merged_history, 1, real_prev_pgs, prev_pg_history, i);
|
||||
add_pg_history(merged_history, 1, prev_pgs, prev_pg_history, i);
|
||||
}
|
||||
finish_pg_history(merged_history[1]);
|
||||
for (let i = 0; i < new_pg_count; i++)
|
||||
@@ -90,15 +97,15 @@ function scale_pg_count(prev_pgs, real_prev_pgs, prev_pg_history, new_pg_history
|
||||
new_pg_history[i] = null;
|
||||
}
|
||||
// Just for the lp_solve optimizer - pick a "previous" PG for each "new" one
|
||||
if (prev_pgs.length < new_pg_count)
|
||||
if (old_pg_count < new_pg_count)
|
||||
{
|
||||
for (let i = prev_pgs.length; i < new_pg_count; i++)
|
||||
for (let i = old_pg_count; i < new_pg_count; i++)
|
||||
{
|
||||
prev_pgs[i] = prev_pgs[i % prev_pgs.length];
|
||||
prev_pgs[i] = prev_pgs[i % old_pg_count];
|
||||
}
|
||||
}
|
||||
else if (prev_pgs.length > new_pg_count)
|
||||
else if (old_pg_count > new_pg_count)
|
||||
{
|
||||
prev_pgs.splice(new_pg_count, prev_pgs.length-new_pg_count);
|
||||
prev_pgs.splice(new_pg_count, old_pg_count-new_pg_count);
|
||||
}
|
||||
}
|
||||
|
@@ -13,7 +13,7 @@ for (let i = 2; i < process.argv.length; i++)
|
||||
{
|
||||
console.error('USAGE: '+process.argv[0]+' '+process.argv[1]+' [--verbose 1]'+
|
||||
' [--etcd_address "http://127.0.0.1:2379,..."] [--config_path /etc/vitastor/vitastor.conf]'+
|
||||
' [--etcd_prefix "/vitastor"] [--etcd_start_timeout 5] [--restart_interval 5]');
|
||||
' [--etcd_prefix "/vitastor"] [--etcd_start_timeout 5]');
|
||||
process.exit();
|
||||
}
|
||||
else if (process.argv[i].substr(0, 2) == '--')
|
||||
|
94
mon/mon.js
94
mon/mon.js
@@ -286,7 +286,12 @@ const etcd_tree = {
|
||||
history: {
|
||||
/* <pool_id>: {
|
||||
<pg_id>: {
|
||||
osd_sets: osd_num_t[][],
|
||||
osd_set_epochs: {
|
||||
osd_set: osd_num_t[],
|
||||
min_epoch: uint64_t,
|
||||
max_epoch: uint64_t,
|
||||
}[],
|
||||
osd_sets: osd_num_t[][], // outdated
|
||||
all_peers: osd_num_t[],
|
||||
epoch: uint64_t,
|
||||
},
|
||||
@@ -561,7 +566,7 @@ class Mon
|
||||
}
|
||||
if (!this.ws)
|
||||
{
|
||||
await this.die('Failed to open etcd watch websocket');
|
||||
this.die('Failed to open etcd watch websocket');
|
||||
}
|
||||
const cur_addr = this.selected_etcd_url;
|
||||
this.ws_alive = true;
|
||||
@@ -728,7 +733,7 @@ class Mon
|
||||
const res = await this.etcd_call('/lease/keepalive', { ID: this.etcd_lease_id }, this.config.etcd_mon_timeout, this.config.etcd_mon_retries);
|
||||
if (!res.result.TTL)
|
||||
{
|
||||
await this.die('Lease expired');
|
||||
this.die('Lease expired');
|
||||
}
|
||||
}, this.config.etcd_mon_timeout);
|
||||
if (!this.signals_set)
|
||||
@@ -741,32 +746,9 @@ class Mon
|
||||
|
||||
async on_stop(status)
|
||||
{
|
||||
if (this.ws_keepalive_timer)
|
||||
{
|
||||
clearInterval(this.ws_keepalive_timer);
|
||||
this.ws_keepalive_timer = null;
|
||||
}
|
||||
if (this.lease_timer)
|
||||
{
|
||||
clearInterval(this.lease_timer);
|
||||
this.lease_timer = null;
|
||||
}
|
||||
if (this.etcd_lease_id)
|
||||
{
|
||||
const lease_id = this.etcd_lease_id;
|
||||
this.etcd_lease_id = null;
|
||||
await this.etcd_call('/lease/revoke', { ID: lease_id }, this.config.etcd_mon_timeout, this.config.etcd_mon_retries);
|
||||
}
|
||||
if (!status || !this.initConfig.restart_interval)
|
||||
{
|
||||
process.exit(status);
|
||||
}
|
||||
else
|
||||
{
|
||||
console.log('Restarting after '+this.initConfig.restart_interval+' seconds');
|
||||
await new Promise(ok => setTimeout(ok, this.initConfig.restart_interval*1000));
|
||||
await this.start();
|
||||
}
|
||||
clearInterval(this.lease_timer);
|
||||
await this.etcd_call('/lease/revoke', { ID: this.etcd_lease_id }, this.config.etcd_mon_timeout, this.config.etcd_mon_retries);
|
||||
process.exit(status);
|
||||
}
|
||||
|
||||
async become_master()
|
||||
@@ -979,7 +961,7 @@ class Mon
|
||||
return alive_set[this.rng() % alive_set.length];
|
||||
}
|
||||
|
||||
save_new_pgs_txn(save_to, request, pool_id, up_osds, osd_tree, prev_pgs, new_pgs, pg_history)
|
||||
save_new_pgs_txn(request, pool_id, up_osds, osd_tree, prev_pgs, new_pgs, pg_history)
|
||||
{
|
||||
const aff_osds = this.get_affinity_osds(this.state.config.pools[pool_id], up_osds, osd_tree);
|
||||
const pg_items = {};
|
||||
@@ -991,18 +973,6 @@ class Mon
|
||||
osd_set,
|
||||
primary: this.pick_primary(pool_id, osd_set, up_osds, aff_osds),
|
||||
};
|
||||
if (prev_pgs[i] && prev_pgs[i].join(' ') != osd_set.join(' ') &&
|
||||
prev_pgs[i].filter(osd_num => osd_num).length > 0)
|
||||
{
|
||||
pg_history[i] = pg_history[i] || {};
|
||||
pg_history[i].osd_sets = pg_history[i].osd_sets || [];
|
||||
pg_history[i].osd_sets.push(prev_pgs[i]);
|
||||
}
|
||||
if (pg_history[i] && pg_history[i].osd_sets)
|
||||
{
|
||||
pg_history[i].osd_sets = Object.values(pg_history[i].osd_sets
|
||||
.reduce((a, c) => { a[c.join(' ')] = c; return a; }, {}));
|
||||
}
|
||||
});
|
||||
for (let i = 0; i < new_pgs.length || i < prev_pgs.length; i++)
|
||||
{
|
||||
@@ -1032,14 +1002,14 @@ class Mon
|
||||
});
|
||||
}
|
||||
}
|
||||
save_to.items = save_to.items || {};
|
||||
this.state.config.pgs.items = this.state.config.pgs.items || {};
|
||||
if (!new_pgs.length)
|
||||
{
|
||||
delete save_to.items[pool_id];
|
||||
delete this.state.config.pgs.items[pool_id];
|
||||
}
|
||||
else
|
||||
{
|
||||
save_to.items[pool_id] = pg_items;
|
||||
this.state.config.pgs.items[pool_id] = pg_items;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1183,7 +1153,6 @@ class Mon
|
||||
if (this.state.config.pgs.hash != tree_hash)
|
||||
{
|
||||
// Something has changed
|
||||
const new_config_pgs = JSON.parse(JSON.stringify(this.state.config.pgs));
|
||||
const etcd_request = { compare: [], success: [] };
|
||||
for (const pool_id in (this.state.config.pgs||{}).items||{})
|
||||
{
|
||||
@@ -1204,7 +1173,7 @@ class Mon
|
||||
etcd_request.success.push({ requestDeleteRange: {
|
||||
key: b64(this.etcd_prefix+'/pool/stats/'+pool_id),
|
||||
} });
|
||||
this.save_new_pgs_txn(new_config_pgs, etcd_request, pool_id, up_osds, osd_tree, prev_pgs, [], []);
|
||||
this.save_new_pgs_txn(etcd_request, pool_id, up_osds, osd_tree, prev_pgs, [], []);
|
||||
}
|
||||
}
|
||||
for (const pool_id in this.state.config.pools)
|
||||
@@ -1258,7 +1227,7 @@ class Mon
|
||||
return;
|
||||
}
|
||||
const new_pg_history = [];
|
||||
PGUtil.scale_pg_count(prev_pgs, real_prev_pgs, pg_history, new_pg_history, pool_cfg.pg_count);
|
||||
PGUtil.scale_pg_count(prev_pgs, pg_history, new_pg_history, pool_cfg.pg_count);
|
||||
pg_history = new_pg_history;
|
||||
}
|
||||
for (const pg of prev_pgs)
|
||||
@@ -1311,15 +1280,14 @@ class Mon
|
||||
key: b64(this.etcd_prefix+'/pool/stats/'+pool_id),
|
||||
value: b64(JSON.stringify(this.state.pool.stats[pool_id])),
|
||||
} });
|
||||
this.save_new_pgs_txn(new_config_pgs, etcd_request, pool_id, up_osds, osd_tree, real_prev_pgs, optimize_result.int_pgs, pg_history);
|
||||
this.save_new_pgs_txn(etcd_request, pool_id, up_osds, osd_tree, real_prev_pgs, optimize_result.int_pgs, pg_history);
|
||||
}
|
||||
new_config_pgs.hash = tree_hash;
|
||||
await this.save_pg_config(new_config_pgs, etcd_request);
|
||||
this.state.config.pgs.hash = tree_hash;
|
||||
await this.save_pg_config(etcd_request);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Nothing changed, but we still want to recheck the distribution of primaries
|
||||
let new_config_pgs;
|
||||
let changed = false;
|
||||
for (const pool_id in this.state.config.pools)
|
||||
{
|
||||
@@ -1339,35 +1307,31 @@ class Mon
|
||||
const new_primary = this.pick_primary(pool_id, pg_cfg.osd_set, up_osds, aff_osds);
|
||||
if (pg_cfg.primary != new_primary)
|
||||
{
|
||||
if (!new_config_pgs)
|
||||
{
|
||||
new_config_pgs = JSON.parse(JSON.stringify(this.state.config.pgs));
|
||||
}
|
||||
console.log(
|
||||
`Moving pool ${pool_id} (${pool_cfg.name || 'unnamed'}) PG ${pg_num}`+
|
||||
` primary OSD from ${pg_cfg.primary} to ${new_primary}`
|
||||
);
|
||||
changed = true;
|
||||
new_config_pgs.items[pool_id][pg_num].primary = new_primary;
|
||||
pg_cfg.primary = new_primary;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (changed)
|
||||
{
|
||||
await this.save_pg_config(new_config_pgs);
|
||||
await this.save_pg_config();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async save_pg_config(new_config_pgs, etcd_request = { compare: [], success: [] })
|
||||
async save_pg_config(etcd_request = { compare: [], success: [] })
|
||||
{
|
||||
etcd_request.compare.push(
|
||||
{ key: b64(this.etcd_prefix+'/mon/master'), target: 'LEASE', lease: ''+this.etcd_lease_id },
|
||||
{ key: b64(this.etcd_prefix+'/config/pgs'), target: 'MOD', mod_revision: ''+this.etcd_watch_revision, result: 'LESS' },
|
||||
);
|
||||
etcd_request.success.push(
|
||||
{ requestPut: { key: b64(this.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(new_config_pgs)) } },
|
||||
{ requestPut: { key: b64(this.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(this.state.config.pgs)) } },
|
||||
);
|
||||
const res = await this.etcd_call('/kv/txn', etcd_request, this.config.etcd_mon_timeout, 0);
|
||||
if (!res.succeeded)
|
||||
@@ -1794,13 +1758,14 @@ class Mon
|
||||
return res.json;
|
||||
}
|
||||
}
|
||||
await this.die();
|
||||
this.die();
|
||||
}
|
||||
|
||||
async _die(err)
|
||||
_die(err)
|
||||
{
|
||||
// In fact we can just try to rejoin
|
||||
console.error(new Error(err || 'Cluster connection failed'));
|
||||
await this.on_stop(1);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
local_ips(all)
|
||||
@@ -1845,7 +1810,6 @@ function POST(url, body, timeout)
|
||||
clearTimeout(timer_id);
|
||||
let res_body = '';
|
||||
res.setEncoding('utf8');
|
||||
res.on('error', no);
|
||||
res.on('data', chunk => { res_body += chunk; });
|
||||
res.on('end', () =>
|
||||
{
|
||||
@@ -1865,8 +1829,6 @@ function POST(url, body, timeout)
|
||||
}
|
||||
});
|
||||
});
|
||||
req.on('error', no);
|
||||
req.on('close', () => no(new Error('Connection closed prematurely')));
|
||||
req.write(body_text);
|
||||
req.end();
|
||||
});
|
||||
|
@@ -307,18 +307,6 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op)
|
||||
}
|
||||
PRIV(op)->wait_for = 0;
|
||||
}
|
||||
else if (PRIV(op)->wait_for == WAIT_FREE)
|
||||
{
|
||||
if (!data_alloc->get_free_count() && big_to_flush > 0)
|
||||
{
|
||||
#ifdef BLOCKSTORE_DEBUG
|
||||
printf("Still waiting for free space on the data device\n");
|
||||
#endif
|
||||
return;
|
||||
}
|
||||
flusher->release_trim();
|
||||
PRIV(op)->wait_for = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
throw std::runtime_error("BUG: op->wait_for value is unexpected");
|
||||
|
@@ -160,8 +160,6 @@ struct __attribute__((__packed__)) dirty_entry
|
||||
#define WAIT_JOURNAL 3
|
||||
// Suspend operation until the next journal sector buffer is free
|
||||
#define WAIT_JOURNAL_BUFFER 4
|
||||
// Suspend operation until there is some free space on the data device
|
||||
#define WAIT_FREE 5
|
||||
|
||||
struct fulfill_read_t
|
||||
{
|
||||
@@ -265,7 +263,6 @@ class blockstore_impl_t
|
||||
|
||||
struct journal_t journal;
|
||||
journal_flusher_t *flusher;
|
||||
int big_to_flush = 0;
|
||||
int write_iodepth = 0;
|
||||
|
||||
bool live = false, queue_stall = false;
|
||||
|
@@ -201,11 +201,6 @@ void blockstore_impl_t::erase_dirty(blockstore_dirty_db_t::iterator dirty_start,
|
||||
}
|
||||
while (1)
|
||||
{
|
||||
if ((IS_BIG_WRITE(dirty_it->second.state) || IS_DELETE(dirty_it->second.state)) &&
|
||||
IS_STABLE(dirty_it->second.state))
|
||||
{
|
||||
big_to_flush--;
|
||||
}
|
||||
if (IS_BIG_WRITE(dirty_it->second.state) && dirty_it->second.location != clean_loc &&
|
||||
dirty_it->second.location != UINT64_MAX)
|
||||
{
|
||||
|
@@ -446,7 +446,6 @@ void blockstore_impl_t::mark_stable(const obj_ver_id & v, bool forget_dirty)
|
||||
{
|
||||
inode_space_stats[dirty_it->first.oid.inode] += dsk.data_block_size;
|
||||
}
|
||||
big_to_flush++;
|
||||
}
|
||||
else if (IS_DELETE(dirty_it->second.state))
|
||||
{
|
||||
@@ -455,7 +454,6 @@ void blockstore_impl_t::mark_stable(const obj_ver_id & v, bool forget_dirty)
|
||||
sp -= dsk.data_block_size;
|
||||
else
|
||||
inode_space_stats.erase(dirty_it->first.oid.inode);
|
||||
big_to_flush++;
|
||||
}
|
||||
}
|
||||
if (forget_dirty && (IS_BIG_WRITE(dirty_it->second.state) ||
|
||||
|
@@ -271,13 +271,6 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
||||
if (loc == UINT64_MAX)
|
||||
{
|
||||
// no space
|
||||
if (big_to_flush > 0)
|
||||
{
|
||||
// hope that some space will be available after flush
|
||||
flusher->request_trim();
|
||||
PRIV(op)->wait_for = WAIT_FREE;
|
||||
return 0;
|
||||
}
|
||||
cancel_all_writes(op, dirty_it, -ENOSPC);
|
||||
return 2;
|
||||
}
|
||||
|
@@ -88,7 +88,7 @@ struct rm_osd_t
|
||||
for (auto & hist_item: pg_cfg.target_history)
|
||||
{
|
||||
int hist_size = 0, hist_rm = 0;
|
||||
for (auto & old_osd: hist_item)
|
||||
for (auto & old_osd: hist_item.osd_set)
|
||||
{
|
||||
if (old_osd != 0)
|
||||
{
|
||||
@@ -382,7 +382,7 @@ struct rm_osd_t
|
||||
for (int i = 0; i < pg_cfg.target_history.size(); i++)
|
||||
{
|
||||
int hist_size = 0, hist_rm = 0;
|
||||
for (auto & old_osd: pg_cfg.target_history[i])
|
||||
for (auto & old_osd: pg_cfg.target_history[i].osd_set)
|
||||
{
|
||||
if (old_osd != 0)
|
||||
{
|
||||
@@ -406,6 +406,15 @@ struct rm_osd_t
|
||||
}
|
||||
if (update_pg_history)
|
||||
{
|
||||
json11::Json::array target_history;
|
||||
for (auto & pgh: pg_cfg.target_history)
|
||||
{
|
||||
target_history.push_back(json11::Json::object {
|
||||
{ "osd_set", pgh.osd_set },
|
||||
{ "min_epoch", pgh.min_epoch },
|
||||
{ "max_epoch", pgh.max_epoch },
|
||||
});
|
||||
}
|
||||
std::string history_key = base64_encode(
|
||||
parent->cli->st_cli.etcd_prefix+"/pg/history/"+
|
||||
std::to_string(pool_cfg.id)+"/"+std::to_string(pg_num)
|
||||
@@ -416,7 +425,7 @@ struct rm_osd_t
|
||||
{ "value", base64_encode(json11::Json(json11::Json::object {
|
||||
{ "epoch", pg_cfg.epoch },
|
||||
{ "all_peers", pg_cfg.all_peers },
|
||||
{ "osd_sets", pg_cfg.target_history },
|
||||
{ "osd_set_epochs", target_history },
|
||||
}).dump()) },
|
||||
} },
|
||||
});
|
||||
|
@@ -96,7 +96,7 @@ inode_list_t* cluster_client_t::list_inode_start(inode_t inode,
|
||||
}
|
||||
for (auto & hist_item: pg.target_history)
|
||||
{
|
||||
for (auto pg_osd: hist_item)
|
||||
for (auto pg_osd: hist_item.osd_set)
|
||||
{
|
||||
if (pg_osd != 0)
|
||||
{
|
||||
@@ -106,11 +106,14 @@ inode_list_t* cluster_client_t::list_inode_start(inode_t inode,
|
||||
}
|
||||
for (osd_num_t peer_osd: all_peers)
|
||||
{
|
||||
r->list_osds.push_back((inode_list_osd_t){
|
||||
.pg = r,
|
||||
.osd_num = peer_osd,
|
||||
.sent = false,
|
||||
});
|
||||
if (st_cli.peer_states.find(peer_osd) != st_cli.peer_states.end())
|
||||
{
|
||||
r->list_osds.push_back((inode_list_osd_t){
|
||||
.pg = r,
|
||||
.osd_num = peer_osd,
|
||||
.sent = false,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@@ -54,13 +54,6 @@ void epoll_manager_t::set_fd_handler(int fd, bool wr, std::function<void(int, in
|
||||
ev.events = (wr ? EPOLLOUT : 0) | EPOLLIN | EPOLLRDHUP | EPOLLET;
|
||||
if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0)
|
||||
{
|
||||
if (errno == ENOENT)
|
||||
{
|
||||
// The FD is probably already closed
|
||||
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL);
|
||||
epoll_handlers.erase(fd);
|
||||
return;
|
||||
}
|
||||
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
||||
}
|
||||
epoll_handlers[fd] = handler;
|
||||
|
@@ -902,9 +902,32 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||
history_set.insert(it, pg_osd_num);
|
||||
}
|
||||
}
|
||||
auto it = std::lower_bound(pg_cfg.target_history.begin(), pg_cfg.target_history.end(), history_set);
|
||||
if (it == pg_cfg.target_history.end() || *it != history_set)
|
||||
pg_cfg.target_history.insert(it, history_set);
|
||||
pg_history_set_t epoch_set = { .osd_set = history_set };
|
||||
auto it = std::lower_bound(pg_cfg.target_history.begin(), pg_cfg.target_history.end(), epoch_set);
|
||||
if (it == pg_cfg.target_history.end() || *it != epoch_set)
|
||||
pg_cfg.target_history.insert(it, epoch_set);
|
||||
}
|
||||
// Newer format with epochs
|
||||
for (auto hist_item: value["osd_set_epochs"].array_items())
|
||||
{
|
||||
pg_history_set_t history_set;
|
||||
history_set.min_epoch = hist_item["min_epoch"].uint64_value();
|
||||
history_set.max_epoch = hist_item["max_epoch"].uint64_value();
|
||||
if (history_set.max_epoch < history_set.min_epoch)
|
||||
{
|
||||
history_set.max_epoch = 0;
|
||||
history_set.min_epoch = 0;
|
||||
}
|
||||
for (auto pg_osd: hist_item["osd_set"].array_items())
|
||||
{
|
||||
history_set.osd_set.push_back(pg_osd.uint64_value());
|
||||
}
|
||||
if (history_set.max_epoch || history_set.osd_set.size())
|
||||
{
|
||||
auto it = std::lower_bound(pg_cfg.target_history.begin(), pg_cfg.target_history.end(), history_set);
|
||||
if (it == pg_cfg.target_history.end() || *it != history_set)
|
||||
pg_cfg.target_history.insert(it, history_set);
|
||||
}
|
||||
}
|
||||
// Include these additional OSDs when peering the PG
|
||||
for (auto pg_osd: value["all_peers"].array_items())
|
||||
|
@@ -33,7 +33,7 @@ struct pg_config_t
|
||||
bool exists;
|
||||
osd_num_t primary;
|
||||
std::vector<osd_num_t> target_set;
|
||||
std::vector<std::vector<osd_num_t>> target_history;
|
||||
std::vector<pg_history_set_t> target_history;
|
||||
std::vector<osd_num_t> all_peers;
|
||||
bool pause;
|
||||
osd_num_t cur_primary;
|
||||
|
@@ -192,6 +192,7 @@ class osd_t
|
||||
void reset_stats();
|
||||
json11::Json get_statistics();
|
||||
void report_statistics();
|
||||
void add_pg_history(pg_t & pg);
|
||||
void report_pg_state(pg_t & pg);
|
||||
void report_pg_states();
|
||||
void apply_pg_count();
|
||||
|
@@ -674,7 +674,7 @@ void osd_t::apply_pg_config()
|
||||
}
|
||||
for (auto & hist_item: pg_cfg.target_history)
|
||||
{
|
||||
for (auto pg_osd: hist_item)
|
||||
for (auto pg_osd: hist_item.osd_set)
|
||||
{
|
||||
if (pg_osd != 0)
|
||||
{
|
||||
@@ -868,11 +868,40 @@ void osd_t::report_pg_states()
|
||||
// Prevent race conditions (for the case when the monitor is updating this key at the same time)
|
||||
pg.history_changed = false;
|
||||
std::string history_key = base64_encode(st_cli.etcd_prefix+"/pg/history/"+std::to_string(pg.pool_id)+"/"+std::to_string(pg.pg_num));
|
||||
json11::Json::array target_history;
|
||||
for (auto & pgh: pg.target_history)
|
||||
{
|
||||
target_history.push_back(json11::Json::object {
|
||||
{ "osd_set", pgh.osd_set },
|
||||
{ "min_epoch", pgh.min_epoch },
|
||||
{ "max_epoch", pgh.max_epoch },
|
||||
});
|
||||
}
|
||||
std::vector<osd_num_t> all_peers;
|
||||
for (auto peer_osd: pg.all_peers)
|
||||
{
|
||||
bool found = false;
|
||||
for (auto target_peer: pg.target_set)
|
||||
{
|
||||
if (target_peer == peer_osd)
|
||||
{
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!found)
|
||||
{
|
||||
all_peers.push_back(peer_osd);
|
||||
}
|
||||
}
|
||||
json11::Json::object history_value = {
|
||||
{ "epoch", pg.epoch },
|
||||
{ "all_peers", pg.all_peers },
|
||||
{ "osd_sets", pg.target_history },
|
||||
{ "osd_set_epochs", target_history },
|
||||
};
|
||||
if (all_peers.size())
|
||||
{
|
||||
history_value["all_peers"] = all_peers;
|
||||
}
|
||||
checks.push_back(json11::Json::object {
|
||||
{ "target", "MOD" },
|
||||
{ "key", history_key },
|
||||
|
@@ -287,6 +287,25 @@ bool osd_t::pick_next_recovery(osd_recovery_op_t &op)
|
||||
|
||||
void osd_t::submit_recovery_op(osd_recovery_op_t *op)
|
||||
{
|
||||
// Check if the object is deleted
|
||||
bool is_deleted = false;
|
||||
pool_id_t pool_id = INODE_POOL(op->oid.inode);
|
||||
auto pool_cfg_it = st_cli.pool_config.find(pool_id);
|
||||
if (pool_cfg_it != st_cli.pool_config.end())
|
||||
{
|
||||
pg_num_t pg_num = (op->oid.stripe/pool_cfg_it->second.pg_stripe_size) % pg_counts[pool_id] + 1; // like map_to_pg()
|
||||
auto pg_it = pgs.find({ .pool_id = pool_id, .pg_num = pg_num });
|
||||
if (pg_it != pgs.end())
|
||||
{
|
||||
pg_osd_set_state_t *object_state;
|
||||
get_object_osd_set(pg_it->second, op->oid, pg_it->second.cur_set.data(), &object_state);
|
||||
if (object_state && (object_state->state & OBJ_DELETED))
|
||||
{
|
||||
// Object is deleted, but not from all OSDs - delete remaining copies
|
||||
is_deleted = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
op->osd_op = new osd_op_t();
|
||||
op->osd_op->op_type = OSD_OP_OUT;
|
||||
op->osd_op->req = (osd_any_op_t){
|
||||
@@ -294,7 +313,7 @@ void osd_t::submit_recovery_op(osd_recovery_op_t *op)
|
||||
.header = {
|
||||
.magic = SECONDARY_OSD_OP_MAGIC,
|
||||
.id = 1,
|
||||
.opcode = OSD_OP_WRITE,
|
||||
.opcode = (uint64_t)(is_deleted ? OSD_OP_DELETE : OSD_OP_WRITE),
|
||||
},
|
||||
.inode = op->oid.inode,
|
||||
.offset = op->oid.stripe,
|
||||
|
24
src/osd_id.h
24
src/osd_id.h
@@ -3,6 +3,8 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <vector>
|
||||
|
||||
#define POOL_SCHEME_REPLICATED 1
|
||||
#define POOL_SCHEME_XOR 2
|
||||
#define POOL_SCHEME_EC 3
|
||||
@@ -38,3 +40,25 @@ inline bool operator != (const pool_pg_num_t & a, const pool_pg_num_t & b)
|
||||
{
|
||||
return a.pool_id != b.pool_id || a.pg_num != b.pg_num;
|
||||
}
|
||||
|
||||
struct pg_history_set_t
|
||||
{
|
||||
std::vector<osd_num_t> osd_set;
|
||||
uint64_t min_epoch, max_epoch;
|
||||
};
|
||||
|
||||
inline bool operator == (const pg_history_set_t & a, const pg_history_set_t & b)
|
||||
{
|
||||
return a.min_epoch == b.min_epoch && a.max_epoch == b.max_epoch && a.osd_set == b.osd_set;
|
||||
}
|
||||
|
||||
inline bool operator != (const pg_history_set_t & a, const pg_history_set_t & b)
|
||||
{
|
||||
return a.min_epoch != b.min_epoch || a.max_epoch != b.max_epoch || a.osd_set != b.osd_set;
|
||||
}
|
||||
|
||||
inline bool operator < (const pg_history_set_t & a, const pg_history_set_t & b)
|
||||
{
|
||||
return a.min_epoch < b.min_epoch || a.min_epoch == b.min_epoch &&
|
||||
(a.max_epoch < b.max_epoch || a.max_epoch == b.max_epoch && a.osd_set < b.osd_set);
|
||||
}
|
||||
|
@@ -191,7 +191,7 @@ struct __attribute__((__packed__)) osd_op_rw_t
|
||||
uint64_t inode;
|
||||
// offset
|
||||
uint64_t offset;
|
||||
// length. 0 means to read all bitmaps of the specified range, but no data.
|
||||
// length
|
||||
uint32_t len;
|
||||
// flags (for future)
|
||||
uint32_t flags;
|
||||
|
@@ -231,7 +231,7 @@ void osd_t::start_pg_peering(pg_t & pg)
|
||||
for (auto & history_set: pg.target_history)
|
||||
{
|
||||
bool found = true;
|
||||
for (auto history_osd: history_set)
|
||||
for (auto history_osd: history_set.osd_set)
|
||||
{
|
||||
if (history_osd != 0)
|
||||
{
|
||||
@@ -471,61 +471,71 @@ void osd_t::finish_stop_pg(pg_t & pg)
|
||||
report_pg_state(pg);
|
||||
}
|
||||
|
||||
static int count_nonzero_osds(const std::vector<osd_num_t> & v)
|
||||
{
|
||||
int n = 0;
|
||||
for (auto & osd_num: v)
|
||||
{
|
||||
if (osd_num != 0)
|
||||
{
|
||||
n++;
|
||||
}
|
||||
}
|
||||
return n;
|
||||
}
|
||||
|
||||
void osd_t::report_pg_state(pg_t & pg)
|
||||
{
|
||||
pg.print_state();
|
||||
this->pg_state_dirty.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
|
||||
if (pg.state == PG_ACTIVE && (pg.target_history.size() > 0 || pg.all_peers.size() > pg.target_set.size()))
|
||||
if ((pg.state == PG_ACTIVE || pg.state == (PG_ACTIVE|PG_LEFT_ON_DEAD)) &&
|
||||
(pg.target_history.size() != 1 ||
|
||||
pg.target_history[0].osd_set != pg.target_set ||
|
||||
pg.target_history[0].min_epoch != 0 ||
|
||||
pg.target_history[0].max_epoch != pg.epoch ||
|
||||
pg.all_peers.size() > count_nonzero_osds(pg.target_set)))
|
||||
{
|
||||
// Clear history of active+clean PGs
|
||||
pg.history_changed = true;
|
||||
pg.target_history.clear();
|
||||
pg.all_peers = pg.target_set;
|
||||
std::sort(pg.all_peers.begin(), pg.all_peers.end());
|
||||
pg.cur_peers = pg.target_set;
|
||||
// Change pg_config at the same time, otherwise our PG reconciling loop may try to apply the old metadata
|
||||
auto & pg_cfg = st_cli.pool_config[pg.pool_id].pg_config[pg.pg_num];
|
||||
pg_cfg.target_history = pg.target_history;
|
||||
pg_cfg.all_peers = pg.all_peers;
|
||||
}
|
||||
else if (pg.state == (PG_ACTIVE|PG_LEFT_ON_DEAD))
|
||||
{
|
||||
// Clear history of active+left_on_dead PGs, but leave dead OSDs in all_peers
|
||||
if (pg.target_history.size())
|
||||
pg.target_history.push_back((pg_history_set_t){
|
||||
.osd_set = pg.cur_set,
|
||||
.min_epoch = 0,
|
||||
.max_epoch = pg.epoch,
|
||||
});
|
||||
if (pg.state == PG_ACTIVE)
|
||||
{
|
||||
pg.history_changed = true;
|
||||
pg.target_history.clear();
|
||||
}
|
||||
std::set<osd_num_t> dead_peers;
|
||||
for (auto pg_osd: pg.all_peers)
|
||||
{
|
||||
dead_peers.insert(pg_osd);
|
||||
}
|
||||
for (auto pg_osd: pg.cur_peers)
|
||||
{
|
||||
dead_peers.erase(pg_osd);
|
||||
}
|
||||
for (auto pg_osd: pg.target_set)
|
||||
{
|
||||
if (pg_osd)
|
||||
pg.all_peers.clear();
|
||||
for (auto pg_osd: pg.target_set)
|
||||
{
|
||||
dead_peers.insert(pg_osd);
|
||||
if (pg_osd)
|
||||
pg.all_peers.push_back(pg_osd);
|
||||
}
|
||||
}
|
||||
auto new_all_peers = std::vector<osd_num_t>(dead_peers.begin(), dead_peers.end());
|
||||
if (pg.all_peers != new_all_peers)
|
||||
else
|
||||
{
|
||||
pg.history_changed = true;
|
||||
pg.all_peers = new_all_peers;
|
||||
// Clear history of active+left_on_dead PGs, but leave dead OSDs in all_peers
|
||||
std::set<osd_num_t> dead_peers(pg.all_peers.begin(), pg.all_peers.end());
|
||||
for (auto pg_osd: pg.cur_peers)
|
||||
{
|
||||
dead_peers.erase(pg_osd);
|
||||
}
|
||||
for (auto pg_osd: pg.target_set)
|
||||
{
|
||||
if (pg_osd)
|
||||
dead_peers.insert(pg_osd);
|
||||
}
|
||||
pg.all_peers.clear();
|
||||
pg.all_peers.insert(pg.all_peers.begin(), dead_peers.begin(), dead_peers.end());
|
||||
}
|
||||
std::sort(pg.all_peers.begin(), pg.all_peers.end());
|
||||
pg.cur_peers.clear();
|
||||
for (auto pg_osd: pg.target_set)
|
||||
{
|
||||
if (pg_osd)
|
||||
{
|
||||
pg.cur_peers.push_back(pg_osd);
|
||||
}
|
||||
}
|
||||
// Change pg_config at the same time, otherwise our PG reconciling loop may try to apply the old metadata
|
||||
auto & pg_cfg = st_cli.pool_config[pg.pool_id].pg_config[pg.pg_num];
|
||||
pg_cfg.target_history = pg.target_history;
|
||||
pg_cfg.all_peers = pg.all_peers;
|
||||
@@ -536,3 +546,51 @@ void osd_t::report_pg_state(pg_t & pg)
|
||||
}
|
||||
report_pg_states();
|
||||
}
|
||||
|
||||
void osd_t::add_pg_history(pg_t & pg)
|
||||
{
|
||||
bool epoch_already_reported = false;
|
||||
int max_epoch_pos = -1;
|
||||
for (int i = pg.target_history.size()-1; i >= 0; i--)
|
||||
{
|
||||
if (pg.target_history[i].min_epoch > pg.epoch)
|
||||
{
|
||||
printf("[PG %u/%u] Invalid PG history: there is an entry with min_epoch (%lu) > current epoch (%lu)\n",
|
||||
pg.pool_id, pg.pg_num, pg.target_history[i].min_epoch, pg.epoch);
|
||||
force_stop(1);
|
||||
return;
|
||||
}
|
||||
if (max_epoch_pos < 0 || pg.target_history[i].max_epoch > pg.target_history[max_epoch_pos].max_epoch)
|
||||
{
|
||||
max_epoch_pos = i;
|
||||
}
|
||||
if (pg.target_history[i].min_epoch <= pg.epoch &&
|
||||
pg.target_history[i].max_epoch >= pg.epoch)
|
||||
{
|
||||
if (pg.target_history[i].osd_set != pg.cur_set)
|
||||
{
|
||||
printf("[PG %u/%u] Invalid target_history: epoch %lu has another OSD set already registered\n", pg.pool_id, pg.pg_num, pg.epoch);
|
||||
force_stop(1);
|
||||
return;
|
||||
}
|
||||
// Already reported
|
||||
epoch_already_reported = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!epoch_already_reported)
|
||||
{
|
||||
if (max_epoch_pos >= 0 && pg.target_history[max_epoch_pos].osd_set == pg.cur_set)
|
||||
{
|
||||
pg.target_history[max_epoch_pos].max_epoch = pg.epoch;
|
||||
}
|
||||
else
|
||||
{
|
||||
pg.target_history.push_back((pg_history_set_t){
|
||||
.osd_set = pg.cur_set,
|
||||
.min_epoch = pg.epoch,
|
||||
.max_epoch = pg.epoch,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -52,6 +52,7 @@ struct pg_obj_state_check_t
|
||||
|
||||
void walk();
|
||||
void start_object();
|
||||
void recheck_version_osd_set();
|
||||
void handle_version();
|
||||
void finish_object();
|
||||
};
|
||||
@@ -84,6 +85,7 @@ void pg_obj_state_check_t::walk()
|
||||
pg->state = PG_INCOMPLETE | PG_HAS_INVALID;
|
||||
return;
|
||||
}
|
||||
// Activate PG
|
||||
if (pg->pg_cursize < pg->pg_size)
|
||||
{
|
||||
// Activate as degraded
|
||||
@@ -108,13 +110,85 @@ void pg_obj_state_check_t::start_object()
|
||||
n_unstable = n_invalid = 0;
|
||||
}
|
||||
|
||||
// FIXME: Put this under a feature flag
|
||||
// FIXME: Implement OSD 'cookies' to be fool-proof so that if an OSD is wiped and
|
||||
// recreated it doesn't also wipe all other data
|
||||
void pg_obj_state_check_t::recheck_version_osd_set()
|
||||
{
|
||||
uint64_t epoch = (last_ver >> (64-PG_EPOCH_BITS));
|
||||
if (!pg->epoch_sizes_differ && n_copies >= pg->pg_size)
|
||||
{
|
||||
// Enough copies
|
||||
return;
|
||||
}
|
||||
auto epoch_it = pg->target_by_epoch.lower_bound(epoch);
|
||||
if (epoch_it == pg->target_by_epoch.end() || epoch_it->second.min_epoch > epoch)
|
||||
{
|
||||
// Epoch info not found
|
||||
return;
|
||||
}
|
||||
if (pg->epoch_sizes_differ && n_copies >= epoch_it->second.osd_set.size())
|
||||
{
|
||||
// For the (unlikely) case of PG size change - enough copies
|
||||
return;
|
||||
}
|
||||
// Recheck version against the OSD set corresponding to epoch if it's known
|
||||
if (epoch_it != pg->target_by_epoch.end() && epoch_it->second.min_epoch <= epoch)
|
||||
{
|
||||
for (int j = 0; j < epoch_it->second.osd_set.size(); j++)
|
||||
{
|
||||
osd_num_t cur_osd = epoch_it->second.osd_set[j];
|
||||
bool found = false;
|
||||
for (int i = ver_start; i < ver_end; i++)
|
||||
{
|
||||
if (cur_osd == list[i].osd_num)
|
||||
{
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!found)
|
||||
{
|
||||
// Check if a newer version is present on the same OSD and masks the older one
|
||||
// It happens for overwritten replicas in the following case:
|
||||
// Version 1 is present on OSD 1,2,3
|
||||
// Client tries to write Version 2
|
||||
// OSD 3 succeeds to write Version 2, others don't. OSD 3 crashes, then starts again
|
||||
// OSD 1 sees: version 1 on OSD 1,2 and version 2 on OSD 3
|
||||
// (version 1 on OSD 3 is already masked/removed)
|
||||
// Version 1 is not present on a full set, but it must not be removed
|
||||
if (replicated)
|
||||
{
|
||||
for (int i = obj_start; i < ver_start; i++)
|
||||
{
|
||||
if (cur_osd == list[i].osd_num)
|
||||
{
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!found)
|
||||
{
|
||||
// Object is missing from one of the OSDs of that set.
|
||||
// This means it's deleted or moved and we can safely drop this version.
|
||||
target_ver = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void pg_obj_state_check_t::handle_version()
|
||||
{
|
||||
if (!target_ver && last_ver != list[list_pos].version && (n_stable > 0 || n_roles >= pg->pg_data_size))
|
||||
{
|
||||
// Version is either stable or recoverable
|
||||
target_ver = last_ver;
|
||||
ver_end = list_pos;
|
||||
target_ver = last_ver;
|
||||
// Skip versions that are not present on any of OSDs for the corresponding PG epoch
|
||||
recheck_version_osd_set();
|
||||
}
|
||||
if (!target_ver)
|
||||
{
|
||||
@@ -178,6 +252,8 @@ void pg_obj_state_check_t::finish_object()
|
||||
// Version is either stable or recoverable
|
||||
target_ver = last_ver;
|
||||
ver_end = list_pos;
|
||||
// Skip versions that are not present on any of OSDs for the corresponding PG epoch
|
||||
recheck_version_osd_set();
|
||||
}
|
||||
obj_end = list_pos;
|
||||
// Remember the decision
|
||||
@@ -231,11 +307,23 @@ void pg_obj_state_check_t::finish_object()
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!target_ver)
|
||||
if (!target_ver && (n_unstable >= obj_end-obj_start))
|
||||
{
|
||||
return;
|
||||
}
|
||||
if (!replicated && n_roles < pg->pg_data_size)
|
||||
if (!target_ver)
|
||||
{
|
||||
// Object is present, but should not be :) i.e. it's a deleted object that reappeared
|
||||
if (log_level > 1)
|
||||
{
|
||||
printf("Object is deleted: %lx:%lx version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver);
|
||||
}
|
||||
state = OBJ_DELETED;
|
||||
pg->state = pg->state | PG_HAS_MISPLACED;
|
||||
// To record all versions as outdated:
|
||||
ver_end = obj_start;
|
||||
}
|
||||
else if (!replicated && n_roles < pg->pg_data_size)
|
||||
{
|
||||
if (log_level > 1)
|
||||
{
|
||||
@@ -263,7 +351,7 @@ void pg_obj_state_check_t::finish_object()
|
||||
pg->state = pg->state | PG_HAS_MISPLACED;
|
||||
}
|
||||
if (log_level > 1 && (state & (OBJ_INCOMPLETE | OBJ_DEGRADED)) ||
|
||||
log_level > 2 && (state & OBJ_MISPLACED))
|
||||
log_level > 2 && (state & (OBJ_MISPLACED | OBJ_DELETED)))
|
||||
{
|
||||
for (int i = obj_start; i < obj_end; i++)
|
||||
{
|
||||
@@ -272,9 +360,9 @@ void pg_obj_state_check_t::finish_object()
|
||||
}
|
||||
}
|
||||
pg->total_count++;
|
||||
if (state != 0 || ver_end < obj_end)
|
||||
osd_set.clear();
|
||||
if (target_ver != 0 && (state != 0 || ver_end < obj_end))
|
||||
{
|
||||
osd_set.clear();
|
||||
for (int i = ver_start; i < ver_end; i++)
|
||||
{
|
||||
osd_set.push_back((pg_obj_loc_t){
|
||||
@@ -297,7 +385,8 @@ void pg_obj_state_check_t::finish_object()
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (j >= osd_set.size() && pg->cur_set[list[i].oid.stripe & STRIPE_MASK] != list[i].osd_num)
|
||||
if (j >= osd_set.size() && ((state & OBJ_DELETED) ||
|
||||
pg->cur_set[list[i].oid.stripe & STRIPE_MASK] != list[i].osd_num))
|
||||
{
|
||||
osd_set.push_back((pg_obj_loc_t){
|
||||
.role = (list[i].oid.stripe & STRIPE_MASK),
|
||||
@@ -312,7 +401,11 @@ void pg_obj_state_check_t::finish_object()
|
||||
}
|
||||
}
|
||||
}
|
||||
if (target_ver < max_ver)
|
||||
if (state & OBJ_DELETED)
|
||||
{
|
||||
pg->ver_override[oid] = max_ver;
|
||||
}
|
||||
else if (target_ver < max_ver)
|
||||
{
|
||||
pg->ver_override[oid] = target_ver;
|
||||
}
|
||||
@@ -366,6 +459,7 @@ void pg_obj_state_check_t::finish_object()
|
||||
}
|
||||
else
|
||||
{
|
||||
assert(it->second.state == state);
|
||||
it->second.object_count++;
|
||||
}
|
||||
if (state & OBJ_INCOMPLETE)
|
||||
@@ -386,6 +480,34 @@ void pg_obj_state_check_t::finish_object()
|
||||
// FIXME: Write at least some tests for this function
|
||||
void pg_t::calc_object_states(int log_level)
|
||||
{
|
||||
// Calculate intersections of target_history with cur_peers
|
||||
for (auto & history_item: target_history)
|
||||
{
|
||||
if (history_item.max_epoch)
|
||||
{
|
||||
pg_history_set_t & set_copy = target_by_epoch[history_item.max_epoch];
|
||||
set_copy.min_epoch = history_item.min_epoch;
|
||||
set_copy.max_epoch = history_item.max_epoch;
|
||||
for (int i = 0; i < history_item.osd_set.size(); i++)
|
||||
{
|
||||
if (history_item.osd_set[i] != 0)
|
||||
{
|
||||
for (int j = 0; j < cur_set.size(); j++)
|
||||
{
|
||||
if (cur_set[j] == history_item.osd_set[i])
|
||||
{
|
||||
set_copy.osd_set.push_back(history_item.osd_set[i]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (set_copy.osd_set.size() != pg_size)
|
||||
{
|
||||
epoch_sizes_differ = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Copy all object lists into one array
|
||||
pg_obj_state_check_t st;
|
||||
st.log_level = log_level;
|
||||
@@ -422,10 +544,18 @@ void pg_t::calc_object_states(int log_level)
|
||||
std::sort(st.list.begin(), st.list.end());
|
||||
// Walk over it and check object states
|
||||
st.walk();
|
||||
target_by_epoch.clear(); // needed only in this function
|
||||
if (this->state != PG_ACTIVE)
|
||||
{
|
||||
assert(epoch != (((uint64_t)1 << PG_EPOCH_BITS)-1));
|
||||
epoch++;
|
||||
for (auto & pgh: target_history)
|
||||
{
|
||||
if (epoch <= pgh.max_epoch)
|
||||
{
|
||||
epoch = pgh.max_epoch+1;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (log_level > 0)
|
||||
{
|
||||
|
@@ -89,7 +89,9 @@ struct pg_t
|
||||
// epoch number - should increase with each non-clean activation of the PG
|
||||
uint64_t epoch = 0, reported_epoch = 0;
|
||||
// target history and all potential peers
|
||||
std::vector<std::vector<osd_num_t>> target_history;
|
||||
std::vector<pg_history_set_t> target_history;
|
||||
std::map<uint64_t, pg_history_set_t> target_by_epoch;
|
||||
bool epoch_sizes_differ = false;
|
||||
std::vector<osd_num_t> all_peers;
|
||||
bool history_changed = false;
|
||||
// peer list from the last peering event
|
||||
|
@@ -186,22 +186,10 @@ void osd_t::continue_primary_read(osd_op_t *cur_op)
|
||||
cur_op->reply.rw.bitmap_len = 0;
|
||||
{
|
||||
auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num });
|
||||
if (cur_op->req.rw.len == 0)
|
||||
for (int role = 0; role < op_data->pg_data_size; role++)
|
||||
{
|
||||
// len=0 => bitmap read
|
||||
for (int role = 0; role < op_data->pg_data_size; role++)
|
||||
{
|
||||
op_data->stripes[role].read_start = 0;
|
||||
op_data->stripes[role].read_end = UINT32_MAX;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (int role = 0; role < op_data->pg_data_size; role++)
|
||||
{
|
||||
op_data->stripes[role].read_start = op_data->stripes[role].req_start;
|
||||
op_data->stripes[role].read_end = op_data->stripes[role].req_end;
|
||||
}
|
||||
op_data->stripes[role].read_start = op_data->stripes[role].req_start;
|
||||
op_data->stripes[role].read_end = op_data->stripes[role].req_end;
|
||||
}
|
||||
// Determine version
|
||||
auto vo_it = pg.ver_override.find(op_data->oid);
|
||||
@@ -211,6 +199,21 @@ void osd_t::continue_primary_read(osd_op_t *cur_op)
|
||||
{
|
||||
// PG may be degraded or have misplaced objects
|
||||
op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
|
||||
if (op_data->object_state && (op_data->object_state->state & OBJ_DELETED))
{
    // Object is deleted, just return zeroes
    cur_op->reply.rw.version = 0;
    cur_op->reply.rw.bitmap_len = op_data->pg_data_size * clean_entry_bitmap_size;
    // The reply payload (bitmap followed by data) is built from zero_buffer,
    // at most zero_buffer_size bytes per iovec entry
    uint64_t zero_len = cur_op->reply.rw.bitmap_len + cur_op->req.rw.len;
    // zero_len is unsigned: "zero_len >= 0" is always true and would keep
    // appending empty iovec entries forever, so stop exactly at 0
    while (zero_len > 0)
    {
        uint64_t cur_zero_len = zero_buffer_size > zero_len ? zero_len : zero_buffer_size;
        cur_op->iov.push_back(zero_buffer, cur_zero_len);
        zero_len -= cur_zero_len;
    }
    finish_op(cur_op, cur_op->req.rw.len);
    return;
}
|
||||
}
|
||||
if (pg.state == PG_ACTIVE || op_data->scheme == POOL_SCHEME_REPLICATED)
|
||||
{
|
||||
@@ -302,7 +305,7 @@ void osd_t::remove_object_from_state(object_id & oid, pg_osd_set_state_t *object
|
||||
report_pg_state(pg);
|
||||
}
|
||||
}
|
||||
else if (object_state->state & OBJ_MISPLACED)
|
||||
else if (object_state->state & (OBJ_MISPLACED | OBJ_DELETED))
|
||||
{
|
||||
this->misplaced_objects--;
|
||||
pg.misplaced_objects.erase(oid);
|
||||
@@ -341,12 +344,6 @@ void osd_t::continue_primary_del(osd_op_t *cur_op)
|
||||
else if (op_data->st == 4) goto resume_4;
|
||||
else if (op_data->st == 5) goto resume_5;
|
||||
assert(op_data->st == 0);
|
||||
// Delete is forbidden even in active PGs if they're also degraded or have previous dead OSDs
|
||||
if (pg.state & (PG_DEGRADED | PG_LEFT_ON_DEAD))
|
||||
{
|
||||
finish_op(cur_op, -EBUSY);
|
||||
return;
|
||||
}
|
||||
if (!check_write_queue(cur_op, pg))
|
||||
{
|
||||
return;
|
||||
@@ -354,11 +351,18 @@ void osd_t::continue_primary_del(osd_op_t *cur_op)
|
||||
resume_1:
|
||||
// Determine which OSDs contain this object and delete it
|
||||
op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
|
||||
// Submit 1 read to determine the actual version number
|
||||
submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, op_data->prev_set, cur_op);
|
||||
if (op_data->object_state && (op_data->object_state->state & OBJ_DELETED))
|
||||
{
|
||||
op_data->fact_ver = pg.ver_override[op_data->oid];
|
||||
}
|
||||
else
|
||||
{
|
||||
// Submit 1 read to determine the actual version number
|
||||
submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, op_data->prev_set, cur_op);
|
||||
resume_2:
|
||||
op_data->st = 2;
|
||||
return;
|
||||
op_data->st = 2;
|
||||
return;
|
||||
}
|
||||
resume_3:
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
|
@@ -133,6 +133,12 @@ int osd_t::collect_bitmap_requests(osd_op_t *cur_op, pg_t & pg, std::vector<bitm
|
||||
uint64_t target_version = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX;
|
||||
pg_osd_set_state_t *object_state;
|
||||
uint64_t* cur_set = get_object_osd_set(pg, cur_oid, pg.cur_set.data(), &object_state);
|
||||
if (object_state && (object_state->state & OBJ_DELETED))
|
||||
{
|
||||
// Object is deleted, zero out the bitmap
|
||||
memset((uint8_t*)op_data->snapshot_bitmaps + chain_num*clean_entry_bitmap_size, 0, clean_entry_bitmap_size);
|
||||
continue;
|
||||
}
|
||||
if (pg.scheme == POOL_SCHEME_REPLICATED)
|
||||
{
|
||||
osd_num_t read_target = 0;
|
||||
|
@@ -119,17 +119,19 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, const ui
|
||||
if (osd_set[role] != 0 && (wr || !rep && stripes[role].read_end != 0))
|
||||
n_subops++;
|
||||
}
|
||||
if (!n_subops && (submit_type == SUBMIT_RMW_READ || rep))
|
||||
if (!n_subops && (submit_type == SUBMIT_RMW_READ || rep) && zero_read >= 0)
|
||||
n_subops = 1;
|
||||
else
|
||||
zero_read = -1;
|
||||
osd_op_t *subops = new osd_op_t[n_subops];
|
||||
op_data->fact_ver = 0;
|
||||
op_data->done = op_data->errors = op_data->errcode = 0;
|
||||
op_data->n_subops = n_subops;
|
||||
op_data->subops = subops;
|
||||
int sent = submit_primary_subop_batch(submit_type, op_data->oid.inode, op_version, op_data->stripes, osd_set, cur_op, 0, zero_read);
|
||||
assert(sent == n_subops);
|
||||
if (n_subops > 0)
|
||||
{
|
||||
op_data->subops = new osd_op_t[n_subops];
|
||||
int sent = submit_primary_subop_batch(submit_type, op_data->oid.inode, op_version, op_data->stripes, osd_set, cur_op, 0, zero_read);
|
||||
assert(sent == n_subops);
|
||||
}
|
||||
}
|
||||
|
||||
int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t op_version,
|
||||
@@ -151,13 +153,6 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o
|
||||
{
|
||||
int stripe_num = rep ? 0 : role;
|
||||
osd_op_t *subop = op_data->subops + i;
|
||||
uint32_t subop_len = wr
|
||||
? stripes[stripe_num].write_end - stripes[stripe_num].write_start
|
||||
: stripes[stripe_num].read_end - stripes[stripe_num].read_start;
|
||||
if (!wr && stripes[stripe_num].read_end == UINT32_MAX)
|
||||
{
|
||||
subop_len = 0;
|
||||
}
|
||||
if (role_osd_num == this->osd_num)
|
||||
{
|
||||
clock_gettime(CLOCK_REALTIME, &subop->tv_begin);
|
||||
@@ -176,7 +171,7 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o
|
||||
},
|
||||
.version = op_version,
|
||||
.offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start,
|
||||
.len = subop_len,
|
||||
.len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start,
|
||||
.buf = wr ? stripes[stripe_num].write_buf : stripes[stripe_num].read_buf,
|
||||
.bitmap = stripes[stripe_num].bmp_buf,
|
||||
});
|
||||
@@ -206,7 +201,7 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o
|
||||
},
|
||||
.version = op_version,
|
||||
.offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start,
|
||||
.len = subop_len,
|
||||
.len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start,
|
||||
.attr_len = wr ? clean_entry_bitmap_size : 0,
|
||||
};
|
||||
#ifdef OSD_DEBUG
|
||||
@@ -225,9 +220,9 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o
|
||||
}
|
||||
else
|
||||
{
|
||||
if (subop_len > 0)
|
||||
if (stripes[stripe_num].read_end > stripes[stripe_num].read_start)
|
||||
{
|
||||
subop->iov.push_back(stripes[stripe_num].read_buf, subop_len);
|
||||
subop->iov.push_back(stripes[stripe_num].read_buf, stripes[stripe_num].read_end - stripes[stripe_num].read_start);
|
||||
}
|
||||
}
|
||||
subop->callback = [cur_op, this](osd_op_t *subop)
|
||||
|
@@ -156,21 +156,13 @@ resume_3:
|
||||
{
|
||||
// Report newer epoch before writing
|
||||
// FIXME: We don't have to report all changed PG states here
|
||||
this->pg_state_dirty.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
|
||||
if (pg.state != PG_ACTIVE)
|
||||
{
|
||||
// Check that current OSD set is in history and/or add it there
|
||||
std::vector<osd_num_t> history_set;
|
||||
for (auto peer_osd: pg.cur_set)
|
||||
if (peer_osd != 0)
|
||||
history_set.push_back(peer_osd);
|
||||
std::sort(history_set.begin(), history_set.end());
|
||||
auto it = std::lower_bound(pg.target_history.begin(), pg.target_history.end(), history_set);
|
||||
if (it == pg.target_history.end() || *it != history_set)
|
||||
pg.target_history.insert(it, history_set);
|
||||
add_pg_history(pg);
|
||||
}
|
||||
pg.history_changed = true;
|
||||
report_pg_states();
|
||||
report_pg_state(pg);
|
||||
resume_10:
|
||||
if (pg.epoch > pg.reported_epoch)
|
||||
{
|
||||
|
@@ -28,9 +28,7 @@ static inline void extend_read(uint32_t start, uint32_t end, osd_rmw_stripe_t &
|
||||
}
|
||||
else
|
||||
{
|
||||
if (stripe.read_end < end && end != UINT32_MAX ||
|
||||
// UINT32_MAX means that stripe only needs bitmap, end != 0 => needs also data
|
||||
stripe.read_end == UINT32_MAX && end != 0)
|
||||
if (stripe.read_end < end)
|
||||
stripe.read_end = end;
|
||||
if (stripe.read_start > start)
|
||||
stripe.read_start = start;
|
||||
@@ -107,30 +105,24 @@ void reconstruct_stripes_xor(osd_rmw_stripe_t *stripes, int pg_size, uint32_t bi
|
||||
}
|
||||
else if (prev >= 0)
|
||||
{
|
||||
if (stripes[role].read_end != UINT32_MAX)
|
||||
{
|
||||
assert(stripes[role].read_start >= stripes[prev].read_start &&
|
||||
stripes[role].read_start >= stripes[other].read_start);
|
||||
memxor(
|
||||
(uint8_t*)stripes[prev].read_buf + (stripes[role].read_start - stripes[prev].read_start),
|
||||
(uint8_t*)stripes[other].read_buf + (stripes[role].read_start - stripes[other].read_start),
|
||||
stripes[role].read_buf, stripes[role].read_end - stripes[role].read_start
|
||||
);
|
||||
}
|
||||
assert(stripes[role].read_start >= stripes[prev].read_start &&
|
||||
stripes[role].read_start >= stripes[other].read_start);
|
||||
memxor(
|
||||
(uint8_t*)stripes[prev].read_buf + (stripes[role].read_start - stripes[prev].read_start),
|
||||
(uint8_t*)stripes[other].read_buf + (stripes[role].read_start - stripes[other].read_start),
|
||||
stripes[role].read_buf, stripes[role].read_end - stripes[role].read_start
|
||||
);
|
||||
memxor(stripes[prev].bmp_buf, stripes[other].bmp_buf, stripes[role].bmp_buf, bitmap_size);
|
||||
prev = -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (stripes[role].read_end != UINT32_MAX)
|
||||
{
|
||||
assert(stripes[role].read_start >= stripes[other].read_start);
|
||||
memxor(
|
||||
stripes[role].read_buf,
|
||||
(uint8_t*)stripes[other].read_buf + (stripes[role].read_start - stripes[other].read_start),
|
||||
stripes[role].read_buf, stripes[role].read_end - stripes[role].read_start
|
||||
);
|
||||
}
|
||||
assert(stripes[role].read_start >= stripes[other].read_start);
|
||||
memxor(
|
||||
stripes[role].read_buf,
|
||||
(uint8_t*)stripes[other].read_buf + (stripes[role].read_start - stripes[other].read_start),
|
||||
stripes[role].read_buf, stripes[role].read_end - stripes[role].read_start
|
||||
);
|
||||
memxor(stripes[role].bmp_buf, stripes[other].bmp_buf, stripes[role].bmp_buf, bitmap_size);
|
||||
}
|
||||
}
|
||||
@@ -364,23 +356,20 @@ void reconstruct_stripes_ec(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsi
|
||||
uint64_t read_start = 0, read_end = 0;
|
||||
auto recover_seq = [&]()
|
||||
{
|
||||
if (read_end != UINT32_MAX)
|
||||
int orig = 0;
|
||||
for (int other = 0; other < pg_size; other++)
|
||||
{
|
||||
int orig = 0;
|
||||
for (int other = 0; other < pg_size && orig < pg_minsize; other++)
|
||||
if (stripes[other].read_end != 0 && !stripes[other].missing)
|
||||
{
|
||||
if (stripes[other].read_end != 0 && !stripes[other].missing)
|
||||
{
|
||||
assert(stripes[other].read_start <= read_start);
|
||||
assert(stripes[other].read_end >= read_end);
|
||||
data_ptrs[orig++] = (uint8_t*)stripes[other].read_buf + (read_start - stripes[other].read_start);
|
||||
}
|
||||
assert(stripes[other].read_start <= read_start);
|
||||
assert(stripes[other].read_end >= read_end);
|
||||
data_ptrs[orig++] = (uint8_t*)stripes[other].read_buf + (read_start - stripes[other].read_start);
|
||||
}
|
||||
ec_encode_data(
|
||||
read_end-read_start, pg_minsize, wanted, dectable + wanted_base*32*pg_minsize,
|
||||
data_ptrs, data_ptrs + pg_minsize
|
||||
);
|
||||
}
|
||||
ec_encode_data(
|
||||
read_end-read_start, pg_minsize, wanted, dectable + wanted_base*32*pg_minsize,
|
||||
data_ptrs, data_ptrs + pg_minsize
|
||||
);
|
||||
wanted_base += wanted;
|
||||
wanted = 0;
|
||||
};
|
||||
@@ -402,32 +391,6 @@ void reconstruct_stripes_ec(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsi
|
||||
{
|
||||
recover_seq();
|
||||
}
|
||||
// Recover bitmaps
|
||||
if (bitmap_size > 0)
|
||||
{
|
||||
for (int role = 0; role < pg_minsize; role++)
|
||||
{
|
||||
if (stripes[role].read_end != 0 && stripes[role].missing)
|
||||
{
|
||||
data_ptrs[pg_minsize + (wanted++)] = (uint8_t*)stripes[role].bmp_buf;
|
||||
}
|
||||
}
|
||||
if (wanted > 0)
|
||||
{
|
||||
int orig = 0;
|
||||
for (int other = 0; other < pg_size && orig < pg_minsize; other++)
|
||||
{
|
||||
if (stripes[other].read_end != 0 && !stripes[other].missing)
|
||||
{
|
||||
data_ptrs[orig++] = (uint8_t*)stripes[other].bmp_buf;
|
||||
}
|
||||
}
|
||||
ec_encode_data(
|
||||
bitmap_size, pg_minsize, wanted, dectable,
|
||||
data_ptrs, data_ptrs + pg_minsize
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
#else
|
||||
void reconstruct_stripes_ec(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize, uint32_t bitmap_size)
|
||||
@@ -449,8 +412,7 @@ void reconstruct_stripes_ec(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsi
|
||||
if (stripes[role].read_end != 0 && stripes[role].missing)
|
||||
{
|
||||
recovered = true;
|
||||
if (stripes[role].read_end > stripes[role].read_start &&
|
||||
stripes[role].read_end != UINT32_MAX)
|
||||
if (stripes[role].read_end > stripes[role].read_start)
|
||||
{
|
||||
for (int other = 0; other < pg_size; other++)
|
||||
{
|
||||
@@ -569,8 +531,7 @@ void* alloc_read_buffer(osd_rmw_stripe_t *stripes, int read_pg_size, uint64_t ad
|
||||
uint64_t buf_size = add_size;
|
||||
for (int role = 0; role < read_pg_size; role++)
|
||||
{
|
||||
if (stripes[role].read_end != 0 &&
|
||||
stripes[role].read_end != UINT32_MAX)
|
||||
if (stripes[role].read_end != 0)
|
||||
{
|
||||
buf_size += stripes[role].read_end - stripes[role].read_start;
|
||||
}
|
||||
@@ -580,8 +541,7 @@ void* alloc_read_buffer(osd_rmw_stripe_t *stripes, int read_pg_size, uint64_t ad
|
||||
uint64_t buf_pos = add_size;
|
||||
for (int role = 0; role < read_pg_size; role++)
|
||||
{
|
||||
if (stripes[role].read_end != 0 &&
|
||||
stripes[role].read_end != UINT32_MAX)
|
||||
if (stripes[role].read_end != 0)
|
||||
{
|
||||
stripes[role].read_buf = (uint8_t*)buf + buf_pos;
|
||||
buf_pos += stripes[role].read_end - stripes[role].read_start;
|
||||
|
@@ -23,7 +23,6 @@ struct osd_rmw_stripe_t
|
||||
void *read_buf, *write_buf;
|
||||
void *bmp_buf;
|
||||
uint32_t req_start, req_end;
|
||||
// read_end=UINT32_MAX means to only read bitmap, but not data
|
||||
uint32_t read_start, read_end;
|
||||
uint32_t write_start, write_end;
|
||||
bool missing;
|
||||
|
@@ -27,7 +27,6 @@ void test13();
|
||||
void test14();
|
||||
void test15(bool second);
|
||||
void test16();
|
||||
void test_recover_22_d2();
|
||||
|
||||
int main(int narg, char *args[])
|
||||
{
|
||||
@@ -62,8 +61,6 @@ int main(int narg, char *args[])
|
||||
test15(true);
|
||||
// Test 16
|
||||
test16();
|
||||
// Test 17
|
||||
test_recover_22_d2();
|
||||
// End
|
||||
printf("all ok\n");
|
||||
return 0;
|
||||
@@ -1048,12 +1045,7 @@ void test16()
|
||||
assert(stripes[3].read_buf == (uint8_t*)read_buf+2*128*1024);
|
||||
set_pattern(stripes[1].read_buf, 128*1024, PATTERN2);
|
||||
memcpy(stripes[3].read_buf, rmw_buf, 128*1024);
|
||||
memset(stripes[0].bmp_buf, 0xa8, bmp);
|
||||
memset(stripes[2].bmp_buf, 0xb7, bmp);
|
||||
assert(bitmaps[1] == 0xFFFFFFFF);
|
||||
assert(bitmaps[3] == 0xF1F1F1F1);
|
||||
reconstruct_stripes_ec(stripes, 4, 2, bmp);
|
||||
assert(*(uint32_t*)stripes[3].bmp_buf == 0xF1F1F1F1);
|
||||
assert(bitmaps[0] == 0xFFFFFFFF);
|
||||
check_pattern(stripes[0].read_buf, 128*1024, PATTERN1);
|
||||
free(read_buf);
|
||||
@@ -1062,47 +1054,3 @@ void test16()
|
||||
free(write_buf);
|
||||
use_ec(4, 2, false);
|
||||
}
|
||||
|
||||
/***
|
||||
|
||||
17. EC 2+2 recover second data block
|
||||
|
||||
***/
|
||||
|
||||
void test_recover_22_d2()
|
||||
{
|
||||
const int bmp = 128*1024 / 4096 / 8;
|
||||
use_ec(4, 2, true);
|
||||
osd_num_t osd_set[4] = { 1, 0, 3, 4 };
|
||||
osd_rmw_stripe_t stripes[4] = {};
|
||||
unsigned bitmaps[4] = { 0 };
|
||||
// Read 0-256K
|
||||
split_stripes(2, 128*1024, 0, 256*1024, stripes);
|
||||
assert(stripes[0].req_start == 0 && stripes[0].req_end == 128*1024);
|
||||
assert(stripes[1].req_start == 0 && stripes[1].req_end == 128*1024);
|
||||
assert(stripes[2].req_start == 0 && stripes[2].req_end == 0);
|
||||
assert(stripes[3].req_start == 0 && stripes[3].req_end == 0);
|
||||
uint8_t *data_buf = (uint8_t*)malloc_or_die(128*1024*4);
|
||||
for (int i = 0; i < 4; i++)
|
||||
{
|
||||
stripes[i].read_start = stripes[i].req_start;
|
||||
stripes[i].read_end = stripes[i].req_end;
|
||||
stripes[i].read_buf = data_buf + i*128*1024;
|
||||
stripes[i].bmp_buf = bitmaps + i;
|
||||
}
|
||||
// Read using parity
|
||||
assert(extend_missing_stripes(stripes, osd_set, 2, 4) == 0);
|
||||
assert(stripes[2].read_start == 0 && stripes[2].read_end == 128*1024);
|
||||
assert(stripes[3].read_start == 0 && stripes[3].read_end == 0);
|
||||
bitmaps[0] = 0xffffffff;
|
||||
bitmaps[2] = 0;
|
||||
set_pattern(stripes[0].read_buf, 128*1024, PATTERN1);
|
||||
set_pattern(stripes[2].read_buf, 128*1024, PATTERN1^PATTERN2);
|
||||
// Reconstruct
|
||||
reconstruct_stripes_ec(stripes, 4, 2, bmp);
|
||||
check_pattern(stripes[1].read_buf, 128*1024, PATTERN2);
|
||||
assert(bitmaps[1] == 0xFFFFFFFF);
|
||||
free(data_buf);
|
||||
// Done
|
||||
use_ec(4, 2, false);
|
||||
}
|
||||
|
@@ -32,6 +32,7 @@
|
||||
#define OBJ_DEGRADED 0x02
|
||||
#define OBJ_INCOMPLETE 0x04
|
||||
#define OBJ_MISPLACED 0x08
|
||||
#define OBJ_DELETED 0x10
|
||||
#define OBJ_NEEDS_STABLE 0x10000
|
||||
#define OBJ_NEEDS_ROLLBACK 0x20000
|
||||
|
||||
|
@@ -36,12 +36,12 @@ for i in $(seq 2 $ETCD_COUNT); do
|
||||
ETCD_URL="$ETCD_URL,http://$ETCD_IP:$((ETCD_PORT+2*i-2))"
|
||||
ETCD_CLUSTER="$ETCD_CLUSTER,etcd$i=http://$ETCD_IP:$((ETCD_PORT+2*i-1))"
|
||||
done
|
||||
ETCDCTL="${ETCD}ctl --endpoints=$ETCD_URL --dial-timeout=5s --command-timeout=10s"
|
||||
ETCDCTL="${ETCD}ctl --endpoints=$ETCD_URL"
|
||||
|
||||
start_etcd()
|
||||
{
|
||||
local i=$1
|
||||
ionice -c2 -n0 $ETCD -name etcd$i --data-dir ./testdata/etcd$i \
|
||||
$ETCD -name etcd$i --data-dir ./testdata/etcd$i \
|
||||
--advertise-client-urls http://$ETCD_IP:$((ETCD_PORT+2*i-2)) --listen-client-urls http://$ETCD_IP:$((ETCD_PORT+2*i-2)) \
|
||||
--initial-advertise-peer-urls http://$ETCD_IP:$((ETCD_PORT+2*i-1)) --listen-peer-urls http://$ETCD_IP:$((ETCD_PORT+2*i-1)) \
|
||||
--initial-cluster-token vitastor-tests-etcd --initial-cluster-state new \
|
||||
@@ -53,11 +53,8 @@ start_etcd()
|
||||
for i in $(seq 1 $ETCD_COUNT); do
|
||||
start_etcd $i
|
||||
done
|
||||
for i in {1..10}; do
|
||||
${ETCD}ctl --endpoints=$ETCD_URL --dial-timeout=1s --command-timeout=1s member list >/dev/null && break
|
||||
done
|
||||
if [[ $i = 10 ]]; then
|
||||
format_error "Failed to start etcd"
|
||||
if [ $ETCD_COUNT -gt 1 ]; then
|
||||
sleep 1
|
||||
fi
|
||||
|
||||
echo leak:fio >> testdata/lsan-suppress.txt
|
||||
|
@@ -39,7 +39,7 @@ done
|
||||
cd mon
|
||||
npm install
|
||||
cd ..
|
||||
node mon/mon-main.js --etcd_url $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 --restart_interval 5 &>./testdata/mon.log &
|
||||
node mon/mon-main.js --etcd_url $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 &>./testdata/mon.log &
|
||||
MON_PID=$!
|
||||
|
||||
if [ "$SCHEME" = "ec" ]; then
|
||||
@@ -100,13 +100,13 @@ wait_finish_rebalance()
|
||||
sec=$1
|
||||
i=0
|
||||
while [[ $i -lt $sec ]]; do
|
||||
($ETCDCTL get --prefix /vitastor/pg/state/ --print-value-only | jq -s -e '([ .[] | select(.state == ["active"] or .state == ["active", "left_on_dead"]) ] | length) == '$PG_COUNT) && \
|
||||
($ETCDCTL get --prefix /vitastor/pg/state/ --print-value-only | jq -s -e '([ .[] | select(.state == ["active"]) ] | length) == 32') && \
|
||||
break
|
||||
sleep 1
|
||||
i=$((i+1))
|
||||
if [ $i -eq $sec ]; then
|
||||
if [ $i -eq 60 ]; then
|
||||
format_error "Rebalance couldn't finish in $sec seconds"
|
||||
fi
|
||||
sleep 1
|
||||
i=$((i+1))
|
||||
done
|
||||
}
|
||||
|
||||
@@ -117,14 +117,3 @@ check_qemu()
|
||||
sudo ln -s "$(realpath .)/build/src/block-vitastor.so" /usr/lib/x86_64-linux-gnu/qemu/block-vitastor.so
|
||||
fi
|
||||
}
|
||||
|
||||
# Create the /dev/nbd* block device nodes when the nbd kernel module is loaded
# (its /sys/module/nbd directory exists) but /dev/nbd0 is absent.
# The number of devices and the minor number step are read from the module
# parameters nbds_max and max_part.
check_nbd()
{
    # Keep helper variables local: test scripts that source this file use
    # names like "i" for their own loops
    local max_part nbds_max idx
    if [[ -d /sys/module/nbd && ! -e /dev/nbd0 ]]; then
        # Assign separately from "local" so a failed read is not masked under "set -e"
        max_part=$(</sys/module/nbd/parameters/max_part)
        nbds_max=$(</sys/module/nbd/parameters/nbds_max)
        for ((idx = 0; idx < nbds_max; idx++)); do
            # Each device reserves max_part+1 minor numbers
            mknod "/dev/nbd$idx" b 43 "$((idx*(max_part+1)))"
        done
    fi
}
|
||||
|
@@ -14,6 +14,8 @@ SCHEME=ec ./test_change_pg_count.sh
|
||||
|
||||
./test_create_nomaxid.sh
|
||||
|
||||
./test_degraded_delete.sh
|
||||
|
||||
./test_etcd_fail.sh
|
||||
|
||||
./test_failure_domain.sh
|
||||
|
@@ -15,10 +15,10 @@ done
|
||||
|
||||
sleep 2
|
||||
|
||||
for i in {1..30}; do
|
||||
for i in {1..10}; do
|
||||
($ETCDCTL get /vitastor/config/pgs --print-value-only |\
|
||||
jq -s -e '([ .[0].items["1"] | map(.osd_set)[][] ] | sort | unique == ["1","2","3","4"])') && \
|
||||
($ETCDCTL get --prefix /vitastor/pg/state/ --print-value-only | jq -s -e '([ .[] | select(.state == ["active"]) ] | length) == '$PG_COUNT) && \
|
||||
($ETCDCTL get --prefix /vitastor/pg/state/ --print-value-only | jq -s -e '([ .[] | select(.state == ["active"]) ] | length) == '$PG_COUNT'') && \
|
||||
break
|
||||
sleep 1
|
||||
done
|
||||
@@ -28,7 +28,7 @@ if ! ($ETCDCTL get /vitastor/config/pgs --print-value-only |\
|
||||
format_error "FAILED: OSD NOT ADDED INTO DISTRIBUTION"
|
||||
fi
|
||||
|
||||
wait_finish_rebalance 20
|
||||
wait_finish_rebalance 10
|
||||
|
||||
sleep 1
|
||||
kill -9 $OSD4_PID
|
||||
@@ -37,7 +37,7 @@ build/src/vitastor-cli --etcd_address $ETCD_URL rm-osd --force 4
|
||||
|
||||
sleep 2
|
||||
|
||||
for i in {1..30}; do
|
||||
for i in {1..10}; do
|
||||
($ETCDCTL get /vitastor/config/pgs --print-value-only |\
|
||||
jq -s -e '([ .[0].items["1"] | map(.osd_set)[][] ] | sort | unique == ["1","2","3"])') && \
|
||||
($ETCDCTL get --prefix /vitastor/pg/state/ --print-value-only | jq -s -e '([ .[] | select(.state == ["active"] or .state == ["active", "left_on_dead"]) ] | length) == '$PG_COUNT'') && \
|
||||
@@ -50,6 +50,6 @@ if ! ($ETCDCTL get /vitastor/config/pgs --print-value-only |\
|
||||
format_error "FAILED: OSD NOT REMOVED FROM DISTRIBUTION"
|
||||
fi
|
||||
|
||||
wait_finish_rebalance 20
|
||||
wait_finish_rebalance 10
|
||||
|
||||
format_green OK
|
||||
|
131
tests/test_degraded_delete.sh
Executable file
131
tests/test_degraded_delete.sh
Executable file
@@ -0,0 +1,131 @@
|
||||
#!/bin/bash -ex

# Source the shared 3-OSD cluster setup that lives next to this script
. $(dirname $0)/run_3osds.sh

# Sequentially fill inodes 1 and 2 (128M each, 1M blocks)
for inode in 1 2; do
    LD_PRELOAD="build/src/libfio_vitastor.so" \
    fio -thread -name=test -ioengine=build/src/libfio_vitastor.so -bs=1M -direct=1 -iodepth=4 \
        -rw=write -etcd=$ETCD_URL -pool=1 -inode=$inode -size=128M -runtime=10
done

# Keep random 4k writes to inode 1 running in the background while OSD 1 is stopped
LD_PRELOAD="build/src/libfio_vitastor.so" \
fio -thread -name=test -ioengine=build/src/libfio_vitastor.so -bs=4k -direct=1 -iodepth=16 \
    -rw=randwrite -etcd=$ETCD_URL -pool=1 -inode=1 -size=128M -runtime=10 &>/dev/null &

sleep 5

# Stop OSD 1
kill -INT $OSD1_PID
sleep 2

# Remove inode 2 (issued while OSD 1 is stopped)
build/src/vitastor-cli rm-data --etcd_address $ETCD_URL --pool 1 --inode 2

# Run 3 more OSDs and move PG to 4,5,6
for osd in {4..6}; do
    # Create a sparse backing file of OSD_SIZE megabytes by writing only its last kilobyte
    dd if=/dev/zero of=./testdata/test_osd$osd.bin bs=1024 count=1 seek=$((OSD_SIZE*1024-1))
    build/src/vitastor-osd --osd_num $osd --bind_address 127.0.0.1 $OSD_ARGS --etcd_address $ETCD_URL \
        $(build/src/vitastor-cli simple-offsets --format options ./testdata/test_osd$osd.bin 2>/dev/null) \
        &>./testdata/osd$osd.log &
    # Remember the PID of the OSD just started as OSD<n>_PID
    eval OSD${osd}_PID=$!
done

# Set reweight to 0 for the three original OSDs
for osd in 1 2 3; do
    $ETCDCTL put /vitastor/config/osd/$osd '{"reweight":0}'
done
|
||||
|
||||
# Wait for rebalance to finish
#
# Polls /vitastor/pg/state/ in etcd once per second until the number of PGs
# whose state equals the expected one AND whose peer list contains none of
# OSDs 1, 2 and 3 is equal to $PG_COUNT.
#
# Arguments:
#   $1 - maximum number of one-second polling attempts
#   $2 - expected PG state written as the inside of a JSON array,
#        e.g. '"active","left_on_dead"'
#
# Calls format_error if the condition is still not met after $1 attempts.
wait_finish_rebalance()
{
    local sec=$1
    local st=$2
    local i=0
    while [[ $i -lt $sec ]]; do
        # Every old OSD is tested on its own: jq's contains([2,3]) is true only
        # when BOTH 2 and 3 are present, so negating it would also accept a PG
        # that still has exactly one of them as a peer.
        if $ETCDCTL get --prefix /vitastor/pg/state/ --print-value-only | jq -s -e \
            '([ .[] | select(.state == ['$st'] and (.peers | contains([1]) | not) and (.peers | contains([2]) | not) and (.peers | contains([3]) | not)) ] | length) == '$PG_COUNT; then
            break
        fi
        sleep 1
        i=$((i+1))
        # Report the timeout right after the last allowed attempt
        if [ $i -eq $sec ]; then
            format_error "Rebalance couldn't finish in $sec seconds"
        fi
    done
}
|
||||
|
||||
# Allow up to 60 polling attempts for the PGs to report active + left_on_dead
wait_finish_rebalance 60 '"active","left_on_dead"'

# Stop OSD 2,3
for pid in $OSD2_PID $OSD3_PID; do
    kill -INT $pid
done
sleep 2

# Verify that PGs are still active
$ETCDCTL get /vitastor/pg/state/1/ --prefix --print-value-only | \
    jq -s -e '[ .[] | select(.state == ["active","left_on_dead"]) ] | length == '$PG_COUNT || \
    format_error "FAILED: $PG_COUNT PG(s) NOT UP"

# Start OSD 1 again on its original data file and remember the new PID
build/src/vitastor-osd --osd_num 1 --bind_address 127.0.0.1 $OSD_ARGS --etcd_address $ETCD_URL \
    $(build/src/vitastor-cli simple-offsets --format options ./testdata/test_osd1.bin 2>/dev/null) \
    &>./testdata/osd1.log &
OSD1_PID=$!
|
||||
|
||||
# Verify that inode 2 is removed and inode 1 is in place
|
||||
|
||||
# Poll the logs of OSDs 4, 5 and 6 once per second until any of them contains
# the line 'Repeer because of OSD 1'.
#
# Arguments:
#   $1 - maximum number of one-second polling attempts
#
# Calls format_error if the line has not appeared after $1 attempts.
wait_repeer_1()
{
    local deadline=$1
    local waited=0
    until [[ $waited -ge $deadline ]]; do
        if grep -q 'Repeer because of OSD 1' testdata/osd4.log testdata/osd5.log testdata/osd6.log; then
            break
        fi
        sleep 1
        waited=$((waited+1))
        # Only the final attempt reports the failure
        [ $waited -ne $deadline ] || format_error "OSD 4/5/6 do not peer with older OSD 1"
    done
}
|
||||
|
||||
# Wait until OSD 4/5/6 log a repeer caused by OSD 1, then until the PGs are plain "active"
wait_repeer_1 15

wait_finish_rebalance 15 '"active"'

# Expected object count of PG 1: 1024 for replicated pools, 1024/(PG_SIZE-1) otherwise
NOBJ=1024
if [ "$SCHEME" != "replicated" ]; then
    NOBJ=$((1024/(PG_SIZE-1)))
fi

$ETCDCTL get /vitastor/pg/stats/1/1 --print-value-only | jq -s -e '.[0].object_count == '$NOBJ || \
    format_error "FAILED: PG SHOULD CONTAIN EXACTLY 128 MB OF DATA, BUT IT DOESN'T"

# Dump the first 4096 bytes of inode 1 ...
qemu-img convert -S 4096 -p \
    -f raw "vitastor:etcd_host=127.0.0.1\:$ETCD_PORT/v3:pool=1:inode=1:size=4096" \
    -O raw ./testdata/inode1.bin

# ... and the whole 128 MB of inode 2
qemu-img convert -S 4096 -p \
    -f raw "vitastor:etcd_host=127.0.0.1\:$ETCD_PORT/v3:pool=1:inode=2:size=$((128*1024*1024))" \
    -O raw ./testdata/inode2.bin

# Inode 1 must NOT read back as zeroes
if dd if=/dev/zero bs=4096 count=1 | diff - ./testdata/inode1.bin; then
    format_error "FAILED: INODE 1 SEEMS LOST"
fi

# Inode 2 must read back as all zeroes
dd if=/dev/zero bs=1M count=128 | diff - ./testdata/inode2.bin || \
    format_error "FAILED: INODE 2 SEEMS RESTORED"

format_green OK
|
@@ -18,7 +18,7 @@ $ETCDCTL put /vitastor/config/pools '{"1":{"name":"testpool","scheme":"replicate
|
||||
cd mon
|
||||
npm install
|
||||
cd ..
|
||||
node mon/mon-main.js --etcd_url $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 --restart_interval 5 &>./testdata/mon.log &
|
||||
node mon/mon-main.js --etcd_url $ETCD_URL --etcd_prefix "/vitastor" &>./testdata/mon.log &
|
||||
MON_PID=$!
|
||||
|
||||
sleep 2
|
||||
|
@@ -4,8 +4,6 @@ OSD_COUNT=7
|
||||
PG_COUNT=32
|
||||
. `dirname $0`/run_3osds.sh
|
||||
|
||||
check_nbd
|
||||
|
||||
IMG_SIZE=256
|
||||
|
||||
$ETCDCTL put /vitastor/config/inode/1/1 '{"name":"testimg","size":'$((IMG_SIZE*1024*1024))'}'
|
||||
|
@@ -14,7 +14,7 @@ for i in $(seq 1 $OSD_COUNT); do
|
||||
eval OSD${i}_PID=$!
|
||||
done
|
||||
|
||||
node mon/mon-main.js --etcd_url $ETCD_URL --etcd_prefix "/vitastor" --verbose 1 --restart_interval 5 &>./testdata/mon.log &
|
||||
node mon/mon-main.js --etcd_url $ETCD_URL --etcd_prefix "/vitastor" &>./testdata/mon.log &
|
||||
MON_PID=$!
|
||||
|
||||
sleep 3
|
||||
|
Reference in New Issue
Block a user