forked from vitalif/vitastor
Compare commits
39 Commits
Author | SHA1 | Date | |
---|---|---|---|
036555638e | |||
af5155fcd9 | |||
0d2efbecc9 | |||
e62e8b6bae | |||
c4ba24c305 | |||
19e47a0279 | |||
bd178ac20f | |||
7006875a24 | |||
ad577c4aac | |||
836635c518 | |||
88a03f4e98 | |||
2a5036669d | |||
2e0c853180 | |||
e91ff2a9ec | |||
086667f568 | |||
73ce20e246 | |||
1be94da437 | |||
80e12358a2 | |||
36c935ace6 | |||
0d8b5e2ef9 | |||
98f1e2c277 | |||
21e7686037 | |||
ab21a1908b | |||
30d1ccd43e | |||
8bdd6d8d78 | |||
09b3e4e789 | |||
07912fd670 | |||
bc742ccf8c | |||
314b20437b | |||
29bac892ad | |||
cf7547faf3 | |||
ab90ed747f | |||
29d8ac8b1b | |||
97795ea1b1 | |||
24e7075f08 | |||
6155b23a7e | |||
7d49706c07 | |||
46e79f3306 | |||
41fd14e024 |
18
.gitignore
vendored
Normal file
18
.gitignore
vendored
Normal file
@@ -0,0 +1,18 @@
|
||||
*.o
|
||||
*.so
|
||||
package-lock.json
|
||||
fio
|
||||
qemu
|
||||
osd
|
||||
stub_osd
|
||||
stub_uring_osd
|
||||
stub_bench
|
||||
osd_test
|
||||
osd_peering_pg_test
|
||||
dump_journal
|
||||
nbd_proxy
|
||||
rm_inode
|
||||
test_allocator
|
||||
test_blockstore
|
||||
test_shit
|
||||
osd_rmw_test
|
2
debian/build-vitastor-bullseye.sh
vendored
2
debian/build-vitastor-bullseye.sh
vendored
@@ -1,6 +1,6 @@
|
||||
#!/bin/bash
|
||||
|
||||
sed 's/$REL/bullseye/' < vitastor.Dockerfile > ../Dockerfile
|
||||
sed 's/$REL/bullseye/g' < vitastor.Dockerfile > ../Dockerfile
|
||||
cd ..
|
||||
mkdir -p packages
|
||||
sudo podman build -v `pwd`/packages:/root/packages -f Dockerfile .
|
||||
|
2
debian/build-vitastor-buster.sh
vendored
2
debian/build-vitastor-buster.sh
vendored
@@ -1,6 +1,6 @@
|
||||
#!/bin/bash
|
||||
|
||||
sed 's/$REL/buster/' < vitastor.Dockerfile > ../Dockerfile
|
||||
sed 's/$REL/buster/g' < vitastor.Dockerfile > ../Dockerfile
|
||||
cd ..
|
||||
mkdir -p packages
|
||||
sudo podman build -v `pwd`/packages:/root/packages -f Dockerfile .
|
||||
|
2
debian/changelog
vendored
2
debian/changelog
vendored
@@ -1,4 +1,4 @@
|
||||
vitastor (0.5.5-1) unstable; urgency=medium
|
||||
vitastor (0.5.9-1) unstable; urgency=medium
|
||||
|
||||
* Bugfixes
|
||||
|
||||
|
12
debian/vitastor.Dockerfile
vendored
12
debian/vitastor.Dockerfile
vendored
@@ -40,10 +40,10 @@ RUN set -e -x; \
|
||||
mkdir -p /root/packages/vitastor-$REL; \
|
||||
rm -rf /root/packages/vitastor-$REL/*; \
|
||||
cd /root/packages/vitastor-$REL; \
|
||||
cp -r /root/vitastor vitastor-0.5.5; \
|
||||
ln -s /root/packages/qemu-$REL/qemu-*/ vitastor-0.5.5/qemu; \
|
||||
ln -s /root/fio-build/fio-*/ vitastor-0.5.5/fio; \
|
||||
cd vitastor-0.5.5; \
|
||||
cp -r /root/vitastor vitastor-0.5.9; \
|
||||
ln -s /root/packages/qemu-$REL/qemu-*/ vitastor-0.5.9/qemu; \
|
||||
ln -s /root/fio-build/fio-*/ vitastor-0.5.9/fio; \
|
||||
cd vitastor-0.5.9; \
|
||||
FIO=$(head -n1 fio/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
|
||||
QEMU=$(head -n1 qemu/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
|
||||
sh copy-qemu-includes.sh; \
|
||||
@@ -59,8 +59,8 @@ RUN set -e -x; \
|
||||
echo "dep:fio=$FIO" > debian/substvars; \
|
||||
echo "dep:qemu=$QEMU" >> debian/substvars; \
|
||||
cd /root/packages/vitastor-$REL; \
|
||||
tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.5.5.orig.tar.xz vitastor-0.5.5; \
|
||||
cd vitastor-0.5.5; \
|
||||
tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.5.9.orig.tar.xz vitastor-0.5.9; \
|
||||
cd vitastor-0.5.9; \
|
||||
V=$(head -n1 debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
|
||||
DEBFULLNAME="Vitaliy Filippov <vitalif@yourcmc.ru>" dch -D $REL -v "$V""$REL" "Rebuild for $REL"; \
|
||||
DEB_BUILD_OPTIONS=nocheck dpkg-buildpackage --jobs=auto -sa; \
|
||||
|
@@ -1,51 +0,0 @@
|
||||
// Copyright (c) Vitaliy Filippov, 2019+
|
||||
// License: VNPL-1.1 (see README.md for details)
|
||||
|
||||
#include <iostream>
|
||||
#include <functional>
|
||||
#include <array>
|
||||
#include <cstdlib> // for malloc() and free()
|
||||
using namespace std;
|
||||
|
||||
// replace operator new and delete to log allocations
|
||||
void* operator new(std::size_t n)
|
||||
{
|
||||
cout << "Allocating " << n << " bytes" << endl;
|
||||
return malloc(n);
|
||||
}
|
||||
|
||||
void operator delete(void* p) throw()
|
||||
{
|
||||
free(p);
|
||||
}
|
||||
|
||||
class test
|
||||
{
|
||||
public:
|
||||
std::string s;
|
||||
void a(std::function<void()> & f, const char *str)
|
||||
{
|
||||
auto l = [this, str]() { cout << str << " ? " << s << " from this\n"; };
|
||||
cout << "Assigning lambda3 of size " << sizeof(l) << endl;
|
||||
f = l;
|
||||
}
|
||||
};
|
||||
|
||||
int main()
|
||||
{
|
||||
std::array<char, 16> arr1;
|
||||
auto lambda1 = [arr1](){};
|
||||
cout << "Assigning lambda1 of size " << sizeof(lambda1) << endl;
|
||||
std::function<void()> f1 = lambda1;
|
||||
|
||||
std::array<char, 17> arr2;
|
||||
auto lambda2 = [arr2](){};
|
||||
cout << "Assigning lambda2 of size " << sizeof(lambda2) << endl;
|
||||
std::function<void()> f2 = lambda2;
|
||||
|
||||
test t;
|
||||
std::function<void()> f3;
|
||||
t.s = "str";
|
||||
t.a(f3, "huyambda");
|
||||
f3();
|
||||
}
|
110
mon/PGUtil.js
110
mon/PGUtil.js
@@ -5,18 +5,55 @@ module.exports = {
|
||||
scale_pg_count,
|
||||
};
|
||||
|
||||
function add_pg_history(new_pg_history, new_pg, prev_pgs, prev_pg_history, old_pg)
|
||||
{
|
||||
if (!new_pg_history[new_pg])
|
||||
{
|
||||
new_pg_history[new_pg] = {
|
||||
osd_sets: {},
|
||||
all_peers: {},
|
||||
epoch: 0,
|
||||
};
|
||||
}
|
||||
const nh = new_pg_history[new_pg], oh = prev_pg_history[old_pg];
|
||||
nh.osd_sets[prev_pgs[old_pg].join(' ')] = prev_pgs[old_pg];
|
||||
if (oh && oh.osd_sets && oh.osd_sets.length)
|
||||
{
|
||||
for (const pg of oh.osd_sets)
|
||||
{
|
||||
nh.osd_sets[pg.join(' ')] = pg;
|
||||
}
|
||||
}
|
||||
if (oh && oh.all_peers && oh.all_peers.length)
|
||||
{
|
||||
for (const osd_num of oh.all_peers)
|
||||
{
|
||||
nh.all_peers[osd_num] = Number(osd_num);
|
||||
}
|
||||
}
|
||||
if (oh && oh.epoch)
|
||||
{
|
||||
nh.epoch = nh.epoch < oh.epoch ? oh.epoch : nh.epoch;
|
||||
}
|
||||
}
|
||||
|
||||
function finish_pg_history(merged_history)
|
||||
{
|
||||
merged_history.osd_sets = Object.values(merged_history.osd_sets);
|
||||
merged_history.all_peers = Object.values(merged_history.all_peers);
|
||||
}
|
||||
|
||||
function scale_pg_count(prev_pgs, prev_pg_history, new_pg_history, new_pg_count)
|
||||
{
|
||||
const old_pg_count = prev_pgs.length;
|
||||
// Add all possibly intersecting PGs to the history of new PGs
|
||||
if (!(new_pg_count % old_pg_count))
|
||||
{
|
||||
// New PG count is a multiple of the old PG count
|
||||
const mul = (new_pg_count / old_pg_count);
|
||||
// New PG count is a multiple of old PG count
|
||||
for (let i = 0; i < new_pg_count; i++)
|
||||
{
|
||||
const old_i = Math.floor(new_pg_count / mul);
|
||||
new_pg_history[i] = prev_pg_history[old_i] ? JSON.parse(JSON.stringify(prev_pg_history[old_i])) : undefined;
|
||||
add_pg_history(new_pg_history, i, prev_pgs, prev_pg_history, i % old_pg_count);
|
||||
finish_pg_history(new_pg_history[i]);
|
||||
}
|
||||
}
|
||||
else if (!(old_pg_count % new_pg_count))
|
||||
@@ -25,68 +62,26 @@ function scale_pg_count(prev_pgs, prev_pg_history, new_pg_history, new_pg_count)
|
||||
const mul = (old_pg_count / new_pg_count);
|
||||
for (let i = 0; i < new_pg_count; i++)
|
||||
{
|
||||
new_pg_history[i] = {
|
||||
osd_sets: [],
|
||||
all_peers: [],
|
||||
epoch: 0,
|
||||
};
|
||||
for (let j = 0; j < mul; j++)
|
||||
{
|
||||
new_pg_history[i].osd_sets.push(prev_pgs[i*mul]);
|
||||
const hist = prev_pg_history[1+i*mul+j];
|
||||
if (hist && hist.osd_sets && hist.osd_sets.length)
|
||||
{
|
||||
Array.prototype.push.apply(new_pg_history[i].osd_sets, hist.osd_sets);
|
||||
}
|
||||
if (hist && hist.all_peers && hist.all_peers.length)
|
||||
{
|
||||
Array.prototype.push.apply(new_pg_history[i].all_peers, hist.all_peers);
|
||||
}
|
||||
if (hist && hist.epoch)
|
||||
{
|
||||
new_pg_history[i].epoch = new_pg_history[i].epoch < hist.epoch ? hist.epoch : new_pg_history[i].epoch;
|
||||
}
|
||||
add_pg_history(new_pg_history, i, prev_pgs, prev_pg_history, i+j*new_pg_count);
|
||||
}
|
||||
finish_pg_history(new_pg_history[i]);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Any PG may intersect with any PG after non-multiple PG count change
|
||||
// So, merge ALL PGs history
|
||||
let all_sets = {};
|
||||
let all_peers = {};
|
||||
let max_epoch = 0;
|
||||
for (const pg of prev_pgs)
|
||||
let merged_history = {};
|
||||
for (let i = 0; i < old_pg_count; i++)
|
||||
{
|
||||
all_sets[pg.join(' ')] = pg;
|
||||
add_pg_history(merged_history, 1, prev_pgs, prev_pg_history, i);
|
||||
}
|
||||
for (const pg in prev_pg_history)
|
||||
{
|
||||
const hist = prev_pg_history[pg];
|
||||
if (hist && hist.osd_sets)
|
||||
{
|
||||
for (const pg of hist.osd_sets)
|
||||
{
|
||||
all_sets[pg.join(' ')] = pg;
|
||||
}
|
||||
}
|
||||
if (hist && hist.all_peers)
|
||||
{
|
||||
for (const osd_num of hist.all_peers)
|
||||
{
|
||||
all_peers[osd_num] = Number(osd_num);
|
||||
}
|
||||
}
|
||||
if (hist && hist.epoch)
|
||||
{
|
||||
max_epoch = max_epoch < hist.epoch ? hist.epoch : max_epoch;
|
||||
}
|
||||
}
|
||||
all_sets = Object.values(all_sets);
|
||||
all_peers = Object.values(all_peers);
|
||||
finish_pg_history(merged_history[1]);
|
||||
for (let i = 0; i < new_pg_count; i++)
|
||||
{
|
||||
new_pg_history[i] = { osd_sets: all_sets, all_peers, epoch: max_epoch };
|
||||
new_pg_history[i] = { ...merged_history[1] };
|
||||
}
|
||||
}
|
||||
// Mark history keys for removed PGs as removed
|
||||
@@ -94,19 +89,16 @@ function scale_pg_count(prev_pgs, prev_pg_history, new_pg_history, new_pg_count)
|
||||
{
|
||||
new_pg_history[i] = null;
|
||||
}
|
||||
// Just for the lp_solve optimizer - pick a "previous" PG for each "new" one
|
||||
if (old_pg_count < new_pg_count)
|
||||
{
|
||||
for (let i = new_pg_count-1; i >= 0; i--)
|
||||
for (let i = old_pg_count; i < new_pg_count; i++)
|
||||
{
|
||||
prev_pgs[i] = prev_pgs[Math.floor(i/new_pg_count*old_pg_count)];
|
||||
prev_pgs[i] = prev_pgs[i % old_pg_count];
|
||||
}
|
||||
}
|
||||
else if (old_pg_count > new_pg_count)
|
||||
{
|
||||
for (let i = 0; i < new_pg_count; i++)
|
||||
{
|
||||
prev_pgs[i] = prev_pgs[Math.round(i/new_pg_count*old_pg_count)];
|
||||
}
|
||||
prev_pgs.splice(new_pg_count, old_pg_count-new_pg_count);
|
||||
}
|
||||
}
|
||||
|
130
mon/mon.js
130
mon/mon.js
@@ -22,6 +22,7 @@ const etcd_allow = new RegExp('^'+[
|
||||
'pg/state/[1-9]\\d*/[1-9]\\d*',
|
||||
'pg/stats/[1-9]\\d*/[1-9]\\d*',
|
||||
'pg/history/[1-9]\\d*/[1-9]\\d*',
|
||||
'history/last_clean_pgs',
|
||||
'stats',
|
||||
].join('$|^')+'$');
|
||||
|
||||
@@ -34,7 +35,7 @@ const etcd_tree = {
|
||||
etcd_mon_retries: 5, // min: 0
|
||||
mon_change_timeout: 1000, // ms. min: 100
|
||||
mon_stats_timeout: 1000, // ms. min: 100
|
||||
osd_out_time: 1800, // seconds. min: 0
|
||||
osd_out_time: 600, // seconds. min: 0
|
||||
placement_levels: { datacenter: 1, rack: 2, host: 3, osd: 4, ... },
|
||||
// client and osd
|
||||
use_sync_send_recv: false,
|
||||
@@ -46,6 +47,8 @@ const etcd_tree = {
|
||||
client_dirty_limit: 33554432,
|
||||
peer_connect_interval: 5, // seconds. min: 1
|
||||
peer_connect_timeout: 5, // seconds. min: 1
|
||||
osd_idle_timeout: 5, // seconds. min: 1
|
||||
osd_ping_timeout: 5, // seconds. min: 1
|
||||
up_wait_retry_interval: 500, // ms. min: 50
|
||||
// osd
|
||||
etcd_report_interval: 30, // min: 10
|
||||
@@ -56,7 +59,10 @@ const etcd_tree = {
|
||||
client_queue_depth: 128, // unused
|
||||
recovery_queue_depth: 4,
|
||||
readonly: false,
|
||||
no_recovery: false,
|
||||
no_rebalance: false,
|
||||
print_stats_interval: 3,
|
||||
slow_log_interval: 10,
|
||||
// blockstore - fixed in superblock
|
||||
block_size,
|
||||
disk_alignment,
|
||||
@@ -76,6 +82,7 @@ const etcd_tree = {
|
||||
disable_meta_fsync,
|
||||
disable_device_lock,
|
||||
// blockstore - configurable
|
||||
max_write_iodepth,
|
||||
flusher_count,
|
||||
inmemory_metadata,
|
||||
inmemory_journal,
|
||||
@@ -213,6 +220,9 @@ const etcd_tree = {
|
||||
incomplete: uint64_t,
|
||||
}, */
|
||||
},
|
||||
history: {
|
||||
last_clean_pgs: {},
|
||||
},
|
||||
};
|
||||
|
||||
// FIXME Split into several files
|
||||
@@ -291,7 +301,7 @@ class Mon
|
||||
this.config.osd_out_time = Number(this.config.osd_out_time) || 0;
|
||||
if (!this.config.osd_out_time)
|
||||
{
|
||||
this.config.osd_out_time = 30*60; // 30 minutes by default
|
||||
this.config.osd_out_time = 600; // 10 minutes by default
|
||||
}
|
||||
}
|
||||
|
||||
@@ -313,8 +323,14 @@ class Mon
|
||||
ok(false);
|
||||
}, this.config.etcd_mon_timeout);
|
||||
this.ws = new WebSocket(base+'/watch');
|
||||
const fail = () =>
|
||||
{
|
||||
ok(false);
|
||||
};
|
||||
this.ws.on('error', fail);
|
||||
this.ws.on('open', () =>
|
||||
{
|
||||
this.ws.removeListener('error', fail);
|
||||
if (timer_id)
|
||||
clearTimeout(timer_id);
|
||||
ok(true);
|
||||
@@ -359,7 +375,7 @@ class Mon
|
||||
}
|
||||
else
|
||||
{
|
||||
let stats_changed = false, changed = false;
|
||||
let stats_changed = false, changed = false, pg_states_changed = false;
|
||||
if (this.verbose)
|
||||
{
|
||||
console.log('Revision '+data.result.header.revision+' events: ');
|
||||
@@ -373,15 +389,23 @@ class Mon
|
||||
{
|
||||
stats_changed = true;
|
||||
}
|
||||
else if (key.substr(0, 10) == '/pg/state/')
|
||||
{
|
||||
pg_states_changed = true;
|
||||
}
|
||||
else if (key != '/stats')
|
||||
{
|
||||
changed = true;
|
||||
}
|
||||
if (this.verbose)
|
||||
{
|
||||
console.log(e);
|
||||
console.log(JSON.stringify(e));
|
||||
}
|
||||
}
|
||||
if (pg_states_changed)
|
||||
{
|
||||
this.save_last_clean().catch(console.error);
|
||||
}
|
||||
if (stats_changed)
|
||||
{
|
||||
this.schedule_update_stats();
|
||||
@@ -394,10 +418,46 @@ class Mon
|
||||
});
|
||||
}
|
||||
|
||||
async save_last_clean()
|
||||
{
|
||||
// last_clean_pgs is used to avoid extra data move when observing a series of changes in the cluster
|
||||
for (const pool_id in this.state.config.pools)
|
||||
{
|
||||
const pool_cfg = this.state.config.pools[pool_id];
|
||||
if (!this.validate_pool_cfg(pool_id, pool_cfg, false))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
for (let pg_num = 1; pg_num <= pool_cfg.pg_count; pg_num++)
|
||||
{
|
||||
if (!this.state.pg.state[pool_id] ||
|
||||
!this.state.pg.state[pool_id][pg_num] ||
|
||||
!(this.state.pg.state[pool_id][pg_num].state instanceof Array))
|
||||
{
|
||||
// Unclean
|
||||
return;
|
||||
}
|
||||
let st = this.state.pg.state[pool_id][pg_num].state.join(',');
|
||||
if (st != 'active' && st != 'active,left_on_dead' && st != 'left_on_dead,active')
|
||||
{
|
||||
// Unclean
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
this.state.history.last_clean_pgs = JSON.parse(JSON.stringify(this.state.config.pgs));
|
||||
await this.etcd_call('/kv/txn', {
|
||||
success: [ { requestPut: {
|
||||
key: b64(this.etcd_prefix+'/history/last_clean_pgs'),
|
||||
value: b64(JSON.stringify(this.state.history.last_clean_pgs))
|
||||
} } ],
|
||||
}, this.etcd_start_timeout, 0);
|
||||
}
|
||||
|
||||
async get_lease()
|
||||
{
|
||||
const max_ttl = this.config.etcd_mon_ttl + this.config.etcd_mon_timeout/1000*this.config.etcd_mon_retries;
|
||||
const res = await this.etcd_call('/lease/grant', { TTL: max_ttl }, this.config.etcd_mon_timeout, this.config.etcd_mon_retries);
|
||||
const res = await this.etcd_call('/lease/grant', { TTL: max_ttl }, this.config.etcd_mon_timeout, -1);
|
||||
this.etcd_lease_id = res.ID;
|
||||
setInterval(async () =>
|
||||
{
|
||||
@@ -791,12 +851,25 @@ class Mon
|
||||
pool_tree = pool_tree ? pool_tree.children : [];
|
||||
pool_tree = LPOptimizer.flatten_tree(pool_tree, levels, pool_cfg.failure_domain, 'osd');
|
||||
this.filter_osds_by_tags(osd_tree, pool_tree, pool_cfg.osd_tags);
|
||||
const prev_pgs = [];
|
||||
for (const pg in ((this.state.config.pgs.items||{})[pool_id]||{})||{})
|
||||
// These are for the purpose of building history.osd_sets
|
||||
const real_prev_pgs = [];
|
||||
let pg_history = [];
|
||||
for (const pg in ((this.state.config.pgs.items||{})[pool_id]||{}))
|
||||
{
|
||||
prev_pgs[pg-1] = this.state.config.pgs.items[pool_id][pg].osd_set;
|
||||
real_prev_pgs[pg-1] = this.state.config.pgs.items[pool_id][pg].osd_set;
|
||||
if (this.state.pg.history[pool_id] &&
|
||||
this.state.pg.history[pool_id][pg])
|
||||
{
|
||||
pg_history[pg-1] = this.state.pg.history[pool_id][pg];
|
||||
}
|
||||
}
|
||||
const pg_history = [];
|
||||
// And these are for the purpose of minimizing data movement
|
||||
let prev_pgs = [];
|
||||
for (const pg in ((this.state.history.last_clean_pgs.items||{})[pool_id]||{}))
|
||||
{
|
||||
prev_pgs[pg-1] = this.state.history.last_clean_pgs.items[pool_id][pg].osd_set;
|
||||
}
|
||||
prev_pgs = JSON.parse(JSON.stringify(prev_pgs.length ? prev_pgs : real_prev_pgs));
|
||||
const old_pg_count = prev_pgs.length;
|
||||
let optimize_result;
|
||||
if (old_pg_count > 0)
|
||||
@@ -809,7 +882,9 @@ class Mon
|
||||
this.schedule_recheck();
|
||||
return;
|
||||
}
|
||||
PGUtil.scale_pg_count(prev_pgs, this.state.pg.history[pool_id]||{}, pg_history, pool_cfg.pg_count);
|
||||
const new_pg_history = [];
|
||||
PGUtil.scale_pg_count(prev_pgs, pg_history, new_pg_history, pool_cfg.pg_count);
|
||||
pg_history = new_pg_history;
|
||||
}
|
||||
for (const pg of prev_pgs)
|
||||
{
|
||||
@@ -846,9 +921,14 @@ class Mon
|
||||
`PG count for pool ${pool_id} (${pool_cfg.name || 'unnamed'})`+
|
||||
` changed from: ${old_pg_count} to ${optimize_result.int_pgs.length}`
|
||||
);
|
||||
// Drop stats
|
||||
etcd_request.success.push({ requestDeleteRange: {
|
||||
key: b64(this.etcd_prefix+'/pg/stats/'+pool_id+'/'),
|
||||
range_end: b64(this.etcd_prefix+'/pg/stats/'+pool_id+'0'),
|
||||
} });
|
||||
}
|
||||
LPOptimizer.print_change_stats(optimize_result);
|
||||
this.save_new_pgs_txn(etcd_request, pool_id, up_osds, prev_pgs, optimize_result.int_pgs, pg_history);
|
||||
this.save_new_pgs_txn(etcd_request, pool_id, up_osds, real_prev_pgs, optimize_result.int_pgs, pg_history);
|
||||
}
|
||||
this.state.config.pgs.hash = tree_hash;
|
||||
await this.save_pg_config(etcd_request);
|
||||
@@ -974,21 +1054,21 @@ class Mon
|
||||
for (const op in st.op_stats||{})
|
||||
{
|
||||
op_stats[op] = op_stats[op] || { count: 0n, usec: 0n, bytes: 0n };
|
||||
op_stats[op].count += BigInt(st.op_stats.count||0);
|
||||
op_stats[op].usec += BigInt(st.op_stats.usec||0);
|
||||
op_stats[op].bytes += BigInt(st.op_stats.bytes||0);
|
||||
op_stats[op].count += BigInt(st.op_stats[op].count||0);
|
||||
op_stats[op].usec += BigInt(st.op_stats[op].usec||0);
|
||||
op_stats[op].bytes += BigInt(st.op_stats[op].bytes||0);
|
||||
}
|
||||
for (const op in st.subop_stats||{})
|
||||
{
|
||||
subop_stats[op] = subop_stats[op] || { count: 0n, usec: 0n };
|
||||
subop_stats[op].count += BigInt(st.subop_stats.count||0);
|
||||
subop_stats[op].usec += BigInt(st.subop_stats.usec||0);
|
||||
subop_stats[op].count += BigInt(st.subop_stats[op].count||0);
|
||||
subop_stats[op].usec += BigInt(st.subop_stats[op].usec||0);
|
||||
}
|
||||
for (const op in st.recovery_stats||{})
|
||||
{
|
||||
recovery_stats[op] = recovery_stats[op] || { count: 0n, bytes: 0n };
|
||||
recovery_stats[op].count += BigInt(st.recovery_stats.count||0);
|
||||
recovery_stats[op].bytes += BigInt(st.recovery_stats.bytes||0);
|
||||
recovery_stats[op].count += BigInt(st.recovery_stats[op].count||0);
|
||||
recovery_stats[op].bytes += BigInt(st.recovery_stats[op].bytes||0);
|
||||
}
|
||||
}
|
||||
for (const op in op_stats)
|
||||
@@ -1043,11 +1123,14 @@ class Mon
|
||||
for (const pg_num in this.state.pg.stats[pool_id])
|
||||
{
|
||||
const st = this.state.pg.stats[pool_id][pg_num];
|
||||
for (const k in object_counts)
|
||||
if (st)
|
||||
{
|
||||
if (st[k+'_count'])
|
||||
for (const k in object_counts)
|
||||
{
|
||||
object_counts[k] += BigInt(st[k+'_count']);
|
||||
if (st[k+'_count'])
|
||||
{
|
||||
object_counts[k] += BigInt(st[k+'_count']);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1183,6 +1266,11 @@ class Mon
|
||||
console.error('etcd returned error: '+res.json.error);
|
||||
break;
|
||||
}
|
||||
if (this.etcd_urls.length > 1)
|
||||
{
|
||||
// Stick to the same etcd for the rest of calls
|
||||
this.etcd_urls = [ base ];
|
||||
}
|
||||
return res.json;
|
||||
}
|
||||
retry++;
|
||||
|
@@ -48,4 +48,4 @@ FIO=`rpm -qi fio | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Ve
|
||||
QEMU=`rpm -qi qemu qemu-kvm | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Version[\s:]+(\S+)/ && print $1; /^Release[\s:]+(\S+)/ && print "-$1"; }'`
|
||||
perl -i -pe 's/(Requires:\s*fio)([^\n]+)?/$1 = '$FIO'/' $VITASTOR/rpm/vitastor-el$EL.spec
|
||||
perl -i -pe 's/(Requires:\s*qemu(?:-kvm)?)([^\n]+)?/$1 = '$QEMU'/' $VITASTOR/rpm/vitastor-el$EL.spec
|
||||
tar --transform 's#^#vitastor-0.5.5/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.5.5$(rpm --eval '%dist').tar.gz *
|
||||
tar --transform 's#^#vitastor-0.5.9/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.5.9$(rpm --eval '%dist').tar.gz *
|
||||
|
@@ -37,7 +37,7 @@ ADD . /root/vitastor
|
||||
RUN set -e; \
|
||||
cd /root/vitastor/rpm; \
|
||||
sh build-tarball.sh; \
|
||||
cp /root/vitastor-0.5.5.el7.tar.gz ~/rpmbuild/SOURCES; \
|
||||
cp /root/vitastor-0.5.9.el7.tar.gz ~/rpmbuild/SOURCES; \
|
||||
cp vitastor-el7.spec ~/rpmbuild/SPECS/vitastor.spec; \
|
||||
cd ~/rpmbuild/SPECS/; \
|
||||
rpmbuild -ba vitastor.spec; \
|
||||
|
@@ -1,11 +1,11 @@
|
||||
Name: vitastor
|
||||
Version: 0.5.5
|
||||
Version: 0.5.9
|
||||
Release: 1%{?dist}
|
||||
Summary: Vitastor, a fast software-defined clustered block storage
|
||||
|
||||
License: Vitastor Network Public License 1.1
|
||||
URL: https://vitastor.io/
|
||||
Source0: vitastor-0.5.5.el7.tar.gz
|
||||
Source0: vitastor-0.5.9.el7.tar.gz
|
||||
|
||||
BuildRequires: liburing-devel >= 0.6
|
||||
BuildRequires: gperftools-devel
|
||||
|
@@ -35,7 +35,7 @@ ADD . /root/vitastor
|
||||
RUN set -e; \
|
||||
cd /root/vitastor/rpm; \
|
||||
sh build-tarball.sh; \
|
||||
cp /root/vitastor-0.5.5.el8.tar.gz ~/rpmbuild/SOURCES; \
|
||||
cp /root/vitastor-0.5.9.el8.tar.gz ~/rpmbuild/SOURCES; \
|
||||
cp vitastor-el8.spec ~/rpmbuild/SPECS/vitastor.spec; \
|
||||
cd ~/rpmbuild/SPECS/; \
|
||||
rpmbuild -ba vitastor.spec; \
|
||||
|
@@ -1,11 +1,11 @@
|
||||
Name: vitastor
|
||||
Version: 0.5.5
|
||||
Version: 0.5.9
|
||||
Release: 1%{?dist}
|
||||
Summary: Vitastor, a fast software-defined clustered block storage
|
||||
|
||||
License: Vitastor Network Public License 1.1
|
||||
URL: https://vitastor.io/
|
||||
Source0: vitastor-0.5.5.el8.tar.gz
|
||||
Source0: vitastor-0.5.9.el8.tar.gz
|
||||
|
||||
BuildRequires: liburing-devel >= 0.6
|
||||
BuildRequires: gperftools-devel
|
||||
|
@@ -35,12 +35,7 @@ bool blockstore_t::is_safe_to_stop()
|
||||
|
||||
void blockstore_t::enqueue_op(blockstore_op_t *op)
|
||||
{
|
||||
impl->enqueue_op(op, false);
|
||||
}
|
||||
|
||||
void blockstore_t::enqueue_op_first(blockstore_op_t *op)
|
||||
{
|
||||
impl->enqueue_op(op, true);
|
||||
impl->enqueue_op(op);
|
||||
}
|
||||
|
||||
std::unordered_map<object_id, uint64_t> & blockstore_t::get_unstable_writes()
|
||||
|
@@ -175,10 +175,6 @@ public:
|
||||
// Submission
|
||||
void enqueue_op(blockstore_op_t *op);
|
||||
|
||||
// Insert operation into the beginning of the queue
|
||||
// Intended for the OSD syncer "thread" to be able to stabilize something when the journal is full
|
||||
void enqueue_op_first(blockstore_op_t *op);
|
||||
|
||||
// Unstable writes are added here (map of object_id -> version)
|
||||
std::unordered_map<object_id, uint64_t> & get_unstable_writes();
|
||||
|
||||
|
@@ -101,26 +101,14 @@ void blockstore_impl_t::loop()
|
||||
{
|
||||
// try to submit ops
|
||||
unsigned initial_ring_space = ringloop->space_left();
|
||||
// FIXME: rework this "sync polling"
|
||||
auto cur_sync = in_progress_syncs.begin();
|
||||
while (cur_sync != in_progress_syncs.end())
|
||||
// has_writes == 0 - no writes before the current queue item
|
||||
// has_writes == 1 - some writes in progress
|
||||
// has_writes == 2 - tried to submit some writes, but failed
|
||||
int has_writes = 0, op_idx = 0, new_idx = 0;
|
||||
for (; op_idx < submit_queue.size(); op_idx++)
|
||||
{
|
||||
if (continue_sync(*cur_sync) != 2)
|
||||
{
|
||||
// List is unmodified
|
||||
cur_sync++;
|
||||
}
|
||||
else
|
||||
{
|
||||
cur_sync = in_progress_syncs.begin();
|
||||
}
|
||||
}
|
||||
auto cur = submit_queue.begin();
|
||||
int has_writes = 0;
|
||||
while (cur != submit_queue.end())
|
||||
{
|
||||
auto op_ptr = cur;
|
||||
auto op = *(cur++);
|
||||
auto op = submit_queue[op_idx];
|
||||
submit_queue[new_idx++] = op;
|
||||
// FIXME: This needs some simplification
|
||||
// Writes should not block reads if the ring is not full and reads don't depend on them
|
||||
// In all other cases we should stop submission
|
||||
@@ -142,10 +130,13 @@ void blockstore_impl_t::loop()
|
||||
}
|
||||
unsigned ring_space = ringloop->space_left();
|
||||
unsigned prev_sqe_pos = ringloop->save();
|
||||
bool dequeue_op = false;
|
||||
// 0 = can't submit
|
||||
// 1 = in progress
|
||||
// 2 = can be removed from queue
|
||||
int wr_st = 0;
|
||||
if (op->opcode == BS_OP_READ)
|
||||
{
|
||||
dequeue_op = dequeue_read(op);
|
||||
wr_st = dequeue_read(op);
|
||||
}
|
||||
else if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE)
|
||||
{
|
||||
@@ -154,8 +145,8 @@ void blockstore_impl_t::loop()
|
||||
// Some writes already could not be submitted
|
||||
continue;
|
||||
}
|
||||
dequeue_op = dequeue_write(op);
|
||||
has_writes = dequeue_op ? 1 : 2;
|
||||
wr_st = dequeue_write(op);
|
||||
has_writes = wr_st > 0 ? 1 : 2;
|
||||
}
|
||||
else if (op->opcode == BS_OP_DELETE)
|
||||
{
|
||||
@@ -164,8 +155,8 @@ void blockstore_impl_t::loop()
|
||||
// Some writes already could not be submitted
|
||||
continue;
|
||||
}
|
||||
dequeue_op = dequeue_del(op);
|
||||
has_writes = dequeue_op ? 1 : 2;
|
||||
wr_st = dequeue_del(op);
|
||||
has_writes = wr_st > 0 ? 1 : 2;
|
||||
}
|
||||
else if (op->opcode == BS_OP_SYNC)
|
||||
{
|
||||
@@ -178,29 +169,31 @@ void blockstore_impl_t::loop()
|
||||
// Can't submit SYNC before previous writes
|
||||
continue;
|
||||
}
|
||||
dequeue_op = dequeue_sync(op);
|
||||
wr_st = continue_sync(op, false);
|
||||
if (wr_st != 2)
|
||||
{
|
||||
has_writes = wr_st > 0 ? 1 : 2;
|
||||
}
|
||||
}
|
||||
else if (op->opcode == BS_OP_STABLE)
|
||||
{
|
||||
dequeue_op = dequeue_stable(op);
|
||||
wr_st = dequeue_stable(op);
|
||||
}
|
||||
else if (op->opcode == BS_OP_ROLLBACK)
|
||||
{
|
||||
dequeue_op = dequeue_rollback(op);
|
||||
wr_st = dequeue_rollback(op);
|
||||
}
|
||||
else if (op->opcode == BS_OP_LIST)
|
||||
{
|
||||
// LIST doesn't need to be blocked by previous modifications,
|
||||
// it only needs to include all in-progress writes as they're guaranteed
|
||||
// to be readable and stabilizable/rollbackable by subsequent operations
|
||||
// LIST doesn't need to be blocked by previous modifications
|
||||
process_list(op);
|
||||
dequeue_op = true;
|
||||
wr_st = 2;
|
||||
}
|
||||
if (dequeue_op)
|
||||
if (wr_st == 2)
|
||||
{
|
||||
submit_queue.erase(op_ptr);
|
||||
new_idx--;
|
||||
}
|
||||
else
|
||||
if (wr_st == 0)
|
||||
{
|
||||
ringloop->restore(prev_sqe_pos);
|
||||
if (PRIV(op)->wait_for == WAIT_SQE)
|
||||
@@ -211,6 +204,14 @@ void blockstore_impl_t::loop()
|
||||
}
|
||||
}
|
||||
}
|
||||
if (op_idx != new_idx)
|
||||
{
|
||||
while (op_idx < submit_queue.size())
|
||||
{
|
||||
submit_queue[new_idx++] = submit_queue[op_idx++];
|
||||
}
|
||||
submit_queue.resize(new_idx);
|
||||
}
|
||||
if (!readonly)
|
||||
{
|
||||
flusher->loop();
|
||||
@@ -233,7 +234,7 @@ bool blockstore_impl_t::is_safe_to_stop()
|
||||
{
|
||||
// It's safe to stop blockstore when there are no in-flight operations,
|
||||
// no in-progress syncs and flusher isn't doing anything
|
||||
if (submit_queue.size() > 0 || in_progress_syncs.size() > 0 || !readonly && flusher->is_active())
|
||||
if (submit_queue.size() > 0 || !readonly && flusher->is_active())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
@@ -315,7 +316,7 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op)
|
||||
}
|
||||
}
|
||||
|
||||
void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
|
||||
void blockstore_impl_t::enqueue_op(blockstore_op_t *op)
|
||||
{
|
||||
if (op->opcode < BS_OP_MIN || op->opcode > BS_OP_MAX ||
|
||||
((op->opcode == BS_OP_READ || op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE) && (
|
||||
@@ -323,8 +324,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
|
||||
op->len > block_size-op->offset ||
|
||||
(op->len % disk_alignment)
|
||||
)) ||
|
||||
readonly && op->opcode != BS_OP_READ && op->opcode != BS_OP_LIST ||
|
||||
first && (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE))
|
||||
readonly && op->opcode != BS_OP_READ && op->opcode != BS_OP_LIST)
|
||||
{
|
||||
// Basic verification not passed
|
||||
op->retval = -EINVAL;
|
||||
@@ -374,25 +374,12 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
|
||||
std::function<void (blockstore_op_t*)>(op->callback)(op);
|
||||
return;
|
||||
}
|
||||
if (op->opcode == BS_OP_SYNC && immediate_commit == IMMEDIATE_ALL)
|
||||
{
|
||||
op->retval = 0;
|
||||
std::function<void (blockstore_op_t*)>(op->callback)(op);
|
||||
return;
|
||||
}
|
||||
// Call constructor without allocating memory. We'll call destructor before returning op back
|
||||
new ((void*)op->private_data) blockstore_op_private_t;
|
||||
PRIV(op)->wait_for = 0;
|
||||
PRIV(op)->op_state = 0;
|
||||
PRIV(op)->pending_ops = 0;
|
||||
if (!first)
|
||||
{
|
||||
submit_queue.push_back(op);
|
||||
}
|
||||
else
|
||||
{
|
||||
submit_queue.push_front(op);
|
||||
}
|
||||
submit_queue.push_back(op);
|
||||
ringloop->wakeup();
|
||||
}
|
||||
|
||||
|
@@ -160,8 +160,6 @@ struct blockstore_op_private_t
|
||||
// Sync
|
||||
std::vector<obj_ver_id> sync_big_writes, sync_small_writes;
|
||||
int sync_small_checked, sync_big_checked;
|
||||
std::list<blockstore_op_t*>::iterator in_progress_ptr;
|
||||
int prev_sync_count;
|
||||
};
|
||||
|
||||
// https://github.com/algorithm-ninja/cpp-btree
|
||||
@@ -210,9 +208,8 @@ class blockstore_impl_t
|
||||
blockstore_clean_db_t clean_db;
|
||||
uint8_t *clean_bitmap = NULL;
|
||||
blockstore_dirty_db_t dirty_db;
|
||||
std::list<blockstore_op_t*> submit_queue; // FIXME: funny thing is that vector is better here
|
||||
std::vector<blockstore_op_t*> submit_queue;
|
||||
std::vector<obj_ver_id> unsynced_big_writes, unsynced_small_writes;
|
||||
std::list<blockstore_op_t*> in_progress_syncs; // ...and probably here, too
|
||||
allocator *data_alloc = NULL;
|
||||
uint8_t *zero_object;
|
||||
|
||||
@@ -271,6 +268,7 @@ class blockstore_impl_t
|
||||
|
||||
// Write
|
||||
bool enqueue_write(blockstore_op_t *op);
|
||||
void cancel_all_writes(blockstore_op_t *op, blockstore_dirty_db_t::iterator dirty_it, int retval);
|
||||
int dequeue_write(blockstore_op_t *op);
|
||||
int dequeue_del(blockstore_op_t *op);
|
||||
int continue_write(blockstore_op_t *op);
|
||||
@@ -278,11 +276,9 @@ class blockstore_impl_t
|
||||
void handle_write_event(ring_data_t *data, blockstore_op_t *op);
|
||||
|
||||
// Sync
|
||||
int dequeue_sync(blockstore_op_t *op);
|
||||
int continue_sync(blockstore_op_t *op, bool queue_has_in_progress_sync);
|
||||
void handle_sync_event(ring_data_t *data, blockstore_op_t *op);
|
||||
int continue_sync(blockstore_op_t *op);
|
||||
void ack_one_sync(blockstore_op_t *op);
|
||||
int ack_sync(blockstore_op_t *op);
|
||||
void ack_sync(blockstore_op_t *op);
|
||||
|
||||
// Stabilize
|
||||
int dequeue_stable(blockstore_op_t *op);
|
||||
@@ -322,7 +318,7 @@ public:
|
||||
bool is_stalled();
|
||||
|
||||
// Submission
|
||||
void enqueue_op(blockstore_op_t *op, bool first = false);
|
||||
void enqueue_op(blockstore_op_t *op);
|
||||
|
||||
// Unstable writes are added here (map of object_id -> version)
|
||||
std::unordered_map<object_id, uint64_t> unstable_writes;
|
||||
|
@@ -112,7 +112,7 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
|
||||
read_op->version = 0;
|
||||
read_op->retval = read_op->len;
|
||||
FINISH_OP(read_op);
|
||||
return 1;
|
||||
return 2;
|
||||
}
|
||||
uint64_t fulfilled = 0;
|
||||
PRIV(read_op)->pending_ops = 0;
|
||||
@@ -232,10 +232,10 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
|
||||
}
|
||||
read_op->retval = read_op->len;
|
||||
FINISH_OP(read_op);
|
||||
return 1;
|
||||
return 2;
|
||||
}
|
||||
read_op->retval = 0;
|
||||
return 1;
|
||||
return 2;
|
||||
}
|
||||
|
||||
void blockstore_impl_t::handle_read_event(ring_data_t *data, blockstore_op_t *op)
|
||||
|
@@ -50,7 +50,7 @@ skip_ov:
|
||||
{
|
||||
op->retval = -EBUSY;
|
||||
FINISH_OP(op);
|
||||
return 1;
|
||||
return 2;
|
||||
}
|
||||
if (dirty_it == dirty_db.begin())
|
||||
{
|
||||
@@ -66,7 +66,7 @@ skip_ov:
|
||||
// Already rolled back
|
||||
op->retval = 0;
|
||||
FINISH_OP(op);
|
||||
return 1;
|
||||
return 2;
|
||||
}
|
||||
// Check journal space
|
||||
blockstore_journal_check_t space_check(this);
|
||||
@@ -151,7 +151,7 @@ resume_5:
|
||||
// Acknowledge op
|
||||
op->retval = 0;
|
||||
FINISH_OP(op);
|
||||
return 1;
|
||||
return 2;
|
||||
}
|
||||
|
||||
void blockstore_impl_t::mark_rolled_back(const obj_ver_id & ov)
|
||||
@@ -216,10 +216,7 @@ void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t
|
||||
if (PRIV(op)->pending_ops == 0)
|
||||
{
|
||||
PRIV(op)->op_state++;
|
||||
if (!continue_rollback(op))
|
||||
{
|
||||
submit_queue.push_front(op);
|
||||
}
|
||||
ringloop->wakeup();
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -60,7 +60,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
|
||||
// No such object version
|
||||
op->retval = -ENOENT;
|
||||
FINISH_OP(op);
|
||||
return 1;
|
||||
return 2;
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -77,7 +77,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
|
||||
// Object not synced yet. Caller must sync it first
|
||||
op->retval = -EBUSY;
|
||||
FINISH_OP(op);
|
||||
return 1;
|
||||
return 2;
|
||||
}
|
||||
else if (!IS_STABLE(dirty_it->second.state))
|
||||
{
|
||||
@@ -89,7 +89,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
|
||||
// Already stable
|
||||
op->retval = 0;
|
||||
FINISH_OP(op);
|
||||
return 1;
|
||||
return 2;
|
||||
}
|
||||
// Check journal space
|
||||
blockstore_journal_check_t space_check(this);
|
||||
@@ -176,7 +176,7 @@ resume_5:
|
||||
// Acknowledge op
|
||||
op->retval = 0;
|
||||
FINISH_OP(op);
|
||||
return 1;
|
||||
return 2;
|
||||
}
|
||||
|
||||
void blockstore_impl_t::mark_stable(const obj_ver_id & v)
|
||||
@@ -228,9 +228,6 @@ void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t *
|
||||
if (PRIV(op)->pending_ops == 0)
|
||||
{
|
||||
PRIV(op)->op_state++;
|
||||
if (!continue_stable(op))
|
||||
{
|
||||
submit_queue.push_front(op);
|
||||
}
|
||||
ringloop->wakeup();
|
||||
}
|
||||
}
|
||||
|
@@ -12,8 +12,15 @@
|
||||
#define SYNC_JOURNAL_SYNC_SENT 7
|
||||
#define SYNC_DONE 8
|
||||
|
||||
int blockstore_impl_t::dequeue_sync(blockstore_op_t *op)
|
||||
int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_progress_sync)
|
||||
{
|
||||
if (immediate_commit == IMMEDIATE_ALL)
|
||||
{
|
||||
// We can return immediately because sync is only dequeued after all previous writes
|
||||
op->retval = 0;
|
||||
FINISH_OP(op);
|
||||
return 2;
|
||||
}
|
||||
if (PRIV(op)->op_state == 0)
|
||||
{
|
||||
stop_sync_submitted = false;
|
||||
@@ -29,34 +36,15 @@ int blockstore_impl_t::dequeue_sync(blockstore_op_t *op)
|
||||
PRIV(op)->op_state = SYNC_HAS_SMALL;
|
||||
else
|
||||
PRIV(op)->op_state = SYNC_DONE;
|
||||
// Always add sync to in_progress_syncs because we clear unsynced_big_writes and unsynced_small_writes
|
||||
PRIV(op)->prev_sync_count = in_progress_syncs.size();
|
||||
PRIV(op)->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op);
|
||||
}
|
||||
continue_sync(op);
|
||||
// Always dequeue because we always add syncs to in_progress_syncs
|
||||
return 1;
|
||||
}
|
||||
|
||||
int blockstore_impl_t::continue_sync(blockstore_op_t *op)
|
||||
{
|
||||
auto cb = [this, op](ring_data_t *data) { handle_sync_event(data, op); };
|
||||
if (PRIV(op)->op_state == SYNC_HAS_SMALL)
|
||||
{
|
||||
// No big writes, just fsync the journal
|
||||
for (; PRIV(op)->sync_small_checked < PRIV(op)->sync_small_writes.size(); PRIV(op)->sync_small_checked++)
|
||||
{
|
||||
if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_small_writes[PRIV(op)->sync_small_checked]].state))
|
||||
{
|
||||
// Wait for small inflight writes to complete
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
if (journal.sector_info[journal.cur_sector].dirty)
|
||||
{
|
||||
// Write out the last journal sector if it happens to be dirty
|
||||
BS_SUBMIT_GET_ONLY_SQE(sqe);
|
||||
prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);
|
||||
prepare_journal_sector_write(journal, journal.cur_sector, sqe, [this, op](ring_data_t *data) { handle_sync_event(data, op); });
|
||||
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
|
||||
PRIV(op)->pending_ops = 1;
|
||||
PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT;
|
||||
@@ -69,21 +57,13 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
|
||||
}
|
||||
if (PRIV(op)->op_state == SYNC_HAS_BIG)
|
||||
{
|
||||
for (; PRIV(op)->sync_big_checked < PRIV(op)->sync_big_writes.size(); PRIV(op)->sync_big_checked++)
|
||||
{
|
||||
if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_big_writes[PRIV(op)->sync_big_checked]].state))
|
||||
{
|
||||
// Wait for big inflight writes to complete
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
// 1st step: fsync data
|
||||
if (!disable_data_fsync)
|
||||
{
|
||||
BS_SUBMIT_GET_SQE(sqe, data);
|
||||
my_uring_prep_fsync(sqe, data_fd, IORING_FSYNC_DATASYNC);
|
||||
data->iov = { 0 };
|
||||
data->callback = cb;
|
||||
data->callback = [this, op](ring_data_t *data) { handle_sync_event(data, op); };
|
||||
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
|
||||
PRIV(op)->pending_ops = 1;
|
||||
PRIV(op)->op_state = SYNC_DATA_SYNC_SENT;
|
||||
@@ -96,14 +76,6 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
|
||||
}
|
||||
if (PRIV(op)->op_state == SYNC_DATA_SYNC_DONE)
|
||||
{
|
||||
for (; PRIV(op)->sync_small_checked < PRIV(op)->sync_small_writes.size(); PRIV(op)->sync_small_checked++)
|
||||
{
|
||||
if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_small_writes[PRIV(op)->sync_small_checked]].state))
|
||||
{
|
||||
// Wait for small inflight writes to complete
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
// 2nd step: Data device is synced, prepare & write journal entries
|
||||
// Check space in the journal and journal memory buffers
|
||||
blockstore_journal_check_t space_check(this);
|
||||
@@ -127,7 +99,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
|
||||
{
|
||||
if (cur_sector == -1)
|
||||
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
|
||||
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
|
||||
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], [this, op](ring_data_t *data) { handle_sync_event(data, op); });
|
||||
cur_sector = journal.cur_sector;
|
||||
}
|
||||
journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry(
|
||||
@@ -152,7 +124,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
|
||||
journal.crc32_last = je->crc32;
|
||||
it++;
|
||||
}
|
||||
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
|
||||
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], [this, op](ring_data_t *data) { handle_sync_event(data, op); });
|
||||
assert(s == space_check.sectors_to_write);
|
||||
if (cur_sector == -1)
|
||||
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
|
||||
@@ -168,7 +140,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
|
||||
BS_SUBMIT_GET_SQE(sqe, data);
|
||||
my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
|
||||
data->iov = { 0 };
|
||||
data->callback = cb;
|
||||
data->callback = [this, op](ring_data_t *data) { handle_sync_event(data, op); };
|
||||
PRIV(op)->pending_ops = 1;
|
||||
PRIV(op)->op_state = SYNC_JOURNAL_SYNC_SENT;
|
||||
return 1;
|
||||
@@ -178,9 +150,10 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
|
||||
PRIV(op)->op_state = SYNC_DONE;
|
||||
}
|
||||
}
|
||||
if (PRIV(op)->op_state == SYNC_DONE)
|
||||
if (PRIV(op)->op_state == SYNC_DONE && !queue_has_in_progress_sync)
|
||||
{
|
||||
return ack_sync(op);
|
||||
ack_sync(op);
|
||||
return 2;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
@@ -212,42 +185,16 @@ void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op
|
||||
else if (PRIV(op)->op_state == SYNC_JOURNAL_SYNC_SENT)
|
||||
{
|
||||
PRIV(op)->op_state = SYNC_DONE;
|
||||
ack_sync(op);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw std::runtime_error("BUG: unexpected sync op state");
|
||||
}
|
||||
ringloop->wakeup();
|
||||
}
|
||||
}
|
||||
|
||||
int blockstore_impl_t::ack_sync(blockstore_op_t *op)
|
||||
{
|
||||
if (PRIV(op)->op_state == SYNC_DONE && PRIV(op)->prev_sync_count == 0)
|
||||
{
|
||||
// Remove dependency of subsequent syncs
|
||||
auto it = PRIV(op)->in_progress_ptr;
|
||||
int done_syncs = 1;
|
||||
++it;
|
||||
// Acknowledge sync
|
||||
ack_one_sync(op);
|
||||
while (it != in_progress_syncs.end())
|
||||
{
|
||||
auto & next_sync = *it++;
|
||||
PRIV(next_sync)->prev_sync_count -= done_syncs;
|
||||
if (PRIV(next_sync)->prev_sync_count == 0 && PRIV(next_sync)->op_state == SYNC_DONE)
|
||||
{
|
||||
done_syncs++;
|
||||
// Acknowledge next_sync
|
||||
ack_one_sync(next_sync);
|
||||
}
|
||||
}
|
||||
return 2;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void blockstore_impl_t::ack_one_sync(blockstore_op_t *op)
|
||||
void blockstore_impl_t::ack_sync(blockstore_op_t *op)
|
||||
{
|
||||
// Handle states
|
||||
for (auto it = PRIV(op)->sync_big_writes.begin(); it != PRIV(op)->sync_big_writes.end(); it++)
|
||||
@@ -295,7 +242,6 @@ void blockstore_impl_t::ack_one_sync(blockstore_op_t *op)
|
||||
}
|
||||
}
|
||||
}
|
||||
in_progress_syncs.erase(PRIV(op)->in_progress_ptr);
|
||||
op->retval = 0;
|
||||
FINISH_OP(op);
|
||||
}
|
||||
|
@@ -124,6 +124,29 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
|
||||
return true;
|
||||
}
|
||||
|
||||
void blockstore_impl_t::cancel_all_writes(blockstore_op_t *op, blockstore_dirty_db_t::iterator dirty_it, int retval)
|
||||
{
|
||||
while (dirty_it != dirty_db.end() && dirty_it->first.oid == op->oid)
|
||||
{
|
||||
dirty_db.erase(dirty_it++);
|
||||
}
|
||||
bool found = false;
|
||||
for (auto other_op: submit_queue)
|
||||
{
|
||||
if (!found && other_op == op)
|
||||
found = true;
|
||||
else if (found && other_op->oid == op->oid &&
|
||||
(other_op->opcode == BS_OP_WRITE || other_op->opcode == BS_OP_WRITE_STABLE))
|
||||
{
|
||||
// Mark operations to cancel them
|
||||
PRIV(other_op)->real_version = UINT64_MAX;
|
||||
other_op->retval = retval;
|
||||
}
|
||||
}
|
||||
op->retval = retval;
|
||||
FINISH_OP(op);
|
||||
}
|
||||
|
||||
// First step of the write algorithm: dequeue operation and submit initial write(s)
|
||||
int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
||||
{
|
||||
@@ -143,6 +166,12 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
||||
}
|
||||
if (PRIV(op)->real_version != 0)
|
||||
{
|
||||
if (PRIV(op)->real_version == UINT64_MAX)
|
||||
{
|
||||
// This is the flag value used to cancel operations
|
||||
FINISH_OP(op);
|
||||
return 2;
|
||||
}
|
||||
// Restore original low version number for unblocked operations
|
||||
#ifdef BLOCKSTORE_DEBUG
|
||||
printf("Restoring %lx:%lx version: v%lu -> v%lu\n", op->oid.inode, op->oid.stripe, op->version, PRIV(op)->real_version);
|
||||
@@ -152,11 +181,9 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
||||
if (prev_it->first.oid == op->oid && prev_it->first.version >= PRIV(op)->real_version)
|
||||
{
|
||||
// Original version is still invalid
|
||||
// FIXME Oops. Successive small writes will currently break in an unexpected way. Fix it
|
||||
dirty_db.erase(dirty_it);
|
||||
op->retval = -EEXIST;
|
||||
FINISH_OP(op);
|
||||
return 1;
|
||||
// All subsequent writes to the same object must be canceled too
|
||||
cancel_all_writes(op, dirty_it, -EEXIST);
|
||||
return 2;
|
||||
}
|
||||
op->version = PRIV(op)->real_version;
|
||||
PRIV(op)->real_version = 0;
|
||||
@@ -189,11 +216,8 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
||||
PRIV(op)->wait_for = WAIT_FREE;
|
||||
return 0;
|
||||
}
|
||||
// FIXME Oops. Successive small writes will currently break in an unexpected way. Fix it
|
||||
dirty_db.erase(dirty_it);
|
||||
op->retval = -ENOSPC;
|
||||
FINISH_OP(op);
|
||||
return 1;
|
||||
cancel_all_writes(op, dirty_it, -ENOSPC);
|
||||
return 2;
|
||||
}
|
||||
write_iodepth++;
|
||||
BS_SUBMIT_GET_SQE(sqe, data);
|
||||
@@ -346,7 +370,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
||||
if (!PRIV(op)->pending_ops)
|
||||
{
|
||||
PRIV(op)->op_state = 4;
|
||||
continue_write(op);
|
||||
return continue_write(op);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -360,17 +384,21 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op)
|
||||
{
|
||||
io_uring_sqe *sqe = NULL;
|
||||
journal_entry_big_write *je;
|
||||
int op_state = PRIV(op)->op_state;
|
||||
if (op_state != 2 && op_state != 4)
|
||||
{
|
||||
// In progress
|
||||
return 1;
|
||||
}
|
||||
auto dirty_it = dirty_db.find((obj_ver_id){
|
||||
.oid = op->oid,
|
||||
.version = op->version,
|
||||
});
|
||||
assert(dirty_it != dirty_db.end());
|
||||
if (PRIV(op)->op_state == 2)
|
||||
if (op_state == 2)
|
||||
goto resume_2;
|
||||
else if (PRIV(op)->op_state == 4)
|
||||
else if (op_state == 4)
|
||||
goto resume_4;
|
||||
else
|
||||
return 1;
|
||||
resume_2:
|
||||
// Only for the immediate_commit mode: prepare and submit big_write journal entry
|
||||
sqe = get_sqe();
|
||||
@@ -440,7 +468,7 @@ resume_4:
|
||||
op->retval = op->len;
|
||||
write_iodepth--;
|
||||
FINISH_OP(op);
|
||||
return 1;
|
||||
return 2;
|
||||
}
|
||||
|
||||
void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *op)
|
||||
@@ -459,10 +487,7 @@ void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *o
|
||||
{
|
||||
release_journal_sectors(op);
|
||||
PRIV(op)->op_state++;
|
||||
if (!continue_write(op))
|
||||
{
|
||||
submit_queue.push_front(op);
|
||||
}
|
||||
ringloop->wakeup();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -500,6 +525,10 @@ void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op)
|
||||
|
||||
int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
|
||||
{
|
||||
if (PRIV(op)->op_state)
|
||||
{
|
||||
return continue_write(op);
|
||||
}
|
||||
auto dirty_it = dirty_db.find((obj_ver_id){
|
||||
.oid = op->oid,
|
||||
.version = op->version,
|
||||
@@ -510,6 +539,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
write_iodepth++;
|
||||
io_uring_sqe *sqe = NULL;
|
||||
if (immediate_commit != IMMEDIATE_NONE ||
|
||||
(journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_del) &&
|
||||
@@ -568,7 +598,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
|
||||
if (!PRIV(op)->pending_ops)
|
||||
{
|
||||
PRIV(op)->op_state = 4;
|
||||
continue_write(op);
|
||||
return continue_write(op);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@@ -8,13 +8,11 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
|
||||
{
|
||||
this->ringloop = ringloop;
|
||||
this->tfd = tfd;
|
||||
|
||||
log_level = config["log_level"].int64_value();
|
||||
this->config = config;
|
||||
|
||||
msgr.osd_num = 0;
|
||||
msgr.tfd = tfd;
|
||||
msgr.ringloop = ringloop;
|
||||
msgr.log_level = log_level;
|
||||
msgr.repeer_pgs = [this](osd_num_t peer_osd)
|
||||
{
|
||||
if (msgr.osd_peer_fds.find(peer_osd) != msgr.osd_peer_fds.end())
|
||||
@@ -67,8 +65,7 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
|
||||
msgr.stop_client(op->peer_fd);
|
||||
delete op;
|
||||
};
|
||||
msgr.use_sync_send_recv = config["use_sync_send_recv"].bool_value() ||
|
||||
config["use_sync_send_recv"].uint64_value();
|
||||
msgr.init();
|
||||
|
||||
st_cli.tfd = tfd;
|
||||
st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };
|
||||
@@ -185,16 +182,8 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config)
|
||||
{
|
||||
up_wait_retry_interval = 50;
|
||||
}
|
||||
msgr.peer_connect_interval = config["peer_connect_interval"].uint64_value();
|
||||
if (!msgr.peer_connect_interval)
|
||||
{
|
||||
msgr.peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
|
||||
}
|
||||
msgr.peer_connect_timeout = config["peer_connect_timeout"].uint64_value();
|
||||
if (!msgr.peer_connect_timeout)
|
||||
{
|
||||
msgr.peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
|
||||
}
|
||||
msgr.parse_config(config);
|
||||
msgr.parse_config(this->config);
|
||||
st_cli.load_pgs();
|
||||
}
|
||||
|
||||
|
@@ -82,6 +82,7 @@ class cluster_client_t
|
||||
public:
|
||||
etcd_state_client_t st_cli;
|
||||
osd_messenger_t msgr;
|
||||
json11::Json config;
|
||||
|
||||
cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config);
|
||||
~cluster_client_t();
|
||||
|
@@ -7,6 +7,16 @@
|
||||
#include "http_client.h"
|
||||
#include "base64.h"
|
||||
|
||||
etcd_state_client_t::~etcd_state_client_t()
|
||||
{
|
||||
etcd_watches_initialised = -1;
|
||||
if (etcd_watch_ws)
|
||||
{
|
||||
etcd_watch_ws->close();
|
||||
etcd_watch_ws = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
json_kv_t etcd_state_client_t::parse_etcd_kv(const json11::Json & kv_json)
|
||||
{
|
||||
json_kv_t kv;
|
||||
@@ -160,7 +170,7 @@ void etcd_state_client_t::start_etcd_watcher()
|
||||
start_etcd_watcher();
|
||||
});
|
||||
}
|
||||
else
|
||||
else if (etcd_watches_initialised > 0)
|
||||
{
|
||||
// Connection was live, retry immediately
|
||||
start_etcd_watcher();
|
||||
|
@@ -81,4 +81,5 @@ struct etcd_state_client_t
|
||||
void load_pgs();
|
||||
void parse_state(const std::string & key, const json11::Json & value);
|
||||
void parse_config(json11::Json & config);
|
||||
~etcd_state_client_t();
|
||||
};
|
||||
|
@@ -26,14 +26,106 @@ osd_op_t::~osd_op_t()
|
||||
}
|
||||
}
|
||||
|
||||
void osd_messenger_t::init()
|
||||
{
|
||||
keepalive_timer_id = tfd->set_timer(1000, true, [this](int)
|
||||
{
|
||||
for (auto cl_it = clients.begin(); cl_it != clients.end();)
|
||||
{
|
||||
auto cl = (cl_it++)->second;
|
||||
if (!cl->osd_num)
|
||||
{
|
||||
// Do not run keepalive on regular clients
|
||||
continue;
|
||||
}
|
||||
if (cl->ping_time_remaining > 0)
|
||||
{
|
||||
cl->ping_time_remaining--;
|
||||
if (!cl->ping_time_remaining)
|
||||
{
|
||||
// Ping timed out, stop the client
|
||||
stop_client(cl->peer_fd, true);
|
||||
}
|
||||
}
|
||||
else if (cl->idle_time_remaining > 0)
|
||||
{
|
||||
cl->idle_time_remaining--;
|
||||
if (!cl->idle_time_remaining)
|
||||
{
|
||||
// Connection is idle for <osd_idle_time>, send ping
|
||||
osd_op_t *op = new osd_op_t();
|
||||
op->op_type = OSD_OP_OUT;
|
||||
op->peer_fd = cl->peer_fd;
|
||||
op->req = (osd_any_op_t){
|
||||
.hdr = {
|
||||
.magic = SECONDARY_OSD_OP_MAGIC,
|
||||
.id = this->next_subop_id++,
|
||||
.opcode = OSD_OP_PING,
|
||||
},
|
||||
};
|
||||
op->callback = [this, cl](osd_op_t *op)
|
||||
{
|
||||
int fail_fd = (op->reply.hdr.retval != 0 ? op->peer_fd : -1);
|
||||
cl->ping_time_remaining = 0;
|
||||
delete op;
|
||||
if (fail_fd >= 0)
|
||||
{
|
||||
stop_client(fail_fd, true);
|
||||
}
|
||||
};
|
||||
outbox_push(op);
|
||||
cl->ping_time_remaining = osd_ping_timeout;
|
||||
cl->idle_time_remaining = osd_idle_timeout;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
cl->idle_time_remaining = osd_idle_timeout;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
osd_messenger_t::~osd_messenger_t()
|
||||
{
|
||||
if (keepalive_timer_id >= 0)
|
||||
{
|
||||
tfd->clear_timer(keepalive_timer_id);
|
||||
keepalive_timer_id = -1;
|
||||
}
|
||||
while (clients.size() > 0)
|
||||
{
|
||||
stop_client(clients.begin()->first, true);
|
||||
}
|
||||
}
|
||||
|
||||
void osd_messenger_t::parse_config(const json11::Json & config)
|
||||
{
|
||||
this->use_sync_send_recv = config["use_sync_send_recv"].bool_value() ||
|
||||
config["use_sync_send_recv"].uint64_value();
|
||||
this->peer_connect_interval = config["peer_connect_interval"].uint64_value();
|
||||
if (!this->peer_connect_interval)
|
||||
{
|
||||
this->peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
|
||||
}
|
||||
this->peer_connect_timeout = config["peer_connect_timeout"].uint64_value();
|
||||
if (!this->peer_connect_timeout)
|
||||
{
|
||||
this->peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
|
||||
}
|
||||
this->osd_idle_timeout = config["osd_idle_timeout"].uint64_value();
|
||||
if (!this->osd_idle_timeout)
|
||||
{
|
||||
this->osd_idle_timeout = DEFAULT_OSD_PING_TIMEOUT;
|
||||
}
|
||||
this->osd_ping_timeout = config["osd_ping_timeout"].uint64_value();
|
||||
if (!this->osd_ping_timeout)
|
||||
{
|
||||
this->osd_ping_timeout = DEFAULT_OSD_PING_TIMEOUT;
|
||||
}
|
||||
this->log_level = config["log_level"].uint64_value();
|
||||
}
|
||||
|
||||
void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state)
|
||||
{
|
||||
if (wanted_peers.find(peer_osd) == wanted_peers.end())
|
||||
|
@@ -34,6 +34,7 @@
|
||||
|
||||
#define DEFAULT_PEER_CONNECT_INTERVAL 5
|
||||
#define DEFAULT_PEER_CONNECT_TIMEOUT 5
|
||||
#define DEFAULT_OSD_PING_TIMEOUT 5
|
||||
|
||||
// Kind of a vector with small-list-optimisation
|
||||
struct osd_op_buf_list_t
|
||||
@@ -198,6 +199,8 @@ struct osd_client_t
|
||||
int peer_fd;
|
||||
int peer_state;
|
||||
int connect_timeout_id = -1;
|
||||
int ping_time_remaining = 0;
|
||||
int idle_time_remaining = 0;
|
||||
osd_num_t osd_num = 0;
|
||||
|
||||
void *in_buf = NULL;
|
||||
@@ -251,6 +254,7 @@ struct osd_messenger_t
|
||||
{
|
||||
timerfd_manager_t *tfd;
|
||||
ring_loop_t *ringloop;
|
||||
int keepalive_timer_id = -1;
|
||||
|
||||
// osd_num_t is only for logging and asserts
|
||||
osd_num_t osd_num;
|
||||
@@ -258,6 +262,8 @@ struct osd_messenger_t
|
||||
int receive_buffer_size = 64*1024;
|
||||
int peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
|
||||
int peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
|
||||
int osd_idle_timeout = DEFAULT_OSD_PING_TIMEOUT;
|
||||
int osd_ping_timeout = DEFAULT_OSD_PING_TIMEOUT;
|
||||
int log_level = 0;
|
||||
bool use_sync_send_recv = false;
|
||||
|
||||
@@ -274,6 +280,8 @@ struct osd_messenger_t
|
||||
osd_op_stats_t stats;
|
||||
|
||||
public:
|
||||
void init();
|
||||
void parse_config(const json11::Json & config);
|
||||
void connect_peer(uint64_t osd_num, json11::Json peer_state);
|
||||
void stop_client(int peer_fd, bool force = false);
|
||||
void outbox_push(osd_op_t *cur_op);
|
||||
|
29
src/osd.cpp
29
src/osd.cpp
@@ -37,6 +37,7 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
|
||||
c_cli.ringloop = this->ringloop;
|
||||
c_cli.exec_op = [this](osd_op_t *op) { exec_op(op); };
|
||||
c_cli.repeer_pgs = [this](osd_num_t peer_osd) { repeer_pgs(peer_osd); };
|
||||
c_cli.init();
|
||||
|
||||
init_cluster();
|
||||
|
||||
@@ -53,11 +54,12 @@ osd_t::~osd_t()
|
||||
|
||||
void osd_t::parse_config(blockstore_config_t & config)
|
||||
{
|
||||
if (config.find("log_level") == config.end())
|
||||
config["log_level"] = "1";
|
||||
// Initial startup configuration
|
||||
json11::Json json_config = json11::Json(config);
|
||||
st_cli.parse_config(json_config);
|
||||
if (config.find("log_level") == config.end())
|
||||
config["log_level"] = "1";
|
||||
log_level = strtoull(config["log_level"].c_str(), NULL, 10);
|
||||
etcd_report_interval = strtoull(config["etcd_report_interval"].c_str(), NULL, 10);
|
||||
if (etcd_report_interval <= 0)
|
||||
etcd_report_interval = 30;
|
||||
@@ -66,6 +68,8 @@ void osd_t::parse_config(blockstore_config_t & config)
|
||||
throw std::runtime_error("osd_num is required in the configuration");
|
||||
c_cli.osd_num = osd_num;
|
||||
run_primary = config["run_primary"] != "false" && config["run_primary"] != "0" && config["run_primary"] != "no";
|
||||
no_rebalance = config["no_rebalance"] == "true" || config["no_rebalance"] == "1" || config["no_rebalance"] == "yes";
|
||||
no_recovery = config["no_recovery"] == "true" || config["no_recovery"] == "1" || config["no_recovery"] == "yes";
|
||||
// Cluster configuration
|
||||
bind_address = config["bind_address"];
|
||||
if (bind_address == "")
|
||||
@@ -100,14 +104,7 @@ void osd_t::parse_config(blockstore_config_t & config)
|
||||
slow_log_interval = strtoull(config["slow_log_interval"].c_str(), NULL, 10);
|
||||
if (!slow_log_interval)
|
||||
slow_log_interval = 10;
|
||||
c_cli.peer_connect_interval = strtoull(config["peer_connect_interval"].c_str(), NULL, 10);
|
||||
if (!c_cli.peer_connect_interval)
|
||||
c_cli.peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
|
||||
c_cli.peer_connect_timeout = strtoull(config["peer_connect_timeout"].c_str(), NULL, 10);
|
||||
if (!c_cli.peer_connect_timeout)
|
||||
c_cli.peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
|
||||
log_level = strtoull(config["log_level"].c_str(), NULL, 10);
|
||||
c_cli.log_level = log_level;
|
||||
c_cli.parse_config(json_config);
|
||||
}
|
||||
|
||||
void osd_t::bind_socket()
|
||||
@@ -211,6 +208,12 @@ void osd_t::exec_op(osd_op_t *cur_op)
|
||||
finish_op(cur_op, -EINVAL);
|
||||
return;
|
||||
}
|
||||
if (cur_op->req.hdr.opcode == OSD_OP_PING)
|
||||
{
|
||||
// Pong
|
||||
finish_op(cur_op, 0);
|
||||
return;
|
||||
}
|
||||
if (readonly &&
|
||||
cur_op->req.hdr.opcode != OSD_OP_SEC_READ &&
|
||||
cur_op->req.hdr.opcode != OSD_OP_SEC_LIST &&
|
||||
@@ -261,9 +264,9 @@ void osd_t::reset_stats()
|
||||
|
||||
void osd_t::print_stats()
|
||||
{
|
||||
for (int i = 0; i <= OSD_OP_MAX; i++)
|
||||
for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++)
|
||||
{
|
||||
if (c_cli.stats.op_stat_count[i] != prev_stats.op_stat_count[i])
|
||||
if (c_cli.stats.op_stat_count[i] != prev_stats.op_stat_count[i] && i != OSD_OP_PING)
|
||||
{
|
||||
uint64_t avg = (c_cli.stats.op_stat_sum[i] - prev_stats.op_stat_sum[i])/(c_cli.stats.op_stat_count[i] - prev_stats.op_stat_count[i]);
|
||||
uint64_t bw = (c_cli.stats.op_stat_bytes[i] - prev_stats.op_stat_bytes[i]) / print_stats_interval;
|
||||
@@ -284,7 +287,7 @@ void osd_t::print_stats()
|
||||
prev_stats.op_stat_bytes[i] = c_cli.stats.op_stat_bytes[i];
|
||||
}
|
||||
}
|
||||
for (int i = 0; i <= OSD_OP_MAX; i++)
|
||||
for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++)
|
||||
{
|
||||
if (c_cli.stats.subop_stat_count[i] != prev_stats.subop_stat_count[i])
|
||||
{
|
||||
|
@@ -64,6 +64,8 @@ class osd_t
|
||||
bool readonly = false;
|
||||
osd_num_t osd_num = 1; // OSD numbers start with 1
|
||||
bool run_primary = false;
|
||||
bool no_rebalance = false;
|
||||
bool no_recovery = false;
|
||||
std::string bind_address;
|
||||
int bind_port, listen_backlog;
|
||||
// FIXME: Implement client queue depth limit
|
||||
@@ -160,6 +162,7 @@ class osd_t
|
||||
void submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps);
|
||||
void discard_list_subop(osd_op_t *list_op);
|
||||
bool stop_pg(pg_t & pg);
|
||||
void reset_pg(pg_t & pg);
|
||||
void finish_stop_pg(pg_t & pg);
|
||||
|
||||
// flushing, recovery and backfill
|
||||
|
@@ -37,7 +37,7 @@ void osd_t::init_cluster()
|
||||
.pg_cursize = 0,
|
||||
.pg_size = 3,
|
||||
.pg_minsize = 2,
|
||||
.parity_chunks = 1,
|
||||
.pg_data_size = 2,
|
||||
.pool_id = 1,
|
||||
.pg_num = 1,
|
||||
.target_set = { 1, 2, 3 },
|
||||
@@ -142,7 +142,7 @@ json11::Json osd_t::get_statistics()
|
||||
}
|
||||
st["host"] = self_state["host"];
|
||||
json11::Json::object op_stats, subop_stats;
|
||||
for (int i = 0; i <= OSD_OP_MAX; i++)
|
||||
for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++)
|
||||
{
|
||||
op_stats[osd_op_names[i]] = json11::Json::object {
|
||||
{ "count", c_cli.stats.op_stat_count[i] },
|
||||
@@ -150,7 +150,7 @@ json11::Json osd_t::get_statistics()
|
||||
{ "bytes", c_cli.stats.op_stat_bytes[i] },
|
||||
};
|
||||
}
|
||||
for (int i = 0; i <= OSD_OP_MAX; i++)
|
||||
for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++)
|
||||
{
|
||||
subop_stats[osd_op_names[i]] = json11::Json::object {
|
||||
{ "count", c_cli.stats.subop_stat_count[i] },
|
||||
@@ -593,7 +593,10 @@ void osd_t::apply_pg_config()
|
||||
}
|
||||
else
|
||||
{
|
||||
throw std::runtime_error("Unexpected PG "+std::to_string(pg_num)+" state: "+std::to_string(pg_it->second.state));
|
||||
throw std::runtime_error(
|
||||
"Unexpected PG "+std::to_string(pool_id)+"/"+std::to_string(pg_num)+
|
||||
" state: "+std::to_string(pg_it->second.state)
|
||||
);
|
||||
}
|
||||
}
|
||||
auto & pg = this->pgs[{ .pool_id = pool_id, .pg_num = pg_num }];
|
||||
@@ -603,7 +606,8 @@ void osd_t::apply_pg_config()
|
||||
.pg_cursize = 0,
|
||||
.pg_size = pool_item.second.pg_size,
|
||||
.pg_minsize = pool_item.second.pg_minsize,
|
||||
.parity_chunks = pool_item.second.parity_chunks,
|
||||
.pg_data_size = pg.scheme == POOL_SCHEME_REPLICATED
|
||||
? 1 : pool_item.second.pg_size - pool_item.second.parity_chunks,
|
||||
.pool_id = pool_id,
|
||||
.pg_num = pg_num,
|
||||
.reported_epoch = pg_cfg.epoch,
|
||||
@@ -613,7 +617,7 @@ void osd_t::apply_pg_config()
|
||||
};
|
||||
if (pg.scheme == POOL_SCHEME_JERASURE)
|
||||
{
|
||||
use_jerasure(pg.pg_size, pg.pg_size-pg.parity_chunks, true);
|
||||
use_jerasure(pg.pg_size, pg.pg_data_size, true);
|
||||
}
|
||||
this->pg_state_dirty.insert({ .pool_id = pool_id, .pg_num = pg_num });
|
||||
pg.print_state();
|
||||
@@ -661,7 +665,21 @@ void osd_t::report_pg_states()
|
||||
auto & pg = pg_it->second;
|
||||
reporting_pgs.push_back({ *it, pg.history_changed });
|
||||
std::string state_key_base64 = base64_encode(st_cli.etcd_prefix+"/pg/state/"+std::to_string(pg.pool_id)+"/"+std::to_string(pg.pg_num));
|
||||
if (pg.state == PG_STARTING)
|
||||
bool pg_state_exists = false;
|
||||
if (pg.state != PG_STARTING)
|
||||
{
|
||||
auto pool_it = st_cli.pool_config.find(pg.pool_id);
|
||||
if (pool_it != st_cli.pool_config.end())
|
||||
{
|
||||
auto pg_it = pool_it->second.pg_config.find(pg.pg_num);
|
||||
if (pg_it != pool_it->second.pg_config.end() &&
|
||||
pg_it->second.cur_state != 0)
|
||||
{
|
||||
pg_state_exists = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!pg_state_exists)
|
||||
{
|
||||
// Check that the PG key does not exist
|
||||
// Failed check indicates an unsuccessful PG lock attempt in this case
|
||||
@@ -673,9 +691,7 @@ void osd_t::report_pg_states()
|
||||
}
|
||||
else
|
||||
{
|
||||
// Check that the key is ours
|
||||
// Failed check indicates success for OFFLINE pgs (PG lock is already deleted)
|
||||
// and an unexpected race condition for started pgs (PG lock is held by someone else)
|
||||
// Check that the key is ours if it already exists
|
||||
checks.push_back(json11::Json::object {
|
||||
{ "target", "LEASE" },
|
||||
{ "lease", etcd_lease_id },
|
||||
@@ -797,17 +813,16 @@ void osd_t::report_pg_states()
|
||||
for (auto pp: reporting_pgs)
|
||||
{
|
||||
auto pg_it = this->pgs.find(pp.first);
|
||||
if (pg_it != this->pgs.end())
|
||||
if (pg_it != this->pgs.end() &&
|
||||
pg_it->second.state == PG_OFFLINE &&
|
||||
pg_state_dirty.find(pp.first) == pg_state_dirty.end())
|
||||
{
|
||||
if (pg_it->second.state == PG_OFFLINE)
|
||||
// Forget offline PGs after reporting their state
|
||||
if (pg_it->second.scheme == POOL_SCHEME_JERASURE)
|
||||
{
|
||||
// Remove offline PGs after reporting their state
|
||||
if (pg_it->second.scheme == POOL_SCHEME_JERASURE)
|
||||
{
|
||||
use_jerasure(pg_it->second.pg_size, pg_it->second.pg_size-pg_it->second.parity_chunks, false);
|
||||
}
|
||||
this->pgs.erase(pg_it);
|
||||
use_jerasure(pg_it->second.pg_size, pg_it->second.pg_data_size, false);
|
||||
}
|
||||
this->pgs.erase(pg_it);
|
||||
}
|
||||
}
|
||||
// Push other PG state updates, if any
|
||||
|
@@ -95,7 +95,7 @@ void osd_t::handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, p
|
||||
{
|
||||
// This flush batch is done
|
||||
std::vector<osd_op_t*> continue_ops;
|
||||
auto & pg = pgs[pg_id];
|
||||
auto & pg = pgs.at(pg_id);
|
||||
auto it = pg.flush_actions.begin(), prev_it = it;
|
||||
auto erase_start = it;
|
||||
while (1)
|
||||
@@ -209,32 +209,38 @@ void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t
|
||||
|
||||
bool osd_t::pick_next_recovery(osd_recovery_op_t &op)
|
||||
{
|
||||
for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
|
||||
if (!no_recovery)
|
||||
{
|
||||
if ((pg_it->second.state & (PG_ACTIVE | PG_HAS_DEGRADED)) == (PG_ACTIVE | PG_HAS_DEGRADED))
|
||||
for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
|
||||
{
|
||||
for (auto obj_it = pg_it->second.degraded_objects.begin(); obj_it != pg_it->second.degraded_objects.end(); obj_it++)
|
||||
if ((pg_it->second.state & (PG_ACTIVE | PG_HAS_DEGRADED)) == (PG_ACTIVE | PG_HAS_DEGRADED))
|
||||
{
|
||||
if (recovery_ops.find(obj_it->first) == recovery_ops.end())
|
||||
for (auto obj_it = pg_it->second.degraded_objects.begin(); obj_it != pg_it->second.degraded_objects.end(); obj_it++)
|
||||
{
|
||||
op.degraded = true;
|
||||
op.oid = obj_it->first;
|
||||
return true;
|
||||
if (recovery_ops.find(obj_it->first) == recovery_ops.end())
|
||||
{
|
||||
op.degraded = true;
|
||||
op.oid = obj_it->first;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
|
||||
if (!no_rebalance)
|
||||
{
|
||||
if ((pg_it->second.state & (PG_ACTIVE | PG_HAS_MISPLACED)) == (PG_ACTIVE | PG_HAS_MISPLACED))
|
||||
for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
|
||||
{
|
||||
for (auto obj_it = pg_it->second.misplaced_objects.begin(); obj_it != pg_it->second.misplaced_objects.end(); obj_it++)
|
||||
if ((pg_it->second.state & (PG_ACTIVE | PG_HAS_MISPLACED)) == (PG_ACTIVE | PG_HAS_MISPLACED))
|
||||
{
|
||||
if (recovery_ops.find(obj_it->first) == recovery_ops.end())
|
||||
for (auto obj_it = pg_it->second.misplaced_objects.begin(); obj_it != pg_it->second.misplaced_objects.end(); obj_it++)
|
||||
{
|
||||
op.degraded = false;
|
||||
op.oid = obj_it->first;
|
||||
return true;
|
||||
if (recovery_ops.find(obj_it->first) == recovery_ops.end())
|
||||
{
|
||||
op.degraded = false;
|
||||
op.oid = obj_it->first;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -19,4 +19,5 @@ const char* osd_op_names[] = {
|
||||
"primary_write",
|
||||
"primary_sync",
|
||||
"primary_delete",
|
||||
"ping",
|
||||
};
|
||||
|
@@ -27,7 +27,8 @@
|
||||
#define OSD_OP_WRITE 12
|
||||
#define OSD_OP_SYNC 13
|
||||
#define OSD_OP_DELETE 14
|
||||
#define OSD_OP_MAX 14
|
||||
#define OSD_OP_PING 15
|
||||
#define OSD_OP_MAX 15
|
||||
// Alignment & limit for read/write operations
|
||||
#ifndef MEM_ALIGNMENT
|
||||
#define MEM_ALIGNMENT 512
|
||||
|
@@ -98,13 +98,9 @@ void osd_t::repeer_pgs(osd_num_t peer_osd)
|
||||
}
|
||||
}
|
||||
|
||||
// Repeer on each connect/disconnect peer event
|
||||
void osd_t::start_pg_peering(pg_t & pg)
|
||||
// Reset PG state (when peering or stopping)
|
||||
void osd_t::reset_pg(pg_t & pg)
|
||||
{
|
||||
pg.state = PG_PEERING;
|
||||
this->peering_state |= OSD_PEERING_PGS;
|
||||
report_pg_state(pg);
|
||||
// Reset PG state
|
||||
pg.cur_peers.clear();
|
||||
pg.state_dict.clear();
|
||||
incomplete_objects -= pg.incomplete_objects.size();
|
||||
@@ -135,6 +131,15 @@ void osd_t::start_pg_peering(pg_t & pg)
|
||||
it++;
|
||||
}
|
||||
dirty_pgs.erase({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
|
||||
}
|
||||
|
||||
// Repeer on each connect/disconnect peer event
|
||||
void osd_t::start_pg_peering(pg_t & pg)
|
||||
{
|
||||
pg.state = PG_PEERING;
|
||||
this->peering_state |= OSD_PEERING_PGS;
|
||||
reset_pg(pg);
|
||||
report_pg_state(pg);
|
||||
// Drop connections of clients who have this PG in dirty_pgs
|
||||
if (immediate_commit != IMMEDIATE_ALL)
|
||||
{
|
||||
@@ -178,7 +183,8 @@ void osd_t::start_pg_peering(pg_t & pg)
|
||||
bool found = false;
|
||||
for (auto history_osd: history_set)
|
||||
{
|
||||
if (history_osd != 0 && c_cli.osd_peer_fds.find(history_osd) != c_cli.osd_peer_fds.end())
|
||||
if (history_osd != 0 && (history_osd == this->osd_num ||
|
||||
c_cli.osd_peer_fds.find(history_osd) != c_cli.osd_peer_fds.end()))
|
||||
{
|
||||
found = true;
|
||||
break;
|
||||
@@ -454,11 +460,11 @@ bool osd_t::stop_pg(pg_t & pg)
|
||||
if (pg.peering_state)
|
||||
{
|
||||
// Stop peering
|
||||
for (auto it = pg.peering_state->list_ops.begin(); it != pg.peering_state->list_ops.end();)
|
||||
for (auto it = pg.peering_state->list_ops.begin(); it != pg.peering_state->list_ops.end(); it++)
|
||||
{
|
||||
discard_list_subop(it->second);
|
||||
}
|
||||
for (auto it = pg.peering_state->list_results.begin(); it != pg.peering_state->list_results.end();)
|
||||
for (auto it = pg.peering_state->list_results.begin(); it != pg.peering_state->list_results.end(); it++)
|
||||
{
|
||||
if (it->second.buf)
|
||||
{
|
||||
@@ -468,12 +474,19 @@ bool osd_t::stop_pg(pg_t & pg)
|
||||
delete pg.peering_state;
|
||||
pg.peering_state = NULL;
|
||||
}
|
||||
if (!(pg.state & PG_ACTIVE))
|
||||
if (pg.state & (PG_STOPPING | PG_OFFLINE))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (!(pg.state & PG_ACTIVE))
|
||||
{
|
||||
finish_stop_pg(pg);
|
||||
return true;
|
||||
}
|
||||
pg.state = pg.state & ~PG_ACTIVE | PG_STOPPING;
|
||||
if (pg.inflight == 0 && !pg.flush_batch)
|
||||
if (pg.inflight == 0 && !pg.flush_batch &&
|
||||
// We must either forget all PG's unstable writes or wait for it to become clean
|
||||
dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) == dirty_pgs.end())
|
||||
{
|
||||
finish_stop_pg(pg);
|
||||
}
|
||||
@@ -487,6 +500,7 @@ bool osd_t::stop_pg(pg_t & pg)
|
||||
void osd_t::finish_stop_pg(pg_t & pg)
|
||||
{
|
||||
pg.state = PG_OFFLINE;
|
||||
reset_pg(pg);
|
||||
report_pg_state(pg);
|
||||
}
|
||||
|
||||
|
@@ -108,7 +108,7 @@ void pg_obj_state_check_t::start_object()
|
||||
|
||||
void pg_obj_state_check_t::handle_version()
|
||||
{
|
||||
if (!target_ver && last_ver != list[list_pos].version && (n_stable > 0 || n_roles >= pg->pg_minsize))
|
||||
if (!target_ver && last_ver != list[list_pos].version && (n_stable > 0 || n_roles >= pg->pg_data_size))
|
||||
{
|
||||
// Version is either stable or recoverable
|
||||
target_ver = last_ver;
|
||||
@@ -171,7 +171,7 @@ void pg_obj_state_check_t::handle_version()
|
||||
|
||||
void pg_obj_state_check_t::finish_object()
|
||||
{
|
||||
if (!target_ver && (n_stable > 0 || n_roles >= pg->pg_minsize))
|
||||
if (!target_ver && (n_stable > 0 || n_roles >= pg->pg_data_size))
|
||||
{
|
||||
// Version is either stable or recoverable
|
||||
target_ver = last_ver;
|
||||
@@ -233,7 +233,7 @@ void pg_obj_state_check_t::finish_object()
|
||||
{
|
||||
return;
|
||||
}
|
||||
if (!replicated && n_roles < pg->pg_minsize)
|
||||
if (!replicated && n_roles < pg->pg_data_size)
|
||||
{
|
||||
if (log_level > 1)
|
||||
{
|
||||
|
@@ -75,7 +75,7 @@ struct pg_t
|
||||
{
|
||||
int state = 0;
|
||||
uint64_t scheme = 0;
|
||||
uint64_t pg_cursize = 0, pg_size = 0, pg_minsize = 0, parity_chunks = 0;
|
||||
uint64_t pg_cursize = 0, pg_size = 0, pg_minsize = 0, pg_data_size = 0;
|
||||
pool_id_t pool_id = 0;
|
||||
pg_num_t pg_num = 0;
|
||||
uint64_t clean_count = 0, total_count = 0;
|
||||
|
@@ -103,7 +103,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op)
|
||||
if (op_data->st == 1) goto resume_1;
|
||||
else if (op_data->st == 2) goto resume_2;
|
||||
{
|
||||
auto & pg = pgs[{ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num }];
|
||||
auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num });
|
||||
for (int role = 0; role < op_data->pg_data_size; role++)
|
||||
{
|
||||
op_data->stripes[role].read_start = op_data->stripes[role].req_start;
|
||||
@@ -211,7 +211,7 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
|
||||
return;
|
||||
}
|
||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||
auto & pg = pgs[{ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num }];
|
||||
auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num });
|
||||
if (op_data->st == 1) goto resume_1;
|
||||
else if (op_data->st == 2) goto resume_2;
|
||||
else if (op_data->st == 3) goto resume_3;
|
||||
@@ -365,21 +365,19 @@ resume_7:
|
||||
recovery_stat_bytes[0][recovery_type] += op_data->stripes[role].write_end - op_data->stripes[role].write_start;
|
||||
}
|
||||
}
|
||||
if (op_data->object_state->state & OBJ_MISPLACED)
|
||||
// Any kind of a non-clean object can have extra chunks, because we don't record objects
|
||||
// as degraded & misplaced or incomplete & misplaced at the same time. So try to remove extra chunks
|
||||
submit_primary_del_subops(cur_op, pg.cur_set.data(), pg.pg_size, op_data->object_state->osd_set);
|
||||
if (op_data->n_subops > 0)
|
||||
{
|
||||
// Remove extra chunks
|
||||
submit_primary_del_subops(cur_op, pg.cur_set.data(), pg.pg_size, op_data->object_state->osd_set);
|
||||
if (op_data->n_subops > 0)
|
||||
{
|
||||
resume_8:
|
||||
op_data->st = 8;
|
||||
return;
|
||||
op_data->st = 8;
|
||||
return;
|
||||
resume_9:
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
return;
|
||||
}
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
return;
|
||||
}
|
||||
}
|
||||
// Clear object state
|
||||
@@ -391,19 +389,19 @@ continue_others:
|
||||
// Remove version override
|
||||
pg.ver_override.erase(op_data->oid);
|
||||
object_id oid = op_data->oid;
|
||||
// Remove the operation from queue before calling finish_op so it doesn't see the completed operation in queue
|
||||
auto next_it = pg.write_queue.find(oid);
|
||||
if (next_it != pg.write_queue.end() && next_it->second == cur_op)
|
||||
{
|
||||
pg.write_queue.erase(next_it++);
|
||||
}
|
||||
// finish_op would invalidate next_it if it cleared pg.write_queue, but it doesn't do that :)
|
||||
finish_op(cur_op, cur_op->reply.hdr.retval);
|
||||
// Continue other write operations to the same object
|
||||
auto next_it = pg.write_queue.find(oid);
|
||||
auto this_it = next_it;
|
||||
if (this_it != pg.write_queue.end() && this_it->second == cur_op)
|
||||
if (next_it != pg.write_queue.end() && next_it->first == oid)
|
||||
{
|
||||
next_it++;
|
||||
pg.write_queue.erase(this_it);
|
||||
if (next_it != pg.write_queue.end() && next_it->first == oid)
|
||||
{
|
||||
osd_op_t *next_op = next_it->second;
|
||||
continue_primary_write(next_op);
|
||||
}
|
||||
osd_op_t *next_op = next_it->second;
|
||||
continue_primary_write(next_op);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -582,7 +580,7 @@ resume_2:
|
||||
int dpg = 0;
|
||||
for (auto dirty_pg_num: dirty_pgs)
|
||||
{
|
||||
pgs[dirty_pg_num].inflight++;
|
||||
pgs.at(dirty_pg_num).inflight++;
|
||||
op_data->dirty_pgs[dpg++] = dirty_pg_num;
|
||||
}
|
||||
dirty_pgs.clear();
|
||||
@@ -639,7 +637,7 @@ resume_6:
|
||||
.pool_id = INODE_POOL(w.oid.inode),
|
||||
.pg_num = map_to_pg(w.oid, st_cli.pool_config.at(INODE_POOL(w.oid.inode)).pg_stripe_size),
|
||||
};
|
||||
if (pgs[wpg].state & PG_ACTIVE)
|
||||
if (pgs.at(wpg).state & PG_ACTIVE)
|
||||
{
|
||||
uint64_t & dest = this->unstable_writes[(osd_object_id_t){
|
||||
.osd_num = unstable_osd.osd_num,
|
||||
@@ -656,7 +654,9 @@ resume_6:
|
||||
{
|
||||
auto & pg = pgs.at(op_data->dirty_pgs[i]);
|
||||
pg.inflight--;
|
||||
if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch)
|
||||
if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch &&
|
||||
// We must either forget all PG's unstable writes or wait for it to become clean
|
||||
dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) == dirty_pgs.end())
|
||||
{
|
||||
finish_stop_pg(pg);
|
||||
}
|
||||
@@ -750,7 +750,7 @@ void osd_t::continue_primary_del(osd_op_t *cur_op)
|
||||
return;
|
||||
}
|
||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||
auto & pg = pgs[{ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num }];
|
||||
auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num });
|
||||
if (op_data->st == 1) goto resume_1;
|
||||
else if (op_data->st == 2) goto resume_2;
|
||||
else if (op_data->st == 3) goto resume_3;
|
||||
|
@@ -40,10 +40,12 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval)
|
||||
{
|
||||
if (cur_op->op_data->pg_num > 0)
|
||||
{
|
||||
auto & pg = pgs[{ .pool_id = INODE_POOL(cur_op->op_data->oid.inode), .pg_num = cur_op->op_data->pg_num }];
|
||||
auto & pg = pgs.at({ .pool_id = INODE_POOL(cur_op->op_data->oid.inode), .pg_num = cur_op->op_data->pg_num });
|
||||
pg.inflight--;
|
||||
assert(pg.inflight >= 0);
|
||||
if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch)
|
||||
if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch &&
|
||||
// We must either forget all PG's unstable writes or wait for it to become clean
|
||||
dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) == dirty_pgs.end())
|
||||
{
|
||||
finish_stop_pg(pg);
|
||||
}
|
||||
|
@@ -1,34 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Cheatsheet for CentOS 7 packaging (not a build script)
|
||||
|
||||
set -e
|
||||
rm -f /etc/yum.repos.d/CentOS-Media.repo
|
||||
yum -y --enablerepo=extras install centos-release-scl epel-release
|
||||
yum -y --enablerepo='*' install devtoolset-9-gcc-c++ devtoolset-9-libatomic-devel gperftools-devel
|
||||
yumdownloader --source qemu
|
||||
yumdownloader --source fio
|
||||
yum-builddep -y --enablerepo='*' qemu
|
||||
yum -y install rpm-build
|
||||
. /opt/rh/devtoolset-9/enable
|
||||
rpm --nomd5 -i qemu*.src.rpm
|
||||
rpm --nomd5 -i fio*.src.rpm
|
||||
cd ~/rpmbuild/SPECS
|
||||
rpmbuild -bp fio.spec
|
||||
perl -i -pe 's/^make V=1/exit 1; make V=1/' qemu.spec
|
||||
rpmbuild -bc qemu.spec
|
||||
perl -i -pe 's/^exit 1; make V=1/make V=1/' qemu.spec
|
||||
cd ~/rpmbuild/BUILD/qemu*/
|
||||
make qapi-types.h
|
||||
mkdir -p ~/vitastor/qemu/b/qemu
|
||||
cp config-host.h ~/vitastor/qemu/b/qemu
|
||||
cp qapi-types.h ~/vitastor/qemu/b/qemu
|
||||
cp -r include ~/vitastor/qemu
|
||||
cd ~/vitastor
|
||||
sh copy-qemu-includes.sh
|
||||
mv qemu qemu-old
|
||||
mv qemu-copy qemu
|
||||
ln -s ~/rpmbuild/BUILD/fio*/ fio
|
||||
sh copy-fio-includes.sh
|
||||
rm fio
|
||||
mv fio-copy fio
|
@@ -5,25 +5,36 @@
|
||||
dd if=/dev/zero of=./testdata/test_osd1.bin bs=1024 count=1 seek=$((1024*1024-1))
|
||||
dd if=/dev/zero of=./testdata/test_osd2.bin bs=1024 count=1 seek=$((1024*1024-1))
|
||||
dd if=/dev/zero of=./testdata/test_osd3.bin bs=1024 count=1 seek=$((1024*1024-1))
|
||||
dd if=/dev/zero of=./testdata/test_osd4.bin bs=1024 count=1 seek=$((1024*1024-1))
|
||||
dd if=/dev/zero of=./testdata/test_osd5.bin bs=1024 count=1 seek=$((1024*1024-1))
|
||||
dd if=/dev/zero of=./testdata/test_osd6.bin bs=1024 count=1 seek=$((1024*1024-1))
|
||||
|
||||
build/src/vitastor-osd --osd_num 1 --bind_address 127.0.0.1 --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd1.bin 2>/dev/null) &>./testdata/osd1.log &
|
||||
build/src/vitastor-osd --osd_num 1 --bind_address 127.0.0.1 --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd1.bin 2>/dev/null) 2>&1 >>./testdata/osd1.log &
|
||||
OSD1_PID=$!
|
||||
build/src/vitastor-osd --osd_num 2 --bind_address 127.0.0.1 --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd2.bin 2>/dev/null) &>./testdata/osd2.log &
|
||||
build/src/vitastor-osd --osd_num 2 --bind_address 127.0.0.1 --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd2.bin 2>/dev/null) 2>&1 >>./testdata/osd2.log &
|
||||
OSD2_PID=$!
|
||||
build/src/vitastor-osd --osd_num 3 --bind_address 127.0.0.1 --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd3.bin 2>/dev/null) &>./testdata/osd3.log &
|
||||
build/src/vitastor-osd --osd_num 3 --bind_address 127.0.0.1 --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd3.bin 2>/dev/null) 2>&1 >>./testdata/osd3.log &
|
||||
OSD3_PID=$!
|
||||
build/src/vitastor-osd --osd_num 4 --bind_address 127.0.0.1 --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd4.bin 2>/dev/null) 2>&1 >>./testdata/osd4.log &
|
||||
OSD4_PID=$!
|
||||
build/src/vitastor-osd --osd_num 5 --bind_address 127.0.0.1 --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd5.bin 2>/dev/null) 2>&1 >>./testdata/osd5.log &
|
||||
OSD5_PID=$!
|
||||
build/src/vitastor-osd --osd_num 6 --bind_address 127.0.0.1 --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd6.bin 2>/dev/null) 2>&1 >>./testdata/osd6.log &
|
||||
OSD6_PID=$!
|
||||
|
||||
cd mon
|
||||
npm install
|
||||
cd ..
|
||||
node mon/mon-main.js --etcd_url http://$ETCD_URL --etcd_prefix "/vitastor" &>./testdata/mon.log &
|
||||
node mon/mon-main.js --etcd_url http://$ETCD_URL --etcd_prefix "/vitastor" --verbose 1 &>./testdata/mon.log &
|
||||
MON_PID=$!
|
||||
|
||||
$ETCDCTL put /vitastor/config/pools '{"1":{"name":"testpool","scheme":"replicated","pg_size":3,"pg_minsize":2,"pg_count":16,"failure_domain":"osd"}}'
|
||||
$ETCDCTL put /vitastor/config/global '{"immediate_commit":"all"}'
|
||||
|
||||
$ETCDCTL put /vitastor/config/pools '{"1":{"name":"testpool","scheme":"replicated","pg_size":2,"pg_minsize":2,"pg_count":16,"failure_domain":"osd"}}'
|
||||
|
||||
sleep 2
|
||||
|
||||
if ! ($ETCDCTL get /vitastor/config/pgs --print-value-only | jq -s -e '(.[0].items["1"] | map((.osd_set | sort) == ["1","2","3"]) | length) == 16'); then
|
||||
if ! ($ETCDCTL get /vitastor/config/pgs --print-value-only | jq -s -e '(.[0].items["1"] | map((.osd_set | select(. > 0)) | length == 2) | length) == 16'); then
|
||||
format_error "FAILED: 16 PGS NOT CONFIGURED"
|
||||
fi
|
||||
|
||||
@@ -31,20 +42,35 @@ if ! ($ETCDCTL get --prefix /vitastor/pg/state/ --print-value-only | jq -s -e '(
|
||||
format_error "FAILED: 16 PGS NOT UP"
|
||||
fi
|
||||
|
||||
LD_PRELOAD=libasan.so.5 \
|
||||
fio -thread -name=test -ioengine=build/src/libfio_vitastor.so -bs=4M -direct=1 -iodepth=1 -fsync=1 -rw=write \
|
||||
-etcd=$ETCD_URL -pool=1 -inode=2 -size=128M -cluster_log_level=10
|
||||
|
||||
try_change()
|
||||
{
|
||||
n=$1
|
||||
|
||||
$ETCDCTL put /vitastor/config/pools '{"1":{"name":"testpool","scheme":"replicated","pg_size":3,"pg_minsize":2,"pg_count":'$n',"failure_domain":"osd"}}'
|
||||
for i in {1..6}; do
|
||||
echo --- Change PG count to $n --- >>testdata/osd$i.log
|
||||
done
|
||||
|
||||
$ETCDCTL put /vitastor/config/pools '{"1":{"name":"testpool","scheme":"replicated","pg_size":2,"pg_minsize":2,"pg_count":'$n',"failure_domain":"osd"}}'
|
||||
|
||||
for i in {1..10}; do
|
||||
($ETCDCTL get /vitastor/config/pgs --print-value-only | jq -s -e '(.[0].items["1"] | map((.osd_set | sort) == ["1","2","3"]) | length) == '$n) && \
|
||||
($ETCDCTL get --prefix /vitastor/pg/state/ --print-value-only | jq -s -e '([ .[] | select(.state == ["active"]) ] | length) == '$n'') && \
|
||||
($ETCDCTL get /vitastor/config/pgs --print-value-only | jq -s -e '(.[0].items["1"] | map((.osd_set | select(. > 0)) | length == 2) | length) == '$n) && \
|
||||
($ETCDCTL get --prefix /vitastor/pg/state/ --print-value-only | jq -s -e '([ .[] | select(.state == ["active"] or .state == ["active", "has_misplaced"]) ] | length) == '$n'') && \
|
||||
break
|
||||
sleep 1
|
||||
done
|
||||
|
||||
if ! ($ETCDCTL get /vitastor/config/pgs --print-value-only | jq -s -e '(.[0].items["1"] | map((.osd_set | sort) == ["1","2","3"]) | length) == '$n); then
|
||||
# Wait for the rebalance to finish
|
||||
for i in {1..60}; do
|
||||
($ETCDCTL get --prefix /vitastor/pg/state/ --print-value-only | jq -s -e '([ .[] | select(.state == ["active"]) ] | length) == '$n'') && \
|
||||
break
|
||||
sleep 1
|
||||
done
|
||||
|
||||
if ! ($ETCDCTL get /vitastor/config/pgs --print-value-only | jq -s -e '(.[0].items["1"] | map((.osd_set | select(. > 0)) | length == 2) | length) == '$n); then
|
||||
$ETCDCTL get /vitastor/config/pgs
|
||||
$ETCDCTL get --prefix /vitastor/pg/state/
|
||||
format_error "FAILED: $n PGS NOT CONFIGURED"
|
||||
@@ -55,6 +81,12 @@ try_change()
|
||||
$ETCDCTL get --prefix /vitastor/pg/state/
|
||||
format_error "FAILED: $n PGS NOT UP"
|
||||
fi
|
||||
|
||||
# Check that no objects are lost !
|
||||
nobj=`$ETCDCTL get --prefix '/vitastor/pg/stats' --print-value-only | jq -s '[ .[].object_count ] | reduce .[] as $num (0; .+$num)'`
|
||||
if [ "$nobj" -ne 1024 ]; then
|
||||
format_error "Data lost after changing PG count to $n: 1024 objects expected, but got $nobj"
|
||||
fi
|
||||
}
|
||||
|
||||
# 16 -> 32
|
||||
@@ -77,4 +109,10 @@ try_change 17
|
||||
|
||||
try_change 16
|
||||
|
||||
# Monitor should report non-zero overall statistics at least once
|
||||
|
||||
if ! (grep /vitastor/stats ./testdata/mon.log | jq -s -e '[ .[] | select((.kv.value.op_stats.primary_write.count | tonumber) > 0) ] | length > 0'); then
|
||||
format_error "FAILED: monitor doesn't aggregate stats"
|
||||
fi
|
||||
|
||||
format_green OK
|
||||
|
79
tests/test_move_reappear.sh
Executable file
79
tests/test_move_reappear.sh
Executable file
@@ -0,0 +1,79 @@
|
||||
#!/bin/bash -ex
|
||||
|
||||
. `dirname $0`/common.sh
|
||||
|
||||
dd if=/dev/zero of=./testdata/test_osd1.bin bs=1024 count=1 seek=$((1024*1024-1))
|
||||
dd if=/dev/zero of=./testdata/test_osd2.bin bs=1024 count=1 seek=$((1024*1024-1))
|
||||
dd if=/dev/zero of=./testdata/test_osd3.bin bs=1024 count=1 seek=$((1024*1024-1))
|
||||
dd if=/dev/zero of=./testdata/test_osd4.bin bs=1024 count=1 seek=$((1024*1024-1))
|
||||
dd if=/dev/zero of=./testdata/test_osd5.bin bs=1024 count=1 seek=$((1024*1024-1))
|
||||
|
||||
build/src/vitastor-osd --osd_num 1 --bind_address 127.0.0.1 --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd1.bin 2>/dev/null) &>./testdata/osd1.log &
|
||||
OSD1_PID=$!
|
||||
build/src/vitastor-osd --osd_num 2 --bind_address 127.0.0.1 --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd2.bin 2>/dev/null) &>./testdata/osd2.log &
|
||||
OSD2_PID=$!
|
||||
build/src/vitastor-osd --osd_num 3 --bind_address 127.0.0.1 --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd3.bin 2>/dev/null) &>./testdata/osd3.log &
|
||||
OSD3_PID=$!
|
||||
build/src/vitastor-osd --osd_num 4 --bind_address 127.0.0.1 --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd4.bin 2>/dev/null) &>./testdata/osd4.log &
|
||||
OSD4_PID=$!
|
||||
build/src/vitastor-osd --osd_num 5 --bind_address 127.0.0.1 --etcd_address $ETCD_URL $(node mon/simple-offsets.js --format options --device ./testdata/test_osd5.bin 2>/dev/null) &>./testdata/osd5.log &
|
||||
OSD5_PID=$!
|
||||
|
||||
$ETCDCTL put /vitastor/config/pools '{"1":{"name":"testpool","scheme":"replicated","pg_size":2,"pg_minsize":1,"pg_count":1,"failure_domain":"osd"}}'
|
||||
|
||||
$ETCDCTL put /vitastor/config/pgs '{"items":{"1":{"1":{"osd_set":[1,0],"primary":1}}}}'
|
||||
|
||||
sleep 2
|
||||
|
||||
if ! ($ETCDCTL get /vitastor/pg/state/1/1 --print-value-only | jq -s -e '(. | length) != 0 and .[0].state == ["active","degraded"]'); then
|
||||
format_error "Failed to start the PG active+degraded"
|
||||
fi
|
||||
|
||||
LD_PRELOAD=libasan.so.5 \
|
||||
fio -thread -name=test -ioengine=build/src/libfio_vitastor.so -bs=4M -direct=1 -iodepth=1 -fsync=1 -rw=write \
|
||||
-etcd=$ETCD_URL -pool=1 -inode=2 -size=32M -cluster_log_level=10
|
||||
|
||||
$ETCDCTL put /vitastor/config/pgs '{"items":{"1":{"1":{"osd_set":[1,0],"primary":0}}}}'
|
||||
|
||||
sleep 2
|
||||
|
||||
if [ "`$ETCDCTL get /vitastor/pg/state/1/1 --print-value-only`" != "" ]; then
|
||||
format_error "Failed to stop the PG"
|
||||
fi
|
||||
|
||||
$ETCDCTL put /vitastor/pg/history/1/1 '{"all_peers":[1,2,3]}'
|
||||
|
||||
$ETCDCTL put /vitastor/config/pgs '{"items":{"1":{"1":{"osd_set":[4,5],"primary":4}}}}'
|
||||
|
||||
sleep 5
|
||||
|
||||
if ! ($ETCDCTL get /vitastor/pg/state/1/1 --print-value-only | jq -s -e '(. | length) != 0 and .[0].state == ["active"]'); then
|
||||
format_error "Failed to move degraded objects to the clean OSD set"
|
||||
fi
|
||||
|
||||
$ETCDCTL put /vitastor/config/pgs '{"items":{"1":{"1":{"osd_set":[4,5],"primary":0}}}}'
|
||||
|
||||
$ETCDCTL put /vitastor/pg/history/1/1 '{"all_peers":[1,2,3]}'
|
||||
|
||||
sleep 2
|
||||
|
||||
if [ "`$ETCDCTL get /vitastor/pg/state/1/1 --print-value-only`" != "" ]; then
|
||||
format_error "Failed to stop the PG after degraded recovery"
|
||||
fi
|
||||
|
||||
cp testdata/osd4.log testdata/osd4_pre.log
|
||||
>testdata/osd4.log
|
||||
|
||||
$ETCDCTL put /vitastor/config/pgs '{"items":{"1":{"1":{"osd_set":[4,5],"primary":4}}}}'
|
||||
|
||||
sleep 2
|
||||
|
||||
if grep -q 'PG 1/1.*is.*has_' testdata/osd4.log; then
|
||||
format_error "PG has degraded or misplaced objects after a full re-peer following a degraded recovery"
|
||||
fi
|
||||
|
||||
if ! ($ETCDCTL get /vitastor/pg/state/1/1 --print-value-only | jq -s -e '(. | length) != 0 and .[0].state == ["active"]'); then
|
||||
format_error "PG not active+clean after a full re-peer following a degraded recovery"
|
||||
fi
|
||||
|
||||
format_green OK
|
Reference in New Issue
Block a user