Compare commits
1 Commits
master
...
rdma-flow-
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | 8aec63fddd |
|
@ -353,8 +353,10 @@ static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
|
||||||
.wr_id = (uint64_t)(cl->peer_fd*2+1),
|
.wr_id = (uint64_t)(cl->peer_fd*2+1),
|
||||||
.sg_list = sge,
|
.sg_list = sge,
|
||||||
.num_sge = op_sge,
|
.num_sge = op_sge,
|
||||||
.opcode = IBV_WR_SEND,
|
.opcode = cl->rdma_conn->avail_recv > 0 ? IBV_WR_SEND_WITH_IMM : IBV_WR_SEND,
|
||||||
.send_flags = IBV_SEND_SIGNALED,
|
.send_flags = IBV_SEND_SIGNALED,
|
||||||
|
// Notify peer about our available incoming buffers
|
||||||
|
.imm_data = (uint32_t)cl->rdma_conn->avail_recv,
|
||||||
};
|
};
|
||||||
int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr);
|
int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr);
|
||||||
if (err || bad_wr)
|
if (err || bad_wr)
|
||||||
|
@ -363,15 +365,27 @@ static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
cl->rdma_conn->cur_send++;
|
cl->rdma_conn->cur_send++;
|
||||||
|
cl->rdma_conn->avail_send--;
|
||||||
|
cl->rdma_conn->avail_recv = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
|
bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
|
||||||
{
|
{
|
||||||
auto rc = cl->rdma_conn;
|
auto rc = cl->rdma_conn;
|
||||||
if (!cl->send_list.size() || rc->cur_send >= rc->max_send)
|
if (rc->cur_send >= rc->max_send)
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
if (!cl->send_list.size() || rc->use_flow_control && rc->avail_send <= 0)
|
||||||
|
{
|
||||||
|
if (rc->avail_recv)
|
||||||
|
{
|
||||||
|
// Only notify about available buffers so 2 peers don't lock each other
|
||||||
|
rc->send_sizes.push_back(0);
|
||||||
|
try_send_rdma_wr(cl, NULL, 0);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
uint64_t op_size = 0, op_sge = 0;
|
uint64_t op_size = 0, op_sge = 0;
|
||||||
ibv_sge sge[rc->max_sge];
|
ibv_sge sge[rc->max_sge];
|
||||||
while (rc->send_pos < cl->send_list.size())
|
while (rc->send_pos < cl->send_list.size())
|
||||||
|
@ -431,6 +445,7 @@ static void try_recv_rdma_wr(osd_client_t *cl, void *buf)
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
cl->rdma_conn->cur_recv++;
|
cl->rdma_conn->cur_recv++;
|
||||||
|
cl->rdma_conn->avail_recv++;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
|
bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
|
||||||
|
@ -492,6 +507,11 @@ void osd_messenger_t::handle_rdma_events()
|
||||||
if (!is_send)
|
if (!is_send)
|
||||||
{
|
{
|
||||||
rc->cur_recv--;
|
rc->cur_recv--;
|
||||||
|
if ((wc[i].wc_flags & IBV_WC_WITH_IMM) && wc[i].imm_data > 0)
|
||||||
|
{
|
||||||
|
rc->avail_send += wc[i].imm_data;
|
||||||
|
rc->use_flow_control = true;
|
||||||
|
}
|
||||||
if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf], wc[i].byte_len))
|
if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf], wc[i].byte_len))
|
||||||
{
|
{
|
||||||
// handle_read_buffer may stop the client
|
// handle_read_buffer may stop the client
|
||||||
|
@ -499,6 +519,7 @@ void osd_messenger_t::handle_rdma_events()
|
||||||
}
|
}
|
||||||
try_recv_rdma_wr(cl, rc->recv_buffers[rc->next_recv_buf]);
|
try_recv_rdma_wr(cl, rc->recv_buffers[rc->next_recv_buf]);
|
||||||
rc->next_recv_buf = (rc->next_recv_buf+1) % rc->recv_buffers.size();
|
rc->next_recv_buf = (rc->next_recv_buf+1) % rc->recv_buffers.size();
|
||||||
|
try_send_rdma(cl);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
|
@ -45,7 +45,8 @@ struct msgr_rdma_connection_t
|
||||||
ibv_qp *qp = NULL;
|
ibv_qp *qp = NULL;
|
||||||
msgr_rdma_address_t addr;
|
msgr_rdma_address_t addr;
|
||||||
int max_send = 0, max_recv = 0, max_sge = 0;
|
int max_send = 0, max_recv = 0, max_sge = 0;
|
||||||
int cur_send = 0, cur_recv = 0;
|
int cur_send = 0, cur_recv = 0, avail_recv = 0, avail_send = 0;
|
||||||
|
bool use_flow_control = false;
|
||||||
uint64_t max_msg = 0;
|
uint64_t max_msg = 0;
|
||||||
|
|
||||||
int send_pos = 0, send_buf_pos = 0;
|
int send_pos = 0, send_buf_pos = 0;
|
||||||
|
|
Loading…
Reference in New Issue