From f95299b769996a1e7ed92a89507f3449a15a4f02 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 19 Apr 2020 00:20:18 +0300 Subject: [PATCH] Take PG history into account when starting PGs --- osd.cpp | 3 +- osd.h | 6 +- osd_cluster.cpp | 133 ++++++++++++++++++++------ osd_peering.cpp | 241 ++++++++++++++++++++++++----------------------- osd_peering_pg.h | 5 +- 5 files changed, 239 insertions(+), 149 deletions(-) diff --git a/osd.cpp b/osd.cpp index 733d82e3..876b470d 100644 --- a/osd.cpp +++ b/osd.cpp @@ -365,6 +365,7 @@ void osd_t::stop_client(int peer_fd) osd_client_t cl = it->second; if (cl.osd_num) { + // FIXME: Reload configuration from Consul when the connection is dropped printf("[%lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl.osd_num); } else @@ -381,7 +382,7 @@ void osd_t::stop_client(int peer_fd) // Cancel outbound operations cancel_osd_ops(cl); osd_peer_fds.erase(cl.osd_num); - repeer_pgs(cl.osd_num, false); + repeer_pgs(cl.osd_num); peering_state |= OSD_CONNECTING_PEERS; } if (cl.read_op) diff --git a/osd.h b/osd.h index dad47fd9..5a8c921c 100644 --- a/osd.h +++ b/osd.h @@ -267,10 +267,11 @@ class osd_t void print_stats(); void reset_stats(); json11::Json get_status(); + void consul_txn(json11::Json txn, std::function callback); void init_cluster(); void report_status(); void load_pgs(); - void parse_pgs(json11::Json data); + void parse_pgs(const json11::Json & pg_config, const std::map & pg_history); void load_and_connect_peers(); // event loop, socket read/write @@ -297,8 +298,9 @@ class osd_t void parse_test_peer(std::string peer); void init_primary(); void handle_peers(); - void repeer_pgs(osd_num_t osd_num, bool is_connected); + void repeer_pgs(osd_num_t osd_num); void start_pg_peering(pg_num_t pg_num); + void submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps); bool stop_pg(pg_num_t pg_num); void finish_stop_pg(pg_t & pg); diff --git a/osd_cluster.cpp b/osd_cluster.cpp index 8eea1710..96b25800 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -119,19 +119,54 @@ void osd_t::report_status() }); } +void osd_t::consul_txn(json11::Json txn, std::function callback) +{ + std::string req = txn.dump(); + req = "PUT /v1/txn HTTP/1.1\r\n" + "Host: "+consul_host+"\r\n" + "Content-Type: application/json\r\n" + "Content-Length: "+std::to_string(req.size())+"\r\n" + "Connection: close\r\n" + "\r\n"+req; + http_request_json(consul_address, req, callback); +} + +uint64_t stoull_full(std::string str, int base = 10) +{ + if (isspace(str[0])) + { + return 0; + } + size_t end = -1; + uint64_t r = std::stoull(str, &end, base); + if (end < str.length()) + { + return 0; + } + return r; +} + // Start -> Load PGs -> Load peers -> Connect to peers -> Peer PGs // Wait for PG changes -> Start/Stop PGs when requested // Peer connection is lost -> Reload connection data -> Try to reconnect -> Repeat void osd_t::load_pgs() { assert(this->pgs.size() == 0); - std::string req = "GET /v1/kv/"+consul_prefix+"/config/pgs?raw"+ - /*(consul_change_index > 0 ? "&index="+std::to_string(consul_change_index) : "")+*/ - " HTTP/1.1\r\n"+ - "Host: "+consul_host+"\r\n"+ - "Connection: close\r\n"+ - "\r\n"; - http_request_json(consul_address, req, [this](std::string err, json11::Json data) + json11::Json::array txn = { + json11::Json::object { + { "KV", json11::Json::object { + { "Verb", "get" }, + { "Key", consul_prefix+"/config/pgs" }, + } } + }, + json11::Json::object { + { "KV", json11::Json::object { + { "Verb", "get-tree" }, + { "Key", consul_prefix+"/pg/history/" }, + } } + }, + }; + consul_txn(txn, [this](std::string err, json11::Json data) { if (err != "") { @@ -142,19 +177,43 @@ void osd_t::load_pgs() }); return; } - parse_pgs(data); + json11::Json pg_config; + std::map pg_history; + for (auto & res: data["Results"].array_items()) + { + std::string key = res["KV"]["Key"].string_value(); + std::string json_err; + json11::Json value = json11::Json::parse(base64_decode(res["KV"]["Value"].string_value()), json_err); + if (json_err != "") + { + printf("Bad JSON in Consul key %s: %s\n", key.c_str(), json_err.c_str()); + } + if (key == consul_prefix+"/config/pgs") + { + pg_config = value; + } + else + { + // /pg/history/%d. + pg_num_t pg_num = stoull_full(key.substr(consul_prefix.length()+13, key.length()-consul_prefix.length()-14)); + if (pg_num) + { + pg_history[pg_num] = value; + } + } + } + parse_pgs(pg_config, pg_history); peering_state = OSD_CONNECTING_PEERS; }); } -void osd_t::parse_pgs(json11::Json data) +void osd_t::parse_pgs(const json11::Json & pg_config, const std::map & pg_history) { uint64_t pg_count = 0; - for (auto pg_item: data.object_items()) + for (auto pg_item: pg_config.object_items()) { - char *pg_num_end = NULL; - pg_num_t pg_num = strtoull(pg_item.first.c_str(), &pg_num_end, 10); - if (!pg_num || *pg_num_end != 0) + pg_num_t pg_num = stoull_full(pg_item.first); + if (!pg_num) { throw std::runtime_error("Bad key in PG hash: "+pg_item.first); } @@ -163,27 +222,52 @@ void osd_t::parse_pgs(json11::Json data) if (primary_osd == this->osd_num) { // Take this PG + std::set all_peers; std::vector target_set; for (auto pg_osd_num: pg_json["osd_set"].array_items()) { osd_num_t pg_osd = pg_osd_num.uint64_value(); target_set.push_back(pg_osd); + if (pg_osd != 0) + { + all_peers.insert(pg_osd); + } } if (target_set.size() != 3) { throw std::runtime_error("Bad PG "+std::to_string(pg_num)+" config format: incorrect osd_set"); } + std::vector> target_history; + auto hist_it = pg_history.find(pg_num); + if (hist_it != pg_history.end()) + { + for (auto hist_item: hist_it->second.array_items()) + { + std::vector history_set; + for (auto pg_osd_num: hist_item["osd_set"].array_items()) + { + osd_num_t pg_osd = pg_osd_num.uint64_value(); + history_set.push_back(pg_osd); + if (pg_osd != 0) + { + all_peers.insert(pg_osd); + } + } + target_history.push_back(history_set); + } + } this->pgs[pg_num] = (pg_t){ .state = PG_PEERING, .pg_cursize = 0, .pg_num = pg_num, + .all_peers = std::vector(all_peers.begin(), all_peers.end()), + .target_history = target_history, .target_set = target_set, }; this->pgs[pg_num].print_state(); // Add peers - for (auto pg_osd: target_set) + for (auto pg_osd: all_peers) { - // FIXME: Add OSDs from PG history to peers if (pg_osd != this->osd_num && osd_peer_fds.find(pg_osd) == osd_peer_fds.end()) { wanted_peers[pg_osd] = { 0 }; @@ -197,7 +281,7 @@ void osd_t::parse_pgs(json11::Json data) void osd_t::load_and_connect_peers() { - json11::Json::array consul_txn; + json11::Json::array load_peer_txn; for (auto wp_it = wanted_peers.begin(); wp_it != wanted_peers.end();) { osd_num_t osd_num = wp_it->first; @@ -217,7 +301,7 @@ void osd_t::load_and_connect_peers() { // (Re)load OSD state from Consul wp_it->second.last_load_attempt = time(NULL); - consul_txn.push_back(json11::Json::object { + load_peer_txn.push_back(json11::Json::object { { "KV", json11::Json::object { { "Verb", "get-tree" }, { "Key", consul_prefix+"/osd/state/"+std::to_string(osd_num)+"." }, @@ -255,7 +339,6 @@ void osd_t::load_and_connect_peers() return; } printf("Connected with peer OSD %lu (fd %d)\n", clients[peer_fd].osd_num, peer_fd); - // FIXME: Check peer config after connecting wanted_peers.erase(osd_num); if (!wanted_peers.size()) { @@ -263,7 +346,7 @@ void osd_t::load_and_connect_peers() printf("Connected to all peers\n"); peering_state = peering_state & ~OSD_CONNECTING_PEERS; } - repeer_pgs(osd_num, true); + repeer_pgs(osd_num); }); } else @@ -272,17 +355,9 @@ void osd_t::load_and_connect_peers() wp_it++; } } - if (consul_txn.size() > 0) + if (load_peer_txn.size() > 0) { - std::string req = json11::Json(consul_txn).dump(); - req = "PUT /v1/txn HTTP/1.1\r\n" - "Host: "+consul_host+"\r\n" - "Content-Type: application/json\r\n" - "Content-Length: "+std::to_string(req.size())+"\r\n" - "Connection: close\r\n" - "\r\n"+req; - loading_peer_config = true; - http_request_json(consul_address, req, [this](std::string err, json11::Json data) + consul_txn(load_peer_txn, [this](std::string err, json11::Json data) { loading_peer_config = false; if (err != "") diff --git a/osd_peering.cpp b/osd_peering.cpp index c0d60dea..7cecce7e 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -217,20 +217,17 @@ void osd_t::handle_peers() } } -void osd_t::repeer_pgs(osd_num_t osd_num, bool is_connected) +void osd_t::repeer_pgs(osd_num_t osd_num) { // Re-peer affected PGs - // FIXME: We shouldn't rely just on target_set. Other OSDs may also contain PG data. - osd_num_t real_osd = (is_connected ? osd_num : 0); for (auto & p: pgs) { bool repeer = false; if (p.second.state != PG_OFFLINE) { - for (int r = 0; r < p.second.target_set.size(); r++) + for (osd_num_t pg_osd: p.second.all_peers) { - if (p.second.target_set[r] == osd_num && - (p.second.cur_set.size() < r || p.second.cur_set[r] != real_osd)) + if (pg_osd == osd_num) { repeer = true; break; @@ -284,7 +281,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num) } pg.inflight = 0; dirty_pgs.erase(pg.pg_num); - // Start peering + // Calculate current write OSD set pg.pg_cursize = 0; pg.cur_set.resize(pg.target_set.size()); for (int role = 0; role < pg.target_set.size(); role++) @@ -296,25 +293,47 @@ void osd_t::start_pg_peering(pg_num_t pg_num) pg.pg_cursize++; } } + if (pg.target_history.size()) + { + // Refuse to start PG if no peers are available from any of the historical OSD sets + // (PG history is kept up to the latest active+clean state) + for (auto & history_set: pg.target_history) + { + bool found = false; + for (auto history_osd: history_set) + { + if (history_osd != 0 && osd_peer_fds.find(history_osd) != osd_peer_fds.end()) + { + found = true; + break; + } + } + if (!found) + { + pg.state = PG_INCOMPLETE; + pg.print_state(); + } + } + } if (pg.pg_cursize < pg.pg_minsize) { pg.state = PG_INCOMPLETE; pg.print_state(); } + std::set cur_peers; + for (auto peer_osd: pg.all_peers) + { + if (osd_peer_fds.find(peer_osd) != osd_peer_fds.end()) + { + cur_peers.insert(peer_osd); + } + } if (pg.peering_state) { - // Adjust the peering operation that's still in progress + // Adjust the peering operation that's still in progress - discard unneeded results for (auto it = pg.peering_state->list_ops.begin(); it != pg.peering_state->list_ops.end(); it++) { - int role; - for (role = 0; role < pg.cur_set.size(); role++) - { - if (pg.cur_set[role] == it->first) - { - break; - } - } - if (pg.state == PG_INCOMPLETE || role >= pg.cur_set.size()) + if (pg.state == PG_INCOMPLETE || cur_peers.find(it->first) == cur_peers.end()) { // Discard the result after completion, which, chances are, will be unsuccessful auto list_op = it->second; @@ -342,15 +361,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num) } for (auto it = pg.peering_state->list_results.begin(); it != pg.peering_state->list_results.end(); it++) { - int role; - for (role = 0; role < pg.cur_set.size(); role++) - { - if (pg.cur_set[role] == it->first) - { - break; - } - } - if (pg.state == PG_INCOMPLETE || role >= pg.cur_set.size()) + if (pg.state == PG_INCOMPLETE || cur_peers.find(it->first) == cur_peers.end()) { if (it->second.buf) { @@ -373,105 +384,103 @@ void osd_t::start_pg_peering(pg_num_t pg_num) if (!pg.peering_state) { pg.peering_state = new pg_peering_state_t(); + pg.peering_state->pg_num = pg.pg_num; } - auto ps = pg.peering_state; - for (int role = 0; role < pg.cur_set.size(); role++) + for (osd_num_t peer_osd: cur_peers) { - osd_num_t role_osd = pg.cur_set[role]; - if (!role_osd) + if (pg.peering_state->list_ops.find(peer_osd) != pg.peering_state->list_ops.end() || + pg.peering_state->list_results.find(peer_osd) != pg.peering_state->list_results.end()) { continue; } - if (ps->list_ops.find(role_osd) != ps->list_ops.end() || - ps->list_results.find(role_osd) != ps->list_results.end()) - { - continue; - } - if (role_osd == this->osd_num) - { - // Self - osd_op_t *op = new osd_op_t(); - op->op_type = 0; - op->peer_fd = 0; - op->bs_op = new blockstore_op_t(); - op->bs_op->opcode = BS_OP_LIST; - op->bs_op->oid.stripe = pg_stripe_size; - op->bs_op->len = pg_count; - op->bs_op->offset = pg.pg_num-1; - op->bs_op->callback = [ps, op, role_osd](blockstore_op_t *bs_op) - { - if (op->bs_op->retval < 0) - { - throw std::runtime_error("local OP_LIST failed"); - } - printf( - "Got object list from OSD %lu (local): %d object versions (%lu of them stable)\n", - role_osd, bs_op->retval, bs_op->version - ); - ps->list_results[role_osd] = { - .buf = (obj_ver_id*)op->bs_op->buf, - .total_count = (uint64_t)op->bs_op->retval, - .stable_count = op->bs_op->version, - }; - ps->list_done++; - ps->list_ops.erase(role_osd); - delete op; - }; - bs->enqueue_op(op->bs_op); - ps->list_ops[role_osd] = op; - } - else - { - // Peer - auto & cl = clients[osd_peer_fds[role_osd]]; - osd_op_t *op = new osd_op_t(); - op->op_type = OSD_OP_OUT; - op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE); - op->peer_fd = cl.peer_fd; - op->req = { - .sec_list = { - .header = { - .magic = SECONDARY_OSD_OP_MAGIC, - .id = this->next_subop_id++, - .opcode = OSD_OP_SECONDARY_LIST, - }, - .list_pg = pg.pg_num, - .pg_count = pg_count, - .pg_stripe_size = pg_stripe_size, - }, - }; - op->callback = [this, ps, role_osd](osd_op_t *op) - { - if (op->reply.hdr.retval < 0) - { - printf("Failed to get object list from OSD %lu (retval=%ld), disconnecting peer\n", role_osd, op->reply.hdr.retval); - ps->list_ops.erase(role_osd); - stop_client(op->peer_fd); - delete op; - return; - } - printf( - "Got object list from OSD %lu: %ld object versions (%lu of them stable)\n", - role_osd, op->reply.hdr.retval, op->reply.sec_list.stable_count - ); - ps->list_results[role_osd] = { - .buf = (obj_ver_id*)op->buf, - .total_count = (uint64_t)op->reply.hdr.retval, - .stable_count = op->reply.sec_list.stable_count, - }; - // set op->buf to NULL so it doesn't get freed - op->buf = NULL; - ps->list_done++; - ps->list_ops.erase(role_osd); - delete op; - }; - outbox_push(cl, op); - ps->list_ops[role_osd] = op; - } + submit_list_subop(peer_osd, pg.peering_state); } ringloop->wakeup(); } +void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps) +{ + if (role_osd == this->osd_num) + { + // Self + osd_op_t *op = new osd_op_t(); + op->op_type = 0; + op->peer_fd = 0; + op->bs_op = new blockstore_op_t(); + op->bs_op->opcode = BS_OP_LIST; + op->bs_op->oid.stripe = pg_stripe_size; + op->bs_op->len = pg_count; + op->bs_op->offset = ps->pg_num-1; + op->bs_op->callback = [ps, op, role_osd](blockstore_op_t *bs_op) + { + if (op->bs_op->retval < 0) + { + throw std::runtime_error("local OP_LIST failed"); + } + printf( + "[PG %u] Got object list from OSD %lu (local): %d object versions (%lu of them stable)\n", + ps->pg_num, role_osd, bs_op->retval, bs_op->version + ); + ps->list_results[role_osd] = { + .buf = (obj_ver_id*)op->bs_op->buf, + .total_count = (uint64_t)op->bs_op->retval, + .stable_count = op->bs_op->version, + }; + ps->list_ops.erase(role_osd); + delete op; + }; + bs->enqueue_op(op->bs_op); + ps->list_ops[role_osd] = op; + } + else + { + // Peer + auto & cl = clients[osd_peer_fds[role_osd]]; + osd_op_t *op = new osd_op_t(); + op->op_type = OSD_OP_OUT; + op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE); + op->peer_fd = cl.peer_fd; + op->req = { + .sec_list = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = this->next_subop_id++, + .opcode = OSD_OP_SECONDARY_LIST, + }, + .list_pg = ps->pg_num, + .pg_count = pg_count, + .pg_stripe_size = pg_stripe_size, + }, + }; + op->callback = [this, ps, role_osd](osd_op_t *op) + { + if (op->reply.hdr.retval < 0) + { + printf("Failed to get object list from OSD %lu (retval=%ld), disconnecting peer\n", role_osd, op->reply.hdr.retval); + ps->list_ops.erase(role_osd); + stop_client(op->peer_fd); + delete op; + return; + } + printf( + "[PG %u] Got object list from OSD %lu: %ld object versions (%lu of them stable)\n", + ps->pg_num, role_osd, op->reply.hdr.retval, op->reply.sec_list.stable_count + ); + ps->list_results[role_osd] = { + .buf = (obj_ver_id*)op->buf, + .total_count = (uint64_t)op->reply.hdr.retval, + .stable_count = op->reply.sec_list.stable_count, + }; + // set op->buf to NULL so it doesn't get freed + op->buf = NULL; + ps->list_ops.erase(role_osd); + delete op; + }; + outbox_push(cl, op); + ps->list_ops[role_osd] = op; + } +} + bool osd_t::stop_pg(pg_num_t pg_num) { auto pg_it = pgs.find(pg_num); diff --git a/osd_peering_pg.h b/osd_peering_pg.h index c9593980..d07ecc17 100644 --- a/osd_peering_pg.h +++ b/osd_peering_pg.h @@ -70,7 +70,7 @@ struct pg_peering_state_t // osd_num -> list result std::unordered_map list_ops; std::unordered_map list_results; - int list_done = 0; + pg_num_t pg_num = 0; }; struct obj_piece_id_t @@ -100,6 +100,9 @@ struct pg_t uint64_t pg_cursize = 3, pg_size = 3, pg_minsize = 2; pg_num_t pg_num; uint64_t clean_count = 0, total_count = 0; + // all possible peers + std::vector all_peers; + std::vector> target_history; // target_set is the "correct" peer OSD set for this PG std::vector target_set; // cur_set is the current set of connected peer OSDs for this PG