From a61ede99518f1791332f574f15f77111266b6ce4 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Thu, 21 May 2020 01:23:43 +0300 Subject: [PATCH] Remove io_uring usage from osd_http and timerfd_manager For better future interoperability with external event loops such as QEMU's one --- osd.cpp | 22 ++++- osd.h | 1 + osd_http.cpp | 191 ++++++++++++++++---------------------------- timerfd_manager.cpp | 75 ++++++----------- timerfd_manager.h | 12 +-- 5 files changed, 122 insertions(+), 179 deletions(-) diff --git a/osd.cpp b/osd.cpp index 93d578c0..b8a5445e 100644 --- a/osd.cpp +++ b/osd.cpp @@ -43,7 +43,7 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo throw std::runtime_error(std::string("epoll_create: ") + strerror(errno)); } - this->tfd = new timerfd_manager_t(ringloop); + this->tfd = new timerfd_manager_t([this](int fd, std::function handler) { set_fd_handler(fd, handler); }); this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id) { print_stats(); @@ -236,6 +236,26 @@ void osd_t::loop() ringloop->submit(); } +void osd_t::set_fd_handler(int fd, std::function handler) +{ + if (handler != NULL) + { + epoll_event ev; + ev.data.fd = fd; + ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET; + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) + { + throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); + } + epoll_handlers[fd] = handler; + } + else + { + epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL); + epoll_handlers.erase(fd); + } +} + void osd_t::handle_epoll_events() { io_uring_sqe *sqe = ringloop->get_sqe(); diff --git a/osd.h b/osd.h index 27929a58..923664ec 100644 --- a/osd.h +++ b/osd.h @@ -327,6 +327,7 @@ class osd_t // event loop, socket read/write void loop(); + void set_fd_handler(int fd, std::function handler); void handle_epoll_events(); void read_requests(); void handle_read(ring_data_t *data, int peer_fd); diff --git a/osd_http.cpp b/osd_http.cpp index 78f8817b..3b0d5eb8 100644 --- a/osd_http.cpp +++ b/osd_http.cpp @@ -21,10 +21,7 @@ static bool ws_parse_frame(std::string & buf, int & type, std::string & res); // FIXME: Use keepalive struct http_co_t { - ring_loop_t *ringloop; timerfd_manager_t *tfd; - int epoll_fd; - std::map> *epoll_handlers; int request_timeout = 0; std::string host; @@ -40,12 +37,10 @@ struct http_co_t int peer_fd = -1; int timeout_id = -1; int epoll_events = 0; - ring_data_t *send_data = NULL, *read_data = NULL; int sent = 0; std::vector rbuf; iovec read_iov, send_iov; msghdr read_msg = { 0 }, send_msg = { 0 }; - int waiting_read_sqe = 0, waiting_send_sqe = 0; std::function callback; @@ -74,9 +69,6 @@ void osd_t::http_request(const std::string & host, const std::string & request, const http_options_t & options, std::function callback) { http_co_t *handler = new http_co_t(); - handler->ringloop = ringloop; - handler->epoll_fd = epoll_fd; - handler->epoll_handlers = &epoll_handlers; handler->request_timeout = options.timeout < 0 ? 0 : (options.timeout == 0 ? DEFAULT_TIMEOUT : options.timeout); handler->want_streaming = options.want_streaming; handler->tfd = tfd; @@ -124,9 +116,6 @@ websocket_t* osd_t::open_websocket(const std::string & host, const std::string & "Sec-WebSocket-Version: 13\r\n" "\r\n"; http_co_t *handler = new http_co_t(); - handler->ringloop = ringloop; - handler->epoll_fd = epoll_fd; - handler->epoll_handlers = &epoll_handlers; handler->request_timeout = timeout < 0 ? -1 : (timeout == 0 ? DEFAULT_TIMEOUT : timeout); handler->want_streaming = false; handler->tfd = tfd; @@ -155,28 +144,9 @@ http_co_t::~http_co_t() tfd->clear_timer(timeout_id); timeout_id = -1; } - if (read_data) - { - // Ignore CQE result - read_data->callback = [](ring_data_t *data) {}; - } - else if (waiting_read_sqe) - { - ringloop->cancel_wait_sqe(waiting_read_sqe); - } - if (send_data) - { - // Ignore CQE result - send_data->callback = [](ring_data_t *data) {}; - } - else if (waiting_send_sqe) - { - ringloop->cancel_wait_sqe(waiting_send_sqe); - } if (peer_fd >= 0) { - epoll_handlers->erase(peer_fd); - epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer_fd, NULL); + tfd->set_fd_handler(peer_fd, NULL); close(peer_fd); peer_fd = -1; } @@ -231,7 +201,7 @@ void http_co_t::start_connection() delete this; }); } - (*epoll_handlers)[peer_fd] = [this](int peer_fd, int epoll_events) + tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events) { this->epoll_events |= epoll_events; if (state == HTTP_CO_CONNECTING) @@ -249,17 +219,7 @@ void http_co_t::start_connection() delete this; } } - }; - // Add FD to epoll (EPOLLOUT for tracking connect() result) - epoll_event ev; - ev.data.fd = peer_fd; - ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET; - if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0) - { - parsed.error_code = errno; - delete this; - return; - } + }); epoll_events = 0; // Finally call connect r = ::connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); @@ -301,96 +261,83 @@ void http_co_t::handle_connect_result() void http_co_t::submit_read() { - if (!read_data && !waiting_read_sqe) + int res; +again: + if (rbuf.size() != READ_BUFFER_SIZE) { - if (rbuf.size() != READ_BUFFER_SIZE) - { - rbuf.resize(READ_BUFFER_SIZE); - } - io_uring_sqe *sqe = ringloop->get_sqe(); - if (!sqe) - { - waiting_read_sqe = ringloop->wait_sqe([this]() { waiting_read_sqe = 0; submit_read(); }); - return; - } - read_data = ((ring_data_t*)sqe->user_data); - read_iov = { .iov_base = rbuf.data(), .iov_len = READ_BUFFER_SIZE }; - read_msg.msg_iov = &read_iov; - read_msg.msg_iovlen = 1; - epoll_events = epoll_events & ~EPOLLIN; - read_data->callback = [this](ring_data_t *data) - { - read_data = NULL; - if (data->res == -EAGAIN) - { - data->res = 0; - } - if (data->res < 0) - { - delete this; - return; - } - response += std::string(rbuf.data(), data->res); - if (data->res == READ_BUFFER_SIZE) - { - submit_read(); - } - if (!handle_read()) - { - return; - } - if (data->res < READ_BUFFER_SIZE && (epoll_events & (EPOLLRDHUP|EPOLLERR))) - { - delete this; - return; - } - }; - my_uring_prep_recvmsg(sqe, peer_fd, &read_msg, 0); + rbuf.resize(READ_BUFFER_SIZE); + } + read_iov = { .iov_base = rbuf.data(), .iov_len = READ_BUFFER_SIZE }; + read_msg.msg_iov = &read_iov; + read_msg.msg_iovlen = 1; + epoll_events = epoll_events & ~EPOLLIN; + res = recvmsg(peer_fd, &read_msg, 0); + if (res < 0) + { + res = -errno; + } + if (res == -EAGAIN) + { + res = 0; + } + if (res < 0) + { + delete this; + return; + } + response += std::string(rbuf.data(), res); + if (res == READ_BUFFER_SIZE) + { + goto again; + } + if (!handle_read()) + { + return; + } + if (res < READ_BUFFER_SIZE && (epoll_events & (EPOLLRDHUP|EPOLLERR))) + { + delete this; + return; } } void http_co_t::submit_send() { - if (sent < request.size() && !send_data && !waiting_send_sqe) + int res; +again: + if (sent < request.size()) { - io_uring_sqe *sqe = ringloop->get_sqe(); - if (!sqe) - { - waiting_send_sqe = ringloop->wait_sqe([this]() { waiting_send_sqe = 0; submit_send(); }); - return; - } - send_data = ((ring_data_t*)sqe->user_data); send_iov = (iovec){ .iov_base = (void*)(request.c_str()+sent), .iov_len = request.size()-sent }; send_msg.msg_iov = &send_iov; send_msg.msg_iovlen = 1; - send_data->callback = [this](ring_data_t *data) + res = sendmsg(peer_fd, &send_msg, 0); + if (res < 0) { - send_data = NULL; - if (data->res == -EAGAIN) - { - data->res = 0; - } - else if (data->res < 0) - { - delete this; - return; - } - sent += data->res; - if (state == HTTP_CO_SENDING_REQUEST) - { - if (sent >= request.size()) - state = HTTP_CO_REQUEST_SENT; - else - submit_send(); - } - else if (state == HTTP_CO_WEBSOCKET) - { - request = request.substr(sent); - sent = 0; - submit_send(); - } - }; - my_uring_prep_sendmsg(sqe, peer_fd, &send_msg, 0); + res = -errno; + } + if (res == -EAGAIN) + { + res = 0; + } + else if (res < 0) + { + delete this; + return; + } + sent += res; + if (state == HTTP_CO_SENDING_REQUEST) + { + if (sent >= request.size()) + state = HTTP_CO_REQUEST_SENT; + else + goto again; + } + else if (state == HTTP_CO_WEBSOCKET) + { + request = request.substr(sent); + sent = 0; + goto again; + } } } diff --git a/timerfd_manager.cpp b/timerfd_manager.cpp index 298dee58..e89b2069 100644 --- a/timerfd_manager.cpp +++ b/timerfd_manager.cpp @@ -1,24 +1,29 @@ #include #include +#include #include +#include +#include #include "timerfd_manager.h" -timerfd_manager_t::timerfd_manager_t(ring_loop_t *ringloop) +timerfd_manager_t::timerfd_manager_t(std::function)> set_fd_handler) { + this->set_fd_handler = set_fd_handler; wait_state = 0; timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); if (timerfd < 0) { throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno)); } - consumer.loop = [this]() { loop(); }; - ringloop->register_consumer(&consumer); - this->ringloop = ringloop; + set_fd_handler(timerfd, [this](int fd, int events) + { + handle_readable(); + }); } timerfd_manager_t::~timerfd_manager_t() { - ringloop->unregister_consumer(&consumer); + set_fd_handler(timerfd, NULL); close(timerfd); } @@ -48,7 +53,6 @@ int timerfd_manager_t::set_timer(uint64_t millis, bool repeat, std::function= 0) { - set_nearest(); - } - set_wait(); -} - -void timerfd_manager_t::set_wait() -{ - if ((wait_state & 3) == 1) - { - io_uring_sqe *sqe = ringloop->get_sqe(); - if (!sqe) + int nearest_id = timers[nearest].id; + auto cb = timers[nearest].callback; + if (timers[nearest].repeat) { - return; + inc_timer(timers[nearest]); } - ring_data_t *data = ((ring_data_t*)sqe->user_data); - my_uring_prep_poll_add(sqe, timerfd, POLLIN); - data->callback = [this](ring_data_t *data) + else { - if (data->res < 0) - { - throw std::runtime_error(std::string("waiting for timer failed: ") + strerror(-data->res)); - } - uint64_t n; - read(timerfd, &n, 8); - if (nearest >= 0) - { - int nearest_id = timers[nearest].id; - auto cb = timers[nearest].callback; - if (timers[nearest].repeat) - { - inc_timer(timers[nearest]); - } - else - { - timers.erase(timers.begin()+nearest, timers.begin()+nearest+1); - } - cb(nearest_id); - nearest = -1; - } - wait_state = 0; - set_nearest(); - set_wait(); - }; - wait_state = 3; + timers.erase(timers.begin()+nearest, timers.begin()+nearest+1); + } + cb(nearest_id); + nearest = -1; } + wait_state = 0; + set_nearest(); } diff --git a/timerfd_manager.h b/timerfd_manager.h index 1ca5a232..14e70840 100644 --- a/timerfd_manager.h +++ b/timerfd_manager.h @@ -1,7 +1,8 @@ #pragma once #include -#include "ringloop.h" +#include +#include struct timerfd_timer_t { @@ -19,16 +20,15 @@ class timerfd_manager_t int nearest = -1; int id = 1; std::vector timers; - ring_loop_t *ringloop; - ring_consumer_t consumer; void inc_timer(timerfd_timer_t & t); void set_nearest(); - void set_wait(); - void loop(); public: - timerfd_manager_t(ring_loop_t *ringloop); + std::function)> set_fd_handler; + + timerfd_manager_t(std::function)> set_fd_handler); ~timerfd_manager_t(); int set_timer(uint64_t millis, bool repeat, std::function callback); void clear_timer(int timer_id); + void handle_readable(); };