Compare commits

..

1 Commits

4 changed files with 36 additions and 79 deletions

View File

@@ -490,7 +490,14 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
fprintf(stderr, "Connected to OSD %lu using RDMA\n", cl->osd_num); fprintf(stderr, "Connected to OSD %lu using RDMA\n", cl->osd_num);
} }
cl->peer_state = PEER_RDMA; cl->peer_state = PEER_RDMA;
tfd->set_fd_handler(cl->peer_fd, false, NULL); tfd->set_fd_handler(cl->peer_fd, false, [this](int peer_fd, int epoll_events)
{
// Do not miss the disconnection!
if (epoll_events & EPOLLRDHUP)
{
handle_peer_epoll(peer_fd, epoll_events);
}
});
// Add the initial receive request // Add the initial receive request
try_recv_rdma(cl); try_recv_rdma(cl);
} }

View File

@@ -47,25 +47,8 @@ msgr_rdma_connection_t::~msgr_rdma_connection_t()
if (qp) if (qp)
ibv_destroy_qp(qp); ibv_destroy_qp(qp);
if (recv_buffers.size()) if (recv_buffers.size())
{
for (auto b: recv_buffers) for (auto b: recv_buffers)
{ free(b);
if (b.mr)
ibv_dereg_mr(b.mr);
free(b.buf);
}
recv_buffers.clear();
}
if (send_mrs.size())
{
for (auto & mr: send_mrs)
{
if (mr)
ibv_dereg_mr(mr);
mr = NULL;
}
send_mrs.clear();
}
} }
msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, int log_level) msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, int log_level)
@@ -158,24 +141,18 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) || !(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) ||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV)) !(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV))
{ {
ctx->odp = false; fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable or does not support RC send and receive with ODP\n");
if (log_level > 0)
fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable. Falling back to slower version without ODP\n");
}
else
ctx->odp = true;
}
if (ctx->odp)
{
ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND);
if (!ctx->mr)
{
fprintf(stderr, "Couldn't register RDMA memory region\n");
goto cleanup; goto cleanup;
} }
} }
ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND);
if (!ctx->mr)
{
fprintf(stderr, "Couldn't register RDMA memory region\n");
goto cleanup;
}
ctx->channel = ibv_create_comp_channel(ctx->context); ctx->channel = ibv_create_comp_channel(ctx->context);
if (!ctx->channel) if (!ctx->channel)
{ {
@@ -399,6 +376,7 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
ibv_sge sge[rc->max_sge]; ibv_sge sge[rc->max_sge];
while (rc->send_pos < cl->send_list.size()) while (rc->send_pos < cl->send_list.size())
{ {
iovec & iov = cl->send_list[rc->send_pos];
if (op_size >= rc->max_msg || op_sge >= rc->max_sge) if (op_size >= rc->max_msg || op_sge >= rc->max_sge)
{ {
rc->send_sizes.push_back(op_size); rc->send_sizes.push_back(op_size);
@@ -410,23 +388,12 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
break; break;
} }
} }
iovec & iov = cl->send_list[rc->send_pos];
if (!rdma_context->odp && rc->send_mrs.size() <= rc->send_pos)
{
ibv_mr *mr = ibv_reg_mr(rdma_context->pd, iov.iov_base, iov.iov_len, IBV_ACCESS_LOCAL_WRITE);
if (!mr)
{
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
exit(1);
}
rc->send_mrs.push_back(mr);
}
uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < rc->max_msg uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < rc->max_msg
? iov.iov_len-rc->send_buf_pos : rc->max_msg-op_size); ? iov.iov_len-rc->send_buf_pos : rc->max_msg-op_size);
sge[op_sge++] = { sge[op_sge++] = {
.addr = (uintptr_t)((uint8_t*)iov.iov_base+rc->send_buf_pos), .addr = (uintptr_t)((uint8_t*)iov.iov_base+rc->send_buf_pos),
.length = len, .length = len,
.lkey = rdma_context->odp ? rc->ctx->mr->lkey : rc->send_mrs[rc->send_pos]->lkey, .lkey = rc->ctx->mr->lkey,
}; };
op_size += len; op_size += len;
rc->send_buf_pos += len; rc->send_buf_pos += len;
@@ -444,12 +411,12 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
return true; return true;
} }
static void try_recv_rdma_wr(osd_client_t *cl, msgr_rdma_buf_t b) static void try_recv_rdma_wr(osd_client_t *cl, void *buf)
{ {
ibv_sge sge = { ibv_sge sge = {
.addr = (uintptr_t)b.buf, .addr = (uintptr_t)buf,
.length = (uint32_t)cl->rdma_conn->max_msg, .length = (uint32_t)cl->rdma_conn->max_msg,
.lkey = !b.mr ? cl->rdma_conn->ctx->mr->lkey : b.mr->lkey, .lkey = cl->rdma_conn->ctx->mr->lkey,
}; };
ibv_recv_wr *bad_wr = NULL; ibv_recv_wr *bad_wr = NULL;
ibv_recv_wr wr = { ibv_recv_wr wr = {
@@ -471,19 +438,9 @@ bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
auto rc = cl->rdma_conn; auto rc = cl->rdma_conn;
while (rc->cur_recv < rc->max_recv) while (rc->cur_recv < rc->max_recv)
{ {
msgr_rdma_buf_t b; void *buf = malloc_or_die(rc->max_msg);
b.buf = malloc_or_die(rc->max_msg); rc->recv_buffers.push_back(buf);
if (!rdma_context->odp) try_recv_rdma_wr(cl, buf);
{
b.mr = ibv_reg_mr(rdma_context->pd, b.buf, rc->max_msg, IBV_ACCESS_LOCAL_WRITE);
if (!b.mr)
{
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
exit(1);
}
}
rc->recv_buffers.push_back(b);
try_recv_rdma_wr(cl, b);
} }
return true; return true;
} }
@@ -535,7 +492,7 @@ void osd_messenger_t::handle_rdma_events()
if (!is_send) if (!is_send)
{ {
rc->cur_recv--; rc->cur_recv--;
if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf].buf, wc[i].byte_len)) if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf], wc[i].byte_len))
{ {
// handle_read_buffer may stop the client // handle_read_buffer may stop the client
continue; continue;
@@ -570,11 +527,6 @@ void osd_messenger_t::handle_rdma_events()
rc->send_pos -= send_pos; rc->send_pos -= send_pos;
for (int i = 0; i < send_pos; i++) for (int i = 0; i < send_pos; i++)
{ {
if (!rdma_context->odp)
{
ibv_dereg_mr(rc->send_mrs[i]);
rc->send_mrs[i] = NULL;
}
if (cl->outbox[i].flags & MSGR_SENDP_FREE) if (cl->outbox[i].flags & MSGR_SENDP_FREE)
{ {
// Reply fully sent // Reply fully sent
@@ -583,8 +535,6 @@ void osd_messenger_t::handle_rdma_events()
} }
if (send_pos > 0) if (send_pos > 0)
{ {
if (!rdma_context->odp)
rc->send_mrs.erase(rc->send_mrs.begin(), rc->send_mrs.begin()+send_pos);
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+send_pos); cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+send_pos);
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+send_pos); cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+send_pos);
} }

View File

@@ -23,7 +23,6 @@ struct msgr_rdma_context_t
ibv_device *dev = NULL; ibv_device *dev = NULL;
ibv_device_attr_ex attrx; ibv_device_attr_ex attrx;
ibv_pd *pd = NULL; ibv_pd *pd = NULL;
bool odp = false;
ibv_mr *mr = NULL; ibv_mr *mr = NULL;
ibv_comp_channel *channel = NULL; ibv_comp_channel *channel = NULL;
ibv_cq *cq = NULL; ibv_cq *cq = NULL;
@@ -40,12 +39,6 @@ struct msgr_rdma_context_t
~msgr_rdma_context_t(); ~msgr_rdma_context_t();
}; };
struct msgr_rdma_buf_t
{
void *buf = NULL;
ibv_mr *mr = NULL;
};
struct msgr_rdma_connection_t struct msgr_rdma_connection_t
{ {
msgr_rdma_context_t *ctx = NULL; msgr_rdma_context_t *ctx = NULL;
@@ -57,9 +50,8 @@ struct msgr_rdma_connection_t
int send_pos = 0, send_buf_pos = 0; int send_pos = 0, send_buf_pos = 0;
int next_recv_buf = 0; int next_recv_buf = 0;
std::vector<msgr_rdma_buf_t> recv_buffers; std::vector<void*> recv_buffers;
std::vector<uint64_t> send_sizes; std::vector<uint64_t> send_sizes;
std::vector<ibv_mr*> send_mrs;
~msgr_rdma_connection_t(); ~msgr_rdma_connection_t();
static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg); static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg);

View File

@@ -3,6 +3,7 @@
#define _XOPEN_SOURCE #define _XOPEN_SOURCE
#include <limits.h> #include <limits.h>
#include <sys/epoll.h>
#include "messenger.h" #include "messenger.h"
@@ -283,7 +284,14 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
fprintf(stderr, "Successfully connected with client %d using RDMA\n", cl->peer_fd); fprintf(stderr, "Successfully connected with client %d using RDMA\n", cl->peer_fd);
} }
cl->peer_state = PEER_RDMA; cl->peer_state = PEER_RDMA;
tfd->set_fd_handler(cl->peer_fd, false, NULL); tfd->set_fd_handler(cl->peer_fd, false, [this](int peer_fd, int epoll_events)
{
// Do not miss the disconnection!
if (epoll_events & EPOLLRDHUP)
{
handle_peer_epoll(peer_fd, epoll_events);
}
});
// Add the initial receive request // Add the initial receive request
try_recv_rdma(cl); try_recv_rdma(cl);
} }