From eab67a6e8f9e4bd0c7a7bc6b49db22d9d6f302b8 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 6 Nov 2023 02:30:25 +0300 Subject: [PATCH] Make get_block() wait for updating when unrelated block is found along the path --- src/kv_db.cpp | 85 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 58 insertions(+), 27 deletions(-) diff --git a/src/kv_db.cpp b/src/kv_db.cpp index cd879046..6561a9d9 100644 --- a/src/kv_db.cpp +++ b/src/kv_db.cpp @@ -36,7 +36,8 @@ #define KV_RECHECK_NONE 0 #define KV_RECHECK_LEAF 1 -#define KV_RECHECK_ALL 2 +#define KV_RECHECK_ALL 2 +#define KV_RECHECK_WAIT 3 #define KV_CH_ADD 1 #define KV_CH_DEL 2 @@ -48,6 +49,10 @@ #define LEVEL_BITS 8 #define NO_LEVEL_MASK (((uint64_t)1 << (64-LEVEL_BITS)) - 1) +#define BLK_NOCHANGE 0 +#define BLK_RELOADED 1 +#define BLK_UPDATING 2 + struct __attribute__((__packed__)) kv_stored_block_t { uint64_t magic; @@ -190,13 +195,13 @@ protected: std::string prev_key_ge, prev_key_lt; int cur_level = 0; std::vector path; - bool updated = false; + int updating_on_path = 0; int retry = 0; bool skip_equal = false; void finish(int res); void get(); - int handle_block(int res, bool updated, bool stop_on_split); + int handle_block(int res, int refresh, bool stop_on_split); void update(); void update_find(); @@ -204,7 +209,7 @@ protected: void resume_split(); void update_block(int path_pos, bool is_delete, const std::string & key, const std::string & value, std::function cb); - void next_handle_block(int res, bool updated); + void next_handle_block(int res, int refresh); void next_get(); void next_go_up(); }; @@ -797,16 +802,27 @@ static void try_evict(kv_db_t *db) } } -static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_policy, std::function cb) +static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_policy, std::function cb) { auto b_it = db->block_cache.find(offset); if (b_it != db->block_cache.end() && (recheck_policy == KV_RECHECK_NONE && !b_it->second.invalidated || recheck_policy == KV_RECHECK_LEAF && b_it->second.type != KV_LEAF && !b_it->second.invalidated || - b_it->second.updating)) + b_it->second.updating > 0)) { + auto blk = &b_it->second; + if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT) + { + // Wait until block update stops + db->continue_update.emplace(blk->offset, [=, blk_offset = blk->offset]() + { + get_block(db, offset, cur_level, recheck_policy, cb); + db->run_continue_update(blk_offset); + }); + return; + } // Block already in cache, we can proceed - b_it->second.usage = db->usage_counter; - cb(0, false); + blk->usage = db->usage_counter; + cb(0, BLK_UPDATING); return; } cluster_op_t *op = new cluster_op_t; @@ -820,7 +836,7 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p if (op->retval != op->len) { // error - cb(op->retval >= 0 ? -EIO : op->retval, false); + cb(op->retval >= 0 ? -EIO : op->retval, BLK_NOCHANGE); return; } invalidate(db, op->offset, op->version); @@ -829,8 +845,18 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p (db->known_versions[op->offset/db->ino_block_size] == op->version || blk_it->second.updating > 0)) { auto blk = &db->block_cache.at(op->offset); + if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT) + { + // Wait until block update stops + db->continue_update.emplace(blk->offset, [=, blk_offset = blk->offset]() + { + get_block(db, offset, cur_level, recheck_policy, cb); + db->run_continue_update(blk_offset); + }); + return; + } blk->usage = db->usage_counter; - cb(0, false); + cb(0, blk->updating > 0 ? BLK_UPDATING : BLK_NOCHANGE); } else { @@ -846,12 +872,12 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p blk->level = cur_level; blk->usage = db->usage_counter; add_block_level(db, blk); - cb(0, true); + cb(0, BLK_RELOADED); } else { db->block_cache.erase(op->offset); - cb(err, false); + cb(err, BLK_NOCHANGE); } try_evict(db); } @@ -908,9 +934,9 @@ void kv_op_t::finish(int res) void kv_op_t::get() { - get_block(db, cur_block, cur_level, recheck_policy, [=](int res, bool updated) + get_block(db, cur_block, cur_level, recheck_policy, [=](int res, int refresh) { - res = handle_block(res, updated, false); + res = handle_block(res, refresh, false); if (res == -EAGAIN) { get(); @@ -947,13 +973,13 @@ void kv_op_t::get() }); } -int kv_op_t::handle_block(int res, bool updated, bool stop_on_split) +int kv_op_t::handle_block(int res, int refresh, bool stop_on_split) { if (res < 0) { return res; } - this->updated = this->updated || updated; + this->updating_on_path = this->updating_on_path | refresh; auto blk = &db->block_cache.at(cur_block); if (opcode != KV_GET && opcode != KV_GET_CACHED) { @@ -974,16 +1000,22 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split) // We may read P on step (1), get a link to A, and read A on step (4). // It will miss data from [c, b). // Retry once. If we don't see any updates after retrying - fail with EILSEQ. - bool fatal = !this->updated && this->retry > 0; + bool fatal = !this->updating_on_path && this->retry > 0; if (fatal || db->log_level > 0) { fprintf(stderr, "K/V: %sgot unrelated block %lu: key=%s range=[%s, %s) from=[%s, %s)\n", fatal ? "Error: " : "Warning: read/update collision: ", cur_block, key.c_str(), blk->key_ge.c_str(), blk->key_lt.c_str(), prev_key_ge.c_str(), prev_key_lt.c_str()); } - if (this->updated) + this->recheck_policy = KV_RECHECK_ALL; + if (this->updating_on_path) { - this->updated = false; + if (this->updating_on_path & BLK_UPDATING) + { + // Wait for "updating" blocks on next run + this->recheck_policy = KV_RECHECK_WAIT; + } + this->updating_on_path = 0; this->retry = 0; } else if (this->retry > 0) @@ -992,7 +1024,7 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split) } else { - this->updated = false; + this->updating_on_path = 0; this->retry++; } prev_key_ge = prev_key_lt = ""; @@ -1003,7 +1035,6 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split) path.clear(); path.push_back((kv_path_t){ .offset = 0 }); } - this->recheck_policy = KV_RECHECK_ALL; return -EAGAIN; } if (stop_on_split && (blk->type == KV_LEAF_SPLIT || blk->type == KV_INT_SPLIT) && @@ -1301,9 +1332,9 @@ void kv_op_t::update() void kv_op_t::update_find() { - get_block(db, cur_block, cur_level, recheck_policy, [=, checked_block = cur_block](int res, bool updated) + get_block(db, cur_block, cur_level, recheck_policy, [=, checked_block = cur_block](int res, int refresh) { - res = handle_block(res, updated, true); + res = handle_block(res, refresh, true); if (res == -EAGAIN) { update_find(); @@ -1672,15 +1703,15 @@ void kv_op_t::next() { return; } - get_block(db, cur_block, cur_level, recheck_policy, [=](int res, bool updated) + get_block(db, cur_block, cur_level, recheck_policy, [=](int res, int refresh) { - next_handle_block(res, updated); + next_handle_block(res, refresh); }); } -void kv_op_t::next_handle_block(int res, bool updated) +void kv_op_t::next_handle_block(int res, int refresh) { - res = handle_block(res, updated, false); + res = handle_block(res, refresh, false); if (res == -EAGAIN) { next();