Compare commits
1 Commit
test1
...
kv-readahe
Author | SHA1 | Date | |
---|---|---|---|
2089a9ef6c |
107
src/kv/kv_db.cpp
107
src/kv/kv_db.cpp
@@ -96,7 +96,7 @@ struct kv_block_t
|
||||
|
||||
void set_data_size();
|
||||
static int kv_size(const std::string & key, const std::string & value);
|
||||
int parse(uint64_t offset, uint8_t *data, int size);
|
||||
int parse(uint64_t offset, uint8_t *data, int size, bool allow_empty = false);
|
||||
bool serialize(uint8_t *data, int size);
|
||||
void apply_change();
|
||||
void cancel_change();
|
||||
@@ -243,13 +243,13 @@ static std::string read_string(uint8_t *data, int size, int *pos)
|
||||
return key;
|
||||
}
|
||||
|
||||
int kv_block_t::parse(uint64_t offset, uint8_t *data, int size)
|
||||
int kv_block_t::parse(uint64_t offset, uint8_t *data, int size, bool allow_empty)
|
||||
{
|
||||
kv_stored_block_t *blk = (kv_stored_block_t *)data;
|
||||
if (blk->magic == 0 || blk->type == KV_EMPTY)
|
||||
{
|
||||
// empty block
|
||||
if (offset != 0)
|
||||
if (offset != 0 && !allow_empty)
|
||||
fprintf(stderr, "K/V: Block %ju is %s\n", offset, blk->magic == 0 ? "empty" : "cleared");
|
||||
return -ENOTBLK;
|
||||
}
|
||||
@@ -884,7 +884,10 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
|
||||
}
|
||||
else
|
||||
{
|
||||
op->len = db->kv_block_size;
|
||||
op->offset = (offset / db->ino_block_size) * db->ino_block_size;
|
||||
op->len = db->ino_block_size;
|
||||
//op->offset = offset;
|
||||
//op->len = db->kv_block_size;
|
||||
op->iov.push_back(malloc_or_die(op->len), op->len);
|
||||
}
|
||||
op->callback = [=](cluster_op_t *op)
|
||||
@@ -898,59 +901,82 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
|
||||
delete op;
|
||||
return;
|
||||
}
|
||||
int blk_err = 0;
|
||||
int blk_status = 0;
|
||||
invalidate(db, op->offset, op->version);
|
||||
auto blk_it = db->block_cache.find(op->offset);
|
||||
if (blk_it != db->block_cache.end() &&
|
||||
// read may start BEFORE update and end AFTER update, in this case known will be > returned version
|
||||
(db->known_versions[op->offset/db->ino_block_size] >= op->version && !blk_it->second.invalidated || blk_it->second.updating > 0))
|
||||
for (uint64_t in_offset = op->offset; in_offset < (op->offset+op->len) || in_offset == offset; in_offset += db->kv_block_size)
|
||||
{
|
||||
auto blk = &db->block_cache.at(op->offset);
|
||||
if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT)
|
||||
auto blk_it = db->block_cache.find(in_offset);
|
||||
if (blk_it != db->block_cache.end() &&
|
||||
// read may start BEFORE update and end AFTER update, in this case known will be > returned version
|
||||
(db->known_versions[in_offset/db->ino_block_size] >= op->version && !blk_it->second.invalidated || blk_it->second.updating > 0))
|
||||
{
|
||||
// Wait until block update stops
|
||||
if (op->len)
|
||||
free(op->iov.buf[0].iov_base);
|
||||
delete op;
|
||||
db->continue_update.emplace(blk->offset, [=, blk_offset = blk->offset]()
|
||||
auto blk = &db->block_cache.at(in_offset);
|
||||
if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT)
|
||||
{
|
||||
get_block(db, offset, cur_level, recheck_policy, cb);
|
||||
db->run_continue_update(blk_offset);
|
||||
});
|
||||
return;
|
||||
// Wait until block update stops
|
||||
if (op->len)
|
||||
free(op->iov.buf[0].iov_base);
|
||||
delete op;
|
||||
db->continue_update.emplace(in_offset, [=, blk_offset = blk->offset]()
|
||||
{
|
||||
get_block(db, offset, cur_level, recheck_policy, cb);
|
||||
db->run_continue_update(blk_offset);
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
blk->usage = db->usage_counter;
|
||||
cb(0, blk->updating > 0 ? BLK_UPDATING : BLK_NOCHANGE);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!op->len)
|
||||
else if (!op->len)
|
||||
{
|
||||
// Version check failed, re-read block
|
||||
delete op;
|
||||
get_block(db, offset, cur_level, recheck_policy, cb);
|
||||
return;
|
||||
}
|
||||
auto blk = &db->block_cache[op->offset];
|
||||
if (blk_it != db->block_cache.end())
|
||||
}
|
||||
for (uint64_t in_offset = op->offset; in_offset < (op->offset+op->len) || in_offset == offset; in_offset += db->kv_block_size)
|
||||
{
|
||||
auto blk_it = db->block_cache.find(in_offset);
|
||||
if (blk_it != db->block_cache.end() &&
|
||||
// read may start BEFORE update and end AFTER update, in this case known will be > returned version
|
||||
(db->known_versions[in_offset/db->ino_block_size] >= op->version && !blk_it->second.invalidated || blk_it->second.updating > 0))
|
||||
{
|
||||
del_block_level(db, blk);
|
||||
*blk = {};
|
||||
}
|
||||
int err = blk->parse(op->offset, (uint8_t*)op->iov.buf[0].iov_base, op->len);
|
||||
if (err == 0)
|
||||
{
|
||||
blk->level = cur_level;
|
||||
auto blk = &db->block_cache.at(in_offset);
|
||||
blk->usage = db->usage_counter;
|
||||
add_block_level(db, blk);
|
||||
cb(0, BLK_RELOADED);
|
||||
if (in_offset == offset)
|
||||
{
|
||||
blk_err = 0;
|
||||
blk_status = (blk->updating > 0 ? BLK_UPDATING : BLK_NOCHANGE);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
db->block_cache.erase(op->offset);
|
||||
cb(err, BLK_NOCHANGE);
|
||||
auto blk = &db->block_cache[in_offset];
|
||||
if (blk_it != db->block_cache.end())
|
||||
{
|
||||
del_block_level(db, blk);
|
||||
*blk = {};
|
||||
}
|
||||
int err = blk->parse(in_offset, (uint8_t*)op->iov.buf[0].iov_base + (in_offset - op->offset), db->kv_block_size, in_offset != offset);
|
||||
if (in_offset == offset)
|
||||
{
|
||||
blk_err = err;
|
||||
blk_status = err == 0 ? BLK_RELOADED : BLK_NOCHANGE;
|
||||
}
|
||||
if (err == 0)
|
||||
{
|
||||
blk->level = cur_level;
|
||||
blk->usage = db->usage_counter;
|
||||
add_block_level(db, blk);
|
||||
}
|
||||
else
|
||||
{
|
||||
db->block_cache.erase(in_offset);
|
||||
}
|
||||
try_evict(db);
|
||||
}
|
||||
try_evict(db);
|
||||
}
|
||||
cb(blk_err, blk_status);
|
||||
if (op->len)
|
||||
free(op->iov.buf[0].iov_base);
|
||||
delete op;
|
||||
@@ -1860,7 +1886,6 @@ void kv_op_t::next_handle_block(int res, int refresh)
|
||||
else
|
||||
{
|
||||
// OK, leaf block found
|
||||
recheck_policy = KV_RECHECK_NONE;
|
||||
next_get();
|
||||
}
|
||||
}
|
||||
@@ -1887,7 +1912,6 @@ void kv_op_t::next_get()
|
||||
else if (blk->type == KV_LEAF_SPLIT)
|
||||
{
|
||||
// Left half finished, go to the right
|
||||
recheck_policy = KV_RECHECK_LEAF;
|
||||
key = blk->right_half;
|
||||
skip_equal = false;
|
||||
prev_key_ge = blk->right_half;
|
||||
@@ -1921,7 +1945,6 @@ void kv_op_t::next_go_up()
|
||||
path.pop_back();
|
||||
cur_level--;
|
||||
cur_block = path[path.size()-1].offset;
|
||||
recheck_policy = KV_RECHECK_LEAF;
|
||||
// Check if we can resume listing from the next key
|
||||
auto pb_it = db->block_cache.find(cur_block);
|
||||
if (pb_it == db->block_cache.end())
|
||||
|
Reference in New Issue
Block a user