Compare commits

...

2 Commits

Author SHA1 Message Date
Vitaliy Filippov ee99e4abb1 WIP NFS RDMA support
Test / test_rebalance_verify_ec (push) Successful in 1m51s
Test / test_rebalance_verify_ec_imm (push) Successful in 1m52s
Test / test_write_no_same (push) Successful in 8s
Test / test_switch_primary (push) Successful in 33s
Test / test_write (push) Successful in 33s
Test / test_write_xor (push) Successful in 36s
Test / test_heal_pg_size_2 (push) Successful in 2m17s
Test / test_heal_ec (push) Successful in 2m17s
Test / test_heal_antietcd (push) Successful in 2m18s
Test / test_heal_csum_32k_dmj (push) Successful in 2m19s
Test / test_heal_csum_32k_dj (push) Successful in 2m20s
Test / test_heal_csum_32k (push) Successful in 2m23s
Test / test_heal_csum_4k_dmj (push) Successful in 2m21s
Test / test_heal_csum_4k_dj (push) Successful in 2m17s
Test / test_resize_auto (push) Successful in 10s
Test / test_resize (push) Successful in 14s
Test / test_snapshot_pool2 (push) Successful in 14s
Test / test_osd_tags (push) Successful in 8s
Test / test_enospc (push) Successful in 12s
Test / test_enospc_xor (push) Successful in 12s
Test / test_enospc_imm (push) Successful in 12s
Test / test_enospc_imm_xor (push) Successful in 13s
Test / test_scrub (push) Successful in 16s
Test / test_scrub_zero_osd_2 (push) Successful in 14s
Test / test_scrub_xor (push) Successful in 15s
Test / test_scrub_pg_size_3 (push) Successful in 17s
Test / test_scrub_pg_size_6_pg_minsize_4_osd_count_6_ec (push) Successful in 15s
Test / test_scrub_ec (push) Successful in 15s
Test / test_nfs (push) Successful in 12s
Test / test_heal_csum_4k (push) Successful in 2m18s
2024-11-27 12:41:52 +03:00
Vitaliy Filippov 0d01573da3 Fix typos 2024-11-26 14:31:47 +03:00
20 changed files with 1754 additions and 65 deletions

View File

@ -218,5 +218,5 @@ All other client-side components are based on the client library:
- Deleting images in a degraded cluster may currently lead to objects reappearing
after dead OSDs come back, and in case of erasure-coded pools, they may even
reappear as incomplete. Just repeat the removal request in this case.
This problem will be fixed in the near future; the fix is already implemented
in the "epoch-deletions" branch.
This problem will be fixed in the future, along with the on-disk metadata
storage format update.

View File

@ -24,8 +24,8 @@
One OSD manages one disk (or partition). OSDs talk to etcd and to each other — they receive
the cluster state from etcd, and send read and write requests for secondary data copies to each other.
- **etcd** — a clustered key/value database, used to store settings and the top-level
cluster state, and to prevent split-brain. Data blocks are not stored in etcd,
and etcd does not take part in processing client read and write requests.
cluster state, and to prevent split-brain (splitbrain). Data blocks are not stored in
etcd, and etcd does not take part in processing client read and write requests.
- **Monitor** — a separate node.js daemon which calculates the required changes to the cluster
configuration, saves this information to etcd and thereby commands OSDs to apply the changes.
It also aggregates statistics. It talks only to etcd; OSDs do not communicate with the monitor.
@ -43,8 +43,8 @@
## Client-side components
- **Client library** — encapsulates the client-side logic. They connect to etcd and to all OSDs,
receive the cluster state from etcd, and send read and write commands directly to all OSDs.
- **Client library** — encapsulates the client-side logic. It connects to etcd and to all OSDs,
receives the cluster state from etcd, and sends read and write commands directly to all OSDs.
Due to the architecture, all individual data blocks (128 KB each by default) reside on different
OSDs, but the client is designed so that it always knows exactly which OSD to contact,
and it connects to that OSD directly.
@ -227,5 +227,5 @@
- Deleting images in a degraded cluster may currently lead to the removed objects
"reappearing" after the down OSDs are brought back up, and in case of EC pools,
the objects may even reappear as "incomplete". If you hit this situation, just
repeat the removal request. A fix for this problem is already implemented in the
"epoch-deletions" branch and will soon be included in a release.
repeat the removal request. This problem will be fixed in the future, along with the
on-disk metadata storage format update.

View File

@ -61,6 +61,10 @@ pkg_check_modules(ISAL libisal)
if (ISAL_LIBRARIES)
add_definitions(-DWITH_ISAL)
endif (ISAL_LIBRARIES)
pkg_check_modules(RDMACM librdmacm)
if (RDMACM_LIBRARIES)
add_definitions(-DWITH_RDMACM)
endif (RDMACM_LIBRARIES)
add_custom_target(build_tests)
add_custom_target(test

View File

@ -5,6 +5,7 @@ project(vitastor)
# vitastor-nfs
add_executable(vitastor-nfs
nfs_proxy.cpp
nfs_proxy_rdma.cpp
nfs_block.cpp
nfs_kv.cpp
nfs_kv_create.cpp
@ -21,8 +22,10 @@ add_executable(vitastor-nfs
nfs_fsstat.cpp
nfs_mount.cpp
nfs_portmap.cpp
rdma_alloc.cpp
../util/sha256.c
proto/xdr_impl.cpp
proto/rpc_rdma_xdr.cpp
proto/rpc_xdr.cpp
proto/portmap_xdr.cpp
proto/nfs_xdr.cpp
@ -30,4 +33,5 @@ add_executable(vitastor-nfs
target_link_libraries(vitastor-nfs
vitastor_client
vitastor_kv
${RDMACM_LIBRARIES}
)

View File

@ -315,8 +315,7 @@ static int block_nfs3_read_proc(void *opaque, rpc_op_t *rop)
if (aligned_count % alignment)
aligned_count = aligned_count + alignment - (aligned_count % alignment);
aligned_count -= aligned_offset;
void *buf = malloc_or_die(aligned_count);
xdr_add_malloc(rop->xdrs, buf);
void *buf = self->malloc_or_rdma(rop, aligned_count);
cluster_op_t *op = new cluster_op_t;
op->opcode = OSD_OP_READ;
op->inode = ino_it->second;

View File

@ -499,6 +499,20 @@ void nfs_proxy_t::check_default_pool()
}
}
nfs_client_t *nfs_proxy_t::create_client()
{
auto cli = new nfs_client_t();
cli->parent = this;
if (kvfs)
nfs_kv_procs(cli);
else
nfs_block_procs(cli);
for (auto & fn: pmap.proc_table)
cli->proc_table.insert(fn);
rpc_clients.insert(cli);
return cli;
}
void nfs_proxy_t::do_accept(int listen_fd)
{
struct sockaddr_storage addr;
@ -512,18 +526,8 @@ void nfs_proxy_t::do_accept(int listen_fd)
fcntl(nfs_fd, F_SETFL, fcntl(nfs_fd, F_GETFL, 0) | O_NONBLOCK);
int one = 1;
setsockopt(nfs_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
auto cli = new nfs_client_t();
if (kvfs)
nfs_kv_procs(cli);
else
nfs_block_procs(cli);
cli->parent = this;
auto cli = this->create_client();
cli->nfs_fd = nfs_fd;
for (auto & fn: pmap.proc_table)
{
cli->proc_table.insert(fn);
}
rpc_clients[nfs_fd] = cli;
epmgr->tfd->set_fd_handler(nfs_fd, true, [cli](int nfs_fd, int epoll_events)
{
// Handle incoming event
@ -780,11 +784,17 @@ void nfs_client_t::stop()
stopped = true;
if (refs <= 0)
{
#ifdef WITH_RDMACM
destroy_rdma_conn();
#endif
auto parent = this->parent;
parent->rpc_clients.erase(nfs_fd);
parent->rpc_clients.erase(this);
parent->active_connections--;
parent->epmgr->tfd->set_fd_handler(nfs_fd, true, NULL);
close(nfs_fd);
if (nfs_fd >= 0)
{
parent->epmgr->tfd->set_fd_handler(nfs_fd, true, NULL);
close(nfs_fd);
}
delete this;
parent->check_exit();
}
@ -876,8 +886,12 @@ void nfs_client_t::handle_send(int result)
void rpc_queue_reply(rpc_op_t *rop)
{
nfs_client_t *self = (nfs_client_t*)rop->client;
iovec *iov_list = NULL;
unsigned iov_count = 0;
#ifdef WITH_RDMACM
if (self->rdma_conn)
{
self->rdma_encode_header(rop);
}
#endif
int r = xdr_encode(rop->xdrs, (xdrproc_t)xdr_rpc_msg, &rop->out_msg);
assert(r);
if (rop->reply_fn != NULL)
@ -885,25 +899,38 @@ void rpc_queue_reply(rpc_op_t *rop)
r = xdr_encode(rop->xdrs, rop->reply_fn, rop->reply);
assert(r);
}
xdr_encode_finish(rop->xdrs, &iov_list, &iov_count);
assert(iov_count > 0);
rop->reply_marker = 0;
for (unsigned i = 0; i < iov_count; i++)
#ifdef WITH_RDMACM
if (!self->rdma_conn)
#endif
{
rop->reply_marker += iov_list[i].iov_len;
}
rop->reply_marker = htobe32(rop->reply_marker | 0x80000000);
auto & to_send_list = self->write_msg.msg_iovlen ? self->next_send_list : self->send_list;
auto & to_outbox = self->write_msg.msg_iovlen ? self->next_outbox : self->outbox;
to_send_list.push_back((iovec){ .iov_base = &rop->reply_marker, .iov_len = 4 });
to_outbox.push_back(NULL);
for (unsigned i = 0; i < iov_count; i++)
{
to_send_list.push_back(iov_list[i]);
iovec *iov_list = NULL;
unsigned iov_count = 0;
xdr_encode_finish(rop->xdrs, &iov_list, &iov_count);
assert(iov_count > 0);
rop->reply_marker = 0;
for (unsigned i = 0; i < iov_count; i++)
{
rop->reply_marker += iov_list[i].iov_len;
}
rop->reply_marker = htobe32(rop->reply_marker | 0x80000000);
auto & to_send_list = self->write_msg.msg_iovlen ? self->next_send_list : self->send_list;
auto & to_outbox = self->write_msg.msg_iovlen ? self->next_outbox : self->outbox;
to_send_list.push_back((iovec){ .iov_base = &rop->reply_marker, .iov_len = 4 });
to_outbox.push_back(NULL);
for (unsigned i = 0; i < iov_count; i++)
{
to_send_list.push_back(iov_list[i]);
to_outbox.push_back(NULL);
}
to_outbox[to_outbox.size()-1] = rop;
self->submit_send();
}
to_outbox[to_outbox.size()-1] = rop;
self->submit_send();
#ifdef WITH_RDMACM
else
{
self->rdma_queue_reply(rop);
}
#endif
}
int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg_len)
@ -968,6 +995,18 @@ int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg
// Incoming buffer isn't needed to handle request, so return 0
return 0;
}
auto rop = create_rpc_op(xdrs, inmsg, NULL);
if (!rop)
{
// No such procedure
return 0;
}
rop->buffer = (uint8_t*)base_buf;
return handle_rpc_op(rop);
}
rpc_op_t *nfs_client_t::create_rpc_op(XDR *xdrs, rpc_msg *inmsg, rdma_msg *rmsg)
{
// Find decoder for the request
auto proc_it = proc_table.find((rpc_service_proc_t){
.prog = inmsg->body.cbody.prog,
@ -1019,7 +1058,7 @@ int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg
};
rpc_queue_reply(rop);
// Incoming buffer isn't needed to handle request, so return 0
return 0;
return NULL;
}
// Allocate memory
rpc_op_t *rop = (rpc_op_t*)malloc_or_die(
@ -1028,7 +1067,6 @@ int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg
rpc_reply_stat x = RPC_MSG_ACCEPTED;
*rop = (rpc_op_t){
.client = this,
.buffer = (uint8_t*)base_buf,
.xdrs = xdrs,
.out_msg = (rpc_msg){
.xid = inmsg->xid,
@ -1045,10 +1083,25 @@ int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg
.request = ((uint8_t*)rop) + sizeof(rpc_op_t),
.reply = ((uint8_t*)rop) + sizeof(rpc_op_t) + proc_it->req_size,
};
// FIXME: malloc and avoid copy?
memcpy(&rop->in_msg, inmsg, sizeof(rpc_msg));
if (rmsg)
{
memcpy(&rop->in_rdma_msg, rmsg, sizeof(rdma_msg));
}
return rop;
}
int nfs_client_t::handle_rpc_op(rpc_op_t *rop)
{
// Try to decode the request
// req_fn may be NULL, that means function has no arguments
if (proc_it->req_fn && !proc_it->req_fn(xdrs, rop->request))
auto proc_it = proc_table.find((rpc_service_proc_t){
.prog = rop->in_msg.body.cbody.prog,
.vers = rop->in_msg.body.cbody.vers,
.proc = rop->in_msg.body.cbody.proc,
});
if (proc_it == proc_table.end() || proc_it->req_fn && !proc_it->req_fn(rop->xdrs, rop->request))
{
// Invalid request
rop->out_msg.body.rbody.areply.reply_data.stat = RPC_GARBAGE_ARGS;
@ -1063,13 +1116,30 @@ int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg
return ref;
}
void *nfs_client_t::malloc_or_rdma(rpc_op_t *rop, size_t size)
{
#ifdef WITH_RDMACM
if (!rdma_conn)
{
#endif
void *buf = malloc_or_die(size);
xdr_add_malloc(rop->xdrs, buf);
return buf;
#ifdef WITH_RDMACM
}
void *buf = rdma_malloc(size);
xdr_set_rdma_chunk(rop->xdrs, buf);
return buf;
#endif
}
void nfs_proxy_t::daemonize()
{
// Stop all clients because client I/O sometimes breaks during daemonize
// I.e. the new process stops receiving events on the old FD
// It doesn't happen if we call sleep(1) here, but we don't want to call sleep(1)...
for (auto & clp: rpc_clients)
clp.second->stop();
for (auto & cli: rpc_clients)
cli->stop();
if (fork())
exit(0);
setsid();
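
The refactored reply path above now branches on transport: over TCP, rpc_queue_reply() prefixes each encoded reply with the RPC record-marking word — the total reply length ORed with the "last fragment" bit, stored big-endian — while over RDMA it hands the operation to rdma_queue_reply(). A minimal self-contained sketch of the record-marking arithmetic (the 123-byte length is illustrative):

#include <endian.h>
#include <stdint.h>
#include <stdio.h>

int main()
{
    // Suppose the encoded reply spans 123 bytes across all iovecs
    uint32_t reply_len = 123;
    // Same arithmetic as rpc_queue_reply(): set the "last fragment" bit
    // and convert to big-endian for the wire
    uint32_t reply_marker = htobe32(reply_len | 0x80000000);
    printf("record mark: 0x%08x\n", be32toh(reply_marker));
    return 0;
}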

View File

@ -22,6 +22,7 @@ class cli_tool_t;
struct kv_fs_state_t;
struct block_fs_state_t;
class nfs_client_t;
struct nfs_rdma_context_t;
class nfs_proxy_t
{
@ -55,7 +56,7 @@ public:
vitastorkv_dbw_t *db = NULL;
kv_fs_state_t *kvfs = NULL;
block_fs_state_t *blockfs = NULL;
std::map<int, nfs_client_t*> rpc_clients;
std::set<nfs_client_t*> rpc_clients;
std::vector<XDR*> xdr_pool;
@ -72,12 +73,15 @@ public:
void watch_stats();
void parse_stats(etcd_kv_t & kv);
void check_default_pool();
nfs_client_t* create_client();
void do_accept(int listen_fd);
void daemonize();
void write_pid();
void mount_fs();
void check_already_mounted();
void check_exit();
nfs_rdma_context_t* create_rdma(const std::string & bind_address, int rdmacm_port);
};
struct rpc_cur_buffer_t
@ -101,15 +105,20 @@ struct rpc_free_buffer_t
unsigned size;
};
struct nfs_rdma_conn_t;
class nfs_client_t
{
public:
nfs_proxy_t *parent = NULL;
int nfs_fd;
int epoll_events = 0;
int refs = 0;
bool stopped = false;
std::set<rpc_service_proc_t> proc_table;
nfs_rdma_conn_t *rdma_conn = NULL;
// <TCP>
int nfs_fd = -1;
int epoll_events = 0;
// Read state
rpc_cur_buffer_t cur_buffer = { 0 };
@ -130,7 +139,15 @@ public:
void submit_send();
void handle_send(int result);
int handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg_len);
// </TCP>
rpc_op_t *create_rpc_op(XDR *xdrs, rpc_msg *inmsg, rdma_msg *rmsg);
int handle_rpc_op(rpc_op_t *rop);
bool deref();
void stop();
void *malloc_or_rdma(rpc_op_t *rop, size_t size);
void *rdma_malloc(size_t size);
void rdma_encode_header(rpc_op_t *rop);
void rdma_queue_reply(rpc_op_t *rop);
void destroy_rdma_conn();
};

src/nfs/nfs_proxy_rdma.cpp (new file, 808 lines)
View File

@ -0,0 +1,808 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
//
// NFS RDMA support
#ifdef WITH_RDMACM
#define _XOPEN_SOURCE
#include <rdma/rdma_cma.h>
#include <fcntl.h>
#include "addr_util.h"
#include "proto/nfs.h"
#include "proto/rpc.h"
#include "proto/rpc_rdma.h"
#include "nfs_proxy.h"
#include "rdma_alloc.h"
#define NFS_RDMACM_PRIVATE_DATA_MAGIC_LE 0x180eabf6
struct __attribute__((__packed__)) nfs_rdmacm_private
{
uint32_t format_identifier; // magic, should be 0xf6ab0e18 in big endian
uint8_t version; // version, 1
uint8_t remote_invalidate; // remote invalidation flag (1 or 0)
uint8_t max_send_size; // maximum RDMA Send operation size / 1024 - 1 (i.e. 0 is 1 KB, 255 is 256 KB)
uint8_t max_recv_size; // maximum RDMA Receive operation size / 1024 - 1 (i.e. 0 is 1 KB, 255 is 256 KB)
};
struct nfs_rdma_buf_t
{
void *buf = NULL;
size_t len = 0;
ibv_mr *mr = NULL;
};
struct nfs_rdma_conn_t;
struct nfs_rdma_context_t
{
std::string bind_address;
int rdmacm_port = 0;
uint32_t max_iodepth = 16;
uint64_t rdma_malloc_round_to = 1048576, rdma_max_unused_buffers = 500*1048576;
uint64_t max_send_size = 256*1024, max_recv_size = 256*1024;
nfs_proxy_t *proxy = NULL;
epoll_manager_t *epmgr = NULL;
int max_cqe = 0, used_max_cqe = 0;
rdma_event_channel *rdmacm_evch = NULL;
rdma_cm_id *listener_id = NULL;
ibv_comp_channel *channel = NULL;
ibv_cq *cq = NULL;
rdma_allocator_t *alloc = NULL;
std::map<rdma_cm_id*, nfs_rdma_conn_t*> rdma_connections;
std::map<uint32_t, nfs_rdma_conn_t*> rdma_connections_by_qp;
~nfs_rdma_context_t();
void handle_io();
void handle_rdmacm_events();
void rdmacm_accept(rdma_cm_event *ev);
void rdmacm_established(rdma_cm_event *ev);
};
struct nfs_rdma_conn_t
{
nfs_rdma_context_t *ctx = NULL;
nfs_client_t *client = NULL;
rdma_cm_id *id = NULL;
int max_send_size = 256*1024, max_recv_size = 256*1024;
int remote_max_send_size = 1024, remote_max_recv_size = 1024;
bool remote_invalidate = false;
bool established = false;
uint32_t cur_credit = 16;
std::vector<nfs_rdma_buf_t> recv_buffers;
std::map<void*, nfs_rdma_buf_t> used_buffers;
int next_recv_buf = 0;
std::vector<nfs_rdma_buf_t> send_buffers;
std::vector<rpc_op_t*> outbox;
std::vector<rpc_op_t*> chunk_inbox;
int outbox_pos = 0;
void post_initial_receives();
~nfs_rdma_conn_t();
nfs_rdma_buf_t create_buf(size_t len);
void post_recv(nfs_rdma_buf_t b);
void post_send();
bool handle_recv(void *buf, size_t len);
void free_rdma_rpc_op(rpc_op_t *rop);
};
nfs_rdma_context_t* nfs_proxy_t::create_rdma(const std::string & bind_address, int rdmacm_port)
{
nfs_rdma_context_t* self = new nfs_rdma_context_t;
self->proxy = this;
self->epmgr = epmgr;
self->bind_address = bind_address;
self->rdmacm_port = rdmacm_port;
self->rdmacm_evch = rdma_create_event_channel();
if (!self->rdmacm_evch)
{
fprintf(stderr, "Failed to initialize RDMA-CM event channel: %s (code %d)\n", strerror(errno), errno);
delete self;
return NULL;
}
fcntl(self->rdmacm_evch->fd, F_SETFL, fcntl(self->rdmacm_evch->fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->tfd->set_fd_handler(self->rdmacm_evch->fd, false, [self](int rdmacm_eventfd, int epoll_events)
{
self->handle_rdmacm_events();
});
int r = rdma_create_id(self->rdmacm_evch, &self->listener_id, NULL, RDMA_PS_TCP);
if (r != 0)
{
fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d)\n", strerror(errno), errno);
delete self;
return NULL;
}
sockaddr_storage addr;
if (!string_to_addr(bind_address, 0, rdmacm_port, &addr))
{
fprintf(stderr, "Server address: %s is not valid\n", bind_address.c_str());
delete self;
return NULL;
}
r = rdma_bind_addr(self->listener_id, (sockaddr*)&addr);
if (r != 0)
{
fprintf(stderr, "Failed to bind RDMA-CM to %s:%d: %s (code %d)\n", bind_address.c_str(), rdmacm_port, strerror(errno), errno);
delete self;
return NULL;
}
r = rdma_listen(self->listener_id, 128);
if (r != 0)
{
fprintf(stderr, "Failed to listen RDMA-CM: %s (code %d)\n", strerror(errno), errno);
delete self;
return NULL;
}
self->channel = ibv_create_comp_channel(self->listener_id->verbs);
if (!self->channel)
{
fprintf(stderr, "Couldn't create RDMA completion channel\n");
delete self;
return NULL;
}
fcntl(self->channel->fd, F_SETFL, fcntl(self->channel->fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->tfd->set_fd_handler(self->channel->fd, false, [self](int channel_eventfd, int epoll_events)
{
self->handle_io();
});
self->max_cqe = 4096;
self->cq = ibv_create_cq(self->listener_id->verbs, self->max_cqe, NULL, self->channel, 0);
if (!self->cq)
{
fprintf(stderr, "Couldn't create RDMA completion queue\n");
delete self;
return NULL;
}
self->alloc = rdma_malloc_create(self->listener_id->pd, self->rdma_malloc_round_to, self->rdma_max_unused_buffers, IBV_ACCESS_LOCAL_WRITE);
return self;
}
nfs_rdma_context_t::~nfs_rdma_context_t()
{
if (listener_id)
{
int r = rdma_destroy_id(listener_id);
if (r != 0)
fprintf(stderr, "Failed to destroy RDMA-CM ID: %s (code %d)\n", strerror(errno), errno);
else
listener_id = NULL;
}
if (rdmacm_evch)
{
epmgr->tfd->set_fd_handler(rdmacm_evch->fd, false, NULL);
rdma_destroy_event_channel(rdmacm_evch);
rdmacm_evch = NULL;
}
if (cq)
{
ibv_destroy_cq(cq);
cq = NULL;
}
if (channel)
{
ibv_destroy_comp_channel(channel);
channel = NULL;
}
if (alloc)
{
rdma_malloc_destroy(alloc);
alloc = NULL;
}
//if (mr)
// ibv_dereg_mr(mr);
//if (pd)
// ibv_dealloc_pd(pd);
//if (context)
// ibv_close_device(context);
}
void nfs_rdma_context_t::handle_rdmacm_events()
{
rdma_cm_event *ev = NULL;
while (1)
{
int r = rdma_get_cm_event(rdmacm_evch, &ev);
if (r != 0)
{
if (errno == EAGAIN || errno == EINTR)
break;
fprintf(stderr, "Failed to get RDMA-CM event: %s (code %d)\n", strerror(errno), errno);
exit(1);
}
if (ev->event == RDMA_CM_EVENT_CONNECT_REQUEST)
{
rdmacm_accept(ev);
}
else if (ev->event == RDMA_CM_EVENT_CONNECT_ERROR ||
ev->event == RDMA_CM_EVENT_REJECTED ||
ev->event == RDMA_CM_EVENT_DISCONNECTED ||
ev->event == RDMA_CM_EVENT_DEVICE_REMOVAL)
{
auto event_type_name = ev->event == RDMA_CM_EVENT_CONNECT_ERROR ? "RDMA_CM_EVENT_CONNECT_ERROR" : (
ev->event == RDMA_CM_EVENT_REJECTED ? "RDMA_CM_EVENT_REJECTED" : (
ev->event == RDMA_CM_EVENT_DISCONNECTED ? "RDMA_CM_EVENT_DISCONNECTED" : "RDMA_CM_EVENT_DEVICE_REMOVAL"));
auto conn_it = rdma_connections.find(ev->id);
if (conn_it == rdma_connections.end())
{
fprintf(stderr, "Received %s event for an unknown connection 0x%lx - ignoring\n",
event_type_name, (uint64_t)ev->id);
}
else
{
fprintf(stderr, "Received %s event for connection 0x%lx - closing it\n",
event_type_name, (uint64_t)ev->id);
auto conn = conn_it->second;
conn->client->stop();
}
}
else if (ev->event == RDMA_CM_EVENT_ESTABLISHED)
{
rdmacm_established(ev);
}
else if (ev->event == RDMA_CM_EVENT_ADDR_CHANGE || ev->event == RDMA_CM_EVENT_TIMEWAIT_EXIT)
{
// Do nothing
}
else
{
// Other events are unexpected
fprintf(stderr, "Unexpected RDMA-CM event type: %d\n", ev->event);
}
r = rdma_ack_cm_event(ev);
if (r != 0)
{
fprintf(stderr, "Failed to ack (free) RDMA-CM event: %s (code %d)\n", strerror(errno), errno);
exit(1);
}
}
}
void nfs_rdma_context_t::rdmacm_accept(rdma_cm_event *ev)
{
this->used_max_cqe += max_iodepth*2;
if (this->used_max_cqe > this->max_cqe)
{
// Resize CQ
int new_max_cqe = this->max_cqe;
while (this->used_max_cqe > new_max_cqe)
{
new_max_cqe *= 2;
}
if (ibv_resize_cq(this->cq, new_max_cqe) != 0)
{
fprintf(stderr, "Couldn't resize RDMA completion queue to %d entries\n", new_max_cqe);
return;
}
this->max_cqe = new_max_cqe;
}
ibv_qp_init_attr init_attr = {
.send_cq = this->cq,
.recv_cq = this->cq,
.cap = {
// each op at each moment takes 1 RDMA_RECV or 1 RDMA_READ or 1 RDMA_WRITE + 1 RDMA_SEND
.max_send_wr = max_iodepth*2,
.max_recv_wr = max_iodepth,
.max_send_sge = 1, // we don't need S/G currently
.max_recv_sge = 1,
},
.qp_type = IBV_QPT_RC,
};
int r = rdma_create_qp(ev->id, NULL, &init_attr);
if (r != 0)
{
fprintf(stderr, "Failed to create a queue pair via RDMA-CM: %s (code %d)\n", strerror(errno), errno);
exit(1);
}
nfs_rdmacm_private private_data = {
.format_identifier = NFS_RDMACM_PRIVATE_DATA_MAGIC_LE,
.version = 1,
.remote_invalidate = 0, // FIXME what is remote_invalidate?
.max_send_size = (uint8_t)(max_send_size <= 256*1024 ? max_send_size/1024 - 1 : 255),
.max_recv_size = (uint8_t)(max_recv_size <= 256*1024 ? max_recv_size/1024 - 1 : 255),
};
rdma_conn_param conn_params = {
.private_data = &private_data,
.private_data_len = sizeof(private_data),
//.responder_resources = max_qp_rd_atom of the device?,
//.initiator_depth = max_qp_init_rd_atom of the device?,
.rnr_retry_count = 7,
//.qp_num = manually created QP number?,
};
r = rdma_accept(ev->id, &conn_params);
if (r != 0)
{
fprintf(stderr, "Failed to accept RDMA-CM connection: %s (code %d)\n", strerror(errno), errno);
rdma_destroy_qp(ev->id);
rdma_destroy_id(ev->id);
}
else
{
auto conn = new nfs_rdma_conn_t();
conn->ctx = this;
conn->id = ev->id;
conn->cur_credit = max_iodepth;
conn->max_send_size = max_send_size;
conn->max_recv_size = max_recv_size;
rdma_connections[ev->id] = conn;
rdma_connections_by_qp[conn->id->qp->qp_num] = conn;
auto cli = this->proxy->create_client();
conn->client = cli;
cli->rdma_conn = conn;
}
}
nfs_rdma_conn_t::~nfs_rdma_conn_t()
{
if (id)
{
ctx->rdma_connections.erase(id);
if (id->qp)
{
ctx->rdma_connections_by_qp.erase(id->qp->qp_num);
rdma_destroy_qp(id);
}
rdma_destroy_id(id);
}
}
void nfs_rdma_context_t::rdmacm_established(rdma_cm_event *ev)
{
auto conn_it = rdma_connections.find(ev->id);
if (conn_it == rdma_connections.end())
{
fprintf(stderr, "Received RDMA_CM_EVENT_ESTABLISHED event for an unknown connection 0x%lx - ignoring\n", (uint64_t)ev->id);
return;
}
fprintf(stderr, "Received RDMA_CM_EVENT_ESTABLISHED event for connection 0x%lx - connection established\n", (uint64_t)ev->id);
auto conn = conn_it->second;
conn->established = true;
// Handle NFS private_data
if (ev->param.ud.private_data_len >= sizeof(nfs_rdmacm_private))
{
nfs_rdmacm_private *private_data = (nfs_rdmacm_private *)ev->param.ud.private_data;
if (private_data->format_identifier == NFS_RDMACM_PRIVATE_DATA_MAGIC_LE &&
private_data->version == 1)
{
conn->remote_invalidate = private_data->remote_invalidate;
conn->remote_max_send_size = (private_data->max_send_size+1) * 1024;
conn->remote_max_recv_size = (private_data->max_recv_size+1) * 1024;
if (conn->remote_max_recv_size < conn->max_send_size)
conn->max_send_size = conn->remote_max_recv_size;
}
}
// Post initial receive requests
conn->post_initial_receives();
}
void nfs_rdma_conn_t::post_initial_receives()
{
for (int i = 0; i < cur_credit; i++)
{
auto b = create_buf(max_recv_size);
recv_buffers.push_back(b);
post_recv(b);
}
}
nfs_rdma_buf_t nfs_rdma_conn_t::create_buf(size_t len)
{
nfs_rdma_buf_t b;
b.buf = malloc_or_die(len);
b.len = len;
b.mr = ibv_reg_mr(id->pd, b.buf, len, IBV_ACCESS_LOCAL_WRITE);
if (!b.mr)
{
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
exit(1);
}
return b;
}
void nfs_rdma_conn_t::post_recv(nfs_rdma_buf_t b)
{
if (client->stopped)
{
return;
}
ibv_sge sge = {
.addr = (uintptr_t)b.buf,
.length = (uint32_t)b.len,
.lkey = b.mr->lkey,
};
ibv_recv_wr *bad_wr = NULL;
ibv_recv_wr wr = {
.wr_id = 1, // 1 = receive (2 = send, 3 = chunk read, 4 = chunk write)
.sg_list = &sge,
.num_sge = 1,
};
int err = ibv_post_recv(id->qp, &wr, &bad_wr);
if (err || bad_wr)
{
fprintf(stderr, "RDMA receive failed: %s\n", strerror(err));
exit(1);
}
}
void nfs_client_t::rdma_encode_header(rpc_op_t *rop)
{
rdma_msg outrmsg = {
.rdma_xid = rop->in_rdma_msg.rdma_xid,
.rdma_vers = rop->in_rdma_msg.rdma_vers,
.rdma_credit = rdma_conn->cur_credit,
.rdma_body = {
.proc = rop->rdma_error ? RDMA_ERROR : RDMA_MSG,
},
};
if (rop->rdma_error)
{
outrmsg.rdma_body.rdma_error.err = rop->rdma_error;
if (rop->rdma_error == ERR_VERS)
outrmsg.rdma_body.rdma_error.range = (rpc_rdma_errvers){ 1, 1 };
}
else
{
outrmsg.rdma_body.rdma_msg = {};
}
int r = xdr_encode(rop->xdrs, (xdrproc_t)xdr_rdma_msg, &outrmsg);
assert(r);
}
void nfs_client_t::rdma_queue_reply(rpc_op_t *rop)
{
rdma_conn->outbox.push_back(rop);
rdma_conn->post_send();
}
void nfs_rdma_conn_t::post_send()
{
again:
while (outbox.size() > outbox_pos)
{
auto rop = outbox[outbox_pos];
nfs_rdma_buf_t b;
if (send_buffers.size())
{
b = send_buffers.back();
send_buffers.pop_back();
}
else
{
b = create_buf(max_send_size);
}
iovec *chunk_iov = NULL;
iovec *iov_list = NULL;
unsigned iov_count = 0;
xdr_encode_finish(rop->xdrs, &iov_list, &iov_count);
assert(iov_count > 0);
if (!rop->rdma_error)
{
// READ3resok and READLINK3resok - extract last byte buffer from iovecs and send it in a "write chunk"
if (rop->in_msg.body.cbody.prog == NFS_PROGRAM &&
(rop->in_msg.body.cbody.proc == NFS3_READ && ((READ3res*)rop->reply)->status == NFS3_OK ||
rop->in_msg.body.cbody.proc == NFS3_READLINK && ((READLINK3res*)rop->reply)->status == NFS3_OK))
{
assert(iov_count > 1);
iov_count--;
chunk_iov = &iov_list[iov_count];
}
}
// FIXME: Maybe avoid extra copy? To do that we have to initially encode into nfs_rdma_buf_t
size_t pos = 0;
for (unsigned i = 0; i < iov_count; i++)
{
if (pos + iov_list[i].iov_len > b.len)
{
// Message exceeds buffer - stop the client
auto cli = ((nfs_client_t*)rop->client);
cli->stop();
outbox.erase(outbox.begin()+outbox_pos, outbox.begin()+outbox_pos+1);
cli->rdma_conn->free_rdma_rpc_op(rop);
goto again;
}
memcpy(b.buf + pos, iov_list[i].iov_base, iov_list[i].iov_len);
pos += iov_list[i].iov_len;
}
ibv_sge chunk_sge;
ibv_send_wr chunk_wr;
ibv_sge sge = {
.addr = (uintptr_t)b.buf,
.length = (uint32_t)pos,
.lkey = b.mr->lkey,
};
ibv_send_wr *bad_wr = NULL;
ibv_send_wr wr = {
.wr_id = 2, // 2 is send
.sg_list = &sge,
.num_sge = 1,
.opcode = IBV_WR_SEND,
.send_flags = IBV_SEND_SIGNALED,
};
ibv_send_wr *send_wr = &wr;
if (chunk_iov != NULL)
{
auto & wr_chunk = *rop->in_rdma_msg.rdma_body.rdma_msg.rdma_writes->entry.target.target_val;
chunk_sge = {
.addr = (uintptr_t)chunk_iov->iov_base,
.length = (uint32_t)chunk_iov->iov_len,
.lkey = rdma_malloc_get_lkey(ctx->alloc, chunk_iov->iov_base),
};
chunk_wr = {
.wr_id = 4, // 4 is chunk write
.sg_list = &chunk_sge,
.num_sge = 1,
.opcode = IBV_WR_RDMA_WRITE,
.wr = {
.rdma = {
.remote_addr = wr_chunk.offset,
.rkey = wr_chunk.handle,
},
},
};
// send chunk_wr first, then normal wr
chunk_wr.next = &wr;
send_wr = &chunk_wr;
}
int err = ibv_post_send(id->qp, send_wr, &bad_wr);
if (err || bad_wr)
{
fprintf(stderr, "RDMA send failed: %s\n", strerror(err));
exit(1);
}
outbox_pos++;
}
}
#define RDMA_EVENTS_AT_ONCE 32
void nfs_rdma_context_t::handle_io()
{
// Request next notification
ibv_cq *ev_cq;
void *ev_ctx;
// FIXME: This is inefficient as it calls read()...
if (ibv_get_cq_event(channel, &ev_cq, &ev_ctx) == 0)
{
ibv_ack_cq_events(cq, 1);
}
if (ibv_req_notify_cq(cq, 0) != 0)
{
fprintf(stderr, "Failed to request RDMA completion notification, exiting\n");
exit(1);
}
ibv_wc wc[RDMA_EVENTS_AT_ONCE];
int event_count;
do
{
event_count = ibv_poll_cq(cq, RDMA_EVENTS_AT_ONCE, wc);
for (int i = 0; i < event_count; i++)
{
auto conn_it = rdma_connections_by_qp.find(wc[i].qp_num);
if (conn_it == rdma_connections_by_qp.end())
{
continue;
}
auto conn = conn_it->second;
if (wc[i].status != IBV_WC_SUCCESS)
{
fprintf(stderr, "RDMA work request failed for queue %d with status: %s, stopping client\n", wc[i].qp_num, ibv_wc_status_str(wc[i].status));
conn->client->stop();
// but continue to handle events to purge the queue
}
if (wc[i].wr_id == 1)
{
// 1 = receive
auto & b = conn->recv_buffers[conn->next_recv_buf];
bool is_continued = false;
if (conn->cur_credit > 0 && !conn->client->stopped)
{
// Increase client refcount while the RPC call is being processed
conn->client->refs++;
conn->cur_credit--;
is_continued = conn->handle_recv(b.buf, wc[i].byte_len);
}
else
{
fprintf(stderr, "Warning: NFS client credit exceeded for queue %d, stopping client\n", wc[i].qp_num);
conn->client->stop();
continue;
}
if (is_continued)
{
// Buffer is required to handle request
// Due to the credit-based flow control in RPC-RDMA, we can just remove that buffer and reuse it later
conn->used_buffers[b.buf] = b;
conn->recv_buffers.erase(conn->recv_buffers.begin()+conn->next_recv_buf, conn->recv_buffers.begin()+conn->next_recv_buf+1);
}
else
{
// Buffer is not required to handle request and can be reused immediately
conn->post_recv(b);
conn->next_recv_buf = (conn->next_recv_buf+1) % conn->recv_buffers.size();
}
}
else if (wc[i].wr_id == 2)
{
// 2 = send
auto rop = conn->outbox[0];
conn->outbox.erase(conn->outbox.begin(), conn->outbox.begin()+1);
conn->outbox_pos--;
// Free rpc_op
conn->free_rdma_rpc_op(rop);
}
else if (wc[i].wr_id == 3)
{
// 3 = chunk read
auto rop = conn->chunk_inbox[0];
conn->chunk_inbox.erase(conn->chunk_inbox.begin(), conn->chunk_inbox.begin()+1);
int ref = conn->client->handle_rpc_op(rop);
if (rop->buffer && !ref)
{
// Request is handled, reuse the buffer
auto & ub = conn->used_buffers.at(rop->buffer);
conn->recv_buffers.push_back(ub);
conn->post_recv(ub);
}
}
}
} while (event_count > 0);
}
void nfs_rdma_conn_t::free_rdma_rpc_op(rpc_op_t *rop)
{
xdr_reset(rop->xdrs);
ctx->proxy->xdr_pool.push_back(rop->xdrs);
if (rop->buffer && rop->referenced)
{
// Reuse the buffer
auto & ub = used_buffers.at(rop->buffer);
recv_buffers.push_back(ub);
post_recv(ub);
}
auto rdma_chunk = xdr_get_rdma_chunk(rop->xdrs);
if (rdma_chunk)
{
rdma_malloc_free(ctx->alloc, rdma_chunk);
xdr_set_rdma_chunk(rop->xdrs, NULL);
}
free(rop);
cur_credit++;
client->deref();
}
// returns false if handling is done, returns true if handling is continued asynchronously
bool nfs_rdma_conn_t::handle_recv(void *buf, size_t len)
{
// Take an XDR object from the pool
XDR *xdrs;
if (ctx->proxy->xdr_pool.size())
{
xdrs = ctx->proxy->xdr_pool.back();
ctx->proxy->xdr_pool.pop_back();
}
else
{
xdrs = xdr_create();
}
xdr_set_rdma(xdrs);
// Decode the RDMA-RPC header
rdma_msg rmsg;
if (!xdr_decode(xdrs, buf, len, (xdrproc_t)xdr_rdma_msg, &rmsg))
{
// Invalid message, ignore it
xdr_reset(xdrs);
ctx->proxy->xdr_pool.push_back(xdrs);
return 0;
}
if (rmsg.rdma_vers != 1 || rmsg.rdma_body.proc != RDMA_MSG /*&& rmsg.rdma_body.proc != RDMA_NOMSG*/)
{
// Bad RDMA-RPC version or message type
rpc_op_t *rop = (rpc_op_t*)malloc_or_die(sizeof(rpc_op_t));
*rop = (rpc_op_t){
.client = this,
.xdrs = xdrs,
.in_msg = {
.xid = rmsg.rdma_xid,
},
.in_rdma_msg = rmsg,
.rdma_error = rmsg.rdma_vers != 1 ? ERR_VERS : ERR_CHUNK,
};
rpc_queue_reply(rop);
// Incoming buffer isn't needed to handle request, so return 0
return 0;
}
rpc_msg inmsg = { .xid = rmsg.rdma_xid };
if (!xdr_rpc_msg_body(xdrs, &inmsg.body) || inmsg.body.dir != RPC_CALL)
{
// Invalid message, ignore it
xdr_reset(xdrs);
ctx->proxy->xdr_pool.push_back(xdrs);
return 0;
}
rpc_op_t *rop = client->create_rpc_op(xdrs, &inmsg, &rmsg);
if (!rop)
{
// No such procedure
return 0;
}
if (inmsg.body.cbody.prog == NFS_PROGRAM && (
// Check that exactly 1 read chunk is provided for WRITE3 and SYMLINK3, and only for them
(
(inmsg.body.cbody.proc == NFS3_WRITE || inmsg.body.cbody.proc == NFS3_SYMLINK)
? (!rmsg.rdma_body.rdma_msg.rdma_reads || rmsg.rdma_body.rdma_msg.rdma_reads->next)
: (!!rmsg.rdma_body.rdma_msg.rdma_reads)
) ||
// Check that exactly 1 write chunk is provided for READ3 and READLINK3, and only for them
(
(inmsg.body.cbody.proc == NFS3_READ || inmsg.body.cbody.proc == NFS3_READLINK)
? (!rop->in_rdma_msg.rdma_body.rdma_msg.rdma_writes ||
rop->in_rdma_msg.rdma_body.rdma_msg.rdma_writes->next ||
rop->in_rdma_msg.rdma_body.rdma_msg.rdma_writes->entry.target.target_len != 1)
: (!!rop->in_rdma_msg.rdma_body.rdma_msg.rdma_writes)
)))
{
rop->rdma_error = ERR_CHUNK;
rpc_queue_reply(rop);
return 0;
}
// Set rop->buffer only when we're sure that we return 1
rop->buffer = (uint8_t*)buf;
// Read that chunk
if (inmsg.body.cbody.prog == NFS_PROGRAM && inmsg.body.cbody.proc == NFS3_WRITE)
{
auto & rd_chunk = rmsg.rdma_body.rdma_msg.rdma_reads->entry.target;
void *buf = rdma_malloc_alloc(ctx->alloc, rd_chunk.length);
ibv_sge chunk_sge = {
.addr = (uintptr_t)buf,
.length = rd_chunk.length,
.lkey = rdma_malloc_get_lkey(ctx->alloc, buf),
};
ibv_send_wr *bad_wr = NULL;
ibv_send_wr wr = {
.wr_id = 3, // 3 is chunk read
.sg_list = &chunk_sge,
.num_sge = 1,
.opcode = IBV_WR_RDMA_READ,
.wr = {
.rdma = {
.remote_addr = rd_chunk.offset,
.rkey = rd_chunk.handle,
},
},
};
int err = ibv_post_send(id->qp, &wr, &bad_wr);
if (err || bad_wr)
{
fprintf(stderr, "RDMA receive failed: %s\n", strerror(err));
exit(1);
}
xdr_set_rdma_chunk(rop->xdrs, buf);
rop->referenced = 1;
chunk_inbox.push_back(rop);
return 1;
}
return client->handle_rpc_op(rop);
}
void *nfs_client_t::rdma_malloc(size_t size)
{
return rdma_malloc_alloc(rdma_conn->ctx->alloc, size);
}
void nfs_client_t::destroy_rdma_conn()
{
if (rdma_conn)
{
delete rdma_conn;
rdma_conn = NULL;
}
}
#endif
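
The nfs_rdmacm_private block above packs each peer's maximum RDMA Send/Receive size into one byte (size/1024 - 1, capped at 255), and rdmacm_established() reverses it as (value + 1) * 1024. A small standalone sketch of that round trip (the 256 KB value is illustrative):

#include <stdint.h>
#include <stdio.h>

int main()
{
    uint64_t max_send_size = 256*1024;
    // Encode as in rdmacm_accept(): 0 means 1 KB, 255 means 256 KB
    uint8_t enc = (uint8_t)(max_send_size <= 256*1024 ? max_send_size/1024 - 1 : 255);
    // Decode as in rdmacm_established()
    uint64_t dec = (uint64_t)(enc + 1) * 1024;
    printf("encoded=%u decoded=%lu\n", enc, (unsigned long)dec); // encoded=255 decoded=262144
    return 0;
}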

View File

@ -168,7 +168,7 @@ struct WRITE3args {
offset3 offset;
count3 count;
stable_how stable;
opaque data<>;
opaque data<>; /* RDMA DDP-eligible */
};
typedef opaque writeverf3[NFS3_WRITEVERFSIZE];
@ -409,7 +409,7 @@ struct READ3resok {
post_op_attr file_attributes;
count3 count;
bool eof;
opaque data<>;
opaque data<>; /* RDMA DDP-eligible */
};
struct READ3resfail {
@ -514,7 +514,7 @@ typedef string nfspath3<>;
struct symlinkdata3 {
sattr3 symlink_attributes;
nfspath3 symlink_data;
nfspath3 symlink_data; /* RDMA DDP-eligible */
};
struct SYMLINK3args {
@ -546,7 +546,7 @@ struct READLINK3args {
struct READLINK3resok {
post_op_attr symlink_attributes;
nfspath3 data;
nfspath3 data; /* RDMA DDP-eligible */
};
struct READLINK3resfail {

View File

@ -272,7 +272,7 @@ xdr_WRITE3args (XDR *xdrs, WRITE3args *objp)
return FALSE;
if (!xdr_stable_how (xdrs, &objp->stable))
return FALSE;
if (!xdr_bytes(xdrs, &objp->data, ~0))
if (!xdr_bytes(xdrs, &objp->data, ~0, true))
return FALSE;
return TRUE;
}
@ -829,7 +829,7 @@ xdr_READ3resok (XDR *xdrs, READ3resok *objp)
return FALSE;
if (!xdr_bool (xdrs, &objp->eof))
return FALSE;
if (!xdr_bytes(xdrs, &objp->data, ~0))
if (!xdr_bytes(xdrs, &objp->data, ~0, true))
return FALSE;
return TRUE;
}
@ -1173,10 +1173,10 @@ xdr_PATHCONF3res (XDR *xdrs, PATHCONF3res *objp)
}
bool_t
xdr_nfspath3 (XDR *xdrs, nfspath3 *objp)
xdr_nfspath3 (XDR *xdrs, nfspath3 *objp, bool rdma_chunk)
{
if (!xdr_string (xdrs, objp, ~0))
if (!xdr_string (xdrs, objp, ~0, rdma_chunk))
return FALSE;
return TRUE;
}
@ -1187,7 +1187,7 @@ xdr_symlinkdata3 (XDR *xdrs, symlinkdata3 *objp)
if (!xdr_sattr3 (xdrs, &objp->symlink_attributes))
return FALSE;
if (!xdr_nfspath3 (xdrs, &objp->symlink_data))
if (!xdr_nfspath3 (xdrs, &objp->symlink_data, true))
return FALSE;
return TRUE;
}
@ -1259,7 +1259,7 @@ xdr_READLINK3resok (XDR *xdrs, READLINK3resok *objp)
if (!xdr_post_op_attr (xdrs, &objp->symlink_attributes))
return FALSE;
if (!xdr_nfspath3 (xdrs, &objp->data))
if (!xdr_nfspath3 (xdrs, &objp->data, true))
return FALSE;
return TRUE;
}

View File

@ -1,6 +1,7 @@
#pragma once
#include "rpc.h"
#include "rpc_rdma.h"
struct rpc_op_t;
@ -27,12 +28,16 @@ inline bool operator < (const rpc_service_proc_t & a, const rpc_service_proc_t &
return a.prog < b.prog || a.prog == b.prog && (a.vers < b.vers || a.vers == b.vers && a.proc < b.proc);
}
struct rdma_msg;
struct rpc_op_t
{
void *client;
uint8_t *buffer;
XDR *xdrs;
rpc_msg in_msg, out_msg;
rdma_msg in_rdma_msg;
rpc_rdma_errcode rdma_error;
void *request;
void *reply;
xdrproc_t reply_fn;

src/nfs/proto/rpc_rdma.h (new file, 144 lines)
View File

@ -0,0 +1,144 @@
/*
* Please do not edit this file.
* It was generated using rpcgen.
*/
#ifndef _RPC_RDMA_H_RPCGEN
#define _RPC_RDMA_H_RPCGEN
#include "xdr_impl.h"
#ifdef __cplusplus
extern "C" {
#endif
struct xdr_rdma_segment {
uint32_t handle;
uint32_t length;
uint64_t offset;
};
typedef struct xdr_rdma_segment xdr_rdma_segment;
struct xdr_read_chunk {
uint32_t position;
struct xdr_rdma_segment target;
};
typedef struct xdr_read_chunk xdr_read_chunk;
struct xdr_read_list {
struct xdr_read_chunk entry;
struct xdr_read_list *next;
};
typedef struct xdr_read_list xdr_read_list;
struct xdr_write_chunk {
struct {
u_int target_len;
struct xdr_rdma_segment *target_val;
} target;
};
typedef struct xdr_write_chunk xdr_write_chunk;
struct xdr_write_list {
struct xdr_write_chunk entry;
struct xdr_write_list *next;
};
typedef struct xdr_write_list xdr_write_list;
struct rpc_rdma_header {
struct xdr_read_list *rdma_reads;
struct xdr_write_list *rdma_writes;
struct xdr_write_chunk *rdma_reply;
};
typedef struct rpc_rdma_header rpc_rdma_header;
struct rpc_rdma_header_nomsg {
struct xdr_read_list *rdma_reads;
struct xdr_write_list *rdma_writes;
struct xdr_write_chunk *rdma_reply;
};
typedef struct rpc_rdma_header_nomsg rpc_rdma_header_nomsg;
struct rpc_rdma_header_padded {
uint32_t rdma_align;
uint32_t rdma_thresh;
struct xdr_read_list *rdma_reads;
struct xdr_write_list *rdma_writes;
struct xdr_write_chunk *rdma_reply;
};
typedef struct rpc_rdma_header_padded rpc_rdma_header_padded;
enum rpc_rdma_errcode {
ERR_VERS = 1,
ERR_CHUNK = 2,
};
typedef enum rpc_rdma_errcode rpc_rdma_errcode;
struct rpc_rdma_errvers {
uint32_t rdma_vers_low;
uint32_t rdma_vers_high;
};
typedef struct rpc_rdma_errvers rpc_rdma_errvers;
struct rpc_rdma_error {
rpc_rdma_errcode err;
union {
rpc_rdma_errvers range;
};
};
typedef struct rpc_rdma_error rpc_rdma_error;
enum rdma_proc {
RDMA_MSG = 0,
RDMA_NOMSG = 1,
RDMA_MSGP = 2,
RDMA_DONE = 3,
RDMA_ERROR = 4,
};
typedef enum rdma_proc rdma_proc;
struct rdma_body {
rdma_proc proc;
union {
rpc_rdma_header rdma_msg;
rpc_rdma_header_nomsg rdma_nomsg;
rpc_rdma_header_padded rdma_msgp;
rpc_rdma_error rdma_error;
};
};
typedef struct rdma_body rdma_body;
struct rdma_msg {
uint32_t rdma_xid;
uint32_t rdma_vers;
uint32_t rdma_credit;
rdma_body rdma_body;
};
typedef struct rdma_msg rdma_msg;
/* the xdr functions */
extern bool_t xdr_xdr_rdma_segment (XDR *, xdr_rdma_segment*);
extern bool_t xdr_xdr_read_chunk (XDR *, xdr_read_chunk*);
extern bool_t xdr_xdr_read_list (XDR *, xdr_read_list*);
extern bool_t xdr_xdr_write_chunk (XDR *, xdr_write_chunk*);
extern bool_t xdr_xdr_write_list (XDR *, xdr_write_list*);
extern bool_t xdr_rpc_rdma_header (XDR *, rpc_rdma_header*);
extern bool_t xdr_rpc_rdma_header_nomsg (XDR *, rpc_rdma_header_nomsg*);
extern bool_t xdr_rpc_rdma_header_padded (XDR *, rpc_rdma_header_padded*);
extern bool_t xdr_rpc_rdma_errcode (XDR *, rpc_rdma_errcode*);
extern bool_t xdr_rpc_rdma_errvers (XDR *, rpc_rdma_errvers*);
extern bool_t xdr_rpc_rdma_error (XDR *, rpc_rdma_error*);
extern bool_t xdr_rdma_proc (XDR *, rdma_proc*);
extern bool_t xdr_rdma_body (XDR *, rdma_body*);
extern bool_t xdr_rdma_msg (XDR *, rdma_msg*);
#ifdef __cplusplus
}
#endif
#endif /* !_RPC_RDMA_H_RPCGEN */
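
For reference, a minimal sketch of filling these generated structures for an RDMA_ERROR reply that reports an unsupported protocol version, mirroring what nfs_client_t::rdma_encode_header() does above (the xid and credit values come from the incoming request):

#include "rpc_rdma.h"

// Build an ERR_VERS reply header for a request with the given xid;
// only RPC-over-RDMA version 1 is supported, as in rdma_encode_header()
static rdma_msg make_err_vers_reply(uint32_t xid, uint32_t credit)
{
    rdma_msg out = {
        .rdma_xid = xid,
        .rdma_vers = 1,
        .rdma_credit = credit,
        .rdma_body = {
            .proc = RDMA_ERROR,
        },
    };
    out.rdma_body.rdma_error.err = ERR_VERS;
    out.rdma_body.rdma_error.range = (rpc_rdma_errvers){ 1, 1 };
    return out;
}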

src/nfs/proto/rpc_rdma.x (new file, 166 lines)
View File

@ -0,0 +1,166 @@
/* RFC 8166 - Remote Direct Memory Access Transport for Remote Procedure Call Version 1 */
/*
* Copyright (c) 2010-2017 IETF Trust and the persons
* identified as authors of the code. All rights reserved.
*
* The authors of the code are:
* B. Callaghan, T. Talpey, and C. Lever
*
* Redistribution and use in source and binary forms, with
* or without modification, are permitted provided that the
* following conditions are met:
*
* - Redistributions of source code must retain the above
* copyright notice, this list of conditions and the
* following disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the
* following disclaimer in the documentation and/or other
* materials provided with the distribution.
*
* - Neither the name of Internet Society, IETF or IETF
* Trust, nor the names of specific contributors, may be
* used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS
* AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
* EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/*
* Plain RDMA segment (Section 3.4.3)
*/
struct xdr_rdma_segment {
uint32_t handle; /* Registered memory handle */
uint32_t length; /* Length of the chunk in bytes */
uint64_t offset; /* Chunk virtual address or offset */
};
/*
* RDMA read segment (Section 3.4.5)
*/
struct xdr_read_chunk {
uint32_t position; /* Position in XDR stream */
struct xdr_rdma_segment target;
};
/*
* Read list (Section 4.3.1)
*/
struct xdr_read_list {
struct xdr_read_chunk entry;
struct xdr_read_list *next;
};
/*
* Write chunk (Section 3.4.6)
*/
struct xdr_write_chunk {
struct xdr_rdma_segment target<>;
};
/*
* Write list (Section 4.3.2)
*/
struct xdr_write_list {
struct xdr_write_chunk entry;
struct xdr_write_list *next;
};
/*
* Chunk lists (Section 4.3)
*/
struct rpc_rdma_header {
struct xdr_read_list *rdma_reads;
struct xdr_write_list *rdma_writes;
struct xdr_write_chunk *rdma_reply;
/* rpc body follows */
};
struct rpc_rdma_header_nomsg {
struct xdr_read_list *rdma_reads;
struct xdr_write_list *rdma_writes;
struct xdr_write_chunk *rdma_reply;
};
/* Not to be used */
struct rpc_rdma_header_padded {
uint32_t rdma_align;
uint32_t rdma_thresh;
struct xdr_read_list *rdma_reads;
struct xdr_write_list *rdma_writes;
struct xdr_write_chunk *rdma_reply;
/* rpc body follows */
};
/*
* Error handling (Section 4.5)
*/
enum rpc_rdma_errcode {
ERR_VERS = 1, /* Value fixed for all versions */
ERR_CHUNK = 2
};
/* Structure fixed for all versions */
struct rpc_rdma_errvers {
uint32_t rdma_vers_low;
uint32_t rdma_vers_high;
};
union rpc_rdma_error switch (rpc_rdma_errcode err) {
case ERR_VERS:
rpc_rdma_errvers range;
case ERR_CHUNK:
void;
};
/*
* Procedures (Section 4.2.4)
*/
enum rdma_proc {
RDMA_MSG = 0, /* Value fixed for all versions */
RDMA_NOMSG = 1, /* Value fixed for all versions */
RDMA_MSGP = 2, /* Not to be used */
RDMA_DONE = 3, /* Not to be used */
RDMA_ERROR = 4 /* Value fixed for all versions */
};
/* The position of the proc discriminator field is
* fixed for all versions */
union rdma_body switch (rdma_proc proc) {
case RDMA_MSG:
rpc_rdma_header rdma_msg;
case RDMA_NOMSG:
rpc_rdma_header_nomsg rdma_nomsg;
case RDMA_MSGP: /* Not to be used */
rpc_rdma_header_padded rdma_msgp;
case RDMA_DONE: /* Not to be used */
void;
case RDMA_ERROR:
rpc_rdma_error rdma_error;
};
/*
* Fixed header fields (Section 4.2)
*/
struct rdma_msg {
uint32_t rdma_xid; /* Position fixed for all versions */
uint32_t rdma_vers; /* Position fixed for all versions */
uint32_t rdma_credit; /* Position fixed for all versions */
rdma_body rdma_body;
};

View File

@ -0,0 +1,200 @@
/*
* Please do not edit this file.
* It was generated using rpcgen.
*/
#include "rpc_rdma.h"
#include "xdr_impl_inline.h"
bool_t
xdr_xdr_rdma_segment (XDR *xdrs, xdr_rdma_segment *objp)
{
if (!xdr_uint32_t (xdrs, &objp->handle))
return FALSE;
if (!xdr_uint32_t (xdrs, &objp->length))
return FALSE;
if (!xdr_uint64_t (xdrs, &objp->offset))
return FALSE;
return TRUE;
}
bool_t
xdr_xdr_read_chunk (XDR *xdrs, xdr_read_chunk *objp)
{
if (!xdr_uint32_t (xdrs, &objp->position))
return FALSE;
if (!xdr_xdr_rdma_segment (xdrs, &objp->target))
return FALSE;
return TRUE;
}
bool_t
xdr_xdr_read_list (XDR *xdrs, xdr_read_list *objp)
{
if (!xdr_xdr_read_chunk (xdrs, &objp->entry))
return FALSE;
if (!xdr_pointer (xdrs, (char **)&objp->next, sizeof (xdr_read_list), (xdrproc_t) xdr_xdr_read_list))
return FALSE;
return TRUE;
}
bool_t
xdr_xdr_write_chunk (XDR *xdrs, xdr_write_chunk *objp)
{
if (!xdr_array (xdrs, (char **)&objp->target.target_val, (u_int *) &objp->target.target_len, ~0,
sizeof (xdr_rdma_segment), (xdrproc_t) xdr_xdr_rdma_segment))
return FALSE;
return TRUE;
}
bool_t
xdr_xdr_write_list (XDR *xdrs, xdr_write_list *objp)
{
if (!xdr_xdr_write_chunk (xdrs, &objp->entry))
return FALSE;
if (!xdr_pointer (xdrs, (char **)&objp->next, sizeof (xdr_write_list), (xdrproc_t) xdr_xdr_write_list))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_rdma_header (XDR *xdrs, rpc_rdma_header *objp)
{
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reads, sizeof (xdr_read_list), (xdrproc_t) xdr_xdr_read_list))
return FALSE;
if (!xdr_pointer (xdrs, (char **)&objp->rdma_writes, sizeof (xdr_write_list), (xdrproc_t) xdr_xdr_write_list))
return FALSE;
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reply, sizeof (xdr_write_chunk), (xdrproc_t) xdr_xdr_write_chunk))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_rdma_header_nomsg (XDR *xdrs, rpc_rdma_header_nomsg *objp)
{
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reads, sizeof (xdr_read_list), (xdrproc_t) xdr_xdr_read_list))
return FALSE;
if (!xdr_pointer (xdrs, (char **)&objp->rdma_writes, sizeof (xdr_write_list), (xdrproc_t) xdr_xdr_write_list))
return FALSE;
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reply, sizeof (xdr_write_chunk), (xdrproc_t) xdr_xdr_write_chunk))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_rdma_header_padded (XDR *xdrs, rpc_rdma_header_padded *objp)
{
if (!xdr_uint32_t (xdrs, &objp->rdma_align))
return FALSE;
if (!xdr_uint32_t (xdrs, &objp->rdma_thresh))
return FALSE;
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reads, sizeof (xdr_read_list), (xdrproc_t) xdr_xdr_read_list))
return FALSE;
if (!xdr_pointer (xdrs, (char **)&objp->rdma_writes, sizeof (xdr_write_list), (xdrproc_t) xdr_xdr_write_list))
return FALSE;
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reply, sizeof (xdr_write_chunk), (xdrproc_t) xdr_xdr_write_chunk))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_rdma_errcode (XDR *xdrs, rpc_rdma_errcode *objp)
{
if (!xdr_enum (xdrs, (enum_t *) objp))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_rdma_errvers (XDR *xdrs, rpc_rdma_errvers *objp)
{
if (!xdr_uint32_t (xdrs, &objp->rdma_vers_low))
return FALSE;
if (!xdr_uint32_t (xdrs, &objp->rdma_vers_high))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_rdma_error (XDR *xdrs, rpc_rdma_error *objp)
{
if (!xdr_rpc_rdma_errcode (xdrs, &objp->err))
return FALSE;
switch (objp->err) {
case ERR_VERS:
if (!xdr_rpc_rdma_errvers (xdrs, &objp->range))
return FALSE;
break;
case ERR_CHUNK:
break;
default:
return FALSE;
}
return TRUE;
}
bool_t
xdr_rdma_proc (XDR *xdrs, rdma_proc *objp)
{
if (!xdr_enum (xdrs, (enum_t *) objp))
return FALSE;
return TRUE;
}
bool_t
xdr_rdma_body (XDR *xdrs, rdma_body *objp)
{
if (!xdr_rdma_proc (xdrs, &objp->proc))
return FALSE;
switch (objp->proc) {
case RDMA_MSG:
if (!xdr_rpc_rdma_header (xdrs, &objp->rdma_msg))
return FALSE;
break;
case RDMA_NOMSG:
if (!xdr_rpc_rdma_header_nomsg (xdrs, &objp->rdma_nomsg))
return FALSE;
break;
case RDMA_MSGP:
if (!xdr_rpc_rdma_header_padded (xdrs, &objp->rdma_msgp))
return FALSE;
break;
case RDMA_DONE:
break;
case RDMA_ERROR:
if (!xdr_rpc_rdma_error (xdrs, &objp->rdma_error))
return FALSE;
break;
default:
return FALSE;
}
return TRUE;
}
bool_t
xdr_rdma_msg (XDR *xdrs, rdma_msg *objp)
{
if (!xdr_uint32_t (xdrs, &objp->rdma_xid))
return FALSE;
if (!xdr_uint32_t (xdrs, &objp->rdma_vers))
return FALSE;
if (!xdr_uint32_t (xdrs, &objp->rdma_credit))
return FALSE;
if (!xdr_rdma_body (xdrs, &objp->rdma_body))
return FALSE;
return TRUE;
}

View File

@ -46,3 +46,4 @@ run_rpcgen() {
run_rpcgen nfs
run_rpcgen rpc
run_rpcgen portmap
run_rpcgen rpc_rdma

View File

@ -16,6 +16,22 @@ void xdr_destroy(XDR* xdrs)
delete xdrs;
}
void xdr_set_rdma(XDR *xdrs)
{
xdrs->rdma = true;
}
void xdr_set_rdma_chunk(XDR *xdrs, void *chunk)
{
assert(!xdrs->rdma_chunk || !chunk);
xdrs->rdma_chunk = chunk;
}
void* xdr_get_rdma_chunk(XDR *xdrs)
{
return xdrs->rdma_chunk;
}
void xdr_reset(XDR *xdrs)
{
for (auto buf: xdrs->allocs)
@ -23,6 +39,9 @@ void xdr_reset(XDR *xdrs)
free(buf);
}
xdrs->buf = NULL;
xdrs->rdma = false;
xdrs->rdma_chunk = NULL;
xdrs->rdma_chunk_used = false;
xdrs->avail = 0;
xdrs->allocs.resize(0);
xdrs->in_linked_list.resize(0);

View File

@ -55,6 +55,15 @@ void xdr_destroy(XDR* xdrs);
// Free resources from any previous xdr_decode/xdr_encode calls
void xdr_reset(XDR *xdrs);
// Mark XDR as used for RDMA
void xdr_set_rdma(XDR *xdrs);
// Set (single) RDMA chunk buffer for this xdr before decoding an RDMA message
void xdr_set_rdma_chunk(XDR *xdrs, void *chunk);
// Get the current RDMA chunk buffer
void* xdr_get_rdma_chunk(XDR *xdrs);
// Try to decode <size> bytes from buffer <buf> using <fn>
// Result may contain memory allocations that will be valid until the next call to xdr_{reset,destroy,decode,encode}
int xdr_decode(XDR *xdrs, void *buf, unsigned size, xdrproc_t fn, void *data);
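
A short usage sketch of the new RDMA helpers declared above, following the call sequence in nfs_rdma_conn_t::handle_recv(): the XDR is switched into RDMA mode (no record marker, DDP chunks allowed), the single chunk buffer is attached before decoding, and it is detached again when the operation is recycled. The function below is illustrative, not part of the diff:

#include "proto/rpc_rdma.h"
#include "xdr_impl.h"

// Sketch: decode one RPC-over-RDMA header from a received buffer whose
// DDP-eligible payload has already been placed into 'chunk'
void decode_rdma_header(XDR *xdrs, void *buf, unsigned size, void *chunk)
{
    xdr_set_rdma(xdrs);              // mark the XDR as used for RDMA
    xdr_set_rdma_chunk(xdrs, chunk); // DDP payload is taken from here, not from 'buf'
    rdma_msg rmsg;
    if (!xdr_decode(xdrs, buf, size, (xdrproc_t)xdr_rdma_msg, &rmsg))
    {
        xdr_reset(xdrs); // invalid message: recycle the XDR
        return;
    }
    // ... dispatch the RPC call; afterwards detach the chunk so it can be
    // returned to the allocator, as free_rdma_rpc_op() does
    xdr_set_rdma_chunk(xdrs, NULL);
}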

View File

@ -28,6 +28,19 @@
// RPC over TCP:
//
// BE 32bit length, then rpc_msg, then the procedure message itself
//
// RPC over RDMA:
// RFC 8166 - Remote Direct Memory Access Transport for Remote Procedure Call Version 1
// RFC 8267 - Network File System (NFS) Upper-Layer Binding to RPC-over-RDMA Version 1
// RFC 8797 - Remote Direct Memory Access - Connection Manager (RDMA-CM) Private Data for RPC-over-RDMA Version 1
// message is received in an RDMA Receive operation
// message: list of read chunks, list of write chunks, optional reply write chunk, then actual RPC body if present
// read chunk: BE 32bit position, BE 32bit registered memory key, BE 32bit length, BE 64bit offset
// write chunk: BE 32bit registered memory key, BE 32bit length, BE 64bit offset
// in reality for NFS 3.0: only 1 read chunk in write3 and symlink3, only 1 write chunk in read3 and readlink3
// read chunk is read by the server using RDMA Read from the client memory after receiving RPC request
// write chunk is pushed by the server using RDMA Write to the client memory before sending RPC reply
// connection is established using RDMA-CM at default port 20049
#pragma once
@ -35,6 +48,7 @@
#include <string.h>
#include <endian.h>
#include <assert.h>
#include <vector>
#include "malloc_or_die.h"
@ -61,6 +75,9 @@ struct xdr_linked_list_t
struct XDR
{
int x_op;
bool rdma = false;
void *rdma_chunk = NULL;
bool rdma_chunk_used = false;
// For decoding:
uint8_t *buf = NULL;
@ -106,13 +123,23 @@ inline int xdr_opaque(XDR *xdrs, void *data, uint32_t len)
return 1;
}
inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen, bool rdma_chunk = false)
{
if (xdrs->x_op == XDR_DECODE)
{
if (xdrs->avail < 4)
return 0;
uint32_t len = be32toh(*((uint32_t*)xdrs->buf));
if (rdma_chunk && xdrs->rdma)
{
// Take (only a single) RDMA chunk from xdrs->rdma_chunk while decoding
assert(xdrs->rdma_chunk);
assert(!xdrs->rdma_chunk_used);
xdrs->rdma_chunk_used = true;
data->data = (char*)xdrs->rdma_chunk;
data->size = len;
return 1;
}
uint32_t padded = len_pad4(len);
if (xdrs->avail < 4+padded)
return 0;
@ -123,7 +150,8 @@ inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
}
else
{
if (data->size < XDR_COPY_LENGTH)
// Always encode RDMA chunks as separate iovecs
if (data->size < XDR_COPY_LENGTH && (!rdma_chunk || !xdrs->rdma))
{
unsigned old = xdrs->cur_out.size();
xdrs->cur_out.resize(old + 4+data->size);
@ -146,8 +174,9 @@ inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
.iov_len = data->size,
});
}
if (data->size & 3)
if ((data->size & 3) && (!rdma_chunk || !xdrs->rdma))
{
// No padding for RDMA chunks
int pad = 4-(data->size & 3);
unsigned old = xdrs->cur_out.size();
xdrs->cur_out.resize(old+pad);
@ -158,9 +187,9 @@ inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
return 1;
}
inline int xdr_string(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
inline int xdr_string(XDR *xdrs, xdr_string_t *data, uint32_t maxlen, bool rdma_chunk = false)
{
return xdr_bytes(xdrs, data, maxlen);
return xdr_bytes(xdrs, data, maxlen, rdma_chunk);
}
inline int xdr_u_int(XDR *xdrs, void *data)
@ -182,6 +211,11 @@ inline int xdr_u_int(XDR *xdrs, void *data)
return 1;
}
inline int xdr_uint32_t(XDR *xdrs, void *data)
{
return xdr_u_int(xdrs, data);
}
inline int xdr_enum(XDR *xdrs, void *data)
{
return xdr_u_int(xdrs, data);
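
On the encode side, the rdma_chunk flag above keeps a DDP-eligible field in its own iovec and skips the 4-byte XDR padding, which is what allows nfs_rdma_conn_t::post_send() to peel the payload off the end of the iovec list and push it to the client with RDMA Write. A tiny sketch of that split, assuming (as post_send() does) that the chunk is the last iovec:

#include <sys/uio.h>
#include <assert.h>

// Sketch: separate an encoded READ3/READLINK3 reply into the part that goes
// into the RDMA Send and the write-chunk payload pushed with RDMA Write
static void split_reply(iovec *iov_list, unsigned iov_count, unsigned *send_count, iovec **chunk_iov)
{
    assert(iov_count > 1);                  // header iovec(s) plus the chunk
    *send_count = iov_count - 1;            // goes into the RDMA Send buffer
    *chunk_iov = &iov_list[iov_count - 1];  // unpadded payload, sent separately
}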

src/nfs/rdma_alloc.cpp (new file, 192 lines)
View File

@ -0,0 +1,192 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
//
// Simple & stupid RDMA-enabled memory allocator (allocates buffers within ibv_mr's)
#include <stdio.h>
#include <assert.h>
#include <map>
#include <set>
#include "rdma_alloc.h"
#include "malloc_or_die.h"
struct rdma_buf_t
{
void *buf = NULL;
size_t len = 0;
ibv_mr *mr = NULL;
};
struct rdma_frag_t
{
rdma_buf_t *buf = NULL;
size_t len = 0;
bool is_free = false;
};
struct rdma_allocator_t
{
size_t rdma_alloc_size = 1048576;
size_t rdma_max_unused = 500*1048576;
int rdma_access = IBV_ACCESS_LOCAL_WRITE;
ibv_pd *pd = NULL;
std::set<rdma_buf_t*> buffers;
std::map<void*, rdma_frag_t> bufferfrags;
std::set<std::pair<size_t, void*>> freelist;
size_t freebuffers = 0;
};
rdma_allocator_t *rdma_malloc_create(ibv_pd *pd, size_t rdma_alloc_size, size_t rdma_max_unused, int rdma_access)
{
rdma_allocator_t *self = (rdma_allocator_t*)malloc_or_die(sizeof(rdma_allocator_t));
self->pd = pd;
self->rdma_alloc_size = rdma_alloc_size ? rdma_alloc_size : 1048576;
self->rdma_max_unused = rdma_max_unused ? rdma_max_unused : 500*1048576;
self->rdma_access = rdma_access;
return self;
}
static void rdma_malloc_free_unused_buffers(rdma_allocator_t *self, size_t max_unused, bool force)
{
    // Walk the free list from the largest fragment down and deregister/free
    // fragments spanning a whole ibv_mr buffer until enough memory is released
    auto free_it = self->freelist.end();
    while (free_it != self->freelist.begin() && (force || self->freebuffers > max_unused))
    {
        free_it--;
        auto frag_it = self->bufferfrags.find(free_it->second);
        assert(frag_it != self->bufferfrags.end());
        if (frag_it->second.len != frag_it->second.buf->len)
        {
            if (force)
            {
                fprintf(stderr, "BUG: Attempt to destroy RDMA allocator while some buffers are still in use\n");
                abort();
            }
            break;
        }
        self->freebuffers -= frag_it->second.buf->len;
        ibv_dereg_mr(frag_it->second.buf->mr);
        self->buffers.erase(frag_it->second.buf);
        free(frag_it->second.buf);
        self->bufferfrags.erase(frag_it);
        free_it = self->freelist.erase(free_it);
    }
}
void rdma_malloc_destroy(rdma_allocator_t *self)
{
rdma_malloc_free_unused_buffers(self, 0, true);
assert(!self->freebuffers);
assert(!self->buffers.size());
assert(!self->bufferfrags.size());
assert(!self->freelist.size());
free(self);
}
void *rdma_malloc_alloc(rdma_allocator_t *self, size_t size)
{
    auto it = self->freelist.lower_bound(std::pair<size_t, void*>(size, 0));
    if (it == self->freelist.end())
    {
        // Round the size up to a multiple of rdma_alloc_size (1 MB by default)
        size_t alloc_size = ((size + self->rdma_alloc_size - 1) / self->rdma_alloc_size) * self->rdma_alloc_size;
        rdma_buf_t *b = (rdma_buf_t*)malloc_or_die(alloc_size + sizeof(rdma_buf_t));
        b->buf = b+1;
        b->len = alloc_size;
        b->mr = ibv_reg_mr(self->pd, b->buf, b->len, self->rdma_access);
        if (!b->mr)
        {
            fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
            exit(1);
        }
        self->buffers.insert(b);
        self->bufferfrags[b->buf] = (rdma_frag_t){ .buf = b, .len = alloc_size, .is_free = true };
        it = self->freelist.insert(std::pair<size_t, void*>(alloc_size, b->buf)).first;
        // A new buffer starts out fully free
        self->freebuffers += alloc_size;
    }
    void *ptr = it->second;
    auto & frag = self->bufferfrags.at(ptr);
    assert(frag.len >= size && frag.is_free);
    if (frag.len == frag.buf->len)
    {
        // The buffer stops being fully free
        self->freebuffers -= frag.buf->len;
    }
    self->freelist.erase(it);
    if (frag.len == size)
    {
        frag.is_free = false;
    }
    else
    {
        // Carve the allocation from the tail of the fragment and re-insert
        // the shrunken remainder into the free list under its new size
        frag.len -= size;
        self->freelist.insert(std::pair<size_t, void*>(frag.len, ptr));
        ptr = (uint8_t*)ptr + frag.len;
        self->bufferfrags[ptr] = (rdma_frag_t){ .buf = frag.buf, .len = size, .is_free = false };
    }
    return ptr;
}
void rdma_malloc_free(rdma_allocator_t *self, void *buf)
{
    auto frag_it = self->bufferfrags.find(buf);
    if (frag_it == self->bufferfrags.end() || frag_it->second.is_free)
    {
        fprintf(stderr, "BUG: Attempt to double-free RDMA buffer fragment 0x%zx\n", (size_t)buf);
        return;
    }
    auto prev_it = frag_it, next_it = frag_it;
    if (frag_it != self->bufferfrags.begin())
        prev_it--;
    next_it++;
    bool merge_back = prev_it != frag_it &&
        prev_it->second.is_free &&
        prev_it->second.buf == frag_it->second.buf &&
        (uint8_t*)prev_it->first+prev_it->second.len == frag_it->first;
    bool merge_next = next_it != self->bufferfrags.end() &&
        next_it->second.is_free &&
        next_it->second.buf == frag_it->second.buf &&
        next_it->first == (uint8_t*)frag_it->first+frag_it->second.len;
    // Merge the freed fragment with adjacent free fragments, keeping the
    // free list keys in sync with the actual fragment sizes
    if (merge_back)
    {
        self->freelist.erase(std::pair<size_t, void*>(prev_it->second.len, prev_it->first));
        prev_it->second.len += frag_it->second.len;
        self->bufferfrags.erase(frag_it);
        frag_it = prev_it;
    }
    if (merge_next)
    {
        self->freelist.erase(std::pair<size_t, void*>(next_it->second.len, next_it->first));
        frag_it->second.len += next_it->second.len;
        self->bufferfrags.erase(next_it);
    }
    frag_it->second.is_free = true;
    self->freelist.insert(std::pair<size_t, void*>(frag_it->second.len, frag_it->first));
    assert(frag_it->second.len <= frag_it->second.buf->len);
    if (frag_it->second.len == frag_it->second.buf->len)
    {
        // The whole buffer is freed
        self->freebuffers += frag_it->second.buf->len;
        if (self->freebuffers > self->rdma_max_unused)
        {
            rdma_malloc_free_unused_buffers(self, self->rdma_max_unused, false);
        }
    }
}
uint32_t rdma_malloc_get_lkey(rdma_allocator_t *self, void *buf)
{
auto frag_it = self->bufferfrags.find(buf);
if (frag_it == self->bufferfrags.end())
{
fprintf(stderr, "BUG: Attempt to use an unknown RDMA buffer fragment 0x%zx\n", (size_t)buf);
abort();
}
return frag_it->second.buf->mr->lkey;
}

src/nfs/rdma_alloc.h (new file, 17 lines)
View File

@ -0,0 +1,17 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
//
// Simple & stupid RDMA-enabled memory allocator (allocates buffers within ibv_mr's)
#pragma once
#include <infiniband/verbs.h>
#include <stdint.h>
struct rdma_allocator_t;
rdma_allocator_t *rdma_malloc_create(ibv_pd *pd, size_t rdma_alloc_size, size_t rdma_max_unused, int rdma_access);
void rdma_malloc_destroy(rdma_allocator_t *self);
void *rdma_malloc_alloc(rdma_allocator_t *self, size_t size);
void rdma_malloc_free(rdma_allocator_t *self, void *buf);
uint32_t rdma_malloc_get_lkey(rdma_allocator_t *self, void *buf);
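
A short usage sketch of this allocator API, assuming a protection domain is already available (the sizes below are the defaults used by nfs_proxy_rdma.cpp); each NFS chunk buffer is taken with rdma_malloc_alloc() and returned with rdma_malloc_free() once the corresponding RDMA Read or Write completes:

#include <infiniband/verbs.h>
#include "rdma_alloc.h"

void allocator_example(ibv_pd *pd)
{
    // 1 MB allocation granularity, keep at most 500 MB of unused buffers
    rdma_allocator_t *alloc = rdma_malloc_create(pd, 1048576, 500*1048576, IBV_ACCESS_LOCAL_WRITE);
    // Take a registered buffer for a 128 KB chunk and look up its lkey
    // to fill an ibv_sge for a work request
    void *chunk = rdma_malloc_alloc(alloc, 128*1024);
    uint32_t lkey = rdma_malloc_get_lkey(alloc, chunk);
    (void)lkey;
    // ... post RDMA Read/Write work requests referencing chunk + lkey ...
    rdma_malloc_free(alloc, chunk);
    rdma_malloc_destroy(alloc); // all fragments must be freed by this point
}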