Allow to track multiple updates per block (it should never happen though)

antietcd
Vitaliy Filippov 2023-10-22 16:56:46 +03:00
parent e4fa26f60a
commit 31ec3fa8f5
1 changed files with 10 additions and 7 deletions

View File

@ -82,7 +82,7 @@ struct kv_block_t
std::map<std::string, std::string> data; std::map<std::string, std::string> data;
// set during update // set during update
bool updating; int updating = 0;
int change_type; int change_type;
std::string change_key, change_value; std::string change_key, change_value;
std::string change_rh; std::string change_rh;
@ -625,8 +625,10 @@ void kv_db_t::run_continue_update(uint64_t offset)
void kv_db_t::stop_updating(kv_block_t *blk) void kv_db_t::stop_updating(kv_block_t *blk)
{ {
blk->updating = false; assert(blk->updating > 0);
run_continue_update(blk->offset); blk->updating--;
if (!blk->updating)
run_continue_update(blk->offset);
} }
static void del_block_level(kv_db_t *db, kv_block_t *blk) static void del_block_level(kv_db_t *db, kv_block_t *blk)
@ -646,7 +648,7 @@ static void invalidate(kv_db_t *db, uint64_t offset, uint64_t version)
auto b_it = db->block_cache.lower_bound(offset/db->ino_block_size * db->ino_block_size); auto b_it = db->block_cache.lower_bound(offset/db->ino_block_size * db->ino_block_size);
while (b_it != db->block_cache.end() && b_it->first/db->ino_block_size == offset/db->ino_block_size) while (b_it != db->block_cache.end() && b_it->first/db->ino_block_size == offset/db->ino_block_size)
{ {
if (b_it->second.updating) if (b_it->second.updating > 0)
{ {
// do not forget blocks during modification // do not forget blocks during modification
b_it++; b_it++;
@ -1073,7 +1075,7 @@ static kv_block_t *create_new_block(kv_db_t *db, kv_block_t *old_blk, const std:
blk->level = old_blk->level; blk->level = old_blk->level;
blk->type = old_blk->type == KV_LEAF_SPLIT || old_blk->type == KV_LEAF ? KV_LEAF : KV_INT; blk->type = old_blk->type == KV_LEAF_SPLIT || old_blk->type == KV_LEAF ? KV_LEAF : KV_INT;
blk->offset = new_offset; blk->offset = new_offset;
blk->updating = true; blk->updating++;
blk->key_ge = right ? separator : old_blk->key_ge; blk->key_ge = right ? separator : old_blk->key_ge;
blk->key_lt = right ? old_blk->key_lt : separator; blk->key_lt = right ? old_blk->key_lt : separator;
blk->data.insert(right ? old_blk->data.lower_bound(separator) : old_blk->data.begin(), blk->data.insert(right ? old_blk->data.lower_bound(separator) : old_blk->data.begin(),
@ -1269,7 +1271,7 @@ void kv_op_t::create_root()
blk->data[key] = value; blk->data[key] = value;
blk->set_data_size(); blk->set_data_size();
add_block_level(db, blk); add_block_level(db, blk);
blk->updating = true; blk->updating++;
write_block(db, blk, [=](int res) write_block(db, blk, [=](int res)
{ {
db->stop_updating(blk); db->stop_updating(blk);
@ -1384,7 +1386,7 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
abort(); abort();
} }
} }
blk->updating = true; blk->updating++;
if (is_delete || (blk->data_size + kv_block_t::kv_size(key, value) - rm_size) < db->kv_block_size) if (is_delete || (blk->data_size + kv_block_t::kv_size(key, value) - rm_size) < db->kv_block_size)
{ {
// New item fits. // New item fits.
@ -1463,6 +1465,7 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
new_root->data[""] = std::string((char*)&left_blk->offset, sizeof(left_blk->offset)); new_root->data[""] = std::string((char*)&left_blk->offset, sizeof(left_blk->offset));
new_root->data[separator] = std::string((char*)&right_blk->offset, sizeof(right_blk->offset)); new_root->data[separator] = std::string((char*)&right_blk->offset, sizeof(right_blk->offset));
new_root->set_data_size(); new_root->set_data_size();
new_root->updating++;
write_block(db, new_root, [=](int write_res) write_block(db, new_root, [=](int write_res)
{ {
if (write_res < 0) if (write_res < 0)