forked from vitalif/vitastor
Integrate QEMU driver with io_uring
parent
b8e30608d6
commit
07b2196bc2
|
@ -58,6 +58,8 @@ typedef struct VitastorFdData VitastorFdData;
|
||||||
typedef struct VitastorClient
|
typedef struct VitastorClient
|
||||||
{
|
{
|
||||||
void *proxy;
|
void *proxy;
|
||||||
|
int uring_eventfd;
|
||||||
|
|
||||||
void *watch;
|
void *watch;
|
||||||
char *config_path;
|
char *config_path;
|
||||||
char *etcd_host;
|
char *etcd_host;
|
||||||
|
@ -223,6 +225,40 @@ out:
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void vitastor_unlocked_uring_handler(VitastorClient *client)
|
||||||
|
{
|
||||||
|
do
|
||||||
|
{
|
||||||
|
vitastor_c_uring_handle_events(client->proxy);
|
||||||
|
} while (vitastor_c_uring_has_work(client->proxy));
|
||||||
|
}
|
||||||
|
|
||||||
|
static void vitastor_uring_handler(void *opaque)
|
||||||
|
{
|
||||||
|
VitastorClient *client = (VitastorClient*)opaque;
|
||||||
|
qemu_mutex_lock(&client->mutex);
|
||||||
|
do
|
||||||
|
{
|
||||||
|
vitastor_c_uring_handle_events(client->proxy);
|
||||||
|
} while (vitastor_c_uring_has_work(client->proxy));
|
||||||
|
qemu_mutex_unlock(&client->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void vitastor_schedule_uring_handler(VitastorClient *client)
|
||||||
|
{
|
||||||
|
void *opaque = client;
|
||||||
|
#if QEMU_VERSION_MAJOR > 4 || QEMU_VERSION_MAJOR == 4 && QEMU_VERSION_MINOR >= 2
|
||||||
|
replay_bh_schedule_oneshot_event(client->ctx, vitastor_uring_handler, opaque);
|
||||||
|
#elif QEMU_VERSION_MAJOR >= 3 || QEMU_VERSION_MAJOR == 2 && QEMU_VERSION_MINOR >= 8
|
||||||
|
aio_bh_schedule_oneshot(client->ctx, vitastor_uring_handler, opaque);
|
||||||
|
#elif QEMU_VERSION_MAJOR >= 2
|
||||||
|
// task->bh = aio_bh_new(bdrv_get_aio_context(task->bs), vitastor_co_generic_bh_cb, opaque);
|
||||||
|
// qemu_bh_schedule(task->bh);
|
||||||
|
#else
|
||||||
|
// vitastor_co_generic_bh_cb(opaque);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
static void coroutine_fn vitastor_co_get_metadata(VitastorRPC *task)
|
static void coroutine_fn vitastor_co_get_metadata(VitastorRPC *task)
|
||||||
{
|
{
|
||||||
BlockDriverState *bs = task->bs;
|
BlockDriverState *bs = task->bs;
|
||||||
|
@ -231,6 +267,7 @@ static void coroutine_fn vitastor_co_get_metadata(VitastorRPC *task)
|
||||||
|
|
||||||
qemu_mutex_lock(&client->mutex);
|
qemu_mutex_lock(&client->mutex);
|
||||||
vitastor_c_watch_inode(client->proxy, client->image, vitastor_co_generic_cb, task);
|
vitastor_c_watch_inode(client->proxy, client->image, vitastor_co_generic_cb, task);
|
||||||
|
vitastor_schedule_uring_handler(client);
|
||||||
qemu_mutex_unlock(&client->mutex);
|
qemu_mutex_unlock(&client->mutex);
|
||||||
|
|
||||||
while (!task->complete)
|
while (!task->complete)
|
||||||
|
@ -255,6 +292,26 @@ static void vitastor_aio_fd_write(void *fddv)
|
||||||
qemu_mutex_unlock(&fdd->cli->mutex);
|
qemu_mutex_unlock(&fdd->cli->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void universal_aio_set_fd_handler(AioContext *ctx, int fd, IOHandler *fd_read, IOHandler *fd_write, void *opaque)
|
||||||
|
{
|
||||||
|
aio_set_fd_handler(ctx, fd,
|
||||||
|
#if QEMU_VERSION_MAJOR == 2 && QEMU_VERSION_MINOR >= 5 || QEMU_VERSION_MAJOR >= 3
|
||||||
|
0 /*is_external*/,
|
||||||
|
#endif
|
||||||
|
fd_read,
|
||||||
|
fd_write,
|
||||||
|
#if QEMU_VERSION_MAJOR == 1 && QEMU_VERSION_MINOR <= 6 || QEMU_VERSION_MAJOR < 1
|
||||||
|
NULL /*io_flush*/,
|
||||||
|
#endif
|
||||||
|
#if QEMU_VERSION_MAJOR == 2 && QEMU_VERSION_MINOR >= 9 || QEMU_VERSION_MAJOR >= 3
|
||||||
|
NULL /*io_poll*/,
|
||||||
|
#endif
|
||||||
|
#if QEMU_VERSION_MAJOR >= 7
|
||||||
|
NULL /*io_poll_ready*/,
|
||||||
|
#endif
|
||||||
|
opaque);
|
||||||
|
}
|
||||||
|
|
||||||
static void vitastor_aio_set_fd_handler(void *vcli, int fd, int unused1, IOHandler *fd_read, IOHandler *fd_write, void *unused2, void *opaque)
|
static void vitastor_aio_set_fd_handler(void *vcli, int fd, int unused1, IOHandler *fd_read, IOHandler *fd_write, void *unused2, void *opaque)
|
||||||
{
|
{
|
||||||
VitastorClient *client = (VitastorClient*)vcli;
|
VitastorClient *client = (VitastorClient*)vcli;
|
||||||
|
@ -297,22 +354,9 @@ static void vitastor_aio_set_fd_handler(void *vcli, int fd, int unused1, IOHandl
|
||||||
}
|
}
|
||||||
client->fds[client->fd_count++] = fdd;
|
client->fds[client->fd_count++] = fdd;
|
||||||
}
|
}
|
||||||
aio_set_fd_handler(client->ctx, fd,
|
universal_aio_set_fd_handler(
|
||||||
#if QEMU_VERSION_MAJOR == 2 && QEMU_VERSION_MINOR >= 5 || QEMU_VERSION_MAJOR >= 3
|
client->ctx, fd, fd_read ? vitastor_aio_fd_read : NULL, fd_write ? vitastor_aio_fd_write : NULL, fdd
|
||||||
0 /*is_external*/,
|
);
|
||||||
#endif
|
|
||||||
fd_read ? vitastor_aio_fd_read : NULL,
|
|
||||||
fd_write ? vitastor_aio_fd_write : NULL,
|
|
||||||
#if QEMU_VERSION_MAJOR == 1 && QEMU_VERSION_MINOR <= 6 || QEMU_VERSION_MAJOR < 1
|
|
||||||
NULL /*io_flush*/,
|
|
||||||
#endif
|
|
||||||
#if QEMU_VERSION_MAJOR == 2 && QEMU_VERSION_MINOR >= 9 || QEMU_VERSION_MAJOR >= 3
|
|
||||||
NULL /*io_poll*/,
|
|
||||||
#endif
|
|
||||||
#if QEMU_VERSION_MAJOR >= 7
|
|
||||||
NULL /*io_poll_ready*/,
|
|
||||||
#endif
|
|
||||||
fdd);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, Error **errp)
|
static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, Error **errp)
|
||||||
|
@ -333,10 +377,19 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E
|
||||||
client->rdma_gid_index = qdict_get_try_int(options, "rdma-gid-index", 0);
|
client->rdma_gid_index = qdict_get_try_int(options, "rdma-gid-index", 0);
|
||||||
client->rdma_mtu = qdict_get_try_int(options, "rdma-mtu", 0);
|
client->rdma_mtu = qdict_get_try_int(options, "rdma-mtu", 0);
|
||||||
client->ctx = bdrv_get_aio_context(bs);
|
client->ctx = bdrv_get_aio_context(bs);
|
||||||
client->proxy = vitastor_c_create_qemu(
|
client->proxy = vitastor_c_create_uring(
|
||||||
vitastor_aio_set_fd_handler, client, client->config_path, client->etcd_host, client->etcd_prefix,
|
client->config_path, client->etcd_host, client->etcd_prefix,
|
||||||
client->use_rdma, client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu, 0
|
client->use_rdma, client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu, 0
|
||||||
);
|
);
|
||||||
|
client->uring_eventfd = vitastor_c_uring_register_eventfd(client->proxy);
|
||||||
|
if (client->uring_eventfd < 0)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "failed to create eventfd: %s\n", strerror(errno));
|
||||||
|
error_setg(errp, "failed to create eventfd");
|
||||||
|
vitastor_close(bs);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
universal_aio_set_fd_handler(client->ctx, client->uring_eventfd, vitastor_uring_handler, NULL, client);
|
||||||
image = client->image = g_strdup(qdict_get_try_str(options, "image"));
|
image = client->image = g_strdup(qdict_get_try_str(options, "image"));
|
||||||
client->readonly = (flags & BDRV_O_RDWR) ? 1 : 0;
|
client->readonly = (flags & BDRV_O_RDWR) ? 1 : 0;
|
||||||
// Get image metadata (size and readonly flag) or just wait until the client is ready
|
// Get image metadata (size and readonly flag) or just wait until the client is ready
|
||||||
|
@ -593,6 +646,7 @@ static int coroutine_fn vitastor_co_preadv(BlockDriverState *bs,
|
||||||
uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode;
|
uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode;
|
||||||
qemu_mutex_lock(&client->mutex);
|
qemu_mutex_lock(&client->mutex);
|
||||||
vitastor_c_read(client->proxy, inode, offset, bytes, iov->iov, iov->niov, vitastor_co_read_cb, &task);
|
vitastor_c_read(client->proxy, inode, offset, bytes, iov->iov, iov->niov, vitastor_co_read_cb, &task);
|
||||||
|
vitastor_schedule_uring_handler(client);
|
||||||
qemu_mutex_unlock(&client->mutex);
|
qemu_mutex_unlock(&client->mutex);
|
||||||
|
|
||||||
while (!task.complete)
|
while (!task.complete)
|
||||||
|
@ -626,6 +680,7 @@ static int coroutine_fn vitastor_co_pwritev(BlockDriverState *bs,
|
||||||
uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode;
|
uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode;
|
||||||
qemu_mutex_lock(&client->mutex);
|
qemu_mutex_lock(&client->mutex);
|
||||||
vitastor_c_write(client->proxy, inode, offset, bytes, 0, iov->iov, iov->niov, vitastor_co_generic_cb, &task);
|
vitastor_c_write(client->proxy, inode, offset, bytes, 0, iov->iov, iov->niov, vitastor_co_generic_cb, &task);
|
||||||
|
vitastor_schedule_uring_handler(client);
|
||||||
qemu_mutex_unlock(&client->mutex);
|
qemu_mutex_unlock(&client->mutex);
|
||||||
|
|
||||||
while (!task.complete)
|
while (!task.complete)
|
||||||
|
@ -704,6 +759,7 @@ static int coroutine_fn vitastor_co_block_status(
|
||||||
task.bitmap = client->last_bitmap = NULL;
|
task.bitmap = client->last_bitmap = NULL;
|
||||||
qemu_mutex_lock(&client->mutex);
|
qemu_mutex_lock(&client->mutex);
|
||||||
vitastor_c_read_bitmap(client->proxy, task.inode, task.offset, task.len, !client->skip_parents, vitastor_co_read_bitmap_cb, &task);
|
vitastor_c_read_bitmap(client->proxy, task.inode, task.offset, task.len, !client->skip_parents, vitastor_co_read_bitmap_cb, &task);
|
||||||
|
vitastor_schedule_uring_handler(client);
|
||||||
qemu_mutex_unlock(&client->mutex);
|
qemu_mutex_unlock(&client->mutex);
|
||||||
while (!task.complete)
|
while (!task.complete)
|
||||||
{
|
{
|
||||||
|
@ -790,6 +846,7 @@ static int coroutine_fn vitastor_co_flush(BlockDriverState *bs)
|
||||||
|
|
||||||
qemu_mutex_lock(&client->mutex);
|
qemu_mutex_lock(&client->mutex);
|
||||||
vitastor_c_sync(client->proxy, vitastor_co_generic_cb, &task);
|
vitastor_c_sync(client->proxy, vitastor_co_generic_cb, &task);
|
||||||
|
vitastor_schedule_uring_handler(client);
|
||||||
qemu_mutex_unlock(&client->mutex);
|
qemu_mutex_unlock(&client->mutex);
|
||||||
|
|
||||||
while (!task.complete)
|
while (!task.complete)
|
||||||
|
|
|
@ -5,6 +5,8 @@
|
||||||
|
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
|
|
||||||
|
#include <sys/eventfd.h>
|
||||||
|
|
||||||
#include "ringloop.h"
|
#include "ringloop.h"
|
||||||
|
|
||||||
ring_loop_t::ring_loop_t(int qd)
|
ring_loop_t::ring_loop_t(int qd)
|
||||||
|
@ -32,6 +34,10 @@ ring_loop_t::~ring_loop_t()
|
||||||
free(free_ring_data);
|
free(free_ring_data);
|
||||||
free(ring_datas);
|
free(ring_datas);
|
||||||
io_uring_queue_exit(&ring);
|
io_uring_queue_exit(&ring);
|
||||||
|
if (ring_eventfd)
|
||||||
|
{
|
||||||
|
close(ring_eventfd);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ring_loop_t::register_consumer(ring_consumer_t *consumer)
|
void ring_loop_t::register_consumer(ring_consumer_t *consumer)
|
||||||
|
@ -127,3 +133,24 @@ int ring_loop_t::sqes_left()
|
||||||
}
|
}
|
||||||
return left;
|
return left;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ring_loop_t::register_eventfd()
|
||||||
|
{
|
||||||
|
if (ring_eventfd >= 0)
|
||||||
|
{
|
||||||
|
return ring_eventfd;
|
||||||
|
}
|
||||||
|
ring_eventfd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
|
||||||
|
if (ring_eventfd < 0)
|
||||||
|
{
|
||||||
|
return -errno;
|
||||||
|
}
|
||||||
|
int r = io_uring_register_eventfd(&ring, ring_eventfd);
|
||||||
|
if (r < 0)
|
||||||
|
{
|
||||||
|
close(ring_eventfd);
|
||||||
|
ring_eventfd = -1;
|
||||||
|
return r;
|
||||||
|
}
|
||||||
|
return ring_eventfd;
|
||||||
|
}
|
||||||
|
|
|
@ -126,11 +126,13 @@ class ring_loop_t
|
||||||
unsigned free_ring_data_ptr;
|
unsigned free_ring_data_ptr;
|
||||||
bool loop_again;
|
bool loop_again;
|
||||||
struct io_uring ring;
|
struct io_uring ring;
|
||||||
|
int ring_eventfd = -1;
|
||||||
public:
|
public:
|
||||||
ring_loop_t(int qd);
|
ring_loop_t(int qd);
|
||||||
~ring_loop_t();
|
~ring_loop_t();
|
||||||
void register_consumer(ring_consumer_t *consumer);
|
void register_consumer(ring_consumer_t *consumer);
|
||||||
void unregister_consumer(ring_consumer_t *consumer);
|
void unregister_consumer(ring_consumer_t *consumer);
|
||||||
|
int register_eventfd();
|
||||||
|
|
||||||
inline struct io_uring_sqe* get_sqe()
|
inline struct io_uring_sqe* get_sqe()
|
||||||
{
|
{
|
||||||
|
|
|
@ -5,6 +5,7 @@
|
||||||
// Also acts as a C-C++ proxy for the QEMU driver (QEMU headers don't compile with g++)
|
// Also acts as a C-C++ proxy for the QEMU driver (QEMU headers don't compile with g++)
|
||||||
|
|
||||||
#include <sys/epoll.h>
|
#include <sys/epoll.h>
|
||||||
|
#include <sys/eventfd.h>
|
||||||
|
|
||||||
#include "ringloop.h"
|
#include "ringloop.h"
|
||||||
#include "epoll_manager.h"
|
#include "epoll_manager.h"
|
||||||
|
@ -25,6 +26,7 @@ struct vitastor_c
|
||||||
epoll_manager_t *epmgr = NULL;
|
epoll_manager_t *epmgr = NULL;
|
||||||
timerfd_manager_t *tfd = NULL;
|
timerfd_manager_t *tfd = NULL;
|
||||||
cluster_client_t *cli = NULL;
|
cluster_client_t *cli = NULL;
|
||||||
|
int uring_eventfd = -1;
|
||||||
|
|
||||||
QEMUSetFDHandler *aio_set_fd_handler = NULL;
|
QEMUSetFDHandler *aio_set_fd_handler = NULL;
|
||||||
void *aio_ctx = NULL;
|
void *aio_ctx = NULL;
|
||||||
|
@ -113,6 +115,15 @@ vitastor_c *vitastor_c_create_uring(const char *config_path, const char *etcd_ho
|
||||||
return self;
|
return self;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int vitastor_c_uring_register_eventfd(vitastor_c *client)
|
||||||
|
{
|
||||||
|
if (!client->ringloop)
|
||||||
|
{
|
||||||
|
return -EINVAL;
|
||||||
|
}
|
||||||
|
return client->ringloop->register_eventfd();
|
||||||
|
}
|
||||||
|
|
||||||
vitastor_c *vitastor_c_create_uring_json(const char **options, int options_len)
|
vitastor_c *vitastor_c_create_uring_json(const char **options, int options_len)
|
||||||
{
|
{
|
||||||
json11::Json::object cfg;
|
json11::Json::object cfg;
|
||||||
|
@ -166,6 +177,11 @@ void vitastor_c_uring_wait_events(vitastor_c *client)
|
||||||
client->ringloop->wait();
|
client->ringloop->wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool vitastor_c_uring_has_work(vitastor_c *client)
|
||||||
|
{
|
||||||
|
return client->ringloop->has_work();
|
||||||
|
}
|
||||||
|
|
||||||
void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len,
|
void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len,
|
||||||
struct iovec *iov, int iovcnt, VitastorReadHandler cb, void *opaque)
|
struct iovec *iov, int iovcnt, VitastorReadHandler cb, void *opaque)
|
||||||
{
|
{
|
||||||
|
|
|
@ -39,9 +39,11 @@ vitastor_c *vitastor_c_create_uring(const char *config_path, const char *etcd_ho
|
||||||
vitastor_c *vitastor_c_create_uring_json(const char **options, int options_len);
|
vitastor_c *vitastor_c_create_uring_json(const char **options, int options_len);
|
||||||
void vitastor_c_destroy(vitastor_c *client);
|
void vitastor_c_destroy(vitastor_c *client);
|
||||||
int vitastor_c_is_ready(vitastor_c *client);
|
int vitastor_c_is_ready(vitastor_c *client);
|
||||||
|
int vitastor_c_uring_register_eventfd(vitastor_c *client);
|
||||||
void vitastor_c_uring_wait_ready(vitastor_c *client);
|
void vitastor_c_uring_wait_ready(vitastor_c *client);
|
||||||
void vitastor_c_uring_handle_events(vitastor_c *client);
|
void vitastor_c_uring_handle_events(vitastor_c *client);
|
||||||
void vitastor_c_uring_wait_events(vitastor_c *client);
|
void vitastor_c_uring_wait_events(vitastor_c *client);
|
||||||
|
bool vitastor_c_uring_has_work(vitastor_c *client);
|
||||||
void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len,
|
void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len,
|
||||||
struct iovec *iov, int iovcnt, VitastorReadHandler cb, void *opaque);
|
struct iovec *iov, int iovcnt, VitastorReadHandler cb, void *opaque);
|
||||||
void vitastor_c_write(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, uint64_t check_version,
|
void vitastor_c_write(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, uint64_t check_version,
|
||||||
|
|
Loading…
Reference in New Issue