1
0
Fork 0

Compare commits

...

1 Commits

Author SHA1 Message Date
Vitaliy Filippov c3ec6dd3b9 Add postponed send loop to QEMU driver 2023-07-03 02:29:01 +03:00
5 changed files with 87 additions and 5 deletions

View File

@ -132,6 +132,7 @@ protected:
uint64_t rdma_max_msg = 0;
#endif
bool has_send_loop = false;
std::vector<int> read_ready_clients;
std::vector<int> write_ready_clients;
// We don't use ringloop->set_immediate here because we may have no ringloop in client :)
@ -159,6 +160,7 @@ public:
std::function<bool(osd_client_t*, json11::Json)> check_config_hook;
void read_requests();
void send_replies();
bool can_send();
void accept_connections(int listen_fd);
~osd_messenger_t();

View File

@ -111,22 +111,28 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
return;
}
#endif
if (!ringloop)
if (!ringloop && !has_send_loop)
{
// FIXME: It's worse because it doesn't allow batching
// "Send loop" should be used, like in QEMU driver, or performance will suffer
// due to a large number of sendmsg() syscalls where they can be avoided.
// "Send loop" increases 4k T1Q128 randread from ~55k iops to ~90k on a 1 OSD.
// The difference is smaller when there are more OSDs though...
while (cl->outbox.size())
{
try_send(cl);
}
}
else if (cl->write_msg.msg_iovlen > 0 || !try_send(cl))
else if (cl->write_msg.msg_iovlen > 0 || has_send_loop || !try_send(cl))
{
if (cl->write_state == 0)
{
cl->write_state = CL_WRITE_READY;
write_ready_clients.push_back(cur_op->peer_fd);
}
ringloop->wakeup();
if (ringloop)
{
ringloop->wakeup();
}
}
}
@ -218,6 +224,12 @@ void osd_messenger_t::send_replies()
write_ready_clients.clear();
}
bool osd_messenger_t::can_send()
{
has_send_loop = true;
return write_ready_clients.size() > 0;
}
void osd_messenger_t::handle_send(int result, osd_client_t *cl)
{
cl->write_msg.msg_iovlen = 0;

View File

@ -77,6 +77,7 @@ typedef struct VitastorClient
AioContext *ctx;
VitastorFdData **fds;
int fd_count, fd_alloc;
int bh_send_all_scheduled;
uint64_t last_bitmap_inode, last_bitmap_offset, last_bitmap_len;
uint32_t last_bitmap_granularity;
@ -106,6 +107,14 @@ typedef struct VitastorRPC
#endif
} VitastorRPC;
#if QEMU_VERSION_MAJOR == 2 && QEMU_VERSION_MINOR < 8
typedef struct VitastorBH
{
VitastorClient *cli;
QEMUBH *bh;
} VitastorBH;
#endif
static void vitastor_co_init_task(BlockDriverState *bs, VitastorRPC *task);
static void vitastor_co_generic_cb(void *opaque, long retval);
static void vitastor_co_read_cb(void *opaque, long retval, uint64_t version);
@ -223,6 +232,49 @@ out:
return;
}
#if defined VITASTOR_C_API_VERSION && VITASTOR_C_API_VERSION >= 2
static void vitastor_bh_send_all(void *opaque)
{
#if QEMU_VERSION_MAJOR == 2 && QEMU_VERSION_MINOR < 8
VitastorBH *vbh = opaque;
VitastorClient *client = vbh->cli;
#else
VitastorClient *client = opaque;
#endif
qemu_mutex_lock(&client->mutex);
client->bh_send_all_scheduled = 0;
vitastor_c_qemu_send_all(client->proxy);
qemu_mutex_unlock(&client->mutex);
#if QEMU_VERSION_MAJOR == 2 && QEMU_VERSION_MINOR < 8
qemu_bh_delete(vbh->bh);
free(vbh);
#endif
}
#endif
static void vitastor_schedule_send_all(VitastorClient *client)
{
#if defined VITASTOR_C_API_VERSION && VITASTOR_C_API_VERSION >= 2
if (!client->bh_send_all_scheduled)
{
client->bh_send_all_scheduled = 1;
#if QEMU_VERSION_MAJOR > 4 || QEMU_VERSION_MAJOR == 4 && QEMU_VERSION_MINOR >= 2
replay_bh_schedule_oneshot_event(client->ctx, vitastor_bh_send_all, client);
#elif QEMU_VERSION_MAJOR >= 3 || QEMU_VERSION_MAJOR == 2 && QEMU_VERSION_MINOR >= 8
aio_bh_schedule_oneshot(client->ctx, vitastor_bh_send_all, client);
#elif QEMU_VERSION_MAJOR >= 2
VitastorBH *vbh = (VitastorBH*)malloc(sizeof(VitastorBH));
vbh->cli = client;
vbh->bh = aio_bh_new(bdrv_get_aio_context(task->bs), vitastor_bh_send_all, vbh);
qemu_bh_schedule(vbh->bh);
#else
client->bh_send_all_scheduled = 0;
vitastor_c_qemu_send_all(client->proxy);
#endif
}
#endif
}
static void coroutine_fn vitastor_co_get_metadata(VitastorRPC *task)
{
BlockDriverState *bs = task->bs;
@ -231,6 +283,7 @@ static void coroutine_fn vitastor_co_get_metadata(VitastorRPC *task)
qemu_mutex_lock(&client->mutex);
vitastor_c_watch_inode(client->proxy, client->image, vitastor_co_generic_cb, task);
vitastor_schedule_send_all(client);
qemu_mutex_unlock(&client->mutex);
while (!task->complete)
@ -244,6 +297,7 @@ static void vitastor_aio_fd_read(void *fddv)
VitastorFdData *fdd = (VitastorFdData*)fddv;
qemu_mutex_lock(&fdd->cli->mutex);
fdd->fd_read(fdd->opaque);
vitastor_schedule_send_all(fdd->cli);
qemu_mutex_unlock(&fdd->cli->mutex);
}
@ -252,6 +306,7 @@ static void vitastor_aio_fd_write(void *fddv)
VitastorFdData *fdd = (VitastorFdData*)fddv;
qemu_mutex_lock(&fdd->cli->mutex);
fdd->fd_write(fdd->opaque);
vitastor_schedule_send_all(fdd->cli);
qemu_mutex_unlock(&fdd->cli->mutex);
}
@ -593,6 +648,7 @@ static int coroutine_fn vitastor_co_preadv(BlockDriverState *bs,
uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode;
qemu_mutex_lock(&client->mutex);
vitastor_c_read(client->proxy, inode, offset, bytes, iov->iov, iov->niov, vitastor_co_read_cb, &task);
vitastor_schedule_send_all(client);
qemu_mutex_unlock(&client->mutex);
while (!task.complete)
@ -626,6 +682,7 @@ static int coroutine_fn vitastor_co_pwritev(BlockDriverState *bs,
uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode;
qemu_mutex_lock(&client->mutex);
vitastor_c_write(client->proxy, inode, offset, bytes, 0, iov->iov, iov->niov, vitastor_co_generic_cb, &task);
vitastor_schedule_send_all(client);
qemu_mutex_unlock(&client->mutex);
while (!task.complete)
@ -704,6 +761,7 @@ static int coroutine_fn vitastor_co_block_status(
task.bitmap = client->last_bitmap = NULL;
qemu_mutex_lock(&client->mutex);
vitastor_c_read_bitmap(client->proxy, task.inode, task.offset, task.len, !client->skip_parents, vitastor_co_read_bitmap_cb, &task);
vitastor_schedule_send_all(client);
qemu_mutex_unlock(&client->mutex);
while (!task.complete)
{
@ -790,6 +848,7 @@ static int coroutine_fn vitastor_co_flush(BlockDriverState *bs)
qemu_mutex_lock(&client->mutex);
vitastor_c_sync(client->proxy, vitastor_co_generic_cb, &task);
vitastor_schedule_send_all(client);
qemu_mutex_unlock(&client->mutex);
while (!task.complete)

View File

@ -166,6 +166,14 @@ void vitastor_c_uring_wait_events(vitastor_c *client)
client->ringloop->wait();
}
void vitastor_c_qemu_send_all(vitastor_c *client)
{
while (client->cli->msgr.can_send())
{
client->cli->msgr.send_replies();
}
}
void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len,
struct iovec *iov, int iovcnt, VitastorReadHandler cb, void *opaque)
{

View File

@ -7,7 +7,7 @@
#define VITASTOR_QEMU_PROXY_H
// C API wrapper version
#define VITASTOR_C_API_VERSION 1
#define VITASTOR_C_API_VERSION 2
#ifndef POOL_ID_BITS
#define POOL_ID_BITS 16
@ -42,6 +42,7 @@ int vitastor_c_is_ready(vitastor_c *client);
void vitastor_c_uring_wait_ready(vitastor_c *client);
void vitastor_c_uring_handle_events(vitastor_c *client);
void vitastor_c_uring_wait_events(vitastor_c *client);
void vitastor_c_qemu_send_all(vitastor_c *client);
void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len,
struct iovec *iov, int iovcnt, VitastorReadHandler cb, void *opaque);
void vitastor_c_write(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, uint64_t check_version,