Invalidate blocks being updated too
parent
b7a41e6394
commit
bf9a279ff9
|
@ -83,6 +83,7 @@ struct kv_block_t
|
|||
|
||||
// set during update
|
||||
int updating = 0;
|
||||
bool invalidated = false;
|
||||
int change_type;
|
||||
std::string change_key, change_value;
|
||||
std::string change_rh;
|
||||
|
@ -708,6 +709,11 @@ static void invalidate(kv_db_t *db, uint64_t offset, uint64_t version)
|
|||
if (b_it->second.updating > 0)
|
||||
{
|
||||
// do not forget blocks during modification
|
||||
// but we have to remember the fact that they're not valid anymore
|
||||
// because otherwise, if we keep `updating` flag more than just during
|
||||
// the write_block() operation, we may end up modifying an outdated
|
||||
// version of the block in memory
|
||||
b_it->second.invalidated = true;
|
||||
b_it++;
|
||||
}
|
||||
else
|
||||
|
@ -790,8 +796,8 @@ static void try_evict(kv_db_t *db)
|
|||
static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_policy, std::function<void(int, bool)> cb)
|
||||
{
|
||||
auto b_it = db->block_cache.find(offset);
|
||||
if (b_it != db->block_cache.end() && (recheck_policy == KV_RECHECK_NONE ||
|
||||
recheck_policy == KV_RECHECK_LEAF && b_it->second.type != KV_LEAF ||
|
||||
if (b_it != db->block_cache.end() && (recheck_policy == KV_RECHECK_NONE && !b_it->second.invalidated ||
|
||||
recheck_policy == KV_RECHECK_LEAF && b_it->second.type != KV_LEAF && !b_it->second.invalidated ||
|
||||
b_it->second.updating))
|
||||
{
|
||||
// Block already in cache, we can proceed
|
||||
|
@ -1064,6 +1070,19 @@ static std::string find_splitter(kv_db_t *db, kv_block_t *blk)
|
|||
|
||||
static void write_block(kv_db_t *db, kv_block_t *blk, std::function<void(int)> cb)
|
||||
{
|
||||
if (blk->invalidated)
|
||||
{
|
||||
// Cancel all writes into the same inode block
|
||||
auto b_it = db->continue_write.find(blk->offset/db->ino_block_size);
|
||||
while (b_it != db->continue_write.end() && b_it->first == blk->offset/db->ino_block_size)
|
||||
{
|
||||
auto cont = b_it->second;
|
||||
db->continue_write.erase(b_it++);
|
||||
cont.cb(-EINTR);
|
||||
}
|
||||
cb(-EINTR);
|
||||
return;
|
||||
}
|
||||
auto & new_version = db->new_versions[blk->offset/db->ino_block_size];
|
||||
if (new_version != 0)
|
||||
{
|
||||
|
@ -1096,6 +1115,7 @@ static void write_block(kv_db_t *db, kv_block_t *blk, std::function<void(int)> c
|
|||
int res = op->retval == op->len ? 0 : (op->retval > 0 ? -EIO : op->retval);
|
||||
if (res == 0)
|
||||
{
|
||||
blk->invalidated = false;
|
||||
db->known_versions[blk->offset/db->ino_block_size] = op->version;
|
||||
auto b_it = db->continue_write.find(blk->offset/db->ino_block_size);
|
||||
if (b_it != db->continue_write.end())
|
||||
|
@ -1186,6 +1206,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int
|
|||
if (is_zero(op->iov.buf[0].iov_base, db->kv_block_size))
|
||||
{
|
||||
// OK, block is still empty, but the version apparently changed
|
||||
blk->invalidated = false;
|
||||
write_new_block(db, blk, cb);
|
||||
}
|
||||
else
|
||||
|
@ -1198,6 +1219,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int
|
|||
db->block_cache.erase(old_offset);
|
||||
auto new_blk = &db->block_cache[new_offset];
|
||||
new_blk->offset = new_offset;
|
||||
new_blk->invalidated = false;
|
||||
add_block_level(db, new_blk);
|
||||
write_new_block(db, new_blk, cb);
|
||||
}
|
||||
|
@ -1529,6 +1551,7 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
|
|||
{
|
||||
if (res < 0)
|
||||
{
|
||||
// FIXME: clear_block left_blk
|
||||
blk->cancel_change();
|
||||
db->stop_updating(right_blk);
|
||||
db->stop_updating(blk);
|
||||
|
@ -1547,12 +1570,18 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
|
|||
new_root->data[separator] = std::string((char*)&right_blk->offset, sizeof(right_blk->offset));
|
||||
new_root->set_data_size();
|
||||
new_root->updating++;
|
||||
if (blk->invalidated)
|
||||
{
|
||||
new_root->invalidated = true;
|
||||
}
|
||||
write_block(db, new_root, [=](int write_res)
|
||||
{
|
||||
if (write_res < 0)
|
||||
{
|
||||
blk->cancel_change();
|
||||
db->stop_updating(blk);
|
||||
auto blk_offset = blk->offset;
|
||||
del_block_level(db, blk);
|
||||
db->block_cache.erase(blk_offset);
|
||||
db->run_continue_update(blk_offset);
|
||||
clear_block(db, left_blk, 0, [=, left_offset = left_blk->offset](int res)
|
||||
{
|
||||
if (res < 0)
|
||||
|
|
Loading…
Reference in New Issue