Compare commits

..

5 Commits

13 changed files with 374 additions and 34 deletions

View File

@ -378,11 +378,11 @@ Examples:
Create a pool. Required parameters:
| <!-- --> | <!-- --> |
|--------------------------|---------------------------------------------------------------------------------------|
| `-s R` or `--pg_size R` | Number of replicas for replicated pools |
| `--ec N+K` | Number of data (N) and parity (K) chunks for erasure-coded pools |
| `-n N` or `--pg_count N` | PG count for the new pool (start with 10*<OSD count>/pg_size rounded to a power of 2) |
| <!-- --> | <!-- --> |
|--------------------------|-----------------------------------------------------------------------------------------|
| `-s R` or `--pg_size R` | Number of replicas for replicated pools |
| `--ec N+K` | Number of data (N) and parity (K) chunks for erasure-coded pools |
| `-n N` or `--pg_count N` | PG count for the new pool (start with 10*\<OSD count\>/pg_size rounded to a power of 2) |
Optional parameters:

View File

@ -395,11 +395,11 @@ OSD PARENT UP SIZE USED% TAGS WEIGHT BLOCK BITMAP
Создать пул. Обязательные параметры:
| <!-- --> | <!-- --> |
|---------------------------|---------------------------------------------------------------------------------------------|
| `-s R` или `--pg_size R` | Число копий данных для реплицированных пулов |
| `--ec N+K` | Число частей данных (N) и чётности (K) для пулов с кодами коррекции ошибок |
| `-n N` или `--pg_count N` | Число PG для нового пула (начните с 10*<число OSD>/pg_size, округлённого до степени двойки) |
| <!-- --> | <!-- --> |
|---------------------------|-----------------------------------------------------------------------------------------------|
| `-s R` или `--pg_size R` | Число копий данных для реплицированных пулов |
| `--ec N+K` | Число частей данных (N) и чётности (K) для пулов с кодами коррекции ошибок |
| `-n N` или `--pg_count N` | Число PG для нового пула (начните с 10*\<число OSD\>/pg_size, округлённого до степени двойки) |
Необязательные параметры:

View File

@ -28,6 +28,7 @@ target_link_libraries(vitastor_client
vitastor_cli
${LIBURING_LIBRARIES}
${IBVERBS_LIBRARIES}
${RDMACM_LIBRARIES}
)
set_target_properties(vitastor_client PROPERTIES VERSION ${VITASTOR_VERSION} SOVERSION 0)
configure_file(vitastor.pc.in vitastor.pc @ONLY)

View File

@ -117,6 +117,26 @@ void msgr_iothread_t::run()
void osd_messenger_t::init()
{
#ifdef WITH_RDMACM
if (use_rdmacm)
{
// RDMA-CM only requires the event channel. All the remaining work is done separately
rdmacm_evch = rdma_create_event_channel();
if (!rdmacm_evch)
{
fprintf(stderr, "Failed to initialize RDMA-CM event channel: %s (code %d)\n", strerror(errno), errno);
}
else
{
fcntl(rdmacm_evch->fd, F_SETFL, fcntl(rdmacm_evch->fd, F_GETFL, 0) | O_NONBLOCK);
tfd->set_fd_handler(rdmacm_evch->fd, false, [this](int rdmacm_eventfd, int epoll_events)
{
handle_rdmacm_events();
});
}
}
else
#endif
#ifdef WITH_RDMA
if (use_rdma)
{
@ -260,6 +280,14 @@ osd_messenger_t::~osd_messenger_t()
}
rdma_contexts.clear();
#endif
#ifdef WITH_RDMACM
if (rdmacm_evch)
{
tfd->set_fd_handler(rdmacm_evch->fd, false, NULL);
rdma_destroy_event_channel(rdmacm_evch);
rdmacm_evch = NULL;
}
#endif
}
void osd_messenger_t::parse_config(const json11::Json & config)
@ -270,6 +298,13 @@ void osd_messenger_t::parse_config(const json11::Json & config)
// RDMA is on by default in RDMA-enabled builds
this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0;
}
#ifdef WITH_RDMACM
if (!config["use_rdmacm"].is_null())
{
// Use RDMA CM (required for iWARP and may be useful for IB)
this->use_rdmacm = config["use_rdmacm"].bool_value() || config["use_rdmacm"].uint64_value() != 0;
}
#endif
if (!config["force_rdma"].is_null())
{
this->force_rdma = config["force_rdma"].bool_value() || config["force_rdma"].uint64_value() != 0;
@ -755,6 +790,11 @@ bool osd_messenger_t::is_rdma_enabled()
{
return rdma_contexts.size() > 0;
}
bool osd_messenger_t::is_use_rdmacm()
{
return use_rdmacm;
}
#endif
json11::Json::object osd_messenger_t::read_config(const json11::Json & config)

View File

@ -152,6 +152,12 @@ public:
};
#endif
#ifdef WITH_RDMA
struct rdma_event_channel;
struct rdma_cm_id;
struct rdma_cm_event;
#endif
struct osd_messenger_t
{
protected:
@ -168,6 +174,7 @@ protected:
#ifdef WITH_RDMA
bool use_rdma = true;
bool use_rdmacm = false;
bool force_rdma = false;
std::string rdma_device;
uint64_t rdma_port_num = 1;
@ -177,6 +184,8 @@ protected:
uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 0;
uint64_t rdma_max_msg = 0;
bool rdma_odp = false;
rdma_event_channel *rdmacm_evch = NULL;
std::map<rdma_cm_id*, osd_client_t*> rdmacm_connections;
#endif
std::vector<msgr_iothread_t*> iothreads;
@ -226,6 +235,11 @@ public:
bool is_rdma_enabled();
bool connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg);
#endif
#ifdef WITH_RDMACM
bool is_use_rdmacm();
rdma_cm_id *rdmacm_listen(const std::string & bind_address, int rdmacm_port);
void rdmacm_destroy_listener(rdma_cm_id *listener);
#endif
void inc_op_stats(osd_op_stats_t & stats, uint64_t opcode, timespec & tv_begin, timespec & tv_end, uint64_t len);
void measure_exec(osd_op_t *cur_op);
@ -260,4 +274,9 @@ protected:
void handle_rdma_events(msgr_rdma_context_t *rdma_context);
msgr_rdma_context_t* choose_rdma_context(osd_client_t *cl);
#endif
#ifdef WITH_RDMACM
void handle_rdmacm_events();
void rdmacm_accept(rdma_cm_event *ev);
void rdmacm_established(rdma_cm_event *ev);
#endif
};

View File

@ -37,15 +37,22 @@ msgr_rdma_context_t::~msgr_rdma_context_t()
ibv_dereg_mr(mr);
if (pd)
ibv_dealloc_pd(pd);
if (context)
if (context && !is_cm)
ibv_close_device(context);
}
msgr_rdma_connection_t::~msgr_rdma_connection_t()
{
ctx->used_max_cqe -= max_send+max_recv;
if (qp)
ctx->reserve_cqe(-max_send-max_recv);
if (qp && !cmid)
ibv_destroy_qp(qp);
if (cmid)
{
ctx->cm_refs--;
if (cmid->qp)
rdma_destroy_qp(cmid);
rdma_destroy_id(cmid);
}
if (recv_buffers.size())
{
for (auto b: recv_buffers)
@ -356,9 +363,34 @@ cleanup:
return NULL;
}
bool msgr_rdma_context_t::reserve_cqe(int n)
{
this->used_max_cqe += n;
if (this->used_max_cqe > this->max_cqe)
{
// Resize CQ
// Mellanox ConnectX-4 supports up to 4194303 CQEs, so it's fine to put everything into a single CQ
int new_max_cqe = this->max_cqe;
while (this->used_max_cqe > new_max_cqe)
{
new_max_cqe *= 2;
}
if (ibv_resize_cq(this->cq, new_max_cqe) != 0)
{
fprintf(stderr, "Couldn't resize RDMA completion queue to %d entries\n", new_max_cqe);
return false;
}
this->max_cqe = new_max_cqe;
}
return true;
}
msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, uint32_t max_send,
uint32_t max_recv, uint32_t max_sge, uint32_t max_msg)
{
if (!ctx->reserve_cqe(max_send+max_recv))
return NULL;
msgr_rdma_connection_t *conn = new msgr_rdma_connection_t;
max_sge = max_sge > ctx->attrx.orig_attr.max_sge ? ctx->attrx.orig_attr.max_sge : max_sge;
@ -369,25 +401,6 @@ msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx,
conn->max_sge = max_sge;
conn->max_msg = max_msg;
ctx->used_max_cqe += max_send+max_recv;
if (ctx->used_max_cqe > ctx->max_cqe)
{
// Resize CQ
// Mellanox ConnectX-4 supports up to 4194303 CQEs, so it's fine to put everything into a single CQ
int new_max_cqe = ctx->max_cqe;
while (ctx->used_max_cqe > new_max_cqe)
{
new_max_cqe *= 2;
}
if (ibv_resize_cq(ctx->cq, new_max_cqe) != 0)
{
fprintf(stderr, "Couldn't resize RDMA completion queue to %d entries\n", new_max_cqe);
delete conn;
return NULL;
}
ctx->max_cqe = new_max_cqe;
}
ibv_qp_init_attr init_attr = {
.send_cq = ctx->cq,
.recv_cq = ctx->cq,
@ -833,3 +846,235 @@ void osd_messenger_t::handle_rdma_events(msgr_rdma_context_t *rdma_context)
} while (event_count > 0);
handle_immediate_ops();
}
#ifdef WITH_RDMACM
rdma_cm_id *osd_messenger_t::rdmacm_listen(const std::string & bind_address, int rdmacm_port)
{
rdma_cm_id *listener = NULL;
int r = rdma_create_id(rdmacm_evch, &listener, NULL, RDMA_PS_TCP);
if (r != 0)
{
fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d)\n", strerror(errno), errno);
goto fail;
}
sockaddr_storage addr;
if (!string_to_addr(bind_address, 0, rdmacm_port, &addr))
{
fprintf(stderr, "Server address: %s is not valid\n", bind_address.c_str());
goto fail;
}
r = rdma_bind_addr(listener, (sockaddr*)&addr);
if (r != 0)
{
fprintf(stderr, "Failed to bind RDMA-CM to %s:%d: %s (code %d)\n", bind_address.c_str(), rdmacm_port, strerror(errno), errno);
goto fail;
}
r = rdma_listen(listener, 128);
if (r != 0)
{
fprintf(stderr, "Failed to listen RDMA-CM: %s (code %d)\n", strerror(errno), errno);
goto fail;
}
return listener;
fail:
rdma_destroy_id(listener);
return NULL;
}
void osd_messenger_t::rdmacm_destroy_listener(rdma_cm_id *listener)
{
rdma_destroy_id(listener);
}
void osd_messenger_t::handle_rdmacm_events()
{
rdma_cm_event *ev = NULL;
std::vector<osd_client_t*> stop_clients;
while (1)
{
int r = rdma_get_cm_event(rdmacm_evch, &ev);
if (r != 0)
{
if (errno == EAGAIN || errno == EINTR)
break;
fprintf(stderr, "Failed to get RDMA-CM event: %s (code %d)\n", strerror(errno), errno);
exit(1);
}
if (ev->event == RDMA_CM_EVENT_CONNECT_REQUEST)
{
rdmacm_accept(ev);
}
else if (ev->event == RDMA_CM_EVENT_CONNECT_ERROR ||
ev->event == RDMA_CM_EVENT_REJECTED ||
ev->event == RDMA_CM_EVENT_DISCONNECTED ||
ev->event == RDMA_CM_EVENT_DEVICE_REMOVAL)
{
auto event_type_name = ev->event == RDMA_CM_EVENT_CONNECT_ERROR ? "RDMA_CM_EVENT_CONNECT_ERROR" : (
ev->event == RDMA_CM_EVENT_REJECTED ? "RDMA_CM_EVENT_REJECTED" : (
ev->event == RDMA_CM_EVENT_DISCONNECTED ? "RDMA_CM_EVENT_DISCONNECTED" : "RDMA_CM_EVENT_DEVICE_REMOVAL"));
auto cli_it = rdmacm_connections.find(ev->id);
if (cli_it == rdmacm_connections.end())
{
fprintf(stderr, "Received %s event for an unknown connection 0x%jx - ignoring\n",
event_type_name, (uint64_t)ev->id);
}
else
{
fprintf(stderr, "Received %s event for connection 0x%jx - closing it\n",
event_type_name, (uint64_t)ev->id);
auto cli = cli_it->second;
stop_clients.push_back(cli);
}
}
else if (ev->event == RDMA_CM_EVENT_ESTABLISHED)
{
rdmacm_established(ev);
}
else if (ev->event == RDMA_CM_EVENT_ADDR_CHANGE || ev->event == RDMA_CM_EVENT_TIMEWAIT_EXIT)
{
// Do nothing
}
else
{
// Other events are unexpected
fprintf(stderr, "Unexpected RDMA-CM event type: %d\n", ev->event);
}
r = rdma_ack_cm_event(ev);
if (r != 0)
{
fprintf(stderr, "Failed to ack (free) RDMA-CM event: %s (code %d)\n", strerror(errno), errno);
exit(1);
}
}
// Stop only after flushing all events, otherwise rdma_destroy_id infinitely waits for pthread_cond
for (auto cli: stop_clients)
{
stop_client(cli->peer_fd);
}
}
msgr_rdma_context_t* msgr_rdma_context_t::create_cm(ibv_context *ctx)
{
auto rdma_context = new msgr_rdma_context_t;
rdma_context->is_cm = true;
rdma_context->context = ctx;
rdma_context->pd = ibv_alloc_pd(ctx);
if (!rdma_context->pd)
{
fprintf(stderr, "Couldn't allocate RDMA protection domain\n");
delete rdma_context;
return NULL;
}
rdma_context->odp = false;
rdma_context->channel = ibv_create_comp_channel(rdma_context->context);
if (!rdma_context->channel)
{
fprintf(stderr, "Couldn't create RDMA completion channel\n");
delete rdma_context;
return NULL;
}
rdma_context->max_cqe = 4096;
rdma_context->cq = ibv_create_cq(rdma_context->context, rdma_context->max_cqe, NULL, rdma_context->channel, 0);
if (!rdma_context->cq)
{
fprintf(stderr, "Couldn't create RDMA completion queue\n");
delete rdma_context;
return NULL;
}
if (ibv_query_device_ex(rdma_context->context, NULL, &rdma_context->attrx))
{
fprintf(stderr, "Couldn't query RDMA device for its features\n");
delete rdma_context;
return NULL;
}
return rdma_context;
}
void osd_messenger_t::rdmacm_accept(rdma_cm_event *ev)
{
// Find the context by device
// We assume that ev->id->verbs is always the same for the same device (but PD for example isn't)
msgr_rdma_context_t *rdma_context = NULL;
for (auto ctx: rdma_contexts)
{
if (ctx->context == ev->id->verbs)
{
rdma_context = ctx;
break;
}
}
if (!rdma_context)
{
// Wrap into a new msgr_rdma_context_t
rdma_context = msgr_rdma_context_t::create_cm(ev->id->verbs);
if (!rdma_context)
{
rdma_destroy_id(ev->id);
return;
}
fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
tfd->set_fd_handler(rdma_context->channel->fd, false, [this, rdma_context](int notify_fd, int epoll_events)
{
handle_rdma_events(rdma_context);
});
handle_rdma_events(rdma_context);
rdma_contexts.push_back(rdma_context);
}
rdma_context->reserve_cqe(rdma_max_send+rdma_max_recv);
ibv_qp_init_attr init_attr = {
.send_cq = rdma_context->cq,
.recv_cq = rdma_context->cq,
.cap = {
.max_send_wr = (uint32_t)rdma_max_send,
.max_recv_wr = (uint32_t)rdma_max_recv,
.max_send_sge = (uint32_t)rdma_max_sge,
.max_recv_sge = (uint32_t)rdma_max_sge,
},
.qp_type = IBV_QPT_RC,
};
int r = rdma_create_qp(ev->id, rdma_context->pd, &init_attr);
if (r != 0)
{
fprintf(stderr, "Failed to create a queue pair via RDMA-CM: %s (code %d)\n", strerror(errno), errno);
rdma_context->reserve_cqe(-rdma_max_send-rdma_max_recv);
rdma_destroy_id(ev->id);
return;
}
// We don't need private_data, RDMA_READ or ATOMIC so leave everything at 0
rdma_conn_param conn_params = { .rnr_retry_count = 7 };
r = rdma_accept(ev->id, &conn_params);
if (r != 0)
{
fprintf(stderr, "Failed to accept RDMA-CM connection: %s (code %d)\n", strerror(errno), errno);
rdma_context->reserve_cqe(-rdma_max_send-rdma_max_recv);
rdma_destroy_qp(ev->id);
rdma_destroy_id(ev->id);
return;
}
rdma_context->cm_refs++;
// Wrap into a new msgr_rdma_connection_t
msgr_rdma_connection_t *conn = new msgr_rdma_connection_t;
conn->ctx = rdma_context;
conn->max_send = rdma_max_send;
conn->max_recv = rdma_max_recv;
conn->max_sge = rdma_max_sge > rdma_context->attrx.orig_attr.max_sge
? rdma_context->attrx.orig_attr.max_sge : rdma_max_sge;
conn->max_msg = rdma_max_msg;
conn->cmid = ev->id;
conn->qp = ev->id->qp;
// Make a fake FD (FIXME: do not use FDs for identifying clients!)
int fake_fd = open("/dev/null", O_RDONLY);
auto cli = new osd_client_t();
cli->peer_fd = fake_fd;
cli->peer_state = PEER_RDMA;
cli->in_buf = malloc_or_die(receive_buffer_size);
cli->rdma_conn = conn;
clients[fake_fd] = cli;
rdmacm_connections[ev->id] = cli;
}
void osd_messenger_t::rdmacm_established(rdma_cm_event *ev)
{
}
#endif

View File

@ -2,6 +2,9 @@
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#pragma once
#ifdef WITH_RDMACM
#include <rdma/rdma_cma.h>
#endif
#include <infiniband/verbs.h>
#include <string>
#include <vector>
@ -36,11 +39,15 @@ struct msgr_rdma_context_t
int max_cqe = 0;
int used_max_cqe = 0;
addr_mask_t net_mask = {};
bool is_cm = false;
int cm_refs = 0;
static std::vector<msgr_rdma_context_t*> create_all(const std::vector<addr_mask_t> & osd_network_masks,
const char *sel_dev_name, int sel_port_num, int sel_gid_index, uint32_t sel_mtu, bool odp, int log_level);
static msgr_rdma_context_t *create(ibv_device *dev, ibv_port_attr & portinfo,
int ib_port, int gid_index, uint32_t mtu, bool odp, int log_level);
static msgr_rdma_context_t* create_cm(ibv_context *ctx);
bool reserve_cqe(int n);
~msgr_rdma_context_t();
};
@ -55,6 +62,9 @@ struct msgr_rdma_connection_t
{
msgr_rdma_context_t *ctx = NULL;
ibv_qp *qp = NULL;
#ifdef WITH_RDMACM
rdma_cm_id *cmid = NULL;
#endif
msgr_rdma_address_t addr;
int max_send = 0, max_recv = 0, max_sge = 0;
int cur_send = 0, cur_recv = 0;

View File

@ -47,7 +47,7 @@ struct nfs_rdma_dev_state_t
ibv_comp_channel *channel = NULL;
ibv_cq *cq = NULL;
rdma_allocator_t *alloc = NULL;
int max_cqe = 0, used_max_cqe = 0;
int max_cqe = 0;
static nfs_rdma_dev_state_t *create(rdma_cm_id *cmid, uint64_t rdma_malloc_round_to, uint64_t rdma_max_unused_buffers);
~nfs_rdma_dev_state_t();

View File

@ -14,6 +14,7 @@ target_link_libraries(vitastor-osd
Jerasure
${ISAL_LIBRARIES}
${IBVERBS_LIBRARIES}
${RDMACM_LIBRARIES}
)
# osd_rmw_test

View File

@ -120,6 +120,11 @@ osd_t::~osd_t()
delete epmgr;
if (bs)
delete bs;
#ifdef WITH_RDMACM
for (rdma_cm_id *listener: rdmacm_listeners)
msgr.rdmacm_destroy_listener(listener);
rdmacm_listeners.clear();
#endif
for (auto listen_fd: listen_fds)
close(listen_fd);
listen_fds.clear();
@ -337,7 +342,7 @@ void osd_t::bind_socket()
}
for (auto & bind_address: bind_addresses)
{
int listen_fd = create_and_bind_socket(bind_address, bind_port, listen_backlog, &listening_port);
int listen_fd = create_and_bind_socket(bind_address, listening_port ? listening_port : bind_port, listen_backlog, &listening_port);
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
{
@ -345,6 +350,17 @@ void osd_t::bind_socket()
});
listen_fds.push_back(listen_fd);
}
#ifdef WITH_RDMACM
if (msgr.is_use_rdmacm()) // FIXME: use json_is_true and move here
{
for (auto & bind_address: bind_addresses)
{
auto listener = msgr.rdmacm_listen(bind_address, listening_port);
if (listener)
rdmacm_listeners.push_back(listener);
}
}
#endif
}
bool osd_t::shutdown()

View File

@ -201,6 +201,9 @@ class osd_t
int listening_port = 0;
std::vector<std::string> bind_addresses;
std::vector<int> listen_fds;
#ifdef WITH_RDMACM
std::vector<rdma_cm_id *> rdmacm_listeners;
#endif
ring_consumer_t consumer;
// op statistics

View File

@ -172,6 +172,10 @@ json11::Json osd_t::get_osd_state()
st["host"] = std::string(hostname.data(), hostname.size());
st["version"] = VITASTOR_VERSION;
st["port"] = listening_port;
#ifdef WITH_RDMACM
if (rdmacm_listeners.size())
st["rdmacm"] = true;
#endif
st["primary_enabled"] = run_primary;
st["blockstore_enabled"] = bs ? true : false;
return st;

View File

@ -25,6 +25,7 @@ target_link_libraries(stub_uring_osd
vitastor_common
${LIBURING_LIBRARIES}
${IBVERBS_LIBRARIES}
${RDMACM_LIBRARIES}
tcmalloc_minimal
)