forked from vitalif/vitastor
Compare commits
1 Commits
master
...
qemu-send-
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | c3ec6dd3b9 |
|
@ -132,6 +132,7 @@ protected:
|
||||||
uint64_t rdma_max_msg = 0;
|
uint64_t rdma_max_msg = 0;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
bool has_send_loop = false;
|
||||||
std::vector<int> read_ready_clients;
|
std::vector<int> read_ready_clients;
|
||||||
std::vector<int> write_ready_clients;
|
std::vector<int> write_ready_clients;
|
||||||
// We don't use ringloop->set_immediate here because we may have no ringloop in client :)
|
// We don't use ringloop->set_immediate here because we may have no ringloop in client :)
|
||||||
|
@ -159,6 +160,7 @@ public:
|
||||||
std::function<bool(osd_client_t*, json11::Json)> check_config_hook;
|
std::function<bool(osd_client_t*, json11::Json)> check_config_hook;
|
||||||
void read_requests();
|
void read_requests();
|
||||||
void send_replies();
|
void send_replies();
|
||||||
|
bool can_send();
|
||||||
void accept_connections(int listen_fd);
|
void accept_connections(int listen_fd);
|
||||||
~osd_messenger_t();
|
~osd_messenger_t();
|
||||||
|
|
||||||
|
|
|
@ -111,22 +111,28 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
if (!ringloop)
|
if (!ringloop && !has_send_loop)
|
||||||
{
|
{
|
||||||
// FIXME: It's worse because it doesn't allow batching
|
// "Send loop" should be used, like in QEMU driver, or performance will suffer
|
||||||
|
// due to a large number of sendmsg() syscalls where they can be avoided.
|
||||||
|
// "Send loop" increases 4k T1Q128 randread from ~55k iops to ~90k on a 1 OSD.
|
||||||
|
// The difference is smaller when there are more OSDs though...
|
||||||
while (cl->outbox.size())
|
while (cl->outbox.size())
|
||||||
{
|
{
|
||||||
try_send(cl);
|
try_send(cl);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (cl->write_msg.msg_iovlen > 0 || !try_send(cl))
|
else if (cl->write_msg.msg_iovlen > 0 || has_send_loop || !try_send(cl))
|
||||||
{
|
{
|
||||||
if (cl->write_state == 0)
|
if (cl->write_state == 0)
|
||||||
{
|
{
|
||||||
cl->write_state = CL_WRITE_READY;
|
cl->write_state = CL_WRITE_READY;
|
||||||
write_ready_clients.push_back(cur_op->peer_fd);
|
write_ready_clients.push_back(cur_op->peer_fd);
|
||||||
}
|
}
|
||||||
ringloop->wakeup();
|
if (ringloop)
|
||||||
|
{
|
||||||
|
ringloop->wakeup();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -218,6 +224,12 @@ void osd_messenger_t::send_replies()
|
||||||
write_ready_clients.clear();
|
write_ready_clients.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool osd_messenger_t::can_send()
|
||||||
|
{
|
||||||
|
has_send_loop = true;
|
||||||
|
return write_ready_clients.size() > 0;
|
||||||
|
}
|
||||||
|
|
||||||
void osd_messenger_t::handle_send(int result, osd_client_t *cl)
|
void osd_messenger_t::handle_send(int result, osd_client_t *cl)
|
||||||
{
|
{
|
||||||
cl->write_msg.msg_iovlen = 0;
|
cl->write_msg.msg_iovlen = 0;
|
||||||
|
|
|
@ -77,6 +77,7 @@ typedef struct VitastorClient
|
||||||
AioContext *ctx;
|
AioContext *ctx;
|
||||||
VitastorFdData **fds;
|
VitastorFdData **fds;
|
||||||
int fd_count, fd_alloc;
|
int fd_count, fd_alloc;
|
||||||
|
int bh_send_all_scheduled;
|
||||||
|
|
||||||
uint64_t last_bitmap_inode, last_bitmap_offset, last_bitmap_len;
|
uint64_t last_bitmap_inode, last_bitmap_offset, last_bitmap_len;
|
||||||
uint32_t last_bitmap_granularity;
|
uint32_t last_bitmap_granularity;
|
||||||
|
@ -106,6 +107,14 @@ typedef struct VitastorRPC
|
||||||
#endif
|
#endif
|
||||||
} VitastorRPC;
|
} VitastorRPC;
|
||||||
|
|
||||||
|
#if QEMU_VERSION_MAJOR == 2 && QEMU_VERSION_MINOR < 8
|
||||||
|
typedef struct VitastorBH
|
||||||
|
{
|
||||||
|
VitastorClient *cli;
|
||||||
|
QEMUBH *bh;
|
||||||
|
} VitastorBH;
|
||||||
|
#endif
|
||||||
|
|
||||||
static void vitastor_co_init_task(BlockDriverState *bs, VitastorRPC *task);
|
static void vitastor_co_init_task(BlockDriverState *bs, VitastorRPC *task);
|
||||||
static void vitastor_co_generic_cb(void *opaque, long retval);
|
static void vitastor_co_generic_cb(void *opaque, long retval);
|
||||||
static void vitastor_co_read_cb(void *opaque, long retval, uint64_t version);
|
static void vitastor_co_read_cb(void *opaque, long retval, uint64_t version);
|
||||||
|
@ -223,6 +232,49 @@ out:
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if defined VITASTOR_C_API_VERSION && VITASTOR_C_API_VERSION >= 2
|
||||||
|
static void vitastor_bh_send_all(void *opaque)
|
||||||
|
{
|
||||||
|
#if QEMU_VERSION_MAJOR == 2 && QEMU_VERSION_MINOR < 8
|
||||||
|
VitastorBH *vbh = opaque;
|
||||||
|
VitastorClient *client = vbh->cli;
|
||||||
|
#else
|
||||||
|
VitastorClient *client = opaque;
|
||||||
|
#endif
|
||||||
|
qemu_mutex_lock(&client->mutex);
|
||||||
|
client->bh_send_all_scheduled = 0;
|
||||||
|
vitastor_c_qemu_send_all(client->proxy);
|
||||||
|
qemu_mutex_unlock(&client->mutex);
|
||||||
|
#if QEMU_VERSION_MAJOR == 2 && QEMU_VERSION_MINOR < 8
|
||||||
|
qemu_bh_delete(vbh->bh);
|
||||||
|
free(vbh);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
static void vitastor_schedule_send_all(VitastorClient *client)
|
||||||
|
{
|
||||||
|
#if defined VITASTOR_C_API_VERSION && VITASTOR_C_API_VERSION >= 2
|
||||||
|
if (!client->bh_send_all_scheduled)
|
||||||
|
{
|
||||||
|
client->bh_send_all_scheduled = 1;
|
||||||
|
#if QEMU_VERSION_MAJOR > 4 || QEMU_VERSION_MAJOR == 4 && QEMU_VERSION_MINOR >= 2
|
||||||
|
replay_bh_schedule_oneshot_event(client->ctx, vitastor_bh_send_all, client);
|
||||||
|
#elif QEMU_VERSION_MAJOR >= 3 || QEMU_VERSION_MAJOR == 2 && QEMU_VERSION_MINOR >= 8
|
||||||
|
aio_bh_schedule_oneshot(client->ctx, vitastor_bh_send_all, client);
|
||||||
|
#elif QEMU_VERSION_MAJOR >= 2
|
||||||
|
VitastorBH *vbh = (VitastorBH*)malloc(sizeof(VitastorBH));
|
||||||
|
vbh->cli = client;
|
||||||
|
vbh->bh = aio_bh_new(bdrv_get_aio_context(task->bs), vitastor_bh_send_all, vbh);
|
||||||
|
qemu_bh_schedule(vbh->bh);
|
||||||
|
#else
|
||||||
|
client->bh_send_all_scheduled = 0;
|
||||||
|
vitastor_c_qemu_send_all(client->proxy);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
static void coroutine_fn vitastor_co_get_metadata(VitastorRPC *task)
|
static void coroutine_fn vitastor_co_get_metadata(VitastorRPC *task)
|
||||||
{
|
{
|
||||||
BlockDriverState *bs = task->bs;
|
BlockDriverState *bs = task->bs;
|
||||||
|
@ -231,6 +283,7 @@ static void coroutine_fn vitastor_co_get_metadata(VitastorRPC *task)
|
||||||
|
|
||||||
qemu_mutex_lock(&client->mutex);
|
qemu_mutex_lock(&client->mutex);
|
||||||
vitastor_c_watch_inode(client->proxy, client->image, vitastor_co_generic_cb, task);
|
vitastor_c_watch_inode(client->proxy, client->image, vitastor_co_generic_cb, task);
|
||||||
|
vitastor_schedule_send_all(client);
|
||||||
qemu_mutex_unlock(&client->mutex);
|
qemu_mutex_unlock(&client->mutex);
|
||||||
|
|
||||||
while (!task->complete)
|
while (!task->complete)
|
||||||
|
@ -244,6 +297,7 @@ static void vitastor_aio_fd_read(void *fddv)
|
||||||
VitastorFdData *fdd = (VitastorFdData*)fddv;
|
VitastorFdData *fdd = (VitastorFdData*)fddv;
|
||||||
qemu_mutex_lock(&fdd->cli->mutex);
|
qemu_mutex_lock(&fdd->cli->mutex);
|
||||||
fdd->fd_read(fdd->opaque);
|
fdd->fd_read(fdd->opaque);
|
||||||
|
vitastor_schedule_send_all(fdd->cli);
|
||||||
qemu_mutex_unlock(&fdd->cli->mutex);
|
qemu_mutex_unlock(&fdd->cli->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -252,6 +306,7 @@ static void vitastor_aio_fd_write(void *fddv)
|
||||||
VitastorFdData *fdd = (VitastorFdData*)fddv;
|
VitastorFdData *fdd = (VitastorFdData*)fddv;
|
||||||
qemu_mutex_lock(&fdd->cli->mutex);
|
qemu_mutex_lock(&fdd->cli->mutex);
|
||||||
fdd->fd_write(fdd->opaque);
|
fdd->fd_write(fdd->opaque);
|
||||||
|
vitastor_schedule_send_all(fdd->cli);
|
||||||
qemu_mutex_unlock(&fdd->cli->mutex);
|
qemu_mutex_unlock(&fdd->cli->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -593,6 +648,7 @@ static int coroutine_fn vitastor_co_preadv(BlockDriverState *bs,
|
||||||
uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode;
|
uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode;
|
||||||
qemu_mutex_lock(&client->mutex);
|
qemu_mutex_lock(&client->mutex);
|
||||||
vitastor_c_read(client->proxy, inode, offset, bytes, iov->iov, iov->niov, vitastor_co_read_cb, &task);
|
vitastor_c_read(client->proxy, inode, offset, bytes, iov->iov, iov->niov, vitastor_co_read_cb, &task);
|
||||||
|
vitastor_schedule_send_all(client);
|
||||||
qemu_mutex_unlock(&client->mutex);
|
qemu_mutex_unlock(&client->mutex);
|
||||||
|
|
||||||
while (!task.complete)
|
while (!task.complete)
|
||||||
|
@ -626,6 +682,7 @@ static int coroutine_fn vitastor_co_pwritev(BlockDriverState *bs,
|
||||||
uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode;
|
uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode;
|
||||||
qemu_mutex_lock(&client->mutex);
|
qemu_mutex_lock(&client->mutex);
|
||||||
vitastor_c_write(client->proxy, inode, offset, bytes, 0, iov->iov, iov->niov, vitastor_co_generic_cb, &task);
|
vitastor_c_write(client->proxy, inode, offset, bytes, 0, iov->iov, iov->niov, vitastor_co_generic_cb, &task);
|
||||||
|
vitastor_schedule_send_all(client);
|
||||||
qemu_mutex_unlock(&client->mutex);
|
qemu_mutex_unlock(&client->mutex);
|
||||||
|
|
||||||
while (!task.complete)
|
while (!task.complete)
|
||||||
|
@ -704,6 +761,7 @@ static int coroutine_fn vitastor_co_block_status(
|
||||||
task.bitmap = client->last_bitmap = NULL;
|
task.bitmap = client->last_bitmap = NULL;
|
||||||
qemu_mutex_lock(&client->mutex);
|
qemu_mutex_lock(&client->mutex);
|
||||||
vitastor_c_read_bitmap(client->proxy, task.inode, task.offset, task.len, !client->skip_parents, vitastor_co_read_bitmap_cb, &task);
|
vitastor_c_read_bitmap(client->proxy, task.inode, task.offset, task.len, !client->skip_parents, vitastor_co_read_bitmap_cb, &task);
|
||||||
|
vitastor_schedule_send_all(client);
|
||||||
qemu_mutex_unlock(&client->mutex);
|
qemu_mutex_unlock(&client->mutex);
|
||||||
while (!task.complete)
|
while (!task.complete)
|
||||||
{
|
{
|
||||||
|
@ -790,6 +848,7 @@ static int coroutine_fn vitastor_co_flush(BlockDriverState *bs)
|
||||||
|
|
||||||
qemu_mutex_lock(&client->mutex);
|
qemu_mutex_lock(&client->mutex);
|
||||||
vitastor_c_sync(client->proxy, vitastor_co_generic_cb, &task);
|
vitastor_c_sync(client->proxy, vitastor_co_generic_cb, &task);
|
||||||
|
vitastor_schedule_send_all(client);
|
||||||
qemu_mutex_unlock(&client->mutex);
|
qemu_mutex_unlock(&client->mutex);
|
||||||
|
|
||||||
while (!task.complete)
|
while (!task.complete)
|
||||||
|
|
|
@ -166,6 +166,14 @@ void vitastor_c_uring_wait_events(vitastor_c *client)
|
||||||
client->ringloop->wait();
|
client->ringloop->wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void vitastor_c_qemu_send_all(vitastor_c *client)
|
||||||
|
{
|
||||||
|
while (client->cli->msgr.can_send())
|
||||||
|
{
|
||||||
|
client->cli->msgr.send_replies();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len,
|
void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len,
|
||||||
struct iovec *iov, int iovcnt, VitastorReadHandler cb, void *opaque)
|
struct iovec *iov, int iovcnt, VitastorReadHandler cb, void *opaque)
|
||||||
{
|
{
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
#define VITASTOR_QEMU_PROXY_H
|
#define VITASTOR_QEMU_PROXY_H
|
||||||
|
|
||||||
// C API wrapper version
|
// C API wrapper version
|
||||||
#define VITASTOR_C_API_VERSION 1
|
#define VITASTOR_C_API_VERSION 2
|
||||||
|
|
||||||
#ifndef POOL_ID_BITS
|
#ifndef POOL_ID_BITS
|
||||||
#define POOL_ID_BITS 16
|
#define POOL_ID_BITS 16
|
||||||
|
@ -42,6 +42,7 @@ int vitastor_c_is_ready(vitastor_c *client);
|
||||||
void vitastor_c_uring_wait_ready(vitastor_c *client);
|
void vitastor_c_uring_wait_ready(vitastor_c *client);
|
||||||
void vitastor_c_uring_handle_events(vitastor_c *client);
|
void vitastor_c_uring_handle_events(vitastor_c *client);
|
||||||
void vitastor_c_uring_wait_events(vitastor_c *client);
|
void vitastor_c_uring_wait_events(vitastor_c *client);
|
||||||
|
void vitastor_c_qemu_send_all(vitastor_c *client);
|
||||||
void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len,
|
void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len,
|
||||||
struct iovec *iov, int iovcnt, VitastorReadHandler cb, void *opaque);
|
struct iovec *iov, int iovcnt, VitastorReadHandler cb, void *opaque);
|
||||||
void vitastor_c_write(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, uint64_t check_version,
|
void vitastor_c_write(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, uint64_t check_version,
|
||||||
|
|
Loading…
Reference in New Issue