refac and signals

master
Коренберг Марк (Ноутбук HP) 2019-01-20 17:35:09 +05:00
parent aafe2ba582
commit 7ff6009925
7 changed files with 259 additions and 507 deletions

View File

@ -6,14 +6,14 @@ LDFLAGS += -pthread -lrados -ljsoncpp -lstdc++
#CC=clang-6.0
main: main.o
main: main.o mysignals.o radosutil.o
$(CC) $^ -o $@ $(LDFLAGS)
.cpp.o:
$(CC) $(CPPFLAGS) $(CXXFLAGS) -c $< -o $@
indent: main.cpp
clang-format-6.0 -i main.cpp
indent: *.cpp *.h
clang-format-6.0 -i $^
builddep:
sudo apt install -y --no-install-recommends libjsoncpp-dev

255
main.cpp
View File

@ -1,4 +1,5 @@
#include <chrono>
#include <csignal>
#include <iostream>
#include <json/json.h>
#include <librados.hpp>
@ -8,6 +9,9 @@
#include <thread>
#include <vector>
#include "mysignals.h"
#include "radosutil.h"
using namespace librados;
using namespace std;
using namespace chrono;
@ -116,20 +120,22 @@ static void print_breakdown(const vector<T> &summary, size_t thread_count) {
// Called in a thread.
static void _do_bench(unsigned int secs, const string &obj_name, IoCtx &ioctx,
vector<steady_clock::duration> *ops) {
auto b = steady_clock::now();
const auto stop = b + seconds(secs);
// cout<<"tt" <<ops<<endl;
bufferlist bar1;
bufferlist bar2;
// interleave buffers
// TODO: 4096in order not to read on writer ! But anyway
// during test this block should be cached
bar1.append("q");
bar2.append("w");
// TODO: wait for SIGINT (!)
auto b = steady_clock::now();
const auto stop = b + seconds(secs);
try {
while (b <= stop) {
abort_if_signalled();
if (ioctx.write_full(obj_name, ops->size() % 2 ? bar1 : bar2) < 0)
throw "Write error";
const auto b2 = steady_clock::now();
@ -153,9 +159,24 @@ static void do_bench(unsigned int secs, const vector<string> &names,
vector<vector<steady_clock::duration> *> listofopts;
for (const auto &name : names) {
// TODO: memory leak on exception...
auto results = new vector<steady_clock::duration>;
listofopts.push_back(results);
sigset_t new_set;
sigset_t old_set;
sigfillset(&new_set);
int err;
if ((err = pthread_sigmask(SIG_SETMASK, &new_set, &old_set)))
throw std::system_error(err, std::system_category(),
"Failed to set thread sigmask");
threads.push_back(thread(_do_bench, secs, name, ref(ioctx), results));
if ((err = pthread_sigmask(SIG_SETMASK, &old_set, NULL)))
throw std::system_error(err, std::system_category(),
"Failed to restore thread sigmask");
}
for (auto &th : threads)
@ -177,104 +198,6 @@ static void do_bench(unsigned int secs, const vector<string> &names,
print_breakdown(summary, names.size());
}
// Helper that sends JSON-formatted monitor commands through a Rados handle
// and extracts the few values the benchmark needs from the replies.
class RadosUtils {
public:
// Stores the caller's Rados pointer and sets up a strict-mode JSON reader.
RadosUtils(Rados *rados_)
: rados(rados_), json_reader(Json::Features::strictMode()) {}
// Issues "osd map" for object `name` in `pool` and returns the
// "acting_primary" field of the reply.
// Throws a const char * message when that field is not numeric.
unsigned int get_obj_acting_primary(const string &name, const string &pool) {
Json::Value cmd(Json::objectValue);
cmd["prefix"] = "osd map";
cmd["object"] = name;
cmd["pool"] = pool;
auto &&location = do_mon_command(cmd);
const auto &acting_primary = location["acting_primary"];
if (!acting_primary.isNumeric())
throw "Failed to get acting_primary";
return acting_primary.asUInt();
}
// Issues "osd find" for `osd` and copies every member of the reply's
// "crush_location" into a string map, plus an extra "osd" => "osd.<id>" entry.
map<string, string> get_osd_location(unsigned int osd) {
Json::Value cmd(Json::objectValue);
cmd["prefix"] = "osd find";
cmd["id"] = osd;
auto &&location = do_mon_command(cmd);
const auto &crush = location["crush_location"];
map<string, string> result;
for (auto &&it = crush.begin(); it != crush.end(); ++it) {
result[it.name()] = it->asString();
}
result["osd"] = "osd." + to_string(osd);
return result;
}
// Issues "pg ls-by-pool" for `pool` and returns the set of distinct
// "acting_primary" values found in the reply elements.
// Throws a const char * message when an element's field is not numeric.
set<unsigned int> get_osds(const string &pool) {
Json::Value cmd(Json::objectValue);
cmd["prefix"] = "pg ls-by-pool";
cmd["poolstr"] = pool;
const auto &&pgs = do_mon_command(cmd);
set<unsigned int> osds;
// TODO:
// auto const & x: container
// https://stackoverflow.com/questions/27307373/c-how-to-create-iterator-over-one-field-of-a-struct-vector
for (const auto &pg : pgs) {
const auto &primary = pg["acting_primary"];
if (!primary.isNumeric())
throw "Failed to get acting_primary";
osds.insert(primary.asUInt());
}
return osds;
}
// Issues "osd pool get ... size" for `pool` and returns the reply's "size"
// field converted with asUInt().
unsigned int get_pool_size(const string &pool) {
Json::Value cmd(Json::objectValue);
cmd["prefix"] = "osd pool get";
cmd["pool"] = pool;
cmd["var"] = "size";
const auto &&v = do_mon_command(cmd);
return v["size"].asUInt();
}
private:
// Adds "format": "json" to `cmd` (the caller's object is modified), sends it
// with Rados::mon_command and parses the output buffer as JSON.
// On a negative return code the status string is printed to cerr and a
// const char * is thrown; a parse failure also throws a const char *.
Json::Value do_mon_command(Json::Value &cmd) {
int err;
bufferlist outbl;
string outs;
cmd["format"] = "json";
bufferlist inbl;
if ((err = rados->mon_command(json_writer.write(cmd), inbl, &outbl,
&outs)) < 0) {
cerr << "mon_command error: " << outs << endl;
throw "mon_command error";
}
Json::Value root;
if (!json_reader.parse(outbl.to_str(), root))
throw "JSON parse error";
return root;
}
// Cluster handle supplied by the caller; this class never deletes it.
Rados *rados;
Json::Reader json_reader;
Json::FastWriter json_writer;
};
static void _main(int argc, const char *argv[]) {
struct {
string pool;
@ -285,7 +208,6 @@ static void _main(int argc, const char *argv[]) {
} settings;
Rados rados;
int err;
if ((err = rados.init("admin")) < 0) {
cerr << "Failed to init: " << strerror(-err) << endl;
@ -329,90 +251,87 @@ static void _main(int argc, const char *argv[]) {
// https://tracker.ceph.com/issues/24114
this_thread::sleep_for(milliseconds(100));
auto rados_utils = RadosUtils(&rados);
try {
auto rados_utils = RadosUtils(&rados);
if (rados_utils.get_pool_size(settings.pool) != 1)
throw "It's required to have pool size 1";
if (rados_utils.get_pool_size(settings.pool) != 1)
throw "It's required to have pool size 1";
map<unsigned int, map<string, string>> osd2location;
map<unsigned int, map<string, string>> osd2location;
set<string> bench_items; // node1, node2 ||| osd.1, osd.2, osd.3
set<string> bench_items; // node1, node2 ||| osd.1, osd.2, osd.3
for (const auto &osd : rados_utils.get_osds(settings.pool)) {
const auto &location = rados_utils.get_osd_location(osd);
for (const auto &osd : rados_utils.get_osds(settings.pool)) {
const auto &location = rados_utils.get_osd_location(osd);
// TODO: do not fill this map if specific_bench_item specified
osd2location[osd] = location;
// TODO: do not fill this map if specific_bench_item specified
osd2location[osd] = location;
const auto &qwe = location.at(settings.mode);
if (settings.specific_bench_item.empty() ||
qwe == settings.specific_bench_item) {
bench_items.insert(qwe);
}
}
// benchitem -> [name1, name2] ||| i.e. "osd.2" => ["obj1", "obj2"]
map<string, vector<string>> name2location;
unsigned int cnt = 0;
// for each bench_item find thread_count names.
// store every name in name2location = [bench_item, names, description]
const string prefix = "bench_";
while (bench_items.size()) {
string name = prefix + to_string(++cnt);
unsigned int osd = rados_utils.get_obj_acting_primary(name, settings.pool);
const auto &location = osd2location.at(osd);
const auto &bench_item = location.at(settings.mode);
if (!bench_items.count(bench_item))
continue;
auto &names = name2location[bench_item];
if (names.size() == settings.threads) {
bench_items.erase(bench_item);
continue;
const auto &qwe = location.at(settings.mode);
if (settings.specific_bench_item.empty() ||
qwe == settings.specific_bench_item) {
bench_items.insert(qwe);
}
}
names.push_back(name);
// benchitem -> [name1, name2] ||| i.e. "osd.2" => ["obj1", "obj2"]
map<string, vector<string>> name2location;
unsigned int cnt = 0;
cout << name << " - " << bench_item << endl;
}
IoCtx ioctx;
// TODO: cleanup
/*
* NOTE: be sure to call watch_flush() prior to destroying any IoCtx
* that is used for watch events to ensure that racing callbacks
* have completed.
*/
if (rados.ioctx_create(settings.pool.c_str(), ioctx) < 0)
throw "Failed to create ioctx";
for (const auto &p : name2location) {
const auto &bench_item = p.first;
const auto &obj_names = p.second;
cout << "Benching " << settings.mode << " " << bench_item << endl;
do_bench(settings.secs, obj_names, ioctx);
// for each bench_item find thread_count names.
// store every name in name2location = [bench_item, names, description]
const string prefix = "bench_";
while (bench_items.size()) {
string name = prefix + to_string(++cnt);
unsigned int osd =
rados_utils.get_obj_acting_primary(name, settings.pool);
const auto &location = osd2location.at(osd);
const auto &bench_item = location.at(settings.mode);
if (!bench_items.count(bench_item))
continue;
auto &names = name2location[bench_item];
if (names.size() == settings.threads) {
bench_items.erase(bench_item);
continue;
}
names.push_back(name);
cout << name << " - " << bench_item << endl;
}
IoCtx ioctx;
if (rados.ioctx_create(settings.pool.c_str(), ioctx) < 0)
throw "Failed to create ioctx";
for (const auto &p : name2location) {
const auto &bench_item = p.first;
const auto &obj_names = p.second;
cout << "Benching " << settings.mode << " " << bench_item << endl;
do_bench(settings.secs, obj_names, ioctx);
}
} catch (...) {
rados.watch_flush();
throw;
}
rados.watch_flush();
}
int main(int argc, const char *argv[]) {
/*
* IoCtx p;
* rados.ioctx_create("my_pool", p);
* p->stat(&stats);
*/
try {
setup_signal_handlers();
_main(argc, argv);
} catch (const AbortException &msg) {
cerr << "Test aborted" << endl;
return 1;
} catch (const char *msg) {
cerr << "Unhandled exception: " << msg << endl;
return 1;
return 2;
}
cout << "Exiting successfully." << endl;
return 0;
}

336
main.py
View File

@ -1,336 +0,0 @@
#!/usr/bin/python3
# coding: utf-8
import json
import logging
import os
import signal
import sys
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from itertools import cycle
from threading import Thread
import rados
if sys.version_info >= (3, 0):
from time import monotonic, sleep
else:
from time import time as monotonic, sleep
log = logging.getLogger(__name__)
DO_ABORT = False
def _do_bench(secs, name, ioctx, data):
    """Write to object ``name`` repeatedly for about ``secs`` seconds.

    The payloads in ``data`` are used round-robin. The loop ends when the
    deadline passes or the module-level ``DO_ABORT`` flag is set. The object
    is removed afterwards in every case; a failed removal is only logged.

    Returns the list of per-write durations in seconds.
    """
    durations = []
    payloads = cycle(data)
    last = monotonic()
    deadline = last + secs
    try:
        while True:
            if DO_ABORT or last > deadline:
                break
            ioctx.write(name, next(payloads))
            now = monotonic()
            durations.append(now - last)
            last = now
    finally:
        try:
            log.debug('Removing object %s.', name)
            ioctx.remove_object(name)
        except Exception as exc:
            log.error('Failed to remove object %s: %r', name, exc)
    return durations
def signal_handler(*_unused):
    """Signal callback: log the event and raise the module-level abort flag.

    The arguments passed by the ``signal`` module are ignored.
    """
    global DO_ABORT
    log.info('Aborted by signal.')
    DO_ABORT = True
def do_bench(secs, object_names, ioctx, data):
    """Run ``_do_bench`` for every name in ``object_names``, one thread each.

    SIGINT and SIGTERM are routed to ``signal_handler`` while the threads
    run. Afterwards the handlers that were active before the call are put
    back, so Python's default Ctrl-C behaviour (KeyboardInterrupt) is
    preserved for the rest of the program instead of being replaced with
    SIG_DFL.

    Returns a dict mapping object name to the list of write durations.
    Raises RuntimeError('Aborted') if a signal arrived.
    """
    previous_handlers = {}
    try:
        for signum in (signal.SIGINT, signal.SIGTERM):
            previous_handlers[signum] = signal.signal(signum, signal_handler)

        results = {}
        thrds = {
            # dict.setdefault() is called once per thread, each with its own key.
            Thread(
                target=lambda name2: results.setdefault(name2, _do_bench(secs, name2, ioctx, data)),
                args=(name,)
            )
            for name in object_names
        }
        for i in thrds:
            i.start()
        # Thread.join() without a timeout is not signal-interruptible, so
        # poll with a 1 second timeout instead.
        while thrds:
            for i in list(thrds):
                i.join(1)
                if not i.is_alive():
                    thrds.remove(i)
    finally:
        for signum, handler in previous_handlers.items():
            # signal.signal() returns None for handlers not installed from
            # Python; those cannot be re-installed, fall back to SIG_DFL.
            signal.signal(signum, signal.SIG_DFL if handler is None else handler)
    if DO_ABORT:
        raise RuntimeError('Aborted')
    return results
def get_pool_size(cluster, pool):
    """Return the ``size`` field reported by "osd pool get" for ``pool``.

    Raises RuntimeError with the status text when the command returns a
    non-zero code.
    """
    request = json.dumps({
        "prefix": "osd pool get",
        "pool": pool,
        "format": "json",
        "var": "size",
    })
    status, payload, message = cluster.mon_command(request, '', 0)
    if status:
        raise RuntimeError(message)
    reply = json.loads(payload.decode('utf-8'))
    return reply['size']
def get_osds(cluster, pool):
    """Return the set of ``acting_primary`` values over all PGs of ``pool``.

    Uses the manager command "pg ls-by-pool". Raises RuntimeError with the
    status text when the command returns a non-zero code.
    """
    request = json.dumps({
        "prefix": "pg ls-by-pool",
        "poolstr": pool,
        "target": ["mgr", ""],
        "format": "json",
    })
    status, payload, message = cluster.mgr_command(request, '', 0)
    if status:
        raise RuntimeError(message)
    primaries = set()
    for pg in json.loads(payload.decode('utf-8')):
        primaries.add(pg['acting_primary'])
    return primaries
def get_osd_location(cluster, osd):
    """Return the ``crush_location`` dict of ``osd`` with an added 'osd' key.

    The data comes from the monitor command "osd find". Raises RuntimeError
    with the status text when the command returns a non-zero code.
    """
    request = json.dumps({
        "prefix": "osd find",
        "id": osd,
        "format": "json",
    })
    status, payload, message = cluster.mon_command(request, '', 0)
    if status:
        raise RuntimeError(message)
    location = json.loads(payload.decode('utf-8'))['crush_location']
    location['osd'] = osd
    log.debug('Location of OSD %r is %r.', osd, location)
    return location
def get_obj_acting_primary(cluster, pool, name):
    """Return the ``acting_primary`` that "osd map" reports for an object.

    ``name`` is the object name and ``pool`` the pool it would live in.
    Raises RuntimeError with the status text when the command returns a
    non-zero code.
    """
    request = json.dumps({
        "prefix": "osd map",
        "object": name,
        "pool": pool,
        "format": "json",
    })
    status, payload, message = cluster.mon_command(request, '', 0)
    if status:
        raise RuntimeError(message)
    primary = json.loads(payload.decode('utf-8'))['acting_primary']
    log.debug('Acting primary OSD %r (for object %r).', primary, name)
    return primary
def get_description(cluster, location):
    """Build a one-line, human-readable description of the OSD in ``location``.

    ``location`` must contain the keys 'osd', 'root' and 'host'. Details are
    read from the monitor command "osd metadata"; the media fields that are
    reported depend on the object store type (filestore / bluestore / other).
    Raises RuntimeError with the status text when the command fails.
    """
    osd = location['osd']
    request = json.dumps({
        "prefix": "osd metadata",
        "id": osd,
        "format": "json",
    })
    status, payload, message = cluster.mon_command(request, '', 0)
    if status:
        raise RuntimeError(message)
    meta = json.loads(payload.decode('utf-8'))

    store = meta["osd_objectstore"]
    media = []
    if store == 'filestore':
        media.append('jrn=%s' % ('hdd' if int(meta["journal_rotational"]) else 'ssd'))
        media.append('dat=%s' % ('hdd' if int(meta["rotational"]) else 'ssd'))
    elif store == 'bluestore':
        media.append('db=%s(%s)' % (meta['bluefs_db_type'], meta["bluefs_db_model"].rstrip()))
        media.append('dat=%s(%s)' % (meta['bluestore_bdev_type'], meta["bluestore_bdev_model"].rstrip()))

    parts = ['r=%s,h=%s,osd.%s' % (location['root'], location['host'], osd)]
    parts.append(meta["osd_objectstore"])
    parts.extend(media)
    parts.append(meta['cpu'])
    return ' '.join(parts)
def _distinct_payloads(size):
    """Return two random byte strings of length ``size`` that differ.

    Two independent ``os.urandom(1)`` draws are equal once in 256 attempts,
    so the second buffer is redrawn until it differs from the first.
    Raises ValueError when ``size`` is below 1, because two different
    buffers of that size cannot exist.
    """
    if size < 1:
        raise ValueError('Payload size must be at least 1 byte.')
    first = os.urandom(size)
    second = os.urandom(size)
    while second == first:
        second = os.urandom(size)
    return [first, second]


def main():
    """Command-line entry point of the benchmark.

    Parses the arguments, connects to the cluster, checks that the pool has
    size 1, finds ``--threads`` object names for every bench item (a value
    of the crush location key given as ``mode``), then runs a small-write
    IOPS benchmark and a large-write throughput benchmark on each item and
    logs the results.
    """
    parser = ArgumentParser(
        description="Socketair Ceph tester. You should create pool of size 1 and provide a keyring file with user having rights to write to this pool.",
        formatter_class=ArgumentDefaultsHelpFormatter,
        epilog="For all questions contact Коренберг Марк <socketpair@gmail.com> and/or Telegram user @socketpair, as well as @socketpair on GitHub."
    )
    parser.add_argument('--debug', action='store_true', help='Enable debug mode.')
    parser.add_argument('--duration', type=int, default=10, help='Time limit for each test.', metavar='SECONDS')
    parser.add_argument('--bigsize', type=int, default=4 * 1024 * 1024, help='Size of object for linear write.',
                        metavar='BYTES')
    parser.add_argument('--smallsize', type=int, default=1, help='Size of object for linear IOPS write test.',
                        metavar='BYTES')
    parser.add_argument('--keyring', type=str, default='./keyring.conf', help='Path to keyring file.', metavar='PATH')
    parser.add_argument('--threads', type=int, default=1,
                        help='Parallel testing using multiple threads and different object in each.', metavar='COUNT')
    parser.add_argument('pool', help='Ceph pool name.')
    parser.add_argument('mode', default='host',
                        help='Test item selection. Possible values: any key from crush location, e.g. "host", "root". And also special value "osd" to test each OSD.')
    params = parser.parse_args()
    logging.basicConfig(
        level=logging.DEBUG if params.debug else logging.INFO,
        format='%(levelname)s:%(name)s:%(message)s' if params.debug else '%(message)s',
    )
    conf = {'keyring': params.keyring}
    pool = params.pool
    mode = params.mode
    secs = params.duration
    bigsize = params.bigsize
    smallsize = params.smallsize
    threads_count = params.threads
    if threads_count < 1:
        # With fewer than one thread the name search below would never
        # consider a bench item complete and would loop forever.
        parser.error('--threads must be at least 1')
    bigdata = [os.urandom(bigsize), os.urandom(bigsize)]
    # The two small payloads alternate between writes and must differ.
    smalldata = _distinct_payloads(smallsize)
    log.info('Attaching to CEPH cluster. pool=%s', pool)
    with rados.Rados(conffile='/etc/ceph/ceph.conf', conf=conf) as cluster:
        sleep(0.1)  # https://tracker.ceph.com/issues/24114
        log.debug('Checking that pool %r size is 1.', pool)
        if get_pool_size(cluster, pool) != 1:
            raise RuntimeError('Pool %r size must be 1.' % pool)
        log.debug('Getting list of OSDs for pool %r.', pool)
        osds = sorted(get_osds(cluster, pool))
        log.debug('Total OSDs in this pool: %d.', len(osds))
        log.info('Getting OSD locations.')
        osd2location = {osd: get_osd_location(cluster, osd) for osd in osds}
        bench_items = set(v[mode] for v in osd2location.values())
        totlen = len(bench_items)
        log.info('Figuring out object names for %d %ss.', totlen, mode)
        name2location = {}  # benchitem -> ([name1, name2], description)
        cnt = 0
        foundcnt = 0
        # Probe object names bench_1, bench_2, ... until every bench item
        # has collected threads_count names that map to it.
        while bench_items:
            cnt += 1
            name = 'bench_%d' % cnt
            osd = get_obj_acting_primary(cluster, pool, name)
            location = osd2location[osd]
            bench_item = location[mode]
            if bench_item not in bench_items:
                continue
            foundcnt += 1
            xxx = name2location.get(bench_item)
            if xxx is None:
                xxx = [[name], get_description(cluster, location) if threads_count == 1 else '*multiple*']
                name2location[bench_item] = xxx
            else:
                xxx[0].append(name)
            if len(xxx[0]) == threads_count:
                bench_items.remove(bench_item)
            log.info('Found %d/%d', foundcnt, totlen * threads_count)
        name2location = sorted(
            (
                (bench_item, names, descr)
                for bench_item, (names, descr) in name2location.items()
            ),
            key=lambda i: i[0]
        )
        log.debug('Opening IO context for pool %s. Each benchmark will last %d secs.', pool, secs)
        with cluster.open_ioctx(pool) as ioctx:
            log.info('Start write IOPS benchmarking of %d %ss with %d thread(s).', len(name2location), mode,
                     threads_count)
            for (bench_item, names, description) in name2location:
                log.debug('Benchmarking write IOPS on %r', bench_item)
                # { 'name1': [1.2, 3.4, 5.6, ...], 'name2': [...], ...}
                results = do_bench(secs, names, ioctx, smalldata)
                latencies = []
                for v in results.values():
                    latencies.extend(v)
                latencies.sort()
                # Threads run in parallel: wall time is the slowest thread's total.
                elapsed = max(sum(v) for v in results.values())
                ops = sum(len(v) for v in results.values())
                iops = ops / elapsed
                log.info(
                    '%s %r: %2.2f IOPS (%2.2f ops), minlat=%.4f ms, maxlat=%.4f ms. %s.',
                    mode,
                    bench_item,
                    iops,
                    ops,
                    latencies[0] * 1000,
                    latencies[-1] * 1000,
                    description,
                )
            log.info('Start Linear write benchmarking of %d %ss. blocksize=%d MiB with %d thread(s).',
                     len(name2location), mode,
                     bigsize // (1024 * 1024), threads_count)
            for (bench_item, names, description) in name2location:
                log.debug('Benchmarking Linear write on %r', bench_item)
                results = do_bench(secs, names, ioctx, bigdata)
                elapsed = max(sum(v) for v in results.values())
                ops = sum(len(v) for v in results.values())
                bsec = ops * bigsize / elapsed
                log.info(
                    '%s %r: %2.2f MB/sec (%2.2f Mbit/s). %s.',
                    mode,
                    bench_item,
                    bsec / 1000000,
                    bsec * 8 / 1000000,
                    description,
                )
if __name__ == '__main__':
main()

16
mysignals.cpp Normal file
View File

@ -0,0 +1,16 @@
#include <atomic>
#include <csignal>
#include "mysignals.h"
// Number of the last signal delivered to signal_handler(), 0 while none has
// arrived. The handler writes it and benchmark threads poll it through
// abort_if_signalled(), so it has to be an atomic: volatile sig_atomic_t only
// covers a handler interrupting the same thread, not access across threads.
static std::atomic<int> gSignalStatus{0};
// Lock-free atomics are the only atomics that may be touched from a handler.
static_assert(ATOMIC_INT_LOCK_FREE == 2,
              "std::atomic<int> must be lock-free to be used in a signal handler");

// Signal handler: only records which signal arrived.
static void signal_handler(int signal) {
  gSignalStatus.store(signal, std::memory_order_relaxed);
}

// Routes SIGINT and SIGTERM to signal_handler() for the whole process.
void setup_signal_handlers() {
  std::signal(SIGINT, signal_handler);
  std::signal(SIGTERM, signal_handler);
}
// Cancellation point: throws AbortException once signal_handler() has
// recorded a signal, otherwise returns immediately.
void abort_if_signalled() {
  const bool signalled = (gSignalStatus != 0);
  if (signalled) {
    throw AbortException();
  }
}

11
mysignals.h Normal file
View File

@ -0,0 +1,11 @@
#ifndef MYSIGNALS_H
#define MYSIGNALS_H
#include <exception>
// Installs the SIGINT/SIGTERM handlers that record an incoming signal
// (defined in mysignals.cpp).
void setup_signal_handlers();
// Thrown by abort_if_signalled() to unwind the caller after a signal.
class AbortException : public std::exception {};
// Throws AbortException if a handled signal has been recorded; otherwise
// does nothing.
void abort_if_signalled();
#endif

101
radosutil.cpp Normal file
View File

@ -0,0 +1,101 @@
#include <string>
#include <json/json.h>
#include <librados.hpp>
#include "radosutil.h"
using namespace std;
using namespace librados;
RadosUtils::RadosUtils(Rados *rados_)
: rados(rados_), /* */
json_reader(new Json::Reader(Json::Features::strictMode())), /* */
json_writer(new Json::FastWriter()) /* */
{}
// Asks the monitors ("osd map") which OSD is the acting primary for object
// `name` in `pool` and returns its id.
//
// Throws the const char * "Failed to get acting_primary" when the reply has no
// usable id; errors from do_mon_command() propagate unchanged.
unsigned int RadosUtils::get_obj_acting_primary(const string &name,
                                                const string &pool) {
  Json::Value cmd(Json::objectValue);
  cmd["prefix"] = "osd map";
  cmd["object"] = name;
  cmd["pool"] = pool;
  auto &&location = do_mon_command(cmd);
  const auto &acting_primary = location["acting_primary"];
  // isUInt() instead of isNumeric(): a negative or fractional value is
  // numeric too, but asUInt() would then fail inside jsoncpp (or truncate)
  // rather than produce the error below.
  if (!acting_primary.isUInt())
    throw "Failed to get acting_primary";
  return acting_primary.asUInt();
}
// TODO: std::map copying ? return unique_ptr ?
// Looks up OSD `osd` with "osd find" and flattens the reply's
// "crush_location" members into a string map. An extra "osd" => "osd.<id>"
// entry is always added.
map<string, string> RadosUtils::get_osd_location(unsigned int osd) {
  Json::Value request(Json::objectValue);
  request["prefix"] = "osd find";
  request["id"] = osd;

  auto &&reply = do_mon_command(request);
  const auto &crush = reply["crush_location"];

  map<string, string> location;
  auto member = crush.begin();
  while (member != crush.end()) {
    location[member.name()] = member->asString();
    ++member;
  }
  location["osd"] = "osd." + to_string(osd);
  return location;
}
// todo: std::set copying
// Lists the placement groups of `pool` ("pg ls-by-pool") and returns the set
// of distinct acting-primary OSD ids found in the reply.
//
// Throws the const char * "Failed to get acting_primary" when an entry has no
// usable id; errors from do_mon_command() propagate unchanged.
set<unsigned int> RadosUtils::get_osds(const string &pool) {
  Json::Value cmd(Json::objectValue);
  cmd["prefix"] = "pg ls-by-pool";
  cmd["poolstr"] = pool;
  const auto &&pgs = do_mon_command(cmd);
  set<unsigned int> osds;
  for (const auto &pg : pgs) {
    const auto &primary = pg["acting_primary"];
    // isUInt() instead of isNumeric(): a negative or fractional value is
    // numeric too, but asUInt() would then fail inside jsoncpp (or truncate)
    // rather than produce the error below.
    if (!primary.isUInt())
      throw "Failed to get acting_primary";
    osds.insert(primary.asUInt());
  }
  return osds;
}
unsigned int RadosUtils::get_pool_size(const string &pool) {
Json::Value cmd(Json::objectValue);
cmd["prefix"] = "osd pool get";
cmd["pool"] = pool;
cmd["var"] = "size";
const auto &&v = do_mon_command(cmd);
return v["size"].asUInt();
}
// Sends `cmd` to the monitors and returns the parsed JSON reply.
//
// "format": "json" is added to `cmd` itself, so the caller's object is
// modified. Throws MyRadosException (return code plus status string) when
// mon_command() returns a negative value, and the const char *
// "JSON parse error" when the output buffer cannot be parsed.
Json::Value RadosUtils::do_mon_command(Json::Value &cmd) {
  cmd["format"] = "json";

  bufferlist inbl;
  bufferlist outbl;
  string outs;
  const int err =
      rados->mon_command(json_writer->write(cmd), inbl, &outbl, &outs);
  if (err < 0) {
    throw MyRadosException(err, outs);
  }

  Json::Value root;
  const bool parsed = json_reader->parse(outbl.to_str(), root);
  if (!parsed) {
    throw "JSON parse error";
  }
  return root;
}

41
radosutil.h Normal file
View File

@ -0,0 +1,41 @@
#include <exception>
#include <map>
#include <memory>
#include <set>
#include <string>
namespace Json {
class Reader;
class FastWriter;
class Value;
} // namespace Json
namespace librados {
class Rados;
}
// Helper that sends JSON-formatted monitor commands through a librados
// handle and extracts the values the benchmark needs from the replies.
//
// The class is move-only because it owns its JSON reader and writer through
// unique_ptr. Json::Reader and Json::FastWriter are only forward-declared
// here, so a translation unit that creates or destroys a RadosUtils must
// include <json/json.h> first.
class RadosUtils {
public:
  // `rados_` is stored as a raw pointer and is never deleted by this class.
  // explicit: a Rados pointer must not convert to RadosUtils silently.
  explicit RadosUtils(librados::Rados *rados_);

  // Id of the acting primary OSD for object `name` in `pool` ("osd map").
  unsigned int get_obj_acting_primary(const std::string &name,
                                      const std::string &pool);
  // Crush location members of `osd` ("osd find") plus "osd" => "osd.<id>".
  std::map<std::string, std::string> get_osd_location(unsigned int osd);
  // Distinct acting-primary OSD ids over the PGs of `pool` ("pg ls-by-pool").
  std::set<unsigned int> get_osds(const std::string &pool);
  // The "size" value of `pool` ("osd pool get").
  unsigned int get_pool_size(const std::string &pool);

private:
  // Adds "format": "json" to `cmd`, sends it and returns the parsed reply.
  Json::Value do_mon_command(Json::Value &cmd);

  librados::Rados *rados;
  std::unique_ptr<Json::Reader> json_reader;
  std::unique_ptr<Json::FastWriter> json_writer;
};
// Error reported by a librados call. what() returns
// "Rados err <code>: <message>".
class MyRadosException : public std::exception {
public:
  // `err` is the return code of the failed call, `msg` its status text.
  MyRadosException(int err, const std::string &msg)
      : descr("Rados err " + std::to_string(err) + ": " + msg) {}

  // noexcept override replaces the deprecated throw() specification and lets
  // the compiler verify that std::exception::what() is really overridden.
  const char *what() const noexcept override { return descr.c_str(); }

private:
  std::string descr;
};