Block layer patches

- Fix blockdev-create with iothreads
- Remove aio_disable_external() API
 -----BEGIN PGP SIGNATURE-----
 
 iQJFBAABCAAvFiEE3D3rFZqa+V09dFb+fwmycsiPL9YFAmR2JIARHGt3b2xmQHJl
 ZGhhdC5jb20ACgkQfwmycsiPL9brtA/9HVdAdtJxW78J60TE2lTqE9XlqMOEHBZl
 8GN72trjP2geY/9mVsv/XoFie4ecqFsYjwAWWUuXZwLgAo53jh7oFN7gBH5iGyyD
 +EukYEfjqoykX5BkoK0gbMZZUe5Y4Dr2CNXYw4bNg8kDzj2RLifGA1XhdL3HoiVt
 PHZrhwBR7ddww6gVOnyJrfGL8fMkW/ZNeKRhrTZuSP+63oDOeGTsTumD+YKJzfPs
 p5WlwkuPjcqbO+w32FeVOHVhNI4swkN5svz3fkr8NuflfA7kH6nBQ5wymObbaTLc
 Erx03lrtP1+6nw43V11UnYt6iDMg4EBUQwtzNaKFnk3rMIdjoQYxIM5FTBWL2rYD
 Dg6PhkncXQ1WNWhUaFqpTFLB52XAYsSa4/y2QAGP6nWbqAUAUknQ3exaMvWiq7Z0
 nZeyyhIWvpJIHGCArWRdqqh+zsBdsmUVuPGyZnZgL/cXoJboYiHMyMJSUWE0XxML
 NGrncwxdsBXkVGGwTdHpBT64dcu3ENRgwtraqRLQm+tp5MKNTJB/+Ug2/p1vonHT
 UOoHz//UPskn8sHIyevoHXeu2Ns0uIHzrAXr+7Ay+9UYyIH6a07F4b2BGqkfyi/i
 8wQsDmJ/idx5C4q1+jS+GuIbpnjIx6nxXwXMqpscUXZmM4Am8OMkiKxQAa1wExGF
 paId+HHwyks=
 =yuER
 -----END PGP SIGNATURE-----

Merge tag 'for-upstream' of https://repo.or.cz/qemu/kevin into staging

Block layer patches

- Fix blockdev-create with iothreads
- Remove aio_disable_external() API

# -----BEGIN PGP SIGNATURE-----
#
# iQJFBAABCAAvFiEE3D3rFZqa+V09dFb+fwmycsiPL9YFAmR2JIARHGt3b2xmQHJl
# ZGhhdC5jb20ACgkQfwmycsiPL9brtA/9HVdAdtJxW78J60TE2lTqE9XlqMOEHBZl
# 8GN72trjP2geY/9mVsv/XoFie4ecqFsYjwAWWUuXZwLgAo53jh7oFN7gBH5iGyyD
# +EukYEfjqoykX5BkoK0gbMZZUe5Y4Dr2CNXYw4bNg8kDzj2RLifGA1XhdL3HoiVt
# PHZrhwBR7ddww6gVOnyJrfGL8fMkW/ZNeKRhrTZuSP+63oDOeGTsTumD+YKJzfPs
# p5WlwkuPjcqbO+w32FeVOHVhNI4swkN5svz3fkr8NuflfA7kH6nBQ5wymObbaTLc
# Erx03lrtP1+6nw43V11UnYt6iDMg4EBUQwtzNaKFnk3rMIdjoQYxIM5FTBWL2rYD
# Dg6PhkncXQ1WNWhUaFqpTFLB52XAYsSa4/y2QAGP6nWbqAUAUknQ3exaMvWiq7Z0
# nZeyyhIWvpJIHGCArWRdqqh+zsBdsmUVuPGyZnZgL/cXoJboYiHMyMJSUWE0XxML
# NGrncwxdsBXkVGGwTdHpBT64dcu3ENRgwtraqRLQm+tp5MKNTJB/+Ug2/p1vonHT
# UOoHz//UPskn8sHIyevoHXeu2Ns0uIHzrAXr+7Ay+9UYyIH6a07F4b2BGqkfyi/i
# 8wQsDmJ/idx5C4q1+jS+GuIbpnjIx6nxXwXMqpscUXZmM4Am8OMkiKxQAa1wExGF
# paId+HHwyks=
# =yuER
# -----END PGP SIGNATURE-----
# gpg: Signature made Tue 30 May 2023 09:29:52 AM PDT
# gpg:                using RSA key DC3DEB159A9AF95D3D7456FE7F09B272C88F2FD6
# gpg:                issuer "kwolf@redhat.com"
# gpg: Good signature from "Kevin Wolf <kwolf@redhat.com>" [full]

* tag 'for-upstream' of https://repo.or.cz/qemu/kevin: (32 commits)
  aio: remove aio_disable_external() API
  virtio: do not set is_external=true on host notifiers
  virtio-scsi: implement BlockDevOps->drained_begin()
  virtio-blk: implement BlockDevOps->drained_begin()
  virtio: make it possible to detach host notifier from any thread
  block/fuse: do not set is_external=true on FUSE fd
  block/export: don't require AioContext lock around blk_exp_ref/unref()
  block/export: rewrite vduse-blk drain code
  hw/xen: do not set is_external=true on evtchn fds
  xen-block: implement BlockDevOps->drained_begin()
  block: drain from main loop thread in bdrv_co_yield_to_drain()
  block: add blk_in_drain() API
  hw/xen: do not use aio_set_fd_handler(is_external=true) in xen_xenstore
  block/export: stop using is_external in vhost-user-blk server
  block/export: wait for vhost-user-blk requests when draining
  util/vhost-user-server: rename refcount to in_flight counter
  virtio-scsi: stop using aio_disable_external() during unplug
  virtio-scsi: avoid race between unplug and transport event
  hw/qdev: introduce qdev_is_realized() helper
  block-backend: split blk_do_set_aio_context()
  ...

Signed-off-by: Richard Henderson <richard.henderson@linaro.org>
master
Richard Henderson 2023-05-30 09:48:55 -07:00
commit f89f54d52b
70 changed files with 936 additions and 567 deletions

46
block.c
View File

@ -1613,6 +1613,7 @@ static int no_coroutine_fn GRAPH_UNLOCKED
bdrv_open_driver(BlockDriverState *bs, BlockDriver *drv, const char *node_name,
QDict *options, int open_flags, Error **errp)
{
AioContext *ctx;
Error *local_err = NULL;
int i, ret;
GLOBAL_STATE_CODE();
@ -1660,15 +1661,21 @@ bdrv_open_driver(BlockDriverState *bs, BlockDriver *drv, const char *node_name,
bs->supported_read_flags |= BDRV_REQ_REGISTERED_BUF;
bs->supported_write_flags |= BDRV_REQ_REGISTERED_BUF;
/* Get the context after .bdrv_open, it can change the context */
ctx = bdrv_get_aio_context(bs);
aio_context_acquire(ctx);
ret = bdrv_refresh_total_sectors(bs, bs->total_sectors);
if (ret < 0) {
error_setg_errno(errp, -ret, "Could not refresh total sector count");
aio_context_release(ctx);
return ret;
}
bdrv_graph_rdlock_main_loop();
bdrv_refresh_limits(bs, NULL, &local_err);
bdrv_graph_rdunlock_main_loop();
aio_context_release(ctx);
if (local_err) {
error_propagate(errp, local_err);
@ -3478,6 +3485,8 @@ int bdrv_set_backing_hd(BlockDriverState *bs, BlockDriverState *backing_hd,
* itself, all options starting with "${bdref_key}." are considered part of the
* BlockdevRef.
*
* The caller must hold the main AioContext lock.
*
* TODO Can this be unified with bdrv_open_image()?
*/
int bdrv_open_backing_file(BlockDriverState *bs, QDict *parent_options,
@ -3644,6 +3653,9 @@ done:
* BlockdevRef.
*
* The BlockdevRef will be removed from the options QDict.
*
* @parent can move to a different AioContext in this function. Callers must
* make sure that their AioContext locking is still correct after this.
*/
BdrvChild *bdrv_open_child(const char *filename,
QDict *options, const char *bdref_key,
@ -3668,6 +3680,9 @@ BdrvChild *bdrv_open_child(const char *filename,
/*
* Wrapper on bdrv_open_child() for most popular case: open primary child of bs.
*
* @parent can move to a different AioContext in this function. Callers must
* make sure that their AioContext locking is still correct after this.
*/
int bdrv_open_file_child(const char *filename,
QDict *options, const char *bdref_key,
@ -3810,9 +3825,7 @@ out:
* should be opened. If specified, neither options nor a filename may be given,
* nor can an existing BDS be reused (that is, *pbs has to be NULL).
*
* The caller must always hold @filename AioContext lock, because this
* function eventually calls bdrv_refresh_total_sectors() which polls
* when called from non-coroutine context.
* The caller must always hold the main AioContext lock.
*/
static BlockDriverState * no_coroutine_fn
bdrv_open_inherit(const char *filename, const char *reference, QDict *options,
@ -4100,11 +4113,7 @@ close_and_fail:
return NULL;
}
/*
* The caller must always hold @filename AioContext lock, because this
* function eventually calls bdrv_refresh_total_sectors() which polls
* when called from non-coroutine context.
*/
/* The caller must always hold the main AioContext lock. */
BlockDriverState *bdrv_open(const char *filename, const char *reference,
QDict *options, int flags, Error **errp)
{
@ -5390,12 +5399,17 @@ static void bdrv_delete(BlockDriverState *bs)
* empty set of options. The reference to the QDict belongs to the block layer
* after the call (even on failure), so if the caller intends to reuse the
* dictionary, it needs to use qobject_ref() before calling bdrv_open.
*
* The caller holds the AioContext lock for @bs. It must make sure that @bs
* stays in the same AioContext, i.e. @options must not refer to nodes in a
* different AioContext.
*/
BlockDriverState *bdrv_insert_node(BlockDriverState *bs, QDict *options,
int flags, Error **errp)
{
ERRP_GUARD();
int ret;
AioContext *ctx = bdrv_get_aio_context(bs);
BlockDriverState *new_node_bs = NULL;
const char *drvname, *node_name;
BlockDriver *drv;
@ -5416,8 +5430,14 @@ BlockDriverState *bdrv_insert_node(BlockDriverState *bs, QDict *options,
GLOBAL_STATE_CODE();
aio_context_release(ctx);
aio_context_acquire(qemu_get_aio_context());
new_node_bs = bdrv_new_open_driver_opts(drv, node_name, options, flags,
errp);
aio_context_release(qemu_get_aio_context());
aio_context_acquire(ctx);
assert(bdrv_get_aio_context(bs) == ctx);
options = NULL; /* bdrv_new_open_driver() eats options */
if (!new_node_bs) {
error_prepend(errp, "Could not create node: ");
@ -7043,6 +7063,8 @@ void bdrv_img_create(const char *filename, const char *fmt,
return;
}
aio_context_acquire(qemu_get_aio_context());
/* Create parameter list */
create_opts = qemu_opts_append(create_opts, drv->create_opts);
create_opts = qemu_opts_append(create_opts, proto_drv->create_opts);
@ -7192,6 +7214,7 @@ out:
qemu_opts_del(opts);
qemu_opts_free(create_opts);
error_propagate(errp, local_err);
aio_context_release(qemu_get_aio_context());
}
AioContext *bdrv_get_aio_context(BlockDriverState *bs)
@ -7282,9 +7305,6 @@ static void bdrv_detach_aio_context(BlockDriverState *bs)
bs->drv->bdrv_detach_aio_context(bs);
}
if (bs->quiesce_counter) {
aio_enable_external(bs->aio_context);
}
bs->aio_context = NULL;
}
@ -7294,10 +7314,6 @@ static void bdrv_attach_aio_context(BlockDriverState *bs,
BdrvAioNotifier *ban, *ban_tmp;
GLOBAL_STATE_CODE();
if (bs->quiesce_counter) {
aio_disable_external(new_context);
}
bs->aio_context = new_context;
if (bs->drv && bs->drv->bdrv_attach_aio_context) {

View File

@ -306,23 +306,18 @@ static void blkio_attach_aio_context(BlockDriverState *bs,
{
BDRVBlkioState *s = bs->opaque;
aio_set_fd_handler(new_context,
s->completion_fd,
false,
blkio_completion_fd_read,
NULL,
aio_set_fd_handler(new_context, s->completion_fd,
blkio_completion_fd_read, NULL,
blkio_completion_fd_poll,
blkio_completion_fd_poll_ready,
bs);
blkio_completion_fd_poll_ready, bs);
}
static void blkio_detach_aio_context(BlockDriverState *bs)
{
BDRVBlkioState *s = bs->opaque;
aio_set_fd_handler(bdrv_get_aio_context(bs),
s->completion_fd,
false, NULL, NULL, NULL, NULL, NULL);
aio_set_fd_handler(bdrv_get_aio_context(bs), s->completion_fd, NULL, NULL,
NULL, NULL, NULL);
}
/* Call with s->blkio_lock held to submit I/O after enqueuing a new request */

View File

@ -389,6 +389,8 @@ BlockBackend *blk_new(AioContext *ctx, uint64_t perm, uint64_t shared_perm)
* Both sets of permissions can be changed later using blk_set_perm().
*
* Return the new BlockBackend on success, null on failure.
*
* Callers must hold the AioContext lock of @bs.
*/
BlockBackend *blk_new_with_bs(BlockDriverState *bs, uint64_t perm,
uint64_t shared_perm, Error **errp)
@ -406,11 +408,15 @@ BlockBackend *blk_new_with_bs(BlockDriverState *bs, uint64_t perm,
/*
* Creates a new BlockBackend, opens a new BlockDriverState, and connects both.
* The new BlockBackend is in the main AioContext.
* By default, the new BlockBackend is in the main AioContext, but if the
* parameters connect it with any existing node in a different AioContext, it
* may end up there instead.
*
* Just as with bdrv_open(), after having called this function the reference to
* @options belongs to the block layer (even on failure).
*
* Called without holding an AioContext lock.
*
* TODO: Remove @filename and @flags; it should be possible to specify a whole
* BDS tree just by specifying the @options QDict (or @reference,
* alternatively). At the time of adding this function, this is not possible,
@ -422,6 +428,7 @@ BlockBackend *blk_new_open(const char *filename, const char *reference,
{
BlockBackend *blk;
BlockDriverState *bs;
AioContext *ctx;
uint64_t perm = 0;
uint64_t shared = BLK_PERM_ALL;
@ -451,16 +458,24 @@ BlockBackend *blk_new_open(const char *filename, const char *reference,
shared = BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED;
}
blk = blk_new(qemu_get_aio_context(), perm, shared);
aio_context_acquire(qemu_get_aio_context());
bs = bdrv_open(filename, reference, options, flags, errp);
aio_context_release(qemu_get_aio_context());
if (!bs) {
blk_unref(blk);
return NULL;
}
blk->root = bdrv_root_attach_child(bs, "root", &child_root,
BDRV_CHILD_FILTERED | BDRV_CHILD_PRIMARY,
perm, shared, blk, errp);
/* bdrv_open() could have moved bs to a different AioContext */
ctx = bdrv_get_aio_context(bs);
blk = blk_new(bdrv_get_aio_context(bs), perm, shared);
blk->perm = perm;
blk->shared_perm = shared;
aio_context_acquire(ctx);
blk_insert_bs(blk, bs, errp);
bdrv_unref(bs);
aio_context_release(ctx);
if (!blk->root) {
blk_unref(blk);
return NULL;
@ -901,6 +916,8 @@ void blk_remove_bs(BlockBackend *blk)
/*
* Associates a new BlockDriverState with @blk.
*
* Callers must hold the AioContext lock of @bs.
*/
int blk_insert_bs(BlockBackend *blk, BlockDriverState *bs, Error **errp)
{
@ -1270,6 +1287,13 @@ blk_check_byte_request(BlockBackend *blk, int64_t offset, int64_t bytes)
return 0;
}
/*
 * Are we currently in a drained section?
 *
 * @blk: the BlockBackend to query.
 *
 * Returns true when @blk's quiesce_counter, read with qatomic_read(), is
 * non-zero and false when it is zero.
 */
bool blk_in_drain(BlockBackend *blk)
{
GLOBAL_STATE_CODE(); /* change to IO_OR_GS_CODE(), if necessary */
return qatomic_read(&blk->quiesce_counter);
}
/* To be called between exactly one pair of blk_inc/dec_in_flight() */
static void coroutine_fn blk_wait_while_drained(BlockBackend *blk)
{
@ -2394,9 +2418,14 @@ void blk_op_unblock_all(BlockBackend *blk, Error *reason)
AioContext *blk_get_aio_context(BlockBackend *blk)
{
BlockDriverState *bs = blk_bs(blk);
BlockDriverState *bs;
IO_CODE();
if (!blk) {
return qemu_get_aio_context();
}
bs = blk_bs(blk);
if (bs) {
AioContext *ctx = bdrv_get_aio_context(blk_bs(blk));
assert(ctx == blk->ctx);
@ -2411,52 +2440,31 @@ static AioContext *blk_aiocb_get_aio_context(BlockAIOCB *acb)
return blk_get_aio_context(blk_acb->blk);
}
static int blk_do_set_aio_context(BlockBackend *blk, AioContext *new_context,
bool update_root_node, Error **errp)
{
BlockDriverState *bs = blk_bs(blk);
ThrottleGroupMember *tgm = &blk->public.throttle_group_member;
int ret;
if (bs) {
bdrv_ref(bs);
if (update_root_node) {
/*
* update_root_node MUST be false for blk_root_set_aio_ctx_commit(),
* as we are already in the commit function of a transaction.
*/
ret = bdrv_try_change_aio_context(bs, new_context, blk->root, errp);
if (ret < 0) {
bdrv_unref(bs);
return ret;
}
}
/*
* Make blk->ctx consistent with the root node before we invoke any
* other operations like drain that might inquire blk->ctx
*/
blk->ctx = new_context;
if (tgm->throttle_state) {
bdrv_drained_begin(bs);
throttle_group_detach_aio_context(tgm);
throttle_group_attach_aio_context(tgm, new_context);
bdrv_drained_end(bs);
}
bdrv_unref(bs);
} else {
blk->ctx = new_context;
}
return 0;
}
int blk_set_aio_context(BlockBackend *blk, AioContext *new_context,
Error **errp)
{
bool old_allow_change;
BlockDriverState *bs = blk_bs(blk);
int ret;
GLOBAL_STATE_CODE();
return blk_do_set_aio_context(blk, new_context, true, errp);
if (!bs) {
blk->ctx = new_context;
return 0;
}
bdrv_ref(bs);
old_allow_change = blk->allow_aio_context_change;
blk->allow_aio_context_change = true;
ret = bdrv_try_change_aio_context(bs, new_context, NULL, errp);
blk->allow_aio_context_change = old_allow_change;
bdrv_unref(bs);
return ret;
}
typedef struct BdrvStateBlkRootContext {
@ -2468,8 +2476,14 @@ static void blk_root_set_aio_ctx_commit(void *opaque)
{
BdrvStateBlkRootContext *s = opaque;
BlockBackend *blk = s->blk;
AioContext *new_context = s->new_ctx;
ThrottleGroupMember *tgm = &blk->public.throttle_group_member;
blk_do_set_aio_context(blk, s->new_ctx, false, &error_abort);
blk->ctx = new_context;
if (tgm->throttle_state) {
throttle_group_detach_aio_context(tgm);
throttle_group_attach_aio_context(tgm, new_context);
}
}
static TransactionActionDrv set_blk_root_context = {

View File

@ -412,6 +412,7 @@ static int cbw_open(BlockDriverState *bs, QDict *options, int flags,
int64_t cluster_size;
g_autoptr(BlockdevOptions) full_opts = NULL;
BlockdevOptionsCbw *opts;
AioContext *ctx;
int ret;
full_opts = cbw_parse_options(options, errp);
@ -432,11 +433,15 @@ static int cbw_open(BlockDriverState *bs, QDict *options, int flags,
return -EINVAL;
}
ctx = bdrv_get_aio_context(bs);
aio_context_acquire(ctx);
if (opts->bitmap) {
bitmap = block_dirty_bitmap_lookup(opts->bitmap->node,
opts->bitmap->name, NULL, errp);
if (!bitmap) {
return -EINVAL;
ret = -EINVAL;
goto out;
}
}
s->on_cbw_error = opts->has_on_cbw_error ? opts->on_cbw_error :
@ -454,21 +459,24 @@ static int cbw_open(BlockDriverState *bs, QDict *options, int flags,
s->bcs = block_copy_state_new(bs->file, s->target, bitmap, errp);
if (!s->bcs) {
error_prepend(errp, "Cannot create block-copy-state: ");
return -EINVAL;
ret = -EINVAL;
goto out;
}
cluster_size = block_copy_cluster_size(s->bcs);
s->done_bitmap = bdrv_create_dirty_bitmap(bs, cluster_size, NULL, errp);
if (!s->done_bitmap) {
return -EINVAL;
ret = -EINVAL;
goto out;
}
bdrv_disable_dirty_bitmap(s->done_bitmap);
/* s->access_bitmap starts equal to bcs bitmap */
s->access_bitmap = bdrv_create_dirty_bitmap(bs, cluster_size, NULL, errp);
if (!s->access_bitmap) {
return -EINVAL;
ret = -EINVAL;
goto out;
}
bdrv_disable_dirty_bitmap(s->access_bitmap);
bdrv_dirty_bitmap_merge_internal(s->access_bitmap,
@ -478,7 +486,10 @@ static int cbw_open(BlockDriverState *bs, QDict *options, int flags,
qemu_co_mutex_init(&s->lock);
QLIST_INIT(&s->frozen_read_reqs);
return 0;
ret = 0;
out:
aio_context_release(ctx);
return ret;
}
static void cbw_close(BlockDriverState *bs)

View File

@ -132,7 +132,7 @@ static gboolean curl_drop_socket(void *key, void *value, void *opaque)
CURLSocket *socket = value;
BDRVCURLState *s = socket->s;
aio_set_fd_handler(s->aio_context, socket->fd, false,
aio_set_fd_handler(s->aio_context, socket->fd,
NULL, NULL, NULL, NULL, NULL);
return true;
}
@ -180,20 +180,20 @@ static int curl_sock_cb(CURL *curl, curl_socket_t fd, int action,
trace_curl_sock_cb(action, (int)fd);
switch (action) {
case CURL_POLL_IN:
aio_set_fd_handler(s->aio_context, fd, false,
aio_set_fd_handler(s->aio_context, fd,
curl_multi_do, NULL, NULL, NULL, socket);
break;
case CURL_POLL_OUT:
aio_set_fd_handler(s->aio_context, fd, false,
aio_set_fd_handler(s->aio_context, fd,
NULL, curl_multi_do, NULL, NULL, socket);
break;
case CURL_POLL_INOUT:
aio_set_fd_handler(s->aio_context, fd, false,
aio_set_fd_handler(s->aio_context, fd,
curl_multi_do, curl_multi_do,
NULL, NULL, socket);
break;
case CURL_POLL_REMOVE:
aio_set_fd_handler(s->aio_context, fd, false,
aio_set_fd_handler(s->aio_context, fd,
NULL, NULL, NULL, NULL, NULL);
break;
}

View File

@ -204,11 +204,10 @@ fail:
return NULL;
}
/* Callers must hold exp->ctx lock */
void blk_exp_ref(BlockExport *exp)
{
assert(exp->refcount > 0);
exp->refcount++;
assert(qatomic_read(&exp->refcount) > 0);
qatomic_inc(&exp->refcount);
}
/* Runs in the main thread */
@ -231,11 +230,10 @@ static void blk_exp_delete_bh(void *opaque)
aio_context_release(aio_context);
}
/* Callers must hold exp->ctx lock */
void blk_exp_unref(BlockExport *exp)
{
assert(exp->refcount > 0);
if (--exp->refcount == 0) {
assert(qatomic_read(&exp->refcount) > 0);
if (qatomic_fetch_dec(&exp->refcount) == 1) {
/* Touch the block_exports list only in the main thread */
aio_bh_schedule_oneshot(qemu_get_aio_context(), blk_exp_delete_bh,
exp);
@ -343,7 +341,8 @@ void qmp_block_export_del(const char *id,
if (!has_mode) {
mode = BLOCK_EXPORT_REMOVE_MODE_SAFE;
}
if (mode == BLOCK_EXPORT_REMOVE_MODE_SAFE && exp->refcount > 1) {
if (mode == BLOCK_EXPORT_REMOVE_MODE_SAFE &&
qatomic_read(&exp->refcount) > 1) {
error_setg(errp, "export '%s' still in use", exp->id);
error_append_hint(errp, "Use mode='hard' to force client "
"disconnect\n");

View File

@ -50,6 +50,7 @@ typedef struct FuseExport {
struct fuse_session *fuse_session;
struct fuse_buf fuse_buf;
unsigned int in_flight; /* atomic */
bool mounted, fd_handler_set_up;
char *mountpoint;
@ -78,6 +79,42 @@ static void read_from_fuse_export(void *opaque);
static bool is_regular_file(const char *path, Error **errp);
/*
 * BlockDevOps .drained_begin callback for a FUSE export (see
 * fuse_export_blk_dev_ops).
 *
 * @opaque: the FuseExport.
 *
 * Replaces the handlers registered for the FUSE session fd in
 * exp->common.ctx with NULL callbacks and records in fd_handler_set_up that
 * no handler is installed.
 */
static void fuse_export_drained_begin(void *opaque)
{
FuseExport *exp = opaque;
aio_set_fd_handler(exp->common.ctx,
fuse_session_fd(exp->fuse_session),
NULL, NULL, NULL, NULL, NULL);
exp->fd_handler_set_up = false;
}
/*
 * BlockDevOps .drained_end callback for a FUSE export (see
 * fuse_export_blk_dev_ops).
 *
 * @opaque: the FuseExport.
 *
 * Re-reads the AioContext from the export's BlockBackend, installs
 * read_from_fuse_export() as the read handler for the FUSE session fd in
 * that context (with the export as its opaque argument) and records in
 * fd_handler_set_up that the handler is installed again.
 */
static void fuse_export_drained_end(void *opaque)
{
FuseExport *exp = opaque;
/* Refresh AioContext in case it changed */
exp->common.ctx = blk_get_aio_context(exp->common.blk);
aio_set_fd_handler(exp->common.ctx,
fuse_session_fd(exp->fuse_session),
read_from_fuse_export, NULL, NULL, NULL, exp);
exp->fd_handler_set_up = true;
}
/*
 * BlockDevOps .drained_poll callback for a FUSE export (see
 * fuse_export_blk_dev_ops).
 *
 * @opaque: the FuseExport.
 *
 * Returns true while the export's in_flight counter (maintained by
 * read_from_fuse_export()) is greater than zero, false once it is zero.
 */
static bool fuse_export_drained_poll(void *opaque)
{
FuseExport *exp = opaque;
return qatomic_read(&exp->in_flight) > 0;
}
/*
 * Drain callbacks for FUSE exports. fuse_export_create() registers this
 * table on the export's BlockBackend with blk_set_dev_ops(), passing the
 * FuseExport as the opaque argument.
 */
static const BlockDevOps fuse_export_blk_dev_ops = {
.drained_begin = fuse_export_drained_begin,
.drained_end = fuse_export_drained_end,
.drained_poll = fuse_export_drained_poll,
};
static int fuse_export_create(BlockExport *blk_exp,
BlockExportOptions *blk_exp_args,
Error **errp)
@ -101,6 +138,15 @@ static int fuse_export_create(BlockExport *blk_exp,
}
}
blk_set_dev_ops(exp->common.blk, &fuse_export_blk_dev_ops, exp);
/*
* We handle draining ourselves using an in-flight counter and by disabling
* the FUSE fd handler. Do not queue BlockBackend requests, they need to
* complete so the in-flight counter reaches zero.
*/
blk_set_disable_request_queuing(exp->common.blk, true);
init_exports_table();
/*
@ -224,7 +270,7 @@ static int setup_fuse_export(FuseExport *exp, const char *mountpoint,
g_hash_table_insert(exports, g_strdup(mountpoint), NULL);
aio_set_fd_handler(exp->common.ctx,
fuse_session_fd(exp->fuse_session), true,
fuse_session_fd(exp->fuse_session),
read_from_fuse_export, NULL, NULL, NULL, exp);
exp->fd_handler_set_up = true;
@ -246,6 +292,8 @@ static void read_from_fuse_export(void *opaque)
blk_exp_ref(&exp->common);
qatomic_inc(&exp->in_flight);
do {
ret = fuse_session_receive_buf(exp->fuse_session, &exp->fuse_buf);
} while (ret == -EINTR);
@ -256,6 +304,10 @@ static void read_from_fuse_export(void *opaque)
fuse_session_process_buf(exp->fuse_session, &exp->fuse_buf);
out:
if (qatomic_fetch_dec(&exp->in_flight) == 1) {
aio_wait_kick(); /* wake AIO_WAIT_WHILE() */
}
blk_exp_unref(&exp->common);
}
@ -268,7 +320,7 @@ static void fuse_export_shutdown(BlockExport *blk_exp)
if (exp->fd_handler_set_up) {
aio_set_fd_handler(exp->common.ctx,
fuse_session_fd(exp->fuse_session), true,
fuse_session_fd(exp->fuse_session),
NULL, NULL, NULL, NULL, NULL);
exp->fd_handler_set_up = false;
}

View File

@ -31,7 +31,8 @@ typedef struct VduseBlkExport {
VduseDev *dev;
uint16_t num_queues;
char *recon_file;
unsigned int inflight;
unsigned int inflight; /* atomic */
bool vqs_started;
} VduseBlkExport;
typedef struct VduseBlkReq {
@ -41,13 +42,20 @@ typedef struct VduseBlkReq {
static void vduse_blk_inflight_inc(VduseBlkExport *vblk_exp)
{
vblk_exp->inflight++;
if (qatomic_fetch_inc(&vblk_exp->inflight) == 0) {
/* Prevent export from being deleted */
blk_exp_ref(&vblk_exp->export);
}
}
static void vduse_blk_inflight_dec(VduseBlkExport *vblk_exp)
{
if (--vblk_exp->inflight == 0) {
if (qatomic_fetch_dec(&vblk_exp->inflight) == 1) {
/* Wake AIO_WAIT_WHILE() */
aio_wait_kick();
/* Now the export can be deleted */
blk_exp_unref(&vblk_exp->export);
}
}
@ -124,8 +132,12 @@ static void vduse_blk_enable_queue(VduseDev *dev, VduseVirtq *vq)
{
VduseBlkExport *vblk_exp = vduse_dev_get_priv(dev);
if (!vblk_exp->vqs_started) {
return; /* vduse_blk_drained_end() will start vqs later */
}
aio_set_fd_handler(vblk_exp->export.ctx, vduse_queue_get_fd(vq),
true, on_vduse_vq_kick, NULL, NULL, NULL, vq);
on_vduse_vq_kick, NULL, NULL, NULL, vq);
/* Make sure we don't miss any kick after reconnecting */
eventfd_write(vduse_queue_get_fd(vq), 1);
}
@ -133,9 +145,14 @@ static void vduse_blk_enable_queue(VduseDev *dev, VduseVirtq *vq)
static void vduse_blk_disable_queue(VduseDev *dev, VduseVirtq *vq)
{
VduseBlkExport *vblk_exp = vduse_dev_get_priv(dev);
int fd = vduse_queue_get_fd(vq);
aio_set_fd_handler(vblk_exp->export.ctx, vduse_queue_get_fd(vq),
true, NULL, NULL, NULL, NULL, NULL);
if (fd < 0) {
return;
}
aio_set_fd_handler(vblk_exp->export.ctx, fd,
NULL, NULL, NULL, NULL, NULL);
}
static const VduseOps vduse_blk_ops = {
@ -152,42 +169,19 @@ static void on_vduse_dev_kick(void *opaque)
static void vduse_blk_attach_ctx(VduseBlkExport *vblk_exp, AioContext *ctx)
{
int i;
aio_set_fd_handler(vblk_exp->export.ctx, vduse_dev_get_fd(vblk_exp->dev),
true, on_vduse_dev_kick, NULL, NULL, NULL,
on_vduse_dev_kick, NULL, NULL, NULL,
vblk_exp->dev);
for (i = 0; i < vblk_exp->num_queues; i++) {
VduseVirtq *vq = vduse_dev_get_queue(vblk_exp->dev, i);
int fd = vduse_queue_get_fd(vq);
if (fd < 0) {
continue;
}
aio_set_fd_handler(vblk_exp->export.ctx, fd, true,
on_vduse_vq_kick, NULL, NULL, NULL, vq);
}
/* Virtqueues are handled by vduse_blk_drained_end() */
}
static void vduse_blk_detach_ctx(VduseBlkExport *vblk_exp)
{
int i;
for (i = 0; i < vblk_exp->num_queues; i++) {
VduseVirtq *vq = vduse_dev_get_queue(vblk_exp->dev, i);
int fd = vduse_queue_get_fd(vq);
if (fd < 0) {
continue;
}
aio_set_fd_handler(vblk_exp->export.ctx, fd,
true, NULL, NULL, NULL, NULL, NULL);
}
aio_set_fd_handler(vblk_exp->export.ctx, vduse_dev_get_fd(vblk_exp->dev),
true, NULL, NULL, NULL, NULL, NULL);
NULL, NULL, NULL, NULL, NULL);
AIO_WAIT_WHILE(vblk_exp->export.ctx, vblk_exp->inflight > 0);
/* Virtqueues are handled by vduse_blk_drained_begin() */
}
@ -220,8 +214,55 @@ static void vduse_blk_resize(void *opaque)
(char *)&config.capacity);
}
/*
 * Calls vduse_blk_disable_queue() on each of the export's num_queues
 * virtqueues and then clears vqs_started.
 *
 * @vblk_exp: the vduse-blk export whose virtqueues are stopped.
 */
static void vduse_blk_stop_virtqueues(VduseBlkExport *vblk_exp)
{
for (uint16_t i = 0; i < vblk_exp->num_queues; i++) {
VduseVirtq *vq = vduse_dev_get_queue(vblk_exp->dev, i);
vduse_blk_disable_queue(vblk_exp->dev, vq);
}
vblk_exp->vqs_started = false;
}
/*
 * Sets vqs_started and then calls vduse_blk_enable_queue() on each of the
 * export's num_queues virtqueues.
 *
 * @vblk_exp: the vduse-blk export whose virtqueues are started.
 */
static void vduse_blk_start_virtqueues(VduseBlkExport *vblk_exp)
{
/*
 * The flag must be set before the loop: vduse_blk_enable_queue() returns
 * early while vqs_started is false.
 */
vblk_exp->vqs_started = true;
for (uint16_t i = 0; i < vblk_exp->num_queues; i++) {
VduseVirtq *vq = vduse_dev_get_queue(vblk_exp->dev, i);
vduse_blk_enable_queue(vblk_exp->dev, vq);
}
}
/*
 * BlockDevOps .drained_begin callback for a vduse-blk export: stops all of
 * the export's virtqueues via vduse_blk_stop_virtqueues().
 *
 * @opaque: the BlockExport embedded in a VduseBlkExport.
 */
static void vduse_blk_drained_begin(void *opaque)
{
BlockExport *exp = opaque;
/* Recover the containing VduseBlkExport from its embedded BlockExport */
VduseBlkExport *vblk_exp = container_of(exp, VduseBlkExport, export);
vduse_blk_stop_virtqueues(vblk_exp);
}
/*
 * BlockDevOps .drained_end callback for a vduse-blk export: starts all of
 * the export's virtqueues again via vduse_blk_start_virtqueues().
 *
 * @opaque: the BlockExport embedded in a VduseBlkExport.
 */
static void vduse_blk_drained_end(void *opaque)
{
BlockExport *exp = opaque;
/* Recover the containing VduseBlkExport from its embedded BlockExport */
VduseBlkExport *vblk_exp = container_of(exp, VduseBlkExport, export);
vduse_blk_start_virtqueues(vblk_exp);
}
/*
 * BlockDevOps .drained_poll callback for a vduse-blk export.
 *
 * @opaque: the BlockExport embedded in a VduseBlkExport.
 *
 * Returns true while the export's inflight counter is greater than zero,
 * false once it is zero.
 */
static bool vduse_blk_drained_poll(void *opaque)
{
BlockExport *exp = opaque;
/* Recover the containing VduseBlkExport from its embedded BlockExport */
VduseBlkExport *vblk_exp = container_of(exp, VduseBlkExport, export);
return qatomic_read(&vblk_exp->inflight) > 0;
}
static const BlockDevOps vduse_block_ops = {
.resize_cb = vduse_blk_resize,
.resize_cb = vduse_blk_resize,
.drained_begin = vduse_blk_drained_begin,
.drained_end = vduse_blk_drained_end,
.drained_poll = vduse_blk_drained_poll,
};
static int vduse_blk_exp_create(BlockExport *exp, BlockExportOptions *opts,
@ -268,6 +309,7 @@ static int vduse_blk_exp_create(BlockExport *exp, BlockExportOptions *opts,
vblk_exp->handler.serial = g_strdup(vblk_opts->serial ?: "");
vblk_exp->handler.logical_block_size = logical_block_size;
vblk_exp->handler.writable = opts->writable;
vblk_exp->vqs_started = true;
config.capacity =
cpu_to_le64(blk_getlength(exp->blk) >> VIRTIO_BLK_SECTOR_BITS);
@ -322,14 +364,20 @@ static int vduse_blk_exp_create(BlockExport *exp, BlockExportOptions *opts,
vduse_dev_setup_queue(vblk_exp->dev, i, queue_size);
}
aio_set_fd_handler(exp->ctx, vduse_dev_get_fd(vblk_exp->dev), true,
aio_set_fd_handler(exp->ctx, vduse_dev_get_fd(vblk_exp->dev),
on_vduse_dev_kick, NULL, NULL, NULL, vblk_exp->dev);
blk_add_aio_context_notifier(exp->blk, blk_aio_attached, blk_aio_detach,
vblk_exp);
blk_set_dev_ops(exp->blk, &vduse_block_ops, exp);
/*
* We handle draining ourselves using an in-flight counter and by disabling
* virtqueue fd handlers. Do not queue BlockBackend requests, they need to
* complete so the in-flight counter reaches zero.
*/
blk_set_disable_request_queuing(exp->blk, true);
return 0;
err:
vduse_dev_destroy(vblk_exp->dev);
@ -344,6 +392,9 @@ static void vduse_blk_exp_delete(BlockExport *exp)
VduseBlkExport *vblk_exp = container_of(exp, VduseBlkExport, export);
int ret;
assert(qatomic_read(&vblk_exp->inflight) == 0);
vduse_blk_detach_ctx(vblk_exp);
blk_remove_aio_context_notifier(exp->blk, blk_aio_attached, blk_aio_detach,
vblk_exp);
ret = vduse_dev_destroy(vblk_exp->dev);
@ -354,13 +405,12 @@ static void vduse_blk_exp_delete(BlockExport *exp)
g_free(vblk_exp->handler.serial);
}
/* Called with exp->ctx acquired */
static void vduse_blk_exp_request_shutdown(BlockExport *exp)
{
VduseBlkExport *vblk_exp = container_of(exp, VduseBlkExport, export);
aio_context_acquire(vblk_exp->export.ctx);
vduse_blk_detach_ctx(vblk_exp);
aio_context_acquire(vblk_exp->export.ctx);
vduse_blk_stop_virtqueues(vblk_exp);
}
const BlockExportDriver blk_exp_vduse_blk = {

View File

@ -50,7 +50,10 @@ static void vu_blk_req_complete(VuBlkReq *req, size_t in_len)
free(req);
}
/* Called with server refcount increased, must decrease before returning */
/*
* Called with server in_flight counter increased, must decrease before
* returning.
*/
static void coroutine_fn vu_blk_virtio_process_req(void *opaque)
{
VuBlkReq *req = opaque;
@ -68,12 +71,12 @@ static void coroutine_fn vu_blk_virtio_process_req(void *opaque)
in_num, out_num);
if (in_len < 0) {
free(req);
vhost_user_server_unref(server);
vhost_user_server_dec_in_flight(server);
return;
}
vu_blk_req_complete(req, in_len);
vhost_user_server_unref(server);
vhost_user_server_dec_in_flight(server);
}
static void vu_blk_process_vq(VuDev *vu_dev, int idx)
@ -95,7 +98,7 @@ static void vu_blk_process_vq(VuDev *vu_dev, int idx)
Coroutine *co =
qemu_coroutine_create(vu_blk_virtio_process_req, req);
vhost_user_server_ref(server);
vhost_user_server_inc_in_flight(server);
qemu_coroutine_enter(co);
}
}
@ -209,15 +212,21 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
{
VuBlkExport *vexp = opaque;
/*
* The actual attach will happen in vu_blk_drained_end() and we just
* restore ctx here.
*/
vexp->export.ctx = ctx;
vhost_user_server_attach_aio_context(&vexp->vu_server, ctx);
}
static void blk_aio_detach(void *opaque)
{
VuBlkExport *vexp = opaque;
vhost_user_server_detach_aio_context(&vexp->vu_server);
/*
* The actual detach already happened in vu_blk_drained_begin() but from
* this point on we must not access ctx anymore.
*/
vexp->export.ctx = NULL;
}
@ -269,7 +278,38 @@ static void vu_blk_exp_resize(void *opaque)
vu_config_change_msg(&vexp->vu_server.vu_dev);
}
/*
 * BlockDevOps .drained_begin callback for a vhost-user-blk export: detaches
 * the export's vhost-user server from its AioContext by calling
 * vhost_user_server_detach_aio_context().
 *
 * @opaque: the VuBlkExport.
 *
 * Called with vexp->export.ctx acquired.
 */
static void vu_blk_drained_begin(void *opaque)
{
VuBlkExport *vexp = opaque;
vhost_user_server_detach_aio_context(&vexp->vu_server);
}
/*
 * BlockDevOps .drained_end callback for a vhost-user-blk export: attaches
 * the export's vhost-user server to vexp->export.ctx by calling
 * vhost_user_server_attach_aio_context().
 *
 * @opaque: the VuBlkExport.
 *
 * Called with vexp->export.blk AioContext acquired.
 */
static void vu_blk_drained_end(void *opaque)
{
VuBlkExport *vexp = opaque;
vhost_user_server_attach_aio_context(&vexp->vu_server, vexp->export.ctx);
}
/*
 * BlockDevOps .drained_poll callback for a vhost-user-blk export.
 *
 * Ensures that bdrv_drained_begin() waits until in-flight requests complete.
 *
 * @opaque: the VuBlkExport.
 *
 * Returns the result of vhost_user_server_has_in_flight() for the export's
 * vhost-user server.
 *
 * Called with vexp->export.ctx acquired.
 */
static bool vu_blk_drained_poll(void *opaque)
{
VuBlkExport *vexp = opaque;
return vhost_user_server_has_in_flight(&vexp->vu_server);
}
/*
 * BlockDevOps callbacks for vhost-user-blk exports: the three drain
 * callbacks plus the resize callback vu_blk_exp_resize().
 */
static const BlockDevOps vu_blk_dev_ops = {
.drained_begin = vu_blk_drained_begin,
.drained_end = vu_blk_drained_end,
.drained_poll = vu_blk_drained_poll,
.resize_cb = vu_blk_exp_resize,
};

View File

@ -60,7 +60,7 @@ static void bdrv_parent_drained_begin(BlockDriverState *bs, BdrvChild *ignore)
void bdrv_parent_drained_end_single(BdrvChild *c)
{
IO_OR_GS_CODE();
GLOBAL_STATE_CODE();
assert(c->quiesced_parent);
c->quiesced_parent = false;
@ -108,7 +108,7 @@ static bool bdrv_parent_drained_poll(BlockDriverState *bs, BdrvChild *ignore,
void bdrv_parent_drained_begin_single(BdrvChild *c)
{
IO_OR_GS_CODE();
GLOBAL_STATE_CODE();
assert(!c->quiesced_parent);
c->quiesced_parent = true;
@ -247,7 +247,7 @@ typedef struct {
bool bdrv_drain_poll(BlockDriverState *bs, BdrvChild *ignore_parent,
bool ignore_bds_parents)
{
IO_OR_GS_CODE();
GLOBAL_STATE_CODE();
if (bdrv_parent_drained_poll(bs, ignore_parent, ignore_bds_parents)) {
return true;
@ -334,7 +334,8 @@ static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs,
if (ctx != co_ctx) {
aio_context_release(ctx);
}
replay_bh_schedule_oneshot_event(ctx, bdrv_co_drain_bh_cb, &data);
replay_bh_schedule_oneshot_event(qemu_get_aio_context(),
bdrv_co_drain_bh_cb, &data);
qemu_coroutine_yield();
/* If we are resumed from some other event (such as an aio completion or a
@ -357,9 +358,10 @@ static void bdrv_do_drained_begin(BlockDriverState *bs, BdrvChild *parent,
return;
}
GLOBAL_STATE_CODE();
/* Stop things in parent-to-child order */
if (qatomic_fetch_inc(&bs->quiesce_counter) == 0) {
aio_disable_external(bdrv_get_aio_context(bs));
bdrv_parent_drained_begin(bs, parent);
if (bs->drv && bs->drv->bdrv_drain_begin) {
bs->drv->bdrv_drain_begin(bs);
@ -399,11 +401,14 @@ static void bdrv_do_drained_end(BlockDriverState *bs, BdrvChild *parent)
{
int old_quiesce_counter;
IO_OR_GS_CODE();
if (qemu_in_coroutine()) {
bdrv_co_yield_to_drain(bs, false, parent, false);
return;
}
assert(bs->quiesce_counter > 0);
GLOBAL_STATE_CODE();
/* Re-enable things in child-to-parent order */
old_quiesce_counter = qatomic_fetch_dec(&bs->quiesce_counter);
@ -412,7 +417,6 @@ static void bdrv_do_drained_end(BlockDriverState *bs, BdrvChild *parent)
bs->drv->bdrv_drain_end(bs);
}
bdrv_parent_drained_end(bs, parent);
aio_enable_external(bdrv_get_aio_context(bs));
}
}

View File

@ -410,7 +410,7 @@ int coroutine_fn luring_co_submit(BlockDriverState *bs, int fd, uint64_t offset,
void luring_detach_aio_context(LuringState *s, AioContext *old_context)
{
aio_set_fd_handler(old_context, s->ring.ring_fd, false,
aio_set_fd_handler(old_context, s->ring.ring_fd,
NULL, NULL, NULL, NULL, s);
qemu_bh_delete(s->completion_bh);
s->aio_context = NULL;
@ -420,7 +420,7 @@ void luring_attach_aio_context(LuringState *s, AioContext *new_context)
{
s->aio_context = new_context;
s->completion_bh = aio_bh_new(new_context, qemu_luring_completion_bh, s);
aio_set_fd_handler(s->aio_context, s->ring.ring_fd, false,
aio_set_fd_handler(s->aio_context, s->ring.ring_fd,
qemu_luring_completion_cb, NULL,
qemu_luring_poll_cb, qemu_luring_poll_ready, s);
}

View File

@ -363,7 +363,6 @@ iscsi_set_events(IscsiLun *iscsilun)
if (ev != iscsilun->events) {
aio_set_fd_handler(iscsilun->aio_context, iscsi_get_fd(iscsi),
false,
(ev & POLLIN) ? iscsi_process_read : NULL,
(ev & POLLOUT) ? iscsi_process_write : NULL,
NULL, NULL,
@ -1540,7 +1539,7 @@ static void iscsi_detach_aio_context(BlockDriverState *bs)
IscsiLun *iscsilun = bs->opaque;
aio_set_fd_handler(iscsilun->aio_context, iscsi_get_fd(iscsilun->iscsi),
false, NULL, NULL, NULL, NULL, NULL);
NULL, NULL, NULL, NULL, NULL);
iscsilun->events = 0;
if (iscsilun->nop_timer) {

View File

@ -446,7 +446,7 @@ int coroutine_fn laio_co_submit(int fd, uint64_t offset, QEMUIOVector *qiov,
void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
{
aio_set_event_notifier(old_context, &s->e, false, NULL, NULL, NULL);
aio_set_event_notifier(old_context, &s->e, NULL, NULL, NULL);
qemu_bh_delete(s->completion_bh);
s->aio_context = NULL;
}
@ -455,7 +455,7 @@ void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
{
s->aio_context = new_context;
s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
aio_set_event_notifier(new_context, &s->e, false,
aio_set_event_notifier(new_context, &s->e,
qemu_laio_completion_cb,
qemu_laio_poll_cb,
qemu_laio_poll_ready);

View File

@ -662,11 +662,15 @@ static int mirror_exit_common(Job *job)
bool abort = job->ret < 0;
int ret = 0;
GLOBAL_STATE_CODE();
if (s->prepared) {
return 0;
}
s->prepared = true;
aio_context_acquire(qemu_get_aio_context());
mirror_top_bs = s->mirror_top_bs;
bs_opaque = mirror_top_bs->opaque;
src = mirror_top_bs->backing->bs;
@ -789,6 +793,8 @@ static int mirror_exit_common(Job *job)
bdrv_unref(mirror_top_bs);
bdrv_unref(src);
aio_context_release(qemu_get_aio_context());
return ret;
}

View File

@ -195,7 +195,6 @@ static void nfs_set_events(NFSClient *client)
int ev = nfs_which_events(client->context);
if (ev != client->events) {
aio_set_fd_handler(client->aio_context, nfs_get_fd(client->context),
false,
(ev & POLLIN) ? nfs_process_read : NULL,
(ev & POLLOUT) ? nfs_process_write : NULL,
NULL, NULL, client);
@ -373,7 +372,7 @@ static void nfs_detach_aio_context(BlockDriverState *bs)
NFSClient *client = bs->opaque;
aio_set_fd_handler(client->aio_context, nfs_get_fd(client->context),
false, NULL, NULL, NULL, NULL, NULL);
NULL, NULL, NULL, NULL, NULL);
client->events = 0;
}
@ -391,7 +390,7 @@ static void nfs_client_close(NFSClient *client)
if (client->context) {
qemu_mutex_lock(&client->mutex);
aio_set_fd_handler(client->aio_context, nfs_get_fd(client->context),
false, NULL, NULL, NULL, NULL, NULL);
NULL, NULL, NULL, NULL, NULL);
qemu_mutex_unlock(&client->mutex);
if (client->fh) {
nfs_close(client->context, client->fh);

View File

@ -862,7 +862,7 @@ static int nvme_init(BlockDriverState *bs, const char *device, int namespace,
}
aio_set_event_notifier(bdrv_get_aio_context(bs),
&s->irq_notifier[MSIX_SHARED_IRQ_IDX],
false, nvme_handle_event, nvme_poll_cb,
nvme_handle_event, nvme_poll_cb,
nvme_poll_ready);
if (!nvme_identify(bs, namespace, errp)) {
@ -948,7 +948,7 @@ static void nvme_close(BlockDriverState *bs)
g_free(s->queues);
aio_set_event_notifier(bdrv_get_aio_context(bs),
&s->irq_notifier[MSIX_SHARED_IRQ_IDX],
false, NULL, NULL, NULL);
NULL, NULL, NULL);
event_notifier_cleanup(&s->irq_notifier[MSIX_SHARED_IRQ_IDX]);
qemu_vfio_pci_unmap_bar(s->vfio, 0, s->bar0_wo_map,
0, sizeof(NvmeBar) + NVME_DOORBELL_SIZE);
@ -1546,7 +1546,7 @@ static void nvme_detach_aio_context(BlockDriverState *bs)
aio_set_event_notifier(bdrv_get_aio_context(bs),
&s->irq_notifier[MSIX_SHARED_IRQ_IDX],
false, NULL, NULL, NULL);
NULL, NULL, NULL);
}
static void nvme_attach_aio_context(BlockDriverState *bs,
@ -1556,7 +1556,7 @@ static void nvme_attach_aio_context(BlockDriverState *bs,
s->aio_context = new_context;
aio_set_event_notifier(new_context, &s->irq_notifier[MSIX_SHARED_IRQ_IDX],
false, nvme_handle_event, nvme_poll_cb,
nvme_handle_event, nvme_poll_cb,
nvme_poll_ready);
for (unsigned i = 0; i < s->queue_count; i++) {

View File

@ -362,7 +362,10 @@ void qmp_blockdev_change_medium(const char *device,
qdict_put_str(options, "driver", format);
}
aio_context_acquire(qemu_get_aio_context());
medium_bs = bdrv_open(filename, NULL, options, bdrv_flags, errp);
aio_context_release(qemu_get_aio_context());
if (!medium_bs) {
goto fail;
}

View File

@ -1904,6 +1904,8 @@ static void coroutine_fn qcow2_open_entry(void *opaque)
qoc->ret = qcow2_do_open(qoc->bs, qoc->options, qoc->flags, true,
qoc->errp);
qemu_co_mutex_unlock(&s->lock);
aio_wait_kick();
}
static int qcow2_open(BlockDriverState *bs, QDict *options, int flags,
@ -1929,8 +1931,10 @@ static int qcow2_open(BlockDriverState *bs, QDict *options, int flags,
assert(!qemu_in_coroutine());
assert(qemu_get_current_aio_context() == qemu_get_aio_context());
qemu_coroutine_enter(qemu_coroutine_create(qcow2_open_entry, &qoc));
BDRV_POLL_WHILE(bs, qoc.ret == -EINPROGRESS);
aio_co_enter(bdrv_get_aio_context(bs),
qemu_coroutine_create(qcow2_open_entry, &qoc));
AIO_WAIT_WHILE_UNLOCKED(NULL, qoc.ret == -EINPROGRESS);
return qoc.ret;
}

View File

@ -468,6 +468,7 @@ static int raw_open(BlockDriverState *bs, QDict *options, int flags,
Error **errp)
{
BDRVRawState *s = bs->opaque;
AioContext *ctx;
bool has_size;
uint64_t offset, size;
BdrvChildRole file_role;
@ -515,7 +516,11 @@ static int raw_open(BlockDriverState *bs, QDict *options, int flags,
bs->file->bs->filename);
}
ctx = bdrv_get_aio_context(bs);
aio_context_acquire(ctx);
ret = raw_apply_options(bs, s, offset, has_size, size, errp);
aio_context_release(ctx);
if (ret < 0) {
return ret;
}

View File

@ -1019,7 +1019,7 @@ static void restart_coroutine(void *opaque)
AioContext *ctx = bdrv_get_aio_context(bs);
trace_ssh_restart_coroutine(restart->co);
aio_set_fd_handler(ctx, s->sock, false, NULL, NULL, NULL, NULL, NULL);
aio_set_fd_handler(ctx, s->sock, NULL, NULL, NULL, NULL, NULL);
aio_co_wake(restart->co);
}
@ -1049,7 +1049,7 @@ static coroutine_fn void co_yield(BDRVSSHState *s, BlockDriverState *bs)
trace_ssh_co_yield(s->sock, rd_handler, wr_handler);
aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock,
false, rd_handler, wr_handler, NULL, NULL, &restart);
rd_handler, wr_handler, NULL, NULL, &restart);
qemu_coroutine_yield();
trace_ssh_co_yield_back(s->sock);
}

View File

@ -174,7 +174,7 @@ int win32_aio_attach(QEMUWin32AIOState *aio, HANDLE hfile)
void win32_aio_detach_aio_context(QEMUWin32AIOState *aio,
AioContext *old_context)
{
aio_set_event_notifier(old_context, &aio->e, false, NULL, NULL, NULL);
aio_set_event_notifier(old_context, &aio->e, NULL, NULL, NULL);
aio->aio_ctx = NULL;
}
@ -182,8 +182,8 @@ void win32_aio_attach_aio_context(QEMUWin32AIOState *aio,
AioContext *new_context)
{
aio->aio_ctx = new_context;
aio_set_event_notifier(new_context, &aio->e, false,
win32_aio_completion_cb, NULL, NULL);
aio_set_event_notifier(new_context, &aio->e, win32_aio_completion_cb,
NULL, NULL);
}
QEMUWin32AIOState *win32_aio_init(void)

View File

@ -662,6 +662,7 @@ err_no_opts:
/* Takes the ownership of bs_opts */
BlockDriverState *bds_tree_init(QDict *bs_opts, Error **errp)
{
BlockDriverState *bs;
int bdrv_flags = 0;
GLOBAL_STATE_CODE();
@ -676,7 +677,11 @@ BlockDriverState *bds_tree_init(QDict *bs_opts, Error **errp)
bdrv_flags |= BDRV_O_INACTIVE;
}
return bdrv_open(NULL, NULL, bs_opts, bdrv_flags, errp);
aio_context_acquire(qemu_get_aio_context());
bs = bdrv_open(NULL, NULL, bs_opts, bdrv_flags, errp);
aio_context_release(qemu_get_aio_context());
return bs;
}
void blockdev_close_all_bdrv_states(void)
@ -1480,14 +1485,20 @@ static void external_snapshot_action(TransactionAction *action,
}
qdict_put_str(options, "driver", format);
}
aio_context_release(aio_context);
aio_context_acquire(qemu_get_aio_context());
state->new_bs = bdrv_open(new_image_file, snapshot_ref, options, flags,
errp);
aio_context_release(qemu_get_aio_context());
/* We will manually add the backing_hd field to the bs later */
if (!state->new_bs) {
goto out;
return;
}
aio_context_acquire(aio_context);
/*
* Allow attaching a backing file to an overlay that's already in use only
* if the parents don't assume that they are already seeing a valid image.
@ -1732,15 +1743,18 @@ static void drive_backup_action(DriveBackup *backup,
if (format) {
qdict_put_str(options, "driver", format);
}
aio_context_release(aio_context);
aio_context_acquire(qemu_get_aio_context());
target_bs = bdrv_open(backup->target, NULL, options, flags, errp);
aio_context_release(qemu_get_aio_context());
if (!target_bs) {
goto out;
return;
}
/* Honor bdrv_try_change_aio_context() context acquisition requirements. */
old_context = bdrv_get_aio_context(target_bs);
aio_context_release(aio_context);
aio_context_acquire(old_context);
ret = bdrv_try_change_aio_context(target_bs, aio_context, NULL, errp);
@ -3066,13 +3080,17 @@ void qmp_drive_mirror(DriveMirror *arg, Error **errp)
if (format) {
qdict_put_str(options, "driver", format);
}
aio_context_release(aio_context);
/* Mirroring takes care of copy-on-write using the source's backing
* file.
*/
aio_context_acquire(qemu_get_aio_context());
target_bs = bdrv_open(arg->target, NULL, options, flags, errp);
aio_context_release(qemu_get_aio_context());
if (!target_bs) {
goto out;
return;
}
zero_target = (arg->sync == MIRROR_SYNC_MODE_FULL &&
@ -3082,7 +3100,6 @@ void qmp_drive_mirror(DriveMirror *arg, Error **errp)
/* Honor bdrv_try_change_aio_context() context acquisition requirements. */
old_context = bdrv_get_aio_context(target_bs);
aio_context_release(aio_context);
aio_context_acquire(old_context);
ret = bdrv_try_change_aio_context(target_bs, aio_context, NULL, errp);

View File

@ -246,13 +246,15 @@ int virtio_blk_data_plane_start(VirtIODevice *vdev)
}
/* Get this show started by hooking up our callbacks */
aio_context_acquire(s->ctx);
for (i = 0; i < nvqs; i++) {
VirtQueue *vq = virtio_get_queue(s->vdev, i);
if (!blk_in_drain(s->conf->conf.blk)) {
aio_context_acquire(s->ctx);
for (i = 0; i < nvqs; i++) {
VirtQueue *vq = virtio_get_queue(s->vdev, i);
virtio_queue_aio_attach_host_notifier(vq, s->ctx);
virtio_queue_aio_attach_host_notifier(vq, s->ctx);
}
aio_context_release(s->ctx);
}
aio_context_release(s->ctx);
return 0;
fail_aio_context:
@ -287,8 +289,15 @@ static void virtio_blk_data_plane_stop_bh(void *opaque)
for (i = 0; i < s->conf->num_queues; i++) {
VirtQueue *vq = virtio_get_queue(s->vdev, i);
EventNotifier *host_notifier = virtio_queue_get_host_notifier(vq);
virtio_queue_aio_detach_host_notifier(vq, s->ctx);
/*
* Test and clear notifier after disabling event, in case poll callback
* didn't have time to run.
*/
virtio_queue_host_notifier_read(host_notifier);
}
}
@ -315,7 +324,9 @@ void virtio_blk_data_plane_stop(VirtIODevice *vdev)
s->stopping = true;
trace_virtio_blk_data_plane_stop(s);
aio_wait_bh_oneshot(s->ctx, virtio_blk_data_plane_stop_bh, s);
if (!blk_in_drain(s->conf->conf.blk)) {
aio_wait_bh_oneshot(s->ctx, virtio_blk_data_plane_stop_bh, s);
}
aio_context_acquire(s->ctx);

View File

@ -664,6 +664,30 @@ void xen_block_dataplane_destroy(XenBlockDataPlane *dataplane)
g_free(dataplane);
}
/*
 * Clear the AioContext of the dataplane's event channel by setting it to
 * NULL.
 *
 * Does nothing when @dataplane is NULL or has no event channel.
 */
void xen_block_dataplane_detach(XenBlockDataPlane *dataplane)
{
    if (dataplane && dataplane->event_channel) {
        /* Only reason for failure is a NULL channel */
        xen_device_set_event_channel_context(dataplane->xendev,
                                             dataplane->event_channel,
                                             NULL, &error_abort);
    }
}
/*
 * Set the AioContext of the dataplane's event channel to dataplane->ctx.
 *
 * Does nothing when @dataplane is NULL or has no event channel.
 */
void xen_block_dataplane_attach(XenBlockDataPlane *dataplane)
{
    if (dataplane && dataplane->event_channel) {
        /* Only reason for failure is a NULL channel */
        xen_device_set_event_channel_context(dataplane->xendev,
                                             dataplane->event_channel,
                                             dataplane->ctx, &error_abort);
    }
}
void xen_block_dataplane_stop(XenBlockDataPlane *dataplane)
{
XenDevice *xendev;
@ -674,13 +698,11 @@ void xen_block_dataplane_stop(XenBlockDataPlane *dataplane)
xendev = dataplane->xendev;
aio_context_acquire(dataplane->ctx);
if (dataplane->event_channel) {
/* Only reason for failure is a NULL channel */
xen_device_set_event_channel_context(xendev, dataplane->event_channel,
qemu_get_aio_context(),
&error_abort);
if (!blk_in_drain(dataplane->blk)) {
xen_block_dataplane_detach(dataplane);
}
aio_context_acquire(dataplane->ctx);
/* Xen doesn't have multiple users for nodes, so this can't fail */
blk_set_aio_context(dataplane->blk, qemu_get_aio_context(), &error_abort);
aio_context_release(dataplane->ctx);
@ -819,11 +841,9 @@ void xen_block_dataplane_start(XenBlockDataPlane *dataplane,
blk_set_aio_context(dataplane->blk, dataplane->ctx, NULL);
aio_context_release(old_context);
/* Only reason for failure is a NULL channel */
aio_context_acquire(dataplane->ctx);
xen_device_set_event_channel_context(xendev, dataplane->event_channel,
dataplane->ctx, &error_abort);
aio_context_release(dataplane->ctx);
if (!blk_in_drain(dataplane->blk)) {
xen_block_dataplane_attach(dataplane);
}
return;

View File

@ -26,5 +26,7 @@ void xen_block_dataplane_start(XenBlockDataPlane *dataplane,
unsigned int protocol,
Error **errp);
void xen_block_dataplane_stop(XenBlockDataPlane *dataplane);
void xen_block_dataplane_attach(XenBlockDataPlane *dataplane);
void xen_block_dataplane_detach(XenBlockDataPlane *dataplane);
#endif /* HW_BLOCK_DATAPLANE_XEN_BLOCK_H */

View File

@ -1506,8 +1506,44 @@ static void virtio_blk_resize(void *opaque)
aio_bh_schedule_oneshot(qemu_get_aio_context(), virtio_resize_cb, vdev);
}
/*
 * Suspend virtqueue ioeventfd processing during drain.
 *
 * Detaches the host notifier of every virtqueue from the BlockBackend's
 * AioContext. Nothing is done unless a dataplane exists and has been started.
 */
static void virtio_blk_drained_begin(void *opaque)
{
    VirtIOBlock *vblk = opaque;
    VirtIODevice *vdev = VIRTIO_DEVICE(opaque);
    AioContext *blk_ctx = blk_get_aio_context(vblk->conf.conf.blk);

    if (vblk->dataplane && vblk->dataplane_started) {
        for (uint16_t idx = 0; idx < vblk->conf.num_queues; idx++) {
            virtio_queue_aio_detach_host_notifier(virtio_get_queue(vdev, idx),
                                                  blk_ctx);
        }
    }
}
/*
 * Resume virtqueue ioeventfd processing after drain.
 *
 * Attaches the host notifier of every virtqueue to the BlockBackend's
 * AioContext. Nothing is done unless a dataplane exists and has been started.
 */
static void virtio_blk_drained_end(void *opaque)
{
    VirtIOBlock *vblk = opaque;
    VirtIODevice *vdev = VIRTIO_DEVICE(opaque);
    AioContext *blk_ctx = blk_get_aio_context(vblk->conf.conf.blk);

    if (vblk->dataplane && vblk->dataplane_started) {
        for (uint16_t idx = 0; idx < vblk->conf.num_queues; idx++) {
            virtio_queue_aio_attach_host_notifier(virtio_get_queue(vdev, idx),
                                                  blk_ctx);
        }
    }
}
static const BlockDevOps virtio_block_ops = {
.resize_cb = virtio_blk_resize,
.resize_cb = virtio_blk_resize,
.drained_begin = virtio_blk_drained_begin,
.drained_end = virtio_blk_drained_end,
};
static void virtio_blk_device_realize(DeviceState *dev, Error **errp)

View File

@ -189,8 +189,26 @@ static void xen_block_resize_cb(void *opaque)
xen_device_backend_printf(xendev, "state", "%u", state);
}
/*
 * Suspend request handling.
 *
 * Drain-begin callback: forwards to xen_block_dataplane_detach() for the
 * device's dataplane.
 */
static void xen_block_drained_begin(void *opaque)
{
    XenBlockDevice *xen_blk = opaque;

    xen_block_dataplane_detach(xen_blk->dataplane);
}
/*
 * Resume request handling.
 *
 * Drain-end callback: forwards to xen_block_dataplane_attach() for the
 * device's dataplane.
 */
static void xen_block_drained_end(void *opaque)
{
    XenBlockDevice *xen_blk = opaque;

    xen_block_dataplane_attach(xen_blk->dataplane);
}
static const BlockDevOps xen_block_dev_ops = {
.resize_cb = xen_block_resize_cb,
.resize_cb = xen_block_resize_cb,
.drained_begin = xen_block_drained_begin,
.drained_end = xen_block_drained_end,
};
static void xen_block_realize(XenDevice *xendev, Error **errp)
@ -242,8 +260,6 @@ static void xen_block_realize(XenDevice *xendev, Error **errp)
return;
}
blk_set_dev_ops(blk, &xen_block_dev_ops, blockdev);
if (conf->discard_granularity == -1) {
conf->discard_granularity = conf->physical_block_size;
}
@ -277,6 +293,8 @@ static void xen_block_realize(XenDevice *xendev, Error **errp)
blockdev->dataplane =
xen_block_dataplane_create(xendev, blk, conf->logical_block_size,
blockdev->props.iothread);
blk_set_dev_ops(blk, &xen_block_dev_ops, blockdev);
}
static void xen_block_frontend_changed(XenDevice *xendev,

View File

@ -133,7 +133,7 @@ static void xen_xenstore_realize(DeviceState *dev, Error **errp)
error_setg(errp, "Xenstore evtchn port init failed");
return;
}
aio_set_fd_handler(qemu_get_aio_context(), xen_be_evtchn_fd(s->eh), true,
aio_set_fd_handler(qemu_get_aio_context(), xen_be_evtchn_fd(s->eh),
xen_xenstore_event, NULL, NULL, NULL, s);
s->impl = xs_impl_create(xen_domid);

View File

@ -60,8 +60,7 @@ static SCSIDevice *do_scsi_device_find(SCSIBus *bus,
* the user access the device.
*/
if (retval && !include_unrealized &&
!qatomic_load_acquire(&retval->qdev.realized)) {
if (retval && !include_unrealized && !qdev_is_realized(&retval->qdev)) {
retval = NULL;
}
@ -488,7 +487,8 @@ static bool scsi_target_emulate_report_luns(SCSITargetReq *r)
DeviceState *qdev = kid->child;
SCSIDevice *dev = SCSI_DEVICE(qdev);
if (dev->channel == channel && dev->id == id && dev->lun != 0) {
if (dev->channel == channel && dev->id == id && dev->lun != 0 &&
qdev_is_realized(&dev->qdev)) {
store_lun(tmp, dev->lun);
g_byte_array_append(buf, tmp, 8);
len += 8;
@ -1669,6 +1669,46 @@ void scsi_device_purge_requests(SCSIDevice *sdev, SCSISense sense)
scsi_device_set_ua(sdev, sense);
}
/*
 * Record that @sdev has started draining and, on the first outstanding drain
 * on its bus, invoke the bus's optional drained_begin callback.
 *
 * Does nothing if the device has no parent bus. Asserts that the caller runs
 * in the main AioContext (qemu_get_aio_context()).
 */
void scsi_device_drained_begin(SCSIDevice *sdev)
{
    SCSIBus *sbus = DO_UPCAST(SCSIBus, qbus, sdev->qdev.parent_bus);

    if (!sbus) {
        return;
    }

    assert(qemu_get_current_aio_context() == qemu_get_aio_context());
    assert(sbus->drain_count < INT_MAX);

    /*
     * Several BlockBackends may share one SCSIBus and each of them can enter
     * or leave a drained section at any time. The counter makes sure the HBA
     * is told about begin/end only once.
     */
    if (sbus->drain_count++ != 0) {
        return; /* bus is already drained on behalf of another user */
    }

    trace_scsi_bus_drained_begin(sbus, sdev);
    if (sbus->info->drained_begin) {
        sbus->info->drained_begin(sbus);
    }
}
/*
 * Record that @sdev has finished draining and, when the last outstanding
 * drain on its bus ends, invoke the bus's optional drained_end callback.
 *
 * Does nothing if the device has no parent bus. Asserts that the caller runs
 * in the main AioContext (qemu_get_aio_context()) and that the bus drain
 * counter is positive.
 */
void scsi_device_drained_end(SCSIDevice *sdev)
{
    SCSIBus *sbus = DO_UPCAST(SCSIBus, qbus, sdev->qdev.parent_bus);

    if (!sbus) {
        return;
    }

    assert(qemu_get_current_aio_context() == qemu_get_aio_context());
    assert(sbus->drain_count > 0);

    if (--sbus->drain_count != 0) {
        return; /* other drained sections are still active on this bus */
    }

    trace_scsi_bus_drained_end(sbus, sdev);
    if (sbus->info->drained_end) {
        sbus->info->drained_end(sbus);
    }
}
static char *scsibus_get_dev_path(DeviceState *dev)
{
SCSIDevice *d = SCSI_DEVICE(dev);

View File

@ -2360,6 +2360,20 @@ static void scsi_disk_reset(DeviceState *dev)
s->qdev.scsi_version = s->qdev.default_scsi_version;
}
/*
 * Drain-begin callback for SCSI disks: forwards to
 * scsi_device_drained_begin() for the disk's embedded SCSIDevice.
 */
static void scsi_disk_drained_begin(void *opaque)
{
    SCSIDiskState *disk = opaque;

    scsi_device_drained_begin(&disk->qdev);
}
/*
 * Drain-end callback for SCSI disks: forwards to scsi_device_drained_end()
 * for the disk's embedded SCSIDevice.
 */
static void scsi_disk_drained_end(void *opaque)
{
    SCSIDiskState *disk = opaque;

    scsi_device_drained_end(&disk->qdev);
}
static void scsi_disk_resize_cb(void *opaque)
{
SCSIDiskState *s = opaque;
@ -2414,16 +2428,19 @@ static bool scsi_cd_is_medium_locked(void *opaque)
}
static const BlockDevOps scsi_disk_removable_block_ops = {
.change_media_cb = scsi_cd_change_media_cb,
.change_media_cb = scsi_cd_change_media_cb,
.drained_begin = scsi_disk_drained_begin,
.drained_end = scsi_disk_drained_end,
.eject_request_cb = scsi_cd_eject_request_cb,
.is_tray_open = scsi_cd_is_tray_open,
.is_medium_locked = scsi_cd_is_medium_locked,
.resize_cb = scsi_disk_resize_cb,
.is_tray_open = scsi_cd_is_tray_open,
.resize_cb = scsi_disk_resize_cb,
};
static const BlockDevOps scsi_disk_block_ops = {
.resize_cb = scsi_disk_resize_cb,
.drained_begin = scsi_disk_drained_begin,
.drained_end = scsi_disk_drained_end,
.resize_cb = scsi_disk_resize_cb,
};
static void scsi_disk_unit_attention_reported(SCSIDevice *dev)

View File

@ -6,6 +6,8 @@ scsi_req_cancel(int target, int lun, int tag) "target %d lun %d tag %d"
scsi_req_data(int target, int lun, int tag, int len) "target %d lun %d tag %d len %d"
scsi_req_data_canceled(int target, int lun, int tag, int len) "target %d lun %d tag %d len %d"
scsi_req_dequeue(int target, int lun, int tag) "target %d lun %d tag %d"
scsi_bus_drained_begin(void *bus, void *sdev) "bus %p sdev %p"
scsi_bus_drained_end(void *bus, void *sdev) "bus %p sdev %p"
scsi_req_continue(int target, int lun, int tag) "target %d lun %d tag %d"
scsi_req_continue_canceled(int target, int lun, int tag) "target %d lun %d tag %d"
scsi_req_parsed(int target, int lun, int tag, int cmd, int mode, int xfer) "target %d lun %d tag %d command %d dir %d length %d"

View File

@ -71,12 +71,26 @@ static void virtio_scsi_dataplane_stop_bh(void *opaque)
{
VirtIOSCSI *s = opaque;
VirtIOSCSICommon *vs = VIRTIO_SCSI_COMMON(s);
EventNotifier *host_notifier;
int i;
virtio_queue_aio_detach_host_notifier(vs->ctrl_vq, s->ctx);
host_notifier = virtio_queue_get_host_notifier(vs->ctrl_vq);
/*
* Test and clear notifier after disabling event, in case poll callback
* didn't have time to run.
*/
virtio_queue_host_notifier_read(host_notifier);
virtio_queue_aio_detach_host_notifier(vs->event_vq, s->ctx);
host_notifier = virtio_queue_get_host_notifier(vs->event_vq);
virtio_queue_host_notifier_read(host_notifier);
for (i = 0; i < vs->conf.num_queues; i++) {
virtio_queue_aio_detach_host_notifier(vs->cmd_vqs[i], s->ctx);
host_notifier = virtio_queue_get_host_notifier(vs->cmd_vqs[i]);
virtio_queue_host_notifier_read(host_notifier);
}
}
@ -144,14 +158,16 @@ int virtio_scsi_dataplane_start(VirtIODevice *vdev)
s->dataplane_starting = false;
s->dataplane_started = true;
aio_context_acquire(s->ctx);
virtio_queue_aio_attach_host_notifier(vs->ctrl_vq, s->ctx);
virtio_queue_aio_attach_host_notifier_no_poll(vs->event_vq, s->ctx);
if (s->bus.drain_count == 0) {
aio_context_acquire(s->ctx);
virtio_queue_aio_attach_host_notifier(vs->ctrl_vq, s->ctx);
virtio_queue_aio_attach_host_notifier_no_poll(vs->event_vq, s->ctx);
for (i = 0; i < vs->conf.num_queues; i++) {
virtio_queue_aio_attach_host_notifier(vs->cmd_vqs[i], s->ctx);
for (i = 0; i < vs->conf.num_queues; i++) {
virtio_queue_aio_attach_host_notifier(vs->cmd_vqs[i], s->ctx);
}
aio_context_release(s->ctx);
}
aio_context_release(s->ctx);
return 0;
fail_host_notifiers:
@ -197,7 +213,9 @@ void virtio_scsi_dataplane_stop(VirtIODevice *vdev)
}
s->dataplane_stopping = true;
aio_wait_bh_oneshot(s->ctx, virtio_scsi_dataplane_stop_bh, s);
if (s->bus.drain_count == 0) {
aio_wait_bh_oneshot(s->ctx, virtio_scsi_dataplane_stop_bh, s);
}
blk_drain_all(); /* ensure there are no in-flight requests */

View File

@ -933,13 +933,27 @@ static void virtio_scsi_reset(VirtIODevice *vdev)
s->events_dropped = false;
}
static void virtio_scsi_push_event(VirtIOSCSI *s, SCSIDevice *dev,
uint32_t event, uint32_t reason)
/*
 * Description of one event handed to virtio_scsi_push_event(): the event
 * type, its reason code and, for events that concern a particular device,
 * the id/lun address of that device.
 */
typedef struct {
uint32_t event;
uint32_t reason;
union {
/* Used by messages specific to a device */
struct {
uint32_t id;
uint32_t lun;
} address;
};
} VirtIOSCSIEventInfo;
static void virtio_scsi_push_event(VirtIOSCSI *s,
const VirtIOSCSIEventInfo *info)
{
VirtIOSCSICommon *vs = VIRTIO_SCSI_COMMON(s);
VirtIOSCSIReq *req;
VirtIOSCSIEvent *evt;
VirtIODevice *vdev = VIRTIO_DEVICE(s);
uint32_t event = info->event;
uint32_t reason = info->reason;
if (!(vdev->status & VIRTIO_CONFIG_S_DRIVER_OK)) {
return;
@ -965,27 +979,28 @@ static void virtio_scsi_push_event(VirtIOSCSI *s, SCSIDevice *dev,
memset(evt, 0, sizeof(VirtIOSCSIEvent));
evt->event = virtio_tswap32(vdev, event);
evt->reason = virtio_tswap32(vdev, reason);
if (!dev) {
assert(event == VIRTIO_SCSI_T_EVENTS_MISSED);
} else {
if (event != VIRTIO_SCSI_T_EVENTS_MISSED) {
evt->lun[0] = 1;
evt->lun[1] = dev->id;
evt->lun[1] = info->address.id;
/* Linux wants us to keep the same encoding we use for REPORT LUNS. */
if (dev->lun >= 256) {
evt->lun[2] = (dev->lun >> 8) | 0x40;
if (info->address.lun >= 256) {
evt->lun[2] = (info->address.lun >> 8) | 0x40;
}
evt->lun[3] = dev->lun & 0xFF;
evt->lun[3] = info->address.lun & 0xFF;
}
trace_virtio_scsi_event(virtio_scsi_get_lun(evt->lun), event, reason);
virtio_scsi_complete_req(req);
}
static void virtio_scsi_handle_event_vq(VirtIOSCSI *s, VirtQueue *vq)
{
if (s->events_dropped) {
virtio_scsi_push_event(s, NULL, VIRTIO_SCSI_T_NO_EVENT, 0);
VirtIOSCSIEventInfo info = {
.event = VIRTIO_SCSI_T_NO_EVENT,
};
virtio_scsi_push_event(s, &info);
}
}
@ -1009,9 +1024,17 @@ static void virtio_scsi_change(SCSIBus *bus, SCSIDevice *dev, SCSISense sense)
if (virtio_vdev_has_feature(vdev, VIRTIO_SCSI_F_CHANGE) &&
dev->type != TYPE_ROM) {
VirtIOSCSIEventInfo info = {
.event = VIRTIO_SCSI_T_PARAM_CHANGE,
.reason = sense.asc | (sense.ascq << 8),
.address = {
.id = dev->id,
.lun = dev->lun,
},
};
virtio_scsi_acquire(s);
virtio_scsi_push_event(s, dev, VIRTIO_SCSI_T_PARAM_CHANGE,
sense.asc | (sense.ascq << 8));
virtio_scsi_push_event(s, &info);
virtio_scsi_release(s);
}
}
@ -1046,10 +1069,17 @@ static void virtio_scsi_hotplug(HotplugHandler *hotplug_dev, DeviceState *dev,
}
if (virtio_vdev_has_feature(vdev, VIRTIO_SCSI_F_HOTPLUG)) {
VirtIOSCSIEventInfo info = {
.event = VIRTIO_SCSI_T_TRANSPORT_RESET,
.reason = VIRTIO_SCSI_EVT_RESET_RESCAN,
.address = {
.id = sd->id,
.lun = sd->lun,
},
};
virtio_scsi_acquire(s);
virtio_scsi_push_event(s, sd,
VIRTIO_SCSI_T_TRANSPORT_RESET,
VIRTIO_SCSI_EVT_RESET_RESCAN);
virtio_scsi_push_event(s, &info);
scsi_bus_set_ua(&s->bus, SENSE_CODE(REPORTED_LUNS_CHANGED));
virtio_scsi_release(s);
}
@ -1061,20 +1091,16 @@ static void virtio_scsi_hotunplug(HotplugHandler *hotplug_dev, DeviceState *dev,
VirtIODevice *vdev = VIRTIO_DEVICE(hotplug_dev);
VirtIOSCSI *s = VIRTIO_SCSI(vdev);
SCSIDevice *sd = SCSI_DEVICE(dev);
AioContext *ctx = s->ctx ?: qemu_get_aio_context();
VirtIOSCSIEventInfo info = {
.event = VIRTIO_SCSI_T_TRANSPORT_RESET,
.reason = VIRTIO_SCSI_EVT_RESET_REMOVED,
.address = {
.id = sd->id,
.lun = sd->lun,
},
};
if (virtio_vdev_has_feature(vdev, VIRTIO_SCSI_F_HOTPLUG)) {
virtio_scsi_acquire(s);
virtio_scsi_push_event(s, sd,
VIRTIO_SCSI_T_TRANSPORT_RESET,
VIRTIO_SCSI_EVT_RESET_REMOVED);
scsi_bus_set_ua(&s->bus, SENSE_CODE(REPORTED_LUNS_CHANGED));
virtio_scsi_release(s);
}
aio_disable_external(ctx);
qdev_simple_device_unplug_cb(hotplug_dev, dev, errp);
aio_enable_external(ctx);
if (s->ctx) {
virtio_scsi_acquire(s);
@ -1082,6 +1108,49 @@ static void virtio_scsi_hotunplug(HotplugHandler *hotplug_dev, DeviceState *dev,
blk_set_aio_context(sd->conf.blk, qemu_get_aio_context(), NULL);
virtio_scsi_release(s);
}
if (virtio_vdev_has_feature(vdev, VIRTIO_SCSI_F_HOTPLUG)) {
virtio_scsi_acquire(s);
virtio_scsi_push_event(s, &info);
scsi_bus_set_ua(&s->bus, SENSE_CODE(REPORTED_LUNS_CHANGED));
virtio_scsi_release(s);
}
}
/*
 * Suspend virtqueue ioeventfd processing during drain.
 *
 * Detaches the host notifier of every virtqueue (the VIRTIO_SCSI_VQ_NUM_FIXED
 * fixed queues plus the configured num_queues) from s->ctx. Nothing is done
 * unless the dataplane has been started.
 */
static void virtio_scsi_drained_begin(SCSIBus *bus)
{
    VirtIOSCSI *scsi = container_of(bus, VirtIOSCSI, bus);
    VirtIODevice *vdev = VIRTIO_DEVICE(scsi);
    uint32_t total_queues = VIRTIO_SCSI_VQ_NUM_FIXED +
                            scsi->parent_obj.conf.num_queues;

    if (scsi->dataplane_started) {
        for (uint32_t idx = 0; idx < total_queues; idx++) {
            virtio_queue_aio_detach_host_notifier(virtio_get_queue(vdev, idx),
                                                  scsi->ctx);
        }
    }
}
/*
 * Resume virtqueue ioeventfd processing after drain.
 *
 * Attaches the host notifier of every virtqueue (the VIRTIO_SCSI_VQ_NUM_FIXED
 * fixed queues plus the configured num_queues) to s->ctx. Nothing is done
 * unless the dataplane has been started.
 */
static void virtio_scsi_drained_end(SCSIBus *bus)
{
    VirtIOSCSI *scsi = container_of(bus, VirtIOSCSI, bus);
    VirtIODevice *vdev = VIRTIO_DEVICE(scsi);
    uint32_t total_queues = VIRTIO_SCSI_VQ_NUM_FIXED +
                            scsi->parent_obj.conf.num_queues;

    if (scsi->dataplane_started) {
        for (uint32_t idx = 0; idx < total_queues; idx++) {
            virtio_queue_aio_attach_host_notifier(virtio_get_queue(vdev, idx),
                                                  scsi->ctx);
        }
    }
}
static struct SCSIBusInfo virtio_scsi_scsi_info = {
@ -1098,6 +1167,8 @@ static struct SCSIBusInfo virtio_scsi_scsi_info = {
.get_sg_list = virtio_scsi_get_sg_list,
.save_request = virtio_scsi_save_request,
.load_request = virtio_scsi_load_request,
.drained_begin = virtio_scsi_drained_begin,
.drained_end = virtio_scsi_drained_end,
};
void virtio_scsi_common_realize(DeviceState *dev,

View File

@ -3491,7 +3491,7 @@ static void virtio_queue_host_notifier_aio_poll_end(EventNotifier *n)
void virtio_queue_aio_attach_host_notifier(VirtQueue *vq, AioContext *ctx)
{
aio_set_event_notifier(ctx, &vq->host_notifier, true,
aio_set_event_notifier(ctx, &vq->host_notifier,
virtio_queue_host_notifier_read,
virtio_queue_host_notifier_aio_poll,
virtio_queue_host_notifier_aio_poll_ready);
@ -3508,17 +3508,14 @@ void virtio_queue_aio_attach_host_notifier(VirtQueue *vq, AioContext *ctx)
*/
void virtio_queue_aio_attach_host_notifier_no_poll(VirtQueue *vq, AioContext *ctx)
{
aio_set_event_notifier(ctx, &vq->host_notifier, true,
aio_set_event_notifier(ctx, &vq->host_notifier,
virtio_queue_host_notifier_read,
NULL, NULL);
}
void virtio_queue_aio_detach_host_notifier(VirtQueue *vq, AioContext *ctx)
{
aio_set_event_notifier(ctx, &vq->host_notifier, true, NULL, NULL, NULL);
/* Test and clear notifier after disabling event,
* in case poll callback didn't have time to run. */
virtio_queue_host_notifier_read(&vq->host_notifier);
aio_set_event_notifier(ctx, &vq->host_notifier, NULL, NULL, NULL);
}
void virtio_queue_host_notifier_read(EventNotifier *n)

View File

@ -842,12 +842,15 @@ void xen_device_set_event_channel_context(XenDevice *xendev,
}
if (channel->ctx)
aio_set_fd_handler(channel->ctx, qemu_xen_evtchn_fd(channel->xeh), true,
aio_set_fd_handler(channel->ctx, qemu_xen_evtchn_fd(channel->xeh),
NULL, NULL, NULL, NULL, NULL);
channel->ctx = ctx;
aio_set_fd_handler(channel->ctx, qemu_xen_evtchn_fd(channel->xeh), true,
xen_device_event, NULL, xen_device_poll, NULL, channel);
if (ctx) {
aio_set_fd_handler(channel->ctx, qemu_xen_evtchn_fd(channel->xeh),
xen_device_event, NULL, xen_device_poll, NULL,
channel);
}
}
XenEventChannel *xen_device_bind_event_channel(XenDevice *xendev,
@ -920,7 +923,7 @@ void xen_device_unbind_event_channel(XenDevice *xendev,
QLIST_REMOVE(channel, list);
aio_set_fd_handler(channel->ctx, qemu_xen_evtchn_fd(channel->xeh), true,
aio_set_fd_handler(channel->ctx, qemu_xen_evtchn_fd(channel->xeh),
NULL, NULL, NULL, NULL, NULL);
if (qemu_xen_evtchn_unbind(channel->xeh, channel->local_port) < 0) {

View File

@ -225,8 +225,6 @@ struct AioContext {
*/
QEMUTimerListGroup tlg;
int external_disable_cnt;
/* Number of AioHandlers without .io_poll() */
int poll_disable_cnt;
@ -481,7 +479,6 @@ bool aio_poll(AioContext *ctx, bool blocking);
*/
void aio_set_fd_handler(AioContext *ctx,
int fd,
bool is_external,
IOHandler *io_read,
IOHandler *io_write,
AioPollFn *io_poll,
@ -497,7 +494,6 @@ void aio_set_fd_handler(AioContext *ctx,
*/
void aio_set_event_notifier(AioContext *ctx,
EventNotifier *notifier,
bool is_external,
EventNotifierHandler *io_read,
AioPollFn *io_poll,
EventNotifierHandler *io_poll_ready);
@ -626,59 +622,6 @@ static inline void aio_timer_init(AioContext *ctx,
*/
int64_t aio_compute_timeout(AioContext *ctx);
/**
* aio_disable_external:
* @ctx: the aio context
*
* Disable the further processing of external clients.
*/
static inline void aio_disable_external(AioContext *ctx)
{
qatomic_inc(&ctx->external_disable_cnt);
}
/**
* aio_enable_external:
* @ctx: the aio context
*
* Enable the processing of external clients.
*/
static inline void aio_enable_external(AioContext *ctx)
{
int old;
old = qatomic_fetch_dec(&ctx->external_disable_cnt);
assert(old > 0);
if (old == 1) {
/* Kick event loop so it re-arms file descriptors */
aio_notify(ctx);
}
}
/**
* aio_external_disabled:
* @ctx: the aio context
*
* Return true if the external clients are disabled.
*/
static inline bool aio_external_disabled(AioContext *ctx)
{
return qatomic_read(&ctx->external_disable_cnt);
}
/**
* aio_node_check:
* @ctx: the aio context
* @is_external: Whether or not the checked node is an external event source.
*
* Check if the node's is_external flag is okay to be polled by the ctx at this
* moment. True means green light.
*/
static inline bool aio_node_check(AioContext *ctx, bool is_external)
{
return !is_external || !qatomic_read(&ctx->external_disable_cnt);
}
/**
* aio_co_schedule:
* @ctx: the aio context

View File

@ -65,6 +65,9 @@
* scheduling a BH in the bottom half that runs the respective non-coroutine
* function. The coroutine yields after scheduling the BH and is reentered when
* the wrapped function returns.
*
* If the first parameter of the function is a BlockDriverState, BdrvChild or
* BlockBackend pointer, the AioContext lock for it is taken in the wrapper.
*/
#define no_co_wrapper

View File

@ -363,6 +363,21 @@ struct BlockDriver {
void (*bdrv_attach_aio_context)(BlockDriverState *bs,
AioContext *new_context);
/**
* bdrv_drain_begin is called if implemented in the beginning of a
* drain operation to drain and stop any internal sources of requests in
* the driver.
* bdrv_drain_end is called if implemented at the end of the drain.
*
* They should be used by the driver to e.g. manage scheduled I/O
* requests, or toggle an internal state. After the end of the drain new
* requests will continue normally.
*
* Implementations of both functions must not call aio_poll().
*/
void (*bdrv_drain_begin)(BlockDriverState *bs);
void (*bdrv_drain_end)(BlockDriverState *bs);
/**
* Try to get @bs's logical and physical block size.
* On success, store them in @bsz and return zero.
@ -758,21 +773,6 @@ struct BlockDriver {
void coroutine_fn GRAPH_RDLOCK_PTR (*bdrv_co_io_unplug)(
BlockDriverState *bs);
/**
* bdrv_drain_begin is called if implemented in the beginning of a
* drain operation to drain and stop any internal sources of requests in
* the driver.
* bdrv_drain_end is called if implemented at the end of the drain.
*
* They should be used by the driver to e.g. manage scheduled I/O
* requests, or toggle an internal state. After the end of the drain new
* requests will continue normally.
*
* Implementations of both functions must not call aio_poll().
*/
void (*bdrv_drain_begin)(BlockDriverState *bs);
void (*bdrv_drain_end)(BlockDriverState *bs);
bool (*bdrv_supports_persistent_dirty_bitmap)(BlockDriverState *bs);
bool coroutine_fn GRAPH_RDLOCK_PTR (*bdrv_co_can_store_new_dirty_bitmap)(
@ -955,6 +955,27 @@ struct BdrvChildClass {
void GRAPH_WRLOCK_PTR (*attach)(BdrvChild *child);
void GRAPH_WRLOCK_PTR (*detach)(BdrvChild *child);
/*
* If this pair of functions is implemented, the parent doesn't issue new
* requests after returning from .drained_begin() until .drained_end() is
* called.
*
* These functions must not change the graph (and therefore also must not
* call aio_poll(), which could change the graph indirectly).
*
* Note that this can be nested. If drained_begin() was called twice, new
* I/O is allowed only after drained_end() was called twice, too.
*/
void (*drained_begin)(BdrvChild *child);
void (*drained_end)(BdrvChild *child);
/*
* Returns whether the parent has pending requests for the child. This
* callback is polled after .drained_begin() has been called until all
* activity on the child has stopped.
*/
bool (*drained_poll)(BdrvChild *child);
/*
* Notifies the parent that the filename of its child has changed (e.g.
* because the direct child was removed from the backing chain), so that it
@ -984,27 +1005,6 @@ struct BdrvChildClass {
const char *(*get_name)(BdrvChild *child);
AioContext *(*get_parent_aio_context)(BdrvChild *child);
/*
* If this pair of functions is implemented, the parent doesn't issue new
* requests after returning from .drained_begin() until .drained_end() is
* called.
*
* These functions must not change the graph (and therefore also must not
* call aio_poll(), which could change the graph indirectly).
*
* Note that this can be nested. If drained_begin() was called twice, new
* I/O is allowed only after drained_end() was called twice, too.
*/
void (*drained_begin)(BdrvChild *child);
void (*drained_end)(BdrvChild *child);
/*
* Returns whether the parent has pending requests for the child. This
* callback is polled after .drained_begin() has been called until all
* activity on the child has stopped.
*/
bool (*drained_poll)(BdrvChild *child);
};
extern const BdrvChildClass child_of_bds;

View File

@ -57,6 +57,8 @@ struct BlockExport {
* Reference count for this block export. This includes strong references
* both from the owner (qemu-nbd or the monitor) and clients connected to
* the export.
*
* Use atomics to access this field.
*/
int refcount;

View File

@ -1,6 +1,7 @@
#ifndef QDEV_CORE_H
#define QDEV_CORE_H
#include "qemu/atomic.h"
#include "qemu/queue.h"
#include "qemu/bitmap.h"
#include "qemu/rcu.h"
@ -168,9 +169,6 @@ typedef struct {
/**
* DeviceState:
* @realized: Indicates whether the device has been fully constructed.
* When accessed outside big qemu lock, must be accessed with
* qatomic_load_acquire()
* @reset: ResettableState for the device; handled by Resettable interface.
*
* This structure should not be accessed directly. We declare it here
@ -339,6 +337,19 @@ DeviceState *qdev_new(const char *name);
*/
DeviceState *qdev_try_new(const char *name);
/**
* qdev_is_realized:
* @dev: The device to check.
*
* May be called outside big qemu lock.
*
* Returns: %true if the device has been fully constructed, %false otherwise.
*/
static inline bool qdev_is_realized(DeviceState *dev)
{
return qatomic_load_acquire(&dev->realized);
}
/**
* qdev_realize: Realize @dev.
* @dev: device to realize

View File

@ -133,6 +133,16 @@ struct SCSIBusInfo {
void (*save_request)(QEMUFile *f, SCSIRequest *req);
void *(*load_request)(QEMUFile *f, SCSIRequest *req);
void (*free_request)(SCSIBus *bus, void *priv);
/*
* Temporarily stop submitting new requests between drained_begin() and
* drained_end(). Called from the main loop thread with the BQL held.
*
* Implement these callbacks if request processing is triggered by a file
* descriptor like an EventNotifier. Otherwise set them to NULL.
*/
void (*drained_begin)(SCSIBus *bus);
void (*drained_end)(SCSIBus *bus);
};
#define TYPE_SCSI_BUS "SCSI"
@ -144,6 +154,8 @@ struct SCSIBus {
SCSISense unit_attention;
const SCSIBusInfo *info;
int drain_count; /* protected by BQL */
};
/**
@ -213,6 +225,8 @@ void scsi_req_cancel_complete(SCSIRequest *req);
void scsi_req_cancel(SCSIRequest *req);
void scsi_req_cancel_async(SCSIRequest *req, Notifier *notifier);
void scsi_req_retry(SCSIRequest *req);
void scsi_device_drained_begin(SCSIDevice *sdev);
void scsi_device_drained_end(SCSIDevice *sdev);
void scsi_device_purge_requests(SCSIDevice *sdev, SCSISense sense);
void scsi_device_set_ua(SCSIDevice *sdev, SCSISense sense);
void scsi_device_report_change(SCSIDevice *dev, SCSISense sense);

View File

@ -40,8 +40,9 @@ typedef struct {
int max_queues;
const VuDevIface *vu_iface;
unsigned int in_flight; /* atomic */
/* Protected by ctx lock */
unsigned int refcount;
bool wait_idle;
VuDev vu_dev;
QIOChannel *ioc; /* The I/O channel with the client */
@ -60,8 +61,9 @@ bool vhost_user_server_start(VuServer *server,
void vhost_user_server_stop(VuServer *server);
void vhost_user_server_ref(VuServer *server);
void vhost_user_server_unref(VuServer *server);
void vhost_user_server_inc_in_flight(VuServer *server);
void vhost_user_server_dec_in_flight(VuServer *server);
bool vhost_user_server_has_in_flight(VuServer *server);
void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx);
void vhost_user_server_detach_aio_context(VuServer *server);

View File

@ -59,6 +59,19 @@ typedef struct BlockDevOps {
*/
bool (*is_medium_locked)(void *opaque);
/*
* Runs when the backend receives a drain request.
*/
void (*drained_begin)(void *opaque);
/*
* Runs when the backend's last drain request ends.
*/
void (*drained_end)(void *opaque);
/*
* Is the device still busy?
*/
bool (*drained_poll)(void *opaque);
/*
* I/O API functions. These functions are thread-safe.
*
@ -76,18 +89,6 @@ typedef struct BlockDevOps {
* Runs when the size changed (e.g. monitor command block_resize)
*/
void (*resize_cb)(void *opaque);
/*
* Runs when the backend receives a drain request.
*/
void (*drained_begin)(void *opaque);
/*
* Runs when the backend's last drain request ends.
*/
void (*drained_end)(void *opaque);
/*
* Is the device still busy?
*/
bool (*drained_poll)(void *opaque);
} BlockDevOps;
/*

View File

@ -81,6 +81,7 @@ void blk_activate(BlockBackend *blk, Error **errp);
int blk_make_zero(BlockBackend *blk, BdrvRequestFlags flags);
void blk_aio_cancel(BlockAIOCB *acb);
int blk_commit_all(void);
bool blk_in_drain(BlockBackend *blk);
void blk_drain(BlockBackend *blk);
void blk_drain_all(void);
void blk_set_on_error(BlockBackend *blk, BlockdevOnError on_read_error,

View File

@ -337,10 +337,8 @@ static void qio_channel_command_set_aio_fd_handler(QIOChannel *ioc,
void *opaque)
{
QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
aio_set_fd_handler(ctx, cioc->readfd, false,
io_read, NULL, NULL, NULL, opaque);
aio_set_fd_handler(ctx, cioc->writefd, false,
NULL, io_write, NULL, NULL, opaque);
aio_set_fd_handler(ctx, cioc->readfd, io_read, NULL, NULL, NULL, opaque);
aio_set_fd_handler(ctx, cioc->writefd, NULL, io_write, NULL, NULL, opaque);
}

View File

@ -198,8 +198,7 @@ static void qio_channel_file_set_aio_fd_handler(QIOChannel *ioc,
void *opaque)
{
QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
aio_set_fd_handler(ctx, fioc->fd, false, io_read, io_write,
NULL, NULL, opaque);
aio_set_fd_handler(ctx, fioc->fd, io_read, io_write, NULL, NULL, opaque);
}
static GSource *qio_channel_file_create_watch(QIOChannel *ioc,

View File

@ -899,8 +899,7 @@ static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
void *opaque)
{
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
aio_set_fd_handler(ctx, sioc->fd, false,
io_read, io_write, NULL, NULL, opaque);
aio_set_fd_handler(ctx, sioc->fd, io_read, io_write, NULL, NULL, opaque);
}
static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,

View File

@ -3110,15 +3110,15 @@ static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
if (io_read) {
aio_set_fd_handler(ctx, rioc->rdmain->recv_comp_channel->fd,
false, io_read, io_write, NULL, NULL, opaque);
aio_set_fd_handler(ctx, rioc->rdmain->send_comp_channel->fd,
false, io_read, io_write, NULL, NULL, opaque);
aio_set_fd_handler(ctx, rioc->rdmain->recv_comp_channel->fd, io_read,
io_write, NULL, NULL, opaque);
aio_set_fd_handler(ctx, rioc->rdmain->send_comp_channel->fd, io_read,
io_write, NULL, NULL, opaque);
} else {
aio_set_fd_handler(ctx, rioc->rdmaout->recv_comp_channel->fd,
false, io_read, io_write, NULL, NULL, opaque);
aio_set_fd_handler(ctx, rioc->rdmaout->send_comp_channel->fd,
false, io_read, io_write, NULL, NULL, opaque);
aio_set_fd_handler(ctx, rioc->rdmaout->recv_comp_channel->fd, io_read,
io_write, NULL, NULL, opaque);
aio_set_fd_handler(ctx, rioc->rdmaout->send_comp_channel->fd, io_read,
io_write, NULL, NULL, opaque);
}
}

View File

@ -1071,7 +1071,11 @@ int main(int argc, char **argv)
qdict_put_str(raw_opts, "driver", "raw");
qdict_put_str(raw_opts, "file", bs->node_name);
qdict_put_int(raw_opts, "offset", dev_offset);
aio_context_acquire(qemu_get_aio_context());
bs = bdrv_open(NULL, NULL, raw_opts, flags, &error_fatal);
aio_context_release(qemu_get_aio_context());
blk_remove_bs(blk);
blk_insert_bs(blk, bs, &error_fatal);
bdrv_unref(bs);

View File

@ -88,16 +88,7 @@ class FuncDecl:
raise ValueError(f"no_co function can't be rdlock: {self.name}")
self.target_name = f'{subsystem}_{subname}'
t = self.args[0].type
if t == 'BlockDriverState *':
ctx = 'bdrv_get_aio_context(bs)'
elif t == 'BdrvChild *':
ctx = 'bdrv_get_aio_context(child->bs)'
elif t == 'BlockBackend *':
ctx = 'blk_get_aio_context(blk)'
else:
ctx = 'qemu_get_aio_context()'
self.ctx = ctx
self.ctx = self.gen_ctx()
self.get_result = 's->ret = '
self.ret = 'return s.ret;'
@ -109,6 +100,17 @@ class FuncDecl:
self.co_ret = ''
self.return_field = ''
def gen_ctx(self, prefix: str = '') -> str:
t = self.args[0].type
if t == 'BlockDriverState *':
return f'bdrv_get_aio_context({prefix}bs)'
elif t == 'BdrvChild *':
return f'bdrv_get_aio_context({prefix}child->bs)'
elif t == 'BlockBackend *':
return f'blk_get_aio_context({prefix}blk)'
else:
return 'qemu_get_aio_context()'
def gen_list(self, format: str) -> str:
return ', '.join(format.format_map(arg.__dict__) for arg in self.args)
@ -262,8 +264,11 @@ typedef struct {struct_name} {{
static void {name}_bh(void *opaque)
{{
{struct_name} *s = opaque;
AioContext *ctx = {func.gen_ctx('s->')};
aio_context_acquire(ctx);
{func.get_result}{name}({ func.gen_list('s->{name}') });
aio_context_release(ctx);
aio_co_wake(s->co);
}}

View File

@ -24,7 +24,7 @@ import os
import iotests
from iotests import log
iotests._verify_virtio_scsi_pci_or_ccw()
iotests.verify_virtio_scsi_pci_or_ccw()
iotests.script_initialize(supported_fmts=['qcow2'])
size = 64 * 1024 * 1024

View File

@ -1421,7 +1421,7 @@ def _verify_virtio_blk() -> None:
if 'virtio-blk' not in out:
notrun('Missing virtio-blk in QEMU binary')
def _verify_virtio_scsi_pci_or_ccw() -> None:
def verify_virtio_scsi_pci_or_ccw() -> None:
out = qemu_pipe('-M', 'none', '-device', 'help')
if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out:
notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')

View File

@ -0,0 +1,67 @@
#!/usr/bin/env python3
# group: rw quick
#
# Copyright (C) 2023 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Creator/Owner: Kevin Wolf <kwolf@redhat.com>
import asyncio
import iotests
iotests.script_initialize(supported_fmts=['qcow2', 'qcow', 'qed', 'vdi',
'vmdk', 'parallels'])
iotests.verify_virtio_scsi_pci_or_ccw()
with iotests.FilePath('disk.img') as img_path, \
iotests.VM() as vm:
iotests.qemu_img_create('-f', 'raw', img_path, '0')
vm.add_object('iothread,id=iothread0')
vm.add_blockdev(f'file,node-name=img-file,read-only=on,'
f'filename={img_path}')
vm.add_device('virtio-scsi,iothread=iothread0')
vm.add_device('scsi-hd,drive=img-file,share-rw=on')
vm.launch()
iotests.log(vm.qmp(
'blockdev-reopen',
options=[{
'driver': 'file',
'filename': img_path,
'node-name': 'img-file',
'read-only': False,
}],
))
iotests.log(vm.qmp(
'blockdev-create',
job_id='job0',
options={
'driver': iotests.imgfmt,
'file': 'img-file',
'size': 1024 * 1024,
},
))
# Should succeed and not time out
try:
vm.run_job('job0', wait=5.0)
vm.shutdown()
except asyncio.TimeoutError:
# VM may be stuck, kill it
vm.kill()
raise

View File

@ -0,0 +1,4 @@
{"return": {}}
{"return": {}}
{"execute": "job-dismiss", "arguments": {"id": "job0"}}
{"return": {}}

View File

@ -125,9 +125,6 @@ if have_block
if nettle.found() or gcrypt.found()
tests += {'test-crypto-pbkdf': [io]}
endif
if config_host_data.get('CONFIG_EPOLL_CREATE1')
tests += {'test-fdmon-epoll': [testblock]}
endif
endif
if have_system

View File

@ -130,7 +130,7 @@ static void *test_acquire_thread(void *opaque)
static void set_event_notifier(AioContext *ctx, EventNotifier *notifier,
EventNotifierHandler *handler)
{
aio_set_event_notifier(ctx, notifier, false, handler, NULL, NULL);
aio_set_event_notifier(ctx, notifier, handler, NULL, NULL);
}
static void dummy_notifier_read(EventNotifier *n)
@ -383,30 +383,6 @@ static void test_flush_event_notifier(void)
event_notifier_cleanup(&data.e);
}
static void test_aio_external_client(void)
{
int i, j;
for (i = 1; i < 3; i++) {
EventNotifierTestData data = { .n = 0, .active = 10, .auto_set = true };
event_notifier_init(&data.e, false);
aio_set_event_notifier(ctx, &data.e, true, event_ready_cb, NULL, NULL);
event_notifier_set(&data.e);
for (j = 0; j < i; j++) {
aio_disable_external(ctx);
}
for (j = 0; j < i; j++) {
assert(!aio_poll(ctx, false));
assert(event_notifier_test_and_clear(&data.e));
event_notifier_set(&data.e);
aio_enable_external(ctx);
}
assert(aio_poll(ctx, false));
set_event_notifier(ctx, &data.e, NULL);
event_notifier_cleanup(&data.e);
}
}
static void test_wait_event_notifier_noflush(void)
{
EventNotifierTestData data = { .n = 0 };
@ -935,7 +911,6 @@ int main(int argc, char **argv)
g_test_add_func("/aio/event/wait", test_wait_event_notifier);
g_test_add_func("/aio/event/wait/no-flush-cb", test_wait_event_notifier_noflush);
g_test_add_func("/aio/event/flush", test_flush_event_notifier);
g_test_add_func("/aio/external-client", test_aio_external_client);
g_test_add_func("/aio/timer/schedule", test_timer_schedule);
g_test_add_func("/aio/coroutine/queue-chaining", test_queue_chaining);

View File

@ -473,7 +473,6 @@ static void test_graph_change_drain_all(void)
g_assert_cmpint(bs_b->quiesce_counter, ==, 0);
g_assert_cmpint(b_s->drain_count, ==, 0);
g_assert_cmpint(qemu_get_aio_context()->external_disable_cnt, ==, 0);
bdrv_unref(bs_b);
blk_unref(blk_b);
@ -483,19 +482,19 @@ struct test_iothread_data {
BlockDriverState *bs;
enum drain_type drain_type;
int *aio_ret;
bool co_done;
};
static void test_iothread_drain_entry(void *opaque)
static void coroutine_fn test_iothread_drain_co_entry(void *opaque)
{
struct test_iothread_data *data = opaque;
aio_context_acquire(bdrv_get_aio_context(data->bs));
do_drain_begin(data->drain_type, data->bs);
g_assert_cmpint(*data->aio_ret, ==, 0);
do_drain_end(data->drain_type, data->bs);
aio_context_release(bdrv_get_aio_context(data->bs));
qemu_event_set(&done_event);
data->co_done = true;
aio_wait_kick();
}
static void test_iothread_aio_cb(void *opaque, int ret)
@ -531,6 +530,7 @@ static void test_iothread_common(enum drain_type drain_type, int drain_thread)
BlockDriverState *bs;
BDRVTestState *s;
BlockAIOCB *acb;
Coroutine *co;
int aio_ret;
struct test_iothread_data data;
@ -609,8 +609,9 @@ static void test_iothread_common(enum drain_type drain_type, int drain_thread)
}
break;
case 1:
aio_bh_schedule_oneshot(ctx_a, test_iothread_drain_entry, &data);
qemu_event_wait(&done_event);
co = qemu_coroutine_create(test_iothread_drain_co_entry, &data);
aio_co_enter(ctx_a, co);
AIO_WAIT_WHILE_UNLOCKED(NULL, !data.co_done);
break;
default:
g_assert_not_reached();

View File

@ -833,9 +833,9 @@ static void test_attach_second_node(void)
qdict_put_str(options, "driver", "raw");
qdict_put_str(options, "file", "base");
aio_context_acquire(ctx);
aio_context_acquire(main_ctx);
filter = bdrv_open(NULL, NULL, options, BDRV_O_RDWR, &error_abort);
aio_context_release(ctx);
aio_context_release(main_ctx);
g_assert(blk_get_aio_context(blk) == ctx);
g_assert(bdrv_get_aio_context(bs) == ctx);

View File

@ -1,73 +0,0 @@
/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
* fdmon-epoll tests
*
* Copyright (c) 2020 Red Hat, Inc.
*/
#include "qemu/osdep.h"
#include "block/aio.h"
#include "qapi/error.h"
#include "qemu/main-loop.h"
static AioContext *ctx;
static void dummy_fd_handler(EventNotifier *notifier)
{
event_notifier_test_and_clear(notifier);
}
static void add_event_notifiers(EventNotifier *notifiers, size_t n)
{
for (size_t i = 0; i < n; i++) {
event_notifier_init(&notifiers[i], false);
aio_set_event_notifier(ctx, &notifiers[i], false,
dummy_fd_handler, NULL, NULL);
}
}
static void remove_event_notifiers(EventNotifier *notifiers, size_t n)
{
for (size_t i = 0; i < n; i++) {
aio_set_event_notifier(ctx, &notifiers[i], false, NULL, NULL, NULL);
event_notifier_cleanup(&notifiers[i]);
}
}
/* Check that fd handlers work when external clients are disabled */
static void test_external_disabled(void)
{
EventNotifier notifiers[100];
/* fdmon-epoll is only enabled when many fd handlers are registered */
add_event_notifiers(notifiers, G_N_ELEMENTS(notifiers));
event_notifier_set(&notifiers[0]);
assert(aio_poll(ctx, true));
aio_disable_external(ctx);
event_notifier_set(&notifiers[0]);
assert(aio_poll(ctx, true));
aio_enable_external(ctx);
remove_event_notifiers(notifiers, G_N_ELEMENTS(notifiers));
}
int main(int argc, char **argv)
{
/*
* This code relies on the fact that fdmon-io_uring disables itself when
* the glib main loop is in use. The main loop uses fdmon-poll and upgrades
* to fdmon-epoll when the number of fds exceeds a threshold.
*/
qemu_init_main_loop(&error_fatal);
ctx = qemu_get_aio_context();
while (g_main_context_iteration(NULL, false)) {
/* Do nothing */
}
g_test_init(&argc, &argv, NULL);
g_test_add_func("/fdmon-epoll/external-disabled", test_external_disabled);
return g_test_run();
}

View File

@ -91,12 +91,12 @@ static void test(void)
/* Make the event notifier active (set) right away */
event_notifier_init(&td.poll_notifier, 1);
aio_set_event_notifier(td.ctx, &td.poll_notifier, false,
aio_set_event_notifier(td.ctx, &td.poll_notifier,
io_read, io_poll_true, io_poll_ready);
/* This event notifier will be used later */
event_notifier_init(&td.dummy_notifier, 0);
aio_set_event_notifier(td.ctx, &td.dummy_notifier, false,
aio_set_event_notifier(td.ctx, &td.dummy_notifier,
io_read, io_poll_false, io_poll_never_ready);
/* Consume aio_notify() */
@ -114,9 +114,8 @@ static void test(void)
/* Run io_poll()/io_poll_ready() one more time to show it keeps working */
g_assert(aio_poll(td.ctx, true));
aio_set_event_notifier(td.ctx, &td.dummy_notifier, false,
NULL, NULL, NULL);
aio_set_event_notifier(td.ctx, &td.poll_notifier, false, NULL, NULL, NULL);
aio_set_event_notifier(td.ctx, &td.dummy_notifier, NULL, NULL, NULL);
aio_set_event_notifier(td.ctx, &td.poll_notifier, NULL, NULL, NULL);
event_notifier_cleanup(&td.dummy_notifier);
event_notifier_cleanup(&td.poll_notifier);
aio_context_unref(td.ctx);

View File

@ -99,7 +99,6 @@ static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
void aio_set_fd_handler(AioContext *ctx,
int fd,
bool is_external,
IOHandler *io_read,
IOHandler *io_write,
AioPollFn *io_poll,
@ -144,7 +143,6 @@ void aio_set_fd_handler(AioContext *ctx,
new_node->io_poll = io_poll;
new_node->io_poll_ready = io_poll_ready;
new_node->opaque = opaque;
new_node->is_external = is_external;
if (is_new) {
new_node->pfd.fd = fd;
@ -196,12 +194,11 @@ static void aio_set_fd_poll(AioContext *ctx, int fd,
void aio_set_event_notifier(AioContext *ctx,
EventNotifier *notifier,
bool is_external,
EventNotifierHandler *io_read,
AioPollFn *io_poll,
EventNotifierHandler *io_poll_ready)
{
aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external,
aio_set_fd_handler(ctx, event_notifier_get_fd(notifier),
(IOHandler *)io_read, NULL, io_poll,
(IOHandler *)io_poll_ready, notifier);
}
@ -285,13 +282,11 @@ bool aio_pending(AioContext *ctx)
/* TODO should this check poll ready? */
revents = node->pfd.revents & node->pfd.events;
if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
aio_node_check(ctx, node->is_external)) {
if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
result = true;
break;
}
if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
aio_node_check(ctx, node->is_external)) {
if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
result = true;
break;
}
@ -350,9 +345,7 @@ static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
}
if (!QLIST_IS_INSERTED(node, node_deleted) &&
poll_ready && revents == 0 &&
aio_node_check(ctx, node->is_external) &&
node->io_poll_ready) {
poll_ready && revents == 0 && node->io_poll_ready) {
/*
* Remove temporarily to avoid infinite loops when ->io_poll_ready()
* calls aio_poll() before clearing the condition that made the poll
@ -375,7 +368,6 @@ static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
if (!QLIST_IS_INSERTED(node, node_deleted) &&
(revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
aio_node_check(ctx, node->is_external) &&
node->io_read) {
node->io_read(node->opaque);
@ -386,7 +378,6 @@ static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
}
if (!QLIST_IS_INSERTED(node, node_deleted) &&
(revents & (G_IO_OUT | G_IO_ERR)) &&
aio_node_check(ctx, node->is_external) &&
node->io_write) {
node->io_write(node->opaque);
progress = true;
@ -447,8 +438,7 @@ static bool run_poll_handlers_once(AioContext *ctx,
AioHandler *tmp;
QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
if (aio_node_check(ctx, node->is_external) &&
node->io_poll(node->opaque)) {
if (node->io_poll(node->opaque)) {
aio_add_poll_ready_handler(ready_list, node);
node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;

View File

@ -38,7 +38,6 @@ struct AioHandler {
#endif
int64_t poll_idle_timeout; /* when to stop userspace polling */
bool poll_ready; /* has polling detected an event? */
bool is_external;
};
/* Add a handler to a ready list */

View File

@ -32,7 +32,6 @@ struct AioHandler {
GPollFD pfd;
int deleted;
void *opaque;
bool is_external;
QLIST_ENTRY(AioHandler) node;
};
@ -64,7 +63,6 @@ static void aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
void aio_set_fd_handler(AioContext *ctx,
int fd,
bool is_external,
IOHandler *io_read,
IOHandler *io_write,
AioPollFn *io_poll,
@ -111,7 +109,6 @@ void aio_set_fd_handler(AioContext *ctx,
node->opaque = opaque;
node->io_read = io_read;
node->io_write = io_write;
node->is_external = is_external;
if (io_read) {
bitmask |= FD_READ | FD_ACCEPT | FD_CLOSE;
@ -135,7 +132,6 @@ void aio_set_fd_handler(AioContext *ctx,
void aio_set_event_notifier(AioContext *ctx,
EventNotifier *e,
bool is_external,
EventNotifierHandler *io_notify,
AioPollFn *io_poll,
EventNotifierHandler *io_poll_ready)
@ -161,7 +157,6 @@ void aio_set_event_notifier(AioContext *ctx,
node->e = e;
node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
node->pfd.events = G_IO_IN;
node->is_external = is_external;
QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
g_source_add_poll(&ctx->source, &node->pfd);
@ -368,8 +363,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
/* fill fd sets */
count = 0;
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
if (!node->deleted && node->io_notify
&& aio_node_check(ctx, node->is_external)) {
if (!node->deleted && node->io_notify) {
assert(count < MAXIMUM_WAIT_OBJECTS);
events[count++] = event_notifier_get_handle(node->e);
}

View File

@ -409,7 +409,7 @@ aio_ctx_finalize(GSource *source)
g_free(bh);
}
aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL, NULL);
aio_set_event_notifier(ctx, &ctx->notifier, NULL, NULL, NULL);
event_notifier_cleanup(&ctx->notifier);
qemu_rec_mutex_destroy(&ctx->lock);
qemu_lockcnt_destroy(&ctx->list_lock);
@ -593,7 +593,6 @@ AioContext *aio_context_new(Error **errp)
QSLIST_INIT(&ctx->scheduled_coroutines);
aio_set_event_notifier(ctx, &ctx->notifier,
false,
aio_context_notifier_cb,
aio_context_notifier_poll,
aio_context_notifier_poll_ready);

View File

@ -64,11 +64,6 @@ static int fdmon_epoll_wait(AioContext *ctx, AioHandlerList *ready_list,
int i, ret = 0;
struct epoll_event events[128];
/* Fall back while external clients are disabled */
if (qatomic_read(&ctx->external_disable_cnt)) {
return fdmon_poll_ops.wait(ctx, ready_list, timeout);
}
if (timeout > 0) {
ret = qemu_poll_ns(&pfd, 1, timeout);
if (ret > 0) {
@ -133,11 +128,6 @@ bool fdmon_epoll_try_upgrade(AioContext *ctx, unsigned npfd)
return false;
}
/* Do not upgrade while external clients are disabled */
if (qatomic_read(&ctx->external_disable_cnt)) {
return false;
}
if (npfd < EPOLL_ENABLE_THRESHOLD) {
return false;
}

View File

@ -276,11 +276,6 @@ static int fdmon_io_uring_wait(AioContext *ctx, AioHandlerList *ready_list,
unsigned wait_nr = 1; /* block until at least one cqe is ready */
int ret;
/* Fall back while external clients are disabled */
if (qatomic_read(&ctx->external_disable_cnt)) {
return fdmon_poll_ops.wait(ctx, ready_list, timeout);
}
if (timeout == 0) {
wait_nr = 0; /* non-blocking */
} else if (timeout > 0) {
@ -315,8 +310,7 @@ static bool fdmon_io_uring_need_wait(AioContext *ctx)
return true;
}
/* Are we falling back to fdmon-poll? */
return qatomic_read(&ctx->external_disable_cnt);
return false;
}
static const FDMonOps fdmon_io_uring_ops = {

View File

@ -65,8 +65,7 @@ static int fdmon_poll_wait(AioContext *ctx, AioHandlerList *ready_list,
assert(npfd == 0);
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
if (!QLIST_IS_INSERTED(node, node_deleted) && node->pfd.events
&& aio_node_check(ctx, node->is_external)) {
if (!QLIST_IS_INSERTED(node, node_deleted) && node->pfd.events) {
add_pollfd(node);
}
}

View File

@ -644,14 +644,13 @@ void qemu_set_fd_handler(int fd,
void *opaque)
{
iohandler_init();
aio_set_fd_handler(iohandler_ctx, fd, false,
fd_read, fd_write, NULL, NULL, opaque);
aio_set_fd_handler(iohandler_ctx, fd, fd_read, fd_write, NULL, NULL,
opaque);
}
void event_notifier_set_handler(EventNotifier *e,
EventNotifierHandler *handler)
{
iohandler_init();
aio_set_event_notifier(iohandler_ctx, e, false,
handler, NULL, NULL);
aio_set_event_notifier(iohandler_ctx, e, handler, NULL, NULL);
}

View File

@ -74,8 +74,7 @@ typedef struct {
static void fd_coroutine_enter(void *opaque)
{
FDYieldUntilData *data = opaque;
aio_set_fd_handler(data->ctx, data->fd, false,
NULL, NULL, NULL, NULL, NULL);
aio_set_fd_handler(data->ctx, data->fd, NULL, NULL, NULL, NULL, NULL);
qemu_coroutine_enter(data->co);
}
@ -87,7 +86,7 @@ void coroutine_fn yield_until_fd_readable(int fd)
data.ctx = qemu_get_current_aio_context();
data.co = qemu_coroutine_self();
data.fd = fd;
aio_set_fd_handler(
data.ctx, fd, false, fd_coroutine_enter, NULL, NULL, NULL, &data);
aio_set_fd_handler(data.ctx, fd, fd_coroutine_enter, NULL, NULL, NULL,
&data);
qemu_coroutine_yield();
}

View File

@ -75,20 +75,26 @@ static void panic_cb(VuDev *vu_dev, const char *buf)
error_report("vu_panic: %s", buf);
}
void vhost_user_server_ref(VuServer *server)
void vhost_user_server_inc_in_flight(VuServer *server)
{
assert(!server->wait_idle);
server->refcount++;
qatomic_inc(&server->in_flight);
}
void vhost_user_server_unref(VuServer *server)
void vhost_user_server_dec_in_flight(VuServer *server)
{
server->refcount--;
if (server->wait_idle && !server->refcount) {
aio_co_wake(server->co_trip);
if (qatomic_fetch_dec(&server->in_flight) == 1) {
if (server->wait_idle) {
aio_co_wake(server->co_trip);
}
}
}
bool vhost_user_server_has_in_flight(VuServer *server)
{
return qatomic_load_acquire(&server->in_flight) > 0;
}
static bool coroutine_fn
vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg)
{
@@ -192,13 +198,13 @@ static coroutine_fn void vu_client_trip(void *opaque)
 /* Keep running */
 }
-if (server->refcount) {
+if (vhost_user_server_has_in_flight(server)) {
 /* Wait for requests to complete before we can unmap the memory */
 server->wait_idle = true;
 qemu_coroutine_yield();
 server->wait_idle = false;
 }
-assert(server->refcount == 0);
+assert(!vhost_user_server_has_in_flight(server));
 vu_deinit(vu_dev);
@@ -272,7 +278,7 @@ set_watch(VuDev *vu_dev, int fd, int vu_evt,
 vu_fd_watch->fd = fd;
 vu_fd_watch->cb = cb;
 qemu_socket_set_nonblock(fd);
-aio_set_fd_handler(server->ioc->ctx, fd, true, kick_handler,
+aio_set_fd_handler(server->ioc->ctx, fd, kick_handler,
 NULL, NULL, NULL, vu_fd_watch);
 vu_fd_watch->vu_dev = vu_dev;
 vu_fd_watch->pvt = pvt;
@@ -293,8 +299,7 @@ static void remove_watch(VuDev *vu_dev, int fd)
 if (!vu_fd_watch) {
 return;
 }
-aio_set_fd_handler(server->ioc->ctx, fd, true,
-NULL, NULL, NULL, NULL, NULL);
+aio_set_fd_handler(server->ioc->ctx, fd, NULL, NULL, NULL, NULL, NULL);
 QTAILQ_REMOVE(&server->vu_fd_watches, vu_fd_watch, next);
 g_free(vu_fd_watch);
@@ -356,7 +361,7 @@ void vhost_user_server_stop(VuServer *server)
 VuFdWatch *vu_fd_watch;
 QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
-aio_set_fd_handler(server->ctx, vu_fd_watch->fd, true,
+aio_set_fd_handler(server->ctx, vu_fd_watch->fd,
 NULL, NULL, NULL, NULL, vu_fd_watch);
 }
@@ -397,7 +402,7 @@ void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx)
 qio_channel_attach_aio_context(server->ioc, ctx);
 QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
-aio_set_fd_handler(ctx, vu_fd_watch->fd, true, kick_handler, NULL,
+aio_set_fd_handler(ctx, vu_fd_watch->fd, kick_handler, NULL,
 NULL, NULL, vu_fd_watch);
 }
@@ -411,7 +416,7 @@ void vhost_user_server_detach_aio_context(VuServer *server)
 VuFdWatch *vu_fd_watch;
 QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
-aio_set_fd_handler(server->ctx, vu_fd_watch->fd, true,
+aio_set_fd_handler(server->ctx, vu_fd_watch->fd,
 NULL, NULL, NULL, NULL, vu_fd_watch);
 }