Compare commits
3 Commits
4ea62d32c5
...
191eafd72a
Author | SHA1 | Date |
---|---|---|
|
191eafd72a | |
|
640926346a | |
|
8d6ab2300f |
|
@ -182,7 +182,7 @@ void etcd_state_client_t::add_etcd_url(std::string addr)
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
if (!local_ips.size())
|
if (!local_ips.size())
|
||||||
local_ips = getifaddr_list(std::vector<std::string>(), true);
|
local_ips = getifaddr_list(std::vector<addr_mask_t>(), true);
|
||||||
std::string check_addr;
|
std::string check_addr;
|
||||||
int pos = addr.find('/');
|
int pos = addr.find('/');
|
||||||
int pos2 = addr.find(':');
|
int pos2 = addr.find(':');
|
||||||
|
|
|
@ -120,26 +120,33 @@ void osd_messenger_t::init()
|
||||||
#ifdef WITH_RDMA
|
#ifdef WITH_RDMA
|
||||||
if (use_rdma)
|
if (use_rdma)
|
||||||
{
|
{
|
||||||
rdma_context = msgr_rdma_context_t::create(
|
rdma_contexts = msgr_rdma_context_t::create_all(
|
||||||
osd_networks, rdma_device != "" ? rdma_device.c_str() : NULL,
|
osd_num && osd_cluster_network_masks.size() ? osd_cluster_network_masks : osd_network_masks,
|
||||||
|
rdma_device != "" ? rdma_device.c_str() : NULL,
|
||||||
rdma_port_num, rdma_gid_index, rdma_mtu, rdma_odp, log_level
|
rdma_port_num, rdma_gid_index, rdma_mtu, rdma_odp, log_level
|
||||||
);
|
);
|
||||||
if (!rdma_context)
|
if (!rdma_contexts.size())
|
||||||
{
|
{
|
||||||
|
if (force_rdma)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "[OSD %ju] Couldn't initialize RDMA, force_rdma is enabled, exiting\n", osd_num);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
if (log_level > 0)
|
if (log_level > 0)
|
||||||
fprintf(stderr, "[OSD %ju] Couldn't initialize RDMA, proceeding with TCP only\n", osd_num);
|
fprintf(stderr, "[OSD %ju] Couldn't initialize RDMA, proceeding with TCP only\n", osd_num);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
rdma_max_sge = rdma_max_sge < rdma_context->attrx.orig_attr.max_sge
|
|
||||||
? rdma_max_sge : rdma_context->attrx.orig_attr.max_sge;
|
|
||||||
fprintf(stderr, "[OSD %ju] RDMA initialized successfully\n", osd_num);
|
fprintf(stderr, "[OSD %ju] RDMA initialized successfully\n", osd_num);
|
||||||
fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
|
for (msgr_rdma_context_t* rdma_context: rdma_contexts)
|
||||||
tfd->set_fd_handler(rdma_context->channel->fd, false, [this](int notify_fd, int epoll_events)
|
|
||||||
{
|
{
|
||||||
handle_rdma_events();
|
fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
|
||||||
});
|
tfd->set_fd_handler(rdma_context->channel->fd, false, [this, rdma_context](int notify_fd, int epoll_events)
|
||||||
handle_rdma_events();
|
{
|
||||||
|
handle_rdma_events(rdma_context);
|
||||||
|
});
|
||||||
|
handle_rdma_events(rdma_context);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
@ -247,10 +254,11 @@ osd_messenger_t::~osd_messenger_t()
|
||||||
iothreads.clear();
|
iothreads.clear();
|
||||||
}
|
}
|
||||||
#ifdef WITH_RDMA
|
#ifdef WITH_RDMA
|
||||||
if (rdma_context)
|
for (auto rdma_context: rdma_contexts)
|
||||||
{
|
{
|
||||||
delete rdma_context;
|
delete rdma_context;
|
||||||
}
|
}
|
||||||
|
rdma_contexts.clear();
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -262,10 +270,12 @@ void osd_messenger_t::parse_config(const json11::Json & config)
|
||||||
// RDMA is on by default in RDMA-enabled builds
|
// RDMA is on by default in RDMA-enabled builds
|
||||||
this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0;
|
this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0;
|
||||||
}
|
}
|
||||||
|
if (!config["force_rdma"].is_null())
|
||||||
|
{
|
||||||
|
this->force_rdma = config["force_rdma"].bool_value() || config["force_rdma"].uint64_value() != 0;
|
||||||
|
}
|
||||||
this->rdma_device = config["rdma_device"].string_value();
|
this->rdma_device = config["rdma_device"].string_value();
|
||||||
this->rdma_port_num = (uint8_t)config["rdma_port_num"].uint64_value();
|
this->rdma_port_num = (uint8_t)config["rdma_port_num"].uint64_value();
|
||||||
if (!this->rdma_port_num)
|
|
||||||
this->rdma_port_num = 1;
|
|
||||||
if (!config["rdma_gid_index"].is_null())
|
if (!config["rdma_gid_index"].is_null())
|
||||||
this->rdma_gid_index = (uint8_t)config["rdma_gid_index"].uint64_value();
|
this->rdma_gid_index = (uint8_t)config["rdma_gid_index"].uint64_value();
|
||||||
this->rdma_mtu = (uint32_t)config["rdma_mtu"].uint64_value();
|
this->rdma_mtu = (uint32_t)config["rdma_mtu"].uint64_value();
|
||||||
|
@ -282,15 +292,6 @@ void osd_messenger_t::parse_config(const json11::Json & config)
|
||||||
if (!this->rdma_max_msg || this->rdma_max_msg > 128*1024*1024)
|
if (!this->rdma_max_msg || this->rdma_max_msg > 128*1024*1024)
|
||||||
this->rdma_max_msg = 129*1024;
|
this->rdma_max_msg = 129*1024;
|
||||||
this->rdma_odp = config["rdma_odp"].bool_value();
|
this->rdma_odp = config["rdma_odp"].bool_value();
|
||||||
std::vector<std::string> mask;
|
|
||||||
if (config["bind_address"].is_string())
|
|
||||||
mask.push_back(config["bind_address"].string_value());
|
|
||||||
else if (config["osd_network"].is_string())
|
|
||||||
mask.push_back(config["osd_network"].string_value());
|
|
||||||
else
|
|
||||||
for (auto v: config["osd_network"].array_items())
|
|
||||||
mask.push_back(v.string_value());
|
|
||||||
this->osd_networks = mask;
|
|
||||||
#endif
|
#endif
|
||||||
if (!osd_num)
|
if (!osd_num)
|
||||||
this->iothread_count = (uint32_t)config["client_iothread_count"].uint64_value();
|
this->iothread_count = (uint32_t)config["client_iothread_count"].uint64_value();
|
||||||
|
@ -314,23 +315,81 @@ void osd_messenger_t::parse_config(const json11::Json & config)
|
||||||
if (!this->osd_ping_timeout)
|
if (!this->osd_ping_timeout)
|
||||||
this->osd_ping_timeout = 5;
|
this->osd_ping_timeout = 5;
|
||||||
this->log_level = config["log_level"].uint64_value();
|
this->log_level = config["log_level"].uint64_value();
|
||||||
|
// OSD public & cluster networks
|
||||||
|
this->osd_networks.clear();
|
||||||
|
if (config["bind_address"].is_string())
|
||||||
|
this->osd_networks.push_back(config["bind_address"].string_value());
|
||||||
|
else if (config["osd_network"].is_string())
|
||||||
|
this->osd_networks.push_back(config["osd_network"].string_value());
|
||||||
|
else
|
||||||
|
for (auto v: config["osd_network"].array_items())
|
||||||
|
this->osd_networks.push_back(v.string_value());
|
||||||
|
this->osd_cluster_networks.clear();
|
||||||
|
if (config["osd_cluster_network"].is_string())
|
||||||
|
this->osd_cluster_networks.push_back(config["osd_cluster_network"].string_value());
|
||||||
|
else
|
||||||
|
for (auto v: config["osd_cluster_network"].array_items())
|
||||||
|
this->osd_cluster_networks.push_back(v.string_value());
|
||||||
|
if (this->osd_cluster_networks.size())
|
||||||
|
for (auto & net: this->osd_cluster_networks)
|
||||||
|
for (int i = this->osd_networks.size()-1; i >= 0; i--)
|
||||||
|
if (this->osd_networks[i] == net)
|
||||||
|
this->osd_networks.erase(this->osd_networks.begin()+i, this->osd_networks.begin()+i+1);
|
||||||
|
this->osd_network_masks.clear();
|
||||||
|
for (auto & netstr: this->osd_networks)
|
||||||
|
this->osd_network_masks.push_back(cidr_parse(netstr));
|
||||||
|
this->osd_cluster_network_masks.clear();
|
||||||
|
for (auto & netstr: this->osd_cluster_networks)
|
||||||
|
this->osd_cluster_network_masks.push_back(cidr_parse(netstr));
|
||||||
|
this->all_osd_networks.insert(this->all_osd_networks.end(), this->osd_networks.begin(), this->osd_networks.end());
|
||||||
|
this->all_osd_networks.insert(this->all_osd_networks.end(), this->osd_cluster_networks.begin(), this->osd_cluster_networks.end());
|
||||||
|
if (!this->osd_networks.size())
|
||||||
|
{
|
||||||
|
this->osd_networks = this->osd_cluster_networks;
|
||||||
|
this->osd_network_masks = this->osd_cluster_network_masks;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state)
|
void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state)
|
||||||
{
|
{
|
||||||
if (wanted_peers.find(peer_osd) == wanted_peers.end())
|
if (wanted_peers[peer_osd].raw_address_list != peer_state["addresses"])
|
||||||
{
|
{
|
||||||
wanted_peers[peer_osd] = (osd_wanted_peer_t){
|
wanted_peers[peer_osd].raw_address_list = peer_state["addresses"];
|
||||||
.address_list = peer_state["addresses"],
|
if (osd_cluster_networks.size())
|
||||||
.port = (int)peer_state["port"].int64_value(),
|
{
|
||||||
};
|
json11::Json::array address_list, cluster_address_list;
|
||||||
|
for (auto json_addr: peer_state["addresses"].array_items())
|
||||||
|
{
|
||||||
|
struct sockaddr_storage addr;
|
||||||
|
auto ok = string_to_addr(json_addr.string_value(), false, 0, &addr);
|
||||||
|
if (ok)
|
||||||
|
{
|
||||||
|
bool is_cluster = false;
|
||||||
|
for (auto & mask: osd_cluster_network_masks)
|
||||||
|
{
|
||||||
|
if (cidr_sockaddr_match(addr, mask))
|
||||||
|
{
|
||||||
|
is_cluster = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (is_cluster)
|
||||||
|
cluster_address_list.push_back(json_addr);
|
||||||
|
else
|
||||||
|
address_list.push_back(json_addr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
auto n_cluster = this->osd_num ? cluster_address_list.size() : 0;
|
||||||
|
if (this->osd_num)
|
||||||
|
address_list.insert(address_list.begin(), cluster_address_list.begin(), cluster_address_list.end());
|
||||||
|
wanted_peers[peer_osd].address_list = address_list;
|
||||||
|
wanted_peers[peer_osd].n_cluster_addr = n_cluster;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
wanted_peers[peer_osd].address_list = peer_state["addresses"];
|
||||||
|
wanted_peers[peer_osd].address_changed = true;
|
||||||
}
|
}
|
||||||
else
|
wanted_peers[peer_osd].port = (int)peer_state["port"].int64_value();
|
||||||
{
|
|
||||||
wanted_peers[peer_osd].address_list = peer_state["addresses"];
|
|
||||||
wanted_peers[peer_osd].port = (int)peer_state["port"].int64_value();
|
|
||||||
}
|
|
||||||
wanted_peers[peer_osd].address_changed = true;
|
|
||||||
try_connect_peer(peer_osd);
|
try_connect_peer(peer_osd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -524,20 +583,30 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
#ifdef WITH_RDMA
|
#ifdef WITH_RDMA
|
||||||
if (rdma_context)
|
if (rdma_contexts.size())
|
||||||
{
|
{
|
||||||
cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg);
|
// Choose the right context for the selected network
|
||||||
if (cl->rdma_conn)
|
msgr_rdma_context_t *selected_ctx = choose_rdma_context(cl);
|
||||||
|
if (!selected_ctx)
|
||||||
{
|
{
|
||||||
json11::Json payload = json11::Json::object {
|
if (log_level > 0)
|
||||||
{ "connect_rdma", cl->rdma_conn->addr.to_string() },
|
fprintf(stderr, "No RDMA context for OSD %lu connection (peer %d), using only TCP\n", cl->osd_num, cl->peer_fd);
|
||||||
{ "rdma_max_msg", cl->rdma_conn->max_msg },
|
}
|
||||||
};
|
else
|
||||||
std::string payload_str = payload.dump();
|
{
|
||||||
op->req.show_conf.json_len = payload_str.size();
|
cl->rdma_conn = msgr_rdma_connection_t::create(selected_ctx, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg);
|
||||||
op->buf = malloc_or_die(payload_str.size());
|
if (cl->rdma_conn)
|
||||||
op->iov.push_back(op->buf, payload_str.size());
|
{
|
||||||
memcpy(op->buf, payload_str.c_str(), payload_str.size());
|
json11::Json payload = json11::Json::object {
|
||||||
|
{ "connect_rdma", cl->rdma_conn->addr.to_string() },
|
||||||
|
{ "rdma_max_msg", cl->rdma_conn->max_msg },
|
||||||
|
};
|
||||||
|
std::string payload_str = payload.dump();
|
||||||
|
op->req.show_conf.json_len = payload_str.size();
|
||||||
|
op->buf = malloc_or_die(payload_str.size());
|
||||||
|
op->iov.push_back(op->buf, payload_str.size());
|
||||||
|
memcpy(op->buf, payload_str.c_str(), payload_str.size());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
@ -667,9 +736,24 @@ void osd_messenger_t::accept_connections(int listen_fd)
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef WITH_RDMA
|
#ifdef WITH_RDMA
|
||||||
|
msgr_rdma_context_t* osd_messenger_t::choose_rdma_context(osd_client_t *cl)
|
||||||
|
{
|
||||||
|
// Choose the right context for the selected network
|
||||||
|
msgr_rdma_context_t *selected_ctx = NULL;
|
||||||
|
for (auto rdma_ctx: rdma_contexts)
|
||||||
|
{
|
||||||
|
if (!rdma_ctx->net_mask.family && !selected_ctx ||
|
||||||
|
rdma_ctx->net_mask.family && cidr_sockaddr_match(cl->peer_addr, rdma_ctx->net_mask))
|
||||||
|
{
|
||||||
|
selected_ctx = rdma_ctx;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return selected_ctx;
|
||||||
|
}
|
||||||
|
|
||||||
bool osd_messenger_t::is_rdma_enabled()
|
bool osd_messenger_t::is_rdma_enabled()
|
||||||
{
|
{
|
||||||
return rdma_context != NULL;
|
return rdma_contexts.size() > 0;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#include "json11/json11.hpp"
|
#include "json11/json11.hpp"
|
||||||
#include "msgr_op.h"
|
#include "msgr_op.h"
|
||||||
#include "timerfd_manager.h"
|
#include "timerfd_manager.h"
|
||||||
|
#include "addr_util.h"
|
||||||
#include <ringloop.h>
|
#include <ringloop.h>
|
||||||
|
|
||||||
#define CL_READ_HDR 1
|
#define CL_READ_HDR 1
|
||||||
|
@ -93,13 +94,15 @@ struct osd_client_t
|
||||||
|
|
||||||
struct osd_wanted_peer_t
|
struct osd_wanted_peer_t
|
||||||
{
|
{
|
||||||
|
json11::Json raw_address_list;
|
||||||
json11::Json address_list;
|
json11::Json address_list;
|
||||||
int port;
|
int n_cluster_addr = 0;
|
||||||
time_t last_connect_attempt;
|
int port = 0;
|
||||||
bool connecting, address_changed;
|
time_t last_connect_attempt = 0;
|
||||||
int address_index;
|
bool connecting = false, address_changed = false;
|
||||||
|
int address_index = 0;
|
||||||
std::string cur_addr;
|
std::string cur_addr;
|
||||||
int cur_port;
|
int cur_port = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct osd_op_stats_t
|
struct osd_op_stats_t
|
||||||
|
@ -165,11 +168,12 @@ protected:
|
||||||
|
|
||||||
#ifdef WITH_RDMA
|
#ifdef WITH_RDMA
|
||||||
bool use_rdma = true;
|
bool use_rdma = true;
|
||||||
std::vector<std::string> osd_networks;
|
bool force_rdma = false;
|
||||||
std::string rdma_device;
|
std::string rdma_device;
|
||||||
uint64_t rdma_port_num = 1, rdma_mtu = 0;
|
uint64_t rdma_port_num = 1;
|
||||||
|
int rdma_mtu = 0;
|
||||||
int rdma_gid_index = -1;
|
int rdma_gid_index = -1;
|
||||||
msgr_rdma_context_t *rdma_context = NULL;
|
std::vector<msgr_rdma_context_t *> rdma_contexts;
|
||||||
uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 0;
|
uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 0;
|
||||||
uint64_t rdma_max_msg = 0;
|
uint64_t rdma_max_msg = 0;
|
||||||
bool rdma_odp = false;
|
bool rdma_odp = false;
|
||||||
|
@ -190,6 +194,12 @@ public:
|
||||||
std::map<int, osd_client_t*> clients;
|
std::map<int, osd_client_t*> clients;
|
||||||
std::map<osd_num_t, osd_wanted_peer_t> wanted_peers;
|
std::map<osd_num_t, osd_wanted_peer_t> wanted_peers;
|
||||||
std::map<uint64_t, int> osd_peer_fds;
|
std::map<uint64_t, int> osd_peer_fds;
|
||||||
|
std::vector<std::string> osd_networks;
|
||||||
|
std::vector<addr_mask_t> osd_network_masks;
|
||||||
|
std::vector<std::string> osd_cluster_networks;
|
||||||
|
std::vector<addr_mask_t> osd_cluster_network_masks;
|
||||||
|
std::vector<std::string> all_osd_networks;
|
||||||
|
std::vector<addr_mask_t> all_osd_network_masks;
|
||||||
// op statistics
|
// op statistics
|
||||||
osd_op_stats_t stats, recovery_stats;
|
osd_op_stats_t stats, recovery_stats;
|
||||||
|
|
||||||
|
@ -247,6 +257,7 @@ protected:
|
||||||
void try_send_rdma_odp(osd_client_t *cl);
|
void try_send_rdma_odp(osd_client_t *cl);
|
||||||
void try_send_rdma_nodp(osd_client_t *cl);
|
void try_send_rdma_nodp(osd_client_t *cl);
|
||||||
bool try_recv_rdma(osd_client_t *cl);
|
bool try_recv_rdma(osd_client_t *cl);
|
||||||
void handle_rdma_events();
|
void handle_rdma_events(msgr_rdma_context_t *rdma_context);
|
||||||
|
msgr_rdma_context_t* choose_rdma_context(osd_client_t *cl);
|
||||||
#endif
|
#endif
|
||||||
};
|
};
|
||||||
|
|
|
@ -3,7 +3,6 @@
|
||||||
|
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include "addr_util.h"
|
|
||||||
#include "msgr_rdma.h"
|
#include "msgr_rdma.h"
|
||||||
#include "messenger.h"
|
#include "messenger.h"
|
||||||
|
|
||||||
|
@ -77,21 +76,21 @@ static bool is_ipv4_gid(ibv_gid_entry *gidx)
|
||||||
((uint32_t*)gidx->gid.raw)[2] == 0xffff0000);
|
((uint32_t*)gidx->gid.raw)[2] == 0xffff0000);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool match_gid(ibv_gid_entry *gidx, addr_mask_t *networks, int nnet)
|
static int match_gid(ibv_gid_entry *gidx, const addr_mask_t *networks, int nnet)
|
||||||
{
|
{
|
||||||
if (gidx->gid_type != IBV_GID_TYPE_ROCE_V1 &&
|
if (gidx->gid_type != IBV_GID_TYPE_ROCE_V1 &&
|
||||||
gidx->gid_type != IBV_GID_TYPE_ROCE_V2 ||
|
gidx->gid_type != IBV_GID_TYPE_ROCE_V2 ||
|
||||||
((uint64_t*)gidx->gid.raw)[0] == 0 &&
|
((uint64_t*)gidx->gid.raw)[0] == 0 &&
|
||||||
((uint64_t*)gidx->gid.raw)[1] == 0)
|
((uint64_t*)gidx->gid.raw)[1] == 0)
|
||||||
{
|
{
|
||||||
return false;
|
return -1;
|
||||||
}
|
}
|
||||||
if (is_ipv4_gid(gidx))
|
if (is_ipv4_gid(gidx))
|
||||||
{
|
{
|
||||||
for (int i = 0; i < nnet; i++)
|
for (int i = 0; i < nnet; i++)
|
||||||
{
|
{
|
||||||
if (networks[i].family == AF_INET && cidr_match(*(in_addr*)(gidx->gid.raw+12), networks[i].ipv4, networks[i].bits))
|
if (networks[i].family == AF_INET && cidr_match(*(in_addr*)(gidx->gid.raw+12), networks[i].ipv4, networks[i].bits))
|
||||||
return true;
|
return i;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -99,119 +98,67 @@ static bool match_gid(ibv_gid_entry *gidx, addr_mask_t *networks, int nnet)
|
||||||
for (int i = 0; i < nnet; i++)
|
for (int i = 0; i < nnet; i++)
|
||||||
{
|
{
|
||||||
if (networks[i].family == AF_INET6 && cidr6_match(*(in6_addr*)gidx->gid.raw, networks[i].ipv6, networks[i].bits))
|
if (networks[i].family == AF_INET6 && cidr6_match(*(in6_addr*)gidx->gid.raw, networks[i].ipv6, networks[i].bits))
|
||||||
return true;
|
return i;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct matched_dev
|
static void log_rdma_dev_port_gid(ibv_device *dev, int ib_port, int gid_index, int mtu, ibv_gid_entry & gidx)
|
||||||
{
|
|
||||||
int dev = -1;
|
|
||||||
int port = -1;
|
|
||||||
int gid = -1;
|
|
||||||
bool rocev2 = false;
|
|
||||||
};
|
|
||||||
|
|
||||||
static void log_rdma_dev_port_gid(ibv_device *dev, int ib_port, int gid_index, ibv_gid_entry & gidx)
|
|
||||||
{
|
{
|
||||||
bool is4 = ((uint64_t*)gidx.gid.raw)[0] == 0 && ((uint32_t*)gidx.gid.raw)[2] == 0xffff0000;
|
bool is4 = ((uint64_t*)gidx.gid.raw)[0] == 0 && ((uint32_t*)gidx.gid.raw)[2] == 0xffff0000;
|
||||||
char buf[256];
|
char buf[256];
|
||||||
inet_ntop(is4 ? AF_INET : AF_INET6, is4 ? gidx.gid.raw+12 : gidx.gid.raw, buf, sizeof(buf));
|
inet_ntop(is4 ? AF_INET : AF_INET6, is4 ? gidx.gid.raw+12 : gidx.gid.raw, buf, sizeof(buf));
|
||||||
fprintf(
|
fprintf(
|
||||||
stderr, "Auto-selected RDMA device %s port %d GID %d - ROCEv%d IPv%d %s\n",
|
stderr, "Selected RDMA device %s port %d GID %d - ROCEv%d IPv%d %s, MTU %d\n",
|
||||||
ibv_get_device_name(dev), ib_port, gid_index,
|
ibv_get_device_name(dev), ib_port, gid_index,
|
||||||
gidx.gid_type == IBV_GID_TYPE_ROCE_V2 ? 2 : 1, is4 ? 4 : 6, buf
|
gidx.gid_type == IBV_GID_TYPE_ROCE_V2 ? 2 : 1, is4 ? 4 : 6, buf, mtu
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
static matched_dev match_device(ibv_device **dev_list, addr_mask_t *networks, int nnet, int log_level)
|
static int match_port_gid(const std::vector<addr_mask_t> & osd_network_masks, ibv_context *context,
|
||||||
|
int port_num, int gid_count, int log_level, ibv_gid_entry *best_gidx, int *net_num)
|
||||||
{
|
{
|
||||||
matched_dev best;
|
// Try to find a port with matching address
|
||||||
ibv_device_attr attr;
|
int best_gid_idx = -1, res = 0;
|
||||||
ibv_port_attr portinfo;
|
for (int k = 0; k < gid_count; k++)
|
||||||
ibv_gid_entry best_gidx;
|
|
||||||
int res;
|
|
||||||
bool have_non_roce = false, have_roce = false;
|
|
||||||
for (int i = 0; dev_list[i]; ++i)
|
|
||||||
{
|
{
|
||||||
auto dev = dev_list[i];
|
ibv_gid_entry gidx;
|
||||||
ibv_context *context = ibv_open_device(dev_list[i]);
|
if ((res = ibv_query_gid_ex(context, port_num, k, &gidx, 0)) != 0)
|
||||||
if ((res = ibv_query_device(context, &attr)) != 0)
|
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Couldn't query RDMA device %s for its features: %s\n", ibv_get_device_name(dev_list[i]), strerror(res));
|
if (res != ENODATA)
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
for (int j = 1; j <= attr.phys_port_cnt; j++)
|
|
||||||
{
|
|
||||||
// Try to find a port with matching address
|
|
||||||
if ((res = ibv_query_port(context, j, &portinfo)) != 0)
|
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Couldn't get RDMA device %s port %d info: %s\n", ibv_get_device_name(dev), j, strerror(res));
|
fprintf(stderr, "Couldn't read RDMA device %s GID index %d: %s\n", ibv_get_device_name(context->device), k, strerror(res));
|
||||||
goto cleanup;
|
continue;
|
||||||
}
|
}
|
||||||
for (int k = 0; k < portinfo.gid_tbl_len; k++)
|
else
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if ((res = match_gid(&gidx, osd_network_masks.data(), osd_network_masks.size())) >= 0)
|
||||||
|
{
|
||||||
|
// Prefer RoCEv2
|
||||||
|
if (best_gid_idx < 0 || best_gidx->gid_type != IBV_GID_TYPE_ROCE_V2 && gidx.gid_type == IBV_GID_TYPE_ROCE_V2)
|
||||||
{
|
{
|
||||||
ibv_gid_entry gidx;
|
best_gid_idx = k;
|
||||||
if ((res = ibv_query_gid_ex(context, j, k, &gidx, 0)) != 0)
|
*best_gidx = gidx;
|
||||||
{
|
*net_num = res;
|
||||||
if (res != ENODATA)
|
|
||||||
{
|
|
||||||
fprintf(stderr, "Couldn't read RDMA device %s GID index %d: %s\n", ibv_get_device_name(dev), k, strerror(res));
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (gidx.gid_type != IBV_GID_TYPE_ROCE_V1 &&
|
|
||||||
gidx.gid_type != IBV_GID_TYPE_ROCE_V2)
|
|
||||||
have_non_roce = true;
|
|
||||||
else
|
|
||||||
have_roce = true;
|
|
||||||
if (match_gid(&gidx, networks, nnet))
|
|
||||||
{
|
|
||||||
// Prefer RoCEv2
|
|
||||||
if (!best.rocev2)
|
|
||||||
{
|
|
||||||
best.dev = i;
|
|
||||||
best.port = j;
|
|
||||||
best.gid = k;
|
|
||||||
best.rocev2 = (gidx.gid_type == IBV_GID_TYPE_ROCE_V2);
|
|
||||||
best_gidx = gidx;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cleanup:
|
|
||||||
ibv_close_device(context);
|
|
||||||
if (best.rocev2)
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (best.dev >= 0 && log_level > 0)
|
return best_gid_idx;
|
||||||
{
|
|
||||||
log_rdma_dev_port_gid(dev_list[best.dev], best.port, best.gid, best_gidx);
|
|
||||||
}
|
|
||||||
if (best.dev < 0 && have_non_roce && !have_roce)
|
|
||||||
{
|
|
||||||
best.dev = -2;
|
|
||||||
}
|
|
||||||
return best;
|
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
msgr_rdma_context_t *msgr_rdma_context_t::create(std::vector<std::string> osd_networks, const char *ib_devname, uint8_t ib_port, int gid_index, uint32_t mtu, bool odp, int log_level)
|
std::vector<msgr_rdma_context_t*> msgr_rdma_context_t::create_all(const std::vector<addr_mask_t> & osd_network_masks,
|
||||||
|
const char *sel_dev_name, int sel_port_num, int sel_gid_index, uint32_t sel_mtu, bool odp, int log_level)
|
||||||
{
|
{
|
||||||
int res;
|
int res;
|
||||||
|
std::vector<msgr_rdma_context_t*> ret;
|
||||||
|
ibv_device **raw_dev_list = NULL;
|
||||||
ibv_device **dev_list = NULL;
|
ibv_device **dev_list = NULL;
|
||||||
msgr_rdma_context_t *ctx = new msgr_rdma_context_t();
|
ibv_device *single_list[2] = {};
|
||||||
ctx->mtu = mtu;
|
|
||||||
|
|
||||||
timespec tv;
|
raw_dev_list = dev_list = ibv_get_device_list(NULL);
|
||||||
clock_gettime(CLOCK_REALTIME, &tv);
|
|
||||||
srand48(tv.tv_sec*1000000000 + tv.tv_nsec);
|
|
||||||
dev_list = ibv_get_device_list(NULL);
|
|
||||||
if (!dev_list || !*dev_list)
|
if (!dev_list || !*dev_list)
|
||||||
{
|
{
|
||||||
if (errno == -ENOSYS || errno == ENOSYS)
|
if (errno == -ENOSYS || errno == ENOSYS)
|
||||||
|
@ -228,121 +175,126 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(std::vector<std::string> osd_ne
|
||||||
fprintf(stderr, "Failed to get RDMA device list: %s\n", strerror(errno));
|
fprintf(stderr, "Failed to get RDMA device list: %s\n", strerror(errno));
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
if (ib_devname)
|
|
||||||
|
if (sel_dev_name)
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
for (i = 0; dev_list[i]; ++i)
|
for (i = 0; dev_list[i]; ++i)
|
||||||
if (!strcmp(ibv_get_device_name(dev_list[i]), ib_devname))
|
if (!strcmp(ibv_get_device_name(dev_list[i]), sel_dev_name))
|
||||||
break;
|
break;
|
||||||
ctx->dev = dev_list[i];
|
if (!dev_list[i])
|
||||||
if (!ctx->dev)
|
|
||||||
{
|
{
|
||||||
fprintf(stderr, "RDMA device %s not found\n", ib_devname);
|
fprintf(stderr, "RDMA device %s not found\n", sel_dev_name);
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
}
|
single_list[0] = dev_list[i];
|
||||||
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
|
dev_list = single_list;
|
||||||
else if (osd_networks.size())
|
|
||||||
{
|
|
||||||
std::vector<addr_mask_t> nets;
|
|
||||||
for (auto & netstr: osd_networks)
|
|
||||||
{
|
|
||||||
nets.push_back(cidr_parse(netstr));
|
|
||||||
}
|
|
||||||
auto best = match_device(dev_list, nets.data(), nets.size(), log_level);
|
|
||||||
if (best.dev == -2)
|
|
||||||
{
|
|
||||||
best.dev = 0;
|
|
||||||
if (log_level > 0)
|
|
||||||
fprintf(stderr, "No RoCE devices found, using first available RDMA device %s\n", ibv_get_device_name(*dev_list));
|
|
||||||
}
|
|
||||||
else if (best.dev < 0)
|
|
||||||
{
|
|
||||||
if (log_level > 0)
|
|
||||||
fprintf(stderr, "RDMA device matching osd_network is not found, disabling RDMA\n");
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
ib_port = best.port;
|
|
||||||
gid_index = best.gid;
|
|
||||||
}
|
|
||||||
ctx->dev = dev_list[best.dev];
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
else
|
|
||||||
{
|
|
||||||
ctx->dev = *dev_list;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx->context = ibv_open_device(ctx->dev);
|
for (int i = 0; dev_list[i]; ++i)
|
||||||
if (!ctx->context)
|
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Couldn't get RDMA context for %s\n", ibv_get_device_name(ctx->dev));
|
auto dev = dev_list[i];
|
||||||
goto cleanup;
|
ibv_context *context = ibv_open_device(dev);
|
||||||
}
|
if (!context)
|
||||||
|
|
||||||
ctx->ib_port = ib_port;
|
|
||||||
if ((res = ibv_query_port(ctx->context, ib_port, &ctx->portinfo)) != 0)
|
|
||||||
{
|
|
||||||
fprintf(stderr, "Couldn't get RDMA device %s port %d info: %s\n", ibv_get_device_name(ctx->dev), ib_port, strerror(res));
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
ctx->my_lid = ctx->portinfo.lid;
|
|
||||||
if (ctx->portinfo.link_layer != IBV_LINK_LAYER_ETHERNET && !ctx->my_lid)
|
|
||||||
{
|
|
||||||
fprintf(stderr, "RDMA device %s must have local LID because it's not Ethernet, but LID is zero\n", ibv_get_device_name(ctx->dev));
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
|
|
||||||
if (gid_index != -1)
|
|
||||||
#endif
|
|
||||||
{
|
|
||||||
ctx->gid_index = gid_index < 0 ? 0 : gid_index;
|
|
||||||
if (ibv_query_gid(ctx->context, ib_port, gid_index, &ctx->my_gid))
|
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Couldn't read RDMA device %s GID index %d\n", ibv_get_device_name(ctx->dev), gid_index);
|
fprintf(stderr, "Couldn't get RDMA context for %s\n", ibv_get_device_name(dev));
|
||||||
goto cleanup;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
ibv_device_attr attr;
|
||||||
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
|
if ((res = ibv_query_device(context, &attr)) != 0)
|
||||||
else
|
|
||||||
{
|
|
||||||
// Auto-guess GID
|
|
||||||
ibv_gid_entry best_gidx;
|
|
||||||
for (int k = 0; k < ctx->portinfo.gid_tbl_len; k++)
|
|
||||||
{
|
{
|
||||||
ibv_gid_entry gidx;
|
fprintf(stderr, "Couldn't query RDMA device %s for its features: %s\n", ibv_get_device_name(dev), strerror(res));
|
||||||
if (ibv_query_gid_ex(ctx->context, ib_port, k, &gidx, 0) != 0)
|
goto cleanup_dev;
|
||||||
|
}
|
||||||
|
if (sel_port_num && sel_port_num > attr.phys_port_cnt)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "RDMA device %s port %d does not exist\n", ibv_get_device_name(dev), sel_port_num);
|
||||||
|
goto cleanup_dev;
|
||||||
|
}
|
||||||
|
for (int port_num = (sel_port_num ? sel_port_num : 1); port_num <= (sel_port_num ? sel_port_num : attr.phys_port_cnt); port_num++)
|
||||||
|
{
|
||||||
|
ibv_port_attr portinfo;
|
||||||
|
if ((res = ibv_query_port(context, port_num, &portinfo)) != 0)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Couldn't read RDMA device %s GID index %d\n", ibv_get_device_name(ctx->dev), k);
|
fprintf(stderr, "Couldn't get RDMA device %s port %d info: %s\n", ibv_get_device_name(dev), port_num, strerror(res));
|
||||||
goto cleanup;
|
continue;
|
||||||
}
|
}
|
||||||
// Skip empty GID
|
if (portinfo.state != IBV_PORT_ACTIVE)
|
||||||
if (((uint64_t*)gidx.gid.raw)[0] == 0 &&
|
|
||||||
((uint64_t*)gidx.gid.raw)[1] == 0)
|
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// Prefer IPv4 RoCEv2 -> IPv6 RoCEv2 -> IPv4 RoCEv1 -> IPv6 RoCEv1 -> IB
|
if (sel_gid_index >= (int)portinfo.gid_tbl_len)
|
||||||
if (gid_index == -1 ||
|
|
||||||
gidx.gid_type == IBV_GID_TYPE_ROCE_V2 && best_gidx.gid_type != IBV_GID_TYPE_ROCE_V2 ||
|
|
||||||
gidx.gid_type == IBV_GID_TYPE_ROCE_V1 && best_gidx.gid_type == IBV_GID_TYPE_IB ||
|
|
||||||
gidx.gid_type == best_gidx.gid_type && is_ipv4_gid(&gidx))
|
|
||||||
{
|
{
|
||||||
gid_index = k;
|
fprintf(stderr, "RDMA device %s port %d GID %d does not exist\n", ibv_get_device_name(dev), port_num, sel_gid_index);
|
||||||
best_gidx = gidx;
|
continue;
|
||||||
|
}
|
||||||
|
uint32_t port_mtu = sel_mtu ? sel_mtu : portinfo.active_mtu;
|
||||||
|
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
|
||||||
|
if (sel_gid_index < 0)
|
||||||
|
{
|
||||||
|
ibv_gid_entry best_gidx;
|
||||||
|
int net_num = 0;
|
||||||
|
int best_gid_idx = match_port_gid(osd_network_masks, context, port_num, portinfo.gid_tbl_len, log_level, &best_gidx, &net_num);
|
||||||
|
if (best_gid_idx >= 0)
|
||||||
|
{
|
||||||
|
if (log_level > 0)
|
||||||
|
log_rdma_dev_port_gid(dev, port_num, best_gid_idx, port_mtu, best_gidx);
|
||||||
|
auto ctx = msgr_rdma_context_t::create(dev, portinfo, port_num, best_gid_idx, port_mtu, odp, log_level);
|
||||||
|
if (ctx)
|
||||||
|
{
|
||||||
|
ctx->net_mask = osd_network_masks[net_num];
|
||||||
|
ret.push_back(ctx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
#endif
|
||||||
|
{
|
||||||
|
int best_gid_idx = sel_gid_index >= 0 ? sel_gid_index : 0;
|
||||||
|
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
|
||||||
|
if (log_level > 0)
|
||||||
|
{
|
||||||
|
ibv_gid_entry gidx;
|
||||||
|
ibv_query_gid_ex(context, port_num, best_gid_idx, &gidx, 0);
|
||||||
|
log_rdma_dev_port_gid(dev, port_num, best_gid_idx, port_mtu, gidx);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
auto ctx = msgr_rdma_context_t::create(dev, portinfo, port_num, best_gid_idx, port_mtu, odp, log_level);
|
||||||
|
if (ctx)
|
||||||
|
ret.push_back(ctx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ctx->gid_index = gid_index = (gid_index == -1 ? 0 : gid_index);
|
cleanup_dev:
|
||||||
if (log_level > 0)
|
ibv_close_device(context);
|
||||||
{
|
|
||||||
log_rdma_dev_port_gid(ctx->dev, ctx->ib_port, ctx->gid_index, best_gidx);
|
|
||||||
}
|
|
||||||
ctx->my_gid = best_gidx.gid;
|
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
cleanup:
|
||||||
|
if (raw_dev_list)
|
||||||
|
ibv_free_device_list(raw_dev_list);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
msgr_rdma_context_t *msgr_rdma_context_t::create(ibv_device *dev, ibv_port_attr & portinfo, int ib_port, int gid_index, uint32_t mtu, bool odp, int log_level)
|
||||||
|
{
|
||||||
|
msgr_rdma_context_t *ctx = new msgr_rdma_context_t();
|
||||||
|
ibv_context *context = ibv_open_device(dev);
|
||||||
|
if (!context)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Couldn't get RDMA context for %s\n", ibv_get_device_name(dev));
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx->mtu = mtu;
|
||||||
|
ctx->context = context;
|
||||||
|
ctx->ib_port = ib_port;
|
||||||
|
ctx->portinfo = portinfo;
|
||||||
|
ctx->my_lid = ctx->portinfo.lid;
|
||||||
|
if (ctx->portinfo.link_layer != IBV_LINK_LAYER_ETHERNET && !ctx->my_lid)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "RDMA device %s must have local LID because it's not Ethernet, but LID is zero\n", ibv_get_device_name(dev));
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
ctx->gid_index = gid_index;
|
||||||
|
|
||||||
ctx->pd = ibv_alloc_pd(ctx->context);
|
ctx->pd = ibv_alloc_pd(ctx->context);
|
||||||
if (!ctx->pd)
|
if (!ctx->pd)
|
||||||
|
@ -351,18 +303,18 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(std::vector<std::string> osd_ne
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx->odp = odp;
|
||||||
|
if (ctx->odp)
|
||||||
{
|
{
|
||||||
if (ibv_query_device_ex(ctx->context, NULL, &ctx->attrx))
|
if (ibv_query_device_ex(ctx->context, NULL, &ctx->attrx))
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Couldn't query RDMA device for its features\n");
|
fprintf(stderr, "Couldn't query RDMA device for its features\n");
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
ctx->odp = odp;
|
if (!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) ||
|
||||||
if (ctx->odp &&
|
|
||||||
(!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) ||
|
|
||||||
!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT_IMPLICIT) ||
|
!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT_IMPLICIT) ||
|
||||||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) ||
|
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) ||
|
||||||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV)))
|
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV))
|
||||||
{
|
{
|
||||||
ctx->odp = false;
|
ctx->odp = false;
|
||||||
if (log_level > 0)
|
if (log_level > 0)
|
||||||
|
@ -395,14 +347,12 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(std::vector<std::string> osd_ne
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dev_list)
|
|
||||||
ibv_free_device_list(dev_list);
|
|
||||||
return ctx;
|
return ctx;
|
||||||
|
|
||||||
cleanup:
|
cleanup:
|
||||||
|
if (context)
|
||||||
|
ibv_close_device(context);
|
||||||
delete ctx;
|
delete ctx;
|
||||||
if (dev_list)
|
|
||||||
ibv_free_device_list(dev_list);
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -550,7 +500,15 @@ bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint64
|
||||||
{
|
{
|
||||||
client_max_msg = rdma_max_msg;
|
client_max_msg = rdma_max_msg;
|
||||||
}
|
}
|
||||||
auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, client_max_msg);
|
auto cl = clients.at(peer_fd);
|
||||||
|
msgr_rdma_context_t *selected_ctx = choose_rdma_context(cl);
|
||||||
|
if (!selected_ctx)
|
||||||
|
{
|
||||||
|
if (log_level > 0)
|
||||||
|
fprintf(stderr, "No RDMA context for peer %d, using only TCP\n", cl->peer_fd);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
msgr_rdma_connection_t *rdma_conn = msgr_rdma_connection_t::create(selected_ctx, rdma_max_send, rdma_max_recv, rdma_max_sge, client_max_msg);
|
||||||
if (rdma_conn)
|
if (rdma_conn)
|
||||||
{
|
{
|
||||||
int r = rdma_conn->connect(&addr);
|
int r = rdma_conn->connect(&addr);
|
||||||
|
@ -669,9 +627,9 @@ void osd_messenger_t::try_send_rdma_nodp(osd_client_t *cl)
|
||||||
// Allocate send ring buffer, if not yet
|
// Allocate send ring buffer, if not yet
|
||||||
rc->send_out_size = rc->max_msg*rdma_max_send;
|
rc->send_out_size = rc->max_msg*rdma_max_send;
|
||||||
rc->send_out.buf = malloc_or_die(rc->send_out_size);
|
rc->send_out.buf = malloc_or_die(rc->send_out_size);
|
||||||
if (!rdma_context->odp)
|
if (!rc->ctx->odp)
|
||||||
{
|
{
|
||||||
rc->send_out.mr = ibv_reg_mr(rdma_context->pd, rc->send_out.buf, rc->send_out_size, 0);
|
rc->send_out.mr = ibv_reg_mr(rc->ctx->pd, rc->send_out.buf, rc->send_out_size, 0);
|
||||||
if (!rc->send_out.mr)
|
if (!rc->send_out.mr)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
|
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
|
||||||
|
@ -701,7 +659,7 @@ void osd_messenger_t::try_send_rdma_nodp(osd_client_t *cl)
|
||||||
ibv_sge sge = {
|
ibv_sge sge = {
|
||||||
.addr = (uintptr_t)dst,
|
.addr = (uintptr_t)dst,
|
||||||
.length = (uint32_t)copied,
|
.length = (uint32_t)copied,
|
||||||
.lkey = rdma_context->odp ? rdma_context->mr->lkey : rc->send_out.mr->lkey,
|
.lkey = rc->ctx->odp ? rc->ctx->mr->lkey : rc->send_out.mr->lkey,
|
||||||
};
|
};
|
||||||
try_send_rdma_wr(cl, &sge, 1);
|
try_send_rdma_wr(cl, &sge, 1);
|
||||||
rc->send_sizes.push_back(copied);
|
rc->send_sizes.push_back(copied);
|
||||||
|
@ -711,7 +669,7 @@ void osd_messenger_t::try_send_rdma_nodp(osd_client_t *cl)
|
||||||
|
|
||||||
void osd_messenger_t::try_send_rdma(osd_client_t *cl)
|
void osd_messenger_t::try_send_rdma(osd_client_t *cl)
|
||||||
{
|
{
|
||||||
if (rdma_context->odp)
|
if (cl->rdma_conn->ctx->odp)
|
||||||
try_send_rdma_odp(cl);
|
try_send_rdma_odp(cl);
|
||||||
else
|
else
|
||||||
try_send_rdma_nodp(cl);
|
try_send_rdma_nodp(cl);
|
||||||
|
@ -746,9 +704,9 @@ bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
|
||||||
{
|
{
|
||||||
msgr_rdma_buf_t b;
|
msgr_rdma_buf_t b;
|
||||||
b.buf = malloc_or_die(rc->max_msg);
|
b.buf = malloc_or_die(rc->max_msg);
|
||||||
if (!rdma_context->odp)
|
if (!rc->ctx->odp)
|
||||||
{
|
{
|
||||||
b.mr = ibv_reg_mr(rdma_context->pd, b.buf, rc->max_msg, IBV_ACCESS_LOCAL_WRITE);
|
b.mr = ibv_reg_mr(rc->ctx->pd, b.buf, rc->max_msg, IBV_ACCESS_LOCAL_WRITE);
|
||||||
if (!b.mr)
|
if (!b.mr)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
|
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
|
||||||
|
@ -763,7 +721,7 @@ bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
|
||||||
|
|
||||||
#define RDMA_EVENTS_AT_ONCE 32
|
#define RDMA_EVENTS_AT_ONCE 32
|
||||||
|
|
||||||
void osd_messenger_t::handle_rdma_events()
|
void osd_messenger_t::handle_rdma_events(msgr_rdma_context_t *rdma_context)
|
||||||
{
|
{
|
||||||
// Request next notification
|
// Request next notification
|
||||||
ibv_cq *ev_cq;
|
ibv_cq *ev_cq;
|
||||||
|
|
|
@ -5,6 +5,7 @@
|
||||||
#include <infiniband/verbs.h>
|
#include <infiniband/verbs.h>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
#include "addr_util.h"
|
||||||
|
|
||||||
struct msgr_rdma_address_t
|
struct msgr_rdma_address_t
|
||||||
{
|
{
|
||||||
|
@ -20,7 +21,6 @@ struct msgr_rdma_address_t
|
||||||
struct msgr_rdma_context_t
|
struct msgr_rdma_context_t
|
||||||
{
|
{
|
||||||
ibv_context *context = NULL;
|
ibv_context *context = NULL;
|
||||||
ibv_device *dev = NULL;
|
|
||||||
ibv_device_attr_ex attrx;
|
ibv_device_attr_ex attrx;
|
||||||
ibv_pd *pd = NULL;
|
ibv_pd *pd = NULL;
|
||||||
bool odp = false;
|
bool odp = false;
|
||||||
|
@ -35,8 +35,13 @@ struct msgr_rdma_context_t
|
||||||
uint32_t mtu;
|
uint32_t mtu;
|
||||||
int max_cqe = 0;
|
int max_cqe = 0;
|
||||||
int used_max_cqe = 0;
|
int used_max_cqe = 0;
|
||||||
|
addr_mask_t net_mask = {};
|
||||||
|
|
||||||
|
static std::vector<msgr_rdma_context_t*> create_all(const std::vector<addr_mask_t> & osd_network_masks,
|
||||||
|
const char *sel_dev_name, int sel_port_num, int sel_gid_index, uint32_t sel_mtu, bool odp, int log_level);
|
||||||
|
static msgr_rdma_context_t *create(ibv_device *dev, ibv_port_attr & portinfo,
|
||||||
|
int ib_port, int gid_index, uint32_t mtu, bool odp, int log_level);
|
||||||
|
|
||||||
static msgr_rdma_context_t *create(std::vector<std::string> osd_networks, const char *ib_devname, uint8_t ib_port, int gid_index, uint32_t mtu, bool odp, int log_level);
|
|
||||||
~msgr_rdma_context_t();
|
~msgr_rdma_context_t();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -98,5 +98,3 @@ std::string format_lat(uint64_t lat);
|
||||||
std::string format_q(double depth);
|
std::string format_q(double depth);
|
||||||
|
|
||||||
bool stupid_glob(const std::string str, const std::string glob);
|
bool stupid_glob(const std::string str, const std::string glob);
|
||||||
|
|
||||||
std::string implode(const std::string & sep, json11::Json array);
|
|
||||||
|
|
|
@ -7,6 +7,7 @@
|
||||||
#include "epoll_manager.h"
|
#include "epoll_manager.h"
|
||||||
#include "pg_states.h"
|
#include "pg_states.h"
|
||||||
#include "str_util.h"
|
#include "str_util.h"
|
||||||
|
#include "json_util.h"
|
||||||
|
|
||||||
struct placement_osd_t
|
struct placement_osd_t
|
||||||
{
|
{
|
||||||
|
|
|
@ -5,6 +5,7 @@
|
||||||
#include "cluster_client.h"
|
#include "cluster_client.h"
|
||||||
#include "pg_states.h"
|
#include "pg_states.h"
|
||||||
#include "str_util.h"
|
#include "str_util.h"
|
||||||
|
#include "json_util.h"
|
||||||
|
|
||||||
struct pg_lister_t
|
struct pg_lister_t
|
||||||
{
|
{
|
||||||
|
|
|
@ -10,6 +10,7 @@
|
||||||
#include "epoll_manager.h"
|
#include "epoll_manager.h"
|
||||||
#include "pg_states.h"
|
#include "pg_states.h"
|
||||||
#include "str_util.h"
|
#include "str_util.h"
|
||||||
|
#include "json_util.h"
|
||||||
|
|
||||||
struct pool_creator_t
|
struct pool_creator_t
|
||||||
{
|
{
|
||||||
|
|
|
@ -5,6 +5,7 @@
|
||||||
#include "cli.h"
|
#include "cli.h"
|
||||||
#include "cluster_client.h"
|
#include "cluster_client.h"
|
||||||
#include "str_util.h"
|
#include "str_util.h"
|
||||||
|
#include "json_util.h"
|
||||||
#include "pg_states.h"
|
#include "pg_states.h"
|
||||||
|
|
||||||
// List pools with space statistics
|
// List pools with space statistics
|
||||||
|
@ -665,19 +666,3 @@ std::function<bool(cli_result_t &)> cli_tool_t::start_pool_ls(json11::Json cfg)
|
||||||
return false;
|
return false;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string implode(const std::string & sep, json11::Json array)
|
|
||||||
{
|
|
||||||
if (array.is_number() || array.is_bool() || array.is_string())
|
|
||||||
{
|
|
||||||
return array.as_string();
|
|
||||||
}
|
|
||||||
std::string res;
|
|
||||||
bool first = true;
|
|
||||||
for (auto & item: array.array_items())
|
|
||||||
{
|
|
||||||
res += (first ? item.as_string() : sep+item.as_string());
|
|
||||||
first = false;
|
|
||||||
}
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
|
@ -120,7 +120,9 @@ osd_t::~osd_t()
|
||||||
delete epmgr;
|
delete epmgr;
|
||||||
if (bs)
|
if (bs)
|
||||||
delete bs;
|
delete bs;
|
||||||
close(listen_fd);
|
for (auto listen_fd: listen_fds)
|
||||||
|
close(listen_fd);
|
||||||
|
listen_fds.clear();
|
||||||
free(zero_buffer);
|
free(zero_buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -162,9 +164,6 @@ void osd_t::parse_config(bool init)
|
||||||
else
|
else
|
||||||
immediate_commit = IMMEDIATE_NONE;
|
immediate_commit = IMMEDIATE_NONE;
|
||||||
// Bind address
|
// Bind address
|
||||||
bind_address = config["bind_address"].string_value();
|
|
||||||
if (bind_address == "")
|
|
||||||
bind_address = "0.0.0.0";
|
|
||||||
bind_port = config["bind_port"].uint64_value();
|
bind_port = config["bind_port"].uint64_value();
|
||||||
if (bind_port <= 0 || bind_port > 65535)
|
if (bind_port <= 0 || bind_port > 65535)
|
||||||
bind_port = 0;
|
bind_port = 0;
|
||||||
|
@ -322,41 +321,30 @@ void osd_t::parse_config(bool init)
|
||||||
|
|
||||||
void osd_t::bind_socket()
|
void osd_t::bind_socket()
|
||||||
{
|
{
|
||||||
if (config["osd_network"].is_string() ||
|
if (msgr.all_osd_network_masks.size())
|
||||||
config["osd_network"].is_array())
|
|
||||||
{
|
{
|
||||||
std::vector<std::string> mask;
|
bind_addresses = getifaddr_list(msgr.all_osd_network_masks);
|
||||||
if (config["osd_network"].is_string())
|
if (!bind_addresses.size())
|
||||||
mask.push_back(config["osd_network"].string_value());
|
|
||||||
else
|
|
||||||
for (auto v: config["osd_network"].array_items())
|
|
||||||
mask.push_back(v.string_value());
|
|
||||||
auto matched_addrs = getifaddr_list(mask);
|
|
||||||
if (matched_addrs.size() > 1)
|
|
||||||
{
|
{
|
||||||
fprintf(stderr, "More than 1 address matches requested network(s): %s\n", json11::Json(matched_addrs).dump().c_str());
|
auto nets = implode(", ", msgr.all_osd_networks);
|
||||||
force_stop(1);
|
|
||||||
}
|
|
||||||
if (!matched_addrs.size())
|
|
||||||
{
|
|
||||||
std::string nets;
|
|
||||||
for (auto v: mask)
|
|
||||||
nets += (nets == "" ? v : ","+v);
|
|
||||||
fprintf(stderr, "Addresses matching osd_network(s) %s not found\n", nets.c_str());
|
fprintf(stderr, "Addresses matching osd_network(s) %s not found\n", nets.c_str());
|
||||||
force_stop(1);
|
force_stop(1);
|
||||||
}
|
}
|
||||||
bind_address = matched_addrs[0];
|
|
||||||
}
|
}
|
||||||
|
else
|
||||||
// FIXME Support multiple listening sockets
|
|
||||||
|
|
||||||
listen_fd = create_and_bind_socket(bind_address, bind_port, listen_backlog, &listening_port);
|
|
||||||
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
|
|
||||||
|
|
||||||
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
|
|
||||||
{
|
{
|
||||||
msgr.accept_connections(listen_fd);
|
bind_addresses.push_back("0.0.0.0");
|
||||||
});
|
}
|
||||||
|
for (auto & bind_address: bind_addresses)
|
||||||
|
{
|
||||||
|
int listen_fd = create_and_bind_socket(bind_address, bind_port, listen_backlog, &listening_port);
|
||||||
|
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
|
||||||
|
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
|
||||||
|
{
|
||||||
|
msgr.accept_connections(fd);
|
||||||
|
});
|
||||||
|
listen_fds.push_back(listen_fd);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool osd_t::shutdown()
|
bool osd_t::shutdown()
|
||||||
|
|
|
@ -107,7 +107,6 @@ class osd_t
|
||||||
bool no_recovery = false;
|
bool no_recovery = false;
|
||||||
bool no_scrub = false;
|
bool no_scrub = false;
|
||||||
bool allow_net_split = false;
|
bool allow_net_split = false;
|
||||||
std::string bind_address;
|
|
||||||
int bind_port, listen_backlog = 128;
|
int bind_port, listen_backlog = 128;
|
||||||
// FIXME: Implement client queue depth limit
|
// FIXME: Implement client queue depth limit
|
||||||
int client_queue_depth = 128;
|
int client_queue_depth = 128;
|
||||||
|
@ -200,7 +199,8 @@ class osd_t
|
||||||
epoll_manager_t *epmgr = NULL;
|
epoll_manager_t *epmgr = NULL;
|
||||||
|
|
||||||
int listening_port = 0;
|
int listening_port = 0;
|
||||||
int listen_fd = 0;
|
std::vector<std::string> bind_addresses;
|
||||||
|
std::vector<int> listen_fds;
|
||||||
ring_consumer_t consumer;
|
ring_consumer_t consumer;
|
||||||
|
|
||||||
// op statistics
|
// op statistics
|
||||||
|
|
|
@ -165,8 +165,8 @@ json11::Json osd_t::get_osd_state()
|
||||||
hostname.resize(strnlen(hostname.data(), hostname.size()));
|
hostname.resize(strnlen(hostname.data(), hostname.size()));
|
||||||
json11::Json::object st;
|
json11::Json::object st;
|
||||||
st["state"] = "up";
|
st["state"] = "up";
|
||||||
if (bind_address != "0.0.0.0")
|
if (bind_addresses.size() != 1 || bind_addresses[0] != "0.0.0.0")
|
||||||
st["addresses"] = json11::Json::array { bind_address };
|
st["addresses"] = bind_addresses;
|
||||||
else
|
else
|
||||||
st["addresses"] = getifaddr_list();
|
st["addresses"] = getifaddr_list();
|
||||||
st["host"] = std::string(hostname.data(), hostname.size());
|
st["host"] = std::string(hostname.data(), hostname.size());
|
||||||
|
|
|
@ -93,6 +93,13 @@ bool cidr6_match(const in6_addr &address, const in6_addr &network, uint8_t bits)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool cidr_sockaddr_match(const sockaddr_storage &addr, const addr_mask_t &mask)
|
||||||
|
{
|
||||||
|
return mask.family == addr.ss_family && (mask.family == AF_INET
|
||||||
|
? cidr_match(*(in_addr*)&addr, mask.ipv4, mask.bits)
|
||||||
|
: cidr6_match(*(in6_addr*)&addr, mask.ipv6, mask.bits));
|
||||||
|
}
|
||||||
|
|
||||||
addr_mask_t cidr_parse(std::string mask)
|
addr_mask_t cidr_parse(std::string mask)
|
||||||
{
|
{
|
||||||
unsigned bits = 255;
|
unsigned bits = 255;
|
||||||
|
@ -126,13 +133,11 @@ addr_mask_t cidr_parse(std::string mask)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<std::string> getifaddr_list(std::vector<std::string> mask_cfg, bool include_v6)
|
std::vector<std::string> getifaddr_list(const std::vector<addr_mask_t> & masks, bool include_v6)
|
||||||
{
|
{
|
||||||
std::vector<addr_mask_t> masks;
|
for (auto & mask: masks)
|
||||||
for (auto mask: mask_cfg)
|
|
||||||
{
|
{
|
||||||
masks.push_back(cidr_parse(mask));
|
if (mask.family == AF_INET6)
|
||||||
if (masks[masks.size()-1].family == AF_INET6)
|
|
||||||
{
|
{
|
||||||
// Auto-enable IPv6 addresses
|
// Auto-enable IPv6 addresses
|
||||||
include_v6 = true;
|
include_v6 = true;
|
||||||
|
|
|
@ -18,5 +18,6 @@ std::string addr_to_string(const sockaddr_storage &addr);
|
||||||
addr_mask_t cidr_parse(std::string mask);
|
addr_mask_t cidr_parse(std::string mask);
|
||||||
bool cidr_match(const in_addr &address, const in_addr &network, uint8_t bits);
|
bool cidr_match(const in_addr &address, const in_addr &network, uint8_t bits);
|
||||||
bool cidr6_match(const in6_addr &address, const in6_addr &network, uint8_t bits);
|
bool cidr6_match(const in6_addr &address, const in6_addr &network, uint8_t bits);
|
||||||
std::vector<std::string> getifaddr_list(std::vector<std::string> mask_cfg = std::vector<std::string>(), bool include_v6 = false);
|
bool cidr_sockaddr_match(const sockaddr_storage &addr, const addr_mask_t &mask);
|
||||||
|
std::vector<std::string> getifaddr_list(const std::vector<addr_mask_t> & masks = std::vector<addr_mask_t>(), bool include_v6 = false);
|
||||||
int create_and_bind_socket(std::string bind_address, int bind_port, int listen_backlog, int *listening_port);
|
int create_and_bind_socket(std::string bind_address, int bind_port, int listen_backlog, int *listening_port);
|
||||||
|
|
|
@ -33,3 +33,19 @@ bool json_is_false(const json11::Json & val)
|
||||||
return !val.bool_value();
|
return !val.bool_value();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::string implode(const std::string & sep, json11::Json array)
|
||||||
|
{
|
||||||
|
if (array.is_number() || array.is_bool() || array.is_string())
|
||||||
|
{
|
||||||
|
return array.as_string();
|
||||||
|
}
|
||||||
|
std::string res;
|
||||||
|
bool first = true;
|
||||||
|
for (auto & item: array.array_items())
|
||||||
|
{
|
||||||
|
res += (first ? item.as_string() : sep+item.as_string());
|
||||||
|
first = false;
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
|
@ -11,3 +11,4 @@
|
||||||
std::map<std::string, std::string> json_to_string_map(const json11::Json::object & config);
|
std::map<std::string, std::string> json_to_string_map(const json11::Json::object & config);
|
||||||
bool json_is_true(const json11::Json & val);
|
bool json_is_true(const json11::Json & val);
|
||||||
bool json_is_false(const json11::Json & val);
|
bool json_is_false(const json11::Json & val);
|
||||||
|
std::string implode(const std::string & sep, json11::Json array);
|
||||||
|
|
Loading…
Reference in New Issue