Compare commits

..

6 Commits

13 changed files with 374 additions and 34 deletions

View File

@ -378,11 +378,11 @@ Examples:
Create a pool. Required parameters: Create a pool. Required parameters:
| <!-- --> | <!-- --> | | <!-- --> | <!-- --> |
|--------------------------|---------------------------------------------------------------------------------------| |--------------------------|-----------------------------------------------------------------------------------------|
| `-s R` or `--pg_size R` | Number of replicas for replicated pools | | `-s R` or `--pg_size R` | Number of replicas for replicated pools |
| `--ec N+K` | Number of data (N) and parity (K) chunks for erasure-coded pools | | `--ec N+K` | Number of data (N) and parity (K) chunks for erasure-coded pools |
| `-n N` or `--pg_count N` | PG count for the new pool (start with 10*<OSD count>/pg_size rounded to a power of 2) | | `-n N` or `--pg_count N` | PG count for the new pool (start with 10*\<OSD count\>/pg_size rounded to a power of 2) |
Optional parameters: Optional parameters:

View File

@ -395,11 +395,11 @@ OSD PARENT UP SIZE USED% TAGS WEIGHT BLOCK BITMAP
Создать пул. Обязательные параметры: Создать пул. Обязательные параметры:
| <!-- --> | <!-- --> | | <!-- --> | <!-- --> |
|---------------------------|---------------------------------------------------------------------------------------------| |---------------------------|-----------------------------------------------------------------------------------------------|
| `-s R` или `--pg_size R` | Число копий данных для реплицированных пулов | | `-s R` или `--pg_size R` | Число копий данных для реплицированных пулов |
| `--ec N+K` | Число частей данных (N) и чётности (K) для пулов с кодами коррекции ошибок | | `--ec N+K` | Число частей данных (N) и чётности (K) для пулов с кодами коррекции ошибок |
| `-n N` или `--pg_count N` | Число PG для нового пула (начните с 10*<число OSD>/pg_size, округлённого до степени двойки) | | `-n N` или `--pg_count N` | Число PG для нового пула (начните с 10*\<число OSD\>/pg_size, округлённого до степени двойки) |
Необязательные параметры: Необязательные параметры:

View File

@ -28,6 +28,7 @@ target_link_libraries(vitastor_client
vitastor_cli vitastor_cli
${LIBURING_LIBRARIES} ${LIBURING_LIBRARIES}
${IBVERBS_LIBRARIES} ${IBVERBS_LIBRARIES}
${RDMACM_LIBRARIES}
) )
set_target_properties(vitastor_client PROPERTIES VERSION ${VITASTOR_VERSION} SOVERSION 0) set_target_properties(vitastor_client PROPERTIES VERSION ${VITASTOR_VERSION} SOVERSION 0)
configure_file(vitastor.pc.in vitastor.pc @ONLY) configure_file(vitastor.pc.in vitastor.pc @ONLY)

View File

@ -117,6 +117,26 @@ void msgr_iothread_t::run()
void osd_messenger_t::init() void osd_messenger_t::init()
{ {
#ifdef WITH_RDMACM
if (use_rdmacm)
{
// RDMA-CM only requires the event channel. All the remaining work is done separately
rdmacm_evch = rdma_create_event_channel();
if (!rdmacm_evch)
{
fprintf(stderr, "Failed to initialize RDMA-CM event channel: %s (code %d)\n", strerror(errno), errno);
}
else
{
fcntl(rdmacm_evch->fd, F_SETFL, fcntl(rdmacm_evch->fd, F_GETFL, 0) | O_NONBLOCK);
tfd->set_fd_handler(rdmacm_evch->fd, false, [this](int rdmacm_eventfd, int epoll_events)
{
handle_rdmacm_events();
});
}
}
else
#endif
#ifdef WITH_RDMA #ifdef WITH_RDMA
if (use_rdma) if (use_rdma)
{ {
@ -260,6 +280,14 @@ osd_messenger_t::~osd_messenger_t()
} }
rdma_contexts.clear(); rdma_contexts.clear();
#endif #endif
#ifdef WITH_RDMACM
if (rdmacm_evch)
{
tfd->set_fd_handler(rdmacm_evch->fd, false, NULL);
rdma_destroy_event_channel(rdmacm_evch);
rdmacm_evch = NULL;
}
#endif
} }
void osd_messenger_t::parse_config(const json11::Json & config) void osd_messenger_t::parse_config(const json11::Json & config)
@ -270,6 +298,13 @@ void osd_messenger_t::parse_config(const json11::Json & config)
// RDMA is on by default in RDMA-enabled builds // RDMA is on by default in RDMA-enabled builds
this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0; this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0;
} }
#ifdef WITH_RDMACM
if (!config["use_rdmacm"].is_null())
{
// Use RDMA CM (required for iWARP and may be useful for IB)
this->use_rdmacm = config["use_rdmacm"].bool_value() || config["use_rdmacm"].uint64_value() != 0;
}
#endif
if (!config["force_rdma"].is_null()) if (!config["force_rdma"].is_null())
{ {
this->force_rdma = config["force_rdma"].bool_value() || config["force_rdma"].uint64_value() != 0; this->force_rdma = config["force_rdma"].bool_value() || config["force_rdma"].uint64_value() != 0;
@ -755,6 +790,11 @@ bool osd_messenger_t::is_rdma_enabled()
{ {
return rdma_contexts.size() > 0; return rdma_contexts.size() > 0;
} }
bool osd_messenger_t::is_use_rdmacm()
{
return use_rdmacm;
}
#endif #endif
json11::Json::object osd_messenger_t::read_config(const json11::Json & config) json11::Json::object osd_messenger_t::read_config(const json11::Json & config)

View File

@ -152,6 +152,12 @@ public:
}; };
#endif #endif
#ifdef WITH_RDMA
struct rdma_event_channel;
struct rdma_cm_id;
struct rdma_cm_event;
#endif
struct osd_messenger_t struct osd_messenger_t
{ {
protected: protected:
@ -168,6 +174,7 @@ protected:
#ifdef WITH_RDMA #ifdef WITH_RDMA
bool use_rdma = true; bool use_rdma = true;
bool use_rdmacm = false;
bool force_rdma = false; bool force_rdma = false;
std::string rdma_device; std::string rdma_device;
uint64_t rdma_port_num = 1; uint64_t rdma_port_num = 1;
@ -177,6 +184,8 @@ protected:
uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 0; uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 0;
uint64_t rdma_max_msg = 0; uint64_t rdma_max_msg = 0;
bool rdma_odp = false; bool rdma_odp = false;
rdma_event_channel *rdmacm_evch = NULL;
std::map<rdma_cm_id*, osd_client_t*> rdmacm_connections;
#endif #endif
std::vector<msgr_iothread_t*> iothreads; std::vector<msgr_iothread_t*> iothreads;
@ -226,6 +235,11 @@ public:
bool is_rdma_enabled(); bool is_rdma_enabled();
bool connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg); bool connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg);
#endif #endif
#ifdef WITH_RDMACM
bool is_use_rdmacm();
rdma_cm_id *rdmacm_listen(const std::string & bind_address, int rdmacm_port);
void rdmacm_destroy_listener(rdma_cm_id *listener);
#endif
void inc_op_stats(osd_op_stats_t & stats, uint64_t opcode, timespec & tv_begin, timespec & tv_end, uint64_t len); void inc_op_stats(osd_op_stats_t & stats, uint64_t opcode, timespec & tv_begin, timespec & tv_end, uint64_t len);
void measure_exec(osd_op_t *cur_op); void measure_exec(osd_op_t *cur_op);
@ -260,4 +274,9 @@ protected:
void handle_rdma_events(msgr_rdma_context_t *rdma_context); void handle_rdma_events(msgr_rdma_context_t *rdma_context);
msgr_rdma_context_t* choose_rdma_context(osd_client_t *cl); msgr_rdma_context_t* choose_rdma_context(osd_client_t *cl);
#endif #endif
#ifdef WITH_RDMACM
void handle_rdmacm_events();
void rdmacm_accept(rdma_cm_event *ev);
void rdmacm_established(rdma_cm_event *ev);
#endif
}; };

View File

@ -37,15 +37,22 @@ msgr_rdma_context_t::~msgr_rdma_context_t()
ibv_dereg_mr(mr); ibv_dereg_mr(mr);
if (pd) if (pd)
ibv_dealloc_pd(pd); ibv_dealloc_pd(pd);
if (context) if (context && !is_cm)
ibv_close_device(context); ibv_close_device(context);
} }
msgr_rdma_connection_t::~msgr_rdma_connection_t() msgr_rdma_connection_t::~msgr_rdma_connection_t()
{ {
ctx->used_max_cqe -= max_send+max_recv; ctx->reserve_cqe(-max_send-max_recv);
if (qp) if (qp && !cmid)
ibv_destroy_qp(qp); ibv_destroy_qp(qp);
if (cmid)
{
ctx->cm_refs--;
if (cmid->qp)
rdma_destroy_qp(cmid);
rdma_destroy_id(cmid);
}
if (recv_buffers.size()) if (recv_buffers.size())
{ {
for (auto b: recv_buffers) for (auto b: recv_buffers)
@ -356,9 +363,34 @@ cleanup:
return NULL; return NULL;
} }
bool msgr_rdma_context_t::reserve_cqe(int n)
{
this->used_max_cqe += n;
if (this->used_max_cqe > this->max_cqe)
{
// Resize CQ
// Mellanox ConnectX-4 supports up to 4194303 CQEs, so it's fine to put everything into a single CQ
int new_max_cqe = this->max_cqe;
while (this->used_max_cqe > new_max_cqe)
{
new_max_cqe *= 2;
}
if (ibv_resize_cq(this->cq, new_max_cqe) != 0)
{
fprintf(stderr, "Couldn't resize RDMA completion queue to %d entries\n", new_max_cqe);
return false;
}
this->max_cqe = new_max_cqe;
}
return true;
}
msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, uint32_t max_send, msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, uint32_t max_send,
uint32_t max_recv, uint32_t max_sge, uint32_t max_msg) uint32_t max_recv, uint32_t max_sge, uint32_t max_msg)
{ {
if (!ctx->reserve_cqe(max_send+max_recv))
return NULL;
msgr_rdma_connection_t *conn = new msgr_rdma_connection_t; msgr_rdma_connection_t *conn = new msgr_rdma_connection_t;
max_sge = max_sge > ctx->attrx.orig_attr.max_sge ? ctx->attrx.orig_attr.max_sge : max_sge; max_sge = max_sge > ctx->attrx.orig_attr.max_sge ? ctx->attrx.orig_attr.max_sge : max_sge;
@ -369,25 +401,6 @@ msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx,
conn->max_sge = max_sge; conn->max_sge = max_sge;
conn->max_msg = max_msg; conn->max_msg = max_msg;
ctx->used_max_cqe += max_send+max_recv;
if (ctx->used_max_cqe > ctx->max_cqe)
{
// Resize CQ
// Mellanox ConnectX-4 supports up to 4194303 CQEs, so it's fine to put everything into a single CQ
int new_max_cqe = ctx->max_cqe;
while (ctx->used_max_cqe > new_max_cqe)
{
new_max_cqe *= 2;
}
if (ibv_resize_cq(ctx->cq, new_max_cqe) != 0)
{
fprintf(stderr, "Couldn't resize RDMA completion queue to %d entries\n", new_max_cqe);
delete conn;
return NULL;
}
ctx->max_cqe = new_max_cqe;
}
ibv_qp_init_attr init_attr = { ibv_qp_init_attr init_attr = {
.send_cq = ctx->cq, .send_cq = ctx->cq,
.recv_cq = ctx->cq, .recv_cq = ctx->cq,
@ -833,3 +846,235 @@ void osd_messenger_t::handle_rdma_events(msgr_rdma_context_t *rdma_context)
} while (event_count > 0); } while (event_count > 0);
handle_immediate_ops(); handle_immediate_ops();
} }
#ifdef WITH_RDMACM
rdma_cm_id *osd_messenger_t::rdmacm_listen(const std::string & bind_address, int rdmacm_port)
{
rdma_cm_id *listener = NULL;
int r = rdma_create_id(rdmacm_evch, &listener, NULL, RDMA_PS_TCP);
if (r != 0)
{
fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d)\n", strerror(errno), errno);
goto fail;
}
sockaddr_storage addr;
if (!string_to_addr(bind_address, 0, rdmacm_port, &addr))
{
fprintf(stderr, "Server address: %s is not valid\n", bind_address.c_str());
goto fail;
}
r = rdma_bind_addr(listener, (sockaddr*)&addr);
if (r != 0)
{
fprintf(stderr, "Failed to bind RDMA-CM to %s:%d: %s (code %d)\n", bind_address.c_str(), rdmacm_port, strerror(errno), errno);
goto fail;
}
r = rdma_listen(listener, 128);
if (r != 0)
{
fprintf(stderr, "Failed to listen RDMA-CM: %s (code %d)\n", strerror(errno), errno);
goto fail;
}
return listener;
fail:
rdma_destroy_id(listener);
return NULL;
}
void osd_messenger_t::rdmacm_destroy_listener(rdma_cm_id *listener)
{
rdma_destroy_id(listener);
}
void osd_messenger_t::handle_rdmacm_events()
{
rdma_cm_event *ev = NULL;
std::vector<osd_client_t*> stop_clients;
while (1)
{
int r = rdma_get_cm_event(rdmacm_evch, &ev);
if (r != 0)
{
if (errno == EAGAIN || errno == EINTR)
break;
fprintf(stderr, "Failed to get RDMA-CM event: %s (code %d)\n", strerror(errno), errno);
exit(1);
}
if (ev->event == RDMA_CM_EVENT_CONNECT_REQUEST)
{
rdmacm_accept(ev);
}
else if (ev->event == RDMA_CM_EVENT_CONNECT_ERROR ||
ev->event == RDMA_CM_EVENT_REJECTED ||
ev->event == RDMA_CM_EVENT_DISCONNECTED ||
ev->event == RDMA_CM_EVENT_DEVICE_REMOVAL)
{
auto event_type_name = ev->event == RDMA_CM_EVENT_CONNECT_ERROR ? "RDMA_CM_EVENT_CONNECT_ERROR" : (
ev->event == RDMA_CM_EVENT_REJECTED ? "RDMA_CM_EVENT_REJECTED" : (
ev->event == RDMA_CM_EVENT_DISCONNECTED ? "RDMA_CM_EVENT_DISCONNECTED" : "RDMA_CM_EVENT_DEVICE_REMOVAL"));
auto cli_it = rdmacm_connections.find(ev->id);
if (cli_it == rdmacm_connections.end())
{
fprintf(stderr, "Received %s event for an unknown connection 0x%jx - ignoring\n",
event_type_name, (uint64_t)ev->id);
}
else
{
fprintf(stderr, "Received %s event for connection 0x%jx - closing it\n",
event_type_name, (uint64_t)ev->id);
auto cli = cli_it->second;
stop_clients.push_back(cli);
}
}
else if (ev->event == RDMA_CM_EVENT_ESTABLISHED)
{
rdmacm_established(ev);
}
else if (ev->event == RDMA_CM_EVENT_ADDR_CHANGE || ev->event == RDMA_CM_EVENT_TIMEWAIT_EXIT)
{
// Do nothing
}
else
{
// Other events are unexpected
fprintf(stderr, "Unexpected RDMA-CM event type: %d\n", ev->event);
}
r = rdma_ack_cm_event(ev);
if (r != 0)
{
fprintf(stderr, "Failed to ack (free) RDMA-CM event: %s (code %d)\n", strerror(errno), errno);
exit(1);
}
}
// Stop only after flushing all events, otherwise rdma_destroy_id infinitely waits for pthread_cond
for (auto cli: stop_clients)
{
stop_client(cli->peer_fd);
}
}
msgr_rdma_context_t* msgr_rdma_context_t::create_cm(ibv_context *ctx)
{
auto rdma_context = new msgr_rdma_context_t;
rdma_context->is_cm = true;
rdma_context->context = ctx;
rdma_context->pd = ibv_alloc_pd(ctx);
if (!rdma_context->pd)
{
fprintf(stderr, "Couldn't allocate RDMA protection domain\n");
delete rdma_context;
return NULL;
}
rdma_context->odp = false;
rdma_context->channel = ibv_create_comp_channel(rdma_context->context);
if (!rdma_context->channel)
{
fprintf(stderr, "Couldn't create RDMA completion channel\n");
delete rdma_context;
return NULL;
}
rdma_context->max_cqe = 4096;
rdma_context->cq = ibv_create_cq(rdma_context->context, rdma_context->max_cqe, NULL, rdma_context->channel, 0);
if (!rdma_context->cq)
{
fprintf(stderr, "Couldn't create RDMA completion queue\n");
delete rdma_context;
return NULL;
}
if (ibv_query_device_ex(rdma_context->context, NULL, &rdma_context->attrx))
{
fprintf(stderr, "Couldn't query RDMA device for its features\n");
delete rdma_context;
return NULL;
}
return rdma_context;
}
void osd_messenger_t::rdmacm_accept(rdma_cm_event *ev)
{
// Find the context by device
// We assume that ev->id->verbs is always the same for the same device (but PD for example isn't)
msgr_rdma_context_t *rdma_context = NULL;
for (auto ctx: rdma_contexts)
{
if (ctx->context == ev->id->verbs)
{
rdma_context = ctx;
break;
}
}
if (!rdma_context)
{
// Wrap into a new msgr_rdma_context_t
rdma_context = msgr_rdma_context_t::create_cm(ev->id->verbs);
if (!rdma_context)
{
rdma_destroy_id(ev->id);
return;
}
fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
tfd->set_fd_handler(rdma_context->channel->fd, false, [this, rdma_context](int notify_fd, int epoll_events)
{
handle_rdma_events(rdma_context);
});
handle_rdma_events(rdma_context);
rdma_contexts.push_back(rdma_context);
}
rdma_context->reserve_cqe(rdma_max_send+rdma_max_recv);
ibv_qp_init_attr init_attr = {
.send_cq = rdma_context->cq,
.recv_cq = rdma_context->cq,
.cap = {
.max_send_wr = (uint32_t)rdma_max_send,
.max_recv_wr = (uint32_t)rdma_max_recv,
.max_send_sge = (uint32_t)rdma_max_sge,
.max_recv_sge = (uint32_t)rdma_max_sge,
},
.qp_type = IBV_QPT_RC,
};
int r = rdma_create_qp(ev->id, rdma_context->pd, &init_attr);
if (r != 0)
{
fprintf(stderr, "Failed to create a queue pair via RDMA-CM: %s (code %d)\n", strerror(errno), errno);
rdma_context->reserve_cqe(-rdma_max_send-rdma_max_recv);
rdma_destroy_id(ev->id);
return;
}
// We don't need private_data, RDMA_READ or ATOMIC so leave everything at 0
rdma_conn_param conn_params = { .rnr_retry_count = 7 };
r = rdma_accept(ev->id, &conn_params);
if (r != 0)
{
fprintf(stderr, "Failed to accept RDMA-CM connection: %s (code %d)\n", strerror(errno), errno);
rdma_context->reserve_cqe(-rdma_max_send-rdma_max_recv);
rdma_destroy_qp(ev->id);
rdma_destroy_id(ev->id);
return;
}
rdma_context->cm_refs++;
// Wrap into a new msgr_rdma_connection_t
msgr_rdma_connection_t *conn = new msgr_rdma_connection_t;
conn->ctx = rdma_context;
conn->max_send = rdma_max_send;
conn->max_recv = rdma_max_recv;
conn->max_sge = rdma_max_sge > rdma_context->attrx.orig_attr.max_sge
? rdma_context->attrx.orig_attr.max_sge : rdma_max_sge;
conn->max_msg = rdma_max_msg;
conn->cmid = ev->id;
conn->qp = ev->id->qp;
// Make a fake FD (FIXME: do not use FDs for identifying clients!)
int fake_fd = open("/dev/null", O_RDONLY);
auto cli = new osd_client_t();
cli->peer_fd = fake_fd;
cli->peer_state = PEER_RDMA;
cli->in_buf = malloc_or_die(receive_buffer_size);
cli->rdma_conn = conn;
clients[fake_fd] = cli;
rdmacm_connections[ev->id] = cli;
}
void osd_messenger_t::rdmacm_established(rdma_cm_event *ev)
{
}
#endif

View File

@ -2,6 +2,9 @@
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) // License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#pragma once #pragma once
#ifdef WITH_RDMACM
#include <rdma/rdma_cma.h>
#endif
#include <infiniband/verbs.h> #include <infiniband/verbs.h>
#include <string> #include <string>
#include <vector> #include <vector>
@ -36,11 +39,15 @@ struct msgr_rdma_context_t
int max_cqe = 0; int max_cqe = 0;
int used_max_cqe = 0; int used_max_cqe = 0;
addr_mask_t net_mask = {}; addr_mask_t net_mask = {};
bool is_cm = false;
int cm_refs = 0;
static std::vector<msgr_rdma_context_t*> create_all(const std::vector<addr_mask_t> & osd_network_masks, static std::vector<msgr_rdma_context_t*> create_all(const std::vector<addr_mask_t> & osd_network_masks,
const char *sel_dev_name, int sel_port_num, int sel_gid_index, uint32_t sel_mtu, bool odp, int log_level); const char *sel_dev_name, int sel_port_num, int sel_gid_index, uint32_t sel_mtu, bool odp, int log_level);
static msgr_rdma_context_t *create(ibv_device *dev, ibv_port_attr & portinfo, static msgr_rdma_context_t *create(ibv_device *dev, ibv_port_attr & portinfo,
int ib_port, int gid_index, uint32_t mtu, bool odp, int log_level); int ib_port, int gid_index, uint32_t mtu, bool odp, int log_level);
static msgr_rdma_context_t* create_cm(ibv_context *ctx);
bool reserve_cqe(int n);
~msgr_rdma_context_t(); ~msgr_rdma_context_t();
}; };
@ -55,6 +62,9 @@ struct msgr_rdma_connection_t
{ {
msgr_rdma_context_t *ctx = NULL; msgr_rdma_context_t *ctx = NULL;
ibv_qp *qp = NULL; ibv_qp *qp = NULL;
#ifdef WITH_RDMACM
rdma_cm_id *cmid = NULL;
#endif
msgr_rdma_address_t addr; msgr_rdma_address_t addr;
int max_send = 0, max_recv = 0, max_sge = 0; int max_send = 0, max_recv = 0, max_sge = 0;
int cur_send = 0, cur_recv = 0; int cur_send = 0, cur_recv = 0;

View File

@ -47,7 +47,7 @@ struct nfs_rdma_dev_state_t
ibv_comp_channel *channel = NULL; ibv_comp_channel *channel = NULL;
ibv_cq *cq = NULL; ibv_cq *cq = NULL;
rdma_allocator_t *alloc = NULL; rdma_allocator_t *alloc = NULL;
int max_cqe = 0, used_max_cqe = 0; int max_cqe = 0;
static nfs_rdma_dev_state_t *create(rdma_cm_id *cmid, uint64_t rdma_malloc_round_to, uint64_t rdma_max_unused_buffers); static nfs_rdma_dev_state_t *create(rdma_cm_id *cmid, uint64_t rdma_malloc_round_to, uint64_t rdma_max_unused_buffers);
~nfs_rdma_dev_state_t(); ~nfs_rdma_dev_state_t();

View File

@ -14,6 +14,7 @@ target_link_libraries(vitastor-osd
Jerasure Jerasure
${ISAL_LIBRARIES} ${ISAL_LIBRARIES}
${IBVERBS_LIBRARIES} ${IBVERBS_LIBRARIES}
${RDMACM_LIBRARIES}
) )
# osd_rmw_test # osd_rmw_test

View File

@ -120,6 +120,11 @@ osd_t::~osd_t()
delete epmgr; delete epmgr;
if (bs) if (bs)
delete bs; delete bs;
#ifdef WITH_RDMACM
for (rdma_cm_id *listener: rdmacm_listeners)
msgr.rdmacm_destroy_listener(listener);
rdmacm_listeners.clear();
#endif
for (auto listen_fd: listen_fds) for (auto listen_fd: listen_fds)
close(listen_fd); close(listen_fd);
listen_fds.clear(); listen_fds.clear();
@ -337,7 +342,7 @@ void osd_t::bind_socket()
} }
for (auto & bind_address: bind_addresses) for (auto & bind_address: bind_addresses)
{ {
int listen_fd = create_and_bind_socket(bind_address, bind_port, listen_backlog, &listening_port); int listen_fd = create_and_bind_socket(bind_address, listening_port ? listening_port : bind_port, listen_backlog, &listening_port);
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK); fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events) epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
{ {
@ -345,6 +350,17 @@ void osd_t::bind_socket()
}); });
listen_fds.push_back(listen_fd); listen_fds.push_back(listen_fd);
} }
#ifdef WITH_RDMACM
if (msgr.is_use_rdmacm()) // FIXME: use json_is_true and move here
{
for (auto & bind_address: bind_addresses)
{
auto listener = msgr.rdmacm_listen(bind_address, listening_port);
if (listener)
rdmacm_listeners.push_back(listener);
}
}
#endif
} }
bool osd_t::shutdown() bool osd_t::shutdown()

View File

@ -201,6 +201,9 @@ class osd_t
int listening_port = 0; int listening_port = 0;
std::vector<std::string> bind_addresses; std::vector<std::string> bind_addresses;
std::vector<int> listen_fds; std::vector<int> listen_fds;
#ifdef WITH_RDMACM
std::vector<rdma_cm_id *> rdmacm_listeners;
#endif
ring_consumer_t consumer; ring_consumer_t consumer;
// op statistics // op statistics

View File

@ -172,6 +172,10 @@ json11::Json osd_t::get_osd_state()
st["host"] = std::string(hostname.data(), hostname.size()); st["host"] = std::string(hostname.data(), hostname.size());
st["version"] = VITASTOR_VERSION; st["version"] = VITASTOR_VERSION;
st["port"] = listening_port; st["port"] = listening_port;
#ifdef WITH_RDMACM
if (rdmacm_listeners.size())
st["rdmacm"] = true;
#endif
st["primary_enabled"] = run_primary; st["primary_enabled"] = run_primary;
st["blockstore_enabled"] = bs ? true : false; st["blockstore_enabled"] = bs ? true : false;
return st; return st;

View File

@ -25,6 +25,7 @@ target_link_libraries(stub_uring_osd
vitastor_common vitastor_common
${LIBURING_LIBRARIES} ${LIBURING_LIBRARIES}
${IBVERBS_LIBRARIES} ${IBVERBS_LIBRARIES}
${RDMACM_LIBRARIES}
tcmalloc_minimal tcmalloc_minimal
) )