From 582f485578b4b1b6b470221f2c861010925499da Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Wed, 15 Apr 2020 15:47:06 +0300 Subject: [PATCH] Extract http & getifaddr_list into a separate file --- Makefile | 4 +- osd_cluster.cpp | 285 +---------------------------------------------- osd_http.cpp | 286 ++++++++++++++++++++++++++++++++++++++++++++++++ osd_http.h | 7 ++ 4 files changed, 297 insertions(+), 285 deletions(-) create mode 100644 osd_http.cpp create mode 100644 osd_http.h diff --git a/Makefile b/Makefile index 839bb79b..e8a5a89f 100644 --- a/Makefile +++ b/Makefile @@ -30,7 +30,7 @@ libfio_blockstore.so: ./libblockstore.so fio_engine.cpp json11.o g++ $(CXXFLAGS) -shared -o libfio_blockstore.so fio_engine.cpp json11.o ./libblockstore.so -ltcmalloc_minimal -luring OSD_OBJS := osd.o osd_secondary.o osd_receive.o osd_send.o osd_peering.o osd_flush.o osd_peering_pg.o \ - osd_primary.o osd_cluster.o osd_rmw.o json11.o timerfd_interval.o base64.o timerfd_manager.o + osd_primary.o osd_cluster.o osd_http.o osd_rmw.o json11.o timerfd_interval.o base64.o timerfd_manager.o base64.o: base64.cpp base64.h g++ $(CXXFLAGS) -c -o $@ $< osd_secondary.o: osd_secondary.cpp osd.h osd_ops.h ringloop.h @@ -43,6 +43,8 @@ osd_peering.o: osd_peering.cpp osd.h osd_ops.h osd_peering_pg.h ringloop.h g++ $(CXXFLAGS) -c -o $@ $< osd_cluster.o: osd_cluster.cpp osd.h osd_ops.h ringloop.h g++ $(CXXFLAGS) -c -o $@ $< +osd_http.o: osd_http.cpp osd_http.h osd.h osd_ops.h ringloop.h + g++ $(CXXFLAGS) -c -o $@ $< osd_flush.o: osd_flush.cpp osd.h osd_ops.h osd_peering_pg.h ringloop.h g++ $(CXXFLAGS) -c -o $@ $< osd_peering_pg.o: osd_peering_pg.cpp object_id.h osd_peering_pg.h diff --git a/osd_cluster.cpp b/osd_cluster.cpp index c3efe4c0..d38cfc6e 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -1,61 +1,5 @@ -#include -#include - -#include -#include - #include "osd.h" - -static int extract_port(std::string & host) -{ - int port = 0; - int pos = 0; - if ((pos = host.find(':')) >= 0) - { - port = strtoull(host.c_str() + pos + 1, NULL, 10); - if (port >= 0x10000) - { - port = 0; - } - host = host.substr(0, pos); - } - return port; -} - -std::vector getifaddr_list() -{ - std::vector addresses; - ifaddrs *list, *ifa; - if (getifaddrs(&list) == -1) - { - throw std::runtime_error(std::string("getifaddrs: ") + strerror(errno)); - } - for (ifa = list; ifa != NULL; ifa = ifa->ifa_next) - { - if (!ifa->ifa_addr) - { - continue; - } - int family = ifa->ifa_addr->sa_family; - if ((family == AF_INET || family == AF_INET6) && - (ifa->ifa_flags & (IFF_UP | IFF_RUNNING | IFF_LOOPBACK)) == (IFF_UP | IFF_RUNNING)) - { - void *addr_ptr; - if (family == AF_INET) - addr_ptr = &((sockaddr_in *)ifa->ifa_addr)->sin_addr; - else - addr_ptr = &((sockaddr_in6 *)ifa->ifa_addr)->sin6_addr; - char addr[INET6_ADDRSTRLEN]; - if (!inet_ntop(family, addr_ptr, addr, INET6_ADDRSTRLEN)) - { - throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno)); - } - addresses.push_back(std::string(addr)); - } - } - freeifaddrs(list); - return addresses; -} +#include "osd_http.h" json11::Json osd_t::get_status() { @@ -155,230 +99,3 @@ void osd_t::report_status() } }); } - -struct http_co_t -{ - osd_t *osd; - std::string host; - std::string request; - std::string response; - std::vector rbuf; - - int st = 0; - int peer_fd = -1; - int epoll_events = 0; - int code = 0; - int sent = 0, received = 0; - iovec iov; - msghdr msg = { 0 }; - int cqe_res = 0; - - std::function callback; - std::function epoll_handler; - - ~http_co_t(); - void resume(); -}; - -void osd_t::http_request(std::string host, std::string request, std::function callback) -{ - http_co_t *handler = new http_co_t(); - handler->osd = this; - handler->host = host; - handler->request = request; - handler->callback = callback; - handler->epoll_handler = [this, handler](int peer_fd, int epoll_events) - { - handler->epoll_events |= epoll_events; - handler->resume(); - }; - handler->resume(); -} - -http_co_t::~http_co_t() -{ - callback(code, response); - if (peer_fd >= 0) - { - osd->epoll_handlers.erase(peer_fd); - epoll_ctl(osd->epoll_fd, EPOLL_CTL_DEL, peer_fd, NULL); - close(peer_fd); - peer_fd = -1; - } -} - -void http_co_t::resume() -{ - if (st == 0) - { - int port = extract_port(host); - struct sockaddr_in addr; - int r; - if ((r = inet_pton(AF_INET, host.c_str(), &addr.sin_addr)) != 1) - { - code = ENXIO; - delete this; - return; - } - addr.sin_family = AF_INET; - addr.sin_port = htons(port ? port : 80); - peer_fd = socket(AF_INET, SOCK_STREAM, 0); - if (peer_fd < 0) - { - code = errno; - delete this; - return; - } - fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); - r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); - if (r < 0 && errno != EINPROGRESS) - { - code = errno; - delete this; - return; - } - osd->epoll_handlers[peer_fd] = epoll_handler; - // Add FD to epoll (EPOLLOUT for tracking connect() result) - epoll_event ev; - ev.data.fd = peer_fd; - ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET; - if (epoll_ctl(osd->epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0) - { - code = errno; - delete this; - return; - } - epoll_events = 0; - st = 1; - return; - } - if (st == 1) - { - if (epoll_events & (EPOLLOUT | EPOLLERR)) - { - int result = 0; - socklen_t result_len = sizeof(result); - if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) - { - result = errno; - } - if (result != 0) - { - code = result; - delete this; - return; - } - int one = 1; - setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); - // Disable EPOLLOUT on this fd - epoll_event ev; - ev.data.fd = peer_fd; - ev.events = EPOLLIN | EPOLLRDHUP | EPOLLET; - if (epoll_ctl(osd->epoll_fd, EPOLL_CTL_MOD, peer_fd, &ev) < 0) - { - code = errno; - delete this; - return; - } - st = 2; - epoll_events = 0; - resume(); - return; - } - else if (epoll_events & EPOLLRDHUP) - { - delete this; - return; - } - else - { - return; - } - } - // Write data - if (st == 2) - { - io_uring_sqe *sqe = osd->ringloop->get_sqe(); - if (!sqe) - return; - ring_data_t* data = ((ring_data_t*)sqe->user_data); - iov = (iovec){ .iov_base = (void*)(request.c_str()+sent), .iov_len = request.size()-sent }; - msg.msg_iov = &iov; - msg.msg_iovlen = 1; - data->callback = [this](ring_data_t *data) - { - st = 4; - cqe_res = data->res; - resume(); - }; - my_uring_prep_sendmsg(sqe, peer_fd, &msg, 0); - st = 3; - return; - } - if (st == 3) - { - return; - } - if (st == 4) - { - if (cqe_res < 0 && cqe_res != -EAGAIN) - { - delete this; - return; - } - sent += cqe_res; - if (sent < request.size()) - st = 2; - else - st = 5; - resume(); - return; - } - // Read response - if (st == 5) - { - if (epoll_events & (EPOLLRDHUP|EPOLLERR)) - { - delete this; - return; - } - else if (epoll_events & EPOLLIN) - { - if (rbuf.size() != 9000) - rbuf.resize(9000); - io_uring_sqe *sqe = osd->ringloop->get_sqe(); - if (!sqe) - return; - ring_data_t* data = ((ring_data_t*)sqe->user_data); - iov = { .iov_base = rbuf.data(), .iov_len = 9000 }; - msg.msg_iov = &iov; - msg.msg_iovlen = 1; - data->callback = [this](ring_data_t *data) - { - st = 7; - cqe_res = data->res; - resume(); - }; - my_uring_prep_recvmsg(sqe, peer_fd, &msg, 0); - st = 6; - epoll_events = 0; - } - } - if (st == 6) - { - return; - } - if (st == 7) - { - if (cqe_res < 0 && cqe_res != -EAGAIN) - { - delete this; - return; - } - response += std::string(rbuf.data(), cqe_res); - received += cqe_res; - st = 5; - resume(); - return; - } -} diff --git a/osd_http.cpp b/osd_http.cpp new file mode 100644 index 00000000..f7a85521 --- /dev/null +++ b/osd_http.cpp @@ -0,0 +1,286 @@ +#include +#include + +#include +#include + +#include "osd_http.h" +#include "osd.h" + +int extract_port(std::string & host) +{ + int port = 0; + int pos = 0; + if ((pos = host.find(':')) >= 0) + { + port = strtoull(host.c_str() + pos + 1, NULL, 10); + if (port >= 0x10000) + { + port = 0; + } + host = host.substr(0, pos); + } + return port; +} + +std::vector getifaddr_list() +{ + std::vector addresses; + ifaddrs *list, *ifa; + if (getifaddrs(&list) == -1) + { + throw std::runtime_error(std::string("getifaddrs: ") + strerror(errno)); + } + for (ifa = list; ifa != NULL; ifa = ifa->ifa_next) + { + if (!ifa->ifa_addr) + { + continue; + } + int family = ifa->ifa_addr->sa_family; + if ((family == AF_INET || family == AF_INET6) && + (ifa->ifa_flags & (IFF_UP | IFF_RUNNING | IFF_LOOPBACK)) == (IFF_UP | IFF_RUNNING)) + { + void *addr_ptr; + if (family == AF_INET) + addr_ptr = &((sockaddr_in *)ifa->ifa_addr)->sin_addr; + else + addr_ptr = &((sockaddr_in6 *)ifa->ifa_addr)->sin6_addr; + char addr[INET6_ADDRSTRLEN]; + if (!inet_ntop(family, addr_ptr, addr, INET6_ADDRSTRLEN)) + { + throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno)); + } + addresses.push_back(std::string(addr)); + } + } + freeifaddrs(list); + return addresses; +} + +struct http_co_t +{ + osd_t *osd; + std::string host; + std::string request; + std::string response; + std::vector rbuf; + + int st = 0; + int peer_fd = -1; + int epoll_events = 0; + int code = 0; + int sent = 0, received = 0; + iovec iov; + msghdr msg = { 0 }; + int cqe_res = 0; + + std::function callback; + std::function epoll_handler; + + ~http_co_t(); + void resume(); +}; + +void osd_t::http_request(std::string host, std::string request, std::function callback) +{ + http_co_t *handler = new http_co_t(); + handler->osd = this; + handler->host = host; + handler->request = request; + handler->callback = callback; + handler->epoll_handler = [this, handler](int peer_fd, int epoll_events) + { + handler->epoll_events |= epoll_events; + handler->resume(); + }; + handler->resume(); +} + +http_co_t::~http_co_t() +{ + callback(code, response); + if (peer_fd >= 0) + { + osd->epoll_handlers.erase(peer_fd); + epoll_ctl(osd->epoll_fd, EPOLL_CTL_DEL, peer_fd, NULL); + close(peer_fd); + peer_fd = -1; + } +} + +void http_co_t::resume() +{ + if (st == 0) + { + int port = extract_port(host); + struct sockaddr_in addr; + int r; + if ((r = inet_pton(AF_INET, host.c_str(), &addr.sin_addr)) != 1) + { + code = ENXIO; + delete this; + return; + } + addr.sin_family = AF_INET; + addr.sin_port = htons(port ? port : 80); + peer_fd = socket(AF_INET, SOCK_STREAM, 0); + if (peer_fd < 0) + { + code = errno; + delete this; + return; + } + fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); + r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); + if (r < 0 && errno != EINPROGRESS) + { + code = errno; + delete this; + return; + } + osd->epoll_handlers[peer_fd] = epoll_handler; + // Add FD to epoll (EPOLLOUT for tracking connect() result) + epoll_event ev; + ev.data.fd = peer_fd; + ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET; + if (epoll_ctl(osd->epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0) + { + code = errno; + delete this; + return; + } + epoll_events = 0; + st = 1; + return; + } + if (st == 1) + { + if (epoll_events & (EPOLLOUT | EPOLLERR)) + { + int result = 0; + socklen_t result_len = sizeof(result); + if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) + { + result = errno; + } + if (result != 0) + { + code = result; + delete this; + return; + } + int one = 1; + setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); + // Disable EPOLLOUT on this fd + epoll_event ev; + ev.data.fd = peer_fd; + ev.events = EPOLLIN | EPOLLRDHUP | EPOLLET; + if (epoll_ctl(osd->epoll_fd, EPOLL_CTL_MOD, peer_fd, &ev) < 0) + { + code = errno; + delete this; + return; + } + st = 2; + epoll_events = 0; + resume(); + return; + } + else if (epoll_events & EPOLLRDHUP) + { + delete this; + return; + } + else + { + return; + } + } + // Write data + if (st == 2) + { + io_uring_sqe *sqe = osd->ringloop->get_sqe(); + if (!sqe) + return; + ring_data_t* data = ((ring_data_t*)sqe->user_data); + iov = (iovec){ .iov_base = (void*)(request.c_str()+sent), .iov_len = request.size()-sent }; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + data->callback = [this](ring_data_t *data) + { + st = 4; + cqe_res = data->res; + resume(); + }; + my_uring_prep_sendmsg(sqe, peer_fd, &msg, 0); + st = 3; + return; + } + if (st == 3) + { + return; + } + if (st == 4) + { + if (cqe_res < 0 && cqe_res != -EAGAIN) + { + delete this; + return; + } + sent += cqe_res; + if (sent < request.size()) + st = 2; + else + st = 5; + resume(); + return; + } + // Read response + if (st == 5) + { + if (epoll_events & (EPOLLRDHUP|EPOLLERR)) + { + delete this; + return; + } + else if (epoll_events & EPOLLIN) + { + if (rbuf.size() != 9000) + rbuf.resize(9000); + io_uring_sqe *sqe = osd->ringloop->get_sqe(); + if (!sqe) + return; + ring_data_t* data = ((ring_data_t*)sqe->user_data); + iov = { .iov_base = rbuf.data(), .iov_len = 9000 }; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + data->callback = [this](ring_data_t *data) + { + st = 7; + cqe_res = data->res; + resume(); + }; + my_uring_prep_recvmsg(sqe, peer_fd, &msg, 0); + st = 6; + epoll_events = 0; + } + } + if (st == 6) + { + return; + } + if (st == 7) + { + if (cqe_res < 0 && cqe_res != -EAGAIN) + { + delete this; + return; + } + response += std::string(rbuf.data(), cqe_res); + received += cqe_res; + st = 5; + resume(); + return; + } +} diff --git a/osd_http.h b/osd_http.h new file mode 100644 index 00000000..6d0aa908 --- /dev/null +++ b/osd_http.h @@ -0,0 +1,7 @@ +#pragma once +#include +#include + +int extract_port(std::string & host); + +std::vector getifaddr_list();