From 298b013eaef85043427446c0e69fcafb201881fd Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Tue, 7 Apr 2020 01:53:13 +0300 Subject: [PATCH] Add simple http request function --- Makefile | 4 +- osd.cpp | 43 +++++---- osd.h | 6 ++ osd_cluster.cpp | 242 ++++++++++++++++++++++++++++++++++++++++++++++++ osd_peering.cpp | 31 ++++--- osd_primary.cpp | 6 +- 6 files changed, 299 insertions(+), 33 deletions(-) create mode 100644 osd_cluster.cpp diff --git a/Makefile b/Makefile index c336f263..dc903769 100644 --- a/Makefile +++ b/Makefile @@ -27,7 +27,7 @@ libblockstore.so: $(BLOCKSTORE_OBJS) libfio_blockstore.so: ./libblockstore.so fio_engine.cpp json11.o g++ $(CXXFLAGS) -shared -o libfio_blockstore.so fio_engine.cpp json11.o ./libblockstore.so -ltcmalloc_minimal -luring -OSD_OBJS := osd.o osd_secondary.o osd_receive.o osd_send.o osd_peering.o osd_flush.o osd_peering_pg.o osd_primary.o osd_rmw.o json11.o timerfd_interval.o +OSD_OBJS := osd.o osd_secondary.o osd_receive.o osd_send.o osd_peering.o osd_flush.o osd_peering_pg.o osd_primary.o osd_cluster.o osd_rmw.o json11.o timerfd_interval.o osd_secondary.o: osd_secondary.cpp osd.h osd_ops.h ringloop.h g++ $(CXXFLAGS) -c -o $@ $< osd_receive.o: osd_receive.cpp osd.h osd_ops.h ringloop.h @@ -36,6 +36,8 @@ osd_send.o: osd_send.cpp osd.h osd_ops.h ringloop.h g++ $(CXXFLAGS) -c -o $@ $< osd_peering.o: osd_peering.cpp osd.h osd_ops.h osd_peering_pg.h ringloop.h g++ $(CXXFLAGS) -c -o $@ $< +osd_cluster.o: osd_cluster.cpp osd.h osd_ops.h ringloop.h + g++ $(CXXFLAGS) -c -o $@ $< osd_flush.o: osd_flush.cpp osd.h osd_ops.h osd_peering_pg.h ringloop.h g++ $(CXXFLAGS) -c -o $@ $< osd_peering_pg.o: osd_peering_pg.cpp object_id.h osd_peering_pg.h diff --git a/osd.cpp b/osd.cpp index 4def2b6d..72bd0f23 100644 --- a/osd.cpp +++ b/osd.cpp @@ -245,27 +245,36 @@ restart: } else { - auto & cl = clients[events[i].data.fd]; - if (cl.peer_state == PEER_CONNECTING) + auto cl_it = clients.find(events[i].data.fd); + if (cl_it != clients.end()) { - // Either OUT (connected) or HUP - handle_connect_result(cl.peer_fd); - } - else if (events[i].events & EPOLLRDHUP) - { - // Stop client - printf("osd: client %d disconnected\n", cl.peer_fd); - stop_client(cl.peer_fd); + auto & cl = cl_it->second; + if (cl.peer_state == PEER_CONNECTING) + { + // Either OUT (connected) or HUP + handle_connect_result(cl.peer_fd); + } + else if (events[i].events & EPOLLRDHUP) + { + // Stop client + printf("osd: client %d disconnected\n", cl.peer_fd); + stop_client(cl.peer_fd); + } + else + { + // Mark client as ready (i.e. some data is available) + cl.read_ready++; + if (cl.read_ready == 1) + { + read_ready_clients.push_back(cl.peer_fd); + ringloop->wakeup(); + } + } } else { - // Mark client as ready (i.e. some data is available) - cl.read_ready++; - if (cl.read_ready == 1) - { - read_ready_clients.push_back(cl.peer_fd); - ringloop->wakeup(); - } + auto & cb = epoll_handlers[events[i].data.fd]; + cb(events[i].events, events[i].data.fd); } } } diff --git a/osd.h b/osd.h index 3f81f6e1..3c975753 100644 --- a/osd.h +++ b/osd.h @@ -185,8 +185,11 @@ struct osd_recovery_op_t class osd_t { + friend struct http_co_t; + // config + std::string consul_address; osd_num_t osd_num = 1; // OSD numbers start with 1 bool run_primary = false; std::vector peers; @@ -230,6 +233,7 @@ class osd_t int epoll_fd = 0; int listen_fd = 0; ring_consumer_t consumer; + std::map> epoll_handlers; std::unordered_map clients; std::vector read_ready_clients; @@ -258,6 +262,7 @@ class osd_t void send_replies(); void handle_send(ring_data_t *data, int peer_fd); void outbox_push(osd_client_t & cl, osd_op_t *op); + void http_request(std::string host, std::string request, std::function callback); // peer handling (primary OSD logic) void connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function callback); @@ -309,6 +314,7 @@ class osd_t { return (oid.inode + oid.stripe / pg_stripe_size) % pg_count + 1; } + public: osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop); ~osd_t(); diff --git a/osd_cluster.cpp b/osd_cluster.cpp new file mode 100644 index 00000000..69459a8e --- /dev/null +++ b/osd_cluster.cpp @@ -0,0 +1,242 @@ +#include +#include + +#include "osd.h" + +int get_port(std::string & host) +{ + int port = 0; + int pos = 0; + if ((pos = host.find(':')) >= 0) + { + port = strtoull(host.c_str() + pos + 1, NULL, 10); + if (port >= 0x10000) + { + port = 0; + } + host = host.substr(0, pos); + } + return port; +} + +struct http_co_t +{ + osd_t *osd; + std::string host; + std::string request; + std::vector response; + + int st = 0; + int peer_fd = -1; + int epoll_events = 0; + int code = 0; + int sent = 0, received = 0; + iovec iov; + msghdr msg = { 0 }; + int cqe_res = 0; + + std::function callback; + std::function epoll_handler; + + ~http_co_t(); + void resume(); +}; + +void osd_t::http_request(std::string host, std::string request, std::function callback) +{ + http_co_t *handler = new http_co_t(); + handler->osd = this; + handler->host = host; + handler->request = request; + handler->callback = callback; + handler->epoll_handler = [this, handler](int peer_fd, int epoll_events) + { + handler->epoll_events |= epoll_events; + handler->resume(); + }; + handler->resume(); +} + +http_co_t::~http_co_t() +{ + response.resize(response.size()+1); + response[response.size()-1] = 0; + callback(code, std::string(response.data(), response.size())); + if (peer_fd >= 0) + { + osd->epoll_handlers.erase(peer_fd); + epoll_ctl(osd->epoll_fd, EPOLL_CTL_DEL, peer_fd, NULL); + close(peer_fd); + peer_fd = -1; + } +} + +void http_co_t::resume() +{ + if (st == 0) + { + int port = get_port(host); + struct sockaddr_in addr; + int r; + if ((r = inet_pton(AF_INET, host.c_str(), &addr.sin_addr)) != 1) + { + delete this; + return; + } + addr.sin_family = AF_INET; + addr.sin_port = htons(port ? port : 80); + peer_fd = socket(AF_INET, SOCK_STREAM, 0); + if (peer_fd < 0) + { + delete this; + return; + } + fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); + r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); + if (r < 0 && errno != EINPROGRESS) + { + delete this; + return; + } + osd->epoll_handlers[peer_fd] = epoll_handler; + // Add FD to epoll (EPOLLOUT for tracking connect() result) + epoll_event ev; + ev.data.fd = peer_fd; + ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET; + if (epoll_ctl(osd->epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0) + { + throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); + } + st = 1; + return; + } + if (st == 1) + { + if (epoll_events & EPOLLOUT) + { + int result = 0; + socklen_t result_len = sizeof(result); + if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) + { + result = errno; + } + if (result != 0) + { + delete this; + return; + } + int one = 1; + setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); + // Disable EPOLLOUT on this fd + epoll_event ev; + ev.data.fd = peer_fd; + ev.events = EPOLLIN | EPOLLRDHUP | EPOLLET; + if (epoll_ctl(osd->epoll_fd, EPOLL_CTL_MOD, peer_fd, &ev) < 0) + { + throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); + } + st = 2; + epoll_events = 0; + resume(); + return; + } + else if (epoll_events & EPOLLRDHUP) + { + delete this; + return; + } + else + { + return; + } + } + // Write data + if (st == 2) + { + io_uring_sqe *sqe = osd->ringloop->get_sqe(); + if (!sqe) + return; + ring_data_t* data = ((ring_data_t*)sqe->user_data); + iov = (iovec){ .iov_base = (void*)(request.c_str()+sent), .iov_len = request.size()-sent }; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + data->callback = [this](ring_data_t *data) + { + st = 4; + cqe_res = data->res; + resume(); + }; + my_uring_prep_sendmsg(sqe, peer_fd, &msg, 0); + st = 3; + return; + } + if (st == 3) + { + return; + } + if (st == 4) + { + if (cqe_res < 0 && cqe_res != -EAGAIN) + { + delete this; + return; + } + sent += cqe_res; + if (sent < request.size()) + st = 2; + else + st = 5; + resume(); + return; + } + // Read response + if (st == 5) + { + if (epoll_events & EPOLLIN) + { + response.resize(received + 9000); + io_uring_sqe *sqe = osd->ringloop->get_sqe(); + if (!sqe) + return; + ring_data_t* data = ((ring_data_t*)sqe->user_data); + iov = { .iov_base = response.data()+received, .iov_len = 9000 }; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + data->callback = [this](ring_data_t *data) + { + st = 7; + cqe_res = data->res; + resume(); + }; + my_uring_prep_recvmsg(sqe, peer_fd, &msg, 0); + st = 6; + epoll_events = 0; + } + else if (epoll_events & EPOLLRDHUP) + { + delete this; + return; + } + } + if (st == 6) + { + return; + } + if (st == 7) + { + if (cqe_res < 0 && cqe_res != -EAGAIN) + { + delete this; + return; + } + received += cqe_res; + st = 5; + resume(); + return; + } +} + +/*void osd_t::get_pgs() +{ + //consul_address +}*/ diff --git a/osd_peering.cpp b/osd_peering.cpp index a0ea98d3..3b763f5f 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -22,7 +22,7 @@ void osd_t::init_primary() if (peers.size() < 2) throw std::runtime_error("run_primary requires at least 2 peers"); pgs[1] = (pg_t){ - .state = PG_OFFLINE, + .state = PG_PEERING, .pg_cursize = 0, .pg_num = 1, .target_set = { 1, 2, 3 }, @@ -240,22 +240,25 @@ void osd_t::repeer_pgs(osd_num_t osd_num, bool is_connected) for (auto & p: pgs) { bool repeer = false; - for (int r = 0; r < p.second.target_set.size(); r++) + if (p.second.state != PG_OFFLINE) { - if (p.second.target_set[r] == osd_num && - p.second.cur_set[r] != real_osd) + for (int r = 0; r < p.second.target_set.size(); r++) { - p.second.cur_set[r] = real_osd; - repeer = true; - break; + if (p.second.target_set[r] == osd_num && + p.second.cur_set[r] != real_osd) + { + p.second.cur_set[r] = real_osd; + repeer = true; + break; + } + } + if (repeer) + { + // Repeer this pg + printf("Repeer PG %d because of OSD %lu\n", p.second.pg_num, osd_num); + start_pg_peering(p.second.pg_num); + peering_state |= OSD_PEERING_PGS; } - } - if (repeer) - { - // Repeer this pg - printf("Repeer PG %d because of OSD %lu\n", p.second.pg_num, osd_num); - start_pg_peering(p.second.pg_num); - peering_state |= OSD_PEERING_PGS; } } } diff --git a/osd_primary.cpp b/osd_primary.cpp index 226c0ac2..f44e852c 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -480,6 +480,11 @@ resume_5: pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); return; } + if (op_data->fact_ver == 1) + { + // Object is created + pg.clean_count++; + } if (op_data->object_state) { if (op_data->object_state->state & OBJ_MISPLACED) @@ -534,7 +539,6 @@ resume_8: { throw std::runtime_error("Invalid object state during recovery: "+std::to_string(op_data->object_state->state)); } - // FIXME: Track object count during normal writes, too pg.clean_count++; op_data->object_state->object_count--; if (!op_data->object_state->object_count)