From 8315407558e24e7c8fe1485aaef16008a0fdca12 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 2 Mar 2020 02:58:00 +0300 Subject: [PATCH] Incoming data pre-buffering --- osd.cpp | 2 + osd.h | 6 ++- osd_peering.cpp | 1 + osd_receive.cpp | 126 +++++++++++++++++++++++++++++++----------------- 4 files changed, 89 insertions(+), 46 deletions(-) diff --git a/osd.cpp b/osd.cpp index e4b820ef..fc3a7587 100644 --- a/osd.cpp +++ b/osd.cpp @@ -220,6 +220,7 @@ restart: .peer_port = ntohs(addr.sin_port), .peer_fd = peer_fd, .peer_state = PEER_CONNECTED, + .in_buf = malloc(receive_buffer_size), }; // Add FD to epoll epoll_event ev; @@ -344,6 +345,7 @@ void osd_t::stop_client(int peer_fd) break; } } + free(cl.in_buf); clients.erase(it); close(peer_fd); } diff --git a/osd.h b/osd.h index 97c735f2..5cf78482 100644 --- a/osd.h +++ b/osd.h @@ -22,7 +22,7 @@ #define OSD_OP_IN 0 #define OSD_OP_OUT 1 -#define CL_READ_OP 1 +#define CL_READ_HDR 1 #define CL_READ_DATA 2 #define CL_READ_REPLY_DATA 3 #define CL_WRITE_READY 1 @@ -125,6 +125,8 @@ struct osd_client_t std::function connect_callback; osd_num_t osd_num = 0; + void *in_buf = NULL; + // Read state int read_ready = 0; osd_op_t *read_op = NULL; @@ -170,6 +172,7 @@ class osd_t int bind_port, listen_backlog; int client_queue_depth = 128; bool allow_test_ops = true; + int receive_buffer_size = 9000; // peer OSDs @@ -215,6 +218,7 @@ class osd_t void handle_epoll_events(); void read_requests(); void handle_read(ring_data_t *data, int peer_fd); + void handle_finished_read(osd_client_t & cl); void handle_op_hdr(osd_client_t *cl); void handle_reply_hdr(osd_client_t *cl); bool try_send(osd_client_t & cl); diff --git a/osd_peering.cpp b/osd_peering.cpp index 6abaa963..ae4b89b8 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -84,6 +84,7 @@ void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port .peer_state = PEER_CONNECTING, .connect_callback = callback, .osd_num = osd_num, + .in_buf = malloc(receive_buffer_size), }; osd_peer_fds[osd_num] = peer_fd; // Add FD to epoll (EPOLLOUT for tracking connect() result) diff --git a/osd_receive.cpp b/osd_receive.cpp index 2de701b7..2f2f5d71 100644 --- a/osd_receive.cpp +++ b/osd_receive.cpp @@ -13,22 +13,16 @@ void osd_t::read_requests() return; } ring_data_t* data = ((ring_data_t*)sqe->user_data); - if (!cl.read_buf) + if (!cl.read_op || cl.read_remaining < receive_buffer_size) { - // no reads in progress - // so this is either a new command or a reply to a previously sent command - if (!cl.read_op) - { - cl.read_op = new osd_op_t; - cl.read_op->peer_fd = peer_fd; - } - cl.read_op->op_type = OSD_OP_IN; - cl.read_buf = &cl.read_op->req.buf; - cl.read_remaining = OSD_PACKET_SIZE; - cl.read_state = CL_READ_OP; + cl.read_iov.iov_base = cl.in_buf; + cl.read_iov.iov_len = receive_buffer_size; + } + else + { + cl.read_iov.iov_base = cl.read_buf; + cl.read_iov.iov_len = cl.read_remaining; } - cl.read_iov.iov_base = cl.read_buf; - cl.read_iov.iov_len = cl.read_remaining; cl.read_msg.msg_iov = &cl.read_iov; cl.read_msg.msg_iovlen = 1; data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data, peer_fd); }; @@ -60,52 +54,93 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd) read_ready_clients.push_back(peer_fd); if (data->res > 0) { - cl.read_remaining -= data->res; - cl.read_buf += data->res; - if (cl.read_remaining <= 0) + if (cl.read_iov.iov_base == cl.in_buf) { - cl.read_buf = NULL; - if (cl.read_state == CL_READ_OP) + // Compose operation(s) from the buffer + int remain = data->res; + void *curbuf = cl.in_buf; + while (remain > 0) { - if (cl.read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC) + if (!cl.read_op) { - handle_reply_hdr(&cl); + cl.read_op = new osd_op_t; + cl.read_op->peer_fd = peer_fd; + cl.read_op->op_type = OSD_OP_IN; + cl.read_buf = cl.read_op->req.buf; + cl.read_remaining = OSD_PACKET_SIZE; + cl.read_state = CL_READ_HDR; + } + if (cl.read_remaining > remain) + { + memcpy(cl.read_buf, curbuf, remain); + cl.read_remaining -= remain; + cl.read_buf += remain; + remain = 0; + if (cl.read_remaining <= 0) + handle_finished_read(cl); } else { - handle_op_hdr(&cl); + memcpy(cl.read_buf, curbuf, cl.read_remaining); + curbuf += cl.read_remaining; + remain -= cl.read_remaining; + cl.read_remaining = 0; + cl.read_buf = NULL; + handle_finished_read(cl); } } - else if (cl.read_state == CL_READ_DATA) + } + else + { + // Long data + cl.read_remaining -= data->res; + cl.read_buf += data->res; + if (cl.read_remaining <= 0) { - // Operation is ready - exec_op(cl.read_op); - cl.read_op = NULL; - cl.read_state = 0; - } - else if (cl.read_state == CL_READ_REPLY_DATA) - { - // Reply is ready - auto req_it = cl.sent_ops.find(cl.read_reply_id); - osd_op_t *request = req_it->second; - cl.sent_ops.erase(req_it); - cl.read_reply_id = 0; - cl.read_state = 0; - // Measure subop latency - timespec tv_end; - clock_gettime(CLOCK_REALTIME, &tv_end); - subop_stat_count[request->req.hdr.opcode]++; - subop_stat_sum[request->req.hdr.opcode] += ( - (tv_end.tv_sec - request->tv_begin.tv_sec)*1000000 + - (tv_end.tv_nsec - request->tv_begin.tv_nsec)/1000 - ); - request->callback(request); + handle_finished_read(cl); } } } } } +void osd_t::handle_finished_read(osd_client_t & cl) +{ + if (cl.read_state == CL_READ_HDR) + { + if (cl.read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC) + handle_reply_hdr(&cl); + else + handle_op_hdr(&cl); + } + else if (cl.read_state == CL_READ_DATA) + { + // Operation is ready + exec_op(cl.read_op); + cl.read_op = NULL; + cl.read_state = 0; + } + else if (cl.read_state == CL_READ_REPLY_DATA) + { + // Reply is ready + auto req_it = cl.sent_ops.find(cl.read_reply_id); + osd_op_t *request = req_it->second; + cl.sent_ops.erase(req_it); + cl.read_reply_id = 0; + cl.read_op = NULL; + cl.read_state = 0; + // Measure subop latency + timespec tv_end; + clock_gettime(CLOCK_REALTIME, &tv_end); + subop_stat_count[request->req.hdr.opcode]++; + subop_stat_sum[request->req.hdr.opcode] += ( + (tv_end.tv_sec - request->tv_begin.tv_sec)*1000000 + + (tv_end.tv_nsec - request->tv_begin.tv_nsec)/1000 + ); + request->callback(request); + } +} + void osd_t::handle_op_hdr(osd_client_t *cl) { osd_op_t *cur_op = cl->read_op; @@ -190,6 +225,7 @@ void osd_t::handle_reply_hdr(osd_client_t *cl) else { cl->read_state = 0; + cl->read_op = NULL; cl->sent_ops.erase(req_it); // Measure subop latency timespec tv_end;