From 8aec63fddde1b4d27f533990924cf992a0c7749b Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 24 Mar 2023 01:47:32 +0300 Subject: [PATCH] Implement simple "flow control" for RDMA (to check hypothesis about slowdowns) --- src/msgr_rdma.cpp | 25 +++++++++++++++++++++++-- src/msgr_rdma.h | 3 ++- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/src/msgr_rdma.cpp b/src/msgr_rdma.cpp index 8e63195f..20b48f13 100644 --- a/src/msgr_rdma.cpp +++ b/src/msgr_rdma.cpp @@ -353,8 +353,10 @@ static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge) .wr_id = (uint64_t)(cl->peer_fd*2+1), .sg_list = sge, .num_sge = op_sge, - .opcode = IBV_WR_SEND, + .opcode = cl->rdma_conn->avail_recv > 0 ? IBV_WR_SEND_WITH_IMM : IBV_WR_SEND, .send_flags = IBV_SEND_SIGNALED, + // Notify peer about our available incoming buffers + .imm_data = (uint32_t)cl->rdma_conn->avail_recv, }; int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr); if (err || bad_wr) @@ -363,15 +365,27 @@ static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge) exit(1); } cl->rdma_conn->cur_send++; + cl->rdma_conn->avail_send--; + cl->rdma_conn->avail_recv = 0; } bool osd_messenger_t::try_send_rdma(osd_client_t *cl) { auto rc = cl->rdma_conn; - if (!cl->send_list.size() || rc->cur_send >= rc->max_send) + if (rc->cur_send >= rc->max_send) { return true; } + if (!cl->send_list.size() || rc->use_flow_control && rc->avail_send <= 0) + { + if (rc->avail_recv) + { + // Only notify about available buffers so 2 peers don't lock each other + rc->send_sizes.push_back(0); + try_send_rdma_wr(cl, NULL, 0); + } + return true; + } uint64_t op_size = 0, op_sge = 0; ibv_sge sge[rc->max_sge]; while (rc->send_pos < cl->send_list.size()) @@ -431,6 +445,7 @@ static void try_recv_rdma_wr(osd_client_t *cl, void *buf) exit(1); } cl->rdma_conn->cur_recv++; + cl->rdma_conn->avail_recv++; } bool osd_messenger_t::try_recv_rdma(osd_client_t *cl) @@ -492,6 +507,11 @@ void osd_messenger_t::handle_rdma_events() if (!is_send) { rc->cur_recv--; + if ((wc[i].wc_flags & IBV_WC_WITH_IMM) && wc[i].imm_data > 0) + { + rc->avail_send += wc[i].imm_data; + rc->use_flow_control = true; + } if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf], wc[i].byte_len)) { // handle_read_buffer may stop the client @@ -499,6 +519,7 @@ void osd_messenger_t::handle_rdma_events() } try_recv_rdma_wr(cl, rc->recv_buffers[rc->next_recv_buf]); rc->next_recv_buf = (rc->next_recv_buf+1) % rc->recv_buffers.size(); + try_send_rdma(cl); } else { diff --git a/src/msgr_rdma.h b/src/msgr_rdma.h index 7c56f4a1..659fbd56 100644 --- a/src/msgr_rdma.h +++ b/src/msgr_rdma.h @@ -45,7 +45,8 @@ struct msgr_rdma_connection_t ibv_qp *qp = NULL; msgr_rdma_address_t addr; int max_send = 0, max_recv = 0, max_sge = 0; - int cur_send = 0, cur_recv = 0; + int cur_send = 0, cur_recv = 0, avail_recv = 0, avail_send = 0; + bool use_flow_control = false; uint64_t max_msg = 0; int send_pos = 0, send_buf_pos = 0;