Compare commits
2 Commits
a28d1a4a01
...
1e3f143892
Author | SHA1 | Date |
---|---|---|
|
1e3f143892 | |
|
97a6073265 |
|
@ -299,11 +299,10 @@ void osd_messenger_t::parse_config(const json11::Json & config)
|
|||
this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0;
|
||||
}
|
||||
#ifdef WITH_RDMACM
|
||||
if (!config["use_rdmacm"].is_null())
|
||||
{
|
||||
// Use RDMA CM (required for iWARP and may be useful for IB)
|
||||
this->use_rdmacm = config["use_rdmacm"].bool_value() || config["use_rdmacm"].uint64_value() != 0;
|
||||
}
|
||||
// Use RDMA CM? (required for iWARP and may be useful for IB)
|
||||
// FIXME: Only parse during start
|
||||
this->use_rdmacm = config["use_rdmacm"].bool_value() || config["use_rdmacm"].uint64_value() != 0;
|
||||
this->disable_tcp = this->use_rdmacm && (config["disable_tcp"].bool_value() || config["disable_tcp"].uint64_value() != 0);
|
||||
#endif
|
||||
if (!config["force_rdma"].is_null())
|
||||
{
|
||||
|
@ -467,6 +466,13 @@ void osd_messenger_t::try_connect_peer(uint64_t peer_osd)
|
|||
void osd_messenger_t::try_connect_peer_tcp(osd_num_t peer_osd, const char *peer_host, int peer_port)
|
||||
{
|
||||
assert(peer_osd != this->osd_num);
|
||||
#ifdef WITH_RDMACM
|
||||
if (disable_tcp)
|
||||
{
|
||||
on_connect_peer(peer_osd, -EINVAL);
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
struct sockaddr_storage addr;
|
||||
if (!string_to_addr(peer_host, 0, peer_port, &addr))
|
||||
{
|
||||
|
@ -637,7 +643,7 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
|
|||
if (!selected_ctx)
|
||||
{
|
||||
if (log_level > 0)
|
||||
fprintf(stderr, "No RDMA context for OSD %lu connection (peer %d), using only TCP\n", cl->osd_num, cl->peer_fd);
|
||||
fprintf(stderr, "No RDMA context for OSD %ju connection (peer %d), using only TCP\n", cl->osd_num, cl->peer_fd);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -179,6 +179,7 @@ protected:
|
|||
#ifdef WITH_RDMA
|
||||
bool use_rdma = true;
|
||||
bool use_rdmacm = false;
|
||||
bool disable_tcp = false;
|
||||
bool force_rdma = false;
|
||||
std::string rdma_device;
|
||||
uint64_t rdma_port_num = 1;
|
||||
|
@ -242,7 +243,7 @@ public:
|
|||
#endif
|
||||
#ifdef WITH_RDMACM
|
||||
bool is_use_rdmacm();
|
||||
rdma_cm_id *rdmacm_listen(const std::string & bind_address, int rdmacm_port, int log_level);
|
||||
rdma_cm_id *rdmacm_listen(const std::string & bind_address, int rdmacm_port, int *bound_port, int log_level);
|
||||
void rdmacm_destroy_listener(rdma_cm_id *listener);
|
||||
#endif
|
||||
|
||||
|
|
|
@ -863,8 +863,9 @@ struct rdmacm_connecting_t
|
|||
msgr_rdma_context_t *rdma_context = NULL;
|
||||
};
|
||||
|
||||
rdma_cm_id *osd_messenger_t::rdmacm_listen(const std::string & bind_address, int rdmacm_port, int log_level)
|
||||
rdma_cm_id *osd_messenger_t::rdmacm_listen(const std::string & bind_address, int rdmacm_port, int *bound_port, int log_level)
|
||||
{
|
||||
sockaddr_storage addr = {};
|
||||
rdma_cm_id *listener = NULL;
|
||||
int r = rdma_create_id(rdmacm_evch, &listener, NULL, RDMA_PS_TCP);
|
||||
if (r != 0)
|
||||
|
@ -872,7 +873,6 @@ rdma_cm_id *osd_messenger_t::rdmacm_listen(const std::string & bind_address, int
|
|||
fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d)\n", strerror(errno), errno);
|
||||
goto fail;
|
||||
}
|
||||
sockaddr_storage addr;
|
||||
if (!string_to_addr(bind_address, 0, rdmacm_port, &addr))
|
||||
{
|
||||
fprintf(stderr, "Server address: %s is not valid\n", bind_address.c_str());
|
||||
|
@ -890,9 +890,13 @@ rdma_cm_id *osd_messenger_t::rdmacm_listen(const std::string & bind_address, int
|
|||
fprintf(stderr, "Failed to listen to RDMA-CM address %s:%d: %s (code %d)\n", bind_address.c_str(), rdmacm_port, strerror(errno), errno);
|
||||
goto fail;
|
||||
}
|
||||
if (bound_port)
|
||||
{
|
||||
*bound_port = ntohs(rdma_get_src_port(listener));
|
||||
}
|
||||
if (log_level > 0)
|
||||
{
|
||||
fprintf(stderr, "Listening to RDMA-CM address %s port %d\n", bind_address.c_str(), rdmacm_port);
|
||||
fprintf(stderr, "Listening to RDMA-CM address %s port %d\n", bind_address.c_str(), *bound_port);
|
||||
}
|
||||
return listener;
|
||||
fail:
|
||||
|
@ -952,6 +956,8 @@ void osd_messenger_t::handle_rdmacm_events()
|
|||
}
|
||||
else if (rdmacm_connecting.find(ev->id) != rdmacm_connecting.end())
|
||||
{
|
||||
fprintf(stderr, "Received %s event for RDMA-CM OSD %ju connection\n",
|
||||
event_type_name, rdmacm_connecting[ev->id]->peer_osd);
|
||||
rdmacm_established(ev);
|
||||
}
|
||||
else
|
||||
|
@ -1097,6 +1103,7 @@ void osd_messenger_t::rdmacm_accept(rdma_cm_event *ev)
|
|||
int fake_fd = socket(AF_INET, SOCK_STREAM, 0);
|
||||
if (fake_fd < 0)
|
||||
{
|
||||
fprintf(stderr, "Failed to allocate a fake socket for RDMA-CM client: %s (code %d)\n", strerror(errno), errno);
|
||||
rdma_destroy_id(ev->id);
|
||||
return;
|
||||
}
|
||||
|
@ -1106,8 +1113,13 @@ void osd_messenger_t::rdmacm_accept(rdma_cm_event *ev)
|
|||
rdma_destroy_id(ev->id);
|
||||
return;
|
||||
}
|
||||
// We don't need private_data, RDMA_READ or ATOMIC so leave everything at 0
|
||||
rdma_conn_param conn_params = { .rnr_retry_count = 7 };
|
||||
// We don't need private_data, RDMA_READ or ATOMIC so use default 1
|
||||
rdma_conn_param conn_params = {
|
||||
.responder_resources = 1,
|
||||
.initiator_depth = 1,
|
||||
.retry_count = 7,
|
||||
.rnr_retry_count = 7,
|
||||
};
|
||||
if (rdma_accept(ev->id, &conn_params) != 0)
|
||||
{
|
||||
fprintf(stderr, "Failed to accept RDMA-CM connection: %s (code %d)\n", strerror(errno), errno);
|
||||
|
@ -1143,7 +1155,6 @@ void osd_messenger_t::rdmacm_accept(rdma_cm_event *ev)
|
|||
|
||||
void osd_messenger_t::rdmacm_on_connect_peer_error(rdma_cm_id *cmid, int res)
|
||||
{
|
||||
assert(res < 0);
|
||||
auto conn = rdmacm_connecting.at(cmid);
|
||||
auto addr = conn->addr;
|
||||
auto peer_port = conn->peer_port;
|
||||
|
@ -1162,13 +1173,21 @@ void osd_messenger_t::rdmacm_on_connect_peer_error(rdma_cm_id *cmid, int res)
|
|||
}
|
||||
rdmacm_connecting.erase(cmid);
|
||||
delete conn;
|
||||
// Fall back to TCP instead of just reporting the error to on_connect_peer()
|
||||
try_connect_peer_tcp(peer_osd, addr.c_str(), peer_port);
|
||||
if (!disable_tcp)
|
||||
{
|
||||
// Fall back to TCP instead of just reporting the error to on_connect_peer()
|
||||
try_connect_peer_tcp(peer_osd, addr.c_str(), peer_port);
|
||||
}
|
||||
else
|
||||
{
|
||||
// TCP is disabled
|
||||
on_connect_peer(peer_osd, res == 0 ? -EINVAL : (res > 0 ? -res : res));
|
||||
}
|
||||
}
|
||||
|
||||
void osd_messenger_t::rdmacm_try_connect_peer(uint64_t peer_osd, const std::string & addr, int peer_port)
|
||||
{
|
||||
struct sockaddr_storage sa;
|
||||
struct sockaddr_storage sa = {};
|
||||
if (!string_to_addr(addr, false, peer_port, &sa))
|
||||
{
|
||||
fprintf(stderr, "Address %s is invalid\n", addr.c_str());
|
||||
|
@ -1178,8 +1197,12 @@ void osd_messenger_t::rdmacm_try_connect_peer(uint64_t peer_osd, const std::stri
|
|||
rdma_cm_id *cmid = NULL;
|
||||
if (rdma_create_id(rdmacm_evch, &cmid, NULL, RDMA_PS_TCP) != 0)
|
||||
{
|
||||
int res = -errno;
|
||||
fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d), using TCP\n", strerror(errno), errno);
|
||||
try_connect_peer_tcp(peer_osd, addr.c_str(), peer_port);
|
||||
if (!disable_tcp)
|
||||
try_connect_peer_tcp(peer_osd, addr.c_str(), peer_port);
|
||||
else
|
||||
on_connect_peer(peer_osd, res);
|
||||
return;
|
||||
}
|
||||
// Make a fake FD (FIXME: do not use FDs for identifying clients!)
|
||||
|
@ -1234,8 +1257,8 @@ void osd_messenger_t::rdmacm_address_resolved(rdma_cm_event *ev)
|
|||
auto conn = conn_it->second;
|
||||
if (ev->event != RDMA_CM_EVENT_ADDR_RESOLVED || ev->status != 0)
|
||||
{
|
||||
ev->status = (!ev->status ? -EIO : (ev->status > 0 ? -ev->status : ev->status));
|
||||
fprintf(stderr, "Failed to resolve address %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), strerror(-ev->status), ev->status);
|
||||
fprintf(stderr, "Failed to resolve address %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(),
|
||||
ev->status > 0 ? "unknown error" : strerror(-ev->status), ev->status);
|
||||
rdmacm_on_connect_peer_error(cmid, ev->status);
|
||||
return;
|
||||
}
|
||||
|
@ -1267,17 +1290,22 @@ void osd_messenger_t::rdmacm_route_resolved(rdma_cm_event *ev)
|
|||
auto conn = conn_it->second;
|
||||
if (ev->event != RDMA_CM_EVENT_ROUTE_RESOLVED || ev->status != 0)
|
||||
{
|
||||
ev->status = (!ev->status ? -EIO : (ev->status > 0 ? -ev->status : ev->status));
|
||||
fprintf(stderr, "Failed to resolve route to %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), strerror(-ev->status), ev->status);
|
||||
fprintf(stderr, "Failed to resolve route to %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(),
|
||||
ev->status > 0 ? "unknown error" : strerror(-ev->status), ev->status);
|
||||
rdmacm_on_connect_peer_error(cmid, ev->status);
|
||||
return;
|
||||
}
|
||||
// We don't need private_data, RDMA_READ or ATOMIC so leave everything at 0
|
||||
rdma_conn_param conn_params = { .rnr_retry_count = 7 };
|
||||
// We don't need private_data, RDMA_READ or ATOMIC so use default 1
|
||||
rdma_conn_param conn_params = {
|
||||
.responder_resources = 1,
|
||||
.initiator_depth = 1,
|
||||
.retry_count = 7,
|
||||
.rnr_retry_count = 7,
|
||||
};
|
||||
if (rdma_connect(cmid, &conn_params) != 0)
|
||||
{
|
||||
int res = -errno;
|
||||
fprintf(stderr, "Failed to connect to %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), strerror(errno), errno);
|
||||
fprintf(stderr, "Failed to connect to %s:%d via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), conn->peer_port, strerror(errno), errno);
|
||||
rdmacm_on_connect_peer_error(cmid, res);
|
||||
return;
|
||||
}
|
||||
|
@ -1296,8 +1324,8 @@ void osd_messenger_t::rdmacm_established(rdma_cm_event *ev)
|
|||
auto peer_osd = conn->peer_osd;
|
||||
if (ev->event != RDMA_CM_EVENT_ESTABLISHED || ev->status != 0)
|
||||
{
|
||||
ev->status = (!ev->status ? -EIO : (ev->status > 0 ? -ev->status : ev->status));
|
||||
fprintf(stderr, "Failed to connect to %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), strerror(-ev->status), ev->status);
|
||||
fprintf(stderr, "Failed to connect to %s:%d via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), conn->peer_port,
|
||||
ev->status > 0 ? "unknown error" : strerror(-ev->status), ev->status);
|
||||
rdmacm_on_connect_peer_error(cmid, ev->status);
|
||||
return;
|
||||
}
|
||||
|
@ -1329,7 +1357,7 @@ void osd_messenger_t::rdmacm_established(rdma_cm_event *ev)
|
|||
rdmacm_connecting.erase(cmid);
|
||||
rdmacm_connections[cmid] = cl;
|
||||
if (log_level > 0)
|
||||
fprintf(stderr, "Successfully connected with OSD %lu using RDMA-CM\n", peer_osd);
|
||||
fprintf(stderr, "Successfully connected with OSD %ju using RDMA-CM\n", peer_osd);
|
||||
// Add initial receive request(s)
|
||||
try_recv_rdma(cl);
|
||||
osd_peer_fds[peer_osd] = cl->peer_fd;
|
||||
|
|
|
@ -172,6 +172,11 @@ void osd_t::parse_config(bool init)
|
|||
bind_port = config["bind_port"].uint64_value();
|
||||
if (bind_port <= 0 || bind_port > 65535)
|
||||
bind_port = 0;
|
||||
#ifdef WITH_RDMACM
|
||||
// Use RDMA CM? (required for iWARP and may be useful for IB)
|
||||
this->use_rdmacm = config["use_rdmacm"].bool_value() || config["use_rdmacm"].uint64_value() != 0;
|
||||
this->disable_tcp = this->use_rdmacm && (config["disable_tcp"].bool_value() || config["disable_tcp"].uint64_value() != 0);
|
||||
#endif
|
||||
// OSD configuration
|
||||
etcd_report_interval = config["etcd_report_interval"].uint64_value();
|
||||
if (etcd_report_interval <= 0)
|
||||
|
@ -340,22 +345,25 @@ void osd_t::bind_socket()
|
|||
{
|
||||
bind_addresses.push_back("0.0.0.0");
|
||||
}
|
||||
for (auto & bind_address: bind_addresses)
|
||||
{
|
||||
int listen_fd = create_and_bind_socket(bind_address, listening_port ? listening_port : bind_port, listen_backlog, &listening_port);
|
||||
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
|
||||
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
|
||||
{
|
||||
msgr.accept_connections(fd);
|
||||
});
|
||||
listen_fds.push_back(listen_fd);
|
||||
}
|
||||
#ifdef WITH_RDMACM
|
||||
if (msgr.is_use_rdmacm()) // FIXME: use json_is_true and move here
|
||||
if (!disable_tcp)
|
||||
{
|
||||
for (auto & bind_address: bind_addresses)
|
||||
{
|
||||
auto listener = msgr.rdmacm_listen(bind_address, listening_port, log_level);
|
||||
int listen_fd = create_and_bind_socket(bind_address, listening_port ? listening_port : bind_port, listen_backlog, &listening_port);
|
||||
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
|
||||
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
|
||||
{
|
||||
msgr.accept_connections(fd);
|
||||
});
|
||||
listen_fds.push_back(listen_fd);
|
||||
}
|
||||
}
|
||||
#ifdef WITH_RDMACM
|
||||
if (use_rdmacm)
|
||||
{
|
||||
for (auto & bind_address: bind_addresses)
|
||||
{
|
||||
auto listener = msgr.rdmacm_listen(bind_address, listening_port, &listening_port, log_level);
|
||||
if (listener)
|
||||
rdmacm_listeners.push_back(listener);
|
||||
}
|
||||
|
|
|
@ -108,6 +108,8 @@ class osd_t
|
|||
bool no_scrub = false;
|
||||
bool allow_net_split = false;
|
||||
int bind_port, listen_backlog = 128;
|
||||
bool use_rdmacm = false;
|
||||
bool disable_tcp = false;
|
||||
// FIXME: Implement client queue depth limit
|
||||
int client_queue_depth = 128;
|
||||
bool allow_test_ops = false;
|
||||
|
|
Loading…
Reference in New Issue