diff --git a/src/kv_cli.cpp b/src/kv_cli.cpp index deebaedc..ec98ddf1 100644 --- a/src/kv_cli.cpp +++ b/src/kv_cli.cpp @@ -73,7 +73,7 @@ json11::Json::object kv_cli_t::parse_args(int narg, const char *args[]) "Vitastor Key/Value CLI\n" "(c) Vitaliy Filippov, 2023+ (VNPL-1.1)\n" "\n" - "USAGE: %s [--etcd_address ADDR] [--inode INODE_ID] [OTHER OPTIONS]\n", + "USAGE: %s [--etcd_address ADDR] [OTHER OPTIONS]\n", exe_name ); exit(0); diff --git a/src/kv_db.cpp b/src/kv_db.cpp index 21e83756..cfb69e47 100644 --- a/src/kv_db.cpp +++ b/src/kv_db.cpp @@ -22,10 +22,11 @@ #define KV_BLOCK_MAX_ITEMS 1048576 #define KV_INDEX_MAX_SIZE (uint64_t)1024*1024*1024*1024 -#define KV_GET 1 -#define KV_SET 2 -#define KV_DEL 3 -#define KV_LIST 4 +#define KV_GET_CACHED 1 +#define KV_GET 2 +#define KV_SET 3 +#define KV_DEL 4 +#define KV_LIST 5 #define KV_INT 1 #define KV_INT_SPLIT 2 @@ -38,6 +39,13 @@ #define KV_RECHECK_ALL 2 #define KV_RECHECK_RELOAD 3 +#define KV_CH_ADD 1 +#define KV_CH_DEL 2 +// UPD=ADD|DEL +#define KV_CH_UPD 3 +#define KV_CH_SPLIT 4 +#define KV_CH_CLEAR_RIGHT 8 + #define LEVEL_BITS 8 #define NO_LEVEL_MASK (((uint64_t)1 << (64-LEVEL_BITS)) - 1) @@ -62,9 +70,6 @@ struct kv_block_t uint32_t data_size; uint32_t type; uint64_t offset; - // only new version during modification is saved here - uint64_t new_version; - bool updating; // block only contains keys in [key_ge, key_lt). I.e. key_ge <= key < key_lt. std::string key_ge, key_lt; // KV_INT_SPLIT/KV_LEAF_SPLIT nodes also contain one reference to another block @@ -77,10 +82,20 @@ struct kv_block_t // FIXME: use flat map? std::map data; + // set during update + bool updating; + int change_type; + std::string change_key, change_value; + std::string change_rh; + uint64_t change_rh_block; + void set_data_size(); static int kv_size(const std::string & key, const std::string & value); - int parse(uint8_t *data, int size); + int parse(uint64_t offset, uint8_t *data, int size); bool serialize(uint8_t *data, int size); + void apply_change(); + void cancel_change(); + void dump(int base_level); }; void kv_block_t::set_data_size() @@ -118,6 +133,8 @@ struct kv_db_t std::set block_levels; std::map block_cache; std::map known_versions; + std::map new_versions; + std::multimap> continue_write; std::multimap> continue_update; bool closing = false; @@ -129,6 +146,7 @@ struct kv_db_t void close(std::function cb); void find_size(uint64_t min, uint64_t max, int phase, std::function cb); + void run_continue_update(uint64_t offset); void stop_updating(kv_block_t *blk); }; @@ -152,6 +170,7 @@ protected: int cur_level = 0; std::vector path; bool updated = false; + int retry = 0; bool skip_equal = false; void finish(int res); @@ -162,7 +181,7 @@ protected: void update_find(); void create_root(); void resume_split(); - void update_block(int path_pos, bool is_delete, const std::string & key, const std::string & value, std::function cb); + void update_block(int path_pos, bool is_delete, const std::string & key, const std::string & value, uint64_t block_ver, std::function cb); void next_handle_block(int res, bool updated); void next_get(); @@ -188,36 +207,49 @@ static std::string read_string(uint8_t *data, int size, int *pos) return key; } -int kv_block_t::parse(uint8_t *data, int size) +int kv_block_t::parse(uint64_t offset, uint8_t *data, int size) { kv_stored_block_t *blk = (kv_stored_block_t *)data; if (blk->magic == 0 || blk->type == KV_EMPTY) { // empty block + fprintf(stderr, "K/V: Block %lu is %s\n", offset, blk->magic == 0 ? "empty" : "cleared"); return -ENOTBLK; } if (blk->magic != KV_BLOCK_MAGIC || blk->block_size != size || - !blk->type || blk->type > KV_LEAF_SPLIT || blk->items > KV_BLOCK_MAX_ITEMS) + !blk->type || blk->type > KV_EMPTY || blk->items > KV_BLOCK_MAX_ITEMS) { // invalid block + fprintf(stderr, "K/V: Invalid block %lu magic, size, type or item count\n", offset); return -EILSEQ; } this->type = blk->type; - this->new_version = 0; int pos = blk->data - data; this->key_ge = read_string(data, size, &pos); if (pos < 0) + { + fprintf(stderr, "K/V: Invalid block %lu left bound\n", offset); return -EILSEQ; + } this->key_lt = read_string(data, size, &pos); if (pos < 0) + { + fprintf(stderr, "K/V: Invalid block %lu right bound\n", offset); return -EILSEQ; + } if (this->type == KV_INT_SPLIT || this->type == KV_LEAF_SPLIT) { this->right_half = read_string(data, size, &pos); if (pos < 0) + { + fprintf(stderr, "K/V: Invalid block %lu split bound\n", offset); return -EILSEQ; + } if (pos+8 > size) + { + fprintf(stderr, "K/V: Invalid block %lu split block ref\n", offset); return -EILSEQ; + } this->right_half_block = *(uint64_t*)(data+pos); pos += 8; } @@ -225,13 +257,20 @@ int kv_block_t::parse(uint8_t *data, int size) { auto key = read_string(data, size, &pos); if (pos < 0) + { + fprintf(stderr, "K/V: Invalid block %lu key %d\n", offset, i); return -EILSEQ; + } auto value = read_string(data, size, &pos); if (pos < 0) + { + fprintf(stderr, "K/V: Invalid block %lu value %d\n", offset, i); return -EILSEQ; + } this->data[key] = value; } this->data_size = pos; + this->offset = offset; return 0; } @@ -246,36 +285,183 @@ static bool write_string(uint8_t *data, int size, int *pos, const std::string & return true; } -bool kv_block_t::serialize(uint8_t *data, int size) +bool kv_block_t::serialize(uint8_t *buf, int size) { - kv_stored_block_t *blk = (kv_stored_block_t *)data; + kv_stored_block_t *blk = (kv_stored_block_t *)buf; blk->magic = KV_BLOCK_MAGIC; blk->block_size = size; - blk->type = this->type; - blk->items = this->data.size(); - int pos = blk->data - data; - if (!write_string(data, size, &pos, key_ge)) - return false; - if (!write_string(data, size, &pos, key_lt)) - return false; - if (this->type == KV_INT_SPLIT || this->type == KV_LEAF_SPLIT) + if ((change_type & KV_CH_CLEAR_RIGHT)) { - if (!write_string(data, size, &pos, right_half)) + if (type == KV_LEAF_SPLIT) + blk->type = KV_LEAF; + else if (type == KV_INT_SPLIT) + blk->type = KV_INT; + else + blk->type = type; + } + else if ((change_type & KV_CH_SPLIT)) + { + if (type == KV_LEAF) + blk->type = KV_LEAF_SPLIT; + else if (type == KV_INT) + blk->type = KV_INT_SPLIT; + else + blk->type = type; + } + else + blk->type = type; + int pos = blk->data - buf; + if (!write_string(buf, size, &pos, key_ge)) + return false; + if (!write_string(buf, size, &pos, (change_type & KV_CH_CLEAR_RIGHT) ? right_half : key_lt)) + return false; + if (blk->type == KV_LEAF_SPLIT || blk->type == KV_INT_SPLIT) + { + if (!write_string(buf, size, &pos, (change_type & KV_CH_SPLIT) ? change_rh : right_half)) return false; if (pos+8 > size) return false; - *(uint64_t*)(data+pos) = right_half_block; + *(uint64_t*)(buf+pos) = (change_type & KV_CH_SPLIT) ? change_rh_block : right_half_block; pos += 8; } - for (auto & kv: this->data) + auto old_it = (change_type & KV_CH_DEL) ? data.find(change_key) + : ((change_type & KV_CH_ADD) ? data.lower_bound(change_key) : data.end()); + auto end_it = (change_type & KV_CH_SPLIT) ? data.lower_bound(change_rh) : data.end(); + blk->items = 0; + for (auto kv_it = data.begin(); kv_it != end_it; kv_it++) { - if (!write_string(data, size, &pos, kv.first) || - !write_string(data, size, &pos, kv.second)) + if (!(change_type & KV_CH_DEL) || kv_it != old_it) + { + if (!write_string(buf, size, &pos, kv_it->first) || + !write_string(buf, size, &pos, kv_it->second)) + return false; + blk->items++; + } + if ((change_type & KV_CH_ADD) && kv_it == old_it) + { + if (!write_string(buf, size, &pos, change_key) || + !write_string(buf, size, &pos, change_value)) + return false; + blk->items++; + } + } + if ((change_type & KV_CH_ADD) && end_it == old_it) + { + if (!write_string(buf, size, &pos, change_key) || + !write_string(buf, size, &pos, change_value)) return false; + blk->items++; } return true; } +void kv_block_t::apply_change() +{ + if ((change_type & KV_CH_UPD) == KV_CH_DEL) + { + auto kv_it = data.find(change_key); + assert(kv_it != data.end()); + data_size -= kv_block_t::kv_size(kv_it->first, kv_it->second); + data.erase(kv_it); + } + if ((change_type & KV_CH_ADD)) + { + auto kv_it = data.find(change_key); + if (kv_it != data.end()) + data_size -= kv_block_t::kv_size(kv_it->first, kv_it->second); + data_size += kv_block_t::kv_size(change_key, change_value); + data[change_key] = change_value; + } + if ((change_type & KV_CH_CLEAR_RIGHT) && (type == KV_INT_SPLIT || type == KV_LEAF_SPLIT)) + { + type = (type == KV_LEAF_SPLIT ? KV_LEAF : KV_INT); + key_lt = right_half; + right_half = ""; + right_half_block = 0; + set_data_size(); + } + else if ((change_type & KV_CH_SPLIT) && (type == KV_INT || type == KV_LEAF)) + { + type = (type == KV_LEAF ? KV_LEAF_SPLIT : KV_INT_SPLIT); + right_half = change_rh; + right_half_block = change_rh_block; + data.erase(data.lower_bound(change_rh), data.end()); + set_data_size(); + } + change_type = 0; + change_key = change_value = change_rh = ""; + change_rh_block = 0; +} + +void kv_block_t::cancel_change() +{ + change_type = 0; + apply_change(); +} + +static const char *block_type_names[] = { + "unknown", + "int", + "int_split", + "leaf", + "leaf_split", + "empty", +}; + +static void dump_str(const std::string & str) +{ + size_t pos = 0; + fwrite("\"", 1, 1, stdout); + while (true) + { + auto pos2 = str.find('"', pos); + if (pos2 == std::string::npos) + { + fwrite(str.data()+pos, str.size()-pos, 1, stdout); + break; + } + else + { + fwrite(str.data()+pos, pos2-pos, 1, stdout); + fwrite("\\\"", 2, 1, stdout); + pos = pos2+1; + } + } + fwrite("\"", 1, 1, stdout); +} + +void kv_block_t::dump(int base_level) +{ + printf( + "{\n \"block\": %lu,\n \"level\": %d,\n \"type\": \"%s\",\n \"range\": [", + offset, base_level+level, + type < sizeof(block_type_names)/sizeof(block_type_names[0]) ? block_type_names[type] : "unknown" + ); + dump_str(key_ge); + printf(", "); + dump_str(key_lt); + printf("],\n"); + if (type == KV_INT_SPLIT || type == KV_LEAF_SPLIT) + { + printf(" \"right_half\": { "); + dump_str(right_half); + printf(": %lu },\n", right_half_block); + } + printf(" \"data\": {\n"); + for (auto & kv: data) + { + printf(" "); + dump_str(kv.first); + printf(": "); + if (type == KV_LEAF || type == KV_LEAF_SPLIT || kv.second.size() != 8) + dump_str(kv.second); + else + printf("%lu", *(uint64_t*)kv.second.c_str()); + printf(",\n"); + } + printf(" }\n}\n"); +} + void kv_db_t::open(inode_t inode_id, json11::Json cfg, std::function cb) { if (block_cache.size() > 0) @@ -413,16 +599,21 @@ void kv_db_t::find_size(uint64_t min, uint64_t max, int phase, std::functionexecute(op); } +void kv_db_t::run_continue_update(uint64_t offset) +{ + auto b_it = continue_update.find(offset); + if (b_it != continue_update.end()) + { + auto cb = b_it->second; + continue_update.erase(b_it); + cb(); + } +} + void kv_db_t::stop_updating(kv_block_t *blk) { blk->updating = false; - auto b_it = continue_update.find(blk->offset); - while (b_it != continue_update.end() && b_it->first == blk->offset) - { - auto cb = b_it->second; - continue_update.erase(b_it++); - cb(); - } + run_continue_update(blk->offset); } static void del_block_level(kv_db_t *db, kv_block_t *blk) @@ -437,12 +628,12 @@ static void add_block_level(kv_db_t *db, kv_block_t *blk) static void invalidate(kv_db_t *db, uint64_t offset, uint64_t version) { - if (db->known_versions[offset/db->ino_block_size] != version) + if (db->known_versions[offset/db->ino_block_size] < version) { auto b_it = db->block_cache.lower_bound(offset/db->ino_block_size * db->ino_block_size); while (b_it != db->block_cache.end() && b_it->first/db->ino_block_size == offset/db->ino_block_size) { - if (b_it->second.new_version != 0 || b_it->second.updating) + if (b_it->second.updating) { // do not forget blocks during modification b_it++; @@ -498,8 +689,7 @@ static void try_evict(kv_db_t *db) { auto b_it = db->block_cache.find((*random_it & NO_LEVEL_MASK) * db->kv_block_size); auto blk = &b_it->second; - if (b_it != db->block_cache.end() && !blk->updating && - !blk->new_version && blk->usage < db->usage_counter) + if (b_it != db->block_cache.end() && !blk->updating && blk->usage < db->usage_counter) { db->block_cache.erase(b_it); db->block_levels.erase(random_it++); @@ -530,8 +720,7 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p auto b_it = db->block_cache.find(offset); if (b_it != db->block_cache.end() && (recheck_policy == KV_RECHECK_NONE || recheck_policy == KV_RECHECK_LEAF && b_it->second.type != KV_LEAF || - // block is being modified - b_it->second.updating || b_it->second.new_version != 0)) + b_it->second.updating)) { // Block already in cache, we can proceed b_it->second.usage = db->usage_counter; @@ -552,22 +741,21 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p cb(op->retval >= 0 ? -EIO : op->retval, false); return; } - if (db->known_versions[op->offset/db->ino_block_size] == op->version && - db->block_cache.find(op->offset) != db->block_cache.end()) + if (db->block_cache.find(op->offset) != db->block_cache.end() && + db->known_versions[op->offset/db->ino_block_size] == op->version) { auto blk = &db->block_cache.at(op->offset); blk->usage = db->usage_counter; - cb(0, true); + cb(0, false); } else { invalidate(db, op->offset, op->version); try_evict(db); auto blk = &db->block_cache[op->offset]; - int err = blk->parse((uint8_t*)op->iov.buf[0].iov_base, op->len); + int err = blk->parse(op->offset, (uint8_t*)op->iov.buf[0].iov_base, op->len); if (err == 0) { - blk->offset = op->offset; blk->level = cur_level; blk->usage = db->usage_counter; add_block_level(db, blk); @@ -587,23 +775,25 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p void kv_op_t::exec() { + if (started) + return; + started = true; + db->active_ops++; if (!db->inode_id || db->closing) { - db->active_ops++; finish(-EINVAL); return; } - if (started) - return; - db->active_ops++; - started = true; if (++db->evict_unused_counter >= db->evict_unused_age) { db->evict_unused_counter = 0; db->usage_counter++; } cur_level = -db->base_block_level; - if (opcode == KV_GET) + path.clear(); + path.push_back(0); + recheck_policy = (opcode == KV_GET_CACHED ? KV_RECHECK_NONE : KV_RECHECK_LEAF); + if (opcode == KV_GET || opcode == KV_GET_CACHED) get(); else if (opcode == KV_SET || opcode == KV_DEL) update(); @@ -636,7 +826,13 @@ void kv_op_t::get() } else if (res == -ENOTBLK) { - finish(cur_block == 0 ? -ENOENT : -EILSEQ); + if (cur_block != 0) + { + fprintf(stderr, "K/V: Hit empty block %lu while searching\n", cur_block); + finish(-EILSEQ); + } + else + finish(-ENOENT); } else if (res < 0) { @@ -678,13 +874,30 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split) // 4) A = [a, c) // We may read P on step (1), get a link to A, and read A on step (4). // It will miss data from [c, b). - if (!this->updated) + // Retry once. If we don't see any updates after retrying - fail with EILSEQ. + fprintf(stderr, "K/V: %sgot unrelated block %lu: key=%s range=[%s, %s) from=[%s, %s)\n", + !this->updated && this->retry > 0 ? "Error: " : "Warning: read/update collision: ", + cur_block, key.c_str(), blk->key_ge.c_str(), blk->key_lt.c_str(), prev_key_ge.c_str(), prev_key_lt.c_str()); + if (this->updated) + { + this->updated = false; + this->retry = 0; + } + else if (this->retry > 0) + { return -EILSEQ; + } + else + { + this->updated = false; + this->retry++; + } prev_key_ge = prev_key_lt = ""; cur_level = -db->base_block_level; cur_block = 0; + path.clear(); + path.push_back(0); this->recheck_policy = KV_RECHECK_ALL; - this->updated = false; return -EAGAIN; } if (stop_on_split && (blk->type == KV_LEAF_SPLIT || blk->type == KV_INT_SPLIT) && @@ -695,6 +908,8 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split) else if ((blk->type == KV_INT_SPLIT || blk->type == KV_LEAF_SPLIT) && key >= blk->right_half) { cur_block = blk->right_half_block; + assert(path.size() > 0); + path[path.size()-1] = cur_block; prev_key_ge = blk->right_half; prev_key_lt = blk->key_lt; return -EAGAIN; @@ -708,12 +923,15 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split) auto child_it = blk->data.upper_bound(key); if (child_it == blk->data.begin()) { + fprintf(stderr, "K/V: Internal block %lu misses boundary for %s\n", cur_block, key.c_str()); return -EILSEQ; } auto m = child_it == blk->data.end() ? "" : child_it->first; child_it--; if (child_it->second.size() != sizeof(uint64_t)) { + fprintf(stderr, "K/V: Internal block %lu reference is not 8 byte long\n", cur_block); + blk->dump(db->base_block_level); return -EILSEQ; } // Track left and right boundaries which have led us to cur_block @@ -721,6 +939,7 @@ int kv_op_t::handle_block(int res, bool updated, bool stop_on_split) prev_key_lt = m; cur_level++; cur_block = *((uint64_t*)child_it->second.data()); + path.push_back(cur_block); return -EAGAIN; } return 0; @@ -752,35 +971,48 @@ static std::string find_splitter(kv_db_t *db, kv_block_t *blk) static void write_block(kv_db_t *db, kv_block_t *blk, std::function cb) { - blk->new_version = 1+db->known_versions[blk->offset/db->ino_block_size]; + auto & new_version = db->new_versions[blk->offset/db->ino_block_size]; + if (new_version != 0) + { + // Wait if block is being modified + db->continue_write.emplace(blk->offset/db->ino_block_size, [=]() { write_block(db, blk, cb); }); + return; + } + new_version = 1+db->known_versions[blk->offset/db->ino_block_size]; auto op = new cluster_op_t; op->opcode = OSD_OP_WRITE; op->inode = db->inode_id; op->offset = blk->offset; - op->version = blk->new_version; + op->version = new_version; op->len = db->kv_block_size; op->iov.push_back(malloc_or_die(op->len), op->len); if (!blk->serialize((uint8_t*)op->iov.buf[0].iov_base, op->len)) { - fprintf(stderr, "BUG: failed to serialize block %lu\n", blk->offset); + fprintf(stderr, "K/V: block %lu grew too large: %u bytes\n", blk->offset, blk->data_size); abort(); return; } op->callback = [db, blk, cb](cluster_op_t *op) { + db->new_versions.erase(blk->offset/db->ino_block_size); free(op->iov.buf[0].iov_base); - int res = op->retval; - if (res == op->len) + int res = op->retval == op->len ? 0 : (op->retval > 0 ? -EIO : op->retval); + if (res == 0) { - res = 0; - blk->new_version = 0; - db->known_versions[op->offset/db->ino_block_size] = op->version; + db->known_versions[blk->offset/db->ino_block_size] = op->version; + } + auto b_it = db->continue_write.find(blk->offset/db->ino_block_size); + if (b_it != db->continue_write.end()) + { + auto cont_cb = b_it->second; + db->continue_write.erase(b_it); + cont_cb(); } - else if (res >= 0) - res = -EIO; delete op; if (res < 0 || db->immediate_commit) + { cb(res); + } else { op = new cluster_op_t; @@ -797,7 +1029,8 @@ static void write_block(kv_db_t *db, kv_block_t *blk, std::function c db->cli->execute(op); } -static kv_block_t *create_new_block(kv_db_t *db, kv_block_t *old_blk, std::string separator, bool right) +static kv_block_t *create_new_block(kv_db_t *db, kv_block_t *old_blk, const std::string & separator, + const std::string & added_key, const std::string & added_value, bool right) { auto new_offset = db->next_free; db->next_free += db->kv_block_size; @@ -806,16 +1039,19 @@ static kv_block_t *create_new_block(kv_db_t *db, kv_block_t *old_blk, std::strin blk->level = old_blk->level; blk->type = old_blk->type; blk->offset = new_offset; + blk->updating = true; blk->key_ge = right ? separator : old_blk->key_ge; blk->key_lt = right ? old_blk->key_lt : separator; blk->data.insert(right ? old_blk->data.lower_bound(separator) : old_blk->data.begin(), right ? old_blk->data.end() : old_blk->data.lower_bound(separator)); + if ((added_key >= separator) == right) + blk->data[added_key] = added_value; blk->set_data_size(); add_block_level(db, blk); return blk; } -static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function cb) +static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function cb) { write_block(db, blk, [=](int res) { @@ -835,7 +1071,7 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function free the new unreferenced block and die del_block_level(db, blk); db->block_cache.erase(blk->offset); - cb(op->retval >= 0 ? -EIO : op->retval); + cb(op->retval >= 0 ? -EIO : op->retval, NULL); return; } invalidate(db, op->offset, op->version); @@ -847,14 +1083,14 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function place again + auto old_offset = blk->offset; auto new_offset = db->next_free; db->next_free += db->kv_block_size; - db->block_cache[new_offset] = std::move(db->block_cache[blk->offset]); - auto new_blk = &db->block_cache[new_offset]; - *new_blk = *blk; - new_blk->offset = new_offset; - db->block_cache.erase(blk->offset); del_block_level(db, blk); + std::swap(db->block_cache[new_offset], db->block_cache[old_offset]); + db->block_cache.erase(old_offset); + auto new_blk = &db->block_cache[new_offset]; + new_blk->offset = new_offset; add_block_level(db, new_blk); write_new_block(db, new_blk, cb); } @@ -868,12 +1104,12 @@ static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function free the new unreferenced block and die del_block_level(db, blk); db->block_cache.erase(blk->offset); - cb(res > 0 ? -EIO : res); + cb(res > 0 ? -EIO : res, NULL); } else { // OK - cb(0); + cb(0, blk); } }); } @@ -913,26 +1149,39 @@ void kv_op_t::update() finish(-EINVAL); return; } - // The beginning is (almost) the same as in kv_get(). First we find path to the item + prev_key_ge = prev_key_lt = ""; + cur_level = -db->base_block_level; + cur_block = 0; + path.clear(); + path.push_back(0); + // The beginning is (almost) the same as in get(). First we find path to the item update_find(); } void kv_op_t::update_find() { - if (!cur_block) + auto blk_it = db->block_cache.find(cur_block); + if (blk_it != db->block_cache.end()) { - path.clear(); + auto blk = &blk_it->second; + if (blk->updating) + { + // Optimisation: do not recheck the block if it's already being modified + db->continue_update.emplace(blk->offset, [=]() { update_find(); }); + return; + } } - path.push_back(cur_block); - get_block(db, cur_block, cur_level, recheck_policy, [=](int res, bool updated) + get_block(db, cur_block, cur_level, recheck_policy, [=, checked_block = cur_block](int res, bool updated) { res = handle_block(res, updated, true); if (res == -EAGAIN) { + db->run_continue_update(checked_block); update_find(); } else if (res == -ENOTBLK) { + db->run_continue_update(checked_block); if (opcode == KV_SET) { // Check CAS callback @@ -941,20 +1190,24 @@ void kv_op_t::update_find() else create_root(); } + else if (cur_block == 0) + finish(-ENOENT); else - finish(cur_block == 0 ? -ENOENT : -EILSEQ); + fprintf(stderr, "K/V: Hit empty block %lu while searching\n", cur_block); } else if (res == -ECHILD) { + db->run_continue_update(checked_block); resume_split(); } else if (res < 0) { + db->run_continue_update(checked_block); finish(res); } else { - update_block(path.size()-1, opcode == KV_DEL, key, value, [=](int res) + update_block(path.size()-1, opcode == KV_DEL, key, value, 0, [=](int res) { finish(res); }); @@ -969,6 +1222,7 @@ void kv_op_t::create_root() // if a referenced non-root block is empty, we just return an error. if (cur_block != 0 || db->next_free != 0) { + fprintf(stderr, "K/V: create_root called with non-empty DB (cur_block=%lu)\n", cur_block); finish(-EILSEQ); return; } @@ -981,10 +1235,16 @@ void kv_op_t::create_root() blk->data[key] = value; blk->set_data_size(); add_block_level(db, blk); + blk->updating = true; write_block(db, blk, [=](int res) { + db->stop_updating(blk); if (res == -EINTR) + { + del_block_level(db, blk); + db->block_cache.erase(blk->offset); update(); + } else finish(res); }); @@ -998,13 +1258,14 @@ void kv_op_t::resume_split() if (path.size() == 1) { // It shouldn't be the root block because we don't split it via INT_SPLIT/LEAF_SPLIT + fprintf(stderr, "K/V: resume_split at root item (cur_block=%lu)\n", cur_block); finish(-EILSEQ); return; } auto blk = &db->block_cache.at(cur_block); update_block( path.size()-2, false, blk->right_half, - std::string((char*)&blk->right_half_block, sizeof(blk->right_half_block)), + std::string((char*)&blk->right_half_block, sizeof(blk->right_half_block)), 0, [=](int res) { if (res < 0) @@ -1017,24 +1278,33 @@ void kv_op_t::resume_split() ); } -void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key, const std::string & value, std::function cb) +void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key, const std::string & value, uint64_t block_ver, std::function cb) { - auto blk = &db->block_cache.at(path[path_pos]); + auto blk_it = db->block_cache.find(path[path_pos]); + if (blk_it == db->block_cache.end()) + { + // Block is not in cache anymore, recheck + db->run_continue_update(path[path_pos]); + update(); + return; + } + auto blk = &blk_it->second; + if (block_ver == 0) + { + block_ver = db->known_versions[blk->offset/db->ino_block_size]; + } if (blk->updating) { // Wait if block is being modified - db->continue_update.emplace(blk->offset, [=]() { update_block(path_pos, is_delete, key, value, cb); }); + db->continue_update.emplace(blk->offset, [=]() { update_block(path_pos, is_delete, key, value, block_ver, cb); }); return; } - if ((blk->type == KV_LEAF_SPLIT || blk->type == KV_INT_SPLIT) && prev_key_lt <= blk->right_half) + if (db->known_versions[blk->offset/db->ino_block_size] != block_ver) { - // This is a block split during previous updates - // Its parent is already updated because prev_key_lt <= blk->right_half - // That means we can safely remove the "split reference" - blk->type = KV_LEAF; - blk->key_lt = blk->right_half; - blk->right_half = ""; - blk->right_half_block = 0; + // Recheck if block was modified in the meantime + db->run_continue_update(blk->offset); + update(); + return; } uint32_t rm_size = 0; auto d_it = blk->data.find(key); @@ -1043,6 +1313,7 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key if (!is_delete && d_it->second == value) { // Nothing to do + db->run_continue_update(blk->offset); cb(0); return; } @@ -1051,44 +1322,60 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key else if (is_delete) { // Nothing to do + db->run_continue_update(blk->offset); cb(0); return; } if (cas_cb && path_pos == path.size()-1 && !cas_cb(d_it != blk->data.end() ? 0 : -ENOENT, d_it != blk->data.end() ? d_it->second : "")) { // CAS failure + db->run_continue_update(blk->offset); cb(-EAGAIN); return; } - if (!is_delete) - { - blk->data[key] = value; - blk->data_size = blk->data_size + kv_block_t::kv_size(key, value) - rm_size; - } - else - { - // FIXME We may want to merge blocks at some point, in that case we should: - // - read both merged blocks - // - add "merged into XX" reference to the second block and write it out - // - add entries from the second block to the first one and write it out - // - remove the reference from the parent block to the second block - // - zero out the second block - blk->data.erase(key); - blk->data_size -= rm_size; - } + // This condition means this is a block split during previous updates + // Its parent is already updated because prev_key_lt <= blk->right_half + // That means we can safely remove the "split reference" + assert(!blk->change_type); + blk->change_type = (blk->type == KV_LEAF_SPLIT || blk->type == KV_INT_SPLIT) && + (prev_key_lt != "" && prev_key_lt <= blk->right_half) + ? KV_CH_CLEAR_RIGHT + : 0; blk->updating = true; - if (is_delete || blk->data_size < db->kv_block_size) + if (is_delete || (blk->data_size + kv_block_t::kv_size(key, value) - rm_size) < db->kv_block_size) { + // New item fits. // No need to split the block => just modify and write it + if (is_delete) + { + blk->change_type |= KV_CH_DEL; + blk->change_key = key; + } + else + { + blk->change_type |= (d_it != blk->data.end() ? KV_CH_UPD : KV_CH_ADD); + blk->change_key = key; + blk->change_value = value; + } write_block(db, blk, [=](int res) { + if (res < 0) + blk->cancel_change(); + else + blk->apply_change(); db->stop_updating(blk); - if (res == -EINTR) + if (res == EINTR) { update(); } else { + // FIXME We may want to merge blocks at some point, in that case we should: + // - read both merged blocks + // - add "merged into XX" reference to the second block and write it out + // - add entries from the second block to the first one and write it out + // - remove the reference from the parent block to the second block + // - zero out the second block cb(res); } }); @@ -1097,11 +1384,13 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key // New item doesn't fit. The most interesting case. // Write the right half into a new block auto separator = find_splitter(db, blk); - auto right_blk = create_new_block(db, blk, separator, true); - write_new_block(db, right_blk, [=](int res) + auto orig_right_blk = create_new_block(db, blk, separator, key, value, true); + write_new_block(db, orig_right_blk, [=](int res, kv_block_t *right_blk) { + db->stop_updating(right_blk); if (res < 0) { + blk->cancel_change(); db->stop_updating(blk); cb(res); return; @@ -1110,31 +1399,34 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key { // Split the root block // Write the left half into a new block - auto left_blk = create_new_block(db, blk, separator, false); - write_new_block(db, left_blk, [=](int res) + auto orig_left_blk = create_new_block(db, blk, separator, key, value, false); + write_new_block(db, orig_left_blk, [=](int res, kv_block_t *left_blk) { + db->stop_updating(left_blk); if (res < 0) { + blk->cancel_change(); db->stop_updating(blk); cb(res); return; } - // Write references to halves into the root block - blk->type = KV_INT; - db->base_block_level++; - blk->level--; - blk->data.clear(); - blk->data[""] = std::string((char*)&left_blk->offset, sizeof(left_blk->offset)); - blk->data[separator] = std::string((char*)&right_blk->offset, sizeof(right_blk->offset)); - blk->set_data_size(); - write_block(db, blk, [=](int write_res) + // Write references to halves into the new root block + auto new_root = new kv_block_t; + new_root->offset = 0; + new_root->usage = db->usage_counter; + new_root->type = KV_INT; + new_root->level = blk->level-1; + new_root->change_type = 0; + new_root->data.clear(); + new_root->data[""] = std::string((char*)&left_blk->offset, sizeof(left_blk->offset)); + new_root->data[separator] = std::string((char*)&right_blk->offset, sizeof(right_blk->offset)); + new_root->set_data_size(); + write_block(db, new_root, [=](int write_res) { - db->stop_updating(blk); if (write_res < 0) { - del_block_level(db, blk); - db->block_cache.erase(blk->offset); - db->base_block_level--; + blk->cancel_change(); + db->stop_updating(blk); clear_block(db, left_blk, 0, [=, left_offset = left_blk->offset](int res) { if (res < 0) @@ -1153,8 +1445,12 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key } else { + std::swap(db->block_cache[0], *new_root); + db->base_block_level = -new_root->level; + db->stop_updating(&db->block_cache[0]); cb(0); } + delete new_root; }); }); } @@ -1163,22 +1459,29 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key if (path_pos == 0) { // Block number zero should always be the root block + fprintf(stderr, "K/V: root block is not 0, but %lu\n", cur_block); cb(-EILSEQ); return; } // Split a non-root block - blk->type = blk->type == KV_INT ? KV_INT_SPLIT : KV_LEAF_SPLIT; - blk->right_half = separator; - blk->right_half_block = right_blk->offset; - blk->data.erase(blk->data.lower_bound(separator), blk->data.end()); - blk->set_data_size(); + blk->change_type |= KV_CH_SPLIT; + blk->change_rh = separator; + blk->change_rh_block = right_blk->offset; + if (key < separator) + { + blk->change_type |= (blk->data.find(key) != blk->data.end() ? KV_CH_UPD : KV_CH_ADD); + blk->change_key = key; + blk->change_value = value; + } write_block(db, blk, [=](int write_res) { + if (write_res < 0) + blk->cancel_change(); + else + blk->apply_change(); db->stop_updating(blk); if (write_res < 0) { - del_block_level(db, blk); - db->block_cache.erase(blk->offset); clear_block(db, right_blk, 0, [=, right_offset = right_blk->offset](int res) { if (res < 0) @@ -1194,7 +1497,11 @@ void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key { // Add a reference to the parent block // Do not cleanup anything on failure because stored right_blk is already referenced - update_block(path_pos-1, false, separator, std::string((char*)&right_blk->offset, sizeof(right_blk->offset)), cb); + update_block( + path_pos-1, false, separator, + std::string((char*)&right_blk->offset, sizeof(right_blk->offset)), 0, + cb + ); } }); } @@ -1207,12 +1514,6 @@ void kv_op_t::next() { return; } - if (path.size() == 0 || path[path.size()-1] != cur_block) - { - if (!cur_block) - path.clear(); - path.push_back(cur_block); - } get_block(db, cur_block, cur_level, recheck_policy, [=](int res, bool updated) { next_handle_block(res, updated); @@ -1228,7 +1529,13 @@ void kv_op_t::next_handle_block(int res, bool updated) } else if (res == -ENOTBLK) { - finish(cur_block == 0 ? -ENOENT : -EILSEQ); + if (cur_block == 0) + finish(-ENOENT); + else + { + fprintf(stderr, "K/V: Hit empty block %lu while searching\n", cur_block); + finish(-EILSEQ); + } } else if (res < 0) { @@ -1246,13 +1553,14 @@ void kv_op_t::next_get() { auto blk = &db->block_cache.at(cur_block); auto kv_it = blk->data.lower_bound(key); - if (kv_it != blk->data.end() && kv_it->first == key && skip_equal) + if (skip_equal && kv_it != blk->data.end() && kv_it->first == key) { kv_it++; } if (kv_it != blk->data.end()) { // Send this item + assert(blk->type == KV_LEAF || blk->type == KV_LEAF_SPLIT); this->res = 0; this->key = kv_it->first; this->value = kv_it->second; @@ -1267,7 +1575,7 @@ void kv_op_t::next_get() key = blk->right_half; prev_key_ge = blk->right_half; prev_key_lt = blk->key_lt; - cur_block = blk->right_half_block; + path[path.size()-1] = cur_block = blk->right_half_block; next(); } else @@ -1301,6 +1609,8 @@ void kv_op_t::next_go_up() prev_key_ge = prev_key_lt = ""; cur_level = -db->base_block_level; cur_block = 0; + path.clear(); + path.push_back(0); next(); return; } @@ -1347,11 +1657,11 @@ void kv_dbw_t::close(std::function cb) db->close(cb); } -void kv_dbw_t::get(const std::string & key, std::function cb) +void kv_dbw_t::get(const std::string & key, std::function cb, bool cached) { auto *op = new kv_op_t; op->db = db; - op->opcode = KV_GET; + op->opcode = cached ? KV_GET_CACHED : KV_GET; op->key = key; op->callback = [cb](kv_op_t *op) { @@ -1386,7 +1696,7 @@ void kv_dbw_t::del(const std::string & key, std::function cb, { auto *op = new kv_op_t; op->db = db; - op->opcode = KV_SET; + op->opcode = KV_DEL; op->key = key; if (cas_compare) { diff --git a/src/kv_db.h b/src/kv_db.h index 45a1cc21..79d0b97c 100644 --- a/src/kv_db.h +++ b/src/kv_db.h @@ -21,7 +21,8 @@ struct kv_dbw_t uint64_t get_size(); - void get(const std::string & key, std::function cb); + void get(const std::string & key, std::function cb, + bool allow_old_cached = false); void set(const std::string & key, const std::string & value, std::function cb, std::function cas_compare = NULL); void del(const std::string & key, std::function cb, diff --git a/src/kv_stress.cpp b/src/kv_stress.cpp index 54557fb2..f62e681a 100644 --- a/src/kv_stress.cpp +++ b/src/kv_stress.cpp @@ -23,6 +23,7 @@ struct kv_test_listing_t void *handle = NULL; std::string next_after; std::set inflights; + bool error = false; }; class kv_test_t @@ -43,6 +44,7 @@ public: uint64_t max_key_len = 70; uint64_t min_value_len = 50; uint64_t max_value_len = 300; + bool stop_on_error = false; // FIXME: Multiple clients // FIXME: Print op statistics @@ -97,7 +99,44 @@ json11::Json::object kv_test_t::parse_args(int narg, const char *args[]) "Vitastor Key/Value DB stress tester / benchmark\n" "(c) Vitaliy Filippov, 2023+ (VNPL-1.1)\n" "\n" - "USAGE: %s [--etcd_address ADDR]\n", + "USAGE: %s --pool_id POOL_ID --inode_id INODE_ID [OPTIONS]\n" + " --op_count 1000000\n" + " Total operations to run during test\n" + " --parallelism 4\n" + " Run this number of operations in parallel\n" + " --get_prob 30000\n" + " Fraction of key retrieve operations\n" + " --add_prob 20000\n" + " Fraction of key addition operations\n" + " --update_prob 20000\n" + " Fraction of key update operations\n" + " --del_prob 30000\n" + " Fraction of key delete operations\n" + " --list_prob 300\n" + " Fraction of listing operations\n" + " --min_key_len 10\n" + " Minimum key size in bytes\n" + " --max_key_len 70\n" + " Maximum key size in bytes\n" + " --min_value_len 50\n" + " Minimum value size in bytes\n" + " --max_value_len 300\n" + " Maximum value size in bytes\n" + " --verify 1\n" + " Verify results of retrieve and list operations\n" + " Uses extra RAM because a copy of the DB is stored in memory\n" + " --stop_on_error 0\n" + " Stop on first execution error, mismatch, lost key or extra key during listing\n" + " --kv_memory_limit 128M\n" + " Maximum memory to use for vitastor-kv index cache\n" + " --kv_evict_max_misses 10\n" + " Eviction algorithm parameter: retry eviction from another random spot\n" + " if this number of keys is used currently or was used recently\n" + " --kv_evict_attempts_per_level 3\n" + " Retry eviction at most this number of times per tree level, starting\n" + " with bottom-most levels\n" + " --kv_evict_unused_age 1000\n" + " Evict only keys unused during this number of last operations\n", exe_name ); exit(0); @@ -139,6 +178,8 @@ void kv_test_t::run(json11::Json cfg) min_value_len = cfg["min_value_len"].uint64_value(); if (cfg["max_value_len"].uint64_value() > 0) max_value_len = cfg["max_value_len"].uint64_value(); + if (cfg["stop_on_error"].bool_value()) + stop_on_error = true; if (!cfg["kv_memory_limit"].is_null()) kv_cfg["kv_memory_limit"] = cfg["kv_memory_limit"]; if (!cfg["kv_evict_max_misses"].is_null()) @@ -260,9 +301,17 @@ void kv_test_t::loop() in_progress--; auto it = values.find(key); if (res != (it == values.end() ? -ENOENT : 0)) + { printf("ERROR: get %s: %d (%s)\n", key.c_str(), res, strerror(-res)); + if (stop_on_error) + exit(1); + } else if (it != values.end() && value != it->second) + { printf("ERROR: get %s: mismatch: %s vs %s\n", key.c_str(), value.c_str(), it->second.c_str()); + if (stop_on_error) + exit(1); + } else get_done++; ringloop->wakeup(); @@ -288,18 +337,24 @@ void kv_test_t::loop() continue; key = k_it->first; } + if (changing_keys.find(key) != changing_keys.end()) + continue; uint64_t value_len = min_value_len + (max_value_len > min_value_len ? lrand48() % (max_value_len-min_value_len) : 0); auto value = random_str(value_len); start_change(key); in_progress++; - printf("set %s\n", key.c_str()); + printf("set %s = %s\n", key.c_str(), value.c_str()); db->set(key, value, [this, key, value, is_add](int res) { stop_change(key); ops_done++; in_progress--; if (res != 0) + { printf("ERROR: set %s = %s: %d (%s)\n", key.c_str(), value.c_str(), res, strerror(-res)); + if (stop_on_error) + exit(1); + } else { if (is_add) @@ -319,6 +374,8 @@ void kv_test_t::loop() if (k_it == values.end()) continue; key = k_it->first; + if (changing_keys.find(key) != changing_keys.end()) + continue; start_change(key); in_progress++; printf("del %s\n", key.c_str()); @@ -328,7 +385,11 @@ void kv_test_t::loop() ops_done++; in_progress--; if (res != 0) + { printf("ERROR: del %s: %d (%s)\n", key.c_str(), res, strerror(-res)); + if (stop_on_error) + exit(1); + } else { del_done++; @@ -348,22 +409,33 @@ void kv_test_t::loop() lst->next_after = k_it == values.begin() ? "" : key; lst->inflights = changing_keys; listings.insert(lst); - printf("list %s\n", key.c_str()); + printf("list from %s\n", key.c_str()); db->list_next(lst->handle, [this, lst](int res, const std::string & key, const std::string & value) { if (res < 0) { if (res != -ENOENT) + { printf("ERROR: list: %d (%s)\n", res, strerror(-res)); + lst->error = true; + } else { list_done++; - auto k_it = values.upper_bound(lst->next_after); - while (k_it != values.end() && lst->inflights.find(k_it->first) != lst->inflights.end()) - k_it++; - if (k_it != values.end()) - printf("ERROR: list: missed all keys from %s\n", k_it->first.c_str()); + auto k_it = lst->next_after == "" ? values.begin() : values.upper_bound(lst->next_after); + while (k_it != values.end()) + { + while (k_it != values.end() && lst->inflights.find(k_it->first) != lst->inflights.end()) + k_it++; + if (k_it != values.end()) + { + printf("ERROR: list: missing key %s\n", (k_it++)->first.c_str()); + lst->error = true; + } + } } + if (lst->error && stop_on_error) + exit(1); ops_done++; in_progress--; db->list_close(lst->handle); @@ -377,14 +449,38 @@ void kv_test_t::loop() // Listing may return their old or new state if (lst->inflights.find(key) == lst->inflights.end()) { - auto k_it = values.upper_bound(lst->next_after); - while (k_it != values.end() && lst->inflights.find(k_it->first) != lst->inflights.end()) - k_it++; - if (k_it == values.end()) - printf("ERROR: list: returned extra key %s\n", key.c_str()); - else if (k_it->second != value) - printf("ERROR: list: mismatch: %s = %s but should be %s\n", key.c_str(), value.c_str(), k_it->second.c_str()); - lst->next_after = k_it->first; + auto k_it = lst->next_after == "" ? values.begin() : values.upper_bound(lst->next_after); + while (true) + { + while (k_it != values.end() && lst->inflights.find(k_it->first) != lst->inflights.end()) + { + k_it++; + } + if (k_it == values.end() || k_it->first > key) + { + printf("ERROR: list: extra key %s\n", key.c_str()); + lst->error = true; + break; + } + else if (k_it->first < key) + { + printf("ERROR: list: missing key %s\n", k_it->first.c_str()); + lst->error = true; + lst->next_after = k_it->first; + k_it++; + } + else + { + if (k_it->second != value) + { + printf("ERROR: list: mismatch: %s = %s but should be %s\n", + key.c_str(), value.c_str(), k_it->second.c_str()); + lst->error = true; + } + lst->next_after = k_it->first; + break; + } + } } db->list_next(lst->handle, NULL); }