From 8ea1ccc192ab86f97deec5b1a95abce849be3b25 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Wed, 15 Jan 2020 22:03:27 +0300 Subject: [PATCH] Add an OSD stub to compare sync socket I/O with io_uring + skip multiple fsyncs that fio issues --- Makefile | 4 +- fio_sec_osd.cpp | 75 +++++++++++----- stub_osd.cpp | 229 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 284 insertions(+), 24 deletions(-) create mode 100644 stub_osd.cpp diff --git a/Makefile b/Makefile index 4a705dfa5..6a2cf255e 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_impl.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \ blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always -all: $(BLOCKSTORE_OBJS) libfio_blockstore.so osd libfio_sec_osd.so test_blockstore +all: $(BLOCKSTORE_OBJS) libfio_blockstore.so osd libfio_sec_osd.so test_blockstore stub_osd clean: rm -f *.o @@ -34,6 +34,8 @@ osd.o: osd.cpp osd.h osd_ops.h g++ $(CXXFLAGS) -c -o $@ $< osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h osd.o osd_exec_secondary.o osd_read.o osd_send.o json11.o g++ $(CXXFLAGS) -o osd osd_main.cpp osd.o osd_exec_secondary.o osd_read.o osd_send.o json11.o ./libblockstore.so -ltcmalloc_minimal -luring +stub_osd: stub_osd.cpp osd_ops.h + g++ $(CXXFLAGS) -o stub_osd stub_osd.cpp -ltcmalloc_minimal libfio_sec_osd.so: fio_sec_osd.cpp osd_ops.h g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -o libfio_sec_osd.so fio_sec_osd.cpp -luring diff --git a/fio_sec_osd.cpp b/fio_sec_osd.cpp index 400c1c663..a2ad4acc8 100644 --- a/fio_sec_osd.cpp +++ b/fio_sec_osd.cpp @@ -38,6 +38,7 @@ struct sec_data /* block_size = 1 << block_order (128KB by default) */ uint64_t block_order = 17, block_size = 1 << 17; std::unordered_map queue; + bool last_sync = false; /* The list of completed io_u structs. */ std::vector completed; uint64_t op_n = 0, inflight = 0; @@ -74,6 +75,9 @@ static struct fio_option options[] = { }, }; +static int read_blocking(int fd, void *read_buf, size_t remaining); +static int write_blocking(int fd, void *write_buf, size_t remaining); + static int sec_setup(struct thread_data *td) { sec_data *bsd; @@ -152,6 +156,10 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) int n = bsd->op_n; fio_ro_check(td, io); + if (io->ddir == DDIR_SYNC && bsd->last_sync) + { + return FIO_Q_COMPLETED; + } io->engine_data = bsd; union @@ -173,6 +181,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) op.sec_rw.version = UINT64_MAX; // last unstable op.sec_rw.offset = io->offset % bsd->block_size; op.sec_rw.len = io->xfer_buflen; + bsd->last_sync = false; break; case DDIR_WRITE: op.hdr.opcode = OSD_OP_SECONDARY_WRITE; @@ -183,10 +192,13 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) op.sec_rw.version = 0; // assign automatically op.sec_rw.offset = io->offset % bsd->block_size; op.sec_rw.len = io->xfer_buflen; + bsd->last_sync = false; break; case DDIR_SYNC: // Allowed only for testing: sync & stabilize all unstable object versions op.hdr.opcode = OSD_OP_TEST_SYNC_STAB_ALL; + // fio sends 32 syncs with -fsync=32. we omit 31 of them even though it's not 100% fine (FIXME: fix fio itself) + bsd->last_sync = true; break; default: io->error = EINVAL; @@ -206,23 +218,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) if (io->ddir == DDIR_WRITE) { // Send data - void *send_buf = io->xfer_buf; - size_t remaining = io->xfer_buflen; - while (remaining > 0) - { - size_t r = write(bsd->connect_fd, send_buf, remaining); - if (r < 0) - { - if (r != EAGAIN) - { - perror("write"); - exit(1); - } - continue; - } - remaining -= r; - send_buf += r; - } + write_blocking(bsd->connect_fd, io->xfer_buf, io->xfer_buflen); } if (io->error != 0) @@ -230,19 +226,52 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) return FIO_Q_QUEUED; } -void read_blocking(int fd, void *read_buf, size_t remaining) + +static int read_blocking(int fd, void *read_buf, size_t remaining) { - while (remaining > 0) + size_t done = 0; + while (done < remaining) { - size_t r = read(fd, read_buf, remaining); + size_t r = read(fd, read_buf, remaining-done); if (r <= 0) { - perror("read"); - exit(1); + if (!errno) + { + // EOF + return done; + } + else if (errno != EAGAIN && errno != EPIPE) + { + perror("read"); + exit(1); + } + continue; } - remaining -= r; + done += r; read_buf += r; } + return done; +} + +static int write_blocking(int fd, void *write_buf, size_t remaining) +{ + size_t done = 0; + while (done < remaining) + { + size_t r = write(fd, write_buf, remaining-done); + if (r < 0) + { + if (errno != EAGAIN && errno != EPIPE) + { + perror("write"); + exit(1); + } + continue; + } + done += r; + write_buf += r; + } + return done; } static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t) diff --git a/stub_osd.cpp b/stub_osd.cpp new file mode 100644 index 000000000..c0e4b24ee --- /dev/null +++ b/stub_osd.cpp @@ -0,0 +1,229 @@ +/** + * Stub "OSD" to test & compare network performance with sync read/write and io_uring + * + * Core i7-6700HQ laptop + * + * stub_osd: + * randwrite Q1 S1: 36900 iops + * randwrite Q32 S32: 71000 iops + * randwrite Q32 S32 (multi-fsync fix): 113000 iops + * randread Q1: 67300 iops + * randread Q32: 144000 iops + * + * io_uring osd with #define OSD_STUB: + * randwrite Q1 S1: 30000 iops + * randwrite Q32 S32: 78600 iops + * randwrite Q32 S32 (multi-fsync fix): 125000 iops + * randread Q1: 50700 iops + * randread Q32: 86100 iops + * + * It seems io_uring is fine :) + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "osd_ops.h" + +int bind_stub(const char *bind_address, int bind_port); + +void run_stub(int peer_fd); + +static int write_blocking(int fd, void *write_buf, size_t remaining); + +static int read_blocking(int fd, void *read_buf, size_t remaining); + +int main(int narg, char *args[]) +{ + int listen_fd = bind_stub("0.0.0.0", 11203); + // Accept new connections + sockaddr_in addr; + socklen_t peer_addr_size = sizeof(addr); + int peer_fd; + while (1) + { + printf("stub_osd: waiting for 1 client\n"); + peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size); + if (peer_fd == -1) + { + if (errno == EAGAIN) + continue; + else + throw std::runtime_error(std::string("accept: ") + strerror(errno)); + } + char peer_str[256]; + printf("stub_osd: new client %d: connection from %s port %d\n", peer_fd, + inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port)); + int one = 1; + setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); + run_stub(peer_fd); + close(peer_fd); + printf("stub_osd: client %d disconnected\n", peer_fd); + // Try to accept next connection + peer_addr_size = sizeof(addr); + } + return 0; +} + +int bind_stub(const char *bind_address, int bind_port) +{ + int listen_backlog = 128; + + int listen_fd = socket(AF_INET, SOCK_STREAM, 0); + if (listen_fd < 0) + { + throw std::runtime_error(std::string("socket: ") + strerror(errno)); + } + int enable = 1; + setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)); + + sockaddr_in addr; + int r; + if ((r = inet_pton(AF_INET, bind_address, &addr.sin_addr)) != 1) + { + close(listen_fd); + throw std::runtime_error("bind address "+std::string(bind_address)+(r == 0 ? " is not valid" : ": no ipv4 support")); + } + addr.sin_family = AF_INET; + addr.sin_port = htons(bind_port); + + if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0) + { + close(listen_fd); + throw std::runtime_error(std::string("bind: ") + strerror(errno)); + } + + if (listen(listen_fd, listen_backlog) < 0) + { + close(listen_fd); + throw std::runtime_error(std::string("listen: ") + strerror(errno)); + } + + return listen_fd; +} + +void run_stub(int peer_fd) +{ + union + { + osd_any_op_t op; + uint8_t op_buf[OSD_PACKET_SIZE] = { 0 }; + }; + union + { + osd_any_reply_t reply; + uint8_t reply_buf[OSD_PACKET_SIZE] = { 0 }; + }; + void *buf = NULL; + while (1) + { + int r = read_blocking(peer_fd, op_buf, OSD_PACKET_SIZE); + if (r < OSD_PACKET_SIZE) + { + break; + } + if (op.hdr.magic != SECONDARY_OSD_OP_MAGIC) + { + printf("client %d: bad magic number in operation header\n", peer_fd); + break; + } + reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; + reply.hdr.id = op.hdr.id; + reply.hdr.opcode = op.hdr.opcode; + if (op.hdr.opcode == OSD_OP_SECONDARY_READ) + { + reply.hdr.retval = op.sec_rw.len; + buf = malloc(op.sec_rw.len); + r = write_blocking(peer_fd, reply_buf, OSD_PACKET_SIZE); + if (r == OSD_PACKET_SIZE) + r = write_blocking(peer_fd, &buf, op.sec_rw.len); + free(buf); + if (r < op.sec_rw.len) + break; + } + else if (op.hdr.opcode == OSD_OP_SECONDARY_WRITE) + { + buf = malloc(op.sec_rw.len); + r = read_blocking(peer_fd, buf, op.sec_rw.len); + free(buf); + reply.hdr.retval = op.sec_rw.len; + if (r == op.sec_rw.len) + r = write_blocking(peer_fd, reply_buf, OSD_PACKET_SIZE); + else + r = 0; + if (r < OSD_PACKET_SIZE) + break; + } + else if (op.hdr.opcode == OSD_OP_TEST_SYNC_STAB_ALL) + { + reply.hdr.retval = 0; + r = write_blocking(peer_fd, reply_buf, OSD_PACKET_SIZE); + if (r < OSD_PACKET_SIZE) + break; + } + else + { + printf("client %d: unsupported stub opcode: %lu\n", peer_fd, op.hdr.opcode); + break; + } + } + free(buf); +} + +static int read_blocking(int fd, void *read_buf, size_t remaining) +{ + size_t done = 0; + while (done < remaining) + { + size_t r = read(fd, read_buf, remaining-done); + if (r <= 0) + { + if (!errno) + { + // EOF + return done; + } + else if (errno != EAGAIN && errno != EPIPE) + { + perror("read"); + exit(1); + } + continue; + } + done += r; + read_buf += r; + } + return done; +} + +static int write_blocking(int fd, void *write_buf, size_t remaining) +{ + size_t done = 0; + while (done < remaining) + { + size_t r = write(fd, write_buf, remaining-done); + if (r < 0) + { + if (errno != EAGAIN && errno != EPIPE) + { + perror("write"); + exit(1); + } + continue; + } + done += r; + write_buf += r; + } + return done; +}