Compare commits
2 Commits
a28d1a4a01
...
1e3f143892
Author | SHA1 | Date |
---|---|---|
|
1e3f143892 | |
|
97a6073265 |
|
@ -299,11 +299,10 @@ void osd_messenger_t::parse_config(const json11::Json & config)
|
||||||
this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0;
|
this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0;
|
||||||
}
|
}
|
||||||
#ifdef WITH_RDMACM
|
#ifdef WITH_RDMACM
|
||||||
if (!config["use_rdmacm"].is_null())
|
// Use RDMA CM? (required for iWARP and may be useful for IB)
|
||||||
{
|
// FIXME: Only parse during start
|
||||||
// Use RDMA CM (required for iWARP and may be useful for IB)
|
this->use_rdmacm = config["use_rdmacm"].bool_value() || config["use_rdmacm"].uint64_value() != 0;
|
||||||
this->use_rdmacm = config["use_rdmacm"].bool_value() || config["use_rdmacm"].uint64_value() != 0;
|
this->disable_tcp = this->use_rdmacm && (config["disable_tcp"].bool_value() || config["disable_tcp"].uint64_value() != 0);
|
||||||
}
|
|
||||||
#endif
|
#endif
|
||||||
if (!config["force_rdma"].is_null())
|
if (!config["force_rdma"].is_null())
|
||||||
{
|
{
|
||||||
|
@ -467,6 +466,13 @@ void osd_messenger_t::try_connect_peer(uint64_t peer_osd)
|
||||||
void osd_messenger_t::try_connect_peer_tcp(osd_num_t peer_osd, const char *peer_host, int peer_port)
|
void osd_messenger_t::try_connect_peer_tcp(osd_num_t peer_osd, const char *peer_host, int peer_port)
|
||||||
{
|
{
|
||||||
assert(peer_osd != this->osd_num);
|
assert(peer_osd != this->osd_num);
|
||||||
|
#ifdef WITH_RDMACM
|
||||||
|
if (disable_tcp)
|
||||||
|
{
|
||||||
|
on_connect_peer(peer_osd, -EINVAL);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
struct sockaddr_storage addr;
|
struct sockaddr_storage addr;
|
||||||
if (!string_to_addr(peer_host, 0, peer_port, &addr))
|
if (!string_to_addr(peer_host, 0, peer_port, &addr))
|
||||||
{
|
{
|
||||||
|
@ -637,7 +643,7 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
|
||||||
if (!selected_ctx)
|
if (!selected_ctx)
|
||||||
{
|
{
|
||||||
if (log_level > 0)
|
if (log_level > 0)
|
||||||
fprintf(stderr, "No RDMA context for OSD %lu connection (peer %d), using only TCP\n", cl->osd_num, cl->peer_fd);
|
fprintf(stderr, "No RDMA context for OSD %ju connection (peer %d), using only TCP\n", cl->osd_num, cl->peer_fd);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
|
@ -179,6 +179,7 @@ protected:
|
||||||
#ifdef WITH_RDMA
|
#ifdef WITH_RDMA
|
||||||
bool use_rdma = true;
|
bool use_rdma = true;
|
||||||
bool use_rdmacm = false;
|
bool use_rdmacm = false;
|
||||||
|
bool disable_tcp = false;
|
||||||
bool force_rdma = false;
|
bool force_rdma = false;
|
||||||
std::string rdma_device;
|
std::string rdma_device;
|
||||||
uint64_t rdma_port_num = 1;
|
uint64_t rdma_port_num = 1;
|
||||||
|
@ -242,7 +243,7 @@ public:
|
||||||
#endif
|
#endif
|
||||||
#ifdef WITH_RDMACM
|
#ifdef WITH_RDMACM
|
||||||
bool is_use_rdmacm();
|
bool is_use_rdmacm();
|
||||||
rdma_cm_id *rdmacm_listen(const std::string & bind_address, int rdmacm_port, int log_level);
|
rdma_cm_id *rdmacm_listen(const std::string & bind_address, int rdmacm_port, int *bound_port, int log_level);
|
||||||
void rdmacm_destroy_listener(rdma_cm_id *listener);
|
void rdmacm_destroy_listener(rdma_cm_id *listener);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
|
@ -863,8 +863,9 @@ struct rdmacm_connecting_t
|
||||||
msgr_rdma_context_t *rdma_context = NULL;
|
msgr_rdma_context_t *rdma_context = NULL;
|
||||||
};
|
};
|
||||||
|
|
||||||
rdma_cm_id *osd_messenger_t::rdmacm_listen(const std::string & bind_address, int rdmacm_port, int log_level)
|
rdma_cm_id *osd_messenger_t::rdmacm_listen(const std::string & bind_address, int rdmacm_port, int *bound_port, int log_level)
|
||||||
{
|
{
|
||||||
|
sockaddr_storage addr = {};
|
||||||
rdma_cm_id *listener = NULL;
|
rdma_cm_id *listener = NULL;
|
||||||
int r = rdma_create_id(rdmacm_evch, &listener, NULL, RDMA_PS_TCP);
|
int r = rdma_create_id(rdmacm_evch, &listener, NULL, RDMA_PS_TCP);
|
||||||
if (r != 0)
|
if (r != 0)
|
||||||
|
@ -872,7 +873,6 @@ rdma_cm_id *osd_messenger_t::rdmacm_listen(const std::string & bind_address, int
|
||||||
fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d)\n", strerror(errno), errno);
|
fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d)\n", strerror(errno), errno);
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
sockaddr_storage addr;
|
|
||||||
if (!string_to_addr(bind_address, 0, rdmacm_port, &addr))
|
if (!string_to_addr(bind_address, 0, rdmacm_port, &addr))
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Server address: %s is not valid\n", bind_address.c_str());
|
fprintf(stderr, "Server address: %s is not valid\n", bind_address.c_str());
|
||||||
|
@ -890,9 +890,13 @@ rdma_cm_id *osd_messenger_t::rdmacm_listen(const std::string & bind_address, int
|
||||||
fprintf(stderr, "Failed to listen to RDMA-CM address %s:%d: %s (code %d)\n", bind_address.c_str(), rdmacm_port, strerror(errno), errno);
|
fprintf(stderr, "Failed to listen to RDMA-CM address %s:%d: %s (code %d)\n", bind_address.c_str(), rdmacm_port, strerror(errno), errno);
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
|
if (bound_port)
|
||||||
|
{
|
||||||
|
*bound_port = ntohs(rdma_get_src_port(listener));
|
||||||
|
}
|
||||||
if (log_level > 0)
|
if (log_level > 0)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Listening to RDMA-CM address %s port %d\n", bind_address.c_str(), rdmacm_port);
|
fprintf(stderr, "Listening to RDMA-CM address %s port %d\n", bind_address.c_str(), *bound_port);
|
||||||
}
|
}
|
||||||
return listener;
|
return listener;
|
||||||
fail:
|
fail:
|
||||||
|
@ -952,6 +956,8 @@ void osd_messenger_t::handle_rdmacm_events()
|
||||||
}
|
}
|
||||||
else if (rdmacm_connecting.find(ev->id) != rdmacm_connecting.end())
|
else if (rdmacm_connecting.find(ev->id) != rdmacm_connecting.end())
|
||||||
{
|
{
|
||||||
|
fprintf(stderr, "Received %s event for RDMA-CM OSD %ju connection\n",
|
||||||
|
event_type_name, rdmacm_connecting[ev->id]->peer_osd);
|
||||||
rdmacm_established(ev);
|
rdmacm_established(ev);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -1097,6 +1103,7 @@ void osd_messenger_t::rdmacm_accept(rdma_cm_event *ev)
|
||||||
int fake_fd = socket(AF_INET, SOCK_STREAM, 0);
|
int fake_fd = socket(AF_INET, SOCK_STREAM, 0);
|
||||||
if (fake_fd < 0)
|
if (fake_fd < 0)
|
||||||
{
|
{
|
||||||
|
fprintf(stderr, "Failed to allocate a fake socket for RDMA-CM client: %s (code %d)\n", strerror(errno), errno);
|
||||||
rdma_destroy_id(ev->id);
|
rdma_destroy_id(ev->id);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1106,8 +1113,13 @@ void osd_messenger_t::rdmacm_accept(rdma_cm_event *ev)
|
||||||
rdma_destroy_id(ev->id);
|
rdma_destroy_id(ev->id);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// We don't need private_data, RDMA_READ or ATOMIC so leave everything at 0
|
// We don't need private_data, RDMA_READ or ATOMIC so use default 1
|
||||||
rdma_conn_param conn_params = { .rnr_retry_count = 7 };
|
rdma_conn_param conn_params = {
|
||||||
|
.responder_resources = 1,
|
||||||
|
.initiator_depth = 1,
|
||||||
|
.retry_count = 7,
|
||||||
|
.rnr_retry_count = 7,
|
||||||
|
};
|
||||||
if (rdma_accept(ev->id, &conn_params) != 0)
|
if (rdma_accept(ev->id, &conn_params) != 0)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Failed to accept RDMA-CM connection: %s (code %d)\n", strerror(errno), errno);
|
fprintf(stderr, "Failed to accept RDMA-CM connection: %s (code %d)\n", strerror(errno), errno);
|
||||||
|
@ -1143,7 +1155,6 @@ void osd_messenger_t::rdmacm_accept(rdma_cm_event *ev)
|
||||||
|
|
||||||
void osd_messenger_t::rdmacm_on_connect_peer_error(rdma_cm_id *cmid, int res)
|
void osd_messenger_t::rdmacm_on_connect_peer_error(rdma_cm_id *cmid, int res)
|
||||||
{
|
{
|
||||||
assert(res < 0);
|
|
||||||
auto conn = rdmacm_connecting.at(cmid);
|
auto conn = rdmacm_connecting.at(cmid);
|
||||||
auto addr = conn->addr;
|
auto addr = conn->addr;
|
||||||
auto peer_port = conn->peer_port;
|
auto peer_port = conn->peer_port;
|
||||||
|
@ -1162,13 +1173,21 @@ void osd_messenger_t::rdmacm_on_connect_peer_error(rdma_cm_id *cmid, int res)
|
||||||
}
|
}
|
||||||
rdmacm_connecting.erase(cmid);
|
rdmacm_connecting.erase(cmid);
|
||||||
delete conn;
|
delete conn;
|
||||||
// Fall back to TCP instead of just reporting the error to on_connect_peer()
|
if (!disable_tcp)
|
||||||
try_connect_peer_tcp(peer_osd, addr.c_str(), peer_port);
|
{
|
||||||
|
// Fall back to TCP instead of just reporting the error to on_connect_peer()
|
||||||
|
try_connect_peer_tcp(peer_osd, addr.c_str(), peer_port);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// TCP is disabled
|
||||||
|
on_connect_peer(peer_osd, res == 0 ? -EINVAL : (res > 0 ? -res : res));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void osd_messenger_t::rdmacm_try_connect_peer(uint64_t peer_osd, const std::string & addr, int peer_port)
|
void osd_messenger_t::rdmacm_try_connect_peer(uint64_t peer_osd, const std::string & addr, int peer_port)
|
||||||
{
|
{
|
||||||
struct sockaddr_storage sa;
|
struct sockaddr_storage sa = {};
|
||||||
if (!string_to_addr(addr, false, peer_port, &sa))
|
if (!string_to_addr(addr, false, peer_port, &sa))
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Address %s is invalid\n", addr.c_str());
|
fprintf(stderr, "Address %s is invalid\n", addr.c_str());
|
||||||
|
@ -1178,8 +1197,12 @@ void osd_messenger_t::rdmacm_try_connect_peer(uint64_t peer_osd, const std::stri
|
||||||
rdma_cm_id *cmid = NULL;
|
rdma_cm_id *cmid = NULL;
|
||||||
if (rdma_create_id(rdmacm_evch, &cmid, NULL, RDMA_PS_TCP) != 0)
|
if (rdma_create_id(rdmacm_evch, &cmid, NULL, RDMA_PS_TCP) != 0)
|
||||||
{
|
{
|
||||||
|
int res = -errno;
|
||||||
fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d), using TCP\n", strerror(errno), errno);
|
fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d), using TCP\n", strerror(errno), errno);
|
||||||
try_connect_peer_tcp(peer_osd, addr.c_str(), peer_port);
|
if (!disable_tcp)
|
||||||
|
try_connect_peer_tcp(peer_osd, addr.c_str(), peer_port);
|
||||||
|
else
|
||||||
|
on_connect_peer(peer_osd, res);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Make a fake FD (FIXME: do not use FDs for identifying clients!)
|
// Make a fake FD (FIXME: do not use FDs for identifying clients!)
|
||||||
|
@ -1234,8 +1257,8 @@ void osd_messenger_t::rdmacm_address_resolved(rdma_cm_event *ev)
|
||||||
auto conn = conn_it->second;
|
auto conn = conn_it->second;
|
||||||
if (ev->event != RDMA_CM_EVENT_ADDR_RESOLVED || ev->status != 0)
|
if (ev->event != RDMA_CM_EVENT_ADDR_RESOLVED || ev->status != 0)
|
||||||
{
|
{
|
||||||
ev->status = (!ev->status ? -EIO : (ev->status > 0 ? -ev->status : ev->status));
|
fprintf(stderr, "Failed to resolve address %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(),
|
||||||
fprintf(stderr, "Failed to resolve address %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), strerror(-ev->status), ev->status);
|
ev->status > 0 ? "unknown error" : strerror(-ev->status), ev->status);
|
||||||
rdmacm_on_connect_peer_error(cmid, ev->status);
|
rdmacm_on_connect_peer_error(cmid, ev->status);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1267,17 +1290,22 @@ void osd_messenger_t::rdmacm_route_resolved(rdma_cm_event *ev)
|
||||||
auto conn = conn_it->second;
|
auto conn = conn_it->second;
|
||||||
if (ev->event != RDMA_CM_EVENT_ROUTE_RESOLVED || ev->status != 0)
|
if (ev->event != RDMA_CM_EVENT_ROUTE_RESOLVED || ev->status != 0)
|
||||||
{
|
{
|
||||||
ev->status = (!ev->status ? -EIO : (ev->status > 0 ? -ev->status : ev->status));
|
fprintf(stderr, "Failed to resolve route to %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(),
|
||||||
fprintf(stderr, "Failed to resolve route to %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), strerror(-ev->status), ev->status);
|
ev->status > 0 ? "unknown error" : strerror(-ev->status), ev->status);
|
||||||
rdmacm_on_connect_peer_error(cmid, ev->status);
|
rdmacm_on_connect_peer_error(cmid, ev->status);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// We don't need private_data, RDMA_READ or ATOMIC so leave everything at 0
|
// We don't need private_data, RDMA_READ or ATOMIC so use default 1
|
||||||
rdma_conn_param conn_params = { .rnr_retry_count = 7 };
|
rdma_conn_param conn_params = {
|
||||||
|
.responder_resources = 1,
|
||||||
|
.initiator_depth = 1,
|
||||||
|
.retry_count = 7,
|
||||||
|
.rnr_retry_count = 7,
|
||||||
|
};
|
||||||
if (rdma_connect(cmid, &conn_params) != 0)
|
if (rdma_connect(cmid, &conn_params) != 0)
|
||||||
{
|
{
|
||||||
int res = -errno;
|
int res = -errno;
|
||||||
fprintf(stderr, "Failed to connect to %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), strerror(errno), errno);
|
fprintf(stderr, "Failed to connect to %s:%d via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), conn->peer_port, strerror(errno), errno);
|
||||||
rdmacm_on_connect_peer_error(cmid, res);
|
rdmacm_on_connect_peer_error(cmid, res);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1296,8 +1324,8 @@ void osd_messenger_t::rdmacm_established(rdma_cm_event *ev)
|
||||||
auto peer_osd = conn->peer_osd;
|
auto peer_osd = conn->peer_osd;
|
||||||
if (ev->event != RDMA_CM_EVENT_ESTABLISHED || ev->status != 0)
|
if (ev->event != RDMA_CM_EVENT_ESTABLISHED || ev->status != 0)
|
||||||
{
|
{
|
||||||
ev->status = (!ev->status ? -EIO : (ev->status > 0 ? -ev->status : ev->status));
|
fprintf(stderr, "Failed to connect to %s:%d via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), conn->peer_port,
|
||||||
fprintf(stderr, "Failed to connect to %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), strerror(-ev->status), ev->status);
|
ev->status > 0 ? "unknown error" : strerror(-ev->status), ev->status);
|
||||||
rdmacm_on_connect_peer_error(cmid, ev->status);
|
rdmacm_on_connect_peer_error(cmid, ev->status);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1329,7 +1357,7 @@ void osd_messenger_t::rdmacm_established(rdma_cm_event *ev)
|
||||||
rdmacm_connecting.erase(cmid);
|
rdmacm_connecting.erase(cmid);
|
||||||
rdmacm_connections[cmid] = cl;
|
rdmacm_connections[cmid] = cl;
|
||||||
if (log_level > 0)
|
if (log_level > 0)
|
||||||
fprintf(stderr, "Successfully connected with OSD %lu using RDMA-CM\n", peer_osd);
|
fprintf(stderr, "Successfully connected with OSD %ju using RDMA-CM\n", peer_osd);
|
||||||
// Add initial receive request(s)
|
// Add initial receive request(s)
|
||||||
try_recv_rdma(cl);
|
try_recv_rdma(cl);
|
||||||
osd_peer_fds[peer_osd] = cl->peer_fd;
|
osd_peer_fds[peer_osd] = cl->peer_fd;
|
||||||
|
|
|
@ -172,6 +172,11 @@ void osd_t::parse_config(bool init)
|
||||||
bind_port = config["bind_port"].uint64_value();
|
bind_port = config["bind_port"].uint64_value();
|
||||||
if (bind_port <= 0 || bind_port > 65535)
|
if (bind_port <= 0 || bind_port > 65535)
|
||||||
bind_port = 0;
|
bind_port = 0;
|
||||||
|
#ifdef WITH_RDMACM
|
||||||
|
// Use RDMA CM? (required for iWARP and may be useful for IB)
|
||||||
|
this->use_rdmacm = config["use_rdmacm"].bool_value() || config["use_rdmacm"].uint64_value() != 0;
|
||||||
|
this->disable_tcp = this->use_rdmacm && (config["disable_tcp"].bool_value() || config["disable_tcp"].uint64_value() != 0);
|
||||||
|
#endif
|
||||||
// OSD configuration
|
// OSD configuration
|
||||||
etcd_report_interval = config["etcd_report_interval"].uint64_value();
|
etcd_report_interval = config["etcd_report_interval"].uint64_value();
|
||||||
if (etcd_report_interval <= 0)
|
if (etcd_report_interval <= 0)
|
||||||
|
@ -340,22 +345,25 @@ void osd_t::bind_socket()
|
||||||
{
|
{
|
||||||
bind_addresses.push_back("0.0.0.0");
|
bind_addresses.push_back("0.0.0.0");
|
||||||
}
|
}
|
||||||
for (auto & bind_address: bind_addresses)
|
if (!disable_tcp)
|
||||||
{
|
|
||||||
int listen_fd = create_and_bind_socket(bind_address, listening_port ? listening_port : bind_port, listen_backlog, &listening_port);
|
|
||||||
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
|
|
||||||
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
|
|
||||||
{
|
|
||||||
msgr.accept_connections(fd);
|
|
||||||
});
|
|
||||||
listen_fds.push_back(listen_fd);
|
|
||||||
}
|
|
||||||
#ifdef WITH_RDMACM
|
|
||||||
if (msgr.is_use_rdmacm()) // FIXME: use json_is_true and move here
|
|
||||||
{
|
{
|
||||||
for (auto & bind_address: bind_addresses)
|
for (auto & bind_address: bind_addresses)
|
||||||
{
|
{
|
||||||
auto listener = msgr.rdmacm_listen(bind_address, listening_port, log_level);
|
int listen_fd = create_and_bind_socket(bind_address, listening_port ? listening_port : bind_port, listen_backlog, &listening_port);
|
||||||
|
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
|
||||||
|
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
|
||||||
|
{
|
||||||
|
msgr.accept_connections(fd);
|
||||||
|
});
|
||||||
|
listen_fds.push_back(listen_fd);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#ifdef WITH_RDMACM
|
||||||
|
if (use_rdmacm)
|
||||||
|
{
|
||||||
|
for (auto & bind_address: bind_addresses)
|
||||||
|
{
|
||||||
|
auto listener = msgr.rdmacm_listen(bind_address, listening_port, &listening_port, log_level);
|
||||||
if (listener)
|
if (listener)
|
||||||
rdmacm_listeners.push_back(listener);
|
rdmacm_listeners.push_back(listener);
|
||||||
}
|
}
|
||||||
|
|
|
@ -108,6 +108,8 @@ class osd_t
|
||||||
bool no_scrub = false;
|
bool no_scrub = false;
|
||||||
bool allow_net_split = false;
|
bool allow_net_split = false;
|
||||||
int bind_port, listen_backlog = 128;
|
int bind_port, listen_backlog = 128;
|
||||||
|
bool use_rdmacm = false;
|
||||||
|
bool disable_tcp = false;
|
||||||
// FIXME: Implement client queue depth limit
|
// FIXME: Implement client queue depth limit
|
||||||
int client_queue_depth = 128;
|
int client_queue_depth = 128;
|
||||||
bool allow_test_ops = false;
|
bool allow_test_ops = false;
|
||||||
|
|
Loading…
Reference in New Issue