Compare commits
1 Commits
master
...
rdma-simpl
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | f5e5686540 |
|
@ -47,8 +47,25 @@ msgr_rdma_connection_t::~msgr_rdma_connection_t()
|
|||
if (qp)
|
||||
ibv_destroy_qp(qp);
|
||||
if (recv_buffers.size())
|
||||
{
|
||||
for (auto b: recv_buffers)
|
||||
free(b);
|
||||
{
|
||||
if (b.mr)
|
||||
ibv_dereg_mr(b.mr);
|
||||
free(b.buf);
|
||||
}
|
||||
recv_buffers.clear();
|
||||
}
|
||||
if (send_mrs.size())
|
||||
{
|
||||
for (auto & mr: send_mrs)
|
||||
{
|
||||
if (mr)
|
||||
ibv_dereg_mr(mr);
|
||||
mr = NULL;
|
||||
}
|
||||
send_mrs.clear();
|
||||
}
|
||||
}
|
||||
|
||||
msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, int log_level)
|
||||
|
@ -141,17 +158,23 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t
|
|||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) ||
|
||||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV))
|
||||
{
|
||||
fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable or does not support RC send and receive with ODP\n");
|
||||
goto cleanup;
|
||||
ctx->odp = false;
|
||||
if (log_level > 0)
|
||||
fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable. Falling back to slower version without ODP\n");
|
||||
}
|
||||
else
|
||||
ctx->odp = true;
|
||||
}
|
||||
|
||||
if (ctx->odp)
|
||||
{
|
||||
ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND);
|
||||
if (!ctx->mr)
|
||||
{
|
||||
fprintf(stderr, "Couldn't register RDMA memory region\n");
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
|
||||
ctx->channel = ibv_create_comp_channel(ctx->context);
|
||||
if (!ctx->channel)
|
||||
|
@ -376,7 +399,6 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
|
|||
ibv_sge sge[rc->max_sge];
|
||||
while (rc->send_pos < cl->send_list.size())
|
||||
{
|
||||
iovec & iov = cl->send_list[rc->send_pos];
|
||||
if (op_size >= rc->max_msg || op_sge >= rc->max_sge)
|
||||
{
|
||||
rc->send_sizes.push_back(op_size);
|
||||
|
@ -388,12 +410,23 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
|
|||
break;
|
||||
}
|
||||
}
|
||||
iovec & iov = cl->send_list[rc->send_pos];
|
||||
if (!rdma_context->odp && rc->send_mrs.size() <= rc->send_pos)
|
||||
{
|
||||
ibv_mr *mr = ibv_reg_mr(rdma_context->pd, iov.iov_base, iov.iov_len, IBV_ACCESS_LOCAL_WRITE);
|
||||
if (!mr)
|
||||
{
|
||||
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
|
||||
exit(1);
|
||||
}
|
||||
rc->send_mrs.push_back(mr);
|
||||
}
|
||||
uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < rc->max_msg
|
||||
? iov.iov_len-rc->send_buf_pos : rc->max_msg-op_size);
|
||||
sge[op_sge++] = {
|
||||
.addr = (uintptr_t)((uint8_t*)iov.iov_base+rc->send_buf_pos),
|
||||
.length = len,
|
||||
.lkey = rc->ctx->mr->lkey,
|
||||
.lkey = rdma_context->odp ? rc->ctx->mr->lkey : rc->send_mrs[rc->send_pos]->lkey,
|
||||
};
|
||||
op_size += len;
|
||||
rc->send_buf_pos += len;
|
||||
|
@ -411,12 +444,12 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
|
|||
return true;
|
||||
}
|
||||
|
||||
static void try_recv_rdma_wr(osd_client_t *cl, void *buf)
|
||||
static void try_recv_rdma_wr(osd_client_t *cl, msgr_rdma_buf_t b)
|
||||
{
|
||||
ibv_sge sge = {
|
||||
.addr = (uintptr_t)buf,
|
||||
.addr = (uintptr_t)b.buf,
|
||||
.length = (uint32_t)cl->rdma_conn->max_msg,
|
||||
.lkey = cl->rdma_conn->ctx->mr->lkey,
|
||||
.lkey = !b.mr ? cl->rdma_conn->ctx->mr->lkey : b.mr->lkey,
|
||||
};
|
||||
ibv_recv_wr *bad_wr = NULL;
|
||||
ibv_recv_wr wr = {
|
||||
|
@ -438,9 +471,19 @@ bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
|
|||
auto rc = cl->rdma_conn;
|
||||
while (rc->cur_recv < rc->max_recv)
|
||||
{
|
||||
void *buf = malloc_or_die(rc->max_msg);
|
||||
rc->recv_buffers.push_back(buf);
|
||||
try_recv_rdma_wr(cl, buf);
|
||||
msgr_rdma_buf_t b;
|
||||
b.buf = malloc_or_die(rc->max_msg);
|
||||
if (!rdma_context->odp)
|
||||
{
|
||||
b.mr = ibv_reg_mr(rdma_context->pd, b.buf, rc->max_msg, IBV_ACCESS_LOCAL_WRITE);
|
||||
if (!b.mr)
|
||||
{
|
||||
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
rc->recv_buffers.push_back(b);
|
||||
try_recv_rdma_wr(cl, b);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -492,7 +535,7 @@ void osd_messenger_t::handle_rdma_events()
|
|||
if (!is_send)
|
||||
{
|
||||
rc->cur_recv--;
|
||||
if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf], wc[i].byte_len))
|
||||
if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf].buf, wc[i].byte_len))
|
||||
{
|
||||
// handle_read_buffer may stop the client
|
||||
continue;
|
||||
|
@ -527,6 +570,11 @@ void osd_messenger_t::handle_rdma_events()
|
|||
rc->send_pos -= send_pos;
|
||||
for (int i = 0; i < send_pos; i++)
|
||||
{
|
||||
if (!rdma_context->odp)
|
||||
{
|
||||
ibv_dereg_mr(rc->send_mrs[i]);
|
||||
rc->send_mrs[i] = NULL;
|
||||
}
|
||||
if (cl->outbox[i].flags & MSGR_SENDP_FREE)
|
||||
{
|
||||
// Reply fully sent
|
||||
|
@ -535,6 +583,8 @@ void osd_messenger_t::handle_rdma_events()
|
|||
}
|
||||
if (send_pos > 0)
|
||||
{
|
||||
if (!rdma_context->odp)
|
||||
rc->send_mrs.erase(rc->send_mrs.begin(), rc->send_mrs.begin()+send_pos);
|
||||
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+send_pos);
|
||||
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+send_pos);
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ struct msgr_rdma_context_t
|
|||
ibv_device *dev = NULL;
|
||||
ibv_device_attr_ex attrx;
|
||||
ibv_pd *pd = NULL;
|
||||
bool odp = false;
|
||||
ibv_mr *mr = NULL;
|
||||
ibv_comp_channel *channel = NULL;
|
||||
ibv_cq *cq = NULL;
|
||||
|
@ -39,6 +40,12 @@ struct msgr_rdma_context_t
|
|||
~msgr_rdma_context_t();
|
||||
};
|
||||
|
||||
struct msgr_rdma_buf_t
|
||||
{
|
||||
void *buf = NULL;
|
||||
ibv_mr *mr = NULL;
|
||||
};
|
||||
|
||||
struct msgr_rdma_connection_t
|
||||
{
|
||||
msgr_rdma_context_t *ctx = NULL;
|
||||
|
@ -50,8 +57,9 @@ struct msgr_rdma_connection_t
|
|||
|
||||
int send_pos = 0, send_buf_pos = 0;
|
||||
int next_recv_buf = 0;
|
||||
std::vector<void*> recv_buffers;
|
||||
std::vector<msgr_rdma_buf_t> recv_buffers;
|
||||
std::vector<uint64_t> send_sizes;
|
||||
std::vector<ibv_mr*> send_mrs;
|
||||
|
||||
~msgr_rdma_connection_t();
|
||||
static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg);
|
||||
|
|
Loading…
Reference in New Issue