Compare commits
1 Commits
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | 22a80bb4cb |
|
@ -61,6 +61,10 @@ pkg_check_modules(ISAL libisal)
|
||||||
if (ISAL_LIBRARIES)
|
if (ISAL_LIBRARIES)
|
||||||
add_definitions(-DWITH_ISAL)
|
add_definitions(-DWITH_ISAL)
|
||||||
endif (ISAL_LIBRARIES)
|
endif (ISAL_LIBRARIES)
|
||||||
|
pkg_check_modules(RDMACM librdmacm)
|
||||||
|
if (RDMACM_LIBRARIES)
|
||||||
|
add_definitions(-DWITH_RDMACM)
|
||||||
|
endif (RDMACM_LIBRARIES)
|
||||||
|
|
||||||
add_custom_target(build_tests)
|
add_custom_target(build_tests)
|
||||||
add_custom_target(test
|
add_custom_target(test
|
||||||
|
|
|
@ -5,6 +5,7 @@ project(vitastor)
|
||||||
# vitastor-nfs
|
# vitastor-nfs
|
||||||
add_executable(vitastor-nfs
|
add_executable(vitastor-nfs
|
||||||
nfs_proxy.cpp
|
nfs_proxy.cpp
|
||||||
|
nfs_proxy_rdma.cpp
|
||||||
nfs_block.cpp
|
nfs_block.cpp
|
||||||
nfs_kv.cpp
|
nfs_kv.cpp
|
||||||
nfs_kv_create.cpp
|
nfs_kv_create.cpp
|
||||||
|
@ -21,8 +22,10 @@ add_executable(vitastor-nfs
|
||||||
nfs_fsstat.cpp
|
nfs_fsstat.cpp
|
||||||
nfs_mount.cpp
|
nfs_mount.cpp
|
||||||
nfs_portmap.cpp
|
nfs_portmap.cpp
|
||||||
|
rdma_alloc.cpp
|
||||||
../util/sha256.c
|
../util/sha256.c
|
||||||
proto/xdr_impl.cpp
|
proto/xdr_impl.cpp
|
||||||
|
proto/rpc_rdma_xdr.cpp
|
||||||
proto/rpc_xdr.cpp
|
proto/rpc_xdr.cpp
|
||||||
proto/portmap_xdr.cpp
|
proto/portmap_xdr.cpp
|
||||||
proto/nfs_xdr.cpp
|
proto/nfs_xdr.cpp
|
||||||
|
|
|
@ -315,8 +315,7 @@ static int block_nfs3_read_proc(void *opaque, rpc_op_t *rop)
|
||||||
if (aligned_count % alignment)
|
if (aligned_count % alignment)
|
||||||
aligned_count = aligned_count + alignment - (aligned_count % alignment);
|
aligned_count = aligned_count + alignment - (aligned_count % alignment);
|
||||||
aligned_count -= aligned_offset;
|
aligned_count -= aligned_offset;
|
||||||
void *buf = malloc_or_die(aligned_count);
|
void *buf = self->malloc_or_rdma(rop, aligned_count);
|
||||||
xdr_add_malloc(rop->xdrs, buf);
|
|
||||||
cluster_op_t *op = new cluster_op_t;
|
cluster_op_t *op = new cluster_op_t;
|
||||||
op->opcode = OSD_OP_READ;
|
op->opcode = OSD_OP_READ;
|
||||||
op->inode = ino_it->second;
|
op->inode = ino_it->second;
|
||||||
|
|
|
@ -499,6 +499,20 @@ void nfs_proxy_t::check_default_pool()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nfs_client_t *nfs_proxy_t::create_client()
|
||||||
|
{
|
||||||
|
auto cli = new nfs_client_t();
|
||||||
|
cli->parent = this;
|
||||||
|
if (kvfs)
|
||||||
|
nfs_kv_procs(cli);
|
||||||
|
else
|
||||||
|
nfs_block_procs(cli);
|
||||||
|
for (auto & fn: pmap.proc_table)
|
||||||
|
cli->proc_table.insert(fn);
|
||||||
|
rpc_clients.insert(cli);
|
||||||
|
return cli;
|
||||||
|
}
|
||||||
|
|
||||||
void nfs_proxy_t::do_accept(int listen_fd)
|
void nfs_proxy_t::do_accept(int listen_fd)
|
||||||
{
|
{
|
||||||
struct sockaddr_storage addr;
|
struct sockaddr_storage addr;
|
||||||
|
@ -512,18 +526,8 @@ void nfs_proxy_t::do_accept(int listen_fd)
|
||||||
fcntl(nfs_fd, F_SETFL, fcntl(nfs_fd, F_GETFL, 0) | O_NONBLOCK);
|
fcntl(nfs_fd, F_SETFL, fcntl(nfs_fd, F_GETFL, 0) | O_NONBLOCK);
|
||||||
int one = 1;
|
int one = 1;
|
||||||
setsockopt(nfs_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
setsockopt(nfs_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
||||||
auto cli = new nfs_client_t();
|
auto cli = this->create_client();
|
||||||
if (kvfs)
|
|
||||||
nfs_kv_procs(cli);
|
|
||||||
else
|
|
||||||
nfs_block_procs(cli);
|
|
||||||
cli->parent = this;
|
|
||||||
cli->nfs_fd = nfs_fd;
|
cli->nfs_fd = nfs_fd;
|
||||||
for (auto & fn: pmap.proc_table)
|
|
||||||
{
|
|
||||||
cli->proc_table.insert(fn);
|
|
||||||
}
|
|
||||||
rpc_clients[nfs_fd] = cli;
|
|
||||||
epmgr->tfd->set_fd_handler(nfs_fd, true, [cli](int nfs_fd, int epoll_events)
|
epmgr->tfd->set_fd_handler(nfs_fd, true, [cli](int nfs_fd, int epoll_events)
|
||||||
{
|
{
|
||||||
// Handle incoming event
|
// Handle incoming event
|
||||||
|
@ -781,7 +785,7 @@ void nfs_client_t::stop()
|
||||||
if (refs <= 0)
|
if (refs <= 0)
|
||||||
{
|
{
|
||||||
auto parent = this->parent;
|
auto parent = this->parent;
|
||||||
parent->rpc_clients.erase(nfs_fd);
|
parent->rpc_clients.erase(this);
|
||||||
parent->active_connections--;
|
parent->active_connections--;
|
||||||
parent->epmgr->tfd->set_fd_handler(nfs_fd, true, NULL);
|
parent->epmgr->tfd->set_fd_handler(nfs_fd, true, NULL);
|
||||||
close(nfs_fd);
|
close(nfs_fd);
|
||||||
|
@ -885,25 +889,32 @@ void rpc_queue_reply(rpc_op_t *rop)
|
||||||
r = xdr_encode(rop->xdrs, rop->reply_fn, rop->reply);
|
r = xdr_encode(rop->xdrs, rop->reply_fn, rop->reply);
|
||||||
assert(r);
|
assert(r);
|
||||||
}
|
}
|
||||||
xdr_encode_finish(rop->xdrs, &iov_list, &iov_count);
|
if (!self->rdma_conn)
|
||||||
assert(iov_count > 0);
|
|
||||||
rop->reply_marker = 0;
|
|
||||||
for (unsigned i = 0; i < iov_count; i++)
|
|
||||||
{
|
{
|
||||||
rop->reply_marker += iov_list[i].iov_len;
|
xdr_encode_finish(rop->xdrs, &iov_list, &iov_count);
|
||||||
}
|
assert(iov_count > 0);
|
||||||
rop->reply_marker = htobe32(rop->reply_marker | 0x80000000);
|
rop->reply_marker = 0;
|
||||||
auto & to_send_list = self->write_msg.msg_iovlen ? self->next_send_list : self->send_list;
|
for (unsigned i = 0; i < iov_count; i++)
|
||||||
auto & to_outbox = self->write_msg.msg_iovlen ? self->next_outbox : self->outbox;
|
{
|
||||||
to_send_list.push_back((iovec){ .iov_base = &rop->reply_marker, .iov_len = 4 });
|
rop->reply_marker += iov_list[i].iov_len;
|
||||||
to_outbox.push_back(NULL);
|
}
|
||||||
for (unsigned i = 0; i < iov_count; i++)
|
rop->reply_marker = htobe32(rop->reply_marker | 0x80000000);
|
||||||
{
|
auto & to_send_list = self->write_msg.msg_iovlen ? self->next_send_list : self->send_list;
|
||||||
to_send_list.push_back(iov_list[i]);
|
auto & to_outbox = self->write_msg.msg_iovlen ? self->next_outbox : self->outbox;
|
||||||
|
to_send_list.push_back((iovec){ .iov_base = &rop->reply_marker, .iov_len = 4 });
|
||||||
to_outbox.push_back(NULL);
|
to_outbox.push_back(NULL);
|
||||||
|
for (unsigned i = 0; i < iov_count; i++)
|
||||||
|
{
|
||||||
|
to_send_list.push_back(iov_list[i]);
|
||||||
|
to_outbox.push_back(NULL);
|
||||||
|
}
|
||||||
|
to_outbox[to_outbox.size()-1] = rop;
|
||||||
|
self->submit_send();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
self->rdma_queue_reply(rop);
|
||||||
}
|
}
|
||||||
to_outbox[to_outbox.size()-1] = rop;
|
|
||||||
self->submit_send();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg_len)
|
int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg_len)
|
||||||
|
@ -968,6 +979,18 @@ int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg
|
||||||
// Incoming buffer isn't needed to handle request, so return 0
|
// Incoming buffer isn't needed to handle request, so return 0
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
auto rop = create_rpc_op(xdrs, inmsg, NULL);
|
||||||
|
if (!rop)
|
||||||
|
{
|
||||||
|
// No such procedure
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
rop->buffer = (uint8_t*)base_buf;
|
||||||
|
return handle_rpc_op(rop);
|
||||||
|
}
|
||||||
|
|
||||||
|
rpc_op_t *nfs_client_t::create_rpc_op(XDR *xdrs, rpc_msg *inmsg, rdma_msg *rmsg)
|
||||||
|
{
|
||||||
// Find decoder for the request
|
// Find decoder for the request
|
||||||
auto proc_it = proc_table.find((rpc_service_proc_t){
|
auto proc_it = proc_table.find((rpc_service_proc_t){
|
||||||
.prog = inmsg->body.cbody.prog,
|
.prog = inmsg->body.cbody.prog,
|
||||||
|
@ -1019,7 +1042,7 @@ int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg
|
||||||
};
|
};
|
||||||
rpc_queue_reply(rop);
|
rpc_queue_reply(rop);
|
||||||
// Incoming buffer isn't needed to handle request, so return 0
|
// Incoming buffer isn't needed to handle request, so return 0
|
||||||
return 0;
|
return NULL;
|
||||||
}
|
}
|
||||||
// Allocate memory
|
// Allocate memory
|
||||||
rpc_op_t *rop = (rpc_op_t*)malloc_or_die(
|
rpc_op_t *rop = (rpc_op_t*)malloc_or_die(
|
||||||
|
@ -1028,7 +1051,6 @@ int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg
|
||||||
rpc_reply_stat x = RPC_MSG_ACCEPTED;
|
rpc_reply_stat x = RPC_MSG_ACCEPTED;
|
||||||
*rop = (rpc_op_t){
|
*rop = (rpc_op_t){
|
||||||
.client = this,
|
.client = this,
|
||||||
.buffer = (uint8_t*)base_buf,
|
|
||||||
.xdrs = xdrs,
|
.xdrs = xdrs,
|
||||||
.out_msg = (rpc_msg){
|
.out_msg = (rpc_msg){
|
||||||
.xid = inmsg->xid,
|
.xid = inmsg->xid,
|
||||||
|
@ -1045,10 +1067,25 @@ int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg
|
||||||
.request = ((uint8_t*)rop) + sizeof(rpc_op_t),
|
.request = ((uint8_t*)rop) + sizeof(rpc_op_t),
|
||||||
.reply = ((uint8_t*)rop) + sizeof(rpc_op_t) + proc_it->req_size,
|
.reply = ((uint8_t*)rop) + sizeof(rpc_op_t) + proc_it->req_size,
|
||||||
};
|
};
|
||||||
|
// FIXME: malloc and avoid copy?
|
||||||
memcpy(&rop->in_msg, inmsg, sizeof(rpc_msg));
|
memcpy(&rop->in_msg, inmsg, sizeof(rpc_msg));
|
||||||
|
if (rmsg)
|
||||||
|
{
|
||||||
|
memcpy(&rop->in_rdma_msg, rmsg, sizeof(rdma_msg));
|
||||||
|
}
|
||||||
|
return rop;
|
||||||
|
}
|
||||||
|
|
||||||
|
int nfs_client_t::handle_rpc_op(rpc_op_t *rop)
|
||||||
|
{
|
||||||
// Try to decode the request
|
// Try to decode the request
|
||||||
// req_fn may be NULL, that means function has no arguments
|
// req_fn may be NULL, that means function has no arguments
|
||||||
if (proc_it->req_fn && !proc_it->req_fn(xdrs, rop->request))
|
auto proc_it = proc_table.find((rpc_service_proc_t){
|
||||||
|
.prog = rop->in_msg.body.cbody.prog,
|
||||||
|
.vers = rop->in_msg.body.cbody.vers,
|
||||||
|
.proc = rop->in_msg.body.cbody.proc,
|
||||||
|
});
|
||||||
|
if (proc_it == proc_table.end() || proc_it->req_fn && !proc_it->req_fn(rop->xdrs, rop->request))
|
||||||
{
|
{
|
||||||
// Invalid request
|
// Invalid request
|
||||||
rop->out_msg.body.rbody.areply.reply_data.stat = RPC_GARBAGE_ARGS;
|
rop->out_msg.body.rbody.areply.reply_data.stat = RPC_GARBAGE_ARGS;
|
||||||
|
@ -1063,13 +1100,26 @@ int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg
|
||||||
return ref;
|
return ref;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void *nfs_client_t::malloc_or_rdma(rpc_op_t *rop, size_t size)
|
||||||
|
{
|
||||||
|
if (!rdma_conn)
|
||||||
|
{
|
||||||
|
void *buf = malloc_or_die(size);
|
||||||
|
xdr_add_malloc(rop->xdrs, buf);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
void *buf = rdma_malloc(size);
|
||||||
|
xdr_set_rdma_chunk(rop->xdrs, buf);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
void nfs_proxy_t::daemonize()
|
void nfs_proxy_t::daemonize()
|
||||||
{
|
{
|
||||||
// Stop all clients because client I/O sometimes breaks during daemonize
|
// Stop all clients because client I/O sometimes breaks during daemonize
|
||||||
// I.e. the new process stops receiving events on the old FD
|
// I.e. the new process stops receiving events on the old FD
|
||||||
// It doesn't happen if we call sleep(1) here, but we don't want to call sleep(1)...
|
// It doesn't happen if we call sleep(1) here, but we don't want to call sleep(1)...
|
||||||
for (auto & clp: rpc_clients)
|
for (auto & cli: rpc_clients)
|
||||||
clp.second->stop();
|
cli->stop();
|
||||||
if (fork())
|
if (fork())
|
||||||
exit(0);
|
exit(0);
|
||||||
setsid();
|
setsid();
|
||||||
|
|
|
@ -22,6 +22,7 @@ class cli_tool_t;
|
||||||
struct kv_fs_state_t;
|
struct kv_fs_state_t;
|
||||||
struct block_fs_state_t;
|
struct block_fs_state_t;
|
||||||
class nfs_client_t;
|
class nfs_client_t;
|
||||||
|
struct nfs_rdma_context_t;
|
||||||
|
|
||||||
class nfs_proxy_t
|
class nfs_proxy_t
|
||||||
{
|
{
|
||||||
|
@ -55,7 +56,7 @@ public:
|
||||||
vitastorkv_dbw_t *db = NULL;
|
vitastorkv_dbw_t *db = NULL;
|
||||||
kv_fs_state_t *kvfs = NULL;
|
kv_fs_state_t *kvfs = NULL;
|
||||||
block_fs_state_t *blockfs = NULL;
|
block_fs_state_t *blockfs = NULL;
|
||||||
std::map<int, nfs_client_t*> rpc_clients;
|
std::set<nfs_client_t*> rpc_clients;
|
||||||
|
|
||||||
std::vector<XDR*> xdr_pool;
|
std::vector<XDR*> xdr_pool;
|
||||||
|
|
||||||
|
@ -72,12 +73,15 @@ public:
|
||||||
void watch_stats();
|
void watch_stats();
|
||||||
void parse_stats(etcd_kv_t & kv);
|
void parse_stats(etcd_kv_t & kv);
|
||||||
void check_default_pool();
|
void check_default_pool();
|
||||||
|
nfs_client_t* create_client();
|
||||||
void do_accept(int listen_fd);
|
void do_accept(int listen_fd);
|
||||||
void daemonize();
|
void daemonize();
|
||||||
void write_pid();
|
void write_pid();
|
||||||
void mount_fs();
|
void mount_fs();
|
||||||
void check_already_mounted();
|
void check_already_mounted();
|
||||||
void check_exit();
|
void check_exit();
|
||||||
|
|
||||||
|
nfs_rdma_context_t* create_rdma(const std::string & bind_address, int rdmacm_port);
|
||||||
};
|
};
|
||||||
|
|
||||||
struct rpc_cur_buffer_t
|
struct rpc_cur_buffer_t
|
||||||
|
@ -101,15 +105,20 @@ struct rpc_free_buffer_t
|
||||||
unsigned size;
|
unsigned size;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct nfs_rdma_conn_t;
|
||||||
|
|
||||||
class nfs_client_t
|
class nfs_client_t
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
nfs_proxy_t *parent = NULL;
|
nfs_proxy_t *parent = NULL;
|
||||||
int nfs_fd;
|
|
||||||
int epoll_events = 0;
|
|
||||||
int refs = 0;
|
int refs = 0;
|
||||||
bool stopped = false;
|
bool stopped = false;
|
||||||
std::set<rpc_service_proc_t> proc_table;
|
std::set<rpc_service_proc_t> proc_table;
|
||||||
|
nfs_rdma_conn_t *rdma_conn = NULL;
|
||||||
|
|
||||||
|
// <TCP>
|
||||||
|
int nfs_fd;
|
||||||
|
int epoll_events = 0;
|
||||||
|
|
||||||
// Read state
|
// Read state
|
||||||
rpc_cur_buffer_t cur_buffer = { 0 };
|
rpc_cur_buffer_t cur_buffer = { 0 };
|
||||||
|
@ -130,7 +139,13 @@ public:
|
||||||
void submit_send();
|
void submit_send();
|
||||||
void handle_send(int result);
|
void handle_send(int result);
|
||||||
int handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg_len);
|
int handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg_len);
|
||||||
|
// </TCP>
|
||||||
|
|
||||||
|
rpc_op_t *create_rpc_op(XDR *xdrs, rpc_msg *inmsg, rdma_msg *rmsg);
|
||||||
|
int handle_rpc_op(rpc_op_t *rop);
|
||||||
bool deref();
|
bool deref();
|
||||||
void stop();
|
void stop();
|
||||||
|
void *malloc_or_rdma(rpc_op_t *rop, size_t size);
|
||||||
|
void *rdma_malloc(size_t size);
|
||||||
|
void rdma_queue_reply(rpc_op_t *rop);
|
||||||
};
|
};
|
||||||
|
|
|
@ -0,0 +1,747 @@
|
||||||
|
// Copyright (c) Vitaliy Filippov, 2019+
|
||||||
|
// License: VNPL-1.1 (see README.md for details)
|
||||||
|
//
|
||||||
|
// NFS RDMA support
|
||||||
|
|
||||||
|
#define _XOPEN_SOURCE
|
||||||
|
|
||||||
|
#include <rdma/rdma_cma.h>
|
||||||
|
|
||||||
|
#include "addr_util.h"
|
||||||
|
|
||||||
|
#include "proto/nfs.h"
|
||||||
|
#include "proto/rpc.h"
|
||||||
|
#include "proto/rpc_rdma.h"
|
||||||
|
|
||||||
|
#include "nfs_proxy.h"
|
||||||
|
|
||||||
|
#include "rdma_alloc.h"
|
||||||
|
|
||||||
|
#define NFS_RDMACM_PRIVATE_DATA_MAGIC_LE 0x180eabf6
|
||||||
|
|
||||||
|
struct __attribute__((__packed__)) nfs_rdmacm_private
|
||||||
|
{
|
||||||
|
uint32_t format_identifier; // magic, should be 0xf6ab0e18 in big endian
|
||||||
|
uint8_t version; // version, 1
|
||||||
|
uint8_t remote_invalidate; // remote invalidation flag (1 or 0)
|
||||||
|
uint8_t max_send_size; // maximum RDMA Send operation size / 1024 - 1 (i.e. 0 is 1 KB, 255 is 256 KB)
|
||||||
|
uint8_t max_recv_size; // maximum RDMA Receive operation size / 1024 - 1 (i.e. 0 is 1 KB, 255 is 256 KB)
|
||||||
|
};
|
||||||
|
|
||||||
|
struct nfs_rdma_buf_t
|
||||||
|
{
|
||||||
|
void *buf = NULL;
|
||||||
|
size_t len = 0;
|
||||||
|
ibv_mr *mr = NULL;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct nfs_rdma_conn_t;
|
||||||
|
|
||||||
|
struct nfs_rdma_context_t
|
||||||
|
{
|
||||||
|
std::string bind_address;
|
||||||
|
int rdmacm_port = 0;
|
||||||
|
int max_send = 8, max_recv = 8; // FIXME max_send and max_recv should probably be equal
|
||||||
|
uint64_t rdma_malloc_round_to = 1048576, rdma_max_unused_buffers = 500*1048576;
|
||||||
|
uint64_t max_send_size = 256*1024, max_recv_size = 256*1024;
|
||||||
|
|
||||||
|
nfs_proxy_t *proxy = NULL;
|
||||||
|
epoll_manager_t *epmgr = NULL;
|
||||||
|
|
||||||
|
int max_cqe = 0, used_max_cqe = 0;
|
||||||
|
rdma_event_channel *rdmacm_evch = NULL;
|
||||||
|
rdma_cm_id *listener_id = NULL;
|
||||||
|
ibv_comp_channel *channel = NULL;
|
||||||
|
ibv_cq *cq = NULL;
|
||||||
|
rdma_allocator_t *alloc = NULL;
|
||||||
|
std::map<rdma_cm_id*, nfs_rdma_conn_t*> rdma_connections;
|
||||||
|
std::map<uint32_t, nfs_rdma_conn_t*> rdma_connections_by_qp;
|
||||||
|
|
||||||
|
~nfs_rdma_context_t();
|
||||||
|
void handle_io();
|
||||||
|
void handle_rdmacm_events();
|
||||||
|
void rdmacm_accept(rdma_cm_event *ev);
|
||||||
|
void rdmacm_established(rdma_cm_event *ev);
|
||||||
|
};
|
||||||
|
|
||||||
|
struct nfs_rdma_conn_t
|
||||||
|
{
|
||||||
|
nfs_rdma_context_t *ctx = NULL;
|
||||||
|
nfs_client_t *client = NULL;
|
||||||
|
rdma_cm_id *id = NULL;
|
||||||
|
int max_send = 8, max_recv = 8; // FIXME set it
|
||||||
|
int max_send_size = 256*1024, max_recv_size = 256*1024;
|
||||||
|
int remote_max_send_size = 1024, remote_max_recv_size = 1024; // FIXME is it used?
|
||||||
|
bool remote_invalidate = false;
|
||||||
|
bool established = false;
|
||||||
|
int cur_recv = 0, cur_send = 0; // FIXME use it ...
|
||||||
|
std::vector<nfs_rdma_buf_t> recv_buffers;
|
||||||
|
std::map<void*, nfs_rdma_buf_t> used_buffers;
|
||||||
|
int next_recv_buf = 0;
|
||||||
|
std::vector<nfs_rdma_buf_t> send_buffers;
|
||||||
|
std::vector<rpc_op_t*> outbox;
|
||||||
|
std::vector<rpc_op_t*> chunk_inbox;
|
||||||
|
int outbox_pos = 0;
|
||||||
|
|
||||||
|
void post_initial_receives();
|
||||||
|
~nfs_rdma_conn_t();
|
||||||
|
nfs_rdma_buf_t create_buf(size_t len);
|
||||||
|
void post_recv(nfs_rdma_buf_t b);
|
||||||
|
void post_send();
|
||||||
|
bool handle_recv(void *buf, size_t len);
|
||||||
|
};
|
||||||
|
|
||||||
|
nfs_rdma_context_t* nfs_proxy_t::create_rdma(const std::string & bind_address, int rdmacm_port)
|
||||||
|
{
|
||||||
|
nfs_rdma_context_t* self = new nfs_rdma_context_t;
|
||||||
|
self->proxy = this;
|
||||||
|
self->epmgr = epmgr;
|
||||||
|
self->bind_address = bind_address;
|
||||||
|
self->rdmacm_port = rdmacm_port;
|
||||||
|
self->rdmacm_evch = rdma_create_event_channel();
|
||||||
|
if (!self->rdmacm_evch)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Failed to initialize RDMA-CM event channel: %s (code %d)\n", strerror(errno), errno);
|
||||||
|
delete self;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
fcntl(self->rdmacm_evch->fd, F_SETFL, fcntl(self->rdmacm_evch->fd, F_GETFL, 0) | O_NONBLOCK);
|
||||||
|
epmgr->tfd->set_fd_handler(self->rdmacm_evch->fd, false, [self](int rdmacm_eventfd, int epoll_events)
|
||||||
|
{
|
||||||
|
self->handle_rdmacm_events();
|
||||||
|
});
|
||||||
|
int r = rdma_create_id(self->rdmacm_evch, &self->listener_id, NULL, RDMA_PS_TCP);
|
||||||
|
if (r != 0)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d)\n", strerror(errno), errno);
|
||||||
|
delete self;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
sockaddr_storage addr;
|
||||||
|
if (!string_to_addr(bind_address, 0, rdmacm_port, &addr))
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Server address: %s is not valid\n", bind_address.c_str());
|
||||||
|
delete self;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
r = rdma_bind_addr(self->listener_id, (sockaddr*)&addr);
|
||||||
|
if (r != 0)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Failed to bind RDMA-CM to %s:%d: %s (code %d)\n", bind_address.c_str(), rdmacm_port, strerror(errno), errno);
|
||||||
|
delete self;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
r = rdma_listen(self->listener_id, 128);
|
||||||
|
if (r != 0)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Failed to listen RDMA-CM: %s (code %d)\n", strerror(errno), errno);
|
||||||
|
delete self;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
self->channel = ibv_create_comp_channel(self->listener_id->verbs);
|
||||||
|
if (!self->channel)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Couldn't create RDMA completion channel\n");
|
||||||
|
delete self;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
fcntl(self->channel->fd, F_SETFL, fcntl(self->channel->fd, F_GETFL, 0) | O_NONBLOCK);
|
||||||
|
epmgr->tfd->set_fd_handler(self->channel->fd, false, [self](int channel_eventfd, int epoll_events)
|
||||||
|
{
|
||||||
|
self->handle_io();
|
||||||
|
});
|
||||||
|
self->max_cqe = 4096;
|
||||||
|
self->cq = ibv_create_cq(self->listener_id->verbs, self->max_cqe, NULL, self->channel, 0);
|
||||||
|
if (!self->cq)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Couldn't create RDMA completion queue\n");
|
||||||
|
delete self;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
self->alloc = rdma_malloc_create(self->listener_id->pd, self->rdma_malloc_round_to, self->rdma_max_unused_buffers, IBV_ACCESS_LOCAL_WRITE);
|
||||||
|
return self;
|
||||||
|
}
|
||||||
|
|
||||||
|
nfs_rdma_context_t::~nfs_rdma_context_t()
|
||||||
|
{
|
||||||
|
if (listener_id)
|
||||||
|
{
|
||||||
|
int r = rdma_destroy_id(listener_id);
|
||||||
|
if (r != 0)
|
||||||
|
fprintf(stderr, "Failed to destroy RDMA-CM ID: %s (code %d)\n", strerror(errno), errno);
|
||||||
|
else
|
||||||
|
listener_id = NULL;
|
||||||
|
}
|
||||||
|
if (rdmacm_evch)
|
||||||
|
{
|
||||||
|
epmgr->tfd->set_fd_handler(rdmacm_evch->fd, false, NULL);
|
||||||
|
rdma_destroy_event_channel(rdmacm_evch);
|
||||||
|
rdmacm_evch = NULL;
|
||||||
|
}
|
||||||
|
if (cq)
|
||||||
|
{
|
||||||
|
ibv_destroy_cq(cq);
|
||||||
|
cq = NULL;
|
||||||
|
}
|
||||||
|
if (channel)
|
||||||
|
{
|
||||||
|
ibv_destroy_comp_channel(channel);
|
||||||
|
channel = NULL;
|
||||||
|
}
|
||||||
|
if (alloc)
|
||||||
|
{
|
||||||
|
rdma_malloc_destroy(alloc);
|
||||||
|
alloc = NULL;
|
||||||
|
}
|
||||||
|
//if (mr)
|
||||||
|
// ibv_dereg_mr(mr);
|
||||||
|
//if (pd)
|
||||||
|
// ibv_dealloc_pd(pd);
|
||||||
|
//if (context)
|
||||||
|
// ibv_close_device(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
void nfs_rdma_context_t::handle_rdmacm_events()
|
||||||
|
{
|
||||||
|
rdma_cm_event *ev = NULL;
|
||||||
|
while (1)
|
||||||
|
{
|
||||||
|
int r = rdma_get_cm_event(rdmacm_evch, &ev);
|
||||||
|
if (r != 0)
|
||||||
|
{
|
||||||
|
if (errno == EAGAIN || errno == EINTR)
|
||||||
|
break;
|
||||||
|
fprintf(stderr, "Failed to get RDMA-CM event: %s (code %d)\n", strerror(errno), errno);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
if (ev->event == RDMA_CM_EVENT_CONNECT_REQUEST)
|
||||||
|
{
|
||||||
|
rdmacm_accept(ev);
|
||||||
|
}
|
||||||
|
else if (ev->event == RDMA_CM_EVENT_CONNECT_ERROR ||
|
||||||
|
ev->event == RDMA_CM_EVENT_REJECTED ||
|
||||||
|
ev->event == RDMA_CM_EVENT_DISCONNECTED ||
|
||||||
|
ev->event == RDMA_CM_EVENT_DEVICE_REMOVAL)
|
||||||
|
{
|
||||||
|
auto event_type_name = ev->event == RDMA_CM_EVENT_CONNECT_ERROR ? "RDMA_CM_EVENT_CONNECT_ERROR" : (
|
||||||
|
ev->event == RDMA_CM_EVENT_REJECTED ? "RDMA_CM_EVENT_REJECTED" : (
|
||||||
|
ev->event == RDMA_CM_EVENT_DISCONNECTED ? "RDMA_CM_EVENT_DISCONNECTED" : "RDMA_CM_EVENT_DEVICE_REMOVAL"));
|
||||||
|
auto conn_it = rdma_connections.find(ev->id);
|
||||||
|
if (conn_it == rdma_connections.end())
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Received %s event for an unknown connection 0x%lx - ignoring\n",
|
||||||
|
event_type_name, (uint64_t)ev->id);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Received %s event for connection 0x%lx - closing it\n",
|
||||||
|
event_type_name, (uint64_t)ev->id);
|
||||||
|
auto conn = conn_it->second;
|
||||||
|
delete conn;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (ev->event == RDMA_CM_EVENT_ESTABLISHED)
|
||||||
|
{
|
||||||
|
rdmacm_established(ev);
|
||||||
|
}
|
||||||
|
else if (ev->event == RDMA_CM_EVENT_ADDR_CHANGE || ev->event == RDMA_CM_EVENT_TIMEWAIT_EXIT)
|
||||||
|
{
|
||||||
|
// Do nothing
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Other events are unexpected
|
||||||
|
fprintf(stderr, "Unexpected RDMA-CM event type: %d\n", ev->event);
|
||||||
|
}
|
||||||
|
r = rdma_ack_cm_event(ev);
|
||||||
|
if (r != 0)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Failed to ack (free) RDMA-CM event: %s (code %d)\n", strerror(errno), errno);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void nfs_rdma_context_t::rdmacm_accept(rdma_cm_event *ev)
|
||||||
|
{
|
||||||
|
this->used_max_cqe += max_send+max_recv;
|
||||||
|
if (this->used_max_cqe > this->max_cqe)
|
||||||
|
{
|
||||||
|
// Resize CQ
|
||||||
|
int new_max_cqe = this->max_cqe;
|
||||||
|
while (this->used_max_cqe > new_max_cqe)
|
||||||
|
{
|
||||||
|
new_max_cqe *= 2;
|
||||||
|
}
|
||||||
|
if (ibv_resize_cq(this->cq, new_max_cqe) != 0)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Couldn't resize RDMA completion queue to %d entries\n", new_max_cqe);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this->max_cqe = new_max_cqe;
|
||||||
|
}
|
||||||
|
ibv_qp_init_attr init_attr = {
|
||||||
|
.send_cq = this->cq,
|
||||||
|
.recv_cq = this->cq,
|
||||||
|
.cap = {
|
||||||
|
.max_send_wr = max_send*2, // FIXME how many max_send/max_recv?
|
||||||
|
.max_recv_wr = max_recv,
|
||||||
|
.max_send_sge = 1, // we don't need S/G currently
|
||||||
|
.max_recv_sge = 1,
|
||||||
|
},
|
||||||
|
.qp_type = IBV_QPT_RC,
|
||||||
|
};
|
||||||
|
int r = rdma_create_qp(ev->id, NULL, &init_attr);
|
||||||
|
if (r != 0)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Failed to create a queue pair via RDMA-CM: %s (code %d)\n", strerror(errno), errno);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
nfs_rdmacm_private private_data = {
|
||||||
|
.format_identifier = NFS_RDMACM_PRIVATE_DATA_MAGIC_LE,
|
||||||
|
.version = 1,
|
||||||
|
.remote_invalidate = 0, // FIXME what is remote_invalidate?
|
||||||
|
.max_send_size = (max_send_size <= 256*1024 ? max_send_size/1024 - 1 : 255),
|
||||||
|
.max_recv_size = (max_recv_size <= 256*1024 ? max_recv_size/1024 - 1 : 255),
|
||||||
|
};
|
||||||
|
rdma_conn_param conn_params = {
|
||||||
|
.private_data = &private_data,
|
||||||
|
.private_data_len = sizeof(private_data),
|
||||||
|
//.responder_resources = max_qp_rd_atom of the device?,
|
||||||
|
//.initiator_depth = max_qp_init_rd_atom of the device?,
|
||||||
|
.rnr_retry_count = 7,
|
||||||
|
//.qp_num = manually created QP number?,
|
||||||
|
};
|
||||||
|
r = rdma_accept(ev->id, &conn_params);
|
||||||
|
if (r != 0)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Failed to accept RDMA-CM connection: %s (code %d)\n", strerror(errno), errno);
|
||||||
|
rdma_destroy_qp(ev->id);
|
||||||
|
rdma_destroy_id(ev->id);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
auto conn = new nfs_rdma_conn_t();
|
||||||
|
conn->ctx = this;
|
||||||
|
conn->id = ev->id;
|
||||||
|
conn->max_send_size = max_send_size;
|
||||||
|
conn->max_recv_size = max_recv_size;
|
||||||
|
rdma_connections[ev->id] = conn;
|
||||||
|
rdma_connections_by_qp[conn->id->qp->qp_num] = conn;
|
||||||
|
auto cli = this->proxy->create_client();
|
||||||
|
conn->client = cli;
|
||||||
|
cli->rdma_conn = conn;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
nfs_rdma_conn_t::~nfs_rdma_conn_t()
|
||||||
|
{
|
||||||
|
if (id)
|
||||||
|
{
|
||||||
|
ctx->rdma_connections.erase(id);
|
||||||
|
if (id->qp)
|
||||||
|
{
|
||||||
|
ctx->rdma_connections_by_qp.erase(id->qp->qp_num);
|
||||||
|
rdma_destroy_qp(id);
|
||||||
|
}
|
||||||
|
rdma_destroy_id(id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void nfs_rdma_context_t::rdmacm_established(rdma_cm_event *ev)
|
||||||
|
{
|
||||||
|
auto conn_it = rdma_connections.find(ev->id);
|
||||||
|
if (conn_it == rdma_connections.end())
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Received RDMA_CM_EVENT_ESTABLISHED event for an unknown connection 0x%lx - ignoring\n", (uint64_t)ev->id);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
fprintf(stderr, "Received RDMA_CM_EVENT_ESTABLISHED event for connection 0x%lx - connection established\n", (uint64_t)ev->id);
|
||||||
|
auto conn = conn_it->second;
|
||||||
|
conn->established = true;
|
||||||
|
// Handle NFS private_data
|
||||||
|
if (ev->param.ud.private_data_len >= sizeof(nfs_rdmacm_private))
|
||||||
|
{
|
||||||
|
nfs_rdmacm_private *private_data = (nfs_rdmacm_private *)ev->param.ud.private_data;
|
||||||
|
if (private_data->format_identifier == NFS_RDMACM_PRIVATE_DATA_MAGIC_LE &&
|
||||||
|
private_data->version == 1)
|
||||||
|
{
|
||||||
|
conn->remote_invalidate = private_data->remote_invalidate;
|
||||||
|
conn->remote_max_send_size = (private_data->max_send_size+1) * 1024;
|
||||||
|
conn->remote_max_recv_size = (private_data->max_recv_size+1) * 1024;
|
||||||
|
if (conn->remote_max_recv_size < conn->max_send_size)
|
||||||
|
conn->max_send_size = conn->remote_max_recv_size;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Post initial receive requests
|
||||||
|
conn->post_initial_receives();
|
||||||
|
}
|
||||||
|
|
||||||
|
void nfs_rdma_conn_t::post_initial_receives()
|
||||||
|
{
|
||||||
|
while (cur_recv < max_recv)
|
||||||
|
{
|
||||||
|
auto b = create_buf(max_recv_size);
|
||||||
|
recv_buffers.push_back(b);
|
||||||
|
post_recv(b);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
nfs_rdma_buf_t nfs_rdma_conn_t::create_buf(size_t len)
|
||||||
|
{
|
||||||
|
nfs_rdma_buf_t b;
|
||||||
|
b.buf = malloc_or_die(len);
|
||||||
|
b.len = len;
|
||||||
|
b.mr = ibv_reg_mr(id->pd, b.buf, len, IBV_ACCESS_LOCAL_WRITE);
|
||||||
|
if (!b.mr)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
|
||||||
|
void nfs_rdma_conn_t::post_recv(nfs_rdma_buf_t b)
|
||||||
|
{
|
||||||
|
ibv_sge sge = {
|
||||||
|
.addr = (uintptr_t)b.buf,
|
||||||
|
.length = (uint32_t)b.len,
|
||||||
|
.lkey = b.mr->lkey,
|
||||||
|
};
|
||||||
|
ibv_recv_wr *bad_wr = NULL;
|
||||||
|
ibv_recv_wr wr = {
|
||||||
|
.wr_id = 1, // 1 is any read, 2 is any write :)
|
||||||
|
.sg_list = &sge,
|
||||||
|
.num_sge = 1,
|
||||||
|
};
|
||||||
|
int err = ibv_post_recv(id->qp, &wr, &bad_wr);
|
||||||
|
if (err || bad_wr)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "RDMA receive failed: %s\n", strerror(err));
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
cur_recv++;
|
||||||
|
}
|
||||||
|
|
||||||
|
void nfs_client_t::rdma_queue_reply(rpc_op_t *rop)
|
||||||
|
{
|
||||||
|
rdma_conn->outbox.push_back(rop);
|
||||||
|
rdma_conn->post_send();
|
||||||
|
}
|
||||||
|
|
||||||
|
void nfs_rdma_conn_t::post_send()
|
||||||
|
{
|
||||||
|
while (outbox.size() > outbox_pos)
|
||||||
|
{
|
||||||
|
auto rop = outbox[outbox_pos];
|
||||||
|
nfs_rdma_buf_t b;
|
||||||
|
if (send_buffers.size())
|
||||||
|
{
|
||||||
|
b = send_buffers.back();
|
||||||
|
send_buffers.pop_back();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
b = create_buf(max_send_size);
|
||||||
|
}
|
||||||
|
rdma_msg outmsg = {
|
||||||
|
.rdma_xid = rop->in_rdma_msg.rdma_xid,
|
||||||
|
.rdma_vers = rop->in_rdma_msg.rdma_vers,
|
||||||
|
.rdma_credit = 1,
|
||||||
|
.rdma_body = {
|
||||||
|
.proc = rop->rdma_error ? RDMA_ERROR : RDMA_MSG,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
iovec *chunk_iov = NULL;
|
||||||
|
if (rop->rdma_error)
|
||||||
|
{
|
||||||
|
outmsg.rdma_body.rdma_error.err = rop->rdma_error;
|
||||||
|
if (rop->rdma_error == ERR_VERS)
|
||||||
|
outmsg.rdma_body.rdma_error.range = (rpc_rdma_errvers){ 1, 1 };
|
||||||
|
|
||||||
|
}
|
||||||
|
iovec *iov_list = NULL;
|
||||||
|
unsigned iov_count = 0;
|
||||||
|
xdr_encode_finish(rop->xdrs, &iov_list, &iov_count);
|
||||||
|
assert(iov_count > 0);
|
||||||
|
if (!rop->rdma_error)
|
||||||
|
{
|
||||||
|
outmsg.rdma_body.rdma_msg = {};
|
||||||
|
// READ3resok and READLINK3resok - extract last byte buffer from iovecs and send it in a "write chunk"
|
||||||
|
if (rop->in_msg.body.cbody.prog == NFS_PROGRAM &&
|
||||||
|
(rop->in_msg.body.cbody.proc == NFS3_READ && ((READ3res*)rop->reply)->status == NFS3_OK ||
|
||||||
|
rop->in_msg.body.cbody.proc == NFS3_READLINK && ((READLINK3res*)rop->reply)->status == NFS3_OK))
|
||||||
|
{
|
||||||
|
assert(iov_count > 1);
|
||||||
|
iov_count--;
|
||||||
|
chunk_iov = &iov_list[iov_count];
|
||||||
|
}
|
||||||
|
// FIXME: Maybe avoid extra copy? To do that we have to initially encode into nfs_rdma_buf_t
|
||||||
|
size_t pos = 0;
|
||||||
|
for (unsigned i = 0; i < iov_count; i++)
|
||||||
|
{
|
||||||
|
assert(pos + iov_list[i].iov_len <= b.len);
|
||||||
|
memcpy(b.buf + pos, iov_list[i].iov_base, iov_list[i].iov_len);
|
||||||
|
pos += iov_list[i].iov_len;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ibv_sge chunk_sge;
|
||||||
|
ibv_send_wr chunk_wr;
|
||||||
|
ibv_sge sge = {
|
||||||
|
.addr = (uintptr_t)b.buf,
|
||||||
|
.length = (uint32_t)pos,
|
||||||
|
.lkey = b.mr->lkey,
|
||||||
|
};
|
||||||
|
ibv_send_wr *bad_wr = NULL;
|
||||||
|
ibv_send_wr wr = {
|
||||||
|
.wr_id = 2, // 2 is send
|
||||||
|
.sg_list = &sge,
|
||||||
|
.num_sge = 1,
|
||||||
|
.opcode = IBV_WR_SEND,
|
||||||
|
.send_flags = IBV_SEND_SIGNALED,
|
||||||
|
};
|
||||||
|
ibv_send_wr *send_wr = ≀
|
||||||
|
if (chunk_iov != NULL)
|
||||||
|
{
|
||||||
|
auto & wr_chunk = *rop->in_rdma_msg.rdma_body.rdma_msg.rdma_writes->entry.target.target_val;
|
||||||
|
chunk_sge = {
|
||||||
|
.addr = (uintptr_t)chunk_iov->iov_base,
|
||||||
|
.length = (uint32_t)chunk_iov->iov_len,
|
||||||
|
.lkey = rdma_malloc_get_lkey(ctx->alloc, chunk_iov->iov_base),
|
||||||
|
};
|
||||||
|
chunk_wr = {
|
||||||
|
.wr_id = 4, // 4 is chunk write
|
||||||
|
.sg_list = &chunk_sge,
|
||||||
|
.num_sge = 1,
|
||||||
|
.opcode = IBV_WR_RDMA_WRITE,
|
||||||
|
.wr = {
|
||||||
|
.rdma = {
|
||||||
|
.remote_addr = wr_chunk.offset,
|
||||||
|
.rkey = wr_chunk.handle,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
// send chunk_wr first, then normal wr
|
||||||
|
chunk_wr.next = ≀
|
||||||
|
send_wr = &chunk_wr;
|
||||||
|
}
|
||||||
|
int err = ibv_post_send(id->qp, send_wr, &bad_wr);
|
||||||
|
if (err || bad_wr)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "RDMA send failed: %s\n", strerror(err));
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
cur_send++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#define RDMA_EVENTS_AT_ONCE 32
|
||||||
|
|
||||||
|
void nfs_rdma_context_t::handle_io()
|
||||||
|
{
|
||||||
|
// Request next notification
|
||||||
|
ibv_cq *ev_cq;
|
||||||
|
void *ev_ctx;
|
||||||
|
// FIXME: This is inefficient as it calls read()...
|
||||||
|
if (ibv_get_cq_event(channel, &ev_cq, &ev_ctx) == 0)
|
||||||
|
{
|
||||||
|
ibv_ack_cq_events(cq, 1);
|
||||||
|
}
|
||||||
|
if (ibv_req_notify_cq(cq, 0) != 0)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Failed to request RDMA completion notification, exiting\n");
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
ibv_wc wc[RDMA_EVENTS_AT_ONCE];
|
||||||
|
int event_count;
|
||||||
|
do
|
||||||
|
{
|
||||||
|
event_count = ibv_poll_cq(cq, RDMA_EVENTS_AT_ONCE, wc);
|
||||||
|
for (int i = 0; i < event_count; i++)
|
||||||
|
{
|
||||||
|
auto conn_it = rdma_connections_by_qp.find(wc[i].qp_num);
|
||||||
|
if (conn_it == rdma_connections_by_qp.end())
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
auto conn = conn_it->second;
|
||||||
|
if (wc[i].status != IBV_WC_SUCCESS)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "RDMA work request failed for queue %d with status: %s, stopping client\n", wc[i].qp_num, ibv_wc_status_str(wc[i].status));
|
||||||
|
delete conn;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (wc[i].wr_id == 1)
|
||||||
|
{
|
||||||
|
// 1 = receive
|
||||||
|
conn->cur_recv--;
|
||||||
|
auto & b = conn->recv_buffers[conn->next_recv_buf];
|
||||||
|
auto is_continued = conn->handle_recv(b.buf, wc[i].byte_len);
|
||||||
|
if (is_continued)
|
||||||
|
{
|
||||||
|
// Buffer is required to handle request
|
||||||
|
// Due to the credit-based flow control in RPC-RDMA, we can just remove that buffer and reuse it later
|
||||||
|
conn->used_buffers[b.buf] = b;
|
||||||
|
conn->recv_buffers.erase(conn->recv_buffers.begin()+conn->next_recv_buf, conn->recv_buffers.begin()+conn->next_recv_buf+1);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Buffer is not required to handle request and can be reused immediately
|
||||||
|
conn->post_recv(b);
|
||||||
|
conn->next_recv_buf = (conn->next_recv_buf+1) % conn->recv_buffers.size();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (wc[i].wr_id == 2)
|
||||||
|
{
|
||||||
|
// 2 = send
|
||||||
|
auto rop = conn->outbox[0];
|
||||||
|
conn->outbox.erase(conn->outbox.begin(), conn->outbox.begin()+1);
|
||||||
|
// Free rpc_op
|
||||||
|
xdr_reset(rop->xdrs);
|
||||||
|
proxy->xdr_pool.push_back(rop->xdrs);
|
||||||
|
if (rop->buffer && rop->referenced)
|
||||||
|
{
|
||||||
|
// Reuse the buffer
|
||||||
|
auto & ub = conn->used_buffers.at(rop->buffer);
|
||||||
|
conn->recv_buffers.push_back(ub);
|
||||||
|
conn->post_recv(ub);
|
||||||
|
}
|
||||||
|
auto rdma_chunk = xdr_get_rdma_chunk(rop->xdrs);
|
||||||
|
if (rdma_chunk)
|
||||||
|
{
|
||||||
|
rdma_malloc_free(alloc, rdma_chunk);
|
||||||
|
xdr_set_rdma_chunk(rop->xdrs, NULL);
|
||||||
|
}
|
||||||
|
free(rop);
|
||||||
|
conn->post_send();
|
||||||
|
}
|
||||||
|
else if (wc[i].wr_id == 3)
|
||||||
|
{
|
||||||
|
// 3 = chunk read
|
||||||
|
auto rop = conn->chunk_inbox[0];
|
||||||
|
conn->chunk_inbox.erase(conn->chunk_inbox.begin(), conn->chunk_inbox.begin()+1);
|
||||||
|
int ref = conn->client->handle_rpc_op(rop);
|
||||||
|
if (rop->buffer && !ref)
|
||||||
|
{
|
||||||
|
// Request is handled, reuse the buffer
|
||||||
|
auto & ub = conn->used_buffers.at(rop->buffer);
|
||||||
|
conn->recv_buffers.push_back(ub);
|
||||||
|
conn->post_recv(ub);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} while (event_count > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// returns false if handling is done, returns true if handling is continued asynchronously
|
||||||
|
bool nfs_rdma_conn_t::handle_recv(void *buf, size_t len)
|
||||||
|
{
|
||||||
|
// Take an XDR object from the pool
|
||||||
|
XDR *xdrs;
|
||||||
|
if (ctx->proxy->xdr_pool.size())
|
||||||
|
{
|
||||||
|
xdrs = ctx->proxy->xdr_pool.back();
|
||||||
|
ctx->proxy->xdr_pool.pop_back();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
xdrs = xdr_create();
|
||||||
|
}
|
||||||
|
xdr_set_rdma(xdrs);
|
||||||
|
// Decode the RDMA-RPC header
|
||||||
|
rdma_msg rmsg;
|
||||||
|
if (!xdr_decode(xdrs, buf, len, (xdrproc_t)xdr_rdma_msg, &rmsg))
|
||||||
|
{
|
||||||
|
// Invalid message, ignore it
|
||||||
|
xdr_reset(xdrs);
|
||||||
|
ctx->proxy->xdr_pool.push_back(xdrs);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if (rmsg.rdma_vers != 1 || rmsg.rdma_body.proc != RDMA_MSG /*&& rmsg.rdma_body.proc != RDMA_NOMSG*/)
|
||||||
|
{
|
||||||
|
// Bad RDMA-RPC version
|
||||||
|
rpc_op_t *rop = (rpc_op_t*)malloc_or_die(sizeof(rpc_op_t));
|
||||||
|
*rop = (rpc_op_t){
|
||||||
|
.client = this,
|
||||||
|
.xdrs = xdrs,
|
||||||
|
.rdma_error = ERR_VERS,
|
||||||
|
};
|
||||||
|
rpc_queue_reply(rop);
|
||||||
|
// Incoming buffer isn't needed to handle request, so return 0
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
rpc_msg inmsg = { .xid = rmsg.rdma_xid };
|
||||||
|
if (!xdr_rpc_msg_body(xdrs, &inmsg.body) || inmsg.body.dir != RPC_CALL)
|
||||||
|
{
|
||||||
|
// Invalid message, ignore it
|
||||||
|
xdr_reset(xdrs);
|
||||||
|
ctx->proxy->xdr_pool.push_back(xdrs);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
rpc_op_t *rop = client->create_rpc_op(xdrs, &inmsg, &rmsg);
|
||||||
|
if (!rop)
|
||||||
|
{
|
||||||
|
// No such procedure
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
rop->buffer = (uint8_t*)buf;
|
||||||
|
if (inmsg.body.cbody.prog == NFS_PROGRAM && (
|
||||||
|
// Check that exactly 1 read chunk is provided for WRITE3 and SYMLINK3
|
||||||
|
(
|
||||||
|
(inmsg.body.cbody.proc == NFS3_WRITE || inmsg.body.cbody.proc == NFS3_SYMLINK) &&
|
||||||
|
(!rmsg.rdma_body.rdma_msg.rdma_reads || rmsg.rdma_body.rdma_msg.rdma_reads->next)
|
||||||
|
) ||
|
||||||
|
// Check that exactly 1 write chunk is provided for READ3 and READLINK3
|
||||||
|
(
|
||||||
|
(inmsg.body.cbody.proc == NFS3_READ || inmsg.body.cbody.proc == NFS3_READLINK) &&
|
||||||
|
(!rop->in_rdma_msg.rdma_body.rdma_msg.rdma_writes ||
|
||||||
|
rop->in_rdma_msg.rdma_body.rdma_msg.rdma_writes->next ||
|
||||||
|
rop->in_rdma_msg.rdma_body.rdma_msg.rdma_writes->entry.target.target_len != 1)
|
||||||
|
)))
|
||||||
|
{
|
||||||
|
rop->rdma_error = ERR_CHUNK;
|
||||||
|
rpc_queue_reply(rop);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
// Read that chunk
|
||||||
|
if (inmsg.body.cbody.prog == NFS_PROGRAM && inmsg.body.cbody.proc == NFS3_WRITE)
|
||||||
|
{
|
||||||
|
auto & rd_chunk = rmsg.rdma_body.rdma_msg.rdma_reads->entry.target;
|
||||||
|
void *buf = rdma_malloc_alloc(ctx->alloc, rd_chunk.length);
|
||||||
|
ibv_sge chunk_sge = {
|
||||||
|
.addr = (uintptr_t)buf,
|
||||||
|
.length = rd_chunk.length,
|
||||||
|
.lkey = rdma_malloc_get_lkey(ctx->alloc, buf),
|
||||||
|
};
|
||||||
|
ibv_send_wr *bad_wr = NULL;
|
||||||
|
ibv_send_wr wr = {
|
||||||
|
.wr_id = 3, // 3 is chunk read
|
||||||
|
.sg_list = &chunk_sge,
|
||||||
|
.num_sge = 1,
|
||||||
|
.opcode = IBV_WR_RDMA_READ,
|
||||||
|
.wr = {
|
||||||
|
.rdma = {
|
||||||
|
.remote_addr = rd_chunk.offset,
|
||||||
|
.rkey = rd_chunk.handle,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
int err = ibv_post_send(id->qp, &wr, &bad_wr);
|
||||||
|
if (err || bad_wr)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "RDMA receive failed: %s\n", strerror(err));
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
cur_send++;
|
||||||
|
xdr_set_rdma_chunk(rop->xdrs, buf);
|
||||||
|
rop->referenced = 1;
|
||||||
|
chunk_inbox.push_back(rop);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return client->handle_rpc_op(rop);
|
||||||
|
}
|
||||||
|
|
||||||
|
void *nfs_client_t::rdma_malloc(size_t size)
|
||||||
|
{
|
||||||
|
return rdma_malloc_alloc(parent->rdma_context->alloc, size);
|
||||||
|
}
|
|
@ -168,7 +168,7 @@ struct WRITE3args {
|
||||||
offset3 offset;
|
offset3 offset;
|
||||||
count3 count;
|
count3 count;
|
||||||
stable_how stable;
|
stable_how stable;
|
||||||
opaque data<>;
|
opaque data<>; /* RDMA DDP-eligible */
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef opaque writeverf3[NFS3_WRITEVERFSIZE];
|
typedef opaque writeverf3[NFS3_WRITEVERFSIZE];
|
||||||
|
@ -409,7 +409,7 @@ struct READ3resok {
|
||||||
post_op_attr file_attributes;
|
post_op_attr file_attributes;
|
||||||
count3 count;
|
count3 count;
|
||||||
bool eof;
|
bool eof;
|
||||||
opaque data<>;
|
opaque data<>; /* RDMA DDP-eligible */
|
||||||
};
|
};
|
||||||
|
|
||||||
struct READ3resfail {
|
struct READ3resfail {
|
||||||
|
@ -514,7 +514,7 @@ typedef string nfspath3<>;
|
||||||
|
|
||||||
struct symlinkdata3 {
|
struct symlinkdata3 {
|
||||||
sattr3 symlink_attributes;
|
sattr3 symlink_attributes;
|
||||||
nfspath3 symlink_data;
|
nfspath3 symlink_data; /* RDMA DDP-eligible */
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SYMLINK3args {
|
struct SYMLINK3args {
|
||||||
|
@ -546,7 +546,7 @@ struct READLINK3args {
|
||||||
|
|
||||||
struct READLINK3resok {
|
struct READLINK3resok {
|
||||||
post_op_attr symlink_attributes;
|
post_op_attr symlink_attributes;
|
||||||
nfspath3 data;
|
nfspath3 data; /* RDMA DDP-eligible */
|
||||||
};
|
};
|
||||||
|
|
||||||
struct READLINK3resfail {
|
struct READLINK3resfail {
|
||||||
|
|
|
@ -272,7 +272,7 @@ xdr_WRITE3args (XDR *xdrs, WRITE3args *objp)
|
||||||
return FALSE;
|
return FALSE;
|
||||||
if (!xdr_stable_how (xdrs, &objp->stable))
|
if (!xdr_stable_how (xdrs, &objp->stable))
|
||||||
return FALSE;
|
return FALSE;
|
||||||
if (!xdr_bytes(xdrs, &objp->data, ~0))
|
if (!xdr_bytes(xdrs, &objp->data, ~0, true))
|
||||||
return FALSE;
|
return FALSE;
|
||||||
return TRUE;
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
@ -829,7 +829,7 @@ xdr_READ3resok (XDR *xdrs, READ3resok *objp)
|
||||||
return FALSE;
|
return FALSE;
|
||||||
if (!xdr_bool (xdrs, &objp->eof))
|
if (!xdr_bool (xdrs, &objp->eof))
|
||||||
return FALSE;
|
return FALSE;
|
||||||
if (!xdr_bytes(xdrs, &objp->data, ~0))
|
if (!xdr_bytes(xdrs, &objp->data, ~0, true))
|
||||||
return FALSE;
|
return FALSE;
|
||||||
return TRUE;
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
@ -1173,10 +1173,10 @@ xdr_PATHCONF3res (XDR *xdrs, PATHCONF3res *objp)
|
||||||
}
|
}
|
||||||
|
|
||||||
bool_t
|
bool_t
|
||||||
xdr_nfspath3 (XDR *xdrs, nfspath3 *objp)
|
xdr_nfspath3 (XDR *xdrs, nfspath3 *objp, bool rdma_chunk)
|
||||||
{
|
{
|
||||||
|
|
||||||
if (!xdr_string (xdrs, objp, ~0))
|
if (!xdr_string (xdrs, objp, ~0, rdma_chunk))
|
||||||
return FALSE;
|
return FALSE;
|
||||||
return TRUE;
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
@ -1187,7 +1187,7 @@ xdr_symlinkdata3 (XDR *xdrs, symlinkdata3 *objp)
|
||||||
|
|
||||||
if (!xdr_sattr3 (xdrs, &objp->symlink_attributes))
|
if (!xdr_sattr3 (xdrs, &objp->symlink_attributes))
|
||||||
return FALSE;
|
return FALSE;
|
||||||
if (!xdr_nfspath3 (xdrs, &objp->symlink_data))
|
if (!xdr_nfspath3 (xdrs, &objp->symlink_data, true))
|
||||||
return FALSE;
|
return FALSE;
|
||||||
return TRUE;
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
@ -1259,7 +1259,7 @@ xdr_READLINK3resok (XDR *xdrs, READLINK3resok *objp)
|
||||||
|
|
||||||
if (!xdr_post_op_attr (xdrs, &objp->symlink_attributes))
|
if (!xdr_post_op_attr (xdrs, &objp->symlink_attributes))
|
||||||
return FALSE;
|
return FALSE;
|
||||||
if (!xdr_nfspath3 (xdrs, &objp->data))
|
if (!xdr_nfspath3 (xdrs, &objp->data, true))
|
||||||
return FALSE;
|
return FALSE;
|
||||||
return TRUE;
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "rpc.h"
|
#include "rpc.h"
|
||||||
|
#include "rpc_rdma.h"
|
||||||
|
|
||||||
struct rpc_op_t;
|
struct rpc_op_t;
|
||||||
|
|
||||||
|
@ -27,12 +28,15 @@ inline bool operator < (const rpc_service_proc_t & a, const rpc_service_proc_t &
|
||||||
return a.prog < b.prog || a.prog == b.prog && (a.vers < b.vers || a.vers == b.vers && a.proc < b.proc);
|
return a.prog < b.prog || a.prog == b.prog && (a.vers < b.vers || a.vers == b.vers && a.proc < b.proc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct rdma_msg;
|
||||||
|
|
||||||
struct rpc_op_t
|
struct rpc_op_t
|
||||||
{
|
{
|
||||||
void *client;
|
void *client;
|
||||||
uint8_t *buffer;
|
uint8_t *buffer;
|
||||||
XDR *xdrs;
|
XDR *xdrs;
|
||||||
rpc_msg in_msg, out_msg;
|
rpc_msg in_msg, out_msg;
|
||||||
|
rdma_msg in_rdma_msg;
|
||||||
void *request;
|
void *request;
|
||||||
void *reply;
|
void *reply;
|
||||||
xdrproc_t reply_fn;
|
xdrproc_t reply_fn;
|
||||||
|
|
|
@ -0,0 +1,144 @@
|
||||||
|
/*
|
||||||
|
* Please do not edit this file.
|
||||||
|
* It was generated using rpcgen.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _RPC_RDMA_H_RPCGEN
|
||||||
|
#define _RPC_RDMA_H_RPCGEN
|
||||||
|
|
||||||
|
#include "xdr_impl.h"
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
struct xdr_rdma_segment {
|
||||||
|
uint32_t handle;
|
||||||
|
uint32_t length;
|
||||||
|
uint64_t offset;
|
||||||
|
};
|
||||||
|
typedef struct xdr_rdma_segment xdr_rdma_segment;
|
||||||
|
|
||||||
|
struct xdr_read_chunk {
|
||||||
|
uint32_t position;
|
||||||
|
struct xdr_rdma_segment target;
|
||||||
|
};
|
||||||
|
typedef struct xdr_read_chunk xdr_read_chunk;
|
||||||
|
|
||||||
|
struct xdr_read_list {
|
||||||
|
struct xdr_read_chunk entry;
|
||||||
|
struct xdr_read_list *next;
|
||||||
|
};
|
||||||
|
typedef struct xdr_read_list xdr_read_list;
|
||||||
|
|
||||||
|
struct xdr_write_chunk {
|
||||||
|
struct {
|
||||||
|
u_int target_len;
|
||||||
|
struct xdr_rdma_segment *target_val;
|
||||||
|
} target;
|
||||||
|
};
|
||||||
|
typedef struct xdr_write_chunk xdr_write_chunk;
|
||||||
|
|
||||||
|
struct xdr_write_list {
|
||||||
|
struct xdr_write_chunk entry;
|
||||||
|
struct xdr_write_list *next;
|
||||||
|
};
|
||||||
|
typedef struct xdr_write_list xdr_write_list;
|
||||||
|
|
||||||
|
struct rpc_rdma_header {
|
||||||
|
struct xdr_read_list *rdma_reads;
|
||||||
|
struct xdr_write_list *rdma_writes;
|
||||||
|
struct xdr_write_chunk *rdma_reply;
|
||||||
|
};
|
||||||
|
typedef struct rpc_rdma_header rpc_rdma_header;
|
||||||
|
|
||||||
|
struct rpc_rdma_header_nomsg {
|
||||||
|
struct xdr_read_list *rdma_reads;
|
||||||
|
struct xdr_write_list *rdma_writes;
|
||||||
|
struct xdr_write_chunk *rdma_reply;
|
||||||
|
};
|
||||||
|
typedef struct rpc_rdma_header_nomsg rpc_rdma_header_nomsg;
|
||||||
|
|
||||||
|
struct rpc_rdma_header_padded {
|
||||||
|
uint32_t rdma_align;
|
||||||
|
uint32_t rdma_thresh;
|
||||||
|
struct xdr_read_list *rdma_reads;
|
||||||
|
struct xdr_write_list *rdma_writes;
|
||||||
|
struct xdr_write_chunk *rdma_reply;
|
||||||
|
};
|
||||||
|
typedef struct rpc_rdma_header_padded rpc_rdma_header_padded;
|
||||||
|
|
||||||
|
enum rpc_rdma_errcode {
|
||||||
|
ERR_VERS = 1,
|
||||||
|
ERR_CHUNK = 2,
|
||||||
|
};
|
||||||
|
typedef enum rpc_rdma_errcode rpc_rdma_errcode;
|
||||||
|
|
||||||
|
struct rpc_rdma_errvers {
|
||||||
|
uint32_t rdma_vers_low;
|
||||||
|
uint32_t rdma_vers_high;
|
||||||
|
};
|
||||||
|
typedef struct rpc_rdma_errvers rpc_rdma_errvers;
|
||||||
|
|
||||||
|
struct rpc_rdma_error {
|
||||||
|
rpc_rdma_errcode err;
|
||||||
|
union {
|
||||||
|
rpc_rdma_errvers range;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
typedef struct rpc_rdma_error rpc_rdma_error;
|
||||||
|
|
||||||
|
enum rdma_proc {
|
||||||
|
RDMA_MSG = 0,
|
||||||
|
RDMA_NOMSG = 1,
|
||||||
|
RDMA_MSGP = 2,
|
||||||
|
RDMA_DONE = 3,
|
||||||
|
RDMA_ERROR = 4,
|
||||||
|
};
|
||||||
|
typedef enum rdma_proc rdma_proc;
|
||||||
|
|
||||||
|
struct rdma_body {
|
||||||
|
rdma_proc proc;
|
||||||
|
union {
|
||||||
|
rpc_rdma_header rdma_msg;
|
||||||
|
rpc_rdma_header_nomsg rdma_nomsg;
|
||||||
|
rpc_rdma_header_padded rdma_msgp;
|
||||||
|
rpc_rdma_error rdma_error;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
typedef struct rdma_body rdma_body;
|
||||||
|
|
||||||
|
struct rdma_msg {
|
||||||
|
uint32_t rdma_xid;
|
||||||
|
uint32_t rdma_vers;
|
||||||
|
uint32_t rdma_credit;
|
||||||
|
rdma_body rdma_body;
|
||||||
|
};
|
||||||
|
typedef struct rdma_msg rdma_msg;
|
||||||
|
|
||||||
|
/* the xdr functions */
|
||||||
|
|
||||||
|
|
||||||
|
extern bool_t xdr_xdr_rdma_segment (XDR *, xdr_rdma_segment*);
|
||||||
|
extern bool_t xdr_xdr_read_chunk (XDR *, xdr_read_chunk*);
|
||||||
|
extern bool_t xdr_xdr_read_list (XDR *, xdr_read_list*);
|
||||||
|
extern bool_t xdr_xdr_write_chunk (XDR *, xdr_write_chunk*);
|
||||||
|
extern bool_t xdr_xdr_write_list (XDR *, xdr_write_list*);
|
||||||
|
extern bool_t xdr_rpc_rdma_header (XDR *, rpc_rdma_header*);
|
||||||
|
extern bool_t xdr_rpc_rdma_header_nomsg (XDR *, rpc_rdma_header_nomsg*);
|
||||||
|
extern bool_t xdr_rpc_rdma_header_padded (XDR *, rpc_rdma_header_padded*);
|
||||||
|
extern bool_t xdr_rpc_rdma_errcode (XDR *, rpc_rdma_errcode*);
|
||||||
|
extern bool_t xdr_rpc_rdma_errvers (XDR *, rpc_rdma_errvers*);
|
||||||
|
extern bool_t xdr_rpc_rdma_error (XDR *, rpc_rdma_error*);
|
||||||
|
extern bool_t xdr_rdma_proc (XDR *, rdma_proc*);
|
||||||
|
extern bool_t xdr_rdma_body (XDR *, rdma_body*);
|
||||||
|
extern bool_t xdr_rdma_msg (XDR *, rdma_msg*);
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /* !_RPC_RDMA_H_RPCGEN */
|
|
@ -0,0 +1,166 @@
|
||||||
|
/* RFC 8166 - Remote Direct Memory Access Transport for Remote Procedure Call Version 1 */
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2010-2017 IETF Trust and the persons
|
||||||
|
* identified as authors of the code. All rights reserved.
|
||||||
|
*
|
||||||
|
* The authors of the code are:
|
||||||
|
* B. Callaghan, T. Talpey, and C. Lever
|
||||||
|
*
|
||||||
|
* Redistribution and use in source and binary forms, with
|
||||||
|
* or without modification, are permitted provided that the
|
||||||
|
* following conditions are met:
|
||||||
|
*
|
||||||
|
* - Redistributions of source code must retain the above
|
||||||
|
* copyright notice, this list of conditions and the
|
||||||
|
* following disclaimer.
|
||||||
|
*
|
||||||
|
* - Redistributions in binary form must reproduce the above
|
||||||
|
* copyright notice, this list of conditions and the
|
||||||
|
* following disclaimer in the documentation and/or other
|
||||||
|
* materials provided with the distribution.
|
||||||
|
*
|
||||||
|
* - Neither the name of Internet Society, IETF or IETF
|
||||||
|
* Trust, nor the names of specific contributors, may be
|
||||||
|
* used to endorse or promote products derived from this
|
||||||
|
* software without specific prior written permission.
|
||||||
|
*
|
||||||
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS
|
||||||
|
* AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
|
||||||
|
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||||
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
|
||||||
|
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
|
||||||
|
* EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
||||||
|
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
|
||||||
|
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
|
||||||
|
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||||
|
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||||
|
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
|
||||||
|
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||||
|
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
|
||||||
|
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
|
||||||
|
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Plain RDMA segment (Section 3.4.3)
|
||||||
|
*/
|
||||||
|
struct xdr_rdma_segment {
|
||||||
|
uint32_t handle; /* Registered memory handle */
|
||||||
|
uint32_t length; /* Length of the chunk in bytes */
|
||||||
|
uint64_t offset; /* Chunk virtual address or offset */
|
||||||
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RDMA read segment (Section 3.4.5)
|
||||||
|
*/
|
||||||
|
struct xdr_read_chunk {
|
||||||
|
uint32_t position; /* Position in XDR stream */
|
||||||
|
struct xdr_rdma_segment target;
|
||||||
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Read list (Section 4.3.1)
|
||||||
|
*/
|
||||||
|
struct xdr_read_list {
|
||||||
|
struct xdr_read_chunk entry;
|
||||||
|
struct xdr_read_list *next;
|
||||||
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Write chunk (Section 3.4.6)
|
||||||
|
*/
|
||||||
|
struct xdr_write_chunk {
|
||||||
|
struct xdr_rdma_segment target<>;
|
||||||
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Write list (Section 4.3.2)
|
||||||
|
*/
|
||||||
|
struct xdr_write_list {
|
||||||
|
struct xdr_write_chunk entry;
|
||||||
|
struct xdr_write_list *next;
|
||||||
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Chunk lists (Section 4.3)
|
||||||
|
*/
|
||||||
|
struct rpc_rdma_header {
|
||||||
|
struct xdr_read_list *rdma_reads;
|
||||||
|
struct xdr_write_list *rdma_writes;
|
||||||
|
struct xdr_write_chunk *rdma_reply;
|
||||||
|
/* rpc body follows */
|
||||||
|
};
|
||||||
|
|
||||||
|
struct rpc_rdma_header_nomsg {
|
||||||
|
struct xdr_read_list *rdma_reads;
|
||||||
|
struct xdr_write_list *rdma_writes;
|
||||||
|
struct xdr_write_chunk *rdma_reply;
|
||||||
|
};
|
||||||
|
|
||||||
|
/* Not to be used */
|
||||||
|
struct rpc_rdma_header_padded {
|
||||||
|
uint32_t rdma_align;
|
||||||
|
uint32_t rdma_thresh;
|
||||||
|
struct xdr_read_list *rdma_reads;
|
||||||
|
struct xdr_write_list *rdma_writes;
|
||||||
|
struct xdr_write_chunk *rdma_reply;
|
||||||
|
/* rpc body follows */
|
||||||
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Error handling (Section 4.5)
|
||||||
|
*/
|
||||||
|
enum rpc_rdma_errcode {
|
||||||
|
ERR_VERS = 1, /* Value fixed for all versions */
|
||||||
|
ERR_CHUNK = 2
|
||||||
|
};
|
||||||
|
|
||||||
|
/* Structure fixed for all versions */
|
||||||
|
struct rpc_rdma_errvers {
|
||||||
|
uint32_t rdma_vers_low;
|
||||||
|
uint32_t rdma_vers_high;
|
||||||
|
};
|
||||||
|
|
||||||
|
union rpc_rdma_error switch (rpc_rdma_errcode err) {
|
||||||
|
case ERR_VERS:
|
||||||
|
rpc_rdma_errvers range;
|
||||||
|
case ERR_CHUNK:
|
||||||
|
void;
|
||||||
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Procedures (Section 4.2.4)
|
||||||
|
*/
|
||||||
|
enum rdma_proc {
|
||||||
|
RDMA_MSG = 0, /* Value fixed for all versions */
|
||||||
|
RDMA_NOMSG = 1, /* Value fixed for all versions */
|
||||||
|
RDMA_MSGP = 2, /* Not to be used */
|
||||||
|
RDMA_DONE = 3, /* Not to be used */
|
||||||
|
RDMA_ERROR = 4 /* Value fixed for all versions */
|
||||||
|
};
|
||||||
|
|
||||||
|
/* The position of the proc discriminator field is
|
||||||
|
* fixed for all versions */
|
||||||
|
union rdma_body switch (rdma_proc proc) {
|
||||||
|
case RDMA_MSG:
|
||||||
|
rpc_rdma_header rdma_msg;
|
||||||
|
case RDMA_NOMSG:
|
||||||
|
rpc_rdma_header_nomsg rdma_nomsg;
|
||||||
|
case RDMA_MSGP: /* Not to be used */
|
||||||
|
rpc_rdma_header_padded rdma_msgp;
|
||||||
|
case RDMA_DONE: /* Not to be used */
|
||||||
|
void;
|
||||||
|
case RDMA_ERROR:
|
||||||
|
rpc_rdma_error rdma_error;
|
||||||
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Fixed header fields (Section 4.2)
|
||||||
|
*/
|
||||||
|
struct rdma_msg {
|
||||||
|
uint32_t rdma_xid; /* Position fixed for all versions */
|
||||||
|
uint32_t rdma_vers; /* Position fixed for all versions */
|
||||||
|
uint32_t rdma_credit; /* Position fixed for all versions */
|
||||||
|
rdma_body rdma_body;
|
||||||
|
};
|
|
@ -0,0 +1,200 @@
|
||||||
|
/*
|
||||||
|
* Please do not edit this file.
|
||||||
|
* It was generated using rpcgen.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "rpc_rdma.h"
|
||||||
|
#include "xdr_impl_inline.h"
|
||||||
|
|
||||||
|
bool_t
|
||||||
|
xdr_xdr_rdma_segment (XDR *xdrs, xdr_rdma_segment *objp)
|
||||||
|
{
|
||||||
|
|
||||||
|
if (!xdr_uint32_t (xdrs, &objp->handle))
|
||||||
|
return FALSE;
|
||||||
|
if (!xdr_uint32_t (xdrs, &objp->length))
|
||||||
|
return FALSE;
|
||||||
|
if (!xdr_uint64_t (xdrs, &objp->offset))
|
||||||
|
return FALSE;
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool_t
|
||||||
|
xdr_xdr_read_chunk (XDR *xdrs, xdr_read_chunk *objp)
|
||||||
|
{
|
||||||
|
|
||||||
|
if (!xdr_uint32_t (xdrs, &objp->position))
|
||||||
|
return FALSE;
|
||||||
|
if (!xdr_xdr_rdma_segment (xdrs, &objp->target))
|
||||||
|
return FALSE;
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool_t
|
||||||
|
xdr_xdr_read_list (XDR *xdrs, xdr_read_list *objp)
|
||||||
|
{
|
||||||
|
|
||||||
|
if (!xdr_xdr_read_chunk (xdrs, &objp->entry))
|
||||||
|
return FALSE;
|
||||||
|
if (!xdr_pointer (xdrs, (char **)&objp->next, sizeof (xdr_read_list), (xdrproc_t) xdr_xdr_read_list))
|
||||||
|
return FALSE;
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool_t
|
||||||
|
xdr_xdr_write_chunk (XDR *xdrs, xdr_write_chunk *objp)
|
||||||
|
{
|
||||||
|
|
||||||
|
if (!xdr_array (xdrs, (char **)&objp->target.target_val, (u_int *) &objp->target.target_len, ~0,
|
||||||
|
sizeof (xdr_rdma_segment), (xdrproc_t) xdr_xdr_rdma_segment))
|
||||||
|
return FALSE;
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool_t
|
||||||
|
xdr_xdr_write_list (XDR *xdrs, xdr_write_list *objp)
|
||||||
|
{
|
||||||
|
|
||||||
|
if (!xdr_xdr_write_chunk (xdrs, &objp->entry))
|
||||||
|
return FALSE;
|
||||||
|
if (!xdr_pointer (xdrs, (char **)&objp->next, sizeof (xdr_write_list), (xdrproc_t) xdr_xdr_write_list))
|
||||||
|
return FALSE;
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool_t
|
||||||
|
xdr_rpc_rdma_header (XDR *xdrs, rpc_rdma_header *objp)
|
||||||
|
{
|
||||||
|
|
||||||
|
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reads, sizeof (xdr_read_list), (xdrproc_t) xdr_xdr_read_list))
|
||||||
|
return FALSE;
|
||||||
|
if (!xdr_pointer (xdrs, (char **)&objp->rdma_writes, sizeof (xdr_write_list), (xdrproc_t) xdr_xdr_write_list))
|
||||||
|
return FALSE;
|
||||||
|
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reply, sizeof (xdr_write_chunk), (xdrproc_t) xdr_xdr_write_chunk))
|
||||||
|
return FALSE;
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool_t
|
||||||
|
xdr_rpc_rdma_header_nomsg (XDR *xdrs, rpc_rdma_header_nomsg *objp)
|
||||||
|
{
|
||||||
|
|
||||||
|
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reads, sizeof (xdr_read_list), (xdrproc_t) xdr_xdr_read_list))
|
||||||
|
return FALSE;
|
||||||
|
if (!xdr_pointer (xdrs, (char **)&objp->rdma_writes, sizeof (xdr_write_list), (xdrproc_t) xdr_xdr_write_list))
|
||||||
|
return FALSE;
|
||||||
|
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reply, sizeof (xdr_write_chunk), (xdrproc_t) xdr_xdr_write_chunk))
|
||||||
|
return FALSE;
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool_t
|
||||||
|
xdr_rpc_rdma_header_padded (XDR *xdrs, rpc_rdma_header_padded *objp)
|
||||||
|
{
|
||||||
|
|
||||||
|
if (!xdr_uint32_t (xdrs, &objp->rdma_align))
|
||||||
|
return FALSE;
|
||||||
|
if (!xdr_uint32_t (xdrs, &objp->rdma_thresh))
|
||||||
|
return FALSE;
|
||||||
|
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reads, sizeof (xdr_read_list), (xdrproc_t) xdr_xdr_read_list))
|
||||||
|
return FALSE;
|
||||||
|
if (!xdr_pointer (xdrs, (char **)&objp->rdma_writes, sizeof (xdr_write_list), (xdrproc_t) xdr_xdr_write_list))
|
||||||
|
return FALSE;
|
||||||
|
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reply, sizeof (xdr_write_chunk), (xdrproc_t) xdr_xdr_write_chunk))
|
||||||
|
return FALSE;
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool_t
|
||||||
|
xdr_rpc_rdma_errcode (XDR *xdrs, rpc_rdma_errcode *objp)
|
||||||
|
{
|
||||||
|
|
||||||
|
if (!xdr_enum (xdrs, (enum_t *) objp))
|
||||||
|
return FALSE;
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool_t
|
||||||
|
xdr_rpc_rdma_errvers (XDR *xdrs, rpc_rdma_errvers *objp)
|
||||||
|
{
|
||||||
|
|
||||||
|
if (!xdr_uint32_t (xdrs, &objp->rdma_vers_low))
|
||||||
|
return FALSE;
|
||||||
|
if (!xdr_uint32_t (xdrs, &objp->rdma_vers_high))
|
||||||
|
return FALSE;
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool_t
|
||||||
|
xdr_rpc_rdma_error (XDR *xdrs, rpc_rdma_error *objp)
|
||||||
|
{
|
||||||
|
|
||||||
|
if (!xdr_rpc_rdma_errcode (xdrs, &objp->err))
|
||||||
|
return FALSE;
|
||||||
|
switch (objp->err) {
|
||||||
|
case ERR_VERS:
|
||||||
|
if (!xdr_rpc_rdma_errvers (xdrs, &objp->range))
|
||||||
|
return FALSE;
|
||||||
|
break;
|
||||||
|
case ERR_CHUNK:
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
return FALSE;
|
||||||
|
}
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool_t
|
||||||
|
xdr_rdma_proc (XDR *xdrs, rdma_proc *objp)
|
||||||
|
{
|
||||||
|
|
||||||
|
if (!xdr_enum (xdrs, (enum_t *) objp))
|
||||||
|
return FALSE;
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool_t
|
||||||
|
xdr_rdma_body (XDR *xdrs, rdma_body *objp)
|
||||||
|
{
|
||||||
|
|
||||||
|
if (!xdr_rdma_proc (xdrs, &objp->proc))
|
||||||
|
return FALSE;
|
||||||
|
switch (objp->proc) {
|
||||||
|
case RDMA_MSG:
|
||||||
|
if (!xdr_rpc_rdma_header (xdrs, &objp->rdma_msg))
|
||||||
|
return FALSE;
|
||||||
|
break;
|
||||||
|
case RDMA_NOMSG:
|
||||||
|
if (!xdr_rpc_rdma_header_nomsg (xdrs, &objp->rdma_nomsg))
|
||||||
|
return FALSE;
|
||||||
|
break;
|
||||||
|
case RDMA_MSGP:
|
||||||
|
if (!xdr_rpc_rdma_header_padded (xdrs, &objp->rdma_msgp))
|
||||||
|
return FALSE;
|
||||||
|
break;
|
||||||
|
case RDMA_DONE:
|
||||||
|
break;
|
||||||
|
case RDMA_ERROR:
|
||||||
|
if (!xdr_rpc_rdma_error (xdrs, &objp->rdma_error))
|
||||||
|
return FALSE;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
return FALSE;
|
||||||
|
}
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool_t
|
||||||
|
xdr_rdma_msg (XDR *xdrs, rdma_msg *objp)
|
||||||
|
{
|
||||||
|
|
||||||
|
if (!xdr_uint32_t (xdrs, &objp->rdma_xid))
|
||||||
|
return FALSE;
|
||||||
|
if (!xdr_uint32_t (xdrs, &objp->rdma_vers))
|
||||||
|
return FALSE;
|
||||||
|
if (!xdr_uint32_t (xdrs, &objp->rdma_credit))
|
||||||
|
return FALSE;
|
||||||
|
if (!xdr_rdma_body (xdrs, &objp->rdma_body))
|
||||||
|
return FALSE;
|
||||||
|
return TRUE;
|
||||||
|
}
|
|
@ -46,3 +46,4 @@ run_rpcgen() {
|
||||||
run_rpcgen nfs
|
run_rpcgen nfs
|
||||||
run_rpcgen rpc
|
run_rpcgen rpc
|
||||||
run_rpcgen portmap
|
run_rpcgen portmap
|
||||||
|
run_rpcgen rpc_rdma
|
||||||
|
|
|
@ -16,6 +16,22 @@ void xdr_destroy(XDR* xdrs)
|
||||||
delete xdrs;
|
delete xdrs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void xdr_set_rdma(XDR *xdrs)
|
||||||
|
{
|
||||||
|
xdrs->rdma = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void xdr_set_rdma_chunk(XDR *xdrs, void *chunk)
|
||||||
|
{
|
||||||
|
assert(!xdrs->rdma_chunk || !chunk);
|
||||||
|
xdrs->rdma_chunk = chunk;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* xdr_get_rdma_chunk(XDR *xdrs)
|
||||||
|
{
|
||||||
|
return xdrs->rdma_chunk;
|
||||||
|
}
|
||||||
|
|
||||||
void xdr_reset(XDR *xdrs)
|
void xdr_reset(XDR *xdrs)
|
||||||
{
|
{
|
||||||
for (auto buf: xdrs->allocs)
|
for (auto buf: xdrs->allocs)
|
||||||
|
@ -23,6 +39,9 @@ void xdr_reset(XDR *xdrs)
|
||||||
free(buf);
|
free(buf);
|
||||||
}
|
}
|
||||||
xdrs->buf = NULL;
|
xdrs->buf = NULL;
|
||||||
|
xdrs->rdma = false;
|
||||||
|
xdrs->rdma_chunk = NULL;
|
||||||
|
xdrs->rdma_chunk_used = false;
|
||||||
xdrs->avail = 0;
|
xdrs->avail = 0;
|
||||||
xdrs->allocs.resize(0);
|
xdrs->allocs.resize(0);
|
||||||
xdrs->in_linked_list.resize(0);
|
xdrs->in_linked_list.resize(0);
|
||||||
|
|
|
@ -55,6 +55,15 @@ void xdr_destroy(XDR* xdrs);
|
||||||
// Free resources from any previous xdr_decode/xdr_encode calls
|
// Free resources from any previous xdr_decode/xdr_encode calls
|
||||||
void xdr_reset(XDR *xdrs);
|
void xdr_reset(XDR *xdrs);
|
||||||
|
|
||||||
|
// Mark XDR as used for RDMA
|
||||||
|
void xdr_set_rdma(XDR *xdrs);
|
||||||
|
|
||||||
|
// Set (single) RDMA chunk buffer for this xdr before decoding an RDMA message
|
||||||
|
void xdr_set_rdma_chunk(XDR *xdrs, void *chunk);
|
||||||
|
|
||||||
|
// Get the current RDMA chunk buffer
|
||||||
|
void* xdr_get_rdma_chunk(XDR *xdrs);
|
||||||
|
|
||||||
// Try to decode <size> bytes from buffer <buf> using <fn>
|
// Try to decode <size> bytes from buffer <buf> using <fn>
|
||||||
// Result may contain memory allocations that will be valid until the next call to xdr_{reset,destroy,decode,encode}
|
// Result may contain memory allocations that will be valid until the next call to xdr_{reset,destroy,decode,encode}
|
||||||
int xdr_decode(XDR *xdrs, void *buf, unsigned size, xdrproc_t fn, void *data);
|
int xdr_decode(XDR *xdrs, void *buf, unsigned size, xdrproc_t fn, void *data);
|
||||||
|
|
|
@ -28,6 +28,19 @@
|
||||||
// RPC over TCP:
|
// RPC over TCP:
|
||||||
//
|
//
|
||||||
// BE 32bit length, then rpc_msg, then the procedure message itself
|
// BE 32bit length, then rpc_msg, then the procedure message itself
|
||||||
|
//
|
||||||
|
// RPC over RDMA:
|
||||||
|
// RFC 8166 - Remote Direct Memory Access Transport for Remote Procedure Call Version 1
|
||||||
|
// RFC 8267 - Network File System (NFS) Upper-Layer Binding to RPC-over-RDMA Version 1
|
||||||
|
// RFC 8797 - Remote Direct Memory Access - Connection Manager (RDMA-CM) Private Data for RPC-over-RDMA Version 1
|
||||||
|
// message is received in an RDMA Receive operation
|
||||||
|
// message: list of read chunks, list of write chunks, optional reply write chunk, then actual RPC body if present
|
||||||
|
// read chunk: BE 32bit position, BE 32bit registered memory key, BE 32bit length, BE 64bit offset
|
||||||
|
// write chunk: BE 32bit registered memory key, BE 32bit length, BE 64bit offset
|
||||||
|
// in reality for NFS 3.0: only 1 read chunk in write3 and symlink3, only 1 write chunk in read3 and readlink3
|
||||||
|
// read chunk is read by the server using RDMA Read from the client memory after receiving RPC request
|
||||||
|
// write chunk is pushed by the server using RDMA Write to the client memory before sending RPC reply
|
||||||
|
// connection is established using RDMA-CM at default port 20049
|
||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
@ -35,6 +48,7 @@
|
||||||
|
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <endian.h>
|
#include <endian.h>
|
||||||
|
#include <assert.h>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
#include "malloc_or_die.h"
|
#include "malloc_or_die.h"
|
||||||
|
@ -61,6 +75,9 @@ struct xdr_linked_list_t
|
||||||
struct XDR
|
struct XDR
|
||||||
{
|
{
|
||||||
int x_op;
|
int x_op;
|
||||||
|
bool rdma = false;
|
||||||
|
void *rdma_chunk = NULL;
|
||||||
|
bool rdma_chunk_used = false;
|
||||||
|
|
||||||
// For decoding:
|
// For decoding:
|
||||||
uint8_t *buf = NULL;
|
uint8_t *buf = NULL;
|
||||||
|
@ -106,13 +123,23 @@ inline int xdr_opaque(XDR *xdrs, void *data, uint32_t len)
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
|
inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen, bool rdma_chunk = false)
|
||||||
{
|
{
|
||||||
if (xdrs->x_op == XDR_DECODE)
|
if (xdrs->x_op == XDR_DECODE)
|
||||||
{
|
{
|
||||||
if (xdrs->avail < 4)
|
if (xdrs->avail < 4)
|
||||||
return 0;
|
return 0;
|
||||||
uint32_t len = be32toh(*((uint32_t*)xdrs->buf));
|
uint32_t len = be32toh(*((uint32_t*)xdrs->buf));
|
||||||
|
if (rdma_chunk && xdrs->rdma)
|
||||||
|
{
|
||||||
|
// Take (only a single) RDMA chunk from xdrs->rdma_chunk while decoding
|
||||||
|
assert(xdrs->rdma_chunk);
|
||||||
|
assert(!xdrs->rdma_chunk_used);
|
||||||
|
xdrs->rdma_chunk_used = true;
|
||||||
|
data->data = (char*)xdrs->rdma_chunk;
|
||||||
|
data->size = len;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
uint32_t padded = len_pad4(len);
|
uint32_t padded = len_pad4(len);
|
||||||
if (xdrs->avail < 4+padded)
|
if (xdrs->avail < 4+padded)
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -123,7 +150,8 @@ inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (data->size < XDR_COPY_LENGTH)
|
// Always encode RDMA chunks as separate iovecs
|
||||||
|
if (data->size < XDR_COPY_LENGTH && (!rdma_chunk || !xdrs->rdma))
|
||||||
{
|
{
|
||||||
unsigned old = xdrs->cur_out.size();
|
unsigned old = xdrs->cur_out.size();
|
||||||
xdrs->cur_out.resize(old + 4+data->size);
|
xdrs->cur_out.resize(old + 4+data->size);
|
||||||
|
@ -146,8 +174,9 @@ inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
|
||||||
.iov_len = data->size,
|
.iov_len = data->size,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
if (data->size & 3)
|
if ((data->size & 3) && (!rdma_chunk || !xdrs->rdma))
|
||||||
{
|
{
|
||||||
|
// No padding for RDMA chunks
|
||||||
int pad = 4-(data->size & 3);
|
int pad = 4-(data->size & 3);
|
||||||
unsigned old = xdrs->cur_out.size();
|
unsigned old = xdrs->cur_out.size();
|
||||||
xdrs->cur_out.resize(old+pad);
|
xdrs->cur_out.resize(old+pad);
|
||||||
|
@ -158,9 +187,9 @@ inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline int xdr_string(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
|
inline int xdr_string(XDR *xdrs, xdr_string_t *data, uint32_t maxlen, bool rdma_chunk = false)
|
||||||
{
|
{
|
||||||
return xdr_bytes(xdrs, data, maxlen);
|
return xdr_bytes(xdrs, data, maxlen, rdma_chunk);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline int xdr_u_int(XDR *xdrs, void *data)
|
inline int xdr_u_int(XDR *xdrs, void *data)
|
||||||
|
@ -182,6 +211,11 @@ inline int xdr_u_int(XDR *xdrs, void *data)
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline int xdr_uint32_t(XDR *xdrs, void *data)
|
||||||
|
{
|
||||||
|
return xdr_u_int(xdrs, data);
|
||||||
|
}
|
||||||
|
|
||||||
inline int xdr_enum(XDR *xdrs, void *data)
|
inline int xdr_enum(XDR *xdrs, void *data)
|
||||||
{
|
{
|
||||||
return xdr_u_int(xdrs, data);
|
return xdr_u_int(xdrs, data);
|
||||||
|
|
|
@ -0,0 +1,192 @@
|
||||||
|
// Copyright (c) Vitaliy Filippov, 2019+
|
||||||
|
// License: VNPL-1.1 (see README.md for details)
|
||||||
|
//
|
||||||
|
// Simple & stupid RDMA-enabled memory allocator (allocates buffers within ibv_mr's)
|
||||||
|
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <assert.h>
|
||||||
|
#include <map>
|
||||||
|
#include <set>
|
||||||
|
#include "rdma_alloc.h"
|
||||||
|
#include "malloc_or_die.h"
|
||||||
|
|
||||||
|
struct rdma_buf_t
|
||||||
|
{
|
||||||
|
void *buf = NULL;
|
||||||
|
size_t len = 0;
|
||||||
|
ibv_mr *mr = NULL;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct rdma_frag_t
|
||||||
|
{
|
||||||
|
rdma_buf_t *buf = NULL;
|
||||||
|
size_t len = 0;
|
||||||
|
bool is_free = false;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct rdma_allocator_t
|
||||||
|
{
|
||||||
|
size_t rdma_alloc_size = 1048576;
|
||||||
|
size_t rdma_max_unused = 500*1048576;
|
||||||
|
int rdma_access = IBV_ACCESS_LOCAL_WRITE;
|
||||||
|
ibv_pd *pd = NULL;
|
||||||
|
|
||||||
|
std::set<rdma_buf_t*> buffers;
|
||||||
|
std::map<void*, rdma_frag_t> bufferfrags;
|
||||||
|
std::set<std::pair<size_t, void*>> freelist;
|
||||||
|
size_t freebuffers = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
rdma_allocator_t *rdma_malloc_create(ibv_pd *pd, size_t rdma_alloc_size, size_t rdma_max_unused, int rdma_access)
|
||||||
|
{
|
||||||
|
rdma_allocator_t *self = (rdma_allocator_t*)malloc_or_die(sizeof(rdma_allocator_t));
|
||||||
|
self->pd = pd;
|
||||||
|
self->rdma_alloc_size = rdma_alloc_size ? rdma_alloc_size : 1048576;
|
||||||
|
self->rdma_max_unused = rdma_max_unused ? rdma_max_unused : 500*1048576;
|
||||||
|
self->rdma_access = rdma_access;
|
||||||
|
return self;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void rdma_malloc_free_unused_buffers(rdma_allocator_t *self, size_t max_unused, bool force)
|
||||||
|
{
|
||||||
|
auto free_it = self->freelist.end();
|
||||||
|
if (free_it == self->freelist.begin())
|
||||||
|
return;
|
||||||
|
free_it--;
|
||||||
|
do
|
||||||
|
{
|
||||||
|
auto frag_it = self->bufferfrags.find(free_it->second);
|
||||||
|
assert(frag_it != self->bufferfrags.end());
|
||||||
|
if (frag_it->second.len != frag_it->second.buf->len)
|
||||||
|
{
|
||||||
|
if (force)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "BUG: Attempt to destroy RDMA allocator while buffers are not freed yet\n");
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
self->freebuffers -= frag_it->second.buf->len;
|
||||||
|
ibv_dereg_mr(frag_it->second.buf->mr);
|
||||||
|
free(frag_it->second.buf);
|
||||||
|
self->buffers.erase(frag_it->second.buf);
|
||||||
|
self->bufferfrags.erase(frag_it);
|
||||||
|
self->freelist.erase(free_it--);
|
||||||
|
} while (free_it != self->freelist.begin() && self->freebuffers > max_unused);
|
||||||
|
}
|
||||||
|
|
||||||
|
void rdma_malloc_destroy(rdma_allocator_t *self)
|
||||||
|
{
|
||||||
|
rdma_malloc_free_unused_buffers(self, 0, true);
|
||||||
|
assert(!self->freebuffers);
|
||||||
|
assert(!self->buffers.size());
|
||||||
|
assert(!self->bufferfrags.size());
|
||||||
|
assert(!self->freelist.size());
|
||||||
|
free(self);
|
||||||
|
}
|
||||||
|
|
||||||
|
void *rdma_malloc_alloc(rdma_allocator_t *self, size_t size)
|
||||||
|
{
|
||||||
|
auto it = self->freelist.lower_bound(std::pair<size_t, void*>(size, 0));
|
||||||
|
if (it == self->freelist.end())
|
||||||
|
{
|
||||||
|
// round size up to rdma_malloc_size (1 MB)
|
||||||
|
size_t alloc_size = ((size + self->rdma_alloc_size - 1) / self->rdma_alloc_size) * self->rdma_alloc_size;
|
||||||
|
rdma_buf_t *b = (rdma_buf_t*)malloc_or_die(alloc_size + sizeof(rdma_buf_t));
|
||||||
|
b->buf = b+1;
|
||||||
|
b->len = alloc_size;
|
||||||
|
b->mr = ibv_reg_mr(self->pd, b->buf, b->len, self->rdma_access);
|
||||||
|
if (!b->mr)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
self->buffers.insert(b);
|
||||||
|
self->bufferfrags[b->buf] = (rdma_frag_t){ .buf = b, .len = alloc_size, .is_free = true };
|
||||||
|
it = self->freelist.insert(std::pair<size_t, void*>(alloc_size, b->buf)).first;
|
||||||
|
}
|
||||||
|
void *ptr = it->second;
|
||||||
|
auto & frag = self->bufferfrags.at(ptr);
|
||||||
|
assert(frag.len >= size && frag.is_free);
|
||||||
|
if (frag.len == size)
|
||||||
|
{
|
||||||
|
frag.is_free = false;
|
||||||
|
self->freelist.erase(it);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ptr = (uint8_t*)ptr + frag.len - size;
|
||||||
|
frag.len -= size;
|
||||||
|
self->bufferfrags[ptr] = (rdma_frag_t){ .buf = frag.buf, .len = size, .is_free = false };
|
||||||
|
}
|
||||||
|
return ptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
void rdma_malloc_free(rdma_allocator_t *self, void *buf)
|
||||||
|
{
|
||||||
|
auto frag_it = self->bufferfrags.find(buf);
|
||||||
|
if (frag_it == self->bufferfrags.end())
|
||||||
|
{
|
||||||
|
fprintf(stderr, "BUG: Attempt to double-free RDMA buffer fragment 0x%jx\n", (size_t)buf);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
auto prev_it = frag_it, next_it = frag_it;
|
||||||
|
if (frag_it != self->bufferfrags.begin())
|
||||||
|
prev_it--;
|
||||||
|
next_it++;
|
||||||
|
bool merge_back = prev_it != frag_it &&
|
||||||
|
prev_it->second.is_free &&
|
||||||
|
prev_it->second.buf == frag_it->second.buf &&
|
||||||
|
(uint8_t*)prev_it->first+prev_it->second.len == frag_it->first;
|
||||||
|
bool merge_next = next_it != self->bufferfrags.end() &&
|
||||||
|
next_it->second.is_free &&
|
||||||
|
next_it->second.buf == frag_it->second.buf &&
|
||||||
|
next_it->first == (uint8_t*)frag_it->first+frag_it->second.len;
|
||||||
|
if (merge_back && merge_next)
|
||||||
|
{
|
||||||
|
prev_it->second.len += frag_it->second.len + next_it->second.len;
|
||||||
|
self->freelist.erase(std::pair<size_t, void*>(next_it->second.len, next_it->first));
|
||||||
|
self->bufferfrags.erase(next_it);
|
||||||
|
self->bufferfrags.erase(frag_it);
|
||||||
|
frag_it = prev_it;
|
||||||
|
}
|
||||||
|
else if (merge_back)
|
||||||
|
{
|
||||||
|
prev_it->second.len += frag_it->second.len;
|
||||||
|
self->bufferfrags.erase(frag_it);
|
||||||
|
frag_it = prev_it;
|
||||||
|
}
|
||||||
|
else if (merge_next)
|
||||||
|
{
|
||||||
|
frag_it->second.is_free = true;
|
||||||
|
frag_it->second.len += next_it->second.len;
|
||||||
|
self->freelist.erase(std::pair<size_t, void*>(next_it->second.len, next_it->first));
|
||||||
|
self->bufferfrags.erase(next_it);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
frag_it->second.is_free = true;
|
||||||
|
self->freelist.insert(std::pair<size_t, void*>(frag_it->second.len, frag_it->first));
|
||||||
|
}
|
||||||
|
assert(frag_it->second.len <= frag_it->second.buf->len);
|
||||||
|
if (frag_it->second.len == frag_it->second.buf->len)
|
||||||
|
{
|
||||||
|
// The whole buffer is freed
|
||||||
|
self->freebuffers += frag_it->second.buf->len;
|
||||||
|
if (self->freebuffers > self->rdma_max_unused)
|
||||||
|
{
|
||||||
|
rdma_malloc_free_unused_buffers(self, self->rdma_max_unused, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t rdma_malloc_get_lkey(rdma_allocator_t *self, void *buf)
|
||||||
|
{
|
||||||
|
auto frag_it = self->bufferfrags.find(buf);
|
||||||
|
if (frag_it == self->bufferfrags.end())
|
||||||
|
{
|
||||||
|
fprintf(stderr, "BUG: Attempt to use an unknown RDMA buffer fragment 0x%zx\n", (size_t)buf);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
return frag_it->second.buf->mr->lkey;
|
||||||
|
}
|
|
@ -0,0 +1,17 @@
|
||||||
|
// Copyright (c) Vitaliy Filippov, 2019+
|
||||||
|
// License: VNPL-1.1 (see README.md for details)
|
||||||
|
//
|
||||||
|
// Simple & stupid RDMA-enabled memory allocator (allocates buffers within ibv_mr's)
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <infiniband/verbs.h>
|
||||||
|
#include <stdint.h>
|
||||||
|
|
||||||
|
struct rdma_allocator_t;
|
||||||
|
|
||||||
|
rdma_allocator_t *rdma_malloc_create(ibv_pd *pd, size_t rdma_alloc_size, size_t rdma_max_unused, int rdma_access);
|
||||||
|
void rdma_malloc_destroy(rdma_allocator_t *self);
|
||||||
|
void *rdma_malloc_alloc(rdma_allocator_t *self, size_t size);
|
||||||
|
void rdma_malloc_free(rdma_allocator_t *self, void *buf);
|
||||||
|
uint32_t rdma_malloc_get_lkey(rdma_allocator_t *self, void *buf);
|
Loading…
Reference in New Issue