diff --git a/src/cmd.cpp b/src/cmd.cpp
index 7fb83af5..60d9ca39 100644
--- a/src/cmd.cpp
+++ b/src/cmd.cpp
@@ -13,6 +13,7 @@
 #include "epoll_manager.h"
 #include "cluster_client.h"
 #include "pg_states.h"
+#include "base64.h"
 
 #define RM_LISTING 1
 #define RM_REMOVING 2
@@ -33,6 +34,8 @@ struct rm_pg_t
 
 struct rm_inode_t;
 struct snap_merger_t;
+struct snap_flattener_t;
+struct snap_remover_t;
 
 class cli_tool_t
 {
@@ -48,9 +51,12 @@ protected:
     cluster_client_t *cli = NULL;
     ring_consumer_t consumer;
     bool started = false;
+    int waiting = 0;
 
     rm_inode_t *remover = NULL;
     snap_merger_t *merger = NULL;
+    snap_flattener_t *flattener = NULL;
+    snap_remover_t *snap_remover = NULL;
 
 public:
     static json11::Json::object parse_args(int narg, const char *args[])
@@ -107,6 +113,73 @@ public:
 
     friend struct rm_inode_t;
     friend struct snap_merger_t;
+    friend struct snap_flattener_t;
+    friend struct snap_remover_t;
+
+    // Asynchronously rewrite the etcd configuration of inode <cur> so that its parent
+    // becomes <new_parent> (0 = no parent). Only name, size, parent and readonly are written.
+    // `waiting` is incremented until the transaction completes; the process exits on an
+    // etcd error or if the compare fails (i.e. the inode was modified concurrently).
+    void change_parent(inode_t cur, inode_t new_parent)
+    {
+        auto cur_cfg_it = cli->st_cli.inode_config.find(cur);
+        if (cur_cfg_it == cli->st_cli.inode_config.end())
+        {
+            fprintf(stderr, "Inode 0x%lx disappeared\n", cur);
+            exit(1);
+        }
+        inode_config_t *cur_cfg = &cur_cfg_it->second;
+        std::string cur_cfg_key = base64_encode(cli->st_cli.etcd_prefix+
+            "/config/inode/"+std::to_string(INODE_POOL(cur))+
+            "/"+std::to_string(INODE_NO_POOL(cur)));
+        json11::Json::object cur_cfg_json = json11::Json::object {
+            { "name", cur_cfg->name },
+            { "size", cur_cfg->size },
+        };
+        if (new_parent)
+        {
+            if (INODE_POOL(cur) != INODE_POOL(new_parent))
+                cur_cfg_json["parent_pool"] = (uint64_t)INODE_POOL(new_parent);
+            cur_cfg_json["parent_id"] = (uint64_t)INODE_NO_POOL(new_parent);
+        }
+        if (cur_cfg->readonly)
+        {
+            cur_cfg_json["readonly"] = true;
+        }
+        waiting++;
+        cli->st_cli.etcd_txn(json11::Json::object {
+            { "compare", json11::Json::array {
+                json11::Json::object {
+                    { "target", "MOD" },
+                    { "key", cur_cfg_key },
+                    { "result", "LESS" },
+                    { "mod_revision", cur_cfg->mod_revision+1 }, // MOD < rev+1 <=> key not modified after the revision we hold
+                },
+            } },
+            { "success", json11::Json::array {
+                json11::Json::object {
+                    { "request_put", json11::Json::object {
+                        { "key", cur_cfg_key },
+                        { "value", base64_encode(json11::Json(cur_cfg_json).dump()) },
+                    } }
+                },
+            } },
+        }, ETCD_SLOW_TIMEOUT, [this, cur_cfg](std::string err, json11::Json res)
+        {
+            if (err != "")
+            {
+                fprintf(stderr, "Error changing parent of %s: %s\n", cur_cfg->name.c_str(), err.c_str());
+                exit(1);
+            }
+            if (!res["succeeded"].bool_value())
+            {
+                fprintf(stderr, "Inode %s was modified during snapshot deletion\n", cur_cfg->name.c_str());
+                exit(1);
+            }
+            waiting--;
+            ringloop->wakeup();
+        });
+    }
 };
 
 struct rm_inode_t
@@ -120,6 +193,7 @@ struct rm_inode_t
     uint64_t total_count = 0, total_done = 0, total_prev_pct = 0;
     uint64_t pgs_to_list = 0;
     bool lists_done = false;
+    bool finished = false;
 
     void start_delete()
     {
@@ -203,6 +277,10 @@ struct rm_inode_t
 
     void continue_delete()
     {
+        if (finished)
+        {
+            return;
+        }
         if (parent->list_first && !lists_done)
         {
             return;
@@ -232,9 +310,14 @@ struct rm_inode_t
         if (lists_done && !lists.size())
         {
             printf("Done, inode %lu in pool %u removed\n", (inode & ((1l << (64-POOL_ID_BITS)) - 1)), pool_id);
-            exit(0);
+            finished = true;
         }
     }
+
+    bool is_done()
+    {
+        return finished;
+    }
 };
 
 struct snap_rw_op_t
@@ -246,6 +329,20 @@ struct snap_rw_op_t
     uint32_t start = 0, end = 0;
 };
 
+// Find an inode configuration by layer name; prints an error and exits if there is none
+static inode_config_t* get_inode_cfg(cluster_client_t *cli, const std::string & name)
+{
+    for (auto & ic: cli->st_cli.inode_config)
+    {
+        if (ic.second.name == name)
+        {
+            return &ic.second;
+        }
+    }
+    fprintf(stderr, "Layer %s not found\n", name.c_str());
+    exit(1);
+}
+
 // Layer merge is the base for multiple operations:
 // 1) Delete snapshot "up" = merge child layer into the parent layer, remove the child
 //    and rename the parent to the child
@@ -255,15 +352,11 @@ struct snap_merger_t
 {
     cli_tool_t *parent;
 
+    // -- CONFIGURATION --
     // merge from..to into target (target may be one of from..to)
     std::string from_name, to_name, target_name;
 
-    // inode=>rank (bigger rank means child layers)
     std::map<inode_t,int> sources;
-    // target to merge data into
-    inode_t target;
-    // rank of the target
-    int target_rank;
     // delete merged source inode data during merge
     bool delete_source = false;
     // use CAS writes (0 = never, 1 = auto, 2 = always)
@@ -273,6 +366,9 @@ struct snap_merger_t
     // interval between fsyncs
     int fsync_interval = 128;
 
+    // -- STATE --
+    inode_t target;
+    int target_rank;
     bool inside_continue = false;
     int state = 0;
     int lists_todo = 0;
@@ -288,25 +384,12 @@ struct snap_merger_t
     int deleted_unsynced = 0;
     uint64_t processed = 0, to_process = 0;
 
-    inode_config_t* get_inode_cfg(const std::string & name)
-    {
-        for (auto & ic: parent->cli->st_cli.inode_config)
-        {
-            if (ic.second.name == name)
-            {
-                return &ic.second;
-            }
-        }
-        fprintf(stderr, "Layer %s not found\n", name.c_str());
-        exit(1);
-    }
-
     void start_merge()
     {
         check_delete_source = delete_source || check_delete_source;
-        inode_config_t *from_cfg = get_inode_cfg(from_name);
-        inode_config_t *to_cfg = get_inode_cfg(to_name);
-        inode_config_t *target_cfg = target_name == "" ? from_cfg : get_inode_cfg(target_name);
+        inode_config_t *from_cfg = get_inode_cfg(parent->cli, from_name);
+        inode_config_t *to_cfg = get_inode_cfg(parent->cli, to_name);
+        inode_config_t *target_cfg = target_name == "" ? from_cfg : get_inode_cfg(parent->cli, target_name);
         if (to_cfg->num == from_cfg->num)
         {
             fprintf(stderr, "Only one layer specified, nothing to merge\n");
@@ -324,7 +407,7 @@ struct snap_merger_t
             auto it = parent->cli->st_cli.inode_config.find(cur->parent_id);
             if (it == parent->cli->st_cli.inode_config.end())
             {
-                fprintf(stderr, "Parent inode of layer %s (%lx) not found\n", cur->name.c_str(), cur->parent_id);
+                fprintf(stderr, "Parent inode of layer %s (id %ld) not found\n", cur->name.c_str(), cur->parent_id);
                 exit(1);
             }
             cur = &it->second;
@@ -424,6 +507,11 @@ struct snap_merger_t
         }
     }
 
+    bool is_done()
+    {
+        return state == 7;
+    }
+
     void continue_merge()
     {
         if (state == 1)
@@ -438,6 +526,8 @@ struct snap_merger_t
             goto resume_5;
         else if (state == 6)
             goto resume_6;
+        else if (state == 7)
+            return;
         // First list lower layers
         list_layers(true);
         state = 1;
@@ -517,7 +607,7 @@ struct snap_merger_t
 resume_6:
         // Done
         printf("Done, layers from %s to %s merged into %s\n", from_name.c_str(), to_name.c_str(), target_name.c_str());
-        exit(0);
+        state = 7;
     }
 
     void list_layers(bool lower)
@@ -789,6 +879,345 @@ struct snap_merger_t
     }
 };
 
+// Flatten a layer: merge all parents into a layer and break the connection completely
+struct snap_flattener_t
+{
+    cli_tool_t *parent;
+
+    // target to flatten
+    std::string target_name;
+    // writers are stopped, we can safely change writable layers
+    bool writers_stopped = false;
+    // use CAS writes (0 = never, 1 = auto, 2 = always)
+    int use_cas = 1;
+    // interval between fsyncs
+    int fsync_interval = 128;
+
+    std::string top_parent_name;
+    inode_t target_id = 0;
+    int state = 0;
+    snap_merger_t *merger = NULL;
+
+    // Walk up the parent chain of the target and remember the name of its topmost ancestor
+    void get_merge_parents()
+    {
+        // Get all parents of target
+        inode_config_t *target_cfg = get_inode_cfg(parent->cli, target_name);
+        target_id = target_cfg->num;
+        std::vector<inode_t> chain_list;
+        inode_config_t *cur = target_cfg;
+        chain_list.push_back(cur->num);
+        while (cur->parent_id != 0 && cur->parent_id != target_cfg->num)
+        {
+            auto it = parent->cli->st_cli.inode_config.find(cur->parent_id);
+            if (it == parent->cli->st_cli.inode_config.end())
+            {
+                fprintf(stderr, "Parent inode of layer %s (id %ld) not found\n", cur->name.c_str(), cur->parent_id);
+                exit(1);
+            }
+            cur = &it->second;
+            chain_list.push_back(cur->num);
+        }
+        if (cur->parent_id != 0)
+        {
+            fprintf(stderr, "Layer %s has a loop in parents\n", target_name.c_str());
+            exit(1);
+        }
+        top_parent_name = cur->name;
+    }
+
+    // True once the merge is finished and the parent link has been removed
+    bool is_done()
+    {
+        return state == 3;
+    }
+
+    // Resumable state machine; call it repeatedly (it returns while waiting) until is_done()
+    void loop()
+    {
+        if (state == 1)
+            goto resume_1;
+        else if (state == 2)
+            goto resume_2;
+        else if (state == 3)
+            goto resume_3;
+        // Get parent layers
+        get_merge_parents();
+        // Start merger
+        merger = new snap_merger_t();
+        merger->parent = parent;
+        merger->from_name = top_parent_name;
+        merger->to_name = target_name;
+        merger->target_name = target_name;
+        merger->delete_source = false;
+        merger->use_cas = this->use_cas;
+        merger->fsync_interval = this->fsync_interval;
+        merger->start_merge();
+        // Wait for it
+        while (!merger->is_done())
+        {
+            state = 1;
+            return;
+resume_1:
+            merger->continue_merge_reent();
+        }
+        delete merger;
+        // Change parent
+        parent->change_parent(target_id, 0);
+        // Wait for it to complete
+        state = 2;
+resume_2:
+        if (parent->waiting > 0)
+            return;
+        state = 3;
+resume_3:
+        // Done
+        return;
+    }
+};
+
+// Remove layer(s): similar to merge, but alters metadata and processes multiple merge targets
+// If the requested snapshot chain has only 1 child and --writers-stopped is specified
+// then that child can be merged "down" into the snapshot chain.
+// Otherwise we iterate over all children of the chain, merge removed parents into them,
+// and delete children afterwards.
+//
+// Example:
+//
+// <parent> - <from> - <layer 2> - <to> - <child 1>
+//                 \           \       \- <child 2>
+//                  \           \- <child 3>
+//                   \- <child 4>
+//
+// 1) Merge <from>..<to> to <child 2>
+// 2) Set <child 2> parent to <parent>
+// 3) Variant #1, trickier, beneficial when <child 1> has less data than <to>
+//    (not implemented yet):
+//    - Merge <to>..<child 1> to <to>
+//    - Rename <to> to <child 1>
+//      It can be done without extra precautions if <child 1> is a read-only layer itself
+//      Otherwise it should be either done offline or by pausing writers
+//    - <to> is now deleted, repeat deletion with <from>..<layer 2>
+// 4) Variant #2, simple:
+//    - Repeat 1-2 with <child 1>
+//    - Delete <to>
+// 5) Process all other children
+struct snap_remover_t
+{
+    cli_tool_t *parent;
+
+    // remove from..to
+    std::string from_name, to_name;
+    // writers are stopped, we can safely change writable layers
+    bool writers_stopped = false;
+    // use CAS writes (0 = never, 1 = auto, 2 = always)
+    int use_cas = 1;
+    // interval between fsyncs
+    int fsync_interval = 128;
+
+    std::vector<inode_t> merge_children;
+    std::vector<inode_t> chain_list;
+    inode_t new_parent = 0;
+    int state = 0;
+    int current_child = 0;
+    snap_merger_t *merger = NULL;
+    rm_inode_t *remover = NULL;
+
+    // Fill chain_list with the layers to..from (child first) and merge_children with all
+    // layers outside the chain whose parent is in it; new_parent becomes the parent of <from>
+    void get_merge_children()
+    {
+        // Get all children of from..to
+        inode_config_t *from_cfg = get_inode_cfg(parent->cli, from_name);
+        inode_config_t *to_cfg = get_inode_cfg(parent->cli, to_name);
+        // Check that to_cfg is actually a child of from_cfg
+        // FIXME de-copypaste the following piece of code with snap_merger_t
+        inode_config_t *cur = to_cfg;
+        chain_list.push_back(cur->num);
+        while (cur->num != from_cfg->num && cur->parent_id != 0)
+        {
+            auto it = parent->cli->st_cli.inode_config.find(cur->parent_id);
+            if (it == parent->cli->st_cli.inode_config.end())
+            {
+                fprintf(stderr, "Parent inode of layer %s (id %ld) not found\n", cur->name.c_str(), cur->parent_id);
+                exit(1);
+            }
+            cur = &it->second;
+            chain_list.push_back(cur->num);
+        }
+        if (cur->num != from_cfg->num)
+        {
+            fprintf(stderr, "Layer %s is not a child of %s\n", to_name.c_str(), from_name.c_str());
+            exit(1);
+        }
+        new_parent = from_cfg->parent_id;
+        // Calculate ranks
+        std::map<inode_t,int> sources;
+        int i = chain_list.size()-1;
+        for (inode_t item: chain_list)
+        {
+            sources[item] = i--;
+        }
+        for (auto & ic: parent->cli->st_cli.inode_config)
+        {
+            if (!ic.second.parent_id)
+            {
+                continue;
+            }
+            auto it = sources.find(ic.second.parent_id);
+            if (it != sources.end() && sources.find(ic.second.num) == sources.end())
+            {
+                merge_children.push_back(ic.second.num);
+            }
+        }
+    }
+
+    bool is_done()
+    {
+        return state == 5;
+    }
+
+    // Resumable state machine: merge every child, reparent it, then delete the chain layers
+    void loop()
+    {
+        if (state == 1)
+            goto resume_1;
+        else if (state == 2)
+            goto resume_2;
+        else if (state == 3)
+            goto resume_3;
+        else if (state == 4)
+            goto resume_4;
+        else if (state == 5)
+            goto resume_5;
+        // Get children to merge
+        get_merge_children();
+        // Merge children one by one
+        for (current_child = 0; current_child < merge_children.size(); current_child++)
+        {
+            start_merge_child();
+            while (!merger->is_done())
+            {
+                state = 1;
+                return;
+resume_1:
+                merger->continue_merge_reent();
+            }
+            delete merger;
+            parent->change_parent(merge_children[current_child], new_parent);
+            state = 2;
+resume_2:
+            if (parent->waiting > 0)
+                return;
+        }
+        // Delete sources
+        for (current_child = 0; current_child < chain_list.size(); current_child++)
+        {
+            start_delete_source();
+            while (!remover->is_done())
+            {
+                state = 3;
+                return;
+resume_3:
+                remover->continue_delete();
+            }
+            delete remover;
+            delete_inode_config(chain_list[current_child]);
+            state = 4;
+resume_4:
+            if (parent->waiting > 0)
+                return;
+        }
+resume_5:
+        // Done
+        state = 5;
+        return;
+    }
+
+    // Asynchronously delete the etcd configuration key of inode <cur>, guarded by its mod_revision
+    void delete_inode_config(inode_t cur)
+    {
+        auto cur_cfg_it = parent->cli->st_cli.inode_config.find(cur);
+        if (cur_cfg_it == parent->cli->st_cli.inode_config.end())
+        {
+            fprintf(stderr, "Inode 0x%lx disappeared\n", cur);
+            exit(1);
+        }
+        inode_config_t *cur_cfg = &cur_cfg_it->second;
+        std::string cur_cfg_key = base64_encode(parent->cli->st_cli.etcd_prefix+
+            "/config/inode/"+std::to_string(INODE_POOL(cur))+
+            "/"+std::to_string(INODE_NO_POOL(cur)));
+        parent->waiting++;
+        parent->cli->st_cli.etcd_txn(json11::Json::object {
+            { "compare", json11::Json::array {
+                json11::Json::object {
+                    { "target", "MOD" },
+                    { "key", cur_cfg_key },
+                    { "result", "LESS" },
+                    { "mod_revision", cur_cfg->mod_revision+1 }, // MOD < rev+1 <=> key not modified after the revision we hold
+                },
+            } },
+            { "success", json11::Json::array {
+                json11::Json::object {
+                    { "request_delete_range", json11::Json::object {
+                        { "key", cur_cfg_key },
+                    } }
+                },
+            } },
+        }, ETCD_SLOW_TIMEOUT, [this, cur_cfg](std::string err, json11::Json res)
+        {
+            if (err != "")
+            {
+                fprintf(stderr, "Error deleting %s: %s\n", cur_cfg->name.c_str(), err.c_str());
+                exit(1);
+            }
+            if (!res["succeeded"].bool_value())
+            {
+                fprintf(stderr, "Inode %s was modified during deletion\n", cur_cfg->name.c_str());
+                exit(1);
+            }
+            parent->waiting--;
+            parent->ringloop->wakeup();
+        });
+    }
+
+    // Create a merger that merges <from>..<current child> into the current child
+    void start_merge_child()
+    {
+        auto target = parent->cli->st_cli.inode_config.find(merge_children[current_child]);
+        if (target == parent->cli->st_cli.inode_config.end())
+        {
+            fprintf(stderr, "Inode %ld disappeared\n", merge_children[current_child]);
+            exit(1);
+        }
+        merger = new snap_merger_t();
+        merger->parent = parent;
+        merger->from_name = from_name;
+        merger->to_name = target->second.name;
+        merger->target_name = target->second.name;
+        merger->delete_source = false;
+        merger->use_cas = this->use_cas;
+        merger->fsync_interval = this->fsync_interval;
+        merger->start_merge();
+    }
+
+    // Create a remover that deletes the data of the current chain layer
+    void start_delete_source()
+    {
+        auto source = parent->cli->st_cli.inode_config.find(chain_list[current_child]);
+        if (source == parent->cli->st_cli.inode_config.end())
+        {
+            fprintf(stderr, "Inode %ld disappeared\n", chain_list[current_child]);
+            exit(1);
+        }
+        remover = new rm_inode_t();
+        remover->parent = parent;
+        remover->inode = chain_list[current_child];
+        remover->pool_id = INODE_POOL(remover->inode);
+        remover->start_delete();
+    }
+};
+
 void cli_tool_t::run(json11::Json cfg)
 {
     json11::Json::array cmd = cfg["command"].array_items();
@@ -833,6 +1262,45 @@ void cli_tool_t::run(json11::Json cfg)
         if (!cfg["cas"].is_null())
             merger->use_cas = cfg["cas"].uint64_value() ? 2 : 0;
     }
+    else if (cmd[0] == "flatten")
+    {
+        // Merge all parent layers into the target layer and detach it from them
+        flattener = new snap_flattener_t();
+        flattener->parent = this;
+        flattener->target_name = cmd.size() > 1 ? cmd[1].string_value() : "";
+        if (flattener->target_name == "")
+        {
+            fprintf(stderr, "Layer to flatten argument is missing\n");
+            exit(1);
+        }
+        flattener->fsync_interval = cfg["fsync-interval"].uint64_value();
+        if (!flattener->fsync_interval)
+            flattener->fsync_interval = 128;
+        if (!cfg["cas"].is_null())
+            flattener->use_cas = cfg["cas"].uint64_value() ? 2 : 0;
+    }
+    else if (cmd[0] == "snap-rm")
+    {
+        // Remove multiple snapshots and rebase their children
+        snap_remover = new snap_remover_t();
+        snap_remover->parent = this;
+        snap_remover->from_name = cmd.size() > 1 ? cmd[1].string_value() : "";
+        snap_remover->to_name = cmd.size() > 2 ? cmd[2].string_value() : "";
+        if (snap_remover->from_name == "")
+        {
+            fprintf(stderr, "Layer to remove argument is missing\n");
+            exit(1);
+        }
+        if (snap_remover->to_name == "")
+        {
+            snap_remover->to_name = snap_remover->from_name;
+        }
+        snap_remover->fsync_interval = cfg["fsync-interval"].uint64_value();
+        if (!snap_remover->fsync_interval)
+            snap_remover->fsync_interval = 128;
+        if (!cfg["cas"].is_null())
+            snap_remover->use_cas = cfg["cas"].uint64_value() ? 2 : 0;
+    }
     else
     {
         fprintf(stderr, "unknown command: %s\n", cmd[0].string_value().c_str());
@@ -878,6 +1346,14 @@ void cli_tool_t::start_work()
     {
         merger->start_merge();
     }
+    else if (flattener)
+    {
+        flattener->loop();
+    }
+    else if (snap_remover)
+    {
+        snap_remover->loop();
+    }
     started = true;
 }
 
@@ -886,10 +1362,34 @@ void cli_tool_t::continue_work()
     if (remover)
     {
         remover->continue_delete();
+        if (remover->is_done())
+        {
+            exit(0);
+        }
     }
     else if (merger)
     {
         merger->continue_merge_reent();
+        if (merger->is_done())
+        {
+            exit(0);
+        }
+    }
+    else if (flattener)
+    {
+        flattener->loop();
+        if (flattener->is_done())
+        {
+            exit(0);
+        }
+    }
+    else if (snap_remover)
+    {
+        snap_remover->loop();
+        if (snap_remover->is_done())
+        {
+            exit(0);
+        }
     }
 }
 
diff --git a/src/osd_id.h b/src/osd_id.h
index 2daf0b25..b14f11d3 100644
--- a/src/osd_id.h
+++ b/src/osd_id.h
@@ -9,6 +9,7 @@
 #define POOL_ID_MAX 0x10000
 #define POOL_ID_BITS 16
 #define INODE_POOL(inode) (pool_id_t)((inode) >> (64 - POOL_ID_BITS))
+#define INODE_NO_POOL(inode) (inode_t)((inode) & (((uint64_t)1 << (64-POOL_ID_BITS)) - 1))
 
 // Pool ID is 16 bits long
 typedef uint32_t pool_id_t;