Take PG history into account when starting PGs

trace-sqes
Vitaliy Filippov 2020-04-19 00:20:18 +03:00
parent 9126ffb0f9
commit f95299b769
5 changed files with 239 additions and 149 deletions

View File

@ -365,6 +365,7 @@ void osd_t::stop_client(int peer_fd)
osd_client_t cl = it->second;
if (cl.osd_num)
{
// FIXME: Reload configuration from Consul when the connection is dropped
printf("[%lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl.osd_num);
}
else
@ -381,7 +382,7 @@ void osd_t::stop_client(int peer_fd)
// Cancel outbound operations
cancel_osd_ops(cl);
osd_peer_fds.erase(cl.osd_num);
repeer_pgs(cl.osd_num, false);
repeer_pgs(cl.osd_num);
peering_state |= OSD_CONNECTING_PEERS;
}
if (cl.read_op)

6
osd.h
View File

@ -267,10 +267,11 @@ class osd_t
void print_stats();
void reset_stats();
json11::Json get_status();
void consul_txn(json11::Json txn, std::function<void(std::string, json11::Json)> callback);
void init_cluster();
void report_status();
void load_pgs();
void parse_pgs(json11::Json data);
void parse_pgs(const json11::Json & pg_config, const std::map<pg_num_t, json11::Json> & pg_history);
void load_and_connect_peers();
// event loop, socket read/write
@ -297,8 +298,9 @@ class osd_t
void parse_test_peer(std::string peer);
void init_primary();
void handle_peers();
void repeer_pgs(osd_num_t osd_num, bool is_connected);
void repeer_pgs(osd_num_t osd_num);
void start_pg_peering(pg_num_t pg_num);
void submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps);
bool stop_pg(pg_num_t pg_num);
void finish_stop_pg(pg_t & pg);

View File

@ -119,19 +119,54 @@ void osd_t::report_status()
});
}
// Sends a transaction to Consul: serializes <txn> to JSON and submits it as the
// body of an HTTP "PUT /v1/txn" request to consul_address.
// <callback> is handed over to http_request_json() unchanged; callers in this file
// treat its first argument as an error string (empty on success) and the second
// one as the parsed JSON reply.
void osd_t::consul_txn(json11::Json txn, std::function<void(std::string, json11::Json)> callback)
{
    const std::string body = txn.dump();
    // Request line and headers; Content-Length must describe the serialized body
    std::string request = "PUT /v1/txn HTTP/1.1\r\n";
    request += "Host: "+consul_host+"\r\n";
    request += "Content-Type: application/json\r\n";
    request += "Content-Length: "+std::to_string(body.size())+"\r\n";
    request += "Connection: close\r\n";
    // An empty line separates the headers from the body
    request += "\r\n";
    request += body;
    http_request_json(consul_address, request, callback);
}
// Parses the WHOLE string <str> as an unsigned integer written in base <base>.
//
// Returns the parsed value, or 0 when the string is not a clean unsigned number:
// - it is empty,
// - it begins with whitespace or with a minus sign,
// - it contains no digits at all,
// - extra characters follow the number,
// - the value does not fit into an unsigned long long.
// Never throws. Note that a literal "0" cannot be told apart from a parse failure;
// callers use a zero result as the "invalid" marker.
uint64_t stoull_full(std::string str, int base = 10)
{
    // std::stoull() silently skips leading whitespace and wraps negative numbers
    // around to huge positive values, so both are rejected up front.
    // The cast keeps isspace() defined for bytes >= 0x80 where char is signed.
    if (str.empty() || isspace(static_cast<unsigned char>(str[0])) || str[0] == '-')
    {
        return 0;
    }
    try
    {
        size_t end = 0;
        uint64_t r = std::stoull(str, &end, base);
        // Trailing garbage after the digits makes the whole string invalid
        return end < str.length() ? 0 : r;
    }
    catch (const std::logic_error &)
    {
        // std::invalid_argument (no digits) or std::out_of_range (overflow)
        return 0;
    }
}
// Start -> Load PGs -> Load peers -> Connect to peers -> Peer PGs
// Wait for PG changes -> Start/Stop PGs when requested
// Peer connection is lost -> Reload connection data -> Try to reconnect -> Repeat
void osd_t::load_pgs()
{
assert(this->pgs.size() == 0);
std::string req = "GET /v1/kv/"+consul_prefix+"/config/pgs?raw"+
/*(consul_change_index > 0 ? "&index="+std::to_string(consul_change_index) : "")+*/
" HTTP/1.1\r\n"+
"Host: "+consul_host+"\r\n"+
"Connection: close\r\n"+
"\r\n";
http_request_json(consul_address, req, [this](std::string err, json11::Json data)
json11::Json::array txn = {
json11::Json::object {
{ "KV", json11::Json::object {
{ "Verb", "get" },
{ "Key", consul_prefix+"/config/pgs" },
} }
},
json11::Json::object {
{ "KV", json11::Json::object {
{ "Verb", "get-tree" },
{ "Key", consul_prefix+"/pg/history/" },
} }
},
};
consul_txn(txn, [this](std::string err, json11::Json data)
{
if (err != "")
{
@ -142,19 +177,43 @@ void osd_t::load_pgs()
});
return;
}
parse_pgs(data);
json11::Json pg_config;
std::map<pg_num_t, json11::Json> pg_history;
for (auto & res: data["Results"].array_items())
{
std::string key = res["KV"]["Key"].string_value();
std::string json_err;
json11::Json value = json11::Json::parse(base64_decode(res["KV"]["Value"].string_value()), json_err);
if (json_err != "")
{
printf("Bad JSON in Consul key %s: %s\n", key.c_str(), json_err.c_str());
}
if (key == consul_prefix+"/config/pgs")
{
pg_config = value;
}
else
{
// <consul_prefix>/pg/history/%d.
pg_num_t pg_num = stoull_full(key.substr(consul_prefix.length()+13, key.length()-consul_prefix.length()-14));
if (pg_num)
{
pg_history[pg_num] = value;
}
}
}
parse_pgs(pg_config, pg_history);
peering_state = OSD_CONNECTING_PEERS;
});
}
void osd_t::parse_pgs(json11::Json data)
void osd_t::parse_pgs(const json11::Json & pg_config, const std::map<pg_num_t, json11::Json> & pg_history)
{
uint64_t pg_count = 0;
for (auto pg_item: data.object_items())
for (auto pg_item: pg_config.object_items())
{
char *pg_num_end = NULL;
pg_num_t pg_num = strtoull(pg_item.first.c_str(), &pg_num_end, 10);
if (!pg_num || *pg_num_end != 0)
pg_num_t pg_num = stoull_full(pg_item.first);
if (!pg_num)
{
throw std::runtime_error("Bad key in PG hash: "+pg_item.first);
}
@ -163,27 +222,52 @@ void osd_t::parse_pgs(json11::Json data)
if (primary_osd == this->osd_num)
{
// Take this PG
std::set<osd_num_t> all_peers;
std::vector<osd_num_t> target_set;
for (auto pg_osd_num: pg_json["osd_set"].array_items())
{
osd_num_t pg_osd = pg_osd_num.uint64_value();
target_set.push_back(pg_osd);
if (pg_osd != 0)
{
all_peers.insert(pg_osd);
}
}
if (target_set.size() != 3)
{
throw std::runtime_error("Bad PG "+std::to_string(pg_num)+" config format: incorrect osd_set");
}
std::vector<std::vector<osd_num_t>> target_history;
auto hist_it = pg_history.find(pg_num);
if (hist_it != pg_history.end())
{
for (auto hist_item: hist_it->second.array_items())
{
std::vector<osd_num_t> history_set;
for (auto pg_osd_num: hist_item["osd_set"].array_items())
{
osd_num_t pg_osd = pg_osd_num.uint64_value();
history_set.push_back(pg_osd);
if (pg_osd != 0)
{
all_peers.insert(pg_osd);
}
}
target_history.push_back(history_set);
}
}
this->pgs[pg_num] = (pg_t){
.state = PG_PEERING,
.pg_cursize = 0,
.pg_num = pg_num,
.all_peers = std::vector<osd_num_t>(all_peers.begin(), all_peers.end()),
.target_history = target_history,
.target_set = target_set,
};
this->pgs[pg_num].print_state();
// Add peers
for (auto pg_osd: target_set)
for (auto pg_osd: all_peers)
{
// FIXME: Add OSDs from PG history to peers
if (pg_osd != this->osd_num && osd_peer_fds.find(pg_osd) == osd_peer_fds.end())
{
wanted_peers[pg_osd] = { 0 };
@ -197,7 +281,7 @@ void osd_t::parse_pgs(json11::Json data)
void osd_t::load_and_connect_peers()
{
json11::Json::array consul_txn;
json11::Json::array load_peer_txn;
for (auto wp_it = wanted_peers.begin(); wp_it != wanted_peers.end();)
{
osd_num_t osd_num = wp_it->first;
@ -217,7 +301,7 @@ void osd_t::load_and_connect_peers()
{
// (Re)load OSD state from Consul
wp_it->second.last_load_attempt = time(NULL);
consul_txn.push_back(json11::Json::object {
load_peer_txn.push_back(json11::Json::object {
{ "KV", json11::Json::object {
{ "Verb", "get-tree" },
{ "Key", consul_prefix+"/osd/state/"+std::to_string(osd_num)+"." },
@ -255,7 +339,6 @@ void osd_t::load_and_connect_peers()
return;
}
printf("Connected with peer OSD %lu (fd %d)\n", clients[peer_fd].osd_num, peer_fd);
// FIXME: Check peer config after connecting
wanted_peers.erase(osd_num);
if (!wanted_peers.size())
{
@ -263,7 +346,7 @@ void osd_t::load_and_connect_peers()
printf("Connected to all peers\n");
peering_state = peering_state & ~OSD_CONNECTING_PEERS;
}
repeer_pgs(osd_num, true);
repeer_pgs(osd_num);
});
}
else
@ -272,17 +355,9 @@ void osd_t::load_and_connect_peers()
wp_it++;
}
}
if (consul_txn.size() > 0)
if (load_peer_txn.size() > 0)
{
std::string req = json11::Json(consul_txn).dump();
req = "PUT /v1/txn HTTP/1.1\r\n"
"Host: "+consul_host+"\r\n"
"Content-Type: application/json\r\n"
"Content-Length: "+std::to_string(req.size())+"\r\n"
"Connection: close\r\n"
"\r\n"+req;
loading_peer_config = true;
http_request_json(consul_address, req, [this](std::string err, json11::Json data)
consul_txn(load_peer_txn, [this](std::string err, json11::Json data)
{
loading_peer_config = false;
if (err != "")

View File

@ -217,20 +217,17 @@ void osd_t::handle_peers()
}
}
void osd_t::repeer_pgs(osd_num_t osd_num, bool is_connected)
void osd_t::repeer_pgs(osd_num_t osd_num)
{
// Re-peer affected PGs
// FIXME: We shouldn't rely just on target_set. Other OSDs may also contain PG data.
osd_num_t real_osd = (is_connected ? osd_num : 0);
for (auto & p: pgs)
{
bool repeer = false;
if (p.second.state != PG_OFFLINE)
{
for (int r = 0; r < p.second.target_set.size(); r++)
for (osd_num_t pg_osd: p.second.all_peers)
{
if (p.second.target_set[r] == osd_num &&
(p.second.cur_set.size() < r || p.second.cur_set[r] != real_osd))
if (pg_osd == osd_num)
{
repeer = true;
break;
@ -284,7 +281,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
}
pg.inflight = 0;
dirty_pgs.erase(pg.pg_num);
// Start peering
// Calculate current write OSD set
pg.pg_cursize = 0;
pg.cur_set.resize(pg.target_set.size());
for (int role = 0; role < pg.target_set.size(); role++)
@ -296,25 +293,47 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
pg.pg_cursize++;
}
}
if (pg.target_history.size())
{
// Refuse to start PG if no peers are available from any of the historical OSD sets
// (PG history is kept up to the latest active+clean state)
for (auto & history_set: pg.target_history)
{
bool found = false;
for (auto history_osd: history_set)
{
if (history_osd != 0 && osd_peer_fds.find(history_osd) != osd_peer_fds.end())
{
found = true;
break;
}
}
if (!found)
{
pg.state = PG_INCOMPLETE;
pg.print_state();
}
}
}
if (pg.pg_cursize < pg.pg_minsize)
{
pg.state = PG_INCOMPLETE;
pg.print_state();
}
std::set<osd_num_t> cur_peers;
for (auto peer_osd: pg.all_peers)
{
if (osd_peer_fds.find(peer_osd) != osd_peer_fds.end())
{
cur_peers.insert(peer_osd);
}
}
if (pg.peering_state)
{
// Adjust the peering operation that's still in progress
// Adjust the peering operation that's still in progress - discard unneeded results
for (auto it = pg.peering_state->list_ops.begin(); it != pg.peering_state->list_ops.end(); it++)
{
int role;
for (role = 0; role < pg.cur_set.size(); role++)
{
if (pg.cur_set[role] == it->first)
{
break;
}
}
if (pg.state == PG_INCOMPLETE || role >= pg.cur_set.size())
if (pg.state == PG_INCOMPLETE || cur_peers.find(it->first) == cur_peers.end())
{
// Discard the result after completion, which, chances are, will be unsuccessful
auto list_op = it->second;
@ -342,15 +361,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
}
for (auto it = pg.peering_state->list_results.begin(); it != pg.peering_state->list_results.end(); it++)
{
int role;
for (role = 0; role < pg.cur_set.size(); role++)
{
if (pg.cur_set[role] == it->first)
{
break;
}
}
if (pg.state == PG_INCOMPLETE || role >= pg.cur_set.size())
if (pg.state == PG_INCOMPLETE || cur_peers.find(it->first) == cur_peers.end())
{
if (it->second.buf)
{
@ -373,105 +384,103 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
if (!pg.peering_state)
{
pg.peering_state = new pg_peering_state_t();
pg.peering_state->pg_num = pg.pg_num;
}
auto ps = pg.peering_state;
for (int role = 0; role < pg.cur_set.size(); role++)
for (osd_num_t peer_osd: cur_peers)
{
osd_num_t role_osd = pg.cur_set[role];
if (!role_osd)
if (pg.peering_state->list_ops.find(peer_osd) != pg.peering_state->list_ops.end() ||
pg.peering_state->list_results.find(peer_osd) != pg.peering_state->list_results.end())
{
continue;
}
if (ps->list_ops.find(role_osd) != ps->list_ops.end() ||
ps->list_results.find(role_osd) != ps->list_results.end())
{
continue;
}
if (role_osd == this->osd_num)
{
// Self
osd_op_t *op = new osd_op_t();
op->op_type = 0;
op->peer_fd = 0;
op->bs_op = new blockstore_op_t();
op->bs_op->opcode = BS_OP_LIST;
op->bs_op->oid.stripe = pg_stripe_size;
op->bs_op->len = pg_count;
op->bs_op->offset = pg.pg_num-1;
op->bs_op->callback = [ps, op, role_osd](blockstore_op_t *bs_op)
{
if (op->bs_op->retval < 0)
{
throw std::runtime_error("local OP_LIST failed");
}
printf(
"Got object list from OSD %lu (local): %d object versions (%lu of them stable)\n",
role_osd, bs_op->retval, bs_op->version
);
ps->list_results[role_osd] = {
.buf = (obj_ver_id*)op->bs_op->buf,
.total_count = (uint64_t)op->bs_op->retval,
.stable_count = op->bs_op->version,
};
ps->list_done++;
ps->list_ops.erase(role_osd);
delete op;
};
bs->enqueue_op(op->bs_op);
ps->list_ops[role_osd] = op;
}
else
{
// Peer
auto & cl = clients[osd_peer_fds[role_osd]];
osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT;
op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE);
op->peer_fd = cl.peer_fd;
op->req = {
.sec_list = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = this->next_subop_id++,
.opcode = OSD_OP_SECONDARY_LIST,
},
.list_pg = pg.pg_num,
.pg_count = pg_count,
.pg_stripe_size = pg_stripe_size,
},
};
op->callback = [this, ps, role_osd](osd_op_t *op)
{
if (op->reply.hdr.retval < 0)
{
printf("Failed to get object list from OSD %lu (retval=%ld), disconnecting peer\n", role_osd, op->reply.hdr.retval);
ps->list_ops.erase(role_osd);
stop_client(op->peer_fd);
delete op;
return;
}
printf(
"Got object list from OSD %lu: %ld object versions (%lu of them stable)\n",
role_osd, op->reply.hdr.retval, op->reply.sec_list.stable_count
);
ps->list_results[role_osd] = {
.buf = (obj_ver_id*)op->buf,
.total_count = (uint64_t)op->reply.hdr.retval,
.stable_count = op->reply.sec_list.stable_count,
};
// set op->buf to NULL so it doesn't get freed
op->buf = NULL;
ps->list_done++;
ps->list_ops.erase(role_osd);
delete op;
};
outbox_push(cl, op);
ps->list_ops[role_osd] = op;
}
submit_list_subop(peer_osd, pg.peering_state);
}
ringloop->wakeup();
}
// Requests the object listing of PG <ps->pg_num> from OSD <role_osd> on behalf of
// the peering operation <ps>.
// The in-flight operation is registered in ps->list_ops[role_osd]; when it completes,
// the entry is removed from list_ops and, on success, the listing is stored in
// ps->list_results[role_osd].
// - role_osd != this->osd_num: a OSD_OP_SECONDARY_LIST request is pushed to the
//   outbox of the client connected to that OSD (looked up through osd_peer_fds)
// - role_osd == this->osd_num: a BS_OP_LIST operation is enqueued to the local blockstore
void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
{
    if (role_osd != this->osd_num)
    {
        // Peer
        auto & peer = clients[osd_peer_fds[role_osd]];
        osd_op_t *subop = new osd_op_t();
        subop->op_type = OSD_OP_OUT;
        subop->send_list.push_back(subop->req.buf, OSD_PACKET_SIZE);
        subop->peer_fd = peer.peer_fd;
        subop->req = {
            .sec_list = {
                .header = {
                    .magic = SECONDARY_OSD_OP_MAGIC,
                    .id = this->next_subop_id++,
                    .opcode = OSD_OP_SECONDARY_LIST,
                },
                .list_pg = ps->pg_num,
                .pg_count = pg_count,
                .pg_stripe_size = pg_stripe_size,
            },
        };
        subop->callback = [this, ps, role_osd](osd_op_t *done_op)
        {
            if (done_op->reply.hdr.retval >= 0)
            {
                printf(
                    "[PG %u] Got object list from OSD %lu: %ld object versions (%lu of them stable)\n",
                    ps->pg_num, role_osd, done_op->reply.hdr.retval, done_op->reply.sec_list.stable_count
                );
                ps->list_results[role_osd] = {
                    .buf = (obj_ver_id*)done_op->buf,
                    .total_count = (uint64_t)done_op->reply.hdr.retval,
                    .stable_count = done_op->reply.sec_list.stable_count,
                };
                // set op->buf to NULL so it doesn't get freed
                done_op->buf = NULL;
                ps->list_ops.erase(role_osd);
                delete done_op;
                return;
            }
            // The peer failed to list its objects: forget the operation and drop the connection
            printf("Failed to get object list from OSD %lu (retval=%ld), disconnecting peer\n", role_osd, done_op->reply.hdr.retval);
            ps->list_ops.erase(role_osd);
            stop_client(done_op->peer_fd);
            delete done_op;
        };
        outbox_push(peer, subop);
        ps->list_ops[role_osd] = subop;
        return;
    }
    // Self
    osd_op_t *local_op = new osd_op_t();
    local_op->op_type = 0;
    local_op->peer_fd = 0;
    local_op->bs_op = new blockstore_op_t();
    local_op->bs_op->opcode = BS_OP_LIST;
    local_op->bs_op->oid.stripe = pg_stripe_size;
    local_op->bs_op->len = pg_count;
    local_op->bs_op->offset = ps->pg_num-1;
    local_op->bs_op->callback = [ps, local_op, role_osd](blockstore_op_t *bs_op)
    {
        // A failed local listing is treated as fatal
        if (local_op->bs_op->retval < 0)
        {
            throw std::runtime_error("local OP_LIST failed");
        }
        printf(
            "[PG %u] Got object list from OSD %lu (local): %d object versions (%lu of them stable)\n",
            ps->pg_num, role_osd, bs_op->retval, bs_op->version
        );
        ps->list_results[role_osd] = {
            .buf = (obj_ver_id*)local_op->bs_op->buf,
            .total_count = (uint64_t)local_op->bs_op->retval,
            .stable_count = local_op->bs_op->version,
        };
        ps->list_ops.erase(role_osd);
        delete local_op;
    };
    bs->enqueue_op(local_op->bs_op);
    ps->list_ops[role_osd] = local_op;
}
bool osd_t::stop_pg(pg_num_t pg_num)
{
auto pg_it = pgs.find(pg_num);

View File

@ -70,7 +70,7 @@ struct pg_peering_state_t
// osd_num -> list result
std::unordered_map<osd_num_t, osd_op_t*> list_ops;
std::unordered_map<osd_num_t, pg_list_result_t> list_results;
int list_done = 0;
pg_num_t pg_num = 0;
};
struct obj_piece_id_t
@ -100,6 +100,9 @@ struct pg_t
uint64_t pg_cursize = 3, pg_size = 3, pg_minsize = 2;
pg_num_t pg_num;
uint64_t clean_count = 0, total_count = 0;
// all possible peers
std::vector<osd_num_t> all_peers;
std::vector<std::vector<osd_num_t>> target_history;
// target_set is the "correct" peer OSD set for this PG
std::vector<osd_num_t> target_set;
// cur_set is the current set of connected peer OSDs for this PG