diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0c6eb29 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +*.[oais] +/main +*~ +*.ii +*.bc diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..0738284 --- /dev/null +++ b/Makefile @@ -0,0 +1,19 @@ +# -fsanitize=undefined +CXXFLAGS += -std=c++11 -O3 -Wall -Wextra -I/usr/include/rados -I/usr/include/jsoncpp +#-Wa,-adhln -g +LDFLAGS += -pthread -lrados -ljsoncpp -lstdc++ + +#CC=clang-6.0 + + +main: main.o + $(CC) $^ -o $@ $(LDFLAGS) + +.cpp.o: + $(CC) $(CPPFLAGS) $(CXXFLAGS) -c $< -o $@ + +indent: main.cpp + clang-format-6.0 -i main.cpp + +builddep: + sudo apt install -y --no-install-recommends libjsoncpp-dev diff --git a/main.cpp b/main.cpp new file mode 100644 index 0000000..557c2df --- /dev/null +++ b/main.cpp @@ -0,0 +1,418 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace librados; +using namespace std; +using namespace chrono; + +template static double dur2sec(const T &dur) { + return duration_cast>(dur).count(); +} + +template static double dur2msec(const T &dur) { + return duration_cast>(dur).count(); +} + +template +static void print_breakdown(const vector &summary, size_t thread_count) { + + T totaltime(0); + + map dur2count; + map dur2totaltime; + static const size_t usecs[] = { + 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, + 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, + 30000, 40000, 50000, 60000, 70000, 80000, 90000, 100000, 1000000000}; + + for (const auto &m : usecs) { + dur2count[m] = 0; + dur2totaltime[m] = T(0); + } + + T mindur(minutes(42)); + T maxdur(0); + for (const auto &res : summary) { + totaltime += res; + if (res > maxdur) + maxdur = res; + if (res < mindur) + mindur = res; + for (const auto &m : usecs) { + if (res <= microseconds(m)) { + dur2count.at(m)++; + dur2totaltime.at(m) += res; + break; + } + } + } + + cout << "min delay " << dur2msec(mindur) << " msec." << endl; + cout << "max delay " << dur2msec(maxdur) << " msec." << endl; + + // skip first counters equal zero + auto b = dur2count.begin(); + while (b != dur2count.end() && !b->second) + b++; + + // skip last counters equal zero + auto e = dur2count.end(); + if (e != b) { + e--; + while (e != b && !e->second) + e--; + e++; + } + + size_t maxcount = 0; + for (auto p = b; p != e; p++) { + const auto &usecgroup = p->first; + const auto &count = p->second; + // const auto ×pent = dur2totaltime.at(usecgroup); + + if (count > maxcount) + maxcount = count; + + auto bar = std::string(count * 70 / summary.size(), '#'); + if (usecgroup == 1000000000) + cout << "> 100"; + else + cout << "<=" << setw(5) << usecgroup / 1000.0; + cout << " ms: " << setw(3) << count * 100 / summary.size() << "% " << bar + << " cnt=" << count; + // cout << " (" << (count * thread_count) / dur2sec(timespent) + // << " IOPS)"; + cout << endl; + } + + size_t sum = 0; + T sumtime(0); + for (auto p = b; p != e; p++) { + const auto &usecgroup = p->first; + const auto &count = p->second; + if (count > maxcount / 100.0) { + sum += count; + sumtime += dur2totaltime.at(usecgroup); + } + } + + cout << "ops: " << (summary.size() * thread_count) / dur2sec(totaltime) + << endl; + + cout << "ops (count > 0.01 of max): " + << (sum * thread_count) / dur2sec(sumtime) << endl; + + if (thread_count > 1) + cout << "ops per thread: " << summary.size() / dur2sec(totaltime) << endl; +} + +// Called in a thread. +static void _do_bench(unsigned int secs, const string &obj_name, IoCtx &ioctx, + vector *ops) { + auto b = steady_clock::now(); + const auto stop = b + seconds(secs); + + // cout<<"tt" <size() % 2 ? bar1 : bar2) < 0) + throw "Write error"; + const auto b2 = steady_clock::now(); + ops->push_back(b2 - b); + b = b2; + } + } catch (...) { + ioctx.remove(obj_name); // ignore errors. + throw; + } + ioctx.remove(obj_name); // ignore errors. +} + +static void do_bench(unsigned int secs, const vector &names, + IoCtx &ioctx) { + + vector summary; + + if (names.size() > 1) { + vector threads; + vector *> listofopts; + + for (const auto &name : names) { + auto results = new vector; + listofopts.push_back(results); + threads.push_back(thread(_do_bench, secs, name, ref(ioctx), results)); + } + + for (auto &th : threads) + th.join(); + + // just an optimisation :) + size_t qwe = 0; + for (const auto &res : listofopts) + qwe += res->size(); + summary.reserve(qwe); + + for (const auto &res : listofopts) { + summary.insert(summary.end(), res->begin(), res->end()); + delete res; + } + } else { + _do_bench(secs, names.at(0), ioctx, &summary); + } + print_breakdown(summary, names.size()); +} + +class RadosUtils { +public: + RadosUtils(Rados *rados_) + : rados(rados_), json_reader(Json::Features::strictMode()) {} + + unsigned int get_obj_acting_primary(const string &name, const string &pool) { + + Json::Value cmd(Json::objectValue); + cmd["prefix"] = "osd map"; + cmd["object"] = name; + cmd["pool"] = pool; + + auto &&location = do_mon_command(cmd); + + const auto &acting_primary = location["acting_primary"]; + if (!acting_primary.isNumeric()) + throw "Failed to get acting_primary"; + + return acting_primary.asUInt(); + } + + map get_osd_location(unsigned int osd) { + Json::Value cmd(Json::objectValue); + cmd["prefix"] = "osd find"; + cmd["id"] = osd; + + auto &&location = do_mon_command(cmd); + const auto &crush = location["crush_location"]; + + map result; + + for (auto &&it = crush.begin(); it != crush.end(); ++it) { + result[it.name()] = it->asString(); + } + + result["osd"] = "osd." + to_string(osd); + + return result; + } + + set get_osds(const string &pool) { + Json::Value cmd(Json::objectValue); + cmd["prefix"] = "pg ls-by-pool"; + cmd["poolstr"] = pool; + + const auto &&pgs = do_mon_command(cmd); + + set osds; + + // TODO: + // auto const & x: container + // https://stackoverflow.com/questions/27307373/c-how-to-create-iterator-over-one-field-of-a-struct-vector + for (const auto &pg : pgs) { + const auto &primary = pg["acting_primary"]; + if (!primary.isNumeric()) + throw "Failed to get acting_primary"; + osds.insert(primary.asUInt()); + } + + return osds; + } + + unsigned int get_pool_size(const string &pool) { + Json::Value cmd(Json::objectValue); + cmd["prefix"] = "osd pool get"; + cmd["pool"] = pool; + cmd["var"] = "size"; + + const auto &&v = do_mon_command(cmd); + + return v["size"].asUInt(); + } + +private: + Json::Value do_mon_command(Json::Value &cmd) { + int err; + bufferlist outbl; + string outs; + cmd["format"] = "json"; + bufferlist inbl; + if ((err = rados->mon_command(json_writer.write(cmd), inbl, &outbl, + &outs)) < 0) { + cerr << "mon_command error: " << outs << endl; + throw "mon_command error"; + } + + Json::Value root; + if (!json_reader.parse(outbl.to_str(), root)) + throw "JSON parse error"; + + return root; + } + + Rados *rados; + Json::Reader json_reader; + Json::FastWriter json_writer; +}; + +static void _main(int argc, const char *argv[]) { + struct { + string pool; + string mode; + string specific_bench_item; + unsigned int threads; + unsigned int secs; + } settings; + + Rados rados; + + int err; + if ((err = rados.init("admin")) < 0) { + cerr << "Failed to init: " << strerror(-err) << endl; + throw "Failed to init"; + } + + if ((err = rados.conf_read_file("/etc/ceph/ceph.conf")) < 0) { + cerr << "Failed to read conf file: " << strerror(-err) << endl; + throw "Failed to read conf file"; + } + + if ((err = rados.conf_parse_argv(argc, argv)) < 0) { + cerr << "Failed to parse argv: " << strerror(-err) << endl; + throw "Failed to parse argv"; + } + + switch (argc) { + case 3: + settings.pool = argv[1]; + settings.mode = argv[2]; + break; + case 4: + settings.pool = argv[1]; + settings.mode = argv[2]; + settings.specific_bench_item = argv[3]; + break; + default: + cerr << "Usage: " << argv[0] + << " [poolname] [mode=host|osd] " << endl; + throw "Wrong cmdline"; + } + + settings.secs = 10; + settings.threads = 1; + + if ((err = rados.connect()) < 0) { + cerr << "Failed to connect: " << strerror(-err) << endl; + throw "Failed to connect"; + } + + // https://tracker.ceph.com/issues/24114 + this_thread::sleep_for(milliseconds(100)); + + auto rados_utils = RadosUtils(&rados); + + if (rados_utils.get_pool_size(settings.pool) != 1) + throw "It's required to have pool size 1"; + + map> osd2location; + + set bench_items; // node1, node2 ||| osd.1, osd.2, osd.3 + + for (const auto &osd : rados_utils.get_osds(settings.pool)) { + const auto &location = rados_utils.get_osd_location(osd); + + // TODO: do not fill this map if specific_bench_item specified + osd2location[osd] = location; + + const auto &qwe = location.at(settings.mode); + if (settings.specific_bench_item.empty() || + qwe == settings.specific_bench_item) { + bench_items.insert(qwe); + } + } + + // benchitem -> [name1, name2] ||| i.e. "osd.2" => ["obj1", "obj2"] + map> name2location; + unsigned int cnt = 0; + + // for each bench_item find thread_count names. + // store every name in name2location = [bench_item, names, description] + const string prefix = "bench_"; + while (bench_items.size()) { + string name = prefix + to_string(++cnt); + + unsigned int osd = rados_utils.get_obj_acting_primary(name, settings.pool); + + const auto &location = osd2location.at(osd); + const auto &bench_item = location.at(settings.mode); + if (!bench_items.count(bench_item)) + continue; + + auto &names = name2location[bench_item]; + if (names.size() == settings.threads) { + bench_items.erase(bench_item); + continue; + } + + names.push_back(name); + + cout << name << " - " << bench_item << endl; + } + + IoCtx ioctx; + // TODO: cleanup + /* + * NOTE: be sure to call watch_flush() prior to destroying any IoCtx + * that is used for watch events to ensure that racing callbacks + * have completed. + */ + + if (rados.ioctx_create(settings.pool.c_str(), ioctx) < 0) + throw "Failed to create ioctx"; + + for (const auto &p : name2location) { + const auto &bench_item = p.first; + const auto &obj_names = p.second; + cout << "Benching " << settings.mode << " " << bench_item << endl; + do_bench(settings.secs, obj_names, ioctx); + } +} + +int main(int argc, const char *argv[]) { + /* + * IoCtx p; + * rados.ioctx_create("my_pool", p); + * p->stat(&stats); + + + */ + try { + _main(argc, argv); + } catch (const char *msg) { + cerr << "Unhandled exception: " << msg << endl; + return 1; + } + + cout << "Exiting successfully." << endl; + return 0; +} diff --git a/main.py b/main.py index 137bc3a..b562987 100755 --- a/main.py +++ b/main.py @@ -301,10 +301,11 @@ def main(): iops = ops / elapsed log.info( - '%s %r: %2.2f IOPS, minlat=%.4f ms, maxlat=%.4f ms. %s.', + '%s %r: %2.2f IOPS (%2.2f ops), minlat=%.4f ms, maxlat=%.4f ms. %s.', mode, bench_item, iops, + ops, latencies[0] * 1000, latencies[-1] * 1000, description,