diff --git a/src/cluster_client.cpp b/src/cluster_client.cpp index f8567fd5..9050b6bd 100644 --- a/src/cluster_client.cpp +++ b/src/cluster_client.cpp @@ -6,7 +6,7 @@ #include "cluster_client_impl.h" #include "http_client.h" // json_is_true -cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config) +cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json config) { wb = new writeback_cache_t(); diff --git a/src/cluster_client.h b/src/cluster_client.h index 89ca4bfc..7f9d8e3f 100644 --- a/src/cluster_client.h +++ b/src/cluster_client.h @@ -123,7 +123,7 @@ public: json11::Json::object cli_config, file_config, etcd_global_config; json11::Json::object config; - cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config); + cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json config); ~cluster_client_t(); void execute(cluster_op_t *op); void execute_raw(osd_num_t osd_num, osd_op_t *op); diff --git a/src/kv_cli.cpp b/src/kv_cli.cpp index 60391b57..deebaedc 100644 --- a/src/kv_cli.cpp +++ b/src/kv_cli.cpp @@ -30,11 +30,12 @@ public: char *cur_cmd = NULL; int cur_cmd_size = 0, cur_cmd_alloc = 0; bool finished = false, eof = false; + json11::Json::object cfg; ~kv_cli_t(); static json11::Json::object parse_args(int narg, const char *args[]); - void run(json11::Json cfg); + void run(const json11::Json::object & cfg); void read_cmd(); void next_cmd(); void handle_cmd(const std::string & cmd, std::function cb); @@ -86,7 +87,7 @@ json11::Json::object kv_cli_t::parse_args(int narg, const char *args[]) return cfg; } -void kv_cli_t::run(json11::Json cfg) +void kv_cli_t::run(const json11::Json::object & cfg) { // Create client ringloop = new ring_loop_t(512); @@ -229,7 +230,8 @@ void kv_cli_t::handle_cmd(const std::string & cmd, std::function cb) cb(); return; } - db->open(INODE_WITH_POOL(pool_id, inode_id), kv_block_size, [=](int res) + cfg["kv_block_size"] = (uint64_t)kv_block_size; + db->open(INODE_WITH_POOL(pool_id, inode_id), cfg, [=](int res) { if (res < 0) fprintf(stderr, "Error opening index: %s (code %d)\n", strerror(-res), res); @@ -238,6 +240,31 @@ void kv_cli_t::handle_cmd(const std::string & cmd, std::function cb) cb(); }); } + else if (opname == "config") + { + auto pos2 = cmd.find_first_of(" \t", pos+1); + if (pos2 == std::string::npos) + { + fprintf(stderr, "Usage: config \n"); + cb(); + return; + } + auto key = trim(cmd.substr(pos+1, pos2-pos-1)); + auto value = parse_size(trim(cmd.substr(pos2+1))); + if (key != "kv_memory_limit" && + key != "kv_evict_max_misses" && + key != "kv_evict_attempts_per_level" && + key != "kv_evict_unused_age") + { + fprintf(stderr, "Allowed properties: kv_memory_limit, kv_evict_max_misses, kv_evict_attempts_per_level, kv_evict_unused_age\n"); + } + else + { + cfg[key] = value; + db->set_config(cfg); + } + cb(); + } else if (opname == "get" || opname == "set" || opname == "del") { if (opname == "get" || opname == "del") @@ -349,6 +376,7 @@ void kv_cli_t::handle_cmd(const std::string & cmd, std::function cb) fprintf( stderr, "Unknown operation: %s. Supported operations:\n" "open [block_size]\n" + "config \n" "get \nset \ndel \nlist [ [end]]\n" "close\nquit\n", opname.c_str() ); diff --git a/src/kv_db.cpp b/src/kv_db.cpp index 4aaba470..21e83756 100644 --- a/src/kv_db.cpp +++ b/src/kv_db.cpp @@ -38,6 +38,9 @@ #define KV_RECHECK_ALL 2 #define KV_RECHECK_RELOAD 3 +#define LEVEL_BITS 8 +#define NO_LEVEL_MASK (((uint64_t)1 << (64-LEVEL_BITS)) - 1) + struct __attribute__((__packed__)) kv_stored_block_t { uint64_t magic; @@ -103,8 +106,13 @@ struct kv_db_t uint32_t kv_block_size = 0; uint32_t ino_block_size = 0; bool immediate_commit = false; - uint64_t cache_max_blocks = 0; + uint64_t memory_limit = 128*1024*1024; + uint64_t evict_unused_age = 1000; + uint64_t evict_max_misses = 10; + uint64_t evict_attempts_per_level = 3; + uint64_t evict_unused_counter = 0; + uint64_t cache_max_blocks = 0; int base_block_level = 0; int usage_counter = 1; std::set block_levels; @@ -116,7 +124,8 @@ struct kv_db_t int active_ops = 0; std::function on_close; - void open(inode_t inode_id, uint32_t kv_block_size, std::function cb); + void open(inode_t inode_id, json11::Json cfg, std::function cb); + void set_config(json11::Json cfg); void close(std::function cb); void find_size(uint64_t min, uint64_t max, int phase, std::function cb); @@ -267,7 +276,7 @@ bool kv_block_t::serialize(uint8_t *data, int size) return true; } -void kv_db_t::open(inode_t inode_id, uint32_t kv_block_size, std::function cb) +void kv_db_t::open(inode_t inode_id, json11::Json cfg, std::function cb) { if (block_cache.size() > 0) { @@ -282,6 +291,9 @@ void kv_db_t::open(inode_t inode_id, uint32_t kv_block_size, std::functionsecond; uint32_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks); + uint64_t kv_block_size = cfg["kv_block_size"].uint64_value(); + if (!kv_block_size) + kv_block_size = 4096; if ((pool_cfg.data_block_size*pg_data_size) % kv_block_size || kv_block_size < pool_cfg.bitmap_granularity) { @@ -293,6 +305,7 @@ void kv_db_t::open(inode_t inode_id, uint32_t kv_block_size, std::functionino_block_size = pool_cfg.data_block_size * pg_data_size; this->kv_block_size = kv_block_size; this->next_free = 0; + set_config(cfg); // Find index size with binary search find_size(0, 0, 1, [=](int res, uint64_t size) { @@ -306,6 +319,15 @@ void kv_db_t::open(inode_t inode_id, uint32_t kv_block_size, std::functionmemory_limit = cfg["kv_memory_limit"].is_null() ? 128*1024*1024 : cfg["kv_memory_limit"].uint64_value(); + this->evict_max_misses = cfg["kv_evict_max_misses"].is_null() ? 10 : cfg["kv_evict_max_misses"].uint64_value(); + this->evict_attempts_per_level = cfg["kv_evict_attempts_per_level"].is_null() ? 3 : cfg["kv_evict_attempts_per_level"].uint64_value(); + this->evict_unused_age = cfg["kv_evict_unused_age"].is_null() ? 1000 : cfg["kv_evict_unused_age"].uint64_value(); + this->cache_max_blocks = this->memory_limit / this->kv_block_size; +} + void kv_db_t::close(std::function cb) { if (active_ops <= 0) @@ -405,12 +427,12 @@ void kv_db_t::stop_updating(kv_block_t *blk) static void del_block_level(kv_db_t *db, kv_block_t *blk) { - db->block_levels.erase((((uint64_t)(db->base_block_level+blk->level) & 0xFFFF) << 48) | (blk->offset/db->kv_block_size)); + db->block_levels.erase((((uint64_t)(db->base_block_level+blk->level) & 0xFFFF) << (64-LEVEL_BITS)) | (blk->offset/db->kv_block_size)); } static void add_block_level(kv_db_t *db, kv_block_t *blk) { - db->block_levels.insert((((uint64_t)(db->base_block_level+blk->level) & 0xFFFF) << 48) | (blk->offset/db->kv_block_size)); + db->block_levels.insert((((uint64_t)(db->base_block_level+blk->level) & 0xFFFF) << (64-LEVEL_BITS)) | (blk->offset/db->kv_block_size)); } static void invalidate(kv_db_t *db, uint64_t offset, uint64_t version) @@ -436,9 +458,75 @@ static void invalidate(kv_db_t *db, uint64_t offset, uint64_t version) } } +static uint64_t get_max_level(kv_db_t *db) +{ + auto el_it = db->block_levels.end(); + if (el_it == db->block_levels.begin()) + { + return 0; + } + el_it--; + return (*el_it >> (64-LEVEL_BITS)); +} + +static void try_evict(kv_db_t *db) +{ + // Evict blocks from cache based on memory limit and block level + if (db->cache_max_blocks <= 10 || db->block_cache.size() <= db->cache_max_blocks) + { + return; + } + for (uint64_t evict_level = get_max_level(db); evict_level > 0; evict_level--) + { + // Do eviction attempts at random block positions per each block level + for (int attempt = 0; attempt < db->evict_attempts_per_level; attempt++) + { + auto start_it = db->block_levels.lower_bound(evict_level << (64-LEVEL_BITS)); + auto end_it = db->block_levels.lower_bound((evict_level+1) << (64-LEVEL_BITS)); + if (start_it == end_it) + continue; + end_it--; + if (start_it == end_it) + continue; + auto random_pos = *start_it + (lrand48() % (*end_it - *start_it)); + auto random_it = db->block_levels.lower_bound(random_pos); + int misses = 0; + bool wrapped = false; + while (db->block_cache.size() > db->cache_max_blocks && + (!wrapped || *random_it < random_pos) && + (db->evict_max_misses <= 0 || misses < db->evict_max_misses)) + { + auto b_it = db->block_cache.find((*random_it & NO_LEVEL_MASK) * db->kv_block_size); + auto blk = &b_it->second; + if (b_it != db->block_cache.end() && !blk->updating && + !blk->new_version && blk->usage < db->usage_counter) + { + db->block_cache.erase(b_it); + db->block_levels.erase(random_it++); + } + else + { + random_it++; + misses++; + } + if (random_it == db->block_levels.end() || (*random_it >> (64-LEVEL_BITS)) > evict_level) + { + if (wrapped) + break; + random_it = db->block_levels.lower_bound(evict_level << (64-LEVEL_BITS)); + wrapped = true; + } + } + if (db->block_cache.size() <= db->cache_max_blocks) + { + return; + } + } + } +} + static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_policy, std::function cb) { - // FIXME: Evict blocks from cache based on memory limit and block level auto b_it = db->block_cache.find(offset); if (b_it != db->block_cache.end() && (recheck_policy == KV_RECHECK_NONE || recheck_policy == KV_RECHECK_LEAF && b_it->second.type != KV_LEAF || @@ -474,6 +562,7 @@ static void get_block(kv_db_t *db, uint64_t offset, int cur_level, int recheck_p else { invalidate(db, op->offset, op->version); + try_evict(db); auto blk = &db->block_cache[op->offset]; int err = blk->parse((uint8_t*)op->iov.buf[0].iov_base, op->len); if (err == 0) @@ -508,6 +597,11 @@ void kv_op_t::exec() return; db->active_ops++; started = true; + if (++db->evict_unused_counter >= db->evict_unused_age) + { + db->evict_unused_counter = 0; + db->usage_counter++; + } cur_level = -db->base_block_level; if (opcode == KV_GET) get(); @@ -1233,9 +1327,14 @@ kv_dbw_t::~kv_dbw_t() delete db; } -void kv_dbw_t::open(inode_t inode_id, uint32_t kv_block_size, std::function cb) +void kv_dbw_t::open(inode_t inode_id, json11::Json cfg, std::function cb) { - db->open(inode_id, kv_block_size, cb); + db->open(inode_id, cfg, cb); +} + +void kv_dbw_t::set_config(json11::Json cfg) +{ + db->set_config(cfg); } uint64_t kv_dbw_t::get_size() diff --git a/src/kv_db.h b/src/kv_db.h index 913e1e4f..45a1cc21 100644 --- a/src/kv_db.h +++ b/src/kv_db.h @@ -15,7 +15,8 @@ struct kv_dbw_t kv_dbw_t(cluster_client_t *cli); ~kv_dbw_t(); - void open(inode_t inode_id, uint32_t kv_block_size, std::function cb); + void open(inode_t inode_id, json11::Json cfg, std::function cb); + void set_config(json11::Json cfg); void close(std::function cb); uint64_t get_size();