1
0
Fork 0

Compare commits

...

1 Commits

2 changed files with 75 additions and 17 deletions

View File

@ -47,8 +47,25 @@ msgr_rdma_connection_t::~msgr_rdma_connection_t()
if (qp)
ibv_destroy_qp(qp);
if (recv_buffers.size())
{
for (auto b: recv_buffers)
free(b);
{
if (b.mr)
ibv_dereg_mr(b.mr);
free(b.buf);
}
recv_buffers.clear();
}
if (send_mrs.size())
{
for (auto & mr: send_mrs)
{
if (mr)
ibv_dereg_mr(mr);
mr = NULL;
}
send_mrs.clear();
}
}
msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, int log_level)
@ -141,16 +158,22 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) ||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV))
{
fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable or does not support RC send and receive with ODP\n");
goto cleanup;
ctx->odp = false;
if (log_level > 0)
fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable. Falling back to slower version without ODP\n");
}
else
ctx->odp = true;
}
ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND);
if (!ctx->mr)
if (ctx->odp)
{
fprintf(stderr, "Couldn't register RDMA memory region\n");
goto cleanup;
ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND);
if (!ctx->mr)
{
fprintf(stderr, "Couldn't register RDMA memory region\n");
goto cleanup;
}
}
ctx->channel = ibv_create_comp_channel(ctx->context);
@ -376,7 +399,6 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
ibv_sge sge[rc->max_sge];
while (rc->send_pos < cl->send_list.size())
{
iovec & iov = cl->send_list[rc->send_pos];
if (op_size >= rc->max_msg || op_sge >= rc->max_sge)
{
rc->send_sizes.push_back(op_size);
@ -388,12 +410,23 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
break;
}
}
iovec & iov = cl->send_list[rc->send_pos];
if (!rdma_context->odp && rc->send_mrs.size() <= rc->send_pos)
{
ibv_mr *mr = ibv_reg_mr(rdma_context->pd, iov.iov_base, iov.iov_len, IBV_ACCESS_LOCAL_WRITE);
if (!mr)
{
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
exit(1);
}
rc->send_mrs.push_back(mr);
}
uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < rc->max_msg
? iov.iov_len-rc->send_buf_pos : rc->max_msg-op_size);
sge[op_sge++] = {
.addr = (uintptr_t)((uint8_t*)iov.iov_base+rc->send_buf_pos),
.length = len,
.lkey = rc->ctx->mr->lkey,
.lkey = rdma_context->odp ? rc->ctx->mr->lkey : rc->send_mrs[rc->send_pos]->lkey,
};
op_size += len;
rc->send_buf_pos += len;
@ -411,12 +444,12 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
return true;
}
static void try_recv_rdma_wr(osd_client_t *cl, void *buf)
static void try_recv_rdma_wr(osd_client_t *cl, msgr_rdma_buf_t b)
{
ibv_sge sge = {
.addr = (uintptr_t)buf,
.addr = (uintptr_t)b.buf,
.length = (uint32_t)cl->rdma_conn->max_msg,
.lkey = cl->rdma_conn->ctx->mr->lkey,
.lkey = !b.mr ? cl->rdma_conn->ctx->mr->lkey : b.mr->lkey,
};
ibv_recv_wr *bad_wr = NULL;
ibv_recv_wr wr = {
@ -438,9 +471,19 @@ bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
auto rc = cl->rdma_conn;
while (rc->cur_recv < rc->max_recv)
{
void *buf = malloc_or_die(rc->max_msg);
rc->recv_buffers.push_back(buf);
try_recv_rdma_wr(cl, buf);
msgr_rdma_buf_t b;
b.buf = malloc_or_die(rc->max_msg);
if (!rdma_context->odp)
{
b.mr = ibv_reg_mr(rdma_context->pd, b.buf, rc->max_msg, IBV_ACCESS_LOCAL_WRITE);
if (!b.mr)
{
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
exit(1);
}
}
rc->recv_buffers.push_back(b);
try_recv_rdma_wr(cl, b);
}
return true;
}
@ -492,7 +535,7 @@ void osd_messenger_t::handle_rdma_events()
if (!is_send)
{
rc->cur_recv--;
if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf], wc[i].byte_len))
if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf].buf, wc[i].byte_len))
{
// handle_read_buffer may stop the client
continue;
@ -527,6 +570,11 @@ void osd_messenger_t::handle_rdma_events()
rc->send_pos -= send_pos;
for (int i = 0; i < send_pos; i++)
{
if (!rdma_context->odp)
{
ibv_dereg_mr(rc->send_mrs[i]);
rc->send_mrs[i] = NULL;
}
if (cl->outbox[i].flags & MSGR_SENDP_FREE)
{
// Reply fully sent
@ -535,6 +583,8 @@ void osd_messenger_t::handle_rdma_events()
}
if (send_pos > 0)
{
if (!rdma_context->odp)
rc->send_mrs.erase(rc->send_mrs.begin(), rc->send_mrs.begin()+send_pos);
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+send_pos);
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+send_pos);
}

View File

@ -23,6 +23,7 @@ struct msgr_rdma_context_t
ibv_device *dev = NULL;
ibv_device_attr_ex attrx;
ibv_pd *pd = NULL;
bool odp = false;
ibv_mr *mr = NULL;
ibv_comp_channel *channel = NULL;
ibv_cq *cq = NULL;
@ -39,6 +40,12 @@ struct msgr_rdma_context_t
~msgr_rdma_context_t();
};
struct msgr_rdma_buf_t
{
void *buf = NULL;
ibv_mr *mr = NULL;
};
struct msgr_rdma_connection_t
{
msgr_rdma_context_t *ctx = NULL;
@ -50,8 +57,9 @@ struct msgr_rdma_connection_t
int send_pos = 0, send_buf_pos = 0;
int next_recv_buf = 0;
std::vector<void*> recv_buffers;
std::vector<msgr_rdma_buf_t> recv_buffers;
std::vector<uint64_t> send_sizes;
std::vector<ibv_mr*> send_mrs;
~msgr_rdma_connection_t();
static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg);