Rename cli_rm -> cli_rm_data, cli_snap_rm -> cli_rm
parent
839ec9e6e0
commit
e23296a327
|
@ -155,7 +155,7 @@ target_link_libraries(vitastor-nbd
|
|||
# vitastor-cli
|
||||
add_executable(vitastor-cli
|
||||
cli.cpp cli_alloc_osd.cpp cli_simple_offsets.cpp cli_df.cpp
|
||||
cli_ls.cpp cli_create.cpp cli_modify.cpp cli_flatten.cpp cli_merge.cpp cli_rm.cpp cli_snap_rm.cpp
|
||||
cli_ls.cpp cli_create.cpp cli_modify.cpp cli_flatten.cpp cli_merge.cpp cli_rm_data.cpp cli_rm.cpp
|
||||
)
|
||||
target_link_libraries(vitastor-cli
|
||||
vitastor_client
|
||||
|
|
684
src/cli_rm.cpp
684
src/cli_rm.cpp
|
@ -1,212 +1,566 @@
|
|||
// Copyright (c) Vitaliy Filippov, 2019+
|
||||
// License: VNPL-1.1 (see README.md for details)
|
||||
|
||||
#include <fcntl.h>
|
||||
#include "cli.h"
|
||||
#include "cluster_client.h"
|
||||
#include "base64.h"
|
||||
|
||||
#define RM_LISTING 1
|
||||
#define RM_REMOVING 2
|
||||
#define RM_END 3
|
||||
|
||||
struct rm_pg_t
|
||||
// Remove layer(s): similar to merge, but alters metadata and processes multiple merge targets
|
||||
//
|
||||
// Exactly one child of the requested layers may be merged using the "inverted" workflow,
|
||||
// where we merge it "down" into one of the "to-be-removed" layers and then rename the
|
||||
// "to-be-removed" layer to the child. It may be done either if all writers are stopped
|
||||
// before trying to delete layers (which is signaled by --writers-stopped) or if that child
|
||||
// is a read-only layer (snapshot) itself.
|
||||
//
|
||||
// This "inverted" workflow trades copying data of one of the deleted layers for copying
|
||||
// data of one child of the chain which is also a child of the "traded" layer. So we
|
||||
// choose the (parent,child) pair which has the largest difference between "parent" and
|
||||
// "child" inode sizes.
|
||||
//
|
||||
// All other children of the chain are processed by iterating though them, merging removed
|
||||
// parents into them and rebasing them to the last layer which isn't a member of the removed
|
||||
// chain.
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// <parent> - <from> - <layer 2> - <to> - <child 1>
|
||||
// \ \ \- <child 2>
|
||||
// \ \- <child 3>
|
||||
// \-<child 4>
|
||||
//
|
||||
// 1) Find optimal pair for the "reverse" scenario
|
||||
// Imagine that it's (<layer 2>, <child 1>) in this example
|
||||
// 2) Process all children except <child 1>:
|
||||
// - Merge <from>..<to> to <child 2>
|
||||
// - Set <child 2> parent to <parent>
|
||||
// - Repeat for others
|
||||
// 3) Process <child 1>:
|
||||
// - Merge <from>..<child 1> to <layer 2>
|
||||
// - Set <layer 2> parent to <parent>
|
||||
// - Rename <layer 2> to <child 1>
|
||||
// 4) Delete other layers of the chain (<from>, <to>)
|
||||
struct snap_remover_t
|
||||
{
|
||||
pg_num_t pg_num;
|
||||
osd_num_t rm_osd_num;
|
||||
std::set<object_id> objects;
|
||||
std::set<object_id>::iterator obj_pos;
|
||||
uint64_t obj_count = 0, obj_done = 0;
|
||||
cli_tool_t *parent;
|
||||
|
||||
// remove from..to
|
||||
std::string from_name, to_name;
|
||||
// writers are stopped, we can safely change writable layers
|
||||
bool writers_stopped = false;
|
||||
// use CAS writes (0 = never, 1 = auto, 2 = always)
|
||||
int use_cas = 1;
|
||||
// interval between fsyncs
|
||||
int fsync_interval = 128;
|
||||
|
||||
std::map<inode_t,int> sources;
|
||||
std::map<inode_t,uint64_t> inode_used;
|
||||
std::vector<inode_t> merge_children;
|
||||
std::vector<inode_t> chain_list;
|
||||
std::map<inode_t,int> inverse_candidates;
|
||||
inode_t inverse_parent = 0, inverse_child = 0;
|
||||
inode_t new_parent = 0;
|
||||
int state = 0;
|
||||
int in_flight = 0;
|
||||
};
|
||||
int current_child = 0;
|
||||
std::function<bool(void)> cb;
|
||||
|
||||
struct rm_inode_t
|
||||
{
|
||||
uint64_t inode = 0;
|
||||
pool_id_t pool_id = 0;
|
||||
uint64_t min_offset = 0;
|
||||
|
||||
cli_tool_t *parent = NULL;
|
||||
inode_list_t *lister = NULL;
|
||||
std::vector<rm_pg_t*> lists;
|
||||
uint64_t total_count = 0, total_done = 0, total_prev_pct = 0;
|
||||
uint64_t pgs_to_list = 0;
|
||||
bool lists_done = false;
|
||||
int state = 0;
|
||||
|
||||
void start_delete()
|
||||
bool is_done()
|
||||
{
|
||||
lister = parent->cli->list_inode_start(inode, [this](inode_list_t *lst,
|
||||
std::set<object_id>&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status)
|
||||
return state == 9;
|
||||
}
|
||||
|
||||
void loop()
|
||||
{
|
||||
if (state == 1)
|
||||
goto resume_1;
|
||||
else if (state == 2)
|
||||
goto resume_2;
|
||||
else if (state == 3)
|
||||
goto resume_3;
|
||||
else if (state == 4)
|
||||
goto resume_4;
|
||||
else if (state == 5)
|
||||
goto resume_5;
|
||||
else if (state == 6)
|
||||
goto resume_6;
|
||||
else if (state == 7)
|
||||
goto resume_7;
|
||||
else if (state == 8)
|
||||
goto resume_8;
|
||||
else if (state == 9)
|
||||
goto resume_9;
|
||||
// Get children to merge
|
||||
get_merge_children();
|
||||
// Try to select an inode for the "inverse" optimized scenario
|
||||
// Read statistics from etcd to do it
|
||||
read_stats();
|
||||
state = 1;
|
||||
resume_1:
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
choose_inverse_candidate();
|
||||
// Merge children one by one, except our "inverse" child
|
||||
for (current_child = 0; current_child < merge_children.size(); current_child++)
|
||||
{
|
||||
rm_pg_t *rm = new rm_pg_t((rm_pg_t){
|
||||
.pg_num = pg_num,
|
||||
.rm_osd_num = primary_osd,
|
||||
.objects = objects,
|
||||
.obj_count = objects.size(),
|
||||
.obj_done = 0,
|
||||
});
|
||||
if (min_offset == 0)
|
||||
if (merge_children[current_child] == inverse_child)
|
||||
continue;
|
||||
start_merge_child(merge_children[current_child], merge_children[current_child]);
|
||||
resume_2:
|
||||
while (!cb())
|
||||
{
|
||||
total_count += objects.size();
|
||||
state = 2;
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
for (object_id oid: objects)
|
||||
{
|
||||
if (oid.stripe >= min_offset)
|
||||
{
|
||||
total_count++;
|
||||
}
|
||||
}
|
||||
}
|
||||
rm->obj_pos = rm->objects.begin();
|
||||
lists.push_back(rm);
|
||||
if (parent->list_first)
|
||||
{
|
||||
parent->cli->list_inode_next(lister, 1);
|
||||
}
|
||||
if (status & INODE_LIST_DONE)
|
||||
{
|
||||
lists_done = true;
|
||||
}
|
||||
pgs_to_list--;
|
||||
continue_delete();
|
||||
});
|
||||
if (!lister)
|
||||
cb = NULL;
|
||||
parent->change_parent(merge_children[current_child], new_parent);
|
||||
state = 3;
|
||||
resume_3:
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
}
|
||||
// Merge our "inverse" child into our "inverse" parent
|
||||
if (inverse_child != 0)
|
||||
{
|
||||
fprintf(stderr, "Failed to list inode %lu from pool %u objects\n", INODE_NO_POOL(inode), INODE_POOL(inode));
|
||||
start_merge_child(inverse_child, inverse_parent);
|
||||
resume_4:
|
||||
while (!cb())
|
||||
{
|
||||
state = 4;
|
||||
return;
|
||||
}
|
||||
cb = NULL;
|
||||
// Delete "inverse" child data
|
||||
start_delete_source(inverse_child);
|
||||
resume_5:
|
||||
while (!cb())
|
||||
{
|
||||
state = 5;
|
||||
return;
|
||||
}
|
||||
cb = NULL;
|
||||
// Delete "inverse" child metadata, rename parent over it,
|
||||
// and also change parent links of the previous "inverse" child
|
||||
rename_inverse_parent();
|
||||
state = 6;
|
||||
resume_6:
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
}
|
||||
// Delete parents, except the "inverse" one
|
||||
for (current_child = 0; current_child < chain_list.size(); current_child++)
|
||||
{
|
||||
if (chain_list[current_child] == inverse_parent)
|
||||
continue;
|
||||
start_delete_source(chain_list[current_child]);
|
||||
resume_7:
|
||||
while (!cb())
|
||||
{
|
||||
state = 7;
|
||||
return;
|
||||
}
|
||||
cb = NULL;
|
||||
delete_inode_config(chain_list[current_child]);
|
||||
state = 8;
|
||||
resume_8:
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
}
|
||||
state = 9;
|
||||
resume_9:
|
||||
// Done
|
||||
return;
|
||||
}
|
||||
|
||||
void get_merge_children()
|
||||
{
|
||||
// Get all children of from..to
|
||||
inode_config_t *from_cfg = parent->get_inode_cfg(from_name);
|
||||
inode_config_t *to_cfg = parent->get_inode_cfg(to_name);
|
||||
// Check that to_cfg is actually a child of from_cfg
|
||||
// FIXME de-copypaste the following piece of code with snap_merger_t
|
||||
inode_config_t *cur = to_cfg;
|
||||
chain_list.push_back(cur->num);
|
||||
while (cur->num != from_cfg->num && cur->parent_id != 0)
|
||||
{
|
||||
auto it = parent->cli->st_cli.inode_config.find(cur->parent_id);
|
||||
if (it == parent->cli->st_cli.inode_config.end())
|
||||
{
|
||||
fprintf(stderr, "Parent inode of layer %s (id %ld) not found\n", cur->name.c_str(), cur->parent_id);
|
||||
exit(1);
|
||||
}
|
||||
cur = &it->second;
|
||||
chain_list.push_back(cur->num);
|
||||
}
|
||||
if (cur->num != from_cfg->num)
|
||||
{
|
||||
fprintf(stderr, "Layer %s is not a child of %s\n", to_name.c_str(), from_name.c_str());
|
||||
exit(1);
|
||||
}
|
||||
pgs_to_list = parent->cli->list_pg_count(lister);
|
||||
parent->cli->list_inode_next(lister, parent->parallel_osds);
|
||||
}
|
||||
|
||||
void send_ops(rm_pg_t *cur_list)
|
||||
{
|
||||
if (parent->cli->msgr.osd_peer_fds.find(cur_list->rm_osd_num) ==
|
||||
parent->cli->msgr.osd_peer_fds.end())
|
||||
new_parent = from_cfg->parent_id;
|
||||
// Calculate ranks
|
||||
int i = chain_list.size()-1;
|
||||
for (inode_t item: chain_list)
|
||||
{
|
||||
// Initiate connection
|
||||
parent->cli->msgr.connect_peer(cur_list->rm_osd_num, parent->cli->st_cli.peer_states[cur_list->rm_osd_num]);
|
||||
return;
|
||||
sources[item] = i--;
|
||||
}
|
||||
while (cur_list->in_flight < parent->iodepth && cur_list->obj_pos != cur_list->objects.end())
|
||||
for (auto & ic: parent->cli->st_cli.inode_config)
|
||||
{
|
||||
if (cur_list->obj_pos->stripe >= min_offset)
|
||||
if (!ic.second.parent_id)
|
||||
{
|
||||
osd_op_t *op = new osd_op_t();
|
||||
op->op_type = OSD_OP_OUT;
|
||||
// Already checked that it exists above, but anyway
|
||||
op->peer_fd = parent->cli->msgr.osd_peer_fds.at(cur_list->rm_osd_num);
|
||||
op->req = (osd_any_op_t){
|
||||
.rw = {
|
||||
.header = {
|
||||
.magic = SECONDARY_OSD_OP_MAGIC,
|
||||
.id = parent->cli->next_op_id(),
|
||||
.opcode = OSD_OP_DELETE,
|
||||
},
|
||||
.inode = cur_list->obj_pos->inode,
|
||||
.offset = cur_list->obj_pos->stripe,
|
||||
.len = 0,
|
||||
},
|
||||
};
|
||||
op->callback = [this, cur_list](osd_op_t *op)
|
||||
{
|
||||
cur_list->in_flight--;
|
||||
if (op->reply.hdr.retval < 0)
|
||||
{
|
||||
fprintf(stderr, "Failed to remove object %lx:%lx from PG %u (OSD %lu) (retval=%ld)\n",
|
||||
op->req.rw.inode, op->req.rw.offset,
|
||||
cur_list->pg_num, cur_list->rm_osd_num, op->reply.hdr.retval);
|
||||
}
|
||||
delete op;
|
||||
cur_list->obj_done++;
|
||||
total_done++;
|
||||
continue_delete();
|
||||
};
|
||||
cur_list->in_flight++;
|
||||
parent->cli->msgr.outbox_push(op);
|
||||
continue;
|
||||
}
|
||||
cur_list->obj_pos++;
|
||||
}
|
||||
}
|
||||
|
||||
void continue_delete()
|
||||
{
|
||||
if (parent->list_first && !lists_done)
|
||||
{
|
||||
return;
|
||||
}
|
||||
for (int i = 0; i < lists.size(); i++)
|
||||
{
|
||||
if (!lists[i]->in_flight && lists[i]->obj_pos == lists[i]->objects.end())
|
||||
auto it = sources.find(ic.second.parent_id);
|
||||
if (it != sources.end() && sources.find(ic.second.num) == sources.end())
|
||||
{
|
||||
delete lists[i];
|
||||
lists.erase(lists.begin()+i, lists.begin()+i+1);
|
||||
i--;
|
||||
if (!lists_done)
|
||||
merge_children.push_back(ic.second.num);
|
||||
if (ic.second.readonly || writers_stopped)
|
||||
{
|
||||
parent->cli->list_inode_next(lister, 1);
|
||||
inverse_candidates[ic.second.num] = it->second;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
send_ops(lists[i]);
|
||||
}
|
||||
}
|
||||
if (parent->progress && total_count > 0 && total_done*1000/total_count != total_prev_pct)
|
||||
{
|
||||
printf("\rRemoved %lu/%lu objects, %lu more PGs to list...", total_done, total_count, pgs_to_list);
|
||||
total_prev_pct = total_done*1000/total_count;
|
||||
}
|
||||
if (lists_done && !lists.size())
|
||||
{
|
||||
printf("Done, inode %lu in pool %u data removed\n", INODE_NO_POOL(inode), pool_id);
|
||||
state = 2;
|
||||
}
|
||||
}
|
||||
|
||||
bool loop()
|
||||
void read_stats()
|
||||
{
|
||||
if (state == 0)
|
||||
if (inverse_candidates.size() == 0)
|
||||
{
|
||||
start_delete();
|
||||
state = 1;
|
||||
return;
|
||||
}
|
||||
else if (state == 1)
|
||||
json11::Json::array reads;
|
||||
for (auto cp: inverse_candidates)
|
||||
{
|
||||
continue_delete();
|
||||
inode_t inode = cp.first;
|
||||
reads.push_back(json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(
|
||||
parent->cli->st_cli.etcd_prefix+
|
||||
"/inode/stats/"+std::to_string(INODE_POOL(inode))+
|
||||
"/"+std::to_string(INODE_NO_POOL(inode))
|
||||
) },
|
||||
} }
|
||||
});
|
||||
}
|
||||
else if (state == 2)
|
||||
for (auto cp: sources)
|
||||
{
|
||||
return true;
|
||||
inode_t inode = cp.first;
|
||||
reads.push_back(json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(
|
||||
parent->cli->st_cli.etcd_prefix+
|
||||
"/inode/stats/"+std::to_string(INODE_POOL(inode))+
|
||||
"/"+std::to_string(INODE_NO_POOL(inode))
|
||||
) },
|
||||
} }
|
||||
});
|
||||
}
|
||||
return false;
|
||||
parent->waiting++;
|
||||
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
|
||||
{ "success", reads },
|
||||
}, [this](std::string err, json11::Json data)
|
||||
{
|
||||
parent->waiting--;
|
||||
if (err != "")
|
||||
{
|
||||
fprintf(stderr, "Error reading layer statistics from etcd: %s\n", err.c_str());
|
||||
exit(1);
|
||||
}
|
||||
for (auto inode_result: data["responses"].array_items())
|
||||
{
|
||||
auto kv = parent->cli->st_cli.parse_etcd_kv(inode_result["kvs"][0]);
|
||||
pool_id_t pool_id = 0;
|
||||
inode_t inode = 0;
|
||||
char null_byte = 0;
|
||||
sscanf(kv.key.c_str() + parent->cli->st_cli.etcd_prefix.length()+13, "%u/%lu%c", &pool_id, &inode, &null_byte);
|
||||
if (!inode || null_byte != 0)
|
||||
{
|
||||
fprintf(stderr, "Bad key returned from etcd: %s\n", kv.key.c_str());
|
||||
exit(1);
|
||||
}
|
||||
auto pool_cfg_it = parent->cli->st_cli.pool_config.find(pool_id);
|
||||
if (pool_cfg_it == parent->cli->st_cli.pool_config.end())
|
||||
{
|
||||
fprintf(stderr, "Pool %u does not exist\n", pool_id);
|
||||
exit(1);
|
||||
}
|
||||
inode = INODE_WITH_POOL(pool_id, inode);
|
||||
auto & pool_cfg = pool_cfg_it->second;
|
||||
uint64_t used_bytes = kv.value["raw_used"].uint64_value() / pool_cfg.pg_size;
|
||||
if (pool_cfg.scheme != POOL_SCHEME_REPLICATED)
|
||||
{
|
||||
used_bytes *= (pool_cfg.pg_size - pool_cfg.parity_chunks);
|
||||
}
|
||||
inode_used[inode] = used_bytes;
|
||||
}
|
||||
parent->ringloop->wakeup();
|
||||
});
|
||||
}
|
||||
|
||||
void choose_inverse_candidate()
|
||||
{
|
||||
uint64_t max_diff = 0;
|
||||
for (auto cp: inverse_candidates)
|
||||
{
|
||||
inode_t child = cp.first;
|
||||
uint64_t child_used = inode_used[child];
|
||||
int rank = cp.second;
|
||||
for (int i = chain_list.size()-rank; i < chain_list.size(); i++)
|
||||
{
|
||||
inode_t parent = chain_list[i];
|
||||
uint64_t parent_used = inode_used[parent];
|
||||
if (parent_used > child_used && (!max_diff || max_diff < (parent_used-child_used)))
|
||||
{
|
||||
max_diff = (parent_used-child_used);
|
||||
inverse_parent = parent;
|
||||
inverse_child = child;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void rename_inverse_parent()
|
||||
{
|
||||
auto child_it = parent->cli->st_cli.inode_config.find(inverse_child);
|
||||
if (child_it == parent->cli->st_cli.inode_config.end())
|
||||
{
|
||||
fprintf(stderr, "Inode %ld disappeared\n", inverse_child);
|
||||
exit(1);
|
||||
}
|
||||
auto target_it = parent->cli->st_cli.inode_config.find(inverse_parent);
|
||||
if (target_it == parent->cli->st_cli.inode_config.end())
|
||||
{
|
||||
fprintf(stderr, "Inode %ld disappeared\n", inverse_parent);
|
||||
exit(1);
|
||||
}
|
||||
inode_config_t *child_cfg = &child_it->second;
|
||||
inode_config_t *target_cfg = &target_it->second;
|
||||
std::string child_name = child_cfg->name;
|
||||
std::string target_name = target_cfg->name;
|
||||
std::string child_cfg_key = base64_encode(
|
||||
parent->cli->st_cli.etcd_prefix+
|
||||
"/config/inode/"+std::to_string(INODE_POOL(inverse_child))+
|
||||
"/"+std::to_string(INODE_NO_POOL(inverse_child))
|
||||
);
|
||||
std::string target_cfg_key = base64_encode(
|
||||
parent->cli->st_cli.etcd_prefix+
|
||||
"/config/inode/"+std::to_string(INODE_POOL(inverse_parent))+
|
||||
"/"+std::to_string(INODE_NO_POOL(inverse_parent))
|
||||
);
|
||||
// Fill new configuration
|
||||
inode_config_t new_cfg = *child_cfg;
|
||||
new_cfg.num = target_cfg->num;
|
||||
new_cfg.parent_id = new_parent;
|
||||
json11::Json::array cmp = json11::Json::array {
|
||||
json11::Json::object {
|
||||
{ "target", "MOD" },
|
||||
{ "key", child_cfg_key },
|
||||
{ "result", "LESS" },
|
||||
{ "mod_revision", child_cfg->mod_revision+1 },
|
||||
},
|
||||
json11::Json::object {
|
||||
{ "target", "MOD" },
|
||||
{ "key", target_cfg_key },
|
||||
{ "result", "LESS" },
|
||||
{ "mod_revision", target_cfg->mod_revision+1 },
|
||||
},
|
||||
};
|
||||
json11::Json::array txn = json11::Json::array {
|
||||
json11::Json::object {
|
||||
{ "request_delete_range", json11::Json::object {
|
||||
{ "key", child_cfg_key },
|
||||
} },
|
||||
},
|
||||
json11::Json::object {
|
||||
{ "request_put", json11::Json::object {
|
||||
{ "key", target_cfg_key },
|
||||
{ "value", base64_encode(json11::Json(parent->cli->st_cli.serialize_inode_cfg(&new_cfg)).dump()) },
|
||||
} },
|
||||
},
|
||||
json11::Json::object {
|
||||
{ "request_put", json11::Json::object {
|
||||
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/index/image/"+child_cfg->name) },
|
||||
{ "value", base64_encode(json11::Json({
|
||||
{ "id", INODE_NO_POOL(inverse_parent) },
|
||||
{ "pool_id", (uint64_t)INODE_POOL(inverse_parent) },
|
||||
}).dump()) },
|
||||
} },
|
||||
},
|
||||
};
|
||||
// Reparent children of inverse_child
|
||||
for (auto & cp: parent->cli->st_cli.inode_config)
|
||||
{
|
||||
if (cp.second.parent_id == child_cfg->num)
|
||||
{
|
||||
auto cp_cfg = cp.second;
|
||||
cp_cfg.parent_id = inverse_parent;
|
||||
auto cp_key = base64_encode(
|
||||
parent->cli->st_cli.etcd_prefix+
|
||||
"/config/inode/"+std::to_string(INODE_POOL(cp.second.num))+
|
||||
"/"+std::to_string(INODE_NO_POOL(cp.second.num))
|
||||
);
|
||||
cmp.push_back(json11::Json::object {
|
||||
{ "target", "MOD" },
|
||||
{ "key", cp_key },
|
||||
{ "result", "LESS" },
|
||||
{ "mod_revision", cp.second.mod_revision+1 },
|
||||
});
|
||||
txn.push_back(json11::Json::object {
|
||||
{ "request_put", json11::Json::object {
|
||||
{ "key", cp_key },
|
||||
{ "value", base64_encode(json11::Json(parent->cli->st_cli.serialize_inode_cfg(&cp_cfg)).dump()) },
|
||||
} },
|
||||
});
|
||||
}
|
||||
}
|
||||
parent->waiting++;
|
||||
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
|
||||
{ "compare", cmp },
|
||||
{ "success", txn },
|
||||
}, [this, target_name, child_name](std::string err, json11::Json res)
|
||||
{
|
||||
parent->waiting--;
|
||||
if (err != "")
|
||||
{
|
||||
fprintf(stderr, "Error renaming %s to %s: %s\n", target_name.c_str(), child_name.c_str(), err.c_str());
|
||||
exit(1);
|
||||
}
|
||||
if (!res["succeeded"].bool_value())
|
||||
{
|
||||
fprintf(
|
||||
stderr, "Parent (%s), child (%s), or one of its children"
|
||||
" configuration was modified during rename\n", target_name.c_str(), child_name.c_str()
|
||||
);
|
||||
exit(1);
|
||||
}
|
||||
printf("Layer %s renamed to %s\n", target_name.c_str(), child_name.c_str());
|
||||
parent->ringloop->wakeup();
|
||||
});
|
||||
}
|
||||
|
||||
void delete_inode_config(inode_t cur)
|
||||
{
|
||||
auto cur_cfg_it = parent->cli->st_cli.inode_config.find(cur);
|
||||
if (cur_cfg_it == parent->cli->st_cli.inode_config.end())
|
||||
{
|
||||
fprintf(stderr, "Inode 0x%lx disappeared\n", cur);
|
||||
exit(1);
|
||||
}
|
||||
inode_config_t *cur_cfg = &cur_cfg_it->second;
|
||||
std::string cur_name = cur_cfg->name;
|
||||
std::string cur_cfg_key = base64_encode(
|
||||
parent->cli->st_cli.etcd_prefix+
|
||||
"/config/inode/"+std::to_string(INODE_POOL(cur))+
|
||||
"/"+std::to_string(INODE_NO_POOL(cur))
|
||||
);
|
||||
parent->waiting++;
|
||||
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
|
||||
{ "compare", json11::Json::array {
|
||||
json11::Json::object {
|
||||
{ "target", "MOD" },
|
||||
{ "key", cur_cfg_key },
|
||||
{ "result", "LESS" },
|
||||
{ "mod_revision", cur_cfg->mod_revision+1 },
|
||||
},
|
||||
} },
|
||||
{ "success", json11::Json::array {
|
||||
json11::Json::object {
|
||||
{ "request_delete_range", json11::Json::object {
|
||||
{ "key", cur_cfg_key },
|
||||
} },
|
||||
},
|
||||
json11::Json::object {
|
||||
{ "request_delete_range", json11::Json::object {
|
||||
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/index/image/"+cur_name) },
|
||||
} },
|
||||
},
|
||||
} },
|
||||
}, [this, cur_name](std::string err, json11::Json res)
|
||||
{
|
||||
parent->waiting--;
|
||||
if (err != "")
|
||||
{
|
||||
fprintf(stderr, "Error deleting %s: %s\n", cur_name.c_str(), err.c_str());
|
||||
exit(1);
|
||||
}
|
||||
if (!res["succeeded"].bool_value())
|
||||
{
|
||||
fprintf(stderr, "Layer %s configuration was modified during deletion\n", cur_name.c_str());
|
||||
exit(1);
|
||||
}
|
||||
printf("Layer %s deleted\n", cur_name.c_str());
|
||||
parent->ringloop->wakeup();
|
||||
});
|
||||
}
|
||||
|
||||
void start_merge_child(inode_t child_inode, inode_t target_inode)
|
||||
{
|
||||
auto child_it = parent->cli->st_cli.inode_config.find(child_inode);
|
||||
if (child_it == parent->cli->st_cli.inode_config.end())
|
||||
{
|
||||
fprintf(stderr, "Inode %ld disappeared\n", child_inode);
|
||||
exit(1);
|
||||
}
|
||||
auto target_it = parent->cli->st_cli.inode_config.find(target_inode);
|
||||
if (target_it == parent->cli->st_cli.inode_config.end())
|
||||
{
|
||||
fprintf(stderr, "Inode %ld disappeared\n", target_inode);
|
||||
exit(1);
|
||||
}
|
||||
cb = parent->start_merge(json11::Json::object {
|
||||
{ "command", json11::Json::array{ "merge-data", from_name, child_it->second.name } },
|
||||
{ "target", target_it->second.name },
|
||||
{ "delete-source", false },
|
||||
{ "cas", use_cas },
|
||||
{ "fsync-interval", fsync_interval },
|
||||
});
|
||||
}
|
||||
|
||||
void start_delete_source(inode_t inode)
|
||||
{
|
||||
auto source = parent->cli->st_cli.inode_config.find(inode);
|
||||
if (source == parent->cli->st_cli.inode_config.end())
|
||||
{
|
||||
fprintf(stderr, "Inode %ld disappeared\n", inode);
|
||||
exit(1);
|
||||
}
|
||||
cb = parent->start_rm(json11::Json::object {
|
||||
{ "inode", inode },
|
||||
{ "pool", (uint64_t)INODE_POOL(inode) },
|
||||
{ "fsync-interval", fsync_interval },
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
std::function<bool(void)> cli_tool_t::start_rm(json11::Json cfg)
|
||||
std::function<bool(void)> cli_tool_t::start_snap_rm(json11::Json cfg)
|
||||
{
|
||||
auto remover = new rm_inode_t();
|
||||
remover->parent = this;
|
||||
remover->inode = cfg["inode"].uint64_value();
|
||||
remover->pool_id = cfg["pool"].uint64_value();
|
||||
if (remover->pool_id)
|
||||
json11::Json::array cmd = cfg["command"].array_items();
|
||||
auto snap_remover = new snap_remover_t();
|
||||
snap_remover->parent = this;
|
||||
snap_remover->from_name = cmd.size() > 1 ? cmd[1].string_value() : "";
|
||||
snap_remover->to_name = cmd.size() > 2 ? cmd[2].string_value() : "";
|
||||
if (snap_remover->from_name == "")
|
||||
{
|
||||
remover->inode = (remover->inode & ((1l << (64-POOL_ID_BITS)) - 1)) | (((uint64_t)remover->pool_id) << (64-POOL_ID_BITS));
|
||||
}
|
||||
remover->pool_id = INODE_POOL(remover->inode);
|
||||
if (!remover->pool_id)
|
||||
{
|
||||
fprintf(stderr, "pool is missing\n");
|
||||
fprintf(stderr, "Layer to remove argument is missing\n");
|
||||
exit(1);
|
||||
}
|
||||
remover->min_offset = cfg["min-offset"].uint64_value();
|
||||
return [remover]()
|
||||
if (snap_remover->to_name == "")
|
||||
{
|
||||
if (remover->loop())
|
||||
snap_remover->to_name = snap_remover->from_name;
|
||||
}
|
||||
snap_remover->fsync_interval = cfg["fsync-interval"].uint64_value();
|
||||
if (!snap_remover->fsync_interval)
|
||||
snap_remover->fsync_interval = 128;
|
||||
if (!cfg["cas"].is_null())
|
||||
snap_remover->use_cas = cfg["cas"].uint64_value() ? 2 : 0;
|
||||
if (!cfg["writers_stopped"].is_null())
|
||||
snap_remover->writers_stopped = true;
|
||||
return [snap_remover]()
|
||||
{
|
||||
snap_remover->loop();
|
||||
if (snap_remover->is_done())
|
||||
{
|
||||
delete remover;
|
||||
delete snap_remover;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
|
|
@ -0,0 +1,214 @@
|
|||
// Copyright (c) Vitaliy Filippov, 2019+
|
||||
// License: VNPL-1.1 (see README.md for details)
|
||||
|
||||
#include "cli.h"
|
||||
#include "cluster_client.h"
|
||||
|
||||
#define RM_LISTING 1
|
||||
#define RM_REMOVING 2
|
||||
#define RM_END 3
|
||||
|
||||
struct rm_pg_t
|
||||
{
|
||||
pg_num_t pg_num;
|
||||
osd_num_t rm_osd_num;
|
||||
std::set<object_id> objects;
|
||||
std::set<object_id>::iterator obj_pos;
|
||||
uint64_t obj_count = 0, obj_done = 0;
|
||||
int state = 0;
|
||||
int in_flight = 0;
|
||||
};
|
||||
|
||||
struct rm_inode_t
|
||||
{
|
||||
uint64_t inode = 0;
|
||||
pool_id_t pool_id = 0;
|
||||
uint64_t min_offset = 0;
|
||||
|
||||
cli_tool_t *parent = NULL;
|
||||
inode_list_t *lister = NULL;
|
||||
std::vector<rm_pg_t*> lists;
|
||||
uint64_t total_count = 0, total_done = 0, total_prev_pct = 0;
|
||||
uint64_t pgs_to_list = 0;
|
||||
bool lists_done = false;
|
||||
int state = 0;
|
||||
|
||||
void start_delete()
|
||||
{
|
||||
lister = parent->cli->list_inode_start(inode, [this](inode_list_t *lst,
|
||||
std::set<object_id>&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status)
|
||||
{
|
||||
rm_pg_t *rm = new rm_pg_t((rm_pg_t){
|
||||
.pg_num = pg_num,
|
||||
.rm_osd_num = primary_osd,
|
||||
.objects = objects,
|
||||
.obj_count = objects.size(),
|
||||
.obj_done = 0,
|
||||
});
|
||||
if (min_offset == 0)
|
||||
{
|
||||
total_count += objects.size();
|
||||
}
|
||||
else
|
||||
{
|
||||
for (object_id oid: objects)
|
||||
{
|
||||
if (oid.stripe >= min_offset)
|
||||
{
|
||||
total_count++;
|
||||
}
|
||||
}
|
||||
}
|
||||
rm->obj_pos = rm->objects.begin();
|
||||
lists.push_back(rm);
|
||||
if (parent->list_first)
|
||||
{
|
||||
parent->cli->list_inode_next(lister, 1);
|
||||
}
|
||||
if (status & INODE_LIST_DONE)
|
||||
{
|
||||
lists_done = true;
|
||||
}
|
||||
pgs_to_list--;
|
||||
continue_delete();
|
||||
});
|
||||
if (!lister)
|
||||
{
|
||||
fprintf(stderr, "Failed to list inode %lu from pool %u objects\n", INODE_NO_POOL(inode), INODE_POOL(inode));
|
||||
exit(1);
|
||||
}
|
||||
pgs_to_list = parent->cli->list_pg_count(lister);
|
||||
parent->cli->list_inode_next(lister, parent->parallel_osds);
|
||||
}
|
||||
|
||||
void send_ops(rm_pg_t *cur_list)
|
||||
{
|
||||
if (parent->cli->msgr.osd_peer_fds.find(cur_list->rm_osd_num) ==
|
||||
parent->cli->msgr.osd_peer_fds.end())
|
||||
{
|
||||
// Initiate connection
|
||||
parent->cli->msgr.connect_peer(cur_list->rm_osd_num, parent->cli->st_cli.peer_states[cur_list->rm_osd_num]);
|
||||
return;
|
||||
}
|
||||
while (cur_list->in_flight < parent->iodepth && cur_list->obj_pos != cur_list->objects.end())
|
||||
{
|
||||
if (cur_list->obj_pos->stripe >= min_offset)
|
||||
{
|
||||
osd_op_t *op = new osd_op_t();
|
||||
op->op_type = OSD_OP_OUT;
|
||||
// Already checked that it exists above, but anyway
|
||||
op->peer_fd = parent->cli->msgr.osd_peer_fds.at(cur_list->rm_osd_num);
|
||||
op->req = (osd_any_op_t){
|
||||
.rw = {
|
||||
.header = {
|
||||
.magic = SECONDARY_OSD_OP_MAGIC,
|
||||
.id = parent->cli->next_op_id(),
|
||||
.opcode = OSD_OP_DELETE,
|
||||
},
|
||||
.inode = cur_list->obj_pos->inode,
|
||||
.offset = cur_list->obj_pos->stripe,
|
||||
.len = 0,
|
||||
},
|
||||
};
|
||||
op->callback = [this, cur_list](osd_op_t *op)
|
||||
{
|
||||
cur_list->in_flight--;
|
||||
if (op->reply.hdr.retval < 0)
|
||||
{
|
||||
fprintf(stderr, "Failed to remove object %lx:%lx from PG %u (OSD %lu) (retval=%ld)\n",
|
||||
op->req.rw.inode, op->req.rw.offset,
|
||||
cur_list->pg_num, cur_list->rm_osd_num, op->reply.hdr.retval);
|
||||
}
|
||||
delete op;
|
||||
cur_list->obj_done++;
|
||||
total_done++;
|
||||
continue_delete();
|
||||
};
|
||||
cur_list->in_flight++;
|
||||
parent->cli->msgr.outbox_push(op);
|
||||
}
|
||||
cur_list->obj_pos++;
|
||||
}
|
||||
}
|
||||
|
||||
void continue_delete()
|
||||
{
|
||||
if (parent->list_first && !lists_done)
|
||||
{
|
||||
return;
|
||||
}
|
||||
for (int i = 0; i < lists.size(); i++)
|
||||
{
|
||||
if (!lists[i]->in_flight && lists[i]->obj_pos == lists[i]->objects.end())
|
||||
{
|
||||
delete lists[i];
|
||||
lists.erase(lists.begin()+i, lists.begin()+i+1);
|
||||
i--;
|
||||
if (!lists_done)
|
||||
{
|
||||
parent->cli->list_inode_next(lister, 1);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
send_ops(lists[i]);
|
||||
}
|
||||
}
|
||||
if (parent->progress && total_count > 0 && total_done*1000/total_count != total_prev_pct)
|
||||
{
|
||||
printf("\rRemoved %lu/%lu objects, %lu more PGs to list...", total_done, total_count, pgs_to_list);
|
||||
total_prev_pct = total_done*1000/total_count;
|
||||
}
|
||||
if (lists_done && !lists.size())
|
||||
{
|
||||
printf("Done, inode %lu in pool %u data removed\n", INODE_NO_POOL(inode), pool_id);
|
||||
state = 2;
|
||||
}
|
||||
}
|
||||
|
||||
bool loop()
|
||||
{
|
||||
if (state == 0)
|
||||
{
|
||||
start_delete();
|
||||
state = 1;
|
||||
}
|
||||
else if (state == 1)
|
||||
{
|
||||
continue_delete();
|
||||
}
|
||||
else if (state == 2)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
std::function<bool(void)> cli_tool_t::start_rm(json11::Json cfg)
|
||||
{
|
||||
auto remover = new rm_inode_t();
|
||||
remover->parent = this;
|
||||
remover->inode = cfg["inode"].uint64_value();
|
||||
remover->pool_id = cfg["pool"].uint64_value();
|
||||
if (remover->pool_id)
|
||||
{
|
||||
remover->inode = (remover->inode & ((1l << (64-POOL_ID_BITS)) - 1)) | (((uint64_t)remover->pool_id) << (64-POOL_ID_BITS));
|
||||
}
|
||||
remover->pool_id = INODE_POOL(remover->inode);
|
||||
if (!remover->pool_id)
|
||||
{
|
||||
fprintf(stderr, "pool is missing\n");
|
||||
exit(1);
|
||||
}
|
||||
remover->min_offset = cfg["min-offset"].uint64_value();
|
||||
return [remover]()
|
||||
{
|
||||
if (remover->loop())
|
||||
{
|
||||
delete remover;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
};
|
||||
}
|
|
@ -1,568 +0,0 @@
|
|||
// Copyright (c) Vitaliy Filippov, 2019+
|
||||
// License: VNPL-1.1 (see README.md for details)
|
||||
|
||||
#include <fcntl.h>
|
||||
#include "cli.h"
|
||||
#include "cluster_client.h"
|
||||
#include "base64.h"
|
||||
|
||||
// Remove layer(s): similar to merge, but alters metadata and processes multiple merge targets
|
||||
//
|
||||
// Exactly one child of the requested layers may be merged using the "inverted" workflow,
|
||||
// where we merge it "down" into one of the "to-be-removed" layers and then rename the
|
||||
// "to-be-removed" layer to the child. It may be done either if all writers are stopped
|
||||
// before trying to delete layers (which is signaled by --writers-stopped) or if that child
|
||||
// is a read-only layer (snapshot) itself.
|
||||
//
|
||||
// This "inverted" workflow trades copying data of one of the deleted layers for copying
|
||||
// data of one child of the chain which is also a child of the "traded" layer. So we
|
||||
// choose the (parent,child) pair which has the largest difference between "parent" and
|
||||
// "child" inode sizes.
|
||||
//
|
||||
// All other children of the chain are processed by iterating though them, merging removed
|
||||
// parents into them and rebasing them to the last layer which isn't a member of the removed
|
||||
// chain.
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// <parent> - <from> - <layer 2> - <to> - <child 1>
|
||||
// \ \ \- <child 2>
|
||||
// \ \- <child 3>
|
||||
// \-<child 4>
|
||||
//
|
||||
// 1) Find optimal pair for the "reverse" scenario
|
||||
// Imagine that it's (<layer 2>, <child 1>) in this example
|
||||
// 2) Process all children except <child 1>:
|
||||
// - Merge <from>..<to> to <child 2>
|
||||
// - Set <child 2> parent to <parent>
|
||||
// - Repeat for others
|
||||
// 3) Process <child 1>:
|
||||
// - Merge <from>..<child 1> to <layer 2>
|
||||
// - Set <layer 2> parent to <parent>
|
||||
// - Rename <layer 2> to <child 1>
|
||||
// 4) Delete other layers of the chain (<from>, <to>)
|
||||
struct snap_remover_t
|
||||
{
|
||||
cli_tool_t *parent;
|
||||
|
||||
// remove from..to
|
||||
std::string from_name, to_name;
|
||||
// writers are stopped, we can safely change writable layers
|
||||
bool writers_stopped = false;
|
||||
// use CAS writes (0 = never, 1 = auto, 2 = always)
|
||||
int use_cas = 1;
|
||||
// interval between fsyncs
|
||||
int fsync_interval = 128;
|
||||
|
||||
std::map<inode_t,int> sources;
|
||||
std::map<inode_t,uint64_t> inode_used;
|
||||
std::vector<inode_t> merge_children;
|
||||
std::vector<inode_t> chain_list;
|
||||
std::map<inode_t,int> inverse_candidates;
|
||||
inode_t inverse_parent = 0, inverse_child = 0;
|
||||
inode_t new_parent = 0;
|
||||
int state = 0;
|
||||
int current_child = 0;
|
||||
std::function<bool(void)> cb;
|
||||
|
||||
bool is_done()
|
||||
{
|
||||
return state == 9;
|
||||
}
|
||||
|
||||
void loop()
|
||||
{
|
||||
if (state == 1)
|
||||
goto resume_1;
|
||||
else if (state == 2)
|
||||
goto resume_2;
|
||||
else if (state == 3)
|
||||
goto resume_3;
|
||||
else if (state == 4)
|
||||
goto resume_4;
|
||||
else if (state == 5)
|
||||
goto resume_5;
|
||||
else if (state == 6)
|
||||
goto resume_6;
|
||||
else if (state == 7)
|
||||
goto resume_7;
|
||||
else if (state == 8)
|
||||
goto resume_8;
|
||||
else if (state == 9)
|
||||
goto resume_9;
|
||||
// Get children to merge
|
||||
get_merge_children();
|
||||
// Try to select an inode for the "inverse" optimized scenario
|
||||
// Read statistics from etcd to do it
|
||||
read_stats();
|
||||
state = 1;
|
||||
resume_1:
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
choose_inverse_candidate();
|
||||
// Merge children one by one, except our "inverse" child
|
||||
for (current_child = 0; current_child < merge_children.size(); current_child++)
|
||||
{
|
||||
if (merge_children[current_child] == inverse_child)
|
||||
continue;
|
||||
start_merge_child(merge_children[current_child], merge_children[current_child]);
|
||||
resume_2:
|
||||
while (!cb())
|
||||
{
|
||||
state = 2;
|
||||
return;
|
||||
}
|
||||
cb = NULL;
|
||||
parent->change_parent(merge_children[current_child], new_parent);
|
||||
state = 3;
|
||||
resume_3:
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
}
|
||||
// Merge our "inverse" child into our "inverse" parent
|
||||
if (inverse_child != 0)
|
||||
{
|
||||
start_merge_child(inverse_child, inverse_parent);
|
||||
resume_4:
|
||||
while (!cb())
|
||||
{
|
||||
state = 4;
|
||||
return;
|
||||
}
|
||||
cb = NULL;
|
||||
// Delete "inverse" child data
|
||||
start_delete_source(inverse_child);
|
||||
resume_5:
|
||||
while (!cb())
|
||||
{
|
||||
state = 5;
|
||||
return;
|
||||
}
|
||||
cb = NULL;
|
||||
// Delete "inverse" child metadata, rename parent over it,
|
||||
// and also change parent links of the previous "inverse" child
|
||||
rename_inverse_parent();
|
||||
state = 6;
|
||||
resume_6:
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
}
|
||||
// Delete parents, except the "inverse" one
|
||||
for (current_child = 0; current_child < chain_list.size(); current_child++)
|
||||
{
|
||||
if (chain_list[current_child] == inverse_parent)
|
||||
continue;
|
||||
start_delete_source(chain_list[current_child]);
|
||||
resume_7:
|
||||
while (!cb())
|
||||
{
|
||||
state = 7;
|
||||
return;
|
||||
}
|
||||
cb = NULL;
|
||||
delete_inode_config(chain_list[current_child]);
|
||||
state = 8;
|
||||
resume_8:
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
}
|
||||
state = 9;
|
||||
resume_9:
|
||||
// Done
|
||||
return;
|
||||
}
|
||||
|
||||
void get_merge_children()
|
||||
{
|
||||
// Get all children of from..to
|
||||
inode_config_t *from_cfg = parent->get_inode_cfg(from_name);
|
||||
inode_config_t *to_cfg = parent->get_inode_cfg(to_name);
|
||||
// Check that to_cfg is actually a child of from_cfg
|
||||
// FIXME de-copypaste the following piece of code with snap_merger_t
|
||||
inode_config_t *cur = to_cfg;
|
||||
chain_list.push_back(cur->num);
|
||||
while (cur->num != from_cfg->num && cur->parent_id != 0)
|
||||
{
|
||||
auto it = parent->cli->st_cli.inode_config.find(cur->parent_id);
|
||||
if (it == parent->cli->st_cli.inode_config.end())
|
||||
{
|
||||
fprintf(stderr, "Parent inode of layer %s (id %ld) not found\n", cur->name.c_str(), cur->parent_id);
|
||||
exit(1);
|
||||
}
|
||||
cur = &it->second;
|
||||
chain_list.push_back(cur->num);
|
||||
}
|
||||
if (cur->num != from_cfg->num)
|
||||
{
|
||||
fprintf(stderr, "Layer %s is not a child of %s\n", to_name.c_str(), from_name.c_str());
|
||||
exit(1);
|
||||
}
|
||||
new_parent = from_cfg->parent_id;
|
||||
// Calculate ranks
|
||||
int i = chain_list.size()-1;
|
||||
for (inode_t item: chain_list)
|
||||
{
|
||||
sources[item] = i--;
|
||||
}
|
||||
for (auto & ic: parent->cli->st_cli.inode_config)
|
||||
{
|
||||
if (!ic.second.parent_id)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
auto it = sources.find(ic.second.parent_id);
|
||||
if (it != sources.end() && sources.find(ic.second.num) == sources.end())
|
||||
{
|
||||
merge_children.push_back(ic.second.num);
|
||||
if (ic.second.readonly || writers_stopped)
|
||||
{
|
||||
inverse_candidates[ic.second.num] = it->second;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void read_stats()
|
||||
{
|
||||
if (inverse_candidates.size() == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
json11::Json::array reads;
|
||||
for (auto cp: inverse_candidates)
|
||||
{
|
||||
inode_t inode = cp.first;
|
||||
reads.push_back(json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(
|
||||
parent->cli->st_cli.etcd_prefix+
|
||||
"/inode/stats/"+std::to_string(INODE_POOL(inode))+
|
||||
"/"+std::to_string(INODE_NO_POOL(inode))
|
||||
) },
|
||||
} }
|
||||
});
|
||||
}
|
||||
for (auto cp: sources)
|
||||
{
|
||||
inode_t inode = cp.first;
|
||||
reads.push_back(json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(
|
||||
parent->cli->st_cli.etcd_prefix+
|
||||
"/inode/stats/"+std::to_string(INODE_POOL(inode))+
|
||||
"/"+std::to_string(INODE_NO_POOL(inode))
|
||||
) },
|
||||
} }
|
||||
});
|
||||
}
|
||||
parent->waiting++;
|
||||
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
|
||||
{ "success", reads },
|
||||
}, [this](std::string err, json11::Json data)
|
||||
{
|
||||
parent->waiting--;
|
||||
if (err != "")
|
||||
{
|
||||
fprintf(stderr, "Error reading layer statistics from etcd: %s\n", err.c_str());
|
||||
exit(1);
|
||||
}
|
||||
for (auto inode_result: data["responses"].array_items())
|
||||
{
|
||||
auto kv = parent->cli->st_cli.parse_etcd_kv(inode_result["kvs"][0]);
|
||||
pool_id_t pool_id = 0;
|
||||
inode_t inode = 0;
|
||||
char null_byte = 0;
|
||||
sscanf(kv.key.c_str() + parent->cli->st_cli.etcd_prefix.length()+13, "%u/%lu%c", &pool_id, &inode, &null_byte);
|
||||
if (!inode || null_byte != 0)
|
||||
{
|
||||
fprintf(stderr, "Bad key returned from etcd: %s\n", kv.key.c_str());
|
||||
exit(1);
|
||||
}
|
||||
auto pool_cfg_it = parent->cli->st_cli.pool_config.find(pool_id);
|
||||
if (pool_cfg_it == parent->cli->st_cli.pool_config.end())
|
||||
{
|
||||
fprintf(stderr, "Pool %u does not exist\n", pool_id);
|
||||
exit(1);
|
||||
}
|
||||
inode = INODE_WITH_POOL(pool_id, inode);
|
||||
auto & pool_cfg = pool_cfg_it->second;
|
||||
uint64_t used_bytes = kv.value["raw_used"].uint64_value() / pool_cfg.pg_size;
|
||||
if (pool_cfg.scheme != POOL_SCHEME_REPLICATED)
|
||||
{
|
||||
used_bytes *= (pool_cfg.pg_size - pool_cfg.parity_chunks);
|
||||
}
|
||||
inode_used[inode] = used_bytes;
|
||||
}
|
||||
parent->ringloop->wakeup();
|
||||
});
|
||||
}
|
||||
|
||||
void choose_inverse_candidate()
|
||||
{
|
||||
uint64_t max_diff = 0;
|
||||
for (auto cp: inverse_candidates)
|
||||
{
|
||||
inode_t child = cp.first;
|
||||
uint64_t child_used = inode_used[child];
|
||||
int rank = cp.second;
|
||||
for (int i = chain_list.size()-rank; i < chain_list.size(); i++)
|
||||
{
|
||||
inode_t parent = chain_list[i];
|
||||
uint64_t parent_used = inode_used[parent];
|
||||
if (parent_used > child_used && (!max_diff || max_diff < (parent_used-child_used)))
|
||||
{
|
||||
max_diff = (parent_used-child_used);
|
||||
inverse_parent = parent;
|
||||
inverse_child = child;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void rename_inverse_parent()
|
||||
{
|
||||
auto child_it = parent->cli->st_cli.inode_config.find(inverse_child);
|
||||
if (child_it == parent->cli->st_cli.inode_config.end())
|
||||
{
|
||||
fprintf(stderr, "Inode %ld disappeared\n", inverse_child);
|
||||
exit(1);
|
||||
}
|
||||
auto target_it = parent->cli->st_cli.inode_config.find(inverse_parent);
|
||||
if (target_it == parent->cli->st_cli.inode_config.end())
|
||||
{
|
||||
fprintf(stderr, "Inode %ld disappeared\n", inverse_parent);
|
||||
exit(1);
|
||||
}
|
||||
inode_config_t *child_cfg = &child_it->second;
|
||||
inode_config_t *target_cfg = &target_it->second;
|
||||
std::string child_name = child_cfg->name;
|
||||
std::string target_name = target_cfg->name;
|
||||
std::string child_cfg_key = base64_encode(
|
||||
parent->cli->st_cli.etcd_prefix+
|
||||
"/config/inode/"+std::to_string(INODE_POOL(inverse_child))+
|
||||
"/"+std::to_string(INODE_NO_POOL(inverse_child))
|
||||
);
|
||||
std::string target_cfg_key = base64_encode(
|
||||
parent->cli->st_cli.etcd_prefix+
|
||||
"/config/inode/"+std::to_string(INODE_POOL(inverse_parent))+
|
||||
"/"+std::to_string(INODE_NO_POOL(inverse_parent))
|
||||
);
|
||||
// Fill new configuration
|
||||
inode_config_t new_cfg = *child_cfg;
|
||||
new_cfg.num = target_cfg->num;
|
||||
new_cfg.parent_id = new_parent;
|
||||
json11::Json::array cmp = json11::Json::array {
|
||||
json11::Json::object {
|
||||
{ "target", "MOD" },
|
||||
{ "key", child_cfg_key },
|
||||
{ "result", "LESS" },
|
||||
{ "mod_revision", child_cfg->mod_revision+1 },
|
||||
},
|
||||
json11::Json::object {
|
||||
{ "target", "MOD" },
|
||||
{ "key", target_cfg_key },
|
||||
{ "result", "LESS" },
|
||||
{ "mod_revision", target_cfg->mod_revision+1 },
|
||||
},
|
||||
};
|
||||
json11::Json::array txn = json11::Json::array {
|
||||
json11::Json::object {
|
||||
{ "request_delete_range", json11::Json::object {
|
||||
{ "key", child_cfg_key },
|
||||
} },
|
||||
},
|
||||
json11::Json::object {
|
||||
{ "request_put", json11::Json::object {
|
||||
{ "key", target_cfg_key },
|
||||
{ "value", base64_encode(json11::Json(parent->cli->st_cli.serialize_inode_cfg(&new_cfg)).dump()) },
|
||||
} },
|
||||
},
|
||||
json11::Json::object {
|
||||
{ "request_put", json11::Json::object {
|
||||
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/index/image/"+child_cfg->name) },
|
||||
{ "value", base64_encode(json11::Json({
|
||||
{ "id", INODE_NO_POOL(inverse_parent) },
|
||||
{ "pool_id", (uint64_t)INODE_POOL(inverse_parent) },
|
||||
}).dump()) },
|
||||
} },
|
||||
},
|
||||
};
|
||||
// Reparent children of inverse_child
|
||||
for (auto & cp: parent->cli->st_cli.inode_config)
|
||||
{
|
||||
if (cp.second.parent_id == child_cfg->num)
|
||||
{
|
||||
auto cp_cfg = cp.second;
|
||||
cp_cfg.parent_id = inverse_parent;
|
||||
auto cp_key = base64_encode(
|
||||
parent->cli->st_cli.etcd_prefix+
|
||||
"/config/inode/"+std::to_string(INODE_POOL(cp.second.num))+
|
||||
"/"+std::to_string(INODE_NO_POOL(cp.second.num))
|
||||
);
|
||||
cmp.push_back(json11::Json::object {
|
||||
{ "target", "MOD" },
|
||||
{ "key", cp_key },
|
||||
{ "result", "LESS" },
|
||||
{ "mod_revision", cp.second.mod_revision+1 },
|
||||
});
|
||||
txn.push_back(json11::Json::object {
|
||||
{ "request_put", json11::Json::object {
|
||||
{ "key", cp_key },
|
||||
{ "value", base64_encode(json11::Json(parent->cli->st_cli.serialize_inode_cfg(&cp_cfg)).dump()) },
|
||||
} },
|
||||
});
|
||||
}
|
||||
}
|
||||
parent->waiting++;
|
||||
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
|
||||
{ "compare", cmp },
|
||||
{ "success", txn },
|
||||
}, [this, target_name, child_name](std::string err, json11::Json res)
|
||||
{
|
||||
parent->waiting--;
|
||||
if (err != "")
|
||||
{
|
||||
fprintf(stderr, "Error renaming %s to %s: %s\n", target_name.c_str(), child_name.c_str(), err.c_str());
|
||||
exit(1);
|
||||
}
|
||||
if (!res["succeeded"].bool_value())
|
||||
{
|
||||
fprintf(
|
||||
stderr, "Parent (%s), child (%s), or one of its children"
|
||||
" configuration was modified during rename\n", target_name.c_str(), child_name.c_str()
|
||||
);
|
||||
exit(1);
|
||||
}
|
||||
printf("Layer %s renamed to %s\n", target_name.c_str(), child_name.c_str());
|
||||
parent->ringloop->wakeup();
|
||||
});
|
||||
}
|
||||
|
||||
void delete_inode_config(inode_t cur)
|
||||
{
|
||||
auto cur_cfg_it = parent->cli->st_cli.inode_config.find(cur);
|
||||
if (cur_cfg_it == parent->cli->st_cli.inode_config.end())
|
||||
{
|
||||
fprintf(stderr, "Inode 0x%lx disappeared\n", cur);
|
||||
exit(1);
|
||||
}
|
||||
inode_config_t *cur_cfg = &cur_cfg_it->second;
|
||||
std::string cur_name = cur_cfg->name;
|
||||
std::string cur_cfg_key = base64_encode(
|
||||
parent->cli->st_cli.etcd_prefix+
|
||||
"/config/inode/"+std::to_string(INODE_POOL(cur))+
|
||||
"/"+std::to_string(INODE_NO_POOL(cur))
|
||||
);
|
||||
parent->waiting++;
|
||||
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
|
||||
{ "compare", json11::Json::array {
|
||||
json11::Json::object {
|
||||
{ "target", "MOD" },
|
||||
{ "key", cur_cfg_key },
|
||||
{ "result", "LESS" },
|
||||
{ "mod_revision", cur_cfg->mod_revision+1 },
|
||||
},
|
||||
} },
|
||||
{ "success", json11::Json::array {
|
||||
json11::Json::object {
|
||||
{ "request_delete_range", json11::Json::object {
|
||||
{ "key", cur_cfg_key },
|
||||
} },
|
||||
},
|
||||
json11::Json::object {
|
||||
{ "request_delete_range" |