Remove blocks from cache on unsuccessful updates

master
Vitaliy Filippov 2023-10-23 01:23:20 +03:00
parent 31ec3fa8f5
commit 28c4324c36
1 changed files with 51 additions and 18 deletions

View File

@ -111,6 +111,12 @@ int kv_block_t::kv_size(const std::string & key, const std::string & value)
return 4*2 + key.size() + value.size();
}
struct kv_continue_write_t
{
kv_block_t *blk;
std::function<void(int)> cb;
};
struct kv_db_t
{
cluster_client_t *cli = NULL;
@ -133,7 +139,7 @@ struct kv_db_t
std::map<uint64_t, kv_block_t> block_cache;
std::map<uint64_t, uint64_t> known_versions;
std::map<uint64_t, uint64_t> new_versions;
std::multimap<uint64_t, std::function<void()>> continue_write;
std::multimap<uint64_t, kv_continue_write_t> continue_write;
std::multimap<uint64_t, std::function<void()>> continue_update;
bool closing = false;
@ -959,7 +965,7 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
fprintf(stderr, "K/V: Internal block %lu misses boundary for %s\n", cur_block, key.c_str());
return -EILSEQ;
}
auto m = child_it == blk->data.end() ? "" : child_it->first;
auto m = child_it == blk->data.end() ? blk->key_lt : child_it->first;
child_it--;
if (child_it->second.size() != sizeof(uint64_t))
{
@ -1011,7 +1017,7 @@ static void write_block(kv_db_t *db, kv_block_t *blk, std::function<void(int)> c
if (new_version != 0)
{
// Wait if block is being modified
db->continue_write.emplace(blk->offset/db->ino_block_size, [=]() { write_block(db, blk, cb); });
db->continue_write.emplace(blk->offset/db->ino_block_size, (kv_continue_write_t){ .blk = blk, .cb = cb });
return;
}
new_version = 1+db->known_versions[blk->offset/db->ino_block_size];
@ -1024,7 +1030,11 @@ static void write_block(kv_db_t *db, kv_block_t *blk, std::function<void(int)> c
op->iov.push_back(malloc_or_die(op->len), op->len);
if (!blk->serialize((uint8_t*)op->iov.buf[0].iov_base, op->len))
{
fprintf(stderr, "K/V: block %lu grew too large: %u bytes\n", blk->offset, blk->data_size);
blk->dump(db->base_block_level);
uint64_t old_size = blk->data_size;
blk->set_data_size();
fprintf(stderr, "K/V: block %lu (ptr=%lx) grew too large: tracked %lu, but real is %u bytes\n",
blk->offset, (uint64_t)blk, old_size, blk->data_size);
abort();
return;
}
@ -1036,13 +1046,24 @@ static void write_block(kv_db_t *db, kv_block_t *blk, std::function<void(int)> c
if (res == 0)
{
db->known_versions[blk->offset/db->ino_block_size] = op->version;
auto b_it = db->continue_write.find(blk->offset/db->ino_block_size);
if (b_it != db->continue_write.end())
{
auto cont = b_it->second;
db->continue_write.erase(b_it++);
write_block(db, cont.blk, cont.cb);
}
}
auto b_it = db->continue_write.find(blk->offset/db->ino_block_size);
if (b_it != db->continue_write.end())
else
{
auto cont_cb = b_it->second;
db->continue_write.erase(b_it);
cont_cb();
// Cancel all writes into the same inode block
auto b_it = db->continue_write.find(blk->offset/db->ino_block_size);
while (b_it != db->continue_write.end() && b_it->first == blk->offset/db->ino_block_size)
{
auto cont = b_it->second;
db->continue_write.erase(b_it++);
cont.cb(res);
}
}
delete op;
if (res < 0 || db->immediate_commit)
@ -1274,15 +1295,19 @@ void kv_op_t::create_root()
blk->updating++;
write_block(db, blk, [=](int res)
{
db->stop_updating(blk);
if (res == -EINTR)
{
auto blk_offset = blk->offset;
del_block_level(db, blk);
db->block_cache.erase(blk->offset);
db->block_cache.erase(blk_offset);
db->run_continue_update(blk_offset);
update();
}
else
{
db->stop_updating(blk);
finish(res);
}
});
}
@ -1405,10 +1430,17 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
write_block(db, blk, [=](int res)
{
if (res < 0)
blk->cancel_change();
{
auto blk_offset = blk->offset;
del_block_level(db, blk);
db->block_cache.erase(blk_offset);
db->run_continue_update(blk_offset);
}
else
{
blk->apply_change();
db->stop_updating(blk);
db->stop_updating(blk);
}
if (res == -EINTR)
{
update();
@ -1522,13 +1554,12 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
}
write_block(db, blk, [=](int write_res)
{
if (write_res < 0)
blk->cancel_change();
else
blk->apply_change();
db->stop_updating(blk);
if (write_res < 0)
{
auto blk_offset = blk->offset;
del_block_level(db, blk);
db->block_cache.erase(blk_offset);
db->run_continue_update(blk_offset);
clear_block(db, right_blk, 0, [=, right_offset = right_blk->offset](int res)
{
if (res < 0)
@ -1542,6 +1573,8 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
}
else
{
blk->apply_change();
db->stop_updating(blk);
db->stop_updating(right_blk);
// Add a reference to the parent block
// Do not cleanup anything on failure because stored right_blk is already referenced