Compare commits
38 Commits
7ef14d586f
...
d1f92d3cad
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | d1f92d3cad | |
Vitaliy Filippov | 5217fb7915 | |
Vitaliy Filippov | 0de8a04d06 | |
Vitaliy Filippov | 95f95bac7b | |
Vitaliy Filippov | 6419477cfd | |
Vitaliy Filippov | e5c22f5f27 | |
Vitaliy Filippov | 4de61f092b | |
Vitaliy Filippov | a762831882 | |
Vitaliy Filippov | a8bf2e29fe | |
Vitaliy Filippov | 8b190fd41c | |
Vitaliy Filippov | d6c55948cc | |
Vitaliy Filippov | fc9df30a00 | |
Vitaliy Filippov | af96c0dd3f | |
Vitaliy Filippov | b6cd7d69fe | |
Vitaliy Filippov | 9d60523f28 | |
Vitaliy Filippov | 0378739449 | |
Vitaliy Filippov | 847ee33b03 | |
Vitaliy Filippov | 956da75eba | |
Vitaliy Filippov | 8d30ebbe6e | |
Vitaliy Filippov | 649459d4d0 | |
Vitaliy Filippov | f44c54b49c | |
Vitaliy Filippov | 91c835fb61 | |
Vitaliy Filippov | 540ea4b055 | |
Vitaliy Filippov | 2fcc60f79b | |
Vitaliy Filippov | edfaee8148 | |
Vitaliy Filippov | dd53a52f1b | |
Vitaliy Filippov | 5e6c7c5f69 | |
Vitaliy Filippov | fc402c6a11 | |
Vitaliy Filippov | 9f31cd32d0 | |
Vitaliy Filippov | 5c465332f0 | |
Vitaliy Filippov | 8e7fce7e28 | |
Vitaliy Filippov | 7ed4f0e9cf | |
Vitaliy Filippov | 381630e670 | |
Vitaliy Filippov | a772954154 | |
Vitaliy Filippov | 73a0eaf784 | |
Vitaliy Filippov | aa79d1db1c | |
Vitaliy Filippov | a1fecb7eff | |
Vitaliy Filippov | ff74b19423 |
|
@ -32,7 +32,7 @@ void blockstore_init_meta::handle_event(ring_data_t *data, int buf_num)
|
|||
if (data->res < 0)
|
||||
{
|
||||
throw std::runtime_error(
|
||||
std::string("read metadata failed at offset ") + std::to_string(bufs[buf_num].offset) +
|
||||
std::string("read metadata failed at offset ") + std::to_string(buf_num >= 0 ? bufs[buf_num].offset : last_read_offset) +
|
||||
std::string(": ") + strerror(-data->res)
|
||||
);
|
||||
}
|
||||
|
@ -63,6 +63,7 @@ int blockstore_init_meta::loop()
|
|||
throw std::runtime_error("Failed to allocate metadata read buffer");
|
||||
// Read superblock
|
||||
GET_SQE();
|
||||
last_read_offset = 0;
|
||||
data->iov = { metadata_buffer, (size_t)bs->dsk.meta_block_size };
|
||||
data->callback = [this](ring_data_t *data) { handle_event(data, -1); };
|
||||
my_uring_prep_readv(sqe, bs->dsk.meta_fd, &data->iov, 1, bs->dsk.meta_offset);
|
||||
|
@ -100,6 +101,7 @@ resume_1:
|
|||
{
|
||||
printf("Initializing metadata area\n");
|
||||
GET_SQE();
|
||||
last_read_offset = 0;
|
||||
data->iov = (struct iovec){ metadata_buffer, (size_t)bs->dsk.meta_block_size };
|
||||
data->callback = [this](ring_data_t *data) { handle_event(data, -1); };
|
||||
my_uring_prep_writev(sqe, bs->dsk.meta_fd, &data->iov, 1, bs->dsk.meta_offset);
|
||||
|
@ -259,9 +261,11 @@ resume_2:
|
|||
next_offset = entries_to_zero[i]/entries_per_block;
|
||||
for (j = i; j < entries_to_zero.size() && entries_to_zero[j]/entries_per_block == next_offset; j++) {}
|
||||
GET_SQE();
|
||||
last_read_offset = (1+next_offset)*bs->dsk.meta_block_size;
|
||||
data->iov = { metadata_buffer, (size_t)bs->dsk.meta_block_size };
|
||||
data->callback = [this](ring_data_t *data) { handle_event(data, -1); };
|
||||
my_uring_prep_readv(sqe, bs->dsk.meta_fd, &data->iov, 1, bs->dsk.meta_offset + (1+next_offset)*bs->dsk.meta_block_size);
|
||||
bs->ringloop->submit();
|
||||
submitted++;
|
||||
resume_5:
|
||||
if (submitted > 0)
|
||||
|
@ -278,6 +282,7 @@ resume_5:
|
|||
data->iov = { metadata_buffer, (size_t)bs->dsk.meta_block_size };
|
||||
data->callback = [this](ring_data_t *data) { handle_event(data, -1); };
|
||||
my_uring_prep_writev(sqe, bs->dsk.meta_fd, &data->iov, 1, bs->dsk.meta_offset + (1+next_offset)*bs->dsk.meta_block_size);
|
||||
bs->ringloop->submit();
|
||||
submitted++;
|
||||
resume_6:
|
||||
if (submitted > 0)
|
||||
|
@ -299,6 +304,7 @@ resume_6:
|
|||
{
|
||||
GET_SQE();
|
||||
my_uring_prep_fsync(sqe, bs->dsk.meta_fd, IORING_FSYNC_DATASYNC);
|
||||
last_read_offset = 0;
|
||||
data->iov = { 0 };
|
||||
data->callback = [this](ring_data_t *data) { handle_event(data, -1); };
|
||||
submitted++;
|
||||
|
|
|
@ -23,6 +23,7 @@ class blockstore_init_meta
|
|||
struct ring_data_t *data;
|
||||
uint64_t md_offset = 0;
|
||||
uint64_t next_offset = 0;
|
||||
uint64_t last_read_offset = 0;
|
||||
uint64_t entries_loaded = 0;
|
||||
unsigned entries_per_block = 0;
|
||||
int i = 0, j = 0;
|
||||
|
|
|
@ -64,10 +64,6 @@ std::string validate_pool_config(json11::Json::object & new_cfg, json11::Json ol
|
|||
(new_cfg["parity_chunks"].uint64_value() > 1 ? 1 : 0);
|
||||
}
|
||||
}
|
||||
if (new_cfg["scheme"] != "ec")
|
||||
{
|
||||
new_cfg.erase("parity_chunks");
|
||||
}
|
||||
|
||||
// Check integer values and unknown keys
|
||||
for (auto kv_it = new_cfg.begin(); kv_it != new_cfg.end(); )
|
||||
|
@ -118,6 +114,12 @@ std::string validate_pool_config(json11::Json::object & new_cfg, json11::Json ol
|
|||
}
|
||||
}
|
||||
|
||||
// Check after merging
|
||||
if (new_cfg["scheme"] != "ec")
|
||||
{
|
||||
new_cfg.erase("parity_chunks");
|
||||
}
|
||||
|
||||
// Prevent autovivification of object keys. Now we don't modify the config, we just check it
|
||||
json11::Json cfg = new_cfg;
|
||||
|
||||
|
|
|
@ -238,7 +238,8 @@ void cluster_client_t::erase_op(cluster_op_t *op)
|
|||
// which may continue following SYNCs, but these SYNCs
|
||||
// should know about the changed buffer state
|
||||
// This is ugly but this is the way we do it
|
||||
std::function<void(cluster_op_t*)>(op->callback)(op);
|
||||
auto cb = std::move(op->callback);
|
||||
cb(op);
|
||||
}
|
||||
if (!(flags & OP_IMMEDIATE_COMMIT) || enable_writeback)
|
||||
{
|
||||
|
@ -248,7 +249,8 @@ void cluster_client_t::erase_op(cluster_op_t *op)
|
|||
{
|
||||
// Call callback at the end to avoid inconsistencies in prev_wait
|
||||
// if the callback adds more operations itself
|
||||
std::function<void(cluster_op_t*)>(op->callback)(op);
|
||||
auto cb = std::move(op->callback);
|
||||
cb(op);
|
||||
}
|
||||
if (flags & OP_FLUSH_BUFFER)
|
||||
{
|
||||
|
@ -548,7 +550,8 @@ void cluster_client_t::execute(cluster_op_t *op)
|
|||
op->opcode != OSD_OP_READ_BITMAP && op->opcode != OSD_OP_READ_CHAIN_BITMAP && op->opcode != OSD_OP_WRITE)
|
||||
{
|
||||
op->retval = -EINVAL;
|
||||
std::function<void(cluster_op_t*)>(op->callback)(op);
|
||||
auto cb = std::move(op->callback);
|
||||
cb(op);
|
||||
return;
|
||||
}
|
||||
if (!pgs_loaded)
|
||||
|
@ -586,7 +589,8 @@ void cluster_client_t::execute_internal(cluster_op_t *op)
|
|||
wb->start_writebacks(this, 1);
|
||||
}
|
||||
op->retval = op->len;
|
||||
std::function<void(cluster_op_t*)>(op->callback)(op);
|
||||
auto cb = std::move(op->callback);
|
||||
cb(op);
|
||||
return;
|
||||
}
|
||||
if (op->opcode == OSD_OP_WRITE && !(op->flags & OP_IMMEDIATE_COMMIT))
|
||||
|
@ -655,7 +659,8 @@ bool cluster_client_t::check_rw(cluster_op_t *op)
|
|||
if (!pool_id)
|
||||
{
|
||||
op->retval = -EINVAL;
|
||||
std::function<void(cluster_op_t*)>(op->callback)(op);
|
||||
auto cb = std::move(op->callback);
|
||||
cb(op);
|
||||
return false;
|
||||
}
|
||||
auto pool_it = st_cli.pool_config.find(pool_id);
|
||||
|
@ -663,7 +668,8 @@ bool cluster_client_t::check_rw(cluster_op_t *op)
|
|||
{
|
||||
// Pools are loaded, but this one is unknown
|
||||
op->retval = -EINVAL;
|
||||
std::function<void(cluster_op_t*)>(op->callback)(op);
|
||||
auto cb = std::move(op->callback);
|
||||
cb(op);
|
||||
return false;
|
||||
}
|
||||
// Check alignment
|
||||
|
@ -671,7 +677,8 @@ bool cluster_client_t::check_rw(cluster_op_t *op)
|
|||
op->offset % pool_it->second.bitmap_granularity || op->len % pool_it->second.bitmap_granularity)
|
||||
{
|
||||
op->retval = -EINVAL;
|
||||
std::function<void(cluster_op_t*)>(op->callback)(op);
|
||||
auto cb = std::move(op->callback);
|
||||
cb(op);
|
||||
return false;
|
||||
}
|
||||
if (pool_it->second.immediate_commit == IMMEDIATE_ALL)
|
||||
|
@ -684,7 +691,8 @@ bool cluster_client_t::check_rw(cluster_op_t *op)
|
|||
if (ino_it != st_cli.inode_config.end() && ino_it->second.readonly)
|
||||
{
|
||||
op->retval = -EROFS;
|
||||
std::function<void(cluster_op_t*)>(op->callback)(op);
|
||||
auto cb = std::move(op->callback);
|
||||
cb(op);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -118,8 +118,9 @@ void kv_cli_t::run(const json11::Json::object & cfg)
|
|||
finished = true;
|
||||
}
|
||||
});
|
||||
interactive = true;
|
||||
printf("> ");
|
||||
interactive = isatty(0);
|
||||
if (interactive)
|
||||
printf("> ");
|
||||
}
|
||||
catch (std::exception & e)
|
||||
{
|
||||
|
@ -236,7 +237,7 @@ void kv_cli_t::handle_cmd(const std::string & cmd, std::function<void()> cb)
|
|||
if (res < 0)
|
||||
fprintf(stderr, "Error opening index: %s (code %d)\n", strerror(-res), res);
|
||||
else
|
||||
printf("Index opened. Current size: %lu bytes\n", db->get_size());
|
||||
fprintf(interactive ? stdout : stderr, "Index opened. Current size: %lu bytes\n", db->get_size());
|
||||
cb();
|
||||
});
|
||||
}
|
||||
|
@ -272,15 +273,15 @@ void kv_cli_t::handle_cmd(const std::string & cmd, std::function<void()> cb)
|
|||
}
|
||||
else if (opname == "get" || opname == "set" || opname == "del")
|
||||
{
|
||||
std::string key = scan_escaped(cmd, pos);
|
||||
if (opname == "get" || opname == "del")
|
||||
{
|
||||
if (pos == std::string::npos)
|
||||
if (key == "")
|
||||
{
|
||||
fprintf(stderr, "Usage: %s <key>\n", opname.c_str());
|
||||
cb();
|
||||
return;
|
||||
}
|
||||
auto key = trim(cmd.substr(pos+1));
|
||||
if (opname == "get")
|
||||
{
|
||||
db->get(key, [this, cb](int res, const std::string & value)
|
||||
|
@ -302,34 +303,33 @@ void kv_cli_t::handle_cmd(const std::string & cmd, std::function<void()> cb)
|
|||
if (res < 0)
|
||||
fprintf(stderr, "Error: %s (code %d)\n", strerror(-res), res);
|
||||
else
|
||||
printf("OK\n");
|
||||
fprintf(interactive ? stdout : stderr, "OK\n");
|
||||
cb();
|
||||
});
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
auto pos2 = cmd.find_first_of(" \t", pos+1);
|
||||
if (pos2 == std::string::npos)
|
||||
if (key == "" || pos >= cmd.size())
|
||||
{
|
||||
fprintf(stderr, "Usage: set <key> <value>\n");
|
||||
cb();
|
||||
return;
|
||||
}
|
||||
auto key = trim(cmd.substr(pos+1, pos2-pos-1));
|
||||
auto value = trim(cmd.substr(pos2+1));
|
||||
auto value = trim(cmd.substr(pos));
|
||||
db->set(key, value, [this, cb](int res)
|
||||
{
|
||||
if (res < 0)
|
||||
fprintf(stderr, "Error: %s (code %d)\n", strerror(-res), res);
|
||||
else
|
||||
printf("OK\n");
|
||||
fprintf(interactive ? stdout : stderr, "OK\n");
|
||||
cb();
|
||||
});
|
||||
}
|
||||
}
|
||||
else if (opname == "list")
|
||||
else if (opname == "list" || opname == "dump")
|
||||
{
|
||||
bool dump = opname == "dump";
|
||||
std::string start, end;
|
||||
if (pos != std::string::npos)
|
||||
{
|
||||
|
@ -358,7 +358,10 @@ void kv_cli_t::handle_cmd(const std::string & cmd, std::function<void()> cb)
|
|||
}
|
||||
else
|
||||
{
|
||||
printf("%s = %s\n", key.c_str(), value.c_str());
|
||||
if (dump)
|
||||
printf("set %s %s\n", auto_addslashes(key).c_str(), value.c_str());
|
||||
else
|
||||
printf("%s = %s\n", key.c_str(), value.c_str());
|
||||
db->list_next(handle, NULL);
|
||||
}
|
||||
});
|
||||
|
@ -367,7 +370,7 @@ void kv_cli_t::handle_cmd(const std::string & cmd, std::function<void()> cb)
|
|||
{
|
||||
db->close([=]()
|
||||
{
|
||||
printf("Index closed\n");
|
||||
fprintf(interactive ? stdout : stderr, "Index closed\n");
|
||||
cb();
|
||||
});
|
||||
}
|
||||
|
@ -382,7 +385,8 @@ void kv_cli_t::handle_cmd(const std::string & cmd, std::function<void()> cb)
|
|||
stderr, "Unknown operation: %s. Supported operations:\n"
|
||||
"open <pool_id> <inode_id> [block_size]\n"
|
||||
"config <property> <value>\n"
|
||||
"get <key>\nset <key> <value>\ndel <key>\nlist [<start> [end]]\n"
|
||||
"get <key>\nset <key> <value>\ndel <key>\n"
|
||||
"list [<start> [end]]\ndump [<start> [end]]\n"
|
||||
"close\nquit\n", opname.c_str()
|
||||
);
|
||||
cb();
|
||||
|
|
|
@ -701,15 +701,13 @@ void kv_db_t::find_size(uint64_t min, uint64_t max, int phase, std::function<voi
|
|||
op->inode = inode_id;
|
||||
op->offset = (phase == 1 ? min : (min+max)/2) * ino_block_size;
|
||||
op->len = kv_block_size;
|
||||
if (op->len)
|
||||
{
|
||||
op->iov.push_back(malloc_or_die(op->len), op->len);
|
||||
}
|
||||
op->iov.push_back(malloc_or_die(op->len), op->len);
|
||||
op->callback = [=](cluster_op_t *op)
|
||||
{
|
||||
if (op->retval != op->len)
|
||||
{
|
||||
// error
|
||||
free(op->iov.buf[0].iov_base);
|
||||
cb(op->retval >= 0 ? -EIO : op->retval, 0);
|
||||
return;
|
||||
}
|
||||
|
@ -896,7 +894,9 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
|
|||
if (op->retval != op->len)
|
||||
{
|
||||
// error
|
||||
free(op->iov.buf[0].iov_base);
|
||||
cb(op->retval >= 0 ? -EIO : op->retval, BLK_NOCHANGE);
|
||||
delete op;
|
||||
return;
|
||||
}
|
||||
invalidate(db, op->offset, op->version);
|
||||
|
@ -909,6 +909,8 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
|
|||
if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT)
|
||||
{
|
||||
// Wait until block update stops
|
||||
free(op->iov.buf[0].iov_base);
|
||||
delete op;
|
||||
db->continue_update.emplace(blk->offset, [=, blk_offset = blk->offset]()
|
||||
{
|
||||
get_block(db, offset, cur_level, recheck_policy, cb);
|
||||
|
@ -1339,6 +1341,8 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int
|
|||
del_block_level(db, blk);
|
||||
db->block_cache.erase(blk->offset);
|
||||
cb(op->retval >= 0 ? -EIO : op->retval, NULL);
|
||||
free(op->iov.buf[0].iov_base);
|
||||
delete op;
|
||||
return;
|
||||
}
|
||||
invalidate(db, op->offset, op->version);
|
||||
|
@ -1353,6 +1357,8 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int
|
|||
// Block is already occupied => place again
|
||||
place_again(db, blk, cb);
|
||||
}
|
||||
free(op->iov.buf[0].iov_base);
|
||||
delete op;
|
||||
};
|
||||
db->cli->execute(op);
|
||||
}
|
||||
|
|
|
@ -348,3 +348,65 @@ std::vector<std::string> explode(const std::string & sep, const std::string & va
|
|||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
// extract possibly double-quoted part of string with escape characters
|
||||
std::string scan_escaped(const std::string & cmd, size_t & pos)
|
||||
{
|
||||
std::string key;
|
||||
auto pos2 = cmd.find_first_not_of(" \t\r\n", pos);
|
||||
if (pos2 == std::string::npos)
|
||||
{
|
||||
pos = cmd.size();
|
||||
return "";
|
||||
}
|
||||
pos = pos2;
|
||||
if (cmd[pos] != '"')
|
||||
{
|
||||
pos2 = cmd.find_first_of(" \t\r\n", pos);
|
||||
pos2 = pos2 == std::string::npos ? cmd.size() : pos2;
|
||||
key = cmd.substr(pos, pos2-pos);
|
||||
pos2 = cmd.find_first_not_of(" \t\r\n", pos2);
|
||||
pos = pos2 == std::string::npos ? cmd.size() : pos2;
|
||||
return key;
|
||||
}
|
||||
pos++;
|
||||
while (pos < cmd.size())
|
||||
{
|
||||
auto pos2 = cmd.find_first_of("\\\"", pos);
|
||||
pos2 = pos2 == std::string::npos ? cmd.size() : pos2;
|
||||
if (pos2 > pos)
|
||||
key += cmd.substr(pos, pos2-pos);
|
||||
pos = pos2;
|
||||
if (pos >= cmd.size())
|
||||
break;
|
||||
if (cmd[pos] == '"')
|
||||
{
|
||||
pos++;
|
||||
break;
|
||||
}
|
||||
if (cmd[pos] == '\\')
|
||||
{
|
||||
if (pos < cmd.size()-1)
|
||||
key += cmd[++pos];
|
||||
pos++;
|
||||
}
|
||||
}
|
||||
return key;
|
||||
}
|
||||
|
||||
std::string auto_addslashes(const std::string & str)
|
||||
{
|
||||
auto pos = str.find_first_of("\\\"");
|
||||
if (pos == std::string::npos)
|
||||
return str;
|
||||
std::string res = "\""+str.substr(0, pos)+"\\"+str[pos];
|
||||
while (pos < str.size()-1)
|
||||
{
|
||||
auto pos2 = str.find_first_of("\\\"", pos+1);
|
||||
if (pos2 == std::string::npos)
|
||||
return res + str.substr(pos+1) + "\"";
|
||||
res += str.substr(pos, pos2-pos)+"\\"+str[pos2];
|
||||
pos = pos2;
|
||||
}
|
||||
return res+"\"";
|
||||
}
|
||||
|
|
|
@ -22,3 +22,5 @@ std::string str_repeat(const std::string & str, int times);
|
|||
size_t utf8_length(const std::string & s);
|
||||
size_t utf8_length(const char *s);
|
||||
std::vector<std::string> explode(const std::string & sep, const std::string & value, bool trim);
|
||||
std::string scan_escaped(const std::string & cmd, size_t & pos);
|
||||
std::string auto_addslashes(const std::string & str);
|
||||
|
|
Loading…
Reference in New Issue