forked from vitalif/vitastor
Compare commits
3 Commits
master
...
rdma-zeroc
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | eb9fc274e8 | |
Vitaliy Filippov | 9681b62204 | |
Vitaliy Filippov | 8faf8f7b58 |
|
@ -127,7 +127,7 @@ static struct fio_option options[] = {
|
|||
},
|
||||
{
|
||||
.name = "use_rdma",
|
||||
.lname = "OSD trace",
|
||||
.lname = "Use RDMA",
|
||||
.type = FIO_OPT_BOOL,
|
||||
.off1 = offsetof(struct sec_options, use_rdma),
|
||||
.help = "Use RDMA",
|
||||
|
@ -135,6 +135,16 @@ static struct fio_option options[] = {
|
|||
.category = FIO_OPT_C_ENGINE,
|
||||
.group = FIO_OPT_G_FILENAME,
|
||||
},
|
||||
{
|
||||
.name = "rdma_gid_index",
|
||||
.lname = "RDMA gid index",
|
||||
.type = FIO_OPT_INT,
|
||||
.off1 = offsetof(struct sec_options, rdma_gid_index),
|
||||
.help = "RDMA gid index",
|
||||
.def = "0",
|
||||
.category = FIO_OPT_C_ENGINE,
|
||||
.group = FIO_OPT_G_FILENAME,
|
||||
},
|
||||
{
|
||||
.name = NULL,
|
||||
},
|
||||
|
@ -171,6 +181,7 @@ static int sec_setup(struct thread_data *td)
|
|||
{ "etcd_prefix", std::string(o->etcd_prefix ? o->etcd_prefix : "/vitastor") },
|
||||
{ "log_level", o->cluster_log },
|
||||
{ "use_rdma", o->use_rdma },
|
||||
{ "rdma_gid_index", o->rdma_gid_index },
|
||||
};
|
||||
|
||||
if (!o->image)
|
||||
|
|
|
@ -25,6 +25,8 @@ void osd_messenger_t::init()
|
|||
}
|
||||
else
|
||||
{
|
||||
rdma_max_sge = rdma_max_sge < rdma_context->attrx.orig_attr.max_sge
|
||||
? rdma_max_sge : rdma_context->attrx.orig_attr.max_sge;
|
||||
printf("[OSD %lu] RDMA initialized successfully\n", osd_num);
|
||||
fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
|
||||
tfd->set_fd_handler(rdma_context->channel->fd, false, [this](int notify_fd, int epoll_events)
|
||||
|
@ -356,9 +358,6 @@ void osd_messenger_t::on_connect_peer(osd_num_t peer_osd, int peer_fd)
|
|||
|
||||
void osd_messenger_t::check_peer_config(osd_client_t *cl)
|
||||
{
|
||||
#ifdef WITH_RDMA
|
||||
msgr_rdma_connection_t *rdma_conn = NULL;
|
||||
#endif
|
||||
osd_op_t *op = new osd_op_t();
|
||||
op->op_type = OSD_OP_OUT;
|
||||
op->peer_fd = cl->peer_fd;
|
||||
|
@ -374,11 +373,25 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
|
|||
#ifdef WITH_RDMA
|
||||
if (rdma_context)
|
||||
{
|
||||
cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, max_rdma_send, max_rdma_recv, max_rdma_sge);
|
||||
if (cl->rdma_conn)
|
||||
for (int i = 0; i < rdma_queues_per_connection; i++)
|
||||
{
|
||||
auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge);
|
||||
if (!rdma_conn)
|
||||
{
|
||||
break;
|
||||
}
|
||||
cl->rdma_queues.push_back(rdma_conn);
|
||||
}
|
||||
if (cl->rdma_queues.size())
|
||||
{
|
||||
json11::Json::array addresses;
|
||||
for (auto rdma_conn: cl->rdma_queues)
|
||||
{
|
||||
addresses.push_back(rdma_conn->addr.to_string());
|
||||
}
|
||||
json11::Json payload = json11::Json::object {
|
||||
{ "connect_rdma", cl->rdma_conn->addr.to_string() },
|
||||
{ "rdma_queues", addresses },
|
||||
{ "rdma_max_sge", rdma_max_sge },
|
||||
};
|
||||
std::string payload_str = payload.dump();
|
||||
op->req.show_conf.json_len = payload_str.size();
|
||||
|
@ -388,11 +401,7 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
|
|||
}
|
||||
}
|
||||
#endif
|
||||
op->callback = [this, cl
|
||||
#ifdef WITH_RDMA
|
||||
, rdma_conn
|
||||
#endif
|
||||
](osd_op_t *op)
|
||||
op->callback = [this, cl](osd_op_t *op)
|
||||
{
|
||||
std::string json_err;
|
||||
json11::Json config;
|
||||
|
@ -434,34 +443,12 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
|
|||
return;
|
||||
}
|
||||
#ifdef WITH_RDMA
|
||||
if (config["rdma_address"].is_string())
|
||||
if (!connect_rdma_server(cl, config["rdma_queues"], config["rdma_max_sge"].uint64_value()))
|
||||
{
|
||||
msgr_rdma_address_t addr;
|
||||
if (!msgr_rdma_address_t::from_string(config["rdma_address"].string_value().c_str(), &addr) ||
|
||||
cl->rdma_conn->connect(&addr) != 0)
|
||||
{
|
||||
printf(
|
||||
"Failed to connect to OSD %lu (address %s) using RDMA\n",
|
||||
cl->osd_num, config["rdma_address"].string_value().c_str()
|
||||
);
|
||||
delete cl->rdma_conn;
|
||||
cl->rdma_conn = NULL;
|
||||
// FIXME: Keep TCP connection in this case
|
||||
osd_num_t peer_osd = cl->osd_num;
|
||||
stop_client(cl->peer_fd);
|
||||
on_connect_peer(peer_osd, -1);
|
||||
delete op;
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
printf("Connected to OSD %lu using RDMA\n", cl->osd_num);
|
||||
cl->peer_state = PEER_RDMA;
|
||||
tfd->set_fd_handler(cl->peer_fd, false, NULL);
|
||||
// Add the initial receive request
|
||||
try_recv_rdma(cl);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
osd_peer_fds[cl->osd_num] = cl->peer_fd;
|
||||
on_connect_peer(cl->osd_num, cl->peer_fd);
|
||||
|
|
|
@ -40,6 +40,7 @@
|
|||
|
||||
#define MSGR_SENDP_HDR 1
|
||||
#define MSGR_SENDP_FREE 2
|
||||
#define MSGR_SENDP_BMP 4
|
||||
|
||||
struct msgr_sendp_t
|
||||
{
|
||||
|
@ -63,7 +64,7 @@ struct osd_client_t
|
|||
void *in_buf = NULL;
|
||||
|
||||
#ifdef WITH_RDMA
|
||||
msgr_rdma_connection_t *rdma_conn = NULL;
|
||||
std::vector<msgr_rdma_connection_t*> rdma_queues;
|
||||
#endif
|
||||
|
||||
// Read state
|
||||
|
@ -137,7 +138,8 @@ protected:
|
|||
std::string rdma_device;
|
||||
uint64_t rdma_port_num = 1, rdma_gid_index = 0, rdma_mtu = 0;
|
||||
msgr_rdma_context_t *rdma_context = NULL;
|
||||
int max_rdma_sge = 128, max_rdma_send = 32, max_rdma_recv = 32;
|
||||
int rdma_queues_per_connection = 128;
|
||||
int rdma_max_sge = 128, rdma_max_send = 32, rdma_max_recv = 32;
|
||||
#endif
|
||||
|
||||
std::vector<int> read_ready_clients;
|
||||
|
@ -170,7 +172,8 @@ public:
|
|||
|
||||
#ifdef WITH_RDMA
|
||||
bool is_rdma_enabled();
|
||||
bool connect_rdma(int peer_fd, std::string rdma_address);
|
||||
bool connect_rdma_client(osd_client_t *cl, json11::Json rdma_addresses, uint64_t client_max_sge);
|
||||
int get_rdma_max_sge();
|
||||
#endif
|
||||
|
||||
protected:
|
||||
|
@ -194,8 +197,9 @@ protected:
|
|||
void handle_reply_ready(osd_op_t *op);
|
||||
|
||||
#ifdef WITH_RDMA
|
||||
bool try_send_rdma(osd_client_t *cl);
|
||||
bool try_recv_rdma(osd_client_t *cl);
|
||||
void try_send_rdma(osd_client_t *cl);
|
||||
void try_recv_rdma(osd_client_t *cl, msgr_rdma_connection_t *rc);
|
||||
void handle_rdma_events();
|
||||
bool connect_rdma_server(osd_client_t *cl, json11::Json rdma_addresses, uint64_t server_max_sge);
|
||||
#endif
|
||||
};
|
||||
|
|
|
@ -293,75 +293,237 @@ int msgr_rdma_connection_t::connect(msgr_rdma_address_t *dest)
|
|||
return 0;
|
||||
}
|
||||
|
||||
bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address)
|
||||
// Being the client, connect all server's RDMA queues to our local (client) queues
|
||||
bool osd_messenger_t::connect_rdma_server(osd_client_t *cl, json11::Json rdma_addresses, uint64_t server_max_sge)
|
||||
{
|
||||
if (rdma_addresses.array_items().size() > 0)
|
||||
{
|
||||
if (!server_max_sge || server_max_sge > rdma_max_sge)
|
||||
{
|
||||
server_max_sge = rdma_max_sge;
|
||||
}
|
||||
int n_conn = rdma_addresses.array_items().size();
|
||||
if (n_conn < cl->rdma_queues.size())
|
||||
{
|
||||
for (int i = n_conn; i < cl->rdma_queues.size(); i++)
|
||||
{
|
||||
delete cl->rdma_queues[i];
|
||||
}
|
||||
cl->rdma_queues.resize(n_conn);
|
||||
}
|
||||
else if (n_conn > cl->rdma_queues.size())
|
||||
{
|
||||
n_conn = cl->rdma_queues.size();
|
||||
}
|
||||
for (int i = 0; i < n_conn; i++)
|
||||
{
|
||||
// Try to connect to the peer using RDMA
|
||||
msgr_rdma_address_t addr;
|
||||
if (msgr_rdma_address_t::from_string(rdma_address.c_str(), &addr))
|
||||
if (!msgr_rdma_address_t::from_string(rdma_addresses[i].string_value().c_str(), &addr) ||
|
||||
cl->rdma_queues[i]->connect(&addr) != 0)
|
||||
{
|
||||
auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, max_rdma_send, max_rdma_recv, max_rdma_sge);
|
||||
if (rdma_conn)
|
||||
{
|
||||
int r = rdma_conn->connect(&addr);
|
||||
if (r != 0)
|
||||
{
|
||||
delete rdma_conn;
|
||||
printf(
|
||||
"Failed to connect RDMA queue pair to %s (client %d)\n",
|
||||
addr.to_string().c_str(), peer_fd
|
||||
"Failed to connect to OSD %lu (address %s) using RDMA\n",
|
||||
cl->osd_num, rdma_addresses[i].string_value().c_str()
|
||||
);
|
||||
// FIXME: Keep TCP connection in this case
|
||||
osd_num_t peer_osd = cl->osd_num;
|
||||
stop_client(cl->peer_fd);
|
||||
on_connect_peer(peer_osd, -1);
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Remember connection, but switch to RDMA only after sending the configuration response
|
||||
auto cl = clients.at(peer_fd);
|
||||
cl->rdma_conn = rdma_conn;
|
||||
cl->peer_state = PEER_RDMA_CONNECTING;
|
||||
printf("Connected local queue %d to OSD %lu queue %d using RDMA\n", cl->rdma_queues[i]->qp->qp_num, cl->osd_num, addr.qpn);
|
||||
if (cl->rdma_queues[i]->max_sge > server_max_sge)
|
||||
{
|
||||
cl->rdma_queues[i]->max_sge = server_max_sge;
|
||||
}
|
||||
}
|
||||
}
|
||||
cl->peer_state = PEER_RDMA;
|
||||
tfd->set_fd_handler(cl->peer_fd, false, NULL);
|
||||
}
|
||||
else
|
||||
{
|
||||
for (auto rdma_conn: cl->rdma_queues)
|
||||
{
|
||||
delete rdma_conn;
|
||||
}
|
||||
cl->rdma_queues.resize(0);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// Being the server, try to connect all client's RDMA queues to our local (server) queues
|
||||
bool osd_messenger_t::connect_rdma_client(osd_client_t *cl, json11::Json rdma_addresses, uint64_t client_max_sge)
|
||||
{
|
||||
if (rdma_addresses.array_items().size() > 0)
|
||||
{
|
||||
if (!client_max_sge || client_max_sge > rdma_max_sge)
|
||||
{
|
||||
client_max_sge = rdma_max_sge;
|
||||
}
|
||||
int n_conn = rdma_addresses.array_items().size();
|
||||
if (n_conn > rdma_queues_per_connection)
|
||||
{
|
||||
n_conn = rdma_queues_per_connection;
|
||||
}
|
||||
for (int i = 0; i < n_conn; i++)
|
||||
{
|
||||
msgr_rdma_address_t addr;
|
||||
if (msgr_rdma_address_t::from_string(rdma_addresses[i].string_value().c_str(), &addr))
|
||||
{
|
||||
auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, client_max_sge);
|
||||
if (rdma_conn && rdma_conn->connect(&addr) == 0)
|
||||
{
|
||||
printf("Connected local queue %d to client %d queue %d using RDMA\n", rdma_conn->qp->qp_num, cl->peer_fd, addr.qpn);
|
||||
cl->rdma_queues.push_back(rdma_conn);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (rdma_conn)
|
||||
{
|
||||
delete rdma_conn;
|
||||
}
|
||||
printf(
|
||||
"Failed to connect RDMA queue pair to %s (client %d queue %d)\n",
|
||||
addr.to_string().c_str(), cl->peer_fd, i+1
|
||||
);
|
||||
// Delete all RDMA queues to keep the TCP connection
|
||||
for (int j = 0; j < cl->rdma_queues.size(); j++)
|
||||
{
|
||||
delete cl->rdma_queues[j];
|
||||
}
|
||||
cl->rdma_queues.resize(0);
|
||||
return false;
|
||||
}
|
||||
|
||||
static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
|
||||
}
|
||||
}
|
||||
// Switch to RDMA state only after sending the configuration response
|
||||
cl->peer_state = PEER_RDMA_CONNECTING;
|
||||
for (int i = 0; i < cl->rdma_queues.size(); i++)
|
||||
{
|
||||
try_recv_rdma(cl, cl->rdma_queues[i]);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// Post a single signaled IBV_WR_SEND work request to queue pair <rc>.
//
// rc      - RDMA connection (queue pair) to post to
// wr_id   - work request id stored in the request
// sge     - scatter/gather entries to send
// op_sge  - number of entries in <sge>
//
// Prints a timestamped trace line with the total byte count, terminates the
// process if ibv_post_send() fails, and increments rc->cur_send on success.
static void try_send_rdma_wr(msgr_rdma_connection_t *rc, uint64_t wr_id, ibv_sge *sge, int op_sge)
{
    timespec tv;
    clock_gettime(CLOCK_REALTIME, &tv);
    // Total payload size, only used for the trace message
    uint64_t total = 0;
    for (int i = 0; i < op_sge; i++)
        total += sge[i].length;
    printf("%lu.%09lu RDMA send to queue %d: %lu bytes\n", tv.tv_sec, tv.tv_nsec, rc->qp->qp_num, total);
    ibv_send_wr *bad_wr = NULL;
    ibv_send_wr wr = {
        .wr_id = wr_id,
        .sg_list = sge,
        .num_sge = op_sge,
        .opcode = IBV_WR_SEND,
        .send_flags = IBV_SEND_SIGNALED,
    };
    int err = ibv_post_send(rc->qp, &wr, &bad_wr);
    if (err || bad_wr)
    {
        printf("RDMA send failed: %s\n", strerror(err));
        exit(1);
    }
    rc->cur_send++;
}
|
||||
|
||||
bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
|
||||
// Post a single receive work request to queue pair <rc>.
//
// rc      - RDMA connection (queue pair) to post to
// wr_id   - work request id stored in the request
// sge     - scatter/gather entries describing the receive buffers
// op_sge  - number of entries in <sge>
//
// Prints a timestamped trace line with the total buffer size, terminates the
// process if ibv_post_recv() fails, and increments rc->cur_recv on success.
static void try_recv_rdma_wr(msgr_rdma_connection_t *rc, uint64_t wr_id, ibv_sge *sge, int op_sge)
{
    timespec tv;
    clock_gettime(CLOCK_REALTIME, &tv);
    // Total buffer size, only used for the trace message
    uint64_t total = 0;
    for (int i = 0; i < op_sge; i++)
        total += sge[i].length;
    printf("%lu.%09lu RDMA receive from queue %d: %lu bytes\n", tv.tv_sec, tv.tv_nsec, rc->qp->qp_num, total);
    ibv_recv_wr *bad_wr = NULL;
    ibv_recv_wr wr = {
        .wr_id = wr_id,
        .sg_list = sge,
        .num_sge = op_sge,
    };
    int err = ibv_post_recv(rc->qp, &wr, &bad_wr);
    if (err || bad_wr)
    {
        printf("RDMA receive failed: %s\n", strerror(err));
        exit(1);
    }
    rc->cur_recv++;
}
|
||||
|
||||
// Post receive requests for the reply to a read operation <cur_op> on queue <rc>.
//
// The first scatter/gather entry always points to the reply header
// (cur_op->reply.buf, OSD_PACKET_SIZE bytes) and is accounted as
// bs_bitmap_granularity bytes of the first fragment. The buffers of cur_op->iov
// follow, split into work requests of at most rc->max_sge entries and
// rc->max_sge*bs_bitmap_granularity bytes each.
//
// Terminates the process if more than rc->max_recv receive requests would be
// outstanding. Resets rc->recv_pos / rc->recv_buf_pos before returning.
// Always returns true.
static bool try_recv_rdma_read(osd_client_t *cl, msgr_rdma_connection_t *rc, osd_op_t *cur_op, uint32_t bs_bitmap_granularity)
{
    int op_size = bs_bitmap_granularity, op_sge = 1, op_max = rc->max_sge*bs_bitmap_granularity;
    iovec *segments = cur_op->iov.get_iovec();
    ibv_sge sge[rc->max_sge];
    sge[0] = {
        .addr = (uintptr_t)cur_op->reply.buf,
        .length = (uint32_t)OSD_PACKET_SIZE,
        .lkey = rc->ctx->mr->lkey,
    };
    while (rc->recv_pos < cur_op->iov.get_size())
    {
        iovec & iov = segments[rc->recv_pos];
        if (op_size >= op_max || op_sge >= rc->max_sge)
        {
            // Current fragment is full - post it and start a new one
            try_recv_rdma_wr(rc, cl->peer_fd, sge, op_sge);
            op_sge = 0;
            op_size = 0;
            if (rc->cur_recv >= rc->max_recv)
            {
                // FIXME
                // Report the reason instead of exiting silently
                printf(
                    "RDMA receive queue overflow for client %d: %d requests posted, limit is %d\n",
                    cl->peer_fd, rc->cur_recv, rc->max_recv
                );
                exit(1);
            }
        }
        // Receive in (max_sge*4k) fragments
        uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->recv_buf_pos < op_max
            ? iov.iov_len-rc->recv_buf_pos : op_max-op_size);
        sge[op_sge++] = {
            // Offset is added after the integer conversion to avoid void* arithmetic
            .addr = (uintptr_t)iov.iov_base + rc->recv_buf_pos,
            .length = len,
            .lkey = rc->ctx->mr->lkey,
        };
        op_size += len;
        rc->recv_buf_pos += len;
        if (rc->recv_buf_pos >= iov.iov_len)
        {
            // Buffer fully covered - move on to the next one
            rc->recv_pos++;
            rc->recv_buf_pos = 0;
        }
    }
    if (op_sge > 0)
    {
        // Post the last (possibly partial) fragment
        try_recv_rdma_wr(rc, cl->peer_fd, sge, op_sge);
    }
    rc->recv_pos = 0;
    rc->recv_buf_pos = 0;
    return true;
}
|
||||
int op_size = 0, op_sge = 0, op_max = rc->max_sge*bs_bitmap_granularity;
|
||||
// FIXME: rc->max_sge should be negotiated between client & server
|
||||
|
||||
static bool try_send_rdma_read(osd_client_t *cl, msgr_rdma_connection_t *rc, osd_op_t *cur_op, int op_list_size, uint32_t bs_bitmap_granularity)
|
||||
{
|
||||
ibv_sge sge[rc->max_sge];
|
||||
while (rc->send_pos < cl->send_list.size())
|
||||
int op_size = bs_bitmap_granularity, op_sge = 1, op_max = rc->max_sge*bs_bitmap_granularity;
|
||||
sge[0] = {
|
||||
.addr = (uintptr_t)cl->send_list[0].iov_base,
|
||||
.length = (uint32_t)cl->send_list[0].iov_len,
|
||||
.lkey = rc->ctx->mr->lkey,
|
||||
};
|
||||
rc->send_pos = 1;
|
||||
while (rc->send_pos < op_list_size)
|
||||
{
|
||||
iovec & iov = cl->send_list[rc->send_pos];
|
||||
if (cl->outbox[rc->send_pos].flags & MSGR_SENDP_HDR)
|
||||
{
|
||||
if (op_sge > 0)
|
||||
{
|
||||
try_send_rdma_wr(cl, sge, op_sge);
|
||||
try_send_rdma_wr(rc, cl->peer_fd, sge, op_sge);
|
||||
op_sge = 0;
|
||||
op_size = 0;
|
||||
if (rc->cur_send >= rc->max_send)
|
||||
|
@ -373,7 +535,7 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
|
|||
.length = (uint32_t)iov.iov_len,
|
||||
.lkey = rc->ctx->mr->lkey,
|
||||
};
|
||||
try_send_rdma_wr(cl, sge, 1);
|
||||
try_send_rdma_wr(rc, cl->peer_fd, sge, 1);
|
||||
rc->send_pos++;
|
||||
if (rc->cur_send >= rc->max_send)
|
||||
break;
|
||||
|
@ -382,7 +544,7 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
|
|||
{
|
||||
if (op_size >= op_max || op_sge >= rc->max_sge)
|
||||
{
|
||||
try_send_rdma_wr(cl, sge, op_sge);
|
||||
try_send_rdma_wr(rc, cl->peer_fd, sge, op_sge);
|
||||
op_sge = 0;
|
||||
op_size = 0;
|
||||
if (rc->cur_send >= rc->max_send)
|
||||
|
@ -407,82 +569,224 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
|
|||
}
|
||||
if (op_sge > 0)
|
||||
{
|
||||
try_send_rdma_wr(cl, sge, op_sge);
|
||||
try_send_rdma_wr(rc, cl->peer_fd, sge, op_sge);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
static void try_recv_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
|
||||
if (op_list_size == 1)
|
||||
{
|
||||
ibv_recv_wr *bad_wr = NULL;
|
||||
ibv_recv_wr wr = {
|
||||
.wr_id = (uint64_t)(cl->peer_fd*2),
|
||||
.sg_list = sge,
|
||||
.num_sge = op_sge,
|
||||
};
|
||||
int err = ibv_post_recv(cl->rdma_conn->qp, &wr, &bad_wr);
|
||||
if (err || bad_wr)
|
||||
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
|
||||
cur_op->req.hdr.opcode == OSD_OP_READ)
|
||||
{
|
||||
printf("RDMA receive failed: %s\n", strerror(err));
|
||||
exit(1);
|
||||
}
|
||||
cl->rdma_conn->cur_recv++;
|
||||
}
|
||||
|
||||
bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
|
||||
{
|
||||
auto rc = cl->rdma_conn;
|
||||
if (rc->cur_recv > 0)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
if (!cl->recv_list.get_size())
|
||||
{
|
||||
cl->recv_list.reset();
|
||||
cl->read_op = new osd_op_t;
|
||||
cl->read_op->peer_fd = cl->peer_fd;
|
||||
cl->read_op->op_type = OSD_OP_IN;
|
||||
cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
|
||||
cl->read_remaining = OSD_PACKET_SIZE;
|
||||
cl->read_state = CL_READ_HDR;
|
||||
}
|
||||
int op_size = 0, op_sge = 0, op_max = rc->max_sge*bs_bitmap_granularity;
|
||||
iovec *segments = cl->recv_list.get_iovec();
|
||||
// FIXME: rc->max_sge should be negotiated between client & server
|
||||
ibv_sge sge[rc->max_sge];
|
||||
while (rc->recv_pos < cl->recv_list.get_size())
|
||||
{
|
||||
iovec & iov = segments[rc->recv_pos];
|
||||
if (op_size >= op_max || op_sge >= rc->max_sge)
|
||||
{
|
||||
try_recv_rdma_wr(cl, sge, op_sge);
|
||||
op_sge = 0;
|
||||
op_size = 0;
|
||||
if (rc->cur_recv >= rc->max_recv)
|
||||
break;
|
||||
}
|
||||
// Receive in identical (max_sge*4k) fragments
|
||||
uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->recv_buf_pos < op_max ? iov.iov_len-rc->recv_buf_pos : op_max-op_size);
|
||||
sge[op_sge++] = {
|
||||
.addr = (uintptr_t)(iov.iov_base+rc->recv_buf_pos),
|
||||
.length = len,
|
||||
sge[0] = {
|
||||
.addr = 0,
|
||||
.length = 0,
|
||||
.lkey = rc->ctx->mr->lkey,
|
||||
};
|
||||
op_size += len;
|
||||
rc->recv_buf_pos += len;
|
||||
if (rc->recv_buf_pos >= iov.iov_len)
|
||||
uint64_t data_size = cur_op->req.hdr.opcode == OSD_OP_SEC_READ
|
||||
? cur_op->req.sec_rw.len
|
||||
: cur_op->req.rw.len;
|
||||
while (data_size >= op_max)
|
||||
{
|
||||
rc->recv_pos++;
|
||||
rc->recv_buf_pos = 0;
|
||||
try_send_rdma_wr(rc, cl->peer_fd, sge, 1);
|
||||
data_size -= op_max;
|
||||
}
|
||||
if (data_size > 0)
|
||||
try_send_rdma_wr(rc, cl->peer_fd, sge, 1);
|
||||
}
|
||||
if (op_sge > 0)
|
||||
else if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
|
||||
{
|
||||
try_recv_rdma_wr(cl, sge, op_sge);
|
||||
sge[0] = {
|
||||
.addr = 0,
|
||||
.length = 0,
|
||||
.lkey = rc->ctx->mr->lkey,
|
||||
};
|
||||
try_send_rdma_wr(rc, cl->peer_fd, sge, 1);
|
||||
}
|
||||
else
|
||||
return true;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// Send as many queued operations/replies of client <cl> over RDMA as possible.
//
// Entries are taken from the front of cl->outbox (and the parallel cl->send_list):
// - OSD_OP_OUT operations are sent as one work request to a queue whose cur_op is
//   NULL, and the receive request for the reply is posted right after. If no such
//   queue exists the function returns and the remaining entries stay queued.
// - Other operations (replies) are sent to the queue whose cur_op is that operation.
//
// Terminates the process on an operation with more than 2 send_list entries
// (except read replies, which are handled by try_send_rdma_read()) and on a reply
// whose operation does not belong to any queue.
void osd_messenger_t::try_send_rdma(osd_client_t *cl)
{
    // Two different algorithms for outgoing and incoming operations
    while (cl->outbox.size() > 0)
    {
        osd_op_t *cur_op = cl->outbox[0].op;
        if (cur_op->op_type == OSD_OP_OUT)
        {
            // Pick a queue. Send operation to it in one part.
            size_t qi;
            for (qi = 0; qi < cl->rdma_queues.size() && cl->rdma_queues[qi]->cur_op != NULL; qi++) {}
            if (qi >= cl->rdma_queues.size())
            {
                // No free queues, retry later.
                // We only post 1 operation per queue to use the queue pair number as a 'tag'.
                return;
            }
            // Pick all entries for the operation from the queue
            int op_list_size = 0;
            while (op_list_size < (int)cl->outbox.size() && cl->outbox[op_list_size].op == cur_op)
            {
                op_list_size++;
            }
            auto rq = cl->rdma_queues[qi];
            rq->cur_op = cur_op;
            // Only entries 0 and 1 are ever used below, so a fixed-size array is used:
            // an array sized by rq->max_sge would be overrun by sge[1] when max_sge is 1
            ibv_sge sge[2];
            // FIXME: This won't work with long bitmaps
            // header or header+data
            sge[0] = {
                .addr = (uintptr_t)cl->send_list[0].iov_base,
                .length = (uint32_t)cl->send_list[0].iov_len,
                .lkey = rq->ctx->mr->lkey,
            };
            if (op_list_size == 2)
            {
                auto & iov = cl->send_list[1];
                sge[1] = {
                    .addr = (uintptr_t)iov.iov_base,
                    .length = (uint32_t)iov.iov_len,
                    .lkey = rq->ctx->mr->lkey,
                };
                try_send_rdma_wr(rq, cl->peer_fd, sge, 2);
            }
            else if (op_list_size == 1)
            {
                try_send_rdma_wr(rq, cl->peer_fd, sge, 1);
            }
            else
            {
                printf("unexpected long send_list for opcode %lu: %lu entries\n", cur_op->req.hdr.opcode, cl->send_list.size());
                exit(1);
            }
            cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+op_list_size);
            cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+op_list_size);
            // Post a receive request for the reply at the same time
            if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
                cur_op->req.hdr.opcode == OSD_OP_READ)
            {
                try_recv_rdma_read(cl, rq, cur_op, bs_bitmap_granularity);
            }
            else if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
            {
                assert(!cur_op->iov.count);
                // FIXME: hardcode
                #define clean_entry_bitmap_size 4
                // Reply size is known
                uint64_t data_size = cur_op->req.sec_read_bmp.len / sizeof(obj_ver_id) * (8 + clean_entry_bitmap_size);
                cur_op->rmw_buf = malloc_or_die(data_size);
                // Reply header goes to reply.buf, reply data goes to the new buffer
                sge[0] = {
                    .addr = (uintptr_t)cur_op->reply.buf,
                    .length = (uint32_t)OSD_PACKET_SIZE,
                    .lkey = rq->ctx->mr->lkey,
                };
                sge[1] = {
                    .addr = (uintptr_t)cur_op->rmw_buf,
                    .length = (uint32_t)data_size,
                    .lkey = rq->ctx->mr->lkey,
                };
                try_recv_rdma_wr(rq, cl->peer_fd, sge, 2);
            }
            else
            {
                // No reply or reply size is unknown
                sge[0] = {
                    .addr = (uintptr_t)cur_op->reply.buf,
                    .length = (uint32_t)OSD_PACKET_SIZE,
                    .lkey = rq->ctx->mr->lkey,
                };
                try_recv_rdma_wr(rq, cl->peer_fd, sge, 1);
            }
        }
        else
        {
            // Send reply to the same queue the operation came from.
            // Fragment it into parts no longer than (max_sge*4k) to always
            // be able to send and receive them correctly.
            size_t qi;
            for (qi = 0; qi < cl->rdma_queues.size() && cl->rdma_queues[qi]->cur_op != cur_op; qi++) {}
            if (qi >= cl->rdma_queues.size())
            {
                printf("Unknown incoming operation for client %d\n", cl->peer_fd);
                exit(1);
            }
            // Pick all entries for the operation from the queue
            int op_list_size = 0;
            while (op_list_size < (int)cl->outbox.size() && cl->outbox[op_list_size].op == cur_op)
            {
                op_list_size++;
            }
            auto rq = cl->rdma_queues[qi];
            if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
                cur_op->req.hdr.opcode == OSD_OP_READ)
            {
                try_send_rdma_read(cl, rq, cur_op, op_list_size, bs_bitmap_granularity);
                rq->send_pos = 0;
                rq->send_buf_pos = 0;
            }
            else if (op_list_size == 1)
            {
                // Header only
                ibv_sge sge[1];
                sge[0] = {
                    .addr = (uintptr_t)cl->send_list[0].iov_base,
                    .length = (uint32_t)cl->send_list[0].iov_len,
                    .lkey = rq->ctx->mr->lkey,
                };
                try_send_rdma_wr(rq, cl->peer_fd, sge, 1);
            }
            else if (op_list_size == 2)
            {
                // Header + data
                ibv_sge sge[2];
                sge[0] = {
                    .addr = (uintptr_t)cl->send_list[0].iov_base,
                    .length = (uint32_t)cl->send_list[0].iov_len,
                    .lkey = rq->ctx->mr->lkey,
                };
                sge[1] = {
                    .addr = (uintptr_t)cl->send_list[1].iov_base,
                    .length = (uint32_t)cl->send_list[1].iov_len,
                    .lkey = rq->ctx->mr->lkey,
                };
                if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
                {
                    // The requester posts one 2-entry receive for this reply
                    try_send_rdma_wr(rq, cl->peer_fd, sge, 2);
                }
                else
                {
                    // Header and data are sent as two separate work requests
                    try_send_rdma_wr(rq, cl->peer_fd, sge, 1);
                    try_send_rdma_wr(rq, cl->peer_fd, sge+1, 1);
                }
            }
            else if (op_list_size > 2)
            {
                printf("Unexpected long send_list for opcode %lu: %lu entries\n", cur_op->req.hdr.opcode, cl->send_list.size());
                exit(1);
            }
            cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+op_list_size);
            cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+op_list_size);
        }
    }
}
|
||||
|
||||
// Try to receive an incoming operation via RDMA
|
||||
void osd_messenger_t::try_recv_rdma(osd_client_t *cl, msgr_rdma_connection_t *rc)
|
||||
{
|
||||
rc->cur_op = new osd_op_t;
|
||||
rc->cur_op->peer_fd = cl->peer_fd;
|
||||
rc->cur_op->op_type = OSD_OP_IN;
|
||||
rc->cur_op->buf = memalign_or_die(MEM_ALIGNMENT, 128*1024); // FIXME hardcode for tests
|
||||
ibv_sge sge[2];
|
||||
sge[0] = {
|
||||
.addr = (uintptr_t)rc->cur_op->req.buf,
|
||||
.length = (uint32_t)OSD_PACKET_SIZE,
|
||||
.lkey = rc->ctx->mr->lkey,
|
||||
};
|
||||
sge[1] = {
|
||||
.addr = (uintptr_t)rc->cur_op->buf,
|
||||
.length = (uint32_t)128*1024,
|
||||
.lkey = rc->ctx->mr->lkey,
|
||||
};
|
||||
try_recv_rdma_wr(rc, cl->peer_fd, sge, 2);
|
||||
}
|
||||
|
||||
#define RDMA_EVENTS_AT_ONCE 32
|
||||
|
||||
void osd_messenger_t::handle_rdma_events()
|
||||
|
@ -491,6 +795,7 @@ void osd_messenger_t::handle_rdma_events()
|
|||
ibv_cq *ev_cq;
|
||||
void *ev_ctx;
|
||||
// FIXME: This is inefficient as it calls read()...
|
||||
timespec tv;
|
||||
if (ibv_get_cq_event(rdma_context->channel, &ev_cq, &ev_ctx) == 0)
|
||||
{
|
||||
ibv_ack_cq_events(rdma_context->cq, 1);
|
||||
|
@ -507,8 +812,8 @@ void osd_messenger_t::handle_rdma_events()
|
|||
event_count = ibv_poll_cq(rdma_context->cq, RDMA_EVENTS_AT_ONCE, wc);
|
||||
for (int i = 0; i < event_count; i++)
|
||||
{
|
||||
int client_id = wc[i].wr_id >> 1;
|
||||
bool is_send = wc[i].wr_id & 1;
|
||||
int client_id = wc[i].wr_id;
|
||||
bool is_send = wc[i].opcode == IBV_WC_SEND;
|
||||
auto cl_it = clients.find(client_id);
|
||||
if (cl_it == clients.end())
|
||||
{
|
||||
|
@ -526,56 +831,121 @@ void osd_messenger_t::handle_rdma_events()
|
|||
stop_client(client_id);
|
||||
continue;
|
||||
}
|
||||
int q;
|
||||
for (q = 0; q < cl->rdma_queues.size() && cl->rdma_queues[q]->qp->qp_num != wc[i].qp_num; q++) {}
|
||||
if (q >= cl->rdma_queues.size())
|
||||
{
|
||||
printf("Unknown queue %d for client %d\n", wc[i].qp_num, cl->peer_fd);
|
||||
exit(1);
|
||||
}
|
||||
auto rc = cl->rdma_queues[q];
|
||||
if (is_send)
|
||||
{
|
||||
clock_gettime(CLOCK_REALTIME, &tv);
|
||||
printf("%lu.%09lu Done RDMA send on queue %d\n", tv.tv_sec, tv.tv_nsec, wc[i].qp_num);
|
||||
}
|
||||
else
|
||||
{
|
||||
clock_gettime(CLOCK_REALTIME, &tv);
|
||||
printf("%lu.%09lu Done RDMA recv on queue %d, %d bytes\n", tv.tv_sec, tv.tv_nsec, wc[i].qp_num, wc[i].byte_len);
|
||||
}
|
||||
if (!is_send)
|
||||
{
|
||||
cl->rdma_conn->cur_recv--;
|
||||
if (!cl->rdma_conn->cur_recv)
|
||||
rc->cur_recv--;
|
||||
if (!rc->cur_recv)
|
||||
{
|
||||
cl->recv_list.done += cl->rdma_conn->recv_pos;
|
||||
cl->rdma_conn->recv_pos = 0;
|
||||
if (!cl->recv_list.get_size())
|
||||
// Fucking shit...
|
||||
if (rc->cur_op->op_type == OSD_OP_IN)
|
||||
{
|
||||
cl->read_remaining = 0;
|
||||
if (handle_finished_read(cl))
|
||||
if (wc[i].byte_len <= OSD_PACKET_SIZE)
|
||||
{
|
||||
try_recv_rdma(cl);
|
||||
free(rc->cur_op->buf);
|
||||
rc->cur_op->buf = NULL;
|
||||
}
|
||||
cl->received_ops.push_back(rc->cur_op);
|
||||
set_immediate.push_back([this, op = rc->cur_op]() { exec_op(op); });
|
||||
}
|
||||
else /* if (rc->cur_op->op_type == OSD_OP_OUT) */
|
||||
{
|
||||
if (rc->cur_op->reply.hdr.opcode == OSD_OP_SEC_READ ||
|
||||
rc->cur_op->reply.hdr.opcode == OSD_OP_READ)
|
||||
{
|
||||
// Data is already received
|
||||
cl->sent_ops.erase(rc->cur_op->req.hdr.id);
|
||||
handle_reply_ready(rc->cur_op);
|
||||
rc->cur_op = NULL;
|
||||
try_send_rdma(cl);
|
||||
}
|
||||
else if (rc->cur_op->reply.hdr.opcode == OSD_OP_SEC_READ_BMP)
|
||||
{
|
||||
// Data is already received, but we need to switch buffers
|
||||
cl->sent_ops.erase(rc->cur_op->req.hdr.id);
|
||||
free(rc->cur_op->buf);
|
||||
rc->cur_op->buf = rc->cur_op->rmw_buf;
|
||||
handle_reply_ready(rc->cur_op);
|
||||
rc->cur_op = NULL;
|
||||
try_send_rdma(cl);
|
||||
}
|
||||
else if (rc->cur_op->reply.hdr.opcode == OSD_OP_SEC_LIST && rc->cur_op->reply.hdr.retval > 0 ||
|
||||
rc->cur_op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG && rc->cur_op->reply.hdr.retval > 0)
|
||||
{
|
||||
if (rc->recv_pos != 1)
|
||||
{
|
||||
// Data is not received yet (RNR)
|
||||
uint32_t len;
|
||||
if (rc->cur_op->reply.hdr.opcode == OSD_OP_SEC_LIST)
|
||||
len = sizeof(obj_ver_id) * rc->cur_op->reply.hdr.retval;
|
||||
else
|
||||
len = rc->cur_op->reply.hdr.retval;
|
||||
rc->cur_op->buf = malloc_or_die(len);
|
||||
ibv_sge sge[1];
|
||||
sge[0] = {
|
||||
.addr = (uintptr_t)rc->cur_op->buf,
|
||||
.length = len,
|
||||
.lkey = rc->ctx->mr->lkey,
|
||||
};
|
||||
try_recv_rdma_wr(rc, cl->peer_fd, sge, 1);
|
||||
rc->recv_pos = 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Done
|
||||
cl->sent_ops.erase(rc->cur_op->req.hdr.id);
|
||||
handle_reply_ready(rc->cur_op);
|
||||
rc->cur_op = NULL;
|
||||
rc->recv_pos = 0;
|
||||
try_send_rdma(cl);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Continue to receive data
|
||||
try_recv_rdma(cl);
|
||||
// No data
|
||||
cl->sent_ops.erase(rc->cur_op->req.hdr.id);
|
||||
handle_reply_ready(rc->cur_op);
|
||||
rc->cur_op = NULL;
|
||||
try_send_rdma(cl);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
cl->rdma_conn->cur_send--;
|
||||
if (!cl->rdma_conn->cur_send)
|
||||
rc->cur_send--;
|
||||
if (!rc->cur_send)
|
||||
{
|
||||
// Wait for the whole batch
|
||||
for (int i = 0; i < cl->rdma_conn->send_pos; i++)
|
||||
if (rc->cur_op->op_type == OSD_OP_OUT)
|
||||
{
|
||||
if (cl->outbox[i].flags & MSGR_SENDP_FREE)
|
||||
// Nothing
|
||||
}
|
||||
else /* if (rc->cur_op->op_type == OSD_OP_IN) */
|
||||
{
|
||||
// Reply fully sent
|
||||
delete cl->outbox[i].op;
|
||||
delete rc->cur_op;
|
||||
rc->cur_op = NULL;
|
||||
// Post receive for the next incoming op
|
||||
try_recv_rdma(cl, rc);
|
||||
}
|
||||
}
|
||||
if (cl->rdma_conn->send_pos > 0)
|
||||
{
|
||||
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+cl->rdma_conn->send_pos);
|
||||
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+cl->rdma_conn->send_pos);
|
||||
cl->rdma_conn->send_pos = 0;
|
||||
}
|
||||
if (cl->rdma_conn->send_buf_pos > 0)
|
||||
{
|
||||
cl->send_list[0].iov_base += cl->rdma_conn->send_buf_pos;
|
||||
cl->send_list[0].iov_len -= cl->rdma_conn->send_buf_pos;
|
||||
cl->rdma_conn->send_buf_pos = 0;
|
||||
}
|
||||
try_send_rdma(cl);
|
||||
}
|
||||
}
|
||||
}
|
||||
} while (event_count > 0);
|
||||
|
@ -585,3 +955,8 @@ void osd_messenger_t::handle_rdma_events()
|
|||
}
|
||||
set_immediate.clear();
|
||||
}
|
||||
|
||||
int osd_messenger_t::get_rdma_max_sge()
|
||||
{
|
||||
return rdma_max_sge;
|
||||
}
|
||||
|
|
|
@ -3,6 +3,8 @@
|
|||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
struct osd_op_t;
|
||||
|
||||
struct msgr_rdma_address_t
|
||||
{
|
||||
ibv_gid gid;
|
||||
|
@ -44,6 +46,7 @@ struct msgr_rdma_connection_t
|
|||
int max_send = 0, max_recv = 0, max_sge = 0;
|
||||
int cur_send = 0, cur_recv = 0;
|
||||
|
||||
osd_op_t *cur_op = NULL;
|
||||
int send_pos = 0, send_buf_pos = 0;
|
||||
int recv_pos = 0, recv_buf_pos = 0;
|
||||
|
||||
|
|
|
@ -207,20 +207,26 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
|
|||
else if (cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
|
||||
cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
|
||||
{
|
||||
if (cur_op->req.sec_rw.attr_len > 0)
|
||||
if (cur_op->req.sec_rw.bitmap_len > 0)
|
||||
{
|
||||
if (cur_op->req.sec_rw.attr_len > sizeof(unsigned))
|
||||
cur_op->bitmap = cur_op->rmw_buf = malloc_or_die(cur_op->req.sec_rw.attr_len);
|
||||
if (cur_op->req.sec_rw.bitmap_len > sizeof(void*))
|
||||
cur_op->bitmap = cur_op->rmw_buf = malloc_or_die(cur_op->req.sec_rw.bitmap_len);
|
||||
else
|
||||
cur_op->bitmap = &cur_op->bmp_data;
|
||||
cl->recv_list.push_back(cur_op->bitmap, cur_op->req.sec_rw.attr_len);
|
||||
if (cur_op->req.sec_rw.bitmap_len <= 8)
|
||||
memcpy(cur_op->bitmap, &cur_op->req.sec_rw.bitmap, cur_op->req.sec_rw.bitmap_len);
|
||||
else
|
||||
{
|
||||
cl->recv_list.push_back(cur_op->bitmap, cur_op->req.sec_rw.bitmap_len);
|
||||
cl->read_remaining += cur_op->req.sec_rw.bitmap_len;
|
||||
}
|
||||
}
|
||||
if (cur_op->req.sec_rw.len > 0)
|
||||
{
|
||||
cur_op->buf = memalign_or_die(MEM_ALIGNMENT, cur_op->req.sec_rw.len);
|
||||
cl->recv_list.push_back(cur_op->buf, cur_op->req.sec_rw.len);
|
||||
cl->read_remaining += cur_op->req.sec_rw.len;
|
||||
}
|
||||
cl->read_remaining = cur_op->req.sec_rw.len + cur_op->req.sec_rw.attr_len;
|
||||
}
|
||||
else if (cur_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE ||
|
||||
cur_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK)
|
||||
|
@ -295,7 +301,7 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
|
|||
if (op->reply.hdr.opcode == OSD_OP_SEC_READ || op->reply.hdr.opcode == OSD_OP_READ)
|
||||
{
|
||||
// Read data. In this case we assume that the buffer is preallocated by the caller (!)
|
||||
unsigned bmp_len = (op->reply.hdr.opcode == OSD_OP_SEC_READ ? op->reply.sec_rw.attr_len : op->reply.rw.bitmap_len);
|
||||
unsigned bmp_len = (op->reply.hdr.opcode == OSD_OP_SEC_READ ? op->reply.sec_rw.bitmap_len : op->reply.rw.bitmap_len);
|
||||
unsigned expected_size = (op->reply.hdr.opcode == OSD_OP_SEC_READ ? op->req.sec_rw.len : op->req.rw.len);
|
||||
if (op->reply.hdr.retval >= 0 && (op->reply.hdr.retval != expected_size || bmp_len > op->bitmap_len))
|
||||
{
|
||||
|
@ -309,14 +315,24 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
|
|||
if (op->reply.hdr.retval >= 0 && bmp_len > 0)
|
||||
{
|
||||
assert(op->bitmap);
|
||||
if (bmp_len <= 8)
|
||||
{
|
||||
memcpy(op->bitmap, (op->reply.hdr.opcode == OSD_OP_SEC_READ
|
||||
? &op->reply.sec_rw.bitmap
|
||||
: &op->reply.rw.bitmap), bmp_len);
|
||||
}
|
||||
else
|
||||
{
|
||||
cl->recv_list.push_back(op->bitmap, bmp_len);
|
||||
cl->read_remaining += bmp_len;
|
||||
}
|
||||
}
|
||||
if (op->reply.hdr.retval > 0)
|
||||
{
|
||||
assert(op->iov.count > 0);
|
||||
cl->recv_list.append(op->iov);
|
||||
cl->read_remaining += op->reply.hdr.retval;
|
||||
}
|
||||
cl->read_remaining = op->reply.hdr.retval + bmp_len;
|
||||
if (cl->read_remaining == 0)
|
||||
{
|
||||
goto reuse;
|
||||
|
|
|
@ -50,23 +50,37 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
|
|||
// Bitmap
|
||||
if (cur_op->op_type == OSD_OP_IN &&
|
||||
cur_op->req.hdr.opcode == OSD_OP_SEC_READ &&
|
||||
cur_op->reply.sec_rw.attr_len > 0)
|
||||
cur_op->reply.sec_rw.bitmap_len > 0)
|
||||
{
|
||||
if (cur_op->reply.sec_rw.bitmap_len <= 8)
|
||||
{
|
||||
memcpy(&cur_op->reply.sec_rw.bitmap, cur_op->bitmap, cur_op->reply.sec_rw.bitmap_len);
|
||||
}
|
||||
else
|
||||
{
|
||||
to_send_list.push_back((iovec){
|
||||
.iov_base = cur_op->bitmap,
|
||||
.iov_len = cur_op->reply.sec_rw.attr_len,
|
||||
.iov_len = cur_op->reply.sec_rw.bitmap_len,
|
||||
});
|
||||
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
|
||||
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = MSGR_SENDP_BMP });
|
||||
}
|
||||
}
|
||||
else if (cur_op->op_type == OSD_OP_OUT &&
|
||||
(cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE || cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE) &&
|
||||
cur_op->req.sec_rw.attr_len > 0)
|
||||
cur_op->req.sec_rw.bitmap_len > 0)
|
||||
{
|
||||
if (cur_op->req.sec_rw.bitmap_len <= 8)
|
||||
{
|
||||
memcpy(&cur_op->req.sec_rw.bitmap, cur_op->bitmap, cur_op->req.sec_rw.bitmap_len);
|
||||
}
|
||||
else
|
||||
{
|
||||
to_send_list.push_back((iovec){
|
||||
.iov_base = cur_op->bitmap,
|
||||
.iov_len = cur_op->req.sec_rw.attr_len,
|
||||
});
|
||||
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
|
||||
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = MSGR_SENDP_BMP });
|
||||
}
|
||||
}
|
||||
// Operation data
|
||||
if ((cur_op->op_type == OSD_OP_IN
|
||||
|
@ -268,15 +282,13 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
|
|||
}
|
||||
cl->write_state = cl->outbox.size() > 0 ? CL_WRITE_READY : 0;
|
||||
#ifdef WITH_RDMA
|
||||
if (cl->rdma_conn && !cl->outbox.size() && cl->peer_state == PEER_RDMA_CONNECTING)
|
||||
if (cl->peer_state == PEER_RDMA_CONNECTING && cl->rdma_queues.size() > 0 && !cl->outbox.size())
|
||||
{
|
||||
// FIXME: Do something better than just forgetting the FD
|
||||
// FIXME: Ignore pings during RDMA state transition
|
||||
printf("Successfully connected with client %d using RDMA\n", cl->peer_fd);
|
||||
cl->peer_state = PEER_RDMA;
|
||||
tfd->set_fd_handler(cl->peer_fd, false, NULL);
|
||||
// Add the initial receive request
|
||||
try_recv_rdma(cl);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
|
|
@ -123,10 +123,11 @@ void osd_messenger_t::stop_client(int peer_fd, bool force)
|
|||
// ...because peer_fd number can get reused after close()
|
||||
close(peer_fd);
|
||||
#ifdef WITH_RDMA
|
||||
if (cl->rdma_conn)
|
||||
for (auto rdma_conn: cl->rdma_queues)
|
||||
{
|
||||
delete cl->rdma_conn;
|
||||
delete rdma_conn;
|
||||
}
|
||||
cl->rdma_queues.resize(0);
|
||||
#endif
|
||||
#endif
|
||||
// Find the item again because it can be invalidated at this point
|
||||
|
|
|
@ -35,7 +35,7 @@
|
|||
#define MEM_ALIGNMENT 512
|
||||
#endif
|
||||
#define OSD_RW_MAX 64*1024*1024
|
||||
#define OSD_PROTOCOL_VERSION 1
|
||||
#define OSD_PROTOCOL_VERSION 2
|
||||
|
||||
// common request and reply headers
|
||||
struct __attribute__((__packed__)) osd_op_header_t
|
||||
|
@ -74,8 +74,10 @@ struct __attribute__((__packed__)) osd_op_sec_rw_t
|
|||
// length
|
||||
uint32_t len;
|
||||
// bitmap/attribute length - bitmap comes after header, but before data
|
||||
uint32_t attr_len;
|
||||
uint32_t bitmap_len;
|
||||
uint32_t pad0;
|
||||
// inline bitmap (when it's no longer than 8 bytes)
|
||||
uint64_t bitmap;
|
||||
};
|
||||
|
||||
struct __attribute__((__packed__)) osd_reply_sec_rw_t
|
||||
|
@ -84,8 +86,10 @@ struct __attribute__((__packed__)) osd_reply_sec_rw_t
|
|||
// for reads and writes: assigned or read version number
|
||||
uint64_t version;
|
||||
// for reads: bitmap/attribute length (just to double-check)
|
||||
uint32_t attr_len;
|
||||
uint32_t bitmap_len;
|
||||
uint32_t pad0;
|
||||
// inline bitmap (when it's no longer than 8 bytes)
|
||||
uint64_t bitmap;
|
||||
};
|
||||
|
||||
// delete object on the secondary OSD
|
||||
|
@ -199,6 +203,8 @@ struct __attribute__((__packed__)) osd_reply_rw_t
|
|||
// for reads: bitmap length
|
||||
uint32_t bitmap_len;
|
||||
uint32_t pad0;
|
||||
// inline bitmap (when it's no longer than 8 bytes)
|
||||
uint64_t bitmap;
|
||||
};
|
||||
|
||||
// sync to the primary OSD
|
||||
|
|
|
@ -235,6 +235,9 @@ resume_2:
|
|||
{
|
||||
reconstruct_stripes_jerasure(stripes, op_data->pg_size, op_data->pg_data_size, clean_entry_bitmap_size);
|
||||
}
|
||||
if (cur_op->reply.rw.bitmap_len <= 8)
|
||||
memcpy(&cur_op->reply.rw.bitmap, op_data->stripes[0].bmp_buf, cur_op->reply.rw.bitmap_len);
|
||||
else
|
||||
cur_op->iov.push_back(op_data->stripes[0].bmp_buf, cur_op->reply.rw.bitmap_len);
|
||||
for (int role = 0; role < op_data->pg_size; role++)
|
||||
{
|
||||
|
@ -250,6 +253,9 @@ resume_2:
|
|||
}
|
||||
else
|
||||
{
|
||||
if (cur_op->reply.rw.bitmap_len <= 8)
|
||||
memcpy(&cur_op->reply.rw.bitmap, op_data->stripes[0].bmp_buf, cur_op->reply.rw.bitmap_len);
|
||||
else
|
||||
cur_op->iov.push_back(op_data->stripes[0].bmp_buf, cur_op->reply.rw.bitmap_len);
|
||||
cur_op->iov.push_back(cur_op->buf, cur_op->req.rw.len);
|
||||
}
|
||||
|
|
|
@ -200,7 +200,7 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o
|
|||
.version = op_version,
|
||||
.offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start,
|
||||
.len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start,
|
||||
.attr_len = wr ? clean_entry_bitmap_size : 0,
|
||||
.bitmap_len = wr ? clean_entry_bitmap_size : 0,
|
||||
};
|
||||
#ifdef OSD_DEBUG
|
||||
printf(
|
||||
|
|
|
@ -20,9 +20,9 @@ void osd_t::secondary_op_callback(osd_op_t *op)
|
|||
if (op->req.hdr.opcode == OSD_OP_SEC_READ)
|
||||
{
|
||||
if (op->bs_op->retval >= 0)
|
||||
op->reply.sec_rw.attr_len = clean_entry_bitmap_size;
|
||||
op->reply.sec_rw.bitmap_len = clean_entry_bitmap_size;
|
||||
else
|
||||
op->reply.sec_rw.attr_len = 0;
|
||||
op->reply.sec_rw.bitmap_len = 0;
|
||||
if (op->bs_op->retval > 0)
|
||||
op->iov.push_back(op->buf, op->bs_op->retval);
|
||||
}
|
||||
|
@ -81,7 +81,7 @@ void osd_t::exec_secondary(osd_op_t *cur_op)
|
|||
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ)
|
||||
{
|
||||
// Allocate memory for the read operation
|
||||
if (clean_entry_bitmap_size > sizeof(unsigned))
|
||||
if (clean_entry_bitmap_size > sizeof(void*))
|
||||
cur_op->bitmap = cur_op->rmw_buf = malloc_or_die(clean_entry_bitmap_size);
|
||||
else
|
||||
cur_op->bitmap = &cur_op->bmp_data;
|
||||
|
@ -166,14 +166,20 @@ void osd_t::exec_show_config(osd_op_t *cur_op)
|
|||
{
|
||||
// Indicate that RDMA is enabled
|
||||
wire_config["rdma_enabled"] = true;
|
||||
if (req_json["connect_rdma"].is_string())
|
||||
if (req_json["rdma_queues"].array_items().size())
|
||||
{
|
||||
// Peer is trying to connect using RDMA, try to satisfy him
|
||||
bool ok = msgr.connect_rdma(cur_op->peer_fd, req_json["connect_rdma"].string_value());
|
||||
auto cl = msgr.clients.at(cur_op->peer_fd);
|
||||
bool ok = msgr.connect_rdma_client(cl, req_json["rdma_queues"], req_json["rdma_max_sge"].uint64_value());
|
||||
if (ok)
|
||||
{
|
||||
wire_config["rdma_connected"] = true;
|
||||
wire_config["rdma_address"] = msgr.clients.at(cur_op->peer_fd)->rdma_conn->addr.to_string();
|
||||
json11::Json::array rdma_queues;
|
||||
for (auto rdma_conn: cl->rdma_queues)
|
||||
{
|
||||
rdma_queues.push_back(rdma_conn->addr.to_string());
|
||||
}
|
||||
wire_config["rdma_queues"] = rdma_queues;
|
||||
wire_config["rdma_max_sge"] = msgr.get_rdma_max_sge();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue