From f70da82317a76fe068db37b675e66dbe8c69b37d Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 15 Mar 2024 02:24:29 +0300 Subject: [PATCH] Add loadjson command to vitastor-kv --- src/kv_cli.cpp | 135 +++++++++++++++++++++++++++++++++++++++++++++-- src/str_util.cpp | 52 +++++++++++------- src/str_util.h | 5 +- 3 files changed, 169 insertions(+), 23 deletions(-) diff --git a/src/kv_cli.cpp b/src/kv_cli.cpp index fe453bc0..923e2e45 100644 --- a/src/kv_cli.cpp +++ b/src/kv_cli.cpp @@ -28,13 +28,19 @@ public: ring_loop_t *ringloop = NULL; epoll_manager_t *epmgr = NULL; cluster_client_t *cli = NULL; + int load_parallelism = 128; bool opened = false; - bool interactive = false; + bool interactive = false, is_file = false; int in_progress = 0; char *cur_cmd = NULL; int cur_cmd_size = 0, cur_cmd_alloc = 0; bool finished = false, eof = false; + std::function load_cb; + bool loading_json = false, in_loadjson = false; + int load_state = 0; + std::string load_key; + ~kv_cli_t(); void parse_args(int narg, const char *args[]); @@ -43,6 +49,7 @@ public: void next_cmd(); std::vector parse_cmd(const std::string & cmdstr); void handle_cmd(const std::vector & cmd, std::function cb); + void loadjson(); }; kv_cli_t::~kv_cli_t() @@ -86,6 +93,7 @@ void kv_cli_t::parse_args(int narg, const char *args[]) " list [ [end]]\n" " dump [ [end]]\n" " dumpjson [ [end]]\n" + " loadjson\n" "\n" " should be the name of Vitastor image with the DB.\n" "Without , you get an interactive DB shell.\n" @@ -195,6 +203,7 @@ void kv_cli_t::run() catch (std::exception & e) { // Can't add to epoll, STDIN is probably a file + is_file = true; read_cmd(); } while (!finished) @@ -240,6 +249,11 @@ void kv_cli_t::read_cmd() void kv_cli_t::next_cmd() { + if (loading_json) + { + loadjson(); + return; + } if (in_progress > 0) { return; @@ -313,6 +327,112 @@ std::vector kv_cli_t::parse_cmd(const std::string & str) return res; } +void kv_cli_t::loadjson() +{ + // simple streaming json parser + if (in_progress >= load_parallelism || in_loadjson) + { + return; + } + in_loadjson = true; + if (load_state == 5) + { +st_5: + if (!in_progress) + { + loading_json = false; + auto cb = std::move(load_cb); + cb(0); + } + in_loadjson = false; + return; + } + do + { + read_cmd(); + size_t pos = 0; + while (true) + { + while (pos < cur_cmd_size && is_white(cur_cmd[pos])) + { + pos++; + } + if (pos >= cur_cmd_size) + { + break; + } + if (load_state == 0 || load_state == 2) + { + char expected = "{ :"[load_state]; + if (cur_cmd[pos] != expected) + { + fprintf(stderr, "Unexpected %c, expected %c\n", cur_cmd[pos], expected); + exit(1); + } + pos++; + load_state++; + } + else if (load_state == 1 || load_state == 3) + { + if (cur_cmd[pos] != '"') + { + fprintf(stderr, "Unexpected %c, expected \"\n", cur_cmd[pos]); + exit(1); + } + size_t prev = pos; + auto str = scan_escaped(cur_cmd, cur_cmd_size, pos, false); + if (pos == prev) + { + break; + } + load_state++; + if (load_state == 2) + { + load_key = str; + } + else + { + in_progress++; + handle_cmd({ "set", load_key, str }, [this](int res) + { + in_progress--; + next_cmd(); + }); + if (in_progress >= load_parallelism) + { + break; + } + } + } + else if (load_state == 4) + { + if (cur_cmd[pos] == ',') + { + pos++; + load_state = 1; + } + else if (cur_cmd[pos] == '}') + { + pos++; + load_state = 5; + goto st_5; + } + else + { + fprintf(stderr, "Unexpected %c, expected , or }\n", cur_cmd[pos]); + exit(1); + } + } + } + if (pos < cur_cmd_size) + { + memmove(cur_cmd, cur_cmd+pos, cur_cmd_size-pos); + } + cur_cmd_size -= pos; + } while (loading_json && is_file); + in_loadjson = false; +} + void kv_cli_t::handle_cmd(const std::vector & cmd, std::function cb) { if (!cmd.size()) @@ -459,11 +579,11 @@ void kv_cli_t::handle_cmd(const std::vector & cmd, std::functionset(key, value, [this, cb](int res) + db->set(key, value, [this, cb, l = loading_json](int res) { if (res < 0) fprintf(stderr, "Error: %s (code %d)\n", strerror(-res), res); - else + else if (!l) fprintf(interactive ? stdout : stderr, "OK\n"); cb(res); }); @@ -505,6 +625,13 @@ void kv_cli_t::handle_cmd(const std::vector & cmd, std::functionclose([=]() @@ -526,7 +653,7 @@ void kv_cli_t::handle_cmd(const std::vector & cmd, std::function\nopen \n" "config \n" "get \nset \ndel \n" - "list [ [end]]\ndump [ [end]]\ndumpjson [ [end]]\n" + "list [ [end]]\ndump [ [end]]\ndumpjson [ [end]]\nloadjson\n" "close\nquit\n", opname.c_str() ); cb(-EINVAL); diff --git a/src/str_util.cpp b/src/str_util.cpp index be01948b..ee6ca396 100644 --- a/src/str_util.cpp +++ b/src/str_util.cpp @@ -367,45 +367,61 @@ std::vector explode(const std::string & sep, const std::string & va return res; } -// extract possibly single- or double-quoted part of string with escape characters -std::string scan_escaped(const std::string & cmd, size_t & pos) +std::string scan_escaped(const std::string & cmd, size_t & pos, bool allow_unquoted) { - pos = cmd.find_first_not_of(" \t\r\n", pos); - if (pos == std::string::npos) + return scan_escaped(cmd.data(), cmd.size(), pos, allow_unquoted); +} + +// extract possibly single- or double-quoted part of string with escape characters +std::string scan_escaped(const char *cmd, size_t size, size_t & pos, bool allow_unquoted) +{ + auto orig = pos; + while (pos < size && is_white(cmd[pos])) + pos++; + if (pos >= size) { - pos = cmd.size(); + pos = orig; return ""; } if (cmd[pos] != '"' && cmd[pos] != '\'') { - auto pos2 = cmd.find_first_of(" \t\r\n", pos); - pos2 = (pos2 == std::string::npos ? cmd.size() : pos2); - auto key = cmd.substr(pos, pos2-pos); + if (!allow_unquoted) + { + pos = orig; + return ""; + } + auto pos2 = pos; + while (pos2 < size && !is_white(cmd[pos2])) + pos2++; + auto key = std::string(cmd+pos, pos2-pos); pos = pos2; return key; } char quot = cmd[pos]; - char quot_or_slash[3] = { '\\', quot, 0 }; pos++; std::string key; - while (pos < cmd.size()) + while (true) { - auto pos2 = cmd.find_first_of(quot_or_slash, pos); - pos2 = pos2 == std::string::npos ? cmd.size() : pos2; + auto pos2 = pos; + while (pos2 < size && cmd[pos2] != '\\' && cmd[pos2] != quot) + pos2++; + if (pos2 >= size || pos2 == size-1 && cmd[pos2] == '\\') + { + // Unfinished string literal + pos = orig; + return ""; + } if (pos2 > pos) - key += cmd.substr(pos, pos2-pos); + key += std::string(cmd+pos, pos2-pos); pos = pos2; - if (pos >= cmd.size()) - break; if (cmd[pos] == quot) { pos++; break; } - if (cmd[pos] == '\\') + else /* if (cmd[pos] == '\\') */ { - if (pos < cmd.size()-1) - key += cmd[++pos]; + key += cmd[++pos]; pos++; } } diff --git a/src/str_util.h b/src/str_util.h index 92a7f18b..9f89fdf2 100644 --- a/src/str_util.h +++ b/src/str_util.h @@ -6,6 +6,8 @@ #include #include +#define is_white(a) ((a) == ' ' || (a) == '\t' || (a) == '\r' || (a) == '\n') + std::string base64_encode(const std::string &in); std::string base64_decode(const std::string &in); uint64_t parse_size(std::string size_str, bool *ok = NULL); @@ -23,7 +25,8 @@ std::string str_repeat(const std::string & str, int times); size_t utf8_length(const std::string & s); size_t utf8_length(const char *s); std::vector explode(const std::string & sep, const std::string & value, bool trim); -std::string scan_escaped(const std::string & cmd, size_t & pos); +std::string scan_escaped(const char *cmd, size_t size, size_t & pos, bool allow_unquoted = true); +std::string scan_escaped(const std::string & cmd, size_t & pos, bool allow_unquoted = true); std::string auto_addslashes(const std::string & str, const char *toescape = "\\\""); std::string addslashes(const std::string & str, const char *toescape = "\\\""); std::string realpath_str(std::string path, bool nofail = true);