From 642802b595ab8fc747ab84e37efaf289f76ac8ab Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 20 Apr 2020 17:44:03 +0300 Subject: [PATCH] Auto-select port numbers --- Makefile | 2 +- osd.cpp | 96 +++++++++++++++++++++++++++++++------------- osd.h | 4 +- osd_cluster.cpp | 98 ++++++++++++++++++++++++++++++--------------- osd_http.cpp | 8 ++-- osd_http.h | 2 +- osd_main.cpp | 2 + osd_peering.cpp | 41 ++++++++++--------- timerfd_manager.cpp | 5 ++- 9 files changed, 170 insertions(+), 88 deletions(-) diff --git a/Makefile b/Makefile index e8a5a89f..10504d87 100644 --- a/Makefile +++ b/Makefile @@ -55,7 +55,7 @@ osd_rmw_test: osd_rmw_test.cpp osd_rmw.cpp osd_rmw.h xor.h g++ $(CXXFLAGS) -o $@ $< osd_primary.o: osd_primary.cpp osd.h osd_ops.h osd_peering_pg.h xor.h ringloop.h g++ $(CXXFLAGS) -c -o $@ $< -osd.o: osd.cpp osd.h osd_ops.h osd_peering_pg.h ringloop.h +osd.o: osd.cpp osd.h osd_http.h osd_ops.h osd_peering_pg.h ringloop.h g++ $(CXXFLAGS) -c -o $@ $< osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h $(OSD_OBJS) g++ $(CXXFLAGS) -o osd osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring diff --git a/osd.cpp b/osd.cpp index 35a590d4..3c6d0971 100644 --- a/osd.cpp +++ b/osd.cpp @@ -6,6 +6,7 @@ #include #include "osd.h" +#include "osd_http.h" const char* osd_op_names[] = { "", @@ -35,7 +36,11 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo parse_config(config); - bind_socket(); + epoll_fd = epoll_create(1); + if (epoll_fd < 0) + { + throw std::runtime_error(std::string("epoll_create: ") + strerror(errno)); + } this->stats_tfd = new timerfd_interval(ringloop, print_stats_interval, [this]() { @@ -101,6 +106,7 @@ osd_op_t::~osd_op_t() void osd_t::parse_config(blockstore_config_t & config) { + // Initial startup configuration consul_address = config["consul_address"]; consul_host = consul_address.find(':') >= 0 ? consul_address.substr(0, consul_address.find(':')) : consul_address; consul_prefix = config["consul_prefix"]; @@ -109,21 +115,25 @@ void osd_t::parse_config(blockstore_config_t & config) consul_report_interval = strtoull(config["consul_report_interval"].c_str(), NULL, 10); if (consul_report_interval <= 0) consul_report_interval = 30; - bind_address = config["bind_address"]; - if (bind_address == "") - bind_address = "0.0.0.0"; - // FIXME: select port automatically from range - bind_port = strtoull(config["bind_port"].c_str(), NULL, 10); - if (!bind_port || bind_port > 65535) - bind_port = 11203; osd_num = strtoull(config["osd_num"].c_str(), NULL, 10); if (!osd_num) throw std::runtime_error("osd_num is required in the configuration"); + run_primary = config["run_primary"] != "false" && config["run_primary"] != "0" && config["run_primary"] != "no"; + // Cluster configuration + bind_address = config["bind_address"]; + if (bind_address == "") + bind_address = "0.0.0.0"; + bind_port = stoull_full(config["bind_port"]); + if (bind_port <= 0 || bind_port > 65535) + bind_port = 0; + if (config.find("bind_port_range_start") != config.end()) + bind_port_range_start = stoull_full(config["bind_port_range_start"]); + if (config.find("bind_port_range_end") != config.end()) + bind_port_range_end = stoull_full(config["bind_port_range_end"]); if (config["immediate_commit"] == "all") immediate_commit = IMMEDIATE_ALL; else if (config["immediate_commit"] == "small") immediate_commit = IMMEDIATE_SMALL; - run_primary = config["run_primary"] == "true" || config["run_primary"] == "1" || config["run_primary"] == "yes"; autosync_interval = strtoull(config["autosync_interval"].c_str(), NULL, 10); if (autosync_interval < 0 || autosync_interval > MAX_AUTOSYNC_INTERVAL) autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; @@ -164,12 +174,47 @@ void osd_t::bind_socket() throw std::runtime_error("bind address "+bind_address+(r == 0 ? " is not valid" : ": no ipv4 support")); } addr.sin_family = AF_INET; - addr.sin_port = htons(bind_port); - if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0) + if (bind_port == 0 && bind_port_range_start > 0 && + bind_port_range_end > bind_port_range_start && bind_port_range_end < 65535) { - close(listen_fd); - throw std::runtime_error(std::string("bind: ") + strerror(errno)); + for (listening_port = bind_port_range_start; listening_port != bind_port_range_end; listening_port++) + { + addr.sin_port = htons(listening_port); + if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) == 0) + { + break; + } + } + if (listening_port == bind_port_range_end) + { + listening_port = 0; + close(listen_fd); + throw std::runtime_error(std::string("bind: ") + strerror(errno)); + } + } + else + { + addr.sin_port = htons(bind_port); + if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0) + { + close(listen_fd); + throw std::runtime_error(std::string("bind: ") + strerror(errno)); + } + if (bind_port == 0) + { + socklen_t len = sizeof(addr); + if (getsockname(listen_fd, (sockaddr *)&addr, &len) == -1) + { + close(listen_fd); + throw std::runtime_error(std::string("getsockname: ") + strerror(errno)); + } + listening_port = ntohs(addr.sin_port); + } + else + { + listening_port = bind_port; + } } if (listen(listen_fd, listen_backlog) < 0) @@ -180,13 +225,6 @@ void osd_t::bind_socket() fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK); - epoll_fd = epoll_create(1); - if (epoll_fd < 0) - { - close(listen_fd); - throw std::runtime_error(std::string("epoll_create: ") + strerror(errno)); - } - epoll_event ev; ev.data.fd = listen_fd; ev.events = EPOLLIN | EPOLLET; @@ -366,18 +404,19 @@ void osd_t::stop_client(int peer_fd) return; } osd_client_t cl = it->second; - if (cl.osd_num) + if (cl.peer_state == PEER_CONNECTED) { - printf("[%lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl.osd_num); - if (cl.peer_state == PEER_CONNECTED) + if (cl.osd_num) { // Reload configuration from Consul when the connection is dropped + printf("[%lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl.osd_num); peer_states.erase(cl.osd_num); + repeer_pgs(cl.osd_num); + } + else + { + printf("[%lu] Stopping client %d (regular client)\n", osd_num, peer_fd); } - } - else - { - printf("[%lu] Stopping client %d (regular client)\n", osd_num, peer_fd); } clients.erase(it); if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer_fd, NULL) < 0) @@ -389,7 +428,6 @@ void osd_t::stop_client(int peer_fd) // Cancel outbound operations cancel_osd_ops(cl); osd_peer_fds.erase(cl.osd_num); - repeer_pgs(cl.osd_num); peering_state |= OSD_CONNECTING_PEERS; } if (cl.read_op) @@ -506,6 +544,8 @@ void osd_t::print_stats() { uint64_t avg = (subop_stat_sum[0][i] - subop_stat_sum[1][i])/(subop_stat_count[0][i] - subop_stat_count[1][i]); printf("avg latency for subop %d (%s): %ld us\n", i, osd_op_names[i], avg); + subop_stat_count[1][i] = subop_stat_count[0][i]; + subop_stat_sum[1][i] = subop_stat_sum[0][i]; } } if (incomplete_objects > 0) diff --git a/osd.h b/osd.h index bea29803..d95ce141 100644 --- a/osd.h +++ b/osd.h @@ -196,13 +196,14 @@ class osd_t // config + blockstore_config_t config; bool readonly = false; std::string consul_address, consul_host, consul_prefix = "microceph"; osd_num_t osd_num = 1; // OSD numbers start with 1 bool run_primary = false; - blockstore_config_t config; std::string bind_address; int bind_port, listen_backlog; + int bind_port_range_start = 11200, bind_port_range_end = 12000; int client_queue_depth = 128; bool allow_test_ops = true; int receive_buffer_size = 9000; @@ -250,6 +251,7 @@ class osd_t int wait_state = 0; int epoll_fd = 0; + int listening_port = 0; int listen_fd = 0; ring_consumer_t consumer; std::map> epoll_handlers; diff --git a/osd_cluster.cpp b/osd_cluster.cpp index fdf68f07..0acfd98b 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -20,6 +20,10 @@ void osd_t::init_cluster() report_status(); }); } + else + { + bind_socket(); + } } void osd_t::init_primary() @@ -74,20 +78,20 @@ void osd_t::parse_test_peer(std::string peer) std::string addr = peer.substr(pos1+1, pos2-pos1-1); std::string osd_num_str = peer.substr(0, pos1); std::string port_str = peer.substr(pos2+1); - osd_num_t osd_num = strtoull(osd_num_str.c_str(), NULL, 10); - if (!osd_num) + osd_num_t peer_osd = strtoull(osd_num_str.c_str(), NULL, 10); + if (!peer_osd) throw new std::runtime_error("Could not parse OSD peer osd_num"); - else if (peer_states.find(osd_num) != peer_states.end()) - throw std::runtime_error("Same osd number "+std::to_string(osd_num)+" specified twice in peers"); + else if (peer_states.find(peer_osd) != peer_states.end()) + throw std::runtime_error("Same osd number "+std::to_string(peer_osd)+" specified twice in peers"); int port = strtoull(port_str.c_str(), NULL, 10); if (!port) throw new std::runtime_error("Could not parse OSD peer port"); - peer_states[osd_num] = json11::Json::object { + peer_states[peer_osd] = json11::Json::object { { "state", "up" }, { "addresses", json11::Json::array { addr } }, { "port", port }, }; - wanted_peers[osd_num] = { 0 }; + wanted_peers[peer_osd] = { 0 }; } json11::Json osd_t::get_status() @@ -105,7 +109,7 @@ json11::Json osd_t::get_status() bind_addresses = getifaddr_list(); st["addresses"] = bind_addresses; } - st["port"] = bind_port; + st["port"] = listening_port; st["primary_enabled"] = run_primary; st["blockstore_ready"] = bs->is_started(); st["blockstore_enabled"] = bs ? true : false; @@ -217,7 +221,7 @@ void osd_t::consul_txn(json11::Json txn, std::function Load PGs -> Load peers -> Connect to peers -> Peer PGs +// Start -> Load config & PGs -> Load peers -> Connect to peers -> Peer PGs // Wait for PG changes -> Start/Stop PGs when requested // Peer connection is lost -> Reload connection data -> Try to reconnect -> Repeat void osd_t::load_pgs() @@ -232,6 +236,12 @@ void osd_t::load_pgs() { "Value", base64_encode(get_status().dump()) }, } } }, + json11::Json::object { + { "KV", json11::Json::object { + { "Verb", "get-tree" }, + { "Key", consul_prefix+"/config/osd/all" }, + } } + }, json11::Json::object { { "KV", json11::Json::object { { "Verb", "get" }, @@ -257,18 +267,33 @@ void osd_t::load_pgs() return; } peering_state &= ~OSD_LOADING_PGS; + blockstore_config_t osd_config = this->config; json11::Json pg_config; std::map pg_history; for (auto & res: data["Results"].array_items()) { std::string key = res["KV"]["Key"].string_value(); - std::string json_err; - json11::Json value = json11::Json::parse(base64_decode(res["KV"]["Value"].string_value()), json_err); + if (key == (consul_prefix+"/osd/state/"+std::to_string(osd_num)+".")) + { + continue; + } + std::string json_err, json_text = base64_decode(res["KV"]["Value"].string_value()); + json11::Json value = json11::Json::parse(json_text, json_err); if (json_err != "") { - printf("Bad JSON in Consul key %s: %s\n", key.c_str(), json_err.c_str()); + printf("Bad JSON in Consul key %s: %s (value: %s)\n", key.c_str(), json_err.c_str(), json_text.c_str()); } - if (key == consul_prefix+"/config/pgs") + if (key == consul_prefix+"/config/osd/all") + { + for (auto & cfg_var: value.object_items()) + { + if (this->config.find(cfg_var.first) == this->config.end()) + { + osd_config[cfg_var.first] = cfg_var.second.string_value(); + } + } + } + else if (key == consul_prefix+"/config/pgs") { pg_config = value; } @@ -282,7 +307,10 @@ void osd_t::load_pgs() } } } + parse_config(osd_config); + bind_socket(); parse_pgs(pg_config, pg_history); + report_status(); }); } @@ -298,7 +326,7 @@ void osd_t::parse_pgs(const json11::Json & pg_config, const std::maposd_num) + if (primary_osd != 0 && primary_osd == this->osd_num) { // Take this PG std::set all_peers; @@ -367,8 +395,8 @@ void osd_t::load_and_connect_peers() json11::Json::array load_peer_txn; for (auto wp_it = wanted_peers.begin(); wp_it != wanted_peers.end();) { - osd_num_t osd_num = wp_it->first; - if (osd_peer_fds.find(osd_num) != osd_peer_fds.end()) + osd_num_t peer_osd = wp_it->first; + if (osd_peer_fds.find(peer_osd) != osd_peer_fds.end()) { // It shouldn't be here wanted_peers.erase(wp_it++); @@ -378,7 +406,7 @@ void osd_t::load_and_connect_peers() peering_state = peering_state & ~OSD_CONNECTING_PEERS; } } - else if (peer_states.find(osd_num) == peer_states.end()) + else if (peer_states.find(peer_osd) == peer_states.end()) { if (!loading_peer_config && (time(NULL) - wp_it->second.last_load_attempt >= peer_connect_interval)) { @@ -387,7 +415,7 @@ void osd_t::load_and_connect_peers() load_peer_txn.push_back(json11::Json::object { { "KV", json11::Json::object { { "Verb", "get-tree" }, - { "Key", consul_prefix+"/osd/state/"+std::to_string(osd_num)+"." }, + { "Key", consul_prefix+"/osd/state/"+std::to_string(peer_osd)+"." }, } } }); } @@ -398,38 +426,39 @@ void osd_t::load_and_connect_peers() { // Try to connect wp_it->second.connecting = true; - const std::string addr = peer_states[osd_num]["addresses"][wp_it->second.address_index].string_value(); - int64_t port = peer_states[osd_num]["port"].int64_value(); + const std::string addr = peer_states[peer_osd]["addresses"][wp_it->second.address_index].string_value(); + int64_t peer_port = peer_states[peer_osd]["port"].int64_value(); wp_it++; - connect_peer(osd_num, addr.c_str(), port, [this](osd_num_t osd_num, int peer_fd) + connect_peer(peer_osd, addr.c_str(), peer_port, [this](osd_num_t peer_osd, int peer_fd) { - wanted_peers[osd_num].connecting = false; + wanted_peers[peer_osd].connecting = false; if (peer_fd < 0) { - auto & addrs = peer_states[osd_num]["addresses"].array_items(); - const char *addr = addrs[wanted_peers[osd_num].address_index].string_value().c_str(); - printf("Failed to connect to peer OSD %lu address %s: %s\n", osd_num, addr, strerror(-peer_fd)); - if (wanted_peers[osd_num].address_index < addrs.size()-1) + int64_t peer_port = peer_states[peer_osd]["port"].int64_value(); + auto & addrs = peer_states[peer_osd]["addresses"].array_items(); + const char *addr = addrs[wanted_peers[peer_osd].address_index].string_value().c_str(); + printf("Failed to connect to peer OSD %lu address %s port %ld: %s\n", peer_osd, addr, peer_port, strerror(-peer_fd)); + if (wanted_peers[peer_osd].address_index < addrs.size()-1) { // Try all addresses - wanted_peers[osd_num].address_index++; + wanted_peers[peer_osd].address_index++; } else { - wanted_peers[osd_num].last_connect_attempt = time(NULL); - peer_states.erase(osd_num); + wanted_peers[peer_osd].last_connect_attempt = time(NULL); + peer_states.erase(peer_osd); } return; } printf("Connected with peer OSD %lu (fd %d)\n", clients[peer_fd].osd_num, peer_fd); - wanted_peers.erase(osd_num); + wanted_peers.erase(peer_osd); if (!wanted_peers.size()) { // Connected to all peers printf("Connected to all peers\n"); peering_state = peering_state & ~OSD_CONNECTING_PEERS; } - repeer_pgs(osd_num); + repeer_pgs(peer_osd); }); } else @@ -442,6 +471,8 @@ void osd_t::load_and_connect_peers() { consul_txn(load_peer_txn, [this](std::string err, json11::Json data) { + // Ugly, but required to wake up the loop + tfd->set_timer(peer_connect_interval*1000, false, [](int timer_id){}); loading_peer_config = false; if (err != "") { @@ -454,13 +485,14 @@ void osd_t::load_and_connect_peers() // /osd/state/. osd_num_t peer_osd = std::stoull(key.substr(consul_prefix.length()+11, key.length()-consul_prefix.length()-12)); std::string json_err; - json11::Json st = json11::Json::parse(base64_decode(res["KV"]["Value"].string_value()), json_err); + std::string json_text = base64_decode(res["KV"]["Value"].string_value()); + json11::Json st = json11::Json::parse(json_text, json_err); if (json_err != "") { - printf("Bad JSON in Consul key %s: %s\n", key.c_str(), json_err.c_str()); + printf("Bad JSON in Consul key %s: %s (value: %s)\n", key.c_str(), json_err.c_str(), json_text.c_str()); } if (peer_osd > 0 && st.is_object() && st["state"] == "up" && - st["addresses"].is_array() && st["port"].is_number()) + st["addresses"].is_array() && st["port"].int64_value() > 0 && st["port"].int64_value() < 65536) { peer_states[peer_osd] = st; } diff --git a/osd_http.cpp b/osd_http.cpp index 7d17d90e..83d705f5 100644 --- a/osd_http.cpp +++ b/osd_http.cpp @@ -384,15 +384,15 @@ void http_co_t::resume() } } -uint64_t stoull_full(std::string str, int base) +uint64_t stoull_full(const std::string & str, int base) { if (isspace(str[0])) { return 0; } - size_t end = -1; - uint64_t r = std::stoull(str, &end, base); - if (end < str.length()) + char *end = NULL; + uint64_t r = strtoull(str.c_str(), &end, base); + if (end != str.c_str()+str.length()) { return 0; } diff --git a/osd_http.h b/osd_http.h index 75cbf467..7dbeedbd 100644 --- a/osd_http.h +++ b/osd_http.h @@ -14,4 +14,4 @@ struct http_response_t http_response_t *parse_http_response(std::string res); std::vector getifaddr_list(bool include_v6 = false); -uint64_t stoull_full(std::string str, int base = 10); +uint64_t stoull_full(const std::string & str, int base = 10); diff --git a/osd_main.cpp b/osd_main.cpp index 77684939..ceab1480 100644 --- a/osd_main.cpp +++ b/osd_main.cpp @@ -26,6 +26,8 @@ int main(int narg, char *args[]) } signal(SIGINT, handle_sigint); ring_loop_t *ringloop = new ring_loop_t(512); + // FIXME: Create Blockstore from on-disk superblock config and check it against the OSD cluster config + // FIXME: Prevent two OSD starting with same number blockstore_t *bs = new blockstore_t(config, ringloop); osd_t *osd = new osd_t(config, bs, ringloop); while (1) diff --git a/osd_peering.cpp b/osd_peering.cpp index 12df17b9..d90e7c94 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -6,13 +6,13 @@ #include "base64.h" #include "osd.h" -void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function callback) +void osd_t::connect_peer(osd_num_t peer_osd, const char *peer_host, int peer_port, std::function callback) { struct sockaddr_in addr; int r; if ((r = inet_pton(AF_INET, peer_host, &addr.sin_addr)) != 1) { - callback(osd_num, -EINVAL); + callback(peer_osd, -EINVAL); return; } addr.sin_family = AF_INET; @@ -20,19 +20,19 @@ void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port int peer_fd = socket(AF_INET, SOCK_STREAM, 0); if (peer_fd < 0) { - callback(osd_num, -errno); + callback(peer_osd, -errno); return; } fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); int timeout_id = -1; if (peer_connect_timeout > 0) { - tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id) + timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id) { auto callback = clients[peer_fd].connect_callback; - osd_num_t osd_num = clients[peer_fd].osd_num; + osd_num_t peer_osd = clients[peer_fd].osd_num; stop_client(peer_fd); - callback(osd_num, -EIO); + callback(peer_osd, -EIO); return; }); } @@ -40,9 +40,10 @@ void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port if (r < 0 && errno != EINPROGRESS) { close(peer_fd); - callback(osd_num, -errno); + callback(peer_osd, -errno); return; } + assert(peer_osd != osd_num); clients[peer_fd] = (osd_client_t){ .peer_addr = addr, .peer_port = peer_port, @@ -50,10 +51,10 @@ void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port .peer_state = PEER_CONNECTING, .connect_callback = callback, .connect_timeout_id = timeout_id, - .osd_num = osd_num, + .osd_num = peer_osd, .in_buf = malloc(receive_buffer_size), }; - osd_peer_fds[osd_num] = peer_fd; + osd_peer_fds[peer_osd] = peer_fd; // Add FD to epoll (EPOLLOUT for tracking connect() result) epoll_event ev; ev.data.fd = peer_fd; @@ -72,7 +73,7 @@ void osd_t::handle_connect_result(int peer_fd) tfd->clear_timer(cl.connect_timeout_id); cl.connect_timeout_id = -1; } - osd_num_t osd_num = cl.osd_num; + osd_num_t peer_osd = cl.osd_num; auto callback = cl.connect_callback; int result = 0; socklen_t result_len = sizeof(result); @@ -83,7 +84,7 @@ void osd_t::handle_connect_result(int peer_fd) if (result != 0) { stop_client(peer_fd); - callback(osd_num, -result); + callback(peer_osd, -result); return; } int one = 1; @@ -98,7 +99,7 @@ void osd_t::handle_connect_result(int peer_fd) { throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); } - callback(osd_num, peer_fd); + callback(peer_osd, peer_fd); } // Peering loop @@ -167,7 +168,7 @@ void osd_t::handle_peers() } } -void osd_t::repeer_pgs(osd_num_t osd_num) +void osd_t::repeer_pgs(osd_num_t peer_osd) { // Re-peer affected PGs for (auto & p: pgs) @@ -177,7 +178,7 @@ void osd_t::repeer_pgs(osd_num_t osd_num) { for (osd_num_t pg_osd: p.second.all_peers) { - if (pg_osd == osd_num) + if (pg_osd == peer_osd) { repeer = true; break; @@ -186,7 +187,7 @@ void osd_t::repeer_pgs(osd_num_t osd_num) if (repeer) { // Repeer this pg - printf("Repeer PG %d because of OSD %lu\n", p.second.pg_num, osd_num); + printf("Repeer PG %d because of OSD %lu\n", p.second.pg_num, peer_osd); start_pg_peering(p.second.pg_num); peering_state |= OSD_PEERING_PGS; } @@ -273,7 +274,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num) std::set cur_peers; for (auto peer_osd: pg.all_peers) { - if (osd_peer_fds.find(peer_osd) != osd_peer_fds.end()) + if (peer_osd == this->osd_num || osd_peer_fds.find(peer_osd) != osd_peer_fds.end()) { cur_peers.insert(peer_osd); } @@ -286,7 +287,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num) if (pg.peering_state) { // Adjust the peering operation that's still in progress - discard unneeded results - for (auto it = pg.peering_state->list_ops.begin(); it != pg.peering_state->list_ops.end(); it++) + for (auto it = pg.peering_state->list_ops.begin(); it != pg.peering_state->list_ops.end();) { if (pg.state == PG_INCOMPLETE || cur_peers.find(it->first) == cur_peers.end()) { @@ -313,8 +314,10 @@ void osd_t::start_pg_peering(pg_num_t pg_num) pg.peering_state->list_ops.erase(it); it = pg.peering_state->list_ops.begin(); } + else + it++; } - for (auto it = pg.peering_state->list_results.begin(); it != pg.peering_state->list_results.end(); it++) + for (auto it = pg.peering_state->list_results.begin(); it != pg.peering_state->list_results.end();) { if (pg.state == PG_INCOMPLETE || cur_peers.find(it->first) == cur_peers.end()) { @@ -325,6 +328,8 @@ void osd_t::start_pg_peering(pg_num_t pg_num) pg.peering_state->list_results.erase(it); it = pg.peering_state->list_results.begin(); } + else + it++; } } if (pg.state == PG_INCOMPLETE) diff --git a/timerfd_manager.cpp b/timerfd_manager.cpp index 3c8f1cb9..6d260c75 100644 --- a/timerfd_manager.cpp +++ b/timerfd_manager.cpp @@ -35,10 +35,11 @@ void timerfd_manager_t::inc_timer(timerfd_timer_t & t) int timerfd_manager_t::set_timer(uint64_t millis, bool repeat, std::function callback) { + int timer_id = id++; timespec start; clock_gettime(CLOCK_MONOTONIC, &start); timers.push_back({ - .id = id++, + .id = timer_id, .millis = millis, .start = start, .next = start, @@ -48,7 +49,7 @@ int timerfd_manager_t::set_timer(uint64_t millis, bool repeat, std::function