Remove connect callback because it is always the same

trace-sqes
Vitaliy Filippov 2020-05-22 12:45:12 +03:00
parent f6a01a4819
commit 9f842ec9a5
3 changed files with 42 additions and 45 deletions

4
osd.h
View File

@ -130,7 +130,6 @@ struct osd_client_t
int peer_port; int peer_port;
int peer_fd; int peer_fd;
int peer_state; int peer_state;
std::function<void(osd_num_t, int)> connect_callback;
int connect_timeout_id = -1; int connect_timeout_id = -1;
osd_num_t osd_num = 0; osd_num_t osd_num = 0;
@ -290,6 +289,7 @@ class osd_t
void report_pg_states(); void report_pg_states();
void apply_pg_count(); void apply_pg_count();
void apply_pg_config(); void apply_pg_config();
void on_connect_peer(osd_num_t peer_osd, int peer_fd);
void load_and_connect_peers(); void load_and_connect_peers();
// event loop, socket read/write // event loop, socket read/write
@ -307,7 +307,7 @@ class osd_t
void outbox_push(osd_client_t & cl, osd_op_t *op); void outbox_push(osd_client_t & cl, osd_op_t *op);
// peer handling (primary OSD logic) // peer handling (primary OSD logic)
void connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function<void(osd_num_t, int)> callback); void connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port);
void handle_connect_result(int peer_fd); void handle_connect_result(int peer_fd);
void check_peer_config(osd_client_t & cl); void check_peer_config(osd_client_t & cl);
void cancel_osd_ops(osd_client_t & cl); void cancel_osd_ops(osd_client_t & cl);

View File

@ -729,6 +729,38 @@ void osd_t::report_pg_states()
}); });
} }
void osd_t::on_connect_peer(osd_num_t peer_osd, int peer_fd)
{
wanted_peers[peer_osd].connecting = false;
if (peer_fd < 0)
{
int64_t peer_port = st_cli.peer_states[peer_osd]["port"].int64_value();
auto & addrs = st_cli.peer_states[peer_osd]["addresses"].array_items();
const char *addr = addrs[wanted_peers[peer_osd].address_index].string_value().c_str();
printf("Failed to connect to peer OSD %lu address %s port %ld: %s\n", peer_osd, addr, peer_port, strerror(-peer_fd));
if (wanted_peers[peer_osd].address_index < addrs.size()-1)
{
// Try all addresses
wanted_peers[peer_osd].address_index++;
}
else
{
wanted_peers[peer_osd].last_connect_attempt = time(NULL);
st_cli.peer_states.erase(peer_osd);
}
return;
}
printf("Connected with peer OSD %lu (fd %d)\n", clients[peer_fd].osd_num, peer_fd);
wanted_peers.erase(peer_osd);
if (!wanted_peers.size())
{
// Connected to all peers
printf("Connected to all peers\n");
peering_state = peering_state & ~OSD_CONNECTING_PEERS;
}
repeer_pgs(peer_osd);
}
void osd_t::load_and_connect_peers() void osd_t::load_and_connect_peers()
{ {
json11::Json::array load_peer_txn; json11::Json::array load_peer_txn;
@ -767,37 +799,7 @@ void osd_t::load_and_connect_peers()
const std::string addr = st_cli.peer_states[peer_osd]["addresses"][wp_it->second.address_index].string_value(); const std::string addr = st_cli.peer_states[peer_osd]["addresses"][wp_it->second.address_index].string_value();
int64_t peer_port = st_cli.peer_states[peer_osd]["port"].int64_value(); int64_t peer_port = st_cli.peer_states[peer_osd]["port"].int64_value();
wp_it++; wp_it++;
connect_peer(peer_osd, addr.c_str(), peer_port, [this](osd_num_t peer_osd, int peer_fd) connect_peer(peer_osd, addr.c_str(), peer_port);
{
wanted_peers[peer_osd].connecting = false;
if (peer_fd < 0)
{
int64_t peer_port = st_cli.peer_states[peer_osd]["port"].int64_value();
auto & addrs = st_cli.peer_states[peer_osd]["addresses"].array_items();
const char *addr = addrs[wanted_peers[peer_osd].address_index].string_value().c_str();
printf("Failed to connect to peer OSD %lu address %s port %ld: %s\n", peer_osd, addr, peer_port, strerror(-peer_fd));
if (wanted_peers[peer_osd].address_index < addrs.size()-1)
{
// Try all addresses
wanted_peers[peer_osd].address_index++;
}
else
{
wanted_peers[peer_osd].last_connect_attempt = time(NULL);
st_cli.peer_states.erase(peer_osd);
}
return;
}
printf("Connected with peer OSD %lu (fd %d)\n", clients[peer_fd].osd_num, peer_fd);
wanted_peers.erase(peer_osd);
if (!wanted_peers.size())
{
// Connected to all peers
printf("Connected to all peers\n");
peering_state = peering_state & ~OSD_CONNECTING_PEERS;
}
repeer_pgs(peer_osd);
});
} }
else else
{ {

View File

@ -6,13 +6,13 @@
#include "base64.h" #include "base64.h"
#include "osd.h" #include "osd.h"
void osd_t::connect_peer(osd_num_t peer_osd, const char *peer_host, int peer_port, std::function<void(osd_num_t, int)> callback) void osd_t::connect_peer(osd_num_t peer_osd, const char *peer_host, int peer_port)
{ {
struct sockaddr_in addr; struct sockaddr_in addr;
int r; int r;
if ((r = inet_pton(AF_INET, peer_host, &addr.sin_addr)) != 1) if ((r = inet_pton(AF_INET, peer_host, &addr.sin_addr)) != 1)
{ {
callback(peer_osd, -EINVAL); on_connect_peer(peer_osd, -EINVAL);
return; return;
} }
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
@ -20,7 +20,7 @@ void osd_t::connect_peer(osd_num_t peer_osd, const char *peer_host, int peer_por
int peer_fd = socket(AF_INET, SOCK_STREAM, 0); int peer_fd = socket(AF_INET, SOCK_STREAM, 0);
if (peer_fd < 0) if (peer_fd < 0)
{ {
callback(peer_osd, -errno); on_connect_peer(peer_osd, -errno);
return; return;
} }
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
@ -29,10 +29,9 @@ void osd_t::connect_peer(osd_num_t peer_osd, const char *peer_host, int peer_por
{ {
timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id) timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id)
{ {
auto callback = clients[peer_fd].connect_callback;
osd_num_t peer_osd = clients[peer_fd].osd_num; osd_num_t peer_osd = clients[peer_fd].osd_num;
stop_client(peer_fd); stop_client(peer_fd);
callback(peer_osd, -EIO); on_connect_peer(peer_osd, -EIO);
return; return;
}); });
} }
@ -40,7 +39,7 @@ void osd_t::connect_peer(osd_num_t peer_osd, const char *peer_host, int peer_por
if (r < 0 && errno != EINPROGRESS) if (r < 0 && errno != EINPROGRESS)
{ {
close(peer_fd); close(peer_fd);
callback(peer_osd, -errno); on_connect_peer(peer_osd, -errno);
return; return;
} }
assert(peer_osd != osd_num); assert(peer_osd != osd_num);
@ -49,7 +48,6 @@ void osd_t::connect_peer(osd_num_t peer_osd, const char *peer_host, int peer_por
.peer_port = peer_port, .peer_port = peer_port,
.peer_fd = peer_fd, .peer_fd = peer_fd,
.peer_state = PEER_CONNECTING, .peer_state = PEER_CONNECTING,
.connect_callback = callback,
.connect_timeout_id = timeout_id, .connect_timeout_id = timeout_id,
.osd_num = peer_osd, .osd_num = peer_osd,
.in_buf = malloc(receive_buffer_size), .in_buf = malloc(receive_buffer_size),
@ -81,9 +79,8 @@ void osd_t::handle_connect_result(int peer_fd)
} }
if (result != 0) if (result != 0)
{ {
auto callback = cl.connect_callback;
stop_client(peer_fd); stop_client(peer_fd);
callback(peer_osd, -result); on_connect_peer(peer_osd, -result);
return; return;
} }
int one = 1; int one = 1;
@ -144,9 +141,7 @@ void osd_t::check_peer_config(osd_client_t & cl)
return; return;
} }
osd_peer_fds[cl.osd_num] = cl.peer_fd; osd_peer_fds[cl.osd_num] = cl.peer_fd;
auto callback = cl.connect_callback; on_connect_peer(cl.osd_num, cl.peer_fd);
cl.connect_callback = NULL;
callback(cl.osd_num, cl.peer_fd);
delete op; delete op;
}; };
outbox_push(cl, op); outbox_push(cl, op);