forked from vitalif/vitastor
Compare commits
24 Commits
SHA1:
7e6e1a5a82
435045751d
c5fb1d5987
9f59381bea
9ac7e75178
88671cf745
fe1749c427
ceb9c28de7
299d7d7c95
d1526b415f
f49fd53d55
dd76eda5e5
87dbd8fa57
b44f49aab2
036555638e
af5155fcd9
0d2efbecc9
e62e8b6bae
c4ba24c305
19e47a0279
bd178ac20f
7006875a24
ad577c4aac
836635c518
debian/changelog (vendored): 2 changed lines
@@ -1,4 +1,4 @@
-vitastor (0.5.7-1) unstable; urgency=medium
+vitastor (0.5.10-1) unstable; urgency=medium

   * Bugfixes
debian/vitastor.Dockerfile (vendored): 12 changed lines
@@ -40,10 +40,10 @@ RUN set -e -x; \
     mkdir -p /root/packages/vitastor-$REL; \
     rm -rf /root/packages/vitastor-$REL/*; \
     cd /root/packages/vitastor-$REL; \
-    cp -r /root/vitastor vitastor-0.5.7; \
-    ln -s /root/packages/qemu-$REL/qemu-*/ vitastor-0.5.7/qemu; \
-    ln -s /root/fio-build/fio-*/ vitastor-0.5.7/fio; \
-    cd vitastor-0.5.7; \
+    cp -r /root/vitastor vitastor-0.5.10; \
+    ln -s /root/packages/qemu-$REL/qemu-*/ vitastor-0.5.10/qemu; \
+    ln -s /root/fio-build/fio-*/ vitastor-0.5.10/fio; \
+    cd vitastor-0.5.10; \
     FIO=$(head -n1 fio/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
     QEMU=$(head -n1 qemu/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
     sh copy-qemu-includes.sh; \
@@ -59,8 +59,8 @@ RUN set -e -x; \
     echo "dep:fio=$FIO" > debian/substvars; \
     echo "dep:qemu=$QEMU" >> debian/substvars; \
     cd /root/packages/vitastor-$REL; \
-    tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.5.7.orig.tar.xz vitastor-0.5.7; \
-    cd vitastor-0.5.7; \
+    tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.5.10.orig.tar.xz vitastor-0.5.10; \
+    cd vitastor-0.5.10; \
     V=$(head -n1 debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
     DEBFULLNAME="Vitaliy Filippov <vitalif@yourcmc.ru>" dch -D $REL -v "$V""$REL" "Rebuild for $REL"; \
     DEB_BUILD_OPTIONS=nocheck dpkg-buildpackage --jobs=auto -sa; \
mon/mon.js: 150 changed lines
@@ -10,6 +10,14 @@ const stableStringify = require('./stable-stringify.js');
 const PGUtil = require('./PGUtil.js');

 // FIXME document all etcd keys and config variables in the form of JSON schema or similar
+const etcd_nonempty_keys = {
+    'config/global': 1,
+    'config/node_placement': 1,
+    'config/pools': 1,
+    'config/pgs': 1,
+    'history/last_clean_pgs': 1,
+    'stats': 1,
+};
 const etcd_allow = new RegExp('^'+[
     'config/global',
     'config/node_placement',
@@ -35,7 +43,7 @@ const etcd_tree = {
     etcd_mon_retries: 5, // min: 0
     mon_change_timeout: 1000, // ms. min: 100
     mon_stats_timeout: 1000, // ms. min: 100
-    osd_out_time: 1800, // seconds. min: 0
+    osd_out_time: 600, // seconds. min: 0
     placement_levels: { datacenter: 1, rack: 2, host: 3, osd: 4, ... },
     // client and osd
     use_sync_send_recv: false,
@@ -47,6 +55,8 @@ const etcd_tree = {
     client_dirty_limit: 33554432,
     peer_connect_interval: 5, // seconds. min: 1
     peer_connect_timeout: 5, // seconds. min: 1
+    osd_idle_timeout: 5, // seconds. min: 1
+    osd_ping_timeout: 5, // seconds. min: 1
     up_wait_retry_interval: 500, // ms. min: 50
     // osd
     etcd_report_interval: 30, // min: 10
@@ -56,8 +66,12 @@ const etcd_tree = {
     autosync_interval: 5,
     client_queue_depth: 128, // unused
     recovery_queue_depth: 4,
+    recovery_sync_batch: 16,
     readonly: false,
+    no_recovery: false,
+    no_rebalance: false,
     print_stats_interval: 3,
+    slow_log_interval: 10,
     // blockstore - fixed in superblock
     block_size,
     disk_alignment,
@@ -77,6 +91,7 @@ const etcd_tree = {
     disable_meta_fsync,
     disable_device_lock,
     // blockstore - configurable
+    max_write_iodepth,
     flusher_count,
     inmemory_metadata,
     inmemory_journal,
@@ -295,7 +310,7 @@ class Mon
         this.config.osd_out_time = Number(this.config.osd_out_time) || 0;
         if (!this.config.osd_out_time)
         {
-            this.config.osd_out_time = 30*60; // 30 minutes by default
+            this.config.osd_out_time = 600; // 10 minutes by default
         }
     }
@@ -317,8 +332,14 @@ class Mon
                 ok(false);
             }, this.config.etcd_mon_timeout);
             this.ws = new WebSocket(base+'/watch');
+            const fail = () =>
+            {
+                ok(false);
+            };
+            this.ws.on('error', fail);
             this.ws.on('open', () =>
             {
+                this.ws.removeListener('error', fail);
                 if (timer_id)
                     clearTimeout(timer_id);
                 ok(true);
@@ -445,7 +466,7 @@ class Mon
     async get_lease()
     {
         const max_ttl = this.config.etcd_mon_ttl + this.config.etcd_mon_timeout/1000*this.config.etcd_mon_retries;
-        const res = await this.etcd_call('/lease/grant', { TTL: max_ttl }, this.config.etcd_mon_timeout, this.config.etcd_mon_retries);
+        const res = await this.etcd_call('/lease/grant', { TTL: max_ttl }, this.config.etcd_mon_timeout, -1);
         this.etcd_lease_id = res.ID;
         setInterval(async () =>
         {
@@ -621,29 +642,51 @@ class Mon
         return !has_online;
     }

+    reset_rng()
+    {
+        this.seed = 0x5f020e43;
+    }
+
+    rng()
+    {
+        this.seed ^= this.seed << 13;
+        this.seed ^= this.seed >> 17;
+        this.seed ^= this.seed << 5;
+        return this.seed + 2147483648;
+    }
+
+    pick_primary(pool_id, osd_set, up_osds)
+    {
+        let alive_set;
+        if (this.state.config.pools[pool_id].scheme === 'replicated')
+            alive_set = osd_set.filter(osd_num => osd_num && up_osds[osd_num]);
+        else
+        {
+            // Prefer data OSDs for EC because they can actually read something without an additional network hop
+            const pg_data_size = (this.state.config.pools[pool_id].pg_size||0) -
+                (this.state.config.pools[pool_id].parity_chunks||0);
+            alive_set = osd_set.slice(0, pg_data_size).filter(osd_num => osd_num && up_osds[osd_num]);
+            if (!alive_set.length)
+                alive_set = osd_set.filter(osd_num => osd_num && up_osds[osd_num]);
+        }
+        if (!alive_set.length)
+            return 0;
+        return alive_set[this.rng() % alive_set.length];
+    }
+
     save_new_pgs_txn(request, pool_id, up_osds, prev_pgs, new_pgs, pg_history)
     {
         const replicated = new_pgs.length && this.state.config.pools[pool_id].scheme === 'replicated';
-        const pg_minsize = new_pgs.length && this.state.config.pools[pool_id].pg_minsize;
         const pg_items = {};
+        this.reset_rng();
         new_pgs.map((osd_set, i) =>
         {
             osd_set = osd_set.map(osd_num => osd_num === LPOptimizer.NO_OSD ? 0 : osd_num);
-            let alive_set;
-            if (replicated)
-                alive_set = osd_set.filter(osd_num => osd_num && up_osds[osd_num]);
-            else
-            {
-                // Prefer data OSDs for EC because they can actually read something without an additional network hop
-                alive_set = osd_set.slice(0, pg_minsize).filter(osd_num => osd_num && up_osds[osd_num]);
-                if (!alive_set.length)
-                    alive_set = osd_set.filter(osd_num => osd_num && up_osds[osd_num]);
-            }
             pg_items[i+1] = {
                 osd_set,
-                primary: alive_set.length ? alive_set[Math.floor(Math.random()*alive_set.length)] : 0,
+                primary: this.pick_primary(pool_id, osd_set, up_osds),
             };
-            if (prev_pgs[i] && prev_pgs[i].join(' ') != osd_set.join(' '))
+            if (prev_pgs[i] && prev_pgs[i].join(' ') != osd_set.join(' ') &&
+                prev_pgs[i].filter(osd_num => osd_num).length > 0)
             {
                 pg_history[i] = pg_history[i] || {};
                 pg_history[i].osd_sets = pg_history[i].osd_sets || [];
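The seeded xorshift generator introduced above replaces Math.random() so that every monitor derives the same primary OSD for each PG. A standalone C++ sketch of the same generator follows; the constants come from the hunk, while the struct, the driver loop, and the printed check values are illustrative assumptions, not part of the source:

    #include <cstdint>
    #include <cstdio>

    // Sketch of the monitor's xorshift32 PRNG with the seed 0x5f020e43 from
    // the hunk above. Calling reset() before each PG assignment pass makes
    // the sequence, and therefore the primary selection, identical on every
    // monitor instance.
    struct pg_rng
    {
        int32_t seed = 0;
        void reset() { seed = 0x5f020e43; }
        uint64_t next()
        {
            // Mirror JS int32 semantics with explicit casts
            seed = (int32_t)((uint32_t)seed ^ ((uint32_t)seed << 13));
            seed ^= seed >> 17; // arithmetic shift on typical compilers, like '>>' in JS
            seed = (int32_t)((uint32_t)seed ^ ((uint32_t)seed << 5));
            return (int64_t)seed + 2147483648; // same positive offset as the JS code
        }
    };

    int main()
    {
        pg_rng a, b;
        a.reset();
        b.reset();
        for (int i = 0; i < 4; i++) // two independently seeded instances always agree
            printf("%llu %llu\n", (unsigned long long)a.next(), (unsigned long long)b.next());
        return 0;
    }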
@@ -839,14 +882,25 @@ class Mon
         pool_tree = pool_tree ? pool_tree.children : [];
         pool_tree = LPOptimizer.flatten_tree(pool_tree, levels, pool_cfg.failure_domain, 'osd');
         this.filter_osds_by_tags(osd_tree, pool_tree, pool_cfg.osd_tags);
-        const prev_pg_hash = (this.state.history.last_clean_pgs.items||{})[pool_id]
-            || (this.state.config.pgs.items||{})[pool_id] || {};
-        const prev_pgs = [];
-        for (const pg in prev_pg_hash)
+        // These are for the purpose of building history.osd_sets
+        const real_prev_pgs = [];
+        let pg_history = [];
+        for (const pg in ((this.state.config.pgs.items||{})[pool_id]||{}))
         {
-            prev_pgs[pg-1] = prev_pg_hash[pg].osd_set;
+            real_prev_pgs[pg-1] = this.state.config.pgs.items[pool_id][pg].osd_set;
+            if (this.state.pg.history[pool_id] &&
+                this.state.pg.history[pool_id][pg])
+            {
+                pg_history[pg-1] = this.state.pg.history[pool_id][pg];
+            }
         }
-        const pg_history = [];
+        // And these are for the purpose of minimizing data movement
+        let prev_pgs = [];
+        for (const pg in ((this.state.history.last_clean_pgs.items||{})[pool_id]||{}))
+        {
+            prev_pgs[pg-1] = this.state.history.last_clean_pgs.items[pool_id][pg].osd_set;
+        }
+        prev_pgs = JSON.parse(JSON.stringify(prev_pgs.length ? prev_pgs : real_prev_pgs));
         const old_pg_count = prev_pgs.length;
         let optimize_result;
         if (old_pg_count > 0)
@@ -859,7 +913,9 @@ class Mon
                 this.schedule_recheck();
                 return;
             }
-            PGUtil.scale_pg_count(prev_pgs, this.state.pg.history[pool_id]||{}, pg_history, pool_cfg.pg_count);
+            const new_pg_history = [];
+            PGUtil.scale_pg_count(prev_pgs, pg_history, new_pg_history, pool_cfg.pg_count);
+            pg_history = new_pg_history;
         }
         for (const pg of prev_pgs)
         {
@@ -903,14 +959,14 @@ class Mon
                 } });
             }
             LPOptimizer.print_change_stats(optimize_result);
-            this.save_new_pgs_txn(etcd_request, pool_id, up_osds, prev_pgs, optimize_result.int_pgs, pg_history);
+            this.save_new_pgs_txn(etcd_request, pool_id, up_osds, real_prev_pgs, optimize_result.int_pgs, pg_history);
         }
         this.state.config.pgs.hash = tree_hash;
         await this.save_pg_config(etcd_request);
     }
     else
     {
-        // Nothing changed, but we still want to check for down OSDs
+        // Nothing changed, but we still want to recheck the distribution of primaries
         let changed = false;
         for (const pool_id in this.state.config.pools)
         {
@@ -920,22 +976,13 @@ class Mon
                     continue;
                 }
                 const replicated = pool_cfg.scheme === 'replicated';
-                for (const pg_num in ((this.state.config.pgs.items||{})[pool_id]||{})||{})
+                this.reset_rng();
+                for (let pg_num = 1; pg_num <= pool_cfg.pg_count; pg_num++)
                 {
                     const pg_cfg = this.state.config.pgs.items[pool_id][pg_num];
-                    if (!Number(pg_cfg.primary) || !up_osds[pg_cfg.primary])
+                    if (pg_cfg)
                     {
-                        let alive_set;
-                        if (replicated)
-                            alive_set = pg_cfg.osd_set.filter(osd_num => osd_num && up_osds[osd_num]);
-                        else
-                        {
-                            // Prefer data OSDs for EC because they can actually read something without an additional network hop
-                            alive_set = pg_cfg.osd_set.slice(0, pool_cfg.pg_minsize).filter(osd_num => osd_num && up_osds[osd_num]);
-                            if (!alive_set.length)
-                                alive_set = pg_cfg.osd_set.filter(osd_num => osd_num && up_osds[osd_num]);
-                        }
-                        const new_primary = alive_set.length ? alive_set[Math.floor(Math.random()*alive_set.length)] : 0;
+                        const new_primary = this.pick_primary(pool_id, pg_cfg.osd_set, up_osds);
                         if (pg_cfg.primary != new_primary)
                         {
                             console.log(
@@ -1180,16 +1227,20 @@ class Mon
             console.log('Bad value in etcd: '+kv.key+' = '+kv.value);
             return;
         }
-        key = key.split('/');
+        let key_parts = key.split('/');
         let cur = this.state;
-        for (let i = 0; i < key.length-1; i++)
+        for (let i = 0; i < key_parts.length-1; i++)
         {
-            cur = (cur[key[i]] = cur[key[i]] || {});
+            cur = (cur[key_parts[i]] = cur[key_parts[i]] || {});
         }
-        cur[key[key.length-1]] = kv.value;
-        if (key.join('/') === 'config/global')
+        if (etcd_nonempty_keys[key])
+        {
+            // Do not clear these to null
+            kv.value = kv.value || {};
+        }
+        cur[key_parts[key_parts.length-1]] = kv.value;
+        if (key === 'config/global')
         {
             this.state.config.global = this.state.config.global || {};
             this.config = this.state.config.global;
             this.check_config();
             for (const osd_num in this.state.osd.stats)
@@ -1200,7 +1251,7 @@ class Mon
             );
         }
     }
-    else if (key.join('/') === 'config/pools')
+    else if (key === 'config/pools')
     {
         for (const pool_id in this.state.config.pools)
         {
@@ -1209,7 +1260,7 @@ class Mon
             this.validate_pool_cfg(pool_id, pool_cfg, true);
         }
     }
-    else if (key[0] === 'osd' && key[1] === 'stats')
+    else if (key_parts[0] === 'osd' && key_parts[1] === 'stats')
    {
        // Recheck PGs <osd_out_time> later
        this.schedule_next_recheck_at(
@@ -1241,6 +1292,11 @@ class Mon
                     console.error('etcd returned error: '+res.json.error);
                     break;
                 }
+                if (this.etcd_urls.length > 1)
+                {
+                    // Stick to the same etcd for the rest of calls
+                    this.etcd_urls = [ base ];
+                }
                 return res.json;
             }
             retry++;
@@ -48,4 +48,4 @@ FIO=`rpm -qi fio | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Ve
 QEMU=`rpm -qi qemu qemu-kvm | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Version[\s:]+(\S+)/ && print $1; /^Release[\s:]+(\S+)/ && print "-$1"; }'`
 perl -i -pe 's/(Requires:\s*fio)([^\n]+)?/$1 = '$FIO'/' $VITASTOR/rpm/vitastor-el$EL.spec
 perl -i -pe 's/(Requires:\s*qemu(?:-kvm)?)([^\n]+)?/$1 = '$QEMU'/' $VITASTOR/rpm/vitastor-el$EL.spec
-tar --transform 's#^#vitastor-0.5.7/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.5.7$(rpm --eval '%dist').tar.gz *
+tar --transform 's#^#vitastor-0.5.10/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.5.10$(rpm --eval '%dist').tar.gz *
@@ -37,7 +37,7 @@ ADD . /root/vitastor
 RUN set -e; \
     cd /root/vitastor/rpm; \
     sh build-tarball.sh; \
-    cp /root/vitastor-0.5.7.el7.tar.gz ~/rpmbuild/SOURCES; \
+    cp /root/vitastor-0.5.10.el7.tar.gz ~/rpmbuild/SOURCES; \
     cp vitastor-el7.spec ~/rpmbuild/SPECS/vitastor.spec; \
     cd ~/rpmbuild/SPECS/; \
     rpmbuild -ba vitastor.spec; \
@@ -1,11 +1,11 @@
 Name:           vitastor
-Version:        0.5.7
+Version:        0.5.10
 Release:        1%{?dist}
 Summary:        Vitastor, a fast software-defined clustered block storage

 License:        Vitastor Network Public License 1.1
 URL:            https://vitastor.io/
-Source0:        vitastor-0.5.7.el7.tar.gz
+Source0:        vitastor-0.5.10.el7.tar.gz

 BuildRequires:  liburing-devel >= 0.6
 BuildRequires:  gperftools-devel
@@ -35,7 +35,7 @@ ADD . /root/vitastor
 RUN set -e; \
     cd /root/vitastor/rpm; \
     sh build-tarball.sh; \
-    cp /root/vitastor-0.5.7.el8.tar.gz ~/rpmbuild/SOURCES; \
+    cp /root/vitastor-0.5.10.el8.tar.gz ~/rpmbuild/SOURCES; \
     cp vitastor-el8.spec ~/rpmbuild/SPECS/vitastor.spec; \
     cd ~/rpmbuild/SPECS/; \
     rpmbuild -ba vitastor.spec; \
@@ -1,11 +1,11 @@
 Name:           vitastor
-Version:        0.5.7
+Version:        0.5.10
 Release:        1%{?dist}
 Summary:        Vitastor, a fast software-defined clustered block storage

 License:        Vitastor Network Public License 1.1
 URL:            https://vitastor.io/
-Source0:        vitastor-0.5.7.el8.tar.gz
+Source0:        vitastor-0.5.10.el8.tar.gz

 BuildRequires:  liburing-devel >= 0.6
 BuildRequires:  gperftools-devel
@@ -13,19 +13,19 @@ allocator::allocator(uint64_t blocks)
     {
         throw std::invalid_argument("blocks");
     }
-    uint64_t p2 = 1, total = 1;
+    uint64_t p2 = 1;
+    total = 0;
     while (p2 * 64 < blocks)
     {
-        p2 = p2 * 64;
         total += p2;
+        p2 = p2 * 64;
     }
-    total -= p2;
     total += (blocks+63) / 64;
-    mask = new uint64_t[2 + total];
+    mask = new uint64_t[total];
     size = free = blocks;
     last_one_mask = (blocks % 64) == 0
         ? UINT64_MAX
-        : ~(UINT64_MAX << (64 - blocks % 64));
+        : ((1l << (blocks % 64)) - 1);
     for (uint64_t i = 0; i < total; i++)
     {
         mask[i] = 0;
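The constructor fix above changes how the word count of the hierarchical bitmap is computed: every non-leaf level is allocated as a full power of 64 words, and only the leaf level is sized as ceil(blocks/64). A small sketch of that computation in isolation; the free function and the example sizes are illustrative, not the allocator class itself:

    #include <cstdint>
    #include <cstdio>

    // Word count of the hierarchical bitmap as computed by the fixed
    // constructor: full upper levels of 1, 64, 4096, ... words while one
    // more 64-way level is still needed, plus a leaf level of one bit
    // per block.
    uint64_t bitmap_words(uint64_t blocks)
    {
        uint64_t p2 = 1, total = 0;
        while (p2 * 64 < blocks)
        {
            total += p2;      // one full level of p2 words
            p2 = p2 * 64;
        }
        total += (blocks + 63) / 64; // leaf level
        return total;
    }

    int main()
    {
        // 100 blocks -> 1 top word + 2 leaf words = 3; 64 blocks -> 1
        printf("%llu %llu %llu\n",
            (unsigned long long)bitmap_words(100),
            (unsigned long long)bitmap_words(8192),
            (unsigned long long)bitmap_words(64));
        return 0;
    }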
@@ -99,6 +99,10 @@ uint64_t allocator::find_free()
     uint64_t p2 = 1, offset = 0, addr = 0, f, i;
     while (p2 < size)
     {
+        if (offset+addr >= total)
+        {
+            return UINT64_MAX;
+        }
         uint64_t m = mask[offset + addr];
         for (i = 0, f = 1; i < 64; i++, f <<= 1)
         {
@@ -113,11 +117,6 @@ uint64_t allocator::find_free()
             return UINT64_MAX;
         }
         addr = (addr * 64) | i;
-        if (addr >= size)
-        {
-            // No space
-            return UINT64_MAX;
-        }
         offset += p2;
         p2 = p2 * 64;
     }
@@ -8,6 +8,7 @@
 // Hierarchical bitmap allocator
 class allocator
 {
+    uint64_t total;
     uint64_t size;
     uint64_t free;
     uint64_t last_one_mask;
@@ -823,31 +823,34 @@ bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base)
     sync_found:
         cur_sync->ready_count++;
         flusher->syncing_flushers++;
-        if (flusher->syncing_flushers >= flusher->flusher_count || !flusher->flush_queue.size())
+    resume_1:
+        if (!cur_sync->state)
         {
-            // Sync batch is ready. Do it.
-            await_sqe(0);
-            data->iov = { 0 };
-            data->callback = simple_callback_w;
-            my_uring_prep_fsync(sqe, fsync_meta ? bs->meta_fd : bs->data_fd, IORING_FSYNC_DATASYNC);
-            cur_sync->state = 1;
-            wait_count++;
-        resume_1:
-            if (wait_count > 0)
+            if (flusher->syncing_flushers >= flusher->cur_flusher_count || !flusher->flush_queue.size())
             {
-                wait_state = 1;
-                return false;
+                // Sync batch is ready. Do it.
+                await_sqe(0);
+                data->iov = { 0 };
+                data->callback = simple_callback_w;
+                my_uring_prep_fsync(sqe, fsync_meta ? bs->meta_fd : bs->data_fd, IORING_FSYNC_DATASYNC);
+                cur_sync->state = 1;
+                wait_count++;
+            resume_2:
+                if (wait_count > 0)
+                {
+                    wait_state = 2;
+                    return false;
+                }
+                // Sync completed. All previous coroutines waiting for it must be resumed
+                cur_sync->state = 2;
+                bs->ringloop->wakeup();
             }
-            // Sync completed. All previous coroutines waiting for it must be resumed
-            cur_sync->state = 2;
-            bs->ringloop->wakeup();
-        }
-        // Wait until someone else sends and completes a sync.
-    resume_2:
-        if (!cur_sync->state)
-        {
-            wait_state = 2;
-            return false;
+            else
+            {
+                // Wait until someone else sends and completes a sync.
+                wait_state = 1;
+                return false;
+            }
         }
         flusher->syncing_flushers--;
         cur_sync->ready_count--;
@@ -105,10 +105,10 @@ void blockstore_impl_t::loop()
     // has_writes == 1 - some writes in progress
     // has_writes == 2 - tried to submit some writes, but failed
     int has_writes = 0, op_idx = 0, new_idx = 0;
-    for (; op_idx < submit_queue.size(); op_idx++)
+    for (; op_idx < submit_queue.size(); op_idx++, new_idx++)
     {
         auto op = submit_queue[op_idx];
-        submit_queue[new_idx++] = op;
+        submit_queue[new_idx] = op;
         // FIXME: This needs some simplification
         // Writes should not block reads if the ring is not full and reads don't depend on them
         // In all other cases we should stop submission
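The submission loop above keeps unfinished operations by copying them toward the front of submit_queue while scanning it; the change just moves the new_idx increment into the for header so the write-back and the bookkeeping stay together. A generic sketch of the same in-place compaction pattern, as an assumption-laden illustration rather than the blockstore code itself:

    #include <vector>

    // In-place compaction: surviving entries are written back over the
    // processed prefix of the same vector, then the tail is trimmed.
    template<typename T, typename KeepFn>
    void compact_in_place(std::vector<T> & q, KeepFn keep)
    {
        size_t new_idx = 0;
        for (size_t op_idx = 0; op_idx < q.size(); op_idx++)
        {
            if (keep(q[op_idx]))
                q[new_idx++] = q[op_idx]; // keep: shift left over freed slots
        }
        q.resize(new_idx);
    }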
@@ -301,7 +301,7 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op)
     }
     else if (PRIV(op)->wait_for == WAIT_FREE)
     {
-        if (!data_alloc->get_free_count() && !flusher->is_active())
+        if (!data_alloc->get_free_count() && flusher->is_active())
         {
 #ifdef BLOCKSTORE_DEBUG
             printf("Still waiting for free space on the data device\n");
@@ -191,8 +191,8 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
         if (bmp_end > bmp_start)
         {
             // fill with zeroes
-            fulfill_read(read_op, fulfilled, bmp_start * bitmap_granularity,
-                bmp_end * bitmap_granularity, (BS_ST_DELETE | BS_ST_STABLE), 0, 0);
+            assert(fulfill_read(read_op, fulfilled, bmp_start * bitmap_granularity,
+                bmp_end * bitmap_granularity, (BS_ST_DELETE | BS_ST_STABLE), 0, 0));
         }
         bmp_start = bmp_end;
         while (clean_entry_bitmap[bmp_end >> 3] & (1 << (bmp_end & 0x7)) && bmp_end < bmp_size)
@@ -218,7 +218,7 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
     else if (fulfilled < read_op->len)
     {
         // fill remaining parts with zeroes
-        fulfill_read(read_op, fulfilled, 0, block_size, (BS_ST_DELETE | BS_ST_STABLE), 0, 0);
+        assert(fulfill_read(read_op, fulfilled, 0, block_size, (BS_ST_DELETE | BS_ST_STABLE), 0, 0));
     }
     assert(fulfilled == read_op->len);
     read_op->version = result_version;
@@ -126,11 +126,8 @@ resume_2:
 resume_3:
     if (!disable_journal_fsync)
     {
-        io_uring_sqe *sqe = get_sqe();
-        if (!sqe)
-        {
-            return 0;
-        }
+        io_uring_sqe *sqe;
+        BS_SUBMIT_GET_SQE_DECL(sqe);
         ring_data_t *data = ((ring_data_t*)sqe->user_data);
         my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
         data->iov = { 0 };
@@ -150,11 +150,8 @@ resume_2:
 resume_3:
     if (!disable_journal_fsync)
     {
-        io_uring_sqe *sqe = get_sqe();
-        if (!sqe)
-        {
-            return 0;
-        }
+        io_uring_sqe *sqe;
+        BS_SUBMIT_GET_SQE_DECL(sqe);
         ring_data_t *data = ((ring_data_t*)sqe->user_data);
         my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
         data->iov = { 0 };
@@ -401,11 +401,7 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op)
         goto resume_4;
 resume_2:
     // Only for the immediate_commit mode: prepare and submit big_write journal entry
-    sqe = get_sqe();
-    if (!sqe)
-    {
-        return 0;
-    }
+    BS_SUBMIT_GET_SQE_DECL(sqe);
     je = (journal_entry_big_write*)prefill_single_journal_entry(
         journal, op->opcode == BS_OP_WRITE_STABLE ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE,
         sizeof(journal_entry_big_write)
@@ -8,13 +8,11 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
 {
     this->ringloop = ringloop;
     this->tfd = tfd;
+    this->config = config;

-    log_level = config["log_level"].int64_value();
-
     msgr.osd_num = 0;
     msgr.tfd = tfd;
     msgr.ringloop = ringloop;
-    msgr.log_level = log_level;
     msgr.repeer_pgs = [this](osd_num_t peer_osd)
     {
         if (msgr.osd_peer_fds.find(peer_osd) != msgr.osd_peer_fds.end())
@@ -67,8 +65,7 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
         msgr.stop_client(op->peer_fd);
         delete op;
     };
-    msgr.use_sync_send_recv = config["use_sync_send_recv"].bool_value() ||
-        config["use_sync_send_recv"].uint64_value();
+    msgr.init();

     st_cli.tfd = tfd;
     st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };
@@ -185,16 +182,8 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config)
     {
         up_wait_retry_interval = 50;
     }
-    msgr.peer_connect_interval = config["peer_connect_interval"].uint64_value();
-    if (!msgr.peer_connect_interval)
-    {
-        msgr.peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
-    }
-    msgr.peer_connect_timeout = config["peer_connect_timeout"].uint64_value();
-    if (!msgr.peer_connect_timeout)
-    {
-        msgr.peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
-    }
+    msgr.parse_config(this->config);
     st_cli.load_pgs();
 }
@@ -82,6 +82,7 @@ class cluster_client_t
 public:
     etcd_state_client_t st_cli;
     osd_messenger_t msgr;
+    json11::Json config;

     cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config);
     ~cluster_client_t();
@@ -56,6 +56,23 @@ void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int t
     http_request_json(tfd, etcd_address, req, timeout, callback);
 }

+void etcd_state_client_t::add_etcd_url(std::string addr)
+{
+    if (addr.length() > 0)
+    {
+        if (strtolower(addr.substr(0, 7)) == "http://")
+            addr = addr.substr(7);
+        else if (strtolower(addr.substr(0, 8)) == "https://")
+        {
+            printf("HTTPS is unsupported for etcd. Either use plain HTTP or setup a local proxy for etcd interaction\n");
+            exit(1);
+        }
+        if (addr.find('/') < 0)
+            addr += "/v3";
+        this->etcd_addresses.push_back(addr);
+    }
+}
+
 void etcd_state_client_t::parse_config(json11::Json & config)
 {
     this->etcd_addresses.clear();
@@ -65,13 +82,7 @@ void etcd_state_client_t::parse_config(json11::Json & config)
         while (1)
         {
             int pos = ea.find(',');
-            std::string addr = pos >= 0 ? ea.substr(0, pos) : ea;
-            if (addr.length() > 0)
-            {
-                if (addr.find('/') < 0)
-                    addr += "/v3";
-                this->etcd_addresses.push_back(addr);
-            }
+            add_etcd_url(pos >= 0 ? ea.substr(0, pos) : ea);
             if (pos >= 0)
                 ea = ea.substr(pos+1);
             else
@@ -82,13 +93,7 @@ void etcd_state_client_t::parse_config(json11::Json & config)
     {
         for (auto & ea: config["etcd_address"].array_items())
         {
-            std::string addr = ea.string_value();
-            if (addr != "")
-            {
-                if (addr.find('/') < 0)
-                    addr += "/v3";
-                this->etcd_addresses.push_back(addr);
-            }
+            add_etcd_url(ea.string_value());
         }
     }
     this->etcd_prefix = config["etcd_prefix"].string_value();
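Both parse_config branches now funnel through the new add_etcd_url() helper, which normalizes addresses in one place. An illustrative free-function version of those rules follows; note that this sketch deliberately uses std::string::npos for the path check and omits the HTTPS rejection, so it is an assumption-labeled approximation rather than the class method:

    #include <cstdio>
    #include <string>

    // Illustrative normalization mirroring add_etcd_url(): strip a leading
    // "http://" and append the default "/v3" etcd API prefix when the
    // address carries no path. (The real helper also rejects "https://".)
    std::string normalize_etcd_url(std::string addr)
    {
        if (addr.substr(0, 7) == "http://")
            addr = addr.substr(7);
        if (addr.find('/') == std::string::npos)
            addr += "/v3";
        return addr;
    }

    int main()
    {
        printf("%s\n", normalize_etcd_url("http://10.0.0.1:2379").c_str()); // 10.0.0.1:2379/v3
        printf("%s\n", normalize_etcd_url("10.0.0.2:2379/v3").c_str());     // kept as-is
        return 0;
    }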
@@ -54,6 +54,9 @@ struct pool_config_t

 struct etcd_state_client_t
 {
+protected:
+    void add_etcd_url(std::string);
+public:
     std::vector<std::string> etcd_addresses;
     std::string etcd_prefix;
     int log_level = 0;
@@ -117,8 +117,15 @@ static struct fio_option options[] = {

 static int sec_setup(struct thread_data *td)
 {
+    sec_options *o = (sec_options*)td->eo;
     sec_data *bsd;

+    if (!o->etcd_host)
+    {
+        td_verror(td, EINVAL, "etcd address is missing");
+        return 1;
+    }
+
     bsd = new sec_data;
     if (!bsd)
     {
@@ -22,7 +22,6 @@
 #define READ_BUFFER_SIZE 9000

 static int extract_port(std::string & host);
-static std::string strtolower(const std::string & in);
 static std::string trim(const std::string & in);
 static std::string ws_format_frame(int type, uint64_t size);
 static bool ws_parse_frame(std::string & buf, int & type, std::string & res);
@@ -673,7 +672,7 @@ static int extract_port(std::string & host)
     return port;
 }

-static std::string strtolower(const std::string & in)
+std::string strtolower(const std::string & in)
 {
     std::string s = in;
     for (int i = 0; i < s.length(); i++)
@@ -49,6 +49,8 @@ std::vector<std::string> getifaddr_list(bool include_v6 = false);

 uint64_t stoull_full(const std::string & str, int base = 10);

+std::string strtolower(const std::string & in);
+
 void http_request(timerfd_manager_t *tfd, const std::string & host, const std::string & request,
     const http_options_t & options, std::function<void(const http_response_t *response)> callback);
@@ -26,14 +26,106 @@ osd_op_t::~osd_op_t()
     }
 }

+void osd_messenger_t::init()
+{
+    keepalive_timer_id = tfd->set_timer(1000, true, [this](int)
+    {
+        for (auto cl_it = clients.begin(); cl_it != clients.end();)
+        {
+            auto cl = (cl_it++)->second;
+            if (!cl->osd_num)
+            {
+                // Do not run keepalive on regular clients
+                continue;
+            }
+            if (cl->ping_time_remaining > 0)
+            {
+                cl->ping_time_remaining--;
+                if (!cl->ping_time_remaining)
+                {
+                    // Ping timed out, stop the client
+                    stop_client(cl->peer_fd, true);
+                }
+            }
+            else if (cl->idle_time_remaining > 0)
+            {
+                cl->idle_time_remaining--;
+                if (!cl->idle_time_remaining)
+                {
+                    // Connection is idle for <osd_idle_time>, send ping
+                    osd_op_t *op = new osd_op_t();
+                    op->op_type = OSD_OP_OUT;
+                    op->peer_fd = cl->peer_fd;
+                    op->req = (osd_any_op_t){
+                        .hdr = {
+                            .magic = SECONDARY_OSD_OP_MAGIC,
+                            .id = this->next_subop_id++,
+                            .opcode = OSD_OP_PING,
+                        },
+                    };
+                    op->callback = [this, cl](osd_op_t *op)
+                    {
+                        int fail_fd = (op->reply.hdr.retval != 0 ? op->peer_fd : -1);
+                        cl->ping_time_remaining = 0;
+                        delete op;
+                        if (fail_fd >= 0)
+                        {
+                            stop_client(fail_fd, true);
+                        }
+                    };
+                    outbox_push(op);
+                    cl->ping_time_remaining = osd_ping_timeout;
+                    cl->idle_time_remaining = osd_idle_timeout;
+                }
+            }
+            else
+            {
+                cl->idle_time_remaining = osd_idle_timeout;
+            }
+        }
+    });
+}
+
+osd_messenger_t::~osd_messenger_t()
+{
+    if (keepalive_timer_id >= 0)
+    {
+        tfd->clear_timer(keepalive_timer_id);
+        keepalive_timer_id = -1;
+    }
+    while (clients.size() > 0)
+    {
+        stop_client(clients.begin()->first, true);
+    }
+}
+
+void osd_messenger_t::parse_config(const json11::Json & config)
+{
+    this->use_sync_send_recv = config["use_sync_send_recv"].bool_value() ||
+        config["use_sync_send_recv"].uint64_value();
+    this->peer_connect_interval = config["peer_connect_interval"].uint64_value();
+    if (!this->peer_connect_interval)
+    {
+        this->peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
+    }
+    this->peer_connect_timeout = config["peer_connect_timeout"].uint64_value();
+    if (!this->peer_connect_timeout)
+    {
+        this->peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
+    }
+    this->osd_idle_timeout = config["osd_idle_timeout"].uint64_value();
+    if (!this->osd_idle_timeout)
+    {
+        this->osd_idle_timeout = DEFAULT_OSD_PING_TIMEOUT;
+    }
+    this->osd_ping_timeout = config["osd_ping_timeout"].uint64_value();
+    if (!this->osd_ping_timeout)
+    {
+        this->osd_ping_timeout = DEFAULT_OSD_PING_TIMEOUT;
+    }
+    this->log_level = config["log_level"].uint64_value();
+}
+
 void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state)
 {
     if (wanted_peers.find(peer_osd) == wanted_peers.end())
@@ -34,6 +34,7 @@

 #define DEFAULT_PEER_CONNECT_INTERVAL 5
 #define DEFAULT_PEER_CONNECT_TIMEOUT 5
+#define DEFAULT_OSD_PING_TIMEOUT 5

 // Kind of a vector with small-list-optimisation
 struct osd_op_buf_list_t
@@ -198,6 +199,8 @@ struct osd_client_t
     int peer_fd;
     int peer_state;
     int connect_timeout_id = -1;
+    int ping_time_remaining = 0;
+    int idle_time_remaining = 0;
     osd_num_t osd_num = 0;

     void *in_buf = NULL;
@@ -251,6 +254,7 @@ struct osd_messenger_t
 {
     timerfd_manager_t *tfd;
     ring_loop_t *ringloop;
+    int keepalive_timer_id = -1;

     // osd_num_t is only for logging and asserts
     osd_num_t osd_num;
@@ -258,6 +262,8 @@ struct osd_messenger_t
     int receive_buffer_size = 64*1024;
     int peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
     int peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
+    int osd_idle_timeout = DEFAULT_OSD_PING_TIMEOUT;
+    int osd_ping_timeout = DEFAULT_OSD_PING_TIMEOUT;
     int log_level = 0;
     bool use_sync_send_recv = false;

@@ -274,6 +280,8 @@ struct osd_messenger_t
     osd_op_stats_t stats;

 public:
+    void init();
+    void parse_config(const json11::Json & config);
     void connect_peer(uint64_t osd_num, json11::Json peer_state);
     void stop_client(int peer_fd, bool force = false);
     void outbox_push(osd_op_t *cur_op);
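The messenger's new keepalive is a two-counter countdown driven by the 1-second timer set in init(): idle_time_remaining counts down to the point where a PING is sent, then ping_time_remaining counts down to the drop decision. A condensed model of that per-client state machine follows; the field and constant names come from the hunks above, but the struct and driver interface are illustrative assumptions:

    // Per-client keepalive countdown, ticked once per second as in
    // osd_messenger_t::init(). A pong resets ping_time_remaining to 0.
    struct keepalive_state
    {
        int idle_time_remaining = 0;
        int ping_time_remaining = 0;

        // returns true when the connection should be dropped
        bool tick(int osd_idle_timeout, int osd_ping_timeout, bool & send_ping)
        {
            send_ping = false;
            if (ping_time_remaining > 0)
            {
                if (!--ping_time_remaining)
                    return true; // ping timed out
            }
            else if (idle_time_remaining > 0)
            {
                if (!--idle_time_remaining)
                {
                    send_ping = true; // connection went idle: emit OSD_OP_PING
                    ping_time_remaining = osd_ping_timeout;
                    idle_time_remaining = osd_idle_timeout;
                }
            }
            else
                idle_time_remaining = osd_idle_timeout; // re-arm the idle window
            return false;
        }
        void on_pong() { ping_time_remaining = 0; }
    };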
src/osd.cpp: 28 changed lines
@@ -37,6 +37,7 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
     c_cli.ringloop = this->ringloop;
     c_cli.exec_op = [this](osd_op_t *op) { exec_op(op); };
     c_cli.repeer_pgs = [this](osd_num_t peer_osd) { repeer_pgs(peer_osd); };
+    c_cli.init();

     init_cluster();

@@ -55,6 +56,7 @@ void osd_t::parse_config(blockstore_config_t & config)
 {
     if (config.find("log_level") == config.end())
         config["log_level"] = "1";
+    log_level = strtoull(config["log_level"].c_str(), NULL, 10);
     // Initial startup configuration
     json11::Json json_config = json11::Json(config);
     st_cli.parse_config(json_config);
@@ -66,6 +68,8 @@ void osd_t::parse_config(blockstore_config_t & config)
         throw std::runtime_error("osd_num is required in the configuration");
     c_cli.osd_num = osd_num;
     run_primary = config["run_primary"] != "false" && config["run_primary"] != "0" && config["run_primary"] != "no";
+    no_rebalance = config["no_rebalance"] == "true" || config["no_rebalance"] == "1" || config["no_rebalance"] == "yes";
+    no_recovery = config["no_recovery"] == "true" || config["no_recovery"] == "1" || config["no_recovery"] == "yes";
     // Cluster configuration
     bind_address = config["bind_address"];
     if (bind_address == "")
@@ -92,6 +96,9 @@ void osd_t::parse_config(blockstore_config_t & config)
     recovery_queue_depth = strtoull(config["recovery_queue_depth"].c_str(), NULL, 10);
     if (recovery_queue_depth < 1 || recovery_queue_depth > MAX_RECOVERY_QUEUE)
         recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
+    recovery_sync_batch = strtoull(config["recovery_sync_batch"].c_str(), NULL, 10);
+    if (recovery_sync_batch < 1 || recovery_sync_batch > MAX_RECOVERY_QUEUE)
+        recovery_sync_batch = DEFAULT_RECOVERY_BATCH;
     if (config["readonly"] == "true" || config["readonly"] == "1" || config["readonly"] == "yes")
         readonly = true;
     print_stats_interval = strtoull(config["print_stats_interval"].c_str(), NULL, 10);
@@ -100,14 +107,7 @@ void osd_t::parse_config(blockstore_config_t & config)
     slow_log_interval = strtoull(config["slow_log_interval"].c_str(), NULL, 10);
     if (!slow_log_interval)
         slow_log_interval = 10;
-    c_cli.peer_connect_interval = strtoull(config["peer_connect_interval"].c_str(), NULL, 10);
-    if (!c_cli.peer_connect_interval)
-        c_cli.peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
-    c_cli.peer_connect_timeout = strtoull(config["peer_connect_timeout"].c_str(), NULL, 10);
-    if (!c_cli.peer_connect_timeout)
-        c_cli.peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
-    log_level = strtoull(config["log_level"].c_str(), NULL, 10);
-    c_cli.log_level = log_level;
+    c_cli.parse_config(json_config);
 }

 void osd_t::bind_socket()
@@ -211,6 +211,12 @@ void osd_t::exec_op(osd_op_t *cur_op)
         finish_op(cur_op, -EINVAL);
         return;
     }
+    if (cur_op->req.hdr.opcode == OSD_OP_PING)
+    {
+        // Pong
+        finish_op(cur_op, 0);
+        return;
+    }
     if (readonly &&
         cur_op->req.hdr.opcode != OSD_OP_SEC_READ &&
         cur_op->req.hdr.opcode != OSD_OP_SEC_LIST &&
@@ -261,9 +267,9 @@ void osd_t::reset_stats()

 void osd_t::print_stats()
 {
-    for (int i = 0; i <= OSD_OP_MAX; i++)
+    for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++)
     {
-        if (c_cli.stats.op_stat_count[i] != prev_stats.op_stat_count[i])
+        if (c_cli.stats.op_stat_count[i] != prev_stats.op_stat_count[i] && i != OSD_OP_PING)
         {
             uint64_t avg = (c_cli.stats.op_stat_sum[i] - prev_stats.op_stat_sum[i])/(c_cli.stats.op_stat_count[i] - prev_stats.op_stat_count[i]);
             uint64_t bw = (c_cli.stats.op_stat_bytes[i] - prev_stats.op_stat_bytes[i]) / print_stats_interval;
@@ -284,7 +290,7 @@ void osd_t::print_stats()
             prev_stats.op_stat_bytes[i] = c_cli.stats.op_stat_bytes[i];
         }
     }
-    for (int i = 0; i <= OSD_OP_MAX; i++)
+    for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++)
     {
         if (c_cli.stats.subop_stat_count[i] != prev_stats.subop_stat_count[i])
         {
@@ -37,6 +37,7 @@
 #define DEFAULT_AUTOSYNC_INTERVAL 5
 #define MAX_RECOVERY_QUEUE 2048
 #define DEFAULT_RECOVERY_QUEUE 4
+#define DEFAULT_RECOVERY_BATCH 16

 //#define OSD_STUB

@@ -64,6 +65,8 @@ class osd_t
     bool readonly = false;
     osd_num_t osd_num = 1; // OSD numbers start with 1
     bool run_primary = false;
+    bool no_rebalance = false;
+    bool no_recovery = false;
     std::string bind_address;
     int bind_port, listen_backlog;
     // FIXME: Implement client queue depth limit
@@ -74,6 +77,7 @@ class osd_t
     int immediate_commit = IMMEDIATE_NONE;
     int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds
     int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
+    int recovery_sync_batch = DEFAULT_RECOVERY_BATCH;
     int log_level = 0;

     // cluster state
@@ -95,9 +99,11 @@ class osd_t
     std::map<pool_pg_num_t, pg_t> pgs;
     std::set<pool_pg_num_t> dirty_pgs;
     std::set<osd_num_t> dirty_osds;
+    int copies_to_delete_after_sync_count = 0;
     uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0;
     int peering_state = 0;
     std::map<object_id, osd_recovery_op_t> recovery_ops;
+    int recovery_done = 0;
     osd_op_t *autosync_op = NULL;

     // Unstable writes
@@ -199,6 +205,7 @@ class osd_t
     void pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval);
     void submit_primary_subops(int submit_type, uint64_t op_version, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
     void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, uint64_t set_size, pg_osd_set_t & loc_set);
+    void submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_delete, int chunks_to_delete_count);
     void submit_primary_sync_subops(osd_op_t *cur_op);
     void submit_primary_stab_subops(osd_op_t *cur_op);
@@ -209,32 +209,38 @@ void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t

 bool osd_t::pick_next_recovery(osd_recovery_op_t &op)
 {
-    for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
+    if (!no_recovery)
     {
-        if ((pg_it->second.state & (PG_ACTIVE | PG_HAS_DEGRADED)) == (PG_ACTIVE | PG_HAS_DEGRADED))
+        for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
         {
-            for (auto obj_it = pg_it->second.degraded_objects.begin(); obj_it != pg_it->second.degraded_objects.end(); obj_it++)
+            if ((pg_it->second.state & (PG_ACTIVE | PG_HAS_DEGRADED)) == (PG_ACTIVE | PG_HAS_DEGRADED))
             {
-                if (recovery_ops.find(obj_it->first) == recovery_ops.end())
+                for (auto obj_it = pg_it->second.degraded_objects.begin(); obj_it != pg_it->second.degraded_objects.end(); obj_it++)
                 {
-                    op.degraded = true;
-                    op.oid = obj_it->first;
-                    return true;
+                    if (recovery_ops.find(obj_it->first) == recovery_ops.end())
+                    {
+                        op.degraded = true;
+                        op.oid = obj_it->first;
+                        return true;
+                    }
                 }
             }
         }
     }
-    for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
+    if (!no_rebalance)
     {
-        if ((pg_it->second.state & (PG_ACTIVE | PG_HAS_MISPLACED)) == (PG_ACTIVE | PG_HAS_MISPLACED))
+        for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
         {
-            for (auto obj_it = pg_it->second.misplaced_objects.begin(); obj_it != pg_it->second.misplaced_objects.end(); obj_it++)
+            if ((pg_it->second.state & (PG_ACTIVE | PG_HAS_MISPLACED)) == (PG_ACTIVE | PG_HAS_MISPLACED))
             {
-                if (recovery_ops.find(obj_it->first) == recovery_ops.end())
+                for (auto obj_it = pg_it->second.misplaced_objects.begin(); obj_it != pg_it->second.misplaced_objects.end(); obj_it++)
                 {
-                    op.degraded = false;
-                    op.oid = obj_it->first;
-                    return true;
+                    if (recovery_ops.find(obj_it->first) == recovery_ops.end())
+                    {
+                        op.degraded = false;
+                        op.oid = obj_it->first;
+                        return true;
+                    }
                 }
             }
         }
     }
@@ -264,7 +270,6 @@ void osd_t::submit_recovery_op(osd_recovery_op_t *op)
     }
     op->osd_op->callback = [this, op](osd_op_t *osd_op)
     {
-        // Don't sync the write, it will be synced by our regular sync coroutine
         if (osd_op->reply.hdr.retval < 0)
         {
             // Error recovering object
@@ -286,6 +291,17 @@ void osd_t::submit_recovery_op(osd_recovery_op_t *op)
         op->osd_op = NULL;
         recovery_ops.erase(op->oid);
         delete osd_op;
+        if (immediate_commit != IMMEDIATE_ALL)
+        {
+            recovery_done++;
+            if (recovery_done >= recovery_sync_batch)
+            {
+                // Force sync every <recovery_sync_batch> operations
+                // This is required not to pile up an excessive amount of delete operations
+                autosync();
+                recovery_done = 0;
+            }
+        }
         continue_recovery();
     };
     exec_op(op->osd_op);
@@ -19,4 +19,5 @@ const char* osd_op_names[] = {
     "primary_write",
     "primary_sync",
     "primary_delete",
+    "ping",
 };
@@ -27,7 +27,8 @@
 #define OSD_OP_WRITE 12
 #define OSD_OP_SYNC 13
 #define OSD_OP_DELETE 14
-#define OSD_OP_MAX 14
+#define OSD_OP_PING 15
+#define OSD_OP_MAX 15
 // Alignment & limit for read/write operations
 #ifndef MEM_ALIGNMENT
 #define MEM_ALIGNMENT 512
@@ -103,6 +103,8 @@ void osd_t::reset_pg(pg_t & pg)
 {
     pg.cur_peers.clear();
     pg.state_dict.clear();
+    copies_to_delete_after_sync_count -= pg.copies_to_delete_after_sync.size();
+    pg.copies_to_delete_after_sync.clear();
     incomplete_objects -= pg.incomplete_objects.size();
     misplaced_objects -= pg.misplaced_objects.size();
     degraded_objects -= pg.degraded_objects.size();
@@ -180,13 +182,18 @@ void osd_t::start_pg_peering(pg_t & pg)
     // (PG history is kept up to the latest active+clean state)
     for (auto & history_set: pg.target_history)
     {
-        bool found = false;
+        bool found = true;
         for (auto history_osd: history_set)
         {
-            if (history_osd != 0 && c_cli.osd_peer_fds.find(history_osd) != c_cli.osd_peer_fds.end())
+            if (history_osd != 0)
             {
-                found = true;
-                break;
+                found = false;
+                if (history_osd == this->osd_num ||
+                    c_cli.osd_peer_fds.find(history_osd) != c_cli.osd_peer_fds.end())
+                {
+                    found = true;
+                    break;
+                }
             }
         }
         if (!found)
@@ -56,6 +56,13 @@ struct obj_piece_id_t
     uint64_t osd_num;
 };

+struct obj_ver_osd_t
+{
+    uint64_t osd_num;
+    object_id oid;
+    uint64_t version;
+};
+
 struct flush_action_t
 {
     bool rollback = false, make_stable = false;
@@ -101,6 +108,7 @@ struct pg_t
     std::map<pg_osd_set_t, pg_osd_set_state_t> state_dict;
     btree::btree_map<object_id, pg_osd_set_state_t*> incomplete_objects, misplaced_objects, degraded_objects;
     std::map<obj_piece_id_t, flush_action_t> flush_actions;
+    std::vector<obj_ver_osd_t> copies_to_delete_after_sync;
     btree::btree_map<object_id, uint64_t> ver_override;
     pg_peering_state_t *peering_state = NULL;
     pg_flush_batch_t *flush_batch = NULL;
@@ -367,17 +367,44 @@
     }
     // Any kind of a non-clean object can have extra chunks, because we don't record objects
     // as degraded & misplaced or incomplete & misplaced at the same time. So try to remove extra chunks
-    submit_primary_del_subops(cur_op, pg.cur_set.data(), pg.pg_size, op_data->object_state->osd_set);
-    if (op_data->n_subops > 0)
+    if (immediate_commit != IMMEDIATE_ALL)
     {
-resume_8:
-        op_data->st = 8;
-        return;
-resume_9:
-        if (op_data->errors > 0)
+        // We can't remove extra chunks yet if fsyncs are explicit, because
+        // new copies may not be committed to stable storage yet
+        // We can only remove extra chunks after a successful SYNC for this PG
+        for (auto & chunk: op_data->object_state->osd_set)
         {
-            pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
-            return;
+            // Check is the same as in submit_primary_del_subops()
+            if (op_data->scheme == POOL_SCHEME_REPLICATED
+                ? !contains_osd(pg.cur_set.data(), pg.pg_size, chunk.osd_num)
+                : (chunk.osd_num != pg.cur_set[chunk.role]))
+            {
+                pg.copies_to_delete_after_sync.push_back((obj_ver_osd_t){
+                    .osd_num = chunk.osd_num,
+                    .oid = {
+                        .inode = op_data->oid.inode,
+                        .stripe = op_data->oid.stripe | (op_data->scheme == POOL_SCHEME_REPLICATED ? 0 : chunk.role),
+                    },
+                    .version = op_data->fact_ver,
+                });
+                copies_to_delete_after_sync_count++;
+            }
+        }
+    }
+    else
+    {
+        submit_primary_del_subops(cur_op, pg.cur_set.data(), pg.pg_size, op_data->object_state->osd_set);
+        if (op_data->n_subops > 0)
+        {
+resume_8:
+            op_data->st = 8;
+            return;
+resume_9:
+            if (op_data->errors > 0)
+            {
+                pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
+                return;
+            }
         }
     }
     // Clear object state
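With explicit fsyncs a freshly written copy is not yet durable, so the hunk above queues the superseded chunks instead of deleting them at once; only after the PG completes a SYNC are they actually removed, or re-queued if the sync fails. A minimal model of that ordering follows, with illustrative types and method names that are assumptions, not the OSD code:

    #include <vector>

    // Model of the deferred-delete rule: a stale chunk may only be deleted
    // after the write that superseded it has been made durable by a SYNC.
    struct pg_deferred_deletes
    {
        std::vector<int> copies_to_delete_after_sync;

        void on_write_superseded_chunk(int chunk)
        {
            // the new copy may still sit in an uncommitted journal
            copies_to_delete_after_sync.push_back(chunk);
        }

        // called once fsync succeeded on every OSD holding the new copies
        std::vector<int> on_sync_completed()
        {
            std::vector<int> now_safe_to_delete;
            now_safe_to_delete.swap(copies_to_delete_after_sync);
            return now_safe_to_delete;
        }
    };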
@@ -511,6 +538,8 @@ void osd_t::continue_primary_sync(osd_op_t *cur_op)
     else if (op_data->st == 4) goto resume_4;
     else if (op_data->st == 5) goto resume_5;
     else if (op_data->st == 6) goto resume_6;
+    else if (op_data->st == 7) goto resume_7;
+    else if (op_data->st == 8) goto resume_8;
     assert(op_data->st == 0);
     if (syncs_in_progress.size() > 0)
     {
@@ -572,11 +601,34 @@ resume_2:
         this->unstable_writes.clear();
     }
     {
-        void *dirty_buf = malloc_or_die(sizeof(pool_pg_num_t)*dirty_pgs.size() + sizeof(osd_num_t)*dirty_osds.size());
+        void *dirty_buf = malloc_or_die(
+            sizeof(pool_pg_num_t)*dirty_pgs.size() +
+            sizeof(osd_num_t)*dirty_osds.size() +
+            sizeof(obj_ver_osd_t)*this->copies_to_delete_after_sync_count
+        );
         op_data->dirty_pgs = (pool_pg_num_t*)dirty_buf;
         op_data->dirty_osds = (osd_num_t*)(dirty_buf + sizeof(pool_pg_num_t)*dirty_pgs.size());
         op_data->dirty_pg_count = dirty_pgs.size();
         op_data->dirty_osd_count = dirty_osds.size();
+        if (this->copies_to_delete_after_sync_count)
+        {
+            op_data->copies_to_delete_count = 0;
+            op_data->copies_to_delete = (obj_ver_osd_t*)(op_data->dirty_osds + op_data->dirty_osd_count);
+            for (auto dirty_pg_num: dirty_pgs)
+            {
+                auto & pg = pgs.at(dirty_pg_num);
+                assert(pg.copies_to_delete_after_sync.size() <= this->copies_to_delete_after_sync_count);
+                memcpy(
+                    op_data->copies_to_delete + op_data->copies_to_delete_count,
+                    pg.copies_to_delete_after_sync.data(),
+                    sizeof(obj_ver_osd_t)*pg.copies_to_delete_after_sync.size()
+                );
+                op_data->copies_to_delete_count += pg.copies_to_delete_after_sync.size();
+                this->copies_to_delete_after_sync_count -= pg.copies_to_delete_after_sync.size();
+                pg.copies_to_delete_after_sync.clear();
+            }
+            assert(this->copies_to_delete_after_sync_count == 0);
+        }
         int dpg = 0;
         for (auto dirty_pg_num: dirty_pgs)
         {
@@ -649,6 +701,36 @@ resume_6:
                 }
             }
         }
+        if (op_data->copies_to_delete)
+        {
+            // Return 'copies to delete' back into respective PGs
+            for (int i = 0; i < op_data->copies_to_delete_count; i++)
+            {
+                auto & w = op_data->copies_to_delete[i];
+                auto & pg = pgs.at((pool_pg_num_t){
+                    .pool_id = INODE_POOL(w.oid.inode),
+                    .pg_num = map_to_pg(w.oid, st_cli.pool_config.at(INODE_POOL(w.oid.inode)).pg_stripe_size),
+                });
+                if (pg.state & PG_ACTIVE)
+                {
+                    pg.copies_to_delete_after_sync.push_back(w);
+                    copies_to_delete_after_sync_count++;
+                }
+            }
+        }
+    }
+    else if (op_data->copies_to_delete)
+    {
+        // Actually delete copies which we wanted to delete
+        submit_primary_del_batch(cur_op, op_data->copies_to_delete, op_data->copies_to_delete_count);
+resume_7:
+        op_data->st = 7;
+        return;
+resume_8:
+        if (op_data->errors > 0)
+        {
+            goto resume_6;
+        }
     }
     for (int i = 0; i < op_data->dirty_pg_count; i++)
     {
|
@@ -38,4 +38,8 @@ struct osd_primary_op_data_t
|
||||
osd_num_t *dirty_osds = NULL;
|
||||
int dirty_osd_count = 0;
|
||||
obj_ver_id *unstable_writes = NULL;
|
||||
obj_ver_osd_t *copies_to_delete = NULL;
|
||||
int copies_to_delete_count = 0;
|
||||
};
|
||||
|
||||
bool contains_osd(osd_num_t *osd_set, uint64_t size, osd_num_t osd_num);
|
||||
|
@@ -355,7 +355,7 @@ void osd_t::cancel_primary_write(osd_op_t *cur_op)
|
||||
}
|
||||
}
|
||||
|
||||
static bool contains_osd(osd_num_t *osd_set, uint64_t size, osd_num_t osd_num)
|
||||
bool contains_osd(osd_num_t *osd_set, uint64_t size, osd_num_t osd_num)
|
||||
{
|
||||
for (uint64_t i = 0; i < size; i++)
|
||||
{
|
||||
@@ -371,78 +371,82 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint
 {
     osd_primary_op_data_t *op_data = cur_op->op_data;
     bool rep = op_data->scheme == POOL_SCHEME_REPLICATED;
-    int extra_chunks = 0;
-    // ordered comparison for EC/XOR, unordered for replicated pools
+    obj_ver_osd_t extra_chunks[loc_set.size()];
+    int chunks_to_del = 0;
     for (auto & chunk: loc_set)
     {
-        if (!cur_set || (rep ? !contains_osd(cur_set, set_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role]))
+        // ordered comparison for EC/XOR, unordered for replicated pools
+        if (!cur_set || (rep
+            ? !contains_osd(cur_set, set_size, chunk.osd_num)
+            : (chunk.osd_num != cur_set[chunk.role])))
         {
-            extra_chunks++;
+            extra_chunks[chunks_to_del++] = (obj_ver_osd_t){
+                .osd_num = chunk.osd_num,
+                .oid = {
+                    .inode = op_data->oid.inode,
+                    .stripe = op_data->oid.stripe | (rep ? 0 : chunk.role),
+                },
+                // Same version as write
+                .version = op_data->fact_ver,
+            };
         }
     }
-    op_data->n_subops = extra_chunks;
+    submit_primary_del_batch(cur_op, extra_chunks, chunks_to_del);
+}
+
+void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_delete, int chunks_to_delete_count)
+{
+    osd_primary_op_data_t *op_data = cur_op->op_data;
+    op_data->n_subops = chunks_to_delete_count;
     op_data->done = op_data->errors = 0;
-    if (!extra_chunks)
+    if (!op_data->n_subops)
     {
         return;
     }
-    osd_op_t *subops = new osd_op_t[extra_chunks];
+    osd_op_t *subops = new osd_op_t[chunks_to_delete_count];
     op_data->subops = subops;
-    int i = 0;
-    for (auto & chunk: loc_set)
+    for (int i = 0; i < chunks_to_delete_count; i++)
     {
-        if (!cur_set || (rep ? !contains_osd(cur_set, set_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role]))
+        auto & chunk = chunks_to_delete[i];
+        if (chunk.osd_num == this->osd_num)
         {
-            int stripe_num = op_data->scheme == POOL_SCHEME_REPLICATED ? 0 : chunk.role;
-            if (chunk.osd_num == this->osd_num)
-            {
-                clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
-                subops[i].op_type = (uint64_t)cur_op;
-                subops[i].bs_op = new blockstore_op_t({
-                    .opcode = BS_OP_DELETE,
-                    .callback = [subop = &subops[i], this](blockstore_op_t *bs_subop)
-                    {
-                        handle_primary_bs_subop(subop);
-                    },
-                    .oid = {
-                        .inode = op_data->oid.inode,
-                        .stripe = op_data->oid.stripe | stripe_num,
-                    },
-                    // Same version as write
-                    .version = op_data->fact_ver,
-                });
-                bs->enqueue_op(subops[i].bs_op);
-            }
-            else
-            {
-                subops[i].op_type = OSD_OP_OUT;
-                subops[i].peer_fd = c_cli.osd_peer_fds.at(chunk.osd_num);
-                subops[i].req.sec_del = {
-                    .header = {
-                        .magic = SECONDARY_OSD_OP_MAGIC,
-                        .id = c_cli.next_subop_id++,
-                        .opcode = OSD_OP_SEC_DELETE,
-                    },
-                    .oid = {
-                        .inode = op_data->oid.inode,
-                        .stripe = op_data->oid.stripe | stripe_num,
-                    },
-                    // Same version as write
-                    .version = op_data->fact_ver,
-                };
-                subops[i].callback = [cur_op, this](osd_op_t *subop)
-                {
-                    int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1;
-                    handle_primary_subop(subop, cur_op);
-                    if (fail_fd >= 0)
-                    {
-                        // delete operation failed, drop the connection
-                        c_cli.stop_client(fail_fd);
-                    }
-                };
-                c_cli.outbox_push(&subops[i]);
-            }
-            i++;
+            clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
+            subops[i].op_type = (uint64_t)cur_op;
+            subops[i].bs_op = new blockstore_op_t({
+                .opcode = BS_OP_DELETE,
+                .callback = [subop = &subops[i], this](blockstore_op_t *bs_subop)
+                {
+                    handle_primary_bs_subop(subop);
+                },
+                .oid = chunk.oid,
+                .version = chunk.version,
+            });
+            bs->enqueue_op(subops[i].bs_op);
+        }
+        else
+        {
+            subops[i].op_type = OSD_OP_OUT;
+            subops[i].peer_fd = c_cli.osd_peer_fds.at(chunk.osd_num);
+            subops[i].req.sec_del = {
+                .header = {
+                    .magic = SECONDARY_OSD_OP_MAGIC,
+                    .id = c_cli.next_subop_id++,
+                    .opcode = OSD_OP_SEC_DELETE,
+                },
+                .oid = chunk.oid,
+                .version = chunk.version,
+            };
+            subops[i].callback = [cur_op, this](osd_op_t *subop)
+            {
+                int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1;
+                handle_primary_subop(subop, cur_op);
+                if (fail_fd >= 0)
+                {
+                    // delete operation failed, drop the connection
+                    c_cli.stop_client(fail_fd);
+                }
+            };
+            c_cli.outbox_push(&subops[i]);
+        }
     }
 }
@@ -2,20 +2,39 @@
 // License: VNPL-1.1 (see README.md for details)

 #include <stdio.h>
+#include <stdlib.h>
 #include "allocator.h"

+void alloc_all(int size)
+{
+    allocator *a = new allocator(size);
+    for (int i = 0; i < size; i++)
+    {
+        uint64_t x = a->find_free();
+        if (x == UINT64_MAX)
+        {
+            printf("ran out of space %d allocated=%d\n", size, i);
+            exit(1);
+        }
+        if (x != i)
+        {
+            printf("incorrect block allocated: expected %d, got %lu\n", i, x);
+        }
+        a->set(x, true);
+    }
+    uint64_t x = a->find_free();
+    if (x != UINT64_MAX)
+    {
+        printf("extra free space found: %lx (%d)\n", x, size);
+        exit(1);
+    }
+    delete a;
+}
+
 int main(int narg, char *args[])
 {
-    allocator a(8192);
-    for (int i = 0; i < 8192; i++)
-    {
-        uint64_t x = a.find_free();
-        if (x == UINT64_MAX)
-        {
-            printf("ran out of space %d\n", i);
-            return 1;
-        }
-        a.set(x, true);
-    }
+    alloc_all(8192);
+    alloc_all(8062);
+    alloc_all(4096);
     return 0;
 }
@@ -28,8 +28,6 @@ cd ..
 node mon/mon-main.js --etcd_url http://$ETCD_URL --etcd_prefix "/vitastor" --verbose 1 &>./testdata/mon.log &
 MON_PID=$!

-$ETCDCTL put /vitastor/config/global '{"immediate_commit":"all"}'
-
 $ETCDCTL put /vitastor/config/pools '{"1":{"name":"testpool","scheme":"replicated","pg_size":2,"pg_minsize":2,"pg_count":16,"failure_domain":"osd"}}'

 sleep 2
tests/test_interrupted_rebalance.sh (new executable file): 109 lines
@@ -0,0 +1,109 @@
#!/bin/bash -ex

. `dirname $0`/common.sh

if [ "$IMMEDIATE_COMMIT" != "" ]; then
    NO_SAME="--journal_no_same_sector_overwrites true --journal_sector_buffer_count 1024 --disable_data_fsync 1 --immediate_commit all --log_level 1"
    $ETCDCTL put /vitastor/config/global '{"recovery_queue_depth":1,"osd_out_time":5,"immediate_commit":"all"}'
else
    NO_SAME="--journal_sector_buffer_count 1024 --log_level 1"
    $ETCDCTL put /vitastor/config/global '{"recovery_queue_depth":1,"osd_out_time":5}'
fi

dd if=/dev/zero of=./testdata/test_osd1.bin bs=1024 count=1 seek=$((1024*1024-1))
dd if=/dev/zero of=./testdata/test_osd2.bin bs=1024 count=1 seek=$((1024*1024-1))
dd if=/dev/zero of=./testdata/test_osd3.bin bs=1024 count=1 seek=$((1024*1024-1))
dd if=/dev/zero of=./testdata/test_osd4.bin bs=1024 count=1 seek=$((1024*1024-1))
dd if=/dev/zero of=./testdata/test_osd5.bin bs=1024 count=1 seek=$((1024*1024-1))
dd if=/dev/zero of=./testdata/test_osd6.bin bs=1024 count=1 seek=$((1024*1024-1))
dd if=/dev/zero of=./testdata/test_osd7.bin bs=1024 count=1 seek=$((1024*1024-1))

build/src/vitastor-osd --osd_num 1 --bind_address 127.0.0.1 $NO_SAME --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd1.bin 2>/dev/null) 2>&1 >>./testdata/osd1.log &
OSD1_PID=$!
build/src/vitastor-osd --osd_num 2 --bind_address 127.0.0.1 $NO_SAME --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd2.bin 2>/dev/null) 2>&1 >>./testdata/osd2.log &
OSD2_PID=$!
build/src/vitastor-osd --osd_num 3 --bind_address 127.0.0.1 $NO_SAME --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd3.bin 2>/dev/null) 2>&1 >>./testdata/osd3.log &
OSD3_PID=$!
build/src/vitastor-osd --osd_num 4 --bind_address 127.0.0.1 $NO_SAME --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd4.bin 2>/dev/null) 2>&1 >>./testdata/osd4.log &
OSD4_PID=$!
build/src/vitastor-osd --osd_num 5 --bind_address 127.0.0.1 $NO_SAME --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd5.bin 2>/dev/null) 2>&1 >>./testdata/osd5.log &
OSD5_PID=$!
build/src/vitastor-osd --osd_num 6 --bind_address 127.0.0.1 $NO_SAME --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd6.bin 2>/dev/null) 2>&1 >>./testdata/osd6.log &
OSD6_PID=$!
build/src/vitastor-osd --osd_num 7 --bind_address 127.0.0.1 $NO_SAME --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd7.bin 2>/dev/null) 2>&1 >>./testdata/osd7.log &
OSD7_PID=$!

cd mon
npm install
cd ..
node mon/mon-main.js --etcd_url http://$ETCD_URL --etcd_prefix "/vitastor" --verbose 1 &>./testdata/mon.log &
MON_PID=$!

$ETCDCTL put /vitastor/config/pools '{"1":{"name":"testpool","scheme":"replicated","pg_size":2,"pg_minsize":1,"pg_count":32,"failure_domain":"osd"}}'

sleep 2

if ! ($ETCDCTL get /vitastor/config/pgs --print-value-only | jq -s -e '(.[0].items["1"] | map((.osd_set | select(. > 0)) | length == 2) | length) == 32'); then
    format_error "FAILED: 32 PGS NOT CONFIGURED"
fi

if ! ($ETCDCTL get --prefix /vitastor/pg/state/ --print-value-only | jq -s -e '([ .[] | select(.state == ["active"]) ] | length) == 32'); then
    format_error "FAILED: 32 PGS NOT UP"
fi

IMG_SIZE=960

LD_PRELOAD=libasan.so.5 \
fio -thread -name=test -ioengine=build/src/libfio_vitastor.so -bs=4M -direct=1 -iodepth=16 -fsync=16 -rw=write \
    -etcd=$ETCD_URL -pool=1 -inode=2 -size=${IMG_SIZE}M -cluster_log_level=10

try_reweight()
{
    osd=$1
    w=$2
    $ETCDCTL put /vitastor/config/osd/$osd '{"reweight":'$w'}'
    sleep 3
}

try_reweight 1 0

try_reweight 2 0

try_reweight 3 0

try_reweight 4 0

try_reweight 5 0

try_reweight 1 1

try_reweight 2 1

try_reweight 3 1

try_reweight 4 1

try_reweight 5 1

# Wait for the rebalance to finish
for i in {1..60}; do
    ($ETCDCTL get --prefix /vitastor/pg/state/ --print-value-only | jq -s -e '([ .[] | select(.state == ["active"]) ] | length) == 32') && \
        break
    if [ $i -eq 60 ]; then
        format_error "Rebalance couldn't finish in 60 seconds"
    fi
    sleep 1
done

# Check that PGs never had degraded objects !
if grep has_degraded ./testdata/mon.log; then
    format_error "Some copies of objects were lost during interrupted rebalancings"
fi

# Check that no objects are lost !
nobj=`$ETCDCTL get --prefix '/vitastor/pg/stats' --print-value-only | jq -s '[ .[].object_count ] | reduce .[] as $num (0; .+$num)'`
if [ "$nobj" -ne $((IMG_SIZE*8)) ]; then
    format_error "Data lost after multiple interrupted rebalancings"
fi

format_green OK