diff --git a/osd.cpp b/osd.cpp index 15c7e46c..35a590d4 100644 --- a/osd.cpp +++ b/osd.cpp @@ -44,9 +44,6 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo this->tfd = new timerfd_manager_t(ringloop); - if (run_primary) - init_primary(); - init_cluster(); consumer.loop = [this]() { loop(); }; @@ -141,6 +138,12 @@ void osd_t::parse_config(blockstore_config_t & config) peer_connect_interval = strtoull(config["peer_connect_interval"].c_str(), NULL, 10); if (!peer_connect_interval) peer_connect_interval = 5; + http_request_timeout = strtoull(config["http_request_timeout"].c_str(), NULL, 10); + if (!http_request_timeout) + http_request_timeout = 5; + peer_connect_timeout = strtoull(config["peer_connect_timeout"].c_str(), NULL, 10); + if (!peer_connect_timeout) + peer_connect_timeout = 5; } void osd_t::bind_socket() diff --git a/osd.h b/osd.h index 5a8c921c..bea29803 100644 --- a/osd.h +++ b/osd.h @@ -136,6 +136,7 @@ struct osd_client_t int peer_fd; int peer_state; std::function connect_callback; + int connect_timeout_id = -1; osd_num_t osd_num = 0; void *in_buf = NULL; @@ -211,6 +212,8 @@ class osd_t int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE; int peer_connect_interval = 5; + int http_request_timeout = 5; + int peer_connect_timeout = 5; // peer OSDs diff --git a/osd_cluster.cpp b/osd_cluster.cpp index c7e2c6d0..fdf68f07 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -4,10 +4,17 @@ void osd_t::init_cluster() { + if (run_primary) + { + init_primary(); + } if (consul_address != "") { + if (!run_primary) + { + report_status(); + } printf("OSD %lu reporting to Consul at %s each %d seconds\n", osd_num, consul_address.c_str(), consul_report_interval); - report_status(); this->consul_tfd = new timerfd_interval(ringloop, consul_report_interval, [this]() { report_status(); @@ -15,6 +22,74 @@ void osd_t::init_cluster() } } +void osd_t::init_primary() +{ + if (consul_address == "") + { + // Test version of clustering code with 1 PG and 2 peers + // Example: peers = 2:127.0.0.1:11204,3:127.0.0.1:11205 + std::string peerstr = config["peers"]; + while (peerstr.size()) + { + int pos = peerstr.find(','); + parse_test_peer(pos < 0 ? peerstr : peerstr.substr(0, pos)); + peerstr = pos < 0 ? std::string("") : peerstr.substr(pos+1); + } + if (peer_states.size() < 2) + { + throw std::runtime_error("run_primary requires at least 2 peers"); + } + pgs[1] = (pg_t){ + .state = PG_PEERING, + .pg_cursize = 0, + .pg_num = 1, + .target_set = { 1, 2, 3 }, + .cur_set = { 0, 0, 0 }, + }; + pgs[1].print_state(); + pg_count = 1; + peering_state = OSD_CONNECTING_PEERS; + } + else + { + peering_state = OSD_LOADING_PGS; + load_pgs(); + } + if (autosync_interval > 0) + { + this->sync_tfd = new timerfd_interval(ringloop, autosync_interval, [this]() + { + autosync(); + }); + } +} + +void osd_t::parse_test_peer(std::string peer) +{ + // OSD_NUM:IP:PORT + int pos1 = peer.find(':'); + int pos2 = peer.find(':', pos1+1); + if (pos1 < 0 || pos2 < 0) + throw new std::runtime_error("OSD peer string must be in the form OSD_NUM:IP:PORT"); + std::string addr = peer.substr(pos1+1, pos2-pos1-1); + std::string osd_num_str = peer.substr(0, pos1); + std::string port_str = peer.substr(pos2+1); + osd_num_t osd_num = strtoull(osd_num_str.c_str(), NULL, 10); + if (!osd_num) + throw new std::runtime_error("Could not parse OSD peer osd_num"); + else if (peer_states.find(osd_num) != peer_states.end()) + throw std::runtime_error("Same osd number "+std::to_string(osd_num)+" specified twice in peers"); + int port = strtoull(port_str.c_str(), NULL, 10); + if (!port) + throw new std::runtime_error("Could not parse OSD peer port"); + peer_states[osd_num] = json11::Json::object { + { "state", "up" }, + { "addresses", json11::Json::array { addr } }, + { "port", port }, + }; + wanted_peers[osd_num] = { 0 }; +} + json11::Json osd_t::get_status() { json11::Json::object st; @@ -142,21 +217,6 @@ void osd_t::consul_txn(json11::Json txn, std::function Load PGs -> Load peers -> Connect to peers -> Peer PGs // Wait for PG changes -> Start/Stop PGs when requested // Peer connection is lost -> Reload connection data -> Try to reconnect -> Repeat @@ -164,6 +224,14 @@ void osd_t::load_pgs() { assert(this->pgs.size() == 0); json11::Json::array txn = { + // Update OSD state when loading PGs to allow "monitors" do CAS transactions when moving PGs + json11::Json::object { + { "KV", json11::Json::object { + { "Verb", "set" }, + { "Key", consul_prefix+"/osd/state/"+std::to_string(osd_num)+"." }, + { "Value", base64_encode(get_status().dump()) }, + } } + }, json11::Json::object { { "KV", json11::Json::object { { "Verb", "get" }, @@ -188,6 +256,7 @@ void osd_t::load_pgs() }); return; } + peering_state &= ~OSD_LOADING_PGS; json11::Json pg_config; std::map pg_history; for (auto & res: data["Results"].array_items()) @@ -203,10 +272,10 @@ void osd_t::load_pgs() { pg_config = value; } - else + else if (key.substr(0, consul_prefix.length()+12) == consul_prefix+"/pg/history/") { // /pg/history/%d. - pg_num_t pg_num = stoull_full(key.substr(consul_prefix.length()+13, key.length()-consul_prefix.length()-14)); + pg_num_t pg_num = stoull_full(key.substr(consul_prefix.length()+12, key.length()-consul_prefix.length()-13)); if (pg_num) { pg_history[pg_num] = value; diff --git a/osd_http.cpp b/osd_http.cpp index cbf00e2f..7d17d90e 100644 --- a/osd_http.cpp +++ b/osd_http.cpp @@ -25,7 +25,7 @@ static int extract_port(std::string & host) return port; } -std::vector getifaddr_list() +std::vector getifaddr_list(bool include_v6) { std::vector addresses; ifaddrs *list, *ifa; @@ -40,7 +40,7 @@ std::vector getifaddr_list() continue; } int family = ifa->ifa_addr->sa_family; - if ((family == AF_INET || family == AF_INET6) && + if ((family == AF_INET || family == AF_INET6 && include_v6) && (ifa->ifa_flags & (IFF_UP | IFF_RUNNING | IFF_LOOPBACK)) == (IFF_UP | IFF_RUNNING)) { void *addr_ptr; @@ -70,6 +70,7 @@ struct http_co_t int st = 0; int peer_fd = -1; + int timeout_id = -1; int epoll_events = 0; int code = 0; int sent = 0, received = 0; @@ -181,6 +182,11 @@ http_response_t *parse_http_response(std::string res) http_co_t::~http_co_t() { + if (timeout_id >= 0) + { + osd->tfd->clear_timer(timeout_id); + timeout_id = -1; + } callback(code, response); if (peer_fd >= 0) { @@ -214,6 +220,17 @@ void http_co_t::resume() return; } fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); + if (osd->http_request_timeout > 0) + { + timeout_id = osd->tfd->set_timer(1000*osd->http_request_timeout, false, [this](int timer_id) + { + if (response.length() == 0) + { + code = EIO; + } + delete this; + }); + } r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); if (r < 0 && errno != EINPROGRESS) { @@ -366,3 +383,18 @@ void http_co_t::resume() return; } } + +uint64_t stoull_full(std::string str, int base) +{ + if (isspace(str[0])) + { + return 0; + } + size_t end = -1; + uint64_t r = std::stoull(str, &end, base); + if (end < str.length()) + { + return 0; + } + return r; +} diff --git a/osd_http.h b/osd_http.h index 3f7eb2bb..75cbf467 100644 --- a/osd_http.h +++ b/osd_http.h @@ -13,4 +13,5 @@ struct http_response_t }; http_response_t *parse_http_response(std::string res); -std::vector getifaddr_list(); +std::vector getifaddr_list(bool include_v6 = false); +uint64_t stoull_full(std::string str, int base = 10); diff --git a/osd_peering.cpp b/osd_peering.cpp index 3afcb8ee..12df17b9 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -6,74 +6,6 @@ #include "base64.h" #include "osd.h" -void osd_t::init_primary() -{ - if (consul_address == "") - { - // Test version of clustering code with 1 PG and 2 peers - // Example: peers = 2:127.0.0.1:11204,3:127.0.0.1:11205 - std::string peerstr = config["peers"]; - while (peerstr.size()) - { - int pos = peerstr.find(','); - parse_test_peer(pos < 0 ? peerstr : peerstr.substr(0, pos)); - peerstr = pos < 0 ? std::string("") : peerstr.substr(pos+1); - } - if (peer_states.size() < 2) - { - throw std::runtime_error("run_primary requires at least 2 peers"); - } - pgs[1] = (pg_t){ - .state = PG_PEERING, - .pg_cursize = 0, - .pg_num = 1, - .target_set = { 1, 2, 3 }, - .cur_set = { 0, 0, 0 }, - }; - pgs[1].print_state(); - pg_count = 1; - peering_state = OSD_CONNECTING_PEERS; - } - else - { - peering_state = OSD_LOADING_PGS; - load_pgs(); - } - if (autosync_interval > 0) - { - this->sync_tfd = new timerfd_interval(ringloop, 3, [this]() - { - autosync(); - }); - } -} - -void osd_t::parse_test_peer(std::string peer) -{ - // OSD_NUM:IP:PORT - int pos1 = peer.find(':'); - int pos2 = peer.find(':', pos1+1); - if (pos1 < 0 || pos2 < 0) - throw new std::runtime_error("OSD peer string must be in the form OSD_NUM:IP:PORT"); - std::string addr = peer.substr(pos1+1, pos2-pos1-1); - std::string osd_num_str = peer.substr(0, pos1); - std::string port_str = peer.substr(pos2+1); - osd_num_t osd_num = strtoull(osd_num_str.c_str(), NULL, 10); - if (!osd_num) - throw new std::runtime_error("Could not parse OSD peer osd_num"); - else if (peer_states.find(osd_num) != peer_states.end()) - throw std::runtime_error("Same osd number "+std::to_string(osd_num)+" specified twice in peers"); - int port = strtoull(port_str.c_str(), NULL, 10); - if (!port) - throw new std::runtime_error("Could not parse OSD peer port"); - peer_states[osd_num] = json11::Json::object { - { "state", "up" }, - { "addresses", json11::Json::array { addr } }, - { "port", port }, - }; - wanted_peers[osd_num] = { 0 }; -} - void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function callback) { struct sockaddr_in addr; @@ -92,6 +24,18 @@ void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port return; } fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); + int timeout_id = -1; + if (peer_connect_timeout > 0) + { + tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id) + { + auto callback = clients[peer_fd].connect_callback; + osd_num_t osd_num = clients[peer_fd].osd_num; + stop_client(peer_fd); + callback(osd_num, -EIO); + return; + }); + } r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); if (r < 0 && errno != EINPROGRESS) { @@ -105,6 +49,7 @@ void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port .peer_fd = peer_fd, .peer_state = PEER_CONNECTING, .connect_callback = callback, + .connect_timeout_id = timeout_id, .osd_num = osd_num, .in_buf = malloc(receive_buffer_size), }; @@ -122,6 +67,11 @@ void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port void osd_t::handle_connect_result(int peer_fd) { auto & cl = clients[peer_fd]; + if (cl.connect_timeout_id >= 0) + { + tfd->clear_timer(cl.connect_timeout_id); + cl.connect_timeout_id = -1; + } osd_num_t osd_num = cl.osd_num; auto callback = cl.connect_callback; int result = 0; diff --git a/osd_peering_pg.cpp b/osd_peering_pg.cpp index 21d2a7d7..e554070b 100644 --- a/osd_peering_pg.cpp +++ b/osd_peering_pg.cpp @@ -110,7 +110,6 @@ void pg_obj_state_check_t::handle_version() n_copies++; if (replica >= pg->pg_size) { - // FIXME In the future, check it against the PG epoch number to handle replication factor/scheme changes n_buggy++; } else