Compare commits

...

1 Commits

Author SHA1 Message Date
2089a9ef6c WIP Support "prefetch" in kv_db
All checks were successful
Test / test_heal_csum_4k_dmj (push) Successful in 2m19s
Test / test_heal_csum_4k_dj (push) Successful in 2m20s
Test / test_resize_auto (push) Successful in 8s
Test / test_resize (push) Successful in 14s
Test / test_osd_tags (push) Successful in 6s
Test / test_snapshot_pool2 (push) Successful in 14s
Test / test_enospc (push) Successful in 11s
Test / test_enospc_xor (push) Successful in 13s
Test / test_enospc_imm (push) Successful in 10s
Test / test_enospc_imm_xor (push) Successful in 12s
Test / test_scrub (push) Successful in 11s
Test / test_scrub_zero_osd_2 (push) Successful in 14s
Test / test_scrub_xor (push) Successful in 14s
Test / test_scrub_pg_size_3 (push) Successful in 12s
Test / test_scrub_pg_size_6_pg_minsize_4_osd_count_6_ec (push) Successful in 16s
Test / test_scrub_ec (push) Successful in 14s
Test / test_heal_csum_4k (push) Successful in 2m17s
Test / test_nfs (push) Successful in 12s
Test / buildenv (push) Successful in 10s
Test / build (push) Successful in 14s
Test / npm_lint (push) Successful in 9s
Test / test_cas (push) Successful in 7s
Test / make_test (push) Successful in 36s
Test / test_change_pg_count (push) Successful in 17s
Test / test_change_pg_size (push) Successful in 7s
Test / test_change_pg_count_ec (push) Successful in 20s
Test / test_create_nomaxid (push) Successful in 8s
Test / test_add_osd (push) Successful in 1m11s
Test / test_etcd_fail (push) Successful in 38s
Test / test_etcd_fail_antietcd (push) Successful in 38s
2025-05-03 15:06:59 +03:00

View File

@@ -96,7 +96,7 @@ struct kv_block_t
void set_data_size();
static int kv_size(const std::string & key, const std::string & value);
int parse(uint64_t offset, uint8_t *data, int size);
int parse(uint64_t offset, uint8_t *data, int size, bool allow_empty = false);
bool serialize(uint8_t *data, int size);
void apply_change();
void cancel_change();
@@ -243,13 +243,13 @@ static std::string read_string(uint8_t *data, int size, int *pos)
return key;
}
int kv_block_t::parse(uint64_t offset, uint8_t *data, int size)
int kv_block_t::parse(uint64_t offset, uint8_t *data, int size, bool allow_empty)
{
kv_stored_block_t *blk = (kv_stored_block_t *)data;
if (blk->magic == 0 || blk->type == KV_EMPTY)
{
// empty block
if (offset != 0)
if (offset != 0 && !allow_empty)
fprintf(stderr, "K/V: Block %ju is %s\n", offset, blk->magic == 0 ? "empty" : "cleared");
return -ENOTBLK;
}
@@ -884,7 +884,10 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
}
else
{
op->len = db->kv_block_size;
op->offset = (offset / db->ino_block_size) * db->ino_block_size;
op->len = db->ino_block_size;
//op->offset = offset;
//op->len = db->kv_block_size;
op->iov.push_back(malloc_or_die(op->len), op->len);
}
op->callback = [=](cluster_op_t *op)
@@ -898,59 +901,82 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
delete op;
return;
}
int blk_err = 0;
int blk_status = 0;
invalidate(db, op->offset, op->version);
auto blk_it = db->block_cache.find(op->offset);
if (blk_it != db->block_cache.end() &&
// read may start BEFORE update and end AFTER update, in this case known will be > returned version
(db->known_versions[op->offset/db->ino_block_size] >= op->version && !blk_it->second.invalidated || blk_it->second.updating > 0))
for (uint64_t in_offset = op->offset; in_offset < (op->offset+op->len) || in_offset == offset; in_offset += db->kv_block_size)
{
auto blk = &db->block_cache.at(op->offset);
if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT)
auto blk_it = db->block_cache.find(in_offset);
if (blk_it != db->block_cache.end() &&
// read may start BEFORE update and end AFTER update, in this case known will be > returned version
(db->known_versions[in_offset/db->ino_block_size] >= op->version && !blk_it->second.invalidated || blk_it->second.updating > 0))
{
// Wait until block update stops
if (op->len)
free(op->iov.buf[0].iov_base);
delete op;
db->continue_update.emplace(blk->offset, [=, blk_offset = blk->offset]()
auto blk = &db->block_cache.at(in_offset);
if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT)
{
get_block(db, offset, cur_level, recheck_policy, cb);
db->run_continue_update(blk_offset);
});
return;
// Wait until block update stops
if (op->len)
free(op->iov.buf[0].iov_base);
delete op;
db->continue_update.emplace(in_offset, [=, blk_offset = blk->offset]()
{
get_block(db, offset, cur_level, recheck_policy, cb);
db->run_continue_update(blk_offset);
});
return;
}
}
blk->usage = db->usage_counter;
cb(0, blk->updating > 0 ? BLK_UPDATING : BLK_NOCHANGE);
}
else
{
if (!op->len)
else if (!op->len)
{
// Version check failed, re-read block
delete op;
get_block(db, offset, cur_level, recheck_policy, cb);
return;
}
auto blk = &db->block_cache[op->offset];
if (blk_it != db->block_cache.end())
}
for (uint64_t in_offset = op->offset; in_offset < (op->offset+op->len) || in_offset == offset; in_offset += db->kv_block_size)
{
auto blk_it = db->block_cache.find(in_offset);
if (blk_it != db->block_cache.end() &&
// read may start BEFORE update and end AFTER update, in this case known will be > returned version
(db->known_versions[in_offset/db->ino_block_size] >= op->version && !blk_it->second.invalidated || blk_it->second.updating > 0))
{
del_block_level(db, blk);
*blk = {};
}
int err = blk->parse(op->offset, (uint8_t*)op->iov.buf[0].iov_base, op->len);
if (err == 0)
{
blk->level = cur_level;
auto blk = &db->block_cache.at(in_offset);
blk->usage = db->usage_counter;
add_block_level(db, blk);
cb(0, BLK_RELOADED);
if (in_offset == offset)
{
blk_err = 0;
blk_status = (blk->updating > 0 ? BLK_UPDATING : BLK_NOCHANGE);
}
}
else
{
db->block_cache.erase(op->offset);
cb(err, BLK_NOCHANGE);
auto blk = &db->block_cache[in_offset];
if (blk_it != db->block_cache.end())
{
del_block_level(db, blk);
*blk = {};
}
int err = blk->parse(in_offset, (uint8_t*)op->iov.buf[0].iov_base + (in_offset - op->offset), db->kv_block_size, in_offset != offset);
if (in_offset == offset)
{
blk_err = err;
blk_status = err == 0 ? BLK_RELOADED : BLK_NOCHANGE;
}
if (err == 0)
{
blk->level = cur_level;
blk->usage = db->usage_counter;
add_block_level(db, blk);
}
else
{
db->block_cache.erase(in_offset);
}
try_evict(db);
}
try_evict(db);
}
cb(blk_err, blk_status);
if (op->len)
free(op->iov.buf[0].iov_base);
delete op;
@@ -1860,7 +1886,6 @@ void kv_op_t::next_handle_block(int res, int refresh)
else
{
// OK, leaf block found
recheck_policy = KV_RECHECK_NONE;
next_get();
}
}
@@ -1887,7 +1912,6 @@ void kv_op_t::next_get()
else if (blk->type == KV_LEAF_SPLIT)
{
// Left half finished, go to the right
recheck_policy = KV_RECHECK_LEAF;
key = blk->right_half;
skip_equal = false;
prev_key_ge = blk->right_half;
@@ -1921,7 +1945,6 @@ void kv_op_t::next_go_up()
path.pop_back();
cur_level--;
cur_block = path[path.size()-1].offset;
recheck_policy = KV_RECHECK_LEAF;
// Check if we can resume listing from the next key
auto pb_it = db->block_cache.find(cur_block);
if (pb_it == db->block_cache.end())