Make get_block() wait for updating when unrelated block is found along the path

master
Vitaliy Filippov 2023-11-06 02:30:25 +03:00
parent 20993d9b7a
commit eab67a6e8f
1 changed files with 58 additions and 27 deletions

View File

@ -36,7 +36,8 @@
#define KV_RECHECK_NONE 0
#define KV_RECHECK_LEAF 1
#define KV_RECHECK_ALL 2
#define KV_RECHECK_ALL 2
#define KV_RECHECK_WAIT 3
#define KV_CH_ADD 1
#define KV_CH_DEL 2
@ -48,6 +49,10 @@
#define LEVEL_BITS 8
#define NO_LEVEL_MASK (((uint64_t)1 << (64-LEVEL_BITS)) - 1)
#define BLK_NOCHANGE 0
#define BLK_RELOADED 1
#define BLK_UPDATING 2
struct __attribute__((__packed__)) kv_stored_block_t
{
uint64_t magic;
@ -190,13 +195,13 @@ protected:
std::string prev_key_ge, prev_key_lt;
int cur_level = 0;
std::vector<kv_path_t> path;
bool updated = false;
int updating_on_path = 0;
int retry = 0;
bool skip_equal = false;
void finish(int res);
void get();
int handle_block(int res, bool updated, bool stop_on_split);
int handle_block(int res, int refresh, bool stop_on_split);
void update();
void update_find();
@ -204,7 +209,7 @@ protected:
void resume_split();
void update_block(int path_pos, bool is_delete, const std::string & key, const std::string & value, std::function<void(int)> cb);
void next_handle_block(int res, bool updated);
void next_handle_block(int res, int refresh);
void next_get();
void next_go_up();
};
@ -797,16 +802,27 @@ static void try_evict(kv_db_t *db)
}
}
static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_policy, std::function<void(int, bool)> cb)
static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_policy, std::function<void(int, int)> cb)
{
auto b_it = db->block_cache.find(offset);
if (b_it != db->block_cache.end() && (recheck_policy == KV_RECHECK_NONE && !b_it->second.invalidated ||
recheck_policy == KV_RECHECK_LEAF && b_it->second.type != KV_LEAF && !b_it->second.invalidated ||
b_it->second.updating))
b_it->second.updating > 0))
{
auto blk = &b_it->second;
if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT)
{
// Wait until block update stops
db->continue_update.emplace(blk->offset, [=, blk_offset = blk->offset]()
{
get_block(db, offset, cur_level, recheck_policy, cb);
db->run_continue_update(blk_offset);
});
return;
}
// Block already in cache, we can proceed
b_it->second.usage = db->usage_counter;
cb(0, false);
blk->usage = db->usage_counter;
cb(0, BLK_UPDATING);
return;
}
cluster_op_t *op = new cluster_op_t;
@ -820,7 +836,7 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
if (op->retval != op->len)
{
// error
cb(op->retval >= 0 ? -EIO : op->retval, false);
cb(op->retval >= 0 ? -EIO : op->retval, BLK_NOCHANGE);
return;
}
invalidate(db, op->offset, op->version);
@ -829,8 +845,18 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
(db->known_versions[op->offset/db->ino_block_size] == op->version || blk_it->second.updating > 0))
{
auto blk = &db->block_cache.at(op->offset);
if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT)
{
// Wait until block update stops
db->continue_update.emplace(blk->offset, [=, blk_offset = blk->offset]()
{
get_block(db, offset, cur_level, recheck_policy, cb);
db->run_continue_update(blk_offset);
});
return;
}
blk->usage = db->usage_counter;
cb(0, false);
cb(0, blk->updating > 0 ? BLK_UPDATING : BLK_NOCHANGE);
}
else
{
@ -846,12 +872,12 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
blk->level = cur_level;
blk->usage = db->usage_counter;
add_block_level(db, blk);
cb(0, true);
cb(0, BLK_RELOADED);
}
else
{
db->block_cache.erase(op->offset);
cb(err, false);
cb(err, BLK_NOCHANGE);
}
try_evict(db);
}
@ -908,9 +934,9 @@ void kv_op_t::finish(int res)
void kv_op_t::get()
{
get_block(db, cur_block, cur_level, recheck_policy, [=](int res, bool updated)
get_block(db, cur_block, cur_level, recheck_policy, [=](int res, int refresh)
{
res = handle_block(res, updated, false);
res = handle_block(res, refresh, false);
if (res == -EAGAIN)
{
get();
@ -947,13 +973,13 @@ void kv_op_t::get()
});
}
int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
int kv_op_t::handle_block(int res, int refresh, bool stop_on_split)
{
if (res < 0)
{
return res;
}
this->updated = this->updated || updated;
this->updating_on_path = this->updating_on_path | refresh;
auto blk = &db->block_cache.at(cur_block);
if (opcode != KV_GET && opcode != KV_GET_CACHED)
{
@ -974,16 +1000,22 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
// We may read P on step (1), get a link to A, and read A on step (4).
// It will miss data from [c, b).
// Retry once. If we don't see any updates after retrying - fail with EILSEQ.
bool fatal = !this->updated && this->retry > 0;
bool fatal = !this->updating_on_path && this->retry > 0;
if (fatal || db->log_level > 0)
{
fprintf(stderr, "K/V: %sgot unrelated block %lu: key=%s range=[%s, %s) from=[%s, %s)\n",
fatal ? "Error: " : "Warning: read/update collision: ",
cur_block, key.c_str(), blk->key_ge.c_str(), blk->key_lt.c_str(), prev_key_ge.c_str(), prev_key_lt.c_str());
}
if (this->updated)
this->recheck_policy = KV_RECHECK_ALL;
if (this->updating_on_path)
{
this->updated = false;
if (this->updating_on_path & BLK_UPDATING)
{
// Wait for "updating" blocks on next run
this->recheck_policy = KV_RECHECK_WAIT;
}
this->updating_on_path = 0;
this->retry = 0;
}
else if (this->retry > 0)
@ -992,7 +1024,7 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
}
else
{
this->updated = false;
this->updating_on_path = 0;
this->retry++;
}
prev_key_ge = prev_key_lt = "";
@ -1003,7 +1035,6 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
path.clear();
path.push_back((kv_path_t){ .offset = 0 });
}
this->recheck_policy = KV_RECHECK_ALL;
return -EAGAIN;
}
if (stop_on_split && (blk->type == KV_LEAF_SPLIT || blk->type == KV_INT_SPLIT) &&
@ -1301,9 +1332,9 @@ void kv_op_t::update()
void kv_op_t::update_find()
{
get_block(db, cur_block, cur_level, recheck_policy, [=, checked_block = cur_block](int res, bool updated)
get_block(db, cur_block, cur_level, recheck_policy, [=, checked_block = cur_block](int res, int refresh)
{
res = handle_block(res, updated, true);
res = handle_block(res, refresh, true);
if (res == -EAGAIN)
{
update_find();
@ -1672,15 +1703,15 @@ void kv_op_t::next()
{
return;
}
get_block(db, cur_block, cur_level, recheck_policy, [=](int res, bool updated)
get_block(db, cur_block, cur_level, recheck_policy, [=](int res, int refresh)
{
next_handle_block(res, updated);
next_handle_block(res, refresh);
});
}
void kv_op_t::next_handle_block(int res, bool updated)
void kv_op_t::next_handle_block(int res, int refresh)
{
res = handle_block(res, updated, false);
res = handle_block(res, refresh, false);
if (res == -EAGAIN)
{
next();