diff --git a/src/kv_stress.cpp b/src/kv_stress.cpp index f62e681a..d73c86af 100644 --- a/src/kv_stress.cpp +++ b/src/kv_stress.cpp @@ -23,9 +23,22 @@ struct kv_test_listing_t void *handle = NULL; std::string next_after; std::set inflights; + timespec tv_begin; bool error = false; }; +struct kv_test_lat_t +{ + const char *name = NULL; + uint64_t usec = 0, count = 0; +}; + +struct kv_test_stat_t +{ + kv_test_lat_t get, add, update, del, list; + uint64_t list_keys = 0; +}; + class kv_test_t { public: @@ -33,6 +46,7 @@ public: json11::Json::object kv_cfg; uint64_t inode_id = 0; uint64_t op_count = 1000000; + uint64_t runtime_sec = 0; uint64_t parallelism = 4; uint64_t reopen_prob = 1; uint64_t get_prob = 30000; @@ -44,9 +58,12 @@ public: uint64_t max_key_len = 70; uint64_t min_value_len = 50; uint64_t max_value_len = 300; + uint64_t print_stats_interval = 1; + bool trace = true; bool stop_on_error = false; // FIXME: Multiple clients - // FIXME: Print op statistics + kv_test_stat_t stat, prev_stat; + timespec prev_stat_time, start_stat_time; // State kv_dbw_t *db = NULL; @@ -57,7 +74,7 @@ public: bool finished = false; uint64_t total_prob = 0; uint64_t ops_done = 0; - uint64_t get_done = 0, add_done = 0, update_done = 0, del_done = 0, list_done = 0; + int stat_timer_id = -1; int in_progress = 0; bool reopening = false; std::set listings; @@ -67,10 +84,13 @@ public: ~kv_test_t(); static json11::Json::object parse_args(int narg, const char *args[]); + void parse_config(json11::Json cfg); void run(json11::Json cfg); void loop(); + void print_stats(); void start_change(const std::string & key); void stop_change(const std::string & key); + void add_stat(kv_test_lat_t & stat, timespec tv_begin); }; kv_test_t::~kv_test_t() @@ -101,7 +121,9 @@ json11::Json::object kv_test_t::parse_args(int narg, const char *args[]) "\n" "USAGE: %s --pool_id POOL_ID --inode_id INODE_ID [OPTIONS]\n" " --op_count 1000000\n" - " Total operations to run during test\n" + " Total operations to run during test. 0 means unlimited\n" + " --runtime 0\n" + " Run for this number of seconds. 0 means unlimited\n" " --parallelism 4\n" " Run this number of operations in parallel\n" " --get_prob 30000\n" @@ -122,9 +144,10 @@ json11::Json::object kv_test_t::parse_args(int narg, const char *args[]) " Minimum value size in bytes\n" " --max_value_len 300\n" " Maximum value size in bytes\n" - " --verify 1\n" - " Verify results of retrieve and list operations\n" - " Uses extra RAM because a copy of the DB is stored in memory\n" + " --print_stats 1\n" + " Print operation statistics every this number of seconds\n" + " --trace 1\n" + " Print all executed operations\n" " --stop_on_error 0\n" " Stop on first execution error, mismatch, lost key or extra key during listing\n" " --kv_memory_limit 128M\n" @@ -150,12 +173,13 @@ json11::Json::object kv_test_t::parse_args(int narg, const char *args[]) return cfg; } -void kv_test_t::run(json11::Json cfg) +void kv_test_t::parse_config(json11::Json cfg) { - srand48(time(NULL)); inode_id = INODE_WITH_POOL(cfg["pool_id"].uint64_value(), cfg["inode_id"].uint64_value()); if (cfg["op_count"].uint64_value() > 0) op_count = cfg["op_count"].uint64_value(); + if (cfg["runtime"].uint64_value() > 0) + runtime_sec = cfg["runtime"].uint64_value(); if (cfg["parallelism"].uint64_value() > 0) parallelism = cfg["parallelism"].uint64_value(); if (!cfg["reopen_prob"].is_null()) @@ -178,8 +202,12 @@ void kv_test_t::run(json11::Json cfg) min_value_len = cfg["min_value_len"].uint64_value(); if (cfg["max_value_len"].uint64_value() > 0) max_value_len = cfg["max_value_len"].uint64_value(); - if (cfg["stop_on_error"].bool_value()) - stop_on_error = true; + if (!cfg["print_stats"].is_null()) + print_stats_interval = cfg["print_stats"].uint64_value(); + if (!cfg["trace"].is_null()) + trace = cfg["trace"].bool_value(); + if (!cfg["stop_on_error"].is_null()) + stop_on_error = cfg["stop_on_error"].bool_value(); if (!cfg["kv_memory_limit"].is_null()) kv_cfg["kv_memory_limit"] = cfg["kv_memory_limit"]; if (!cfg["kv_evict_max_misses"].is_null()) @@ -189,6 +217,17 @@ void kv_test_t::run(json11::Json cfg) if (!cfg["kv_evict_unused_age"].is_null()) kv_cfg["kv_evict_unused_age"] = cfg["kv_evict_unused_age"]; total_prob = reopen_prob+get_prob+add_prob+update_prob+del_prob+list_prob; + stat.get.name = "get"; + stat.add.name = "add"; + stat.update.name = "update"; + stat.del.name = "del"; + stat.list.name = "list"; +} + +void kv_test_t::run(json11::Json cfg) +{ + srand48(time(NULL)); + parse_config(cfg); // Create client ringloop = new ring_loop_t(512); epmgr = new epoll_manager_t(ringloop); @@ -217,12 +256,18 @@ void kv_test_t::run(json11::Json cfg) }); consumer.loop = [this]() { loop(); }; ringloop->register_consumer(&consumer); + if (print_stats_interval) + stat_timer_id = epmgr->tfd->set_timer(print_stats_interval*1000, true, [this](int) { print_stats(); }); + clock_gettime(CLOCK_REALTIME, &start_stat_time); + prev_stat_time = start_stat_time; while (!finished) { ringloop->loop(); if (!finished) ringloop->wait(); } + if (stat_timer_id >= 0) + epmgr->tfd->clear_timer(stat_timer_id); ringloop->unregister_consumer(&consumer); // Destroy the client delete db; @@ -294,9 +339,13 @@ void kv_test_t::loop() if (changing_keys.find(key) != changing_keys.end()) continue; in_progress++; - printf("get %s\n", key.c_str()); - db->get(key, [this, key](int res, const std::string & value) + if (trace) + printf("get %s\n", key.c_str()); + timespec tv_begin; + clock_gettime(CLOCK_REALTIME, &tv_begin); + db->get(key, [this, key, tv_begin](int res, const std::string & value) { + add_stat(stat.get, tv_begin); ops_done++; in_progress--; auto it = values.find(key); @@ -312,8 +361,6 @@ void kv_test_t::loop() if (stop_on_error) exit(1); } - else - get_done++; ringloop->wakeup(); }); } @@ -343,9 +390,13 @@ void kv_test_t::loop() auto value = random_str(value_len); start_change(key); in_progress++; - printf("set %s = %s\n", key.c_str(), value.c_str()); - db->set(key, value, [this, key, value, is_add](int res) + if (trace) + printf("set %s = %s\n", key.c_str(), value.c_str()); + timespec tv_begin; + clock_gettime(CLOCK_REALTIME, &tv_begin); + db->set(key, value, [this, key, value, tv_begin, is_add](int res) { + add_stat(is_add ? stat.add : stat.update, tv_begin); stop_change(key); ops_done++; in_progress--; @@ -357,10 +408,6 @@ void kv_test_t::loop() } else { - if (is_add) - add_done++; - else - update_done++; values[key] = value; } ringloop->wakeup(); @@ -378,9 +425,13 @@ void kv_test_t::loop() continue; start_change(key); in_progress++; - printf("del %s\n", key.c_str()); - db->del(key, [this, key](int res) + if (trace) + printf("del %s\n", key.c_str()); + timespec tv_begin; + clock_gettime(CLOCK_REALTIME, &tv_begin); + db->del(key, [this, key, tv_begin](int res) { + add_stat(stat.del, tv_begin); stop_change(key); ops_done++; in_progress--; @@ -392,7 +443,6 @@ void kv_test_t::loop() } else { - del_done++; values.erase(key); } ringloop->wakeup(); @@ -409,11 +459,14 @@ void kv_test_t::loop() lst->next_after = k_it == values.begin() ? "" : key; lst->inflights = changing_keys; listings.insert(lst); - printf("list from %s\n", key.c_str()); + if (trace) + printf("list from %s\n", key.c_str()); + clock_gettime(CLOCK_REALTIME, &lst->tv_begin); db->list_next(lst->handle, [this, lst](int res, const std::string & key, const std::string & value) { if (res < 0) { + add_stat(stat.list, lst->tv_begin); if (res != -ENOENT) { printf("ERROR: list: %d (%s)\n", res, strerror(-res)); @@ -421,7 +474,6 @@ void kv_test_t::loop() } else { - list_done++; auto k_it = lst->next_after == "" ? values.begin() : values.upper_bound(lst->next_after); while (k_it != values.end()) { @@ -445,6 +497,7 @@ void kv_test_t::loop() } else { + stat.list_keys++; // Do not check modified keys in listing // Listing may return their old or new state if (lst->inflights.find(key) == lst->inflights.end()) @@ -489,6 +542,46 @@ void kv_test_t::loop() } } +void kv_test_t::add_stat(kv_test_lat_t & stat, timespec tv_begin) +{ + timespec tv_end; + clock_gettime(CLOCK_REALTIME, &tv_end); + int64_t usec = (tv_end.tv_sec - tv_begin.tv_sec)*1000000 + + (tv_end.tv_nsec - tv_begin.tv_nsec)/1000; + if (usec > 0) + { + stat.usec += usec; + stat.count++; + } +} + +void kv_test_t::print_stats() +{ + timespec cur_stat_time; + clock_gettime(CLOCK_REALTIME, &cur_stat_time); + int64_t usec = (cur_stat_time.tv_sec - prev_stat_time.tv_sec)*1000000 + + (cur_stat_time.tv_nsec - prev_stat_time.tv_nsec)/1000; + if (usec > 0) + { + kv_test_lat_t *lats[] = { &stat.get, &stat.add, &stat.update, &stat.del, &stat.list }; + kv_test_lat_t *prev[] = { &prev_stat.get, &prev_stat.add, &prev_stat.update, &prev_stat.del, &prev_stat.list }; + char buf[128] = { 0 }; + for (int i = 0; i < sizeof(lats)/sizeof(lats[0]); i++) + { + snprintf(buf, sizeof(buf)-1, "%.1f %s/s (%lu us)", (lats[i]->count-prev[i]->count)*1000000.0/usec, + lats[i]->name, (lats[i]->usec-prev[i]->usec)/(lats[i]->count-prev[i]->count > 0 ? lats[i]->count-prev[i]->count : 1)); + int k; + for (k = strlen(buf); k < strlen(lats[i]->name)+21; k++) + buf[k] = ' '; + buf[k] = 0; + printf("%s", buf); + } + printf("\n"); + } + prev_stat = stat; + prev_stat_time = cur_stat_time; +} + void kv_test_t::start_change(const std::string & key) { changing_keys.insert(key);