diff --git a/src/kv_db.cpp b/src/kv_db.cpp index 38f76373..6ddce588 100644 --- a/src/kv_db.cpp +++ b/src/kv_db.cpp @@ -83,6 +83,7 @@ struct kv_block_t // set during update int updating = 0; + bool invalidated = false; int change_type; std::string change_key, change_value; std::string change_rh; @@ -708,6 +709,11 @@ static void invalidate(kv_db_t *db, uint64_t offset, uint64_t version) if (b_it->second.updating > 0) { // do not forget blocks during modification + // but we have to remember the fact that they're not valid anymore + // because otherwise, if we keep `updating` flag more than just during + // the write_block() operation, we may end up modifying an outdated + // version of the block in memory + b_it->second.invalidated = true; b_it++; } else @@ -790,8 +796,8 @@ static void try_evict(kv_db_t *db) static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_policy, std::function cb) { auto b_it = db->block_cache.find(offset); - if (b_it != db->block_cache.end() && (recheck_policy == KV_RECHECK_NONE || - recheck_policy == KV_RECHECK_LEAF && b_it->second.type != KV_LEAF || + if (b_it != db->block_cache.end() && (recheck_policy == KV_RECHECK_NONE && !b_it->second.invalidated || + recheck_policy == KV_RECHECK_LEAF && b_it->second.type != KV_LEAF && !b_it->second.invalidated || b_it->second.updating)) { // Block already in cache, we can proceed @@ -1064,6 +1070,19 @@ static std::string find_splitter(kv_db_t *db, kv_block_t *blk) static void write_block(kv_db_t *db, kv_block_t *blk, std::function cb) { + if (blk->invalidated) + { + // Cancel all writes into the same inode block + auto b_it = db->continue_write.find(blk->offset/db->ino_block_size); + while (b_it != db->continue_write.end() && b_it->first == blk->offset/db->ino_block_size) + { + auto cont = b_it->second; + db->continue_write.erase(b_it++); + cont.cb(-EINTR); + } + cb(-EINTR); + return; + } auto & new_version = db->new_versions[blk->offset/db->ino_block_size]; if (new_version != 0) { @@ -1096,6 +1115,7 @@ static void write_block(kv_db_t *db, kv_block_t *blk, std::function c int res = op->retval == op->len ? 0 : (op->retval > 0 ? -EIO : op->retval); if (res == 0) { + blk->invalidated = false; db->known_versions[blk->offset/db->ino_block_size] = op->version; auto b_it = db->continue_write.find(blk->offset/db->ino_block_size); if (b_it != db->continue_write.end()) @@ -1186,6 +1206,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::functioniov.buf[0].iov_base, db->kv_block_size)) { // OK, block is still empty, but the version apparently changed + blk->invalidated = false; write_new_block(db, blk, cb); } else @@ -1198,6 +1219,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::functionblock_cache.erase(old_offset); auto new_blk = &db->block_cache[new_offset]; new_blk->offset = new_offset; + new_blk->invalidated = false; add_block_level(db, new_blk); write_new_block(db, new_blk, cb); } @@ -1529,6 +1551,7 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key { if (res < 0) { + // FIXME: clear_block left_blk blk->cancel_change(); db->stop_updating(right_blk); db->stop_updating(blk); @@ -1547,12 +1570,18 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key new_root->data[separator] = std::string((char*)&right_blk->offset, sizeof(right_blk->offset)); new_root->set_data_size(); new_root->updating++; + if (blk->invalidated) + { + new_root->invalidated = true; + } write_block(db, new_root, [=](int write_res) { if (write_res < 0) { - blk->cancel_change(); - db->stop_updating(blk); + auto blk_offset = blk->offset; + del_block_level(db, blk); + db->block_cache.erase(blk_offset); + db->run_continue_update(blk_offset); clear_block(db, left_blk, 0, [=, left_offset = left_blk->offset](int res) { if (res < 0)