Fix crashes on multiple OSD reconnects

Identify clients by pointer instead of by peer_fd, because a peer may be dropped
and reconnected between callbacks, so a lookup by peer_fd can return a different client

Yeah, maybe I need some Rust, but... maybe in the future :)
Vitaliy Filippov 2020-10-17 10:52:21 +00:00
parent 9350656af6
commit 776fe954a5
6 changed files with 258 additions and 240 deletions

View File

@ -102,7 +102,7 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer
{ {
timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id) timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id)
{ {
osd_num_t peer_osd = clients[peer_fd].osd_num; osd_num_t peer_osd = clients[peer_fd]->osd_num;
stop_client(peer_fd); stop_client(peer_fd);
on_connect_peer(peer_osd, -EIO); on_connect_peer(peer_osd, -EIO);
return; return;
@ -116,7 +116,7 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer
return; return;
} }
assert(peer_osd != this->osd_num); assert(peer_osd != this->osd_num);
clients[peer_fd] = (osd_client_t){ clients[peer_fd] = new osd_client_t({
.peer_addr = addr, .peer_addr = addr,
.peer_port = peer_port, .peer_port = peer_port,
.peer_fd = peer_fd, .peer_fd = peer_fd,
@ -124,7 +124,7 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer
.connect_timeout_id = timeout_id, .connect_timeout_id = timeout_id,
.osd_num = peer_osd, .osd_num = peer_osd,
.in_buf = malloc_or_die(receive_buffer_size), .in_buf = malloc_or_die(receive_buffer_size),
}; });
tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events) tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events)
{ {
// Either OUT (connected) or HUP // Either OUT (connected) or HUP
@ -134,13 +134,13 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer
void osd_messenger_t::handle_connect_epoll(int peer_fd) void osd_messenger_t::handle_connect_epoll(int peer_fd)
{ {
auto & cl = clients[peer_fd]; auto cl = clients[peer_fd];
if (cl.connect_timeout_id >= 0) if (cl->connect_timeout_id >= 0)
{ {
tfd->clear_timer(cl.connect_timeout_id); tfd->clear_timer(cl->connect_timeout_id);
cl.connect_timeout_id = -1; cl->connect_timeout_id = -1;
} }
osd_num_t peer_osd = cl.osd_num; osd_num_t peer_osd = cl->osd_num;
int result = 0; int result = 0;
socklen_t result_len = sizeof(result); socklen_t result_len = sizeof(result);
if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0)
@ -155,7 +155,7 @@ void osd_messenger_t::handle_connect_epoll(int peer_fd)
} }
int one = 1; int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
cl.peer_state = PEER_CONNECTED; cl->peer_state = PEER_CONNECTED;
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events) tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
{ {
handle_peer_epoll(peer_fd, epoll_events); handle_peer_epoll(peer_fd, epoll_events);
@ -176,11 +176,11 @@ void osd_messenger_t::handle_peer_epoll(int peer_fd, int epoll_events)
else if (epoll_events & EPOLLIN) else if (epoll_events & EPOLLIN)
{ {
// Mark client as ready (i.e. some data is available) // Mark client as ready (i.e. some data is available)
auto & cl = clients[peer_fd]; auto cl = clients[peer_fd];
cl.read_ready++; cl->read_ready++;
if (cl.read_ready == 1) if (cl->read_ready == 1)
{ {
read_ready_clients.push_back(cl.peer_fd); read_ready_clients.push_back(cl->peer_fd);
if (ringloop) if (ringloop)
ringloop->wakeup(); ringloop->wakeup();
else else
@ -228,11 +228,11 @@ void osd_messenger_t::on_connect_peer(osd_num_t peer_osd, int peer_fd)
repeer_pgs(peer_osd); repeer_pgs(peer_osd);
} }
void osd_messenger_t::check_peer_config(osd_client_t & cl) void osd_messenger_t::check_peer_config(osd_client_t *cl)
{ {
osd_op_t *op = new osd_op_t(); osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT; op->op_type = OSD_OP_OUT;
op->peer_fd = cl.peer_fd; op->peer_fd = cl->peer_fd;
op->req = { op->req = {
.show_conf = { .show_conf = {
.header = { .header = {
@ -242,16 +242,15 @@ void osd_messenger_t::check_peer_config(osd_client_t & cl)
}, },
}, },
}; };
op->callback = [this](osd_op_t *op) op->callback = [this, cl](osd_op_t *op)
{ {
osd_client_t & cl = clients[op->peer_fd];
std::string json_err; std::string json_err;
json11::Json config; json11::Json config;
bool err = false; bool err = false;
if (op->reply.hdr.retval < 0) if (op->reply.hdr.retval < 0)
{ {
err = true; err = true;
printf("Failed to get config from OSD %lu (retval=%ld), disconnecting peer\n", cl.osd_num, op->reply.hdr.retval); printf("Failed to get config from OSD %lu (retval=%ld), disconnecting peer\n", cl->osd_num, op->reply.hdr.retval);
} }
else else
{ {
@ -259,45 +258,45 @@ void osd_messenger_t::check_peer_config(osd_client_t & cl)
if (json_err != "") if (json_err != "")
{ {
err = true; err = true;
printf("Failed to get config from OSD %lu: bad JSON: %s, disconnecting peer\n", cl.osd_num, json_err.c_str()); printf("Failed to get config from OSD %lu: bad JSON: %s, disconnecting peer\n", cl->osd_num, json_err.c_str());
} }
else if (config["osd_num"].uint64_value() != cl.osd_num) else if (config["osd_num"].uint64_value() != cl->osd_num)
{ {
err = true; err = true;
printf("Connected to OSD %lu instead of OSD %lu, peer state is outdated, disconnecting peer\n", config["osd_num"].uint64_value(), cl.osd_num); printf("Connected to OSD %lu instead of OSD %lu, peer state is outdated, disconnecting peer\n", config["osd_num"].uint64_value(), cl->osd_num);
} }
} }
if (err) if (err)
{ {
osd_num_t osd_num = cl.osd_num; osd_num_t osd_num = cl->osd_num;
stop_client(op->peer_fd); stop_client(op->peer_fd);
on_connect_peer(osd_num, -1); on_connect_peer(osd_num, -1);
delete op; delete op;
return; return;
} }
osd_peer_fds[cl.osd_num] = cl.peer_fd; osd_peer_fds[cl->osd_num] = cl->peer_fd;
on_connect_peer(cl.osd_num, cl.peer_fd); on_connect_peer(cl->osd_num, cl->peer_fd);
delete op; delete op;
}; };
outbox_push(op); outbox_push(op);
} }
void osd_messenger_t::cancel_osd_ops(osd_client_t & cl) void osd_messenger_t::cancel_osd_ops(osd_client_t *cl)
{ {
for (auto p: cl.sent_ops) for (auto p: cl->sent_ops)
{ {
cancel_op(p.second); cancel_op(p.second);
} }
cl.sent_ops.clear(); cl->sent_ops.clear();
for (auto op: cl.outbox) for (auto op: cl->outbox)
{ {
cancel_op(op); cancel_op(op);
} }
cl.outbox.clear(); cl->outbox.clear();
if (cl.write_op) if (cl->write_op)
{ {
cancel_op(cl.write_op); cancel_op(cl->write_op);
cl.write_op = NULL; cl->write_op = NULL;
} }
} }
@ -328,15 +327,15 @@ void osd_messenger_t::stop_client(int peer_fd)
return; return;
} }
uint64_t repeer_osd = 0; uint64_t repeer_osd = 0;
osd_client_t cl = it->second; osd_client_t *cl = it->second;
if (cl.peer_state == PEER_CONNECTED) if (cl->peer_state == PEER_CONNECTED)
{ {
if (cl.osd_num) if (cl->osd_num)
{ {
// Reload configuration from etcd when the connection is dropped // Reload configuration from etcd when the connection is dropped
if (log_level > 0) if (log_level > 0)
printf("[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl.osd_num); printf("[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl->osd_num);
repeer_osd = cl.osd_num; repeer_osd = cl->osd_num;
} }
else else
{ {
@ -344,18 +343,19 @@ void osd_messenger_t::stop_client(int peer_fd)
printf("[OSD %lu] Stopping client %d (regular client)\n", osd_num, peer_fd); printf("[OSD %lu] Stopping client %d (regular client)\n", osd_num, peer_fd);
} }
} }
cl->peer_state = PEER_STOPPED;
clients.erase(it); clients.erase(it);
tfd->set_fd_handler(peer_fd, false, NULL); tfd->set_fd_handler(peer_fd, false, NULL);
if (cl.osd_num) if (cl->osd_num)
{ {
osd_peer_fds.erase(cl.osd_num); osd_peer_fds.erase(cl->osd_num);
// Cancel outbound operations // Cancel outbound operations
cancel_osd_ops(cl); cancel_osd_ops(cl);
} }
if (cl.read_op) if (cl->read_op)
{ {
delete cl.read_op; delete cl->read_op;
cl.read_op = NULL; cl->read_op = NULL;
} }
for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++) for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++)
{ {
@ -373,8 +373,13 @@ void osd_messenger_t::stop_client(int peer_fd)
break; break;
} }
} }
free(cl.in_buf); free(cl->in_buf);
cl->in_buf = NULL;
close(peer_fd); close(peer_fd);
if (cl->refs <= 0)
{
delete cl;
}
if (repeer_osd) if (repeer_osd)
{ {
repeer_pgs(repeer_osd); repeer_pgs(repeer_osd);
@ -396,13 +401,13 @@ void osd_messenger_t::accept_connections(int listen_fd)
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
int one = 1; int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
clients[peer_fd] = { clients[peer_fd] = new osd_client_t({
.peer_addr = addr, .peer_addr = addr,
.peer_port = ntohs(addr.sin_port), .peer_port = ntohs(addr.sin_port),
.peer_fd = peer_fd, .peer_fd = peer_fd,
.peer_state = PEER_CONNECTED, .peer_state = PEER_CONNECTED,
.in_buf = malloc_or_die(receive_buffer_size), .in_buf = malloc_or_die(receive_buffer_size),
}; });
// Add FD to epoll // Add FD to epoll
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events) tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
{ {

View File

@ -30,6 +30,7 @@
#define PEER_CONNECTING 1 #define PEER_CONNECTING 1
#define PEER_CONNECTED 2 #define PEER_CONNECTED 2
#define PEER_STOPPED 3
#define DEFAULT_PEER_CONNECT_INTERVAL 5 #define DEFAULT_PEER_CONNECT_INTERVAL 5
#define DEFAULT_PEER_CONNECT_TIMEOUT 5 #define DEFAULT_PEER_CONNECT_TIMEOUT 5
@ -190,6 +191,8 @@ struct osd_op_t
struct osd_client_t struct osd_client_t
{ {
int refs = 0;
sockaddr_in peer_addr; sockaddr_in peer_addr;
int peer_port; int peer_port;
int peer_fd; int peer_fd;
@ -263,7 +266,7 @@ struct osd_messenger_t
std::map<uint64_t, int> osd_peer_fds; std::map<uint64_t, int> osd_peer_fds;
uint64_t next_subop_id = 1; uint64_t next_subop_id = 1;
std::map<int, osd_client_t> clients; std::map<int, osd_client_t*> clients;
std::vector<int> read_ready_clients; std::vector<int> read_ready_clients;
std::vector<int> write_ready_clients; std::vector<int> write_ready_clients;
std::vector<std::function<void()>> set_immediate; std::vector<std::function<void()>> set_immediate;
@ -288,15 +291,15 @@ protected:
void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port); void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port);
void handle_connect_epoll(int peer_fd); void handle_connect_epoll(int peer_fd);
void on_connect_peer(osd_num_t peer_osd, int peer_fd); void on_connect_peer(osd_num_t peer_osd, int peer_fd);
void check_peer_config(osd_client_t & cl); void check_peer_config(osd_client_t *cl);
void cancel_osd_ops(osd_client_t & cl); void cancel_osd_ops(osd_client_t *cl);
void cancel_op(osd_op_t *op); void cancel_op(osd_op_t *op);
bool try_send(osd_client_t & cl); bool try_send(osd_client_t *cl);
void handle_send(int result, int peer_fd); void handle_send(int result, osd_client_t *cl);
bool handle_read(int result, int peer_fd); bool handle_read(int result, osd_client_t *cl);
bool handle_finished_read(osd_client_t & cl); bool handle_finished_read(osd_client_t *cl);
void handle_op_hdr(osd_client_t *cl); void handle_op_hdr(osd_client_t *cl);
bool handle_reply_hdr(osd_client_t *cl); bool handle_reply_hdr(osd_client_t *cl);
void handle_reply_ready(osd_op_t *op); void handle_reply_ready(osd_op_t *op);

View File

@ -8,21 +8,22 @@ void osd_messenger_t::read_requests()
for (int i = 0; i < read_ready_clients.size(); i++) for (int i = 0; i < read_ready_clients.size(); i++)
{ {
int peer_fd = read_ready_clients[i]; int peer_fd = read_ready_clients[i];
auto & cl = clients[peer_fd]; osd_client_t *cl = clients[peer_fd];
if (cl.read_remaining < receive_buffer_size) if (cl->read_remaining < receive_buffer_size)
{ {
cl.read_iov.iov_base = cl.in_buf; cl->read_iov.iov_base = cl->in_buf;
cl.read_iov.iov_len = receive_buffer_size; cl->read_iov.iov_len = receive_buffer_size;
cl.read_msg.msg_iov = &cl.read_iov; cl->read_msg.msg_iov = &cl->read_iov;
cl.read_msg.msg_iovlen = 1; cl->read_msg.msg_iovlen = 1;
} }
else else
{ {
cl.read_iov.iov_base = 0; cl->read_iov.iov_base = 0;
cl.read_iov.iov_len = cl.read_remaining; cl->read_iov.iov_len = cl->read_remaining;
cl.read_msg.msg_iov = cl.recv_list.get_iovec(); cl->read_msg.msg_iov = cl->recv_list.get_iovec();
cl.read_msg.msg_iovlen = cl.recv_list.get_size(); cl->read_msg.msg_iovlen = cl->recv_list.get_size();
} }
cl->refs++;
if (ringloop && !use_sync_send_recv) if (ringloop && !use_sync_send_recv)
{ {
io_uring_sqe* sqe = ringloop->get_sqe(); io_uring_sqe* sqe = ringloop->get_sqe();
@ -32,111 +33,115 @@ void osd_messenger_t::read_requests()
return; return;
} }
ring_data_t* data = ((ring_data_t*)sqe->user_data); ring_data_t* data = ((ring_data_t*)sqe->user_data);
data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data->res, peer_fd); }; data->callback = [this, cl](ring_data_t *data) { handle_read(data->res, cl); };
my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0); my_uring_prep_recvmsg(sqe, peer_fd, &cl->read_msg, 0);
} }
else else
{ {
int result = recvmsg(peer_fd, &cl.read_msg, 0); int result = recvmsg(peer_fd, &cl->read_msg, 0);
if (result < 0) if (result < 0)
{ {
result = -errno; result = -errno;
} }
handle_read(result, peer_fd); handle_read(result, cl);
} }
} }
read_ready_clients.clear(); read_ready_clients.clear();
} }
bool osd_messenger_t::handle_read(int result, int peer_fd) bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
{ {
bool ret = false; bool ret = false;
auto cl_it = clients.find(peer_fd); cl->refs--;
if (cl_it != clients.end()) if (cl->peer_state == PEER_STOPPED)
{ {
auto & cl = cl_it->second; if (cl->refs <= 0)
if (result <= 0 && result != -EAGAIN)
{ {
// this is a client socket, so don't panic on error. just disconnect it delete cl;
if (result != 0)
{
printf("Client %d socket read error: %d (%s). Disconnecting client\n", peer_fd, -result, strerror(-result));
}
stop_client(peer_fd);
return false;
} }
if (result == -EAGAIN || result < cl.read_iov.iov_len) return false;
}
if (result <= 0 && result != -EAGAIN)
{
// this is a client socket, so don't panic on error. just disconnect it
if (result != 0)
{ {
cl.read_ready--; printf("Client %d socket read error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result));
if (cl.read_ready > 0) }
read_ready_clients.push_back(peer_fd); stop_client(cl->peer_fd);
return false;
}
if (result == -EAGAIN || result < cl->read_iov.iov_len)
{
cl->read_ready--;
if (cl->read_ready > 0)
read_ready_clients.push_back(cl->peer_fd);
}
else
{
read_ready_clients.push_back(cl->peer_fd);
}
if (result > 0)
{
if (cl->read_iov.iov_base == cl->in_buf)
{
// Compose operation(s) from the buffer
int remain = result;
void *curbuf = cl->in_buf;
while (remain > 0)
{
if (!cl->read_op)
{
cl->read_op = new osd_op_t;
cl->read_op->peer_fd = cl->peer_fd;
cl->read_op->op_type = OSD_OP_IN;
cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
cl->read_remaining = OSD_PACKET_SIZE;
cl->read_state = CL_READ_HDR;
}
while (cl->recv_list.done < cl->recv_list.count && remain > 0)
{
iovec* cur = cl->recv_list.get_iovec();
if (cur->iov_len > remain)
{
memcpy(cur->iov_base, curbuf, remain);
cl->read_remaining -= remain;
cur->iov_len -= remain;
cur->iov_base += remain;
remain = 0;
}
else
{
memcpy(cur->iov_base, curbuf, cur->iov_len);
curbuf += cur->iov_len;
cl->read_remaining -= cur->iov_len;
remain -= cur->iov_len;
cur->iov_len = 0;
cl->recv_list.done++;
}
}
if (cl->recv_list.done >= cl->recv_list.count)
{
if (!handle_finished_read(cl))
{
goto fin;
}
}
}
} }
else else
{ {
read_ready_clients.push_back(peer_fd); // Long data
cl->read_remaining -= result;
cl->recv_list.eat(result);
if (cl->recv_list.done >= cl->recv_list.count)
{
handle_finished_read(cl);
}
} }
if (result > 0) if (result >= cl->read_iov.iov_len)
{ {
if (cl.read_iov.iov_base == cl.in_buf) ret = true;
{
// Compose operation(s) from the buffer
int remain = result;
void *curbuf = cl.in_buf;
while (remain > 0)
{
if (!cl.read_op)
{
cl.read_op = new osd_op_t;
cl.read_op->peer_fd = peer_fd;
cl.read_op->op_type = OSD_OP_IN;
cl.recv_list.push_back(cl.read_op->req.buf, OSD_PACKET_SIZE);
cl.read_remaining = OSD_PACKET_SIZE;
cl.read_state = CL_READ_HDR;
}
while (cl.recv_list.done < cl.recv_list.count && remain > 0)
{
iovec* cur = cl.recv_list.get_iovec();
if (cur->iov_len > remain)
{
memcpy(cur->iov_base, curbuf, remain);
cl.read_remaining -= remain;
cur->iov_len -= remain;
cur->iov_base += remain;
remain = 0;
}
else
{
memcpy(cur->iov_base, curbuf, cur->iov_len);
curbuf += cur->iov_len;
cl.read_remaining -= cur->iov_len;
remain -= cur->iov_len;
cur->iov_len = 0;
cl.recv_list.done++;
}
}
if (cl.recv_list.done >= cl.recv_list.count)
{
if (!handle_finished_read(cl))
{
goto fin;
}
}
}
}
else
{
// Long data
cl.read_remaining -= result;
cl.recv_list.eat(result);
if (cl.recv_list.done >= cl.recv_list.count)
{
handle_finished_read(cl);
}
}
if (result >= cl.read_iov.iov_len)
{
ret = true;
}
} }
} }
fin: fin:
@ -148,30 +153,30 @@ fin:
return ret; return ret;
} }
bool osd_messenger_t::handle_finished_read(osd_client_t & cl) bool osd_messenger_t::handle_finished_read(osd_client_t *cl)
{ {
cl.recv_list.reset(); cl->recv_list.reset();
if (cl.read_state == CL_READ_HDR) if (cl->read_state == CL_READ_HDR)
{ {
if (cl.read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC) if (cl->read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC)
return handle_reply_hdr(&cl); return handle_reply_hdr(cl);
else else
handle_op_hdr(&cl); handle_op_hdr(cl);
} }
else if (cl.read_state == CL_READ_DATA) else if (cl->read_state == CL_READ_DATA)
{ {
// Operation is ready // Operation is ready
cl.received_ops.push_back(cl.read_op); cl->received_ops.push_back(cl->read_op);
set_immediate.push_back([this, op = cl.read_op]() { exec_op(op); }); set_immediate.push_back([this, op = cl->read_op]() { exec_op(op); });
cl.read_op = NULL; cl->read_op = NULL;
cl.read_state = 0; cl->read_state = 0;
} }
else if (cl.read_state == CL_READ_REPLY_DATA) else if (cl->read_state == CL_READ_REPLY_DATA)
{ {
// Reply is ready // Reply is ready
handle_reply_ready(cl.read_op); handle_reply_ready(cl->read_op);
cl.read_op = NULL; cl->read_op = NULL;
cl.read_state = 0; cl->read_state = 0;
} }
else else
{ {

View File

@ -6,7 +6,7 @@
void osd_messenger_t::outbox_push(osd_op_t *cur_op) void osd_messenger_t::outbox_push(osd_op_t *cur_op)
{ {
assert(cur_op->peer_fd); assert(cur_op->peer_fd);
auto & cl = clients.at(cur_op->peer_fd); osd_client_t *cl = clients.at(cur_op->peer_fd);
if (cur_op->op_type == OSD_OP_OUT) if (cur_op->op_type == OSD_OP_OUT)
{ {
clock_gettime(CLOCK_REALTIME, &cur_op->tv_begin); clock_gettime(CLOCK_REALTIME, &cur_op->tv_begin);
@ -15,12 +15,12 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
{ {
// Check that operation actually belongs to this client // Check that operation actually belongs to this client
bool found = false; bool found = false;
for (auto it = cl.received_ops.begin(); it != cl.received_ops.end(); it++) for (auto it = cl->received_ops.begin(); it != cl->received_ops.end(); it++)
{ {
if (*it == cur_op) if (*it == cur_op)
{ {
found = true; found = true;
cl.received_ops.erase(it, it+1); cl->received_ops.erase(it, it+1);
break; break;
} }
} }
@ -30,85 +30,86 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
return; return;
} }
} }
cl.outbox.push_back(cur_op); cl->outbox.push_back(cur_op);
if (!ringloop) if (!ringloop)
{ {
while (cl.write_op || cl.outbox.size()) while (cl->write_op || cl->outbox.size())
{ {
try_send(cl); try_send(cl);
} }
} }
else if (cl.write_op || cl.outbox.size() > 1 || !try_send(cl)) else if (cl->write_op || cl->outbox.size() > 1 || !try_send(cl))
{ {
if (cl.write_state == 0) if (cl->write_state == 0)
{ {
cl.write_state = CL_WRITE_READY; cl->write_state = CL_WRITE_READY;
write_ready_clients.push_back(cur_op->peer_fd); write_ready_clients.push_back(cur_op->peer_fd);
} }
ringloop->wakeup(); ringloop->wakeup();
} }
} }
bool osd_messenger_t::try_send(osd_client_t & cl) bool osd_messenger_t::try_send(osd_client_t *cl)
{ {
int peer_fd = cl.peer_fd; int peer_fd = cl->peer_fd;
if (!cl.write_op) if (!cl->write_op)
{ {
// pick next command // pick next command
cl.write_op = cl.outbox.front(); cl->write_op = cl->outbox.front();
cl.outbox.pop_front(); cl->outbox.pop_front();
cl.write_state = CL_WRITE_REPLY; cl->write_state = CL_WRITE_REPLY;
if (cl.write_op->op_type == OSD_OP_IN) if (cl->write_op->op_type == OSD_OP_IN)
{ {
// Measure execution latency // Measure execution latency
timespec tv_end; timespec tv_end;
clock_gettime(CLOCK_REALTIME, &tv_end); clock_gettime(CLOCK_REALTIME, &tv_end);
stats.op_stat_count[cl.write_op->req.hdr.opcode]++; stats.op_stat_count[cl->write_op->req.hdr.opcode]++;
if (!stats.op_stat_count[cl.write_op->req.hdr.opcode]) if (!stats.op_stat_count[cl->write_op->req.hdr.opcode])
{ {
stats.op_stat_count[cl.write_op->req.hdr.opcode]++; stats.op_stat_count[cl->write_op->req.hdr.opcode]++;
stats.op_stat_sum[cl.write_op->req.hdr.opcode] = 0; stats.op_stat_sum[cl->write_op->req.hdr.opcode] = 0;
stats.op_stat_bytes[cl.write_op->req.hdr.opcode] = 0; stats.op_stat_bytes[cl->write_op->req.hdr.opcode] = 0;
} }
stats.op_stat_sum[cl.write_op->req.hdr.opcode] += ( stats.op_stat_sum[cl->write_op->req.hdr.opcode] += (
(tv_end.tv_sec - cl.write_op->tv_begin.tv_sec)*1000000 + (tv_end.tv_sec - cl->write_op->tv_begin.tv_sec)*1000000 +
(tv_end.tv_nsec - cl.write_op->tv_begin.tv_nsec)/1000 (tv_end.tv_nsec - cl->write_op->tv_begin.tv_nsec)/1000
); );
if (cl.write_op->req.hdr.opcode == OSD_OP_READ || if (cl->write_op->req.hdr.opcode == OSD_OP_READ ||
cl.write_op->req.hdr.opcode == OSD_OP_WRITE) cl->write_op->req.hdr.opcode == OSD_OP_WRITE)
{ {
stats.op_stat_bytes[cl.write_op->req.hdr.opcode] += cl.write_op->req.rw.len; stats.op_stat_bytes[cl->write_op->req.hdr.opcode] += cl->write_op->req.rw.len;
} }
else if (cl.write_op->req.hdr.opcode == OSD_OP_SEC_READ || else if (cl->write_op->req.hdr.opcode == OSD_OP_SEC_READ ||
cl.write_op->req.hdr.opcode == OSD_OP_SEC_WRITE || cl->write_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
cl.write_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE) cl->write_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
{ {
stats.op_stat_bytes[cl.write_op->req.hdr.opcode] += cl.write_op->req.sec_rw.len; stats.op_stat_bytes[cl->write_op->req.hdr.opcode] += cl->write_op->req.sec_rw.len;
} }
cl.send_list.push_back(cl.write_op->reply.buf, OSD_PACKET_SIZE); cl->send_list.push_back(cl->write_op->reply.buf, OSD_PACKET_SIZE);
if (cl.write_op->req.hdr.opcode == OSD_OP_READ || if (cl->write_op->req.hdr.opcode == OSD_OP_READ ||
cl.write_op->req.hdr.opcode == OSD_OP_SEC_READ || cl->write_op->req.hdr.opcode == OSD_OP_SEC_READ ||
cl.write_op->req.hdr.opcode == OSD_OP_SEC_LIST || cl->write_op->req.hdr.opcode == OSD_OP_SEC_LIST ||
cl.write_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG) cl->write_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG)
{ {
cl.send_list.append(cl.write_op->iov); cl->send_list.append(cl->write_op->iov);
} }
} }
else else
{ {
cl.send_list.push_back(cl.write_op->req.buf, OSD_PACKET_SIZE); cl->send_list.push_back(cl->write_op->req.buf, OSD_PACKET_SIZE);
if (cl.write_op->req.hdr.opcode == OSD_OP_WRITE || if (cl->write_op->req.hdr.opcode == OSD_OP_WRITE ||
cl.write_op->req.hdr.opcode == OSD_OP_SEC_WRITE || cl->write_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
cl.write_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE || cl->write_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE ||
cl.write_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE || cl->write_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE ||
cl.write_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK) cl->write_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK)
{ {
cl.send_list.append(cl.write_op->iov); cl->send_list.append(cl->write_op->iov);
} }
} }
} }
cl.write_msg.msg_iov = cl.send_list.get_iovec(); cl->write_msg.msg_iov = cl->send_list.get_iovec();
cl.write_msg.msg_iovlen = cl.send_list.get_size(); cl->write_msg.msg_iovlen = cl->send_list.get_size();
cl->refs++;
if (ringloop && !use_sync_send_recv) if (ringloop && !use_sync_send_recv)
{ {
io_uring_sqe* sqe = ringloop->get_sqe(); io_uring_sqe* sqe = ringloop->get_sqe();
@ -117,17 +118,17 @@ bool osd_messenger_t::try_send(osd_client_t & cl)
return false; return false;
} }
ring_data_t* data = ((ring_data_t*)sqe->user_data); ring_data_t* data = ((ring_data_t*)sqe->user_data);
data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data->res, peer_fd); }; data->callback = [this, cl](ring_data_t *data) { handle_send(data->res, cl); };
my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0); my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, 0);
} }
else else
{ {
int result = sendmsg(peer_fd, &cl.write_msg, MSG_NOSIGNAL); int result = sendmsg(peer_fd, &cl->write_msg, MSG_NOSIGNAL);
if (result < 0) if (result < 0)
{ {
result = -errno; result = -errno;
} }
handle_send(result, peer_fd); handle_send(result, cl);
} }
return true; return true;
} }
@ -146,41 +147,45 @@ void osd_messenger_t::send_replies()
write_ready_clients.clear(); write_ready_clients.clear();
} }
void osd_messenger_t::handle_send(int result, int peer_fd) void osd_messenger_t::handle_send(int result, osd_client_t *cl)
{ {
auto cl_it = clients.find(peer_fd); cl->refs--;
if (cl_it != clients.end()) if (cl->peer_state == PEER_STOPPED)
{ {
auto & cl = cl_it->second; if (!cl->refs)
if (result < 0 && result != -EAGAIN)
{ {
// this is a client socket, so don't panic. just disconnect it delete cl;
printf("Client %d socket write error: %d (%s). Disconnecting client\n", peer_fd, -result, strerror(-result));
stop_client(peer_fd);
return;
} }
if (result >= 0) return;
}
if (result < 0 && result != -EAGAIN)
{
// this is a client socket, so don't panic. just disconnect it
printf("Client %d socket write error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result));
stop_client(cl->peer_fd);
return;
}
if (result >= 0)
{
cl->send_list.eat(result);
if (cl->send_list.done >= cl->send_list.count)
{ {
cl.send_list.eat(result); // Done
if (cl.send_list.done >= cl.send_list.count) cl->send_list.reset();
if (cl->write_op->op_type == OSD_OP_IN)
{ {
// Done delete cl->write_op;
cl.send_list.reset();
if (cl.write_op->op_type == OSD_OP_IN)
{
delete cl.write_op;
}
else
{
cl.sent_ops[cl.write_op->req.hdr.id] = cl.write_op;
}
cl.write_op = NULL;
cl.write_state = cl.outbox.size() > 0 ? CL_WRITE_READY : 0;
} }
} else
if (cl.write_state != 0) {
{ cl->sent_ops[cl->write_op->req.hdr.id] = cl->write_op;
write_ready_clients.push_back(peer_fd); }
cl->write_op = NULL;
cl->write_state = cl->outbox.size() > 0 ? CL_WRITE_READY : 0;
} }
} }
if (cl->write_state != 0)
{
write_ready_clients.push_back(cl->peer_fd);
}
} }

View File

@ -141,7 +141,7 @@ void osd_t::start_pg_peering(pg_t & pg)
std::vector<int> to_stop; std::vector<int> to_stop;
for (auto & cp: c_cli.clients) for (auto & cp: c_cli.clients)
{ {
if (cp.second.dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) != cp.second.dirty_pgs.end()) if (cp.second->dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) != cp.second->dirty_pgs.end())
{ {
to_stop.push_back(cp.first); to_stop.push_back(cp.first);
} }
@ -308,7 +308,7 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p
auto & cl = c_cli.clients.at(c_cli.osd_peer_fds[role_osd]); auto & cl = c_cli.clients.at(c_cli.osd_peer_fds[role_osd]);
osd_op_t *op = new osd_op_t(); osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT; op->op_type = OSD_OP_OUT;
op->peer_fd = cl.peer_fd; op->peer_fd = cl->peer_fd;
op->req = { op->req = {
.sec_sync = { .sec_sync = {
.header = { .header = {

View File

@ -461,7 +461,7 @@ resume_7:
} }
// Remember PG as dirty to drop the connection when PG goes offline // Remember PG as dirty to drop the connection when PG goes offline
// (this is required because of the "lazy sync") // (this is required because of the "lazy sync")
c_cli.clients[cur_op->peer_fd].dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }); c_cli.clients[cur_op->peer_fd]->dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }); dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
} }
return true; return true;
@ -651,7 +651,7 @@ finish:
{ {
auto it = c_cli.clients.find(cur_op->peer_fd); auto it = c_cli.clients.find(cur_op->peer_fd);
if (it != c_cli.clients.end()) if (it != c_cli.clients.end())
it->second.dirty_pgs.clear(); it->second->dirty_pgs.clear();
} }
finish_op(cur_op, 0); finish_op(cur_op, 0);
} }