Track block level

master
Vitaliy Filippov 2023-09-30 17:07:07 +03:00
parent fd1d8a8520
commit 5e7f27a02d
1 changed files with 60 additions and 38 deletions

View File

@ -51,6 +51,10 @@ struct __attribute__((__packed__)) kv_stored_block_t
struct kv_block_t
{
// level of the block. root block has level equal to -db->base_block_level
int level;
// usage flag. set to db->usage_counter when block is used
int usage;
// current data size, to estimate whether the block can fit more items
uint32_t data_size;
uint32_t type;
@ -72,7 +76,7 @@ struct kv_block_t
void set_data_size();
static int kv_size(const std::string & key, const std::string & value);
int parse(uint64_t offset, uint8_t *data, int size);
int parse(uint8_t *data, int size);
bool serialize(uint8_t *data, int size);
};
@ -99,6 +103,10 @@ struct kv_db_t
uint32_t kv_block_size = 0;
uint32_t ino_block_size = 0;
bool immediate_commit = false;
uint64_t cache_max_blocks = 0;
int base_block_level = 0;
int usage_counter = 1;
std::map<uint64_t, kv_block_t> block_cache;
std::map<uint64_t, uint64_t> known_versions;
std::multimap<uint64_t, std::function<void()>> continue_update;
@ -131,6 +139,7 @@ protected:
bool started = false;
uint64_t cur_block = 0;
std::string prev_key_ge, prev_key_lt;
int cur_level = 0;
std::vector<uint64_t> path;
bool updated = false;
bool skip_equal = false;
@ -169,7 +178,7 @@ static std::string read_string(uint8_t *data, int size, int *pos)
return key;
}
int kv_block_t::parse(uint64_t offset, uint8_t *data, int size)
int kv_block_t::parse(uint8_t *data, int size)
{
kv_stored_block_t *blk = (kv_stored_block_t *)data;
if (blk->magic == 0 || blk->type == KV_EMPTY)
@ -184,7 +193,6 @@ int kv_block_t::parse(uint64_t offset, uint8_t *data, int size)
return -EILSEQ;
}
this->type = blk->type;
this->offset = offset;
this->new_version = 0;
int pos = blk->data - data;
this->key_ge = read_string(data, size, &pos);
@ -415,7 +423,7 @@ static void invalidate(kv_db_t *db, uint64_t offset, uint64_t version)
}
}
static void get_block(kv_db_t *db, uint64_t offset, int recheck_policy, std::function<void(int, bool)> cb)
static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_policy, std::function<void(int, bool)> cb)
{
// FIXME: Evict blocks from cache based on memory limit and block level
auto b_it = db->block_cache.find(offset);
@ -425,6 +433,7 @@ static void get_block(kv_db_t *db, uint64_t offset, int recheck_policy, std::fun
b_it->second.updating || b_it->second.new_version != 0))
{
// Block already in cache, we can proceed
b_it->second.usage = db->usage_counter;
cb(0, false);
return;
}
@ -445,14 +454,20 @@ static void get_block(kv_db_t *db, uint64_t offset, int recheck_policy, std::fun
if (db->known_versions[op->offset/db->ino_block_size] == op->version &&
db->block_cache.find(op->offset) != db->block_cache.end())
{
auto blk = &db->block_cache.at(op->offset);
blk->usage = db->usage_counter;
cb(0, true);
}
else
{
invalidate(db, op->offset, op->version);
int err = db->block_cache[op->offset].parse(op->offset, (uint8_t*)op->iov.buf[0].iov_base, op->len);
auto blk = &db->block_cache[op->offset];
int err = blk->parse((uint8_t*)op->iov.buf[0].iov_base, op->len);
if (err == 0)
{
blk->offset = op->offset;
blk->level = cur_level;
blk->usage = db->usage_counter;
cb(0, true);
}
else
@ -479,6 +494,7 @@ void kv_op_t::exec()
return;
db->active_ops++;
started = true;
cur_level = -db->base_block_level;
if (opcode == KV_GET)
get();
else if (opcode == KV_SET || opcode == KV_DEL)
@ -503,7 +519,7 @@ void kv_op_t::finish(int res)
void kv_op_t::get()
{
get_block(db, cur_block, recheck_policy, [=](int res, bool updated)
get_block(db, cur_block, cur_level, recheck_policy, [=](int res, bool updated)
{
res = handle_block(res, updated, false);
if (res == -EAGAIN)
@ -543,7 +559,7 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
return res;
}
this->updated = this->updated || updated;
auto blk = & db->block_cache.at(cur_block);
auto blk = &db->block_cache.at(cur_block);
if (key < blk->key_ge || blk->key_lt.size() && key >= blk->key_lt)
{
// We got an unrelated block - recheck the whole chain from the beginning
@ -557,6 +573,7 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
if (!this->updated)
return -EILSEQ;
prev_key_ge = prev_key_lt = "";
cur_level = -db->base_block_level;
cur_block = 0;
this->recheck_policy = KV_RECHECK_ALL;
this->updated = false;
@ -594,6 +611,7 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split)
// Track left and right boundaries which have led us to cur_block
prev_key_ge = child_it->first;
prev_key_lt = m;
cur_level++;
cur_block = *((uint64_t*)child_it->second.data());
return -EAGAIN;
}
@ -675,7 +693,9 @@ static kv_block_t *create_new_block(kv_db_t *db, kv_block_t *old_blk, std::strin
{
auto new_offset = db->next_free;
db->next_free += db->kv_block_size;
auto blk = & db->block_cache[new_offset];
auto blk = &db->block_cache[new_offset];
blk->usage = db->usage_counter;
blk->level = old_blk->level;
blk->type = old_blk->type;
blk->offset = new_offset;
blk->key_ge = right ? separator : old_blk->key_ge;
@ -722,7 +742,9 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int
db->block_cache[new_offset] = std::move(db->block_cache[blk->offset]);
db->block_cache.erase(blk->offset);
auto new_blk = &db->block_cache[new_offset];
*new_blk = *blk;
new_blk->offset = new_offset;
db->block_cache.erase(blk->offset);
write_new_block(db, new_blk, cb);
}
free(op->iov.buf[0].iov_base);
@ -789,7 +811,7 @@ void kv_op_t::update_find()
path.clear();
}
path.push_back(cur_block);
get_block(db, cur_block, recheck_policy, [=](int res, bool updated)
get_block(db, cur_block, cur_level, recheck_policy, [=](int res, bool updated)
{
res = handle_block(res, updated, true);
if (res == -EAGAIN)
@ -839,6 +861,8 @@ void kv_op_t::create_root()
}
db->next_free += db->kv_block_size;
auto blk = &db->block_cache[0];
blk->usage = db->usage_counter;
blk->level = -db->base_block_level;
blk->type = KV_LEAF;
blk->offset = 0;
blk->data[key] = value;
@ -983,38 +1007,38 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
}
// Write references to halves into the root block
blk->type = KV_INT;
db->base_block_level++;
blk->level--;
blk->data.clear();
blk->data[""] = std::string((char*)&left_blk->offset, sizeof(left_blk->offset));
blk->data[separator] = std::string((char*)&right_blk->offset, sizeof(right_blk->offset));
blk->set_data_size();
write_block(db, blk, [=](int res)
write_block(db, blk, [=](int write_res)
{
db->stop_updating(blk);
if (res == -EINTR)
if (write_res < 0)
{
// CAS failure - zero garbage left_blk and right_blk and retry from the beginning
db->base_block_level--;
db->block_cache.erase(blk->offset);
clear_block(db, left_blk->offset, 0, [=](int res)
clear_block(db, left_blk->offset, 0, [=, left_offset = left_blk->offset](int res)
{
if (res < 0)
{
cb(res);
return;
}
clear_block(db, right_blk->offset, 0, [=](int res)
fprintf(stderr, "Failed to clear unreferenced block %lu: %s (code %d)\n", left_offset, strerror(-res), res);
clear_block(db, right_blk->offset, 0, [=, right_offset = right_blk->offset](int res)
{
if (res < 0)
{
cb(res);
return;
}
update();
fprintf(stderr, "Failed to clear unreferenced block %lu: %s (code %d)\n", right_offset, strerror(-res), res);
// CAS failure - zero garbage left_blk and right_blk and retry from the beginning
if (write_res == -EINTR)
update();
else
cb(write_res);
});
});
}
else
{
cb(res);
cb(0);
}
});
});
@ -1033,27 +1057,23 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key
blk->right_half_block = right_blk->offset;
blk->data.erase(blk->data.lower_bound(separator), blk->data.end());
blk->set_data_size();
write_block(db, blk, [=](int res)
write_block(db, blk, [=](int write_res)
{
db->stop_updating(blk);
if (res == -EINTR)
if (write_res < 0)
{
// CAS failure - zero garbage right_blk and retry from the beginning
db->block_cache.erase(blk->offset);
clear_block(db, right_blk->offset, 0, [=](int res)
clear_block(db, right_blk->offset, 0, [=, right_offset = right_blk->offset](int res)
{
if (res < 0)
{
cb(res);
return;
}
update();
fprintf(stderr, "Failed to clear unreferenced block %lu: %s (code %d)\n", right_offset, strerror(-res), res);
// CAS failure - zero garbage right_blk and retry from the beginning
if (write_res == -EINTR)
update();
else
cb(write_res);
});
}
else if (res < 0)
{
cb(res);
}
else
{
// Add a reference to the parent block
@ -1077,7 +1097,7 @@ void kv_op_t::next()
path.clear();
path.push_back(cur_block);
}
get_block(db, cur_block, recheck_policy, [=](int res, bool updated)
get_block(db, cur_block, cur_level, recheck_policy, [=](int res, bool updated)
{
next_handle_block(res, updated);
});
@ -1154,6 +1174,7 @@ void kv_op_t::next_go_up()
key = blk->key_lt;
skip_equal = false;
path.pop_back();
cur_level--;
cur_block = path[path.size()-1];
recheck_policy = KV_RECHECK_LEAF;
// Check if we can resume listing from the next key
@ -1162,6 +1183,7 @@ void kv_op_t::next_go_up()
{
// Block is absent in cache, recheck from the beginning
prev_key_ge = prev_key_lt = "";
cur_level = -db->base_block_level;
cur_block = 0;
next();
return;