forked from vitalif/vitastor
Compare commits
1 Commits
rdma-simpl
...
hotfix-1.2
Author | SHA1 | Date | |
---|---|---|---|
7e82573ed0 |
@@ -490,7 +490,14 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
|
||||
fprintf(stderr, "Connected to OSD %lu using RDMA\n", cl->osd_num);
|
||||
}
|
||||
cl->peer_state = PEER_RDMA;
|
||||
tfd->set_fd_handler(cl->peer_fd, false, NULL);
|
||||
tfd->set_fd_handler(cl->peer_fd, false, [this](int peer_fd, int epoll_events)
|
||||
{
|
||||
// Do not miss the disconnection!
|
||||
if (epoll_events & EPOLLRDHUP)
|
||||
{
|
||||
handle_peer_epoll(peer_fd, epoll_events);
|
||||
}
|
||||
});
|
||||
// Add the initial receive request
|
||||
try_recv_rdma(cl);
|
||||
}
|
||||
|
@@ -47,25 +47,8 @@ msgr_rdma_connection_t::~msgr_rdma_connection_t()
|
||||
if (qp)
|
||||
ibv_destroy_qp(qp);
|
||||
if (recv_buffers.size())
|
||||
{
|
||||
for (auto b: recv_buffers)
|
||||
{
|
||||
if (b.mr)
|
||||
ibv_dereg_mr(b.mr);
|
||||
free(b.buf);
|
||||
}
|
||||
recv_buffers.clear();
|
||||
}
|
||||
if (send_mrs.size())
|
||||
{
|
||||
for (auto & mr: send_mrs)
|
||||
{
|
||||
if (mr)
|
||||
ibv_dereg_mr(mr);
|
||||
mr = NULL;
|
||||
}
|
||||
send_mrs.clear();
|
||||
}
|
||||
free(b);
|
||||
}
|
||||
|
||||
msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, int log_level)
|
||||
@@ -158,24 +141,18 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t
|
||||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) ||
|
||||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV))
|
||||
{
|
||||
ctx->odp = false;
|
||||
if (log_level > 0)
|
||||
fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable. Falling back to slower version without ODP\n");
|
||||
}
|
||||
else
|
||||
ctx->odp = true;
|
||||
}
|
||||
|
||||
if (ctx->odp)
|
||||
{
|
||||
ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND);
|
||||
if (!ctx->mr)
|
||||
{
|
||||
fprintf(stderr, "Couldn't register RDMA memory region\n");
|
||||
fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable or does not support RC send and receive with ODP\n");
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
|
||||
ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND);
|
||||
if (!ctx->mr)
|
||||
{
|
||||
fprintf(stderr, "Couldn't register RDMA memory region\n");
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
ctx->channel = ibv_create_comp_channel(ctx->context);
|
||||
if (!ctx->channel)
|
||||
{
|
||||
@@ -399,6 +376,7 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
|
||||
ibv_sge sge[rc->max_sge];
|
||||
while (rc->send_pos < cl->send_list.size())
|
||||
{
|
||||
iovec & iov = cl->send_list[rc->send_pos];
|
||||
if (op_size >= rc->max_msg || op_sge >= rc->max_sge)
|
||||
{
|
||||
rc->send_sizes.push_back(op_size);
|
||||
@@ -410,23 +388,12 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
|
||||
break;
|
||||
}
|
||||
}
|
||||
iovec & iov = cl->send_list[rc->send_pos];
|
||||
if (!rdma_context->odp && rc->send_mrs.size() <= rc->send_pos)
|
||||
{
|
||||
ibv_mr *mr = ibv_reg_mr(rdma_context->pd, iov.iov_base, iov.iov_len, IBV_ACCESS_LOCAL_WRITE);
|
||||
if (!mr)
|
||||
{
|
||||
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
|
||||
exit(1);
|
||||
}
|
||||
rc->send_mrs.push_back(mr);
|
||||
}
|
||||
uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < rc->max_msg
|
||||
? iov.iov_len-rc->send_buf_pos : rc->max_msg-op_size);
|
||||
sge[op_sge++] = {
|
||||
.addr = (uintptr_t)((uint8_t*)iov.iov_base+rc->send_buf_pos),
|
||||
.length = len,
|
||||
.lkey = rdma_context->odp ? rc->ctx->mr->lkey : rc->send_mrs[rc->send_pos]->lkey,
|
||||
.lkey = rc->ctx->mr->lkey,
|
||||
};
|
||||
op_size += len;
|
||||
rc->send_buf_pos += len;
|
||||
@@ -444,12 +411,12 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
|
||||
return true;
|
||||
}
|
||||
|
||||
static void try_recv_rdma_wr(osd_client_t *cl, msgr_rdma_buf_t b)
|
||||
static void try_recv_rdma_wr(osd_client_t *cl, void *buf)
|
||||
{
|
||||
ibv_sge sge = {
|
||||
.addr = (uintptr_t)b.buf,
|
||||
.addr = (uintptr_t)buf,
|
||||
.length = (uint32_t)cl->rdma_conn->max_msg,
|
||||
.lkey = !b.mr ? cl->rdma_conn->ctx->mr->lkey : b.mr->lkey,
|
||||
.lkey = cl->rdma_conn->ctx->mr->lkey,
|
||||
};
|
||||
ibv_recv_wr *bad_wr = NULL;
|
||||
ibv_recv_wr wr = {
|
||||
@@ -471,19 +438,9 @@ bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
|
||||
auto rc = cl->rdma_conn;
|
||||
while (rc->cur_recv < rc->max_recv)
|
||||
{
|
||||
msgr_rdma_buf_t b;
|
||||
b.buf = malloc_or_die(rc->max_msg);
|
||||
if (!rdma_context->odp)
|
||||
{
|
||||
b.mr = ibv_reg_mr(rdma_context->pd, b.buf, rc->max_msg, IBV_ACCESS_LOCAL_WRITE);
|
||||
if (!b.mr)
|
||||
{
|
||||
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
rc->recv_buffers.push_back(b);
|
||||
try_recv_rdma_wr(cl, b);
|
||||
void *buf = malloc_or_die(rc->max_msg);
|
||||
rc->recv_buffers.push_back(buf);
|
||||
try_recv_rdma_wr(cl, buf);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@@ -535,7 +492,7 @@ void osd_messenger_t::handle_rdma_events()
|
||||
if (!is_send)
|
||||
{
|
||||
rc->cur_recv--;
|
||||
if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf].buf, wc[i].byte_len))
|
||||
if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf], wc[i].byte_len))
|
||||
{
|
||||
// handle_read_buffer may stop the client
|
||||
continue;
|
||||
@@ -570,11 +527,6 @@ void osd_messenger_t::handle_rdma_events()
|
||||
rc->send_pos -= send_pos;
|
||||
for (int i = 0; i < send_pos; i++)
|
||||
{
|
||||
if (!rdma_context->odp)
|
||||
{
|
||||
ibv_dereg_mr(rc->send_mrs[i]);
|
||||
rc->send_mrs[i] = NULL;
|
||||
}
|
||||
if (cl->outbox[i].flags & MSGR_SENDP_FREE)
|
||||
{
|
||||
// Reply fully sent
|
||||
@@ -583,8 +535,6 @@ void osd_messenger_t::handle_rdma_events()
|
||||
}
|
||||
if (send_pos > 0)
|
||||
{
|
||||
if (!rdma_context->odp)
|
||||
rc->send_mrs.erase(rc->send_mrs.begin(), rc->send_mrs.begin()+send_pos);
|
||||
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+send_pos);
|
||||
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+send_pos);
|
||||
}
|
||||
|
@@ -23,7 +23,6 @@ struct msgr_rdma_context_t
|
||||
ibv_device *dev = NULL;
|
||||
ibv_device_attr_ex attrx;
|
||||
ibv_pd *pd = NULL;
|
||||
bool odp = false;
|
||||
ibv_mr *mr = NULL;
|
||||
ibv_comp_channel *channel = NULL;
|
||||
ibv_cq *cq = NULL;
|
||||
@@ -40,12 +39,6 @@ struct msgr_rdma_context_t
|
||||
~msgr_rdma_context_t();
|
||||
};
|
||||
|
||||
struct msgr_rdma_buf_t
|
||||
{
|
||||
void *buf = NULL;
|
||||
ibv_mr *mr = NULL;
|
||||
};
|
||||
|
||||
struct msgr_rdma_connection_t
|
||||
{
|
||||
msgr_rdma_context_t *ctx = NULL;
|
||||
@@ -57,9 +50,8 @@ struct msgr_rdma_connection_t
|
||||
|
||||
int send_pos = 0, send_buf_pos = 0;
|
||||
int next_recv_buf = 0;
|
||||
std::vector<msgr_rdma_buf_t> recv_buffers;
|
||||
std::vector<void*> recv_buffers;
|
||||
std::vector<uint64_t> send_sizes;
|
||||
std::vector<ibv_mr*> send_mrs;
|
||||
|
||||
~msgr_rdma_connection_t();
|
||||
static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg);
|
||||
|
@@ -3,6 +3,7 @@
|
||||
|
||||
#define _XOPEN_SOURCE
|
||||
#include <limits.h>
|
||||
#include <sys/epoll.h>
|
||||
|
||||
#include "messenger.h"
|
||||
|
||||
@@ -283,7 +284,14 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
|
||||
fprintf(stderr, "Successfully connected with client %d using RDMA\n", cl->peer_fd);
|
||||
}
|
||||
cl->peer_state = PEER_RDMA;
|
||||
tfd->set_fd_handler(cl->peer_fd, false, NULL);
|
||||
tfd->set_fd_handler(cl->peer_fd, false, [this](int peer_fd, int epoll_events)
|
||||
{
|
||||
// Do not miss the disconnection!
|
||||
if (epoll_events & EPOLLRDHUP)
|
||||
{
|
||||
handle_peer_epoll(peer_fd, epoll_events);
|
||||
}
|
||||
});
|
||||
// Add the initial receive request
|
||||
try_recv_rdma(cl);
|
||||
}
|
||||
|
Reference in New Issue
Block a user