diff --git a/Makefile b/Makefile index 9ee9bcd2..4fd0c726 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_impl.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \ blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always -all: $(BLOCKSTORE_OBJS) test test_blockstore libfio_blockstore.so osd +all: $(BLOCKSTORE_OBJS) test test_blockstore libfio_blockstore.so osd libfio_sec_osd.so clean: rm -f *.o crc32c.o: crc32c.c @@ -18,3 +18,5 @@ test_allocator: test_allocator.cpp allocator.o g++ $(CXXFLAGS) -o test_allocator test_allocator.cpp allocator.o libfio_blockstore.so: fio_engine.cpp $(BLOCKSTORE_OBJS) g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -luring -o libfio_blockstore.so fio_engine.cpp $(BLOCKSTORE_OBJS) +libfio_sec_osd.so: fio_sec_osd.cpp + g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -luring -o libfio_sec_osd.so fio_sec_osd.cpp diff --git a/fio_sec_osd.cpp b/fio_sec_osd.cpp new file mode 100644 index 00000000..0081d3ce --- /dev/null +++ b/fio_sec_osd.cpp @@ -0,0 +1,356 @@ +// FIO engine to test Blockstore through Secondary OSD interface +// +// Prepare storage like in fio_engine.cpp, then start OSD with ./osd, then test it +// +// Random write: +// +// fio -thread -ioengine=./libfio_sec_osd.so -name=test -bs=4k -direct=1 -fsync=16 -iodepth=16 -rw=randwrite \ +// -host=127.0.0.1 -port=11203 -size=1000M +// +// Linear write: +// +// fio -thread -ioengine=./libfio_sec_osd.so -name=test -bs=128k -direct=1 -fsync=32 -iodepth=32 -rw=write \ +// -host=127.0.0.1 -port=11203 -size=1000M +// +// Random read (run with -iodepth=32 or -iodepth=1): +// +// fio -thread -ioengine=./libfio_sec_osd.so -name=test -bs=4k -direct=1 -iodepth=32 -rw=randread \ +// -host=127.0.0.1 -port=11203 -size=1000M + +#include +#include +#include + +#include +#include + +#include "osd_ops.h" +extern "C" { +#define CONFIG_PWRITEV2 +#include "fio/fio.h" +#include "fio/optgroup.h" +} + +struct sec_data +{ + int connect_fd; + /* block_size = 1 << block_order (128KB by default) */ + uint64_t block_order = 17, block_size = 1 << 17; + std::unordered_map queue; + /* The list of completed io_u structs. */ + std::vector completed; + uint64_t op_n = 0, inflight = 0; +}; + +struct sec_options +{ + int __pad; + char *host = NULL; + int port = 0; +}; + +static struct fio_option options[] = { + { + .name = "host", + .lname = "Test Secondary OSD host", + .type = FIO_OPT_STR_STORE, + .off1 = offsetof(struct sec_options, host), + .help = "Test Secondary OSD host", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_FILENAME, + }, + { + .name = "port", + .lname = "Test Secondary OSD port", + .type = FIO_OPT_INT, + .off1 = offsetof(struct sec_options, port), + .help = "Test Secondary OSD port", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_FILENAME, + }, + { + .name = NULL, + }, +}; + +static int sec_setup(struct thread_data *td) +{ + sec_data *bsd; + //fio_file *f; + //int r; + //int64_t size; + + bsd = new sec_data; + if (!bsd) + { + td_verror(td, errno, "calloc"); + return 1; + } + td->io_ops_data = bsd; + + if (!td->files_index) + { + add_file(td, "bs_sec_osd", 0, 0); + td->o.nr_files = td->o.nr_files ? : 1; + td->o.open_files++; + } + + //f = td->files[0]; + //f->real_file_size = size; + return 0; +} + +static void sec_cleanup(struct thread_data *td) +{ + sec_data *bsd = (sec_data*)td->io_ops_data; + if (bsd) + { + close(bsd->connect_fd); + } +} + +/* Connect to the server from each thread. */ +static int sec_init(struct thread_data *td) +{ + sec_options *o = (sec_options*)td->eo; + sec_data *bsd = (sec_data*)td->io_ops_data; + + struct sockaddr_in addr; + int r; + if ((r = inet_pton(AF_INET, o->host ? o->host : "127.0.0.1", &addr.sin_addr)) != 1) + { + fprintf(stderr, "server address: %s%s\n", o->host ? o->host : "127.0.0.1", r == 0 ? " is not valid" : ": no ipv4 support"); + return 1; + } + addr.sin_family = AF_INET; + addr.sin_port = htons(o->port ? o->port : 11203); + + bsd->connect_fd = socket(AF_INET, SOCK_STREAM, 0); + if (bsd->connect_fd < 0) + { + perror("socket"); + return 1; + } + if (connect(bsd->connect_fd, (sockaddr*)&addr, sizeof(addr)) < 0) + { + perror("connect"); + return 1; + } + + // FIXME: read config (block size) from OSD + + return 0; +} + +/* Begin read or write request. */ +static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) +{ + sec_data *bsd = (sec_data*)td->io_ops_data; + int n = bsd->op_n; + + fio_ro_check(td, io); + + io->engine_data = bsd; + union + { + osd_any_op_t op; + uint8_t op_buf[OSD_OP_PACKET_SIZE] = { 0 }; + }; + + op.hdr.magic = SECONDARY_OSD_OP_MAGIC; + op.hdr.id = n; + switch (io->ddir) + { + case DDIR_READ: + op.hdr.opcode = OSD_OP_SECONDARY_READ; + op.sec_rw.oid = { + .inode = 1, + .stripe = io->offset >> bsd->block_order, + }; + op.sec_rw.version = UINT64_MAX; // last unstable + op.sec_rw.offset = io->offset % bsd->block_size; + op.sec_rw.len = io->xfer_buflen; + break; + case DDIR_WRITE: + op.hdr.opcode = OSD_OP_SECONDARY_WRITE; + op.sec_rw.oid = { + .inode = 1, + .stripe = io->offset >> bsd->block_order, + }; + op.sec_rw.version = 0; // assign automatically + op.sec_rw.offset = io->offset % bsd->block_size; + op.sec_rw.len = io->xfer_buflen; + break; + case DDIR_SYNC: + // Allowed only for testing: sync & stabilize all unstable object versions + op.hdr.opcode = OSD_OP_TEST_SYNC_STAB_ALL; + break; + default: + io->error = EINVAL; + return FIO_Q_COMPLETED; + } + + io->error = 0; + bsd->inflight++; + bsd->op_n++; + bsd->queue[n] = io; + + if (write(bsd->connect_fd, op_buf, OSD_OP_PACKET_SIZE) != OSD_OP_PACKET_SIZE) + { + perror("write"); + exit(1); + } + if (io->ddir == DDIR_WRITE) + { + // Send data + void *send_buf = io->xfer_buf; + size_t remaining = io->xfer_buflen; + while (remaining > 0) + { + size_t r = write(bsd->connect_fd, send_buf, remaining); + if (r < 0) + { + if (r != EAGAIN) + { + perror("write"); + exit(1); + } + continue; + } + remaining -= r; + send_buf += r; + } + } + + if (io->error != 0) + return FIO_Q_COMPLETED; + return FIO_Q_QUEUED; +} + +void read_blocking(int fd, void *read_buf, size_t remaining) +{ + while (remaining > 0) + { + size_t r = read(fd, read_buf, remaining); + if (r <= 0) + { + perror("read"); + exit(1); + } + remaining -= r; + read_buf += r; + } +} + +static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t) +{ + sec_data *bsd = (sec_data*)td->io_ops_data; + // FIXME timeout, at least poll. Now it's the stupidest implementation possible + union + { + osd_any_reply_t reply; + uint8_t reply_buf[OSD_REPLY_PACKET_SIZE] = { 0 }; + }; + while (bsd->completed.size() < min) + { + read_blocking(bsd->connect_fd, reply_buf, OSD_REPLY_PACKET_SIZE); + if (reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC) + { + fprintf(stderr, "bad reply: magic = %lx instead of %lx\n", reply.hdr.magic, SECONDARY_OSD_REPLY_MAGIC); + exit(1); + } + auto it = bsd->queue.find(reply.hdr.id); + if (it == bsd->queue.end()) + { + fprintf(stderr, "bad reply: op id %lx missing in local queue\n", reply.hdr.id); + exit(1); + } + io_u* io = it->second; + if (io->ddir == DDIR_READ) + { + if (reply.hdr.retval != io->xfer_buflen) + { + fprintf(stderr, "Short read: retval = %ld instead of %llu\n", reply.hdr.retval, io->xfer_buflen); + exit(1); + } + read_blocking(bsd->connect_fd, io->xfer_buf, io->xfer_buflen); + } + else if (io->ddir == DDIR_WRITE) + { + if (reply.hdr.retval != io->xfer_buflen) + { + fprintf(stderr, "Short write: retval = %ld instead of %llu\n", reply.hdr.retval, io->xfer_buflen); + exit(1); + } + } + else if (io->ddir == DDIR_SYNC) + { + if (reply.hdr.retval != 0) + { + fprintf(stderr, "Sync failed: retval = %ld\n", reply.hdr.retval); + exit(1); + } + } + bsd->completed.push_back(io); + } + return bsd->completed.size(); +} + +static struct io_u *sec_event(struct thread_data *td, int event) +{ + sec_data *bsd = (sec_data*)td->io_ops_data; + if (bsd->completed.size() == 0) + return NULL; + /* FIXME We ignore the event number and assume fio calls us exactly once for [0..nr_events-1] */ + struct io_u *ev = bsd->completed.back(); + bsd->completed.pop_back(); + return ev; +} + +static int sec_io_u_init(struct thread_data *td, struct io_u *io) +{ + io->engine_data = NULL; + return 0; +} + +static void sec_io_u_free(struct thread_data *td, struct io_u *io) +{ +} + +static int sec_open_file(struct thread_data *td, struct fio_file *f) +{ + return 0; +} + +static int sec_invalidate(struct thread_data *td, struct fio_file *f) +{ + return 0; +} + +struct ioengine_ops ioengine = { + .name = "microceph_secondary_osd", + .version = FIO_IOOPS_VERSION, + .flags = FIO_MEMALIGN | FIO_DISKLESSIO | FIO_NOEXTEND, + .setup = sec_setup, + .init = sec_init, + .queue = sec_queue, + .getevents = sec_getevents, + .event = sec_event, + .cleanup = sec_cleanup, + .open_file = sec_open_file, + .invalidate = sec_invalidate, + .io_u_init = sec_io_u_init, + .io_u_free = sec_io_u_free, + .option_struct_size = sizeof(struct sec_options), + .options = options, +}; + +static void fio_init fio_sec_register(void) +{ + register_ioengine(&ioengine); +} + +static void fio_exit fio_sec_unregister(void) +{ + unregister_ioengine(&ioengine); +}