diff --git a/src/kv_db.cpp b/src/kv_db.cpp
index b1ca88df..09717117 100644
--- a/src/kv_db.cpp
+++ b/src/kv_db.cpp
@@ -123,6 +123,13 @@ struct kv_continue_write_t
     std::function<void(int)> cb;
 };
 
+struct kv_alloc_block_t
+{
+    uint64_t offset;
+    bool writing;
+    bool confirmed;
+};
+
 struct kv_db_t
 {
     cluster_client_t *cli = NULL;
@@ -137,7 +144,7 @@ struct kv_db_t
     uint64_t evict_unused_age = 1000;
     uint64_t evict_max_misses = 10;
     uint64_t evict_attempts_per_level = 3;
-    uint64_t allocate_blocks = 4;
+    uint64_t max_allocate_blocks = 4;
     uint64_t log_level = 1;
 
     // state
@@ -146,7 +153,7 @@ struct kv_db_t
     int base_block_level = 0;
     int usage_counter = 1;
     int allocating_block_pos = 0;
-    std::vector<uint64_t> allocating_blocks;
+    std::vector<kv_alloc_block_t> allocating_blocks;
     std::set block_levels;
     std::map<uint64_t, kv_block_t> block_cache;
     std::map<uint64_t, uint64_t> known_versions;
@@ -160,6 +167,8 @@ struct kv_db_t
 
     uint64_t alloc_block();
     void clear_allocation_block(uint64_t offset);
+    void confirm_allocation_block(uint64_t offset);
+    void stop_writing_new(uint64_t offset);
 
     void open(inode_t inode_id, json11::Json cfg, std::function<void(int)> cb);
     void set_config(json11::Json cfg);
@@ -539,7 +548,7 @@ void kv_db_t::set_config(json11::Json cfg)
     this->evict_attempts_per_level = cfg["kv_evict_attempts_per_level"].is_null() ? 3 : cfg["kv_evict_attempts_per_level"].uint64_value();
     this->evict_unused_age = cfg["kv_evict_unused_age"].is_null() ? 1000 : cfg["kv_evict_unused_age"].uint64_value();
     this->cache_max_blocks = this->memory_limit / this->kv_block_size;
-    this->allocate_blocks = cfg["kv_allocate_blocks"].uint64_value() ? cfg["kv_allocate_blocks"].uint64_value() : 4;
+    this->max_allocate_blocks = cfg["kv_allocate_blocks"].uint64_value() ? cfg["kv_allocate_blocks"].uint64_value() : 4;
     this->log_level = !cfg["kv_log_level"].is_null() ? cfg["kv_log_level"].uint64_value() : 1;
 }
 
@@ -567,49 +576,96 @@ void kv_db_t::close(std::function<void()> cb)
 
 uint64_t kv_db_t::alloc_block()
 {
-    // round robin between <allocate_blocks> blocks
-    while (allocating_blocks.size() < allocate_blocks)
+    // select from at least <max_allocate_blocks> allocation blocks
+    while (allocating_blocks.size() < max_allocate_blocks)
     {
-        allocating_blocks.push_back(UINT64_MAX);
+        allocating_blocks.push_back({ .offset = UINT64_MAX });
     }
-    if (allocating_blocks[allocating_block_pos] == UINT64_MAX)
+    bool found = false;
+    for (int i = 0; i < allocating_blocks.size(); i++)
     {
-        // try v0 and only v0 of a new inode block
-        while (known_versions[next_free/ino_block_size] != 0)
+        int next = (allocating_block_pos+i) % allocating_blocks.size();
+        if (allocating_blocks[next].offset != UINT64_MAX &&
+            allocating_blocks[next].confirmed && !allocating_blocks[next].writing)
         {
-            next_free += ino_block_size;
+            allocating_block_pos = next;
+            found = true;
+            break;
         }
-        allocating_blocks[allocating_block_pos] = next_free;
+    }
+    if (!found)
+    {
+        // Allow allocating more than <max_allocate_blocks> new blocks in parallel
+        allocating_blocks.push_back({ .offset = UINT64_MAX });
+        allocating_block_pos = allocating_blocks.size()-1;
+    }
+    if (allocating_blocks[allocating_block_pos].offset == UINT64_MAX)
+    {
+        // allocate new blocks at the end of the inode
+        auto known_max = known_versions.end();
+        while (known_max != known_versions.begin())
+        {
+            known_max--;
+            // try v0 and only v0 of a new inode block
+            if (known_max->second != 0)
+            {
+                auto probably_unused = (known_max->first+1)*ino_block_size;
+                if (next_free < probably_unused)
+                    next_free = probably_unused;
+                break;
+            }
+        }
+        allocating_blocks[allocating_block_pos] = {
+            .offset = next_free,
+            .writing = false,
+            .confirmed = false,
+        };
         next_free += ino_block_size;
     }
-    auto pos = allocating_blocks[allocating_block_pos];
-    allocating_blocks[allocating_block_pos] += kv_block_size;
-    if (!(allocating_blocks[allocating_block_pos] % ino_block_size))
+    auto pos = allocating_blocks[allocating_block_pos].offset;
+    allocating_blocks[allocating_block_pos].writing = true;
+    allocating_blocks[allocating_block_pos].offset += kv_block_size;
+    if (!(allocating_blocks[allocating_block_pos].offset % ino_block_size))
     {
-        // Allow online reconfiguration
-        if (allocating_blocks.size() > allocate_blocks)
+        // Allow reconfiguring max_allocate_blocks online
+        if (allocating_blocks.size() > max_allocate_blocks)
             allocating_blocks.erase(allocating_blocks.begin()+allocating_block_pos, allocating_blocks.begin()+allocating_block_pos+1);
         else
-            allocating_blocks[allocating_block_pos] = UINT64_MAX;
+            allocating_blocks[allocating_block_pos].offset = UINT64_MAX;
     }
-    allocating_block_pos = (allocating_block_pos+1) % allocating_blocks.size();
     assert(block_cache.find(pos) == block_cache.end());
     return pos;
 }
 
 void kv_db_t::clear_allocation_block(uint64_t offset)
 {
-    // We want to be first when writing the first KV block into a new inode block
-    // After it other writers may modify already placed blocks and the version
-    // will increase, but they won't use free space in it for new blocks
-    if (known_versions[offset/ino_block_size] == 0)
+    for (int i = 0; i < allocating_blocks.size(); i++)
     {
-        for (int i = 0; i < allocating_blocks.size(); i++)
+        if (allocating_blocks[i].offset/ino_block_size == offset/ino_block_size)
         {
-            if (allocating_blocks[i]/ino_block_size == offset/ino_block_size)
-            {
-                allocating_blocks[i] = UINT64_MAX;
-            }
+            allocating_blocks[i].offset = UINT64_MAX;
+        }
+    }
+}
+
+void kv_db_t::confirm_allocation_block(uint64_t offset)
+{
+    for (int i = 0; i < allocating_blocks.size(); i++)
+    {
+        if (allocating_blocks[i].offset/ino_block_size == offset/ino_block_size)
+        {
+            allocating_blocks[i].confirmed = true;
+        }
+    }
+}
+
+void kv_db_t::stop_writing_new(uint64_t offset)
+{
+    for (int i = 0; i < allocating_blocks.size(); i++)
+    {
+        if (allocating_blocks[i].offset/ino_block_size == offset/ino_block_size)
+        {
+            allocating_blocks[i].writing = false;
         }
     }
 }
@@ -711,7 +767,10 @@ static void invalidate(kv_db_t *db, uint64_t offset, uint64_t version)
 {
     if (db->known_versions[offset/db->ino_block_size] < version)
     {
-        db->clear_allocation_block(offset);
+        if (db->known_versions[offset/db->ino_block_size] == 0)
+        {
+            db->clear_allocation_block(offset);
+        }
         auto b_it = db->block_cache.lower_bound(offset/db->ino_block_size * db->ino_block_size);
         while (b_it != db->block_cache.end() && b_it->first/db->ino_block_size == offset/db->ino_block_size)
         {
@@ -1230,14 +1289,41 @@ static kv_block_t *create_new_block(kv_db_t *db, kv_block_t *old_blk, const std:
     return blk;
 }
 
+static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int, kv_block_t *)> cb);
+
+static void place_again(kv_db_t *db, kv_block_t *blk, std::function<void(int, kv_block_t *)> cb)
+{
+    auto old_offset = blk->offset;
+    auto new_offset = db->alloc_block();
+    del_block_level(db, blk);
+    std::swap(db->block_cache[new_offset], db->block_cache[old_offset]);
+    db->block_cache.erase(old_offset);
+    auto new_blk = &db->block_cache[new_offset];
+    new_blk->offset = new_offset;
+    new_blk->invalidated = false;
+    add_block_level(db, new_blk);
+    write_new_block(db, new_blk, cb);
+}
+
 static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(int, kv_block_t *)> cb)
 {
     write_block(db, blk, [=](int res)
     {
+        db->stop_writing_new(blk->offset);
         if (res == -EINTR)
         {
             // CAS failure => re-read, then, if not zero, find position again and retry
-            db->clear_allocation_block(blk->offset);
+            if (!(blk->offset % db->ino_block_size))
+            {
+                // Any failed write of the first block within an inode block
+                // means that someone else is already allocating blocks in it,
+                // so we HAVE to move to the next inode block immediately
+                db->clear_allocation_block(blk->offset);
+                place_again(db, blk, cb);
+                return;
+            }
+            // On the other hand, if the block is already "ours", then live parts
+            // of it may change and we MAY recheck if the block is still zero on CAS failure
             cluster_op_t *op = new cluster_op_t;
             op->opcode = OSD_OP_READ;
             op->inode = db->inode_id;
@@ -1264,16 +1350,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(in
                 else
                 {
                     // Block is not zero => place again
-                    auto old_offset = blk->offset;
-                    auto new_offset = db->alloc_block();
-                    del_block_level(db, blk);
-                    std::swap(db->block_cache[new_offset], db->block_cache[old_offset]);
-                    db->block_cache.erase(old_offset);
-                    auto new_blk = &db->block_cache[new_offset];
-                    new_blk->offset = new_offset;
-                    new_blk->invalidated = false;
-                    add_block_level(db, new_blk);
-                    write_new_block(db, new_blk, cb);
+                    place_again(db, blk, cb);
                 }
             };
             db->cli->execute(op);
@@ -1281,6 +1358,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(in
         else if (res != 0)
         {
             // Write error => free the new unreferenced block and die
+            db->clear_allocation_block(blk->offset);
             del_block_level(db, blk);
             db->block_cache.erase(blk->offset);
             cb(res > 0 ? -EIO : res, NULL);
@@ -1288,6 +1366,12 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function<void(in
         else
         {
+            if (!(blk->offset % db->ino_block_size))
+            {
+                // A successful first write into a new allocation block
+                // confirms that it's now "locked" by us
+                db->confirm_allocation_block(blk->offset);
+            }
             cb(0, blk);
         }
     });
 }
@@ -1405,6 +1489,7 @@ void kv_op_t::create_root()
     {
         if (res == -EINTR)
        {
+            db->clear_allocation_block(blk->offset);
             auto blk_offset = blk->offset;
             del_block_level(db, blk);
             db->block_cache.erase(blk_offset);
@@ -1413,6 +1498,8 @@ void kv_op_t::create_root()
         }
         else
         {
+            db->stop_writing_new(blk->offset);
+            db->confirm_allocation_block(blk->offset);
             db->stop_updating(blk);
             finish(res);
         }
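
Reviewer note: the per-slot state machine (offset / writing / confirmed) is the heart of this patch, so below is a minimal standalone sketch of just that lifecycle, outside of Vitastor. The block sizes, allocator_t, and the on_first_write_ok()/on_first_write_cas_fail() completion handlers are hypothetical stand-ins for write_block()'s completion paths in kv_db.cpp, not real Vitastor API; the sketch only demonstrates why an unconfirmed inode block is never shared between parallel writers and why a CAS failure on the first write abandons the whole inode block.

// Standalone model of the allocation-slot lifecycle (compile with -std=c++20
// for designated initializers). Sizes are made up: 4 KiB KV blocks inside
// 128 KiB inode blocks.
#include <cassert>
#include <cstdint>
#include <vector>

static const uint64_t kv_block_size = 4096;
static const uint64_t ino_block_size = 131072;

struct alloc_block_t
{
    uint64_t offset = UINT64_MAX; // UINT64_MAX marks an unused slot
    bool writing = false;         // first write into this inode block is in flight
    bool confirmed = false;       // first (v0 CAS) write succeeded => block is "ours"
};

struct allocator_t
{
    uint64_t next_free = 0;
    std::vector<alloc_block_t> slots;

    // Simplified kv_db_t::alloc_block(): reuse a confirmed idle slot,
    // otherwise open a brand new inode block at the end
    uint64_t alloc()
    {
        for (auto & s: slots)
        {
            if (s.offset != UINT64_MAX && s.confirmed && !s.writing)
            {
                uint64_t pos = s.offset;
                s.writing = true;
                s.offset += kv_block_size;
                return pos;
            }
        }
        uint64_t pos = next_free;
        slots.push_back({ .offset = pos + kv_block_size, .writing = true });
        next_free += ino_block_size;
        return pos;
    }

    alloc_block_t *find(uint64_t offset)
    {
        for (auto & s: slots)
            if (s.offset != UINT64_MAX && s.offset/ino_block_size == offset/ino_block_size)
                return &s;
        return nullptr;
    }

    // Mirrors stop_writing_new() + confirm_allocation_block(): a successful
    // first write means the inode block is now "locked" by us
    void on_first_write_ok(uint64_t offset)
    {
        auto s = find(offset);
        assert(s);
        s->writing = false;
        s->confirmed = true;
    }

    // Mirrors stop_writing_new() + clear_allocation_block(): a CAS failure on
    // the first write means someone else already allocates in this inode block
    void on_first_write_cas_fail(uint64_t offset)
    {
        auto s = find(offset);
        assert(s);
        s->writing = false;
        s->offset = UINT64_MAX;
    }
};

int main()
{
    allocator_t a;
    uint64_t b0 = a.alloc();
    // While the first write is in flight, a parallel alloc() must not reuse
    // the same unconfirmed inode block:
    uint64_t b1 = a.alloc();
    assert(b1/ino_block_size != b0/ino_block_size);
    // A successful v0 write confirms the block, so its free space is reused:
    a.on_first_write_ok(b0);
    assert(a.alloc() == b0 + kv_block_size);
    // A CAS failure abandons the whole in-progress inode block:
    a.on_first_write_cas_fail(b1);
    assert(!a.find(b1));
    return 0;
}

This matches the intent spelled out in the patch comments: the first CAS write at version 0 acts as a lock acquisition on a new inode block, and losing that CAS means another writer owns the block, so the allocator must move to the next inode block instead of retrying in place.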