From 9f842ec9a54f453c2e18858e7843ccb2b2746b22 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 22 May 2020 12:45:12 +0300 Subject: [PATCH] Remove connect callback because it is always the same --- osd.h | 4 ++-- osd_cluster.cpp | 64 +++++++++++++++++++++++++------------------------ osd_peering.cpp | 19 ++++++--------- 3 files changed, 42 insertions(+), 45 deletions(-) diff --git a/osd.h b/osd.h index 09899399..690734a9 100644 --- a/osd.h +++ b/osd.h @@ -130,7 +130,6 @@ struct osd_client_t int peer_port; int peer_fd; int peer_state; - std::function connect_callback; int connect_timeout_id = -1; osd_num_t osd_num = 0; @@ -290,6 +289,7 @@ class osd_t void report_pg_states(); void apply_pg_count(); void apply_pg_config(); + void on_connect_peer(osd_num_t peer_osd, int peer_fd); void load_and_connect_peers(); // event loop, socket read/write @@ -307,7 +307,7 @@ class osd_t void outbox_push(osd_client_t & cl, osd_op_t *op); // peer handling (primary OSD logic) - void connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function callback); + void connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port); void handle_connect_result(int peer_fd); void check_peer_config(osd_client_t & cl); void cancel_osd_ops(osd_client_t & cl); diff --git a/osd_cluster.cpp b/osd_cluster.cpp index d65e2606..c9c4f131 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -729,6 +729,38 @@ void osd_t::report_pg_states() }); } +void osd_t::on_connect_peer(osd_num_t peer_osd, int peer_fd) +{ + wanted_peers[peer_osd].connecting = false; + if (peer_fd < 0) + { + int64_t peer_port = st_cli.peer_states[peer_osd]["port"].int64_value(); + auto & addrs = st_cli.peer_states[peer_osd]["addresses"].array_items(); + const char *addr = addrs[wanted_peers[peer_osd].address_index].string_value().c_str(); + printf("Failed to connect to peer OSD %lu address %s port %ld: %s\n", peer_osd, addr, peer_port, strerror(-peer_fd)); + if (wanted_peers[peer_osd].address_index < addrs.size()-1) + { + // Try all addresses + wanted_peers[peer_osd].address_index++; + } + else + { + wanted_peers[peer_osd].last_connect_attempt = time(NULL); + st_cli.peer_states.erase(peer_osd); + } + return; + } + printf("Connected with peer OSD %lu (fd %d)\n", clients[peer_fd].osd_num, peer_fd); + wanted_peers.erase(peer_osd); + if (!wanted_peers.size()) + { + // Connected to all peers + printf("Connected to all peers\n"); + peering_state = peering_state & ~OSD_CONNECTING_PEERS; + } + repeer_pgs(peer_osd); +} + void osd_t::load_and_connect_peers() { json11::Json::array load_peer_txn; @@ -767,37 +799,7 @@ void osd_t::load_and_connect_peers() const std::string addr = st_cli.peer_states[peer_osd]["addresses"][wp_it->second.address_index].string_value(); int64_t peer_port = st_cli.peer_states[peer_osd]["port"].int64_value(); wp_it++; - connect_peer(peer_osd, addr.c_str(), peer_port, [this](osd_num_t peer_osd, int peer_fd) - { - wanted_peers[peer_osd].connecting = false; - if (peer_fd < 0) - { - int64_t peer_port = st_cli.peer_states[peer_osd]["port"].int64_value(); - auto & addrs = st_cli.peer_states[peer_osd]["addresses"].array_items(); - const char *addr = addrs[wanted_peers[peer_osd].address_index].string_value().c_str(); - printf("Failed to connect to peer OSD %lu address %s port %ld: %s\n", peer_osd, addr, peer_port, strerror(-peer_fd)); - if (wanted_peers[peer_osd].address_index < addrs.size()-1) - { - // Try all addresses - wanted_peers[peer_osd].address_index++; - } - else - { - wanted_peers[peer_osd].last_connect_attempt = time(NULL); - st_cli.peer_states.erase(peer_osd); - } - return; - } - printf("Connected with peer OSD %lu (fd %d)\n", clients[peer_fd].osd_num, peer_fd); - wanted_peers.erase(peer_osd); - if (!wanted_peers.size()) - { - // Connected to all peers - printf("Connected to all peers\n"); - peering_state = peering_state & ~OSD_CONNECTING_PEERS; - } - repeer_pgs(peer_osd); - }); + connect_peer(peer_osd, addr.c_str(), peer_port); } else { diff --git a/osd_peering.cpp b/osd_peering.cpp index 35650361..4b91a2a6 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -6,13 +6,13 @@ #include "base64.h" #include "osd.h" -void osd_t::connect_peer(osd_num_t peer_osd, const char *peer_host, int peer_port, std::function callback) +void osd_t::connect_peer(osd_num_t peer_osd, const char *peer_host, int peer_port) { struct sockaddr_in addr; int r; if ((r = inet_pton(AF_INET, peer_host, &addr.sin_addr)) != 1) { - callback(peer_osd, -EINVAL); + on_connect_peer(peer_osd, -EINVAL); return; } addr.sin_family = AF_INET; @@ -20,7 +20,7 @@ void osd_t::connect_peer(osd_num_t peer_osd, const char *peer_host, int peer_por int peer_fd = socket(AF_INET, SOCK_STREAM, 0); if (peer_fd < 0) { - callback(peer_osd, -errno); + on_connect_peer(peer_osd, -errno); return; } fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); @@ -29,10 +29,9 @@ void osd_t::connect_peer(osd_num_t peer_osd, const char *peer_host, int peer_por { timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id) { - auto callback = clients[peer_fd].connect_callback; osd_num_t peer_osd = clients[peer_fd].osd_num; stop_client(peer_fd); - callback(peer_osd, -EIO); + on_connect_peer(peer_osd, -EIO); return; }); } @@ -40,7 +39,7 @@ void osd_t::connect_peer(osd_num_t peer_osd, const char *peer_host, int peer_por if (r < 0 && errno != EINPROGRESS) { close(peer_fd); - callback(peer_osd, -errno); + on_connect_peer(peer_osd, -errno); return; } assert(peer_osd != osd_num); @@ -49,7 +48,6 @@ void osd_t::connect_peer(osd_num_t peer_osd, const char *peer_host, int peer_por .peer_port = peer_port, .peer_fd = peer_fd, .peer_state = PEER_CONNECTING, - .connect_callback = callback, .connect_timeout_id = timeout_id, .osd_num = peer_osd, .in_buf = malloc(receive_buffer_size), @@ -81,9 +79,8 @@ void osd_t::handle_connect_result(int peer_fd) } if (result != 0) { - auto callback = cl.connect_callback; stop_client(peer_fd); - callback(peer_osd, -result); + on_connect_peer(peer_osd, -result); return; } int one = 1; @@ -144,9 +141,7 @@ void osd_t::check_peer_config(osd_client_t & cl) return; } osd_peer_fds[cl.osd_num] = cl.peer_fd; - auto callback = cl.connect_callback; - cl.connect_callback = NULL; - callback(cl.osd_num, cl.peer_fd); + on_connect_peer(cl.osd_num, cl.peer_fd); delete op; }; outbox_push(cl, op);