Compare commits

...

1 Commits

Author SHA1 Message Date
2089a9ef6c WIP Support "prefetch" in kv_db
All checks were successful
Test / test_heal_csum_4k_dmj (push) Successful in 2m19s
Test / test_heal_csum_4k_dj (push) Successful in 2m20s
Test / test_resize_auto (push) Successful in 8s
Test / test_resize (push) Successful in 14s
Test / test_osd_tags (push) Successful in 6s
Test / test_snapshot_pool2 (push) Successful in 14s
Test / test_enospc (push) Successful in 11s
Test / test_enospc_xor (push) Successful in 13s
Test / test_enospc_imm (push) Successful in 10s
Test / test_enospc_imm_xor (push) Successful in 12s
Test / test_scrub (push) Successful in 11s
Test / test_scrub_zero_osd_2 (push) Successful in 14s
Test / test_scrub_xor (push) Successful in 14s
Test / test_scrub_pg_size_3 (push) Successful in 12s
Test / test_scrub_pg_size_6_pg_minsize_4_osd_count_6_ec (push) Successful in 16s
Test / test_scrub_ec (push) Successful in 14s
Test / test_heal_csum_4k (push) Successful in 2m17s
Test / test_nfs (push) Successful in 12s
Test / buildenv (push) Successful in 10s
Test / build (push) Successful in 14s
Test / npm_lint (push) Successful in 9s
Test / test_cas (push) Successful in 7s
Test / make_test (push) Successful in 36s
Test / test_change_pg_count (push) Successful in 17s
Test / test_change_pg_size (push) Successful in 7s
Test / test_change_pg_count_ec (push) Successful in 20s
Test / test_create_nomaxid (push) Successful in 8s
Test / test_add_osd (push) Successful in 1m11s
Test / test_etcd_fail (push) Successful in 38s
Test / test_etcd_fail_antietcd (push) Successful in 38s
2025-05-03 15:06:59 +03:00

View File

@@ -96,7 +96,7 @@ struct kv_block_t
void set_data_size(); void set_data_size();
static int kv_size(const std::string & key, const std::string & value); static int kv_size(const std::string & key, const std::string & value);
int parse(uint64_t offset, uint8_t *data, int size); int parse(uint64_t offset, uint8_t *data, int size, bool allow_empty = false);
bool serialize(uint8_t *data, int size); bool serialize(uint8_t *data, int size);
void apply_change(); void apply_change();
void cancel_change(); void cancel_change();
@@ -243,13 +243,13 @@ static std::string read_string(uint8_t *data, int size, int *pos)
return key; return key;
} }
int kv_block_t::parse(uint64_t offset, uint8_t *data, int size) int kv_block_t::parse(uint64_t offset, uint8_t *data, int size, bool allow_empty)
{ {
kv_stored_block_t *blk = (kv_stored_block_t *)data; kv_stored_block_t *blk = (kv_stored_block_t *)data;
if (blk->magic == 0 || blk->type == KV_EMPTY) if (blk->magic == 0 || blk->type == KV_EMPTY)
{ {
// empty block // empty block
if (offset != 0) if (offset != 0 && !allow_empty)
fprintf(stderr, "K/V: Block %ju is %s\n", offset, blk->magic == 0 ? "empty" : "cleared"); fprintf(stderr, "K/V: Block %ju is %s\n", offset, blk->magic == 0 ? "empty" : "cleared");
return -ENOTBLK; return -ENOTBLK;
} }
@@ -884,7 +884,10 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
} }
else else
{ {
op->len = db->kv_block_size; op->offset = (offset / db->ino_block_size) * db->ino_block_size;
op->len = db->ino_block_size;
//op->offset = offset;
//op->len = db->kv_block_size;
op->iov.push_back(malloc_or_die(op->len), op->len); op->iov.push_back(malloc_or_die(op->len), op->len);
} }
op->callback = [=](cluster_op_t *op) op->callback = [=](cluster_op_t *op)
@@ -898,59 +901,82 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
delete op; delete op;
return; return;
} }
int blk_err = 0;
int blk_status = 0;
invalidate(db, op->offset, op->version); invalidate(db, op->offset, op->version);
auto blk_it = db->block_cache.find(op->offset); for (uint64_t in_offset = op->offset; in_offset < (op->offset+op->len) || in_offset == offset; in_offset += db->kv_block_size)
if (blk_it != db->block_cache.end() &&
// read may start BEFORE update and end AFTER update, in this case known will be > returned version
(db->known_versions[op->offset/db->ino_block_size] >= op->version && !blk_it->second.invalidated || blk_it->second.updating > 0))
{ {
auto blk = &db->block_cache.at(op->offset); auto blk_it = db->block_cache.find(in_offset);
if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT) if (blk_it != db->block_cache.end() &&
// read may start BEFORE update and end AFTER update, in this case known will be > returned version
(db->known_versions[in_offset/db->ino_block_size] >= op->version && !blk_it->second.invalidated || blk_it->second.updating > 0))
{ {
// Wait until block update stops auto blk = &db->block_cache.at(in_offset);
if (op->len) if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT)
free(op->iov.buf[0].iov_base);
delete op;
db->continue_update.emplace(blk->offset, [=, blk_offset = blk->offset]()
{ {
get_block(db, offset, cur_level, recheck_policy, cb); // Wait until block update stops
db->run_continue_update(blk_offset); if (op->len)
}); free(op->iov.buf[0].iov_base);
return; delete op;
db->continue_update.emplace(in_offset, [=, blk_offset = blk->offset]()
{
get_block(db, offset, cur_level, recheck_policy, cb);
db->run_continue_update(blk_offset);
});
return;
}
} }
blk->usage = db->usage_counter; else if (!op->len)
cb(0, blk->updating > 0 ? BLK_UPDATING : BLK_NOCHANGE);
}
else
{
if (!op->len)
{ {
// Version check failed, re-read block // Version check failed, re-read block
delete op; delete op;
get_block(db, offset, cur_level, recheck_policy, cb); get_block(db, offset, cur_level, recheck_policy, cb);
return; return;
} }
auto blk = &db->block_cache[op->offset]; }
if (blk_it != db->block_cache.end()) for (uint64_t in_offset = op->offset; in_offset < (op->offset+op->len) || in_offset == offset; in_offset += db->kv_block_size)
{
auto blk_it = db->block_cache.find(in_offset);
if (blk_it != db->block_cache.end() &&
// read may start BEFORE update and end AFTER update, in this case known will be > returned version
(db->known_versions[in_offset/db->ino_block_size] >= op->version && !blk_it->second.invalidated || blk_it->second.updating > 0))
{ {
del_block_level(db, blk); auto blk = &db->block_cache.at(in_offset);
*blk = {};
}
int err = blk->parse(op->offset, (uint8_t*)op->iov.buf[0].iov_base, op->len);
if (err == 0)
{
blk->level = cur_level;
blk->usage = db->usage_counter; blk->usage = db->usage_counter;
add_block_level(db, blk); if (in_offset == offset)
cb(0, BLK_RELOADED); {
blk_err = 0;
blk_status = (blk->updating > 0 ? BLK_UPDATING : BLK_NOCHANGE);
}
} }
else else
{ {
db->block_cache.erase(op->offset); auto blk = &db->block_cache[in_offset];
cb(err, BLK_NOCHANGE); if (blk_it != db->block_cache.end())
{
del_block_level(db, blk);
*blk = {};
}
int err = blk->parse(in_offset, (uint8_t*)op->iov.buf[0].iov_base + (in_offset - op->offset), db->kv_block_size, in_offset != offset);
if (in_offset == offset)
{
blk_err = err;
blk_status = err == 0 ? BLK_RELOADED : BLK_NOCHANGE;
}
if (err == 0)
{
blk->level = cur_level;
blk->usage = db->usage_counter;
add_block_level(db, blk);
}
else
{
db->block_cache.erase(in_offset);
}
try_evict(db);
} }
try_evict(db);
} }
cb(blk_err, blk_status);
if (op->len) if (op->len)
free(op->iov.buf[0].iov_base); free(op->iov.buf[0].iov_base);
delete op; delete op;
@@ -1860,7 +1886,6 @@ void kv_op_t::next_handle_block(int res, int refresh)
else else
{ {
// OK, leaf block found // OK, leaf block found
recheck_policy = KV_RECHECK_NONE;
next_get(); next_get();
} }
} }
@@ -1887,7 +1912,6 @@ void kv_op_t::next_get()
else if (blk->type == KV_LEAF_SPLIT) else if (blk->type == KV_LEAF_SPLIT)
{ {
// Left half finished, go to the right // Left half finished, go to the right
recheck_policy = KV_RECHECK_LEAF;
key = blk->right_half; key = blk->right_half;
skip_equal = false; skip_equal = false;
prev_key_ge = blk->right_half; prev_key_ge = blk->right_half;
@@ -1921,7 +1945,6 @@ void kv_op_t::next_go_up()
path.pop_back(); path.pop_back();
cur_level--; cur_level--;
cur_block = path[path.size()-1].offset; cur_block = path[path.size()-1].offset;
recheck_policy = KV_RECHECK_LEAF;
// Check if we can resume listing from the next key // Check if we can resume listing from the next key
auto pb_it = db->block_cache.find(cur_block); auto pb_it = db->block_cache.find(cur_block);
if (pb_it == db->block_cache.end()) if (pb_it == db->block_cache.end())