diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 81536932..fb24839f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -185,6 +185,16 @@ target_link_libraries(vitastor-nbd vitastor_client ) +# vitastor-kv +add_executable(vitastor-kv + kv_cli.cpp + kv_db.cpp + kv_db.h +) +target_link_libraries(vitastor-kv + vitastor_client +) + # vitastor-nfs add_executable(vitastor-nfs nfs_proxy.cpp diff --git a/src/cluster_client.cpp b/src/cluster_client.cpp index a054142a..f8567fd5 100644 --- a/src/cluster_client.cpp +++ b/src/cluster_client.cpp @@ -573,7 +573,7 @@ void cluster_client_t::execute_internal(cluster_op_t *op) return; } if (op->opcode == OSD_OP_WRITE && enable_writeback && !(op->flags & OP_FLUSH_BUFFER) && - !op->version /* FIXME no CAS writeback */) + !op->version /* no CAS writeback */) { if (wb->writebacks_active >= client_max_writeback_iodepth) { @@ -595,7 +595,7 @@ void cluster_client_t::execute_internal(cluster_op_t *op) } if (op->opcode == OSD_OP_WRITE && !(op->flags & OP_IMMEDIATE_COMMIT)) { - if (!(op->flags & OP_FLUSH_BUFFER)) + if (!(op->flags & OP_FLUSH_BUFFER) && !op->version /* no CAS write-repeat */) { wb->copy_write(op, CACHE_WRITTEN); } @@ -1196,7 +1196,7 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part) ); } } - else + else if (log_level > 0) { fprintf( stderr, "%s operation failed on OSD %ju: retval=%jd (expected %d)\n", diff --git a/src/kv_cli.cpp b/src/kv_cli.cpp new file mode 100644 index 00000000..60391b57 --- /dev/null +++ b/src/kv_cli.cpp @@ -0,0 +1,368 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 (see README.md for details) +// +// Vitastor shared key/value database test CLI + +#define _XOPEN_SOURCE +#include + +#include +#include +#include +#include +//#include + +#include "epoll_manager.h" +#include "str_util.h" +#include "kv_db.h" + +const char *exe_name = NULL; + +class kv_cli_t +{ +public: + kv_dbw_t *db = NULL; + ring_loop_t *ringloop = NULL; + epoll_manager_t *epmgr = NULL; + cluster_client_t *cli = NULL; + bool interactive = false; + int in_progress = 0; + char *cur_cmd = NULL; + int cur_cmd_size = 0, cur_cmd_alloc = 0; + bool finished = false, eof = false; + + ~kv_cli_t(); + + static json11::Json::object parse_args(int narg, const char *args[]); + void run(json11::Json cfg); + void read_cmd(); + void next_cmd(); + void handle_cmd(const std::string & cmd, std::function cb); +}; + +kv_cli_t::~kv_cli_t() +{ + if (cur_cmd) + { + free(cur_cmd); + cur_cmd = NULL; + } + cur_cmd_alloc = 0; + if (db) + delete db; + if (cli) + { + cli->flush(); + delete cli; + } + if (epmgr) + delete epmgr; + if (ringloop) + delete ringloop; +} + +json11::Json::object kv_cli_t::parse_args(int narg, const char *args[]) +{ + json11::Json::object cfg; + for (int i = 1; i < narg; i++) + { + if (!strcmp(args[i], "-h") || !strcmp(args[i], "--help")) + { + printf( + "Vitastor Key/Value CLI\n" + "(c) Vitaliy Filippov, 2023+ (VNPL-1.1)\n" + "\n" + "USAGE: %s [--etcd_address ADDR] [--inode INODE_ID] [OTHER OPTIONS]\n", + exe_name + ); + exit(0); + } + else if (args[i][0] == '-' && args[i][1] == '-') + { + const char *opt = args[i]+2; + cfg[opt] = !strcmp(opt, "json") || i == narg-1 ? "1" : args[++i]; + } + } + return cfg; +} + +void kv_cli_t::run(json11::Json cfg) +{ + // Create client + ringloop = new ring_loop_t(512); + epmgr = new epoll_manager_t(ringloop); + cli = new cluster_client_t(ringloop, epmgr->tfd, cfg); + db = new kv_dbw_t(cli); + // Load image metadata + while (!cli->is_ready()) + { + ringloop->loop(); + if (cli->is_ready()) + break; + ringloop->wait(); + } + // Run + fcntl(0, F_SETFL, fcntl(0, F_GETFL, 0) | O_NONBLOCK); + try + { + epmgr->tfd->set_fd_handler(0, false, [this](int fd, int events) + { + if (events & EPOLLIN) + { + read_cmd(); + } + if (events & EPOLLRDHUP) + { + epmgr->tfd->set_fd_handler(0, false, NULL); + finished = true; + } + }); + interactive = true; + printf("> "); + } + catch (std::exception & e) + { + // Can't add to epoll, STDIN is probably a file + read_cmd(); + } + while (!finished) + { + ringloop->loop(); + if (!finished) + ringloop->wait(); + } + // Destroy the client + delete db; + db = NULL; + cli->flush(); + delete cli; + delete epmgr; + delete ringloop; + cli = NULL; + epmgr = NULL; + ringloop = NULL; +} + +void kv_cli_t::read_cmd() +{ + if (!cur_cmd_alloc) + { + cur_cmd_alloc = 65536; + cur_cmd = (char*)malloc_or_die(cur_cmd_alloc); + } + while (cur_cmd_size < cur_cmd_alloc) + { + int r = read(0, cur_cmd+cur_cmd_size, cur_cmd_alloc-cur_cmd_size); + if (r < 0 && errno != EAGAIN) + fprintf(stderr, "Error reading from stdin: %s\n", strerror(errno)); + if (r > 0) + cur_cmd_size += r; + if (r == 0) + eof = true; + if (r <= 0) + break; + } + next_cmd(); +} + +void kv_cli_t::next_cmd() +{ + if (in_progress > 0) + { + return; + } + int pos = 0; + for (; pos < cur_cmd_size; pos++) + { + if (cur_cmd[pos] == '\n' || cur_cmd[pos] == '\r') + { + auto cmd = trim(std::string(cur_cmd, pos)); + pos++; + memmove(cur_cmd, cur_cmd+pos, cur_cmd_size-pos); + cur_cmd_size -= pos; + in_progress++; + handle_cmd(cmd, [this]() + { + in_progress--; + if (interactive) + printf("> "); + next_cmd(); + if (!in_progress) + read_cmd(); + }); + break; + } + } + if (eof && !in_progress) + { + finished = true; + } +} + +void kv_cli_t::handle_cmd(const std::string & cmd, std::function cb) +{ + if (cmd == "") + { + cb(); + return; + } + auto pos = cmd.find_first_of(" \t"); + if (pos != std::string::npos) + { + while (pos < cmd.size()-1 && (cmd[pos+1] == ' ' || cmd[pos+1] == '\t')) + pos++; + } + auto opname = strtolower(pos == std::string::npos ? cmd : cmd.substr(0, pos)); + if (opname == "open") + { + uint64_t pool_id = 0; + inode_t inode_id = 0; + uint32_t kv_block_size = 0; + int scanned = sscanf(cmd.c_str() + pos+1, "%lu %lu %u", &pool_id, &inode_id, &kv_block_size); + if (scanned == 2) + { + kv_block_size = 4096; + } + if (scanned < 2 || !pool_id || !inode_id || !kv_block_size || (kv_block_size & (kv_block_size-1)) != 0) + { + fprintf(stderr, "Usage: open [block_size]. Block size must be a power of 2. Default is 4096.\n"); + cb(); + return; + } + db->open(INODE_WITH_POOL(pool_id, inode_id), kv_block_size, [=](int res) + { + if (res < 0) + fprintf(stderr, "Error opening index: %s (code %d)\n", strerror(-res), res); + else + printf("Index opened. Current size: %lu bytes\n", db->get_size()); + cb(); + }); + } + else if (opname == "get" || opname == "set" || opname == "del") + { + if (opname == "get" || opname == "del") + { + if (pos == std::string::npos) + { + fprintf(stderr, "Usage: %s \n", opname.c_str()); + cb(); + return; + } + auto key = trim(cmd.substr(pos+1)); + if (opname == "get") + { + db->get(key, [this, cb](int res, const std::string & value) + { + if (res < 0) + fprintf(stderr, "Error: %s (code %d)\n", strerror(-res), res); + else + { + write(1, value.c_str(), value.size()); + write(1, "\n", 1); + } + cb(); + }); + } + else + { + db->del(key, [this, cb](int res) + { + if (res < 0) + fprintf(stderr, "Error: %s (code %d)\n", strerror(-res), res); + else + printf("OK\n"); + cb(); + }); + } + } + else + { + auto pos2 = cmd.find_first_of(" \t", pos+1); + if (pos2 == std::string::npos) + { + fprintf(stderr, "Usage: set \n"); + cb(); + return; + } + auto key = trim(cmd.substr(pos+1, pos2-pos-1)); + auto value = trim(cmd.substr(pos2+1)); + db->set(key, value, [this, cb](int res) + { + if (res < 0) + fprintf(stderr, "Error: %s (code %d)\n", strerror(-res), res); + else + printf("OK\n"); + cb(); + }); + } + } + else if (opname == "list") + { + std::string start, end; + if (pos != std::string::npos) + { + auto pos2 = cmd.find_first_of(" \t", pos+1); + if (pos2 != std::string::npos) + { + start = trim(cmd.substr(pos+1, pos2-pos-1)); + end = trim(cmd.substr(pos2+1)); + } + else + { + start = trim(cmd.substr(pos+1)); + } + } + void *handle = db->list_start(start); + db->list_next(handle, [=](int res, const std::string & key, const std::string & value) + { + if (res < 0) + { + if (res != -ENOENT) + { + fprintf(stderr, "Error: %s (code %d)\n", strerror(-res), res); + } + db->list_close(handle); + cb(); + } + else + { + printf("%s = %s\n", key.c_str(), value.c_str()); + db->list_next(handle, NULL); + } + }); + } + else if (opname == "close") + { + db->close([=]() + { + printf("Index closed\n"); + cb(); + }); + } + else if (opname == "quit" || opname == "q") + { + ::close(0); + finished = true; + } + else + { + fprintf( + stderr, "Unknown operation: %s. Supported operations:\n" + "open [block_size]\n" + "get \nset \ndel \nlist [ [end]]\n" + "close\nquit\n", opname.c_str() + ); + cb(); + } +} + +int main(int narg, const char *args[]) +{ + setvbuf(stdout, NULL, _IONBF, 0); + setvbuf(stderr, NULL, _IONBF, 0); + exe_name = args[0]; + kv_cli_t *p = new kv_cli_t(); + p->run(kv_cli_t::parse_args(narg, args)); + delete p; + return 0; +} diff --git a/src/kv_db.cpp b/src/kv_db.cpp new file mode 100644 index 00000000..9e9f760d --- /dev/null +++ b/src/kv_db.cpp @@ -0,0 +1,1287 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 (see README.md for details) +// +// Vitastor shared key/value database +// Parallel optimistic B-Tree :-) + +#define _XOPEN_SOURCE +#include + +#include +#include +#include +#include +//#include + +#include "cluster_client.h" +#include "str_util.h" +#include "kv_db.h" + +// 0x VITASTOR OPTBTREE +#define KV_BLOCK_MAGIC 0x761A5106097B18EE +#define KV_BLOCK_MAX_ITEMS 1048576 +#define KV_INDEX_MAX_SIZE (uint64_t)1024*1024*1024*1024 + +#define KV_GET 1 +#define KV_SET 2 +#define KV_DEL 3 +#define KV_LIST 4 + +#define KV_INT 1 +#define KV_INT_SPLIT 2 +#define KV_LEAF 3 +#define KV_LEAF_SPLIT 4 +#define KV_EMPTY 5 + +#define KV_RECHECK_NONE 0 +#define KV_RECHECK_LEAF 1 +#define KV_RECHECK_ALL 2 +#define KV_RECHECK_RELOAD 3 + +struct __attribute__((__packed__)) kv_stored_block_t +{ + uint64_t magic; + uint32_t block_size; + uint32_t type; // KV_* + uint64_t items; // number of items + // root/int nodes: { delimiter_len, delimiter..., 8, }[] + // leaf nodes: { key_len, key..., value_len, value... }[] + uint8_t data[0]; +}; + +struct kv_block_t +{ + // current data size, to estimate whether the block can fit more items + uint32_t data_size; + uint32_t type; + uint64_t offset; + // only new version during modification is saved here + uint64_t new_version; + bool updating; + // block only contains keys in [key_ge, key_lt). I.e. key_ge <= key < key_lt. + std::string key_ge, key_lt; + // KV_INT_SPLIT/KV_LEAF_SPLIT nodes also contain one reference to another block + // with keys in [right_half, key_lt) + // This is required for CAS to guarantee consistency during split + std::string right_half; + uint64_t right_half_block; + // non-leaf nodes: ( MIN_BOUND_i => BLOCK_i )[] + // leaf nodes: ( KEY_i => VALUE_i )[] + // FIXME: use flat map? + std::map data; + + void set_data_size(); + static int kv_size(const std::string & key, const std::string & value); + int parse(uint64_t offset, uint8_t *data, int size); + bool serialize(uint8_t *data, int size); +}; + +void kv_block_t::set_data_size() +{ + data_size = sizeof(kv_stored_block_t) + 4*2 + key_ge.size() + key_lt.size(); + if (this->type == KV_INT_SPLIT || this->type == KV_LEAF_SPLIT) + data_size += 4 + right_half.size() + 8; + for (auto & kv: data) + data_size += kv_size(kv.first, kv.second); +} + +int kv_block_t::kv_size(const std::string & key, const std::string & value) +{ + return 4*2 + key.size() + value.size(); +} + +struct kv_db_t +{ + cluster_client_t *cli = NULL; + + inode_t inode_id = 0; + uint64_t next_free = 0; + uint32_t kv_block_size = 0; + uint32_t ino_block_size = 0; + bool immediate_commit = false; + std::map block_cache; + std::map known_versions; + std::multimap> continue_update; + + bool closing = false; + int active_ops = 0; + std::function on_close; + + void open(inode_t inode_id, uint32_t kv_block_size, std::function cb); + void close(std::function cb); + + void find_size(uint64_t min, uint64_t max, int phase, std::function cb); + void stop_updating(kv_block_t *blk); +}; + +struct kv_op_t +{ + kv_db_t *db; + int opcode; + std::string key, value; + int res; + bool done = false; + std::function callback; + std::function cas_cb; + + void exec(); + void next(); // for list +protected: + int recheck_policy = KV_RECHECK_LEAF; + bool started = false; + uint64_t cur_block = 0; + std::string prev_key_ge, prev_key_lt; + std::vector path; + bool updated = false; + bool skip_equal = false; + + void finish(int res); + void get(); + int handle_block(int res, bool updated, bool stop_on_split); + + void update(); + void update_find(); + void create_root(); + void resume_split(); + void update_block(int path_pos, bool is_delete, const std::string & key, const std::string & value, std::function cb); + + void next_handle_block(int res, bool updated); + void next_get(); + void next_go_up(); +}; + +static std::string read_string(uint8_t *data, int size, int *pos) +{ + if (*pos+4 > size) + { + *pos = -1; + return ""; + } + uint32_t len = *(uint32_t*)(data+*pos); + *pos += sizeof(uint32_t); + if (*pos+len > size) + { + *pos = -1; + return ""; + } + std::string key((char*)data+*pos, len); + *pos += len; + return key; +} + +int kv_block_t::parse(uint64_t offset, uint8_t *data, int size) +{ + kv_stored_block_t *blk = (kv_stored_block_t *)data; + if (blk->magic == 0 || blk->type == KV_EMPTY) + { + // empty block + return -ENOTBLK; + } + if (blk->magic != KV_BLOCK_MAGIC || blk->block_size != size || + !blk->type || blk->type > KV_LEAF_SPLIT || blk->items > KV_BLOCK_MAX_ITEMS) + { + // invalid block + return -EILSEQ; + } + this->type = blk->type; + this->offset = offset; + this->new_version = 0; + int pos = blk->data - data; + this->key_ge = read_string(data, size, &pos); + if (pos < 0) + return -EILSEQ; + this->key_lt = read_string(data, size, &pos); + if (pos < 0) + return -EILSEQ; + if (this->type == KV_INT_SPLIT || this->type == KV_LEAF_SPLIT) + { + this->right_half = read_string(data, size, &pos); + if (pos < 0) + return -EILSEQ; + if (pos+8 > size) + return -EILSEQ; + this->right_half_block = *(uint64_t*)(data+pos); + pos += 8; + } + for (int i = 0; i < blk->items; i++) + { + auto key = read_string(data, size, &pos); + if (pos < 0) + return -EILSEQ; + auto value = read_string(data, size, &pos); + if (pos < 0) + return -EILSEQ; + this->data[key] = value; + } + this->data_size = pos; + return 0; +} + +static bool write_string(uint8_t *data, int size, int *pos, const std::string & s) +{ + if (*pos+s.size()+4 > size) + return false; + *(uint32_t*)(data+*pos) = s.size(); + *pos += 4; + memcpy(data+*pos, s.data(), s.size()); + *pos += s.size(); + return true; +} + +bool kv_block_t::serialize(uint8_t *data, int size) +{ + kv_stored_block_t *blk = (kv_stored_block_t *)data; + blk->magic = KV_BLOCK_MAGIC; + blk->block_size = size; + blk->type = this->type; + blk->items = this->data.size(); + int pos = blk->data - data; + if (!write_string(data, size, &pos, key_ge)) + return false; + if (!write_string(data, size, &pos, key_lt)) + return false; + if (this->type == KV_INT_SPLIT || this->type == KV_LEAF_SPLIT) + { + if (!write_string(data, size, &pos, right_half)) + return false; + if (pos+8 > size) + return false; + *(uint64_t*)(data+pos) = right_half_block; + pos += 8; + } + for (auto & kv: this->data) + { + if (!write_string(data, size, &pos, kv.first) || + !write_string(data, size, &pos, kv.second)) + return false; + } + return true; +} + +void kv_db_t::open(inode_t inode_id, uint32_t kv_block_size, std::function cb) +{ + if (block_cache.size() > 0) + { + cb(-EINVAL); + return; + } + auto pool_it = cli->st_cli.pool_config.find(INODE_POOL(inode_id)); + if (pool_it == cli->st_cli.pool_config.end()) + { + cb(-EINVAL); + return; + } + auto & pool_cfg = pool_it->second; + uint32_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks); + if ((pool_cfg.data_block_size*pg_data_size) % kv_block_size || + kv_block_size < pool_cfg.bitmap_granularity) + { + cb(-EINVAL); + return; + } + this->inode_id = inode_id; + this->immediate_commit = cli->get_immediate_commit(inode_id); + this->ino_block_size = pool_cfg.data_block_size * pg_data_size; + this->kv_block_size = kv_block_size; + this->next_free = 0; + // Find index size with binary search + find_size(0, 0, 1, [=](int res, uint64_t size) + { + if (res < 0) + { + this->inode_id = 0; + this->kv_block_size = 0; + } + this->next_free = size; + cb(res); + }); +} + +void kv_db_t::close(std::function cb) +{ + if (active_ops <= 0) + { + closing = false; + on_close = NULL; + inode_id = 0; + next_free = 0; + kv_block_size = 0; + ino_block_size = 0; + immediate_commit = false; + block_cache.clear(); + known_versions.clear(); + cb(); + } + else + { + closing = true; + on_close = cb; + } +} + +static bool is_zero(void *buf, int size) +{ + assert(!(size % 8)); + size /= 8; + uint64_t *ptr = (uint64_t*)buf; + for (int i = 0; i < size/8; i++) + if (ptr[i]) + return false; + return true; +} + +// Phase 1: try 2^i-1 for i=0,1,2,... +// Phase 2: binary search between 2^(N-1)-1 and 2^N-1 +void kv_db_t::find_size(uint64_t min, uint64_t max, int phase, std::function cb) +{ + if (min == max-1) + { + cb(0, max*kv_block_size); + return; + } + if (phase == 1 && min >= KV_INDEX_MAX_SIZE) + { + cb(-EFBIG, 0); + return; + } + cluster_op_t *op = new cluster_op_t; + op->opcode = OSD_OP_READ; + op->inode = inode_id; + op->offset = (phase == 1 ? min : (min+max)/2) * kv_block_size; + op->len = kv_block_size; + if (op->len) + { + op->iov.push_back(malloc_or_die(op->len), op->len); + } + op->callback = [=](cluster_op_t *op) + { + if (op->retval != op->len) + { + // error + cb(op->retval >= 0 ? -EIO : op->retval, 0); + return; + } + known_versions[op->offset / ino_block_size] = op->version; + if (!is_zero(op->iov.buf[0].iov_base, kv_block_size)) + { + if (phase == 1) + find_size((min+1)*2 - 1, 0, 1, cb); + else + find_size((min+max)/2, max, 2, cb); + } + else + { + if (phase == 1) + find_size((min+1)/2 - 1, min, 2, cb); + else + find_size(min, (min+max)/2, 2, cb); + } + free(op->iov.buf[0].iov_base); + delete op; + }; + cli->execute(op); +} + +void kv_db_t::stop_updating(kv_block_t *blk) +{ + blk->updating = false; + auto b_it = continue_update.find(blk->offset); + while (b_it != continue_update.end() && b_it->first == blk->offset) + { + auto cb = b_it->second; + continue_update.erase(b_it++); + cb(); + } +} + +static void invalidate(kv_db_t *db, uint64_t offset, uint64_t version) +{ + if (db->known_versions[offset/db->ino_block_size] != version) + { + auto b_it = db->block_cache.lower_bound(offset/db->ino_block_size * db->ino_block_size); + while (b_it != db->block_cache.end() && b_it->first/db->ino_block_size == offset/db->ino_block_size) + { + if (b_it->second.new_version != 0 || b_it->second.updating) + { + // do not forget blocks during modification + b_it++; + } + else + { + db->block_cache.erase(b_it++); + } + } + db->known_versions[offset/db->ino_block_size] = version; + } +} + +static void get_block(kv_db_t *db, uint64_t offset, int recheck_policy, std::function cb) +{ + // FIXME: Evict blocks from cache based on memory limit and block level + auto b_it = db->block_cache.find(offset); + if (b_it != db->block_cache.end() && (recheck_policy == KV_RECHECK_NONE || + recheck_policy == KV_RECHECK_LEAF && b_it->second.type != KV_LEAF || + // block is being modified + b_it->second.updating || b_it->second.new_version != 0)) + { + // Block already in cache, we can proceed + cb(0, false); + return; + } + cluster_op_t *op = new cluster_op_t; + op->opcode = OSD_OP_READ; + op->inode = db->inode_id; + op->offset = offset; + op->len = db->kv_block_size; + op->iov.push_back(malloc_or_die(op->len), op->len); + op->callback = [=](cluster_op_t *op) + { + if (op->retval != op->len) + { + // error + cb(op->retval >= 0 ? -EIO : op->retval, false); + return; + } + if (db->known_versions[op->offset/db->ino_block_size] == op->version && + db->block_cache.find(op->offset) != db->block_cache.end()) + { + cb(0, true); + } + else + { + invalidate(db, op->offset, op->version); + int err = db->block_cache[op->offset].parse(op->offset, (uint8_t*)op->iov.buf[0].iov_base, op->len); + if (err == 0) + { + cb(0, true); + } + else + { + db->block_cache.erase(op->offset); + cb(err, false); + } + } + free(op->iov.buf[0].iov_base); + delete op; + }; + db->cli->execute(op); +} + +void kv_op_t::exec() +{ + if (!db->inode_id || db->closing) + { + db->active_ops++; + finish(-EINVAL); + return; + } + if (started) + return; + db->active_ops++; + started = true; + if (opcode == KV_GET) + get(); + else if (opcode == KV_SET || opcode == KV_DEL) + update(); + else if (opcode == KV_LIST) + { + // Do nothing, next() does everything + } + else + finish(-ENOSYS); +} + +void kv_op_t::finish(int res) +{ + this->res = res; + this->done = true; + db->active_ops--; + if (!db->active_ops && db->closing) + db->close(db->on_close); + (std::function(callback))(this); +} + +void kv_op_t::get() +{ + get_block(db, cur_block, recheck_policy, [=](int res, bool updated) + { + res = handle_block(res, updated, false); + if (res == -EAGAIN) + { + get(); + } + else if (res == -ENOTBLK) + { + finish(cur_block == 0 ? -ENOENT : -EILSEQ); + } + else if (res < 0) + { + finish(res); + } + else + { + auto blk = &db->block_cache.at(cur_block); + auto kv_it = blk->data.find(key); + if (kv_it == blk->data.end()) + { + finish(-ENOENT); + } + else + { + this->res = 0; + this->value = kv_it->second; + finish(0); + } + } + }); +} + +int kv_op_t::handle_block(int res, bool updated, bool stop_on_split) +{ + if (res < 0) + { + return res; + } + this->updated = this->updated || updated; + auto blk = & db->block_cache.at(cur_block); + if (key < blk->key_ge || blk->key_lt.size() && key >= blk->key_lt) + { + // We got an unrelated block - recheck the whole chain from the beginning + // May happen during split: + // 1) Initially parent P points to block A, A contains [a, b) + // 2) New block B = [c, b) + // 3) P = ->A + ->B + // 4) A = [a, c) + // We may read P on step (1), get a link to A, and read A on step (4). + // It will miss data from [c, b). + if (!this->updated) + return -EILSEQ; + prev_key_ge = prev_key_lt = ""; + cur_block = 0; + this->recheck_policy = KV_RECHECK_ALL; + this->updated = false; + return -EAGAIN; + } + if (stop_on_split && (blk->type == KV_LEAF_SPLIT || blk->type == KV_INT_SPLIT) && + prev_key_lt > blk->right_half) + { + return -ECHILD; + } + else if ((blk->type == KV_INT_SPLIT || blk->type == KV_LEAF_SPLIT) && key >= blk->right_half) + { + cur_block = blk->right_half_block; + prev_key_ge = blk->right_half; + prev_key_lt = blk->key_lt; + return -EAGAIN; + } + else if (blk->type == KV_LEAF || blk->type == KV_LEAF_SPLIT) + { + return 0; + } + else + { + auto child_it = blk->data.upper_bound(key); + if (child_it == blk->data.begin()) + { + return -EILSEQ; + } + auto m = child_it == blk->data.end() ? "" : child_it->first; + child_it--; + if (child_it->second.size() != sizeof(uint64_t)) + { + return -EILSEQ; + } + // Track left and right boundaries which have led us to cur_block + prev_key_ge = child_it->first; + prev_key_lt = m; + cur_block = *((uint64_t*)child_it->second.data()); + return -EAGAIN; + } + return 0; +} + +static std::string find_splitter(kv_db_t *db, kv_block_t *blk) +{ + uint32_t new_size = blk->data_size; + auto d_it = blk->data.end(); + while (d_it != blk->data.begin() && new_size > db->kv_block_size/2) + { + d_it--; + new_size -= kv_block_t::kv_size(d_it->first, d_it->second); + } + assert(d_it != blk->data.begin() && d_it != blk->data.end()); + if (blk->type != KV_LEAF && blk->type != KV_LEAF_SPLIT) + { + return d_it->first; + } + auto prev_it = std::prev(d_it); + int i = 0; + while (i < d_it->first.size() && i < prev_it->first.size() && d_it->first[i] == prev_it->first[i]) + { + i++; + } + auto separator = i < d_it->first.size() ? d_it->first.substr(0, i+1) : d_it->first; + return separator; +} + +static void write_block(kv_db_t *db, kv_block_t *blk, std::function cb) +{ + blk->new_version = 1+db->known_versions[blk->offset/db->ino_block_size]; + auto op = new cluster_op_t; + op->opcode = OSD_OP_WRITE; + op->inode = db->inode_id; + op->offset = blk->offset; + op->version = blk->new_version; + op->len = db->kv_block_size; + op->iov.push_back(malloc_or_die(op->len), op->len); + if (!blk->serialize((uint8_t*)op->iov.buf[0].iov_base, op->len)) + { + fprintf(stderr, "BUG: failed to serialize block %lu\n", blk->offset); + abort(); + return; + } + op->callback = [db, blk, cb](cluster_op_t *op) + { + free(op->iov.buf[0].iov_base); + int res = op->retval; + if (res == op->len) + { + res = 0; + blk->new_version = 0; + db->known_versions[op->offset/db->ino_block_size] = op->version; + } + else if (res >= 0) + res = -EIO; + delete op; + if (res < 0 || db->immediate_commit) + cb(res); + else + { + op = new cluster_op_t; + op->opcode = OSD_OP_SYNC; + op->callback = [cb](cluster_op_t *op) + { + auto res = op->retval; + delete op; + cb(res); + }; + db->cli->execute(op); + } + }; + db->cli->execute(op); +} + +static kv_block_t *create_new_block(kv_db_t *db, kv_block_t *old_blk, std::string separator, bool right) +{ + auto new_offset = db->next_free; + db->next_free += db->kv_block_size; + auto blk = & db->block_cache[new_offset]; + blk->type = old_blk->type; + blk->offset = new_offset; + blk->key_ge = right ? separator : old_blk->key_ge; + blk->key_lt = right ? old_blk->key_lt : separator; + blk->data.insert(right ? old_blk->data.lower_bound(separator) : old_blk->data.begin(), + right ? old_blk->data.end() : old_blk->data.lower_bound(separator)); + blk->set_data_size(); + return blk; +} + +static void write_new_block(kv_db_t *db, kv_block_t *blk, std::function cb) +{ + write_block(db, blk, [=](int res) + { + if (res == -EINTR) + { + // CAS failure => re-read, then, if not zero, find position again and retry + cluster_op_t *op = new cluster_op_t; + op->opcode = OSD_OP_READ; + op->inode = db->inode_id; + op->offset = blk->offset; + op->len = db->kv_block_size; + op->iov.push_back(malloc_or_die(op->len), op->len); + op->callback = [=](cluster_op_t *op) + { + if (op->retval != op->len) + { + // Failure => free the new unreferenced block and die + db->block_cache.erase(blk->offset); + cb(op->retval >= 0 ? -EIO : op->retval); + return; + } + invalidate(db, op->offset, op->version); + if (is_zero(op->iov.buf[0].iov_base, db->kv_block_size)) + { + // OK, block is still empty, but the version apparently changed + write_new_block(db, blk, cb); + } + else + { + // Block is already occupied => place again + auto new_offset = db->next_free; + db->next_free += db->kv_block_size; + db->block_cache[new_offset] = std::move(db->block_cache[blk->offset]); + db->block_cache.erase(blk->offset); + auto new_blk = &db->block_cache[new_offset]; + new_blk->offset = new_offset; + write_new_block(db, new_blk, cb); + } + free(op->iov.buf[0].iov_base); + delete op; + }; + db->cli->execute(op); + } + else if (res != 0) + { + // Other failure => free the new unreferenced block and die + db->block_cache.erase(blk->offset); + cb(res > 0 ? -EIO : res); + } + else + { + // OK + cb(0); + } + }); +} + +static void clear_block(kv_db_t *db, uint64_t offset, uint64_t version, std::function cb) +{ + cluster_op_t *op = new cluster_op_t; + op->opcode = OSD_OP_WRITE; + op->inode = db->inode_id; + op->offset = offset; + op->version = version; + op->len = db->kv_block_size; + op->iov.push_back(malloc_or_die(op->len), op->len); + memset(op->iov.buf[0].iov_base, 0, op->len); + kv_stored_block_t *sb = (kv_stored_block_t *)op->iov.buf[0].iov_base; + // Write non-zero data to not get mistaken when finding index size with binary search + sb->magic = KV_BLOCK_MAGIC; + sb->block_size = db->kv_block_size; + sb->type = KV_EMPTY; + op->callback = [cb](cluster_op_t *op) + { + free(op->iov.buf[0].iov_base); + auto res = op->retval == op->len ? 0 : (op->retval >= 0 ? -EIO : op->retval); + delete op; + cb(res); + }; + db->block_cache.erase(offset); + db->cli->execute(op); +} + +void kv_op_t::update() +{ + if (opcode == KV_SET && kv_block_t::kv_size(key, value) > (db->kv_block_size-sizeof(kv_stored_block_t)) / 4) + { + // New item is too large for this B-Tree + finish(-EINVAL); + return; + } + // The beginning is (almost) the same as in kv_get(). First we find path to the item + update_find(); +} + +void kv_op_t::update_find() +{ + if (!cur_block) + { + path.clear(); + } + path.push_back(cur_block); + get_block(db, cur_block, recheck_policy, [=](int res, bool updated) + { + res = handle_block(res, updated, true); + if (res == -EAGAIN) + { + update_find(); + } + else if (res == -ENOTBLK) + { + if (opcode == KV_SET) + { + // Check CAS callback + if (cas_cb && !cas_cb(-ENOENT, "")) + finish(-EAGAIN); + else + create_root(); + } + else + finish(cur_block == 0 ? -ENOENT : -EILSEQ); + } + else if (res == -ECHILD) + { + resume_split(); + } + else if (res < 0) + { + finish(res); + } + else + { + update_block(path.size()-1, opcode == KV_DEL, key, value, [=](int res) + { + finish(res); + }); + } + }); +} + +void kv_op_t::create_root() +{ + // if the block does not exist (is empty) - it should be the root block. + // in this case we just create a new root leaf block. + // if a referenced non-root block is empty, we just return an error. + if (cur_block != 0 || db->next_free != 0) + { + finish(-EILSEQ); + return; + } + db->next_free += db->kv_block_size; + auto blk = &db->block_cache[0]; + blk->type = KV_LEAF; + blk->offset = 0; + blk->data[key] = value; + blk->set_data_size(); + write_block(db, blk, [=](int res) + { + if (res == -EINTR) + update(); + else + finish(res); + }); +} + +void kv_op_t::resume_split() +{ + // We hit a block which we started to split, but didn't finish splitting + // Generally it shouldn't happen, but it MAY happen if a K/V client dies + // In this case we want to finish the split and retry update from the beginning + if (path.size() == 1) + { + // It shouldn't be the root block because we don't split it via INT_SPLIT/LEAF_SPLIT + finish(-EILSEQ); + return; + } + auto blk = &db->block_cache.at(cur_block); + update_block( + path.size()-2, false, blk->right_half, + std::string((char*)&blk->right_half_block, sizeof(blk->right_half_block)), + [=](int res) + { + if (res < 0) + { + finish(res); + return; + } + update(); + } + ); +} + +void kv_op_t::update_block(int path_pos, bool is_delete, const std::string & key, const std::string & value, std::function cb) +{ + auto blk = &db->block_cache.at(path[path_pos]); + if (blk->updating) + { + // Wait if block is being modified + db->continue_update.emplace(blk->offset, [=]() { update_block(path_pos, is_delete, key, value, cb); }); + return; + } + if ((blk->type == KV_LEAF_SPLIT || blk->type == KV_INT_SPLIT) && prev_key_lt <= blk->right_half) + { + // This is a block split during previous updates + // Its parent is already updated because prev_key_lt <= blk->right_half + // That means we can safely remove the "split reference" + blk->type = KV_LEAF; + blk->key_lt = blk->right_half; + blk->right_half = ""; + blk->right_half_block = 0; + } + uint32_t rm_size = 0; + auto d_it = blk->data.find(key); + if (d_it != blk->data.end()) + { + if (!is_delete && d_it->second == value) + { + // Nothing to do + cb(0); + return; + } + rm_size = kv_block_t::kv_size(d_it->first, d_it->second); + } + else if (is_delete) + { + // Nothing to do + cb(0); + return; + } + if (cas_cb && path_pos == path.size()-1 && !cas_cb(d_it != blk->data.end() ? 0 : -ENOENT, d_it != blk->data.end() ? d_it->second : "")) + { + // CAS failure + cb(-EAGAIN); + return; + } + if (!is_delete) + { + blk->data[key] = value; + blk->data_size = blk->data_size + kv_block_t::kv_size(key, value) - rm_size; + } + else + { + // FIXME We may want to merge blocks at some point, in that case we should: + // - read both merged blocks + // - add "merged into XX" reference to the second block and write it out + // - add entries from the second block to the first one and write it out + // - remove the reference from the parent block to the second block + // - zero out the second block + blk->data.erase(key); + blk->data_size -= rm_size; + } + blk->updating = true; + if (is_delete || blk->data_size < db->kv_block_size) + { + // No need to split the block => just modify and write it + write_block(db, blk, [=](int res) + { + db->stop_updating(blk); + if (res == -EINTR) + { + update(); + } + else + { + cb(res); + } + }); + return; + } + // New item doesn't fit. The most interesting case. + // Write the right half into a new block + auto separator = find_splitter(db, blk); + auto right_blk = create_new_block(db, blk, separator, true); + write_new_block(db, right_blk, [=](int res) + { + if (res < 0) + { + db->stop_updating(blk); + cb(res); + return; + } + if (!blk->offset) + { + // Split the root block + // Write the left half into a new block + auto left_blk = create_new_block(db, blk, separator, false); + write_new_block(db, left_blk, [=](int res) + { + if (res < 0) + { + db->stop_updating(blk); + cb(res); + return; + } + // Write references to halves into the root block + blk->type = KV_INT; + blk->data.clear(); + blk->data[""] = std::string((char*)&left_blk->offset, sizeof(left_blk->offset)); + blk->data[separator] = std::string((char*)&right_blk->offset, sizeof(right_blk->offset)); + blk->set_data_size(); + write_block(db, blk, [=](int res) + { + db->stop_updating(blk); + if (res == -EINTR) + { + // CAS failure - zero garbage left_blk and right_blk and retry from the beginning + db->block_cache.erase(blk->offset); + clear_block(db, left_blk->offset, 0, [=](int res) + { + if (res < 0) + { + cb(res); + return; + } + clear_block(db, right_blk->offset, 0, [=](int res) + { + if (res < 0) + { + cb(res); + return; + } + update(); + }); + }); + } + else + { + cb(res); + } + }); + }); + } + else + { + if (path_pos == 0) + { + // Block number zero should always be the root block + cb(-EILSEQ); + return; + } + // Split a non-root block + blk->type = blk->type == KV_INT ? KV_INT_SPLIT : KV_LEAF_SPLIT; + blk->right_half = separator; + blk->right_half_block = right_blk->offset; + blk->data.erase(blk->data.lower_bound(separator), blk->data.end()); + blk->set_data_size(); + write_block(db, blk, [=](int res) + { + db->stop_updating(blk); + if (res == -EINTR) + { + // CAS failure - zero garbage right_blk and retry from the beginning + db->block_cache.erase(blk->offset); + clear_block(db, right_blk->offset, 0, [=](int res) + { + if (res < 0) + { + cb(res); + return; + } + update(); + }); + } + else if (res < 0) + { + cb(res); + } + else + { + // Add a reference to the parent block + // Do not cleanup anything on failure because stored right_blk is already referenced + update_block(path_pos-1, false, separator, std::string((char*)&right_blk->offset, sizeof(right_blk->offset)), cb); + } + }); + } + }); +} + +void kv_op_t::next() +{ + if (opcode != KV_LIST || !started || done) + { + return; + } + if (path.size() == 0 || path[path.size()-1] != cur_block) + { + if (!cur_block) + path.clear(); + path.push_back(cur_block); + } + get_block(db, cur_block, recheck_policy, [=](int res, bool updated) + { + next_handle_block(res, updated); + }); +} + +void kv_op_t::next_handle_block(int res, bool updated) +{ + res = handle_block(res, updated, false); + if (res == -EAGAIN) + { + next(); + } + else if (res == -ENOTBLK) + { + finish(cur_block == 0 ? -ENOENT : -EILSEQ); + } + else if (res < 0) + { + finish(res); + } + else + { + // OK, leaf block found + recheck_policy = KV_RECHECK_NONE; + next_get(); + } +} + +void kv_op_t::next_get() +{ + auto blk = &db->block_cache.at(cur_block); + auto kv_it = blk->data.lower_bound(key); + if (kv_it != blk->data.end() && kv_it->first == key && skip_equal) + { + kv_it++; + } + if (kv_it != blk->data.end()) + { + // Send this item + this->res = 0; + this->key = kv_it->first; + this->value = kv_it->second; + skip_equal = true; + callback(this); + } + // Find next block + else if (blk->type == KV_LEAF_SPLIT) + { + // Left half finished, go to the right + recheck_policy = KV_RECHECK_LEAF; + key = blk->right_half; + prev_key_ge = blk->right_half; + prev_key_lt = blk->key_lt; + cur_block = blk->right_half_block; + next(); + } + else + { + next_go_up(); + } +} + +void kv_op_t::next_go_up() +{ + auto blk = &db->block_cache.at(cur_block); + while (true) + { + if (blk->key_lt == "" || path.size() <= 1) + { + // End of the listing + finish(-ENOENT); + return; + } + key = blk->key_lt; + skip_equal = false; + path.pop_back(); + cur_block = path[path.size()-1]; + recheck_policy = KV_RECHECK_LEAF; + // Check if we can resume listing from the next key + auto pb_it = db->block_cache.find(cur_block); + if (pb_it == db->block_cache.end()) + { + // Block is absent in cache, recheck from the beginning + prev_key_ge = prev_key_lt = ""; + cur_block = 0; + next(); + return; + } + blk = &pb_it->second; + // Go down if block did not end + if (blk->key_lt == "" || key < blk->key_lt) + { + prev_key_ge = blk->key_ge; + prev_key_lt = blk->key_lt; + next_handle_block(0, false); + return; + } + } +} + +kv_dbw_t::kv_dbw_t(cluster_client_t *cli) +{ + db = new kv_db_t(); + db->cli = cli; +} + +kv_dbw_t::~kv_dbw_t() +{ + delete db; +} + +void kv_dbw_t::open(inode_t inode_id, uint32_t kv_block_size, std::function cb) +{ + db->open(inode_id, kv_block_size, cb); +} + +uint64_t kv_dbw_t::get_size() +{ + return db->next_free; +} + +void kv_dbw_t::close(std::function cb) +{ + db->close(cb); +} + +void kv_dbw_t::get(const std::string & key, std::function cb) +{ + auto *op = new kv_op_t; + op->db = db; + op->opcode = KV_GET; + op->key = key; + op->callback = [cb](kv_op_t *op) + { + cb(op->res, op->value); + delete op; + }; + op->exec(); +} + +void kv_dbw_t::set(const std::string & key, const std::string & value, std::function cb, + std::function cas_compare) +{ + auto *op = new kv_op_t; + op->db = db; + op->opcode = KV_SET; + op->key = key; + op->value = value; + if (cas_compare) + { + op->cas_cb = cas_compare; + } + op->callback = [cb](kv_op_t *op) + { + cb(op->res); + delete op; + }; + op->exec(); +} + +void kv_dbw_t::del(const std::string & key, std::function cb, + std::function cas_compare) +{ + auto *op = new kv_op_t; + op->db = db; + op->opcode = KV_SET; + op->key = key; + if (cas_compare) + { + op->cas_cb = cas_compare; + } + op->callback = [cb](kv_op_t *op) + { + cb(op->res); + delete op; + }; + op->exec(); +} + +void* kv_dbw_t::list_start(const std::string & start) +{ + auto *op = new kv_op_t; + op->db = db; + op->opcode = KV_LIST; + op->key = start; + op->exec(); + return op; +} + +void kv_dbw_t::list_next(void *handle, std::function cb) +{ + kv_op_t *op = (kv_op_t*)handle; + if (cb) + { + op->callback = [cb](kv_op_t *op) + { + cb(op->res, op->key, op->value); + }; + } + op->next(); +} + +void kv_dbw_t::list_close(void *handle) +{ + kv_op_t *op = (kv_op_t*)handle; + delete op; +} diff --git a/src/kv_db.h b/src/kv_db.h new file mode 100644 index 00000000..913e1e4f --- /dev/null +++ b/src/kv_db.h @@ -0,0 +1,34 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 (see README.md for details) +// +// Vitastor shared key/value database +// Parallel optimistic B-Tree O:-) + +#pragma once + +#include "cluster_client.h" + +struct kv_db_t; + +struct kv_dbw_t +{ + kv_dbw_t(cluster_client_t *cli); + ~kv_dbw_t(); + + void open(inode_t inode_id, uint32_t kv_block_size, std::function cb); + void close(std::function cb); + + uint64_t get_size(); + + void get(const std::string & key, std::function cb); + void set(const std::string & key, const std::string & value, std::function cb, + std::function cas_compare = NULL); + void del(const std::string & key, std::function cb, + std::function cas_compare = NULL); + + void* list_start(const std::string & start); + void list_next(void *handle, std::function cb); + void list_close(void *handle); + + kv_db_t *db; +};