From c2de733e35955186e7e77f96977f458e287030a7 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 17 Nov 2019 21:39:30 +0300 Subject: [PATCH] Copy io_uring_prep_* to my_uring_prep_* so they do not clear user_data --- blockstore_flush.cpp | 14 +++--- blockstore_init.cpp | 6 +-- blockstore_journal.cpp | 2 +- blockstore_read.cpp | 2 +- blockstore_sync.cpp | 6 +-- blockstore_write.cpp | 4 +- ringloop.cpp | 10 ---- ringloop.h | 103 ++++++++++++++++++++++++++++++++++++++++- test_blockstore.cpp | 12 +++-- 9 files changed, 128 insertions(+), 31 deletions(-) diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 6e0e48788..11968cbe0 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -168,7 +168,7 @@ resume_0: v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) }); data->iov = (struct iovec){ v.back().buf, (size_t)submit_len }; data->callback = simple_callback; - io_uring_prep_readv( + my_uring_prep_readv( sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + dirty_it->second.location + offset ); wait_count++; @@ -250,7 +250,7 @@ resume_0: meta_it->second.state = 1; wait_count--; }; - io_uring_prep_readv( + my_uring_prep_readv( sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector ); wait_count++; @@ -267,7 +267,7 @@ resume_0: await_sqe(4); data->iov = (struct iovec){ it->buf, (size_t)it->len }; data->callback = simple_callback; - io_uring_prep_writev( + my_uring_prep_writev( sqe, bs->data_fd, &data->iov, 1, bs->data_offset + clean_loc + it->offset ); wait_count++; @@ -289,7 +289,7 @@ resume_0: await_sqe(6); data->iov = (struct iovec){ meta_it->second.buf, 512 }; data->callback = simple_callback; - io_uring_prep_writev( + my_uring_prep_writev( sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector ); wait_count++; @@ -325,13 +325,13 @@ resume_0: // Sync batch is ready. Do it. await_sqe(9); data->callback = simple_callback; - io_uring_prep_fsync(sqe, bs->data_fd, 0); + my_uring_prep_fsync(sqe, bs->data_fd, 0); wait_count++; if (bs->meta_fd != bs->data_fd) { await_sqe(10); data->callback = simple_callback; - io_uring_prep_fsync(sqe, bs->meta_fd, 0); + my_uring_prep_fsync(sqe, bs->meta_fd, 0); wait_count++; } wait_state = 11; @@ -423,7 +423,7 @@ resume_0: }; ((journal_entry_start*)flusher->journal_superblock)->crc32 = je_crc32((journal_entry*)flusher->journal_superblock); data->iov = (struct iovec){ flusher->journal_superblock, 512 }; - io_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset); + my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset); wait_count++; wait_state = 13; resume_13: diff --git a/blockstore_init.cpp b/blockstore_init.cpp index 9ce8fd873..5c40ff0a1 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -43,7 +43,7 @@ int blockstore_init_meta::loop() bs->meta_len - metadata_read > bs->metadata_buf_size ? bs->metadata_buf_size : bs->meta_len - metadata_read, }; data->callback = [this](ring_data_t *data) { handle_event(data); }; - io_uring_prep_readv(sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + metadata_read); + my_uring_prep_readv(sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + metadata_read); bs->ringloop->submit(); submitted = (prev == 1 ? 2 : 1); prev = submitted; @@ -192,7 +192,7 @@ int blockstore_init_journal::loop() struct ring_data_t *data = ((ring_data_t*)sqe->user_data); data->iov = { journal_buffer, 512 }; data->callback = [this](ring_data_t *data) { handle_event(data); }; - io_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset); + my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset); bs->ringloop->submit(); step = 1; } @@ -225,7 +225,7 @@ int blockstore_init_journal::loop() end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE, }; data->callback = [this](ring_data_t *data) { handle_event(data); }; - io_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + journal_pos); + my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + journal_pos); bs->ringloop->submit(); submitted = done_buf == 1 ? 2 : 1; } diff --git a/blockstore_journal.cpp b/blockstore_journal.cpp index 876850056..fec6f2bec 100644 --- a/blockstore_journal.cpp +++ b/blockstore_journal.cpp @@ -53,7 +53,7 @@ void prepare_journal_sector_write(journal_t & journal, io_uring_sqe *sqe, std::f ring_data_t *data = ((ring_data_t*)sqe->user_data); data->iov = (struct iovec){ journal.sector_buf + 512*journal.cur_sector, 512 }; data->callback = cb; - io_uring_prep_writev( + my_uring_prep_writev( sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[journal.cur_sector].offset ); } diff --git a/blockstore_read.cpp b/blockstore_read.cpp index 61b578300..65cf82af6 100644 --- a/blockstore_read.cpp +++ b/blockstore_read.cpp @@ -25,7 +25,7 @@ int blockstore::fulfill_read_push(blockstore_operation *op, uint64_t &fulfilled, }; // FIXME: use simple std::vector instead of map for read_vec op->read_vec[cur_start] = data->iov; - io_uring_prep_readv( + my_uring_prep_readv( sqe, IS_JOURNAL(item_state) ? journal.fd : data_fd, &data->iov, 1, diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp index 1a0f01239..1edd49e68 100644 --- a/blockstore_sync.cpp +++ b/blockstore_sync.cpp @@ -42,7 +42,7 @@ int blockstore::continue_sync(blockstore_operation *op) { // No big writes, just fsync the journal BS_SUBMIT_GET_SQE(sqe, data); - io_uring_prep_fsync(sqe, journal.fd, 0); + my_uring_prep_fsync(sqe, journal.fd, 0); data->callback = cb; op->pending_ops = 1; op->sync_state = SYNC_JOURNAL_SYNC_SENT; @@ -51,7 +51,7 @@ int blockstore::continue_sync(blockstore_operation *op) { // 1st step: fsync data BS_SUBMIT_GET_SQE(sqe, data); - io_uring_prep_fsync(sqe, data_fd, 0); + my_uring_prep_fsync(sqe, data_fd, 0); data->callback = cb; op->pending_ops = 1; op->sync_state = SYNC_DATA_SYNC_SENT; @@ -96,7 +96,7 @@ int blockstore::continue_sync(blockstore_operation *op) } op->max_used_journal_sector = 1 + journal.cur_sector; // ... And a journal fsync - io_uring_prep_fsync(sqe[s], journal.fd, 0); + my_uring_prep_fsync(sqe[s], journal.fd, 0); struct ring_data_t *data = ((ring_data_t*)sqe[s]->user_data); data->callback = cb; op->pending_ops = 1 + s; diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 6899bb632..a455e4f11 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -97,7 +97,7 @@ int blockstore::dequeue_write(blockstore_operation *op) op->iov_zerofill[0] = (struct iovec){ op->buf, op->len }; } data->callback = cb; - io_uring_prep_writev( + my_uring_prep_writev( sqe, data_fd, op->iov_zerofill, vcnt, data_offset + (loc << block_order) ); op->pending_ops = 1; @@ -137,7 +137,7 @@ int blockstore::dequeue_write(blockstore_operation *op) journal.next_free = (journal.next_free + op->len) < journal.len ? journal.next_free : 512; data2->iov = (struct iovec){ op->buf, op->len }; data2->callback = cb; - io_uring_prep_writev( + my_uring_prep_writev( sqe2, journal.fd, &data2->iov, 1, journal.offset + journal.next_free ); dirty_it->second.location = journal.next_free; diff --git a/ringloop.cpp b/ringloop.cpp index 7116b2d80..02145710d 100644 --- a/ringloop.cpp +++ b/ringloop.cpp @@ -20,16 +20,6 @@ ring_loop_t::~ring_loop_t() io_uring_queue_exit(&ring); } -struct io_uring_sqe* ring_loop_t::get_sqe() -{ - struct io_uring_sqe* sqe = io_uring_get_sqe(&ring); - if (sqe) - { - io_uring_sqe_set_data(sqe, ring_data + (sqe - ring.sq.sqes)); - } - return sqe; -} - int ring_loop_t::register_consumer(ring_consumer_t & consumer) { consumer.number = consumers.size(); diff --git a/ringloop.h b/ringloop.h index ef8c21365..5b9ad93f6 100644 --- a/ringloop.h +++ b/ringloop.h @@ -10,6 +10,99 @@ #include #include +static inline void my_uring_prep_rw(int op, struct io_uring_sqe *sqe, int fd, const void *addr, unsigned len, off_t offset) +{ + sqe->opcode = op; + sqe->flags = 0; + sqe->ioprio = 0; + sqe->fd = fd; + sqe->off = offset; + sqe->addr = (unsigned long) addr; + sqe->len = len; + sqe->rw_flags = 0; + sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0; +} + +static inline void my_uring_prep_readv(struct io_uring_sqe *sqe, int fd, const struct iovec *iovecs, unsigned nr_vecs, off_t offset) +{ + my_uring_prep_rw(IORING_OP_READV, sqe, fd, iovecs, nr_vecs, offset); +} + +static inline void my_uring_prep_read_fixed(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, off_t offset, int buf_index) +{ + my_uring_prep_rw(IORING_OP_READ_FIXED, sqe, fd, buf, nbytes, offset); + sqe->buf_index = buf_index; +} + +static inline void my_uring_prep_writev(struct io_uring_sqe *sqe, int fd, const struct iovec *iovecs, unsigned nr_vecs, off_t offset) +{ + my_uring_prep_rw(IORING_OP_WRITEV, sqe, fd, iovecs, nr_vecs, offset); +} + +static inline void my_uring_prep_write_fixed(struct io_uring_sqe *sqe, int fd, const void *buf, unsigned nbytes, off_t offset, int buf_index) +{ + my_uring_prep_rw(IORING_OP_WRITE_FIXED, sqe, fd, buf, nbytes, offset); + sqe->buf_index = buf_index; +} + +static inline void my_uring_prep_recvmsg(struct io_uring_sqe *sqe, int fd, struct msghdr *msg, unsigned flags) +{ + my_uring_prep_rw(IORING_OP_RECVMSG, sqe, fd, msg, 1, 0); + sqe->msg_flags = flags; +} + +static inline void my_uring_prep_sendmsg(struct io_uring_sqe *sqe, int fd, const struct msghdr *msg, unsigned flags) +{ + my_uring_prep_rw(IORING_OP_SENDMSG, sqe, fd, msg, 1, 0); + sqe->msg_flags = flags; +} + +static inline void my_uring_prep_poll_add(struct io_uring_sqe *sqe, int fd, short poll_mask) +{ + my_uring_prep_rw(IORING_OP_POLL_ADD, sqe, fd, NULL, 0, 0); + sqe->poll_events = poll_mask; +} + +static inline void my_uring_prep_poll_remove(struct io_uring_sqe *sqe, void *user_data) +{ + my_uring_prep_rw(IORING_OP_POLL_REMOVE, sqe, 0, user_data, 0, 0); +} + +static inline void my_uring_prep_fsync(struct io_uring_sqe *sqe, int fd, unsigned fsync_flags) +{ + my_uring_prep_rw(IORING_OP_FSYNC, sqe, fd, NULL, 0, 0); + sqe->fsync_flags = fsync_flags; +} + +static inline void my_uring_prep_nop(struct io_uring_sqe *sqe) +{ + my_uring_prep_rw(IORING_OP_NOP, sqe, 0, NULL, 0, 0); +} + +static inline void my_uring_prep_timeout(struct io_uring_sqe *sqe, struct __kernel_timespec *ts, unsigned count, unsigned flags) +{ + my_uring_prep_rw(IORING_OP_TIMEOUT, sqe, 0, ts, 1, count); + sqe->timeout_flags = flags; +} + +static inline void my_uring_prep_timeout_remove(struct io_uring_sqe *sqe, __u64 user_data, unsigned flags) +{ + my_uring_prep_rw(IORING_OP_TIMEOUT_REMOVE, sqe, 0, (void *)user_data, 0, 0); + sqe->timeout_flags = flags; +} + +static inline void my_uring_prep_accept(struct io_uring_sqe *sqe, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags) +{ + my_uring_prep_rw(IORING_OP_ACCEPT, sqe, fd, addr, 0, (__u64) addrlen); + sqe->accept_flags = flags; +} + +static inline void my_uring_prep_cancel(struct io_uring_sqe *sqe, void *user_data, int flags) +{ + my_uring_prep_rw(IORING_OP_ASYNC_CANCEL, sqe, 0, user_data, 0, 0); + sqe->cancel_flags = flags; +} + struct ring_data_t { struct iovec iov; // for single-entry read/write operations @@ -31,7 +124,15 @@ public: struct io_uring ring; ring_loop_t(int qd); ~ring_loop_t(); - struct io_uring_sqe* get_sqe(); + inline struct io_uring_sqe* get_sqe() + { + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring); + if (sqe) + { + io_uring_sqe_set_data(sqe, ring_data + (sqe - ring.sq.sqes)); + } + return sqe; + } int register_consumer(ring_consumer_t & consumer); void unregister_consumer(int number); void loop(bool sleep); diff --git a/test_blockstore.cpp b/test_blockstore.cpp index 4871a3b3b..1b6d12e74 100644 --- a/test_blockstore.cpp +++ b/test_blockstore.cpp @@ -51,11 +51,17 @@ public: return; } struct ring_data_t *data = ((ring_data_t*)sqe->user_data); - io_uring_prep_poll_add(sqe, timerfd, POLLIN); + my_uring_prep_poll_add(sqe, timerfd, POLLIN); data->callback = [&](ring_data_t *data) { + if (data->res < 0) + { + throw std::runtime_error(std::string("waiting for timer failed: ") + strerror(-data->res)); + } + uint64_t n; + read(timerfd, &n, 8); wait_state = 0; - printf("tick\n"); + printf("tick 1s\n"); }; wait_state = 1; ringloop->submit(); @@ -69,7 +75,7 @@ int main(int narg, char *args[]) config["journal_device"] = "./test_journal.bin"; config["data_device"] = "./test_data.bin"; ring_loop_t *ringloop = new ring_loop_t(512); - // print "tick" each second + // print "tick" every second timerfd_interval tick_tfd(ringloop, 1); while (true) {