Make get_block() wait for updating when unrelated block is found along the path
parent
3393463466
commit
e847e26912
|
@ -36,7 +36,8 @@
|
|||
|
||||
#define KV_RECHECK_NONE 0
|
||||
#define KV_RECHECK_LEAF 1
|
||||
#define KV_RECHECK_ALL 2
|
||||
#define KV_RECHECK_ALL 2
|
||||
#define KV_RECHECK_WAIT 3
|
||||
|
||||
#define KV_CH_ADD 1
|
||||
#define KV_CH_DEL 2
|
||||
|
@ -48,6 +49,10 @@
|
|||
#define LEVEL_BITS 8
|
||||
#define NO_LEVEL_MASK (((uint64_t)1 << (64-LEVEL_BITS)) - 1)
|
||||
|
||||
#define BLK_NOCHANGE 0
|
||||
#define BLK_RELOADED 1
|
||||
#define BLK_UPDATING 2
|
||||
|
||||
struct __attribute__((__packed__)) kv_stored_block_t
|
||||
{
|
||||
uint64_t magic;
|
||||
|
@ -190,13 +195,13 @@ protected:
|
|||
std::string prev_key_ge, prev_key_lt;
|
||||
int cur_level = 0;
|
||||
std::vector<kv_path_t> path;
|
||||
bool updated = false;
|
||||
int updating_on_path = 0;
|
||||
int retry = 0;
|
||||
bool skip_equal = false;
|
||||
|
||||
void finish(int res);
|
||||
void get();
|
||||
int handle_block(int res, bool updated, bool stop_on_split);
|
||||
int handle_block(int res, int refresh, bool stop_on_split);
|
||||
|
||||
void update();
|
||||
void update_find();
|
||||
|
@ -204,7 +209,7 @@ protected:
|
|||
void resume_split();
|
||||
void update_block(int path_pos, bool is_delete, const std::string & key, const std::string & value, std::function<void(int)> cb);
|
||||
|
||||
void next_handle_block(int res, bool updated);
|
||||
void next_handle_block(int res, int refresh);
|
||||
void next_get();
|
||||
void next_go_up();
|
||||
};
|
||||
|
@ -797,16 +802,27 @@ static void try_evict(kv_db_t *db)
|
|||
}
|
||||
}
|
||||
|
||||
static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_policy, std::function<void(int, bool)> cb)
|
||||
static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_policy, std::function<void(int, int)> cb)
|
||||
{
|
||||
auto b_it = db->block_cache.find(offset);
|
||||
if (b_it != db->block_cache.end() && (recheck_policy == KV_RECHECK_NONE && !b_it->second.invalidated ||
|
||||
recheck_policy == KV_RECHECK_LEAF && b_it->second.type != KV_LEAF && !b_it->second.invalidated ||
|
||||
b_it->second.updating))
|
||||
b_it->second.updating > 0))
|
||||
{
|
||||
auto blk = &b_it->second;
|
||||
if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT)
|
||||
{
|
||||
// Wait until block update stops
|
||||
db->continue_update.emplace(blk->offset, [=, blk_offset = blk->offset]()
|
||||
{
|
||||
get_block(db, offset, cur_level, recheck_policy, cb);
|
||||
db->run_continue_update(blk_offset);
|
||||
});
|
||||
return;
|
||||
}
|
||||
// Block already in cache, we can proceed
|
||||
b_it->second.usage = db->usage_counter;
|
||||
cb(0, false);
|
||||
blk->usage = db->usage_counter;
|
||||
cb(0, BLK_UPDATING);
|
||||
return;
|
||||
}
|
||||
cluster_op_t *op = new cluster_op_t;
|
||||
|
@ -820,7 +836,7 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
|
|||
if (op->retval != op->len)
|
||||
{
|
||||
// error
|
||||
cb(op->retval >= 0 ? -EIO : op->retval, false);
|
||||
cb(op->retval >= 0 ? -EIO : op->retval, BLK_NOCHANGE);
|
||||
return;
|
||||
}
|
||||
invalidate(db, op->offset, op->version);
|
||||
|
@ -829,8 +845,18 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
|
|||
(db->known_versions[op->offset/db->ino_block_size] == op->version || blk_it->second.updating > 0))
|
||||
{
|
||||
auto blk = &db->block_cache.at(op->offset);
|
||||
if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT)
|
||||
{
|
||||
// Wait until block update stops
|
||||
db->continue_update.emplace(blk->offset, [=, blk_offset = blk->offset]()
|
||||
{
|
||||
get_block(db, offset, cur_level, recheck_policy, cb);
|
||||
db->run_continue_update(blk_offset);
|
||||
});
|
||||
return;
|
||||
}
|
||||
blk->usage = db->usage_counter;
|
||||
cb(0, false);
|
||||
cb(0, blk->updating > 0 ? BLK_UPDATING : BLK_NOCHANGE);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -846,12 +872,12 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
|
|||
blk->level = cur_level;
|
||||
blk->usage = db->usage_counter;
|
||||
add_block_level(db, blk);
|
||||
cb(0, true);
|
||||
cb(0, BLK_RELOADED);
|
||||
}
|
||||
else
|
||||
{
|
||||
db->block_cache.erase(op->offset);
|
||||
cb(err, false);
|
||||
cb(err, BLK_NOCHANGE);
|
||||
}
|
||||
try_evict(db);
|
||||
}
|
||||
|
@ -908,9 +934,9 @@ void kv_op_t::finish(int res)
|
|||
|
||||
void kv_op_t::get()
|
||||
{
|
||||
get_block(db, cur_block, cur_level, recheck_policy, [=](int res, bool updated)
|
||||
get_block(db, cur_block, cur_level, recheck_policy, [=](int res, int refresh)
|
||||
{
|
||||
res = handle_block(res, updated, false);
|
||||
res = handle_block(res, refresh, false);
|
||||
if (res == -EAGAIN)
|
||||
{
|
||||
get();
|
||||
|
@ -947,13 +973,13 @@ void kv_op_t::get()
|
|||
});
|
||||
}
|
||||
|
||||
int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
|
||||
int kv_op_t::handle_block(int res, int refresh, bool stop_on_split)
|
||||
{
|
||||
if (res < 0)
|
||||
{
|
||||
return res;
|
||||
}
|
||||
this->updated = this->updated || updated;
|
||||
this->updating_on_path = this->updating_on_path | refresh;
|
||||
auto blk = &db->block_cache.at(cur_block);
|
||||
if (opcode != KV_GET && opcode != KV_GET_CACHED)
|
||||
{
|
||||
|
@ -974,16 +1000,22 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
|
|||
// We may read P on step (1), get a link to A, and read A on step (4).
|
||||
// It will miss data from [c, b).
|
||||
// Retry once. If we don't see any updates after retrying - fail with EILSEQ.
|
||||
bool fatal = !this->updated && this->retry > 0;
|
||||
bool fatal = !this->updating_on_path && this->retry > 0;
|
||||
if (fatal || db->log_level > 0)
|
||||
{
|
||||
fprintf(stderr, "K/V: %sgot unrelated block %lu: key=%s range=[%s, %s) from=[%s, %s)\n",
|
||||
fatal ? "Error: " : "Warning: read/update collision: ",
|
||||
cur_block, key.c_str(), blk->key_ge.c_str(), blk->key_lt.c_str(), prev_key_ge.c_str(), prev_key_lt.c_str());
|
||||
}
|
||||
if (this->updated)
|
||||
this->recheck_policy = KV_RECHECK_ALL;
|
||||
if (this->updating_on_path)
|
||||
{
|
||||
this->updated = false;
|
||||
if (this->updating_on_path & BLK_UPDATING)
|
||||
{
|
||||
// Wait for "updating" blocks on next run
|
||||
this->recheck_policy = KV_RECHECK_WAIT;
|
||||
}
|
||||
this->updating_on_path = 0;
|
||||
this->retry = 0;
|
||||
}
|
||||
else if (this->retry > 0)
|
||||
|
@ -992,7 +1024,7 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
|
|||
}
|
||||
else
|
||||
{
|
||||
this->updated = false;
|
||||
this->updating_on_path = 0;
|
||||
this->retry++;
|
||||
}
|
||||
prev_key_ge = prev_key_lt = "";
|
||||
|
@ -1003,7 +1035,6 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
|
|||
path.clear();
|
||||
path.push_back((kv_path_t){ .offset = 0 });
|
||||
}
|
||||
this->recheck_policy = KV_RECHECK_ALL;
|
||||
return -EAGAIN;
|
||||
}
|
||||
if (stop_on_split && (blk->type == KV_LEAF_SPLIT || blk->type == KV_INT_SPLIT) &&
|
||||
|
@ -1301,9 +1332,9 @@ void kv_op_t::update()
|
|||
|
||||
void kv_op_t::update_find()
|
||||
{
|
||||
get_block(db, cur_block, cur_level, recheck_policy, [=, checked_block = cur_block](int res, bool updated)
|
||||
get_block(db, cur_block, cur_level, recheck_policy, [=, checked_block = cur_block](int res, int refresh)
|
||||
{
|
||||
res = handle_block(res, updated, true);
|
||||
res = handle_block(res, refresh, true);
|
||||
if (res == -EAGAIN)
|
||||
{
|
||||
update_find();
|
||||
|
@ -1672,15 +1703,15 @@ void kv_op_t::next()
|
|||
{
|
||||
return;
|
||||
}
|
||||
get_block(db, cur_block, cur_level, recheck_policy, [=](int res, bool updated)
|
||||
get_block(db, cur_block, cur_level, recheck_policy, [=](int res, int refresh)
|
||||
{
|
||||
next_handle_block(res, updated);
|
||||
next_handle_block(res, refresh);
|
||||
});
|
||||
}
|
||||
|
||||
void kv_op_t::next_handle_block(int res, bool updated)
|
||||
void kv_op_t::next_handle_block(int res, int refresh)
|
||||
{
|
||||
res = handle_block(res, updated, false);
|
||||
res = handle_block(res, refresh, false);
|
||||
if (res == -EAGAIN)
|
||||
{
|
||||
next();
|
||||
|
|
Loading…
Reference in New Issue