Compare commits

..

2 Commits

Author SHA1 Message Date
Vitaliy Filippov 1e3f143892 Implement support of RDMA via RDMA-CM
Required for iWARP and, possibly, Infiniband cards
2025-03-31 13:02:01 +03:00
Vitaliy Filippov 97a6073265 WIP Support multiple RDMA networks 2025-03-31 13:02:01 +03:00
5 changed files with 85 additions and 40 deletions

View File

@ -299,11 +299,10 @@ void osd_messenger_t::parse_config(const json11::Json & config)
this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0; this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0;
} }
#ifdef WITH_RDMACM #ifdef WITH_RDMACM
if (!config["use_rdmacm"].is_null()) // Use RDMA CM? (required for iWARP and may be useful for IB)
{ // FIXME: Only parse during start
// Use RDMA CM (required for iWARP and may be useful for IB) this->use_rdmacm = config["use_rdmacm"].bool_value() || config["use_rdmacm"].uint64_value() != 0;
this->use_rdmacm = config["use_rdmacm"].bool_value() || config["use_rdmacm"].uint64_value() != 0; this->disable_tcp = this->use_rdmacm && (config["disable_tcp"].bool_value() || config["disable_tcp"].uint64_value() != 0);
}
#endif #endif
if (!config["force_rdma"].is_null()) if (!config["force_rdma"].is_null())
{ {
@ -467,6 +466,13 @@ void osd_messenger_t::try_connect_peer(uint64_t peer_osd)
void osd_messenger_t::try_connect_peer_tcp(osd_num_t peer_osd, const char *peer_host, int peer_port) void osd_messenger_t::try_connect_peer_tcp(osd_num_t peer_osd, const char *peer_host, int peer_port)
{ {
assert(peer_osd != this->osd_num); assert(peer_osd != this->osd_num);
#ifdef WITH_RDMACM
if (disable_tcp)
{
on_connect_peer(peer_osd, -EINVAL);
return;
}
#endif
struct sockaddr_storage addr; struct sockaddr_storage addr;
if (!string_to_addr(peer_host, 0, peer_port, &addr)) if (!string_to_addr(peer_host, 0, peer_port, &addr))
{ {
@ -637,7 +643,7 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
if (!selected_ctx) if (!selected_ctx)
{ {
if (log_level > 0) if (log_level > 0)
fprintf(stderr, "No RDMA context for OSD %lu connection (peer %d), using only TCP\n", cl->osd_num, cl->peer_fd); fprintf(stderr, "No RDMA context for OSD %ju connection (peer %d), using only TCP\n", cl->osd_num, cl->peer_fd);
} }
else else
{ {

View File

@ -179,6 +179,7 @@ protected:
#ifdef WITH_RDMA #ifdef WITH_RDMA
bool use_rdma = true; bool use_rdma = true;
bool use_rdmacm = false; bool use_rdmacm = false;
bool disable_tcp = false;
bool force_rdma = false; bool force_rdma = false;
std::string rdma_device; std::string rdma_device;
uint64_t rdma_port_num = 1; uint64_t rdma_port_num = 1;
@ -242,7 +243,7 @@ public:
#endif #endif
#ifdef WITH_RDMACM #ifdef WITH_RDMACM
bool is_use_rdmacm(); bool is_use_rdmacm();
rdma_cm_id *rdmacm_listen(const std::string & bind_address, int rdmacm_port, int log_level); rdma_cm_id *rdmacm_listen(const std::string & bind_address, int rdmacm_port, int *bound_port, int log_level);
void rdmacm_destroy_listener(rdma_cm_id *listener); void rdmacm_destroy_listener(rdma_cm_id *listener);
#endif #endif

View File

@ -863,8 +863,9 @@ struct rdmacm_connecting_t
msgr_rdma_context_t *rdma_context = NULL; msgr_rdma_context_t *rdma_context = NULL;
}; };
rdma_cm_id *osd_messenger_t::rdmacm_listen(const std::string & bind_address, int rdmacm_port, int log_level) rdma_cm_id *osd_messenger_t::rdmacm_listen(const std::string & bind_address, int rdmacm_port, int *bound_port, int log_level)
{ {
sockaddr_storage addr = {};
rdma_cm_id *listener = NULL; rdma_cm_id *listener = NULL;
int r = rdma_create_id(rdmacm_evch, &listener, NULL, RDMA_PS_TCP); int r = rdma_create_id(rdmacm_evch, &listener, NULL, RDMA_PS_TCP);
if (r != 0) if (r != 0)
@ -872,7 +873,6 @@ rdma_cm_id *osd_messenger_t::rdmacm_listen(const std::string & bind_address, int
fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d)\n", strerror(errno), errno); fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d)\n", strerror(errno), errno);
goto fail; goto fail;
} }
sockaddr_storage addr;
if (!string_to_addr(bind_address, 0, rdmacm_port, &addr)) if (!string_to_addr(bind_address, 0, rdmacm_port, &addr))
{ {
fprintf(stderr, "Server address: %s is not valid\n", bind_address.c_str()); fprintf(stderr, "Server address: %s is not valid\n", bind_address.c_str());
@ -890,9 +890,13 @@ rdma_cm_id *osd_messenger_t::rdmacm_listen(const std::string & bind_address, int
fprintf(stderr, "Failed to listen to RDMA-CM address %s:%d: %s (code %d)\n", bind_address.c_str(), rdmacm_port, strerror(errno), errno); fprintf(stderr, "Failed to listen to RDMA-CM address %s:%d: %s (code %d)\n", bind_address.c_str(), rdmacm_port, strerror(errno), errno);
goto fail; goto fail;
} }
if (bound_port)
{
*bound_port = ntohs(rdma_get_src_port(listener));
}
if (log_level > 0) if (log_level > 0)
{ {
fprintf(stderr, "Listening to RDMA-CM address %s port %d\n", bind_address.c_str(), rdmacm_port); fprintf(stderr, "Listening to RDMA-CM address %s port %d\n", bind_address.c_str(), *bound_port);
} }
return listener; return listener;
fail: fail:
@ -952,6 +956,8 @@ void osd_messenger_t::handle_rdmacm_events()
} }
else if (rdmacm_connecting.find(ev->id) != rdmacm_connecting.end()) else if (rdmacm_connecting.find(ev->id) != rdmacm_connecting.end())
{ {
fprintf(stderr, "Received %s event for RDMA-CM OSD %ju connection\n",
event_type_name, rdmacm_connecting[ev->id]->peer_osd);
rdmacm_established(ev); rdmacm_established(ev);
} }
else else
@ -1097,6 +1103,7 @@ void osd_messenger_t::rdmacm_accept(rdma_cm_event *ev)
int fake_fd = socket(AF_INET, SOCK_STREAM, 0); int fake_fd = socket(AF_INET, SOCK_STREAM, 0);
if (fake_fd < 0) if (fake_fd < 0)
{ {
fprintf(stderr, "Failed to allocate a fake socket for RDMA-CM client: %s (code %d)\n", strerror(errno), errno);
rdma_destroy_id(ev->id); rdma_destroy_id(ev->id);
return; return;
} }
@ -1106,8 +1113,13 @@ void osd_messenger_t::rdmacm_accept(rdma_cm_event *ev)
rdma_destroy_id(ev->id); rdma_destroy_id(ev->id);
return; return;
} }
// We don't need private_data, RDMA_READ or ATOMIC so leave everything at 0 // We don't need private_data, RDMA_READ or ATOMIC so use default 1
rdma_conn_param conn_params = { .rnr_retry_count = 7 }; rdma_conn_param conn_params = {
.responder_resources = 1,
.initiator_depth = 1,
.retry_count = 7,
.rnr_retry_count = 7,
};
if (rdma_accept(ev->id, &conn_params) != 0) if (rdma_accept(ev->id, &conn_params) != 0)
{ {
fprintf(stderr, "Failed to accept RDMA-CM connection: %s (code %d)\n", strerror(errno), errno); fprintf(stderr, "Failed to accept RDMA-CM connection: %s (code %d)\n", strerror(errno), errno);
@ -1143,7 +1155,6 @@ void osd_messenger_t::rdmacm_accept(rdma_cm_event *ev)
void osd_messenger_t::rdmacm_on_connect_peer_error(rdma_cm_id *cmid, int res) void osd_messenger_t::rdmacm_on_connect_peer_error(rdma_cm_id *cmid, int res)
{ {
assert(res < 0);
auto conn = rdmacm_connecting.at(cmid); auto conn = rdmacm_connecting.at(cmid);
auto addr = conn->addr; auto addr = conn->addr;
auto peer_port = conn->peer_port; auto peer_port = conn->peer_port;
@ -1162,13 +1173,21 @@ void osd_messenger_t::rdmacm_on_connect_peer_error(rdma_cm_id *cmid, int res)
} }
rdmacm_connecting.erase(cmid); rdmacm_connecting.erase(cmid);
delete conn; delete conn;
// Fall back to TCP instead of just reporting the error to on_connect_peer() if (!disable_tcp)
try_connect_peer_tcp(peer_osd, addr.c_str(), peer_port); {
// Fall back to TCP instead of just reporting the error to on_connect_peer()
try_connect_peer_tcp(peer_osd, addr.c_str(), peer_port);
}
else
{
// TCP is disabled
on_connect_peer(peer_osd, res == 0 ? -EINVAL : (res > 0 ? -res : res));
}
} }
void osd_messenger_t::rdmacm_try_connect_peer(uint64_t peer_osd, const std::string & addr, int peer_port) void osd_messenger_t::rdmacm_try_connect_peer(uint64_t peer_osd, const std::string & addr, int peer_port)
{ {
struct sockaddr_storage sa; struct sockaddr_storage sa = {};
if (!string_to_addr(addr, false, peer_port, &sa)) if (!string_to_addr(addr, false, peer_port, &sa))
{ {
fprintf(stderr, "Address %s is invalid\n", addr.c_str()); fprintf(stderr, "Address %s is invalid\n", addr.c_str());
@ -1178,8 +1197,12 @@ void osd_messenger_t::rdmacm_try_connect_peer(uint64_t peer_osd, const std::stri
rdma_cm_id *cmid = NULL; rdma_cm_id *cmid = NULL;
if (rdma_create_id(rdmacm_evch, &cmid, NULL, RDMA_PS_TCP) != 0) if (rdma_create_id(rdmacm_evch, &cmid, NULL, RDMA_PS_TCP) != 0)
{ {
int res = -errno;
fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d), using TCP\n", strerror(errno), errno); fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d), using TCP\n", strerror(errno), errno);
try_connect_peer_tcp(peer_osd, addr.c_str(), peer_port); if (!disable_tcp)
try_connect_peer_tcp(peer_osd, addr.c_str(), peer_port);
else
on_connect_peer(peer_osd, res);
return; return;
} }
// Make a fake FD (FIXME: do not use FDs for identifying clients!) // Make a fake FD (FIXME: do not use FDs for identifying clients!)
@ -1234,8 +1257,8 @@ void osd_messenger_t::rdmacm_address_resolved(rdma_cm_event *ev)
auto conn = conn_it->second; auto conn = conn_it->second;
if (ev->event != RDMA_CM_EVENT_ADDR_RESOLVED || ev->status != 0) if (ev->event != RDMA_CM_EVENT_ADDR_RESOLVED || ev->status != 0)
{ {
ev->status = (!ev->status ? -EIO : (ev->status > 0 ? -ev->status : ev->status)); fprintf(stderr, "Failed to resolve address %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(),
fprintf(stderr, "Failed to resolve address %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), strerror(-ev->status), ev->status); ev->status > 0 ? "unknown error" : strerror(-ev->status), ev->status);
rdmacm_on_connect_peer_error(cmid, ev->status); rdmacm_on_connect_peer_error(cmid, ev->status);
return; return;
} }
@ -1267,17 +1290,22 @@ void osd_messenger_t::rdmacm_route_resolved(rdma_cm_event *ev)
auto conn = conn_it->second; auto conn = conn_it->second;
if (ev->event != RDMA_CM_EVENT_ROUTE_RESOLVED || ev->status != 0) if (ev->event != RDMA_CM_EVENT_ROUTE_RESOLVED || ev->status != 0)
{ {
ev->status = (!ev->status ? -EIO : (ev->status > 0 ? -ev->status : ev->status)); fprintf(stderr, "Failed to resolve route to %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(),
fprintf(stderr, "Failed to resolve route to %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), strerror(-ev->status), ev->status); ev->status > 0 ? "unknown error" : strerror(-ev->status), ev->status);
rdmacm_on_connect_peer_error(cmid, ev->status); rdmacm_on_connect_peer_error(cmid, ev->status);
return; return;
} }
// We don't need private_data, RDMA_READ or ATOMIC so leave everything at 0 // We don't need private_data, RDMA_READ or ATOMIC so use default 1
rdma_conn_param conn_params = { .rnr_retry_count = 7 }; rdma_conn_param conn_params = {
.responder_resources = 1,
.initiator_depth = 1,
.retry_count = 7,
.rnr_retry_count = 7,
};
if (rdma_connect(cmid, &conn_params) != 0) if (rdma_connect(cmid, &conn_params) != 0)
{ {
int res = -errno; int res = -errno;
fprintf(stderr, "Failed to connect to %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), strerror(errno), errno); fprintf(stderr, "Failed to connect to %s:%d via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), conn->peer_port, strerror(errno), errno);
rdmacm_on_connect_peer_error(cmid, res); rdmacm_on_connect_peer_error(cmid, res);
return; return;
} }
@ -1296,8 +1324,8 @@ void osd_messenger_t::rdmacm_established(rdma_cm_event *ev)
auto peer_osd = conn->peer_osd; auto peer_osd = conn->peer_osd;
if (ev->event != RDMA_CM_EVENT_ESTABLISHED || ev->status != 0) if (ev->event != RDMA_CM_EVENT_ESTABLISHED || ev->status != 0)
{ {
ev->status = (!ev->status ? -EIO : (ev->status > 0 ? -ev->status : ev->status)); fprintf(stderr, "Failed to connect to %s:%d via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), conn->peer_port,
fprintf(stderr, "Failed to connect to %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), strerror(-ev->status), ev->status); ev->status > 0 ? "unknown error" : strerror(-ev->status), ev->status);
rdmacm_on_connect_peer_error(cmid, ev->status); rdmacm_on_connect_peer_error(cmid, ev->status);
return; return;
} }
@ -1329,7 +1357,7 @@ void osd_messenger_t::rdmacm_established(rdma_cm_event *ev)
rdmacm_connecting.erase(cmid); rdmacm_connecting.erase(cmid);
rdmacm_connections[cmid] = cl; rdmacm_connections[cmid] = cl;
if (log_level > 0) if (log_level > 0)
fprintf(stderr, "Successfully connected with OSD %lu using RDMA-CM\n", peer_osd); fprintf(stderr, "Successfully connected with OSD %ju using RDMA-CM\n", peer_osd);
// Add initial receive request(s) // Add initial receive request(s)
try_recv_rdma(cl); try_recv_rdma(cl);
osd_peer_fds[peer_osd] = cl->peer_fd; osd_peer_fds[peer_osd] = cl->peer_fd;

View File

@ -172,6 +172,11 @@ void osd_t::parse_config(bool init)
bind_port = config["bind_port"].uint64_value(); bind_port = config["bind_port"].uint64_value();
if (bind_port <= 0 || bind_port > 65535) if (bind_port <= 0 || bind_port > 65535)
bind_port = 0; bind_port = 0;
#ifdef WITH_RDMACM
// Use RDMA CM? (required for iWARP and may be useful for IB)
this->use_rdmacm = config["use_rdmacm"].bool_value() || config["use_rdmacm"].uint64_value() != 0;
this->disable_tcp = this->use_rdmacm && (config["disable_tcp"].bool_value() || config["disable_tcp"].uint64_value() != 0);
#endif
// OSD configuration // OSD configuration
etcd_report_interval = config["etcd_report_interval"].uint64_value(); etcd_report_interval = config["etcd_report_interval"].uint64_value();
if (etcd_report_interval <= 0) if (etcd_report_interval <= 0)
@ -340,22 +345,25 @@ void osd_t::bind_socket()
{ {
bind_addresses.push_back("0.0.0.0"); bind_addresses.push_back("0.0.0.0");
} }
for (auto & bind_address: bind_addresses) if (!disable_tcp)
{
int listen_fd = create_and_bind_socket(bind_address, listening_port ? listening_port : bind_port, listen_backlog, &listening_port);
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
{
msgr.accept_connections(fd);
});
listen_fds.push_back(listen_fd);
}
#ifdef WITH_RDMACM
if (msgr.is_use_rdmacm()) // FIXME: use json_is_true and move here
{ {
for (auto & bind_address: bind_addresses) for (auto & bind_address: bind_addresses)
{ {
auto listener = msgr.rdmacm_listen(bind_address, listening_port, log_level); int listen_fd = create_and_bind_socket(bind_address, listening_port ? listening_port : bind_port, listen_backlog, &listening_port);
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
{
msgr.accept_connections(fd);
});
listen_fds.push_back(listen_fd);
}
}
#ifdef WITH_RDMACM
if (use_rdmacm)
{
for (auto & bind_address: bind_addresses)
{
auto listener = msgr.rdmacm_listen(bind_address, listening_port, &listening_port, log_level);
if (listener) if (listener)
rdmacm_listeners.push_back(listener); rdmacm_listeners.push_back(listener);
} }

View File

@ -108,6 +108,8 @@ class osd_t
bool no_scrub = false; bool no_scrub = false;
bool allow_net_split = false; bool allow_net_split = false;
int bind_port, listen_backlog = 128; int bind_port, listen_backlog = 128;
bool use_rdmacm = false;
bool disable_tcp = false;
// FIXME: Implement client queue depth limit // FIXME: Implement client queue depth limit
int client_queue_depth = 128; int client_queue_depth = 128;
bool allow_test_ops = false; bool allow_test_ops = false;