diff --git a/src/kv_cli.cpp b/src/kv_cli.cpp index ec98ddf1..e71f38f9 100644 --- a/src/kv_cli.cpp +++ b/src/kv_cli.cpp @@ -252,11 +252,12 @@ void kv_cli_t::handle_cmd(const std::string & cmd, std::function cb) auto key = trim(cmd.substr(pos+1, pos2-pos-1)); auto value = parse_size(trim(cmd.substr(pos2+1))); if (key != "kv_memory_limit" && + key != "kv_allocate_blocks" && key != "kv_evict_max_misses" && key != "kv_evict_attempts_per_level" && key != "kv_evict_unused_age") { - fprintf(stderr, "Allowed properties: kv_memory_limit, kv_evict_max_misses, kv_evict_attempts_per_level, kv_evict_unused_age\n"); + fprintf(stderr, "Allowed properties: kv_memory_limit, kv_allocate_blocks, kv_evict_max_misses, kv_evict_attempts_per_level, kv_evict_unused_age\n"); } else { diff --git a/src/kv_db.cpp b/src/kv_db.cpp index f2d11654..38f76373 100644 --- a/src/kv_db.cpp +++ b/src/kv_db.cpp @@ -121,6 +121,7 @@ struct kv_db_t { cluster_client_t *cli = NULL; + // config. maybe should be moved to a separate structure inode_t inode_id = 0; uint64_t next_free = 0; uint32_t kv_block_size = 0; @@ -130,11 +131,15 @@ struct kv_db_t uint64_t evict_unused_age = 1000; uint64_t evict_max_misses = 10; uint64_t evict_attempts_per_level = 3; + uint64_t allocate_blocks = 4; + // state uint64_t evict_unused_counter = 0; uint64_t cache_max_blocks = 0; int base_block_level = 0; int usage_counter = 1; + int allocating_block_pos = 0; + std::vector allocating_blocks; std::set block_levels; std::map block_cache; std::map known_versions; @@ -146,7 +151,8 @@ struct kv_db_t int active_ops = 0; std::function on_close; - uint64_t transform_offset(uint64_t orig); + uint64_t alloc_block(); + void clear_allocation_block(uint64_t offset); void open(inode_t inode_id, json11::Json cfg, std::function cb); void set_config(json11::Json cfg); @@ -475,12 +481,6 @@ void kv_block_t::dump(int base_level) printf(" }\n}\n"); } -uint64_t kv_db_t::transform_offset(uint64_t orig) -{ - orig /= kv_block_size; - return (orig % 32) * ino_block_size + ((orig / 32) % 32) * kv_block_size + (orig / 1024) * ino_block_size * 1024; -} - void kv_db_t::open(inode_t inode_id, json11::Json cfg, std::function cb) { if (block_cache.size() > 0) @@ -531,6 +531,7 @@ void kv_db_t::set_config(json11::Json cfg) this->evict_attempts_per_level = cfg["kv_evict_attempts_per_level"].is_null() ? 3 : cfg["kv_evict_attempts_per_level"].uint64_value(); this->evict_unused_age = cfg["kv_evict_unused_age"].is_null() ? 1000 : cfg["kv_evict_unused_age"].uint64_value(); this->cache_max_blocks = this->memory_limit / this->kv_block_size; + this->allocate_blocks = cfg["kv_allocate_blocks"].uint64_value() ? cfg["kv_allocate_blocks"].uint64_value() : 4; } void kv_db_t::close(std::function cb) @@ -555,6 +556,54 @@ void kv_db_t::close(std::function cb) } } +uint64_t kv_db_t::alloc_block() +{ + // round robin between blocks + while (allocating_blocks.size() < allocate_blocks) + { + allocating_blocks.push_back(UINT64_MAX); + } + if (allocating_blocks[allocating_block_pos] == UINT64_MAX) + { + // try v0 and only v0 of a new inode block + while (known_versions[next_free/ino_block_size] != 0) + { + next_free += ino_block_size; + } + allocating_blocks[allocating_block_pos] = next_free; + next_free += ino_block_size; + } + auto pos = allocating_blocks[allocating_block_pos]; + allocating_blocks[allocating_block_pos] += kv_block_size; + if (!(allocating_blocks[allocating_block_pos] % ino_block_size)) + { + // Allow online reconfiguration + if (allocating_blocks.size() > allocate_blocks) + allocating_blocks.erase(allocating_blocks.begin()+allocating_block_pos, allocating_blocks.begin()+allocating_block_pos+1); + else + allocating_blocks[allocating_block_pos] = UINT64_MAX; + } + allocating_block_pos = (allocating_block_pos+1) % allocating_blocks.size(); + return pos; +} + +void kv_db_t::clear_allocation_block(uint64_t offset) +{ + // We want to be first when writing the first KV block into a new inode block + // After it other writers may modify already placed blocks and the version + // will increase, but they won't use free space in it for new blocks + if (known_versions[offset/ino_block_size] == 0) + { + for (int i = 0; i < allocating_blocks.size(); i++) + { + if (allocating_blocks[i]/ino_block_size == offset/ino_block_size) + { + allocating_blocks[i] = UINT64_MAX; + } + } + } +} + static bool is_zero(void *buf, int size) { assert(!(size % 8)); @@ -566,13 +615,14 @@ static bool is_zero(void *buf, int size) return true; } -// Phase 1: try 2^i-1 for i=0,1,2,... -// Phase 2: binary search between 2^(N-1)-1 and 2^N-1 +// Find approximate index size +// Phase 1: try 2^i-1 for i=0,1,2,... * ino_block_size +// Phase 2: binary search between 2^(N-1)-1 and 2^N-1 * ino_block_size void kv_db_t::find_size(uint64_t min, uint64_t max, int phase, std::function cb) { if (min == max-1) { - cb(0, max*kv_block_size); + cb(0, max*ino_block_size); return; } if (phase == 1 && min >= KV_INDEX_MAX_SIZE) @@ -583,7 +633,7 @@ void kv_db_t::find_size(uint64_t min, uint64_t max, int phase, std::functionopcode = OSD_OP_READ; op->inode = inode_id; - op->offset = transform_offset((phase == 1 ? min : (min+max)/2) * kv_block_size); + op->offset = (phase == 1 ? min : (min+max)/2) * ino_block_size; op->len = kv_block_size; if (op->len) { @@ -598,7 +648,7 @@ void kv_db_t::find_size(uint64_t min, uint64_t max, int phase, std::functionoffset / ino_block_size] = op->version; - if (!is_zero(op->iov.buf[0].iov_base, kv_block_size)) + if (op->version != 0) { if (phase == 1) find_size((min+1)*2 - 1, 0, 1, cb); @@ -651,6 +701,7 @@ static void invalidate(kv_db_t *db, uint64_t offset, uint64_t version) { if (db->known_versions[offset/db->ino_block_size] < version) { + db->clear_allocation_block(offset); auto b_it = db->block_cache.lower_bound(offset/db->ino_block_size * db->ino_block_size); while (b_it != db->block_cache.end() && b_it->first/db->ino_block_size == offset/db->ino_block_size) { @@ -1089,8 +1140,7 @@ static void write_block(kv_db_t *db, kv_block_t *blk, std::function c static kv_block_t *create_new_block(kv_db_t *db, kv_block_t *old_blk, const std::string & separator, const std::string & added_key, const std::string & added_value, bool right) { - auto new_offset = db->transform_offset(db->next_free); - db->next_free += db->kv_block_size; + auto new_offset = db->alloc_block(); auto blk = &db->block_cache[new_offset]; blk->usage = db->usage_counter; blk->level = old_blk->level; @@ -1115,6 +1165,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function re-read, then, if not zero, find position again and retry + db->clear_allocation_block(blk->offset); cluster_op_t *op = new cluster_op_t; op->opcode = OSD_OP_READ; op->inode = db->inode_id; @@ -1125,7 +1176,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::functionretval != op->len) { - // Failure => free the new unreferenced block and die + // Read error => free the new unreferenced block and die del_block_level(db, blk); db->block_cache.erase(blk->offset); cb(op->retval >= 0 ? -EIO : op->retval, NULL); @@ -1141,8 +1192,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function place again auto old_offset = blk->offset; - auto new_offset = db->transform_offset(db->next_free); - db->next_free += db->kv_block_size; + auto new_offset = db->alloc_block(); del_block_level(db, blk); std::swap(db->block_cache[new_offset], db->block_cache[old_offset]); db->block_cache.erase(old_offset); @@ -1151,8 +1201,6 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::functioniov.buf[0].iov_base); - delete op; }; db->cli->execute(op); } @@ -1283,12 +1331,13 @@ void kv_op_t::create_root() finish(-EILSEQ); return; } - db->next_free += db->kv_block_size; + auto new_offset = db->alloc_block(); + assert(new_offset == 0); auto blk = &db->block_cache[0]; blk->usage = db->usage_counter; blk->level = -db->base_block_level; blk->type = KV_LEAF; - blk->offset = 0; + blk->offset = new_offset; blk->data[key] = value; blk->set_data_size(); add_block_level(db, blk); diff --git a/src/kv_stress.cpp b/src/kv_stress.cpp index d73c86af..7c6aa788 100644 --- a/src/kv_stress.cpp +++ b/src/kv_stress.cpp @@ -152,6 +152,8 @@ json11::Json::object kv_test_t::parse_args(int narg, const char *args[]) " Stop on first execution error, mismatch, lost key or extra key during listing\n" " --kv_memory_limit 128M\n" " Maximum memory to use for vitastor-kv index cache\n" + " --kv_allocate_blocks 4\n" + " Number of PG blocks used for new tree block allocation in parallel\n" " --kv_evict_max_misses 10\n" " Eviction algorithm parameter: retry eviction from another random spot\n" " if this number of keys is used currently or was used recently\n" @@ -210,6 +212,8 @@ void kv_test_t::parse_config(json11::Json cfg) stop_on_error = cfg["stop_on_error"].bool_value(); if (!cfg["kv_memory_limit"].is_null()) kv_cfg["kv_memory_limit"] = cfg["kv_memory_limit"]; + if (!cfg["kv_allocate_blocks"].is_null()) + kv_cfg["kv_allocate_blocks"] = cfg["kv_allocate_blocks"]; if (!cfg["kv_evict_max_misses"].is_null()) kv_cfg["kv_evict_max_misses"] = cfg["kv_evict_max_misses"]; if (!cfg["kv_evict_attempts_per_level"].is_null())