diff --git a/src/messenger.h b/src/messenger.h
index 6c455f67..9570e24c 100644
--- a/src/messenger.h
+++ b/src/messenger.h
@@ -132,6 +132,7 @@ protected:
     uint64_t rdma_max_msg = 0;
 #endif
 
+    bool has_send_loop = false;
     std::vector<int> read_ready_clients;
     std::vector<int> write_ready_clients;
     // We don't use ringloop->set_immediate here because we may have no ringloop in client :)
@@ -159,6 +160,7 @@ public:
     std::function<bool(osd_client_t*, json11::Json)> check_config_hook;
     void read_requests();
     void send_replies();
+    bool can_send();
     void accept_connections(int listen_fd);
     ~osd_messenger_t();
 
diff --git a/src/msgr_send.cpp b/src/msgr_send.cpp
index c721d86c..33c994c5 100644
--- a/src/msgr_send.cpp
+++ b/src/msgr_send.cpp
@@ -111,22 +111,28 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
         return;
     }
 #endif
-    if (!ringloop)
+    if (!ringloop && !has_send_loop)
     {
-        // FIXME: It's worse because it doesn't allow batching
+        // A "send loop" should be used, like in the QEMU driver, or performance will suffer
+        // due to a large number of sendmsg() syscalls where they could be avoided.
+        // The send loop increases 4k T1Q128 randread from ~55k to ~90k iops with 1 OSD.
+        // The difference is smaller when there are more OSDs, though...
         while (cl->outbox.size())
         {
             try_send(cl);
         }
     }
-    else if (cl->write_msg.msg_iovlen > 0 || !try_send(cl))
+    else if (cl->write_msg.msg_iovlen > 0 || has_send_loop || !try_send(cl))
     {
         if (cl->write_state == 0)
         {
             cl->write_state = CL_WRITE_READY;
             write_ready_clients.push_back(cur_op->peer_fd);
         }
-        ringloop->wakeup();
+        if (ringloop)
+        {
+            ringloop->wakeup();
+        }
     }
 }
 
@@ -218,6 +224,12 @@ void osd_messenger_t::send_replies()
     write_ready_clients.clear();
 }
 
+bool osd_messenger_t::can_send()
+{
+    has_send_loop = true;
+    return write_ready_clients.size() > 0;
+}
+
 void osd_messenger_t::handle_send(int result, osd_client_t *cl)
 {
     cl->write_msg.msg_iovlen = 0;
diff --git a/src/qemu_driver.c b/src/qemu_driver.c
index f73ab38f..d6b48f26 100644
--- a/src/qemu_driver.c
+++ b/src/qemu_driver.c
@@ -77,6 +77,7 @@ typedef struct VitastorClient
     AioContext *ctx;
     VitastorFdData **fds;
     int fd_count, fd_alloc;
+    int bh_send_all_scheduled;
 
     uint64_t last_bitmap_inode, last_bitmap_offset, last_bitmap_len;
     uint32_t last_bitmap_granularity;
@@ -106,6 +107,14 @@ typedef struct VitastorRPC
 #endif
 } VitastorRPC;
 
+#if QEMU_VERSION_MAJOR == 2 && QEMU_VERSION_MINOR < 8
+typedef struct VitastorBH
+{
+    VitastorClient *cli;
+    QEMUBH *bh;
+} VitastorBH;
+#endif
+
 static void vitastor_co_init_task(BlockDriverState *bs, VitastorRPC *task);
 static void vitastor_co_generic_cb(void *opaque, long retval);
 static void vitastor_co_read_cb(void *opaque, long retval, uint64_t version);
@@ -223,6 +232,49 @@ out:
     return;
 }
 
+#if defined VITASTOR_C_API_VERSION && VITASTOR_C_API_VERSION >= 2
+static void vitastor_bh_send_all(void *opaque)
+{
+#if QEMU_VERSION_MAJOR == 2 && QEMU_VERSION_MINOR < 8
+    VitastorBH *vbh = opaque;
+    VitastorClient *client = vbh->cli;
+#else
+    VitastorClient *client = opaque;
+#endif
+    qemu_mutex_lock(&client->mutex);
+    client->bh_send_all_scheduled = 0;
+    vitastor_c_qemu_send_all(client->proxy);
+    qemu_mutex_unlock(&client->mutex);
+#if QEMU_VERSION_MAJOR == 2 && QEMU_VERSION_MINOR < 8
+    qemu_bh_delete(vbh->bh);
+    free(vbh);
+#endif
+}
+#endif
+
+static void vitastor_schedule_send_all(VitastorClient *client)
+{
+#if defined VITASTOR_C_API_VERSION && VITASTOR_C_API_VERSION >= 2
+    if (!client->bh_send_all_scheduled)
+    {
+        client->bh_send_all_scheduled = 1;
+#if QEMU_VERSION_MAJOR > 4 || QEMU_VERSION_MAJOR == 4 && QEMU_VERSION_MINOR >= 2
+        replay_bh_schedule_oneshot_event(client->ctx, vitastor_bh_send_all, client);
+#elif QEMU_VERSION_MAJOR >= 3 || QEMU_VERSION_MAJOR == 2 && QEMU_VERSION_MINOR >= 8
+        aio_bh_schedule_oneshot(client->ctx, vitastor_bh_send_all, client);
+#elif QEMU_VERSION_MAJOR >= 2
+        VitastorBH *vbh = (VitastorBH*)malloc(sizeof(VitastorBH));
+        vbh->cli = client;
+        vbh->bh = aio_bh_new(client->ctx, vitastor_bh_send_all, vbh);
+        qemu_bh_schedule(vbh->bh);
+#else
+        client->bh_send_all_scheduled = 0;
+        vitastor_c_qemu_send_all(client->proxy);
+#endif
+    }
+#endif
+}
+
 static void coroutine_fn vitastor_co_get_metadata(VitastorRPC *task)
 {
     BlockDriverState *bs = task->bs;
@@ -231,6 +283,7 @@ static void coroutine_fn vitastor_co_get_metadata(VitastorRPC *task)
 
     qemu_mutex_lock(&client->mutex);
     vitastor_c_watch_inode(client->proxy, client->image, vitastor_co_generic_cb, task);
+    vitastor_schedule_send_all(client);
     qemu_mutex_unlock(&client->mutex);
 
     while (!task->complete)
@@ -244,6 +297,7 @@ static void vitastor_aio_fd_read(void *fddv)
     VitastorFdData *fdd = (VitastorFdData*)fddv;
     qemu_mutex_lock(&fdd->cli->mutex);
     fdd->fd_read(fdd->opaque);
+    vitastor_schedule_send_all(fdd->cli);
     qemu_mutex_unlock(&fdd->cli->mutex);
 }
 
@@ -252,6 +306,7 @@ static void vitastor_aio_fd_write(void *fddv)
     VitastorFdData *fdd = (VitastorFdData*)fddv;
     qemu_mutex_lock(&fdd->cli->mutex);
     fdd->fd_write(fdd->opaque);
+    vitastor_schedule_send_all(fdd->cli);
     qemu_mutex_unlock(&fdd->cli->mutex);
 }
 
@@ -593,6 +648,7 @@ static int coroutine_fn vitastor_co_preadv(BlockDriverState *bs,
     uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode;
     qemu_mutex_lock(&client->mutex);
     vitastor_c_read(client->proxy, inode, offset, bytes, iov->iov, iov->niov, vitastor_co_read_cb, &task);
+    vitastor_schedule_send_all(client);
     qemu_mutex_unlock(&client->mutex);
 
     while (!task.complete)
@@ -626,6 +682,7 @@ static int coroutine_fn vitastor_co_pwritev(BlockDriverState *bs,
     uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode;
     qemu_mutex_lock(&client->mutex);
     vitastor_c_write(client->proxy, inode, offset, bytes, 0, iov->iov, iov->niov, vitastor_co_generic_cb, &task);
+    vitastor_schedule_send_all(client);
     qemu_mutex_unlock(&client->mutex);
 
     while (!task.complete)
@@ -704,6 +761,7 @@ static int coroutine_fn vitastor_co_block_status(
     task.bitmap = client->last_bitmap = NULL;
     qemu_mutex_lock(&client->mutex);
     vitastor_c_read_bitmap(client->proxy, task.inode, task.offset, task.len, !client->skip_parents, vitastor_co_read_bitmap_cb, &task);
+    vitastor_schedule_send_all(client);
     qemu_mutex_unlock(&client->mutex);
     while (!task.complete)
     {
@@ -790,6 +848,7 @@ static int coroutine_fn vitastor_co_flush(BlockDriverState *bs)
 
     qemu_mutex_lock(&client->mutex);
     vitastor_c_sync(client->proxy, vitastor_co_generic_cb, &task);
+    vitastor_schedule_send_all(client);
     qemu_mutex_unlock(&client->mutex);
 
     while (!task.complete)
diff --git a/src/vitastor_c.cpp b/src/vitastor_c.cpp
index abc19b78..f9d476d0 100644
--- a/src/vitastor_c.cpp
+++ b/src/vitastor_c.cpp
@@ -166,6 +166,14 @@ void vitastor_c_uring_wait_events(vitastor_c *client)
     client->ringloop->wait();
 }
 
+void vitastor_c_qemu_send_all(vitastor_c *client)
+{
+    while (client->cli->msgr.can_send())
+    {
+        client->cli->msgr.send_replies();
+    }
+}
+
 void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len,
     struct iovec *iov, int iovcnt, VitastorReadHandler cb, void *opaque)
 {
diff --git a/src/vitastor_c.h b/src/vitastor_c.h
index f8cd5be0..02074789 100644
--- a/src/vitastor_c.h
+++ b/src/vitastor_c.h
@@ -7,7 +7,7 @@
 #define VITASTOR_QEMU_PROXY_H
 
 // C API wrapper version
-#define VITASTOR_C_API_VERSION 1
+#define VITASTOR_C_API_VERSION 2
 
 #ifndef POOL_ID_BITS
 #define POOL_ID_BITS 16
@@ -42,6 +42,7 @@ int vitastor_c_is_ready(vitastor_c *client);
 void vitastor_c_uring_wait_ready(vitastor_c *client);
 void vitastor_c_uring_handle_events(vitastor_c *client);
 void vitastor_c_uring_wait_events(vitastor_c *client);
+void vitastor_c_qemu_send_all(vitastor_c *client);
 void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len,
     struct iovec *iov, int iovcnt, VitastorReadHandler cb, void *opaque);
 void vitastor_c_write(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, uint64_t check_version,
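
Usage note: with VITASTOR_C_API_VERSION >= 2, a client created without an io_uring ringloop (as in the QEMU driver above) is expected to drive sends itself. The sketch below is illustrative only and not part of the patch; submit_reads() and on_read_done() are hypothetical names, while vitastor_c_read() and vitastor_c_qemu_send_all() come from vitastor_c.h as changed here. The first vitastor_c_qemu_send_all() call flips has_send_loop via can_send(), so subsequent requests queue in the per-OSD-connection outboxes until the next flush, letting several of them merge into one sendmsg().

/* Hedged sketch (not part of the patch): batching requests with the new
 * send-loop API. Assumes a single-threaded vitastor_c client created
 * without a ringloop; submit_reads() and on_read_done() are hypothetical. */
#include <sys/uio.h>
#include "vitastor_c.h"

static void on_read_done(void *opaque, long retval, uint64_t version)
{
    /* Completion arrives later, from the client's fd event handlers */
    (void)opaque; (void)retval; (void)version;
}

static void submit_reads(vitastor_c *client, uint64_t inode,
    struct iovec *iovs, int count, uint64_t offset)
{
    for (int i = 0; i < count; i++)
    {
        /* Once a send loop is active, this only queues the request in the
         * connection's outbox (the peer is marked CL_WRITE_READY) instead
         * of issuing sendmsg() immediately */
        vitastor_c_read(client, inode, offset, iovs[i].iov_len,
            &iovs[i], 1, on_read_done, NULL);
        offset += iovs[i].iov_len;
    }
#if defined VITASTOR_C_API_VERSION && VITASTOR_C_API_VERSION >= 2
    /* Flush: repeats send_replies() while can_send() still reports
     * write-ready clients, so queued ops batch into few sendmsg() calls */
    vitastor_c_qemu_send_all(client);
#endif
}

This mirrors what vitastor_schedule_send_all() achieves in the QEMU driver, except the driver defers the flush to a one-shot bottom half so that all requests submitted during one event-loop iteration share a single flush.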