From 28c4324c3683340ab4cdc9f9e4dece25f9239ba1 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 23 Oct 2023 01:23:20 +0300 Subject: [PATCH] Remove blocks from cache on unsuccessful updates --- src/kv_db.cpp | 69 +++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 51 insertions(+), 18 deletions(-) diff --git a/src/kv_db.cpp b/src/kv_db.cpp index e430262b..f2d11654 100644 --- a/src/kv_db.cpp +++ b/src/kv_db.cpp @@ -111,6 +111,12 @@ int kv_block_t::kv_size(const std::string & key, const std::string & value) return 4*2 + key.size() + value.size(); } +struct kv_continue_write_t +{ + kv_block_t *blk; + std::function cb; +}; + struct kv_db_t { cluster_client_t *cli = NULL; @@ -133,7 +139,7 @@ struct kv_db_t std::map block_cache; std::map known_versions; std::map new_versions; - std::multimap> continue_write; + std::multimap continue_write; std::multimap> continue_update; bool closing = false; @@ -959,7 +965,7 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split) fprintf(stderr, "K/V: Internal block %lu misses boundary for %s\n", cur_block, key.c_str()); return -EILSEQ; } - auto m = child_it == blk->data.end() ? "" : child_it->first; + auto m = child_it == blk->data.end() ? blk->key_lt : child_it->first; child_it--; if (child_it->second.size() != sizeof(uint64_t)) { @@ -1011,7 +1017,7 @@ static void write_block(kv_db_t *db, kv_block_t *blk, std::function c if (new_version != 0) { // Wait if block is being modified - db->continue_write.emplace(blk->offset/db->ino_block_size, [=]() { write_block(db, blk, cb); }); + db->continue_write.emplace(blk->offset/db->ino_block_size, (kv_continue_write_t){ .blk = blk, .cb = cb }); return; } new_version = 1+db->known_versions[blk->offset/db->ino_block_size]; @@ -1024,7 +1030,11 @@ static void write_block(kv_db_t *db, kv_block_t *blk, std::function c op->iov.push_back(malloc_or_die(op->len), op->len); if (!blk->serialize((uint8_t*)op->iov.buf[0].iov_base, op->len)) { - fprintf(stderr, "K/V: block %lu grew too large: %u bytes\n", blk->offset, blk->data_size); + blk->dump(db->base_block_level); + uint64_t old_size = blk->data_size; + blk->set_data_size(); + fprintf(stderr, "K/V: block %lu (ptr=%lx) grew too large: tracked %lu, but real is %u bytes\n", + blk->offset, (uint64_t)blk, old_size, blk->data_size); abort(); return; } @@ -1036,13 +1046,24 @@ static void write_block(kv_db_t *db, kv_block_t *blk, std::function c if (res == 0) { db->known_versions[blk->offset/db->ino_block_size] = op->version; + auto b_it = db->continue_write.find(blk->offset/db->ino_block_size); + if (b_it != db->continue_write.end()) + { + auto cont = b_it->second; + db->continue_write.erase(b_it++); + write_block(db, cont.blk, cont.cb); + } } - auto b_it = db->continue_write.find(blk->offset/db->ino_block_size); - if (b_it != db->continue_write.end()) + else { - auto cont_cb = b_it->second; - db->continue_write.erase(b_it); - cont_cb(); + // Cancel all writes into the same inode block + auto b_it = db->continue_write.find(blk->offset/db->ino_block_size); + while (b_it != db->continue_write.end() && b_it->first == blk->offset/db->ino_block_size) + { + auto cont = b_it->second; + db->continue_write.erase(b_it++); + cont.cb(res); + } } delete op; if (res < 0 || db->immediate_commit) @@ -1274,15 +1295,19 @@ void kv_op_t::create_root() blk->updating++; write_block(db, blk, [=](int res) { - db->stop_updating(blk); if (res == -EINTR) { + auto blk_offset = blk->offset; del_block_level(db, blk); - db->block_cache.erase(blk->offset); + db->block_cache.erase(blk_offset); + db->run_continue_update(blk_offset); update(); } else + { + db->stop_updating(blk); finish(res); + } }); } @@ -1405,10 +1430,17 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key write_block(db, blk, [=](int res) { if (res < 0) - blk->cancel_change(); + { + auto blk_offset = blk->offset; + del_block_level(db, blk); + db->block_cache.erase(blk_offset); + db->run_continue_update(blk_offset); + } else + { blk->apply_change(); - db->stop_updating(blk); + db->stop_updating(blk); + } if (res == -EINTR) { update(); @@ -1522,13 +1554,12 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key } write_block(db, blk, [=](int write_res) { - if (write_res < 0) - blk->cancel_change(); - else - blk->apply_change(); - db->stop_updating(blk); if (write_res < 0) { + auto blk_offset = blk->offset; + del_block_level(db, blk); + db->block_cache.erase(blk_offset); + db->run_continue_update(blk_offset); clear_block(db, right_blk, 0, [=, right_offset = right_blk->offset](int res) { if (res < 0) @@ -1542,6 +1573,8 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key } else { + blk->apply_change(); + db->stop_updating(blk); db->stop_updating(right_blk); // Add a reference to the parent block // Do not cleanup anything on failure because stored right_blk is already referenced