From 283d03ef188348224a517303811d5a6e5ad55142 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 13 Dec 2019 14:05:11 +0300 Subject: [PATCH] Fix "address already in use" in test example, begin client read loop --- osd.cpp | 112 ++++++++++++++++++++++++++++++++++++++++++----------- ringloop.h | 1 + test.cpp | 16 ++++++-- 3 files changed, 103 insertions(+), 26 deletions(-) diff --git a/osd.cpp b/osd.cpp index 9e988e8d..603683d0 100644 --- a/osd.cpp +++ b/osd.cpp @@ -12,6 +12,12 @@ struct osd_client_t socklen_t peer_addr_size; int peer_fd; bool ready; + bool reading; + + iovec iov; + msghdr msg; + void *cur_buf = NULL; + int cur_len = 0; }; class osd_t @@ -44,6 +50,8 @@ osd_t::osd_t(ring_loop_t *ringloop) { throw std::runtime_error(std::string("socket: ") + strerror(errno)); } + int enable = 1; + setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)); sockaddr_in addr; if ((int r = inet_pton(AF_INET, bind_address.c_str(), &addr.sin_addr)) != 1) @@ -52,7 +60,7 @@ osd_t::osd_t(ring_loop_t *ringloop) throw std::runtime_error("bind address "+bind_address+(r == 0 ? " is not valid" : ": no ipv4 support")); } addr.sin_family = AF_INET; - addr.sin_port = bind_port; + addr.sin_port = htons(bind_port); if (bind(listen_fd, &addr, sizeof(addr)) < 0) { @@ -142,6 +150,7 @@ int osd_t::handle_epoll_events() int peer_fd; while ((peer_fd = accept(listen_fd, &addr, &peer_addr_size)) >= 0) { + fcntl(peer_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK); clients[peer_fd] = { .peer_addr = addr, .peer_addr_size = peer_addr_size, @@ -170,32 +179,14 @@ int osd_t::handle_epoll_events() if (events[i].events & EPOLLHUP) { // Stop client - struct epoll_event ev; - ev.data.fd = cl.peer_fd; - ev.events = EPOLLIN | EPOLLHUP; - if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, cl.peer_fd, &ev) < 0) - { - throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); - } - auto it = clients.find(cl.peer_fd); - clients.erase(it); - if (cl.ready) - { - for (auto it = ready_clients.begin(); it != ready_clients.end(); it++) - { - if (*it == cl.peer_fd) - { - ready_clients.erase(it); - break; - } - } - } + stop_client(cl.peer_fd); } else if (!cl.ready) { - // Mark client as ready (i.e. some commands are available) + // Mark client as ready (i.e. some data is available) cl.ready = true; - ready_clients.push_back(cl.peer_fd); + if (!cl.reading) + ready_clients.push_back(cl.peer_fd); } } count++; @@ -203,3 +194,78 @@ int osd_t::handle_epoll_events() } return count; } + +void osd_t::stop_client(int peer_fd) +{ + struct epoll_event ev; + ev.data.fd = peer_fd; + ev.events = EPOLLIN | EPOLLHUP; + if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer_fd, &ev) < 0) + { + throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); + } + auto it = clients.find(peer_fd); + clients.erase(it); + if (cl.ready) + { + for (auto it = ready_clients.begin(); it != ready_clients.end(); it++) + { + if (*it == peer_fd) + { + ready_clients.erase(it); + break; + } + } + } + close(peer_fd); +} + +void osd_t::read_commands() +{ + for (int i = 0; i < ready_clients.size(); i++) + { + int peer_fd = ready_clients[i]; + auto & cl = clients[peer_fd]; + if (!cl.cur_buf) + { + // no reads in progress, this is probably a new command + cl.cur_buf = cl.command_buffer; + cl.cur_len = OSD_OP_PACKET_SIZE; + } + struct io_uring_sqe* sqe = ringloop->get_sqe(); + if (!sqe) + { + ready_clients.erase(ready_clients.begin(), ready_clients().begin() + i); + return; + } + struct ring_data_t* data = ((ring_data_t*)sqe->user_data); + cl.iov.iov_base = cl.cur_buf; + cl.iov.iov_len = cl.cur_len; + cl.msg.msg_iov = &cl.iov; + cl.msg.msg_iovlen = 1; + data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data, peer_fd); }; + my_uring_prep_recvmsg(sqe, peer_fd, &cl.msg, 0); + ringloop->submit(); + cl.reading = true; + } + ready_clients.clear(); +} + +void osd_t::handle_read(ring_data_t *data, int peer_fd) +{ + auto cl = clients.find(peer_fd); + if (cl != clients.end()) + { + if (data->res < 0 && data->res != -EAGAIN) + { + // this is a client socket, so don't panic. just disconnect it + printf("Client %d socket read error: %d (%s). Disconnecting client\n", peer_fd, -data->res, strerror(-data->res)); + stop_client(peer_fd); + return; + } + cl->reading = false; + if (cl->ready) + ready_clients.push_back(peer_fd); + + } +} diff --git a/ringloop.h b/ringloop.h index 0c573c09..17eff381 100644 --- a/ringloop.h +++ b/ringloop.h @@ -127,6 +127,7 @@ public: ~ring_loop_t(); inline struct io_uring_sqe* get_sqe() { + // FIXME: Limit inflight ops count to not overflow the completion ring struct io_uring_sqe* sqe = io_uring_get_sqe(&ring); if (sqe) { diff --git a/test.cpp b/test.cpp index 35121993..470d6d0e 100644 --- a/test.cpp +++ b/test.cpp @@ -239,12 +239,19 @@ int main02(int argc, char *argv[]) int main(int argc, char *argv[]) { - int listen_fd = socket(AF_INET, SOCK_STREAM, 0); + int listen_fd = socket(AF_INET, SOCK_STREAM, 0), enable = 1; + assert(listen_fd >= 0); + setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)); struct sockaddr_in bind_addr; assert(inet_pton(AF_INET, "0.0.0.0", &bind_addr.sin_addr) == 1); bind_addr.sin_family = AF_INET; - bind_addr.sin_port = 13892; - assert(bind(listen_fd, (sockaddr*)&bind_addr, sizeof(bind_addr)) == 0); + bind_addr.sin_port = htons(13892); + int r = bind(listen_fd, (sockaddr*)&bind_addr, sizeof(bind_addr)); + if (r) + { + perror("bind"); + return 1; + } assert(listen(listen_fd, 128) == 0); struct sockaddr_in peer_addr; socklen_t peer_addr_size = sizeof(peer_addr); @@ -271,6 +278,9 @@ int main(int argc, char *argv[]) printf("cqe result: %d\n", ret); // ok, io_uring's sendmsg always reads as much data as is available and finishes io_uring_cqe_seen(&ring, cqe); + close(peer_fd); + close(listen_fd); + io_uring_queue_exit(&ring); return 0; }