Compare commits

..

2 Commits

Author SHA1 Message Date
85010fed38 Use a separate thread for epoll
Simplest, but absolutely inefficient, way to test openonload epoll
2020-06-09 00:52:43 +03:00
2498e504c2 Add ringloop back to timerfd 2020-06-09 00:52:43 +03:00
14 changed files with 124 additions and 126 deletions

View File

@@ -18,7 +18,7 @@ OSD_OBJS := osd.o osd_secondary.o msgr_receive.o msgr_send.o osd_peering.o osd_f
osd_primary.o osd_primary_subops.o etcd_state_client.o messenger.o osd_cluster.o http_client.o pg_states.o \ osd_primary.o osd_primary_subops.o etcd_state_client.o messenger.o osd_cluster.o http_client.o pg_states.o \
osd_rmw.o json11.o base64.o timerfd_manager.o osd_rmw.o json11.o base64.o timerfd_manager.o
osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h $(OSD_OBJS) osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h $(OSD_OBJS)
g++ $(CXXFLAGS) -o $@ osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring g++ $(CXXFLAGS) -o $@ osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring -lpthread
stub_osd: stub_osd.o rw_blocking.o stub_osd: stub_osd.o rw_blocking.o
g++ $(CXXFLAGS) -o $@ stub_osd.o rw_blocking.o -ltcmalloc_minimal g++ $(CXXFLAGS) -o $@ stub_osd.o rw_blocking.o -ltcmalloc_minimal
@@ -87,7 +87,7 @@ dump_journal.o: dump_journal.cpp allocator.h blockstore.h blockstore_flush.h blo
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
epoll_manager.o: epoll_manager.cpp epoll_manager.h ringloop.h timerfd_manager.h epoll_manager.o: epoll_manager.cpp epoll_manager.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
etcd_state_client.o: etcd_state_client.cpp base64.h etcd_state_client.h http_client.h json11/json11.hpp object_id.h osd_id.h osd_ops.h pg_states.h timerfd_manager.h etcd_state_client.o: etcd_state_client.cpp base64.h etcd_state_client.h http_client.h json11/json11.hpp object_id.h osd_id.h osd_ops.h pg_states.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
fio_cluster.o: fio_cluster.cpp cluster_client.h epoll_manager.h etcd_state_client.h fio/fio.h fio/optgroup.h http_client.h json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h fio_cluster.o: fio_cluster.cpp cluster_client.h epoll_manager.h etcd_state_client.h fio/fio.h fio/optgroup.h http_client.h json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
@@ -95,7 +95,7 @@ fio_engine.o: fio_engine.cpp blockstore.h fio/fio.h fio/optgroup.h json11/json11
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
fio_sec_osd.o: fio_sec_osd.cpp fio/fio.h fio/optgroup.h object_id.h osd_id.h osd_ops.h rw_blocking.h fio_sec_osd.o: fio_sec_osd.cpp fio/fio.h fio/optgroup.h object_id.h osd_id.h osd_ops.h rw_blocking.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
http_client.o: http_client.cpp http_client.h json11/json11.hpp timerfd_manager.h http_client.o: http_client.cpp http_client.h json11/json11.hpp ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
messenger.o: messenger.cpp json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h messenger.o: messenger.cpp json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
@@ -149,5 +149,5 @@ test_blockstore.o: test_blockstore.cpp blockstore.h object_id.h ringloop.h timer
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
timerfd_interval.o: timerfd_interval.cpp ringloop.h timerfd_interval.h timerfd_interval.o: timerfd_interval.cpp ringloop.h timerfd_interval.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
timerfd_manager.o: timerfd_manager.cpp timerfd_manager.h timerfd_manager.o: timerfd_manager.cpp ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<

View File

@@ -33,12 +33,6 @@ journal_flusher_co::journal_flusher_co()
); );
} }
wait_count--; wait_count--;
if (!wait_count)
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("finished %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
}; };
simple_callback_w = [this](ring_data_t* data) simple_callback_w = [this](ring_data_t* data)
{ {
@@ -51,12 +45,6 @@ journal_flusher_co::journal_flusher_co()
); );
} }
wait_count--; wait_count--;
if (!wait_count)
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("finished %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
}; };
} }
@@ -134,11 +122,6 @@ void journal_flusher_t::release_trim()
#define await_sqe(label) \ #define await_sqe(label) \
resume_##label:\ resume_##label:\
{\
timespec now;\
clock_gettime(CLOCK_REALTIME, &now);\
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);\
}\
sqe = bs->get_sqe();\ sqe = bs->get_sqe();\
if (!sqe)\ if (!sqe)\
{\ {\

View File

@@ -62,11 +62,6 @@
struct ring_data_t *data = ((ring_data_t*)sqe->user_data) struct ring_data_t *data = ((ring_data_t*)sqe->user_data)
#define BS_SUBMIT_GET_ONLY_SQE(sqe) \ #define BS_SUBMIT_GET_ONLY_SQE(sqe) \
{\
timespec now;\
clock_gettime(CLOCK_REALTIME, &now);\
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);\
}\
struct io_uring_sqe *sqe = get_sqe();\ struct io_uring_sqe *sqe = get_sqe();\
if (!sqe)\ if (!sqe)\
{\ {\
@@ -76,11 +71,6 @@
} }
#define BS_SUBMIT_GET_SQE_DECL(sqe) \ #define BS_SUBMIT_GET_SQE_DECL(sqe) \
{\
timespec now;\
clock_gettime(CLOCK_REALTIME, &now);\
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);\
}\
sqe = get_sqe();\ sqe = get_sqe();\
if (!sqe)\ if (!sqe)\
{\ {\

View File

@@ -147,11 +147,6 @@ resume_2:
resume_3: resume_3:
if (!disable_journal_fsync) if (!disable_journal_fsync)
{ {
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
io_uring_sqe *sqe = get_sqe(); io_uring_sqe *sqe = get_sqe();
if (!sqe) if (!sqe)
{ {
@@ -242,11 +237,6 @@ void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t *
PRIV(op)->pending_ops--; PRIV(op)->pending_ops--;
if (PRIV(op)->pending_ops == 0) if (PRIV(op)->pending_ops == 0)
{ {
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("finished %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
PRIV(op)->op_state++; PRIV(op)->op_state++;
if (!continue_stable(op)) if (!continue_stable(op))
{ {

View File

@@ -302,11 +302,6 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op)
return 1; return 1;
resume_2: resume_2:
// Only for the immediate_commit mode: prepare and submit big_write journal entry // Only for the immediate_commit mode: prepare and submit big_write journal entry
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
sqe = get_sqe(); sqe = get_sqe();
if (!sqe) if (!sqe)
{ {
@@ -338,11 +333,6 @@ resume_2:
return 1; return 1;
resume_4: resume_4:
// Switch object state // Switch object state
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("write_done %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
#ifdef BLOCKSTORE_DEBUG #ifdef BLOCKSTORE_DEBUG
printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state); printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state);
#endif #endif

View File

@@ -57,11 +57,6 @@ void epoll_manager_t::set_fd_handler(int fd, std::function<void(int, int)> handl
void epoll_manager_t::handle_epoll_events() void epoll_manager_t::handle_epoll_events()
{ {
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("epoll %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
io_uring_sqe *sqe = ringloop->get_sqe(); io_uring_sqe *sqe = ringloop->get_sqe();
if (!sqe) if (!sqe)
{ {

View File

@@ -24,11 +24,6 @@ void osd_messenger_t::read_requests()
{ {
result = -errno; result = -errno;
} }
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("recvmsg done %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
handle_read(result, peer_fd); handle_read(result, peer_fd);
} }
} }

View File

@@ -81,11 +81,6 @@ bool osd_messenger_t::try_send(osd_client_t & cl)
int result = sendmsg(peer_fd, &cl.write_msg, MSG_NOSIGNAL); int result = sendmsg(peer_fd, &cl.write_msg, MSG_NOSIGNAL);
if (result < 0) if (result < 0)
result = -errno; result = -errno;
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("sendmsg done %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
handle_send(result, peer_fd); handle_send(result, peer_fd);
return true; return true;
} }

100
osd.cpp
View File

@@ -1,5 +1,6 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/epoll.h> #include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/poll.h> #include <sys/poll.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <netinet/tcp.h> #include <netinet/tcp.h>
@@ -43,8 +44,14 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
{ {
throw std::runtime_error(std::string("epoll_create: ") + strerror(errno)); throw std::runtime_error(std::string("epoll_create: ") + strerror(errno));
} }
event_fd = eventfd(0, EFD_NONBLOCK);
if (event_fd < 0)
{
throw std::runtime_error(std::string("eventfd: ") + strerror(errno));
}
this->tfd = new timerfd_manager_t([this](int fd, std::function<void(int, int)> handler) { set_fd_handler(fd, handler); }); this->tfd = new timerfd_manager_t(ringloop);
this->tfd->set_fd_handler = [this](int fd, std::function<void(int, int)> handler) { set_fd_handler(fd, handler); };
this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id) this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id)
{ {
print_stats(); print_stats();
@@ -59,17 +66,40 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
consumer.loop = [this]() { loop(); }; consumer.loop = [this]() { loop(); };
ringloop->register_consumer(&consumer); ringloop->register_consumer(&consumer);
epoll_thread = new std::thread([this]()
{
int nfds;
epoll_event events[MAX_EPOLL_EVENTS];
while (1)
{
nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, -1);
{
std::lock_guard<std::mutex> guard(epoll_mutex);
for (int i = 0; i < nfds; i++)
{
int fd = events[i].data.fd;
int ev = events[i].events;
epoll_ready[fd] |= ev;
}
uint64_t n = 1;
write(event_fd, &n, 8);
}
}
});
} }
osd_t::~osd_t() osd_t::~osd_t()
{ {
close(epoll_fd);
epoll_thread->join();
delete epoll_thread;
if (tfd) if (tfd)
{ {
delete tfd; delete tfd;
tfd = NULL; tfd = NULL;
} }
ringloop->unregister_consumer(&consumer); ringloop->unregister_consumer(&consumer);
close(epoll_fd); close(event_fd);
close(listen_fd); close(listen_fd);
} }
@@ -188,8 +218,13 @@ void osd_t::bind_socket()
{ {
close(listen_fd); close(listen_fd);
close(epoll_fd); close(epoll_fd);
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); throw std::runtime_error(std::string("epoll_ctl (add listen_fd): ") + strerror(errno));
} }
epoll_handlers[listen_fd] = [this](int peer_fd, int epoll_events)
{
c_cli.accept_connections(listen_fd);
};
} }
bool osd_t::shutdown() bool osd_t::shutdown()
@@ -204,10 +239,23 @@ bool osd_t::shutdown()
void osd_t::loop() void osd_t::loop()
{ {
if (!wait_state) std::map<int,int> cur_epoll;
{ {
handle_epoll_events(); std::lock_guard<std::mutex> guard(epoll_mutex);
wait_state = 1; cur_epoll.swap(epoll_ready);
}
for (auto p: cur_epoll)
{
auto cb_it = epoll_handlers.find(p.first);
if (cb_it != epoll_handlers.end())
{
cb_it->second(p.first, p.second);
}
}
if (!(wait_state & 2))
{
handle_eventfd();
wait_state = wait_state | 2;
} }
handle_peers(); handle_peers();
c_cli.read_requests(); c_cli.read_requests();
@@ -225,7 +273,7 @@ void osd_t::set_fd_handler(int fd, std::function<void(int, int)> handler)
ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET; ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET;
if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0) if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0)
{ {
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); throw std::runtime_error(std::string(exists ? "epoll_ctl (mod fd): " : "epoll_ctl (add fd): ") + strerror(errno));
} }
epoll_handlers[fd] = handler; epoll_handlers[fd] = handler;
} }
@@ -233,54 +281,36 @@ void osd_t::set_fd_handler(int fd, std::function<void(int, int)> handler)
{ {
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0 && errno != ENOENT) if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0 && errno != ENOENT)
{ {
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); throw std::runtime_error(std::string("epoll_ctl (remove fd): ") + strerror(errno));
} }
epoll_handlers.erase(fd); epoll_handlers.erase(fd);
} }
} }
void osd_t::handle_epoll_events() void osd_t::handle_eventfd()
{ {
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("epoll %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
io_uring_sqe *sqe = ringloop->get_sqe(); io_uring_sqe *sqe = ringloop->get_sqe();
if (!sqe) if (!sqe)
{ {
throw std::runtime_error("can't get SQE, will fall out of sync with EPOLLET"); throw std::runtime_error("can't get SQE, will fall out of sync with eventfd");
} }
ring_data_t *data = ((ring_data_t*)sqe->user_data); ring_data_t *data = ((ring_data_t*)sqe->user_data);
my_uring_prep_poll_add(sqe, epoll_fd, POLLIN); my_uring_prep_poll_add(sqe, event_fd, POLLIN);
data->callback = [this](ring_data_t *data) data->callback = [this](ring_data_t *data)
{ {
if (data->res < 0) if (data->res < 0)
{ {
throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res)); throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res));
} }
handle_epoll_events(); handle_eventfd();
}; };
ringloop->submit(); ringloop->submit();
int nfds; uint64_t n = 0;
epoll_event events[MAX_EPOLL_EVENTS]; size_t res = read(event_fd, &n, 8);
restart: if (res == 8)
nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 0);
for (int i = 0; i < nfds; i++)
{ {
if (events[i].data.fd == listen_fd) // No need to do anything, the loop has already woken up
{ ringloop->wakeup();
c_cli.accept_connections(listen_fd);
}
else
{
auto & cb = epoll_handlers[events[i].data.fd];
cb(events[i].data.fd, events[i].events);
}
}
if (nfds == MAX_EPOLL_EVENTS)
{
goto restart;
} }
} }

8
osd.h
View File

@@ -12,6 +12,8 @@
#include <set> #include <set>
#include <deque> #include <deque>
#include <mutex>
#include <thread>
#include "blockstore.h" #include "blockstore.h"
#include "ringloop.h" #include "ringloop.h"
@@ -114,6 +116,10 @@ class osd_t
int wait_state = 0; int wait_state = 0;
int epoll_fd = 0; int epoll_fd = 0;
int event_fd = 0;
std::thread *epoll_thread = NULL;
std::mutex epoll_mutex;
std::map<int, int> epoll_ready;
int listening_port = 0; int listening_port = 0;
int listen_fd = 0; int listen_fd = 0;
ring_consumer_t consumer; ring_consumer_t consumer;
@@ -150,7 +156,7 @@ class osd_t
// event loop, socket read/write // event loop, socket read/write
void loop(); void loop();
void set_fd_handler(int fd, std::function<void(int, int)> handler); void set_fd_handler(int fd, std::function<void(int, int)> handler);
void handle_epoll_events(); void handle_eventfd();
// peer handling (primary OSD logic) // peer handling (primary OSD logic)
void parse_test_peer(std::string peer); void parse_test_peer(std::string peer);

View File

@@ -198,7 +198,6 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
else if (op_data->st == 8) goto resume_8; else if (op_data->st == 8) goto resume_8;
else if (op_data->st == 9) goto resume_9; else if (op_data->st == 9) goto resume_9;
assert(op_data->st == 0); assert(op_data->st == 0);
printf("primary_write\n");
if (!check_write_queue(cur_op, pg)) if (!check_write_queue(cur_op, pg))
{ {
return; return;
@@ -390,7 +389,6 @@ void osd_t::continue_primary_sync(osd_op_t *cur_op)
else if (op_data->st == 5) goto resume_5; else if (op_data->st == 5) goto resume_5;
else if (op_data->st == 6) goto resume_6; else if (op_data->st == 6) goto resume_6;
assert(op_data->st == 0); assert(op_data->st == 0);
printf("primary_sync\n");
if (syncs_in_progress.size() > 0) if (syncs_in_progress.size() > 0)
{ {
// Wait for previous syncs, if any // Wait for previous syncs, if any

View File

@@ -4,8 +4,6 @@
#define _LARGEFILE64_SOURCE #define _LARGEFILE64_SOURCE
#endif #endif
#include <stdio.h>
#include <time.h>
#include <string.h> #include <string.h>
#include <assert.h> #include <assert.h>
#include <liburing.h> #include <liburing.h>
@@ -160,13 +158,7 @@ public:
} }
inline int submit() inline int submit()
{ {
int r = io_uring_submit(&ring); return io_uring_submit(&ring);
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("submit %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
return r;
} }
inline int wait() inline int wait()
{ {

View File

@@ -1,29 +1,24 @@
#include <sys/timerfd.h> #include <sys/timerfd.h>
#include <sys/poll.h> #include <sys/poll.h>
#include <sys/epoll.h>
#include <unistd.h> #include <unistd.h>
#include <errno.h>
#include <string.h>
#include "timerfd_manager.h" #include "timerfd_manager.h"
timerfd_manager_t::timerfd_manager_t(std::function<void(int, std::function<void(int, int)>)> set_fd_handler) timerfd_manager_t::timerfd_manager_t(ring_loop_t *ringloop)
{ {
this->set_fd_handler = set_fd_handler;
wait_state = 0; wait_state = 0;
timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
if (timerfd < 0) if (timerfd < 0)
{ {
throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno)); throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno));
} }
set_fd_handler(timerfd, [this](int fd, int events) consumer.loop = [this]() { loop(); };
{ ringloop->register_consumer(&consumer);
handle_readable(); this->ringloop = ringloop;
});
} }
timerfd_manager_t::~timerfd_manager_t() timerfd_manager_t::~timerfd_manager_t()
{ {
set_fd_handler(timerfd, NULL); ringloop->unregister_consumer(&consumer);
close(timerfd); close(timerfd);
} }
@@ -53,6 +48,7 @@ int timerfd_manager_t::set_timer(uint64_t millis, bool repeat, std::function<voi
}); });
inc_timer(timers[timers.size()-1]); inc_timer(timers[timers.size()-1]);
set_nearest(); set_nearest();
set_wait();
return timer_id; return timer_id;
} }
@@ -73,6 +69,7 @@ void timerfd_manager_t::clear_timer(int timer_id)
nearest--; nearest--;
} }
set_nearest(); set_nearest();
set_wait();
break; break;
} }
} }
@@ -157,3 +154,36 @@ void timerfd_manager_t::trigger_nearest()
cb(nearest_id); cb(nearest_id);
nearest = -1; nearest = -1;
} }
void timerfd_manager_t::loop()
{
if (!(wait_state & 1) && timers.size())
{
set_nearest();
}
set_wait();
}
void timerfd_manager_t::set_wait()
{
if ((wait_state & 3) == 1)
{
io_uring_sqe *sqe = ringloop->get_sqe();
if (!sqe)
{
return;
}
ring_data_t *data = ((ring_data_t*)sqe->user_data);
my_uring_prep_poll_add(sqe, timerfd, POLLIN);
data->callback = [this](ring_data_t *data)
{
if (data->res < 0)
{
throw std::runtime_error(std::string("waiting for timer failed: ") + strerror(-data->res));
}
handle_readable();
set_wait();
};
wait_state = 3;
}
}

View File

@@ -1,8 +1,7 @@
#pragma once #pragma once
#include <time.h> #include <time.h>
#include <vector> #include "ringloop.h"
#include <functional>
struct timerfd_timer_t struct timerfd_timer_t
{ {
@@ -20,15 +19,20 @@ class timerfd_manager_t
int nearest = -1; int nearest = -1;
int id = 1; int id = 1;
std::vector<timerfd_timer_t> timers; std::vector<timerfd_timer_t> timers;
ring_loop_t *ringloop;
ring_consumer_t consumer;
void inc_timer(timerfd_timer_t & t); void inc_timer(timerfd_timer_t & t);
void set_nearest(); void set_nearest();
void trigger_nearest(); void trigger_nearest();
void handle_readable(); void handle_readable();
void set_wait();
void loop();
public: public:
// FIXME shouldn't be here
std::function<void(int, std::function<void(int, int)>)> set_fd_handler; std::function<void(int, std::function<void(int, int)>)> set_fd_handler;
timerfd_manager_t(std::function<void(int, std::function<void(int, int)>)> set_fd_handler); timerfd_manager_t(ring_loop_t *ringloop);
~timerfd_manager_t(); ~timerfd_manager_t();
int set_timer(uint64_t millis, bool repeat, std::function<void(int)> callback); int set_timer(uint64_t millis, bool repeat, std::function<void(int)> callback);
void clear_timer(int timer_id); void clear_timer(int timer_id);