Track versions of parent blocks and recheck if changed during update

master
Vitaliy Filippov 2023-10-22 02:00:55 +03:00
parent 2c6a301d9b
commit 59ae27f9e5
1 changed file with 51 additions and 26 deletions

View File

@ -37,7 +37,6 @@
#define KV_RECHECK_NONE 0
#define KV_RECHECK_LEAF 1
#define KV_RECHECK_ALL 2
#define KV_RECHECK_RELOAD 3
#define KV_CH_ADD 1
#define KV_CH_DEL 2
@ -152,6 +151,12 @@ struct kv_db_t
void stop_updating(kv_block_t *blk);
};
// One element of kv_op_t::path: a block visited while descending from the
// root block (offset 0) towards the block that contains the requested key.
struct kv_path_t
{
// Offset of the block; this is the value used to look the block up in db->block_cache
uint64_t offset;
// Value of db->known_versions[offset/db->ino_block_size] recorded when handle_block()
// processed this block during a non-GET operation. It is left at 0 until then and is
// reset to 0 when the entry is redirected to the right half of a split block.
// update_block() compares it with the current known version to detect that the block
// was changed in the meantime.
uint64_t version;
};
struct kv_op_t
{
kv_db_t *db;
@ -170,7 +175,7 @@ protected:
uint64_t cur_block = 0;
std::string prev_key_ge, prev_key_lt;
int cur_level = 0;
std::vector<uint64_t> path;
std::vector<kv_path_t> path;
bool updated = false;
int retry = 0;
bool skip_equal = false;
@ -183,7 +188,7 @@ protected:
void update_find();
void create_root();
void resume_split();
void update_block(int path_pos, bool is_delete, const std::string & key, const std::string & value, uint64_t block_ver, std::function<void(int)> cb);
void update_block(int path_pos, bool is_delete, const std::string & key, const std::string & value, std::function<void(int)> cb);
void next_handle_block(int res, bool updated);
void next_get();
@ -798,8 +803,11 @@ void kv_op_t::exec()
db->usage_counter++;
}
cur_level = -db->base_block_level;
path.clear();
path.push_back(0);
if (opcode == KV_LIST)
{
path.clear();
path.push_back((kv_path_t){ .offset = 0 });
}
recheck_policy = (opcode == KV_GET ? KV_RECHECK_LEAF : KV_RECHECK_NONE);
if (opcode == KV_GET || opcode == KV_GET_CACHED)
get();
@ -872,6 +880,14 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
}
this->updated = this->updated || updated;
auto blk = &db->block_cache.at(cur_block);
if (opcode != KV_GET && opcode != KV_GET_CACHED)
{
// Track the whole path and versions of all blocks during update
// and recheck parent blocks if their versions change. This is required
// because we have to handle parallel splits of parent blocks correctly
assert(path.size() > 0);
path[path.size()-1].version = db->known_versions[cur_block/db->ino_block_size];
}
if (key < blk->key_ge || blk->key_lt.size() && key >= blk->key_lt)
{
// We got an unrelated block - recheck the whole chain from the beginning
@ -903,8 +919,11 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
prev_key_ge = prev_key_lt = "";
cur_level = -db->base_block_level;
cur_block = 0;
path.clear();
path.push_back(0);
if (opcode != KV_GET && opcode != KV_GET_CACHED)
{
path.clear();
path.push_back((kv_path_t){ .offset = 0 });
}
this->recheck_policy = KV_RECHECK_ALL;
return -EAGAIN;
}
@ -916,8 +935,12 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
else if ((blk->type == KV_INT_SPLIT || blk->type == KV_LEAF_SPLIT) && key >= blk->right_half)
{
cur_block = blk->right_half_block;
assert(path.size() > 0);
path[path.size()-1] = cur_block;
if (opcode != KV_GET && opcode != KV_GET_CACHED)
{
assert(path.size() > 0);
path[path.size()-1].offset = cur_block;
path[path.size()-1].version = 0;
}
prev_key_ge = blk->right_half;
prev_key_lt = blk->key_lt;
return -EAGAIN;
@ -947,7 +970,10 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
prev_key_lt = m;
cur_level++;
cur_block = *((uint64_t*)child_it->second.data());
path.push_back(cur_block);
if (opcode != KV_GET && opcode != KV_GET_CACHED)
{
path.push_back((kv_path_t){ .offset = cur_block });
}
return -EAGAIN;
}
return 0;
@ -1161,7 +1187,7 @@ void kv_op_t::update()
cur_level = -db->base_block_level;
cur_block = 0;
path.clear();
path.push_back(0);
path.push_back((kv_path_t){ .offset = 0 });
// The beginning is (almost) the same as in get(). First we find path to the item
update_find();
}
@ -1215,7 +1241,7 @@ void kv_op_t::update_find()
}
else
{
update_block(path.size()-1, opcode == KV_DEL, key, value, UINT64_MAX, [=](int res)
update_block(path.size()-1, opcode == KV_DEL, key, value, [=](int res)
{
finish(res);
});
@ -1273,7 +1299,7 @@ void kv_op_t::resume_split()
auto blk = &db->block_cache.at(cur_block);
update_block(
path.size()-2, false, blk->right_half,
std::string((char*)&blk->right_half_block, sizeof(blk->right_half_block)), UINT64_MAX,
std::string((char*)&blk->right_half_block, sizeof(blk->right_half_block)),
[=](int res)
{
if (res < 0)
@ -1287,25 +1313,22 @@ void kv_op_t::resume_split()
}
void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key,
const std::string & value, uint64_t block_ver, std::function<void(int)> cb)
const std::string & value, std::function<void(int)> cb)
{
auto blk_it = db->block_cache.find(path[path_pos]);
auto blk_it = db->block_cache.find(path[path_pos].offset);
if (blk_it == db->block_cache.end())
{
// Block is not in cache anymore, recheck
db->run_continue_update(path[path_pos]);
db->run_continue_update(path[path_pos].offset);
update();
return;
}
auto blk = &blk_it->second;
if (block_ver == UINT64_MAX)
{
block_ver = db->known_versions[blk->offset/db->ino_block_size];
}
auto block_ver = path[path_pos].version;
if (blk->updating)
{
// Wait if block is being modified
db->continue_update.emplace(blk->offset, [=]() { update_block(path_pos, is_delete, key, value, block_ver, cb); });
db->continue_update.emplace(blk->offset, [=]() { update_block(path_pos, is_delete, key, value, cb); });
return;
}
if (db->known_versions[blk->offset/db->ino_block_size] != block_ver)
@ -1348,7 +1371,9 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
assert(!blk->change_type);
if (blk->type == KV_LEAF_SPLIT || blk->type == KV_INT_SPLIT)
{
if (prev_key_lt != "" && prev_key_lt <= blk->right_half)
if (prev_key_lt != "" && prev_key_lt <= blk->right_half ||
// We can't check it for sure for parent blocks because we don't track their prev_key_lt/ge
path_pos < path.size()-1)
blk->change_type = KV_CH_CLEAR_RIGHT;
else
{
@ -1523,7 +1548,7 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
// Do not cleanup anything on failure because stored right_blk is already referenced
update_block(
path_pos-1, false, separator,
std::string((char*)&right_blk->offset, sizeof(right_blk->offset)), UINT64_MAX,
std::string((char*)&right_blk->offset, sizeof(right_blk->offset)),
cb
);
}
@ -1599,7 +1624,7 @@ void kv_op_t::next_get()
key = blk->right_half;
prev_key_ge = blk->right_half;
prev_key_lt = blk->key_lt;
path[path.size()-1] = cur_block = blk->right_half_block;
path[path.size()-1].offset = cur_block = blk->right_half_block;
next();
}
else
@ -1623,7 +1648,7 @@ void kv_op_t::next_go_up()
skip_equal = false;
path.pop_back();
cur_level--;
cur_block = path[path.size()-1];
cur_block = path[path.size()-1].offset;
recheck_policy = KV_RECHECK_LEAF;
// Check if we can resume listing from the next key
auto pb_it = db->block_cache.find(cur_block);
@ -1634,7 +1659,7 @@ void kv_op_t::next_go_up()
cur_level = -db->base_block_level;
cur_block = 0;
path.clear();
path.push_back(0);
path.push_back((kv_path_t){ .offset = 0 });
next();
return;
}