diff --git a/cluster_client.h b/cluster_client.h index 066fc3cb..8fbc5d9f 100644 --- a/cluster_client.h +++ b/cluster_client.h @@ -199,10 +199,10 @@ struct cluster_client_t bool try_send(osd_client_t & cl); void send_replies(); - void handle_send(ring_data_t *data, int peer_fd); + void handle_send(int result, int peer_fd); void read_requests(); - void handle_read(ring_data_t *data, int peer_fd); + bool handle_read(int result, int peer_fd); void handle_finished_read(osd_client_t & cl); void handle_op_hdr(osd_client_t *cl); void handle_reply_hdr(osd_client_t *cl); diff --git a/http_client.cpp b/http_client.cpp index 04ee0297..a87c7435 100644 --- a/http_client.cpp +++ b/http_client.cpp @@ -52,6 +52,7 @@ struct http_co_t ~http_co_t(); void start_connection(); + void handle_events(); void handle_connect_result(); void submit_read(); void submit_send(); @@ -142,6 +143,7 @@ void websocket_t::close() http_co_t::~http_co_t() { + epoll_events = 0; if (timeout_id >= 0) { tfd->clear_timer(timeout_id); @@ -204,25 +206,6 @@ void http_co_t::start_connection() delete this; }); } - tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events) - { - this->epoll_events |= epoll_events; - if (state == HTTP_CO_CONNECTING) - { - handle_connect_result(); - } - else - { - if (this->epoll_events & EPOLLIN) - { - submit_read(); - } - else if (this->epoll_events & (EPOLLRDHUP|EPOLLERR)) - { - delete this; - } - } - }); epoll_events = 0; // Finally call connect r = ::connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); @@ -232,40 +215,61 @@ void http_co_t::start_connection() delete this; return; } + tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events) + { + this->epoll_events |= epoll_events; + handle_events(); + }); state = HTTP_CO_CONNECTING; } +void http_co_t::handle_events() +{ + while (epoll_events) + { + if (state == HTTP_CO_CONNECTING) + { + handle_connect_result(); + } + else + { + epoll_events &= ~EPOLLOUT; + if (epoll_events & EPOLLIN) + { + submit_read(); + } + else if (epoll_events & (EPOLLRDHUP|EPOLLERR)) + { + delete this; + return; + } + } + } +} + void http_co_t::handle_connect_result() { - if (epoll_events & (EPOLLOUT | EPOLLERR)) + int result = 0; + socklen_t result_len = sizeof(result); + if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) { - int result = 0; - socklen_t result_len = sizeof(result); - if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) - { - result = errno; - } - if (result != 0) - { - parsed.error_code = result; - delete this; - return; - } - int one = 1; - setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); - state = HTTP_CO_SENDING_REQUEST; - submit_send(); + result = errno; } - else + if (result != 0) { + parsed.error_code = result; delete this; + return; } + int one = 1; + setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); + state = HTTP_CO_SENDING_REQUEST; + submit_send(); } void http_co_t::submit_read() { int res; -again: if (rbuf.size() != READ_BUFFER_SIZE) { rbuf.resize(READ_BUFFER_SIZE); @@ -273,34 +277,24 @@ again: read_iov = { .iov_base = rbuf.data(), .iov_len = READ_BUFFER_SIZE }; read_msg.msg_iov = &read_iov; read_msg.msg_iovlen = 1; - epoll_events = epoll_events & ~EPOLLIN; res = recvmsg(peer_fd, &read_msg, 0); if (res < 0) { res = -errno; } - if (res == -EAGAIN) + if (res == -EAGAIN || res == 0) { - res = 0; + epoll_events = epoll_events & ~EPOLLIN; } - if (res < 0) + else if (res < 0) { delete this; return; } - response += std::string(rbuf.data(), res); - if (res == READ_BUFFER_SIZE) + else if (res > 0) { - goto again; - } - if (!handle_read()) - { - return; - } - if (res < READ_BUFFER_SIZE && (epoll_events & (EPOLLRDHUP|EPOLLERR))) - { - delete this; - return; + response += std::string(rbuf.data(), res); + handle_read(); } } @@ -331,7 +325,9 @@ again: if (state == HTTP_CO_SENDING_REQUEST) { if (sent >= request.size()) + { state = HTTP_CO_REQUEST_SENT; + } else goto again; } diff --git a/osd.cpp b/osd.cpp index 8aab34d7..6829c045 100644 --- a/osd.cpp +++ b/osd.cpp @@ -284,38 +284,7 @@ restart: { if (events[i].data.fd == listen_fd) { - // Accept new connections - sockaddr_in addr; - socklen_t peer_addr_size = sizeof(addr); - int peer_fd; - while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0) - { - assert(peer_fd != 0); - char peer_str[256]; - printf("[OSD %lu] new client %d: connection from %s port %d\n", this->osd_num, peer_fd, - inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port)); - fcntl(peer_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK); - int one = 1; - setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); - c_cli.clients[peer_fd] = { - .peer_addr = addr, - .peer_port = ntohs(addr.sin_port), - .peer_fd = peer_fd, - .peer_state = PEER_CONNECTED, - .in_buf = malloc(c_cli.receive_buffer_size), - }; - // Add FD to epoll - set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events) - { - c_cli.handle_peer_epoll(peer_fd, epoll_events); - }); - // Try to accept next connection - peer_addr_size = sizeof(addr); - } - if (peer_fd == -1 && errno != EAGAIN) - { - throw std::runtime_error(std::string("accept: ") + strerror(errno)); - } + accept_connections(); } else { @@ -329,6 +298,42 @@ restart: } } +void osd_t::accept_connections() +{ + // Accept new connections + sockaddr_in addr; + socklen_t peer_addr_size = sizeof(addr); + int peer_fd; + while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0) + { + assert(peer_fd != 0); + char peer_str[256]; + printf("[OSD %lu] new client %d: connection from %s port %d\n", this->osd_num, peer_fd, + inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port)); + fcntl(peer_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK); + int one = 1; + setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); + c_cli.clients[peer_fd] = { + .peer_addr = addr, + .peer_port = ntohs(addr.sin_port), + .peer_fd = peer_fd, + .peer_state = PEER_CONNECTED, + .in_buf = malloc(c_cli.receive_buffer_size), + }; + // Add FD to epoll + set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events) + { + c_cli.handle_peer_epoll(peer_fd, epoll_events); + }); + // Try to accept next connection + peer_addr_size = sizeof(addr); + } + if (peer_fd == -1 && errno != EAGAIN) + { + throw std::runtime_error(std::string("accept: ") + strerror(errno)); + } +} + void osd_t::exec_op(osd_op_t *cur_op) { clock_gettime(CLOCK_REALTIME, &cur_op->tv_begin); diff --git a/osd.h b/osd.h index f808e26b..1a796287 100644 --- a/osd.h +++ b/osd.h @@ -149,6 +149,7 @@ class osd_t // event loop, socket read/write void loop(); + void accept_connections(); void set_fd_handler(int fd, std::function handler); void handle_epoll_events(); diff --git a/osd_receive.cpp b/osd_receive.cpp index a86e5f9e..4097fc94 100644 --- a/osd_receive.cpp +++ b/osd_receive.cpp @@ -25,26 +25,26 @@ void cluster_client_t::read_requests() } cl.read_msg.msg_iov = &cl.read_iov; cl.read_msg.msg_iovlen = 1; - data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data, peer_fd); }; + data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data->res, peer_fd); }; my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0); } read_ready_clients.clear(); } -void cluster_client_t::handle_read(ring_data_t *data, int peer_fd) +bool cluster_client_t::handle_read(int result, int peer_fd) { auto cl_it = clients.find(peer_fd); if (cl_it != clients.end()) { auto & cl = cl_it->second; - if (data->res < 0 && data->res != -EAGAIN) + if (result < 0 && result != -EAGAIN) { // this is a client socket, so don't panic. just disconnect it - printf("Client %d socket read error: %d (%s). Disconnecting client\n", peer_fd, -data->res, strerror(-data->res)); + printf("Client %d socket read error: %d (%s). Disconnecting client\n", peer_fd, -result, strerror(-result)); stop_client(peer_fd); - return; + return false; } - if (data->res == -EAGAIN || cl.read_iov.iov_base == cl.in_buf && data->res < receive_buffer_size) + if (result == -EAGAIN || result < cl.read_iov.iov_len) { cl.read_ready--; if (cl.read_ready > 0) @@ -54,16 +54,12 @@ void cluster_client_t::handle_read(ring_data_t *data, int peer_fd) { read_ready_clients.push_back(peer_fd); } - if (data->res == -EAGAIN) - { - return; - } - if (data->res > 0) + if (result > 0) { if (cl.read_iov.iov_base == cl.in_buf) { // Compose operation(s) from the buffer - int remain = data->res; + int remain = result; void *curbuf = cl.in_buf; while (remain > 0) { @@ -99,15 +95,20 @@ void cluster_client_t::handle_read(ring_data_t *data, int peer_fd) else { // Long data - cl.read_remaining -= data->res; - cl.read_buf += data->res; + cl.read_remaining -= result; + cl.read_buf += result; if (cl.read_remaining <= 0) { handle_finished_read(cl); } } + if (result >= cl.read_iov.iov_len) + { + return true; + } } } + return false; } void cluster_client_t::handle_finished_read(osd_client_t & cl) diff --git a/osd_send.cpp b/osd_send.cpp index 43fb3bd5..cc0527b6 100644 --- a/osd_send.cpp +++ b/osd_send.cpp @@ -65,7 +65,7 @@ bool cluster_client_t::try_send(osd_client_t & cl) } cl.write_msg.msg_iov = cl.write_op->send_list.get_iovec(); cl.write_msg.msg_iovlen = cl.write_op->send_list.get_size(); - data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data, peer_fd); }; + data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data->res, peer_fd); }; my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0); return true; } @@ -84,34 +84,34 @@ void cluster_client_t::send_replies() write_ready_clients.clear(); } -void cluster_client_t::handle_send(ring_data_t *data, int peer_fd) +void cluster_client_t::handle_send(int result, int peer_fd) { auto cl_it = clients.find(peer_fd); if (cl_it != clients.end()) { auto & cl = cl_it->second; - if (data->res < 0 && data->res != -EAGAIN) + if (result < 0 && result != -EAGAIN) { // this is a client socket, so don't panic. just disconnect it - printf("Client %d socket write error: %d (%s). Disconnecting client\n", peer_fd, -data->res, strerror(-data->res)); + printf("Client %d socket write error: %d (%s). Disconnecting client\n", peer_fd, -result, strerror(-result)); stop_client(peer_fd); return; } - if (data->res >= 0) + if (result >= 0) { osd_op_t *cur_op = cl.write_op; - while (data->res > 0 && cur_op->send_list.sent < cur_op->send_list.count) + while (result > 0 && cur_op->send_list.sent < cur_op->send_list.count) { iovec & iov = cur_op->send_list.buf[cur_op->send_list.sent]; - if (iov.iov_len <= data->res) + if (iov.iov_len <= result) { - data->res -= iov.iov_len; + result -= iov.iov_len; cur_op->send_list.sent++; } else { - iov.iov_len -= data->res; - iov.iov_base += data->res; + iov.iov_len -= result; + iov.iov_base += result; break; } }