Compare commits

...

1 Commits

2 changed files with 17 additions and 28 deletions

View File

@ -2,17 +2,10 @@
void osd_messenger_t::read_requests()
{
for (int i = 0; i < read_ready_clients.size(); i++)
while (read_ready_clients.size() > 0)
{
int peer_fd = read_ready_clients[i];
int peer_fd = read_ready_clients[0];
auto & cl = clients[peer_fd];
io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe)
{
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
return;
}
ring_data_t* data = ((ring_data_t*)sqe->user_data);
if (!cl.read_op || cl.read_remaining < receive_buffer_size)
{
cl.read_iov.iov_base = cl.in_buf;
@ -25,10 +18,14 @@ void osd_messenger_t::read_requests()
}
cl.read_msg.msg_iov = &cl.read_iov;
cl.read_msg.msg_iovlen = 1;
data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data->res, peer_fd); };
my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0);
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + 1);
int result = recvmsg(peer_fd, &cl.read_msg, 0);
if (result < 0)
{
result = -errno;
}
handle_read(result, peer_fd);
}
read_ready_clients.clear();
}
bool osd_messenger_t::handle_read(int result, int peer_fd)

View File

@ -42,12 +42,6 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
bool osd_messenger_t::try_send(osd_client_t & cl)
{
int peer_fd = cl.peer_fd;
io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe)
{
return false;
}
ring_data_t* data = ((ring_data_t*)sqe->user_data);
if (!cl.write_op)
{
// pick next command
@ -84,23 +78,21 @@ bool osd_messenger_t::try_send(osd_client_t & cl)
}
cl.write_msg.msg_iov = cl.write_op->send_list.get_iovec();
cl.write_msg.msg_iovlen = cl.write_op->send_list.get_size();
data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data->res, peer_fd); };
my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0);
int result = sendmsg(peer_fd, &cl.write_msg, MSG_NOSIGNAL);
if (result < 0)
result = -errno;
handle_send(result, peer_fd);
return true;
}
void osd_messenger_t::send_replies()
{
for (int i = 0; i < write_ready_clients.size(); i++)
while (write_ready_clients.size() > 0)
{
int peer_fd = write_ready_clients[i];
if (!try_send(clients[peer_fd]))
{
write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + i);
return;
}
auto & cl = clients[write_ready_clients[0]];
write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + 1);
try_send(cl);
}
write_ready_clients.clear();
}
void osd_messenger_t::handle_send(int result, int peer_fd)