forked from vitalif/vitastor
Fix several I/O bugs
parent
6023cac361
commit
3bf53754c2
|
@ -9,6 +9,10 @@ void osd_messenger_t::read_requests()
|
||||||
{
|
{
|
||||||
int peer_fd = read_ready_clients[i];
|
int peer_fd = read_ready_clients[i];
|
||||||
osd_client_t *cl = clients[peer_fd];
|
osd_client_t *cl = clients[peer_fd];
|
||||||
|
if (cl->read_msg.msg_iovlen)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (cl->read_remaining < receive_buffer_size)
|
if (cl->read_remaining < receive_buffer_size)
|
||||||
{
|
{
|
||||||
cl->read_iov.iov_base = cl->in_buf;
|
cl->read_iov.iov_base = cl->in_buf;
|
||||||
|
@ -29,6 +33,7 @@ void osd_messenger_t::read_requests()
|
||||||
io_uring_sqe* sqe = ringloop->get_sqe();
|
io_uring_sqe* sqe = ringloop->get_sqe();
|
||||||
if (!sqe)
|
if (!sqe)
|
||||||
{
|
{
|
||||||
|
cl->read_msg.msg_iovlen = 0;
|
||||||
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
|
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -52,6 +57,7 @@ void osd_messenger_t::read_requests()
|
||||||
bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
|
bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
|
||||||
{
|
{
|
||||||
bool ret = false;
|
bool ret = false;
|
||||||
|
cl->read_msg.msg_iovlen = 0;
|
||||||
cl->refs--;
|
cl->refs--;
|
||||||
if (cl->peer_state == PEER_STOPPED)
|
if (cl->peer_state == PEER_STOPPED)
|
||||||
{
|
{
|
||||||
|
@ -160,8 +166,14 @@ bool osd_messenger_t::handle_finished_read(osd_client_t *cl)
|
||||||
{
|
{
|
||||||
if (cl->read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC)
|
if (cl->read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC)
|
||||||
return handle_reply_hdr(cl);
|
return handle_reply_hdr(cl);
|
||||||
else
|
else if (cl->read_op->req.hdr.magic == SECONDARY_OSD_OP_MAGIC)
|
||||||
handle_op_hdr(cl);
|
handle_op_hdr(cl);
|
||||||
|
else
|
||||||
|
{
|
||||||
|
printf("Received garbage: magic=%lx id=%lu opcode=%lx from %d\n", cl->read_op->req.hdr.magic, cl->read_op->req.hdr.id, cl->read_op->req.hdr.opcode, cl->peer_fd);
|
||||||
|
stop_client(cl->peer_fd);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else if (cl->read_state == CL_READ_DATA)
|
else if (cl->read_state == CL_READ_DATA)
|
||||||
{
|
{
|
||||||
|
|
|
@ -46,7 +46,8 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
|
||||||
to_send_list.push_back((iovec){ .iov_base = cur_op->req.buf, .iov_len = OSD_PACKET_SIZE });
|
to_send_list.push_back((iovec){ .iov_base = cur_op->req.buf, .iov_len = OSD_PACKET_SIZE });
|
||||||
cl->sent_ops[cur_op->req.hdr.id] = cur_op;
|
cl->sent_ops[cur_op->req.hdr.id] = cur_op;
|
||||||
}
|
}
|
||||||
// Pre-defined send_lists
|
to_outbox.push_back(NULL);
|
||||||
|
// Operation data
|
||||||
if ((cur_op->op_type == OSD_OP_IN
|
if ((cur_op->op_type == OSD_OP_IN
|
||||||
? (cur_op->req.hdr.opcode == OSD_OP_READ ||
|
? (cur_op->req.hdr.opcode == OSD_OP_READ ||
|
||||||
cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
|
cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
|
||||||
|
@ -58,17 +59,17 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
|
||||||
cur_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE ||
|
cur_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE ||
|
||||||
cur_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK)) && cur_op->iov.count > 0)
|
cur_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK)) && cur_op->iov.count > 0)
|
||||||
{
|
{
|
||||||
to_outbox.push_back(NULL);
|
|
||||||
for (int i = 0; i < cur_op->iov.count; i++)
|
for (int i = 0; i < cur_op->iov.count; i++)
|
||||||
{
|
{
|
||||||
assert(cur_op->iov.buf[i].iov_base);
|
assert(cur_op->iov.buf[i].iov_base);
|
||||||
to_send_list.push_back(cur_op->iov.buf[i]);
|
to_send_list.push_back(cur_op->iov.buf[i]);
|
||||||
to_outbox.push_back(i == cur_op->iov.count-1 ? cur_op : NULL);
|
to_outbox.push_back(NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
if (cur_op->op_type == OSD_OP_IN)
|
||||||
{
|
{
|
||||||
to_outbox.push_back(cur_op);
|
// To free it later
|
||||||
|
to_outbox[to_outbox.size()-1] = cur_op;
|
||||||
}
|
}
|
||||||
if (!ringloop)
|
if (!ringloop)
|
||||||
{
|
{
|
||||||
|
@ -92,6 +93,10 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
|
||||||
void osd_messenger_t::measure_exec(osd_op_t *cur_op)
|
void osd_messenger_t::measure_exec(osd_op_t *cur_op)
|
||||||
{
|
{
|
||||||
// Measure execution latency
|
// Measure execution latency
|
||||||
|
if (cur_op->req.hdr.opcode > OSD_OP_MAX)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
timespec tv_end;
|
timespec tv_end;
|
||||||
clock_gettime(CLOCK_REALTIME, &tv_end);
|
clock_gettime(CLOCK_REALTIME, &tv_end);
|
||||||
stats.op_stat_count[cur_op->req.hdr.opcode]++;
|
stats.op_stat_count[cur_op->req.hdr.opcode]++;
|
||||||
|
@ -198,11 +203,8 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
|
||||||
{
|
{
|
||||||
if (cl->outbox[done])
|
if (cl->outbox[done])
|
||||||
{
|
{
|
||||||
// Operation fully sent
|
// Reply fully sent
|
||||||
if (cl->outbox[done]->op_type == OSD_OP_IN)
|
delete cl->outbox[done];
|
||||||
{
|
|
||||||
delete cl->outbox[done];
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
result -= iov.iov_len;
|
result -= iov.iov_len;
|
||||||
done++;
|
done++;
|
||||||
|
|
|
@ -489,7 +489,11 @@ resume_7:
|
||||||
}
|
}
|
||||||
// Remember PG as dirty to drop the connection when PG goes offline
|
// Remember PG as dirty to drop the connection when PG goes offline
|
||||||
// (this is required because of the "lazy sync")
|
// (this is required because of the "lazy sync")
|
||||||
c_cli.clients[cur_op->peer_fd]->dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
|
auto cl_it = c_cli.clients.find(cur_op->peer_fd);
|
||||||
|
if (cl_it != c_cli.clients.end())
|
||||||
|
{
|
||||||
|
cl_it->second->dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
|
||||||
|
}
|
||||||
dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
|
dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
|
Loading…
Reference in New Issue