Compare commits
2 Commits
a28d1a4a01
...
1e3f143892
Author | SHA1 | Date |
---|---|---|
|
1e3f143892 | |
|
97a6073265 |
|
@ -28,6 +28,7 @@ target_link_libraries(vitastor_client
|
||||||
vitastor_cli
|
vitastor_cli
|
||||||
${LIBURING_LIBRARIES}
|
${LIBURING_LIBRARIES}
|
||||||
${IBVERBS_LIBRARIES}
|
${IBVERBS_LIBRARIES}
|
||||||
|
${RDMACM_LIBRARIES}
|
||||||
)
|
)
|
||||||
set_target_properties(vitastor_client PROPERTIES VERSION ${VITASTOR_VERSION} SOVERSION 0)
|
set_target_properties(vitastor_client PROPERTIES VERSION ${VITASTOR_VERSION} SOVERSION 0)
|
||||||
configure_file(vitastor.pc.in vitastor.pc @ONLY)
|
configure_file(vitastor.pc.in vitastor.pc @ONLY)
|
||||||
|
|
|
@ -117,31 +117,56 @@ void msgr_iothread_t::run()
|
||||||
|
|
||||||
void osd_messenger_t::init()
|
void osd_messenger_t::init()
|
||||||
{
|
{
|
||||||
// FIXME: Support multiple RDMA networks?!
|
#ifdef WITH_RDMACM
|
||||||
|
if (use_rdmacm)
|
||||||
|
{
|
||||||
|
// RDMA-CM only requires the event channel. All the remaining work is done separately
|
||||||
|
rdmacm_evch = rdma_create_event_channel();
|
||||||
|
if (!rdmacm_evch)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Failed to initialize RDMA-CM event channel: %s (code %d)\n", strerror(errno), errno);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
fcntl(rdmacm_evch->fd, F_SETFL, fcntl(rdmacm_evch->fd, F_GETFL, 0) | O_NONBLOCK);
|
||||||
|
tfd->set_fd_handler(rdmacm_evch->fd, false, [this](int rdmacm_eventfd, int epoll_events)
|
||||||
|
{
|
||||||
|
handle_rdmacm_events();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
#endif
|
||||||
#ifdef WITH_RDMA
|
#ifdef WITH_RDMA
|
||||||
if (use_rdma)
|
if (use_rdma)
|
||||||
{
|
{
|
||||||
rdma_context = msgr_rdma_context_t::create(
|
rdma_contexts = msgr_rdma_context_t::create_all(
|
||||||
osd_num && osd_cluster_network_masks.size() ? osd_cluster_network_masks : osd_network_masks,
|
osd_num && osd_cluster_network_masks.size() ? osd_cluster_network_masks : osd_network_masks,
|
||||||
rdma_device != "" ? rdma_device.c_str() : NULL,
|
rdma_device != "" ? rdma_device.c_str() : NULL,
|
||||||
rdma_port_num, rdma_gid_index, rdma_mtu, rdma_odp, log_level
|
rdma_port_num, rdma_gid_index, rdma_mtu, rdma_odp, log_level
|
||||||
);
|
);
|
||||||
if (!rdma_context)
|
if (!rdma_contexts.size())
|
||||||
{
|
{
|
||||||
|
if (force_rdma)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "[OSD %ju] Couldn't initialize RDMA, force_rdma is enabled, exiting\n", osd_num);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
if (log_level > 0)
|
if (log_level > 0)
|
||||||
fprintf(stderr, "[OSD %ju] Couldn't initialize RDMA, proceeding with TCP only\n", osd_num);
|
fprintf(stderr, "[OSD %ju] Couldn't initialize RDMA, proceeding with TCP only\n", osd_num);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
rdma_max_sge = rdma_max_sge < rdma_context->attrx.orig_attr.max_sge
|
|
||||||
? rdma_max_sge : rdma_context->attrx.orig_attr.max_sge;
|
|
||||||
fprintf(stderr, "[OSD %ju] RDMA initialized successfully\n", osd_num);
|
fprintf(stderr, "[OSD %ju] RDMA initialized successfully\n", osd_num);
|
||||||
fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
|
for (msgr_rdma_context_t* rdma_context: rdma_contexts)
|
||||||
tfd->set_fd_handler(rdma_context->channel->fd, false, [this](int notify_fd, int epoll_events)
|
|
||||||
{
|
{
|
||||||
handle_rdma_events();
|
fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
|
||||||
});
|
tfd->set_fd_handler(rdma_context->channel->fd, false, [this, rdma_context](int notify_fd, int epoll_events)
|
||||||
handle_rdma_events();
|
{
|
||||||
|
handle_rdma_events(rdma_context);
|
||||||
|
});
|
||||||
|
handle_rdma_events(rdma_context);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
@ -249,10 +274,19 @@ osd_messenger_t::~osd_messenger_t()
|
||||||
iothreads.clear();
|
iothreads.clear();
|
||||||
}
|
}
|
||||||
#ifdef WITH_RDMA
|
#ifdef WITH_RDMA
|
||||||
if (rdma_context)
|
for (auto rdma_context: rdma_contexts)
|
||||||
{
|
{
|
||||||
delete rdma_context;
|
delete rdma_context;
|
||||||
}
|
}
|
||||||
|
rdma_contexts.clear();
|
||||||
|
#endif
|
||||||
|
#ifdef WITH_RDMACM
|
||||||
|
if (rdmacm_evch)
|
||||||
|
{
|
||||||
|
tfd->set_fd_handler(rdmacm_evch->fd, false, NULL);
|
||||||
|
rdma_destroy_event_channel(rdmacm_evch);
|
||||||
|
rdmacm_evch = NULL;
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -264,10 +298,18 @@ void osd_messenger_t::parse_config(const json11::Json & config)
|
||||||
// RDMA is on by default in RDMA-enabled builds
|
// RDMA is on by default in RDMA-enabled builds
|
||||||
this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0;
|
this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0;
|
||||||
}
|
}
|
||||||
|
#ifdef WITH_RDMACM
|
||||||
|
// Use RDMA CM? (required for iWARP and may be useful for IB)
|
||||||
|
// FIXME: Only parse during start
|
||||||
|
this->use_rdmacm = config["use_rdmacm"].bool_value() || config["use_rdmacm"].uint64_value() != 0;
|
||||||
|
this->disable_tcp = this->use_rdmacm && (config["disable_tcp"].bool_value() || config["disable_tcp"].uint64_value() != 0);
|
||||||
|
#endif
|
||||||
|
if (!config["force_rdma"].is_null())
|
||||||
|
{
|
||||||
|
this->force_rdma = config["force_rdma"].bool_value() || config["force_rdma"].uint64_value() != 0;
|
||||||
|
}
|
||||||
this->rdma_device = config["rdma_device"].string_value();
|
this->rdma_device = config["rdma_device"].string_value();
|
||||||
this->rdma_port_num = (uint8_t)config["rdma_port_num"].uint64_value();
|
this->rdma_port_num = (uint8_t)config["rdma_port_num"].uint64_value();
|
||||||
if (!this->rdma_port_num)
|
|
||||||
this->rdma_port_num = 1;
|
|
||||||
if (!config["rdma_gid_index"].is_null())
|
if (!config["rdma_gid_index"].is_null())
|
||||||
this->rdma_gid_index = (uint8_t)config["rdma_gid_index"].uint64_value();
|
this->rdma_gid_index = (uint8_t)config["rdma_gid_index"].uint64_value();
|
||||||
this->rdma_mtu = (uint32_t)config["rdma_mtu"].uint64_value();
|
this->rdma_mtu = (uint32_t)config["rdma_mtu"].uint64_value();
|
||||||
|
@ -363,9 +405,7 @@ void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state)
|
||||||
bool is_cluster = false;
|
bool is_cluster = false;
|
||||||
for (auto & mask: osd_cluster_network_masks)
|
for (auto & mask: osd_cluster_network_masks)
|
||||||
{
|
{
|
||||||
if (mask.family == addr.ss_family && (mask.family == AF_INET
|
if (cidr_sockaddr_match(addr, mask))
|
||||||
? cidr_match(*(in_addr*)&addr, mask.ipv4, mask.bits)
|
|
||||||
: cidr6_match(*(in6_addr*)&addr, mask.ipv6, mask.bits)))
|
|
||||||
{
|
{
|
||||||
is_cluster = true;
|
is_cluster = true;
|
||||||
break;
|
break;
|
||||||
|
@ -387,6 +427,9 @@ void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state)
|
||||||
wanted_peers[peer_osd].address_list = peer_state["addresses"];
|
wanted_peers[peer_osd].address_list = peer_state["addresses"];
|
||||||
wanted_peers[peer_osd].address_changed = true;
|
wanted_peers[peer_osd].address_changed = true;
|
||||||
}
|
}
|
||||||
|
#ifdef WITH_RDMACM
|
||||||
|
wanted_peers[peer_osd].peer_rdmacm = peer_state["rdmacm"].bool_value();
|
||||||
|
#endif
|
||||||
wanted_peers[peer_osd].port = (int)peer_state["port"].int64_value();
|
wanted_peers[peer_osd].port = (int)peer_state["port"].int64_value();
|
||||||
try_connect_peer(peer_osd);
|
try_connect_peer(peer_osd);
|
||||||
}
|
}
|
||||||
|
@ -412,12 +455,24 @@ void osd_messenger_t::try_connect_peer(uint64_t peer_osd)
|
||||||
wp.cur_addr = wp.address_list[wp.address_index].string_value();
|
wp.cur_addr = wp.address_list[wp.address_index].string_value();
|
||||||
wp.cur_port = wp.port;
|
wp.cur_port = wp.port;
|
||||||
wp.connecting = true;
|
wp.connecting = true;
|
||||||
try_connect_peer_addr(peer_osd, wp.cur_addr.c_str(), wp.cur_port);
|
#ifdef WITH_RDMACM
|
||||||
|
if (use_rdmacm && wp.peer_rdmacm)
|
||||||
|
rdmacm_try_connect_peer(peer_osd, wp.cur_addr.c_str(), wp.cur_port);
|
||||||
|
else
|
||||||
|
#endif
|
||||||
|
try_connect_peer_tcp(peer_osd, wp.cur_addr.c_str(), wp.cur_port);
|
||||||
}
|
}
|
||||||
|
|
||||||
void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port)
|
void osd_messenger_t::try_connect_peer_tcp(osd_num_t peer_osd, const char *peer_host, int peer_port)
|
||||||
{
|
{
|
||||||
assert(peer_osd != this->osd_num);
|
assert(peer_osd != this->osd_num);
|
||||||
|
#ifdef WITH_RDMACM
|
||||||
|
if (disable_tcp)
|
||||||
|
{
|
||||||
|
on_connect_peer(peer_osd, -EINVAL);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
struct sockaddr_storage addr;
|
struct sockaddr_storage addr;
|
||||||
if (!string_to_addr(peer_host, 0, peer_port, &addr))
|
if (!string_to_addr(peer_host, 0, peer_port, &addr))
|
||||||
{
|
{
|
||||||
|
@ -581,20 +636,30 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
#ifdef WITH_RDMA
|
#ifdef WITH_RDMA
|
||||||
if (rdma_context)
|
if (rdma_contexts.size())
|
||||||
{
|
{
|
||||||
cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg);
|
// Choose the right context for the selected network
|
||||||
if (cl->rdma_conn)
|
msgr_rdma_context_t *selected_ctx = choose_rdma_context(cl);
|
||||||
|
if (!selected_ctx)
|
||||||
{
|
{
|
||||||
json11::Json payload = json11::Json::object {
|
if (log_level > 0)
|
||||||
{ "connect_rdma", cl->rdma_conn->addr.to_string() },
|
fprintf(stderr, "No RDMA context for OSD %ju connection (peer %d), using only TCP\n", cl->osd_num, cl->peer_fd);
|
||||||
{ "rdma_max_msg", cl->rdma_conn->max_msg },
|
}
|
||||||
};
|
else
|
||||||
std::string payload_str = payload.dump();
|
{
|
||||||
op->req.show_conf.json_len = payload_str.size();
|
cl->rdma_conn = msgr_rdma_connection_t::create(selected_ctx, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg);
|
||||||
op->buf = malloc_or_die(payload_str.size());
|
if (cl->rdma_conn)
|
||||||
op->iov.push_back(op->buf, payload_str.size());
|
{
|
||||||
memcpy(op->buf, payload_str.c_str(), payload_str.size());
|
json11::Json payload = json11::Json::object {
|
||||||
|
{ "connect_rdma", cl->rdma_conn->addr.to_string() },
|
||||||
|
{ "rdma_max_msg", cl->rdma_conn->max_msg },
|
||||||
|
};
|
||||||
|
std::string payload_str = payload.dump();
|
||||||
|
op->req.show_conf.json_len = payload_str.size();
|
||||||
|
op->buf = malloc_or_die(payload_str.size());
|
||||||
|
op->iov.push_back(op->buf, payload_str.size());
|
||||||
|
memcpy(op->buf, payload_str.c_str(), payload_str.size());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
@ -724,9 +789,29 @@ void osd_messenger_t::accept_connections(int listen_fd)
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef WITH_RDMA
|
#ifdef WITH_RDMA
|
||||||
|
msgr_rdma_context_t* osd_messenger_t::choose_rdma_context(osd_client_t *cl)
{
    // Select the RDMA context to use for the connection <cl>, based on the peer address:
    // - a context whose net_mask.family is non-zero is taken whenever the peer
    //   address matches that mask (the loop does not stop, so the last match wins);
    // - a context whose net_mask.family is zero is only taken while nothing
    //   has been selected yet, i.e. it never replaces an earlier choice.
    // Returns NULL when no context qualifies.
    msgr_rdma_context_t *best = NULL;
    for (msgr_rdma_context_t *candidate: rdma_contexts)
    {
        bool usable;
        if (candidate->net_mask.family)
            usable = cidr_sockaddr_match(cl->peer_addr, candidate->net_mask);
        else
            usable = !best;
        if (usable)
        {
            best = candidate;
        }
    }
    return best;
}
|
||||||
|
|
||||||
bool osd_messenger_t::is_rdma_enabled()
|
bool osd_messenger_t::is_rdma_enabled()
|
||||||
{
|
{
|
||||||
return rdma_context != NULL;
|
return rdma_contexts.size() > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool osd_messenger_t::is_use_rdmacm()
|
||||||
|
{
|
||||||
|
return use_rdmacm;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
|
@ -50,10 +50,10 @@ struct osd_client_t
|
||||||
{
|
{
|
||||||
int refs = 0;
|
int refs = 0;
|
||||||
|
|
||||||
sockaddr_storage peer_addr;
|
sockaddr_storage peer_addr = {};
|
||||||
int peer_port;
|
int peer_port = 0;
|
||||||
int peer_fd = -1;
|
int peer_fd = -1;
|
||||||
int peer_state;
|
int peer_state = 0;
|
||||||
int connect_timeout_id = -1;
|
int connect_timeout_id = -1;
|
||||||
int ping_time_remaining = 0;
|
int ping_time_remaining = 0;
|
||||||
int idle_time_remaining = 0;
|
int idle_time_remaining = 0;
|
||||||
|
@ -96,6 +96,7 @@ struct osd_wanted_peer_t
|
||||||
{
|
{
|
||||||
json11::Json raw_address_list;
|
json11::Json raw_address_list;
|
||||||
json11::Json address_list;
|
json11::Json address_list;
|
||||||
|
bool peer_rdmacm = false;
|
||||||
int n_cluster_addr = 0;
|
int n_cluster_addr = 0;
|
||||||
int port = 0;
|
int port = 0;
|
||||||
time_t last_connect_attempt = 0;
|
time_t last_connect_attempt = 0;
|
||||||
|
@ -152,6 +153,15 @@ public:
|
||||||
};
|
};
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#ifdef WITH_RDMA
|
||||||
|
struct rdma_event_channel;
|
||||||
|
struct rdma_cm_id;
|
||||||
|
struct rdma_cm_event;
|
||||||
|
struct ibv_context;
|
||||||
|
struct osd_messenger_t;
|
||||||
|
struct rdmacm_connecting_t;
|
||||||
|
#endif
|
||||||
|
|
||||||
struct osd_messenger_t
|
struct osd_messenger_t
|
||||||
{
|
{
|
||||||
protected:
|
protected:
|
||||||
|
@ -168,13 +178,20 @@ protected:
|
||||||
|
|
||||||
#ifdef WITH_RDMA
|
#ifdef WITH_RDMA
|
||||||
bool use_rdma = true;
|
bool use_rdma = true;
|
||||||
|
bool use_rdmacm = false;
|
||||||
|
bool disable_tcp = false;
|
||||||
|
bool force_rdma = false;
|
||||||
std::string rdma_device;
|
std::string rdma_device;
|
||||||
uint64_t rdma_port_num = 1, rdma_mtu = 0;
|
uint64_t rdma_port_num = 1;
|
||||||
|
int rdma_mtu = 0;
|
||||||
int rdma_gid_index = -1;
|
int rdma_gid_index = -1;
|
||||||
msgr_rdma_context_t *rdma_context = NULL;
|
std::vector<msgr_rdma_context_t *> rdma_contexts;
|
||||||
uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 0;
|
uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 0;
|
||||||
uint64_t rdma_max_msg = 0;
|
uint64_t rdma_max_msg = 0;
|
||||||
bool rdma_odp = false;
|
bool rdma_odp = false;
|
||||||
|
rdma_event_channel *rdmacm_evch = NULL;
|
||||||
|
std::map<rdma_cm_id*, osd_client_t*> rdmacm_connections;
|
||||||
|
std::map<rdma_cm_id*, rdmacm_connecting_t*> rdmacm_connecting;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
std::vector<msgr_iothread_t*> iothreads;
|
std::vector<msgr_iothread_t*> iothreads;
|
||||||
|
@ -224,13 +241,18 @@ public:
|
||||||
bool is_rdma_enabled();
|
bool is_rdma_enabled();
|
||||||
bool connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg);
|
bool connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg);
|
||||||
#endif
|
#endif
|
||||||
|
#ifdef WITH_RDMACM
|
||||||
|
bool is_use_rdmacm();
|
||||||
|
rdma_cm_id *rdmacm_listen(const std::string & bind_address, int rdmacm_port, int *bound_port, int log_level);
|
||||||
|
void rdmacm_destroy_listener(rdma_cm_id *listener);
|
||||||
|
#endif
|
||||||
|
|
||||||
void inc_op_stats(osd_op_stats_t & stats, uint64_t opcode, timespec & tv_begin, timespec & tv_end, uint64_t len);
|
void inc_op_stats(osd_op_stats_t & stats, uint64_t opcode, timespec & tv_begin, timespec & tv_end, uint64_t len);
|
||||||
void measure_exec(osd_op_t *cur_op);
|
void measure_exec(osd_op_t *cur_op);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
void try_connect_peer(uint64_t osd_num);
|
void try_connect_peer(uint64_t osd_num);
|
||||||
void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port);
|
void try_connect_peer_tcp(osd_num_t peer_osd, const char *peer_host, int peer_port);
|
||||||
void handle_peer_epoll(int peer_fd, int epoll_events);
|
void handle_peer_epoll(int peer_fd, int epoll_events);
|
||||||
void handle_connect_epoll(int peer_fd);
|
void handle_connect_epoll(int peer_fd);
|
||||||
void on_connect_peer(osd_num_t peer_osd, int peer_fd);
|
void on_connect_peer(osd_num_t peer_osd, int peer_fd);
|
||||||
|
@ -255,6 +277,18 @@ protected:
|
||||||
void try_send_rdma_odp(osd_client_t *cl);
|
void try_send_rdma_odp(osd_client_t *cl);
|
||||||
void try_send_rdma_nodp(osd_client_t *cl);
|
void try_send_rdma_nodp(osd_client_t *cl);
|
||||||
bool try_recv_rdma(osd_client_t *cl);
|
bool try_recv_rdma(osd_client_t *cl);
|
||||||
void handle_rdma_events();
|
void handle_rdma_events(msgr_rdma_context_t *rdma_context);
|
||||||
|
msgr_rdma_context_t* choose_rdma_context(osd_client_t *cl);
|
||||||
|
#endif
|
||||||
|
#ifdef WITH_RDMACM
|
||||||
|
void handle_rdmacm_events();
|
||||||
|
msgr_rdma_context_t* rdmacm_get_context(ibv_context *verbs);
|
||||||
|
msgr_rdma_context_t* rdmacm_create_qp(rdma_cm_id *cmid);
|
||||||
|
void rdmacm_accept(rdma_cm_event *ev);
|
||||||
|
void rdmacm_try_connect_peer(uint64_t peer_osd, const std::string & addr, int peer_port);
|
||||||
|
void rdmacm_on_connect_peer_error(rdma_cm_id *cmid, int res);
|
||||||
|
void rdmacm_address_resolved(rdma_cm_event *ev);
|
||||||
|
void rdmacm_route_resolved(rdma_cm_event *ev);
|
||||||
|
void rdmacm_established(rdma_cm_event *ev);
|
||||||
#endif
|
#endif
|
||||||
};
|
};
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -2,6 +2,9 @@
|
||||||
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
|
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
|
||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
#ifdef WITH_RDMACM
|
||||||
|
#include <rdma/rdma_cma.h>
|
||||||
|
#endif
|
||||||
#include <infiniband/verbs.h>
|
#include <infiniband/verbs.h>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
@ -21,7 +24,6 @@ struct msgr_rdma_address_t
|
||||||
struct msgr_rdma_context_t
|
struct msgr_rdma_context_t
|
||||||
{
|
{
|
||||||
ibv_context *context = NULL;
|
ibv_context *context = NULL;
|
||||||
ibv_device *dev = NULL;
|
|
||||||
ibv_device_attr_ex attrx;
|
ibv_device_attr_ex attrx;
|
||||||
ibv_pd *pd = NULL;
|
ibv_pd *pd = NULL;
|
||||||
bool odp = false;
|
bool odp = false;
|
||||||
|
@ -36,8 +38,17 @@ struct msgr_rdma_context_t
|
||||||
uint32_t mtu;
|
uint32_t mtu;
|
||||||
int max_cqe = 0;
|
int max_cqe = 0;
|
||||||
int used_max_cqe = 0;
|
int used_max_cqe = 0;
|
||||||
|
addr_mask_t net_mask = {};
|
||||||
|
bool is_cm = false;
|
||||||
|
int cm_refs = 0;
|
||||||
|
|
||||||
|
static std::vector<msgr_rdma_context_t*> create_all(const std::vector<addr_mask_t> & osd_network_masks,
|
||||||
|
const char *sel_dev_name, int sel_port_num, int sel_gid_index, uint32_t sel_mtu, bool odp, int log_level);
|
||||||
|
static msgr_rdma_context_t *create(ibv_device *dev, ibv_port_attr & portinfo,
|
||||||
|
int ib_port, int gid_index, uint32_t mtu, bool odp, int log_level);
|
||||||
|
static msgr_rdma_context_t* create_cm(ibv_context *ctx);
|
||||||
|
bool reserve_cqe(int n);
|
||||||
|
|
||||||
static msgr_rdma_context_t *create(const std::vector<addr_mask_t> & osd_network_masks, const char *ib_devname, uint8_t ib_port, int gid_index, uint32_t mtu, bool odp, int log_level);
|
|
||||||
~msgr_rdma_context_t();
|
~msgr_rdma_context_t();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -51,11 +62,14 @@ struct msgr_rdma_connection_t
|
||||||
{
|
{
|
||||||
msgr_rdma_context_t *ctx = NULL;
|
msgr_rdma_context_t *ctx = NULL;
|
||||||
ibv_qp *qp = NULL;
|
ibv_qp *qp = NULL;
|
||||||
|
#ifdef WITH_RDMACM
|
||||||
|
rdma_cm_id *cmid = NULL;
|
||||||
|
#endif
|
||||||
msgr_rdma_address_t addr;
|
msgr_rdma_address_t addr;
|
||||||
int max_send = 0, max_recv = 0, max_sge = 0;
|
int max_send = 0, max_recv = 0, max_sge = 0;
|
||||||
int cur_send = 0, cur_recv = 0;
|
|
||||||
uint64_t max_msg = 0;
|
uint64_t max_msg = 0;
|
||||||
|
|
||||||
|
int cur_send = 0, cur_recv = 0;
|
||||||
int send_pos = 0, send_buf_pos = 0;
|
int send_pos = 0, send_buf_pos = 0;
|
||||||
int next_recv_buf = 0;
|
int next_recv_buf = 0;
|
||||||
std::vector<msgr_rdma_buf_t> recv_buffers;
|
std::vector<msgr_rdma_buf_t> recv_buffers;
|
||||||
|
|
|
@ -14,6 +14,7 @@ target_link_libraries(vitastor-osd
|
||||||
Jerasure
|
Jerasure
|
||||||
${ISAL_LIBRARIES}
|
${ISAL_LIBRARIES}
|
||||||
${IBVERBS_LIBRARIES}
|
${IBVERBS_LIBRARIES}
|
||||||
|
${RDMACM_LIBRARIES}
|
||||||
)
|
)
|
||||||
|
|
||||||
# osd_rmw_test
|
# osd_rmw_test
|
||||||
|
|
|
@ -120,6 +120,11 @@ osd_t::~osd_t()
|
||||||
delete epmgr;
|
delete epmgr;
|
||||||
if (bs)
|
if (bs)
|
||||||
delete bs;
|
delete bs;
|
||||||
|
#ifdef WITH_RDMACM
|
||||||
|
for (rdma_cm_id *listener: rdmacm_listeners)
|
||||||
|
msgr.rdmacm_destroy_listener(listener);
|
||||||
|
rdmacm_listeners.clear();
|
||||||
|
#endif
|
||||||
for (auto listen_fd: listen_fds)
|
for (auto listen_fd: listen_fds)
|
||||||
close(listen_fd);
|
close(listen_fd);
|
||||||
listen_fds.clear();
|
listen_fds.clear();
|
||||||
|
@ -167,6 +172,11 @@ void osd_t::parse_config(bool init)
|
||||||
bind_port = config["bind_port"].uint64_value();
|
bind_port = config["bind_port"].uint64_value();
|
||||||
if (bind_port <= 0 || bind_port > 65535)
|
if (bind_port <= 0 || bind_port > 65535)
|
||||||
bind_port = 0;
|
bind_port = 0;
|
||||||
|
#ifdef WITH_RDMACM
|
||||||
|
// Use RDMA CM? (required for iWARP and may be useful for IB)
|
||||||
|
this->use_rdmacm = config["use_rdmacm"].bool_value() || config["use_rdmacm"].uint64_value() != 0;
|
||||||
|
this->disable_tcp = this->use_rdmacm && (config["disable_tcp"].bool_value() || config["disable_tcp"].uint64_value() != 0);
|
||||||
|
#endif
|
||||||
// OSD configuration
|
// OSD configuration
|
||||||
etcd_report_interval = config["etcd_report_interval"].uint64_value();
|
etcd_report_interval = config["etcd_report_interval"].uint64_value();
|
||||||
if (etcd_report_interval <= 0)
|
if (etcd_report_interval <= 0)
|
||||||
|
@ -335,16 +345,30 @@ void osd_t::bind_socket()
|
||||||
{
|
{
|
||||||
bind_addresses.push_back("0.0.0.0");
|
bind_addresses.push_back("0.0.0.0");
|
||||||
}
|
}
|
||||||
for (auto & bind_address: bind_addresses)
|
if (!disable_tcp)
|
||||||
{
|
{
|
||||||
int listen_fd = create_and_bind_socket(bind_address, listening_port ? listening_port : bind_port, listen_backlog, &listening_port);
|
for (auto & bind_address: bind_addresses)
|
||||||
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
|
|
||||||
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
|
|
||||||
{
|
{
|
||||||
msgr.accept_connections(fd);
|
int listen_fd = create_and_bind_socket(bind_address, listening_port ? listening_port : bind_port, listen_backlog, &listening_port);
|
||||||
});
|
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
|
||||||
listen_fds.push_back(listen_fd);
|
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
|
||||||
|
{
|
||||||
|
msgr.accept_connections(fd);
|
||||||
|
});
|
||||||
|
listen_fds.push_back(listen_fd);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
#ifdef WITH_RDMACM
|
||||||
|
if (use_rdmacm)
|
||||||
|
{
|
||||||
|
for (auto & bind_address: bind_addresses)
|
||||||
|
{
|
||||||
|
auto listener = msgr.rdmacm_listen(bind_address, listening_port, &listening_port, log_level);
|
||||||
|
if (listener)
|
||||||
|
rdmacm_listeners.push_back(listener);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
bool osd_t::shutdown()
|
bool osd_t::shutdown()
|
||||||
|
|
|
@ -108,6 +108,8 @@ class osd_t
|
||||||
bool no_scrub = false;
|
bool no_scrub = false;
|
||||||
bool allow_net_split = false;
|
bool allow_net_split = false;
|
||||||
int bind_port, listen_backlog = 128;
|
int bind_port, listen_backlog = 128;
|
||||||
|
bool use_rdmacm = false;
|
||||||
|
bool disable_tcp = false;
|
||||||
// FIXME: Implement client queue depth limit
|
// FIXME: Implement client queue depth limit
|
||||||
int client_queue_depth = 128;
|
int client_queue_depth = 128;
|
||||||
bool allow_test_ops = false;
|
bool allow_test_ops = false;
|
||||||
|
@ -201,6 +203,9 @@ class osd_t
|
||||||
int listening_port = 0;
|
int listening_port = 0;
|
||||||
std::vector<std::string> bind_addresses;
|
std::vector<std::string> bind_addresses;
|
||||||
std::vector<int> listen_fds;
|
std::vector<int> listen_fds;
|
||||||
|
#ifdef WITH_RDMACM
|
||||||
|
std::vector<rdma_cm_id *> rdmacm_listeners;
|
||||||
|
#endif
|
||||||
ring_consumer_t consumer;
|
ring_consumer_t consumer;
|
||||||
|
|
||||||
// op statistics
|
// op statistics
|
||||||
|
|
|
@ -172,6 +172,10 @@ json11::Json osd_t::get_osd_state()
|
||||||
st["host"] = std::string(hostname.data(), hostname.size());
|
st["host"] = std::string(hostname.data(), hostname.size());
|
||||||
st["version"] = VITASTOR_VERSION;
|
st["version"] = VITASTOR_VERSION;
|
||||||
st["port"] = listening_port;
|
st["port"] = listening_port;
|
||||||
|
#ifdef WITH_RDMACM
|
||||||
|
if (rdmacm_listeners.size())
|
||||||
|
st["rdmacm"] = true;
|
||||||
|
#endif
|
||||||
st["primary_enabled"] = run_primary;
|
st["primary_enabled"] = run_primary;
|
||||||
st["blockstore_enabled"] = bs ? true : false;
|
st["blockstore_enabled"] = bs ? true : false;
|
||||||
return st;
|
return st;
|
||||||
|
|
|
@ -25,6 +25,7 @@ target_link_libraries(stub_uring_osd
|
||||||
vitastor_common
|
vitastor_common
|
||||||
${LIBURING_LIBRARIES}
|
${LIBURING_LIBRARIES}
|
||||||
${IBVERBS_LIBRARIES}
|
${IBVERBS_LIBRARIES}
|
||||||
|
${RDMACM_LIBRARIES}
|
||||||
tcmalloc_minimal
|
tcmalloc_minimal
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -93,6 +93,13 @@ bool cidr6_match(const in6_addr &address, const in6_addr &network, uint8_t bits)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool cidr_sockaddr_match(const sockaddr_storage &addr, const addr_mask_t &mask)
|
||||||
|
{
|
||||||
|
return mask.family == addr.ss_family && (mask.family == AF_INET
|
||||||
|
? cidr_match(*(in_addr*)&addr, mask.ipv4, mask.bits)
|
||||||
|
: cidr6_match(*(in6_addr*)&addr, mask.ipv6, mask.bits));
|
||||||
|
}
|
||||||
|
|
||||||
addr_mask_t cidr_parse(std::string mask)
|
addr_mask_t cidr_parse(std::string mask)
|
||||||
{
|
{
|
||||||
unsigned bits = 255;
|
unsigned bits = 255;
|
||||||
|
|
|
@ -18,5 +18,6 @@ std::string addr_to_string(const sockaddr_storage &addr);
|
||||||
addr_mask_t cidr_parse(std::string mask);
|
addr_mask_t cidr_parse(std::string mask);
|
||||||
bool cidr_match(const in_addr &address, const in_addr &network, uint8_t bits);
|
bool cidr_match(const in_addr &address, const in_addr &network, uint8_t bits);
|
||||||
bool cidr6_match(const in6_addr &address, const in6_addr &network, uint8_t bits);
|
bool cidr6_match(const in6_addr &address, const in6_addr &network, uint8_t bits);
|
||||||
|
bool cidr_sockaddr_match(const sockaddr_storage &addr, const addr_mask_t &mask);
|
||||||
std::vector<std::string> getifaddr_list(const std::vector<addr_mask_t> & masks = std::vector<addr_mask_t>(), bool include_v6 = false);
|
std::vector<std::string> getifaddr_list(const std::vector<addr_mask_t> & masks = std::vector<addr_mask_t>(), bool include_v6 = false);
|
||||||
int create_and_bind_socket(std::string bind_address, int bind_port, int listen_backlog, int *listening_port);
|
int create_and_bind_socket(std::string bind_address, int bind_port, int listen_backlog, int *listening_port);
|
||||||
|
|
Loading…
Reference in New Issue