Compare commits

..

2 Commits

Author SHA1 Message Date
Vitaliy Filippov 1e3f143892 Implement support of RDMA via RDMA-CM
Required for iWARP and, possibly, Infiniband cards
2025-03-31 13:02:01 +03:00
Vitaliy Filippov 97a6073265 WIP Support multiple RDMA networks 2025-03-31 13:02:01 +03:00
5 changed files with 85 additions and 40 deletions

View File

@ -299,11 +299,10 @@ void osd_messenger_t::parse_config(const json11::Json & config)
this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0;
}
#ifdef WITH_RDMACM
if (!config["use_rdmacm"].is_null())
{
// Use RDMA CM (required for iWARP and may be useful for IB)
this->use_rdmacm = config["use_rdmacm"].bool_value() || config["use_rdmacm"].uint64_value() != 0;
}
// Use RDMA CM? (required for iWARP and may be useful for IB)
// FIXME: Only parse during start
this->use_rdmacm = config["use_rdmacm"].bool_value() || config["use_rdmacm"].uint64_value() != 0;
this->disable_tcp = this->use_rdmacm && (config["disable_tcp"].bool_value() || config["disable_tcp"].uint64_value() != 0);
#endif
if (!config["force_rdma"].is_null())
{
@ -467,6 +466,13 @@ void osd_messenger_t::try_connect_peer(uint64_t peer_osd)
void osd_messenger_t::try_connect_peer_tcp(osd_num_t peer_osd, const char *peer_host, int peer_port)
{
assert(peer_osd != this->osd_num);
#ifdef WITH_RDMACM
if (disable_tcp)
{
on_connect_peer(peer_osd, -EINVAL);
return;
}
#endif
struct sockaddr_storage addr;
if (!string_to_addr(peer_host, 0, peer_port, &addr))
{
@ -637,7 +643,7 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
if (!selected_ctx)
{
if (log_level > 0)
fprintf(stderr, "No RDMA context for OSD %lu connection (peer %d), using only TCP\n", cl->osd_num, cl->peer_fd);
fprintf(stderr, "No RDMA context for OSD %ju connection (peer %d), using only TCP\n", cl->osd_num, cl->peer_fd);
}
else
{

View File

@ -179,6 +179,7 @@ protected:
#ifdef WITH_RDMA
bool use_rdma = true;
bool use_rdmacm = false;
bool disable_tcp = false;
bool force_rdma = false;
std::string rdma_device;
uint64_t rdma_port_num = 1;
@ -242,7 +243,7 @@ public:
#endif
#ifdef WITH_RDMACM
bool is_use_rdmacm();
rdma_cm_id *rdmacm_listen(const std::string & bind_address, int rdmacm_port, int log_level);
rdma_cm_id *rdmacm_listen(const std::string & bind_address, int rdmacm_port, int *bound_port, int log_level);
void rdmacm_destroy_listener(rdma_cm_id *listener);
#endif

View File

@ -863,8 +863,9 @@ struct rdmacm_connecting_t
msgr_rdma_context_t *rdma_context = NULL;
};
rdma_cm_id *osd_messenger_t::rdmacm_listen(const std::string & bind_address, int rdmacm_port, int log_level)
rdma_cm_id *osd_messenger_t::rdmacm_listen(const std::string & bind_address, int rdmacm_port, int *bound_port, int log_level)
{
sockaddr_storage addr = {};
rdma_cm_id *listener = NULL;
int r = rdma_create_id(rdmacm_evch, &listener, NULL, RDMA_PS_TCP);
if (r != 0)
@ -872,7 +873,6 @@ rdma_cm_id *osd_messenger_t::rdmacm_listen(const std::string & bind_address, int
fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d)\n", strerror(errno), errno);
goto fail;
}
sockaddr_storage addr;
if (!string_to_addr(bind_address, 0, rdmacm_port, &addr))
{
fprintf(stderr, "Server address: %s is not valid\n", bind_address.c_str());
@ -890,9 +890,13 @@ rdma_cm_id *osd_messenger_t::rdmacm_listen(const std::string & bind_address, int
fprintf(stderr, "Failed to listen to RDMA-CM address %s:%d: %s (code %d)\n", bind_address.c_str(), rdmacm_port, strerror(errno), errno);
goto fail;
}
if (bound_port)
{
*bound_port = ntohs(rdma_get_src_port(listener));
}
if (log_level > 0)
{
fprintf(stderr, "Listening to RDMA-CM address %s port %d\n", bind_address.c_str(), rdmacm_port);
fprintf(stderr, "Listening to RDMA-CM address %s port %d\n", bind_address.c_str(), *bound_port);
}
return listener;
fail:
@ -952,6 +956,8 @@ void osd_messenger_t::handle_rdmacm_events()
}
else if (rdmacm_connecting.find(ev->id) != rdmacm_connecting.end())
{
fprintf(stderr, "Received %s event for RDMA-CM OSD %ju connection\n",
event_type_name, rdmacm_connecting[ev->id]->peer_osd);
rdmacm_established(ev);
}
else
@ -1097,6 +1103,7 @@ void osd_messenger_t::rdmacm_accept(rdma_cm_event *ev)
int fake_fd = socket(AF_INET, SOCK_STREAM, 0);
if (fake_fd < 0)
{
fprintf(stderr, "Failed to allocate a fake socket for RDMA-CM client: %s (code %d)\n", strerror(errno), errno);
rdma_destroy_id(ev->id);
return;
}
@ -1106,8 +1113,13 @@ void osd_messenger_t::rdmacm_accept(rdma_cm_event *ev)
rdma_destroy_id(ev->id);
return;
}
// We don't need private_data, RDMA_READ or ATOMIC so leave everything at 0
rdma_conn_param conn_params = { .rnr_retry_count = 7 };
// We don't need private_data, RDMA_READ or ATOMIC so use default 1
rdma_conn_param conn_params = {
.responder_resources = 1,
.initiator_depth = 1,
.retry_count = 7,
.rnr_retry_count = 7,
};
if (rdma_accept(ev->id, &conn_params) != 0)
{
fprintf(stderr, "Failed to accept RDMA-CM connection: %s (code %d)\n", strerror(errno), errno);
@ -1143,7 +1155,6 @@ void osd_messenger_t::rdmacm_accept(rdma_cm_event *ev)
void osd_messenger_t::rdmacm_on_connect_peer_error(rdma_cm_id *cmid, int res)
{
assert(res < 0);
auto conn = rdmacm_connecting.at(cmid);
auto addr = conn->addr;
auto peer_port = conn->peer_port;
@ -1162,13 +1173,21 @@ void osd_messenger_t::rdmacm_on_connect_peer_error(rdma_cm_id *cmid, int res)
}
rdmacm_connecting.erase(cmid);
delete conn;
// Fall back to TCP instead of just reporting the error to on_connect_peer()
try_connect_peer_tcp(peer_osd, addr.c_str(), peer_port);
if (!disable_tcp)
{
// Fall back to TCP instead of just reporting the error to on_connect_peer()
try_connect_peer_tcp(peer_osd, addr.c_str(), peer_port);
}
else
{
// TCP is disabled
on_connect_peer(peer_osd, res == 0 ? -EINVAL : (res > 0 ? -res : res));
}
}
void osd_messenger_t::rdmacm_try_connect_peer(uint64_t peer_osd, const std::string & addr, int peer_port)
{
struct sockaddr_storage sa;
struct sockaddr_storage sa = {};
if (!string_to_addr(addr, false, peer_port, &sa))
{
fprintf(stderr, "Address %s is invalid\n", addr.c_str());
@ -1178,8 +1197,12 @@ void osd_messenger_t::rdmacm_try_connect_peer(uint64_t peer_osd, const std::stri
rdma_cm_id *cmid = NULL;
if (rdma_create_id(rdmacm_evch, &cmid, NULL, RDMA_PS_TCP) != 0)
{
int res = -errno;
fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d), using TCP\n", strerror(errno), errno);
try_connect_peer_tcp(peer_osd, addr.c_str(), peer_port);
if (!disable_tcp)
try_connect_peer_tcp(peer_osd, addr.c_str(), peer_port);
else
on_connect_peer(peer_osd, res);
return;
}
// Make a fake FD (FIXME: do not use FDs for identifying clients!)
@ -1234,8 +1257,8 @@ void osd_messenger_t::rdmacm_address_resolved(rdma_cm_event *ev)
auto conn = conn_it->second;
if (ev->event != RDMA_CM_EVENT_ADDR_RESOLVED || ev->status != 0)
{
ev->status = (!ev->status ? -EIO : (ev->status > 0 ? -ev->status : ev->status));
fprintf(stderr, "Failed to resolve address %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), strerror(-ev->status), ev->status);
fprintf(stderr, "Failed to resolve address %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(),
ev->status > 0 ? "unknown error" : strerror(-ev->status), ev->status);
rdmacm_on_connect_peer_error(cmid, ev->status);
return;
}
@ -1267,17 +1290,22 @@ void osd_messenger_t::rdmacm_route_resolved(rdma_cm_event *ev)
auto conn = conn_it->second;
if (ev->event != RDMA_CM_EVENT_ROUTE_RESOLVED || ev->status != 0)
{
ev->status = (!ev->status ? -EIO : (ev->status > 0 ? -ev->status : ev->status));
fprintf(stderr, "Failed to resolve route to %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), strerror(-ev->status), ev->status);
fprintf(stderr, "Failed to resolve route to %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(),
ev->status > 0 ? "unknown error" : strerror(-ev->status), ev->status);
rdmacm_on_connect_peer_error(cmid, ev->status);
return;
}
// We don't need private_data, RDMA_READ or ATOMIC so leave everything at 0
rdma_conn_param conn_params = { .rnr_retry_count = 7 };
// We don't need private_data, RDMA_READ or ATOMIC so use default 1
rdma_conn_param conn_params = {
.responder_resources = 1,
.initiator_depth = 1,
.retry_count = 7,
.rnr_retry_count = 7,
};
if (rdma_connect(cmid, &conn_params) != 0)
{
int res = -errno;
fprintf(stderr, "Failed to connect to %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), strerror(errno), errno);
fprintf(stderr, "Failed to connect to %s:%d via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), conn->peer_port, strerror(errno), errno);
rdmacm_on_connect_peer_error(cmid, res);
return;
}
@ -1296,8 +1324,8 @@ void osd_messenger_t::rdmacm_established(rdma_cm_event *ev)
auto peer_osd = conn->peer_osd;
if (ev->event != RDMA_CM_EVENT_ESTABLISHED || ev->status != 0)
{
ev->status = (!ev->status ? -EIO : (ev->status > 0 ? -ev->status : ev->status));
fprintf(stderr, "Failed to connect to %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), strerror(-ev->status), ev->status);
fprintf(stderr, "Failed to connect to %s:%d via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), conn->peer_port,
ev->status > 0 ? "unknown error" : strerror(-ev->status), ev->status);
rdmacm_on_connect_peer_error(cmid, ev->status);
return;
}
@ -1329,7 +1357,7 @@ void osd_messenger_t::rdmacm_established(rdma_cm_event *ev)
rdmacm_connecting.erase(cmid);
rdmacm_connections[cmid] = cl;
if (log_level > 0)
fprintf(stderr, "Successfully connected with OSD %lu using RDMA-CM\n", peer_osd);
fprintf(stderr, "Successfully connected with OSD %ju using RDMA-CM\n", peer_osd);
// Add initial receive request(s)
try_recv_rdma(cl);
osd_peer_fds[peer_osd] = cl->peer_fd;

View File

@ -172,6 +172,11 @@ void osd_t::parse_config(bool init)
bind_port = config["bind_port"].uint64_value();
if (bind_port <= 0 || bind_port > 65535)
bind_port = 0;
#ifdef WITH_RDMACM
// Use RDMA CM? (required for iWARP and may be useful for IB)
this->use_rdmacm = config["use_rdmacm"].bool_value() || config["use_rdmacm"].uint64_value() != 0;
this->disable_tcp = this->use_rdmacm && (config["disable_tcp"].bool_value() || config["disable_tcp"].uint64_value() != 0);
#endif
// OSD configuration
etcd_report_interval = config["etcd_report_interval"].uint64_value();
if (etcd_report_interval <= 0)
@ -340,22 +345,25 @@ void osd_t::bind_socket()
{
bind_addresses.push_back("0.0.0.0");
}
for (auto & bind_address: bind_addresses)
{
int listen_fd = create_and_bind_socket(bind_address, listening_port ? listening_port : bind_port, listen_backlog, &listening_port);
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
{
msgr.accept_connections(fd);
});
listen_fds.push_back(listen_fd);
}
#ifdef WITH_RDMACM
if (msgr.is_use_rdmacm()) // FIXME: use json_is_true and move here
if (!disable_tcp)
{
for (auto & bind_address: bind_addresses)
{
auto listener = msgr.rdmacm_listen(bind_address, listening_port, log_level);
int listen_fd = create_and_bind_socket(bind_address, listening_port ? listening_port : bind_port, listen_backlog, &listening_port);
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
{
msgr.accept_connections(fd);
});
listen_fds.push_back(listen_fd);
}
}
#ifdef WITH_RDMACM
if (use_rdmacm)
{
for (auto & bind_address: bind_addresses)
{
auto listener = msgr.rdmacm_listen(bind_address, listening_port, &listening_port, log_level);
if (listener)
rdmacm_listeners.push_back(listener);
}

View File

@ -108,6 +108,8 @@ class osd_t
bool no_scrub = false;
bool allow_net_split = false;
int bind_port, listen_backlog = 128;
bool use_rdmacm = false;
bool disable_tcp = false;
// FIXME: Implement client queue depth limit
int client_queue_depth = 128;
bool allow_test_ops = false;