Compare commits
1 Commits
test1
...
kv-readahe
Author | SHA1 | Date | |
---|---|---|---|
2089a9ef6c |
@@ -96,7 +96,7 @@ struct kv_block_t
|
|||||||
|
|
||||||
void set_data_size();
|
void set_data_size();
|
||||||
static int kv_size(const std::string & key, const std::string & value);
|
static int kv_size(const std::string & key, const std::string & value);
|
||||||
int parse(uint64_t offset, uint8_t *data, int size);
|
int parse(uint64_t offset, uint8_t *data, int size, bool allow_empty = false);
|
||||||
bool serialize(uint8_t *data, int size);
|
bool serialize(uint8_t *data, int size);
|
||||||
void apply_change();
|
void apply_change();
|
||||||
void cancel_change();
|
void cancel_change();
|
||||||
@@ -243,13 +243,13 @@ static std::string read_string(uint8_t *data, int size, int *pos)
|
|||||||
return key;
|
return key;
|
||||||
}
|
}
|
||||||
|
|
||||||
int kv_block_t::parse(uint64_t offset, uint8_t *data, int size)
|
int kv_block_t::parse(uint64_t offset, uint8_t *data, int size, bool allow_empty)
|
||||||
{
|
{
|
||||||
kv_stored_block_t *blk = (kv_stored_block_t *)data;
|
kv_stored_block_t *blk = (kv_stored_block_t *)data;
|
||||||
if (blk->magic == 0 || blk->type == KV_EMPTY)
|
if (blk->magic == 0 || blk->type == KV_EMPTY)
|
||||||
{
|
{
|
||||||
// empty block
|
// empty block
|
||||||
if (offset != 0)
|
if (offset != 0 && !allow_empty)
|
||||||
fprintf(stderr, "K/V: Block %ju is %s\n", offset, blk->magic == 0 ? "empty" : "cleared");
|
fprintf(stderr, "K/V: Block %ju is %s\n", offset, blk->magic == 0 ? "empty" : "cleared");
|
||||||
return -ENOTBLK;
|
return -ENOTBLK;
|
||||||
}
|
}
|
||||||
@@ -884,7 +884,10 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
op->len = db->kv_block_size;
|
op->offset = (offset / db->ino_block_size) * db->ino_block_size;
|
||||||
|
op->len = db->ino_block_size;
|
||||||
|
//op->offset = offset;
|
||||||
|
//op->len = db->kv_block_size;
|
||||||
op->iov.push_back(malloc_or_die(op->len), op->len);
|
op->iov.push_back(malloc_or_die(op->len), op->len);
|
||||||
}
|
}
|
||||||
op->callback = [=](cluster_op_t *op)
|
op->callback = [=](cluster_op_t *op)
|
||||||
@@ -898,59 +901,82 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p
|
|||||||
delete op;
|
delete op;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
int blk_err = 0;
|
||||||
|
int blk_status = 0;
|
||||||
invalidate(db, op->offset, op->version);
|
invalidate(db, op->offset, op->version);
|
||||||
auto blk_it = db->block_cache.find(op->offset);
|
for (uint64_t in_offset = op->offset; in_offset < (op->offset+op->len) || in_offset == offset; in_offset += db->kv_block_size)
|
||||||
|
{
|
||||||
|
auto blk_it = db->block_cache.find(in_offset);
|
||||||
if (blk_it != db->block_cache.end() &&
|
if (blk_it != db->block_cache.end() &&
|
||||||
// read may start BEFORE update and end AFTER update, in this case known will be > returned version
|
// read may start BEFORE update and end AFTER update, in this case known will be > returned version
|
||||||
(db->known_versions[op->offset/db->ino_block_size] >= op->version && !blk_it->second.invalidated || blk_it->second.updating > 0))
|
(db->known_versions[in_offset/db->ino_block_size] >= op->version && !blk_it->second.invalidated || blk_it->second.updating > 0))
|
||||||
{
|
{
|
||||||
auto blk = &db->block_cache.at(op->offset);
|
auto blk = &db->block_cache.at(in_offset);
|
||||||
if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT)
|
if (blk->updating > 0 && recheck_policy == KV_RECHECK_WAIT)
|
||||||
{
|
{
|
||||||
// Wait until block update stops
|
// Wait until block update stops
|
||||||
if (op->len)
|
if (op->len)
|
||||||
free(op->iov.buf[0].iov_base);
|
free(op->iov.buf[0].iov_base);
|
||||||
delete op;
|
delete op;
|
||||||
db->continue_update.emplace(blk->offset, [=, blk_offset = blk->offset]()
|
db->continue_update.emplace(in_offset, [=, blk_offset = blk->offset]()
|
||||||
{
|
{
|
||||||
get_block(db, offset, cur_level, recheck_policy, cb);
|
get_block(db, offset, cur_level, recheck_policy, cb);
|
||||||
db->run_continue_update(blk_offset);
|
db->run_continue_update(blk_offset);
|
||||||
});
|
});
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
blk->usage = db->usage_counter;
|
|
||||||
cb(0, blk->updating > 0 ? BLK_UPDATING : BLK_NOCHANGE);
|
|
||||||
}
|
}
|
||||||
else
|
else if (!op->len)
|
||||||
{
|
|
||||||
if (!op->len)
|
|
||||||
{
|
{
|
||||||
// Version check failed, re-read block
|
// Version check failed, re-read block
|
||||||
delete op;
|
delete op;
|
||||||
get_block(db, offset, cur_level, recheck_policy, cb);
|
get_block(db, offset, cur_level, recheck_policy, cb);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
auto blk = &db->block_cache[op->offset];
|
}
|
||||||
|
for (uint64_t in_offset = op->offset; in_offset < (op->offset+op->len) || in_offset == offset; in_offset += db->kv_block_size)
|
||||||
|
{
|
||||||
|
auto blk_it = db->block_cache.find(in_offset);
|
||||||
|
if (blk_it != db->block_cache.end() &&
|
||||||
|
// read may start BEFORE update and end AFTER update, in this case known will be > returned version
|
||||||
|
(db->known_versions[in_offset/db->ino_block_size] >= op->version && !blk_it->second.invalidated || blk_it->second.updating > 0))
|
||||||
|
{
|
||||||
|
auto blk = &db->block_cache.at(in_offset);
|
||||||
|
blk->usage = db->usage_counter;
|
||||||
|
if (in_offset == offset)
|
||||||
|
{
|
||||||
|
blk_err = 0;
|
||||||
|
blk_status = (blk->updating > 0 ? BLK_UPDATING : BLK_NOCHANGE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
auto blk = &db->block_cache[in_offset];
|
||||||
if (blk_it != db->block_cache.end())
|
if (blk_it != db->block_cache.end())
|
||||||
{
|
{
|
||||||
del_block_level(db, blk);
|
del_block_level(db, blk);
|
||||||
*blk = {};
|
*blk = {};
|
||||||
}
|
}
|
||||||
int err = blk->parse(op->offset, (uint8_t*)op->iov.buf[0].iov_base, op->len);
|
int err = blk->parse(in_offset, (uint8_t*)op->iov.buf[0].iov_base + (in_offset - op->offset), db->kv_block_size, in_offset != offset);
|
||||||
|
if (in_offset == offset)
|
||||||
|
{
|
||||||
|
blk_err = err;
|
||||||
|
blk_status = err == 0 ? BLK_RELOADED : BLK_NOCHANGE;
|
||||||
|
}
|
||||||
if (err == 0)
|
if (err == 0)
|
||||||
{
|
{
|
||||||
blk->level = cur_level;
|
blk->level = cur_level;
|
||||||
blk->usage = db->usage_counter;
|
blk->usage = db->usage_counter;
|
||||||
add_block_level(db, blk);
|
add_block_level(db, blk);
|
||||||
cb(0, BLK_RELOADED);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
db->block_cache.erase(op->offset);
|
db->block_cache.erase(in_offset);
|
||||||
cb(err, BLK_NOCHANGE);
|
|
||||||
}
|
}
|
||||||
try_evict(db);
|
try_evict(db);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
cb(blk_err, blk_status);
|
||||||
if (op->len)
|
if (op->len)
|
||||||
free(op->iov.buf[0].iov_base);
|
free(op->iov.buf[0].iov_base);
|
||||||
delete op;
|
delete op;
|
||||||
@@ -1860,7 +1886,6 @@ void kv_op_t::next_handle_block(int res, int refresh)
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
// OK, leaf block found
|
// OK, leaf block found
|
||||||
recheck_policy = KV_RECHECK_NONE;
|
|
||||||
next_get();
|
next_get();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1887,7 +1912,6 @@ void kv_op_t::next_get()
|
|||||||
else if (blk->type == KV_LEAF_SPLIT)
|
else if (blk->type == KV_LEAF_SPLIT)
|
||||||
{
|
{
|
||||||
// Left half finished, go to the right
|
// Left half finished, go to the right
|
||||||
recheck_policy = KV_RECHECK_LEAF;
|
|
||||||
key = blk->right_half;
|
key = blk->right_half;
|
||||||
skip_equal = false;
|
skip_equal = false;
|
||||||
prev_key_ge = blk->right_half;
|
prev_key_ge = blk->right_half;
|
||||||
@@ -1921,7 +1945,6 @@ void kv_op_t::next_go_up()
|
|||||||
path.pop_back();
|
path.pop_back();
|
||||||
cur_level--;
|
cur_level--;
|
||||||
cur_block = path[path.size()-1].offset;
|
cur_block = path[path.size()-1].offset;
|
||||||
recheck_policy = KV_RECHECK_LEAF;
|
|
||||||
// Check if we can resume listing from the next key
|
// Check if we can resume listing from the next key
|
||||||
auto pb_it = db->block_cache.find(cur_block);
|
auto pb_it = db->block_cache.find(cur_block);
|
||||||
if (pb_it == db->block_cache.end())
|
if (pb_it == db->block_cache.end())
|
||||||
|
Reference in New Issue
Block a user