forked from vitalif/vitastor
Support zero-copy send in fio_sec_osd to allow testing it
Preliminary results: - CPU usage drops significantly. For example, in a T1Q8 128K write test against stub_uring_osd with a 10G network and an Athlon X4 860k CPU it drops from 100% to 30%. - Latency becomes slightly worse. In a T1Q1 4K write test in the same environment latency increases from 56 to 63 us. - Small write throughput also becomes slightly worse. In a T1Q128 4K write test against the stub, iops decreases from 138k to ~110k (unstable, fluctuates 100k..120k). Note that this is without io_uring, of course.
separate-data-connections
parent
e9d2f79aa7
commit
b9f5c2a823
|
@ -33,12 +33,18 @@
|
|||
#include "osd_ops.h"
|
||||
#include "fio_headers.h"
|
||||
|
||||
// One in-flight request: the operation packet together with the fio I/O unit
// it was built for. sec_queue() allocates it with `new` and stores the pointer
// in sec_data::queue; sec_getevents() deletes it after looking it up for a reply.
// NOTE(review): presumably heap-allocated so the packet memory outlives
// sec_queue() when MSG_ZEROCOPY is used - confirm.
struct op_buf_t
|
||||
{
|
||||
// Operation packet; sec_queue() fills it in and sends it as the first iovec
osd_any_op_t buf;
|
||||
// fio I/O unit this operation belongs to; handed back in sec_getevents()
io_u* fio_op;
|
||||
};
|
||||
|
||||
struct sec_data
|
||||
{
|
||||
int connect_fd;
|
||||
/* block_size = 1 << block_order (128KB by default) */
|
||||
uint64_t block_order = 17, block_size = 1 << 17;
|
||||
std::unordered_map<uint64_t, io_u*> queue;
|
||||
std::unordered_map<uint64_t, op_buf_t*> queue;
|
||||
bool last_sync = false;
|
||||
/* The list of completed io_u structs. */
|
||||
std::vector<io_u*> completed;
|
||||
|
@ -53,6 +59,7 @@ struct sec_options
|
|||
int single_primary = 0;
|
||||
int trace = 0;
|
||||
int block_order = 17;
|
||||
int zerocopy_send = 0;
|
||||
};
|
||||
|
||||
static struct fio_option options[] = {
|
||||
|
@ -103,6 +110,16 @@ static struct fio_option options[] = {
|
|||
.category = FIO_OPT_C_ENGINE,
|
||||
.group = FIO_OPT_G_FILENAME,
|
||||
},
|
||||
{
|
||||
.name = "zerocopy_send",
|
||||
.lname = "Use zero-copy send",
|
||||
.type = FIO_OPT_BOOL,
|
||||
.off1 = offsetof(struct sec_options, zerocopy_send),
|
||||
.help = "Use zero-copy send (MSG_ZEROCOPY)",
|
||||
.def = "0",
|
||||
.category = FIO_OPT_C_ENGINE,
|
||||
.group = FIO_OPT_G_FILENAME,
|
||||
},
|
||||
{
|
||||
.name = NULL,
|
||||
},
|
||||
|
@ -173,6 +190,14 @@ static int sec_init(struct thread_data *td)
|
|||
}
|
||||
int one = 1;
|
||||
setsockopt(bsd->connect_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
||||
if (o->zerocopy_send)
|
||||
{
|
||||
if (setsockopt(bsd->connect_fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one)) < 0)
|
||||
{
|
||||
perror("setsockopt zerocopy");
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: read config (block size) from OSD
|
||||
|
||||
|
@ -193,7 +218,9 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
|
|||
}
|
||||
|
||||
io->engine_data = bsd;
|
||||
osd_any_op_t op = { 0 };
|
||||
op_buf_t *op_buf = new op_buf_t;
|
||||
op_buf->fio_op = io;
|
||||
osd_any_op_t &op = op_buf->buf;
|
||||
|
||||
op.hdr.magic = SECONDARY_OSD_OP_MAGIC;
|
||||
op.hdr.id = n;
|
||||
|
@ -269,19 +296,18 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
|
|||
io->error = 0;
|
||||
bsd->inflight++;
|
||||
bsd->op_n++;
|
||||
bsd->queue[n] = io;
|
||||
bsd->queue[n] = op_buf;
|
||||
|
||||
iovec iov[2] = { { .iov_base = op.buf, .iov_len = OSD_PACKET_SIZE } };
|
||||
int iovcnt = 1, wtotal = OSD_PACKET_SIZE;
|
||||
if (io->ddir == DDIR_WRITE)
|
||||
{
|
||||
iov[1] = { .iov_base = io->xfer_buf, .iov_len = io->xfer_buflen };
|
||||
iov[iovcnt++] = { .iov_base = io->xfer_buf, .iov_len = io->xfer_buflen };
|
||||
wtotal += io->xfer_buflen;
|
||||
iovcnt++;
|
||||
}
|
||||
if (writev_blocking(bsd->connect_fd, iov, iovcnt) != wtotal)
|
||||
if (sendv_blocking(bsd->connect_fd, iov, iovcnt, opt->zerocopy_send ? MSG_ZEROCOPY : 0) != wtotal)
|
||||
{
|
||||
perror("writev");
|
||||
perror("sendmsg");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
@ -310,7 +336,8 @@ static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int
|
|||
fprintf(stderr, "bad reply: op id %lx missing in local queue\n", reply.hdr.id);
|
||||
exit(1);
|
||||
}
|
||||
io_u* io = it->second;
|
||||
io_u* io = it->second->fio_op;
|
||||
delete it->second;
|
||||
bsd->queue.erase(it);
|
||||
if (io->ddir == DDIR_READ)
|
||||
{
|
||||
|
|
|
@ -4,6 +4,8 @@
|
|||
#include <errno.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
|
||||
#include "rw_blocking.h"
|
||||
|
||||
|
@ -123,3 +125,41 @@ int writev_blocking(int fd, iovec *iov, int iovcnt)
|
|||
}
|
||||
return done;
|
||||
}
|
||||
|
||||
/**
 * Send every byte described by an iovec array over a socket, calling
 * sendmsg() repeatedly until the whole array has been consumed.
 *
 * @param fd      socket descriptor to send on
 * @param iov     buffers to send; a partially sent entry is advanced in place
 *                (iov_base/iov_len are modified), so the caller must not
 *                reuse the array contents afterwards
 * @param iovcnt  number of entries in iov
 * @param flags   flags passed unchanged to sendmsg() (e.g. MSG_ZEROCOPY)
 * @return total number of bytes sent
 *
 * EAGAIN, EINTR and EPIPE are retried; any other sendmsg() error prints a
 * message with perror() and terminates the process with exit(1).
 */
int sendv_blocking(int fd, struct iovec *iov, int iovcnt, int flags)
{
    struct msghdr msg = { 0 };
    int v = 0;
    int done = 0;
    while (v < iovcnt)
    {
        // Only the not-yet-sent tail of the array is passed on each attempt
        msg.msg_iov = iov+v;
        msg.msg_iovlen = iovcnt-v;
        ssize_t r = sendmsg(fd, &msg, flags);
        if (r < 0)
        {
            // EINTR only means a signal arrived before any data was sent,
            // so it is retried just like EAGAIN instead of killing the process.
            // NOTE(review): retrying on EPIPE is kept for backward compatibility,
            // but it can spin forever once the peer is gone - confirm intent.
            if (errno != EAGAIN && errno != EINTR && errno != EPIPE)
            {
                perror("sendmsg");
                exit(1);
            }
            continue;
        }
        done += r;
        // r is non-negative here; track the remainder as unsigned so that it
        // compares cleanly against iov_len (size_t)
        size_t left = (size_t)r;
        while (v < iovcnt)
        {
            if (iov[v].iov_len > left)
            {
                // Entry only partially sent: advance it and retry from here.
                // Arithmetic on void* is a compiler extension, hence the cast.
                iov[v].iov_len -= left;
                iov[v].iov_base = (char*)iov[v].iov_base + left;
                break;
            }
            // Entry fully sent: consume it and move on to the next one
            left -= iov[v].iov_len;
            v++;
        }
    }
    return done;
}
|
||||
|
|
|
@ -10,3 +10,4 @@ int read_blocking(int fd, void *read_buf, size_t remaining);
|
|||
int write_blocking(int fd, void *write_buf, size_t remaining);
|
||||
int readv_blocking(int fd, iovec *iov, int iovcnt);
|
||||
int writev_blocking(int fd, iovec *iov, int iovcnt);
|
||||
int sendv_blocking(int fd, iovec *iov, int iovcnt, int flags);
|
||||
|
|
Loading…
Reference in New Issue