// Copyright (c) Vitaliy Filippov, 2019+ // License: VNPL-1.1 (see README.md for details) // // Vitastor shared key/value database stress tester / benchmark #define _XOPEN_SOURCE #include #include #include #include #include //#include #include "epoll_manager.h" #include "str_util.h" #include "kv_db.h" const char *exe_name = NULL; struct kv_test_listing_t { void *handle = NULL; std::string next_after; std::set inflights; timespec tv_begin; bool error = false; }; struct kv_test_lat_t { const char *name = NULL; uint64_t usec = 0, count = 0; }; struct kv_test_stat_t { kv_test_lat_t get, add, update, del, list; uint64_t list_keys = 0; }; class kv_test_t { public: // Config json11::Json::object kv_cfg; std::string key_suffix; uint64_t inode_id = 0; uint64_t op_count = 1000000; uint64_t runtime_sec = 0; uint64_t parallelism = 4; uint64_t reopen_prob = 1; uint64_t get_prob = 30000; uint64_t add_prob = 20000; uint64_t update_prob = 20000; uint64_t del_prob = 5000; uint64_t list_prob = 300; uint64_t min_key_len = 10; uint64_t max_key_len = 70; uint64_t min_value_len = 50; uint64_t max_value_len = 300; uint64_t print_stats_interval = 1; bool json_output = false; bool trace = false; bool stop_on_error = false; // FIXME: Multiple clients kv_test_stat_t stat, prev_stat; timespec prev_stat_time, start_stat_time; // State kv_dbw_t *db = NULL; ring_loop_t *ringloop = NULL; epoll_manager_t *epmgr = NULL; cluster_client_t *cli = NULL; ring_consumer_t consumer; bool finished = false; uint64_t total_prob = 0; uint64_t ops_sent = 0, ops_done = 0; int stat_timer_id = -1; int in_progress = 0; bool reopening = false; std::set listings; std::set changing_keys; std::map values; ~kv_test_t(); static json11::Json::object parse_args(int narg, const char *args[]); void parse_config(json11::Json cfg); void run(json11::Json cfg); void loop(); void print_stats(kv_test_stat_t & prev_stat, timespec & prev_stat_time); void print_total_stats(); void start_change(const std::string & key); void stop_change(const std::string & key); void add_stat(kv_test_lat_t & stat, timespec tv_begin); }; kv_test_t::~kv_test_t() { if (db) delete db; if (cli) { cli->flush(); delete cli; } if (epmgr) delete epmgr; if (ringloop) delete ringloop; } json11::Json::object kv_test_t::parse_args(int narg, const char *args[]) { json11::Json::object cfg; for (int i = 1; i < narg; i++) { if (!strcmp(args[i], "-h") || !strcmp(args[i], "--help")) { printf( "Vitastor Key/Value DB stress tester / benchmark\n" "(c) Vitaliy Filippov, 2023+ (VNPL-1.1)\n" "\n" "USAGE: %s --pool_id POOL_ID --inode_id INODE_ID [OPTIONS]\n" " --op_count 1000000\n" " Total operations to run during test. 0 means unlimited\n" " --key_suffix \"\"\n" " Suffix for all keys read or written (to avoid collisions)\n" " --runtime 0\n" " Run for this number of seconds. 0 means unlimited\n" " --parallelism 4\n" " Run this number of operations in parallel\n" " --get_prob 30000\n" " Fraction of key retrieve operations\n" " --add_prob 20000\n" " Fraction of key addition operations\n" " --update_prob 20000\n" " Fraction of key update operations\n" " --del_prob 30000\n" " Fraction of key delete operations\n" " --list_prob 300\n" " Fraction of listing operations\n" " --min_key_len 10\n" " Minimum key size in bytes\n" " --max_key_len 70\n" " Maximum key size in bytes\n" " --min_value_len 50\n" " Minimum value size in bytes\n" " --max_value_len 300\n" " Maximum value size in bytes\n" " --print_stats 1\n" " Print operation statistics every this number of seconds\n" " --json\n" " JSON output\n" " --stop_on_error 0\n" " Stop on first execution error, mismatch, lost key or extra key during listing\n" " --kv_memory_limit 128M\n" " Maximum memory to use for vitastor-kv index cache\n" " --kv_allocate_blocks 4\n" " Number of PG blocks used for new tree block allocation in parallel\n" " --kv_evict_max_misses 10\n" " Eviction algorithm parameter: retry eviction from another random spot\n" " if this number of keys is used currently or was used recently\n" " --kv_evict_attempts_per_level 3\n" " Retry eviction at most this number of times per tree level, starting\n" " with bottom-most levels\n" " --kv_evict_unused_age 1000\n" " Evict only keys unused during this number of last operations\n" " --kv_log_level 1\n" " Log level. 0 = errors, 1 = warnings, 10 = trace operations\n", exe_name ); exit(0); } else if (args[i][0] == '-' && args[i][1] == '-') { const char *opt = args[i]+2; cfg[opt] = !strcmp(opt, "json") || i == narg-1 ? "1" : args[++i]; } } return cfg; } void kv_test_t::parse_config(json11::Json cfg) { inode_id = INODE_WITH_POOL(cfg["pool_id"].uint64_value(), cfg["inode_id"].uint64_value()); if (cfg["op_count"].uint64_value() > 0) op_count = cfg["op_count"].uint64_value(); key_suffix = cfg["key_suffix"].string_value(); if (cfg["runtime"].uint64_value() > 0) runtime_sec = cfg["runtime"].uint64_value(); if (cfg["parallelism"].uint64_value() > 0) parallelism = cfg["parallelism"].uint64_value(); if (!cfg["reopen_prob"].is_null()) reopen_prob = cfg["reopen_prob"].uint64_value(); if (!cfg["get_prob"].is_null()) get_prob = cfg["get_prob"].uint64_value(); if (!cfg["add_prob"].is_null()) add_prob = cfg["add_prob"].uint64_value(); if (!cfg["update_prob"].is_null()) update_prob = cfg["update_prob"].uint64_value(); if (!cfg["del_prob"].is_null()) del_prob = cfg["del_prob"].uint64_value(); if (!cfg["list_prob"].is_null()) list_prob = cfg["list_prob"].uint64_value(); if (!cfg["min_key_len"].is_null()) min_key_len = cfg["min_key_len"].uint64_value(); if (cfg["max_key_len"].uint64_value() > 0) max_key_len = cfg["max_key_len"].uint64_value(); if (!cfg["min_value_len"].is_null()) min_value_len = cfg["min_value_len"].uint64_value(); if (cfg["max_value_len"].uint64_value() > 0) max_value_len = cfg["max_value_len"].uint64_value(); if (!cfg["print_stats"].is_null()) print_stats_interval = cfg["print_stats"].uint64_value(); if (!cfg["json"].is_null()) json_output = true; if (!cfg["stop_on_error"].is_null()) stop_on_error = cfg["stop_on_error"].bool_value(); if (!cfg["kv_memory_limit"].is_null()) kv_cfg["kv_memory_limit"] = cfg["kv_memory_limit"]; if (!cfg["kv_allocate_blocks"].is_null()) kv_cfg["kv_allocate_blocks"] = cfg["kv_allocate_blocks"]; if (!cfg["kv_evict_max_misses"].is_null()) kv_cfg["kv_evict_max_misses"] = cfg["kv_evict_max_misses"]; if (!cfg["kv_evict_attempts_per_level"].is_null()) kv_cfg["kv_evict_attempts_per_level"] = cfg["kv_evict_attempts_per_level"]; if (!cfg["kv_evict_unused_age"].is_null()) kv_cfg["kv_evict_unused_age"] = cfg["kv_evict_unused_age"]; if (!cfg["kv_log_level"].is_null()) { trace = cfg["kv_log_level"].uint64_value() >= 10; kv_cfg["kv_log_level"] = cfg["kv_log_level"]; } total_prob = reopen_prob+get_prob+add_prob+update_prob+del_prob+list_prob; stat.get.name = "get"; stat.add.name = "add"; stat.update.name = "update"; stat.del.name = "del"; stat.list.name = "list"; } void kv_test_t::run(json11::Json cfg) { srand48(time(NULL)); parse_config(cfg); // Create client ringloop = new ring_loop_t(512); epmgr = new epoll_manager_t(ringloop); cli = new cluster_client_t(ringloop, epmgr->tfd, cfg); db = new kv_dbw_t(cli); // Load image metadata while (!cli->is_ready()) { ringloop->loop(); if (cli->is_ready()) break; ringloop->wait(); } // Run reopening = true; db->open(inode_id, kv_cfg, [this](int res) { reopening = false; if (res < 0) { fprintf(stderr, "ERROR: Open index: %d (%s)\n", res, strerror(-res)); exit(1); } if (trace) printf("Index opened\n"); ringloop->wakeup(); }); consumer.loop = [this]() { loop(); }; ringloop->register_consumer(&consumer); if (print_stats_interval) stat_timer_id = epmgr->tfd->set_timer(print_stats_interval*1000, true, [this](int) { print_stats(prev_stat, prev_stat_time); }); clock_gettime(CLOCK_REALTIME, &start_stat_time); prev_stat_time = start_stat_time; while (!finished) { ringloop->loop(); if (!finished) ringloop->wait(); } if (stat_timer_id >= 0) epmgr->tfd->clear_timer(stat_timer_id); ringloop->unregister_consumer(&consumer); // Print total stats print_total_stats(); // Destroy the client delete db; db = NULL; cli->flush(); delete cli; delete epmgr; delete ringloop; cli = NULL; epmgr = NULL; ringloop = NULL; } static const char *base64_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789@+/"; std::string random_str(int len) { std::string str; str.resize(len); for (int i = 0; i < len; i++) { str[i] = base64_chars[lrand48() % 64]; } return str; } void kv_test_t::loop() { if (reopening) { return; } if (ops_done >= op_count) { finished = true; } while (!finished && ops_sent < op_count && in_progress < parallelism) { uint64_t dice = (lrand48() % total_prob); if (dice < reopen_prob) { reopening = true; db->close([this]() { if (trace) printf("Index closed\n"); db->open(inode_id, kv_cfg, [this](int res) { reopening = false; if (res < 0) { fprintf(stderr, "ERROR: Reopen index: %d (%s)\n", res, strerror(-res)); finished = true; return; } if (trace) printf("Index reopened\n"); ringloop->wakeup(); }); }); return; } else if (dice < reopen_prob+get_prob) { // get existing auto key = random_str(max_key_len); auto k_it = values.lower_bound(key); if (k_it == values.end()) continue; key = k_it->first; if (changing_keys.find(key) != changing_keys.end()) continue; in_progress++; ops_sent++; if (trace) printf("get %s\n", key.c_str()); timespec tv_begin; clock_gettime(CLOCK_REALTIME, &tv_begin); db->get(key, [this, key, tv_begin](int res, const std::string & value) { add_stat(stat.get, tv_begin); ops_done++; in_progress--; auto it = values.find(key); if (res != (it == values.end() ? -ENOENT : 0)) { fprintf(stderr, "ERROR: get %s: %d (%s)\n", key.c_str(), res, strerror(-res)); if (stop_on_error) exit(1); } else if (it != values.end() && value != it->second) { fprintf(stderr, "ERROR: get %s: mismatch: %s vs %s\n", key.c_str(), value.c_str(), it->second.c_str()); if (stop_on_error) exit(1); } ringloop->wakeup(); }); } else if (dice < reopen_prob+get_prob+add_prob+update_prob) { bool is_add = false; std::string key; if (dice < reopen_prob+get_prob+add_prob) { // add is_add = true; uint64_t key_len = min_key_len + (max_key_len > min_key_len ? lrand48() % (max_key_len-min_key_len) : 0); key = random_str(key_len) + key_suffix; } else { // update key = random_str(max_key_len); auto k_it = values.lower_bound(key); if (k_it == values.end()) continue; key = k_it->first; } if (changing_keys.find(key) != changing_keys.end()) continue; uint64_t value_len = min_value_len + (max_value_len > min_value_len ? lrand48() % (max_value_len-min_value_len) : 0); auto value = random_str(value_len); start_change(key); ops_sent++; in_progress++; if (trace) printf("set %s = %s\n", key.c_str(), value.c_str()); timespec tv_begin; clock_gettime(CLOCK_REALTIME, &tv_begin); db->set(key, value, [this, key, value, tv_begin, is_add](int res) { add_stat(is_add ? stat.add : stat.update, tv_begin); stop_change(key); ops_done++; in_progress--; if (res != 0) { fprintf(stderr, "ERROR: set %s = %s: %d (%s)\n", key.c_str(), value.c_str(), res, strerror(-res)); if (stop_on_error) exit(1); } else { values[key] = value; } ringloop->wakeup(); }, NULL); } else if (dice < reopen_prob+get_prob+add_prob+update_prob+del_prob) { // delete auto key = random_str(max_key_len); auto k_it = values.lower_bound(key); if (k_it == values.end()) continue; key = k_it->first; if (changing_keys.find(key) != changing_keys.end()) continue; start_change(key); ops_sent++; in_progress++; if (trace) printf("del %s\n", key.c_str()); timespec tv_begin; clock_gettime(CLOCK_REALTIME, &tv_begin); db->del(key, [this, key, tv_begin](int res) { add_stat(stat.del, tv_begin); stop_change(key); ops_done++; in_progress--; if (res != 0) { fprintf(stderr, "ERROR: del %s: %d (%s)\n", key.c_str(), res, strerror(-res)); if (stop_on_error) exit(1); } else { values.erase(key); } ringloop->wakeup(); }, NULL); } else if (dice < reopen_prob+get_prob+add_prob+update_prob+del_prob+list_prob) { // list ops_sent++; in_progress++; auto key = random_str(max_key_len); auto lst = new kv_test_listing_t; lst->handle = db->list_start(key); auto k_it = values.lower_bound(key); lst->next_after = k_it == values.begin() ? "" : key; lst->inflights = changing_keys; listings.insert(lst); if (trace) printf("list from %s\n", key.c_str()); clock_gettime(CLOCK_REALTIME, &lst->tv_begin); db->list_next(lst->handle, [this, lst](int res, const std::string & key, const std::string & value) { if (res < 0) { add_stat(stat.list, lst->tv_begin); if (res != -ENOENT) { fprintf(stderr, "ERROR: list: %d (%s)\n", res, strerror(-res)); lst->error = true; } else { auto k_it = lst->next_after == "" ? values.begin() : values.upper_bound(lst->next_after); while (k_it != values.end()) { while (k_it != values.end() && lst->inflights.find(k_it->first) != lst->inflights.end()) k_it++; if (k_it != values.end()) { fprintf(stderr, "ERROR: list: missing key %s\n", (k_it++)->first.c_str()); lst->error = true; } } } if (lst->error && stop_on_error) exit(1); ops_done++; in_progress--; db->list_close(lst->handle); delete lst; listings.erase(lst); ringloop->wakeup(); } else { stat.list_keys++; // Do not check modified keys in listing // Listing may return their old or new state if ((!key_suffix.size() || key.size() >= key_suffix.size() && key.substr(key.size()-key_suffix.size()) == key_suffix) && lst->inflights.find(key) == lst->inflights.end()) { auto k_it = lst->next_after == "" ? values.begin() : values.upper_bound(lst->next_after); while (true) { while (k_it != values.end() && lst->inflights.find(k_it->first) != lst->inflights.end()) { k_it++; } if (k_it == values.end() || k_it->first > key) { fprintf(stderr, "ERROR: list: extra key %s\n", key.c_str()); lst->error = true; break; } else if (k_it->first < key) { fprintf(stderr, "ERROR: list: missing key %s\n", k_it->first.c_str()); lst->error = true; lst->next_after = k_it->first; k_it++; } else { if (k_it->second != value) { fprintf(stderr, "ERROR: list: mismatch: %s = %s but should be %s\n", key.c_str(), value.c_str(), k_it->second.c_str()); lst->error = true; } lst->next_after = k_it->first; break; } } } db->list_next(lst->handle, NULL); } }); } } } void kv_test_t::add_stat(kv_test_lat_t & stat, timespec tv_begin) { timespec tv_end; clock_gettime(CLOCK_REALTIME, &tv_end); int64_t usec = (tv_end.tv_sec - tv_begin.tv_sec)*1000000 + (tv_end.tv_nsec - tv_begin.tv_nsec)/1000; if (usec > 0) { stat.usec += usec; stat.count++; } } void kv_test_t::print_stats(kv_test_stat_t & prev_stat, timespec & prev_stat_time) { timespec cur_stat_time; clock_gettime(CLOCK_REALTIME, &cur_stat_time); int64_t usec = (cur_stat_time.tv_sec - prev_stat_time.tv_sec)*1000000 + (cur_stat_time.tv_nsec - prev_stat_time.tv_nsec)/1000; if (usec > 0) { kv_test_lat_t *lats[] = { &stat.get, &stat.add, &stat.update, &stat.del, &stat.list }; kv_test_lat_t *prev[] = { &prev_stat.get, &prev_stat.add, &prev_stat.update, &prev_stat.del, &prev_stat.list }; if (!json_output) { char buf[128] = { 0 }; for (int i = 0; i < sizeof(lats)/sizeof(lats[0]); i++) { snprintf(buf, sizeof(buf)-1, "%.1f %s/s (%lu us)", (lats[i]->count-prev[i]->count)*1000000.0/usec, lats[i]->name, (lats[i]->usec-prev[i]->usec)/(lats[i]->count-prev[i]->count > 0 ? lats[i]->count-prev[i]->count : 1)); int k; for (k = strlen(buf); k < strlen(lats[i]->name)+21; k++) buf[k] = ' '; buf[k] = 0; printf("%s", buf); } printf("\n"); } else { int64_t runtime = (cur_stat_time.tv_sec - start_stat_time.tv_sec)*1000000 + (cur_stat_time.tv_nsec - start_stat_time.tv_nsec)/1000; printf("{\"runtime\":%.1f", (double)runtime/1000000.0); for (int i = 0; i < sizeof(lats)/sizeof(lats[0]); i++) { if (lats[i]->count > prev[i]->count) { printf( ",\"%s\":{\"avg\":{\"iops\":%.1f,\"usec\":%lu},\"total\":{\"count\":%lu,\"usec\":%lu}}", lats[i]->name, (lats[i]->count-prev[i]->count)*1000000.0/usec, (lats[i]->usec-prev[i]->usec)/(lats[i]->count-prev[i]->count), lats[i]->count, lats[i]->usec ); } } printf("}\n"); } } prev_stat = stat; prev_stat_time = cur_stat_time; } void kv_test_t::print_total_stats() { if (!json_output) printf("Total:\n"); kv_test_stat_t start_stats; timespec start_stat_time = this->start_stat_time; print_stats(start_stats, start_stat_time); } void kv_test_t::start_change(const std::string & key) { changing_keys.insert(key); for (auto lst: listings) { lst->inflights.insert(key); } } void kv_test_t::stop_change(const std::string & key) { changing_keys.erase(key); } int main(int narg, const char *args[]) { setvbuf(stdout, NULL, _IONBF, 0); setvbuf(stderr, NULL, _IONBF, 0); exe_name = args[0]; kv_test_t *p = new kv_test_t(); p->run(kv_test_t::parse_args(narg, args)); delete p; return 0; }