Change new block allocation method: make each writer choose multiple empty PG blocks and place blocks in them
parent
28c4324c36
commit
e98a38810d
|
@ -252,11 +252,12 @@ void kv_cli_t::handle_cmd(const std::string & cmd, std::function<void()> cb)
|
|||
auto key = trim(cmd.substr(pos+1, pos2-pos-1));
|
||||
auto value = parse_size(trim(cmd.substr(pos2+1)));
|
||||
if (key != "kv_memory_limit" &&
|
||||
key != "kv_allocate_blocks" &&
|
||||
key != "kv_evict_max_misses" &&
|
||||
key != "kv_evict_attempts_per_level" &&
|
||||
key != "kv_evict_unused_age")
|
||||
{
|
||||
fprintf(stderr, "Allowed properties: kv_memory_limit, kv_evict_max_misses, kv_evict_attempts_per_level, kv_evict_unused_age\n");
|
||||
fprintf(stderr, "Allowed properties: kv_memory_limit, kv_allocate_blocks, kv_evict_max_misses, kv_evict_attempts_per_level, kv_evict_unused_age\n");
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -121,6 +121,7 @@ struct kv_db_t
|
|||
{
|
||||
cluster_client_t *cli = NULL;
|
||||
|
||||
// config. maybe should be moved to a separate structure
|
||||
inode_t inode_id = 0;
|
||||
uint64_t next_free = 0;
|
||||
uint32_t kv_block_size = 0;
|
||||
|
@ -130,11 +131,15 @@ struct kv_db_t
|
|||
uint64_t evict_unused_age = 1000;
|
||||
uint64_t evict_max_misses = 10;
|
||||
uint64_t evict_attempts_per_level = 3;
|
||||
uint64_t allocate_blocks = 4;
|
||||
|
||||
// state
|
||||
uint64_t evict_unused_counter = 0;
|
||||
uint64_t cache_max_blocks = 0;
|
||||
int base_block_level = 0;
|
||||
int usage_counter = 1;
|
||||
int allocating_block_pos = 0;
|
||||
std::vector<uint64_t> allocating_blocks;
|
||||
std::set<uint64_t> block_levels;
|
||||
std::map<uint64_t, kv_block_t> block_cache;
|
||||
std::map<uint64_t, uint64_t> known_versions;
|
||||
|
@ -146,7 +151,8 @@ struct kv_db_t
|
|||
int active_ops = 0;
|
||||
std::function<void()> on_close;
|
||||
|
||||
uint64_t transform_offset(uint64_t orig);
|
||||
uint64_t alloc_block();
|
||||
void clear_allocation_block(uint64_t offset);
|
||||
|
||||
void open(inode_t inode_id, json11::Json cfg, std::function<void(int)> cb);
|
||||
void set_config(json11::Json cfg);
|
||||
|
@ -475,12 +481,6 @@ void kv_block_t::dump(int base_level)
|
|||
printf(" }\n}\n");
|
||||
}
|
||||
|
||||
uint64_t kv_db_t::transform_offset(uint64_t orig)
|
||||
{
|
||||
orig /= kv_block_size;
|
||||
return (orig % 32) * ino_block_size + ((orig / 32) % 32) * kv_block_size + (orig / 1024) * ino_block_size * 1024;
|
||||
}
|
||||
|
||||
void kv_db_t::open(inode_t inode_id, json11::Json cfg, std::function<void(int)> cb)
|
||||
{
|
||||
if (block_cache.size() > 0)
|
||||
|
@ -531,6 +531,7 @@ void kv_db_t::set_config(json11::Json cfg)
|
|||
this->evict_attempts_per_level = cfg["kv_evict_attempts_per_level"].is_null() ? 3 : cfg["kv_evict_attempts_per_level"].uint64_value();
|
||||
this->evict_unused_age = cfg["kv_evict_unused_age"].is_null() ? 1000 : cfg["kv_evict_unused_age"].uint64_value();
|
||||
this->cache_max_blocks = this->memory_limit / this->kv_block_size;
|
||||
this->allocate_blocks = cfg["kv_allocate_blocks"].uint64_value() ? cfg["kv_allocate_blocks"].uint64_value() : 4;
|
||||
}
|
||||
|
||||
void kv_db_t::close(std::function<void()> cb)
|
||||
|
@ -555,6 +556,54 @@ void kv_db_t::close(std::function<void()> cb)
|
|||
}
|
||||
}
|
||||
|
||||
uint64_t kv_db_t::alloc_block()
|
||||
{
|
||||
// round robin between <allocate_blocks> blocks
|
||||
while (allocating_blocks.size() < allocate_blocks)
|
||||
{
|
||||
allocating_blocks.push_back(UINT64_MAX);
|
||||
}
|
||||
if (allocating_blocks[allocating_block_pos] == UINT64_MAX)
|
||||
{
|
||||
// try v0 and only v0 of a new inode block
|
||||
while (known_versions[next_free/ino_block_size] != 0)
|
||||
{
|
||||
next_free += ino_block_size;
|
||||
}
|
||||
allocating_blocks[allocating_block_pos] = next_free;
|
||||
next_free += ino_block_size;
|
||||
}
|
||||
auto pos = allocating_blocks[allocating_block_pos];
|
||||
allocating_blocks[allocating_block_pos] += kv_block_size;
|
||||
if (!(allocating_blocks[allocating_block_pos] % ino_block_size))
|
||||
{
|
||||
// Allow online reconfiguration
|
||||
if (allocating_blocks.size() > allocate_blocks)
|
||||
allocating_blocks.erase(allocating_blocks.begin()+allocating_block_pos, allocating_blocks.begin()+allocating_block_pos+1);
|
||||
else
|
||||
allocating_blocks[allocating_block_pos] = UINT64_MAX;
|
||||
}
|
||||
allocating_block_pos = (allocating_block_pos+1) % allocating_blocks.size();
|
||||
return pos;
|
||||
}
|
||||
|
||||
void kv_db_t::clear_allocation_block(uint64_t offset)
|
||||
{
|
||||
// We want to be first when writing the first KV block into a new inode block
|
||||
// After it other writers may modify already placed blocks and the version
|
||||
// will increase, but they won't use free space in it for new blocks
|
||||
if (known_versions[offset/ino_block_size] == 0)
|
||||
{
|
||||
for (int i = 0; i < allocating_blocks.size(); i++)
|
||||
{
|
||||
if (allocating_blocks[i]/ino_block_size == offset/ino_block_size)
|
||||
{
|
||||
allocating_blocks[i] = UINT64_MAX;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static bool is_zero(void *buf, int size)
|
||||
{
|
||||
assert(!(size % 8));
|
||||
|
@ -566,13 +615,14 @@ static bool is_zero(void *buf, int size)
|
|||
return true;
|
||||
}
|
||||
|
||||
// Phase 1: try 2^i-1 for i=0,1,2,...
|
||||
// Phase 2: binary search between 2^(N-1)-1 and 2^N-1
|
||||
// Find approximate index size
|
||||
// Phase 1: try 2^i-1 for i=0,1,2,... * ino_block_size
|
||||
// Phase 2: binary search between 2^(N-1)-1 and 2^N-1 * ino_block_size
|
||||
void kv_db_t::find_size(uint64_t min, uint64_t max, int phase, std::function<void(int, uint64_t)> cb)
|
||||
{
|
||||
if (min == max-1)
|
||||
{
|
||||
cb(0, max*kv_block_size);
|
||||
cb(0, max*ino_block_size);
|
||||
return;
|
||||
}
|
||||
if (phase == 1 && min >= KV_INDEX_MAX_SIZE)
|
||||
|
@ -583,7 +633,7 @@ void kv_db_t::find_size(uint64_t min, uint64_t max, int phase, std::function<voi
|
|||
cluster_op_t *op = new cluster_op_t;
|
||||
op->opcode = OSD_OP_READ;
|
||||
op->inode = inode_id;
|
||||
op->offset = transform_offset((phase == 1 ? min : (min+max)/2) * kv_block_size);
|
||||
op->offset = (phase == 1 ? min : (min+max)/2) * ino_block_size;
|
||||
op->len = kv_block_size;
|
||||
if (op->len)
|
||||
{
|
||||
|
@ -598,7 +648,7 @@ void kv_db_t::find_size(uint64_t min, uint64_t max, int phase, std::function<voi
|
|||
return;
|
||||
}
|
||||
known_versions[op->offset / ino_block_size] = op->version;
|
||||
if (!is_zero(op->iov.buf[0].iov_base, kv_block_size))
|
||||
if (op->version != 0)
|
||||
{
|
||||
if (phase == 1)
|
||||
find_size((min+1)*2 - 1, 0, 1, cb);
|
||||
|
@ -651,6 +701,7 @@ static void invalidate(kv_db_t *db, uint64_t offset, uint64_t version)
|
|||
{
|
||||
if (db->known_versions[offset/db->ino_block_size] < version)
|
||||
{
|
||||
db->clear_allocation_block(offset);
|
||||
auto b_it = db->block_cache.lower_bound(offset/db->ino_block_size * db->ino_block_size);
|
||||
while (b_it != db->block_cache.end() && b_it->first/db->ino_block_size == offset/db->ino_block_size)
|
||||
{
|
||||
|
@ -1089,8 +1140,7 @@ static void write_block(kv_db_t *db, kv_block_t *blk, std::function<void(int)> c
|
|||
static kv_block_t *create_new_block(kv_db_t *db, kv_block_t *old_blk, const std::string & separator,
|
||||
const std::string & added_key, const std::string & added_value, bool right)
|
||||
{
|
||||
auto new_offset = db->transform_offset(db->next_free);
|
||||
db->next_free += db->kv_block_size;
|
||||
auto new_offset = db->alloc_block();
|
||||
auto blk = &db->block_cache[new_offset];
|
||||
blk->usage = db->usage_counter;
|
||||
blk->level = old_blk->level;
|
||||
|
@ -1115,6 +1165,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int
|
|||
if (res == -EINTR)
|
||||
{
|
||||
// CAS failure => re-read, then, if not zero, find position again and retry
|
||||
db->clear_allocation_block(blk->offset);
|
||||
cluster_op_t *op = new cluster_op_t;
|
||||
op->opcode = OSD_OP_READ;
|
||||
op->inode = db->inode_id;
|
||||
|
@ -1125,7 +1176,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int
|
|||
{
|
||||
if (op->retval != op->len)
|
||||
{
|
||||
// Failure => free the new unreferenced block and die
|
||||
// Read error => free the new unreferenced block and die
|
||||
del_block_level(db, blk);
|
||||
db->block_cache.erase(blk->offset);
|
||||
cb(op->retval >= 0 ? -EIO : op->retval, NULL);
|
||||
|
@ -1141,8 +1192,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int
|
|||
{
|
||||
// Block is already occupied => place again
|
||||
auto old_offset = blk->offset;
|
||||
auto new_offset = db->transform_offset(db->next_free);
|
||||
db->next_free += db->kv_block_size;
|
||||
auto new_offset = db->alloc_block();
|
||||
del_block_level(db, blk);
|
||||
std::swap(db->block_cache[new_offset], db->block_cache[old_offset]);
|
||||
db->block_cache.erase(old_offset);
|
||||
|
@ -1151,8 +1201,6 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int
|
|||
add_block_level(db, new_blk);
|
||||
write_new_block(db, new_blk, cb);
|
||||
}
|
||||
free(op->iov.buf[0].iov_base);
|
||||
delete op;
|
||||
};
|
||||
db->cli->execute(op);
|
||||
}
|
||||
|
@ -1283,12 +1331,13 @@ void kv_op_t::create_root()
|
|||
finish(-EILSEQ);
|
||||
return;
|
||||
}
|
||||
db->next_free += db->kv_block_size;
|
||||
auto new_offset = db->alloc_block();
|
||||
assert(new_offset == 0);
|
||||
auto blk = &db->block_cache[0];
|
||||
blk->usage = db->usage_counter;
|
||||
blk->level = -db->base_block_level;
|
||||
blk->type = KV_LEAF;
|
||||
blk->offset = 0;
|
||||
blk->offset = new_offset;
|
||||
blk->data[key] = value;
|
||||
blk->set_data_size();
|
||||
add_block_level(db, blk);
|
||||
|
|
|
@ -152,6 +152,8 @@ json11::Json::object kv_test_t::parse_args(int narg, const char *args[])
|
|||
" Stop on first execution error, mismatch, lost key or extra key during listing\n"
|
||||
" --kv_memory_limit 128M\n"
|
||||
" Maximum memory to use for vitastor-kv index cache\n"
|
||||
" --kv_allocate_blocks 4\n"
|
||||
" Number of PG blocks used for new tree block allocation in parallel\n"
|
||||
" --kv_evict_max_misses 10\n"
|
||||
" Eviction algorithm parameter: retry eviction from another random spot\n"
|
||||
" if this number of keys is used currently or was used recently\n"
|
||||
|
@ -210,6 +212,8 @@ void kv_test_t::parse_config(json11::Json cfg)
|
|||
stop_on_error = cfg["stop_on_error"].bool_value();
|
||||
if (!cfg["kv_memory_limit"].is_null())
|
||||
kv_cfg["kv_memory_limit"] = cfg["kv_memory_limit"];
|
||||
if (!cfg["kv_allocate_blocks"].is_null())
|
||||
kv_cfg["kv_allocate_blocks"] = cfg["kv_allocate_blocks"];
|
||||
if (!cfg["kv_evict_max_misses"].is_null())
|
||||
kv_cfg["kv_evict_max_misses"] = cfg["kv_evict_max_misses"];
|
||||
if (!cfg["kv_evict_attempts_per_level"].is_null())
|
||||
|
|
Loading…
Reference in New Issue