forked from vitalif/vitastor
Allow to send more than 1 operation at a time
parent
138ffe4032
commit
c3e80abad7
|
@ -70,9 +70,9 @@ const etcd_tree = {
|
||||||
rdma_gid_index: 0,
|
rdma_gid_index: 0,
|
||||||
rdma_mtu: 4096,
|
rdma_mtu: 4096,
|
||||||
rdma_max_sge: 128,
|
rdma_max_sge: 128,
|
||||||
rdma_max_send: 32,
|
rdma_max_send: 64,
|
||||||
rdma_max_recv: 8,
|
rdma_max_recv: 128,
|
||||||
rdma_max_msg: 1048576,
|
rdma_max_msg: 132096,
|
||||||
log_level: 0,
|
log_level: 0,
|
||||||
block_size: 131072,
|
block_size: 131072,
|
||||||
disk_alignment: 4096,
|
disk_alignment: 4096,
|
||||||
|
|
|
@ -157,7 +157,7 @@ void osd_messenger_t::parse_config(const json11::Json & config)
|
||||||
this->rdma_max_sge = 128;
|
this->rdma_max_sge = 128;
|
||||||
this->rdma_max_send = config["rdma_max_send"].uint64_value();
|
this->rdma_max_send = config["rdma_max_send"].uint64_value();
|
||||||
if (!this->rdma_max_send)
|
if (!this->rdma_max_send)
|
||||||
this->rdma_max_send = 1;
|
this->rdma_max_send = 64;
|
||||||
this->rdma_max_recv = config["rdma_max_recv"].uint64_value();
|
this->rdma_max_recv = config["rdma_max_recv"].uint64_value();
|
||||||
if (!this->rdma_max_recv)
|
if (!this->rdma_max_recv)
|
||||||
this->rdma_max_recv = 128;
|
this->rdma_max_recv = 128;
|
||||||
|
|
|
@ -368,9 +368,8 @@ static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
|
||||||
bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
|
bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
|
||||||
{
|
{
|
||||||
auto rc = cl->rdma_conn;
|
auto rc = cl->rdma_conn;
|
||||||
if (!cl->send_list.size() || rc->cur_send > 0)
|
if (!cl->send_list.size() || rc->cur_send >= rc->max_send)
|
||||||
{
|
{
|
||||||
// Only send one batch at a time
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
uint64_t op_size = 0, op_sge = 0;
|
uint64_t op_size = 0, op_sge = 0;
|
||||||
|
@ -380,6 +379,7 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
|
||||||
iovec & iov = cl->send_list[rc->send_pos];
|
iovec & iov = cl->send_list[rc->send_pos];
|
||||||
if (op_size >= rc->max_msg || op_sge >= rc->max_sge)
|
if (op_size >= rc->max_msg || op_sge >= rc->max_sge)
|
||||||
{
|
{
|
||||||
|
rc->send_sizes.push_back(op_size);
|
||||||
try_send_rdma_wr(cl, sge, op_sge);
|
try_send_rdma_wr(cl, sge, op_sge);
|
||||||
op_sge = 0;
|
op_sge = 0;
|
||||||
op_size = 0;
|
op_size = 0;
|
||||||
|
@ -405,6 +405,7 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
|
||||||
}
|
}
|
||||||
if (op_sge > 0)
|
if (op_sge > 0)
|
||||||
{
|
{
|
||||||
|
rc->send_sizes.push_back(op_size);
|
||||||
try_send_rdma_wr(cl, sge, op_sge);
|
try_send_rdma_wr(cl, sge, op_sge);
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
@ -476,6 +477,7 @@ void osd_messenger_t::handle_rdma_events()
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
osd_client_t *cl = cl_it->second;
|
osd_client_t *cl = cl_it->second;
|
||||||
|
auto rc = cl->rdma_conn;
|
||||||
if (wc[i].status != IBV_WC_SUCCESS)
|
if (wc[i].status != IBV_WC_SUCCESS)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "RDMA work request failed for client %d", client_id);
|
fprintf(stderr, "RDMA work request failed for client %d", client_id);
|
||||||
|
@ -489,43 +491,59 @@ void osd_messenger_t::handle_rdma_events()
|
||||||
}
|
}
|
||||||
if (!is_send)
|
if (!is_send)
|
||||||
{
|
{
|
||||||
cl->rdma_conn->cur_recv--;
|
rc->cur_recv--;
|
||||||
if (!handle_read_buffer(cl, cl->rdma_conn->recv_buffers[cl->rdma_conn->next_recv_buf], wc[i].byte_len))
|
if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf], wc[i].byte_len))
|
||||||
{
|
{
|
||||||
// handle_read_buffer may stop the client
|
// handle_read_buffer may stop the client
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
try_recv_rdma_wr(cl, cl->rdma_conn->recv_buffers[cl->rdma_conn->next_recv_buf]);
|
try_recv_rdma_wr(cl, rc->recv_buffers[rc->next_recv_buf]);
|
||||||
cl->rdma_conn->next_recv_buf = (cl->rdma_conn->next_recv_buf+1) % cl->rdma_conn->recv_buffers.size();
|
rc->next_recv_buf = (rc->next_recv_buf+1) % rc->recv_buffers.size();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
cl->rdma_conn->cur_send--;
|
rc->cur_send--;
|
||||||
if (!cl->rdma_conn->cur_send)
|
uint64_t sent_size = rc->send_sizes.at(0);
|
||||||
|
rc->send_sizes.erase(rc->send_sizes.begin(), rc->send_sizes.begin()+1);
|
||||||
|
int send_pos = 0, send_buf_pos = 0;
|
||||||
|
while (sent_size > 0)
|
||||||
{
|
{
|
||||||
// Wait for the whole batch
|
if (sent_size >= cl->send_list.at(send_pos).iov_len)
|
||||||
for (int i = 0; i < cl->rdma_conn->send_pos; i++)
|
|
||||||
{
|
{
|
||||||
if (cl->outbox[i].flags & MSGR_SENDP_FREE)
|
sent_size -= cl->send_list[send_pos].iov_len;
|
||||||
{
|
send_pos++;
|
||||||
// Reply fully sent
|
|
||||||
delete cl->outbox[i].op;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (cl->rdma_conn->send_pos > 0)
|
else
|
||||||
{
|
{
|
||||||
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+cl->rdma_conn->send_pos);
|
send_buf_pos = sent_size;
|
||||||
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+cl->rdma_conn->send_pos);
|
sent_size = 0;
|
||||||
cl->rdma_conn->send_pos = 0;
|
|
||||||
}
|
}
|
||||||
if (cl->rdma_conn->send_buf_pos > 0)
|
|
||||||
{
|
|
||||||
cl->send_list[0].iov_base = (uint8_t*)cl->send_list[0].iov_base + cl->rdma_conn->send_buf_pos;
|
|
||||||
cl->send_list[0].iov_len -= cl->rdma_conn->send_buf_pos;
|
|
||||||
cl->rdma_conn->send_buf_pos = 0;
|
|
||||||
}
|
|
||||||
try_send_rdma(cl);
|
|
||||||
}
|
}
|
||||||
|
assert(rc->send_pos >= send_pos);
|
||||||
|
if (rc->send_pos == send_pos)
|
||||||
|
{
|
||||||
|
rc->send_buf_pos -= send_buf_pos;
|
||||||
|
}
|
||||||
|
rc->send_pos -= send_pos;
|
||||||
|
for (int i = 0; i < send_pos; i++)
|
||||||
|
{
|
||||||
|
if (cl->outbox[i].flags & MSGR_SENDP_FREE)
|
||||||
|
{
|
||||||
|
// Reply fully sent
|
||||||
|
delete cl->outbox[i].op;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (send_pos > 0)
|
||||||
|
{
|
||||||
|
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+send_pos);
|
||||||
|
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+send_pos);
|
||||||
|
}
|
||||||
|
if (send_buf_pos > 0)
|
||||||
|
{
|
||||||
|
cl->send_list[0].iov_base = (uint8_t*)cl->send_list[0].iov_base + send_buf_pos;
|
||||||
|
cl->send_list[0].iov_len -= send_buf_pos;
|
||||||
|
}
|
||||||
|
try_send_rdma(cl);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} while (event_count > 0);
|
} while (event_count > 0);
|
||||||
|
|
|
@ -51,6 +51,7 @@ struct msgr_rdma_connection_t
|
||||||
int send_pos = 0, send_buf_pos = 0;
|
int send_pos = 0, send_buf_pos = 0;
|
||||||
int next_recv_buf = 0;
|
int next_recv_buf = 0;
|
||||||
std::vector<void*> recv_buffers;
|
std::vector<void*> recv_buffers;
|
||||||
|
std::vector<uint64_t> send_sizes;
|
||||||
|
|
||||||
~msgr_rdma_connection_t();
|
~msgr_rdma_connection_t();
|
||||||
static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg);
|
static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg);
|
||||||
|
|
Loading…
Reference in New Issue