Invalidate blocks being updated too

antietcd
Vitaliy Filippov 2023-10-25 12:56:22 +03:00
parent e98a38810d
commit a7396d2baf
1 changed files with 33 additions and 4 deletions

View File

@ -83,6 +83,7 @@ struct kv_block_t
// set during update // set during update
int updating = 0; int updating = 0;
bool invalidated = false;
int change_type; int change_type;
std::string change_key, change_value; std::string change_key, change_value;
std::string change_rh; std::string change_rh;
@ -708,6 +709,11 @@ static void invalidate(kv_db_t *db, uint64_t offset, uint64_t version)
if (b_it->second.updating > 0) if (b_it->second.updating > 0)
{ {
// do not forget blocks during modification // do not forget blocks during modification
// but we have to remember the fact that they're not valid anymore
// because otherwise, if we keep `updating` flag more than just during
// the write_block() operation, we may end up modifying an outdated
// version of the block in memory
b_it->second.invalidated = true;
b_it++; b_it++;
} }
else else
@ -790,8 +796,8 @@ static void try_evict(kv_db_t *db)
static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_policy, std::function<void(int, bool)> cb) static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_policy, std::function<void(int, bool)> cb)
{ {
auto b_it = db->block_cache.find(offset); auto b_it = db->block_cache.find(offset);
if (b_it != db->block_cache.end() && (recheck_policy == KV_RECHECK_NONE || if (b_it != db->block_cache.end() && (recheck_policy == KV_RECHECK_NONE && !b_it->second.invalidated ||
recheck_policy == KV_RECHECK_LEAF && b_it->second.type != KV_LEAF || recheck_policy == KV_RECHECK_LEAF && b_it->second.type != KV_LEAF && !b_it->second.invalidated ||
b_it->second.updating)) b_it->second.updating))
{ {
// Block already in cache, we can proceed // Block already in cache, we can proceed
@ -1064,6 +1070,19 @@ static std::string find_splitter(kv_db_t *db, kv_block_t *blk)
static void write_block(kv_db_t *db, kv_block_t *blk, std::function<void(int)> cb) static void write_block(kv_db_t *db, kv_block_t *blk, std::function<void(int)> cb)
{ {
if (blk->invalidated)
{
// Cancel all writes into the same inode block
auto b_it = db->continue_write.find(blk->offset/db->ino_block_size);
while (b_it != db->continue_write.end() && b_it->first == blk->offset/db->ino_block_size)
{
auto cont = b_it->second;
db->continue_write.erase(b_it++);
cont.cb(-EINTR);
}
cb(-EINTR);
return;
}
auto & new_version = db->new_versions[blk->offset/db->ino_block_size]; auto & new_version = db->new_versions[blk->offset/db->ino_block_size];
if (new_version != 0) if (new_version != 0)
{ {
@ -1096,6 +1115,7 @@ static void write_block(kv_db_t *db, kv_block_t *blk, std::function<void(int)> c
int res = op->retval == op->len ? 0 : (op->retval > 0 ? -EIO : op->retval); int res = op->retval == op->len ? 0 : (op->retval > 0 ? -EIO : op->retval);
if (res == 0) if (res == 0)
{ {
blk->invalidated = false;
db->known_versions[blk->offset/db->ino_block_size] = op->version; db->known_versions[blk->offset/db->ino_block_size] = op->version;
auto b_it = db->continue_write.find(blk->offset/db->ino_block_size); auto b_it = db->continue_write.find(blk->offset/db->ino_block_size);
if (b_it != db->continue_write.end()) if (b_it != db->continue_write.end())
@ -1186,6 +1206,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int
if (is_zero(op->iov.buf[0].iov_base, db->kv_block_size)) if (is_zero(op->iov.buf[0].iov_base, db->kv_block_size))
{ {
// OK, block is still empty, but the version apparently changed // OK, block is still empty, but the version apparently changed
blk->invalidated = false;
write_new_block(db, blk, cb); write_new_block(db, blk, cb);
} }
else else
@ -1198,6 +1219,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int
db->block_cache.erase(old_offset); db->block_cache.erase(old_offset);
auto new_blk = &db->block_cache[new_offset]; auto new_blk = &db->block_cache[new_offset];
new_blk->offset = new_offset; new_blk->offset = new_offset;
new_blk->invalidated = false;
add_block_level(db, new_blk); add_block_level(db, new_blk);
write_new_block(db, new_blk, cb); write_new_block(db, new_blk, cb);
} }
@ -1529,6 +1551,7 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
{ {
if (res < 0) if (res < 0)
{ {
// FIXME: clear_block left_blk
blk->cancel_change(); blk->cancel_change();
db->stop_updating(right_blk); db->stop_updating(right_blk);
db->stop_updating(blk); db->stop_updating(blk);
@ -1547,12 +1570,18 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
new_root->data[separator] = std::string((char*)&right_blk->offset, sizeof(right_blk->offset)); new_root->data[separator] = std::string((char*)&right_blk->offset, sizeof(right_blk->offset));
new_root->set_data_size(); new_root->set_data_size();
new_root->updating++; new_root->updating++;
if (blk->invalidated)
{
new_root->invalidated = true;
}
write_block(db, new_root, [=](int write_res) write_block(db, new_root, [=](int write_res)
{ {
if (write_res < 0) if (write_res < 0)
{ {
blk->cancel_change(); auto blk_offset = blk->offset;
db->stop_updating(blk); del_block_level(db, blk);
db->block_cache.erase(blk_offset);
db->run_continue_update(blk_offset);
clear_block(db, left_blk, 0, [=, left_offset = left_blk->offset](int res) clear_block(db, left_blk, 0, [=, left_offset = left_blk->offset](int res)
{ {
if (res < 0) if (res < 0)