From 00ee6b79615ff7083f6486c99032cb483abf4e20 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Thu, 12 Dec 2019 11:32:20 +0300 Subject: [PATCH] Accept connections --- osd.cpp | 104 +++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 95 insertions(+), 9 deletions(-) diff --git a/osd.cpp b/osd.cpp index 53e11cb7..9e988e8d 100644 --- a/osd.cpp +++ b/osd.cpp @@ -6,6 +6,14 @@ #include "osd_ops.h" #include "ringloop.h" +struct osd_client_t +{ + sockaddr_in peer_addr; + socklen_t peer_addr_size; + int peer_fd; + bool ready; +}; + class osd_t { int wait_state = 0; @@ -17,6 +25,9 @@ class osd_t int bind_port, listen_backlog; ring_loop_t *ringloop; + std::unordered_map clients; + std::deque ready_clients; + void handle_epoll_events(); public: osd_t(ring_loop_t *ringloop); @@ -24,11 +35,6 @@ public: void loop(); }; -class osd_client_t -{ - int sock_fd; -}; - osd_t::osd_t(ring_loop_t *ringloop) { this->ringloop = ringloop; @@ -60,6 +66,8 @@ osd_t::osd_t(ring_loop_t *ringloop) throw std::runtime_error(std::string("listen: ") + strerror(errno)); } + fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK); + epoll_fd = epoll_create(1); if (epoll_fd < 0) { @@ -69,8 +77,8 @@ osd_t::osd_t(ring_loop_t *ringloop) struct epoll_event ev; ev.data.fd = listen_fd; - ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT; - if (epoll_ctl(epfd, EPOLL_CTL_ADD, sfd, &ev) < 0) + ev.events = EPOLLIN; + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) < 0) { throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); } @@ -113,7 +121,85 @@ void osd_t::loop() ringloop->submit(); } -void osd_t::handle_epoll_events() +#define MAX_EPOLL_EVENTS 16 + +int osd_t::handle_epoll_events() { - + epoll_event events[MAX_EPOLL_EVENTS]; + int count = 0; + int nfds; + // FIXME: We shouldn't probably handle ALL available events, we should sometimes + // yield control to Blockstore and possibly other consumers + while ((nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 0)) > 0) + { + for (int i = 0; i < nfds; i++) + { + if (events[i].data.fd == listen_fd) + { + // Accept new connections + struct sockaddr_in addr; + socklen_t peer_addr_size = sizeof(addr); + int peer_fd; + while ((peer_fd = accept(listen_fd, &addr, &peer_addr_size)) >= 0) + { + clients[peer_fd] = { + .peer_addr = addr, + .peer_addr_size = peer_addr_size, + .peer_fd = peer_fd, + .ready = false, + }; + // Add FD to epoll + struct epoll_event ev; + ev.data.fd = peer_fd; + ev.events = EPOLLIN | EPOLLHUP; + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0) + { + throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); + } + // Try to accept next connection + peer_addr_size = sizeof(addr); + } + if (peer_fd == -1 && errno != EAGAIN) + { + throw std::runtime_error(std::string("accept: ") + strerror(errno)); + } + } + else + { + auto & cl = clients[events[i].data.fd]; + if (events[i].events & EPOLLHUP) + { + // Stop client + struct epoll_event ev; + ev.data.fd = cl.peer_fd; + ev.events = EPOLLIN | EPOLLHUP; + if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, cl.peer_fd, &ev) < 0) + { + throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); + } + auto it = clients.find(cl.peer_fd); + clients.erase(it); + if (cl.ready) + { + for (auto it = ready_clients.begin(); it != ready_clients.end(); it++) + { + if (*it == cl.peer_fd) + { + ready_clients.erase(it); + break; + } + } + } + } + else if (!cl.ready) + { + // Mark client as ready (i.e. some commands are available) + cl.ready = true; + ready_clients.push_back(cl.peer_fd); + } + } + count++; + } + } + return count; }