Compare commits

...

2 Commits

Author SHA1 Message Date
Vitaliy Filippov ce73402bb9 trace writes 2025-03-29 12:25:43 +03:00
Vitaliy Filippov 191eafd72a WIP Support multiple RDMA networks 2025-03-29 12:16:08 +03:00
7 changed files with 229 additions and 225 deletions

View File

@ -117,16 +117,15 @@ void msgr_iothread_t::run()
void osd_messenger_t::init() void osd_messenger_t::init()
{ {
// FIXME: Support multiple RDMA networks?!
#ifdef WITH_RDMA #ifdef WITH_RDMA
if (use_rdma) if (use_rdma)
{ {
rdma_context = msgr_rdma_context_t::create( rdma_contexts = msgr_rdma_context_t::create_all(
osd_num && osd_cluster_network_masks.size() ? osd_cluster_network_masks : osd_network_masks, osd_num && osd_cluster_network_masks.size() ? osd_cluster_network_masks : osd_network_masks,
rdma_device != "" ? rdma_device.c_str() : NULL, rdma_device != "" ? rdma_device.c_str() : NULL,
rdma_port_num, rdma_gid_index, rdma_mtu, rdma_odp, log_level rdma_port_num, rdma_gid_index, rdma_mtu, rdma_odp, log_level
); );
if (!rdma_context) if (!rdma_contexts.size())
{ {
if (force_rdma) if (force_rdma)
{ {
@ -138,15 +137,16 @@ void osd_messenger_t::init()
} }
else else
{ {
rdma_max_sge = rdma_max_sge < rdma_context->attrx.orig_attr.max_sge
? rdma_max_sge : rdma_context->attrx.orig_attr.max_sge;
fprintf(stderr, "[OSD %ju] RDMA initialized successfully\n", osd_num); fprintf(stderr, "[OSD %ju] RDMA initialized successfully\n", osd_num);
fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK); for (msgr_rdma_context_t* rdma_context: rdma_contexts)
tfd->set_fd_handler(rdma_context->channel->fd, false, [this](int notify_fd, int epoll_events)
{ {
handle_rdma_events(); fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
}); tfd->set_fd_handler(rdma_context->channel->fd, false, [this, rdma_context](int notify_fd, int epoll_events)
handle_rdma_events(); {
handle_rdma_events(rdma_context);
});
handle_rdma_events(rdma_context);
}
} }
} }
#endif #endif
@ -254,10 +254,11 @@ osd_messenger_t::~osd_messenger_t()
iothreads.clear(); iothreads.clear();
} }
#ifdef WITH_RDMA #ifdef WITH_RDMA
if (rdma_context) for (auto rdma_context: rdma_contexts)
{ {
delete rdma_context; delete rdma_context;
} }
rdma_contexts.clear();
#endif #endif
} }
@ -275,8 +276,6 @@ void osd_messenger_t::parse_config(const json11::Json & config)
} }
this->rdma_device = config["rdma_device"].string_value(); this->rdma_device = config["rdma_device"].string_value();
this->rdma_port_num = (uint8_t)config["rdma_port_num"].uint64_value(); this->rdma_port_num = (uint8_t)config["rdma_port_num"].uint64_value();
if (!this->rdma_port_num)
this->rdma_port_num = 1;
if (!config["rdma_gid_index"].is_null()) if (!config["rdma_gid_index"].is_null())
this->rdma_gid_index = (uint8_t)config["rdma_gid_index"].uint64_value(); this->rdma_gid_index = (uint8_t)config["rdma_gid_index"].uint64_value();
this->rdma_mtu = (uint32_t)config["rdma_mtu"].uint64_value(); this->rdma_mtu = (uint32_t)config["rdma_mtu"].uint64_value();
@ -368,9 +367,7 @@ void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state)
bool is_cluster = false; bool is_cluster = false;
for (auto & mask: osd_cluster_network_masks) for (auto & mask: osd_cluster_network_masks)
{ {
if (mask.family == addr.ss_family && (mask.family == AF_INET if (cidr_sockaddr_match(addr, mask))
? cidr_match(*(in_addr*)&addr, mask.ipv4, mask.bits)
: cidr6_match(*(in6_addr*)&addr, mask.ipv6, mask.bits)))
{ {
is_cluster = true; is_cluster = true;
break; break;
@ -586,20 +583,30 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
}, },
}; };
#ifdef WITH_RDMA #ifdef WITH_RDMA
if (rdma_context) if (rdma_contexts.size())
{ {
cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg); // Choose the right context for the selected network
if (cl->rdma_conn) msgr_rdma_context_t *selected_ctx = choose_rdma_context(cl);
if (!selected_ctx)
{ {
json11::Json payload = json11::Json::object { if (log_level > 0)
{ "connect_rdma", cl->rdma_conn->addr.to_string() }, fprintf(stderr, "No RDMA context for OSD %lu connection (peer %d), using only TCP\n", cl->osd_num, cl->peer_fd);
{ "rdma_max_msg", cl->rdma_conn->max_msg }, }
}; else
std::string payload_str = payload.dump(); {
op->req.show_conf.json_len = payload_str.size(); cl->rdma_conn = msgr_rdma_connection_t::create(selected_ctx, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg);
op->buf = malloc_or_die(payload_str.size()); if (cl->rdma_conn)
op->iov.push_back(op->buf, payload_str.size()); {
memcpy(op->buf, payload_str.c_str(), payload_str.size()); json11::Json payload = json11::Json::object {
{ "connect_rdma", cl->rdma_conn->addr.to_string() },
{ "rdma_max_msg", cl->rdma_conn->max_msg },
};
std::string payload_str = payload.dump();
op->req.show_conf.json_len = payload_str.size();
op->buf = malloc_or_die(payload_str.size());
op->iov.push_back(op->buf, payload_str.size());
memcpy(op->buf, payload_str.c_str(), payload_str.size());
}
} }
} }
#endif #endif
@ -729,9 +736,24 @@ void osd_messenger_t::accept_connections(int listen_fd)
} }
#ifdef WITH_RDMA #ifdef WITH_RDMA
msgr_rdma_context_t* osd_messenger_t::choose_rdma_context(osd_client_t *cl)
{
// Choose the right context for the selected network
msgr_rdma_context_t *selected_ctx = NULL;
for (auto rdma_ctx: rdma_contexts)
{
if (!rdma_ctx->net_mask.family && !selected_ctx ||
rdma_ctx->net_mask.family && cidr_sockaddr_match(cl->peer_addr, rdma_ctx->net_mask))
{
selected_ctx = rdma_ctx;
}
}
return selected_ctx;
}
bool osd_messenger_t::is_rdma_enabled() bool osd_messenger_t::is_rdma_enabled()
{ {
return rdma_context != NULL; return rdma_contexts.size() > 0;
} }
#endif #endif

View File

@ -170,9 +170,10 @@ protected:
bool use_rdma = true; bool use_rdma = true;
bool force_rdma = false; bool force_rdma = false;
std::string rdma_device; std::string rdma_device;
uint64_t rdma_port_num = 1, rdma_mtu = 0; uint64_t rdma_port_num = 1;
int rdma_mtu = 0;
int rdma_gid_index = -1; int rdma_gid_index = -1;
msgr_rdma_context_t *rdma_context = NULL; std::vector<msgr_rdma_context_t *> rdma_contexts;
uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 0; uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 0;
uint64_t rdma_max_msg = 0; uint64_t rdma_max_msg = 0;
bool rdma_odp = false; bool rdma_odp = false;
@ -256,6 +257,7 @@ protected:
void try_send_rdma_odp(osd_client_t *cl); void try_send_rdma_odp(osd_client_t *cl);
void try_send_rdma_nodp(osd_client_t *cl); void try_send_rdma_nodp(osd_client_t *cl);
bool try_recv_rdma(osd_client_t *cl); bool try_recv_rdma(osd_client_t *cl);
void handle_rdma_events(); void handle_rdma_events(msgr_rdma_context_t *rdma_context);
msgr_rdma_context_t* choose_rdma_context(osd_client_t *cl);
#endif #endif
}; };

View File

@ -76,21 +76,21 @@ static bool is_ipv4_gid(ibv_gid_entry *gidx)
((uint32_t*)gidx->gid.raw)[2] == 0xffff0000); ((uint32_t*)gidx->gid.raw)[2] == 0xffff0000);
} }
static bool match_gid(ibv_gid_entry *gidx, const addr_mask_t *networks, int nnet) static int match_gid(ibv_gid_entry *gidx, const addr_mask_t *networks, int nnet)
{ {
if (gidx->gid_type != IBV_GID_TYPE_ROCE_V1 && if (gidx->gid_type != IBV_GID_TYPE_ROCE_V1 &&
gidx->gid_type != IBV_GID_TYPE_ROCE_V2 || gidx->gid_type != IBV_GID_TYPE_ROCE_V2 ||
((uint64_t*)gidx->gid.raw)[0] == 0 && ((uint64_t*)gidx->gid.raw)[0] == 0 &&
((uint64_t*)gidx->gid.raw)[1] == 0) ((uint64_t*)gidx->gid.raw)[1] == 0)
{ {
return false; return -1;
} }
if (is_ipv4_gid(gidx)) if (is_ipv4_gid(gidx))
{ {
for (int i = 0; i < nnet; i++) for (int i = 0; i < nnet; i++)
{ {
if (networks[i].family == AF_INET && cidr_match(*(in_addr*)(gidx->gid.raw+12), networks[i].ipv4, networks[i].bits)) if (networks[i].family == AF_INET && cidr_match(*(in_addr*)(gidx->gid.raw+12), networks[i].ipv4, networks[i].bits))
return true; return i;
} }
} }
else else
@ -98,119 +98,67 @@ static bool match_gid(ibv_gid_entry *gidx, const addr_mask_t *networks, int nnet
for (int i = 0; i < nnet; i++) for (int i = 0; i < nnet; i++)
{ {
if (networks[i].family == AF_INET6 && cidr6_match(*(in6_addr*)gidx->gid.raw, networks[i].ipv6, networks[i].bits)) if (networks[i].family == AF_INET6 && cidr6_match(*(in6_addr*)gidx->gid.raw, networks[i].ipv6, networks[i].bits))
return true; return i;
} }
} }
return false; return -1;
} }
struct matched_dev static void log_rdma_dev_port_gid(ibv_device *dev, int ib_port, int gid_index, int mtu, ibv_gid_entry & gidx)
{
int dev = -1;
int port = -1;
int gid = -1;
bool rocev2 = false;
};
static void log_rdma_dev_port_gid(ibv_device *dev, int ib_port, int gid_index, ibv_gid_entry & gidx)
{ {
bool is4 = ((uint64_t*)gidx.gid.raw)[0] == 0 && ((uint32_t*)gidx.gid.raw)[2] == 0xffff0000; bool is4 = ((uint64_t*)gidx.gid.raw)[0] == 0 && ((uint32_t*)gidx.gid.raw)[2] == 0xffff0000;
char buf[256]; char buf[256];
inet_ntop(is4 ? AF_INET : AF_INET6, is4 ? gidx.gid.raw+12 : gidx.gid.raw, buf, sizeof(buf)); inet_ntop(is4 ? AF_INET : AF_INET6, is4 ? gidx.gid.raw+12 : gidx.gid.raw, buf, sizeof(buf));
fprintf( fprintf(
stderr, "Auto-selected RDMA device %s port %d GID %d - ROCEv%d IPv%d %s\n", stderr, "Selected RDMA device %s port %d GID %d - ROCEv%d IPv%d %s, MTU %d\n",
ibv_get_device_name(dev), ib_port, gid_index, ibv_get_device_name(dev), ib_port, gid_index,
gidx.gid_type == IBV_GID_TYPE_ROCE_V2 ? 2 : 1, is4 ? 4 : 6, buf gidx.gid_type == IBV_GID_TYPE_ROCE_V2 ? 2 : 1, is4 ? 4 : 6, buf, mtu
); );
} }
static matched_dev match_device(ibv_device **dev_list, const addr_mask_t *networks, int nnet, int log_level) static int match_port_gid(const std::vector<addr_mask_t> & osd_network_masks, ibv_context *context,
int port_num, int gid_count, int log_level, ibv_gid_entry *best_gidx, int *net_num)
{ {
matched_dev best; // Try to find a port with matching address
ibv_device_attr attr; int best_gid_idx = -1, res = 0;
ibv_port_attr portinfo; for (int k = 0; k < gid_count; k++)
ibv_gid_entry best_gidx;
int res;
bool have_non_roce = false, have_roce = false;
for (int i = 0; dev_list[i]; ++i)
{ {
auto dev = dev_list[i]; ibv_gid_entry gidx;
ibv_context *context = ibv_open_device(dev_list[i]); if ((res = ibv_query_gid_ex(context, port_num, k, &gidx, 0)) != 0)
if ((res = ibv_query_device(context, &attr)) != 0)
{ {
fprintf(stderr, "Couldn't query RDMA device %s for its features: %s\n", ibv_get_device_name(dev_list[i]), strerror(res)); if (res != ENODATA)
goto cleanup;
}
for (int j = 1; j <= attr.phys_port_cnt; j++)
{
// Try to find a port with matching address
if ((res = ibv_query_port(context, j, &portinfo)) != 0)
{ {
fprintf(stderr, "Couldn't get RDMA device %s port %d info: %s\n", ibv_get_device_name(dev), j, strerror(res)); fprintf(stderr, "Couldn't read RDMA device %s GID index %d: %s\n", ibv_get_device_name(context->device), k, strerror(res));
goto cleanup; continue;
} }
for (int k = 0; k < portinfo.gid_tbl_len; k++) else
break;
}
if ((res = match_gid(&gidx, osd_network_masks.data(), osd_network_masks.size())) >= 0)
{
// Prefer RoCEv2
if (best_gid_idx < 0 || best_gidx->gid_type != IBV_GID_TYPE_ROCE_V2 && gidx.gid_type == IBV_GID_TYPE_ROCE_V2)
{ {
ibv_gid_entry gidx; best_gid_idx = k;
if ((res = ibv_query_gid_ex(context, j, k, &gidx, 0)) != 0) *best_gidx = gidx;
{ *net_num = res;
if (res != ENODATA)
{
fprintf(stderr, "Couldn't read RDMA device %s GID index %d: %s\n", ibv_get_device_name(dev), k, strerror(res));
goto cleanup;
}
else
break;
}
if (gidx.gid_type != IBV_GID_TYPE_ROCE_V1 &&
gidx.gid_type != IBV_GID_TYPE_ROCE_V2)
have_non_roce = true;
else
have_roce = true;
if (match_gid(&gidx, networks, nnet))
{
// Prefer RoCEv2
if (!best.rocev2)
{
best.dev = i;
best.port = j;
best.gid = k;
best.rocev2 = (gidx.gid_type == IBV_GID_TYPE_ROCE_V2);
best_gidx = gidx;
}
}
} }
} }
cleanup:
ibv_close_device(context);
if (best.rocev2)
{
break;
}
} }
if (best.dev >= 0 && log_level > 0) return best_gid_idx;
{
log_rdma_dev_port_gid(dev_list[best.dev], best.port, best.gid, best_gidx);
}
if (best.dev < 0 && have_non_roce && !have_roce)
{
best.dev = -2;
}
return best;
} }
#endif #endif
msgr_rdma_context_t *msgr_rdma_context_t::create(const std::vector<addr_mask_t> & osd_network_masks, const char *ib_devname, uint8_t ib_port, int gid_index, uint32_t mtu, bool odp, int log_level) std::vector<msgr_rdma_context_t*> msgr_rdma_context_t::create_all(const std::vector<addr_mask_t> & osd_network_masks,
const char *sel_dev_name, int sel_port_num, int sel_gid_index, uint32_t sel_mtu, bool odp, int log_level)
{ {
int res; int res;
std::vector<msgr_rdma_context_t*> ret;
ibv_device **raw_dev_list = NULL;
ibv_device **dev_list = NULL; ibv_device **dev_list = NULL;
msgr_rdma_context_t *ctx = new msgr_rdma_context_t(); ibv_device *single_list[2] = {};
ctx->mtu = mtu;
timespec tv; raw_dev_list = dev_list = ibv_get_device_list(NULL);
clock_gettime(CLOCK_REALTIME, &tv);
srand48(tv.tv_sec*1000000000 + tv.tv_nsec);
dev_list = ibv_get_device_list(NULL);
if (!dev_list || !*dev_list) if (!dev_list || !*dev_list)
{ {
if (errno == -ENOSYS || errno == ENOSYS) if (errno == -ENOSYS || errno == ENOSYS)
@ -227,116 +175,126 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const std::vector<addr_mask_t>
fprintf(stderr, "Failed to get RDMA device list: %s\n", strerror(errno)); fprintf(stderr, "Failed to get RDMA device list: %s\n", strerror(errno));
goto cleanup; goto cleanup;
} }
if (ib_devname)
if (sel_dev_name)
{ {
int i; int i;
for (i = 0; dev_list[i]; ++i) for (i = 0; dev_list[i]; ++i)
if (!strcmp(ibv_get_device_name(dev_list[i]), ib_devname)) if (!strcmp(ibv_get_device_name(dev_list[i]), sel_dev_name))
break; break;
ctx->dev = dev_list[i]; if (!dev_list[i])
if (!ctx->dev)
{ {
fprintf(stderr, "RDMA device %s not found\n", ib_devname); fprintf(stderr, "RDMA device %s not found\n", sel_dev_name);
goto cleanup; goto cleanup;
} }
} single_list[0] = dev_list[i];
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT dev_list = single_list;
else if (osd_network_masks.size())
{
auto best = match_device(dev_list, osd_network_masks.data(), osd_network_masks.size(), log_level);
if (best.dev == -2)
{
best.dev = 0;
if (log_level > 0)
fprintf(stderr, "No RoCE devices found, using first available RDMA device %s\n", ibv_get_device_name(*dev_list));
}
else if (best.dev < 0)
{
if (log_level > 0)
fprintf(stderr, "RDMA device matching osd_network is not found, disabling RDMA\n");
goto cleanup;
}
else
{
ib_port = best.port;
gid_index = best.gid;
}
ctx->dev = dev_list[best.dev];
}
#endif
else
{
ctx->dev = *dev_list;
} }
ctx->context = ibv_open_device(ctx->dev); for (int i = 0; dev_list[i]; ++i)
if (!ctx->context)
{ {
fprintf(stderr, "Couldn't get RDMA context for %s\n", ibv_get_device_name(ctx->dev)); auto dev = dev_list[i];
goto cleanup; ibv_context *context = ibv_open_device(dev);
} if (!context)
ctx->ib_port = ib_port;
if ((res = ibv_query_port(ctx->context, ib_port, &ctx->portinfo)) != 0)
{
fprintf(stderr, "Couldn't get RDMA device %s port %d info: %s\n", ibv_get_device_name(ctx->dev), ib_port, strerror(res));
goto cleanup;
}
ctx->my_lid = ctx->portinfo.lid;
if (ctx->portinfo.link_layer != IBV_LINK_LAYER_ETHERNET && !ctx->my_lid)
{
fprintf(stderr, "RDMA device %s must have local LID because it's not Ethernet, but LID is zero\n", ibv_get_device_name(ctx->dev));
goto cleanup;
}
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
if (gid_index != -1)
#endif
{
ctx->gid_index = gid_index < 0 ? 0 : gid_index;
if (ibv_query_gid(ctx->context, ib_port, gid_index, &ctx->my_gid))
{ {
fprintf(stderr, "Couldn't read RDMA device %s GID index %d\n", ibv_get_device_name(ctx->dev), gid_index); fprintf(stderr, "Couldn't get RDMA context for %s\n", ibv_get_device_name(dev));
goto cleanup; continue;
} }
} ibv_device_attr attr;
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT if ((res = ibv_query_device(context, &attr)) != 0)
else
{
// Auto-guess GID
ibv_gid_entry best_gidx;
for (int k = 0; k < ctx->portinfo.gid_tbl_len; k++)
{ {
ibv_gid_entry gidx; fprintf(stderr, "Couldn't query RDMA device %s for its features: %s\n", ibv_get_device_name(dev), strerror(res));
if (ibv_query_gid_ex(ctx->context, ib_port, k, &gidx, 0) != 0) goto cleanup_dev;
}
if (sel_port_num && sel_port_num > attr.phys_port_cnt)
{
fprintf(stderr, "RDMA device %s port %d does not exist\n", ibv_get_device_name(dev), sel_port_num);
goto cleanup_dev;
}
for (int port_num = (sel_port_num ? sel_port_num : 1); port_num <= (sel_port_num ? sel_port_num : attr.phys_port_cnt); port_num++)
{
ibv_port_attr portinfo;
if ((res = ibv_query_port(context, port_num, &portinfo)) != 0)
{ {
fprintf(stderr, "Couldn't read RDMA device %s GID index %d\n", ibv_get_device_name(ctx->dev), k); fprintf(stderr, "Couldn't get RDMA device %s port %d info: %s\n", ibv_get_device_name(dev), port_num, strerror(res));
goto cleanup; continue;
} }
// Skip empty GID if (portinfo.state != IBV_PORT_ACTIVE)
if (((uint64_t*)gidx.gid.raw)[0] == 0 &&
((uint64_t*)gidx.gid.raw)[1] == 0)
{ {
continue; continue;
} }
// Prefer IPv4 RoCEv2 -> IPv6 RoCEv2 -> IPv4 RoCEv1 -> IPv6 RoCEv1 -> IB if (sel_gid_index >= (int)portinfo.gid_tbl_len)
if (gid_index == -1 ||
gidx.gid_type == IBV_GID_TYPE_ROCE_V2 && best_gidx.gid_type != IBV_GID_TYPE_ROCE_V2 ||
gidx.gid_type == IBV_GID_TYPE_ROCE_V1 && best_gidx.gid_type == IBV_GID_TYPE_IB ||
gidx.gid_type == best_gidx.gid_type && is_ipv4_gid(&gidx))
{ {
gid_index = k; fprintf(stderr, "RDMA device %s port %d GID %d does not exist\n", ibv_get_device_name(dev), port_num, sel_gid_index);
best_gidx = gidx; continue;
}
uint32_t port_mtu = sel_mtu ? sel_mtu : portinfo.active_mtu;
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
if (sel_gid_index < 0)
{
ibv_gid_entry best_gidx;
int net_num = 0;
int best_gid_idx = match_port_gid(osd_network_masks, context, port_num, portinfo.gid_tbl_len, log_level, &best_gidx, &net_num);
if (best_gid_idx >= 0)
{
if (log_level > 0)
log_rdma_dev_port_gid(dev, port_num, best_gid_idx, port_mtu, best_gidx);
auto ctx = msgr_rdma_context_t::create(dev, portinfo, port_num, best_gid_idx, port_mtu, odp, log_level);
if (ctx)
{
ctx->net_mask = osd_network_masks[net_num];
ret.push_back(ctx);
}
}
}
else
#endif
{
int best_gid_idx = sel_gid_index >= 0 ? sel_gid_index : 0;
#ifdef IBV_ADVISE_MR_ADVICE_PREFETCH_NO_FAULT
if (log_level > 0)
{
ibv_gid_entry gidx;
ibv_query_gid_ex(context, port_num, best_gid_idx, &gidx, 0);
log_rdma_dev_port_gid(dev, port_num, best_gid_idx, port_mtu, gidx);
}
#endif
auto ctx = msgr_rdma_context_t::create(dev, portinfo, port_num, best_gid_idx, port_mtu, odp, log_level);
if (ctx)
ret.push_back(ctx);
} }
} }
ctx->gid_index = gid_index = (gid_index == -1 ? 0 : gid_index); cleanup_dev:
if (log_level > 0) ibv_close_device(context);
{
log_rdma_dev_port_gid(ctx->dev, ctx->ib_port, ctx->gid_index, best_gidx);
}
ctx->my_gid = best_gidx.gid;
} }
#endif
cleanup:
if (raw_dev_list)
ibv_free_device_list(raw_dev_list);
return ret;
}
msgr_rdma_context_t *msgr_rdma_context_t::create(ibv_device *dev, ibv_port_attr & portinfo, int ib_port, int gid_index, uint32_t mtu, bool odp, int log_level)
{
msgr_rdma_context_t *ctx = new msgr_rdma_context_t();
ibv_context *context = ibv_open_device(dev);
if (!context)
{
fprintf(stderr, "Couldn't get RDMA context for %s\n", ibv_get_device_name(dev));
goto cleanup;
}
ctx->mtu = mtu;
ctx->context = context;
ctx->ib_port = ib_port;
ctx->portinfo = portinfo;
ctx->my_lid = ctx->portinfo.lid;
if (ctx->portinfo.link_layer != IBV_LINK_LAYER_ETHERNET && !ctx->my_lid)
{
fprintf(stderr, "RDMA device %s must have local LID because it's not Ethernet, but LID is zero\n", ibv_get_device_name(dev));
goto cleanup;
}
ctx->gid_index = gid_index;
ctx->pd = ibv_alloc_pd(ctx->context); ctx->pd = ibv_alloc_pd(ctx->context);
if (!ctx->pd) if (!ctx->pd)
@ -345,18 +303,18 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const std::vector<addr_mask_t>
goto cleanup; goto cleanup;
} }
ctx->odp = odp;
if (ctx->odp)
{ {
if (ibv_query_device_ex(ctx->context, NULL, &ctx->attrx)) if (ibv_query_device_ex(ctx->context, NULL, &ctx->attrx))
{ {
fprintf(stderr, "Couldn't query RDMA device for its features\n"); fprintf(stderr, "Couldn't query RDMA device for its features\n");
goto cleanup; goto cleanup;
} }
ctx->odp = odp; if (!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) ||
if (ctx->odp &&
(!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) ||
!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT_IMPLICIT) || !(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT_IMPLICIT) ||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) || !(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) ||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV))) !(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV))
{ {
ctx->odp = false; ctx->odp = false;
if (log_level > 0) if (log_level > 0)
@ -389,14 +347,12 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const std::vector<addr_mask_t>
goto cleanup; goto cleanup;
} }
if (dev_list)
ibv_free_device_list(dev_list);
return ctx; return ctx;
cleanup: cleanup:
if (context)
ibv_close_device(context);
delete ctx; delete ctx;
if (dev_list)
ibv_free_device_list(dev_list);
return NULL; return NULL;
} }
@ -544,7 +500,15 @@ bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint64
{ {
client_max_msg = rdma_max_msg; client_max_msg = rdma_max_msg;
} }
auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, client_max_msg); auto cl = clients.at(peer_fd);
msgr_rdma_context_t *selected_ctx = choose_rdma_context(cl);
if (!selected_ctx)
{
if (log_level > 0)
fprintf(stderr, "No RDMA context for peer %d, using only TCP\n", cl->peer_fd);
return false;
}
msgr_rdma_connection_t *rdma_conn = msgr_rdma_connection_t::create(selected_ctx, rdma_max_send, rdma_max_recv, rdma_max_sge, client_max_msg);
if (rdma_conn) if (rdma_conn)
{ {
int r = rdma_conn->connect(&addr); int r = rdma_conn->connect(&addr);
@ -663,9 +627,9 @@ void osd_messenger_t::try_send_rdma_nodp(osd_client_t *cl)
// Allocate send ring buffer, if not yet // Allocate send ring buffer, if not yet
rc->send_out_size = rc->max_msg*rdma_max_send; rc->send_out_size = rc->max_msg*rdma_max_send;
rc->send_out.buf = malloc_or_die(rc->send_out_size); rc->send_out.buf = malloc_or_die(rc->send_out_size);
if (!rdma_context->odp) if (!rc->ctx->odp)
{ {
rc->send_out.mr = ibv_reg_mr(rdma_context->pd, rc->send_out.buf, rc->send_out_size, 0); rc->send_out.mr = ibv_reg_mr(rc->ctx->pd, rc->send_out.buf, rc->send_out_size, 0);
if (!rc->send_out.mr) if (!rc->send_out.mr)
{ {
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno)); fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
@ -695,7 +659,7 @@ void osd_messenger_t::try_send_rdma_nodp(osd_client_t *cl)
ibv_sge sge = { ibv_sge sge = {
.addr = (uintptr_t)dst, .addr = (uintptr_t)dst,
.length = (uint32_t)copied, .length = (uint32_t)copied,
.lkey = rdma_context->odp ? rdma_context->mr->lkey : rc->send_out.mr->lkey, .lkey = rc->ctx->odp ? rc->ctx->mr->lkey : rc->send_out.mr->lkey,
}; };
try_send_rdma_wr(cl, &sge, 1); try_send_rdma_wr(cl, &sge, 1);
rc->send_sizes.push_back(copied); rc->send_sizes.push_back(copied);
@ -705,7 +669,7 @@ void osd_messenger_t::try_send_rdma_nodp(osd_client_t *cl)
void osd_messenger_t::try_send_rdma(osd_client_t *cl) void osd_messenger_t::try_send_rdma(osd_client_t *cl)
{ {
if (rdma_context->odp) if (cl->rdma_conn->ctx->odp)
try_send_rdma_odp(cl); try_send_rdma_odp(cl);
else else
try_send_rdma_nodp(cl); try_send_rdma_nodp(cl);
@ -740,9 +704,9 @@ bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
{ {
msgr_rdma_buf_t b; msgr_rdma_buf_t b;
b.buf = malloc_or_die(rc->max_msg); b.buf = malloc_or_die(rc->max_msg);
if (!rdma_context->odp) if (!rc->ctx->odp)
{ {
b.mr = ibv_reg_mr(rdma_context->pd, b.buf, rc->max_msg, IBV_ACCESS_LOCAL_WRITE); b.mr = ibv_reg_mr(rc->ctx->pd, b.buf, rc->max_msg, IBV_ACCESS_LOCAL_WRITE);
if (!b.mr) if (!b.mr)
{ {
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno)); fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
@ -757,7 +721,7 @@ bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
#define RDMA_EVENTS_AT_ONCE 32 #define RDMA_EVENTS_AT_ONCE 32
void osd_messenger_t::handle_rdma_events() void osd_messenger_t::handle_rdma_events(msgr_rdma_context_t *rdma_context)
{ {
// Request next notification // Request next notification
ibv_cq *ev_cq; ibv_cq *ev_cq;

View File

@ -21,7 +21,6 @@ struct msgr_rdma_address_t
struct msgr_rdma_context_t struct msgr_rdma_context_t
{ {
ibv_context *context = NULL; ibv_context *context = NULL;
ibv_device *dev = NULL;
ibv_device_attr_ex attrx; ibv_device_attr_ex attrx;
ibv_pd *pd = NULL; ibv_pd *pd = NULL;
bool odp = false; bool odp = false;
@ -36,8 +35,13 @@ struct msgr_rdma_context_t
uint32_t mtu; uint32_t mtu;
int max_cqe = 0; int max_cqe = 0;
int used_max_cqe = 0; int used_max_cqe = 0;
addr_mask_t net_mask = {};
static std::vector<msgr_rdma_context_t*> create_all(const std::vector<addr_mask_t> & osd_network_masks,
const char *sel_dev_name, int sel_port_num, int sel_gid_index, uint32_t sel_mtu, bool odp, int log_level);
static msgr_rdma_context_t *create(ibv_device *dev, ibv_port_attr & portinfo,
int ib_port, int gid_index, uint32_t mtu, bool odp, int log_level);
static msgr_rdma_context_t *create(const std::vector<addr_mask_t> & osd_network_masks, const char *ib_devname, uint8_t ib_port, int gid_index, uint32_t mtu, bool odp, int log_level);
~msgr_rdma_context_t(); ~msgr_rdma_context_t();
}; };

View File

@ -46,6 +46,10 @@ void osd_t::autosync()
void osd_t::finish_op(osd_op_t *cur_op, int retval) void osd_t::finish_op(osd_op_t *cur_op, int retval)
{ {
inflight_ops--; inflight_ops--;
if (cur_op->req.hdr.opcode == OSD_OP_WRITE)
{
printf("%jx %jx+%x v%jx r=%d\n", cur_op->req.rw.inode, cur_op->req.rw.offset, cur_op->req.rw.len, cur_op->reply.rw.version, retval);
}
if (cur_op->req.hdr.opcode == OSD_OP_READ || if (cur_op->req.hdr.opcode == OSD_OP_READ ||
cur_op->req.hdr.opcode == OSD_OP_WRITE || cur_op->req.hdr.opcode == OSD_OP_WRITE ||
cur_op->req.hdr.opcode == OSD_OP_DELETE) cur_op->req.hdr.opcode == OSD_OP_DELETE)

View File

@ -93,6 +93,13 @@ bool cidr6_match(const in6_addr &address, const in6_addr &network, uint8_t bits)
return true; return true;
} }
bool cidr_sockaddr_match(const sockaddr_storage &addr, const addr_mask_t &mask)
{
return mask.family == addr.ss_family && (mask.family == AF_INET
? cidr_match(*(in_addr*)&addr, mask.ipv4, mask.bits)
: cidr6_match(*(in6_addr*)&addr, mask.ipv6, mask.bits));
}
addr_mask_t cidr_parse(std::string mask) addr_mask_t cidr_parse(std::string mask)
{ {
unsigned bits = 255; unsigned bits = 255;

View File

@ -18,5 +18,6 @@ std::string addr_to_string(const sockaddr_storage &addr);
addr_mask_t cidr_parse(std::string mask); addr_mask_t cidr_parse(std::string mask);
bool cidr_match(const in_addr &address, const in_addr &network, uint8_t bits); bool cidr_match(const in_addr &address, const in_addr &network, uint8_t bits);
bool cidr6_match(const in6_addr &address, const in6_addr &network, uint8_t bits); bool cidr6_match(const in6_addr &address, const in6_addr &network, uint8_t bits);
bool cidr_sockaddr_match(const sockaddr_storage &addr, const addr_mask_t &mask);
std::vector<std::string> getifaddr_list(const std::vector<addr_mask_t> & masks = std::vector<addr_mask_t>(), bool include_v6 = false); std::vector<std::string> getifaddr_list(const std::vector<addr_mask_t> & masks = std::vector<addr_mask_t>(), bool include_v6 = false);
int create_and_bind_socket(std::string bind_address, int bind_port, int listen_backlog, int *listening_port); int create_and_bind_socket(std::string bind_address, int bind_port, int listen_backlog, int *listening_port);