More fixes

- do not overwrite a block with older version if known version is newer
  (read may start before update and end after update)
- invalidated block versions can't be remembered and trusted
- right boundary for split blocks is right_half when diving down, not key_lt
- restart update also when block is "invalidated", not just on version mismatch
- copy callback in listings to avoid closure destruction bugs too
kv-debug
Vitaliy Filippov 2023-11-28 01:00:13 +03:00
parent c5b00f897a
commit 13e2d3ce7c
1 changed files with 8 additions and 5 deletions

View File

@ -842,7 +842,8 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
invalidate(db, op->offset, op->version);
auto blk_it = db->block_cache.find(op->offset);
if (blk_it != db->block_cache.end() &&
(db->known_versions[op->offset/db->ino_block_size] == op->version || blk_it->second.updating > 0))
// read may start BEFORE update and end AFTER update, in this case known will be > returned version
(db->known_versions[op->offset/db->ino_block_size] >= op->version && !blk_it->second.invalidated || blk_it->second.updating > 0))
{
auto blk = &db->block_cache.at(op->offset);
if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT)
@ -987,7 +988,7 @@ int kv_op_t::handle_block(int res, int refresh, bool stop_on_split)
// and recheck parent blocks if their versions change. This is required
// because we have to handle parallel splits of parent blocks correctly
assert(path.size() > 0);
path[path.size()-1].version = db->known_versions[cur_block/db->ino_block_size];
path[path.size()-1].version = blk->invalidated ? 0 : db->known_versions[cur_block/db->ino_block_size];
}
if (key < blk->key_ge || blk->key_lt.size() && key >= blk->key_lt)
{
@ -1071,7 +1072,9 @@ int kv_op_t::handle_block(int res, int refresh, bool stop_on_split)
fprintf(stderr, "K/V: Internal block %lu misses boundary for %s\n", cur_block, key.c_str());
return -EILSEQ;
}
auto m = child_it == blk->data.end() ? blk->key_lt : child_it->first;
auto m = child_it == blk->data.end()
? (blk->type == KV_LEAF_SPLIT || blk->type == KV_INT_SPLIT
? blk->right_half : blk->key_lt) : child_it->first;
child_it--;
if (child_it->second.size() != sizeof(uint64_t))
{
@ -1464,7 +1467,7 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
db->continue_update.emplace(blk->offset, [=]() { update_block(path_pos, is_delete, key, value, cb); });
return;
}
if (db->known_versions[blk->offset/db->ino_block_size] != block_ver)
if (db->known_versions[blk->offset/db->ino_block_size] != block_ver || blk->invalidated)
{
// Recheck if block was modified in the meantime
db->run_continue_update(blk->offset);
@ -1766,7 +1769,7 @@ void kv_op_t::next_get()
this->key = kv_it->first;
this->value = kv_it->second;
skip_equal = true;
callback(this);
(std::function<void(kv_op_t *)>(callback))(this);
}
// Find next block
else if (blk->type == KV_LEAF_SPLIT)