1
0
Fork 0

Compare commits

...

3 Commits

17 changed files with 458 additions and 158 deletions

View File

@ -271,7 +271,7 @@ void http_co_t::close_connection()
} }
if (peer_fd >= 0) if (peer_fd >= 0)
{ {
tfd->set_fd_handler(peer_fd, false, NULL); tfd->set_fd_handler(peer_fd, 0, NULL);
close(peer_fd); close(peer_fd);
peer_fd = -1; peer_fd = -1;
} }
@ -314,7 +314,7 @@ void http_co_t::start_connection()
stackout(); stackout();
return; return;
} }
tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events) tfd->set_fd_handler(peer_fd, EPOLLOUT, [this](int peer_fd, int epoll_events)
{ {
this->epoll_events |= epoll_events; this->epoll_events |= epoll_events;
handle_events(); handle_events();
@ -372,7 +372,7 @@ void http_co_t::handle_connect_result()
} }
int one = 1; int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events) tfd->set_fd_handler(peer_fd, EPOLLIN, [this](int peer_fd, int epoll_events)
{ {
this->epoll_events |= epoll_events; this->epoll_events |= epoll_events;
handle_events(); handle_events();

View File

@ -15,6 +15,140 @@
#include "msgr_rdma.h" #include "msgr_rdma.h"
#endif #endif
#include <sys/poll.h>
#include <sys/eventfd.h>
// Creates the ring loop and epoll manager owned by this I/O thread, registers
// an eventfd whose handler flushes the pending write queue, and starts the
// worker thread running run().
// Throws std::runtime_error if the eventfd cannot be created.
msgr_iothread_t::msgr_iothread_t()
{
    ring = new ring_loop_t(RINGLOOP_DEFAULT_SIZE);
    epmgr = new epoll_manager_t(ring);
    submit_eventfd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
    if (submit_eventfd < 0)
    {
        // Remember errno before the cleanup below has a chance to overwrite it
        const int err = errno;
        // The destructor does not run when a constructor throws,
        // so release what was already allocated by hand
        delete epmgr;
        epmgr = NULL;
        delete ring;
        ring = NULL;
        throw std::runtime_error(std::string("failed to create eventfd: ")+strerror(err));
    }
    // Subscribe for read readiness using an epoll event mask, like the other
    // set_fd_handler() call sites do (a plain `false` converts to an empty mask)
    epmgr->tfd->set_fd_handler(submit_eventfd, EPOLLIN, [this](int fd, int epoll_events)
    {
        // Reset eventfd counter
        uint64_t ctr = 0;
        int r = read(submit_eventfd, &ctr, sizeof(ctr));
        if (r < 0 && errno != EAGAIN && errno != EINTR)
        {
            fprintf(stderr, "Error resetting eventfd: %s\n", strerror(errno));
        }
        // Submit pending events
        submit_pending_writes();
    });
    thread = new std::thread(&msgr_iothread_t::run, this);
}
// Stops the worker thread and releases everything allocated by the constructor.
msgr_iothread_t::~msgr_iothread_t()
{
// stop() sets the `stopped` flag and joins the worker thread, so the objects
// deleted below are no longer used by it once stop() has returned
stop();
delete thread;
// epmgr was constructed from `ring`, so it is destroyed before the ring
// (presumably it still refers to the ring during its own teardown)
delete epmgr;
delete ring;
}
// Asks the worker thread to exit and waits for it. Safe to call repeatedly:
// only the first call does any work.
void msgr_iothread_t::stop()
{
    mu.lock();
    if (stopped)
    {
        mu.unlock();
        return;
    }
    stopped = true;
    if (callback_stopped)
    {
        *callback_stopped = true;
        callback_stopped = NULL;
    }
    // Wake the worker up so that it re-checks `stopped`. Closing an eventfd
    // does not generate an epoll event, so a thread sleeping in the ring
    // would otherwise keep sleeping and join() below would never return.
    uint64_t wake = 1;
    if (write(submit_eventfd, &wake, sizeof(wake)) < 0 && errno != EAGAIN)
    {
        fprintf(stderr, "Error waking up I/O thread: %s\n", strerror(errno));
    }
    mu.unlock();
    if (thread && thread->joinable())
    {
        thread->join();
    }
    // Close the descriptor only after the worker is gone, so the wake-up
    // above is still deliverable and the worker never reads a closed FD
    close(submit_eventfd);
}
// Value written to eventfds to signal them. Static because the io_uring write
// prepared in wakeup() is only executed after the function has returned, so
// the buffer has to outlive the call.
static uint64_t one = 1;

// Queues client `cl` for sending on this I/O thread.
// The eventfd is signalled only when the queue was empty: a non-empty queue
// means a wake-up has already been requested. The signal goes through
// `outer_ring` when it is given and has a free SQE, otherwise through a
// direct write().
void msgr_iothread_t::wakeup(osd_client_t *cl, ring_loop_t *outer_ring)
{
    std::unique_lock<std::mutex> lock(mu);
    if (!pending_clients.size())
    {
        // Tolerate a missing ring the same way add_completion() does
        io_uring_sqe* sqe = outer_ring ? outer_ring->get_sqe() : NULL;
        if (!sqe)
        {
            // EAGAIN only means the counter is saturated, i.e. already signalled
            if (write(submit_eventfd, &one, sizeof(one)) < 0 && errno != EAGAIN)
            {
                fprintf(stderr, "Error waking up I/O thread: %s\n", strerror(errno));
            }
        }
        else
        {
            ring_data_t* data = ((ring_data_t*)sqe->user_data);
            // Nothing to do on completion
            data->callback = [](ring_data_t*){};
            my_uring_prep_write(sqe, submit_eventfd, &one, sizeof(one), 0);
        }
    }
    add_pending(cl);
}
// Inserts `cl` into pending_clients, which is kept sorted by pointer value,
// unless it is already there. Does no locking itself.
void msgr_iothread_t::add_pending(osd_client_t *cl)
{
    const auto pos = std::lower_bound(pending_clients.begin(), pending_clients.end(), cl);
    const bool already_queued = (pos != pending_clients.end() && *pos == cl);
    if (already_queued)
    {
        return;
    }
    pending_clients.insert(pos, cl);
}
void msgr_iothread_t::submit_pending_writes()
{
mu.lock();
std::vector<osd_client_t*> clients = std::move(pending_clients);
mu.unlock();
for (int i = 0; i < clients.size(); i++)
{
auto cl = clients[i];
std::unique_lock<std::mutex> lock(cl->mu);
if (cl->peer_state == PEER_STOPPED)
{
cl->refs--;
if (cl->refs <= 0)
delete cl;
continue;
}
if (!cl->try_send(ring, false/*, lock*/))
{
// ring is full (rare but what if...)
ring->submit();
mu.lock();
for (; i < clients.size(); i++)
{
add_pending(clients[i]);
}
mu.unlock();
return;
}
else
{
cl->refs--;
}
}
ring->submit();
}
// Worker thread body: runs the ring loop until stop() sets `stopped`.
void msgr_iothread_t::run()
{
    while (true)
    {
        {
            // Scoped lock: the mutex must also be released on the exit path,
            // a plain `return` between lock() and unlock() would leave it
            // locked forever
            std::lock_guard<std::mutex> guard(mu);
            if (stopped)
                return;
        }
        ring->loop();
        ring->wait();
    }
}
void osd_messenger_t::init() void osd_messenger_t::init()
{ {
#ifdef WITH_RDMA #ifdef WITH_RDMA
@ -35,7 +169,7 @@ void osd_messenger_t::init()
? rdma_max_sge : rdma_context->attrx.orig_attr.max_sge; ? rdma_max_sge : rdma_context->attrx.orig_attr.max_sge;
fprintf(stderr, "[OSD %ju] RDMA initialized successfully\n", osd_num); fprintf(stderr, "[OSD %ju] RDMA initialized successfully\n", osd_num);
fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK); fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
tfd->set_fd_handler(rdma_context->channel->fd, false, [this](int notify_fd, int epoll_events) tfd->set_fd_handler(rdma_context->channel->fd, EPOLLIN, [this](int notify_fd, int epoll_events)
{ {
handle_rdma_events(); handle_rdma_events();
}); });
@ -43,6 +177,19 @@ void osd_messenger_t::init()
} }
} }
#endif #endif
if (ringloop && iothread_count > 0)
{
for (int i = 0; i < iothread_count; i++)
{
auto iot = new msgr_iothread_t();
iothreads.push_back(iot);
}
completion_eventfd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
if (completion_eventfd < 0)
{
throw std::runtime_error(std::string("failed to create completion eventfd: ")+strerror(errno));
}
}
keepalive_timer_id = tfd->set_timer(1000, true, [this](int) keepalive_timer_id = tfd->set_timer(1000, true, [this](int)
{ {
auto cl_it = clients.begin(); auto cl_it = clients.begin();
@ -120,6 +267,11 @@ void osd_messenger_t::init()
osd_messenger_t::~osd_messenger_t() osd_messenger_t::~osd_messenger_t()
{ {
if (completion_eventfd >= 0)
{
close(completion_eventfd);
completion_eventfd = -1;
}
if (keepalive_timer_id >= 0) if (keepalive_timer_id >= 0)
{ {
tfd->clear_timer(keepalive_timer_id); tfd->clear_timer(keepalive_timer_id);
@ -129,6 +281,14 @@ osd_messenger_t::~osd_messenger_t()
{ {
stop_client(clients.begin()->first, true, true); stop_client(clients.begin()->first, true, true);
} }
if (iothreads.size())
{
for (auto iot: iothreads)
{
delete iot;
}
iothreads.clear();
}
#ifdef WITH_RDMA #ifdef WITH_RDMA
if (rdma_context) if (rdma_context)
{ {
@ -262,7 +422,8 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer
clients[peer_fd]->connect_timeout_id = -1; clients[peer_fd]->connect_timeout_id = -1;
clients[peer_fd]->osd_num = peer_osd; clients[peer_fd]->osd_num = peer_osd;
clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size); clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size);
tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events) clients[peer_fd]->receive_buffer_size = receive_buffer_size;
tfd->set_fd_handler(peer_fd, EPOLLOUT, [this](int peer_fd, int epoll_events)
{ {
// Either OUT (connected) or HUP // Either OUT (connected) or HUP
handle_connect_epoll(peer_fd); handle_connect_epoll(peer_fd);
@ -303,7 +464,7 @@ void osd_messenger_t::handle_connect_epoll(int peer_fd)
int one = 1; int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
cl->peer_state = PEER_CONNECTED; cl->peer_state = PEER_CONNECTED;
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events) tfd->set_fd_handler(peer_fd, EPOLLIN, [this](int peer_fd, int epoll_events)
{ {
handle_peer_epoll(peer_fd, epoll_events); handle_peer_epoll(peer_fd, epoll_events);
}); });
@ -327,6 +488,14 @@ void osd_messenger_t::handle_peer_epoll(int peer_fd, int epoll_events)
{ {
// Mark client as ready (i.e. some data is available) // Mark client as ready (i.e. some data is available)
auto cl = clients[peer_fd]; auto cl = clients[peer_fd];
#ifdef WITH_RDMA
if (cl->peer_state == PEER_RDMA)
{
// FIXME: Do something better than just forgetting the FD
// FIXME: Ignore pings during RDMA state transition
return;
}
#endif
cl->read_ready++; cl->read_ready++;
if (cl->read_ready == 1) if (cl->read_ready == 1)
{ {
@ -487,7 +656,7 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
fprintf(stderr, "Connected to OSD %ju using RDMA\n", cl->osd_num); fprintf(stderr, "Connected to OSD %ju using RDMA\n", cl->osd_num);
} }
cl->peer_state = PEER_RDMA; cl->peer_state = PEER_RDMA;
tfd->set_fd_handler(cl->peer_fd, false, [this](int peer_fd, int epoll_events) tfd->set_fd_handler(cl->peer_fd, EPOLLIN, [this](int peer_fd, int epoll_events)
{ {
// Do not miss the disconnection! // Do not miss the disconnection!
if (epoll_events & EPOLLRDHUP) if (epoll_events & EPOLLRDHUP)
@ -527,8 +696,9 @@ void osd_messenger_t::accept_connections(int listen_fd)
clients[peer_fd]->peer_fd = peer_fd; clients[peer_fd]->peer_fd = peer_fd;
clients[peer_fd]->peer_state = PEER_CONNECTED; clients[peer_fd]->peer_state = PEER_CONNECTED;
clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size); clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size);
clients[peer_fd]->receive_buffer_size = receive_buffer_size;
// Add FD to epoll // Add FD to epoll
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events) tfd->set_fd_handler(peer_fd, EPOLLIN, [this](int peer_fd, int epoll_events)
{ {
handle_peer_epoll(peer_fd, epoll_events); handle_peer_epoll(peer_fd, epoll_events);
}); });

View File

@ -11,6 +11,7 @@
#include <map> #include <map>
#include <deque> #include <deque>
#include <vector> #include <vector>
#include <mutex>
#include "malloc_or_die.h" #include "malloc_or_die.h"
#include "json11/json11.hpp" #include "json11/json11.hpp"
@ -47,6 +48,8 @@ struct msgr_rdma_context_t;
struct osd_client_t struct osd_client_t
{ {
std::mutex mu;
int refs = 0; int refs = 0;
sockaddr_storage peer_addr; sockaddr_storage peer_addr;
@ -59,6 +62,7 @@ struct osd_client_t
osd_num_t osd_num = 0; osd_num_t osd_num = 0;
void *in_buf = NULL; void *in_buf = NULL;
uint32_t receive_buffer_size = 0;
#ifdef WITH_RDMA #ifdef WITH_RDMA
msgr_rdma_connection_t *rdma_conn = NULL; msgr_rdma_connection_t *rdma_conn = NULL;
@ -89,6 +93,17 @@ struct osd_client_t
std::vector<msgr_sendp_t> outbox, next_outbox; std::vector<msgr_sendp_t> outbox, next_outbox;
~osd_client_t(); ~osd_client_t();
bool try_send(ring_loop_t *ringloop, bool use_sync_send_recv);
void handle_send(int result);
bool try_recv(ring_loop_t *ringloop, bool use_sync_send_recv);
bool handle_read(int result);
bool handle_read_buffer(void *curbuf, int remain);
bool handle_finished_read();
void handle_op_hdr();
bool handle_reply_hdr();
void handle_reply_ready(osd_op_t *op);
}; };
struct osd_wanted_peer_t struct osd_wanted_peer_t
@ -111,6 +126,53 @@ struct osd_op_stats_t
uint64_t subop_stat_count[OSD_OP_MAX+1] = { 0 }; uint64_t subop_stat_count[OSD_OP_MAX+1] = { 0 };
}; };
#ifdef __MOCK__
class msgr_iothread_t;
#else
#include <thread>
#include "epoll_manager.h"
// A messenger I/O thread: owns a ring loop, an epoll manager and a worker
// thread, and sends data for the clients queued through wakeup().
class msgr_iothread_t
{
protected:
    // Ring loop and epoll manager allocated by the constructor
    ring_loop_t *ring = nullptr;
    epoll_manager_t *epmgr = nullptr;
    // eventfd used to make the worker run submit_pending_writes()
    int submit_eventfd = -1;
    // Set by stop(), checked by run(); accessed under `mu`
    bool stopped = false;
    bool pending = false;
    // When non-null, stop() sets the pointed-to flag to true and forgets the pointer
    bool *callback_stopped = nullptr;
    // Guards `stopped` and `pending_clients`
    std::mutex mu;
    // Clients waiting for a send, kept sorted and without duplicates by add_pending()
    std::vector<osd_client_t*> pending_clients;
    std::thread *thread = nullptr;

    // Worker thread body
    void run();

public:
    msgr_iothread_t();
    ~msgr_iothread_t();

    epoll_manager_t *get_epmgr() { return epmgr; }
    int get_eventfd() { return submit_eventfd; }

    // Queue `cl` and signal the worker if the queue was empty
    void wakeup(osd_client_t *cl, ring_loop_t *outer_ring);
    // Sorted insert into pending_clients; takes no lock itself
    void add_pending(osd_client_t *cl);
    // Drain pending_clients and start sends on the thread's ring
    void submit_pending_writes();
    // Request the worker to exit and join it
    void stop();
};
#endif
struct osd_messenger_t struct osd_messenger_t
{ {
protected: protected:
@ -123,6 +185,7 @@ protected:
int osd_ping_timeout = 0; int osd_ping_timeout = 0;
int log_level = 0; int log_level = 0;
bool use_sync_send_recv = false; bool use_sync_send_recv = false;
int iothread_count = 4;
#ifdef WITH_RDMA #ifdef WITH_RDMA
bool use_rdma = true; bool use_rdma = true;
@ -134,10 +197,12 @@ protected:
bool rdma_odp = false; bool rdma_odp = false;
#endif #endif
std::vector<msgr_iothread_t*> iothreads;
std::vector<int> read_ready_clients; std::vector<int> read_ready_clients;
std::vector<int> write_ready_clients; std::vector<int> write_ready_clients;
// We don't use ringloop->set_immediate here because we may have no ringloop in client :) int completion_eventfd = -1;
std::vector<std::function<void()>> set_immediate; std::mutex completions_mu;
std::vector<osd_op_t*> completions;
public: public:
timerfd_manager_t *tfd; timerfd_manager_t *tfd;
@ -188,15 +253,8 @@ protected:
void cancel_osd_ops(osd_client_t *cl); void cancel_osd_ops(osd_client_t *cl);
void cancel_op(osd_op_t *op); void cancel_op(osd_op_t *op);
bool try_send(osd_client_t *cl); void add_completion(osd_op_t *op, ring_loop_t *ringloop = NULL);
void handle_send(int result, osd_client_t *cl); void handle_completions();
bool handle_read(int result, osd_client_t *cl);
bool handle_read_buffer(osd_client_t *cl, void *curbuf, int remain);
bool handle_finished_read(osd_client_t *cl);
void handle_op_hdr(osd_client_t *cl);
bool handle_reply_hdr(osd_client_t *cl);
void handle_reply_ready(osd_op_t *op);
#ifdef WITH_RDMA #ifdef WITH_RDMA
void try_send_rdma(osd_client_t *cl); void try_send_rdma(osd_client_t *cl);

View File

@ -363,6 +363,8 @@ bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint64
auto cl = clients.at(peer_fd); auto cl = clients.at(peer_fd);
cl->rdma_conn = rdma_conn; cl->rdma_conn = rdma_conn;
cl->peer_state = PEER_RDMA_CONNECTING; cl->peer_state = PEER_RDMA_CONNECTING;
// Add the initial receive request
try_recv_rdma(cl);
return true; return true;
} }
} }
@ -603,7 +605,7 @@ void osd_messenger_t::handle_rdma_events()
if (!is_send) if (!is_send)
{ {
rc->cur_recv--; rc->cur_recv--;
if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf].buf, wc[i].byte_len)) if (!cl->handle_read_buffer(rc->recv_buffers[rc->next_recv_buf].buf, wc[i].byte_len))
{ {
// handle_read_buffer may stop the client // handle_read_buffer may stop the client
continue; continue;
@ -666,9 +668,5 @@ void osd_messenger_t::handle_rdma_events()
} }
} }
} while (event_count > 0); } while (event_count > 0);
for (auto cb: set_immediate) handle_received_ops();
{
cb();
}
set_immediate.clear();
} }

View File

@ -9,14 +9,26 @@ void osd_messenger_t::read_requests()
{ {
int peer_fd = read_ready_clients[i]; int peer_fd = read_ready_clients[i];
osd_client_t *cl = clients[peer_fd]; osd_client_t *cl = clients[peer_fd];
if (!cl->try_recv(ringloop, use_sync_send_recv))
{
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
return;
}
}
read_ready_clients.clear();
}
bool osd_client_t::try_recv(ring_loop_t *ringloop, bool use_sync_send_recv)
{
auto cl = this;
if (cl->read_msg.msg_iovlen) if (cl->read_msg.msg_iovlen)
{ {
continue; return true;
} }
if (cl->read_remaining < receive_buffer_size) if (cl->read_remaining < cl->receive_buffer_size)
{ {
cl->read_iov.iov_base = cl->in_buf; cl->read_iov.iov_base = cl->in_buf;
cl->read_iov.iov_len = receive_buffer_size; cl->read_iov.iov_len = cl->receive_buffer_size;
cl->read_msg.msg_iov = &cl->read_iov; cl->read_msg.msg_iov = &cl->read_iov;
cl->read_msg.msg_iovlen = 1; cl->read_msg.msg_iovlen = 1;
} }
@ -34,11 +46,10 @@ void osd_messenger_t::read_requests()
if (!sqe) if (!sqe)
{ {
cl->read_msg.msg_iovlen = 0; cl->read_msg.msg_iovlen = 0;
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i); return false;
return;
} }
ring_data_t* data = ((ring_data_t*)sqe->user_data); ring_data_t* data = ((ring_data_t*)sqe->user_data);
data->callback = [this, cl](ring_data_t *data) { handle_read(data->res, cl); }; data->callback = [this](ring_data_t *data) { handle_read(data->res); };
my_uring_prep_recvmsg(sqe, peer_fd, &cl->read_msg, 0); my_uring_prep_recvmsg(sqe, peer_fd, &cl->read_msg, 0);
} }
else else
@ -48,14 +59,14 @@ void osd_messenger_t::read_requests()
{ {
result = -errno; result = -errno;
} }
handle_read(result, cl); handle_read(result);
} }
} return true;
read_ready_clients.clear();
} }
bool osd_messenger_t::handle_read(int result, osd_client_t *cl) bool osd_client_t::handle_read(int result)
{ {
auto cl = this;
bool ret = false; bool ret = false;
cl->read_msg.msg_iovlen = 0; cl->read_msg.msg_iovlen = 0;
cl->refs--; cl->refs--;
@ -74,7 +85,7 @@ bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
{ {
fprintf(stderr, "Client %d socket read error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result)); fprintf(stderr, "Client %d socket read error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result));
} }
stop_client(cl->peer_fd); close(cl->peer_fd);
return false; return false;
} }
if (result == -EAGAIN || result == -EINTR || result < cl->read_iov.iov_len) if (result == -EAGAIN || result == -EINTR || result < cl->read_iov.iov_len)
@ -91,7 +102,7 @@ bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
{ {
if (cl->read_iov.iov_base == cl->in_buf) if (cl->read_iov.iov_base == cl->in_buf)
{ {
if (!handle_read_buffer(cl, cl->in_buf, result)) if (!handle_read_buffer(cl->in_buf, result))
{ {
goto fin; goto fin;
} }
@ -103,7 +114,7 @@ bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
cl->recv_list.eat(result); cl->recv_list.eat(result);
if (cl->recv_list.done >= cl->recv_list.count) if (cl->recv_list.done >= cl->recv_list.count)
{ {
if (!handle_finished_read(cl)) if (!handle_finished_read())
{ {
goto fin; goto fin;
} }
@ -115,16 +126,13 @@ bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
} }
} }
fin: fin:
for (auto cb: set_immediate) msgr->handle_completions();
{
cb();
}
set_immediate.clear();
return ret; return ret;
} }
bool osd_messenger_t::handle_read_buffer(osd_client_t *cl, void *curbuf, int remain) bool osd_client_t::handle_read_buffer(void *curbuf, int remain)
{ {
auto cl = this;
// Compose operation(s) from the buffer // Compose operation(s) from the buffer
while (remain > 0) while (remain > 0)
{ {
@ -160,7 +168,7 @@ bool osd_messenger_t::handle_read_buffer(osd_client_t *cl, void *curbuf, int rem
} }
if (cl->recv_list.done >= cl->recv_list.count) if (cl->recv_list.done >= cl->recv_list.count)
{ {
if (!handle_finished_read(cl)) if (!handle_finished_read())
{ {
return false; return false;
} }
@ -169,19 +177,20 @@ bool osd_messenger_t::handle_read_buffer(osd_client_t *cl, void *curbuf, int rem
return true; return true;
} }
bool osd_messenger_t::handle_finished_read(osd_client_t *cl) bool osd_client_t::handle_finished_read()
{ {
auto cl = this;
cl->recv_list.reset(); cl->recv_list.reset();
if (cl->read_state == CL_READ_HDR) if (cl->read_state == CL_READ_HDR)
{ {
if (cl->read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC) if (cl->read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC)
return handle_reply_hdr(cl); return handle_reply_hdr();
else if (cl->read_op->req.hdr.magic == SECONDARY_OSD_OP_MAGIC) else if (cl->read_op->req.hdr.magic == SECONDARY_OSD_OP_MAGIC)
handle_op_hdr(cl); handle_op_hdr();
else else
{ {
fprintf(stderr, "Received garbage: magic=%jx id=%ju opcode=%jx from %d\n", cl->read_op->req.hdr.magic, cl->read_op->req.hdr.id, cl->read_op->req.hdr.opcode, cl->peer_fd); fprintf(stderr, "Received garbage: magic=%jx id=%ju opcode=%jx from %d\n", cl->read_op->req.hdr.magic, cl->read_op->req.hdr.id, cl->read_op->req.hdr.opcode, cl->peer_fd);
stop_client(cl->peer_fd); close(cl->peer_fd);
return false; return false;
} }
} }
@ -189,14 +198,14 @@ bool osd_messenger_t::handle_finished_read(osd_client_t *cl)
{ {
// Operation is ready // Operation is ready
cl->received_ops.push_back(cl->read_op); cl->received_ops.push_back(cl->read_op);
set_immediate.push_back([this, op = cl->read_op]() { exec_op(op); }); msgr->add_completion(cl->read_op);
cl->read_op = NULL; cl->read_op = NULL;
cl->read_state = 0; cl->read_state = 0;
} }
else if (cl->read_state == CL_READ_REPLY_DATA) else if (cl->read_state == CL_READ_REPLY_DATA)
{ {
// Reply is ready // Reply is ready
handle_reply_ready(cl->read_op); msgr->add_completion(cl->read_op);
cl->read_op = NULL; cl->read_op = NULL;
cl->read_state = 0; cl->read_state = 0;
} }
@ -207,8 +216,9 @@ bool osd_messenger_t::handle_finished_read(osd_client_t *cl)
return true; return true;
} }
void osd_messenger_t::handle_op_hdr(osd_client_t *cl) void osd_client_t::handle_op_hdr()
{ {
auto cl = this;
osd_op_t *cur_op = cl->read_op; osd_op_t *cur_op = cl->read_op;
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ) if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ)
{ {
@ -285,20 +295,21 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
{ {
// Operation is ready // Operation is ready
cl->received_ops.push_back(cur_op); cl->received_ops.push_back(cur_op);
set_immediate.push_back([this, cur_op]() { exec_op(cur_op); }); msgr->add_completion(cur_op);
cl->read_op = NULL; cl->read_op = NULL;
cl->read_state = 0; cl->read_state = 0;
} }
} }
bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl) bool osd_client_t::handle_reply_hdr()
{ {
auto cl = this;
auto req_it = cl->sent_ops.find(cl->read_op->req.hdr.id); auto req_it = cl->sent_ops.find(cl->read_op->req.hdr.id);
if (req_it == cl->sent_ops.end()) if (req_it == cl->sent_ops.end())
{ {
// Command out of sync. Drop connection // Command out of sync. Drop connection
fprintf(stderr, "Client %d command out of sync: id %ju\n", cl->peer_fd, cl->read_op->req.hdr.id); fprintf(stderr, "Client %d command out of sync: id %ju\n", cl->peer_fd, cl->read_op->req.hdr.id);
stop_client(cl->peer_fd); close(cl->peer_fd);
return false; return false;
} }
osd_op_t *op = req_it->second; osd_op_t *op = req_it->second;
@ -315,7 +326,7 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
fprintf(stderr, "Client %d read reply of different length: expected %u+%u, got %jd+%u\n", fprintf(stderr, "Client %d read reply of different length: expected %u+%u, got %jd+%u\n",
cl->peer_fd, expected_size, op->bitmap_len, op->reply.hdr.retval, bmp_len); cl->peer_fd, expected_size, op->bitmap_len, op->reply.hdr.retval, bmp_len);
cl->sent_ops[op->req.hdr.id] = op; cl->sent_ops[op->req.hdr.id] = op;
stop_client(cl->peer_fd); close(cl->peer_fd);
return false; return false;
} }
if (bmp_len > 0) if (bmp_len > 0)
@ -383,7 +394,7 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
{ {
reuse: reuse:
// It's fine to reuse cl->read_op for the next reply // It's fine to reuse cl->read_op for the next reply
handle_reply_ready(op); msgr->add_completion(op);
cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE); cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
cl->read_remaining = OSD_PACKET_SIZE; cl->read_remaining = OSD_PACKET_SIZE;
cl->read_state = CL_READ_HDR; cl->read_state = CL_READ_HDR;
@ -391,7 +402,49 @@ reuse:
return true; return true;
} }
void osd_messenger_t::handle_reply_ready(osd_op_t *op) static uint64_t one = 1;
// Appends `op` to the list of completed operations and, if the list was empty
// before, signals completion_eventfd so that the list gets processed.
// The signal is sent through `ringloop` when it is given and has a free SQE,
// otherwise with a direct write().
void osd_messenger_t::add_completion(osd_op_t *op, ring_loop_t *ringloop)
{
    bool wakeup_main_thread;
    {
        std::lock_guard<std::mutex> guard(completions_mu);
        // A non-empty list means a wake-up has already been requested
        wakeup_main_thread = !completions.size();
        completions.push_back(op);
    }
    // completion_eventfd stays -1 when no I/O threads were created:
    // there is nothing to signal then, so do not write to an invalid FD
    if (!wakeup_main_thread || completion_eventfd < 0)
    {
        return;
    }
    io_uring_sqe* sqe = ringloop ? ringloop->get_sqe() : NULL;
    if (!sqe)
    {
        // EAGAIN only means the counter is saturated, i.e. already signalled
        if (write(completion_eventfd, &one, sizeof(one)) < 0 && errno != EAGAIN)
        {
            fprintf(stderr, "Error signalling completion eventfd: %s\n", strerror(errno));
        }
    }
    else
    {
        ring_data_t* data = ((ring_data_t*)sqe->user_data);
        // Nothing to do on completion
        data->callback = [](ring_data_t*){};
        my_uring_prep_write(sqe, completion_eventfd, &one, sizeof(one), 0);
    }
}
void osd_messenger_t::handle_completions()
{
// Reset eventfd counter
uint64_t ctr = 0;
int r = read(completion_eventfd, &ctr, 8);
if (r < 0 && errno != EAGAIN && errno != EINTR)
{
fprintf(stderr, "Error resetting eventfd: %s\n", strerror(errno));
}
completions_mu.lock();
auto ops = std::move(completions);
completions_mu.unlock();
for (auto op: completions)
{
if (op->op_type == OSD_OP_IN)
{
exec_op(op);
}
else
{ {
// Measure subop latency // Measure subop latency
timespec tv_end; timespec tv_end;
@ -406,9 +459,8 @@ void osd_messenger_t::handle_reply_ready(osd_op_t *op)
(tv_end.tv_sec - op->tv_begin.tv_sec)*1000000 + (tv_end.tv_sec - op->tv_begin.tv_sec)*1000000 +
(tv_end.tv_nsec - op->tv_begin.tv_nsec)/1000 (tv_end.tv_nsec - op->tv_begin.tv_nsec)/1000
); );
set_immediate.push_back([op]()
{
// Copy lambda to be unaffected by `delete op` // Copy lambda to be unaffected by `delete op`
std::function<void(osd_op_t*)>(op->callback)(op); std::function<void(osd_op_t*)>(op->callback)(op);
}); }
}
} }

View File

@ -15,10 +15,16 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
{ {
clock_gettime(CLOCK_REALTIME, &cur_op->tv_begin); clock_gettime(CLOCK_REALTIME, &cur_op->tv_begin);
} }
else else if (cur_op->op_type == OSD_OP_IN)
{ {
measure_exec(cur_op);
}
std::unique_lock<std::mutex> lock;
if (iothreads.size())
{
lock = std::unique_lock<std::mutex>(cl->mu);
}
// Check that operation actually belongs to this client // Check that operation actually belongs to this client
// FIXME: Review if this is still needed
bool found = false; bool found = false;
for (auto it = cl->received_ops.begin(); it != cl->received_ops.end(); it++) for (auto it = cl->received_ops.begin(); it != cl->received_ops.end(); it++)
{ {
@ -34,12 +40,10 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
delete cur_op; delete cur_op;
return; return;
} }
}
auto & to_send_list = cl->write_msg.msg_iovlen ? cl->next_send_list : cl->send_list; auto & to_send_list = cl->write_msg.msg_iovlen ? cl->next_send_list : cl->send_list;
auto & to_outbox = cl->write_msg.msg_iovlen ? cl->next_outbox : cl->outbox; auto & to_outbox = cl->write_msg.msg_iovlen ? cl->next_outbox : cl->outbox;
if (cur_op->op_type == OSD_OP_IN) if (cur_op->op_type == OSD_OP_IN)
{ {
measure_exec(cur_op);
to_send_list.push_back((iovec){ .iov_base = cur_op->reply.buf, .iov_len = OSD_PACKET_SIZE }); to_send_list.push_back((iovec){ .iov_base = cur_op->reply.buf, .iov_len = OSD_PACKET_SIZE });
} }
else else
@ -117,12 +121,18 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
// FIXME: It's worse because it doesn't allow batching // FIXME: It's worse because it doesn't allow batching
while (cl->outbox.size()) while (cl->outbox.size())
{ {
try_send(cl); cl->try_send(NULL, true);
} }
} }
else if (iothreads.size())
{
auto iot = iothreads[cl->peer_fd % iothreads.size()];
cl->refs++;
iot->wakeup(cl, ringloop);
}
else else
{ {
if ((cl->write_msg.msg_iovlen > 0 || !try_send(cl)) && (cl->write_state == 0)) if ((cl->write_msg.msg_iovlen > 0 || !cl->try_send(ringloop, use_sync_send_recv)) && (cl->write_state == 0))
{ {
cl->write_state = CL_WRITE_READY; cl->write_state = CL_WRITE_READY;
write_ready_clients.push_back(cur_op->peer_fd); write_ready_clients.push_back(cur_op->peer_fd);
@ -180,8 +190,9 @@ void osd_messenger_t::measure_exec(osd_op_t *cur_op)
} }
} }
bool osd_messenger_t::try_send(osd_client_t *cl) bool osd_client_t::try_send(ring_loop_t *ringloop, bool use_sync_send_recv)
{ {
auto cl = this;
int peer_fd = cl->peer_fd; int peer_fd = cl->peer_fd;
if (!cl->send_list.size() || cl->write_msg.msg_iovlen > 0) if (!cl->send_list.size() || cl->write_msg.msg_iovlen > 0)
{ {
@ -198,7 +209,7 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
cl->write_msg.msg_iovlen = cl->send_list.size() < IOV_MAX ? cl->send_list.size() : IOV_MAX; cl->write_msg.msg_iovlen = cl->send_list.size() < IOV_MAX ? cl->send_list.size() : IOV_MAX;
cl->refs++; cl->refs++;
ring_data_t* data = ((ring_data_t*)sqe->user_data); ring_data_t* data = ((ring_data_t*)sqe->user_data);
data->callback = [this, cl](ring_data_t *data) { handle_send(data->res, cl); }; data->callback = [this](ring_data_t *data) { handle_send(data->res); };
my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, 0); my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, 0);
} }
else else
@ -211,18 +222,27 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
{ {
result = -errno; result = -errno;
} }
handle_send(result, cl); handle_send(result);
} }
return true; return true;
} }
void osd_messenger_t::send_replies() void osd_messenger_t::send_replies()
{ {
if (iothreads.size())
{
return;
}
for (int i = 0; i < write_ready_clients.size(); i++) for (int i = 0; i < write_ready_clients.size(); i++)
{ {
int peer_fd = write_ready_clients[i]; int peer_fd = write_ready_clients[i];
auto cl_it = clients.find(peer_fd); auto cl_it = clients.find(peer_fd);
if (cl_it != clients.end() && !try_send(cl_it->second)) if (cl_it == clients.end())
{
continue;
}
auto cl = cl_it->second;
if (!cl->try_send(ringloop, use_sync_send_recv))
{ {
write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + i); write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + i);
return; return;
@ -231,8 +251,9 @@ void osd_messenger_t::send_replies()
write_ready_clients.clear(); write_ready_clients.clear();
} }
void osd_messenger_t::handle_send(int result, osd_client_t *cl) void osd_client_t::handle_send(int result)
{ {
auto cl = this;
cl->write_msg.msg_iovlen = 0; cl->write_msg.msg_iovlen = 0;
cl->refs--; cl->refs--;
if (cl->peer_state == PEER_STOPPED) if (cl->peer_state == PEER_STOPPED)
@ -247,7 +268,7 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
{ {
// this is a client socket, so don't panic. just disconnect it // this is a client socket, so don't panic. just disconnect it
fprintf(stderr, "Client %d socket write error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result)); fprintf(stderr, "Client %d socket write error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result));
stop_client(cl->peer_fd); close(cl->peer_fd);
return; return;
} }
if (result >= 0) if (result >= 0)
@ -289,23 +310,11 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
#ifdef WITH_RDMA #ifdef WITH_RDMA
if (cl->rdma_conn && !cl->outbox.size() && cl->peer_state == PEER_RDMA_CONNECTING) if (cl->rdma_conn && !cl->outbox.size() && cl->peer_state == PEER_RDMA_CONNECTING)
{ {
// FIXME: Do something better than just forgetting the FD
// FIXME: Ignore pings during RDMA state transition
if (log_level > 0) if (log_level > 0)
{ {
fprintf(stderr, "Successfully connected with client %d using RDMA\n", cl->peer_fd); fprintf(stderr, "Successfully connected with client %d using RDMA\n", cl->peer_fd);
} }
cl->peer_state = PEER_RDMA; cl->peer_state = PEER_RDMA;
tfd->set_fd_handler(cl->peer_fd, false, [this](int peer_fd, int epoll_events)
{
// Do not miss the disconnection!
if (epoll_events & EPOLLRDHUP)
{
handle_peer_epoll(peer_fd, epoll_events);
}
});
// Add the initial receive request
try_recv_rdma(cl);
} }
#endif #endif
} }

View File

@ -78,7 +78,7 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete)
} }
#ifndef __MOCK__ #ifndef __MOCK__
// Then remove FD from the eventloop so we don't accidentally read something // Then remove FD from the eventloop so we don't accidentally read something
tfd->set_fd_handler(peer_fd, false, NULL); tfd->set_fd_handler(peer_fd, 0, NULL);
if (cl->connect_timeout_id >= 0) if (cl->connect_timeout_id >= 0)
{ {
tfd->clear_timer(cl->connect_timeout_id); tfd->clear_timer(cl->connect_timeout_id);

View File

@ -655,7 +655,7 @@ help:
ringloop->register_consumer(&consumer); ringloop->register_consumer(&consumer);
// Add FD to epoll // Add FD to epoll
bool stop = false; bool stop = false;
epmgr->tfd->set_fd_handler(sockfd[0], false, [this, &stop](int peer_fd, int epoll_events) epmgr->tfd->set_fd_handler(sockfd[0], EPOLLIN, [this, &stop](int peer_fd, int epoll_events)
{ {
if (epoll_events & EPOLLRDHUP) if (epoll_events & EPOLLRDHUP)
{ {

View File

@ -185,7 +185,7 @@ void kv_cli_t::run()
fcntl(0, F_SETFL, fcntl(0, F_GETFL, 0) | O_NONBLOCK); fcntl(0, F_SETFL, fcntl(0, F_GETFL, 0) | O_NONBLOCK);
try try
{ {
epmgr->tfd->set_fd_handler(0, false, [this](int fd, int events) epmgr->tfd->set_fd_handler(0, EPOLLIN, [this](int fd, int events)
{ {
if (events & EPOLLIN) if (events & EPOLLIN)
{ {
@ -193,7 +193,7 @@ void kv_cli_t::run()
} }
if (events & EPOLLRDHUP) if (events & EPOLLRDHUP)
{ {
epmgr->tfd->set_fd_handler(0, false, NULL); epmgr->tfd->set_fd_handler(0, 0, NULL);
finished = true; finished = true;
} }
}); });

View File

@ -243,7 +243,7 @@ void nfs_proxy_t::run(json11::Json cfg)
// Create NFS socket and add it to epoll // Create NFS socket and add it to epoll
int nfs_socket = create_and_bind_socket(bind_address, nfs_port, 128, &listening_port); int nfs_socket = create_and_bind_socket(bind_address, nfs_port, 128, &listening_port);
fcntl(nfs_socket, F_SETFL, fcntl(nfs_socket, F_GETFL, 0) | O_NONBLOCK); fcntl(nfs_socket, F_SETFL, fcntl(nfs_socket, F_GETFL, 0) | O_NONBLOCK);
epmgr->tfd->set_fd_handler(nfs_socket, false, [this](int nfs_socket, int epoll_events) epmgr->tfd->set_fd_handler(nfs_socket, EPOLLIN, [this](int nfs_socket, int epoll_events)
{ {
if (epoll_events & EPOLLRDHUP) if (epoll_events & EPOLLRDHUP)
{ {
@ -260,7 +260,7 @@ void nfs_proxy_t::run(json11::Json cfg)
// Create portmap socket and add it to epoll // Create portmap socket and add it to epoll
int portmap_socket = create_and_bind_socket(bind_address, 111, 128, NULL); int portmap_socket = create_and_bind_socket(bind_address, 111, 128, NULL);
fcntl(portmap_socket, F_SETFL, fcntl(portmap_socket, F_GETFL, 0) | O_NONBLOCK); fcntl(portmap_socket, F_SETFL, fcntl(portmap_socket, F_GETFL, 0) | O_NONBLOCK);
epmgr->tfd->set_fd_handler(portmap_socket, false, [this](int portmap_socket, int epoll_events) epmgr->tfd->set_fd_handler(portmap_socket, EPOLLIN, [this](int portmap_socket, int epoll_events)
{ {
if (epoll_events & EPOLLRDHUP) if (epoll_events & EPOLLRDHUP)
{ {
@ -466,7 +466,7 @@ void nfs_proxy_t::do_accept(int listen_fd)
{ {
cli->proc_table.insert(fn); cli->proc_table.insert(fn);
} }
epmgr->tfd->set_fd_handler(nfs_fd, true, [cli](int nfs_fd, int epoll_events) epmgr->tfd->set_fd_handler(nfs_fd, EPOLLOUT, [cli](int nfs_fd, int epoll_events)
{ {
// Handle incoming event // Handle incoming event
if (epoll_events & EPOLLRDHUP) if (epoll_events & EPOLLRDHUP)
@ -723,7 +723,7 @@ void nfs_client_t::stop()
stopped = true; stopped = true;
if (refs <= 0) if (refs <= 0)
{ {
parent->epmgr->tfd->set_fd_handler(nfs_fd, true, NULL); parent->epmgr->tfd->set_fd_handler(nfs_fd, 0, NULL);
close(nfs_fd); close(nfs_fd);
delete this; delete this;
} }

View File

@ -361,7 +361,7 @@ void osd_t::bind_socket()
listen_fd = create_and_bind_socket(bind_address, bind_port, listen_backlog, &listening_port); listen_fd = create_and_bind_socket(bind_address, bind_port, listen_backlog, &listening_port);
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK); fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events) epmgr->set_fd_handler(listen_fd, EPOLLIN, [this](int fd, int events)
{ {
msgr.accept_connections(listen_fd); msgr.accept_connections(listen_fd);
}); });

View File

@ -247,6 +247,7 @@ resume_8:
finish: finish:
if (cur_op->peer_fd) if (cur_op->peer_fd)
{ {
// FIXME: Do it before executing sync
auto it = msgr.clients.find(cur_op->peer_fd); auto it = msgr.clients.find(cur_op->peer_fd);
if (it != msgr.clients.end()) if (it != msgr.clients.end())
it->second->dirty_pgs.clear(); it->second->dirty_pgs.clear();

View File

@ -54,14 +54,14 @@ int epoll_manager_t::get_fd()
return epoll_fd; return epoll_fd;
} }
void epoll_manager_t::set_fd_handler(int fd, bool wr, std::function<void(int, int)> handler) void epoll_manager_t::set_fd_handler(int fd, int events, std::function<void(int, int)> handler)
{ {
if (handler != NULL) if (handler != NULL)
{ {
bool exists = epoll_handlers.find(fd) != epoll_handlers.end(); bool exists = epoll_handlers.find(fd) != epoll_handlers.end();
epoll_event ev; epoll_event ev;
ev.data.fd = fd; ev.data.fd = fd;
ev.events = (wr ? EPOLLOUT : 0) | EPOLLIN | EPOLLRDHUP | EPOLLET; ev.events = events | EPOLLRDHUP | EPOLLET;
if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0) if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0)
{ {
if (errno == ENOENT) if (errno == ENOENT)

View File

@ -3,6 +3,8 @@
#pragma once #pragma once
#include <sys/epoll.h>
#include <map> #include <map>
#include "ringloop.h" #include "ringloop.h"
@ -21,7 +23,7 @@ public:
epoll_manager_t(ring_loop_t *ringloop); epoll_manager_t(ring_loop_t *ringloop);
~epoll_manager_t(); ~epoll_manager_t();
int get_fd(); int get_fd();
void set_fd_handler(int fd, bool wr, std::function<void(int, int)> handler); void set_fd_handler(int fd, int events, std::function<void(int, int)> handler);
void handle_events(int timeout); void handle_events(int timeout);
timerfd_manager_t *tfd; timerfd_manager_t *tfd;

View File

@ -32,12 +32,22 @@ static inline void my_uring_prep_readv(struct io_uring_sqe *sqe, int fd, const s
my_uring_prep_rw(IORING_OP_READV, sqe, fd, iovecs, nr_vecs, offset); my_uring_prep_rw(IORING_OP_READV, sqe, fd, iovecs, nr_vecs, offset);
} }
// Prepare `sqe` as a plain (non-vectored) read request.
// Forwards to my_uring_prep_rw() with opcode IORING_OP_READ, passing `fd`,
// `buf`, `nbytes` and `offset` through unchanged.
// NOTE(review): presumably mirrors liburing's io_uring_prep_read(), i.e. reads
// up to `nbytes` from `fd` at `offset` into `buf` - confirm against my_uring_prep_rw().
static inline void my_uring_prep_read(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, off_t offset)
{
my_uring_prep_rw(IORING_OP_READ, sqe, fd, buf, nbytes, offset);
}
static inline void my_uring_prep_read_fixed(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, off_t offset, int buf_index) static inline void my_uring_prep_read_fixed(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, off_t offset, int buf_index)
{ {
my_uring_prep_rw(IORING_OP_READ_FIXED, sqe, fd, buf, nbytes, offset); my_uring_prep_rw(IORING_OP_READ_FIXED, sqe, fd, buf, nbytes, offset);
sqe->buf_index = buf_index; sqe->buf_index = buf_index;
} }
// Prepare `sqe` as a plain (non-vectored) write request.
// Forwards to my_uring_prep_rw() with opcode IORING_OP_WRITE, passing `fd`,
// `buf`, `nbytes` and `offset` through unchanged.
// `buf` is taken as a pointer-to-const because a write only reads the source
// buffer; callers holding `void *` still convert implicitly, and callers
// holding const data no longer need a cast.
// NOTE(review): relies on my_uring_prep_rw() accepting a `const void *` address,
// as my_uring_prep_writev() already passes it a `const struct iovec *` - confirm.
static inline void my_uring_prep_write(struct io_uring_sqe *sqe, int fd, const void *buf, unsigned nbytes, off_t offset)
{
    my_uring_prep_rw(IORING_OP_WRITE, sqe, fd, buf, nbytes, offset);
}
static inline void my_uring_prep_writev(struct io_uring_sqe *sqe, int fd, const struct iovec *iovecs, unsigned nr_vecs, off_t offset) static inline void my_uring_prep_writev(struct io_uring_sqe *sqe, int fd, const struct iovec *iovecs, unsigned nr_vecs, off_t offset)
{ {
my_uring_prep_rw(IORING_OP_WRITEV, sqe, fd, iovecs, nr_vecs, offset); my_uring_prep_rw(IORING_OP_WRITEV, sqe, fd, iovecs, nr_vecs, offset);

View File

@ -11,7 +11,7 @@
#include <stdexcept> #include <stdexcept>
#include "timerfd_manager.h" #include "timerfd_manager.h"
timerfd_manager_t::timerfd_manager_t(std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler) timerfd_manager_t::timerfd_manager_t(std::function<void(int, int, std::function<void(int, int)>)> set_fd_handler)
{ {
this->set_fd_handler = set_fd_handler; this->set_fd_handler = set_fd_handler;
wait_state = 0; wait_state = 0;
@ -20,7 +20,7 @@ timerfd_manager_t::timerfd_manager_t(std::function<void(int, bool, std::function
{ {
throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno)); throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno));
} }
set_fd_handler(timerfd, false, [this](int fd, int events) set_fd_handler(timerfd, EPOLLIN, [this](int fd, int events)
{ {
handle_readable(); handle_readable();
}); });
@ -28,7 +28,7 @@ timerfd_manager_t::timerfd_manager_t(std::function<void(int, bool, std::function
timerfd_manager_t::~timerfd_manager_t() timerfd_manager_t::~timerfd_manager_t()
{ {
set_fd_handler(timerfd, false, NULL); set_fd_handler(timerfd, 0, NULL);
close(timerfd); close(timerfd);
} }

View File

@ -30,9 +30,9 @@ class timerfd_manager_t
void trigger_nearest(); void trigger_nearest();
void handle_readable(); void handle_readable();
public: public:
std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler; std::function<void(int, int, std::function<void(int, int)>)> set_fd_handler;
timerfd_manager_t(std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler); timerfd_manager_t(std::function<void(int, int, std::function<void(int, int)>)> set_fd_handler);
~timerfd_manager_t(); ~timerfd_manager_t();
int set_timer(uint64_t millis, bool repeat, std::function<void(int)> callback); int set_timer(uint64_t millis, bool repeat, std::function<void(int)> callback);
int set_timer_us(uint64_t micros, bool repeat, std::function<void(int)> callback); int set_timer_us(uint64_t micros, bool repeat, std::function<void(int)> callback);