From 59ae27f9e5ec22fea41075db670efcd1d942eec1 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 22 Oct 2023 02:00:55 +0300 Subject: [PATCH] Track versions of parent blocks and recheck if changed during update --- src/kv_db.cpp | 77 ++++++++++++++++++++++++++++++++++----------------- 1 file changed, 51 insertions(+), 26 deletions(-) diff --git a/src/kv_db.cpp b/src/kv_db.cpp index d3e2f82b..062acb4f 100644 --- a/src/kv_db.cpp +++ b/src/kv_db.cpp @@ -37,7 +37,6 @@ #define KV_RECHECK_NONE 0 #define KV_RECHECK_LEAF 1 #define KV_RECHECK_ALL 2 -#define KV_RECHECK_RELOAD 3 #define KV_CH_ADD 1 #define KV_CH_DEL 2 @@ -152,6 +151,12 @@ struct kv_db_t void stop_updating(kv_block_t *blk); }; +struct kv_path_t +{ + uint64_t offset; + uint64_t version; +}; + struct kv_op_t { kv_db_t *db; @@ -170,7 +175,7 @@ protected: uint64_t cur_block = 0; std::string prev_key_ge, prev_key_lt; int cur_level = 0; - std::vector path; + std::vector path; bool updated = false; int retry = 0; bool skip_equal = false; @@ -183,7 +188,7 @@ protected: void update_find(); void create_root(); void resume_split(); - void update_block(int path_pos, bool is_delete, const std::string & key, const std::string & value, uint64_t block_ver, std::function cb); + void update_block(int path_pos, bool is_delete, const std::string & key, const std::string & value, std::function cb); void next_handle_block(int res, bool updated); void next_get(); @@ -798,8 +803,11 @@ void kv_op_t::exec() db->usage_counter++; } cur_level = -db->base_block_level; - path.clear(); - path.push_back(0); + if (opcode == KV_LIST) + { + path.clear(); + path.push_back((kv_path_t){ .offset = 0 }); + } recheck_policy = (opcode == KV_GET ? KV_RECHECK_LEAF : KV_RECHECK_NONE); if (opcode == KV_GET || opcode == KV_GET_CACHED) get(); @@ -872,6 +880,14 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split) } this->updated = this->updated || updated; auto blk = &db->block_cache.at(cur_block); + if (opcode != KV_GET && opcode != KV_GET_CACHED) + { + // Track the whole path and versions of all blocks during update + // and recheck parent blocks if their versions change. This is required + // because we have to handle parallel splits of parent blocks correctly + assert(path.size() > 0); + path[path.size()-1].version = db->known_versions[cur_block/db->ino_block_size]; + } if (key < blk->key_ge || blk->key_lt.size() && key >= blk->key_lt) { // We got an unrelated block - recheck the whole chain from the beginning @@ -903,8 +919,11 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split) prev_key_ge = prev_key_lt = ""; cur_level = -db->base_block_level; cur_block = 0; - path.clear(); - path.push_back(0); + if (opcode != KV_GET && opcode != KV_GET_CACHED) + { + path.clear(); + path.push_back((kv_path_t){ .offset = 0 }); + } this->recheck_policy = KV_RECHECK_ALL; return -EAGAIN; } @@ -916,8 +935,12 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split) else if ((blk->type == KV_INT_SPLIT || blk->type == KV_LEAF_SPLIT) && key >= blk->right_half) { cur_block = blk->right_half_block; - assert(path.size() > 0); - path[path.size()-1] = cur_block; + if (opcode != KV_GET && opcode != KV_GET_CACHED) + { + assert(path.size() > 0); + path[path.size()-1].offset = cur_block; + path[path.size()-1].version = 0; + } prev_key_ge = blk->right_half; prev_key_lt = blk->key_lt; return -EAGAIN; @@ -947,7 +970,10 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split) prev_key_lt = m; cur_level++; cur_block = *((uint64_t*)child_it->second.data()); - path.push_back(cur_block); + if (opcode != KV_GET && opcode != KV_GET_CACHED) + { + path.push_back((kv_path_t){ .offset = cur_block }); + } return -EAGAIN; } return 0; @@ -1161,7 +1187,7 @@ void kv_op_t::update() cur_level = -db->base_block_level; cur_block = 0; path.clear(); - path.push_back(0); + path.push_back((kv_path_t){ .offset = 0 }); // The beginning is (almost) the same as in get(). First we find path to the item update_find(); } @@ -1215,7 +1241,7 @@ void kv_op_t::update_find() } else { - update_block(path.size()-1, opcode == KV_DEL, key, value, UINT64_MAX, [=](int res) + update_block(path.size()-1, opcode == KV_DEL, key, value, [=](int res) { finish(res); }); @@ -1273,7 +1299,7 @@ void kv_op_t::resume_split() auto blk = &db->block_cache.at(cur_block); update_block( path.size()-2, false, blk->right_half, - std::string((char*)&blk->right_half_block, sizeof(blk->right_half_block)), UINT64_MAX, + std::string((char*)&blk->right_half_block, sizeof(blk->right_half_block)), [=](int res) { if (res < 0) @@ -1287,25 +1313,22 @@ void kv_op_t::resume_split() } void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key, - const std::string & value, uint64_t block_ver, std::function cb) + const std::string & value, std::function cb) { - auto blk_it = db->block_cache.find(path[path_pos]); + auto blk_it = db->block_cache.find(path[path_pos].offset); if (blk_it == db->block_cache.end()) { // Block is not in cache anymore, recheck - db->run_continue_update(path[path_pos]); + db->run_continue_update(path[path_pos].offset); update(); return; } auto blk = &blk_it->second; - if (block_ver == UINT64_MAX) - { - block_ver = db->known_versions[blk->offset/db->ino_block_size]; - } + auto block_ver = path[path_pos].version; if (blk->updating) { // Wait if block is being modified - db->continue_update.emplace(blk->offset, [=]() { update_block(path_pos, is_delete, key, value, block_ver, cb); }); + db->continue_update.emplace(blk->offset, [=]() { update_block(path_pos, is_delete, key, value, cb); }); return; } if (db->known_versions[blk->offset/db->ino_block_size] != block_ver) @@ -1348,7 +1371,9 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key assert(!blk->change_type); if (blk->type == KV_LEAF_SPLIT || blk->type == KV_INT_SPLIT) { - if (prev_key_lt != "" && prev_key_lt <= blk->right_half) + if (prev_key_lt != "" && prev_key_lt <= blk->right_half || + // We can't check it for sure for parent blocks because we don't track their prev_key_lt/ge + path_pos < path.size()-1) blk->change_type = KV_CH_CLEAR_RIGHT; else { @@ -1523,7 +1548,7 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key // Do not cleanup anything on failure because stored right_blk is already referenced update_block( path_pos-1, false, separator, - std::string((char*)&right_blk->offset, sizeof(right_blk->offset)), UINT64_MAX, + std::string((char*)&right_blk->offset, sizeof(right_blk->offset)), cb ); } @@ -1599,7 +1624,7 @@ void kv_op_t::next_get() key = blk->right_half; prev_key_ge = blk->right_half; prev_key_lt = blk->key_lt; - path[path.size()-1] = cur_block = blk->right_half_block; + path[path.size()-1].offset = cur_block = blk->right_half_block; next(); } else @@ -1623,7 +1648,7 @@ void kv_op_t::next_go_up() skip_equal = false; path.pop_back(); cur_level--; - cur_block = path[path.size()-1]; + cur_block = path[path.size()-1].offset; recheck_policy = KV_RECHECK_LEAF; // Check if we can resume listing from the next key auto pb_it = db->block_cache.find(cur_block); @@ -1634,7 +1659,7 @@ void kv_op_t::next_go_up() cur_level = -db->base_block_level; cur_block = 0; path.clear(); - path.push_back(0); + path.push_back((kv_path_t){ .offset = 0 }); next(); return; }