#include #include #include "msgr_rdma.h" #include "messenger.h" std::string msgr_rdma_address_t::to_string() { char msg[sizeof "0000:00000000:00000000:00000000000000000000000000000000"]; sprintf( msg, "%04x:%06x:%06x:%016lx%016lx", lid, qpn, psn, htobe64(((uint64_t*)&gid)[0]), htobe64(((uint64_t*)&gid)[1]) ); return std::string(msg); } bool msgr_rdma_address_t::from_string(const char *str, msgr_rdma_address_t *dest) { uint64_t* gid = (uint64_t*)&dest->gid; int n = sscanf( str, "%hx:%x:%x:%16lx%16lx", &dest->lid, &dest->qpn, &dest->psn, gid, gid+1 ); gid[0] = be64toh(gid[0]); gid[1] = be64toh(gid[1]); return n == 5; } msgr_rdma_context_t::~msgr_rdma_context_t() { if (cq) ibv_destroy_cq(cq); if (channel) ibv_destroy_comp_channel(channel); if (mr) ibv_dereg_mr(mr); if (pd) ibv_dealloc_pd(pd); if (context) ibv_close_device(context); } msgr_rdma_connection_t::~msgr_rdma_connection_t() { ctx->used_max_cqe -= max_send+max_recv; if (qp) ibv_destroy_qp(qp); } msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu) { int res; ibv_device **dev_list = NULL; msgr_rdma_context_t *ctx = new msgr_rdma_context_t(); ctx->mtu = mtu; dev_list = ibv_get_device_list(NULL); if (!dev_list) { fprintf(stderr, "Failed to get RDMA device list: %s\n", strerror(errno)); goto cleanup; } if (!ib_devname) { ctx->dev = *dev_list; if (!ctx->dev) { fprintf(stderr, "No RDMA devices found\n"); goto cleanup; } } else { int i; for (i = 0; dev_list[i]; ++i) if (!strcmp(ibv_get_device_name(dev_list[i]), ib_devname)) break; ctx->dev = dev_list[i]; if (!ctx->dev) { fprintf(stderr, "RDMA device %s not found\n", ib_devname); goto cleanup; } } ctx->context = ibv_open_device(ctx->dev); if (!ctx->context) { fprintf(stderr, "Couldn't get RDMA context for %s\n", ibv_get_device_name(ctx->dev)); goto cleanup; } ctx->ib_port = ib_port; ctx->gid_index = gid_index; if ((res = ibv_query_port(ctx->context, ib_port, &ctx->portinfo)) != 0) { fprintf(stderr, 
"Couldn't get RDMA device %s port %d info: %s\n", ibv_get_device_name(ctx->dev), ib_port, strerror(res)); goto cleanup; } ctx->my_lid = ctx->portinfo.lid; if (ctx->portinfo.link_layer != IBV_LINK_LAYER_ETHERNET && !ctx->my_lid) { fprintf(stderr, "RDMA device %s must have local LID because it's not Ethernet, but LID is zero\n", ibv_get_device_name(ctx->dev)); goto cleanup; } if (ibv_query_gid(ctx->context, ib_port, gid_index, &ctx->my_gid)) { fprintf(stderr, "Couldn't read RDMA device %s GID index %d\n", ibv_get_device_name(ctx->dev), gid_index); goto cleanup; } ctx->pd = ibv_alloc_pd(ctx->context); if (!ctx->pd) { fprintf(stderr, "Couldn't allocate RDMA protection domain\n"); goto cleanup; } { if (ibv_query_device_ex(ctx->context, NULL, &ctx->attrx)) { fprintf(stderr, "Couldn't query RDMA device for its features\n"); goto cleanup; } if (!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) || !(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT_IMPLICIT) || !(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) || !(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV)) { fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable or does not support RC send and receive with ODP\n"); goto cleanup; } } ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND); if (!ctx->mr) { fprintf(stderr, "Couldn't register RDMA memory region\n"); goto cleanup; } ctx->channel = ibv_create_comp_channel(ctx->context); if (!ctx->channel) { fprintf(stderr, "Couldn't create RDMA completion channel\n"); goto cleanup; } ctx->max_cqe = 4096; ctx->cq = ibv_create_cq(ctx->context, ctx->max_cqe, NULL, ctx->channel, 0); if (!ctx->cq) { fprintf(stderr, "Couldn't create RDMA completion queue\n"); goto cleanup; } if (dev_list) ibv_free_device_list(dev_list); return ctx; cleanup: delete ctx; if (dev_list) ibv_free_device_list(dev_list); return NULL; } msgr_rdma_connection_t 
*msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge) { msgr_rdma_connection_t *conn = new msgr_rdma_connection_t; max_sge = max_sge > ctx->attrx.orig_attr.max_sge ? ctx->attrx.orig_attr.max_sge : max_sge; conn->ctx = ctx; conn->max_send = max_send; conn->max_recv = max_recv; conn->max_sge = max_sge; ctx->used_max_cqe += max_send+max_recv; if (ctx->used_max_cqe > ctx->max_cqe) { // Resize CQ // Mellanox ConnectX-4 supports up to 4194303 CQEs, so it's fine to put everything into a single CQ int new_max_cqe = ctx->max_cqe; while (ctx->used_max_cqe > new_max_cqe) { new_max_cqe *= 2; } if (ibv_resize_cq(ctx->cq, new_max_cqe) != 0) { fprintf(stderr, "Couldn't resize RDMA completion queue to %d entries\n", new_max_cqe); delete conn; return NULL; } ctx->max_cqe = new_max_cqe; } ibv_qp_init_attr init_attr = { .send_cq = ctx->cq, .recv_cq = ctx->cq, .cap = { .max_send_wr = max_send, .max_recv_wr = max_recv, .max_send_sge = max_sge, .max_recv_sge = max_sge, }, .qp_type = IBV_QPT_RC, }; conn->qp = ibv_create_qp(ctx->pd, &init_attr); if (!conn->qp) { fprintf(stderr, "Couldn't create RDMA queue pair\n"); delete conn; return NULL; } conn->addr.lid = ctx->my_lid; conn->addr.gid = ctx->my_gid; conn->addr.qpn = conn->qp->qp_num; conn->addr.psn = lrand48() & 0xffffff; ibv_qp_attr attr = { .qp_state = IBV_QPS_INIT, .qp_access_flags = 0, .pkey_index = 0, .port_num = ctx->ib_port, }; if (ibv_modify_qp(conn->qp, &attr, IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS)) { fprintf(stderr, "Failed to switch RDMA queue pair to INIT state\n"); delete conn; return NULL; } return conn; } static ibv_mtu mtu_to_ibv_mtu(uint32_t mtu) { switch (mtu) { case 256: return IBV_MTU_256; case 512: return IBV_MTU_512; case 1024: return IBV_MTU_1024; case 2048: return IBV_MTU_2048; case 4096: return IBV_MTU_4096; } return IBV_MTU_4096; } int msgr_rdma_connection_t::connect(msgr_rdma_address_t *dest) { auto conn = this; 
ibv_qp_attr attr = { .qp_state = IBV_QPS_RTR, .path_mtu = mtu_to_ibv_mtu(conn->ctx->mtu), .rq_psn = dest->psn, .sq_psn = conn->addr.psn, .dest_qp_num = dest->qpn, .ah_attr = { .grh = { .dgid = dest->gid, .sgid_index = conn->ctx->gid_index, .hop_limit = 1, // FIXME can it vary? }, .dlid = dest->lid, .sl = 0, // service level .src_path_bits = 0, .is_global = (uint8_t)(dest->gid.global.interface_id ? 1 : 0), .port_num = conn->ctx->ib_port, }, .max_rd_atomic = 1, .max_dest_rd_atomic = 1, // Timeout and min_rnr_timer actual values seem to be 4.096us*2^(timeout+1) .min_rnr_timer = 1, .timeout = 14, .retry_cnt = 7, .rnr_retry = 7, }; // FIXME No idea if ibv_modify_qp is a blocking operation or not. No idea if it has a timeout and what it is. if (ibv_modify_qp(conn->qp, &attr, IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER)) { fprintf(stderr, "Failed to switch RDMA queue pair to RTR (ready-to-receive) state\n"); return 1; } attr.qp_state = IBV_QPS_RTS; if (ibv_modify_qp(conn->qp, &attr, IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC)) { fprintf(stderr, "Failed to switch RDMA queue pair to RTS (ready-to-send) state\n"); return 1; } return 0; } // Being the client, connect all server's RDMA queues to our local (client) queues bool osd_messenger_t::connect_rdma_server(osd_client_t *cl, json11::Json rdma_addresses, uint64_t server_max_sge) { if (rdma_addresses.array_items().size() > 0) { if (!server_max_sge || server_max_sge > rdma_max_sge) { server_max_sge = rdma_max_sge; } int n_conn = rdma_addresses.array_items().size(); if (n_conn < cl->rdma_queues.size()) { for (int i = n_conn; i < cl->rdma_queues.size(); i++) { delete cl->rdma_queues[i]; } cl->rdma_queues.resize(n_conn); } else if (n_conn > cl->rdma_queues.size()) { n_conn = cl->rdma_queues.size(); } for (int i = 0; i < n_conn; i++) { msgr_rdma_address_t addr; if 
(!msgr_rdma_address_t::from_string(rdma_addresses[i].string_value().c_str(), &addr) || cl->rdma_queues[i]->connect(&addr) != 0) { printf( "Failed to connect to OSD %lu (address %s) using RDMA\n", cl->osd_num, rdma_addresses[i].string_value().c_str() ); // FIXME: Keep TCP connection in this case osd_num_t peer_osd = cl->osd_num; stop_client(cl->peer_fd); on_connect_peer(peer_osd, -1); return false; } else { printf("Connected local queue %d to OSD %lu queue %d using RDMA\n", cl->rdma_queues[i]->qp->qp_num, cl->osd_num, addr.qpn); if (cl->rdma_queues[i]->max_sge > server_max_sge) { cl->rdma_queues[i]->max_sge = server_max_sge; } } } cl->peer_state = PEER_RDMA; tfd->set_fd_handler(cl->peer_fd, false, NULL); } else { for (auto rdma_conn: cl->rdma_queues) { delete rdma_conn; } cl->rdma_queues.resize(0); } return true; } // Being the server, try to connect all client's RDMA queues to our local (server) queues bool osd_messenger_t::connect_rdma_client(osd_client_t *cl, json11::Json rdma_addresses, uint64_t client_max_sge) { if (rdma_addresses.array_items().size() > 0) { if (!client_max_sge || client_max_sge > rdma_max_sge) { client_max_sge = rdma_max_sge; } int n_conn = rdma_addresses.array_items().size(); if (n_conn > rdma_queues_per_connection) { n_conn = rdma_queues_per_connection; } for (int i = 0; i < n_conn; i++) { msgr_rdma_address_t addr; if (msgr_rdma_address_t::from_string(rdma_addresses[i].string_value().c_str(), &addr)) { auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, client_max_sge); if (rdma_conn && rdma_conn->connect(&addr) == 0) { printf("Connected local queue %d to client %d queue %d using RDMA\n", rdma_conn->qp->qp_num, cl->peer_fd, addr.qpn); cl->rdma_queues.push_back(rdma_conn); } else { if (rdma_conn) { delete rdma_conn; } printf( "Failed to connect RDMA queue pair to %s (client %d queue %d)\n", addr.to_string().c_str(), cl->peer_fd, i+1 ); // Delete all RDMA queues to keep the TCP connection for (int j = 0; 
j < cl->rdma_queues.size(); j++) { delete cl->rdma_queues[j]; } cl->rdma_queues.resize(0); return false; } } } // Switch to RDMA state only after sending the configuration response cl->peer_state = PEER_RDMA_CONNECTING; for (int i = 0; i < cl->rdma_queues.size(); i++) { try_recv_rdma(cl, cl->rdma_queues[i]); } } return true; } static void try_send_rdma_wr(msgr_rdma_connection_t *rc, uint64_t wr_id, ibv_sge *sge, int op_sge) { timespec tv; clock_gettime(CLOCK_REALTIME, &tv); uint64_t total = 0; for (int i = 0; i < op_sge; i++) total += sge[i].length; printf("%lu.%09lu RDMA send to queue %d: %lu bytes\n", tv.tv_sec, tv.tv_nsec, rc->qp->qp_num, total); ibv_send_wr *bad_wr = NULL; ibv_send_wr wr = { .wr_id = wr_id, .sg_list = sge, .num_sge = op_sge, .opcode = IBV_WR_SEND, .send_flags = IBV_SEND_SIGNALED, }; int err = ibv_post_send(rc->qp, &wr, &bad_wr); if (err || bad_wr) { printf("RDMA send failed: %s\n", strerror(err)); exit(1); } rc->cur_send++; } static void try_recv_rdma_wr(msgr_rdma_connection_t *rc, uint64_t wr_id, ibv_sge *sge, int op_sge) { timespec tv; clock_gettime(CLOCK_REALTIME, &tv); uint64_t total = 0; for (int i = 0; i < op_sge; i++) total += sge[i].length; printf("%lu.%09lu RDMA receive from queue %d: %lu bytes\n", tv.tv_sec, tv.tv_nsec, rc->qp->qp_num, total); ibv_recv_wr *bad_wr = NULL; ibv_recv_wr wr = { .wr_id = wr_id, .sg_list = sge, .num_sge = op_sge, }; int err = ibv_post_recv(rc->qp, &wr, &bad_wr); if (err || bad_wr) { printf("RDMA receive failed: %s\n", strerror(err)); exit(1); } rc->cur_recv++; } static bool try_recv_rdma_read(osd_client_t *cl, msgr_rdma_connection_t *rc, osd_op_t *cur_op, uint32_t bs_bitmap_granularity) { int op_size = bs_bitmap_granularity, op_sge = 1, op_max = rc->max_sge*bs_bitmap_granularity; iovec *segments = cur_op->iov.get_iovec(); ibv_sge sge[rc->max_sge]; sge[0] = { .addr = (uintptr_t)cur_op->reply.buf, .length = (uint32_t)OSD_PACKET_SIZE, .lkey = rc->ctx->mr->lkey, }; while (rc->recv_pos < cur_op->iov.get_size()) { 
iovec & iov = segments[rc->recv_pos]; if (op_size >= op_max || op_sge >= rc->max_sge) { try_recv_rdma_wr(rc, cl->peer_fd, sge, op_sge); op_sge = 0; op_size = 0; if (rc->cur_recv >= rc->max_recv) { // FIXME exit(1); } } // Receive in (max_sge*4k) fragments uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->recv_buf_pos < op_max ? iov.iov_len-rc->recv_buf_pos : op_max-op_size); sge[op_sge++] = { .addr = (uintptr_t)(iov.iov_base+rc->recv_buf_pos), .length = len, .lkey = rc->ctx->mr->lkey, }; op_size += len; rc->recv_buf_pos += len; if (rc->recv_buf_pos >= iov.iov_len) { rc->recv_pos++; rc->recv_buf_pos = 0; } } if (op_sge > 0) { try_recv_rdma_wr(rc, cl->peer_fd, sge, op_sge); } rc->recv_pos = 0; rc->recv_buf_pos = 0; return true; } static bool try_send_rdma_read(osd_client_t *cl, msgr_rdma_connection_t *rc, osd_op_t *cur_op, int op_list_size, uint32_t bs_bitmap_granularity) { ibv_sge sge[rc->max_sge]; int op_size = bs_bitmap_granularity, op_sge = 1, op_max = rc->max_sge*bs_bitmap_granularity; sge[0] = { .addr = (uintptr_t)cl->send_list[0].iov_base, .length = (uint32_t)cl->send_list[0].iov_len, .lkey = rc->ctx->mr->lkey, }; rc->send_pos = 1; while (rc->send_pos < op_list_size) { iovec & iov = cl->send_list[rc->send_pos]; if (cl->outbox[rc->send_pos].flags & MSGR_SENDP_HDR) { if (op_sge > 0) { try_send_rdma_wr(rc, cl->peer_fd, sge, op_sge); op_sge = 0; op_size = 0; if (rc->cur_send >= rc->max_send) break; } assert(rc->send_buf_pos == 0); sge[0] = { .addr = (uintptr_t)iov.iov_base, .length = (uint32_t)iov.iov_len, .lkey = rc->ctx->mr->lkey, }; try_send_rdma_wr(rc, cl->peer_fd, sge, 1); rc->send_pos++; if (rc->cur_send >= rc->max_send) break; } else { if (op_size >= op_max || op_sge >= rc->max_sge) { try_send_rdma_wr(rc, cl->peer_fd, sge, op_sge); op_sge = 0; op_size = 0; if (rc->cur_send >= rc->max_send) break; } // Fragment all messages into parts no longer than (max_sge*4k) = 120k on ConnectX-4 // Otherwise the client may not be able to receive them in small parts 
uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < op_max ? iov.iov_len-rc->send_buf_pos : op_max-op_size); sge[op_sge++] = { .addr = (uintptr_t)(iov.iov_base+rc->send_buf_pos), .length = len, .lkey = rc->ctx->mr->lkey, }; op_size += len; rc->send_buf_pos += len; if (rc->send_buf_pos >= iov.iov_len) { rc->send_pos++; rc->send_buf_pos = 0; } } } if (op_sge > 0) { try_send_rdma_wr(rc, cl->peer_fd, sge, op_sge); } if (op_list_size == 1) { if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ || cur_op->req.hdr.opcode == OSD_OP_READ) { sge[0] = { .addr = 0, .length = 0, .lkey = rc->ctx->mr->lkey, }; uint64_t data_size = cur_op->req.hdr.opcode == OSD_OP_SEC_READ ? cur_op->req.sec_rw.len : cur_op->req.rw.len; while (data_size >= op_max) { try_send_rdma_wr(rc, cl->peer_fd, sge, 1); data_size -= op_max; } if (data_size > 0) try_send_rdma_wr(rc, cl->peer_fd, sge, 1); } else if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP) { sge[0] = { .addr = 0, .length = 0, .lkey = rc->ctx->mr->lkey, }; try_send_rdma_wr(rc, cl->peer_fd, sge, 1); } else return true; } return true; } void osd_messenger_t::try_send_rdma(osd_client_t *cl) { // Two different algorithms for outgoing and incoming operations while (cl->outbox.size() > 0) { osd_op_t *cur_op = cl->outbox[0].op; if (cur_op->op_type == OSD_OP_OUT) { // Pick a queue. Send operation to it in one part. int qi; for (qi = 0; qi < cl->rdma_queues.size() && cl->rdma_queues[qi]->cur_op != NULL; qi++) {} if (qi >= cl->rdma_queues.size()) { // No free queues, retry later. // We only post 1 operation per queue to use the queue pair number as a 'tag'. return; } // Pick all entries for the operation from the queue int op_list_size = 0; while (op_list_size < cl->outbox.size() && cl->outbox[op_list_size].op == cur_op) { op_list_size++; } auto rq = cl->rdma_queues[qi]; rq->cur_op = cur_op; ibv_sge sge[rq->max_sge]; // FIXME: This won't work with long bitmaps. 
But I don't care, I want to finally test fucking RDMA // header or header+data sge[0] = { .addr = (uintptr_t)cl->send_list[0].iov_base, .length = (uint32_t)cl->send_list[0].iov_len, .lkey = rq->ctx->mr->lkey, }; if (op_list_size == 2) { auto & iov = cl->send_list[1]; sge[1] = { .addr = (uintptr_t)iov.iov_base, .length = (uint32_t)iov.iov_len, .lkey = rq->ctx->mr->lkey, }; try_send_rdma_wr(rq, cl->peer_fd, sge, 2); } else if (op_list_size == 1) { try_send_rdma_wr(rq, cl->peer_fd, sge, 1); } else { printf("unexpected long send_list for opcode %lu: %lu entries\n", cur_op->req.hdr.opcode, cl->send_list.size()); exit(1); } cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+op_list_size); cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+op_list_size); // Post a receive request for the reply at the same time if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ || cur_op->req.hdr.opcode == OSD_OP_READ) { try_recv_rdma_read(cl, rq, cur_op, bs_bitmap_granularity); } else if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP) { assert(!cur_op->iov.count); // FIXME: hardcode #define clean_entry_bitmap_size 4 // Reply size is known uint64_t data_size = cur_op->req.sec_read_bmp.len / sizeof(obj_ver_id) * (8 + clean_entry_bitmap_size); cur_op->rmw_buf = malloc_or_die(data_size); sge[0] = { .addr = (uintptr_t)cur_op->reply.buf, .length = (uint32_t)OSD_PACKET_SIZE, .lkey = rq->ctx->mr->lkey, }; sge[1] = { .addr = (uintptr_t)cur_op->rmw_buf, .length = (uint32_t)data_size, .lkey = rq->ctx->mr->lkey, }; try_recv_rdma_wr(rq, cl->peer_fd, sge, 2); } else { // No reply or reply size is unknown sge[0] = { .addr = (uintptr_t)cur_op->reply.buf, .length = (uint32_t)OSD_PACKET_SIZE, .lkey = rq->ctx->mr->lkey, }; try_recv_rdma_wr(rq, cl->peer_fd, sge, 1); } } else { // Send reply to the same queue the operation came from. // Fragment it into parts no longer than (max_sge*4k) to always // be able to send and receive them correctly. 
int qi; for (qi = 0; qi < cl->rdma_queues.size() && cl->rdma_queues[qi]->cur_op != cur_op; qi++) {} if (qi >= cl->rdma_queues.size()) { printf("Unknown incoming operation for client %d\n", cl->peer_fd); exit(1); } // Pick all entries for the operation from the queue int op_list_size = 0; while (op_list_size < cl->outbox.size() && cl->outbox[op_list_size].op == cur_op) { op_list_size++; } auto rq = cl->rdma_queues[qi]; if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ || cur_op->req.hdr.opcode == OSD_OP_READ) { try_send_rdma_read(cl, rq, cur_op, op_list_size, bs_bitmap_granularity); rq->send_pos = 0; rq->send_buf_pos = 0; } else if (op_list_size == 1) { ibv_sge sge[1]; sge[0] = { .addr = (uintptr_t)cl->send_list[0].iov_base, .length = (uint32_t)cl->send_list[0].iov_len, .lkey = rq->ctx->mr->lkey, }; try_send_rdma_wr(rq, cl->peer_fd, sge, 1); } else if (op_list_size == 2) { ibv_sge sge[2]; sge[0] = { .addr = (uintptr_t)cl->send_list[0].iov_base, .length = (uint32_t)cl->send_list[0].iov_len, .lkey = rq->ctx->mr->lkey, }; sge[1] = { .addr = (uintptr_t)cl->send_list[1].iov_base, .length = (uint32_t)cl->send_list[1].iov_len, .lkey = rq->ctx->mr->lkey, }; if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP) try_send_rdma_wr(rq, cl->peer_fd, sge, 2); else { try_send_rdma_wr(rq, cl->peer_fd, sge, 1); try_send_rdma_wr(rq, cl->peer_fd, sge+1, 1); } } else if (op_list_size > 2) { printf("Unexpected long send_list for opcode %lu: %lu entries\n", cur_op->req.hdr.opcode, cl->send_list.size()); exit(1); } cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+op_list_size); cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+op_list_size); } } } // Try to receive an incoming operation via RDMA void osd_messenger_t::try_recv_rdma(osd_client_t *cl, msgr_rdma_connection_t *rc) { rc->cur_op = new osd_op_t; rc->cur_op->peer_fd = cl->peer_fd; rc->cur_op->op_type = OSD_OP_IN; rc->cur_op->buf = memalign_or_die(MEM_ALIGNMENT, 128*1024); // FIXME hardcode for tests ibv_sge sge[2]; 
sge[0] = { .addr = (uintptr_t)rc->cur_op->req.buf, .length = (uint32_t)OSD_PACKET_SIZE, .lkey = rc->ctx->mr->lkey, }; sge[1] = { .addr = (uintptr_t)rc->cur_op->buf, .length = (uint32_t)128*1024, .lkey = rc->ctx->mr->lkey, }; try_recv_rdma_wr(rc, cl->peer_fd, sge, 2); } #define RDMA_EVENTS_AT_ONCE 32 void osd_messenger_t::handle_rdma_events() { // Request next notification ibv_cq *ev_cq; void *ev_ctx; // FIXME: This is inefficient as it calls read()... timespec tv; if (ibv_get_cq_event(rdma_context->channel, &ev_cq, &ev_ctx) == 0) { ibv_ack_cq_events(rdma_context->cq, 1); } if (ibv_req_notify_cq(rdma_context->cq, 0) != 0) { printf("Failed to request RDMA completion notification, exiting\n"); exit(1); } ibv_wc wc[RDMA_EVENTS_AT_ONCE]; int event_count; do { event_count = ibv_poll_cq(rdma_context->cq, RDMA_EVENTS_AT_ONCE, wc); for (int i = 0; i < event_count; i++) { int client_id = wc[i].wr_id; bool is_send = wc[i].opcode == IBV_WC_SEND; auto cl_it = clients.find(client_id); if (cl_it == clients.end()) { continue; } osd_client_t *cl = cl_it->second; if (wc[i].status != IBV_WC_SUCCESS) { printf("RDMA work request failed for client %d", client_id); if (cl->osd_num) { printf(" (OSD %lu)", cl->osd_num); } printf(" with status: %s, stopping client\n", ibv_wc_status_str(wc[i].status)); stop_client(client_id); continue; } int q; for (q = 0; q < cl->rdma_queues.size() && cl->rdma_queues[q]->qp->qp_num != wc[i].qp_num; q++) {} if (q >= cl->rdma_queues.size()) { printf("Unknown queue %d for client %d\n", wc[i].qp_num, cl->peer_fd); exit(1); } auto rc = cl->rdma_queues[q]; if (is_send) { clock_gettime(CLOCK_REALTIME, &tv); printf("%lu.%09lu Done RDMA send on queue %d\n", tv.tv_sec, tv.tv_nsec, wc[i].qp_num); } else { clock_gettime(CLOCK_REALTIME, &tv); printf("%lu.%09lu Done RDMA recv on queue %d, %d bytes\n", tv.tv_sec, tv.tv_nsec, wc[i].qp_num, wc[i].byte_len); } if (!is_send) { rc->cur_recv--; if (!rc->cur_recv) { // Fucking shit... 
if (rc->cur_op->op_type == OSD_OP_IN) { if (wc[i].byte_len <= OSD_PACKET_SIZE) { free(rc->cur_op->buf); rc->cur_op->buf = NULL; } cl->received_ops.push_back(rc->cur_op); set_immediate.push_back([this, op = rc->cur_op]() { exec_op(op); }); } else /* if (rc->cur_op->op_type == OSD_OP_OUT) */ { if (rc->cur_op->reply.hdr.opcode == OSD_OP_SEC_READ || rc->cur_op->reply.hdr.opcode == OSD_OP_READ) { // Data is already received cl->sent_ops.erase(rc->cur_op->req.hdr.id); handle_reply_ready(rc->cur_op); rc->cur_op = NULL; try_send_rdma(cl); } else if (rc->cur_op->reply.hdr.opcode == OSD_OP_SEC_READ_BMP) { // Data is already received, but we need to switch buffers cl->sent_ops.erase(rc->cur_op->req.hdr.id); free(rc->cur_op->buf); rc->cur_op->buf = rc->cur_op->rmw_buf; handle_reply_ready(rc->cur_op); rc->cur_op = NULL; try_send_rdma(cl); } else if (rc->cur_op->reply.hdr.opcode == OSD_OP_SEC_LIST && rc->cur_op->reply.hdr.retval > 0 || rc->cur_op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG && rc->cur_op->reply.hdr.retval > 0) { if (rc->recv_pos != 1) { // Data is not received yet (RNR) uint32_t len; if (rc->cur_op->reply.hdr.opcode == OSD_OP_SEC_LIST) len = sizeof(obj_ver_id) * rc->cur_op->reply.hdr.retval; else len = rc->cur_op->reply.hdr.retval; rc->cur_op->buf = malloc_or_die(len); ibv_sge sge[1]; sge[0] = { .addr = (uintptr_t)rc->cur_op->buf, .length = len, .lkey = rc->ctx->mr->lkey, }; try_recv_rdma_wr(rc, cl->peer_fd, sge, 1); rc->recv_pos = 1; } else { // Done cl->sent_ops.erase(rc->cur_op->req.hdr.id); handle_reply_ready(rc->cur_op); rc->cur_op = NULL; rc->recv_pos = 0; try_send_rdma(cl); } } else { // No data cl->sent_ops.erase(rc->cur_op->req.hdr.id); handle_reply_ready(rc->cur_op); rc->cur_op = NULL; try_send_rdma(cl); } } } } else { rc->cur_send--; if (!rc->cur_send) { if (rc->cur_op->op_type == OSD_OP_OUT) { // Nothing } else /* if (rc->cur_op->op_type == OSD_OP_IN) */ { // Reply fully sent delete rc->cur_op; rc->cur_op = NULL; // Post receive for the next incoming op 
try_recv_rdma(cl, rc);
                    }
                }
            }
        }
    } while (event_count > 0);
    // Run callbacks deferred from the poll loop (op execution etc.)
    for (auto cb: set_immediate)
    {
        cb();
    }
    set_immediate.clear();
}

// Report the configured per-WR scatter/gather limit
int osd_messenger_t::get_rdma_max_sge()
{
    return rdma_max_sge;
}