1
0
Fork 0

Compare commits

...

1 Commits

2 changed files with 75 additions and 17 deletions

View File

@ -47,8 +47,25 @@ msgr_rdma_connection_t::~msgr_rdma_connection_t()
if (qp) if (qp)
ibv_destroy_qp(qp); ibv_destroy_qp(qp);
if (recv_buffers.size()) if (recv_buffers.size())
{
for (auto b: recv_buffers) for (auto b: recv_buffers)
free(b); {
if (b.mr)
ibv_dereg_mr(b.mr);
free(b.buf);
}
recv_buffers.clear();
}
if (send_mrs.size())
{
for (auto & mr: send_mrs)
{
if (mr)
ibv_dereg_mr(mr);
mr = NULL;
}
send_mrs.clear();
}
} }
msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, int log_level) msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, int log_level)
@ -141,17 +158,23 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) || !(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) ||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV)) !(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV))
{ {
fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable or does not support RC send and receive with ODP\n"); ctx->odp = false;
goto cleanup; if (log_level > 0)
fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable. Falling back to slower version without ODP\n");
} }
else
ctx->odp = true;
} }
if (ctx->odp)
{
ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND); ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND);
if (!ctx->mr) if (!ctx->mr)
{ {
fprintf(stderr, "Couldn't register RDMA memory region\n"); fprintf(stderr, "Couldn't register RDMA memory region\n");
goto cleanup; goto cleanup;
} }
}
ctx->channel = ibv_create_comp_channel(ctx->context); ctx->channel = ibv_create_comp_channel(ctx->context);
if (!ctx->channel) if (!ctx->channel)
@ -376,7 +399,6 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
ibv_sge sge[rc->max_sge]; ibv_sge sge[rc->max_sge];
while (rc->send_pos < cl->send_list.size()) while (rc->send_pos < cl->send_list.size())
{ {
iovec & iov = cl->send_list[rc->send_pos];
if (op_size >= rc->max_msg || op_sge >= rc->max_sge) if (op_size >= rc->max_msg || op_sge >= rc->max_sge)
{ {
rc->send_sizes.push_back(op_size); rc->send_sizes.push_back(op_size);
@ -388,12 +410,23 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
break; break;
} }
} }
iovec & iov = cl->send_list[rc->send_pos];
if (!rdma_context->odp && rc->send_mrs.size() <= rc->send_pos)
{
ibv_mr *mr = ibv_reg_mr(rdma_context->pd, iov.iov_base, iov.iov_len, IBV_ACCESS_LOCAL_WRITE);
if (!mr)
{
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
exit(1);
}
rc->send_mrs.push_back(mr);
}
uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < rc->max_msg uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < rc->max_msg
? iov.iov_len-rc->send_buf_pos : rc->max_msg-op_size); ? iov.iov_len-rc->send_buf_pos : rc->max_msg-op_size);
sge[op_sge++] = { sge[op_sge++] = {
.addr = (uintptr_t)((uint8_t*)iov.iov_base+rc->send_buf_pos), .addr = (uintptr_t)((uint8_t*)iov.iov_base+rc->send_buf_pos),
.length = len, .length = len,
.lkey = rc->ctx->mr->lkey, .lkey = rdma_context->odp ? rc->ctx->mr->lkey : rc->send_mrs[rc->send_pos]->lkey,
}; };
op_size += len; op_size += len;
rc->send_buf_pos += len; rc->send_buf_pos += len;
@ -411,12 +444,12 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
return true; return true;
} }
static void try_recv_rdma_wr(osd_client_t *cl, void *buf) static void try_recv_rdma_wr(osd_client_t *cl, msgr_rdma_buf_t b)
{ {
ibv_sge sge = { ibv_sge sge = {
.addr = (uintptr_t)buf, .addr = (uintptr_t)b.buf,
.length = (uint32_t)cl->rdma_conn->max_msg, .length = (uint32_t)cl->rdma_conn->max_msg,
.lkey = cl->rdma_conn->ctx->mr->lkey, .lkey = !b.mr ? cl->rdma_conn->ctx->mr->lkey : b.mr->lkey,
}; };
ibv_recv_wr *bad_wr = NULL; ibv_recv_wr *bad_wr = NULL;
ibv_recv_wr wr = { ibv_recv_wr wr = {
@ -438,9 +471,19 @@ bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
auto rc = cl->rdma_conn; auto rc = cl->rdma_conn;
while (rc->cur_recv < rc->max_recv) while (rc->cur_recv < rc->max_recv)
{ {
void *buf = malloc_or_die(rc->max_msg); msgr_rdma_buf_t b;
rc->recv_buffers.push_back(buf); b.buf = malloc_or_die(rc->max_msg);
try_recv_rdma_wr(cl, buf); if (!rdma_context->odp)
{
b.mr = ibv_reg_mr(rdma_context->pd, b.buf, rc->max_msg, IBV_ACCESS_LOCAL_WRITE);
if (!b.mr)
{
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
exit(1);
}
}
rc->recv_buffers.push_back(b);
try_recv_rdma_wr(cl, b);
} }
return true; return true;
} }
@ -492,7 +535,7 @@ void osd_messenger_t::handle_rdma_events()
if (!is_send) if (!is_send)
{ {
rc->cur_recv--; rc->cur_recv--;
if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf], wc[i].byte_len)) if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf].buf, wc[i].byte_len))
{ {
// handle_read_buffer may stop the client // handle_read_buffer may stop the client
continue; continue;
@ -527,6 +570,11 @@ void osd_messenger_t::handle_rdma_events()
rc->send_pos -= send_pos; rc->send_pos -= send_pos;
for (int i = 0; i < send_pos; i++) for (int i = 0; i < send_pos; i++)
{ {
if (!rdma_context->odp)
{
ibv_dereg_mr(rc->send_mrs[i]);
rc->send_mrs[i] = NULL;
}
if (cl->outbox[i].flags & MSGR_SENDP_FREE) if (cl->outbox[i].flags & MSGR_SENDP_FREE)
{ {
// Reply fully sent // Reply fully sent
@ -535,6 +583,8 @@ void osd_messenger_t::handle_rdma_events()
} }
if (send_pos > 0) if (send_pos > 0)
{ {
if (!rdma_context->odp)
rc->send_mrs.erase(rc->send_mrs.begin(), rc->send_mrs.begin()+send_pos);
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+send_pos); cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+send_pos);
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+send_pos); cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+send_pos);
} }

View File

@ -23,6 +23,7 @@ struct msgr_rdma_context_t
ibv_device *dev = NULL; ibv_device *dev = NULL;
ibv_device_attr_ex attrx; ibv_device_attr_ex attrx;
ibv_pd *pd = NULL; ibv_pd *pd = NULL;
bool odp = false;
ibv_mr *mr = NULL; ibv_mr *mr = NULL;
ibv_comp_channel *channel = NULL; ibv_comp_channel *channel = NULL;
ibv_cq *cq = NULL; ibv_cq *cq = NULL;
@ -39,6 +40,12 @@ struct msgr_rdma_context_t
~msgr_rdma_context_t(); ~msgr_rdma_context_t();
}; };
struct msgr_rdma_buf_t
{
void *buf = NULL;
ibv_mr *mr = NULL;
};
struct msgr_rdma_connection_t struct msgr_rdma_connection_t
{ {
msgr_rdma_context_t *ctx = NULL; msgr_rdma_context_t *ctx = NULL;
@ -50,8 +57,9 @@ struct msgr_rdma_connection_t
int send_pos = 0, send_buf_pos = 0; int send_pos = 0, send_buf_pos = 0;
int next_recv_buf = 0; int next_recv_buf = 0;
std::vector<void*> recv_buffers; std::vector<msgr_rdma_buf_t> recv_buffers;
std::vector<uint64_t> send_sizes; std::vector<uint64_t> send_sizes;
std::vector<ibv_mr*> send_mrs;
~msgr_rdma_connection_t(); ~msgr_rdma_connection_t();
static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg); static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg);