Compare commits

...

19 Commits

Author SHA1 Message Date
7e6e1a5a82 Release 0.5.10
The version seems to be stable after this bunch of fixes :)

- Fix delete & write operation ordering during rebalance to not lose objects in the immediate_commit=off mode
- Fix a possible crash caused by very high iodepths
- Re-distribute PG primaries over OSDs that come up after a short downtime
- Allow to specify etcd URLs for OSDs with http://, do not die with a strange error if -etcd option is missing for fio
- Fix a journal flushing deadlock which sometimes occurred in the immediate_commit=off mode
- Fix a bug where OSDs could hang if the data device filled up
- Fix an allocator bug where it was unable to allocate up to last (n%64) data device blocks
- Fix monitor crash that occurred on removal of some etcd keys
- Fix a bug where PGs could remain incomplete due to incorrect PG history with just zeroes in osd_sets
2021-03-16 12:48:26 +03:00
435045751d Delete objects only after a SYNC during rebalance in the non-immediate_commit mode
Previously OSDs could commit deletes before writes during recovery or rebalance
in the "lazy fsync" (immediate_commit=off) mode which could result in lost objects
2021-03-16 12:48:26 +03:00
c5fb1d5987 Do not duplicate blockstore operations when io_uring fills up
This bug was leading to OSDs dying with "Assertion `fulfilled == read_op->len' failed"
when testing fio -rw=randread -numjobs=8 -iodepth=128
2021-03-16 12:48:26 +03:00
9f59381bea Re-distribute PG primaries over OSDs that come up after a short downtime 2021-03-16 12:48:26 +03:00
9ac7e75178 Allow to specify etcd URLs for OSDs with http://, do not die with a strange error if -etcd option is missing for fio 2021-03-16 12:48:26 +03:00
88671cf745 Fix a bug causing all flushers to wait for an fsync without actually trying to do it
This happened because flusher_count became dynamic and fsync_batch() was comparing the number
of flushers currently ready to do an fsync with the maximum number of flushers. Also the number
wasn't rechecked on every loop which was also incorrect.

Now the interrupted_rebalance test passes even without IMMEDIATE_COMMIT=1.
2021-03-13 17:27:29 +03:00
fe1749c427 Fix the multiple_interrupted_rebalance test 2021-03-13 17:19:45 +03:00
ceb9c28de7 Set default log_level before passing config to etcd_state_client 2021-03-13 17:19:45 +03:00
299d7d7c95 Use common macro for get_sqe 2021-03-13 17:19:45 +03:00
d1526b415f Correctly resume writes when OSD is full to return an error 2021-03-13 17:19:45 +03:00
f49fd53d55 Fix a bug where allocator was unable to allocate up to last (n%64) blocks, add tests for it 2021-03-13 02:19:02 +03:00
dd76eda5e5 Test multiple interrupted rebalancings
Currently only passes with immediate_commit=all configuration
(env variable IMMEDIATE_COMMIT=1 for the bash script)
2021-03-12 12:55:44 +03:00
87dbd8fa57 Use empty hash as the default value for some etcd keys in the monitor 2021-03-12 12:40:15 +03:00
b44f49aab2 Ignore zero OSDs in history osd_sets 2021-03-12 12:40:15 +03:00
036555638e Release 0.5.9
- Fix two monitor bugs which led to objects being "logically lost" (physically
  present on some secondary OSDs while primary doesn't know about it) after multiple
  interrupted rebalancings
- Implement "no_recovery" and "no_rebalance" flags
2021-03-11 00:39:10 +03:00
af5155fcd9 Implement "no_recovery" and "no_rebalance" flags 2021-03-11 00:36:31 +03:00
0d2efbecc9 Preserve previous PG history when changing PG distribution
Fixes incorrect PG history in case when a new rebalance is started
before the finish of the previous one which could make primary OSDs unable
to locate some objects on some secondaries.
2021-03-11 00:16:10 +03:00
e62e8b6bae Use real pg configuration instead of the "last clean" one for generating PG history
Basically fixes the bug introduced in 0.5.7 where an rebalance interrupted
by the monitor could result in forgetting objects moved to the new place
2021-03-10 02:01:44 +03:00
c4ba24c305 Do not print ping op latency 2021-03-10 02:01:44 +03:00
32 changed files with 543 additions and 230 deletions

2
debian/changelog vendored
View File

@@ -1,4 +1,4 @@
vitastor (0.5.8-1) unstable; urgency=medium
vitastor (0.5.10-1) unstable; urgency=medium
* Bugfixes

View File

@@ -40,10 +40,10 @@ RUN set -e -x; \
mkdir -p /root/packages/vitastor-$REL; \
rm -rf /root/packages/vitastor-$REL/*; \
cd /root/packages/vitastor-$REL; \
cp -r /root/vitastor vitastor-0.5.8; \
ln -s /root/packages/qemu-$REL/qemu-*/ vitastor-0.5.8/qemu; \
ln -s /root/fio-build/fio-*/ vitastor-0.5.8/fio; \
cd vitastor-0.5.8; \
cp -r /root/vitastor vitastor-0.5.10; \
ln -s /root/packages/qemu-$REL/qemu-*/ vitastor-0.5.10/qemu; \
ln -s /root/fio-build/fio-*/ vitastor-0.5.10/fio; \
cd vitastor-0.5.10; \
FIO=$(head -n1 fio/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
QEMU=$(head -n1 qemu/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
sh copy-qemu-includes.sh; \
@@ -59,8 +59,8 @@ RUN set -e -x; \
echo "dep:fio=$FIO" > debian/substvars; \
echo "dep:qemu=$QEMU" >> debian/substvars; \
cd /root/packages/vitastor-$REL; \
tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.5.8.orig.tar.xz vitastor-0.5.8; \
cd vitastor-0.5.8; \
tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.5.10.orig.tar.xz vitastor-0.5.10; \
cd vitastor-0.5.10; \
V=$(head -n1 debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
DEBFULLNAME="Vitaliy Filippov <vitalif@yourcmc.ru>" dch -D $REL -v "$V""$REL" "Rebuild for $REL"; \
DEB_BUILD_OPTIONS=nocheck dpkg-buildpackage --jobs=auto -sa; \

View File

@@ -10,6 +10,14 @@ const stableStringify = require('./stable-stringify.js');
const PGUtil = require('./PGUtil.js');
// FIXME document all etcd keys and config variables in the form of JSON schema or similar
const etcd_nonempty_keys = {
'config/global': 1,
'config/node_placement': 1,
'config/pools': 1,
'config/pgs': 1,
'history/last_clean_pgs': 1,
'stats': 1,
};
const etcd_allow = new RegExp('^'+[
'config/global',
'config/node_placement',
@@ -47,6 +55,8 @@ const etcd_tree = {
client_dirty_limit: 33554432,
peer_connect_interval: 5, // seconds. min: 1
peer_connect_timeout: 5, // seconds. min: 1
osd_idle_timeout: 5, // seconds. min: 1
osd_ping_timeout: 5, // seconds. min: 1
up_wait_retry_interval: 500, // ms. min: 50
// osd
etcd_report_interval: 30, // min: 10
@@ -56,8 +66,12 @@ const etcd_tree = {
autosync_interval: 5,
client_queue_depth: 128, // unused
recovery_queue_depth: 4,
recovery_sync_batch: 16,
readonly: false,
no_recovery: false,
no_rebalance: false,
print_stats_interval: 3,
slow_log_interval: 10,
// blockstore - fixed in superblock
block_size,
disk_alignment,
@@ -77,6 +91,7 @@ const etcd_tree = {
disable_meta_fsync,
disable_device_lock,
// blockstore - configurable
max_write_iodepth,
flusher_count,
inmemory_metadata,
inmemory_journal,
@@ -627,29 +642,51 @@ class Mon
return !has_online;
}
reset_rng()
{
this.seed = 0x5f020e43;
}
rng()
{
this.seed ^= this.seed << 13;
this.seed ^= this.seed >> 17;
this.seed ^= this.seed << 5;
return this.seed + 2147483648;
}
pick_primary(pool_id, osd_set, up_osds)
{
let alive_set;
if (this.state.config.pools[pool_id].scheme === 'replicated')
alive_set = osd_set.filter(osd_num => osd_num && up_osds[osd_num]);
else
{
// Prefer data OSDs for EC because they can actually read something without an additional network hop
const pg_data_size = (this.state.config.pools[pool_id].pg_size||0) -
(this.state.config.pools[pool_id].parity_chunks||0);
alive_set = osd_set.slice(0, pg_data_size).filter(osd_num => osd_num && up_osds[osd_num]);
if (!alive_set.length)
alive_set = osd_set.filter(osd_num => osd_num && up_osds[osd_num]);
}
if (!alive_set.length)
return 0;
return alive_set[this.rng() % alive_set.length];
}
save_new_pgs_txn(request, pool_id, up_osds, prev_pgs, new_pgs, pg_history)
{
const replicated = new_pgs.length && this.state.config.pools[pool_id].scheme === 'replicated';
const pg_minsize = new_pgs.length && this.state.config.pools[pool_id].pg_minsize;
const pg_items = {};
this.reset_rng();
new_pgs.map((osd_set, i) =>
{
osd_set = osd_set.map(osd_num => osd_num === LPOptimizer.NO_OSD ? 0 : osd_num);
let alive_set;
if (replicated)
alive_set = osd_set.filter(osd_num => osd_num && up_osds[osd_num]);
else
{
// Prefer data OSDs for EC because they can actually read something without an additional network hop
alive_set = osd_set.slice(0, pg_minsize).filter(osd_num => osd_num && up_osds[osd_num]);
if (!alive_set.length)
alive_set = osd_set.filter(osd_num => osd_num && up_osds[osd_num]);
}
pg_items[i+1] = {
osd_set,
primary: alive_set.length ? alive_set[Math.floor(Math.random()*alive_set.length)] : 0,
primary: this.pick_primary(pool_id, osd_set, up_osds),
};
if (prev_pgs[i] && prev_pgs[i].join(' ') != osd_set.join(' '))
if (prev_pgs[i] && prev_pgs[i].join(' ') != osd_set.join(' ') &&
prev_pgs[i].filter(osd_num => osd_num).length > 0)
{
pg_history[i] = pg_history[i] || {};
pg_history[i].osd_sets = pg_history[i].osd_sets || [];
@@ -845,14 +882,25 @@ class Mon
pool_tree = pool_tree ? pool_tree.children : [];
pool_tree = LPOptimizer.flatten_tree(pool_tree, levels, pool_cfg.failure_domain, 'osd');
this.filter_osds_by_tags(osd_tree, pool_tree, pool_cfg.osd_tags);
const prev_pg_hash = (this.state.history.last_clean_pgs.items||{})[pool_id]
|| (this.state.config.pgs.items||{})[pool_id] || {};
const prev_pgs = [];
for (const pg in prev_pg_hash)
// These are for the purpose of building history.osd_sets
const real_prev_pgs = [];
let pg_history = [];
for (const pg in ((this.state.config.pgs.items||{})[pool_id]||{}))
{
prev_pgs[pg-1] = prev_pg_hash[pg].osd_set;
real_prev_pgs[pg-1] = this.state.config.pgs.items[pool_id][pg].osd_set;
if (this.state.pg.history[pool_id] &&
this.state.pg.history[pool_id][pg])
{
pg_history[pg-1] = this.state.pg.history[pool_id][pg];
}
}
const pg_history = [];
// And these are for the purpose of minimizing data movement
let prev_pgs = [];
for (const pg in ((this.state.history.last_clean_pgs.items||{})[pool_id]||{}))
{
prev_pgs[pg-1] = this.state.history.last_clean_pgs.items[pool_id][pg].osd_set;
}
prev_pgs = JSON.parse(JSON.stringify(prev_pgs.length ? prev_pgs : real_prev_pgs));
const old_pg_count = prev_pgs.length;
let optimize_result;
if (old_pg_count > 0)
@@ -865,7 +913,9 @@ class Mon
this.schedule_recheck();
return;
}
PGUtil.scale_pg_count(prev_pgs, this.state.pg.history[pool_id]||{}, pg_history, pool_cfg.pg_count);
const new_pg_history = [];
PGUtil.scale_pg_count(prev_pgs, pg_history, new_pg_history, pool_cfg.pg_count);
pg_history = new_pg_history;
}
for (const pg of prev_pgs)
{
@@ -909,14 +959,14 @@ class Mon
} });
}
LPOptimizer.print_change_stats(optimize_result);
this.save_new_pgs_txn(etcd_request, pool_id, up_osds, prev_pgs, optimize_result.int_pgs, pg_history);
this.save_new_pgs_txn(etcd_request, pool_id, up_osds, real_prev_pgs, optimize_result.int_pgs, pg_history);
}
this.state.config.pgs.hash = tree_hash;
await this.save_pg_config(etcd_request);
}
else
{
// Nothing changed, but we still want to check for down OSDs
// Nothing changed, but we still want to recheck the distribution of primaries
let changed = false;
for (const pool_id in this.state.config.pools)
{
@@ -926,22 +976,13 @@ class Mon
continue;
}
const replicated = pool_cfg.scheme === 'replicated';
for (const pg_num in ((this.state.config.pgs.items||{})[pool_id]||{})||{})
this.reset_rng();
for (let pg_num = 1; pg_num <= pool_cfg.pg_count; pg_num++)
{
const pg_cfg = this.state.config.pgs.items[pool_id][pg_num];
if (!Number(pg_cfg.primary) || !up_osds[pg_cfg.primary])
if (pg_cfg)
{
let alive_set;
if (replicated)
alive_set = pg_cfg.osd_set.filter(osd_num => osd_num && up_osds[osd_num]);
else
{
// Prefer data OSDs for EC because they can actually read something without an additional network hop
alive_set = pg_cfg.osd_set.slice(0, pool_cfg.pg_minsize).filter(osd_num => osd_num && up_osds[osd_num]);
if (!alive_set.length)
alive_set = pg_cfg.osd_set.filter(osd_num => osd_num && up_osds[osd_num]);
}
const new_primary = alive_set.length ? alive_set[Math.floor(Math.random()*alive_set.length)] : 0;
const new_primary = this.pick_primary(pool_id, pg_cfg.osd_set, up_osds);
if (pg_cfg.primary != new_primary)
{
console.log(
@@ -1186,16 +1227,20 @@ class Mon
console.log('Bad value in etcd: '+kv.key+' = '+kv.value);
return;
}
key = key.split('/');
let key_parts = key.split('/');
let cur = this.state;
for (let i = 0; i < key.length-1; i++)
for (let i = 0; i < key_parts.length-1; i++)
{
cur = (cur[key[i]] = cur[key[i]] || {});
cur = (cur[key_parts[i]] = cur[key_parts[i]] || {});
}
cur[key[key.length-1]] = kv.value;
if (key.join('/') === 'config/global')
if (etcd_nonempty_keys[key])
{
// Do not clear these to null
kv.value = kv.value || {};
}
cur[key_parts[key_parts.length-1]] = kv.value;
if (key === 'config/global')
{
this.state.config.global = this.state.config.global || {};
this.config = this.state.config.global;
this.check_config();
for (const osd_num in this.state.osd.stats)
@@ -1206,7 +1251,7 @@ class Mon
);
}
}
else if (key.join('/') === 'config/pools')
else if (key === 'config/pools')
{
for (const pool_id in this.state.config.pools)
{
@@ -1215,7 +1260,7 @@ class Mon
this.validate_pool_cfg(pool_id, pool_cfg, true);
}
}
else if (key[0] === 'osd' && key[1] === 'stats')
else if (key_parts[0] === 'osd' && key_parts[1] === 'stats')
{
// Recheck PGs <osd_out_time> later
this.schedule_next_recheck_at(

View File

@@ -48,4 +48,4 @@ FIO=`rpm -qi fio | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Ve
QEMU=`rpm -qi qemu qemu-kvm | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Version[\s:]+(\S+)/ && print $1; /^Release[\s:]+(\S+)/ && print "-$1"; }'`
perl -i -pe 's/(Requires:\s*fio)([^\n]+)?/$1 = '$FIO'/' $VITASTOR/rpm/vitastor-el$EL.spec
perl -i -pe 's/(Requires:\s*qemu(?:-kvm)?)([^\n]+)?/$1 = '$QEMU'/' $VITASTOR/rpm/vitastor-el$EL.spec
tar --transform 's#^#vitastor-0.5.8/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.5.8$(rpm --eval '%dist').tar.gz *
tar --transform 's#^#vitastor-0.5.10/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.5.10$(rpm --eval '%dist').tar.gz *

View File

@@ -37,7 +37,7 @@ ADD . /root/vitastor
RUN set -e; \
cd /root/vitastor/rpm; \
sh build-tarball.sh; \
cp /root/vitastor-0.5.8.el7.tar.gz ~/rpmbuild/SOURCES; \
cp /root/vitastor-0.5.10.el7.tar.gz ~/rpmbuild/SOURCES; \
cp vitastor-el7.spec ~/rpmbuild/SPECS/vitastor.spec; \
cd ~/rpmbuild/SPECS/; \
rpmbuild -ba vitastor.spec; \

View File

@@ -1,11 +1,11 @@
Name: vitastor
Version: 0.5.8
Version: 0.5.10
Release: 1%{?dist}
Summary: Vitastor, a fast software-defined clustered block storage
License: Vitastor Network Public License 1.1
URL: https://vitastor.io/
Source0: vitastor-0.5.8.el7.tar.gz
Source0: vitastor-0.5.10.el7.tar.gz
BuildRequires: liburing-devel >= 0.6
BuildRequires: gperftools-devel

View File

@@ -35,7 +35,7 @@ ADD . /root/vitastor
RUN set -e; \
cd /root/vitastor/rpm; \
sh build-tarball.sh; \
cp /root/vitastor-0.5.8.el8.tar.gz ~/rpmbuild/SOURCES; \
cp /root/vitastor-0.5.10.el8.tar.gz ~/rpmbuild/SOURCES; \
cp vitastor-el8.spec ~/rpmbuild/SPECS/vitastor.spec; \
cd ~/rpmbuild/SPECS/; \
rpmbuild -ba vitastor.spec; \

View File

@@ -1,11 +1,11 @@
Name: vitastor
Version: 0.5.8
Version: 0.5.10
Release: 1%{?dist}
Summary: Vitastor, a fast software-defined clustered block storage
License: Vitastor Network Public License 1.1
URL: https://vitastor.io/
Source0: vitastor-0.5.8.el8.tar.gz
Source0: vitastor-0.5.10.el8.tar.gz
BuildRequires: liburing-devel >= 0.6
BuildRequires: gperftools-devel

View File

@@ -13,19 +13,19 @@ allocator::allocator(uint64_t blocks)
{
throw std::invalid_argument("blocks");
}
uint64_t p2 = 1, total = 1;
uint64_t p2 = 1;
total = 0;
while (p2 * 64 < blocks)
{
p2 = p2 * 64;
total += p2;
p2 = p2 * 64;
}
total -= p2;
total += (blocks+63) / 64;
mask = new uint64_t[2 + total];
mask = new uint64_t[total];
size = free = blocks;
last_one_mask = (blocks % 64) == 0
? UINT64_MAX
: ~(UINT64_MAX << (64 - blocks % 64));
: ((1l << (blocks % 64)) - 1);
for (uint64_t i = 0; i < total; i++)
{
mask[i] = 0;
@@ -99,6 +99,10 @@ uint64_t allocator::find_free()
uint64_t p2 = 1, offset = 0, addr = 0, f, i;
while (p2 < size)
{
if (offset+addr >= total)
{
return UINT64_MAX;
}
uint64_t m = mask[offset + addr];
for (i = 0, f = 1; i < 64; i++, f <<= 1)
{
@@ -113,11 +117,6 @@ uint64_t allocator::find_free()
return UINT64_MAX;
}
addr = (addr * 64) | i;
if (addr >= size)
{
// No space
return UINT64_MAX;
}
offset += p2;
p2 = p2 * 64;
}

View File

@@ -8,6 +8,7 @@
// Hierarchical bitmap allocator
class allocator
{
uint64_t total;
uint64_t size;
uint64_t free;
uint64_t last_one_mask;

View File

@@ -823,31 +823,34 @@ bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base)
sync_found:
cur_sync->ready_count++;
flusher->syncing_flushers++;
if (flusher->syncing_flushers >= flusher->flusher_count || !flusher->flush_queue.size())
resume_1:
if (!cur_sync->state)
{
// Sync batch is ready. Do it.
await_sqe(0);
data->iov = { 0 };
data->callback = simple_callback_w;
my_uring_prep_fsync(sqe, fsync_meta ? bs->meta_fd : bs->data_fd, IORING_FSYNC_DATASYNC);
cur_sync->state = 1;
wait_count++;
resume_1:
if (wait_count > 0)
if (flusher->syncing_flushers >= flusher->cur_flusher_count || !flusher->flush_queue.size())
{
// Sync batch is ready. Do it.
await_sqe(0);
data->iov = { 0 };
data->callback = simple_callback_w;
my_uring_prep_fsync(sqe, fsync_meta ? bs->meta_fd : bs->data_fd, IORING_FSYNC_DATASYNC);
cur_sync->state = 1;
wait_count++;
resume_2:
if (wait_count > 0)
{
wait_state = 2;
return false;
}
// Sync completed. All previous coroutines waiting for it must be resumed
cur_sync->state = 2;
bs->ringloop->wakeup();
}
else
{
// Wait until someone else sends and completes a sync.
wait_state = 1;
return false;
}
// Sync completed. All previous coroutines waiting for it must be resumed
cur_sync->state = 2;
bs->ringloop->wakeup();
}
// Wait until someone else sends and completes a sync.
resume_2:
if (!cur_sync->state)
{
wait_state = 2;
return false;
}
flusher->syncing_flushers--;
cur_sync->ready_count--;

View File

@@ -105,10 +105,10 @@ void blockstore_impl_t::loop()
// has_writes == 1 - some writes in progress
// has_writes == 2 - tried to submit some writes, but failed
int has_writes = 0, op_idx = 0, new_idx = 0;
for (; op_idx < submit_queue.size(); op_idx++)
for (; op_idx < submit_queue.size(); op_idx++, new_idx++)
{
auto op = submit_queue[op_idx];
submit_queue[new_idx++] = op;
submit_queue[new_idx] = op;
// FIXME: This needs some simplification
// Writes should not block reads if the ring is not full and reads don't depend on them
// In all other cases we should stop submission
@@ -301,7 +301,7 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op)
}
else if (PRIV(op)->wait_for == WAIT_FREE)
{
if (!data_alloc->get_free_count() && !flusher->is_active())
if (!data_alloc->get_free_count() && flusher->is_active())
{
#ifdef BLOCKSTORE_DEBUG
printf("Still waiting for free space on the data device\n");

View File

@@ -191,8 +191,8 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
if (bmp_end > bmp_start)
{
// fill with zeroes
fulfill_read(read_op, fulfilled, bmp_start * bitmap_granularity,
bmp_end * bitmap_granularity, (BS_ST_DELETE | BS_ST_STABLE), 0, 0);
assert(fulfill_read(read_op, fulfilled, bmp_start * bitmap_granularity,
bmp_end * bitmap_granularity, (BS_ST_DELETE | BS_ST_STABLE), 0, 0));
}
bmp_start = bmp_end;
while (clean_entry_bitmap[bmp_end >> 3] & (1 << (bmp_end & 0x7)) && bmp_end < bmp_size)
@@ -218,7 +218,7 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
else if (fulfilled < read_op->len)
{
// fill remaining parts with zeroes
fulfill_read(read_op, fulfilled, 0, block_size, (BS_ST_DELETE | BS_ST_STABLE), 0, 0);
assert(fulfill_read(read_op, fulfilled, 0, block_size, (BS_ST_DELETE | BS_ST_STABLE), 0, 0));
}
assert(fulfilled == read_op->len);
read_op->version = result_version;

View File

@@ -126,11 +126,8 @@ resume_2:
resume_3:
if (!disable_journal_fsync)
{
io_uring_sqe *sqe = get_sqe();
if (!sqe)
{
return 0;
}
io_uring_sqe *sqe;
BS_SUBMIT_GET_SQE_DECL(sqe);
ring_data_t *data = ((ring_data_t*)sqe->user_data);
my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 };

View File

@@ -150,11 +150,8 @@ resume_2:
resume_3:
if (!disable_journal_fsync)
{
io_uring_sqe *sqe = get_sqe();
if (!sqe)
{
return 0;
}
io_uring_sqe *sqe;
BS_SUBMIT_GET_SQE_DECL(sqe);
ring_data_t *data = ((ring_data_t*)sqe->user_data);
my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 };

View File

@@ -401,11 +401,7 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op)
goto resume_4;
resume_2:
// Only for the immediate_commit mode: prepare and submit big_write journal entry
sqe = get_sqe();
if (!sqe)
{
return 0;
}
BS_SUBMIT_GET_SQE_DECL(sqe);
je = (journal_entry_big_write*)prefill_single_journal_entry(
journal, op->opcode == BS_OP_WRITE_STABLE ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE,
sizeof(journal_entry_big_write)

View File

@@ -56,6 +56,23 @@ void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int t
http_request_json(tfd, etcd_address, req, timeout, callback);
}
void etcd_state_client_t::add_etcd_url(std::string addr)
{
if (addr.length() > 0)
{
if (strtolower(addr.substr(0, 7)) == "http://")
addr = addr.substr(7);
else if (strtolower(addr.substr(0, 8)) == "https://")
{
printf("HTTPS is unsupported for etcd. Either use plain HTTP or setup a local proxy for etcd interaction\n");
exit(1);
}
if (addr.find('/') < 0)
addr += "/v3";
this->etcd_addresses.push_back(addr);
}
}
void etcd_state_client_t::parse_config(json11::Json & config)
{
this->etcd_addresses.clear();
@@ -65,13 +82,7 @@ void etcd_state_client_t::parse_config(json11::Json & config)
while (1)
{
int pos = ea.find(',');
std::string addr = pos >= 0 ? ea.substr(0, pos) : ea;
if (addr.length() > 0)
{
if (addr.find('/') < 0)
addr += "/v3";
this->etcd_addresses.push_back(addr);
}
add_etcd_url(pos >= 0 ? ea.substr(0, pos) : ea);
if (pos >= 0)
ea = ea.substr(pos+1);
else
@@ -82,13 +93,7 @@ void etcd_state_client_t::parse_config(json11::Json & config)
{
for (auto & ea: config["etcd_address"].array_items())
{
std::string addr = ea.string_value();
if (addr != "")
{
if (addr.find('/') < 0)
addr += "/v3";
this->etcd_addresses.push_back(addr);
}
add_etcd_url(ea.string_value());
}
}
this->etcd_prefix = config["etcd_prefix"].string_value();

View File

@@ -54,6 +54,9 @@ struct pool_config_t
struct etcd_state_client_t
{
protected:
void add_etcd_url(std::string);
public:
std::vector<std::string> etcd_addresses;
std::string etcd_prefix;
int log_level = 0;

View File

@@ -117,8 +117,15 @@ static struct fio_option options[] = {
static int sec_setup(struct thread_data *td)
{
sec_options *o = (sec_options*)td->eo;
sec_data *bsd;
if (!o->etcd_host)
{
td_verror(td, EINVAL, "etcd address is missing");
return 1;
}
bsd = new sec_data;
if (!bsd)
{

View File

@@ -22,7 +22,6 @@
#define READ_BUFFER_SIZE 9000
static int extract_port(std::string & host);
static std::string strtolower(const std::string & in);
static std::string trim(const std::string & in);
static std::string ws_format_frame(int type, uint64_t size);
static bool ws_parse_frame(std::string & buf, int & type, std::string & res);
@@ -673,7 +672,7 @@ static int extract_port(std::string & host)
return port;
}
static std::string strtolower(const std::string & in)
std::string strtolower(const std::string & in)
{
std::string s = in;
for (int i = 0; i < s.length(); i++)

View File

@@ -49,6 +49,8 @@ std::vector<std::string> getifaddr_list(bool include_v6 = false);
uint64_t stoull_full(const std::string & str, int base = 10);
std::string strtolower(const std::string & in);
void http_request(timerfd_manager_t *tfd, const std::string & host, const std::string & request,
const http_options_t & options, std::function<void(const http_response_t *response)> callback);

View File

@@ -56,6 +56,7 @@ void osd_t::parse_config(blockstore_config_t & config)
{
if (config.find("log_level") == config.end())
config["log_level"] = "1";
log_level = strtoull(config["log_level"].c_str(), NULL, 10);
// Initial startup configuration
json11::Json json_config = json11::Json(config);
st_cli.parse_config(json_config);
@@ -67,6 +68,8 @@ void osd_t::parse_config(blockstore_config_t & config)
throw std::runtime_error("osd_num is required in the configuration");
c_cli.osd_num = osd_num;
run_primary = config["run_primary"] != "false" && config["run_primary"] != "0" && config["run_primary"] != "no";
no_rebalance = config["no_rebalance"] == "true" || config["no_rebalance"] == "1" || config["no_rebalance"] == "yes";
no_recovery = config["no_recovery"] == "true" || config["no_recovery"] == "1" || config["no_recovery"] == "yes";
// Cluster configuration
bind_address = config["bind_address"];
if (bind_address == "")
@@ -93,6 +96,9 @@ void osd_t::parse_config(blockstore_config_t & config)
recovery_queue_depth = strtoull(config["recovery_queue_depth"].c_str(), NULL, 10);
if (recovery_queue_depth < 1 || recovery_queue_depth > MAX_RECOVERY_QUEUE)
recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
recovery_sync_batch = strtoull(config["recovery_sync_batch"].c_str(), NULL, 10);
if (recovery_sync_batch < 1 || recovery_sync_batch > MAX_RECOVERY_QUEUE)
recovery_sync_batch = DEFAULT_RECOVERY_BATCH;
if (config["readonly"] == "true" || config["readonly"] == "1" || config["readonly"] == "yes")
readonly = true;
print_stats_interval = strtoull(config["print_stats_interval"].c_str(), NULL, 10);
@@ -261,9 +267,9 @@ void osd_t::reset_stats()
void osd_t::print_stats()
{
for (int i = 0; i <= OSD_OP_MAX; i++)
for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++)
{
if (c_cli.stats.op_stat_count[i] != prev_stats.op_stat_count[i])
if (c_cli.stats.op_stat_count[i] != prev_stats.op_stat_count[i] && i != OSD_OP_PING)
{
uint64_t avg = (c_cli.stats.op_stat_sum[i] - prev_stats.op_stat_sum[i])/(c_cli.stats.op_stat_count[i] - prev_stats.op_stat_count[i]);
uint64_t bw = (c_cli.stats.op_stat_bytes[i] - prev_stats.op_stat_bytes[i]) / print_stats_interval;
@@ -284,7 +290,7 @@ void osd_t::print_stats()
prev_stats.op_stat_bytes[i] = c_cli.stats.op_stat_bytes[i];
}
}
for (int i = 0; i <= OSD_OP_MAX; i++)
for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++)
{
if (c_cli.stats.subop_stat_count[i] != prev_stats.subop_stat_count[i])
{

View File

@@ -37,6 +37,7 @@
#define DEFAULT_AUTOSYNC_INTERVAL 5
#define MAX_RECOVERY_QUEUE 2048
#define DEFAULT_RECOVERY_QUEUE 4
#define DEFAULT_RECOVERY_BATCH 16
//#define OSD_STUB
@@ -64,6 +65,8 @@ class osd_t
bool readonly = false;
osd_num_t osd_num = 1; // OSD numbers start with 1
bool run_primary = false;
bool no_rebalance = false;
bool no_recovery = false;
std::string bind_address;
int bind_port, listen_backlog;
// FIXME: Implement client queue depth limit
@@ -74,6 +77,7 @@ class osd_t
int immediate_commit = IMMEDIATE_NONE;
int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds
int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
int recovery_sync_batch = DEFAULT_RECOVERY_BATCH;
int log_level = 0;
// cluster state
@@ -95,9 +99,11 @@ class osd_t
std::map<pool_pg_num_t, pg_t> pgs;
std::set<pool_pg_num_t> dirty_pgs;
std::set<osd_num_t> dirty_osds;
int copies_to_delete_after_sync_count = 0;
uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0;
int peering_state = 0;
std::map<object_id, osd_recovery_op_t> recovery_ops;
int recovery_done = 0;
osd_op_t *autosync_op = NULL;
// Unstable writes
@@ -199,6 +205,7 @@ class osd_t
void pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval);
void submit_primary_subops(int submit_type, uint64_t op_version, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, uint64_t set_size, pg_osd_set_t & loc_set);
void submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_delete, int chunks_to_delete_count);
void submit_primary_sync_subops(osd_op_t *cur_op);
void submit_primary_stab_subops(osd_op_t *cur_op);

View File

@@ -209,32 +209,38 @@ void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t
bool osd_t::pick_next_recovery(osd_recovery_op_t &op)
{
for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
if (!no_recovery)
{
if ((pg_it->second.state & (PG_ACTIVE | PG_HAS_DEGRADED)) == (PG_ACTIVE | PG_HAS_DEGRADED))
for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
{
for (auto obj_it = pg_it->second.degraded_objects.begin(); obj_it != pg_it->second.degraded_objects.end(); obj_it++)
if ((pg_it->second.state & (PG_ACTIVE | PG_HAS_DEGRADED)) == (PG_ACTIVE | PG_HAS_DEGRADED))
{
if (recovery_ops.find(obj_it->first) == recovery_ops.end())
for (auto obj_it = pg_it->second.degraded_objects.begin(); obj_it != pg_it->second.degraded_objects.end(); obj_it++)
{
op.degraded = true;
op.oid = obj_it->first;
return true;
if (recovery_ops.find(obj_it->first) == recovery_ops.end())
{
op.degraded = true;
op.oid = obj_it->first;
return true;
}
}
}
}
}
for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
if (!no_rebalance)
{
if ((pg_it->second.state & (PG_ACTIVE | PG_HAS_MISPLACED)) == (PG_ACTIVE | PG_HAS_MISPLACED))
for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
{
for (auto obj_it = pg_it->second.misplaced_objects.begin(); obj_it != pg_it->second.misplaced_objects.end(); obj_it++)
if ((pg_it->second.state & (PG_ACTIVE | PG_HAS_MISPLACED)) == (PG_ACTIVE | PG_HAS_MISPLACED))
{
if (recovery_ops.find(obj_it->first) == recovery_ops.end())
for (auto obj_it = pg_it->second.misplaced_objects.begin(); obj_it != pg_it->second.misplaced_objects.end(); obj_it++)
{
op.degraded = false;
op.oid = obj_it->first;
return true;
if (recovery_ops.find(obj_it->first) == recovery_ops.end())
{
op.degraded = false;
op.oid = obj_it->first;
return true;
}
}
}
}
@@ -264,7 +270,6 @@ void osd_t::submit_recovery_op(osd_recovery_op_t *op)
}
op->osd_op->callback = [this, op](osd_op_t *osd_op)
{
// Don't sync the write, it will be synced by our regular sync coroutine
if (osd_op->reply.hdr.retval < 0)
{
// Error recovering object
@@ -286,6 +291,17 @@ void osd_t::submit_recovery_op(osd_recovery_op_t *op)
op->osd_op = NULL;
recovery_ops.erase(op->oid);
delete osd_op;
if (immediate_commit != IMMEDIATE_ALL)
{
recovery_done++;
if (recovery_done >= recovery_sync_batch)
{
// Force sync every <recovery_sync_batch> operations
// This is required not to pile up an excessive amount of delete operations
autosync();
recovery_done = 0;
}
}
continue_recovery();
};
exec_op(op->osd_op);

View File

@@ -103,6 +103,8 @@ void osd_t::reset_pg(pg_t & pg)
{
pg.cur_peers.clear();
pg.state_dict.clear();
copies_to_delete_after_sync_count -= pg.copies_to_delete_after_sync.size();
pg.copies_to_delete_after_sync.clear();
incomplete_objects -= pg.incomplete_objects.size();
misplaced_objects -= pg.misplaced_objects.size();
degraded_objects -= pg.degraded_objects.size();
@@ -180,14 +182,18 @@ void osd_t::start_pg_peering(pg_t & pg)
// (PG history is kept up to the latest active+clean state)
for (auto & history_set: pg.target_history)
{
bool found = false;
bool found = true;
for (auto history_osd: history_set)
{
if (history_osd != 0 && (history_osd == this->osd_num ||
c_cli.osd_peer_fds.find(history_osd) != c_cli.osd_peer_fds.end()))
if (history_osd != 0)
{
found = true;
break;
found = false;
if (history_osd == this->osd_num ||
c_cli.osd_peer_fds.find(history_osd) != c_cli.osd_peer_fds.end())
{
found = true;
break;
}
}
}
if (!found)

View File

@@ -56,6 +56,13 @@ struct obj_piece_id_t
uint64_t osd_num;
};
struct obj_ver_osd_t
{
uint64_t osd_num;
object_id oid;
uint64_t version;
};
struct flush_action_t
{
bool rollback = false, make_stable = false;
@@ -101,6 +108,7 @@ struct pg_t
std::map<pg_osd_set_t, pg_osd_set_state_t> state_dict;
btree::btree_map<object_id, pg_osd_set_state_t*> incomplete_objects, misplaced_objects, degraded_objects;
std::map<obj_piece_id_t, flush_action_t> flush_actions;
std::vector<obj_ver_osd_t> copies_to_delete_after_sync;
btree::btree_map<object_id, uint64_t> ver_override;
pg_peering_state_t *peering_state = NULL;
pg_flush_batch_t *flush_batch = NULL;

View File

@@ -367,17 +367,44 @@ resume_7:
}
// Any kind of a non-clean object can have extra chunks, because we don't record objects
// as degraded & misplaced or incomplete & misplaced at the same time. So try to remove extra chunks
submit_primary_del_subops(cur_op, pg.cur_set.data(), pg.pg_size, op_data->object_state->osd_set);
if (op_data->n_subops > 0)
if (immediate_commit != IMMEDIATE_ALL)
{
resume_8:
op_data->st = 8;
return;
resume_9:
if (op_data->errors > 0)
// We can't remove extra chunks yet if fsyncs are explicit, because
// new copies may not be committed to stable storage yet
// We can only remove extra chunks after a successful SYNC for this PG
for (auto & chunk: op_data->object_state->osd_set)
{
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
// Check is the same as in submit_primary_del_subops()
if (op_data->scheme == POOL_SCHEME_REPLICATED
? !contains_osd(pg.cur_set.data(), pg.pg_size, chunk.osd_num)
: (chunk.osd_num != pg.cur_set[chunk.role]))
{
pg.copies_to_delete_after_sync.push_back((obj_ver_osd_t){
.osd_num = chunk.osd_num,
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | (op_data->scheme == POOL_SCHEME_REPLICATED ? 0 : chunk.role),
},
.version = op_data->fact_ver,
});
copies_to_delete_after_sync_count++;
}
}
}
else
{
submit_primary_del_subops(cur_op, pg.cur_set.data(), pg.pg_size, op_data->object_state->osd_set);
if (op_data->n_subops > 0)
{
resume_8:
op_data->st = 8;
return;
resume_9:
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
return;
}
}
}
// Clear object state
@@ -511,6 +538,8 @@ void osd_t::continue_primary_sync(osd_op_t *cur_op)
else if (op_data->st == 4) goto resume_4;
else if (op_data->st == 5) goto resume_5;
else if (op_data->st == 6) goto resume_6;
else if (op_data->st == 7) goto resume_7;
else if (op_data->st == 8) goto resume_8;
assert(op_data->st == 0);
if (syncs_in_progress.size() > 0)
{
@@ -572,11 +601,34 @@ resume_2:
this->unstable_writes.clear();
}
{
void *dirty_buf = malloc_or_die(sizeof(pool_pg_num_t)*dirty_pgs.size() + sizeof(osd_num_t)*dirty_osds.size());
void *dirty_buf = malloc_or_die(
sizeof(pool_pg_num_t)*dirty_pgs.size() +
sizeof(osd_num_t)*dirty_osds.size() +
sizeof(obj_ver_osd_t)*this->copies_to_delete_after_sync_count
);
op_data->dirty_pgs = (pool_pg_num_t*)dirty_buf;
op_data->dirty_osds = (osd_num_t*)(dirty_buf + sizeof(pool_pg_num_t)*dirty_pgs.size());
op_data->dirty_pg_count = dirty_pgs.size();
op_data->dirty_osd_count = dirty_osds.size();
if (this->copies_to_delete_after_sync_count)
{
op_data->copies_to_delete_count = 0;
op_data->copies_to_delete = (obj_ver_osd_t*)(op_data->dirty_osds + op_data->dirty_osd_count);
for (auto dirty_pg_num: dirty_pgs)
{
auto & pg = pgs.at(dirty_pg_num);
assert(pg.copies_to_delete_after_sync.size() <= this->copies_to_delete_after_sync_count);
memcpy(
op_data->copies_to_delete + op_data->copies_to_delete_count,
pg.copies_to_delete_after_sync.data(),
sizeof(obj_ver_osd_t)*pg.copies_to_delete_after_sync.size()
);
op_data->copies_to_delete_count += pg.copies_to_delete_after_sync.size();
this->copies_to_delete_after_sync_count -= pg.copies_to_delete_after_sync.size();
pg.copies_to_delete_after_sync.clear();
}
assert(this->copies_to_delete_after_sync_count == 0);
}
int dpg = 0;
for (auto dirty_pg_num: dirty_pgs)
{
@@ -649,6 +701,36 @@ resume_6:
}
}
}
if (op_data->copies_to_delete)
{
// Return 'copies to delete' back into respective PGs
for (int i = 0; i < op_data->copies_to_delete_count; i++)
{
auto & w = op_data->copies_to_delete[i];
auto & pg = pgs.at((pool_pg_num_t){
.pool_id = INODE_POOL(w.oid.inode),
.pg_num = map_to_pg(w.oid, st_cli.pool_config.at(INODE_POOL(w.oid.inode)).pg_stripe_size),
});
if (pg.state & PG_ACTIVE)
{
pg.copies_to_delete_after_sync.push_back(w);
copies_to_delete_after_sync_count++;
}
}
}
}
else if (op_data->copies_to_delete)
{
// Actually delete copies which we wanted to delete
submit_primary_del_batch(cur_op, op_data->copies_to_delete, op_data->copies_to_delete_count);
resume_7:
op_data->st = 7;
return;
resume_8:
if (op_data->errors > 0)
{
goto resume_6;
}
}
for (int i = 0; i < op_data->dirty_pg_count; i++)
{

View File

@@ -38,4 +38,8 @@ struct osd_primary_op_data_t
osd_num_t *dirty_osds = NULL;
int dirty_osd_count = 0;
obj_ver_id *unstable_writes = NULL;
obj_ver_osd_t *copies_to_delete = NULL;
int copies_to_delete_count = 0;
};
bool contains_osd(osd_num_t *osd_set, uint64_t size, osd_num_t osd_num);

View File

@@ -355,7 +355,7 @@ void osd_t::cancel_primary_write(osd_op_t *cur_op)
}
}
static bool contains_osd(osd_num_t *osd_set, uint64_t size, osd_num_t osd_num)
bool contains_osd(osd_num_t *osd_set, uint64_t size, osd_num_t osd_num)
{
for (uint64_t i = 0; i < size; i++)
{
@@ -371,78 +371,82 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint
{
osd_primary_op_data_t *op_data = cur_op->op_data;
bool rep = op_data->scheme == POOL_SCHEME_REPLICATED;
int extra_chunks = 0;
// ordered comparison for EC/XOR, unordered for replicated pools
obj_ver_osd_t extra_chunks[loc_set.size()];
int chunks_to_del = 0;
for (auto & chunk: loc_set)
{
if (!cur_set || (rep ? !contains_osd(cur_set, set_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role]))
// ordered comparison for EC/XOR, unordered for replicated pools
if (!cur_set || (rep
? !contains_osd(cur_set, set_size, chunk.osd_num)
: (chunk.osd_num != cur_set[chunk.role])))
{
extra_chunks++;
extra_chunks[chunks_to_del++] = (obj_ver_osd_t){
.osd_num = chunk.osd_num,
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | (rep ? 0 : chunk.role),
},
// Same version as write
.version = op_data->fact_ver,
};
}
}
op_data->n_subops = extra_chunks;
submit_primary_del_batch(cur_op, extra_chunks, chunks_to_del);
}
void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_delete, int chunks_to_delete_count)
{
osd_primary_op_data_t *op_data = cur_op->op_data;
op_data->n_subops = chunks_to_delete_count;
op_data->done = op_data->errors = 0;
if (!extra_chunks)
if (!op_data->n_subops)
{
return;
}
osd_op_t *subops = new osd_op_t[extra_chunks];
osd_op_t *subops = new osd_op_t[chunks_to_delete_count];
op_data->subops = subops;
int i = 0;
for (auto & chunk: loc_set)
for (int i = 0; i < chunks_to_delete_count; i++)
{
if (!cur_set || (rep ? !contains_osd(cur_set, set_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role]))
auto & chunk = chunks_to_delete[i];
if (chunk.osd_num == this->osd_num)
{
int stripe_num = op_data->scheme == POOL_SCHEME_REPLICATED ? 0 : chunk.role;
if (chunk.osd_num == this->osd_num)
{
clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
subops[i].op_type = (uint64_t)cur_op;
subops[i].bs_op = new blockstore_op_t({
.opcode = BS_OP_DELETE,
.callback = [subop = &subops[i], this](blockstore_op_t *bs_subop)
{
handle_primary_bs_subop(subop);
},
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | stripe_num,
},
// Same version as write
.version = op_data->fact_ver,
});
bs->enqueue_op(subops[i].bs_op);
}
else
{
subops[i].op_type = OSD_OP_OUT;
subops[i].peer_fd = c_cli.osd_peer_fds.at(chunk.osd_num);
subops[i].req.sec_del = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = c_cli.next_subop_id++,
.opcode = OSD_OP_SEC_DELETE,
},
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | stripe_num,
},
// Same version as write
.version = op_data->fact_ver,
};
subops[i].callback = [cur_op, this](osd_op_t *subop)
clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
subops[i].op_type = (uint64_t)cur_op;
subops[i].bs_op = new blockstore_op_t({
.opcode = BS_OP_DELETE,
.callback = [subop = &subops[i], this](blockstore_op_t *bs_subop)
{
int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1;
handle_primary_subop(subop, cur_op);
if (fail_fd >= 0)
{
// delete operation failed, drop the connection
c_cli.stop_client(fail_fd);
}
};
c_cli.outbox_push(&subops[i]);
}
i++;
handle_primary_bs_subop(subop);
},
.oid = chunk.oid,
.version = chunk.version,
});
bs->enqueue_op(subops[i].bs_op);
}
else
{
subops[i].op_type = OSD_OP_OUT;
subops[i].peer_fd = c_cli.osd_peer_fds.at(chunk.osd_num);
subops[i].req.sec_del = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = c_cli.next_subop_id++,
.opcode = OSD_OP_SEC_DELETE,
},
.oid = chunk.oid,
.version = chunk.version,
};
subops[i].callback = [cur_op, this](osd_op_t *subop)
{
int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1;
handle_primary_subop(subop, cur_op);
if (fail_fd >= 0)
{
// delete operation failed, drop the connection
c_cli.stop_client(fail_fd);
}
};
c_cli.outbox_push(&subops[i]);
}
}
}

View File

@@ -2,20 +2,39 @@
// License: VNPL-1.1 (see README.md for details)
#include <stdio.h>
#include <stdlib.h>
#include "allocator.h"
void alloc_all(int size)
{
allocator *a = new allocator(size);
for (int i = 0; i < size; i++)
{
uint64_t x = a->find_free();
if (x == UINT64_MAX)
{
printf("ran out of space %d allocated=%d\n", size, i);
exit(1);
}
if (x != i)
{
printf("incorrect block allocated: expected %d, got %lu\n", i, x);
}
a->set(x, true);
}
uint64_t x = a->find_free();
if (x != UINT64_MAX)
{
printf("extra free space found: %lx (%d)\n", x, size);
exit(1);
}
delete a;
}
int main(int narg, char *args[])
{
allocator a(8192);
for (int i = 0; i < 8192; i++)
{
uint64_t x = a.find_free();
if (x == UINT64_MAX)
{
printf("ran out of space %d\n", i);
return 1;
}
a.set(x, true);
}
alloc_all(8192);
alloc_all(8062);
alloc_all(4096);
return 0;
}

View File

@@ -28,8 +28,6 @@ cd ..
node mon/mon-main.js --etcd_url http://$ETCD_URL --etcd_prefix "/vitastor" --verbose 1 &>./testdata/mon.log &
MON_PID=$!
$ETCDCTL put /vitastor/config/global '{"immediate_commit":"all"}'
$ETCDCTL put /vitastor/config/pools '{"1":{"name":"testpool","scheme":"replicated","pg_size":2,"pg_minsize":2,"pg_count":16,"failure_domain":"osd"}}'
sleep 2

View File

@@ -0,0 +1,109 @@
#!/bin/bash -ex
. `dirname $0`/common.sh
if [ "$IMMEDIATE_COMMIT" != "" ]; then
NO_SAME="--journal_no_same_sector_overwrites true --journal_sector_buffer_count 1024 --disable_data_fsync 1 --immediate_commit all --log_level 1"
$ETCDCTL put /vitastor/config/global '{"recovery_queue_depth":1,"osd_out_time":5,"immediate_commit":"all"}'
else
NO_SAME="--journal_sector_buffer_count 1024 --log_level 1"
$ETCDCTL put /vitastor/config/global '{"recovery_queue_depth":1,"osd_out_time":5}'
fi
dd if=/dev/zero of=./testdata/test_osd1.bin bs=1024 count=1 seek=$((1024*1024-1))
dd if=/dev/zero of=./testdata/test_osd2.bin bs=1024 count=1 seek=$((1024*1024-1))
dd if=/dev/zero of=./testdata/test_osd3.bin bs=1024 count=1 seek=$((1024*1024-1))
dd if=/dev/zero of=./testdata/test_osd4.bin bs=1024 count=1 seek=$((1024*1024-1))
dd if=/dev/zero of=./testdata/test_osd5.bin bs=1024 count=1 seek=$((1024*1024-1))
dd if=/dev/zero of=./testdata/test_osd6.bin bs=1024 count=1 seek=$((1024*1024-1))
dd if=/dev/zero of=./testdata/test_osd7.bin bs=1024 count=1 seek=$((1024*1024-1))
build/src/vitastor-osd --osd_num 1 --bind_address 127.0.0.1 $NO_SAME --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd1.bin 2>/dev/null) 2>&1 >>./testdata/osd1.log &
OSD1_PID=$!
build/src/vitastor-osd --osd_num 2 --bind_address 127.0.0.1 $NO_SAME --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd2.bin 2>/dev/null) 2>&1 >>./testdata/osd2.log &
OSD2_PID=$!
build/src/vitastor-osd --osd_num 3 --bind_address 127.0.0.1 $NO_SAME --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd3.bin 2>/dev/null) 2>&1 >>./testdata/osd3.log &
OSD3_PID=$!
build/src/vitastor-osd --osd_num 4 --bind_address 127.0.0.1 $NO_SAME --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd4.bin 2>/dev/null) 2>&1 >>./testdata/osd4.log &
OSD4_PID=$!
build/src/vitastor-osd --osd_num 5 --bind_address 127.0.0.1 $NO_SAME --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd5.bin 2>/dev/null) 2>&1 >>./testdata/osd5.log &
OSD5_PID=$!
build/src/vitastor-osd --osd_num 6 --bind_address 127.0.0.1 $NO_SAME --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd6.bin 2>/dev/null) 2>&1 >>./testdata/osd6.log &
OSD6_PID=$!
build/src/vitastor-osd --osd_num 7 --bind_address 127.0.0.1 $NO_SAME --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd7.bin 2>/dev/null) 2>&1 >>./testdata/osd7.log &
OSD7_PID=$!
cd mon
npm install
cd ..
node mon/mon-main.js --etcd_url http://$ETCD_URL --etcd_prefix "/vitastor" --verbose 1 &>./testdata/mon.log &
MON_PID=$!
$ETCDCTL put /vitastor/config/pools '{"1":{"name":"testpool","scheme":"replicated","pg_size":2,"pg_minsize":1,"pg_count":32,"failure_domain":"osd"}}'
sleep 2
if ! ($ETCDCTL get /vitastor/config/pgs --print-value-only | jq -s -e '(.[0].items["1"] | map((.osd_set | select(. > 0)) | length == 2) | length) == 32'); then
format_error "FAILED: 32 PGS NOT CONFIGURED"
fi
if ! ($ETCDCTL get --prefix /vitastor/pg/state/ --print-value-only | jq -s -e '([ .[] | select(.state == ["active"]) ] | length) == 32'); then
format_error "FAILED: 32 PGS NOT UP"
fi
IMG_SIZE=960
LD_PRELOAD=libasan.so.5 \
fio -thread -name=test -ioengine=build/src/libfio_vitastor.so -bs=4M -direct=1 -iodepth=16 -fsync=16 -rw=write \
-etcd=$ETCD_URL -pool=1 -inode=2 -size=${IMG_SIZE}M -cluster_log_level=10
try_reweight()
{
osd=$1
w=$2
$ETCDCTL put /vitastor/config/osd/$osd '{"reweight":'$w'}'
sleep 3
}
try_reweight 1 0
try_reweight 2 0
try_reweight 3 0
try_reweight 4 0
try_reweight 5 0
try_reweight 1 1
try_reweight 2 1
try_reweight 3 1
try_reweight 4 1
try_reweight 5 1
# Wait for the rebalance to finish
for i in {1..60}; do
($ETCDCTL get --prefix /vitastor/pg/state/ --print-value-only | jq -s -e '([ .[] | select(.state == ["active"]) ] | length) == 32') && \
break
if [ $i -eq 60 ]; then
format_error "Rebalance couldn't finish in 60 seconds"
fi
sleep 1
done
# Check that PGs never had degraded objects !
if grep has_degraded ./testdata/mon.log; then
format_error "Some copies of objects were lost during interrupted rebalancings"
fi
# Check that no objects are lost !
nobj=`$ETCDCTL get --prefix '/vitastor/pg/stats' --print-value-only | jq -s '[ .[].object_count ] | reduce .[] as $num (0; .+$num)'`
if [ "$nobj" -ne $((IMG_SIZE*8)) ]; then
format_error "Data lost after multiple interrupted rebalancings"
fi
format_green OK