forked from vitalif/vitastor
parent
c41fd7ea18
commit
fd05e13bc4
63
osd.cpp
63
osd.cpp
|
@ -113,8 +113,7 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
|
||||||
|
|
||||||
epoll_event ev;
|
epoll_event ev;
|
||||||
ev.data.fd = listen_fd;
|
ev.data.fd = listen_fd;
|
||||||
// FIXME: Use EPOLLET
|
ev.events = EPOLLIN | EPOLLET;
|
||||||
ev.events = EPOLLIN;
|
|
||||||
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) < 0)
|
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) < 0)
|
||||||
{
|
{
|
||||||
close(listen_fd);
|
close(listen_fd);
|
||||||
|
@ -168,40 +167,23 @@ bool osd_t::shutdown()
|
||||||
|
|
||||||
void osd_t::loop()
|
void osd_t::loop()
|
||||||
{
|
{
|
||||||
if (wait_state == 0)
|
if (!wait_state)
|
||||||
{
|
|
||||||
io_uring_sqe *sqe = ringloop->get_sqe();
|
|
||||||
if (!sqe)
|
|
||||||
{
|
|
||||||
wait_state = 0;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
ring_data_t *data = ((ring_data_t*)sqe->user_data);
|
|
||||||
my_uring_prep_poll_add(sqe, epoll_fd, POLLIN);
|
|
||||||
data->callback = [&](ring_data_t *data)
|
|
||||||
{
|
|
||||||
if (data->res < 0)
|
|
||||||
{
|
|
||||||
throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res));
|
|
||||||
}
|
|
||||||
handle_epoll_events();
|
|
||||||
};
|
|
||||||
wait_state = 1;
|
|
||||||
}
|
|
||||||
else if (wait_state == 2)
|
|
||||||
{
|
{
|
||||||
handle_epoll_events();
|
handle_epoll_events();
|
||||||
|
wait_state = 1;
|
||||||
}
|
}
|
||||||
handle_peers();
|
handle_peers();
|
||||||
send_replies();
|
|
||||||
read_requests();
|
read_requests();
|
||||||
|
send_replies();
|
||||||
ringloop->submit();
|
ringloop->submit();
|
||||||
}
|
}
|
||||||
|
|
||||||
int osd_t::handle_epoll_events()
|
void osd_t::handle_epoll_events()
|
||||||
{
|
{
|
||||||
|
int nfds;
|
||||||
epoll_event events[MAX_EPOLL_EVENTS];
|
epoll_event events[MAX_EPOLL_EVENTS];
|
||||||
int nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 0);
|
restart:
|
||||||
|
nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 0);
|
||||||
for (int i = 0; i < nfds; i++)
|
for (int i = 0; i < nfds; i++)
|
||||||
{
|
{
|
||||||
if (events[i].data.fd == listen_fd)
|
if (events[i].data.fd == listen_fd)
|
||||||
|
@ -226,7 +208,7 @@ int osd_t::handle_epoll_events()
|
||||||
// Add FD to epoll
|
// Add FD to epoll
|
||||||
epoll_event ev;
|
epoll_event ev;
|
||||||
ev.data.fd = peer_fd;
|
ev.data.fd = peer_fd;
|
||||||
ev.events = EPOLLIN | EPOLLRDHUP;
|
ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
|
||||||
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0)
|
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0)
|
||||||
{
|
{
|
||||||
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
||||||
|
@ -253,11 +235,11 @@ int osd_t::handle_epoll_events()
|
||||||
printf("osd: client %d disconnected\n", cl.peer_fd);
|
printf("osd: client %d disconnected\n", cl.peer_fd);
|
||||||
stop_client(cl.peer_fd);
|
stop_client(cl.peer_fd);
|
||||||
}
|
}
|
||||||
else if (!cl.read_ready)
|
else
|
||||||
{
|
{
|
||||||
// Mark client as ready (i.e. some data is available)
|
// Mark client as ready (i.e. some data is available)
|
||||||
cl.read_ready = true;
|
cl.read_ready++;
|
||||||
if (!cl.reading)
|
if (cl.read_ready == 1)
|
||||||
{
|
{
|
||||||
read_ready_clients.push_back(cl.peer_fd);
|
read_ready_clients.push_back(cl.peer_fd);
|
||||||
ringloop->wakeup();
|
ringloop->wakeup();
|
||||||
|
@ -265,8 +247,25 @@ int osd_t::handle_epoll_events()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
wait_state = nfds == MAX_EPOLL_EVENTS ? 2 : 0;
|
if (nfds == MAX_EPOLL_EVENTS)
|
||||||
return nfds;
|
{
|
||||||
|
goto restart;
|
||||||
|
}
|
||||||
|
io_uring_sqe *sqe = ringloop->get_sqe();
|
||||||
|
if (!sqe)
|
||||||
|
{
|
||||||
|
throw std::runtime_error("can't get SQE, will fall out of sync with EPOLLET");
|
||||||
|
}
|
||||||
|
ring_data_t *data = ((ring_data_t*)sqe->user_data);
|
||||||
|
my_uring_prep_poll_add(sqe, epoll_fd, POLLIN);
|
||||||
|
data->callback = [this](ring_data_t *data)
|
||||||
|
{
|
||||||
|
if (data->res < 0)
|
||||||
|
{
|
||||||
|
throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res));
|
||||||
|
}
|
||||||
|
handle_epoll_events();
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
void osd_t::cancel_osd_ops(osd_client_t & cl)
|
void osd_t::cancel_osd_ops(osd_client_t & cl)
|
||||||
|
|
5
osd.h
5
osd.h
|
@ -131,8 +131,7 @@ struct osd_client_t
|
||||||
osd_num_t osd_num = 0;
|
osd_num_t osd_num = 0;
|
||||||
|
|
||||||
// Read state
|
// Read state
|
||||||
bool read_ready = false;
|
int read_ready = 0;
|
||||||
bool reading = false;
|
|
||||||
osd_op_t *read_op = NULL;
|
osd_op_t *read_op = NULL;
|
||||||
int read_reply_id = 0;
|
int read_reply_id = 0;
|
||||||
iovec read_iov;
|
iovec read_iov;
|
||||||
|
@ -221,7 +220,7 @@ class osd_t
|
||||||
|
|
||||||
// event loop, socket read/write
|
// event loop, socket read/write
|
||||||
void loop();
|
void loop();
|
||||||
int handle_epoll_events();
|
void handle_epoll_events();
|
||||||
void read_requests();
|
void read_requests();
|
||||||
void handle_read(ring_data_t *data, int peer_fd);
|
void handle_read(ring_data_t *data, int peer_fd);
|
||||||
void handle_op_hdr(osd_client_t *cl);
|
void handle_op_hdr(osd_client_t *cl);
|
||||||
|
|
|
@ -83,7 +83,7 @@ void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port
|
||||||
// Add FD to epoll (EPOLLOUT for tracking connect() result)
|
// Add FD to epoll (EPOLLOUT for tracking connect() result)
|
||||||
epoll_event ev;
|
epoll_event ev;
|
||||||
ev.data.fd = peer_fd;
|
ev.data.fd = peer_fd;
|
||||||
ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP;
|
ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET;
|
||||||
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0)
|
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0)
|
||||||
{
|
{
|
||||||
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
||||||
|
@ -114,7 +114,7 @@ void osd_t::handle_connect_result(int peer_fd)
|
||||||
cl.peer_state = PEER_CONNECTED;
|
cl.peer_state = PEER_CONNECTED;
|
||||||
epoll_event ev;
|
epoll_event ev;
|
||||||
ev.data.fd = peer_fd;
|
ev.data.fd = peer_fd;
|
||||||
ev.events = EPOLLIN | EPOLLRDHUP;
|
ev.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
|
||||||
if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peer_fd, &ev) < 0)
|
if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peer_fd, &ev) < 0)
|
||||||
{
|
{
|
||||||
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
||||||
|
|
|
@ -33,8 +33,6 @@ void osd_t::read_requests()
|
||||||
cl.read_msg.msg_iovlen = 1;
|
cl.read_msg.msg_iovlen = 1;
|
||||||
data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data, peer_fd); };
|
data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data, peer_fd); };
|
||||||
my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0);
|
my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0);
|
||||||
cl.reading = true;
|
|
||||||
cl.read_ready = false;
|
|
||||||
}
|
}
|
||||||
read_ready_clients.clear();
|
read_ready_clients.clear();
|
||||||
}
|
}
|
||||||
|
@ -45,18 +43,21 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd)
|
||||||
if (cl_it != clients.end())
|
if (cl_it != clients.end())
|
||||||
{
|
{
|
||||||
auto & cl = cl_it->second;
|
auto & cl = cl_it->second;
|
||||||
if (data->res < 0 && data->res != -EAGAIN)
|
if (data->res == -EAGAIN)
|
||||||
|
{
|
||||||
|
cl.read_ready--;
|
||||||
|
if (cl.read_ready > 0)
|
||||||
|
read_ready_clients.push_back(peer_fd);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
else if (data->res < 0)
|
||||||
{
|
{
|
||||||
// this is a client socket, so don't panic. just disconnect it
|
// this is a client socket, so don't panic. just disconnect it
|
||||||
printf("Client %d socket read error: %d (%s). Disconnecting client\n", peer_fd, -data->res, strerror(-data->res));
|
printf("Client %d socket read error: %d (%s). Disconnecting client\n", peer_fd, -data->res, strerror(-data->res));
|
||||||
stop_client(peer_fd);
|
stop_client(peer_fd);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
cl.reading = false;
|
read_ready_clients.push_back(peer_fd);
|
||||||
if (cl.read_ready)
|
|
||||||
{
|
|
||||||
read_ready_clients.push_back(peer_fd);
|
|
||||||
}
|
|
||||||
if (data->res > 0)
|
if (data->res > 0)
|
||||||
{
|
{
|
||||||
cl.read_remaining -= data->res;
|
cl.read_remaining -= data->res;
|
||||||
|
|
Loading…
Reference in New Issue