diff --git a/src/kv_db.cpp b/src/kv_db.cpp index 9e9f760d..4605c640 100644 --- a/src/kv_db.cpp +++ b/src/kv_db.cpp @@ -51,6 +51,10 @@ struct __attribute__((__packed__)) kv_stored_block_t struct kv_block_t { + // level of the block. root block has level equal to -db->base_block_level + int level; + // usage flag. set to db->usage_counter when block is used + int usage; // current data size, to estimate whether the block can fit more items uint32_t data_size; uint32_t type; @@ -72,7 +76,7 @@ struct kv_block_t void set_data_size(); static int kv_size(const std::string & key, const std::string & value); - int parse(uint64_t offset, uint8_t *data, int size); + int parse(uint8_t *data, int size); bool serialize(uint8_t *data, int size); }; @@ -99,6 +103,10 @@ struct kv_db_t uint32_t kv_block_size = 0; uint32_t ino_block_size = 0; bool immediate_commit = false; + uint64_t cache_max_blocks = 0; + + int base_block_level = 0; + int usage_counter = 1; std::map block_cache; std::map known_versions; std::multimap> continue_update; @@ -131,6 +139,7 @@ protected: bool started = false; uint64_t cur_block = 0; std::string prev_key_ge, prev_key_lt; + int cur_level = 0; std::vector path; bool updated = false; bool skip_equal = false; @@ -169,7 +178,7 @@ static std::string read_string(uint8_t *data, int size, int *pos) return key; } -int kv_block_t::parse(uint64_t offset, uint8_t *data, int size) +int kv_block_t::parse(uint8_t *data, int size) { kv_stored_block_t *blk = (kv_stored_block_t *)data; if (blk->magic == 0 || blk->type == KV_EMPTY) @@ -184,7 +193,6 @@ int kv_block_t::parse(uint64_t offset, uint8_t *data, int size) return -EILSEQ; } this->type = blk->type; - this->offset = offset; this->new_version = 0; int pos = blk->data - data; this->key_ge = read_string(data, size, &pos); @@ -415,7 +423,7 @@ static void invalidate(kv_db_t *db, uint64_t offset, uint64_t version) } } -static void get_block(kv_db_t *db, uint64_t offset, int recheck_policy, std::function cb) +static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_policy, std::function cb) { // FIXME: Evict blocks from cache based on memory limit and block level auto b_it = db->block_cache.find(offset); @@ -425,6 +433,7 @@ static void get_block(kv_db_t *db, uint64_t offset, int recheck_policy, std::fun b_it->second.updating || b_it->second.new_version != 0)) { // Block already in cache, we can proceed + b_it->second.usage = db->usage_counter; cb(0, false); return; } @@ -445,14 +454,20 @@ static void get_block(kv_db_t *db, uint64_t offset, int recheck_policy, std::fun if (db->known_versions[op->offset/db->ino_block_size] == op->version && db->block_cache.find(op->offset) != db->block_cache.end()) { + auto blk = &db->block_cache.at(op->offset); + blk->usage = db->usage_counter; cb(0, true); } else { invalidate(db, op->offset, op->version); - int err = db->block_cache[op->offset].parse(op->offset, (uint8_t*)op->iov.buf[0].iov_base, op->len); + auto blk = &db->block_cache[op->offset]; + int err = blk->parse((uint8_t*)op->iov.buf[0].iov_base, op->len); if (err == 0) { + blk->offset = op->offset; + blk->level = cur_level; + blk->usage = db->usage_counter; cb(0, true); } else @@ -479,6 +494,7 @@ void kv_op_t::exec() return; db->active_ops++; started = true; + cur_level = -db->base_block_level; if (opcode == KV_GET) get(); else if (opcode == KV_SET || opcode == KV_DEL) @@ -503,7 +519,7 @@ void kv_op_t::finish(int res) void kv_op_t::get() { - get_block(db, cur_block, recheck_policy, [=](int res, bool updated) + get_block(db, cur_block, cur_level, recheck_policy, [=](int res, bool updated) { res = handle_block(res, updated, false); if (res == -EAGAIN) @@ -543,7 +559,7 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split) return res; } this->updated = this->updated || updated; - auto blk = & db->block_cache.at(cur_block); + auto blk = &db->block_cache.at(cur_block); if (key < blk->key_ge || blk->key_lt.size() && key >= blk->key_lt) { // We got an unrelated block - recheck the whole chain from the beginning @@ -557,6 +573,7 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split) if (!this->updated) return -EILSEQ; prev_key_ge = prev_key_lt = ""; + cur_level = -db->base_block_level; cur_block = 0; this->recheck_policy = KV_RECHECK_ALL; this->updated = false; @@ -594,6 +611,7 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split) // Track left and right boundaries which have led us to cur_block prev_key_ge = child_it->first; prev_key_lt = m; + cur_level++; cur_block = *((uint64_t*)child_it->second.data()); return -EAGAIN; } @@ -675,7 +693,9 @@ static kv_block_t *create_new_block(kv_db_t *db, kv_block_t *old_blk, std::strin { auto new_offset = db->next_free; db->next_free += db->kv_block_size; - auto blk = & db->block_cache[new_offset]; + auto blk = &db->block_cache[new_offset]; + blk->usage = db->usage_counter; + blk->level = old_blk->level; blk->type = old_blk->type; blk->offset = new_offset; blk->key_ge = right ? separator : old_blk->key_ge; @@ -722,7 +742,9 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::functionblock_cache[new_offset] = std::move(db->block_cache[blk->offset]); db->block_cache.erase(blk->offset); auto new_blk = &db->block_cache[new_offset]; + *new_blk = *blk; new_blk->offset = new_offset; + db->block_cache.erase(blk->offset); write_new_block(db, new_blk, cb); } free(op->iov.buf[0].iov_base); @@ -789,7 +811,7 @@ void kv_op_t::update_find() path.clear(); } path.push_back(cur_block); - get_block(db, cur_block, recheck_policy, [=](int res, bool updated) + get_block(db, cur_block, cur_level, recheck_policy, [=](int res, bool updated) { res = handle_block(res, updated, true); if (res == -EAGAIN) @@ -839,6 +861,8 @@ void kv_op_t::create_root() } db->next_free += db->kv_block_size; auto blk = &db->block_cache[0]; + blk->usage = db->usage_counter; + blk->level = -db->base_block_level; blk->type = KV_LEAF; blk->offset = 0; blk->data[key] = value; @@ -983,38 +1007,38 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key } // Write references to halves into the root block blk->type = KV_INT; + db->base_block_level++; + blk->level--; blk->data.clear(); blk->data[""] = std::string((char*)&left_blk->offset, sizeof(left_blk->offset)); blk->data[separator] = std::string((char*)&right_blk->offset, sizeof(right_blk->offset)); blk->set_data_size(); - write_block(db, blk, [=](int res) + write_block(db, blk, [=](int write_res) { db->stop_updating(blk); - if (res == -EINTR) + if (write_res < 0) { - // CAS failure - zero garbage left_blk and right_blk and retry from the beginning + db->base_block_level--; db->block_cache.erase(blk->offset); - clear_block(db, left_blk->offset, 0, [=](int res) + clear_block(db, left_blk->offset, 0, [=, left_offset = left_blk->offset](int res) { if (res < 0) - { - cb(res); - return; - } - clear_block(db, right_blk->offset, 0, [=](int res) + fprintf(stderr, "Failed to clear unreferenced block %lu: %s (code %d)\n", left_offset, strerror(-res), res); + clear_block(db, right_blk->offset, 0, [=, right_offset = right_blk->offset](int res) { if (res < 0) - { - cb(res); - return; - } - update(); + fprintf(stderr, "Failed to clear unreferenced block %lu: %s (code %d)\n", right_offset, strerror(-res), res); + // CAS failure - zero garbage left_blk and right_blk and retry from the beginning + if (write_res == -EINTR) + update(); + else + cb(write_res); }); }); } else { - cb(res); + cb(0); } }); }); @@ -1033,27 +1057,23 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key blk->right_half_block = right_blk->offset; blk->data.erase(blk->data.lower_bound(separator), blk->data.end()); blk->set_data_size(); - write_block(db, blk, [=](int res) + write_block(db, blk, [=](int write_res) { db->stop_updating(blk); - if (res == -EINTR) + if (write_res < 0) { - // CAS failure - zero garbage right_blk and retry from the beginning db->block_cache.erase(blk->offset); - clear_block(db, right_blk->offset, 0, [=](int res) + clear_block(db, right_blk->offset, 0, [=, right_offset = right_blk->offset](int res) { if (res < 0) - { - cb(res); - return; - } - update(); + fprintf(stderr, "Failed to clear unreferenced block %lu: %s (code %d)\n", right_offset, strerror(-res), res); + // CAS failure - zero garbage right_blk and retry from the beginning + if (write_res == -EINTR) + update(); + else + cb(write_res); }); } - else if (res < 0) - { - cb(res); - } else { // Add a reference to the parent block @@ -1077,7 +1097,7 @@ void kv_op_t::next() path.clear(); path.push_back(cur_block); } - get_block(db, cur_block, recheck_policy, [=](int res, bool updated) + get_block(db, cur_block, cur_level, recheck_policy, [=](int res, bool updated) { next_handle_block(res, updated); }); @@ -1154,6 +1174,7 @@ void kv_op_t::next_go_up() key = blk->key_lt; skip_equal = false; path.pop_back(); + cur_level--; cur_block = path[path.size()-1]; recheck_policy = KV_RECHECK_LEAF; // Check if we can resume listing from the next key @@ -1162,6 +1183,7 @@ void kv_op_t::next_go_up() { // Block is absent in cache, recheck from the beginning prev_key_ge = prev_key_lt = ""; + cur_level = -db->base_block_level; cur_block = 0; next(); return;