diff --git a/src/msgr_rdma.cpp b/src/msgr_rdma.cpp index d39105de..1d5a59bd 100644 --- a/src/msgr_rdma.cpp +++ b/src/msgr_rdma.cpp @@ -47,8 +47,25 @@ msgr_rdma_connection_t::~msgr_rdma_connection_t() if (qp) ibv_destroy_qp(qp); if (recv_buffers.size()) + { for (auto b: recv_buffers) - free(b); + { + if (b.mr) + ibv_dereg_mr(b.mr); + free(b.buf); + } + recv_buffers.clear(); + } + if (send_mrs.size()) + { + for (auto & mr: send_mrs) + { + if (mr) + ibv_dereg_mr(mr); + mr = NULL; + } + send_mrs.clear(); + } } msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, int log_level) @@ -141,16 +158,22 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t !(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) || !(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV)) { - fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable or does not support RC send and receive with ODP\n"); - goto cleanup; + ctx->odp = false; + if (log_level > 0) + fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable. Falling back to slower version without ODP\n"); } + else + ctx->odp = true; } - ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND); - if (!ctx->mr) + if (ctx->odp) { - fprintf(stderr, "Couldn't register RDMA memory region\n"); - goto cleanup; + ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND); + if (!ctx->mr) + { + fprintf(stderr, "Couldn't register RDMA memory region\n"); + goto cleanup; + } } ctx->channel = ibv_create_comp_channel(ctx->context); @@ -376,7 +399,6 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl) ibv_sge sge[rc->max_sge]; while (rc->send_pos < cl->send_list.size()) { - iovec & iov = cl->send_list[rc->send_pos]; if (op_size >= rc->max_msg || op_sge >= rc->max_sge) { rc->send_sizes.push_back(op_size); @@ -388,12 +410,23 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl) break; } } + iovec & iov = cl->send_list[rc->send_pos]; + if (!rdma_context->odp && rc->send_mrs.size() <= rc->send_pos) + { + ibv_mr *mr = ibv_reg_mr(rdma_context->pd, iov.iov_base, iov.iov_len, IBV_ACCESS_LOCAL_WRITE); + if (!mr) + { + fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno)); + exit(1); + } + rc->send_mrs.push_back(mr); + } uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < rc->max_msg ? iov.iov_len-rc->send_buf_pos : rc->max_msg-op_size); sge[op_sge++] = { .addr = (uintptr_t)((uint8_t*)iov.iov_base+rc->send_buf_pos), .length = len, - .lkey = rc->ctx->mr->lkey, + .lkey = rdma_context->odp ? rc->ctx->mr->lkey : rc->send_mrs[rc->send_pos]->lkey, }; op_size += len; rc->send_buf_pos += len; @@ -411,12 +444,12 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl) return true; } -static void try_recv_rdma_wr(osd_client_t *cl, void *buf) +static void try_recv_rdma_wr(osd_client_t *cl, msgr_rdma_buf_t b) { ibv_sge sge = { - .addr = (uintptr_t)buf, + .addr = (uintptr_t)b.buf, .length = (uint32_t)cl->rdma_conn->max_msg, - .lkey = cl->rdma_conn->ctx->mr->lkey, + .lkey = !b.mr ? cl->rdma_conn->ctx->mr->lkey : b.mr->lkey, }; ibv_recv_wr *bad_wr = NULL; ibv_recv_wr wr = { @@ -438,9 +471,19 @@ bool osd_messenger_t::try_recv_rdma(osd_client_t *cl) auto rc = cl->rdma_conn; while (rc->cur_recv < rc->max_recv) { - void *buf = malloc_or_die(rc->max_msg); - rc->recv_buffers.push_back(buf); - try_recv_rdma_wr(cl, buf); + msgr_rdma_buf_t b; + b.buf = malloc_or_die(rc->max_msg); + if (!rdma_context->odp) + { + b.mr = ibv_reg_mr(rdma_context->pd, b.buf, rc->max_msg, IBV_ACCESS_LOCAL_WRITE); + if (!b.mr) + { + fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno)); + exit(1); + } + } + rc->recv_buffers.push_back(b); + try_recv_rdma_wr(cl, b); } return true; } @@ -492,7 +535,7 @@ void osd_messenger_t::handle_rdma_events() if (!is_send) { rc->cur_recv--; - if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf], wc[i].byte_len)) + if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf].buf, wc[i].byte_len)) { // handle_read_buffer may stop the client continue; @@ -527,6 +570,11 @@ void osd_messenger_t::handle_rdma_events() rc->send_pos -= send_pos; for (int i = 0; i < send_pos; i++) { + if (!rdma_context->odp) + { + ibv_dereg_mr(rc->send_mrs[i]); + rc->send_mrs[i] = NULL; + } if (cl->outbox[i].flags & MSGR_SENDP_FREE) { // Reply fully sent @@ -535,6 +583,8 @@ void osd_messenger_t::handle_rdma_events() } if (send_pos > 0) { + if (!rdma_context->odp) + rc->send_mrs.erase(rc->send_mrs.begin(), rc->send_mrs.begin()+send_pos); cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+send_pos); cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+send_pos); } diff --git a/src/msgr_rdma.h b/src/msgr_rdma.h index 7c56f4a1..b4502daa 100644 --- a/src/msgr_rdma.h +++ b/src/msgr_rdma.h @@ -23,6 +23,7 @@ struct msgr_rdma_context_t ibv_device *dev = NULL; ibv_device_attr_ex attrx; ibv_pd *pd = NULL; + bool odp = false; ibv_mr *mr = NULL; ibv_comp_channel *channel = NULL; ibv_cq *cq = NULL; @@ -39,6 +40,12 @@ struct msgr_rdma_context_t ~msgr_rdma_context_t(); }; +struct msgr_rdma_buf_t +{ + void *buf = NULL; + ibv_mr *mr = NULL; +}; + struct msgr_rdma_connection_t { msgr_rdma_context_t *ctx = NULL; @@ -50,8 +57,9 @@ struct msgr_rdma_connection_t int send_pos = 0, send_buf_pos = 0; int next_recv_buf = 0; - std::vector recv_buffers; + std::vector recv_buffers; std::vector send_sizes; + std::vector send_mrs; ~msgr_rdma_connection_t(); static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg);