diff --git a/src/kv_db.cpp b/src/kv_db.cpp index 40d54836..b1ca88df 100644 --- a/src/kv_db.cpp +++ b/src/kv_db.cpp @@ -842,7 +842,8 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p invalidate(db, op->offset, op->version); auto blk_it = db->block_cache.find(op->offset); if (blk_it != db->block_cache.end() && - (db->known_versions[op->offset/db->ino_block_size] == op->version || blk_it->second.updating > 0)) + // read may start BEFORE update and end AFTER update, in this case known will be > returned version + (db->known_versions[op->offset/db->ino_block_size] >= op->version && !blk_it->second.invalidated || blk_it->second.updating > 0)) { auto blk = &db->block_cache.at(op->offset); if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT) @@ -987,7 +988,7 @@ int kv_op_t::handle_block(int res, int refresh, bool stop_on_split) // and recheck parent blocks if their versions change. This is required // because we have to handle parallel splits of parent blocks correctly assert(path.size() > 0); - path[path.size()-1].version = db->known_versions[cur_block/db->ino_block_size]; + path[path.size()-1].version = blk->invalidated ? 0 : db->known_versions[cur_block/db->ino_block_size]; } if (key < blk->key_ge || blk->key_lt.size() && key >= blk->key_lt) { @@ -1071,7 +1072,9 @@ int kv_op_t::handle_block(int res, int refresh, bool stop_on_split) fprintf(stderr, "K/V: Internal block %lu misses boundary for %s\n", cur_block, key.c_str()); return -EILSEQ; } - auto m = child_it == blk->data.end() ? blk->key_lt : child_it->first; + auto m = child_it == blk->data.end() + ? (blk->type == KV_LEAF_SPLIT || blk->type == KV_INT_SPLIT + ? blk->right_half : blk->key_lt) : child_it->first; child_it--; if (child_it->second.size() != sizeof(uint64_t)) { @@ -1464,7 +1467,7 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key db->continue_update.emplace(blk->offset, [=]() { update_block(path_pos, is_delete, key, value, cb); }); return; } - if (db->known_versions[blk->offset/db->ino_block_size] != block_ver) + if (db->known_versions[blk->offset/db->ino_block_size] != block_ver || blk->invalidated) { // Recheck if block was modified in the meantime db->run_continue_update(blk->offset); @@ -1766,7 +1769,7 @@ void kv_op_t::next_get() this->key = kv_it->first; this->value = kv_it->second; skip_equal = true; - callback(this); + (std::function(callback))(this); } // Find next block else if (blk->type == KV_LEAF_SPLIT)