From b9f5c2a82363c55feaddbd18ca97efa60cc296ae Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 27 Dec 2021 02:05:29 +0300 Subject: [PATCH] Support zero-copy send in fio_sec_osd to allow testing it Preliminary results: - CPU usage drops significantly. For example, in T1Q8 128K write test against stub_uring_osd with 10G network and Athlon X4 860k CPU it drops from 100% to 30%. - Latency becomes slightly worse. In T1Q1 4K write test in the same environment latency increases from 56 to 63 us. - Small write throughput also becomes slightly worse. In T1Q128 4K write test against stub iops decreases from 138k to ~110k (unstable, fluctuates 100k..120k). Note that this is without io_uring, of course. --- src/fio_sec_osd.cpp | 43 +++++++++++++++++++++++++++++++++++-------- src/rw_blocking.cpp | 40 ++++++++++++++++++++++++++++++++++++++++ src/rw_blocking.h | 1 + 3 files changed, 76 insertions(+), 8 deletions(-) diff --git a/src/fio_sec_osd.cpp b/src/fio_sec_osd.cpp index 5fdcd626..0d0f6c33 100644 --- a/src/fio_sec_osd.cpp +++ b/src/fio_sec_osd.cpp @@ -33,12 +33,18 @@ #include "osd_ops.h" #include "fio_headers.h" +struct op_buf_t +{ + osd_any_op_t buf; + io_u* fio_op; +}; + struct sec_data { int connect_fd; /* block_size = 1 << block_order (128KB by default) */ uint64_t block_order = 17, block_size = 1 << 17; - std::unordered_map queue; + std::unordered_map queue; bool last_sync = false; /* The list of completed io_u structs. 
*/ std::vector completed; @@ -53,6 +59,7 @@ struct sec_options int single_primary = 0; int trace = 0; int block_order = 17; + int zerocopy_send = 0; }; static struct fio_option options[] = { @@ -103,6 +110,16 @@ static struct fio_option options[] = { .category = FIO_OPT_C_ENGINE, .group = FIO_OPT_G_FILENAME, }, + { + .name = "zerocopy_send", + .lname = "Use zero-copy send", + .type = FIO_OPT_BOOL, + .off1 = offsetof(struct sec_options, zerocopy_send), + .help = "Use zero-copy send (MSG_ZEROCOPY)", + .def = "0", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_FILENAME, + }, { .name = NULL, }, @@ -173,6 +190,14 @@ static int sec_init(struct thread_data *td) } int one = 1; setsockopt(bsd->connect_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); + if (o->zerocopy_send) + { + if (setsockopt(bsd->connect_fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one)) < 0) + { + perror("setsockopt zerocopy"); + return 1; + } + } // FIXME: read config (block size) from OSD @@ -193,7 +218,9 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) } io->engine_data = bsd; - osd_any_op_t op = { 0 }; + op_buf_t *op_buf = new op_buf_t; + op_buf->fio_op = io; + osd_any_op_t &op = op_buf->buf; op.hdr.magic = SECONDARY_OSD_OP_MAGIC; op.hdr.id = n; @@ -269,19 +296,18 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) io->error = 0; bsd->inflight++; bsd->op_n++; - bsd->queue[n] = io; + bsd->queue[n] = op_buf; iovec iov[2] = { { .iov_base = op.buf, .iov_len = OSD_PACKET_SIZE } }; int iovcnt = 1, wtotal = OSD_PACKET_SIZE; if (io->ddir == DDIR_WRITE) { - iov[1] = { .iov_base = io->xfer_buf, .iov_len = io->xfer_buflen }; + iov[iovcnt++] = { .iov_base = io->xfer_buf, .iov_len = io->xfer_buflen }; wtotal += io->xfer_buflen; - iovcnt++; } - if (writev_blocking(bsd->connect_fd, iov, iovcnt) != wtotal) + if (sendv_blocking(bsd->connect_fd, iov, iovcnt, opt->zerocopy_send ? 
MSG_ZEROCOPY : 0) != wtotal) { - perror("writev"); + perror("sendmsg"); exit(1); } @@ -310,7 +336,8 @@ static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int fprintf(stderr, "bad reply: op id %lx missing in local queue\n", reply.hdr.id); exit(1); } - io_u* io = it->second; + io_u* io = it->second->fio_op; + delete it->second; bsd->queue.erase(it); if (io->ddir == DDIR_READ) { diff --git a/src/rw_blocking.cpp b/src/rw_blocking.cpp index 532bd709..a640e92c 100644 --- a/src/rw_blocking.cpp +++ b/src/rw_blocking.cpp @@ -4,6 +4,8 @@ #include #include #include +#include +#include #include "rw_blocking.h" @@ -123,3 +125,41 @@ int writev_blocking(int fd, iovec *iov, int iovcnt) } return done; } + +int sendv_blocking(int fd, iovec *iov, int iovcnt, int flags) +{ + struct msghdr msg = { 0 }; + int v = 0; + int done = 0; + while (v < iovcnt) + { + msg.msg_iov = iov+v; + msg.msg_iovlen = iovcnt-v; + ssize_t r = sendmsg(fd, &msg, flags); + if (r < 0) + { + if (errno != EAGAIN && errno != EPIPE) + { + perror("sendmsg"); + exit(1); + } + continue; + } + done += r; + while (v < iovcnt) + { + if (iov[v].iov_len > r) + { + iov[v].iov_len -= r; + iov[v].iov_base += r; + break; + } + else + { + r -= iov[v].iov_len; + v++; + } + } + } + return done; +} diff --git a/src/rw_blocking.h b/src/rw_blocking.h index df2eafeb..0e4753ae 100644 --- a/src/rw_blocking.h +++ b/src/rw_blocking.h @@ -10,3 +10,4 @@ int read_blocking(int fd, void *read_buf, size_t remaining); int write_blocking(int fd, void *write_buf, size_t remaining); int readv_blocking(int fd, iovec *iov, int iovcnt); int writev_blocking(int fd, iovec *iov, int iovcnt); +int sendv_blocking(int fd, iovec *iov, int iovcnt, int flags);