diff --git a/osd.cpp b/osd.cpp index a03a28dd2..af7a87a55 100644 --- a/osd.cpp +++ b/osd.cpp @@ -107,10 +107,13 @@ void osd_t::loop() throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res)); } handle_epoll_events(); - wait_state = 0; }; wait_state = 1; } + else if (wait_state == 2) + { + handle_epoll_events(); + } send_replies(); read_requests(); ringloop->submit(); @@ -121,67 +124,64 @@ void osd_t::loop() int osd_t::handle_epoll_events() { epoll_event events[MAX_EPOLL_EVENTS]; - int count = 0; - int nfds; - // FIXME: We shouldn't probably handle ALL available events, we should sometimes - // yield control to Blockstore and possibly other consumers - while ((nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 0)) > 0) + int nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 0); + for (int i = 0; i < nfds; i++) { - for (int i = 0; i < nfds; i++) + if (events[i].data.fd == listen_fd) { - if (events[i].data.fd == listen_fd) + // Accept new connections + sockaddr_in addr; + socklen_t peer_addr_size = sizeof(addr); + int peer_fd; + while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0) { - // Accept new connections - sockaddr_in addr; - socklen_t peer_addr_size = sizeof(addr); - int peer_fd; - while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0) + char peer_str[256]; + printf("osd: new client %d: connection from %s port %d\n", peer_fd, inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port)); + fcntl(peer_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK); + clients[peer_fd] = { + .peer_addr = addr, + .peer_addr_size = peer_addr_size, + .peer_fd = peer_fd, + }; + // Add FD to epoll + epoll_event ev; + ev.data.fd = peer_fd; + ev.events = EPOLLIN | EPOLLRDHUP; + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0) { - char peer_str[256]; - printf("osd: new client %d: connection from %s port %d\n", peer_fd, inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port)); - fcntl(peer_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK); - clients[peer_fd] = { - .peer_addr = addr, - .peer_addr_size = peer_addr_size, - .peer_fd = peer_fd, - }; - // Add FD to epoll - epoll_event ev; - ev.data.fd = peer_fd; - ev.events = EPOLLIN | EPOLLRDHUP; - if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0) - { - throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); - } - // Try to accept next connection - peer_addr_size = sizeof(addr); + throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); } - if (peer_fd == -1 && errno != EAGAIN) + // Try to accept next connection + peer_addr_size = sizeof(addr); + } + if (peer_fd == -1 && errno != EAGAIN) + { + throw std::runtime_error(std::string("accept: ") + strerror(errno)); + } + } + else + { + auto & cl = clients[events[i].data.fd]; + if (events[i].events & EPOLLRDHUP) + { + // Stop client + printf("osd: client %d disconnected\n", cl.peer_fd); + stop_client(cl.peer_fd); + } + else if (!cl.read_ready) + { + // Mark client as ready (i.e. some data is available) + cl.read_ready = true; + if (!cl.reading) { - throw std::runtime_error(std::string("accept: ") + strerror(errno)); + read_ready_clients.push_back(cl.peer_fd); + ringloop->wakeup(); } } - else - { - auto & cl = clients[events[i].data.fd]; - if (events[i].events & EPOLLRDHUP) - { - // Stop client - printf("osd: client %d disconnected\n", cl.peer_fd); - stop_client(cl.peer_fd); - } - else if (!cl.read_ready) - { - // Mark client as ready (i.e. some data is available) - cl.read_ready = true; - if (!cl.reading) - read_ready_clients.push_back(cl.peer_fd); - } - } - count++; } } - return count; + wait_state = nfds == MAX_EPOLL_EVENTS ? 2 : 0; + return nfds; } void osd_t::stop_client(int peer_fd)