Compare commits

...

2 Commits

Author SHA1 Message Date
Vitaliy Filippov 321b5963cb Add an alternative RDMA implementation via RDMA-CM
Required for non-RoCE cards: iWARP and, possibly, Infiniband
2025-03-31 15:56:29 +03:00
Vitaliy Filippov a0d862f4e6 Support multiple RDMA networks 2025-03-31 15:56:29 +03:00
13 changed files with 944 additions and 263 deletions

View File

@ -7,10 +7,14 @@ set(MSGR_RDMA "")
if (IBVERBS_LIBRARIES)
set(MSGR_RDMA "msgr_rdma.cpp")
endif (IBVERBS_LIBRARIES)
set(MSGR_RDMACM "")
if (RDMACM_LIBRARIES)
set(MSGR_RDMACM "msgr_rdmacm.cpp")
endif (RDMACM_LIBRARIES)
add_library(vitastor_common STATIC
../util/epoll_manager.cpp etcd_state_client.cpp messenger.cpp ../util/addr_util.cpp
msgr_stop.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ../util/ringloop.cpp ../../json11/json11.cpp
http_client.cpp osd_ops.cpp pg_states.cpp ../util/timerfd_manager.cpp ../util/str_util.cpp ../util/json_util.cpp ${MSGR_RDMA}
http_client.cpp osd_ops.cpp pg_states.cpp ../util/timerfd_manager.cpp ../util/str_util.cpp ../util/json_util.cpp ${MSGR_RDMA} ${MSGR_RDMACM}
)
target_link_libraries(vitastor_common pthread)
target_compile_options(vitastor_common PUBLIC -fPIC)
@ -28,6 +32,7 @@ target_link_libraries(vitastor_client
vitastor_cli
${LIBURING_LIBRARIES}
${IBVERBS_LIBRARIES}
${RDMACM_LIBRARIES}
)
set_target_properties(vitastor_client PROPERTIES VERSION ${VITASTOR_VERSION} SOVERSION 0)
configure_file(vitastor.pc.in vitastor.pc @ONLY)

View File

@ -117,31 +117,56 @@ void msgr_iothread_t::run()
void osd_messenger_t::init()
{
// FIXME: Support multiple RDMA networks?!
#ifdef WITH_RDMACM
if (use_rdmacm)
{
// RDMA-CM only requires the event channel. All the remaining work is done separately
rdmacm_evch = rdma_create_event_channel();
if (!rdmacm_evch)
{
fprintf(stderr, "Failed to initialize RDMA-CM event channel: %s (code %d)\n", strerror(errno), errno);
}
else
{
fcntl(rdmacm_evch->fd, F_SETFL, fcntl(rdmacm_evch->fd, F_GETFL, 0) | O_NONBLOCK);
tfd->set_fd_handler(rdmacm_evch->fd, false, [this](int rdmacm_eventfd, int epoll_events)
{
handle_rdmacm_events();
});
}
}
else
#endif
#ifdef WITH_RDMA
if (use_rdma)
{
rdma_context = msgr_rdma_context_t::create(
rdma_contexts = msgr_rdma_context_t::create_all(
osd_num && osd_cluster_network_masks.size() ? osd_cluster_network_masks : osd_network_masks,
rdma_device != "" ? rdma_device.c_str() : NULL,
rdma_port_num, rdma_gid_index, rdma_mtu, rdma_odp, log_level
);
if (!rdma_context)
if (!rdma_contexts.size())
{
if (force_rdma)
{
fprintf(stderr, "[OSD %ju] Couldn't initialize RDMA, force_rdma is enabled, exiting\n", osd_num);
exit(1);
}
if (log_level > 0)
fprintf(stderr, "[OSD %ju] Couldn't initialize RDMA, proceeding with TCP only\n", osd_num);
}
else
{
rdma_max_sge = rdma_max_sge < rdma_context->attrx.orig_attr.max_sge
? rdma_max_sge : rdma_context->attrx.orig_attr.max_sge;
fprintf(stderr, "[OSD %ju] RDMA initialized successfully\n", osd_num);
fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
tfd->set_fd_handler(rdma_context->channel->fd, false, [this](int notify_fd, int epoll_events)
for (msgr_rdma_context_t* rdma_context: rdma_contexts)
{
handle_rdma_events();
});
handle_rdma_events();
fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
tfd->set_fd_handler(rdma_context->channel->fd, false, [this, rdma_context](int notify_fd, int epoll_events)
{
handle_rdma_events(rdma_context);
});
handle_rdma_events(rdma_context);
}
}
}
#endif
@ -249,10 +274,19 @@ osd_messenger_t::~osd_messenger_t()
iothreads.clear();
}
#ifdef WITH_RDMA
if (rdma_context)
for (auto rdma_context: rdma_contexts)
{
delete rdma_context;
}
rdma_contexts.clear();
#endif
#ifdef WITH_RDMACM
if (rdmacm_evch)
{
tfd->set_fd_handler(rdmacm_evch->fd, false, NULL);
rdma_destroy_event_channel(rdmacm_evch);
rdmacm_evch = NULL;
}
#endif
}
@ -264,10 +298,18 @@ void osd_messenger_t::parse_config(const json11::Json & config)
// RDMA is on by default in RDMA-enabled builds
this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0;
}
#ifdef WITH_RDMACM
// Use RDMA CM? (required for iWARP and may be useful for IB)
// FIXME: Only parse during start
this->use_rdmacm = config["use_rdmacm"].bool_value() || config["use_rdmacm"].uint64_value() != 0;
this->disable_tcp = this->use_rdmacm && (config["disable_tcp"].bool_value() || config["disable_tcp"].uint64_value() != 0);
#endif
if (!config["force_rdma"].is_null())
{
this->force_rdma = config["force_rdma"].bool_value() || config["force_rdma"].uint64_value() != 0;
}
this->rdma_device = config["rdma_device"].string_value();
this->rdma_port_num = (uint8_t)config["rdma_port_num"].uint64_value();
if (!this->rdma_port_num)
this->rdma_port_num = 1;
if (!config["rdma_gid_index"].is_null())
this->rdma_gid_index = (uint8_t)config["rdma_gid_index"].uint64_value();
this->rdma_mtu = (uint32_t)config["rdma_mtu"].uint64_value();
@ -351,7 +393,7 @@ void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state)
if (wanted_peers[peer_osd].raw_address_list != peer_state["addresses"])
{
wanted_peers[peer_osd].raw_address_list = peer_state["addresses"];
if (osd_cluster_networks.size())
if (osd_cluster_network_masks.size())
{
json11::Json::array address_list, cluster_address_list;
for (auto json_addr: peer_state["addresses"].array_items())
@ -363,9 +405,7 @@ void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state)
bool is_cluster = false;
for (auto & mask: osd_cluster_network_masks)
{
if (mask.family == addr.ss_family && (mask.family == AF_INET
? cidr_match(*(in_addr*)&addr, mask.ipv4, mask.bits)
: cidr6_match(*(in6_addr*)&addr, mask.ipv6, mask.bits)))
if (cidr_sockaddr_match(addr, mask))
{
is_cluster = true;
break;
@ -387,6 +427,9 @@ void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state)
wanted_peers[peer_osd].address_list = peer_state["addresses"];
wanted_peers[peer_osd].address_changed = true;
}
#ifdef WITH_RDMACM
wanted_peers[peer_osd].peer_rdmacm = peer_state["rdmacm"].bool_value();
#endif
wanted_peers[peer_osd].port = (int)peer_state["port"].int64_value();
try_connect_peer(peer_osd);
}
@ -412,12 +455,24 @@ void osd_messenger_t::try_connect_peer(uint64_t peer_osd)
wp.cur_addr = wp.address_list[wp.address_index].string_value();
wp.cur_port = wp.port;
wp.connecting = true;
try_connect_peer_addr(peer_osd, wp.cur_addr.c_str(), wp.cur_port);
#ifdef WITH_RDMACM
if (use_rdmacm && wp.peer_rdmacm)
rdmacm_try_connect_peer(peer_osd, wp.cur_addr.c_str(), wp.cur_port);
else
#endif
try_connect_peer_tcp(peer_osd, wp.cur_addr.c_str(), wp.cur_port);
}
void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port)
void osd_messenger_t::try_connect_peer_tcp(osd_num_t peer_osd, const char *peer_host, int peer_port)
{
assert(peer_osd != this->osd_num);
#ifdef WITH_RDMACM
if (disable_tcp)
{
on_connect_peer(peer_osd, -EINVAL);
return;
}
#endif
struct sockaddr_storage addr;
if (!string_to_addr(peer_host, 0, peer_port, &addr))
{
@ -581,20 +636,30 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
},
};
#ifdef WITH_RDMA
if (rdma_context)
if (rdma_contexts.size())
{
cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg);
if (cl->rdma_conn)
// Choose the right context for the selected network
msgr_rdma_context_t *selected_ctx = choose_rdma_context(cl);
if (!selected_ctx)
{
json11::Json payload = json11::Json::object {
{ "connect_rdma", cl->rdma_conn->addr.to_string() },
{ "rdma_max_msg", cl->rdma_conn->max_msg },
};
std::string payload_str = payload.dump();
op->req.show_conf.json_len = payload_str.size();
op->buf = malloc_or_die(payload_str.size());
op->iov.push_back(op->buf, payload_str.size());
memcpy(op->buf, payload_str.c_str(), payload_str.size());
if (log_level > 0)
fprintf(stderr, "No RDMA context for OSD %ju connection (peer %d), using only TCP\n", cl->osd_num, cl->peer_fd);
}
else
{
cl->rdma_conn = msgr_rdma_connection_t::create(selected_ctx, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg);
if (cl->rdma_conn)
{
json11::Json payload = json11::Json::object {
{ "connect_rdma", cl->rdma_conn->addr.to_string() },
{ "rdma_max_msg", cl->rdma_conn->max_msg },
};
std::string payload_str = payload.dump();
op->req.show_conf.json_len = payload_str.size();
op->buf = malloc_or_die(payload_str.size());
op->iov.push_back(op->buf, payload_str.size());
memcpy(op->buf, payload_str.c_str(), payload_str.size());
}
}
}
#endif
@ -724,9 +789,29 @@ void osd_messenger_t::accept_connections(int listen_fd)
}
#ifdef WITH_RDMA
msgr_rdma_context_t* osd_messenger_t::choose_rdma_context(osd_client_t *cl)
{
// Choose the right context for the selected network
msgr_rdma_context_t *selected_ctx = NULL;
for (auto rdma_ctx: rdma_contexts)
{
if (!rdma_ctx->net_mask.family && !selected_ctx ||
rdma_ctx->net_mask.family && cidr_sockaddr_match(cl->peer_addr, rdma_ctx->net_mask))
{
selected_ctx = rdma_ctx;
}
}
return selected_ctx;
}
bool osd_messenger_t::is_rdma_enabled()
{
return rdma_context != NULL;
return rdma_contexts.size() > 0;
}
bool osd_messenger_t::is_use_rdmacm()
{
return use_rdmacm;
}
#endif

View File

@ -50,10 +50,10 @@ struct osd_client_t
{
int refs = 0;
sockaddr_storage peer_addr;
int peer_port;
sockaddr_storage peer_addr = {};
int peer_port = 0;
int peer_fd = -1;
int peer_state;
int peer_state = 0;
int connect_timeout_id = -1;
int ping_time_remaining = 0;
int idle_time_remaining = 0;
@ -96,6 +96,7 @@ struct osd_wanted_peer_t
{
json11::Json raw_address_list;
json11::Json address_list;
bool peer_rdmacm = false;
int n_cluster_addr = 0;
int port = 0;
time_t last_connect_attempt = 0;
@ -152,6 +153,15 @@ public:
};
#endif
#ifdef WITH_RDMA
struct rdma_event_channel;
struct rdma_cm_id;
struct rdma_cm_event;
struct ibv_context;
struct osd_messenger_t;
struct rdmacm_connecting_t;
#endif
struct osd_messenger_t
{
protected:
@ -168,13 +178,20 @@ protected:
#ifdef WITH_RDMA
bool use_rdma = true;
bool use_rdmacm = false;
bool disable_tcp = false;
bool force_rdma = false;
std::string rdma_device;
uint64_t rdma_port_num = 1, rdma_mtu = 0;
uint64_t rdma_port_num = 1;
int rdma_mtu = 0;
int rdma_gid_index = -1;
msgr_rdma_context_t *rdma_context = NULL;
std::vector<msgr_rdma_context_t *> rdma_contexts;
uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 0;
uint64_t rdma_max_msg = 0;
bool rdma_odp = false;
rdma_event_channel *rdmacm_evch = NULL;
std::map<rdma_cm_id*, osd_client_t*> rdmacm_connections;
std::map<rdma_cm_id*, rdmacm_connecting_t*> rdmacm_connecting;
#endif
std::vector<msgr_iothread_t*> iothreads;
@ -224,13 +241,18 @@ public:
bool is_rdma_enabled();
bool connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg);
#endif
#ifdef WITH_RDMACM
bool is_use_rdmacm();
rdma_cm_id *rdmacm_listen(const std::string & bind_address, int rdmacm_port, int *bound_port, int log_level);
void rdmacm_destroy_listener(rdma_cm_id *listener);
#endif
void inc_op_stats(osd_op_stats_t & stats, uint64_t opcode, timespec & tv_begin, timespec & tv_end, uint64_t len);
void measure_exec(osd_op_t *cur_op);
protected:
void try_connect_peer(uint64_t osd_num);
void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port);
void try_connect_peer_tcp(osd_num_t peer_osd, const char *peer_host, int peer_port);
void handle_peer_epoll(int peer_fd, int epoll_events);
void handle_connect_epoll(int peer_fd);
void on_connect_peer(osd_num_t peer_osd, int peer_fd);
@ -255,6 +277,18 @@ protected:
void try_send_rdma_odp(osd_client_t *cl);
void try_send_rdma_nodp(osd_client_t *cl);
bool try_recv_rdma(osd_client_t *cl);
void handle_rdma_events();
void handle_rdma_events(msgr_rdma_context_t *rdma_context);
msgr_rdma_context_t* choose_rdma_context(osd_client_t *cl);
#endif
#ifdef WITH_RDMACM
void handle_rdmacm_events();
msgr_rdma_context_t* rdmacm_get_context(ibv_context *verbs);
msgr_rdma_context_t* rdmacm_create_qp(rdma_cm_id *cmid);
void rdmacm_accept(rdma_cm_event *ev);
void rdmacm_try_connect_peer(uint64_t peer_osd, const std::string & addr, int peer_port);
void rdmacm_on_connect_peer_error(rdma_cm_id *cmid, int res);
void rdmacm_address_resolved(rdma_cm_event *ev);
void rdmacm_route_resolved(rdma_cm_event *ev);
void rdmacm_established(rdma_cm_event *ev);
#endif
};

View File

@ -37,15 +37,22 @@ msgr_rdma_context_t::~msgr_rdma_context_t()
ibv_dereg_mr(mr);
if (pd)
ibv_dealloc_pd(pd);
if (context)
if (context && !is_cm)
ibv_close_device(context);
}
msgr_rdma_connection_t::~msgr_rdma_connection_t()
{
ctx->used_max_cqe -= max_send+max_recv;
if (qp)
ctx->reserve_cqe(-max_send-max_recv);
if (qp && !cmid)
ibv_destroy_qp(qp);
if (cmid)
{
ctx->cm_refs--;
if (cmid->qp)
rdma_destroy_qp(cmid);
rdma_destroy_id(cmid);
}
if (recv_buffers.size())
{
for (auto b: recv_buffers)
@ -76,21 +83,21 @@ static bool is_ipv4_gid(ibv_gid_entry *gidx)
((uint32_t*)gidx->gid.raw)[2] == 0xffff0000);
}
static bool match_gid(ibv_gid_entry *gidx, const addr_mask_t *networks, int nnet)
static int match_gid(ibv_gid_entry *gidx, const addr_mask_t *networks, int nnet)
{
if (gidx->gid_type != IBV_GID_TYPE_ROCE_V1 &&
gidx->gid_type != IBV_GID_TYPE_ROCE_V2 ||
((uint64_t*)gidx->gid.raw)[0] == 0 &&
((uint64_t*)gidx->gid.raw)[1] == 0)
{
return false;
return -1;
}
if (is_ipv4_gid(gidx))
{
for (int i = 0; i < nnet; i++)
{
if (networks[i].family == AF_INET && cidr_match(*(in_addr*)(gidx->gid.raw+12), networks[i].ipv4, networks[i].bits))
return true;
return i;
}
}
else
@ -98,119 +105,67 @@ static bool match_gid(ibv_gid_entry *gidx, const addr_mask_t *networks, int nnet
for (int i = 0; i < nnet; i++)
{
if (networks[i].family == AF_INET6 && cidr6_match(*(in6_addr*)gidx->gid.raw, networks[i].ipv6, networks[i].bits))
return true;
return i;
}
}
return false;
return -1;
}
struct matched_dev
{
int dev = -1;
int port = -1;
int gid = -1;
bool rocev2 = false;
};
static void log_rdma_dev_port_gid(ibv_device *dev, int ib_port, int gid_index, ibv_gid_entry & gidx)
static void log_rdma_dev_port_gid(ibv_device *dev, int ib_port, int gid_index, int mtu, ibv_gid_entry & gidx)
{
bool is4 = ((uint64_t*)gidx.gid.raw)[0] == 0 && ((uint32_t*)gidx.gid.raw)[2] == 0xffff0000;
char buf[256];
inet_ntop(is4 ? AF_INET : AF_INET6, is4 ? gidx.gid.raw+12 : gidx.gid.raw, buf, sizeof(buf));
fprintf(
stderr, "Auto-selected RDMA device %s port %d GID %d - ROCEv%d IPv%d %s\n",
stderr, "Selected RDMA device %s port %d GID %d - ROCEv%d IPv%d %s, MTU %d\n",
ibv_get_device_name(dev), ib_port, gid_index,
gidx.gid_type == IBV_GID_TYPE_ROCE_V2 ? 2 : 1, is4 ? 4 : 6, buf
gidx.gid_type == IBV_GID_TYPE_ROCE_V2 ? 2 : 1, is4 ? 4 : 6, buf, mtu
);
}
static matched_dev match_device(ibv_device **dev_list, const addr_mask_t *networks, int nnet, int log_level)
static int match_port_gid(const std::vector<addr_mask_t> & osd_network_masks, ibv_context *context,
int port_num, int gid_count, int log_level, ibv_gid_entry *best_gidx, int *net_num)
{
matched_dev best;
ibv_device_attr attr;
ibv_port_attr portinfo;
ibv_gid_entry best_gidx;
int res;
bool have_non_roce = false, have_roce = false;
for (int i = 0; dev_list[i]; ++i)
// Try to find a port with matching address
int best_gid_idx = -1, res = 0;
for (int k = 0; k < gid_count; k++)
{
auto dev = dev_list[i];
ibv_context *context = ibv_open_device(dev_list[i]);
if ((res = ibv_query_device(context, &attr)) != 0)
ibv_gid_entry gidx;
if ((res = ibv_query_gid_ex(context, port_num, k, &gidx, 0)) != 0)
{
fprintf(stderr, "Couldn't query RDMA device %s for its features: %s\n", ibv_get_device_name(dev_list[i]), strerror(res));
goto cleanup;
}
for (int j = 1; j <= attr.phys_port_cnt; j++)
{
// Try to find a port with matching address
if ((res = ibv_query_port(context, j, &portinfo)) != 0)
if (res != ENODATA)
{
fprintf(stderr, "Couldn't get RDMA device %s port %d info: %s\n", ibv_get_device_name(dev), j, strerror(res));
goto cleanup;
fprintf(stderr, "Couldn't read RDMA device %s GID index %d: %s\n", ibv_get_device_name(context->device), k, strerror(res));
continue;
}
for (int k = 0; k < portinfo.gid_tbl_len; k++)
else
break;
}
if ((res = match_gid(&gidx, osd_network_masks.data(), osd_network_masks.size())) >= 0)
{
// Prefer RoCEv2
if (best_gid_idx < 0 || best_gidx->gid_type != IBV_GID_TYPE_ROCE_V2 && gidx.gid_type == IBV_GID_TYPE_ROCE_V2)
{
ibv_gid_entry gidx;
if ((res = ibv_query_gid_ex(context, j, k, &gidx, 0)) != 0)
{
if (res != ENODATA)
{
fprintf(stderr, "Couldn't read RDMA device %s GID index %d: %s\n", ibv_get_device_name(dev), k, strerror(res));
goto cleanup;
}
else
break;
}
if (gidx.gid_type != IBV_GID_TYPE_ROCE_V1 &&
gidx.gid_type != IBV_GID_TYPE_ROCE_V2)
have_non_roce = true;
else
have_roce = true;
if (match_gid(&gidx, networks, nnet))
{
// Prefer RoCEv2
if (!best.rocev2)
{
best.dev = i;
best.port = j;
best.gid = k;
best.rocev2 = (gidx.gid_type == IBV_GID_TYPE_ROCE_V2);
best_gidx = gidx;
}
}
best_gid_idx = k;
*best_gidx = gidx;
*net_num = res;
}
}
cleanup:
ibv_close_device(context);
if (best.rocev2)
{
break;
}
}
if (best.dev >= 0 && log_level > 0)
{
log_rdma_dev_port_gid(dev_list[best.dev], best.port, best.gid, best_gidx);
}
if (best.dev < 0 && have_non_roce && !have_roce)
{
best.dev = -2;
}
return best;
return best_gid_idx;
}
#endif
msgr_rdma_context_t *msgr_rdma_context_t::create(const std::vector<addr_mask_t> & osd_network_masks, const char *ib_devname, uint8_t ib_port, int gid_index, uint32_t mtu, bool odp, int log_level)
std::vector<msgr_rdma_context_t*> msgr_rdma_context_t::create_all(const std::vector<addr_mask_t> & osd_network_masks,
const char *sel_dev_name, int sel_port_num, int sel_gid_index, uint32_t sel_mtu, bool odp, int log_level)
{
int res;
std::vector<msgr_rdma_context_t*> ret;
ibv_device **raw_dev_list = NULL;
ibv_device **dev_list = NULL;
msgr_rdma_context_t *ctx = new msgr_rdma_context_t();
ctx->mtu = mtu;
ibv_device *single_list[2] = {};
timespec tv;
clock_gettime(CLOCK_REALTIME, &tv);
srand48(tv.tv_sec*1000000000 + tv.tv_nsec);
dev_list = ibv_get_device_list(NULL);
raw_dev_list = dev_list = ibv_get_device_list(NULL);
if (!dev_list || !*dev_list)
{
if (errno == -ENOSYS || errno == ENOSYS)
@ -227,116 +182,126 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const std::vector<addr_mask_t>
fprintf(stderr, "Failed to get RDMA device list: %s\n", strerror(errno));
goto cleanup;
}
if (ib_devname)
if (sel_dev_name)
{
int i;
for (i = 0; dev_list[i]; ++i)
if (!strcmp(ibv_get_device_name(dev_list[i]), ib_devname))
if (!strcmp(ibv_get_device_name(dev_list[i]), sel_dev_name))
break;
ctx->dev = dev_list[i];
if (!ctx->dev)
if (!dev_list[i])
{
fprintf(stderr, "RDMA device %s not found\n", ib_devname);
fprintf(stderr, "RDMA device %s not found\n", sel_dev_name);
goto cleanup;
}
}
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
else if (osd_network_masks.size())
{
auto best = match_device(dev_list, osd_network_masks.data(), osd_network_masks.size(), log_level);
if (best.dev == -2)
{
best.dev = 0;
if (log_level > 0)
fprintf(stderr, "No RoCE devices found, using first available RDMA device %s\n", ibv_get_device_name(*dev_list));
}
else if (best.dev < 0)
{
if (log_level > 0)
fprintf(stderr, "RDMA device matching osd_network is not found, disabling RDMA\n");
goto cleanup;
}
else
{
ib_port = best.port;
gid_index = best.gid;
}
ctx->dev = dev_list[best.dev];
}
#endif
else
{
ctx->dev = *dev_list;
single_list[0] = dev_list[i];
dev_list = single_list;
}
ctx->context = ibv_open_device(ctx->dev);
if (!ctx->context)
for (int i = 0; dev_list[i]; ++i)
{
fprintf(stderr, "Couldn't get RDMA context for %s\n", ibv_get_device_name(ctx->dev));
goto cleanup;
}
ctx->ib_port = ib_port;
if ((res = ibv_query_port(ctx->context, ib_port, &ctx->portinfo)) != 0)
{
fprintf(stderr, "Couldn't get RDMA device %s port %d info: %s\n", ibv_get_device_name(ctx->dev), ib_port, strerror(res));
goto cleanup;
}
ctx->my_lid = ctx->portinfo.lid;
if (ctx->portinfo.link_layer != IBV_LINK_LAYER_ETHERNET && !ctx->my_lid)
{
fprintf(stderr, "RDMA device %s must have local LID because it's not Ethernet, but LID is zero\n", ibv_get_device_name(ctx->dev));
goto cleanup;
}
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
if (gid_index != -1)
#endif
{
ctx->gid_index = gid_index < 0 ? 0 : gid_index;
if (ibv_query_gid(ctx->context, ib_port, gid_index, &ctx->my_gid))
auto dev = dev_list[i];
ibv_context *context = ibv_open_device(dev);
if (!context)
{
fprintf(stderr, "Couldn't read RDMA device %s GID index %d\n", ibv_get_device_name(ctx->dev), gid_index);
goto cleanup;
fprintf(stderr, "Couldn't get RDMA context for %s\n", ibv_get_device_name(dev));
continue;
}
}
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
else
{
// Auto-guess GID
ibv_gid_entry best_gidx;
for (int k = 0; k < ctx->portinfo.gid_tbl_len; k++)
ibv_device_attr attr;
if ((res = ibv_query_device(context, &attr)) != 0)
{
ibv_gid_entry gidx;
if (ibv_query_gid_ex(ctx->context, ib_port, k, &gidx, 0) != 0)
fprintf(stderr, "Couldn't query RDMA device %s for its features: %s\n", ibv_get_device_name(dev), strerror(res));
goto cleanup_dev;
}
if (sel_port_num && sel_port_num > attr.phys_port_cnt)
{
fprintf(stderr, "RDMA device %s port %d does not exist\n", ibv_get_device_name(dev), sel_port_num);
goto cleanup_dev;
}
for (int port_num = (sel_port_num ? sel_port_num : 1); port_num <= (sel_port_num ? sel_port_num : attr.phys_port_cnt); port_num++)
{
ibv_port_attr portinfo;
if ((res = ibv_query_port(context, port_num, &portinfo)) != 0)
{
fprintf(stderr, "Couldn't read RDMA device %s GID index %d\n", ibv_get_device_name(ctx->dev), k);
goto cleanup;
fprintf(stderr, "Couldn't get RDMA device %s port %d info: %s\n", ibv_get_device_name(dev), port_num, strerror(res));
continue;
}
// Skip empty GID
if (((uint64_t*)gidx.gid.raw)[0] == 0 &&
((uint64_t*)gidx.gid.raw)[1] == 0)
if (portinfo.state != IBV_PORT_ACTIVE)
{
continue;
}
// Prefer IPv4 RoCEv2 -> IPv6 RoCEv2 -> IPv4 RoCEv1 -> IPv6 RoCEv1 -> IB
if (gid_index == -1 ||
gidx.gid_type == IBV_GID_TYPE_ROCE_V2 && best_gidx.gid_type != IBV_GID_TYPE_ROCE_V2 ||
gidx.gid_type == IBV_GID_TYPE_ROCE_V1 && best_gidx.gid_type == IBV_GID_TYPE_IB ||
gidx.gid_type == best_gidx.gid_type && is_ipv4_gid(&gidx))
if (sel_gid_index >= (int)portinfo.gid_tbl_len)
{
gid_index = k;
best_gidx = gidx;
fprintf(stderr, "RDMA device %s port %d GID %d does not exist\n", ibv_get_device_name(dev), port_num, sel_gid_index);
continue;
}
uint32_t port_mtu = sel_mtu ? sel_mtu : portinfo.active_mtu;
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
if (sel_gid_index < 0)
{
ibv_gid_entry best_gidx;
int net_num = 0;
int best_gid_idx = match_port_gid(osd_network_masks, context, port_num, portinfo.gid_tbl_len, log_level, &best_gidx, &net_num);
if (best_gid_idx >= 0)
{
if (log_level > 0)
log_rdma_dev_port_gid(dev, port_num, best_gid_idx, port_mtu, best_gidx);
auto ctx = msgr_rdma_context_t::create(dev, portinfo, port_num, best_gid_idx, port_mtu, odp, log_level);
if (ctx)
{
ctx->net_mask = osd_network_masks[net_num];
ret.push_back(ctx);
}
}
}
else
#endif
{
int best_gid_idx = sel_gid_index >= 0 ? sel_gid_index : 0;
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
if (log_level > 0)
{
ibv_gid_entry gidx;
ibv_query_gid_ex(context, port_num, best_gid_idx, &gidx, 0);
log_rdma_dev_port_gid(dev, port_num, best_gid_idx, port_mtu, gidx);
}
#endif
auto ctx = msgr_rdma_context_t::create(dev, portinfo, port_num, best_gid_idx, port_mtu, odp, log_level);
if (ctx)
ret.push_back(ctx);
}
}
ctx->gid_index = gid_index = (gid_index == -1 ? 0 : gid_index);
if (log_level > 0)
{
log_rdma_dev_port_gid(ctx->dev, ctx->ib_port, ctx->gid_index, best_gidx);
}
ctx->my_gid = best_gidx.gid;
cleanup_dev:
ibv_close_device(context);
}
#endif
cleanup:
if (raw_dev_list)
ibv_free_device_list(raw_dev_list);
return ret;
}
msgr_rdma_context_t *msgr_rdma_context_t::create(ibv_device *dev, ibv_port_attr & portinfo, int ib_port, int gid_index, uint32_t mtu, bool odp, int log_level)
{
msgr_rdma_context_t *ctx = new msgr_rdma_context_t();
ibv_context *context = ibv_open_device(dev);
if (!context)
{
fprintf(stderr, "Couldn't get RDMA context for %s\n", ibv_get_device_name(dev));
goto cleanup;
}
ctx->mtu = mtu;
ctx->context = context;
ctx->ib_port = ib_port;
ctx->portinfo = portinfo;
ctx->my_lid = ctx->portinfo.lid;
if (ctx->portinfo.link_layer != IBV_LINK_LAYER_ETHERNET && !ctx->my_lid)
{
fprintf(stderr, "RDMA device %s must have local LID because it's not Ethernet, but LID is zero\n", ibv_get_device_name(dev));
goto cleanup;
}
ctx->gid_index = gid_index;
ctx->pd = ibv_alloc_pd(ctx->context);
if (!ctx->pd)
@ -345,18 +310,18 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const std::vector<addr_mask_t>
goto cleanup;
}
ctx->odp = odp;
if (ctx->odp)
{
if (ibv_query_device_ex(ctx->context, NULL, &ctx->attrx))
{
fprintf(stderr, "Couldn't query RDMA device for its features\n");
goto cleanup;
}
ctx->odp = odp;
if (ctx->odp &&
(!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) ||
if (!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) ||
!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT_IMPLICIT) ||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) ||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV)))
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV))
{
ctx->odp = false;
if (log_level > 0)
@ -389,20 +354,43 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const std::vector<addr_mask_t>
goto cleanup;
}
if (dev_list)
ibv_free_device_list(dev_list);
return ctx;
cleanup:
if (context)
ibv_close_device(context);
delete ctx;
if (dev_list)
ibv_free_device_list(dev_list);
return NULL;
}
bool msgr_rdma_context_t::reserve_cqe(int n)
{
this->used_max_cqe += n;
if (this->used_max_cqe > this->max_cqe)
{
// Resize CQ
// Mellanox ConnectX-4 supports up to 4194303 CQEs, so it's fine to put everything into a single CQ
int new_max_cqe = this->max_cqe;
while (this->used_max_cqe > new_max_cqe)
{
new_max_cqe *= 2;
}
if (ibv_resize_cq(this->cq, new_max_cqe) != 0)
{
fprintf(stderr, "Couldn't resize RDMA completion queue to %d entries\n", new_max_cqe);
return false;
}
this->max_cqe = new_max_cqe;
}
return true;
}
msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, uint32_t max_send,
uint32_t max_recv, uint32_t max_sge, uint32_t max_msg)
{
if (!ctx->reserve_cqe(max_send+max_recv))
return NULL;
msgr_rdma_connection_t *conn = new msgr_rdma_connection_t;
max_sge = max_sge > ctx->attrx.orig_attr.max_sge ? ctx->attrx.orig_attr.max_sge : max_sge;
@ -413,25 +401,6 @@ msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx,
conn->max_sge = max_sge;
conn->max_msg = max_msg;
ctx->used_max_cqe += max_send+max_recv;
if (ctx->used_max_cqe > ctx->max_cqe)
{
// Resize CQ
// Mellanox ConnectX-4 supports up to 4194303 CQEs, so it's fine to put everything into a single CQ
int new_max_cqe = ctx->max_cqe;
while (ctx->used_max_cqe > new_max_cqe)
{
new_max_cqe *= 2;
}
if (ibv_resize_cq(ctx->cq, new_max_cqe) != 0)
{
fprintf(stderr, "Couldn't resize RDMA completion queue to %d entries\n", new_max_cqe);
delete conn;
return NULL;
}
ctx->max_cqe = new_max_cqe;
}
ibv_qp_init_attr init_attr = {
.send_cq = ctx->cq,
.recv_cq = ctx->cq,
@ -544,7 +513,15 @@ bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint64
{
client_max_msg = rdma_max_msg;
}
auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, client_max_msg);
auto cl = clients.at(peer_fd);
msgr_rdma_context_t *selected_ctx = choose_rdma_context(cl);
if (!selected_ctx)
{
if (log_level > 0)
fprintf(stderr, "No RDMA context for peer %d, using only TCP\n", cl->peer_fd);
return false;
}
msgr_rdma_connection_t *rdma_conn = msgr_rdma_connection_t::create(selected_ctx, rdma_max_send, rdma_max_recv, rdma_max_sge, client_max_msg);
if (rdma_conn)
{
int r = rdma_conn->connect(&addr);
@ -663,9 +640,9 @@ void osd_messenger_t::try_send_rdma_nodp(osd_client_t *cl)
// Allocate send ring buffer, if not yet
rc->send_out_size = rc->max_msg*rdma_max_send;
rc->send_out.buf = malloc_or_die(rc->send_out_size);
if (!rdma_context->odp)
if (!rc->ctx->odp)
{
rc->send_out.mr = ibv_reg_mr(rdma_context->pd, rc->send_out.buf, rc->send_out_size, 0);
rc->send_out.mr = ibv_reg_mr(rc->ctx->pd, rc->send_out.buf, rc->send_out_size, 0);
if (!rc->send_out.mr)
{
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
@ -695,7 +672,7 @@ void osd_messenger_t::try_send_rdma_nodp(osd_client_t *cl)
ibv_sge sge = {
.addr = (uintptr_t)dst,
.length = (uint32_t)copied,
.lkey = rdma_context->odp ? rdma_context->mr->lkey : rc->send_out.mr->lkey,
.lkey = rc->ctx->odp ? rc->ctx->mr->lkey : rc->send_out.mr->lkey,
};
try_send_rdma_wr(cl, &sge, 1);
rc->send_sizes.push_back(copied);
@ -705,7 +682,7 @@ void osd_messenger_t::try_send_rdma_nodp(osd_client_t *cl)
void osd_messenger_t::try_send_rdma(osd_client_t *cl)
{
if (rdma_context->odp)
if (cl->rdma_conn->ctx->odp)
try_send_rdma_odp(cl);
else
try_send_rdma_nodp(cl);
@ -740,9 +717,9 @@ bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
{
msgr_rdma_buf_t b;
b.buf = malloc_or_die(rc->max_msg);
if (!rdma_context->odp)
if (!rc->ctx->odp)
{
b.mr = ibv_reg_mr(rdma_context->pd, b.buf, rc->max_msg, IBV_ACCESS_LOCAL_WRITE);
b.mr = ibv_reg_mr(rc->ctx->pd, b.buf, rc->max_msg, IBV_ACCESS_LOCAL_WRITE);
if (!b.mr)
{
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
@ -757,7 +734,7 @@ bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
#define RDMA_EVENTS_AT_ONCE 32
void osd_messenger_t::handle_rdma_events()
void osd_messenger_t::handle_rdma_events(msgr_rdma_context_t *rdma_context)
{
// Request next notification
ibv_cq *ev_cq;

View File

@ -2,6 +2,9 @@
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#pragma once
#ifdef WITH_RDMACM
#include <rdma/rdma_cma.h>
#endif
#include <infiniband/verbs.h>
#include <string>
#include <vector>
@ -21,7 +24,6 @@ struct msgr_rdma_address_t
struct msgr_rdma_context_t
{
ibv_context *context = NULL;
ibv_device *dev = NULL;
ibv_device_attr_ex attrx;
ibv_pd *pd = NULL;
bool odp = false;
@ -36,8 +38,17 @@ struct msgr_rdma_context_t
uint32_t mtu;
int max_cqe = 0;
int used_max_cqe = 0;
addr_mask_t net_mask = {};
bool is_cm = false;
int cm_refs = 0;
static std::vector<msgr_rdma_context_t*> create_all(const std::vector<addr_mask_t> & osd_network_masks,
const char *sel_dev_name, int sel_port_num, int sel_gid_index, uint32_t sel_mtu, bool odp, int log_level);
static msgr_rdma_context_t *create(ibv_device *dev, ibv_port_attr & portinfo,
int ib_port, int gid_index, uint32_t mtu, bool odp, int log_level);
static msgr_rdma_context_t* create_cm(ibv_context *ctx);
bool reserve_cqe(int n);
static msgr_rdma_context_t *create(const std::vector<addr_mask_t> & osd_network_masks, const char *ib_devname, uint8_t ib_port, int gid_index, uint32_t mtu, bool odp, int log_level);
~msgr_rdma_context_t();
};
@ -51,11 +62,14 @@ struct msgr_rdma_connection_t
{
msgr_rdma_context_t *ctx = NULL;
ibv_qp *qp = NULL;
#ifdef WITH_RDMACM
rdma_cm_id *cmid = NULL;
#endif
msgr_rdma_address_t addr;
int max_send = 0, max_recv = 0, max_sge = 0;
int cur_send = 0, cur_recv = 0;
uint64_t max_msg = 0;
int cur_send = 0, cur_recv = 0;
int send_pos = 0, send_buf_pos = 0;
int next_recv_buf = 0;
std::vector<msgr_rdma_buf_t> recv_buffers;

523
src/client/msgr_rdmacm.cpp Normal file
View File

@ -0,0 +1,523 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include "msgr_rdma.h"
#include "messenger.h"
struct rdmacm_connecting_t
{
rdma_cm_id *cmid = NULL;
int peer_fd = -1;
osd_num_t peer_osd = 0;
std::string addr;
sockaddr_storage parsed_addr = {};
int peer_port = 0;
int timeout_ms = 0;
int timeout_id = -1;
msgr_rdma_context_t *rdma_context = NULL;
};
rdma_cm_id *osd_messenger_t::rdmacm_listen(const std::string & bind_address, int rdmacm_port, int *bound_port, int log_level)
{
sockaddr_storage addr = {};
rdma_cm_id *listener = NULL;
int r = rdma_create_id(rdmacm_evch, &listener, NULL, RDMA_PS_TCP);
if (r != 0)
{
fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d)\n", strerror(errno), errno);
goto fail;
}
if (!string_to_addr(bind_address, 0, rdmacm_port, &addr))
{
fprintf(stderr, "Server address: %s is not valid\n", bind_address.c_str());
goto fail;
}
r = rdma_bind_addr(listener, (sockaddr*)&addr);
if (r != 0)
{
fprintf(stderr, "Failed to bind RDMA-CM to %s:%d: %s (code %d)\n", bind_address.c_str(), rdmacm_port, strerror(errno), errno);
goto fail;
}
r = rdma_listen(listener, 128);
if (r != 0)
{
fprintf(stderr, "Failed to listen to RDMA-CM address %s:%d: %s (code %d)\n", bind_address.c_str(), rdmacm_port, strerror(errno), errno);
goto fail;
}
if (bound_port)
{
*bound_port = ntohs(rdma_get_src_port(listener));
}
if (log_level > 0)
{
fprintf(stderr, "Listening to RDMA-CM address %s port %d\n", bind_address.c_str(), *bound_port);
}
return listener;
fail:
rdma_destroy_id(listener);
return NULL;
}
void osd_messenger_t::rdmacm_destroy_listener(rdma_cm_id *listener)
{
rdma_destroy_id(listener);
}
void osd_messenger_t::handle_rdmacm_events()
{
// rdma_destroy_id infinitely waits for pthread_cond if called before all events are acked :-(
std::vector<rdma_cm_event> events_copy;
while (1)
{
rdma_cm_event *ev = NULL;
int r = rdma_get_cm_event(rdmacm_evch, &ev);
if (r != 0)
{
if (errno == EAGAIN || errno == EINTR)
break;
fprintf(stderr, "Failed to get RDMA-CM event: %s (code %d)\n", strerror(errno), errno);
exit(1);
}
events_copy.push_back(*ev);
r = rdma_ack_cm_event(ev);
if (r != 0)
{
fprintf(stderr, "Failed to ack (free) RDMA-CM event: %s (code %d)\n", strerror(errno), errno);
exit(1);
}
}
for (auto & evl: events_copy)
{
auto ev = &evl;
if (ev->event == RDMA_CM_EVENT_CONNECT_REQUEST)
{
rdmacm_accept(ev);
}
else if (ev->event == RDMA_CM_EVENT_CONNECT_ERROR ||
ev->event == RDMA_CM_EVENT_REJECTED ||
ev->event == RDMA_CM_EVENT_DISCONNECTED ||
ev->event == RDMA_CM_EVENT_DEVICE_REMOVAL)
{
auto event_type_name = ev->event == RDMA_CM_EVENT_CONNECT_ERROR ? "RDMA_CM_EVENT_CONNECT_ERROR" : (
ev->event == RDMA_CM_EVENT_REJECTED ? "RDMA_CM_EVENT_REJECTED" : (
ev->event == RDMA_CM_EVENT_DISCONNECTED ? "RDMA_CM_EVENT_DISCONNECTED" : "RDMA_CM_EVENT_DEVICE_REMOVAL"));
auto cli_it = rdmacm_connections.find(ev->id);
if (cli_it != rdmacm_connections.end())
{
fprintf(stderr, "Received %s event for peer %d, closing connection\n",
event_type_name, cli_it->second->peer_fd);
stop_client(cli_it->second->peer_fd);
}
else if (rdmacm_connecting.find(ev->id) != rdmacm_connecting.end())
{
fprintf(stderr, "Received %s event for RDMA-CM OSD %ju connection\n",
event_type_name, rdmacm_connecting[ev->id]->peer_osd);
rdmacm_established(ev);
}
else
{
fprintf(stderr, "Received %s event for an unknown RDMA-CM connection 0x%jx - ignoring\n",
event_type_name, (uint64_t)ev->id);
}
}
else if (ev->event == RDMA_CM_EVENT_ADDR_RESOLVED || ev->event == RDMA_CM_EVENT_ADDR_ERROR)
{
rdmacm_address_resolved(ev);
}
else if (ev->event == RDMA_CM_EVENT_ROUTE_RESOLVED || ev->event == RDMA_CM_EVENT_ROUTE_ERROR)
{
rdmacm_route_resolved(ev);
}
else if (ev->event == RDMA_CM_EVENT_CONNECT_RESPONSE)
{
// Just OK
}
else if (ev->event == RDMA_CM_EVENT_UNREACHABLE || ev->event == RDMA_CM_EVENT_REJECTED)
{
// Handle error
rdmacm_established(ev);
}
else if (ev->event == RDMA_CM_EVENT_ESTABLISHED)
{
rdmacm_established(ev);
}
else if (ev->event == RDMA_CM_EVENT_ADDR_CHANGE || ev->event == RDMA_CM_EVENT_TIMEWAIT_EXIT)
{
// Do nothing
}
else
{
// Other events are unexpected
fprintf(stderr, "Unexpected RDMA-CM event type: %d\n", ev->event);
}
}
}
msgr_rdma_context_t* msgr_rdma_context_t::create_cm(ibv_context *ctx)
{
auto rdma_context = new msgr_rdma_context_t;
rdma_context->is_cm = true;
rdma_context->context = ctx;
rdma_context->pd = ibv_alloc_pd(ctx);
if (!rdma_context->pd)
{
fprintf(stderr, "Couldn't allocate RDMA protection domain\n");
delete rdma_context;
return NULL;
}
rdma_context->odp = false;
rdma_context->channel = ibv_create_comp_channel(rdma_context->context);
if (!rdma_context->channel)
{
fprintf(stderr, "Couldn't create RDMA completion channel\n");
delete rdma_context;
return NULL;
}
rdma_context->max_cqe = 4096;
rdma_context->cq = ibv_create_cq(rdma_context->context, rdma_context->max_cqe, NULL, rdma_context->channel, 0);
if (!rdma_context->cq)
{
fprintf(stderr, "Couldn't create RDMA completion queue\n");
delete rdma_context;
return NULL;
}
if (ibv_query_device_ex(rdma_context->context, NULL, &rdma_context->attrx))
{
fprintf(stderr, "Couldn't query RDMA device for its features\n");
delete rdma_context;
return NULL;
}
return rdma_context;
}
msgr_rdma_context_t* osd_messenger_t::rdmacm_get_context(ibv_context *verbs)
{
// Find the context by device
// We assume that RDMA_CM ev->id->verbs is always the same for the same device (but PD for example isn't)
msgr_rdma_context_t *rdma_context = NULL;
for (auto ctx: rdma_contexts)
{
if (ctx->context == verbs)
{
rdma_context = ctx;
break;
}
}
if (!rdma_context)
{
// Wrap into a new msgr_rdma_context_t
rdma_context = msgr_rdma_context_t::create_cm(verbs);
if (!rdma_context)
return NULL;
fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
tfd->set_fd_handler(rdma_context->channel->fd, false, [this, rdma_context](int notify_fd, int epoll_events)
{
handle_rdma_events(rdma_context);
});
handle_rdma_events(rdma_context);
rdma_contexts.push_back(rdma_context);
}
return rdma_context;
}
msgr_rdma_context_t* osd_messenger_t::rdmacm_create_qp(rdma_cm_id *cmid)
{
auto rdma_context = rdmacm_get_context(cmid->verbs);
if (!rdma_context)
{
return NULL;
}
rdma_context->reserve_cqe(rdma_max_send+rdma_max_recv);
auto max_sge = rdma_max_sge > rdma_context->attrx.orig_attr.max_sge
? rdma_context->attrx.orig_attr.max_sge : rdma_max_sge;
ibv_qp_init_attr init_attr = {
.send_cq = rdma_context->cq,
.recv_cq = rdma_context->cq,
.cap = {
.max_send_wr = (uint32_t)rdma_max_send,
.max_recv_wr = (uint32_t)rdma_max_recv,
.max_send_sge = (uint32_t)max_sge,
.max_recv_sge = (uint32_t)max_sge,
},
.qp_type = IBV_QPT_RC,
};
int r = rdma_create_qp(cmid, rdma_context->pd, &init_attr);
if (r != 0)
{
fprintf(stderr, "Failed to create a queue pair via RDMA-CM: %s (code %d)\n", strerror(errno), errno);
rdma_context->reserve_cqe(-rdma_max_send-rdma_max_recv);
return NULL;
}
return rdma_context;
}
void osd_messenger_t::rdmacm_accept(rdma_cm_event *ev)
{
// Make a fake FD (FIXME: do not use FDs for identifying clients!)
int fake_fd = socket(AF_INET, SOCK_STREAM, 0);
if (fake_fd < 0)
{
fprintf(stderr, "Failed to allocate a fake socket for RDMA-CM client: %s (code %d)\n", strerror(errno), errno);
rdma_destroy_id(ev->id);
return;
}
auto rdma_context = rdmacm_create_qp(ev->id);
if (!rdma_context)
{
rdma_destroy_id(ev->id);
return;
}
// We don't need private_data, RDMA_READ or ATOMIC so use default 1
rdma_conn_param conn_params = {
.responder_resources = 1,
.initiator_depth = 1,
.retry_count = 7,
.rnr_retry_count = 7,
};
if (rdma_accept(ev->id, &conn_params) != 0)
{
fprintf(stderr, "Failed to accept RDMA-CM connection: %s (code %d)\n", strerror(errno), errno);
rdma_context->reserve_cqe(-rdma_max_send-rdma_max_recv);
rdma_destroy_qp(ev->id);
rdma_destroy_id(ev->id);
return;
}
rdma_context->cm_refs++;
// Wrap into a new msgr_rdma_connection_t
msgr_rdma_connection_t *conn = new msgr_rdma_connection_t;
conn->ctx = rdma_context;
conn->max_send = rdma_max_send;
conn->max_recv = rdma_max_recv;
conn->max_sge = rdma_max_sge > rdma_context->attrx.orig_attr.max_sge
? rdma_context->attrx.orig_attr.max_sge : rdma_max_sge;
conn->max_msg = rdma_max_msg;
conn->cmid = ev->id;
conn->qp = ev->id->qp;
auto cl = new osd_client_t();
cl->peer_fd = fake_fd;
cl->peer_state = PEER_RDMA;
cl->peer_addr = *(sockaddr_storage*)rdma_get_peer_addr(ev->id);
cl->in_buf = malloc_or_die(receive_buffer_size);
cl->rdma_conn = conn;
clients[fake_fd] = cl;
rdmacm_connections[ev->id] = cl;
// Add initial receive request(s)
try_recv_rdma(cl);
fprintf(stderr, "[OSD %ju] new client %d: connection from %s via RDMA-CM\n", this->osd_num, fake_fd,
addr_to_string(cl->peer_addr).c_str());
}
void osd_messenger_t::rdmacm_on_connect_peer_error(rdma_cm_id *cmid, int res)
{
auto conn = rdmacm_connecting.at(cmid);
auto addr = conn->addr;
auto peer_port = conn->peer_port;
auto peer_osd = conn->peer_osd;
if (conn->timeout_id >= 0)
tfd->clear_timer(conn->timeout_id);
if (conn->peer_fd >= 0)
close(conn->peer_fd);
if (conn->rdma_context)
conn->rdma_context->reserve_cqe(-rdma_max_send-rdma_max_recv);
if (conn->cmid)
{
if (conn->cmid->qp)
rdma_destroy_qp(conn->cmid);
rdma_destroy_id(conn->cmid);
}
rdmacm_connecting.erase(cmid);
delete conn;
if (!disable_tcp)
{
// Fall back to TCP instead of just reporting the error to on_connect_peer()
try_connect_peer_tcp(peer_osd, addr.c_str(), peer_port);
}
else
{
// TCP is disabled
on_connect_peer(peer_osd, res == 0 ? -EINVAL : (res > 0 ? -res : res));
}
}
void osd_messenger_t::rdmacm_try_connect_peer(uint64_t peer_osd, const std::string & addr, int peer_port)
{
struct sockaddr_storage sa = {};
if (!string_to_addr(addr, false, peer_port, &sa))
{
fprintf(stderr, "Address %s is invalid\n", addr.c_str());
on_connect_peer(peer_osd, -EINVAL);
return;
}
rdma_cm_id *cmid = NULL;
if (rdma_create_id(rdmacm_evch, &cmid, NULL, RDMA_PS_TCP) != 0)
{
int res = -errno;
fprintf(stderr, "Failed to create RDMA-CM ID: %s (code %d), using TCP\n", strerror(errno), errno);
if (!disable_tcp)
try_connect_peer_tcp(peer_osd, addr.c_str(), peer_port);
else
on_connect_peer(peer_osd, res);
return;
}
// Make a fake FD (FIXME: do not use FDs for identifying clients!)
int fake_fd = socket(AF_INET, SOCK_STREAM, 0);
if (fake_fd < 0)
{
int res = -errno;
rdma_destroy_id(cmid);
// Can't create socket, pointless to try TCP
on_connect_peer(peer_osd, res);
return;
}
auto conn = new rdmacm_connecting_t;
rdmacm_connecting[cmid] = conn;
conn->cmid = cmid;
conn->peer_fd = fake_fd;
conn->peer_osd = peer_osd;
conn->addr = addr;
conn->parsed_addr = sa;
conn->peer_port = peer_port;
conn->timeout_ms = peer_connect_timeout*1000;
conn->timeout_id = -1;
if (peer_connect_timeout > 0)
{
conn->timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, cmid](int timer_id)
{
auto conn = rdmacm_connecting.at(cmid);
conn->timeout_id = -1;
fprintf(stderr, "RDMA-CM connection to %s timed out\n", conn->addr.c_str());
rdmacm_on_connect_peer_error(cmid, -EPIPE);
return;
});
}
if (rdma_resolve_addr(cmid, NULL, (sockaddr*)&conn->parsed_addr, conn->timeout_ms) != 0)
{
auto res = -errno;
fprintf(stderr, "Failed to resolve address %s via RDMA-CM: %s (code %d)\n", addr.c_str(), strerror(errno), errno);
rdmacm_on_connect_peer_error(cmid, res);
return;
}
}
void osd_messenger_t::rdmacm_address_resolved(rdma_cm_event *ev)
{
auto cmid = ev->id;
auto conn_it = rdmacm_connecting.find(cmid);
if (conn_it == rdmacm_connecting.end())
{
// Silently ignore unknown IDs
return;
}
auto conn = conn_it->second;
if (ev->event != RDMA_CM_EVENT_ADDR_RESOLVED || ev->status != 0)
{
fprintf(stderr, "Failed to resolve address %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(),
ev->status > 0 ? "unknown error" : strerror(-ev->status), ev->status);
rdmacm_on_connect_peer_error(cmid, ev->status);
return;
}
auto rdma_context = rdmacm_create_qp(cmid);
if (!rdma_context)
{
rdmacm_on_connect_peer_error(cmid, -EIO);
return;
}
conn->rdma_context = rdma_context;
if (rdma_resolve_route(cmid, conn->timeout_ms) != 0)
{
int res = -errno;
fprintf(stderr, "Failed to resolve route to %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), strerror(errno), errno);
rdmacm_on_connect_peer_error(cmid, res);
return;
}
}
void osd_messenger_t::rdmacm_route_resolved(rdma_cm_event *ev)
{
auto cmid = ev->id;
auto conn_it = rdmacm_connecting.find(cmid);
if (conn_it == rdmacm_connecting.end())
{
// Silently ignore unknown IDs
return;
}
auto conn = conn_it->second;
if (ev->event != RDMA_CM_EVENT_ROUTE_RESOLVED || ev->status != 0)
{
fprintf(stderr, "Failed to resolve route to %s via RDMA-CM: %s (code %d)\n", conn->addr.c_str(),
ev->status > 0 ? "unknown error" : strerror(-ev->status), ev->status);
rdmacm_on_connect_peer_error(cmid, ev->status);
return;
}
// We don't need private_data, RDMA_READ or ATOMIC so use default 1
rdma_conn_param conn_params = {
.responder_resources = 1,
.initiator_depth = 1,
.retry_count = 7,
.rnr_retry_count = 7,
};
if (rdma_connect(cmid, &conn_params) != 0)
{
int res = -errno;
fprintf(stderr, "Failed to connect to %s:%d via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), conn->peer_port, strerror(errno), errno);
rdmacm_on_connect_peer_error(cmid, res);
return;
}
}
void osd_messenger_t::rdmacm_established(rdma_cm_event *ev)
{
auto cmid = ev->id;
auto conn_it = rdmacm_connecting.find(cmid);
if (conn_it == rdmacm_connecting.end())
{
// Silently ignore unknown IDs
return;
}
auto conn = conn_it->second;
auto peer_osd = conn->peer_osd;
if (ev->event != RDMA_CM_EVENT_ESTABLISHED || ev->status != 0)
{
fprintf(stderr, "Failed to connect to %s:%d via RDMA-CM: %s (code %d)\n", conn->addr.c_str(), conn->peer_port,
ev->status > 0 ? "unknown error" : strerror(-ev->status), ev->status);
rdmacm_on_connect_peer_error(cmid, ev->status);
return;
}
// Wrap into a new msgr_rdma_connection_t
msgr_rdma_connection_t *rc = new msgr_rdma_connection_t;
rc->ctx = conn->rdma_context;
rc->ctx->cm_refs++;
rc->max_send = rdma_max_send;
rc->max_recv = rdma_max_recv;
rc->max_sge = rdma_max_sge > rc->ctx->attrx.orig_attr.max_sge
? rc->ctx->attrx.orig_attr.max_sge : rdma_max_sge;
rc->max_msg = rdma_max_msg;
rc->cmid = conn->cmid;
rc->qp = conn->cmid->qp;
// And an osd_client_t
auto cl = new osd_client_t();
cl->peer_addr = conn->parsed_addr;
cl->peer_port = conn->peer_port;
cl->peer_fd = conn->peer_fd;
cl->peer_state = PEER_RDMA;
cl->connect_timeout_id = -1;
cl->osd_num = peer_osd;
cl->in_buf = malloc_or_die(receive_buffer_size);
cl->rdma_conn = rc;
clients[conn->peer_fd] = cl;
if (conn->timeout_id >= 0)
tfd->clear_timer(conn->timeout_id);
delete conn;
rdmacm_connecting.erase(cmid);
rdmacm_connections[cmid] = cl;
if (log_level > 0)
fprintf(stderr, "Successfully connected with OSD %ju using RDMA-CM\n", peer_osd);
// Add initial receive request(s)
try_recv_rdma(cl);
osd_peer_fds[peer_osd] = cl->peer_fd;
on_connect_peer(peer_osd, cl->peer_fd);
}

View File

@ -14,6 +14,7 @@ target_link_libraries(vitastor-osd
Jerasure
${ISAL_LIBRARIES}
${IBVERBS_LIBRARIES}
${RDMACM_LIBRARIES}
)
# osd_rmw_test

View File

@ -120,6 +120,11 @@ osd_t::~osd_t()
delete epmgr;
if (bs)
delete bs;
#ifdef WITH_RDMACM
for (rdma_cm_id *listener: rdmacm_listeners)
msgr.rdmacm_destroy_listener(listener);
rdmacm_listeners.clear();
#endif
for (auto listen_fd: listen_fds)
close(listen_fd);
listen_fds.clear();
@ -167,6 +172,11 @@ void osd_t::parse_config(bool init)
bind_port = config["bind_port"].uint64_value();
if (bind_port <= 0 || bind_port > 65535)
bind_port = 0;
#ifdef WITH_RDMACM
// Use RDMA CM? (required for iWARP and may be useful for IB)
this->use_rdmacm = config["use_rdmacm"].bool_value() || config["use_rdmacm"].uint64_value() != 0;
this->disable_tcp = this->use_rdmacm && (config["disable_tcp"].bool_value() || config["disable_tcp"].uint64_value() != 0);
#endif
// OSD configuration
etcd_report_interval = config["etcd_report_interval"].uint64_value();
if (etcd_report_interval <= 0)
@ -335,16 +345,30 @@ void osd_t::bind_socket()
{
bind_addresses.push_back("0.0.0.0");
}
for (auto & bind_address: bind_addresses)
if (!disable_tcp)
{
int listen_fd = create_and_bind_socket(bind_address, listening_port ? listening_port : bind_port, listen_backlog, &listening_port);
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
for (auto & bind_address: bind_addresses)
{
msgr.accept_connections(fd);
});
listen_fds.push_back(listen_fd);
int listen_fd = create_and_bind_socket(bind_address, listening_port ? listening_port : bind_port, listen_backlog, &listening_port);
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
{
msgr.accept_connections(fd);
});
listen_fds.push_back(listen_fd);
}
}
#ifdef WITH_RDMACM
if (use_rdmacm)
{
for (auto & bind_address: bind_addresses)
{
auto listener = msgr.rdmacm_listen(bind_address, listening_port, &listening_port, log_level);
if (listener)
rdmacm_listeners.push_back(listener);
}
}
#endif
}
bool osd_t::shutdown()

View File

@ -108,6 +108,8 @@ class osd_t
bool no_scrub = false;
bool allow_net_split = false;
int bind_port, listen_backlog = 128;
bool use_rdmacm = false;
bool disable_tcp = false;
// FIXME: Implement client queue depth limit
int client_queue_depth = 128;
bool allow_test_ops = false;
@ -201,6 +203,9 @@ class osd_t
int listening_port = 0;
std::vector<std::string> bind_addresses;
std::vector<int> listen_fds;
#ifdef WITH_RDMACM
std::vector<rdma_cm_id *> rdmacm_listeners;
#endif
ring_consumer_t consumer;
// op statistics

View File

@ -172,6 +172,10 @@ json11::Json osd_t::get_osd_state()
st["host"] = std::string(hostname.data(), hostname.size());
st["version"] = VITASTOR_VERSION;
st["port"] = listening_port;
#ifdef WITH_RDMACM
if (rdmacm_listeners.size())
st["rdmacm"] = true;
#endif
st["primary_enabled"] = run_primary;
st["blockstore_enabled"] = bs ? true : false;
return st;

View File

@ -25,6 +25,7 @@ target_link_libraries(stub_uring_osd
vitastor_common
${LIBURING_LIBRARIES}
${IBVERBS_LIBRARIES}
${RDMACM_LIBRARIES}
tcmalloc_minimal
)

View File

@ -93,6 +93,13 @@ bool cidr6_match(const in6_addr &address, const in6_addr &network, uint8_t bits)
return true;
}
bool cidr_sockaddr_match(const sockaddr_storage &addr, const addr_mask_t &mask)
{
return mask.family == addr.ss_family && (mask.family == AF_INET
? cidr_match(((sockaddr_in*)&addr)->sin_addr, mask.ipv4, mask.bits)
: cidr6_match(((sockaddr_in6*)&addr)->sin6_addr, mask.ipv6, mask.bits));
}
addr_mask_t cidr_parse(std::string mask)
{
unsigned bits = 255;

View File

@ -18,5 +18,6 @@ std::string addr_to_string(const sockaddr_storage &addr);
addr_mask_t cidr_parse(std::string mask);
bool cidr_match(const in_addr &address, const in_addr &network, uint8_t bits);
bool cidr6_match(const in6_addr &address, const in6_addr &network, uint8_t bits);
bool cidr_sockaddr_match(const sockaddr_storage &addr, const addr_mask_t &mask);
std::vector<std::string> getifaddr_list(const std::vector<addr_mask_t> & masks = std::vector<addr_mask_t>(), bool include_v6 = false);
int create_and_bind_socket(std::string bind_address, int bind_port, int listen_backlog, int *listening_port);