Add loadjson command to vitastor-kv

master
Vitaliy Filippov 2024-03-15 02:24:29 +03:00
parent e42148f347
commit f70da82317
3 changed files with 169 additions and 23 deletions

View File

@ -28,13 +28,19 @@ public:
ring_loop_t *ringloop = NULL;
epoll_manager_t *epmgr = NULL;
cluster_client_t *cli = NULL;
int load_parallelism = 128;
bool opened = false;
bool interactive = false;
bool interactive = false, is_file = false;
int in_progress = 0;
char *cur_cmd = NULL;
int cur_cmd_size = 0, cur_cmd_alloc = 0;
bool finished = false, eof = false;
std::function<void(int)> load_cb;
bool loading_json = false, in_loadjson = false;
int load_state = 0;
std::string load_key;
~kv_cli_t();
void parse_args(int narg, const char *args[]);
@ -43,6 +49,7 @@ public:
void next_cmd();
std::vector<std::string> parse_cmd(const std::string & cmdstr);
void handle_cmd(const std::vector<std::string> & cmd, std::function<void(int)> cb);
void loadjson();
};
kv_cli_t::~kv_cli_t()
@ -86,6 +93,7 @@ void kv_cli_t::parse_args(int narg, const char *args[])
" list [<start> [end]]\n"
" dump [<start> [end]]\n"
" dumpjson [<start> [end]]\n"
" loadjson\n"
"\n"
"<IMAGE> should be the name of Vitastor image with the DB.\n"
"Without <COMMAND>, you get an interactive DB shell.\n"
@ -195,6 +203,7 @@ void kv_cli_t::run()
catch (std::exception & e)
{
// Can't add to epoll, STDIN is probably a file
is_file = true;
read_cmd();
}
while (!finished)
@ -240,6 +249,11 @@ void kv_cli_t::read_cmd()
void kv_cli_t::next_cmd()
{
if (loading_json)
{
loadjson();
return;
}
if (in_progress > 0)
{
return;
@ -313,6 +327,112 @@ std::vector<std::string> kv_cli_t::parse_cmd(const std::string & str)
return res;
}
void kv_cli_t::loadjson()
{
// simple streaming json parser
if (in_progress >= load_parallelism || in_loadjson)
{
return;
}
in_loadjson = true;
if (load_state == 5)
{
st_5:
if (!in_progress)
{
loading_json = false;
auto cb = std::move(load_cb);
cb(0);
}
in_loadjson = false;
return;
}
do
{
read_cmd();
size_t pos = 0;
while (true)
{
while (pos < cur_cmd_size && is_white(cur_cmd[pos]))
{
pos++;
}
if (pos >= cur_cmd_size)
{
break;
}
if (load_state == 0 || load_state == 2)
{
char expected = "{ :"[load_state];
if (cur_cmd[pos] != expected)
{
fprintf(stderr, "Unexpected %c, expected %c\n", cur_cmd[pos], expected);
exit(1);
}
pos++;
load_state++;
}
else if (load_state == 1 || load_state == 3)
{
if (cur_cmd[pos] != '"')
{
fprintf(stderr, "Unexpected %c, expected \"\n", cur_cmd[pos]);
exit(1);
}
size_t prev = pos;
auto str = scan_escaped(cur_cmd, cur_cmd_size, pos, false);
if (pos == prev)
{
break;
}
load_state++;
if (load_state == 2)
{
load_key = str;
}
else
{
in_progress++;
handle_cmd({ "set", load_key, str }, [this](int res)
{
in_progress--;
next_cmd();
});
if (in_progress >= load_parallelism)
{
break;
}
}
}
else if (load_state == 4)
{
if (cur_cmd[pos] == ',')
{
pos++;
load_state = 1;
}
else if (cur_cmd[pos] == '}')
{
pos++;
load_state = 5;
goto st_5;
}
else
{
fprintf(stderr, "Unexpected %c, expected , or }\n", cur_cmd[pos]);
exit(1);
}
}
}
if (pos < cur_cmd_size)
{
memmove(cur_cmd, cur_cmd+pos, cur_cmd_size-pos);
}
cur_cmd_size -= pos;
} while (loading_json && is_file);
in_loadjson = false;
}
void kv_cli_t::handle_cmd(const std::vector<std::string> & cmd, std::function<void(int)> cb)
{
if (!cmd.size())
@ -459,11 +579,11 @@ void kv_cli_t::handle_cmd(const std::vector<std::string> & cmd, std::function<vo
}
auto & key = cmd[1];
auto & value = cmd[2];
db->set(key, value, [this, cb](int res)
db->set(key, value, [this, cb, l = loading_json](int res)
{
if (res < 0)
fprintf(stderr, "Error: %s (code %d)\n", strerror(-res), res);
else
else if (!l)
fprintf(interactive ? stdout : stderr, "OK\n");
cb(res);
});
@ -505,6 +625,13 @@ void kv_cli_t::handle_cmd(const std::vector<std::string> & cmd, std::function<vo
}
});
}
else if (opname == "loadjson")
{
loading_json = true;
load_state = 0;
load_cb = cb;
loadjson();
}
else if (opname == "close")
{
db->close([=]()
@ -526,7 +653,7 @@ void kv_cli_t::handle_cmd(const std::vector<std::string> & cmd, std::function<vo
"open <image>\nopen <pool_id> <inode_id>\n"
"config <property> <value>\n"
"get <key>\nset <key> <value>\ndel <key>\n"
"list [<start> [end]]\ndump [<start> [end]]\ndumpjson [<start> [end]]\n"
"list [<start> [end]]\ndump [<start> [end]]\ndumpjson [<start> [end]]\nloadjson\n"
"close\nquit\n", opname.c_str()
);
cb(-EINVAL);

View File

@ -367,45 +367,61 @@ std::vector<std::string> explode(const std::string & sep, const std::string & va
return res;
}
// extract possibly single- or double-quoted part of string with escape characters
std::string scan_escaped(const std::string & cmd, size_t & pos)
std::string scan_escaped(const std::string & cmd, size_t & pos, bool allow_unquoted)
{
pos = cmd.find_first_not_of(" \t\r\n", pos);
if (pos == std::string::npos)
return scan_escaped(cmd.data(), cmd.size(), pos, allow_unquoted);
}
// extract possibly single- or double-quoted part of string with escape characters
std::string scan_escaped(const char *cmd, size_t size, size_t & pos, bool allow_unquoted)
{
auto orig = pos;
while (pos < size && is_white(cmd[pos]))
pos++;
if (pos >= size)
{
pos = cmd.size();
pos = orig;
return "";
}
if (cmd[pos] != '"' && cmd[pos] != '\'')
{
auto pos2 = cmd.find_first_of(" \t\r\n", pos);
pos2 = (pos2 == std::string::npos ? cmd.size() : pos2);
auto key = cmd.substr(pos, pos2-pos);
if (!allow_unquoted)
{
pos = orig;
return "";
}
auto pos2 = pos;
while (pos2 < size && !is_white(cmd[pos2]))
pos2++;
auto key = std::string(cmd+pos, pos2-pos);
pos = pos2;
return key;
}
char quot = cmd[pos];
char quot_or_slash[3] = { '\\', quot, 0 };
pos++;
std::string key;
while (pos < cmd.size())
while (true)
{
auto pos2 = cmd.find_first_of(quot_or_slash, pos);
pos2 = pos2 == std::string::npos ? cmd.size() : pos2;
auto pos2 = pos;
while (pos2 < size && cmd[pos2] != '\\' && cmd[pos2] != quot)
pos2++;
if (pos2 >= size || pos2 == size-1 && cmd[pos2] == '\\')
{
// Unfinished string literal
pos = orig;
return "";
}
if (pos2 > pos)
key += cmd.substr(pos, pos2-pos);
key += std::string(cmd+pos, pos2-pos);
pos = pos2;
if (pos >= cmd.size())
break;
if (cmd[pos] == quot)
{
pos++;
break;
}
if (cmd[pos] == '\\')
else /* if (cmd[pos] == '\\') */
{
if (pos < cmd.size()-1)
key += cmd[++pos];
key += cmd[++pos];
pos++;
}
}

View File

@ -6,6 +6,8 @@
#include <string>
#include <vector>
#define is_white(a) ((a) == ' ' || (a) == '\t' || (a) == '\r' || (a) == '\n')
std::string base64_encode(const std::string &in);
std::string base64_decode(const std::string &in);
uint64_t parse_size(std::string size_str, bool *ok = NULL);
@ -23,7 +25,8 @@ std::string str_repeat(const std::string & str, int times);
size_t utf8_length(const std::string & s);
size_t utf8_length(const char *s);
std::vector<std::string> explode(const std::string & sep, const std::string & value, bool trim);
std::string scan_escaped(const std::string & cmd, size_t & pos);
std::string scan_escaped(const char *cmd, size_t size, size_t & pos, bool allow_unquoted = true);
std::string scan_escaped(const std::string & cmd, size_t & pos, bool allow_unquoted = true);
std::string auto_addslashes(const std::string & str, const char *toescape = "\\\"");
std::string addslashes(const std::string & str, const char *toescape = "\\\"");
std::string realpath_str(std::string path, bool nofail = true);