From 73c80e2c394ca25d8a4a2f20e4ed47dcc7baa001 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 8 Jun 2020 01:32:12 +0300 Subject: [PATCH] Move accept_connections() to osd_messenger_t, add a simple uring OSD stub --- Makefile | 20 ++++--- epoll_manager.cpp | 18 +++---- messenger.cpp | 36 +++++++++++++ messenger.h | 1 + osd.cpp | 38 +------------ osd.h | 1 - stub_bench.cpp | 21 ++++++-- stub_uring_osd.cpp | 129 +++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 206 insertions(+), 58 deletions(-) create mode 100644 stub_uring_osd.cpp diff --git a/Makefile b/Makefile index 22002477..85e4bf40 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_impl.o blockstore_init.o blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_rollback.o blockstore_flush.o crc32c.o ringloop.o # -fsanitize=address CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always -all: libfio_blockstore.so osd libfio_sec_osd.so libfio_cluster.so stub_osd stub_bench osd_test dump_journal +all: libfio_blockstore.so osd libfio_sec_osd.so libfio_cluster.so stub_osd stub_uring_osd stub_bench osd_test dump_journal clean: rm -f *.o @@ -17,15 +17,19 @@ libfio_blockstore.so: ./libblockstore.so fio_engine.o json11.o OSD_OBJS := osd.o osd_secondary.o msgr_receive.o msgr_send.o osd_peering.o osd_flush.o osd_peering_pg.o \ osd_primary.o osd_primary_subops.o etcd_state_client.o messenger.o osd_cluster.o http_client.o pg_states.o \ osd_rmw.o json11.o base64.o timerfd_manager.o - osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h $(OSD_OBJS) - g++ $(CXXFLAGS) -o osd osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring -stub_osd: stub_osd.cpp osd_ops.h rw_blocking.o - g++ $(CXXFLAGS) -o stub_osd stub_osd.cpp rw_blocking.o -ltcmalloc_minimal + g++ $(CXXFLAGS) -o $@ osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring + +stub_osd: stub_osd.o rw_blocking.o + g++ $(CXXFLAGS) -o $@ stub_osd.o rw_blocking.o -ltcmalloc_minimal + +STUB_URING_OSD_OBJS := stub_uring_osd.o epoll_manager.o messenger.o msgr_send.o msgr_receive.o ringloop.o timerfd_manager.o json11.o +stub_uring_osd: $(STUB_URING_OSD_OBJS) + g++ $(CXXFLAGS) -o $@ -ltcmalloc_minimal $(STUB_URING_OSD_OBJS) -luring stub_bench: stub_bench.cpp osd_ops.h rw_blocking.o - g++ $(CXXFLAGS) -o stub_bench stub_bench.cpp rw_blocking.o -ltcmalloc_minimal + g++ $(CXXFLAGS) -o $@ stub_bench.cpp rw_blocking.o -ltcmalloc_minimal osd_test: osd_test.cpp osd_ops.h rw_blocking.o - g++ $(CXXFLAGS) -o osd_test osd_test.cpp rw_blocking.o -ltcmalloc_minimal + g++ $(CXXFLAGS) -o $@ osd_test.cpp rw_blocking.o -ltcmalloc_minimal osd_peering_pg_test: osd_peering_pg_test.cpp osd_peering_pg.o g++ $(CXXFLAGS) -o $@ $< osd_peering_pg.o -ltcmalloc_minimal @@ -135,6 +139,8 @@ stub_bench.o: stub_bench.cpp object_id.h osd_id.h osd_ops.h rw_blocking.h g++ $(CXXFLAGS) -c -o $@ $< stub_osd.o: stub_osd.cpp object_id.h osd_id.h osd_ops.h rw_blocking.h g++ $(CXXFLAGS) -c -o $@ $< +stub_uring_osd.o: stub_uring_osd.cpp epoll_manager.h json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h + g++ $(CXXFLAGS) -c -o $@ $< test.o: test.cpp allocator.h blockstore.h blockstore_flush.h blockstore_impl.h blockstore_init.h blockstore_journal.h cpp-btree/btree_map.h crc32c.h object_id.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h g++ $(CXXFLAGS) -c -o $@ $< test_allocator.o: test_allocator.cpp allocator.h diff --git a/epoll_manager.cpp b/epoll_manager.cpp index f79cbba9..e8f0edad 100644 --- a/epoll_manager.cpp +++ b/epoll_manager.cpp @@ -75,15 +75,13 @@ void epoll_manager_t::handle_epoll_events() ringloop->submit(); int nfds; epoll_event events[MAX_EPOLL_EVENTS]; -restart: - nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 0); - for (int i = 0; i < nfds; i++) + do { - auto & cb = epoll_handlers[events[i].data.fd]; - cb(events[i].data.fd, events[i].events); - } - if (nfds == MAX_EPOLL_EVENTS) - { - goto restart; - } + nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 0); + for (int i = 0; i < nfds; i++) + { + auto & cb = epoll_handlers[events[i].data.fd]; + cb(events[i].data.fd, events[i].events); + } + } while (nfds == MAX_EPOLL_EVENTS); } diff --git a/messenger.cpp b/messenger.cpp index 514dd83d..1f17ff73 100644 --- a/messenger.cpp +++ b/messenger.cpp @@ -360,3 +360,39 @@ void osd_messenger_t::stop_client(int peer_fd) repeer_pgs(repeer_osd); } } + +void osd_messenger_t::accept_connections(int listen_fd) +{ + // Accept new connections + sockaddr_in addr; + socklen_t peer_addr_size = sizeof(addr); + int peer_fd; + while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0) + { + assert(peer_fd != 0); + char peer_str[256]; + printf("[OSD %lu] new client %d: connection from %s port %d\n", this->osd_num, peer_fd, + inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port)); + fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); + int one = 1; + setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); + clients[peer_fd] = { + .peer_addr = addr, + .peer_port = ntohs(addr.sin_port), + .peer_fd = peer_fd, + .peer_state = PEER_CONNECTED, + .in_buf = malloc(receive_buffer_size), + }; + // Add FD to epoll + tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events) + { + handle_peer_epoll(peer_fd, epoll_events); + }); + // Try to accept next connection + peer_addr_size = sizeof(addr); + } + if (peer_fd == -1 && errno != EAGAIN) + { + throw std::runtime_error(std::string("accept: ") + strerror(errno)); + } +} diff --git a/messenger.h b/messenger.h index 83a7cadd..9755d9de 100644 --- a/messenger.h +++ b/messenger.h @@ -192,6 +192,7 @@ public: void handle_peer_epoll(int peer_fd, int epoll_events); void read_requests(); void send_replies(); + void accept_connections(int listen_fd); protected: void try_connect_peer(uint64_t osd_num); diff --git a/osd.cpp b/osd.cpp index 29e07c87..f73dec79 100644 --- a/osd.cpp +++ b/osd.cpp @@ -265,7 +265,7 @@ restart: { if (events[i].data.fd == listen_fd) { - accept_connections(); + c_cli.accept_connections(listen_fd); } else { @@ -279,42 +279,6 @@ restart: } } -void osd_t::accept_connections() -{ - // Accept new connections - sockaddr_in addr; - socklen_t peer_addr_size = sizeof(addr); - int peer_fd; - while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0) - { - assert(peer_fd != 0); - char peer_str[256]; - printf("[OSD %lu] new client %d: connection from %s port %d\n", this->osd_num, peer_fd, - inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port)); - fcntl(peer_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK); - int one = 1; - setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); - c_cli.clients[peer_fd] = { - .peer_addr = addr, - .peer_port = ntohs(addr.sin_port), - .peer_fd = peer_fd, - .peer_state = PEER_CONNECTED, - .in_buf = malloc(c_cli.receive_buffer_size), - }; - // Add FD to epoll - set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events) - { - c_cli.handle_peer_epoll(peer_fd, epoll_events); - }); - // Try to accept next connection - peer_addr_size = sizeof(addr); - } - if (peer_fd == -1 && errno != EAGAIN) - { - throw std::runtime_error(std::string("accept: ") + strerror(errno)); - } -} - void osd_t::exec_op(osd_op_t *cur_op) { clock_gettime(CLOCK_REALTIME, &cur_op->tv_begin); diff --git a/osd.h b/osd.h index d28829fc..20d4d212 100644 --- a/osd.h +++ b/osd.h @@ -149,7 +149,6 @@ class osd_t // event loop, socket read/write void loop(); - void accept_connections(); void set_fd_handler(int fd, std::function handler); void handle_epoll_events(); diff --git a/stub_bench.cpp b/stub_bench.cpp index 2de2113e..fb855450 100644 --- a/stub_bench.cpp +++ b/stub_bench.cpp @@ -30,15 +30,30 @@ static uint64_t sync_sum = 0, sync_count = 0; void handle_sigint(int sig) { - printf("4k randwrite: %lu us avg\n", write_sum/write_count); - printf("sync: %lu us avg\n", sync_sum/sync_count); + printf("4k randwrite: %lu us avg\n", write_count ? write_sum/write_count : 0); + printf("sync: %lu us avg\n", sync_count ? sync_sum/sync_count : 0); exit(0); } int main(int narg, char *args[]) { + if (narg < 2) + { + printf("USAGE: %s SERVER_IP [PORT]\n", args[0]); + return 1; + } + int port = 11203; + if (narg >= 3) + { + port = atoi(args[2]); + if (port <= 0 || port >= 65536) + { + printf("Bad port number\n"); + return 1; + } + } signal(SIGINT, handle_sigint); - int peer_fd = connect_stub("127.0.0.1", 11203); + int peer_fd = connect_stub(args[1], port); run_bench(peer_fd); close(peer_fd); return 0; diff --git a/stub_uring_osd.cpp b/stub_uring_osd.cpp new file mode 100644 index 00000000..07a9b777 --- /dev/null +++ b/stub_uring_osd.cpp @@ -0,0 +1,129 @@ +/** + * Stub "OSD" implemented on top of osd_messenger to test & compare + * network performance with sync read/write and io_uring + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "ringloop.h" +#include "epoll_manager.h" +#include "messenger.h" + +int bind_stub(const char *bind_address, int bind_port); + +void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op); + +int main(int narg, char *args[]) +{ + ring_consumer_t looper; + ring_loop_t *ringloop = new ring_loop_t(512); + epoll_manager_t *epmgr = new epoll_manager_t(ringloop); + osd_messenger_t *msgr = new osd_messenger_t(); + msgr->osd_num = 1351; + msgr->tfd = epmgr->tfd; + msgr->ringloop = ringloop; + msgr->repeer_pgs = [](osd_num_t) {}; + msgr->exec_op = [msgr](osd_op_t *op) { stub_exec_op(msgr, op); }; + // Accept new connections + int listen_fd = bind_stub("0.0.0.0", 11203); + epmgr->set_fd_handler(listen_fd, [listen_fd, msgr](int fd, int events) + { + msgr->accept_connections(listen_fd); + }); + looper.loop = [msgr, ringloop]() + { + msgr->read_requests(); + msgr->send_replies(); + ringloop->submit(); + }; + ringloop->register_consumer(&looper); + printf("stub_uring_osd: waiting for clients\n"); + while (true) + { + ringloop->loop(); + ringloop->wait(); + } + delete msgr; + delete epmgr; + delete ringloop; + return 0; +} + +int bind_stub(const char *bind_address, int bind_port) +{ + int listen_backlog = 128; + + int listen_fd = socket(AF_INET, SOCK_STREAM, 0); + if (listen_fd < 0) + { + throw std::runtime_error(std::string("socket: ") + strerror(errno)); + } + int enable = 1; + setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)); + + sockaddr_in addr; + int r; + if ((r = inet_pton(AF_INET, bind_address, &addr.sin_addr)) != 1) + { + close(listen_fd); + throw std::runtime_error("bind address "+std::string(bind_address)+(r == 0 ? " is not valid" : ": no ipv4 support")); + } + addr.sin_family = AF_INET; + addr.sin_port = htons(bind_port); + + if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0) + { + close(listen_fd); + throw std::runtime_error(std::string("bind: ") + strerror(errno)); + } + + if (listen(listen_fd, listen_backlog) < 0) + { + close(listen_fd); + throw std::runtime_error(std::string("listen: ") + strerror(errno)); + } + + fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK); + + return listen_fd; +} + +void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op) +{ + op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; + op->reply.hdr.id = op->req.hdr.id; + op->reply.hdr.opcode = op->req.hdr.opcode; + op->send_list.push_back(op->reply.buf, OSD_PACKET_SIZE); + if (op->req.hdr.opcode == OSD_OP_SECONDARY_READ) + { + op->reply.hdr.retval = op->req.sec_rw.len; + op->buf = malloc(op->req.sec_rw.len); + op->send_list.push_back(op->buf, op->req.sec_rw.len); + } + else if (op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE) + { + op->reply.hdr.retval = op->req.sec_rw.len; + } + else if (op->req.hdr.opcode == OSD_OP_TEST_SYNC_STAB_ALL) + { + op->reply.hdr.retval = 0; + } + else + { + printf("client %d: unsupported stub opcode: %lu\n", op->peer_fd, op->req.hdr.opcode); + op->reply.hdr.retval = -EINVAL; + } + msgr->outbox_push(op); +}