diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index fb24839f..0d1629ef 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -195,6 +195,15 @@ target_link_libraries(vitastor-kv
     vitastor_client
 )
 
+add_executable(vitastor-kv-stress
+    kv_stress.cpp
+    kv_db.cpp
+    kv_db.h
+)
+target_link_libraries(vitastor-kv-stress
+    vitastor_client
+)
+
 # vitastor-nfs
 add_executable(vitastor-nfs
     nfs_proxy.cpp
diff --git a/src/kv_stress.cpp b/src/kv_stress.cpp
new file mode 100644
index 00000000..54557fb2
--- /dev/null
+++ b/src/kv_stress.cpp
@@ -0,0 +1,470 @@
+// Copyright (c) Vitaliy Filippov, 2019+
+// License: VNPL-1.1 (see README.md for details)
+//
+// Vitastor shared key/value database stress tester / benchmark
+
+#define _XOPEN_SOURCE
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+
+#include <map>
+#include <set>
+#include <string>
+
+#include "epoll_manager.h"
+#include "str_util.h"
+#include "kv_db.h"
+
+const char *exe_name = NULL;
+
+// Verification state of one listing that is currently in progress
+struct kv_test_listing_t
+{
+    // Handle returned by list_start(), passed to list_next() and list_close()
+    void *handle = NULL;
+    // Listed keys are compared with entries of the reference map after this key
+    std::string next_after;
+    // Keys changed while the listing is running; they are excluded from checks
+    std::set<std::string> inflights;
+};
+
+// Issues random get/set/delete/list/reopen operations against the DB and
+// compares their results with an in-memory reference copy (the values map)
+class kv_test_t
+{
+public:
+    // Config
+    json11::Json::object kv_cfg;
+    uint64_t inode_id = 0;
+    uint64_t op_count = 1000000;
+    uint64_t parallelism = 4;
+    uint64_t reopen_prob = 1;
+    uint64_t get_prob = 30000;
+    uint64_t add_prob = 20000;
+    uint64_t update_prob = 20000;
+    uint64_t del_prob = 30000;
+    uint64_t list_prob = 300;
+    uint64_t min_key_len = 10;
+    uint64_t max_key_len = 70;
+    uint64_t min_value_len = 50;
+    uint64_t max_value_len = 300;
+    // FIXME: Multiple clients
+    // FIXME: Print op statistics
+
+    // State
+    kv_dbw_t *db = NULL;
+    ring_loop_t *ringloop = NULL;
+    epoll_manager_t *epmgr = NULL;
+    cluster_client_t *cli = NULL;
+    ring_consumer_t consumer;
+    bool finished = false;
+    uint64_t total_prob = 0;
+    uint64_t ops_done = 0;
+    uint64_t get_done = 0, add_done = 0, update_done = 0, del_done = 0, list_done = 0;
+    int in_progress = 0;
+    bool reopening = false;
+    std::set<kv_test_listing_t*> listings;
+    std::set<std::string> changing_keys;
+    std::map<std::string, std::string> values;
+
+    ~kv_test_t();
+
+    static json11::Json::object parse_args(int narg, const char *args[]);
+    void run(json11::Json cfg);
+    void loop();
+    void start_change(const std::string & key);
+    void stop_change(const std::string & key);
+    bool pick_idle_key(std::string & key);
+};
+
+kv_test_t::~kv_test_t()
+{
+    if (db)
+        delete db;
+    if (cli)
+    {
+        cli->flush();
+        delete cli;
+    }
+    if (epmgr)
+        delete epmgr;
+    if (ringloop)
+        delete ringloop;
+}
+
+// Convert "--name value" command line arguments into a JSON object.
+// "--json" and a trailing option without a value are stored as "1".
+json11::Json::object kv_test_t::parse_args(int narg, const char *args[])
+{
+    json11::Json::object cfg;
+    for (int i = 1; i < narg; i++)
+    {
+        if (!strcmp(args[i], "-h") || !strcmp(args[i], "--help"))
+        {
+            printf(
+                "Vitastor Key/Value DB stress tester / benchmark\n"
+                "(c) Vitaliy Filippov, 2023+ (VNPL-1.1)\n"
+                "\n"
+                "USAGE: %s [--etcd_address ADDR]\n",
+                exe_name
+            );
+            exit(0);
+        }
+        else if (args[i][0] == '-' && args[i][1] == '-')
+        {
+            const char *opt = args[i]+2;
+            cfg[opt] = !strcmp(opt, "json") || i == narg-1 ? "1" : args[++i];
+        }
+    }
+    return cfg;
+}
+
+// Apply settings from cfg, create the client, open the index and run
+// the event loop until the test finishes, then destroy the client
+void kv_test_t::run(json11::Json cfg)
+{
+    srand48(time(NULL));
+    inode_id = INODE_WITH_POOL(cfg["pool_id"].uint64_value(), cfg["inode_id"].uint64_value());
+    if (cfg["op_count"].uint64_value() > 0)
+        op_count = cfg["op_count"].uint64_value();
+    if (cfg["parallelism"].uint64_value() > 0)
+        parallelism = cfg["parallelism"].uint64_value();
+    if (!cfg["reopen_prob"].is_null())
+        reopen_prob = cfg["reopen_prob"].uint64_value();
+    if (!cfg["get_prob"].is_null())
+        get_prob = cfg["get_prob"].uint64_value();
+    if (!cfg["add_prob"].is_null())
+        add_prob = cfg["add_prob"].uint64_value();
+    if (!cfg["update_prob"].is_null())
+        update_prob = cfg["update_prob"].uint64_value();
+    if (!cfg["del_prob"].is_null())
+        del_prob = cfg["del_prob"].uint64_value();
+    if (!cfg["list_prob"].is_null())
+        list_prob = cfg["list_prob"].uint64_value();
+    if (!cfg["min_key_len"].is_null())
+        min_key_len = cfg["min_key_len"].uint64_value();
+    if (cfg["max_key_len"].uint64_value() > 0)
+        max_key_len = cfg["max_key_len"].uint64_value();
+    if (!cfg["min_value_len"].is_null())
+        min_value_len = cfg["min_value_len"].uint64_value();
+    if (cfg["max_value_len"].uint64_value() > 0)
+        max_value_len = cfg["max_value_len"].uint64_value();
+    if (!cfg["kv_memory_limit"].is_null())
+        kv_cfg["kv_memory_limit"] = cfg["kv_memory_limit"];
+    if (!cfg["kv_evict_max_misses"].is_null())
+        kv_cfg["kv_evict_max_misses"] = cfg["kv_evict_max_misses"];
+    if (!cfg["kv_evict_attempts_per_level"].is_null())
+        kv_cfg["kv_evict_attempts_per_level"] = cfg["kv_evict_attempts_per_level"];
+    if (!cfg["kv_evict_unused_age"].is_null())
+        kv_cfg["kv_evict_unused_age"] = cfg["kv_evict_unused_age"];
+    total_prob = reopen_prob+get_prob+add_prob+update_prob+del_prob+list_prob;
+    if (!total_prob)
+    {
+        // loop() takes a random number modulo total_prob
+        printf("ERROR: All operation probabilities are zero\n");
+        exit(1);
+    }
+    // Create client
+    ringloop = new ring_loop_t(512);
+    epmgr = new epoll_manager_t(ringloop);
+    cli = new cluster_client_t(ringloop, epmgr->tfd, cfg);
+    db = new kv_dbw_t(cli);
+    // Load image metadata
+    while (!cli->is_ready())
+    {
+        ringloop->loop();
+        if (cli->is_ready())
+            break;
+        ringloop->wait();
+    }
+    // Run
+    reopening = true;
+    db->open(inode_id, kv_cfg, [this](int res)
+    {
+        reopening = false;
+        if (res < 0)
+        {
+            printf("ERROR: Open index: %d (%s)\n", res, strerror(-res));
+            exit(1);
+        }
+        printf("Index opened\n");
+        ringloop->wakeup();
+    });
+    consumer.loop = [this]() { loop(); };
+    ringloop->register_consumer(&consumer);
+    while (!finished)
+    {
+        ringloop->loop();
+        if (!finished)
+            ringloop->wait();
+    }
+    ringloop->unregister_consumer(&consumer);
+    // Destroy the client
+    delete db;
+    db = NULL;
+    cli->flush();
+    delete cli;
+    delete epmgr;
+    delete ringloop;
+    cli = NULL;
+    epmgr = NULL;
+    ringloop = NULL;
+}
+
+// NOTE: the alphabet has 65 characters, random_str() only uses the first 64
+static const char *base64_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789@+/";
+
+// Generate a string of len random characters taken from base64_chars
+std::string random_str(int len)
+{
+    std::string str;
+    str.resize(len);
+    for (int i = 0; i < len; i++)
+    {
+        str[i] = base64_chars[lrand48() % 64];
+    }
+    return str;
+}
+
+// Find the first entry of the reference map after lst->next_after that is
+// checked by the listing, i.e. was not changed while the listing was running
+static std::map<std::string, std::string>::iterator next_checked_key(
+    std::map<std::string, std::string> & values, const kv_test_listing_t *lst)
+{
+    auto k_it = values.upper_bound(lst->next_after);
+    while (k_it != values.end() && lst->inflights.find(k_it->first) != lst->inflights.end())
+        k_it++;
+    return k_it;
+}
+
+// Pick a random existing key that has no modification in progress.
+// Returns false if there is no such key at the generated random position.
+bool kv_test_t::pick_idle_key(std::string & key)
+{
+    auto k_it = values.lower_bound(random_str(max_key_len));
+    if (k_it == values.end() || changing_keys.find(k_it->first) != changing_keys.end())
+        return false;
+    key = k_it->first;
+    return true;
+}
+
+// Ring loop consumer callback: starts new random operations until
+// the parallelism limit or the total operation count is reached
+void kv_test_t::loop()
+{
+    if (reopening)
+    {
+        return;
+    }
+    // Finish only when nothing is in progress, so that no callback referring
+    // to this object or to a listing is left pending when the DB is destroyed
+    if (ops_done >= op_count && !in_progress)
+    {
+        finished = true;
+    }
+    while (!finished && (uint64_t)in_progress < parallelism && ops_done+in_progress < op_count)
+    {
+        uint64_t dice = (lrand48() % total_prob);
+        if (dice < reopen_prob)
+        {
+            reopening = true;
+            db->close([this]()
+            {
+                printf("Index closed\n");
+                db->open(inode_id, kv_cfg, [this](int res)
+                {
+                    reopening = false;
+                    if (res < 0)
+                    {
+                        printf("ERROR: Reopen index: %d (%s)\n", res, strerror(-res));
+                        finished = true;
+                        return;
+                    }
+                    printf("Index reopened\n");
+                    ringloop->wakeup();
+                });
+            });
+            return;
+        }
+        else if (dice < reopen_prob+get_prob)
+        {
+            // get existing
+            std::string key;
+            if (!pick_idle_key(key))
+                continue;
+            in_progress++;
+            printf("get %s\n", key.c_str());
+            db->get(key, [this, key](int res, const std::string & value)
+            {
+                ops_done++;
+                in_progress--;
+                auto it = values.find(key);
+                if (res != (it == values.end() ? -ENOENT : 0))
+                    printf("ERROR: get %s: %d (%s)\n", key.c_str(), res, strerror(-res));
+                else if (it != values.end() && value != it->second)
+                    printf("ERROR: get %s: mismatch: %s vs %s\n", key.c_str(), value.c_str(), it->second.c_str());
+                else
+                    get_done++;
+                ringloop->wakeup();
+            });
+        }
+        else if (dice < reopen_prob+get_prob+add_prob+update_prob)
+        {
+            bool is_add = dice < reopen_prob+get_prob+add_prob;
+            std::string key;
+            if (is_add)
+            {
+                // add
+                uint64_t key_len = min_key_len + (max_key_len > min_key_len ? lrand48() % (max_key_len-min_key_len) : 0);
+                key = random_str(key_len);
+                if (changing_keys.find(key) != changing_keys.end())
+                    continue;
+            }
+            else if (!pick_idle_key(key))
+            {
+                // update: no suitable existing key
+                continue;
+            }
+            uint64_t value_len = min_value_len + (max_value_len > min_value_len ? lrand48() % (max_value_len-min_value_len) : 0);
+            auto value = random_str(value_len);
+            start_change(key);
+            in_progress++;
+            printf("set %s\n", key.c_str());
+            db->set(key, value, [this, key, value, is_add](int res)
+            {
+                stop_change(key);
+                ops_done++;
+                in_progress--;
+                if (res != 0)
+                    printf("ERROR: set %s = %s: %d (%s)\n", key.c_str(), value.c_str(), res, strerror(-res));
+                else
+                {
+                    if (is_add)
+                        add_done++;
+                    else
+                        update_done++;
+                    values[key] = value;
+                }
+                ringloop->wakeup();
+            }, NULL);
+        }
+        else if (dice < reopen_prob+get_prob+add_prob+update_prob+del_prob)
+        {
+            // delete
+            std::string key;
+            if (!pick_idle_key(key))
+                continue;
+            start_change(key);
+            in_progress++;
+            printf("del %s\n", key.c_str());
+            db->del(key, [this, key](int res)
+            {
+                stop_change(key);
+                ops_done++;
+                in_progress--;
+                if (res != 0)
+                    printf("ERROR: del %s: %d (%s)\n", key.c_str(), res, strerror(-res));
+                else
+                {
+                    del_done++;
+                    values.erase(key);
+                }
+                ringloop->wakeup();
+            }, NULL);
+        }
+        else if (dice < reopen_prob+get_prob+add_prob+update_prob+del_prob+list_prob)
+        {
+            // list
+            in_progress++;
+            auto key = random_str(max_key_len);
+            auto lst = new kv_test_listing_t;
+            lst->handle = db->list_start(key);
+            auto k_it = values.lower_bound(key);
+            lst->next_after = k_it == values.begin() ? "" : key;
+            lst->inflights = changing_keys;
+            listings.insert(lst);
+            printf("list %s\n", key.c_str());
+            db->list_next(lst->handle, [this, lst](int res, const std::string & key, const std::string & value)
+            {
+                if (res < 0)
+                {
+                    if (res != -ENOENT)
+                        printf("ERROR: list: %d (%s)\n", res, strerror(-res));
+                    else
+                    {
+                        list_done++;
+                        auto k_it = next_checked_key(values, lst);
+                        if (k_it != values.end())
+                            printf("ERROR: list: missed all keys from %s\n", k_it->first.c_str());
+                    }
+                    ops_done++;
+                    in_progress--;
+                    db->list_close(lst->handle);
+                    // Remove the pointer from the set before freeing it
+                    listings.erase(lst);
+                    delete lst;
+                    ringloop->wakeup();
+                }
+                else
+                {
+                    // Do not check modified keys in listing
+                    // Listing may return their old or new state
+                    if (lst->inflights.find(key) == lst->inflights.end())
+                    {
+                        while (true)
+                        {
+                            auto k_it = next_checked_key(values, lst);
+                            if (k_it == values.end() || k_it->first > key)
+                            {
+                                // Nothing to advance to, keep next_after as is
+                                printf("ERROR: list: returned extra key %s\n", key.c_str());
+                                break;
+                            }
+                            lst->next_after = k_it->first;
+                            if (k_it->first < key)
+                            {
+                                printf("ERROR: list: missed key %s\n", k_it->first.c_str());
+                                continue;
+                            }
+                            if (k_it->second != value)
+                                printf("ERROR: list: mismatch: %s = %s but should be %s\n", key.c_str(), value.c_str(), k_it->second.c_str());
+                            break;
+                        }
+                    }
+                    db->list_next(lst->handle, NULL);
+                }
+            });
+        }
+    }
+}
+
+// Mark key as being modified: it is skipped by new operations and
+// excluded from the checks of all listings that are currently running
+void kv_test_t::start_change(const std::string & key)
+{
+    changing_keys.insert(key);
+    for (auto lst: listings)
+    {
+        lst->inflights.insert(key);
+    }
+}
+
+// Unmark key; listings that ran during the change keep excluding it
+void kv_test_t::stop_change(const std::string & key)
+{
+    changing_keys.erase(key);
+}
+
+int main(int narg, const char *args[])
+{
+    setvbuf(stdout, NULL, _IONBF, 0);
+    setvbuf(stderr, NULL, _IONBF, 0);
+    exe_name = args[0];
+    kv_test_t *p = new kv_test_t();
+    p->run(kv_test_t::parse_args(narg, args));
+    delete p;
+    return 0;
+}