Fix a race condition where changed blocks were parsed over existing cached blocks and getting a mix of data

antietcd
Vitaliy Filippov 2023-11-05 20:00:55 +03:00
parent 5cf9b343c0
commit 20993d9b7a
1 changed files with 12 additions and 4 deletions

View File

@ -245,6 +245,7 @@ int kv_block_t::parse(uint64_t offset, uint8_t *data, int size)
fprintf(stderr, "K/V: Invalid block %lu magic, size, type or item count\n", offset); fprintf(stderr, "K/V: Invalid block %lu magic, size, type or item count\n", offset);
return -EILSEQ; return -EILSEQ;
} }
assert(!this->type);
this->type = blk->type; this->type = blk->type;
int pos = blk->data - data; int pos = blk->data - data;
this->key_ge = read_string(data, size, &pos); this->key_ge = read_string(data, size, &pos);
@ -587,6 +588,7 @@ uint64_t kv_db_t::alloc_block()
allocating_blocks[allocating_block_pos] = UINT64_MAX; allocating_blocks[allocating_block_pos] = UINT64_MAX;
} }
allocating_block_pos = (allocating_block_pos+1) % allocating_blocks.size(); allocating_block_pos = (allocating_block_pos+1) % allocating_blocks.size();
assert(block_cache.find(pos) == block_cache.end());
return pos; return pos;
} }
@ -821,8 +823,10 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
cb(op->retval >= 0 ? -EIO : op->retval, false); cb(op->retval >= 0 ? -EIO : op->retval, false);
return; return;
} }
if (db->block_cache.find(op->offset) != db->block_cache.end() && invalidate(db, op->offset, op->version);
db->known_versions[op->offset/db->ino_block_size] == op->version) auto blk_it = db->block_cache.find(op->offset);
if (blk_it != db->block_cache.end() &&
(db->known_versions[op->offset/db->ino_block_size] == op->version || blk_it->second.updating > 0))
{ {
auto blk = &db->block_cache.at(op->offset); auto blk = &db->block_cache.at(op->offset);
blk->usage = db->usage_counter; blk->usage = db->usage_counter;
@ -830,9 +834,12 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
} }
else else
{ {
invalidate(db, op->offset, op->version);
try_evict(db);
auto blk = &db->block_cache[op->offset]; auto blk = &db->block_cache[op->offset];
if (blk_it != db->block_cache.end())
{
del_block_level(db, blk);
*blk = {};
}
int err = blk->parse(op->offset, (uint8_t*)op->iov.buf[0].iov_base, op->len); int err = blk->parse(op->offset, (uint8_t*)op->iov.buf[0].iov_base, op->len);
if (err == 0) if (err == 0)
{ {
@ -846,6 +853,7 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
db->block_cache.erase(op->offset); db->block_cache.erase(op->offset);
cb(err, false); cb(err, false);
} }
try_evict(db);
} }
free(op->iov.buf[0].iov_base); free(op->iov.buf[0].iov_base);
delete op; delete op;