forked from vitalif/vitastor
Compare commits
3 Commits
master
...
openonload
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | 85010fed38 | |
Vitaliy Filippov | 2498e504c2 | |
Vitaliy Filippov | d56633843f |
8
Makefile
8
Makefile
|
@ -18,7 +18,7 @@ OSD_OBJS := osd.o osd_secondary.o msgr_receive.o msgr_send.o osd_peering.o osd_f
|
||||||
osd_primary.o osd_primary_subops.o etcd_state_client.o messenger.o osd_cluster.o http_client.o pg_states.o \
|
osd_primary.o osd_primary_subops.o etcd_state_client.o messenger.o osd_cluster.o http_client.o pg_states.o \
|
||||||
osd_rmw.o json11.o base64.o timerfd_manager.o
|
osd_rmw.o json11.o base64.o timerfd_manager.o
|
||||||
osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h $(OSD_OBJS)
|
osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h $(OSD_OBJS)
|
||||||
g++ $(CXXFLAGS) -o $@ osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring
|
g++ $(CXXFLAGS) -o $@ osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring -lpthread
|
||||||
|
|
||||||
stub_osd: stub_osd.o rw_blocking.o
|
stub_osd: stub_osd.o rw_blocking.o
|
||||||
g++ $(CXXFLAGS) -o $@ stub_osd.o rw_blocking.o -ltcmalloc_minimal
|
g++ $(CXXFLAGS) -o $@ stub_osd.o rw_blocking.o -ltcmalloc_minimal
|
||||||
|
@ -87,7 +87,7 @@ dump_journal.o: dump_journal.cpp allocator.h blockstore.h blockstore_flush.h blo
|
||||||
g++ $(CXXFLAGS) -c -o $@ $<
|
g++ $(CXXFLAGS) -c -o $@ $<
|
||||||
epoll_manager.o: epoll_manager.cpp epoll_manager.h ringloop.h timerfd_manager.h
|
epoll_manager.o: epoll_manager.cpp epoll_manager.h ringloop.h timerfd_manager.h
|
||||||
g++ $(CXXFLAGS) -c -o $@ $<
|
g++ $(CXXFLAGS) -c -o $@ $<
|
||||||
etcd_state_client.o: etcd_state_client.cpp base64.h etcd_state_client.h http_client.h json11/json11.hpp object_id.h osd_id.h osd_ops.h pg_states.h timerfd_manager.h
|
etcd_state_client.o: etcd_state_client.cpp base64.h etcd_state_client.h http_client.h json11/json11.hpp object_id.h osd_id.h osd_ops.h pg_states.h ringloop.h timerfd_manager.h
|
||||||
g++ $(CXXFLAGS) -c -o $@ $<
|
g++ $(CXXFLAGS) -c -o $@ $<
|
||||||
fio_cluster.o: fio_cluster.cpp cluster_client.h epoll_manager.h etcd_state_client.h fio/fio.h fio/optgroup.h http_client.h json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h
|
fio_cluster.o: fio_cluster.cpp cluster_client.h epoll_manager.h etcd_state_client.h fio/fio.h fio/optgroup.h http_client.h json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h
|
||||||
g++ $(CXXFLAGS) -c -o $@ $<
|
g++ $(CXXFLAGS) -c -o $@ $<
|
||||||
|
@ -95,7 +95,7 @@ fio_engine.o: fio_engine.cpp blockstore.h fio/fio.h fio/optgroup.h json11/json11
|
||||||
g++ $(CXXFLAGS) -c -o $@ $<
|
g++ $(CXXFLAGS) -c -o $@ $<
|
||||||
fio_sec_osd.o: fio_sec_osd.cpp fio/fio.h fio/optgroup.h object_id.h osd_id.h osd_ops.h rw_blocking.h
|
fio_sec_osd.o: fio_sec_osd.cpp fio/fio.h fio/optgroup.h object_id.h osd_id.h osd_ops.h rw_blocking.h
|
||||||
g++ $(CXXFLAGS) -c -o $@ $<
|
g++ $(CXXFLAGS) -c -o $@ $<
|
||||||
http_client.o: http_client.cpp http_client.h json11/json11.hpp timerfd_manager.h
|
http_client.o: http_client.cpp http_client.h json11/json11.hpp ringloop.h timerfd_manager.h
|
||||||
g++ $(CXXFLAGS) -c -o $@ $<
|
g++ $(CXXFLAGS) -c -o $@ $<
|
||||||
messenger.o: messenger.cpp json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h
|
messenger.o: messenger.cpp json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h
|
||||||
g++ $(CXXFLAGS) -c -o $@ $<
|
g++ $(CXXFLAGS) -c -o $@ $<
|
||||||
|
@ -149,5 +149,5 @@ test_blockstore.o: test_blockstore.cpp blockstore.h object_id.h ringloop.h timer
|
||||||
g++ $(CXXFLAGS) -c -o $@ $<
|
g++ $(CXXFLAGS) -c -o $@ $<
|
||||||
timerfd_interval.o: timerfd_interval.cpp ringloop.h timerfd_interval.h
|
timerfd_interval.o: timerfd_interval.cpp ringloop.h timerfd_interval.h
|
||||||
g++ $(CXXFLAGS) -c -o $@ $<
|
g++ $(CXXFLAGS) -c -o $@ $<
|
||||||
timerfd_manager.o: timerfd_manager.cpp timerfd_manager.h
|
timerfd_manager.o: timerfd_manager.cpp ringloop.h timerfd_manager.h
|
||||||
g++ $(CXXFLAGS) -c -o $@ $<
|
g++ $(CXXFLAGS) -c -o $@ $<
|
||||||
|
|
|
@ -2,17 +2,10 @@
|
||||||
|
|
||||||
void osd_messenger_t::read_requests()
|
void osd_messenger_t::read_requests()
|
||||||
{
|
{
|
||||||
for (int i = 0; i < read_ready_clients.size(); i++)
|
while (read_ready_clients.size() > 0)
|
||||||
{
|
{
|
||||||
int peer_fd = read_ready_clients[i];
|
int peer_fd = read_ready_clients[0];
|
||||||
auto & cl = clients[peer_fd];
|
auto & cl = clients[peer_fd];
|
||||||
io_uring_sqe* sqe = ringloop->get_sqe();
|
|
||||||
if (!sqe)
|
|
||||||
{
|
|
||||||
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
|
||||||
if (!cl.read_op || cl.read_remaining < receive_buffer_size)
|
if (!cl.read_op || cl.read_remaining < receive_buffer_size)
|
||||||
{
|
{
|
||||||
cl.read_iov.iov_base = cl.in_buf;
|
cl.read_iov.iov_base = cl.in_buf;
|
||||||
|
@ -25,10 +18,14 @@ void osd_messenger_t::read_requests()
|
||||||
}
|
}
|
||||||
cl.read_msg.msg_iov = &cl.read_iov;
|
cl.read_msg.msg_iov = &cl.read_iov;
|
||||||
cl.read_msg.msg_iovlen = 1;
|
cl.read_msg.msg_iovlen = 1;
|
||||||
data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data->res, peer_fd); };
|
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + 1);
|
||||||
my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0);
|
int result = recvmsg(peer_fd, &cl.read_msg, 0);
|
||||||
|
if (result < 0)
|
||||||
|
{
|
||||||
|
result = -errno;
|
||||||
|
}
|
||||||
|
handle_read(result, peer_fd);
|
||||||
}
|
}
|
||||||
read_ready_clients.clear();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool osd_messenger_t::handle_read(int result, int peer_fd)
|
bool osd_messenger_t::handle_read(int result, int peer_fd)
|
||||||
|
|
|
@ -42,12 +42,6 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
|
||||||
bool osd_messenger_t::try_send(osd_client_t & cl)
|
bool osd_messenger_t::try_send(osd_client_t & cl)
|
||||||
{
|
{
|
||||||
int peer_fd = cl.peer_fd;
|
int peer_fd = cl.peer_fd;
|
||||||
io_uring_sqe* sqe = ringloop->get_sqe();
|
|
||||||
if (!sqe)
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
|
||||||
if (!cl.write_op)
|
if (!cl.write_op)
|
||||||
{
|
{
|
||||||
// pick next command
|
// pick next command
|
||||||
|
@ -84,23 +78,21 @@ bool osd_messenger_t::try_send(osd_client_t & cl)
|
||||||
}
|
}
|
||||||
cl.write_msg.msg_iov = cl.write_op->send_list.get_iovec();
|
cl.write_msg.msg_iov = cl.write_op->send_list.get_iovec();
|
||||||
cl.write_msg.msg_iovlen = cl.write_op->send_list.get_size();
|
cl.write_msg.msg_iovlen = cl.write_op->send_list.get_size();
|
||||||
data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data->res, peer_fd); };
|
int result = sendmsg(peer_fd, &cl.write_msg, MSG_NOSIGNAL);
|
||||||
my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0);
|
if (result < 0)
|
||||||
|
result = -errno;
|
||||||
|
handle_send(result, peer_fd);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void osd_messenger_t::send_replies()
|
void osd_messenger_t::send_replies()
|
||||||
{
|
{
|
||||||
for (int i = 0; i < write_ready_clients.size(); i++)
|
while (write_ready_clients.size() > 0)
|
||||||
{
|
{
|
||||||
int peer_fd = write_ready_clients[i];
|
auto & cl = clients[write_ready_clients[0]];
|
||||||
if (!try_send(clients[peer_fd]))
|
write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + 1);
|
||||||
{
|
try_send(cl);
|
||||||
write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + i);
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
write_ready_clients.clear();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void osd_messenger_t::handle_send(int result, int peer_fd)
|
void osd_messenger_t::handle_send(int result, int peer_fd)
|
||||||
|
|
95
osd.cpp
95
osd.cpp
|
@ -1,5 +1,6 @@
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <sys/epoll.h>
|
#include <sys/epoll.h>
|
||||||
|
#include <sys/eventfd.h>
|
||||||
#include <sys/poll.h>
|
#include <sys/poll.h>
|
||||||
#include <netinet/in.h>
|
#include <netinet/in.h>
|
||||||
#include <netinet/tcp.h>
|
#include <netinet/tcp.h>
|
||||||
|
@ -43,8 +44,14 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
|
||||||
{
|
{
|
||||||
throw std::runtime_error(std::string("epoll_create: ") + strerror(errno));
|
throw std::runtime_error(std::string("epoll_create: ") + strerror(errno));
|
||||||
}
|
}
|
||||||
|
event_fd = eventfd(0, EFD_NONBLOCK);
|
||||||
|
if (event_fd < 0)
|
||||||
|
{
|
||||||
|
throw std::runtime_error(std::string("eventfd: ") + strerror(errno));
|
||||||
|
}
|
||||||
|
|
||||||
this->tfd = new timerfd_manager_t([this](int fd, std::function<void(int, int)> handler) { set_fd_handler(fd, handler); });
|
this->tfd = new timerfd_manager_t(ringloop);
|
||||||
|
this->tfd->set_fd_handler = [this](int fd, std::function<void(int, int)> handler) { set_fd_handler(fd, handler); };
|
||||||
this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id)
|
this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id)
|
||||||
{
|
{
|
||||||
print_stats();
|
print_stats();
|
||||||
|
@ -59,17 +66,40 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
|
||||||
|
|
||||||
consumer.loop = [this]() { loop(); };
|
consumer.loop = [this]() { loop(); };
|
||||||
ringloop->register_consumer(&consumer);
|
ringloop->register_consumer(&consumer);
|
||||||
|
epoll_thread = new std::thread([this]()
|
||||||
|
{
|
||||||
|
int nfds;
|
||||||
|
epoll_event events[MAX_EPOLL_EVENTS];
|
||||||
|
while (1)
|
||||||
|
{
|
||||||
|
nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, -1);
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> guard(epoll_mutex);
|
||||||
|
for (int i = 0; i < nfds; i++)
|
||||||
|
{
|
||||||
|
int fd = events[i].data.fd;
|
||||||
|
int ev = events[i].events;
|
||||||
|
epoll_ready[fd] |= ev;
|
||||||
|
}
|
||||||
|
uint64_t n = 1;
|
||||||
|
write(event_fd, &n, 8);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
osd_t::~osd_t()
|
osd_t::~osd_t()
|
||||||
{
|
{
|
||||||
|
close(epoll_fd);
|
||||||
|
epoll_thread->join();
|
||||||
|
delete epoll_thread;
|
||||||
if (tfd)
|
if (tfd)
|
||||||
{
|
{
|
||||||
delete tfd;
|
delete tfd;
|
||||||
tfd = NULL;
|
tfd = NULL;
|
||||||
}
|
}
|
||||||
ringloop->unregister_consumer(&consumer);
|
ringloop->unregister_consumer(&consumer);
|
||||||
close(epoll_fd);
|
close(event_fd);
|
||||||
close(listen_fd);
|
close(listen_fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -188,8 +218,13 @@ void osd_t::bind_socket()
|
||||||
{
|
{
|
||||||
close(listen_fd);
|
close(listen_fd);
|
||||||
close(epoll_fd);
|
close(epoll_fd);
|
||||||
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
throw std::runtime_error(std::string("epoll_ctl (add listen_fd): ") + strerror(errno));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
epoll_handlers[listen_fd] = [this](int peer_fd, int epoll_events)
|
||||||
|
{
|
||||||
|
c_cli.accept_connections(listen_fd);
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
bool osd_t::shutdown()
|
bool osd_t::shutdown()
|
||||||
|
@ -204,10 +239,23 @@ bool osd_t::shutdown()
|
||||||
|
|
||||||
void osd_t::loop()
|
void osd_t::loop()
|
||||||
{
|
{
|
||||||
if (!wait_state)
|
std::map<int,int> cur_epoll;
|
||||||
{
|
{
|
||||||
handle_epoll_events();
|
std::lock_guard<std::mutex> guard(epoll_mutex);
|
||||||
wait_state = 1;
|
cur_epoll.swap(epoll_ready);
|
||||||
|
}
|
||||||
|
for (auto p: cur_epoll)
|
||||||
|
{
|
||||||
|
auto cb_it = epoll_handlers.find(p.first);
|
||||||
|
if (cb_it != epoll_handlers.end())
|
||||||
|
{
|
||||||
|
cb_it->second(p.first, p.second);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!(wait_state & 2))
|
||||||
|
{
|
||||||
|
handle_eventfd();
|
||||||
|
wait_state = wait_state | 2;
|
||||||
}
|
}
|
||||||
handle_peers();
|
handle_peers();
|
||||||
c_cli.read_requests();
|
c_cli.read_requests();
|
||||||
|
@ -225,7 +273,7 @@ void osd_t::set_fd_handler(int fd, std::function<void(int, int)> handler)
|
||||||
ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET;
|
ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET;
|
||||||
if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0)
|
if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0)
|
||||||
{
|
{
|
||||||
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
throw std::runtime_error(std::string(exists ? "epoll_ctl (mod fd): " : "epoll_ctl (add fd): ") + strerror(errno));
|
||||||
}
|
}
|
||||||
epoll_handlers[fd] = handler;
|
epoll_handlers[fd] = handler;
|
||||||
}
|
}
|
||||||
|
@ -233,49 +281,36 @@ void osd_t::set_fd_handler(int fd, std::function<void(int, int)> handler)
|
||||||
{
|
{
|
||||||
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0 && errno != ENOENT)
|
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0 && errno != ENOENT)
|
||||||
{
|
{
|
||||||
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
throw std::runtime_error(std::string("epoll_ctl (remove fd): ") + strerror(errno));
|
||||||
}
|
}
|
||||||
epoll_handlers.erase(fd);
|
epoll_handlers.erase(fd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void osd_t::handle_epoll_events()
|
void osd_t::handle_eventfd()
|
||||||
{
|
{
|
||||||
io_uring_sqe *sqe = ringloop->get_sqe();
|
io_uring_sqe *sqe = ringloop->get_sqe();
|
||||||
if (!sqe)
|
if (!sqe)
|
||||||
{
|
{
|
||||||
throw std::runtime_error("can't get SQE, will fall out of sync with EPOLLET");
|
throw std::runtime_error("can't get SQE, will fall out of sync with eventfd");
|
||||||
}
|
}
|
||||||
ring_data_t *data = ((ring_data_t*)sqe->user_data);
|
ring_data_t *data = ((ring_data_t*)sqe->user_data);
|
||||||
my_uring_prep_poll_add(sqe, epoll_fd, POLLIN);
|
my_uring_prep_poll_add(sqe, event_fd, POLLIN);
|
||||||
data->callback = [this](ring_data_t *data)
|
data->callback = [this](ring_data_t *data)
|
||||||
{
|
{
|
||||||
if (data->res < 0)
|
if (data->res < 0)
|
||||||
{
|
{
|
||||||
throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res));
|
throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res));
|
||||||
}
|
}
|
||||||
handle_epoll_events();
|
handle_eventfd();
|
||||||
};
|
};
|
||||||
ringloop->submit();
|
ringloop->submit();
|
||||||
int nfds;
|
uint64_t n = 0;
|
||||||
epoll_event events[MAX_EPOLL_EVENTS];
|
size_t res = read(event_fd, &n, 8);
|
||||||
restart:
|
if (res == 8)
|
||||||
nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 0);
|
|
||||||
for (int i = 0; i < nfds; i++)
|
|
||||||
{
|
{
|
||||||
if (events[i].data.fd == listen_fd)
|
// No need to do anything, the loop has already woken up
|
||||||
{
|
ringloop->wakeup();
|
||||||
c_cli.accept_connections(listen_fd);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
auto & cb = epoll_handlers[events[i].data.fd];
|
|
||||||
cb(events[i].data.fd, events[i].events);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (nfds == MAX_EPOLL_EVENTS)
|
|
||||||
{
|
|
||||||
goto restart;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
8
osd.h
8
osd.h
|
@ -12,6 +12,8 @@
|
||||||
|
|
||||||
#include <set>
|
#include <set>
|
||||||
#include <deque>
|
#include <deque>
|
||||||
|
#include <mutex>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
#include "blockstore.h"
|
#include "blockstore.h"
|
||||||
#include "ringloop.h"
|
#include "ringloop.h"
|
||||||
|
@ -114,6 +116,10 @@ class osd_t
|
||||||
|
|
||||||
int wait_state = 0;
|
int wait_state = 0;
|
||||||
int epoll_fd = 0;
|
int epoll_fd = 0;
|
||||||
|
int event_fd = 0;
|
||||||
|
std::thread *epoll_thread = NULL;
|
||||||
|
std::mutex epoll_mutex;
|
||||||
|
std::map<int, int> epoll_ready;
|
||||||
int listening_port = 0;
|
int listening_port = 0;
|
||||||
int listen_fd = 0;
|
int listen_fd = 0;
|
||||||
ring_consumer_t consumer;
|
ring_consumer_t consumer;
|
||||||
|
@ -150,7 +156,7 @@ class osd_t
|
||||||
// event loop, socket read/write
|
// event loop, socket read/write
|
||||||
void loop();
|
void loop();
|
||||||
void set_fd_handler(int fd, std::function<void(int, int)> handler);
|
void set_fd_handler(int fd, std::function<void(int, int)> handler);
|
||||||
void handle_epoll_events();
|
void handle_eventfd();
|
||||||
|
|
||||||
// peer handling (primary OSD logic)
|
// peer handling (primary OSD logic)
|
||||||
void parse_test_peer(std::string peer);
|
void parse_test_peer(std::string peer);
|
||||||
|
|
|
@ -1,29 +1,24 @@
|
||||||
#include <sys/timerfd.h>
|
#include <sys/timerfd.h>
|
||||||
#include <sys/poll.h>
|
#include <sys/poll.h>
|
||||||
#include <sys/epoll.h>
|
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <errno.h>
|
|
||||||
#include <string.h>
|
|
||||||
#include "timerfd_manager.h"
|
#include "timerfd_manager.h"
|
||||||
|
|
||||||
timerfd_manager_t::timerfd_manager_t(std::function<void(int, std::function<void(int, int)>)> set_fd_handler)
|
timerfd_manager_t::timerfd_manager_t(ring_loop_t *ringloop)
|
||||||
{
|
{
|
||||||
this->set_fd_handler = set_fd_handler;
|
|
||||||
wait_state = 0;
|
wait_state = 0;
|
||||||
timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
|
timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
|
||||||
if (timerfd < 0)
|
if (timerfd < 0)
|
||||||
{
|
{
|
||||||
throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno));
|
throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno));
|
||||||
}
|
}
|
||||||
set_fd_handler(timerfd, [this](int fd, int events)
|
consumer.loop = [this]() { loop(); };
|
||||||
{
|
ringloop->register_consumer(&consumer);
|
||||||
handle_readable();
|
this->ringloop = ringloop;
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
timerfd_manager_t::~timerfd_manager_t()
|
timerfd_manager_t::~timerfd_manager_t()
|
||||||
{
|
{
|
||||||
set_fd_handler(timerfd, NULL);
|
ringloop->unregister_consumer(&consumer);
|
||||||
close(timerfd);
|
close(timerfd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -53,6 +48,7 @@ int timerfd_manager_t::set_timer(uint64_t millis, bool repeat, std::function<voi
|
||||||
});
|
});
|
||||||
inc_timer(timers[timers.size()-1]);
|
inc_timer(timers[timers.size()-1]);
|
||||||
set_nearest();
|
set_nearest();
|
||||||
|
set_wait();
|
||||||
return timer_id;
|
return timer_id;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,6 +69,7 @@ void timerfd_manager_t::clear_timer(int timer_id)
|
||||||
nearest--;
|
nearest--;
|
||||||
}
|
}
|
||||||
set_nearest();
|
set_nearest();
|
||||||
|
set_wait();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -157,3 +154,36 @@ void timerfd_manager_t::trigger_nearest()
|
||||||
cb(nearest_id);
|
cb(nearest_id);
|
||||||
nearest = -1;
|
nearest = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void timerfd_manager_t::loop()
|
||||||
|
{
|
||||||
|
if (!(wait_state & 1) && timers.size())
|
||||||
|
{
|
||||||
|
set_nearest();
|
||||||
|
}
|
||||||
|
set_wait();
|
||||||
|
}
|
||||||
|
|
||||||
|
void timerfd_manager_t::set_wait()
|
||||||
|
{
|
||||||
|
if ((wait_state & 3) == 1)
|
||||||
|
{
|
||||||
|
io_uring_sqe *sqe = ringloop->get_sqe();
|
||||||
|
if (!sqe)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ring_data_t *data = ((ring_data_t*)sqe->user_data);
|
||||||
|
my_uring_prep_poll_add(sqe, timerfd, POLLIN);
|
||||||
|
data->callback = [this](ring_data_t *data)
|
||||||
|
{
|
||||||
|
if (data->res < 0)
|
||||||
|
{
|
||||||
|
throw std::runtime_error(std::string("waiting for timer failed: ") + strerror(-data->res));
|
||||||
|
}
|
||||||
|
handle_readable();
|
||||||
|
set_wait();
|
||||||
|
};
|
||||||
|
wait_state = 3;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,8 +1,7 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
#include <vector>
|
#include "ringloop.h"
|
||||||
#include <functional>
|
|
||||||
|
|
||||||
struct timerfd_timer_t
|
struct timerfd_timer_t
|
||||||
{
|
{
|
||||||
|
@ -20,15 +19,20 @@ class timerfd_manager_t
|
||||||
int nearest = -1;
|
int nearest = -1;
|
||||||
int id = 1;
|
int id = 1;
|
||||||
std::vector<timerfd_timer_t> timers;
|
std::vector<timerfd_timer_t> timers;
|
||||||
|
ring_loop_t *ringloop;
|
||||||
|
ring_consumer_t consumer;
|
||||||
|
|
||||||
void inc_timer(timerfd_timer_t & t);
|
void inc_timer(timerfd_timer_t & t);
|
||||||
void set_nearest();
|
void set_nearest();
|
||||||
void trigger_nearest();
|
void trigger_nearest();
|
||||||
void handle_readable();
|
void handle_readable();
|
||||||
|
void set_wait();
|
||||||
|
void loop();
|
||||||
public:
|
public:
|
||||||
|
// FIXME shouldn't be here
|
||||||
std::function<void(int, std::function<void(int, int)>)> set_fd_handler;
|
std::function<void(int, std::function<void(int, int)>)> set_fd_handler;
|
||||||
|
|
||||||
timerfd_manager_t(std::function<void(int, std::function<void(int, int)>)> set_fd_handler);
|
timerfd_manager_t(ring_loop_t *ringloop);
|
||||||
~timerfd_manager_t();
|
~timerfd_manager_t();
|
||||||
int set_timer(uint64_t millis, bool repeat, std::function<void(int)> callback);
|
int set_timer(uint64_t millis, bool repeat, std::function<void(int)> callback);
|
||||||
void clear_timer(int timer_id);
|
void clear_timer(int timer_id);
|
||||||
|
|
Loading…
Reference in New Issue