Compare commits

...

1 Commits

2 changed files with 17 additions and 28 deletions

View File

@ -2,17 +2,10 @@
void osd_messenger_t::read_requests() void osd_messenger_t::read_requests()
{ {
for (int i = 0; i < read_ready_clients.size(); i++) while (read_ready_clients.size() > 0)
{ {
int peer_fd = read_ready_clients[i]; int peer_fd = read_ready_clients[0];
auto & cl = clients[peer_fd]; auto & cl = clients[peer_fd];
io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe)
{
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
return;
}
ring_data_t* data = ((ring_data_t*)sqe->user_data);
if (!cl.read_op || cl.read_remaining < receive_buffer_size) if (!cl.read_op || cl.read_remaining < receive_buffer_size)
{ {
cl.read_iov.iov_base = cl.in_buf; cl.read_iov.iov_base = cl.in_buf;
@ -25,10 +18,14 @@ void osd_messenger_t::read_requests()
} }
cl.read_msg.msg_iov = &cl.read_iov; cl.read_msg.msg_iov = &cl.read_iov;
cl.read_msg.msg_iovlen = 1; cl.read_msg.msg_iovlen = 1;
data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data->res, peer_fd); }; read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + 1);
my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0); int result = recvmsg(peer_fd, &cl.read_msg, 0);
if (result < 0)
{
result = -errno;
}
handle_read(result, peer_fd);
} }
read_ready_clients.clear();
} }
bool osd_messenger_t::handle_read(int result, int peer_fd) bool osd_messenger_t::handle_read(int result, int peer_fd)

View File

@ -42,12 +42,6 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
bool osd_messenger_t::try_send(osd_client_t & cl) bool osd_messenger_t::try_send(osd_client_t & cl)
{ {
int peer_fd = cl.peer_fd; int peer_fd = cl.peer_fd;
io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe)
{
return false;
}
ring_data_t* data = ((ring_data_t*)sqe->user_data);
if (!cl.write_op) if (!cl.write_op)
{ {
// pick next command // pick next command
@ -84,23 +78,21 @@ bool osd_messenger_t::try_send(osd_client_t & cl)
} }
cl.write_msg.msg_iov = cl.write_op->send_list.get_iovec(); cl.write_msg.msg_iov = cl.write_op->send_list.get_iovec();
cl.write_msg.msg_iovlen = cl.write_op->send_list.get_size(); cl.write_msg.msg_iovlen = cl.write_op->send_list.get_size();
data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data->res, peer_fd); }; int result = sendmsg(peer_fd, &cl.write_msg, MSG_NOSIGNAL);
my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0); if (result < 0)
result = -errno;
handle_send(result, peer_fd);
return true; return true;
} }
void osd_messenger_t::send_replies() void osd_messenger_t::send_replies()
{ {
for (int i = 0; i < write_ready_clients.size(); i++) while (write_ready_clients.size() > 0)
{ {
int peer_fd = write_ready_clients[i]; auto & cl = clients[write_ready_clients[0]];
if (!try_send(clients[peer_fd])) write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + 1);
{ try_send(cl);
write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + i);
return;
}
} }
write_ready_clients.clear();
} }
void osd_messenger_t::handle_send(int result, int peer_fd) void osd_messenger_t::handle_send(int result, int peer_fd)