// Copyright (c) Vitaliy Filippov, 2019+ // License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) #include #include #include "msgr_rdma.h" #include "messenger.h" std::string msgr_rdma_address_t::to_string() { char msg[sizeof "0000:00000000:00000000:00000000000000000000000000000000"]; sprintf( msg, "%04x:%06x:%06x:%016lx%016lx", lid, qpn, psn, htobe64(((uint64_t*)&gid)[0]), htobe64(((uint64_t*)&gid)[1]) ); return std::string(msg); } bool msgr_rdma_address_t::from_string(const char *str, msgr_rdma_address_t *dest) { uint64_t* gid = (uint64_t*)&dest->gid; int n = sscanf( str, "%hx:%x:%x:%16lx%16lx", &dest->lid, &dest->qpn, &dest->psn, gid, gid+1 ); gid[0] = be64toh(gid[0]); gid[1] = be64toh(gid[1]); return n == 5; } msgr_rdma_context_t::~msgr_rdma_context_t() { if (cq) ibv_destroy_cq(cq); if (channel) ibv_destroy_comp_channel(channel); if (mr) ibv_dereg_mr(mr); if (pd) ibv_dealloc_pd(pd); if (context) ibv_close_device(context); } msgr_rdma_connection_t::~msgr_rdma_connection_t() { ctx->used_max_cqe -= max_send+max_recv; if (qp) ibv_destroy_qp(qp); if (in_data_mr) ibv_dereg_mr(in_data_mr); if (in_op_mr) ibv_dereg_mr(in_op_mr); if (in_data_buf) free(in_data_buf); if (in_ops) free(in_ops); if (out_op_alloc) delete out_op_alloc; if (out_slot_data) free(out_slot_data); if (out_slot_ops) free(out_slot_ops); } msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, int log_level) { int res; ibv_device **dev_list = NULL; msgr_rdma_context_t *ctx = new msgr_rdma_context_t(); ctx->mtu = mtu; timespec tv; clock_gettime(CLOCK_REALTIME, &tv); srand48(tv.tv_sec*1000000000 + tv.tv_nsec); dev_list = ibv_get_device_list(NULL); if (!dev_list) { if (errno == -ENOSYS || errno == ENOSYS) { if (log_level > 0) fprintf(stderr, "No RDMA devices found (RDMA device list returned ENOSYS)\n"); } else fprintf(stderr, "Failed to get RDMA device list: %s\n", strerror(errno)); goto cleanup; } if (!ib_devname) { ctx->dev = *dev_list; if (!ctx->dev) { if (log_level > 0) fprintf(stderr, "No RDMA devices found\n"); goto cleanup; } } else { int i; for (i = 0; dev_list[i]; ++i) if (!strcmp(ibv_get_device_name(dev_list[i]), ib_devname)) break; ctx->dev = dev_list[i]; if (!ctx->dev) { fprintf(stderr, "RDMA device %s not found\n", ib_devname); goto cleanup; } } ctx->context = ibv_open_device(ctx->dev); if (!ctx->context) { fprintf(stderr, "Couldn't get RDMA context for %s\n", ibv_get_device_name(ctx->dev)); goto cleanup; } ctx->ib_port = ib_port; ctx->gid_index = gid_index; if ((res = ibv_query_port(ctx->context, ib_port, &ctx->portinfo)) != 0) { fprintf(stderr, "Couldn't get RDMA device %s port %d info: %s\n", ibv_get_device_name(ctx->dev), ib_port, strerror(res)); goto cleanup; } ctx->my_lid = ctx->portinfo.lid; if (ctx->portinfo.link_layer != IBV_LINK_LAYER_ETHERNET && !ctx->my_lid) { fprintf(stderr, "RDMA device %s must have local LID because it's not Ethernet, but LID is zero\n", ibv_get_device_name(ctx->dev)); goto cleanup; } if (ibv_query_gid(ctx->context, ib_port, gid_index, &ctx->my_gid)) { fprintf(stderr, "Couldn't read RDMA device %s GID index %d\n", ibv_get_device_name(ctx->dev), gid_index); goto cleanup; } ctx->pd = ibv_alloc_pd(ctx->context); if (!ctx->pd) { fprintf(stderr, "Couldn't allocate RDMA protection domain\n"); goto cleanup; } { if (ibv_query_device_ex(ctx->context, NULL, &ctx->attrx)) { fprintf(stderr, "Couldn't query RDMA device for its features\n"); goto cleanup; } if (!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) || !(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT_IMPLICIT) || !(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) || !(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV)) { fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable or does not support RC send and receive with ODP\n"); goto cleanup; } } ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND); if (!ctx->mr) { fprintf(stderr, "Couldn't register global RDMA memory region: %s\n", strerror(errno)); goto cleanup; } ctx->channel = ibv_create_comp_channel(ctx->context); if (!ctx->channel) { fprintf(stderr, "Couldn't create RDMA completion channel\n"); goto cleanup; } ctx->max_cqe = 4096; ctx->cq = ibv_create_cq(ctx->context, ctx->max_cqe, NULL, ctx->channel, 0); if (!ctx->cq) { fprintf(stderr, "Couldn't create RDMA completion queue\n"); goto cleanup; } if (dev_list) ibv_free_device_list(dev_list); return ctx; cleanup: delete ctx; if (dev_list) ibv_free_device_list(dev_list); return NULL; } msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint64_t op_slots, uint64_t op_memory) { msgr_rdma_connection_t *conn = new msgr_rdma_connection_t; max_sge = max_sge > ctx->attrx.orig_attr.max_sge ? ctx->attrx.orig_attr.max_sge : max_sge; conn->ctx = ctx; conn->max_send = max_send; conn->max_recv = max_recv; conn->max_sge = max_sge; ctx->used_max_cqe += max_send+max_recv; if (ctx->used_max_cqe > ctx->max_cqe) { // Resize CQ // Mellanox ConnectX-4 supports up to 4194303 CQEs, so it's fine to put everything into a single CQ int new_max_cqe = ctx->max_cqe; while (ctx->used_max_cqe > new_max_cqe) { new_max_cqe *= 2; } if (ibv_resize_cq(ctx->cq, new_max_cqe) != 0) { fprintf(stderr, "Couldn't resize RDMA completion queue to %d entries\n", new_max_cqe); delete conn; return NULL; } ctx->max_cqe = new_max_cqe; } conn->op_memory = op_memory; conn->in_data_buf = memalign_or_die(MEM_ALIGNMENT, op_memory); conn->in_data_mr = ibv_reg_mr(ctx->pd, conn->in_data_buf, op_memory, IBV_ACCESS_ZERO_BASED | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_ON_DEMAND); if (!conn->in_data_mr) { fprintf(stderr, "Couldn't register %lu MB RDMA memory region for incoming data: %s\n", (op_memory+1024*1024-1)/1024/1024, strerror(errno)); delete conn; return NULL; } conn->op_slots = op_slots; conn->in_ops = (msgr_rdma_cmd_t *)malloc_or_die(sizeof(msgr_rdma_cmd_t) * op_slots); conn->in_op_mr = ibv_reg_mr(ctx->pd, conn->in_ops, sizeof(msgr_rdma_cmd_t) * op_slots, IBV_ACCESS_ZERO_BASED | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_ON_DEMAND); if (!conn->in_op_mr) { fprintf(stderr, "Couldn't register %lu KB RDMA memory region for incoming operation headers: %s\n", (sizeof(msgr_rdma_cmd_t) * op_slots + 1023)/1024, strerror(errno)); delete conn; return NULL; } ibv_qp_init_attr init_attr = { .send_cq = ctx->cq, .recv_cq = ctx->cq, .cap = { .max_send_wr = max_send, .max_recv_wr = max_recv, .max_send_sge = max_sge, .max_recv_sge = max_sge, }, .qp_type = IBV_QPT_RC, }; conn->qp = ibv_create_qp(ctx->pd, &init_attr); if (!conn->qp) { fprintf(stderr, "Couldn't create RDMA queue pair\n"); delete conn; return NULL; } conn->addr.lid = ctx->my_lid; conn->addr.gid = ctx->my_gid; conn->addr.qpn = conn->qp->qp_num; conn->addr.psn = lrand48() & 0xffffff; ibv_qp_attr attr = { .qp_state = IBV_QPS_INIT, .qp_access_flags = IBV_ACCESS_REMOTE_WRITE, .pkey_index = 0, .port_num = ctx->ib_port, }; if (ibv_modify_qp(conn->qp, &attr, IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS)) { fprintf(stderr, "Failed to switch RDMA queue pair to INIT state\n"); delete conn; return NULL; } return conn; } static ibv_mtu mtu_to_ibv_mtu(uint32_t mtu) { switch (mtu) { case 256: return IBV_MTU_256; case 512: return IBV_MTU_512; case 1024: return IBV_MTU_1024; case 2048: return IBV_MTU_2048; case 4096: return IBV_MTU_4096; } return IBV_MTU_4096; } void msgr_rdma_connection_t::set_out_capacity(uint32_t out_data_rkey, uint32_t out_op_rkey, uint64_t out_op_slots, uint64_t out_op_memory) { assert(!out_op_alloc); this->out_data_rkey = out_data_rkey; this->out_op_rkey = out_op_rkey; this->out_op_slots = out_op_slots; this->out_op_memory = out_op_memory; out_op_alloc = new allocator(out_op_slots); out_data_alloc.free(0, out_op_memory); out_slot_data = (msgr_rdma_out_pos_t *)malloc_or_die(sizeof(msgr_rdma_out_pos_t) * out_op_slots); out_slot_ops = (osd_op_t **)malloc_or_die(sizeof(osd_op_t *) * out_op_slots); } int msgr_rdma_connection_t::connect(msgr_rdma_address_t *dest) { auto conn = this; ibv_qp_attr attr = { .qp_state = IBV_QPS_RTR, .path_mtu = mtu_to_ibv_mtu(conn->ctx->mtu), .rq_psn = dest->psn, .sq_psn = conn->addr.psn, .dest_qp_num = dest->qpn, .ah_attr = { .grh = { .dgid = dest->gid, .sgid_index = conn->ctx->gid_index, .hop_limit = 1, // FIXME can it vary? }, .dlid = dest->lid, .sl = 0, // service level .src_path_bits = 0, .is_global = (uint8_t)(dest->gid.global.interface_id ? 1 : 0), .port_num = conn->ctx->ib_port, }, .max_rd_atomic = 1, .max_dest_rd_atomic = 1, // Timeout and min_rnr_timer actual values seem to be 4.096us*2^(timeout+1) .min_rnr_timer = 1, .timeout = 14, .retry_cnt = 7, .rnr_retry = 7, }; // FIXME No idea if ibv_modify_qp is a blocking operation or not. No idea if it has a timeout and what it is. if (ibv_modify_qp(conn->qp, &attr, IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER)) { fprintf(stderr, "Failed to switch RDMA queue pair to RTR (ready-to-receive) state\n"); return 1; } attr.qp_state = IBV_QPS_RTS; if (ibv_modify_qp(conn->qp, &attr, IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC)) { fprintf(stderr, "Failed to switch RDMA queue pair to RTS (ready-to-send) state\n"); return 1; } return 0; } bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint32_t out_data_rkey, uint32_t out_op_rkey, uint64_t out_op_slots, uint64_t out_op_memory) { // Try to connect to the peer using RDMA msgr_rdma_address_t addr; if (msgr_rdma_address_t::from_string(rdma_address.c_str(), &addr)) { auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_op_slots, rdma_op_memory); if (rdma_conn) { int r = rdma_conn->connect(&addr); if (r != 0) { delete rdma_conn; fprintf( stderr, "Failed to connect RDMA queue pair to %s (client %d)\n", addr.to_string().c_str(), peer_fd ); } else { // Remember connection, but switch to RDMA only after sending the configuration response clients_by_qp[rdma_conn->qp->qp_num] = peer_fd; rdma_conn->set_out_capacity(out_data_rkey, out_op_rkey, out_op_slots, out_op_memory); auto cl = clients.at(peer_fd); cl->rdma_conn = rdma_conn; cl->peer_state = PEER_RDMA_CONNECTING; return true; } } } return false; } bool osd_messenger_t::try_send_rdma(osd_client_t *cl) { auto rc = cl->rdma_conn; if (!cl->send_list.size() && !rc->in_slots_freed.size() || rc->cur_send >= rc->max_send) { return true; } int i = 0; while (i < rc->in_slots_freed.size()) { auto op_slot = rc->in_slots_freed[i++]; assert(op_slot < 0x80000000); ibv_send_wr *bad_wr = NULL; ibv_send_wr wr = { .wr_id = 0, .opcode = IBV_WR_RDMA_WRITE_WITH_IMM, .imm_data = 0x80000000 | op_slot, }; int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr); if (err || bad_wr) { fprintf(stderr, "RDMA send failed: %s\n", strerror(err)); exit(1); } rc->cur_send++; if (rc->cur_send >= rc->max_send) { break; } } rc->in_slots_freed.erase(rc->in_slots_freed.begin(), rc->in_slots_freed.begin()+i); if (!cl->send_list.size() || rc->cur_send >= rc->max_send) { return true; } ibv_sge sge[rc->max_sge]; int op_start = 0; while (op_start < cl->send_list.size()) { uint64_t op_data_size = 0; int op_end = op_start; while (!(cl->outbox[op_end].flags & MSGR_SENDP_LAST)) { op_data_size += cl->send_list[op_end].iov_len; op_end++; } op_data_size += cl->send_list[op_end].iov_len; op_end++; op_data_size -= cl->send_list[op_start].iov_len; // Operation boundaries in send_list: op_start..op_end, first iovec is the header uint64_t op_slot = rc->out_op_alloc->find_free(); if (op_slot == UINT64_MAX) { // op queue is full return true; } uint64_t data_pos = UINT64_MAX; if (op_data_size >= 0) { if (rc->cur_send > rc->max_send-1-(op_end-op_start-1+rc->max_sge)/rc->max_sge) { // RDMA queue is full return true; } // FIXME: Oops, and what if op data is larger than the whole buffer... :) data_pos = rc->out_data_alloc.alloc(op_data_size); if (data_pos == UINT64_MAX) { // data buffers are full return true; } int cur_sge = 0; for (int data_sent = 1; data_sent < op_end; data_sent++) { sge[cur_sge++] = { .addr = (uintptr_t)cl->send_list[data_sent].iov_base, .length = (uint32_t)cl->send_list[data_sent].iov_len, .lkey = rc->ctx->mr->lkey, }; if (data_sent == op_end-1 || cur_sge >= rc->max_sge) { ibv_send_wr *bad_wr = NULL; ibv_send_wr wr = { .wr_id = op_slot, .next = NULL, .sg_list = sge, .num_sge = cur_sge, .opcode = IBV_WR_RDMA_WRITE, .send_flags = 0, .wr = { .rdma = { .remote_addr = data_pos, .rkey = rc->out_data_rkey, }, }, }; int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr); if (err || bad_wr) { fprintf(stderr, "RDMA send failed: %s\n", strerror(err)); exit(1); } rc->cur_send++; cur_sge = 0; } } } if (rc->cur_send > rc->max_send-1) { // RDMA queue is full return true; } rc->out_op_alloc->set(op_slot, true); assert(cl->send_list[op_start].iov_len == OSD_PACKET_SIZE); sge[0] = { .addr = (uintptr_t)cl->send_list[op_start].iov_base, .length = (uint32_t)cl->send_list[op_start].iov_len, .lkey = rc->ctx->mr->lkey, }; rc->out_slot_data[op_slot] = { .data_pos = data_pos, .data_size = op_data_size }; rc->out_slot_ops[op_slot] = (cl->outbox[op_end-1].flags & MSGR_SENDP_FREE) ? cl->outbox[op_end-1].op : NULL; sge[1] = { .addr = (uintptr_t)(rc->out_slot_data+op_slot), .length = sizeof(rc->out_slot_data[op_slot]), .lkey = rc->ctx->mr->lkey, }; ibv_send_wr *bad_wr = NULL; ibv_send_wr wr = { .wr_id = op_slot, .next = NULL, .sg_list = sge, .num_sge = 2, .opcode = IBV_WR_RDMA_WRITE_WITH_IMM, .send_flags = IBV_SEND_SIGNALED, .imm_data = (uint32_t)op_slot, .wr = { .rdma = { .remote_addr = op_slot*sizeof(msgr_rdma_cmd_t), .rkey = rc->out_op_rkey, }, }, }; int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr); if (err || bad_wr) { fprintf(stderr, "RDMA send failed: %s\n", strerror(err)); exit(1); } rc->cur_send++; op_start = op_end; } if (op_start > 0) { cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+op_start); } return true; } static void try_recv_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge) { ibv_recv_wr *bad_wr = NULL; ibv_recv_wr wr = { .wr_id = (uint64_t)(cl->peer_fd*2), .sg_list = sge, .num_sge = op_sge, }; int err = ibv_post_recv(cl->rdma_conn->qp, &wr, &bad_wr); if (err || bad_wr) { fprintf(stderr, "RDMA receive failed: %s\n", strerror(err)); exit(1); } cl->rdma_conn->cur_recv++; } static void copy_data_to_recv_list(uint8_t *data_buf, uint64_t data_size, osd_client_t *cl) { uint64_t pos = 0; while (cl->recv_list.done < cl->recv_list.count) { uint64_t cur = cl->recv_list.buf[cl->recv_list.done].iov_len; assert(cur <= data_size-pos); memcpy(cl->recv_list.buf[cl->recv_list.done].iov_base, data_buf+pos, cur); pos += cur; } cl->recv_list.reset(); } bool osd_messenger_t::try_recv_rdma(osd_client_t *cl) { auto rc = cl->rdma_conn; while (rc->cur_recv < rc->max_recv) { try_recv_rdma_wr(cl, NULL, 0); } return true; } bool osd_messenger_t::rdma_handle_op(osd_client_t *cl, uint32_t op_slot) { auto rc = cl->rdma_conn; if (op_slot >= rc->in_op_cap) { // Invalid incoming index fprintf(stderr, "Client %d invalid incoming RDMA op slot: %u, dropping connection\n", cl->peer_fd, op_slot); stop_client(cl->peer_fd); return false; } osd_op_header_t *hdr = (osd_op_header_t *)rc->in_ops[op_slot].header; uint8_t *data_buf = (uint8_t*)rc->in_data_buf + rc->in_ops[op_slot].pos.data_pos; uint64_t data_size = rc->in_ops[op_slot].pos.data_size; if (hdr->magic == SECONDARY_OSD_REPLY_MAGIC) { // Reply if (cl->read_op) { delete cl->read_op; cl->read_op = NULL; } if (!handle_reply_hdr(rc->in_ops[op_slot].header, cl)) return false; if (cl->read_state == CL_READ_REPLY_DATA) { // copy reply data to cl->recv_list copy_data_to_recv_list(data_buf, data_size, cl); // and handle reply with data handle_reply_ready(cl->read_op); cl->read_op = NULL; cl->read_state = 0; cl->read_remaining = 0; } } else { // Operation cl->read_op = new osd_op_t; cl->read_op->peer_fd = cl->peer_fd; cl->read_op->op_type = OSD_OP_IN; memcpy(&cl->read_op->req, hdr, OSD_PACKET_SIZE); handle_op_hdr(cl); if (cl->read_state == CL_READ_DATA) { copy_data_to_recv_list(data_buf, data_size, cl); // And handle the incoming op with data cl->received_ops.push_back(cl->read_op); set_immediate.push_back([this, op = cl->read_op]() { exec_op(op); }); cl->read_op = NULL; cl->read_state = 0; } } // We don't need the incoming data buffer anymore, notify peer about it // FIXME: Allow to pass memory to the internal layer without copying and notify after handling it rc->in_slots_freed.push_back(op_slot); return true; } #define RDMA_EVENTS_AT_ONCE 32 void osd_messenger_t::handle_rdma_events() { // Request next notification ibv_cq *ev_cq; void *ev_ctx; // FIXME: This is inefficient as it calls read()... if (ibv_get_cq_event(rdma_context->channel, &ev_cq, &ev_ctx) == 0) { ibv_ack_cq_events(rdma_context->cq, 1); } if (ibv_req_notify_cq(rdma_context->cq, 0) != 0) { fprintf(stderr, "Failed to request RDMA completion notification, exiting\n"); exit(1); } ibv_wc wc[RDMA_EVENTS_AT_ONCE]; int event_count; do { event_count = ibv_poll_cq(rdma_context->cq, RDMA_EVENTS_AT_ONCE, wc); for (int i = 0; i < event_count; i++) { auto cqp_it = clients_by_qp.find(wc[i].qp_num); int peer_fd = cqp_it != clients_by_qp.end() ? cqp_it->second : -1; auto cl_it = clients.find(peer_fd); if (cl_it == clients.end()) { continue; } osd_client_t *cl = cl_it->second; if (wc[i].status != IBV_WC_SUCCESS) { fprintf(stderr, "RDMA work request failed for client %d", peer_fd); if (cl->osd_num) fprintf(stderr, " (OSD %lu)", cl->osd_num); fprintf(stderr, " with status: %s, stopping client\n", ibv_wc_status_str(wc[i].status)); if (peer_fd >= 0) stop_client(peer_fd); continue; } auto rc = cl->rdma_conn; if (wc[i].opcode == IBV_WC_RDMA_WRITE) { // Operation or reply is sent, we can free it auto & op = rc->out_slot_ops[wc[i].wr_id]; if (op) { delete op; op = NULL; } rc->cur_send--; try_send_rdma(cl); } else if (wc[i].opcode == IBV_WC_RECV) { if (!(wc[i].imm_data & 0x80000000)) { // Operation or reply received. Handle it if (!rdma_handle_op(cl, wc[i].imm_data)) { // false means that the client is stopped due to invalid operation continue; } rc->cur_recv--; try_recv_rdma(cl); } else { // Outbox slot is marked as free (the remote side doesn't need it anymore) uint32_t op_slot = wc[i].imm_data & 0x7FFFFFFF; auto & pos = rc->in_ops[op_slot].pos; if (pos.data_size > 0) rc->out_data_alloc.free(pos.data_pos, pos.data_size); rc->out_op_alloc->set(op_slot, false); } // Try to continue sending try_send_rdma(cl); } } } while (event_count > 0); for (auto cb: set_immediate) { cb(); } set_immediate.clear(); }