Compare commits

...

2 Commits

17 changed files with 155 additions and 100 deletions

View File

@ -182,7 +182,7 @@ void etcd_state_client_t::add_etcd_url(std::string addr)
exit(1);
}
if (!local_ips.size())
local_ips = getifaddr_list(std::vector<std::string>(), true);
local_ips = getifaddr_list(std::vector<addr_mask_t>(), true);
std::string check_addr;
int pos = addr.find('/');
int pos2 = addr.find(':');

View File

@ -117,15 +117,22 @@ void msgr_iothread_t::run()
void osd_messenger_t::init()
{
// FIXME: Support multiple RDMA networks?!
#ifdef WITH_RDMA
if (use_rdma)
{
rdma_context = msgr_rdma_context_t::create(
osd_networks, rdma_device != "" ? rdma_device.c_str() : NULL,
osd_num && osd_cluster_network_masks.size() ? osd_cluster_network_masks : osd_network_masks,
rdma_device != "" ? rdma_device.c_str() : NULL,
rdma_port_num, rdma_gid_index, rdma_mtu, rdma_odp, log_level
);
if (!rdma_context)
{
if (force_rdma)
{
fprintf(stderr, "[OSD %ju] Couldn't initialize RDMA, force_rdma is enabled, exiting\n", osd_num);
exit(1);
}
if (log_level > 0)
fprintf(stderr, "[OSD %ju] Couldn't initialize RDMA, proceeding with TCP only\n", osd_num);
}
@ -262,6 +269,10 @@ void osd_messenger_t::parse_config(const json11::Json & config)
// RDMA is on by default in RDMA-enabled builds
this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0;
}
if (!config["force_rdma"].is_null())
{
this->force_rdma = config["force_rdma"].bool_value() || config["force_rdma"].uint64_value() != 0;
}
this->rdma_device = config["rdma_device"].string_value();
this->rdma_port_num = (uint8_t)config["rdma_port_num"].uint64_value();
if (!this->rdma_port_num)
@ -282,15 +293,6 @@ void osd_messenger_t::parse_config(const json11::Json & config)
if (!this->rdma_max_msg || this->rdma_max_msg > 128*1024*1024)
this->rdma_max_msg = 129*1024;
this->rdma_odp = config["rdma_odp"].bool_value();
std::vector<std::string> mask;
if (config["bind_address"].is_string())
mask.push_back(config["bind_address"].string_value());
else if (config["osd_network"].is_string())
mask.push_back(config["osd_network"].string_value());
else
for (auto v: config["osd_network"].array_items())
mask.push_back(v.string_value());
this->osd_networks = mask;
#endif
if (!osd_num)
this->iothread_count = (uint32_t)config["client_iothread_count"].uint64_value();
@ -314,23 +316,83 @@ void osd_messenger_t::parse_config(const json11::Json & config)
if (!this->osd_ping_timeout)
this->osd_ping_timeout = 5;
this->log_level = config["log_level"].uint64_value();
// OSD public & cluster networks
this->osd_networks.clear();
if (config["bind_address"].is_string())
this->osd_networks.push_back(config["bind_address"].string_value());
else if (config["osd_network"].is_string())
this->osd_networks.push_back(config["osd_network"].string_value());
else
for (auto v: config["osd_network"].array_items())
this->osd_networks.push_back(v.string_value());
this->osd_cluster_networks.clear();
if (config["osd_cluster_network"].is_string())
this->osd_cluster_networks.push_back(config["osd_cluster_network"].string_value());
else
for (auto v: config["osd_cluster_network"].array_items())
this->osd_cluster_networks.push_back(v.string_value());
if (this->osd_cluster_networks.size())
for (auto & net: this->osd_cluster_networks)
for (int i = this->osd_networks.size()-1; i >= 0; i--)
if (this->osd_networks[i] == net)
this->osd_networks.erase(this->osd_networks.begin()+i, this->osd_networks.begin()+i+1);
this->osd_network_masks.clear();
for (auto & netstr: this->osd_networks)
this->osd_network_masks.push_back(cidr_parse(netstr));
this->osd_cluster_network_masks.clear();
for (auto & netstr: this->osd_cluster_networks)
this->osd_cluster_network_masks.push_back(cidr_parse(netstr));
this->all_osd_networks.insert(this->all_osd_networks.end(), this->osd_networks.begin(), this->osd_networks.end());
this->all_osd_networks.insert(this->all_osd_networks.end(), this->osd_cluster_networks.begin(), this->osd_cluster_networks.end());
if (!this->osd_networks.size())
{
this->osd_networks = this->osd_cluster_networks;
this->osd_network_masks = this->osd_cluster_network_masks;
}
}
void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state)
{
if (wanted_peers.find(peer_osd) == wanted_peers.end())
if (wanted_peers[peer_osd].raw_address_list != peer_state["addresses"])
{
wanted_peers[peer_osd] = (osd_wanted_peer_t){
.address_list = peer_state["addresses"],
.port = (int)peer_state["port"].int64_value(),
};
wanted_peers[peer_osd].raw_address_list = peer_state["addresses"];
if (osd_cluster_networks.size())
{
json11::Json::array address_list, cluster_address_list;
for (auto json_addr: peer_state["addresses"].array_items())
{
struct sockaddr_storage addr;
auto ok = string_to_addr(json_addr.string_value(), false, 0, &addr);
if (ok)
{
bool is_cluster = false;
for (auto & mask: osd_cluster_network_masks)
{
if (mask.family == addr.ss_family && (mask.family == AF_INET
? cidr_match(*(in_addr*)&addr, mask.ipv4, mask.bits)
: cidr6_match(*(in6_addr*)&addr, mask.ipv6, mask.bits)))
{
is_cluster = true;
break;
}
}
if (is_cluster)
cluster_address_list.push_back(json_addr);
else
address_list.push_back(json_addr);
}
}
auto n_cluster = this->osd_num ? cluster_address_list.size() : 0;
if (this->osd_num)
address_list.insert(address_list.begin(), cluster_address_list.begin(), cluster_address_list.end());
wanted_peers[peer_osd].address_list = address_list;
wanted_peers[peer_osd].n_cluster_addr = n_cluster;
}
else
wanted_peers[peer_osd].address_list = peer_state["addresses"];
wanted_peers[peer_osd].address_changed = true;
}
else
{
wanted_peers[peer_osd].address_list = peer_state["addresses"];
wanted_peers[peer_osd].port = (int)peer_state["port"].int64_value();
}
wanted_peers[peer_osd].address_changed = true;
wanted_peers[peer_osd].port = (int)peer_state["port"].int64_value();
try_connect_peer(peer_osd);
}

View File

@ -16,6 +16,7 @@
#include "json11/json11.hpp"
#include "msgr_op.h"
#include "timerfd_manager.h"
#include "addr_util.h"
#include <ringloop.h>
#define CL_READ_HDR 1
@ -93,13 +94,15 @@ struct osd_client_t
struct osd_wanted_peer_t
{
json11::Json raw_address_list;
json11::Json address_list;
int port;
time_t last_connect_attempt;
bool connecting, address_changed;
int address_index;
int n_cluster_addr = 0;
int port = 0;
time_t last_connect_attempt = 0;
bool connecting = false, address_changed = false;
int address_index = 0;
std::string cur_addr;
int cur_port;
int cur_port = 0;
};
struct osd_op_stats_t
@ -165,7 +168,7 @@ protected:
#ifdef WITH_RDMA
bool use_rdma = true;
std::vector<std::string> osd_networks;
bool force_rdma = false;
std::string rdma_device;
uint64_t rdma_port_num = 1, rdma_mtu = 0;
int rdma_gid_index = -1;
@ -190,6 +193,12 @@ public:
std::map<int, osd_client_t*> clients;
std::map<osd_num_t, osd_wanted_peer_t> wanted_peers;
std::map<uint64_t, int> osd_peer_fds;
std::vector<std::string> osd_networks;
std::vector<addr_mask_t> osd_network_masks;
std::vector<std::string> osd_cluster_networks;
std::vector<addr_mask_t> osd_cluster_network_masks;
std::vector<std::string> all_osd_networks;
std::vector<addr_mask_t> all_osd_network_masks;
// op statistics
osd_op_stats_t stats, recovery_stats;

View File

@ -3,7 +3,6 @@
#include <stdio.h>
#include <stdlib.h>
#include "addr_util.h"
#include "msgr_rdma.h"
#include "messenger.h"
@ -77,7 +76,7 @@ static bool is_ipv4_gid(ibv_gid_entry *gidx)
((uint32_t*)gidx->gid.raw)[2] == 0xffff0000);
}
static bool match_gid(ibv_gid_entry *gidx, addr_mask_t *networks, int nnet)
static bool match_gid(ibv_gid_entry *gidx, const addr_mask_t *networks, int nnet)
{
if (gidx->gid_type != IBV_GID_TYPE_ROCE_V1 &&
gidx->gid_type != IBV_GID_TYPE_ROCE_V2 ||
@ -125,7 +124,7 @@ static void log_rdma_dev_port_gid(ibv_device *dev, int ib_port, int gid_index, i
);
}
static matched_dev match_device(ibv_device **dev_list, addr_mask_t *networks, int nnet, int log_level)
static matched_dev match_device(ibv_device **dev_list, const addr_mask_t *networks, int nnet, int log_level)
{
matched_dev best;
ibv_device_attr attr;
@ -201,7 +200,7 @@ cleanup:
}
#endif
msgr_rdma_context_t *msgr_rdma_context_t::create(std::vector<std::string> osd_networks, const char *ib_devname, uint8_t ib_port, int gid_index, uint32_t mtu, bool odp, int log_level)
msgr_rdma_context_t *msgr_rdma_context_t::create(const std::vector<addr_mask_t> & osd_network_masks, const char *ib_devname, uint8_t ib_port, int gid_index, uint32_t mtu, bool odp, int log_level)
{
int res;
ibv_device **dev_list = NULL;
@ -242,14 +241,9 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(std::vector<std::string> osd_ne
}
}
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
else if (osd_networks.size())
else if (osd_network_masks.size())
{
std::vector<addr_mask_t> nets;
for (auto & netstr: osd_networks)
{
nets.push_back(cidr_parse(netstr));
}
auto best = match_device(dev_list, nets.data(), nets.size(), log_level);
auto best = match_device(dev_list, osd_network_masks.data(), osd_network_masks.size(), log_level);
if (best.dev == -2)
{
best.dev = 0;

View File

@ -5,6 +5,7 @@
#include <infiniband/verbs.h>
#include <string>
#include <vector>
#include "addr_util.h"
struct msgr_rdma_address_t
{
@ -36,7 +37,7 @@ struct msgr_rdma_context_t
int max_cqe = 0;
int used_max_cqe = 0;
static msgr_rdma_context_t *create(std::vector<std::string> osd_networks, const char *ib_devname, uint8_t ib_port, int gid_index, uint32_t mtu, bool odp, int log_level);
static msgr_rdma_context_t *create(const std::vector<addr_mask_t> & osd_network_masks, const char *ib_devname, uint8_t ib_port, int gid_index, uint32_t mtu, bool odp, int log_level);
~msgr_rdma_context_t();
};

View File

@ -98,5 +98,3 @@ std::string format_lat(uint64_t lat);
std::string format_q(double depth);
bool stupid_glob(const std::string str, const std::string glob);
std::string implode(const std::string & sep, json11::Json array);

View File

@ -7,6 +7,7 @@
#include "epoll_manager.h"
#include "pg_states.h"
#include "str_util.h"
#include "json_util.h"
struct placement_osd_t
{

View File

@ -5,6 +5,7 @@
#include "cluster_client.h"
#include "pg_states.h"
#include "str_util.h"
#include "json_util.h"
struct pg_lister_t
{

View File

@ -10,6 +10,7 @@
#include "epoll_manager.h"
#include "pg_states.h"
#include "str_util.h"
#include "json_util.h"
struct pool_creator_t
{

View File

@ -5,6 +5,7 @@
#include "cli.h"
#include "cluster_client.h"
#include "str_util.h"
#include "json_util.h"
#include "pg_states.h"
// List pools with space statistics
@ -665,19 +666,3 @@ std::function<bool(cli_result_t &)> cli_tool_t::start_pool_ls(json11::Json cfg)
return false;
};
}
std::string implode(const std::string & sep, json11::Json array)
{
if (array.is_number() || array.is_bool() || array.is_string())
{
return array.as_string();
}
std::string res;
bool first = true;
for (auto & item: array.array_items())
{
res += (first ? item.as_string() : sep+item.as_string());
first = false;
}
return res;
}

View File

@ -120,7 +120,9 @@ osd_t::~osd_t()
delete epmgr;
if (bs)
delete bs;
close(listen_fd);
for (auto listen_fd: listen_fds)
close(listen_fd);
listen_fds.clear();
free(zero_buffer);
}
@ -162,9 +164,6 @@ void osd_t::parse_config(bool init)
else
immediate_commit = IMMEDIATE_NONE;
// Bind address
bind_address = config["bind_address"].string_value();
if (bind_address == "")
bind_address = "0.0.0.0";
bind_port = config["bind_port"].uint64_value();
if (bind_port <= 0 || bind_port > 65535)
bind_port = 0;
@ -322,41 +321,30 @@ void osd_t::parse_config(bool init)
void osd_t::bind_socket()
{
if (config["osd_network"].is_string() ||
config["osd_network"].is_array())
if (msgr.all_osd_network_masks.size())
{
std::vector<std::string> mask;
if (config["osd_network"].is_string())
mask.push_back(config["osd_network"].string_value());
else
for (auto v: config["osd_network"].array_items())
mask.push_back(v.string_value());
auto matched_addrs = getifaddr_list(mask);
if (matched_addrs.size() > 1)
bind_addresses = getifaddr_list(msgr.all_osd_network_masks);
if (!bind_addresses.size())
{
fprintf(stderr, "More than 1 address matches requested network(s): %s\n", json11::Json(matched_addrs).dump().c_str());
force_stop(1);
}
if (!matched_addrs.size())
{
std::string nets;
for (auto v: mask)
nets += (nets == "" ? v : ","+v);
auto nets = implode(", ", msgr.all_osd_networks);
fprintf(stderr, "Addresses matching osd_network(s) %s not found\n", nets.c_str());
force_stop(1);
}
bind_address = matched_addrs[0];
}
// FIXME Support multiple listening sockets
listen_fd = create_and_bind_socket(bind_address, bind_port, listen_backlog, &listening_port);
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
else
{
msgr.accept_connections(listen_fd);
});
bind_addresses.push_back("0.0.0.0");
}
for (auto & bind_address: bind_addresses)
{
int listen_fd = create_and_bind_socket(bind_address, bind_port, listen_backlog, &listening_port);
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
{
msgr.accept_connections(fd);
});
listen_fds.push_back(listen_fd);
}
}
bool osd_t::shutdown()

View File

@ -107,7 +107,6 @@ class osd_t
bool no_recovery = false;
bool no_scrub = false;
bool allow_net_split = false;
std::string bind_address;
int bind_port, listen_backlog = 128;
// FIXME: Implement client queue depth limit
int client_queue_depth = 128;
@ -200,7 +199,8 @@ class osd_t
epoll_manager_t *epmgr = NULL;
int listening_port = 0;
int listen_fd = 0;
std::vector<std::string> bind_addresses;
std::vector<int> listen_fds;
ring_consumer_t consumer;
// op statistics

View File

@ -165,8 +165,8 @@ json11::Json osd_t::get_osd_state()
hostname.resize(strnlen(hostname.data(), hostname.size()));
json11::Json::object st;
st["state"] = "up";
if (bind_address != "0.0.0.0")
st["addresses"] = json11::Json::array { bind_address };
if (bind_addresses.size() != 1 || bind_addresses[0] != "0.0.0.0")
st["addresses"] = bind_addresses;
else
st["addresses"] = getifaddr_list();
st["host"] = std::string(hostname.data(), hostname.size());

View File

@ -126,13 +126,11 @@ addr_mask_t cidr_parse(std::string mask)
}
}
std::vector<std::string> getifaddr_list(std::vector<std::string> mask_cfg, bool include_v6)
std::vector<std::string> getifaddr_list(const std::vector<addr_mask_t> & masks, bool include_v6)
{
std::vector<addr_mask_t> masks;
for (auto mask: mask_cfg)
for (auto & mask: masks)
{
masks.push_back(cidr_parse(mask));
if (masks[masks.size()-1].family == AF_INET6)
if (mask.family == AF_INET6)
{
// Auto-enable IPv6 addresses
include_v6 = true;

View File

@ -18,5 +18,5 @@ std::string addr_to_string(const sockaddr_storage &addr);
addr_mask_t cidr_parse(std::string mask);
bool cidr_match(const in_addr &address, const in_addr &network, uint8_t bits);
bool cidr6_match(const in6_addr &address, const in6_addr &network, uint8_t bits);
std::vector<std::string> getifaddr_list(std::vector<std::string> mask_cfg = std::vector<std::string>(), bool include_v6 = false);
std::vector<std::string> getifaddr_list(const std::vector<addr_mask_t> & masks = std::vector<addr_mask_t>(), bool include_v6 = false);
int create_and_bind_socket(std::string bind_address, int bind_port, int listen_backlog, int *listening_port);

View File

@ -33,3 +33,19 @@ bool json_is_false(const json11::Json & val)
return !val.bool_value();
return false;
}
std::string implode(const std::string & sep, json11::Json array)
{
if (array.is_number() || array.is_bool() || array.is_string())
{
return array.as_string();
}
std::string res;
bool first = true;
for (auto & item: array.array_items())
{
res += (first ? item.as_string() : sep+item.as_string());
first = false;
}
return res;
}

View File

@ -11,3 +11,4 @@
std::map<std::string, std::string> json_to_string_map(const json11::Json::object & config);
bool json_is_true(const json11::Json & val);
bool json_is_false(const json11::Json & val);
std::string implode(const std::string & sep, json11::Json array);