From 79839ec31d079674fd7623e81061633b83e304bb Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 2 Mar 2020 00:20:28 +0300 Subject: [PATCH] Start sending immediately instead of waiting for another loop --- osd.h | 1 + osd_send.cpp | 84 ++++++++++++++++++++++++++++++---------------------- 2 files changed, 49 insertions(+), 36 deletions(-) diff --git a/osd.h b/osd.h index 37a5283d..7e681e3d 100644 --- a/osd.h +++ b/osd.h @@ -219,6 +219,7 @@ class osd_t void handle_read(ring_data_t *data, int peer_fd); void handle_op_hdr(osd_client_t *cl); void handle_reply_hdr(osd_client_t *cl); + bool try_send(osd_client_t & cl); void send_replies(); void handle_send(ring_data_t *data, int peer_fd); void outbox_push(osd_client_t & cl, osd_op_t *op); diff --git a/osd_send.cpp b/osd_send.cpp index 4b2e580f..75927317 100644 --- a/osd_send.cpp +++ b/osd_send.cpp @@ -7,13 +7,54 @@ void osd_t::outbox_push(osd_client_t & cl, osd_op_t *cur_op) { gettimeofday(&cur_op->tv_begin, NULL); } - if (cl.write_state == 0) - { - cl.write_state = CL_WRITE_READY; - write_ready_clients.push_back(cur_op->peer_fd); - } cl.outbox.push_back(cur_op); - ringloop->wakeup(); + if (cl.write_op || cl.outbox.size() > 1 || !try_send(cl)) + { + if (cl.write_state == 0) + { + cl.write_state = CL_WRITE_READY; + write_ready_clients.push_back(cur_op->peer_fd); + } + ringloop->wakeup(); + } +} + +bool osd_t::try_send(osd_client_t & cl) +{ + int peer_fd = cl.peer_fd; + io_uring_sqe* sqe = ringloop->get_sqe(); + if (!sqe) + { + return false; + } + ring_data_t* data = ((ring_data_t*)sqe->user_data); + if (!cl.write_op) + { + // pick next command + cl.write_op = cl.outbox.front(); + cl.outbox.pop_front(); + cl.write_state = CL_WRITE_REPLY; + if (cl.write_op->op_type == OSD_OP_OUT) + { + gettimeofday(&cl.write_op->tv_send, NULL); + } + else + { + // Measure execution latency + timeval tv_end; + gettimeofday(&tv_end, NULL); + op_stat_count[cl.write_op->req.hdr.opcode]++; + op_stat_sum[cl.write_op->req.hdr.opcode] += ( + (tv_end.tv_sec - cl.write_op->tv_begin.tv_sec)*1000000 + + tv_end.tv_usec - cl.write_op->tv_begin.tv_usec + ); + } + } + cl.write_msg.msg_iov = cl.write_op->send_list.get_iovec(); + cl.write_msg.msg_iovlen = cl.write_op->send_list.get_size(); + data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data, peer_fd); }; + my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0); + return true; } void osd_t::send_replies() @@ -21,40 +62,11 @@ void osd_t::send_replies() for (int i = 0; i < write_ready_clients.size(); i++) { int peer_fd = write_ready_clients[i]; - auto & cl = clients[peer_fd]; - io_uring_sqe* sqe = ringloop->get_sqe(); - if (!sqe) + if (!try_send(clients[peer_fd])) { write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + i); return; } - ring_data_t* data = ((ring_data_t*)sqe->user_data); - if (!cl.write_op) - { - // pick next command - cl.write_op = cl.outbox.front(); - cl.outbox.pop_front(); - cl.write_state = CL_WRITE_REPLY; - if (cl.write_op->op_type == OSD_OP_OUT) - { - gettimeofday(&cl.write_op->tv_send, NULL); - } - else - { - // Measure execution latency - timeval tv_end; - gettimeofday(&tv_end, NULL); - op_stat_count[cl.write_op->req.hdr.opcode]++; - op_stat_sum[cl.write_op->req.hdr.opcode] += ( - (tv_end.tv_sec - cl.write_op->tv_begin.tv_sec)*1000000 + - tv_end.tv_usec - cl.write_op->tv_begin.tv_usec - ); - } - } - cl.write_msg.msg_iov = cl.write_op->send_list.get_iovec(); - cl.write_msg.msg_iovlen = cl.write_op->send_list.get_size(); - data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data, peer_fd); }; - my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0); } write_ready_clients.clear(); }