Merge tag 'for-upstream' of git://repo.or.cz/qemu/kevin into staging

Block layer patches

- job: replace AioContext lock with job_mutex (pattern sketched below the commit header)
- Fixes to make coroutine_fn annotations more accurate
- QAPI schema: Fix incorrect example
- Code cleanup

# -----BEGIN PGP SIGNATURE-----
#
# iQJFBAABCAAvFiEE3D3rFZqa+V09dFb+fwmycsiPL9YFAmNAAz8RHGt3b2xmQHJl
# ZGhhdC5jb20ACgkQfwmycsiPL9a6zg//QYLx+FYMStb50lS+6VBio8AKOVbwn5zp
# ZANoXinMknnxI5wTldjkkM1cBRg27BVjpOHz4XemBtQgT5nBqWq8+Ov31lwASVID
# na/L9o4Pa0xmywM777K+edceWk0fpJTLmnFf1Qxan9qB/VSjNFtk+fjwFopoatKg
# XbHd6maQtrY8bIOyBsBoZozNaS39E/uPqkP67V6GF09re17f0PBctGHKFkTKZr8w
# 2HfyMt8/UIhFet++NFgxppTcvIKfZ20pk4AQ+yYsL+FxWr/cs4leKWl5BSc7thtP
# Sm/y0WiEB4nPNo4CSf9sA1Vo8EIGYzBhUVteqYQUF2vSXSzFmZb191fLJRYwp5bQ
# QxEmHzPVGqcUHr+jkfXI0yLolWduiKV1ATZ0zW3N41VfzGLYZdSgI2ZhbHJ0/yKO
# ZhyC63gye9V6TXxviYIz2V6iOD8QuwJ8X1P0E3yRsGploF1UY/N1lwbmek1XhFn/
# +xn/mrTeV0lu4wKuWRpUfY2C/7SR0Za6MB2GqduRWnbcAonLH3/syAxXSfu2611N
# Z1Cf9Wu8Mm0IQz0LbbVvEJZ4yoEPkg/tGH8q6dpau2uTfCb6sSylRxLcXEa5R0UQ
# W+wX5GSoTDe4DQKOSaJE7jWV/QwY5diTLHBIvSF8uKAfeCenkDDLowrMvbWafL0X
# XTFzpZ/1aA8=
# =jMFT
# -----END PGP SIGNATURE-----
# gpg: Signature made Fri 07 Oct 2022 06:45:19 EDT
# gpg:                using RSA key DC3DEB159A9AF95D3D7456FE7F09B272C88F2FD6
# gpg:                issuer "kwolf@redhat.com"
# gpg: Good signature from "Kevin Wolf <kwolf@redhat.com>" [full]
# Primary key fingerprint: DC3D EB15 9A9A F95D 3D74  56FE 7F09 B272 C88F 2FD6

* tag 'for-upstream' of git://repo.or.cz/qemu/kevin: (50 commits)
  file-posix: Remove unused s->discard_zeroes
  job: remove unused functions
  blockjob: remove unused functions
  block_job_query: remove atomic read
  job.c: enable job lock/unlock and remove Aiocontext locks
  job.h: categorize JobDriver callbacks that need the AioContext lock
  blockjob: protect iostatus field in BlockJob struct
  blockjob: rename notifier callbacks as _locked
  blockjob.h: categorize fields in struct BlockJob
  jobs: protect job.aio_context with BQL and job_mutex
  job: detect change of aiocontext within job coroutine
  jobs: group together API calls under the same job lock
  block/mirror.c: use of job helpers in drivers
  jobs: use job locks also in the unit tests
  jobs: add job lock in find_* functions
  blockjob: introduce block_job _locked() APIs
  job: move and update comments from blockjob.c
  job.c: add job_lock/unlock while keeping job.h intact
  aio-wait.h: introduce AIO_WAIT_WHILE_UNLOCKED
  job.c: API functions not used outside should be static
  ...

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Stefan Hajnoczi 2022-10-12 15:57:56 -04:00
commit 7fa24b8d61
44 changed files with 1254 additions and 804 deletions
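
The heart of the series is a locking convention change, most visible in the blockdev.c diff further down: QMP handlers used to look up a job together with its AioContext, acquire that context, operate, and release it; they now take the single job_mutex via JOB_LOCK_GUARD() and call the _locked variants of the job API. A minimal before/after sketch of the pattern (the qmp_example_pause_* wrappers are hypothetical; the helpers are the ones appearing in the diffs, so this only compiles inside the QEMU tree):

/* Old shape: per-job AioContext acquire/release (removed below). */
void qmp_example_pause_old(const char *device, Error **errp)
{
    AioContext *aio_context;
    BlockJob *job = find_block_job(device, &aio_context, errp);

    if (!job) {
        return;
    }
    job_user_pause(&job->job, errp);
    aio_context_release(aio_context);    /* find_block_job() acquired it */
}

/* New shape: one global job_mutex, scoped by JOB_LOCK_GUARD(). */
void qmp_example_pause_new(const char *device, Error **errp)
{
    BlockJob *job;

    JOB_LOCK_GUARD();                    /* held until the end of scope */
    job = find_block_job_locked(device, errp);
    if (!job) {
        return;
    }
    job_user_pause_locked(&job->job, errp);
}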

block.c

@ -631,9 +631,10 @@ static int64_t create_file_fallback_truncate(BlockBackend *blk,
* Helper function for bdrv_create_file_fallback(): Zero the first
* sector to remove any potentially pre-existing image header.
*/
static int create_file_fallback_zero_first_sector(BlockBackend *blk,
int64_t current_size,
Error **errp)
static int coroutine_fn
create_file_fallback_zero_first_sector(BlockBackend *blk,
int64_t current_size,
Error **errp)
{
int64_t bytes_to_clear;
int ret;
@ -4980,8 +4981,8 @@ static void bdrv_close(BlockDriverState *bs)
void bdrv_close_all(void)
{
assert(job_next(NULL) == NULL);
GLOBAL_STATE_CODE();
assert(job_next(NULL) == NULL);
/* Drop references from requests still in flight, such as canceled block
* jobs whose AIO context has not been polled yet */
@ -6167,13 +6168,16 @@ XDbgBlockGraph *bdrv_get_xdbg_block_graph(Error **errp)
}
}
for (job = block_job_next(NULL); job; job = block_job_next(job)) {
GSList *el;
WITH_JOB_LOCK_GUARD() {
for (job = block_job_next_locked(NULL); job;
job = block_job_next_locked(job)) {
GSList *el;
xdbg_graph_add_node(gr, job, X_DBG_BLOCK_GRAPH_NODE_TYPE_BLOCK_JOB,
job->job.id);
for (el = job->nodes; el; el = el->next) {
xdbg_graph_add_edge(gr, job, (BdrvChild *)el->data);
xdbg_graph_add_node(gr, job, X_DBG_BLOCK_GRAPH_NODE_TYPE_BLOCK_JOB,
job->job.id);
for (el = job->nodes; el; el = el->next) {
xdbg_graph_add_edge(gr, job, (BdrvChild *)el->data);
}
}
}


@ -258,7 +258,7 @@ blkverify_co_pwritev(BlockDriverState *bs, int64_t offset, int64_t bytes,
return blkverify_co_prwv(bs, &r, offset, bytes, qiov, qiov, flags, true);
}
static int blkverify_co_flush(BlockDriverState *bs)
static int coroutine_fn blkverify_co_flush(BlockDriverState *bs)
{
BDRVBlkverifyState *s = bs->opaque;


@ -1546,7 +1546,7 @@ static BlockAIOCB *blk_aio_prwv(BlockBackend *blk, int64_t offset,
return &acb->common;
}
static void blk_aio_read_entry(void *opaque)
static void coroutine_fn blk_aio_read_entry(void *opaque)
{
BlkAioEmAIOCB *acb = opaque;
BlkRwCo *rwco = &acb->rwco;
@ -1558,7 +1558,7 @@ static void blk_aio_read_entry(void *opaque)
blk_aio_complete(acb);
}
static void blk_aio_write_entry(void *opaque)
static void coroutine_fn blk_aio_write_entry(void *opaque)
{
BlkAioEmAIOCB *acb = opaque;
BlkRwCo *rwco = &acb->rwco;
@ -1669,7 +1669,7 @@ int coroutine_fn blk_co_ioctl(BlockBackend *blk, unsigned long int req,
return ret;
}
static void blk_aio_ioctl_entry(void *opaque)
static void coroutine_fn blk_aio_ioctl_entry(void *opaque)
{
BlkAioEmAIOCB *acb = opaque;
BlkRwCo *rwco = &acb->rwco;
@ -1703,7 +1703,7 @@ blk_co_do_pdiscard(BlockBackend *blk, int64_t offset, int64_t bytes)
return bdrv_co_pdiscard(blk->root, offset, bytes);
}
static void blk_aio_pdiscard_entry(void *opaque)
static void coroutine_fn blk_aio_pdiscard_entry(void *opaque)
{
BlkAioEmAIOCB *acb = opaque;
BlkRwCo *rwco = &acb->rwco;
@ -1747,7 +1747,7 @@ static int coroutine_fn blk_co_do_flush(BlockBackend *blk)
return bdrv_co_flush(blk_bs(blk));
}
static void blk_aio_flush_entry(void *opaque)
static void coroutine_fn blk_aio_flush_entry(void *opaque)
{
BlkAioEmAIOCB *acb = opaque;
BlkRwCo *rwco = &acb->rwco;
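
Several hunks in this series, like the block-backend.c ones above, only add the coroutine_fn marker. The marker documents that a function may yield and therefore must only be entered from coroutine context; the series tightens the annotations so they match how the functions are actually called. A hedged sketch of the rule, with a hypothetical example_co_flush() modelled on throttle_co_flush() further down:

/* May yield, so it must only run inside a coroutine. */
static int coroutine_fn example_co_flush(BlockDriverState *bs)
{
    return bdrv_co_flush(bs->file->bs);
}

/* A non-coroutine caller must not invoke example_co_flush() directly;
 * it has to spawn a coroutine (or use a synchronous wrapper) instead. */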


@ -203,9 +203,9 @@ static int coroutine_fn cbw_co_flush(BlockDriverState *bs)
* It's guaranteed that guest writes will not interact with the region until
* cbw_snapshot_read_unlock() is called.
*/
static BlockReq *cbw_snapshot_read_lock(BlockDriverState *bs,
int64_t offset, int64_t bytes,
int64_t *pnum, BdrvChild **file)
static coroutine_fn BlockReq *
cbw_snapshot_read_lock(BlockDriverState *bs, int64_t offset, int64_t bytes,
int64_t *pnum, BdrvChild **file)
{
BDRVCopyBeforeWriteState *s = bs->opaque;
BlockReq *req = g_new(BlockReq, 1);
@ -240,7 +240,8 @@ static BlockReq *cbw_snapshot_read_lock(BlockDriverState *bs,
return req;
}
static void cbw_snapshot_read_unlock(BlockDriverState *bs, BlockReq *req)
static coroutine_fn void
cbw_snapshot_read_unlock(BlockDriverState *bs, BlockReq *req)
{
BDRVCopyBeforeWriteState *s = bs->opaque;


@ -855,7 +855,7 @@ out_noclean:
return -EINVAL;
}
static void curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb)
static void coroutine_fn curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb)
{
CURLState *state;
int running;


@ -154,7 +154,6 @@ typedef struct BDRVRawState {
bool has_discard:1;
bool has_write_zeroes:1;
bool discard_zeroes:1;
bool use_linux_aio:1;
bool use_linux_io_uring:1;
int page_cache_inconsistent; /* errno from fdatasync failure */
@ -755,7 +754,6 @@ static int raw_open_common(BlockDriverState *bs, QDict *options,
ret = -EINVAL;
goto fail;
} else {
s->discard_zeroes = true;
s->has_fallocate = true;
}
} else {
@ -769,19 +767,12 @@ static int raw_open_common(BlockDriverState *bs, QDict *options,
}
if (S_ISBLK(st.st_mode)) {
#ifdef BLKDISCARDZEROES
unsigned int arg;
if (ioctl(s->fd, BLKDISCARDZEROES, &arg) == 0 && arg) {
s->discard_zeroes = true;
}
#endif
#ifdef __linux__
/* On Linux 3.10, BLKDISCARD leaves stale data in the page cache. Do
* not rely on the contents of discarded blocks unless using O_DIRECT.
* Same for BLKZEROOUT.
*/
if (!(bs->open_flags & BDRV_O_NOCACHE)) {
s->discard_zeroes = false;
s->has_write_zeroes = false;
}
#endif
@ -2180,7 +2171,7 @@ static void raw_aio_unplug(BlockDriverState *bs)
#endif
}
static int raw_co_flush_to_disk(BlockDriverState *bs)
static int coroutine_fn raw_co_flush_to_disk(BlockDriverState *bs)
{
BDRVRawState *s = bs->opaque;
RawPosixAIOData acb;


@ -751,11 +751,11 @@ static void coroutine_fn tracked_request_end(BdrvTrackedRequest *req)
/**
* Add an active request to the tracked requests list
*/
static void tracked_request_begin(BdrvTrackedRequest *req,
BlockDriverState *bs,
int64_t offset,
int64_t bytes,
enum BdrvTrackedRequestType type)
static void coroutine_fn tracked_request_begin(BdrvTrackedRequest *req,
BlockDriverState *bs,
int64_t offset,
int64_t bytes,
enum BdrvTrackedRequestType type)
{
bdrv_check_request(offset, bytes, &error_abort);
@ -794,7 +794,7 @@ static bool tracked_request_overlaps(BdrvTrackedRequest *req,
}
/* Called with self->bs->reqs_lock held */
static BdrvTrackedRequest *
static coroutine_fn BdrvTrackedRequest *
bdrv_find_conflicting_request(BdrvTrackedRequest *self)
{
BdrvTrackedRequest *req;
@ -1635,10 +1635,10 @@ static bool bdrv_init_padding(BlockDriverState *bs,
return true;
}
static int bdrv_padding_rmw_read(BdrvChild *child,
BdrvTrackedRequest *req,
BdrvRequestPadding *pad,
bool zero_middle)
static coroutine_fn int bdrv_padding_rmw_read(BdrvChild *child,
BdrvTrackedRequest *req,
BdrvRequestPadding *pad,
bool zero_middle)
{
QEMUIOVector local_qiov;
BlockDriverState *bs = child->bs;
@ -3159,7 +3159,7 @@ out:
return ret;
}
int bdrv_co_ioctl(BlockDriverState *bs, int req, void *buf)
int coroutine_fn bdrv_co_ioctl(BlockDriverState *bs, int req, void *buf)
{
BlockDriver *drv = bs->drv;
CoroutineIOCompletion co = {


@ -290,7 +290,8 @@ iscsi_co_generic_cb(struct iscsi_context *iscsi, int status,
}
}
static void iscsi_co_init_iscsitask(IscsiLun *iscsilun, struct IscsiTask *iTask)
static void coroutine_fn
iscsi_co_init_iscsitask(IscsiLun *iscsilun, struct IscsiTask *iTask)
{
*iTask = (struct IscsiTask) {
.co = qemu_coroutine_self(),


@ -894,6 +894,7 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
BlockDriverState *bs = s->mirror_top_bs->backing->bs;
BlockDriverState *target_bs = blk_bs(s->target);
bool need_drain = true;
BlockDeviceIoStatus iostatus;
int64_t length;
int64_t target_length;
BlockDriverInfo bdi;
@ -1016,8 +1017,11 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
* We do so every BLOCK_JOB_SLICE_TIME nanoseconds, or when there is
* an error, or when the source is clean, whichever comes first. */
delta = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - s->last_pause_ns;
WITH_JOB_LOCK_GUARD() {
iostatus = s->common.iostatus;
}
if (delta < BLOCK_JOB_SLICE_TIME &&
s->common.iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
if (s->in_flight >= MAX_IN_FLIGHT || s->buf_free_count == 0 ||
(cnt == 0 && s->in_flight > 0)) {
trace_mirror_yield(s, cnt, s->buf_free_count, s->in_flight);
@ -1152,8 +1156,10 @@ static void mirror_complete(Job *job, Error **errp)
s->should_complete = true;
/* If the job is paused, it will be re-entered when it is resumed */
if (!job->paused) {
job_enter(job);
WITH_JOB_LOCK_GUARD() {
if (!job->paused) {
job_enter_cond_locked(job, NULL);
}
}
}
@ -1173,8 +1179,11 @@ static bool mirror_drained_poll(BlockJob *job)
* from one of our own drain sections, to avoid a deadlock waiting for
* ourselves.
*/
if (!s->common.job.paused && !job_is_cancelled(&job->job) && !s->in_drain) {
return true;
WITH_JOB_LOCK_GUARD() {
if (!s->common.job.paused && !job_is_cancelled_locked(&job->job)
&& !s->in_drain) {
return true;
}
}
return !!s->in_flight;
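
Both mirror hunks above follow the same sample-under-lock, act-outside idiom: job_mutex must not be held across I/O or a coroutine yield, so the protected field is copied into a local first. A minimal sketch under that assumption (MirrorBlockJob and s->common.iostatus as in block/mirror.c; example_io_ok() is hypothetical):

static bool example_io_ok(MirrorBlockJob *s)
{
    BlockDeviceIoStatus iostatus;

    WITH_JOB_LOCK_GUARD() {             /* copy while job_mutex is held */
        iostatus = s->common.iostatus;
    }
    /* the lock is dropped here, so yielding or doing I/O is safe */
    return iostatus == BLOCK_DEVICE_IO_STATUS_OK;
}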


@ -983,11 +983,12 @@ static void nbd_iter_request_error(NBDReplyChunkIter *iter, int ret)
* nbd_reply_chunk_iter_receive
* The pointer stored in @payload requires g_free() to free it.
*/
static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
NBDReplyChunkIter *iter,
uint64_t handle,
QEMUIOVector *qiov, NBDReply *reply,
void **payload)
static bool coroutine_fn nbd_reply_chunk_iter_receive(BDRVNBDState *s,
NBDReplyChunkIter *iter,
uint64_t handle,
QEMUIOVector *qiov,
NBDReply *reply,
void **payload)
{
int ret, request_ret;
NBDReply local_reply;


@ -223,7 +223,7 @@ static void nfs_process_write(void *arg)
qemu_mutex_unlock(&client->mutex);
}
static void nfs_co_init_task(BlockDriverState *bs, NFSRPC *task)
static void coroutine_fn nfs_co_init_task(BlockDriverState *bs, NFSRPC *task)
{
*task = (NFSRPC) {
.co = qemu_coroutine_self(),


@ -293,34 +293,42 @@ static void nvme_kick(NVMeQueuePair *q)
q->need_kick = 0;
}
/* Find a free request element if any, otherwise:
* a) if in coroutine context, try to wait for one to become available;
* b) if not in coroutine, return NULL;
*/
static NVMeRequest *nvme_get_free_req(NVMeQueuePair *q)
static NVMeRequest *nvme_get_free_req_nofail_locked(NVMeQueuePair *q)
{
NVMeRequest *req;
qemu_mutex_lock(&q->lock);
while (q->free_req_head == -1) {
if (qemu_in_coroutine()) {
trace_nvme_free_req_queue_wait(q->s, q->index);
qemu_co_queue_wait(&q->free_req_queue, &q->lock);
} else {
qemu_mutex_unlock(&q->lock);
return NULL;
}
}
req = &q->reqs[q->free_req_head];
q->free_req_head = req->free_req_next;
req->free_req_next = -1;
qemu_mutex_unlock(&q->lock);
return req;
}
/* Return a free request element if any, otherwise return NULL. */
static NVMeRequest *nvme_get_free_req_nowait(NVMeQueuePair *q)
{
QEMU_LOCK_GUARD(&q->lock);
if (q->free_req_head == -1) {
return NULL;
}
return nvme_get_free_req_nofail_locked(q);
}
/*
* Wait for a free request to become available if necessary, then
* return it.
*/
static coroutine_fn NVMeRequest *nvme_get_free_req(NVMeQueuePair *q)
{
QEMU_LOCK_GUARD(&q->lock);
while (q->free_req_head == -1) {
trace_nvme_free_req_queue_wait(q->s, q->index);
qemu_co_queue_wait(&q->free_req_queue, &q->lock);
}
return nvme_get_free_req_nofail_locked(q);
}
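
The split gives each kind of caller the right tool: coroutine code can sleep on free_req_queue, while synchronous code must not block the event loop and instead fails fast. Two hypothetical callers, sketched on that assumption:

/* From a coroutine: may yield until a request slot frees up. */
static void coroutine_fn example_co_get(NVMeQueuePair *q, NVMeRequest **req)
{
    *req = nvme_get_free_req(q);        /* never returns NULL */
}

/* Outside coroutine context: fail fast rather than block. */
static int example_sync_get(NVMeQueuePair *q, NVMeRequest **req)
{
    *req = nvme_get_free_req_nowait(q);
    return *req ? 0 : -EBUSY;           /* as nvme_admin_cmd_sync() does */
}
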
/* With q->lock */
static void nvme_put_free_req_locked(NVMeQueuePair *q, NVMeRequest *req)
{
@ -506,7 +514,7 @@ static int nvme_admin_cmd_sync(BlockDriverState *bs, NvmeCmd *cmd)
AioContext *aio_context = bdrv_get_aio_context(bs);
NVMeRequest *req;
int ret = -EINPROGRESS;
req = nvme_get_free_req(q);
req = nvme_get_free_req_nowait(q);
if (!req) {
return -EBUSY;
}
@ -1234,8 +1242,10 @@ static inline bool nvme_qiov_aligned(BlockDriverState *bs,
return true;
}
static int nvme_co_prw(BlockDriverState *bs, uint64_t offset, uint64_t bytes,
QEMUIOVector *qiov, bool is_write, int flags)
static coroutine_fn int nvme_co_prw(BlockDriverState *bs,
uint64_t offset, uint64_t bytes,
QEMUIOVector *qiov, bool is_write,
int flags)
{
BDRVNVMeState *s = bs->opaque;
int r;


@ -165,8 +165,9 @@ static int64_t block_status(BDRVParallelsState *s, int64_t sector_num,
return start_off;
}
static int64_t allocate_clusters(BlockDriverState *bs, int64_t sector_num,
int nb_sectors, int *pnum)
static coroutine_fn int64_t allocate_clusters(BlockDriverState *bs,
int64_t sector_num,
int nb_sectors, int *pnum)
{
int ret = 0;
BDRVParallelsState *s = bs->opaque;


@ -884,7 +884,7 @@ int qcow2_alloc_compressed_cluster_offset(BlockDriverState *bs,
return 0;
}
static int perform_cow(BlockDriverState *bs, QCowL2Meta *m)
static int coroutine_fn perform_cow(BlockDriverState *bs, QCowL2Meta *m)
{
BDRVQcow2State *s = bs->opaque;
Qcow2COWRegion *start = &m->cow_start;
@ -1024,7 +1024,8 @@ fail:
return ret;
}
int qcow2_alloc_cluster_link_l2(BlockDriverState *bs, QCowL2Meta *m)
int coroutine_fn qcow2_alloc_cluster_link_l2(BlockDriverState *bs,
QCowL2Meta *m)
{
BDRVQcow2State *s = bs->opaque;
int i, j = 0, l2_index, ret;
@ -1397,8 +1398,9 @@ static int count_single_write_clusters(BlockDriverState *bs, int nb_clusters,
* information on cluster allocation may be invalid now. The caller
* must start over anyway, so consider *cur_bytes undefined.
*/
static int handle_dependencies(BlockDriverState *bs, uint64_t guest_offset,
uint64_t *cur_bytes, QCowL2Meta **m)
static int coroutine_fn handle_dependencies(BlockDriverState *bs,
uint64_t guest_offset,
uint64_t *cur_bytes, QCowL2Meta **m)
{
BDRVQcow2State *s = bs->opaque;
QCowL2Meta *old_alloc;
@ -1772,9 +1774,10 @@ out:
*
* Return 0 on success and -errno in error cases
*/
int qcow2_alloc_host_offset(BlockDriverState *bs, uint64_t offset,
unsigned int *bytes, uint64_t *host_offset,
QCowL2Meta **m)
int coroutine_fn qcow2_alloc_host_offset(BlockDriverState *bs, uint64_t offset,
unsigned int *bytes,
uint64_t *host_offset,
QCowL2Meta **m)
{
BDRVQcow2State *s = bs->opaque;
uint64_t start, remaining;
@ -2105,8 +2108,8 @@ out:
return ret;
}
int qcow2_subcluster_zeroize(BlockDriverState *bs, uint64_t offset,
uint64_t bytes, int flags)
int coroutine_fn qcow2_subcluster_zeroize(BlockDriverState *bs, uint64_t offset,
uint64_t bytes, int flags)
{
BDRVQcow2State *s = bs->opaque;
uint64_t end_offset = offset + bytes;


@ -1206,7 +1206,7 @@ void qcow2_free_any_cluster(BlockDriverState *bs, uint64_t l2_entry,
}
}
int coroutine_fn qcow2_write_caches(BlockDriverState *bs)
int qcow2_write_caches(BlockDriverState *bs)
{
BDRVQcow2State *s = bs->opaque;
int ret;
@ -1226,7 +1226,7 @@ int coroutine_fn qcow2_write_caches(BlockDriverState *bs)
return 0;
}
int coroutine_fn qcow2_flush_caches(BlockDriverState *bs)
int qcow2_flush_caches(BlockDriverState *bs)
{
int ret = qcow2_write_caches(bs);
if (ret < 0) {
@ -3706,7 +3706,7 @@ int64_t qcow2_get_last_cluster(BlockDriverState *bs, int64_t size)
return -EIO;
}
int qcow2_detect_metadata_preallocation(BlockDriverState *bs)
int coroutine_fn qcow2_detect_metadata_preallocation(BlockDriverState *bs)
{
BDRVQcow2State *s = bs->opaque;
int64_t i, end_cluster, cluster_count = 0, threshold;


@ -2448,7 +2448,7 @@ static bool merge_cow(uint64_t offset, unsigned bytes,
* Return 1 if the COW regions read as zeroes, 0 if not, < 0 on error.
* Note that returning 0 does not guarantee non-zero data.
*/
static int is_zero_cow(BlockDriverState *bs, QCowL2Meta *m)
static int coroutine_fn is_zero_cow(BlockDriverState *bs, QCowL2Meta *m)
{
/*
* This check is designed for optimization shortcut so it must be
@ -2466,7 +2466,8 @@ static int is_zero_cow(BlockDriverState *bs, QCowL2Meta *m)
m->cow_end.nb_bytes);
}
static int handle_alloc_space(BlockDriverState *bs, QCowL2Meta *l2meta)
static int coroutine_fn handle_alloc_space(BlockDriverState *bs,
QCowL2Meta *l2meta)
{
BDRVQcow2State *s = bs->opaque;
QCowL2Meta *m;


@ -874,8 +874,8 @@ void qcow2_free_any_cluster(BlockDriverState *bs, uint64_t l2_entry,
int qcow2_update_snapshot_refcount(BlockDriverState *bs,
int64_t l1_table_offset, int l1_size, int addend);
int coroutine_fn qcow2_flush_caches(BlockDriverState *bs);
int coroutine_fn qcow2_write_caches(BlockDriverState *bs);
int qcow2_flush_caches(BlockDriverState *bs);
int qcow2_write_caches(BlockDriverState *bs);
int qcow2_check_refcounts(BlockDriverState *bs, BdrvCheckResult *res,
BdrvCheckMode fix);
@ -895,7 +895,7 @@ int qcow2_change_refcount_order(BlockDriverState *bs, int refcount_order,
void *cb_opaque, Error **errp);
int qcow2_shrink_reftable(BlockDriverState *bs);
int64_t qcow2_get_last_cluster(BlockDriverState *bs, int64_t size);
int qcow2_detect_metadata_preallocation(BlockDriverState *bs);
int coroutine_fn qcow2_detect_metadata_preallocation(BlockDriverState *bs);
/* qcow2-cluster.c functions */
int qcow2_grow_l1_table(BlockDriverState *bs, uint64_t min_size,
@ -908,9 +908,9 @@ int qcow2_encrypt_sectors(BDRVQcow2State *s, int64_t sector_num,
int qcow2_get_host_offset(BlockDriverState *bs, uint64_t offset,
unsigned int *bytes, uint64_t *host_offset,
QCow2SubclusterType *subcluster_type);
int qcow2_alloc_host_offset(BlockDriverState *bs, uint64_t offset,
unsigned int *bytes, uint64_t *host_offset,
QCowL2Meta **m);
int coroutine_fn qcow2_alloc_host_offset(BlockDriverState *bs, uint64_t offset,
unsigned int *bytes,
uint64_t *host_offset, QCowL2Meta **m);
int qcow2_alloc_compressed_cluster_offset(BlockDriverState *bs,
uint64_t offset,
int compressed_size,
@ -918,13 +918,14 @@ int qcow2_alloc_compressed_cluster_offset(BlockDriverState *bs,
void qcow2_parse_compressed_l2_entry(BlockDriverState *bs, uint64_t l2_entry,
uint64_t *coffset, int *csize);
int qcow2_alloc_cluster_link_l2(BlockDriverState *bs, QCowL2Meta *m);
int coroutine_fn qcow2_alloc_cluster_link_l2(BlockDriverState *bs,
QCowL2Meta *m);
void qcow2_alloc_cluster_abort(BlockDriverState *bs, QCowL2Meta *m);
int qcow2_cluster_discard(BlockDriverState *bs, uint64_t offset,
uint64_t bytes, enum qcow2_discard_type type,
bool full_discard);
int qcow2_subcluster_zeroize(BlockDriverState *bs, uint64_t offset,
uint64_t bytes, int flags);
int coroutine_fn qcow2_subcluster_zeroize(BlockDriverState *bs, uint64_t offset,
uint64_t bytes, int flags);
int qcow2_expand_zero_clusters(BlockDriverState *bs,
BlockDriverAmendStatusCB *status_cb,


@ -254,7 +254,7 @@ static CachedL2Table *qed_new_l2_table(BDRVQEDState *s)
return l2_table;
}
static bool qed_plug_allocating_write_reqs(BDRVQEDState *s)
static bool coroutine_fn qed_plug_allocating_write_reqs(BDRVQEDState *s)
{
qemu_co_mutex_lock(&s->table_lock);
@ -273,7 +273,7 @@ static bool qed_plug_allocating_write_reqs(BDRVQEDState *s)
return true;
}
static void qed_unplug_allocating_write_reqs(BDRVQEDState *s)
static void coroutine_fn qed_unplug_allocating_write_reqs(BDRVQEDState *s)
{
qemu_co_mutex_lock(&s->table_lock);
assert(s->allocating_write_reqs_plugged);


@ -161,11 +161,10 @@ static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b)
return a->l == b->l;
}
static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
QEMUIOVector *qiov,
uint64_t offset,
uint64_t bytes,
int flags)
static QuorumAIOCB *coroutine_fn quorum_aio_get(BlockDriverState *bs,
QEMUIOVector *qiov,
uint64_t offset, uint64_t bytes,
int flags)
{
BDRVQuorumState *s = bs->opaque;
QuorumAIOCB *acb = g_new(QuorumAIOCB, 1);
@ -233,8 +232,6 @@ static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb)
return false;
}
static int read_fifo_child(QuorumAIOCB *acb);
static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
{
int i;
@ -273,7 +270,7 @@ static void quorum_report_bad_versions(BDRVQuorumState *s,
}
}
static void quorum_rewrite_entry(void *opaque)
static void coroutine_fn quorum_rewrite_entry(void *opaque)
{
QuorumCo *co = opaque;
QuorumAIOCB *acb = co->acb;
@ -574,7 +571,7 @@ free_exit:
quorum_free_vote_list(&acb->votes);
}
static void read_quorum_children_entry(void *opaque)
static void coroutine_fn read_quorum_children_entry(void *opaque)
{
QuorumCo *co = opaque;
QuorumAIOCB *acb = co->acb;
@ -602,7 +599,7 @@ static void read_quorum_children_entry(void *opaque)
}
}
static int read_quorum_children(QuorumAIOCB *acb)
static int coroutine_fn read_quorum_children(QuorumAIOCB *acb)
{
BDRVQuorumState *s = acb->bs->opaque;
int i;
@ -643,7 +640,7 @@ static int read_quorum_children(QuorumAIOCB *acb)
return acb->vote_ret;
}
static int read_fifo_child(QuorumAIOCB *acb)
static int coroutine_fn read_fifo_child(QuorumAIOCB *acb)
{
BDRVQuorumState *s = acb->bs->opaque;
int n, ret;
@ -664,8 +661,10 @@ static int read_fifo_child(QuorumAIOCB *acb)
return ret;
}
static int quorum_co_preadv(BlockDriverState *bs, int64_t offset, int64_t bytes,
QEMUIOVector *qiov, BdrvRequestFlags flags)
static int coroutine_fn quorum_co_preadv(BlockDriverState *bs,
int64_t offset, int64_t bytes,
QEMUIOVector *qiov,
BdrvRequestFlags flags)
{
BDRVQuorumState *s = bs->opaque;
QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags);
@ -684,7 +683,7 @@ static int quorum_co_preadv(BlockDriverState *bs, int64_t offset, int64_t bytes,
return ret;
}
static void write_quorum_entry(void *opaque)
static void coroutine_fn write_quorum_entry(void *opaque)
{
QuorumCo *co = opaque;
QuorumAIOCB *acb = co->acb;
@ -715,9 +714,9 @@ static void write_quorum_entry(void *opaque)
}
}
static int quorum_co_pwritev(BlockDriverState *bs, int64_t offset,
int64_t bytes, QEMUIOVector *qiov,
BdrvRequestFlags flags)
static int coroutine_fn quorum_co_pwritev(BlockDriverState *bs, int64_t offset,
int64_t bytes, QEMUIOVector *qiov,
BdrvRequestFlags flags)
{
BDRVQuorumState *s = bs->opaque;
QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags);
@ -746,8 +745,9 @@ static int quorum_co_pwritev(BlockDriverState *bs, int64_t offset,
return ret;
}
static int quorum_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
int64_t bytes, BdrvRequestFlags flags)
static int coroutine_fn quorum_co_pwrite_zeroes(BlockDriverState *bs,
int64_t offset, int64_t bytes,
BdrvRequestFlags flags)
{
return quorum_co_pwritev(bs, offset, bytes, NULL,


@ -411,7 +411,8 @@ static void raw_lock_medium(BlockDriverState *bs, bool locked)
bdrv_lock_medium(bs->file->bs, locked);
}
static int raw_co_ioctl(BlockDriverState *bs, unsigned long int req, void *buf)
static int coroutine_fn raw_co_ioctl(BlockDriverState *bs,
unsigned long int req, void *buf)
{
BDRVRawState *s = bs->opaque;
if (s->offset || s->has_size) {


@ -142,6 +142,7 @@ static void replication_close(BlockDriverState *bs)
{
BDRVReplicationState *s = bs->opaque;
Job *commit_job;
GLOBAL_STATE_CODE();
if (s->stage == BLOCK_REPLICATION_RUNNING) {
replication_stop(s->rs, false, NULL);
@ -726,7 +727,9 @@ static void replication_stop(ReplicationState *rs, bool failover, Error **errp)
* disk, secondary disk in backup_job_completed().
*/
if (s->backup_job) {
aio_context_release(aio_context);
job_cancel_sync(&s->backup_job->job, true);
aio_context_acquire(aio_context);
}
if (!failover) {


@ -162,7 +162,7 @@ static int coroutine_fn throttle_co_pwritev_compressed(BlockDriverState *bs,
BDRV_REQ_WRITE_COMPRESSED);
}
static int throttle_co_flush(BlockDriverState *bs)
static int coroutine_fn throttle_co_flush(BlockDriverState *bs)
{
return bdrv_co_flush(bs->file->bs);
}


@ -1787,10 +1787,11 @@ static int coroutine_fn vmdk_co_block_status(BlockDriverState *bs,
return ret;
}
static int vmdk_write_extent(VmdkExtent *extent, int64_t cluster_offset,
int64_t offset_in_cluster, QEMUIOVector *qiov,
uint64_t qiov_offset, uint64_t n_bytes,
uint64_t offset)
static int coroutine_fn
vmdk_write_extent(VmdkExtent *extent, int64_t cluster_offset,
int64_t offset_in_cluster, QEMUIOVector *qiov,
uint64_t qiov_offset, uint64_t n_bytes,
uint64_t offset)
{
int ret;
VmdkGrainMarker *data = NULL;
@ -1868,9 +1869,10 @@ static int vmdk_write_extent(VmdkExtent *extent, int64_t cluster_offset,
return ret;
}
static int vmdk_read_extent(VmdkExtent *extent, int64_t cluster_offset,
int64_t offset_in_cluster, QEMUIOVector *qiov,
int bytes)
static int coroutine_fn
vmdk_read_extent(VmdkExtent *extent, int64_t cluster_offset,
int64_t offset_in_cluster, QEMUIOVector *qiov,
int bytes)
{
int ret;
int cluster_bytes, buf_bytes;
@ -2015,9 +2017,9 @@ fail:
*
* Returns: error code with 0 for success.
*/
static int vmdk_pwritev(BlockDriverState *bs, uint64_t offset,
uint64_t bytes, QEMUIOVector *qiov,
bool zeroed, bool zero_dry_run)
static int coroutine_fn vmdk_pwritev(BlockDriverState *bs, uint64_t offset,
uint64_t bytes, QEMUIOVector *qiov,
bool zeroed, bool zero_dry_run)
{
BDRVVmdkState *s = bs->opaque;
VmdkExtent *extent = NULL;


@ -150,14 +150,12 @@ void blockdev_mark_auto_del(BlockBackend *blk)
return;
}
for (job = block_job_next(NULL); job; job = block_job_next(job)) {
JOB_LOCK_GUARD();
for (job = block_job_next_locked(NULL); job;
job = block_job_next_locked(job)) {
if (block_job_has_bdrv(job, blk_bs(blk))) {
AioContext *aio_context = job->job.aio_context;
aio_context_acquire(aio_context);
job_cancel(&job->job, false);
aio_context_release(aio_context);
job_cancel_locked(&job->job, false);
}
}
@ -1844,14 +1842,7 @@ static void drive_backup_abort(BlkActionState *common)
DriveBackupState *state = DO_UPCAST(DriveBackupState, common, common);
if (state->job) {
AioContext *aio_context;
aio_context = bdrv_get_aio_context(state->bs);
aio_context_acquire(aio_context);
job_cancel_sync(&state->job->job, true);
aio_context_release(aio_context);
}
}
@ -1945,14 +1936,7 @@ static void blockdev_backup_abort(BlkActionState *common)
BlockdevBackupState *state = DO_UPCAST(BlockdevBackupState, common, common);
if (state->job) {
AioContext *aio_context;
aio_context = bdrv_get_aio_context(state->bs);
aio_context_acquire(aio_context);
job_cancel_sync(&state->job->job, true);
aio_context_release(aio_context);
}
}
@ -3313,17 +3297,16 @@ out:
aio_context_release(aio_context);
}
/* Get a block job using its ID and acquire its AioContext */
static BlockJob *find_block_job(const char *id, AioContext **aio_context,
Error **errp)
/*
* Get a block job using its ID. Called with job_mutex held.
*/
static BlockJob *find_block_job_locked(const char *id, Error **errp)
{
BlockJob *job;
assert(id != NULL);
*aio_context = NULL;
job = block_job_get(id);
job = block_job_get_locked(id);
if (!job) {
error_set(errp, ERROR_CLASS_DEVICE_NOT_ACTIVE,
@ -3331,30 +3314,30 @@ static BlockJob *find_block_job(const char *id, AioContext **aio_context,
return NULL;
}
*aio_context = block_job_get_aio_context(job);
aio_context_acquire(*aio_context);
return job;
}
void qmp_block_job_set_speed(const char *device, int64_t speed, Error **errp)
{
AioContext *aio_context;
BlockJob *job = find_block_job(device, &aio_context, errp);
BlockJob *job;
JOB_LOCK_GUARD();
job = find_block_job_locked(device, errp);
if (!job) {
return;
}
block_job_set_speed(job, speed, errp);
aio_context_release(aio_context);
block_job_set_speed_locked(job, speed, errp);
}
void qmp_block_job_cancel(const char *device,
bool has_force, bool force, Error **errp)
{
AioContext *aio_context;
BlockJob *job = find_block_job(device, &aio_context, errp);
BlockJob *job;
JOB_LOCK_GUARD();
job = find_block_job_locked(device, errp);
if (!job) {
return;
@ -3364,97 +3347,94 @@ void qmp_block_job_cancel(const char *device,
force = false;
}
if (job_user_paused(&job->job) && !force) {
if (job_user_paused_locked(&job->job) && !force) {
error_setg(errp, "The block job for device '%s' is currently paused",
device);
goto out;
return;
}
trace_qmp_block_job_cancel(job);
job_user_cancel(&job->job, force, errp);
out:
aio_context_release(aio_context);
job_user_cancel_locked(&job->job, force, errp);
}
void qmp_block_job_pause(const char *device, Error **errp)
{
AioContext *aio_context;
BlockJob *job = find_block_job(device, &aio_context, errp);
BlockJob *job;
JOB_LOCK_GUARD();
job = find_block_job_locked(device, errp);
if (!job) {
return;
}
trace_qmp_block_job_pause(job);
job_user_pause(&job->job, errp);
aio_context_release(aio_context);
job_user_pause_locked(&job->job, errp);
}
void qmp_block_job_resume(const char *device, Error **errp)
{
AioContext *aio_context;
BlockJob *job = find_block_job(device, &aio_context, errp);
BlockJob *job;
JOB_LOCK_GUARD();
job = find_block_job_locked(device, errp);
if (!job) {
return;
}
trace_qmp_block_job_resume(job);
job_user_resume(&job->job, errp);
aio_context_release(aio_context);
job_user_resume_locked(&job->job, errp);
}
void qmp_block_job_complete(const char *device, Error **errp)
{
AioContext *aio_context;
BlockJob *job = find_block_job(device, &aio_context, errp);
BlockJob *job;
JOB_LOCK_GUARD();
job = find_block_job_locked(device, errp);
if (!job) {
return;
}
trace_qmp_block_job_complete(job);
job_complete(&job->job, errp);
aio_context_release(aio_context);
job_complete_locked(&job->job, errp);
}
void qmp_block_job_finalize(const char *id, Error **errp)
{
AioContext *aio_context;
BlockJob *job = find_block_job(id, &aio_context, errp);
BlockJob *job;
JOB_LOCK_GUARD();
job = find_block_job_locked(id, errp);
if (!job) {
return;
}
trace_qmp_block_job_finalize(job);
job_ref(&job->job);
job_finalize(&job->job, errp);
job_ref_locked(&job->job);
job_finalize_locked(&job->job, errp);
/*
* Job's context might have changed via job_finalize (and job_txn_apply
* automatically acquires the new one), so make sure we release the correct
* one.
*/
aio_context = block_job_get_aio_context(job);
job_unref(&job->job);
aio_context_release(aio_context);
job_unref_locked(&job->job);
}
void qmp_block_job_dismiss(const char *id, Error **errp)
{
AioContext *aio_context;
BlockJob *bjob = find_block_job(id, &aio_context, errp);
BlockJob *bjob;
Job *job;
JOB_LOCK_GUARD();
bjob = find_block_job_locked(id, errp);
if (!bjob) {
return;
}
trace_qmp_block_job_dismiss(bjob);
job = &bjob->job;
job_dismiss(&job, errp);
aio_context_release(aio_context);
job_dismiss_locked(&job, errp);
}
void qmp_change_backing_file(const char *device,
@ -3731,17 +3711,16 @@ BlockJobInfoList *qmp_query_block_jobs(Error **errp)
BlockJobInfoList *head = NULL, **tail = &head;
BlockJob *job;
for (job = block_job_next(NULL); job; job = block_job_next(job)) {
JOB_LOCK_GUARD();
for (job = block_job_next_locked(NULL); job;
job = block_job_next_locked(job)) {
BlockJobInfo *value;
AioContext *aio_context;
if (block_job_is_internal(job)) {
continue;
}
aio_context = block_job_get_aio_context(job);
aio_context_acquire(aio_context);
value = block_job_query(job, errp);
aio_context_release(aio_context);
value = block_job_query_locked(job, errp);
if (!value) {
qapi_free_BlockJobInfoList(head);
return NULL;


@ -36,21 +36,6 @@
#include "qemu/main-loop.h"
#include "qemu/timer.h"
/*
* The block job API is composed of two categories of functions.
*
* The first includes functions used by the monitor. The monitor is
* peculiar in that it accesses the block job list with block_job_get, and
* therefore needs consistency across block_job_get and the actual operation
* (e.g. block_job_set_speed). The consistency is achieved with
* aio_context_acquire/release. These functions are declared in blockjob.h.
*
* The second includes functions used by the block job drivers and sometimes
* by the core block layer. These do not care about locking, because the
* whole coroutine runs under the AioContext lock, and are declared in
* blockjob_int.h.
*/
static bool is_block_job(Job *job)
{
return job_type(job) == JOB_TYPE_BACKUP ||
@ -59,21 +44,21 @@ static bool is_block_job(Job *job)
job_type(job) == JOB_TYPE_STREAM;
}
BlockJob *block_job_next(BlockJob *bjob)
BlockJob *block_job_next_locked(BlockJob *bjob)
{
Job *job = bjob ? &bjob->job : NULL;
GLOBAL_STATE_CODE();
do {
job = job_next(job);
job = job_next_locked(job);
} while (job && !is_block_job(job));
return job ? container_of(job, BlockJob, job) : NULL;
}
BlockJob *block_job_get(const char *id)
BlockJob *block_job_get_locked(const char *id)
{
Job *job = job_get(id);
Job *job = job_get_locked(id);
GLOBAL_STATE_CODE();
if (job && is_block_job(job)) {
@ -83,6 +68,12 @@ BlockJob *block_job_get(const char *id)
}
}
BlockJob *block_job_get(const char *id)
{
JOB_LOCK_GUARD();
return block_job_get_locked(id);
}
void block_job_free(Job *job)
{
BlockJob *bjob = container_of(job, BlockJob, job);
@ -114,8 +105,10 @@ static bool child_job_drained_poll(BdrvChild *c)
/* An inactive or completed job doesn't have any pending requests. Jobs
* with !job->busy are either already paused or have a pause point after
* being reentered, so no job driver code will run before they pause. */
if (!job->busy || job_is_completed(job)) {
return false;
WITH_JOB_LOCK_GUARD() {
if (!job->busy || job_is_completed_locked(job)) {
return false;
}
}
/* Otherwise, assume that it isn't fully stopped yet, but allow the job to
@ -163,12 +156,13 @@ static void child_job_set_aio_ctx(BdrvChild *c, AioContext *ctx,
bdrv_set_aio_context_ignore(sibling->bs, ctx, ignore);
}
job->job.aio_context = ctx;
job_set_aio_context(&job->job, ctx);
}
static AioContext *child_job_get_parent_aio_context(BdrvChild *c)
{
BlockJob *job = c->opaque;
GLOBAL_STATE_CODE();
return job->job.aio_context;
}
@ -250,7 +244,8 @@ int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
return 0;
}
static void block_job_on_idle(Notifier *n, void *opaque)
/* Called with job_mutex lock held. */
static void block_job_on_idle_locked(Notifier *n, void *opaque)
{
aio_wait_kick();
}
@ -271,14 +266,14 @@ static bool job_timer_pending(Job *job)
return timer_pending(&job->sleep_timer);
}
bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
bool block_job_set_speed_locked(BlockJob *job, int64_t speed, Error **errp)
{
const BlockJobDriver *drv = block_job_driver(job);
int64_t old_speed = job->speed;
GLOBAL_STATE_CODE();
if (job_apply_verb(&job->job, JOB_VERB_SET_SPEED, errp) < 0) {
if (job_apply_verb_locked(&job->job, JOB_VERB_SET_SPEED, errp) < 0) {
return false;
}
if (speed < 0) {
@ -292,7 +287,9 @@ bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
job->speed = speed;
if (drv->set_speed) {
job_unlock();
drv->set_speed(job, speed);
job_lock();
}
if (speed && speed <= old_speed) {
@ -300,18 +297,24 @@ bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
}
/* kick only if a timer is pending */
job_enter_cond(&job->job, job_timer_pending);
job_enter_cond_locked(&job->job, job_timer_pending);
return true;
}
static bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
{
JOB_LOCK_GUARD();
return block_job_set_speed_locked(job, speed, errp);
}
int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n)
{
IO_CODE();
return ratelimit_calculate_delay(&job->limit, n);
}
BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
BlockJobInfo *block_job_query_locked(BlockJob *job, Error **errp)
{
BlockJobInfo *info;
uint64_t progress_current, progress_total;
@ -329,13 +332,13 @@ BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
info = g_new0(BlockJobInfo, 1);
info->type = g_strdup(job_type_str(&job->job));
info->device = g_strdup(job->job.id);
info->busy = qatomic_read(&job->job.busy);
info->busy = job->job.busy;
info->paused = job->job.pause_count > 0;
info->offset = progress_current;
info->len = progress_total;
info->speed = job->speed;
info->io_status = job->iostatus;
info->ready = job_is_ready(&job->job),
info->ready = job_is_ready_locked(&job->job),
info->status = job->job.status;
info->auto_finalize = job->job.auto_finalize;
info->auto_dismiss = job->job.auto_dismiss;
@ -348,7 +351,8 @@ BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
return info;
}
static void block_job_iostatus_set_err(BlockJob *job, int error)
/* Called with job lock held */
static void block_job_iostatus_set_err_locked(BlockJob *job, int error)
{
if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
job->iostatus = error == ENOSPC ? BLOCK_DEVICE_IO_STATUS_NOSPACE :
@ -356,7 +360,8 @@ static void block_job_iostatus_set_err(BlockJob *job, int error)
}
}
static void block_job_event_cancelled(Notifier *n, void *opaque)
/* Called with job_mutex lock held. */
static void block_job_event_cancelled_locked(Notifier *n, void *opaque)
{
BlockJob *job = opaque;
uint64_t progress_current, progress_total;
@ -375,7 +380,8 @@ static void block_job_event_cancelled(Notifier *n, void *opaque)
job->speed);
}
static void block_job_event_completed(Notifier *n, void *opaque)
/* Called with job_mutex lock held. */
static void block_job_event_completed_locked(Notifier *n, void *opaque)
{
BlockJob *job = opaque;
const char *msg = NULL;
@ -401,7 +407,8 @@ static void block_job_event_completed(Notifier *n, void *opaque)
msg);
}
static void block_job_event_pending(Notifier *n, void *opaque)
/* Called with job_mutex lock held. */
static void block_job_event_pending_locked(Notifier *n, void *opaque)
{
BlockJob *job = opaque;
@ -413,7 +420,8 @@ static void block_job_event_pending(Notifier *n, void *opaque)
job->job.id);
}
static void block_job_event_ready(Notifier *n, void *opaque)
/* Called with job_mutex lock held. */
static void block_job_event_ready_locked(Notifier *n, void *opaque)
{
BlockJob *job = opaque;
uint64_t progress_current, progress_total;
@ -433,11 +441,6 @@ static void block_job_event_ready(Notifier *n, void *opaque)
}
/*
* API for block job drivers and the block layer. These functions are
* declared in blockjob_int.h.
*/
void *block_job_create(const char *job_id, const BlockJobDriver *driver,
JobTxn *txn, BlockDriverState *bs, uint64_t perm,
uint64_t shared_perm, int64_t speed, int flags,
@ -463,19 +466,21 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
ratelimit_init(&job->limit);
job->finalize_cancelled_notifier.notify = block_job_event_cancelled;
job->finalize_completed_notifier.notify = block_job_event_completed;
job->pending_notifier.notify = block_job_event_pending;
job->ready_notifier.notify = block_job_event_ready;
job->idle_notifier.notify = block_job_on_idle;
job->finalize_cancelled_notifier.notify = block_job_event_cancelled_locked;
job->finalize_completed_notifier.notify = block_job_event_completed_locked;
job->pending_notifier.notify = block_job_event_pending_locked;
job->ready_notifier.notify = block_job_event_ready_locked;
job->idle_notifier.notify = block_job_on_idle_locked;
notifier_list_add(&job->job.on_finalize_cancelled,
&job->finalize_cancelled_notifier);
notifier_list_add(&job->job.on_finalize_completed,
&job->finalize_completed_notifier);
notifier_list_add(&job->job.on_pending, &job->pending_notifier);
notifier_list_add(&job->job.on_ready, &job->ready_notifier);
notifier_list_add(&job->job.on_idle, &job->idle_notifier);
WITH_JOB_LOCK_GUARD() {
notifier_list_add(&job->job.on_finalize_cancelled,
&job->finalize_cancelled_notifier);
notifier_list_add(&job->job.on_finalize_completed,
&job->finalize_completed_notifier);
notifier_list_add(&job->job.on_pending, &job->pending_notifier);
notifier_list_add(&job->job.on_ready, &job->ready_notifier);
notifier_list_add(&job->job.on_idle, &job->idle_notifier);
}
error_setg(&job->blocker, "block device is in use by block job: %s",
job_type_str(&job->job));
@ -498,7 +503,7 @@ fail:
return NULL;
}
void block_job_iostatus_reset(BlockJob *job)
void block_job_iostatus_reset_locked(BlockJob *job)
{
GLOBAL_STATE_CODE();
if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
@ -508,6 +513,12 @@ void block_job_iostatus_reset(BlockJob *job)
job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
}
static void block_job_iostatus_reset(BlockJob *job)
{
JOB_LOCK_GUARD();
block_job_iostatus_reset_locked(job);
}
void block_job_user_resume(Job *job)
{
BlockJob *bjob = container_of(job, BlockJob, job);
@ -546,12 +557,17 @@ BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
action);
}
if (action == BLOCK_ERROR_ACTION_STOP) {
if (!job->job.user_paused) {
job_pause(&job->job);
/* make the pause user visible, which will be resumed from QMP. */
job->job.user_paused = true;
WITH_JOB_LOCK_GUARD() {
if (!job->job.user_paused) {
job_pause_locked(&job->job);
/*
* make the pause user visible, so it can be
* resumed from QMP.
*/
job->job.user_paused = true;
}
block_job_iostatus_set_err_locked(job, error);
}
block_job_iostatus_set_err(job, error);
}
return action;
}


@ -424,21 +424,24 @@ typedef struct V9fsGetlock
extern int open_fd_hw;
extern int total_open_fd;
static inline void v9fs_path_write_lock(V9fsState *s)
static inline void coroutine_fn
v9fs_path_write_lock(V9fsState *s)
{
if (s->ctx.export_flags & V9FS_PATHNAME_FSCONTEXT) {
qemu_co_rwlock_wrlock(&s->rename_lock);
}
}
static inline void v9fs_path_read_lock(V9fsState *s)
static inline void coroutine_fn
v9fs_path_read_lock(V9fsState *s)
{
if (s->ctx.export_flags & V9FS_PATHNAME_FSCONTEXT) {
qemu_co_rwlock_rdlock(&s->rename_lock);
}
}
static inline void v9fs_path_unlock(V9fsState *s)
static inline void coroutine_fn
v9fs_path_unlock(V9fsState *s)
{
if (s->ctx.export_flags & V9FS_PATHNAME_FSCONTEXT) {
qemu_co_rwlock_unlock(&s->rename_lock);


@ -59,10 +59,13 @@ typedef struct {
extern AioWait global_aio_wait;
/**
* AIO_WAIT_WHILE:
* AIO_WAIT_WHILE_INTERNAL:
* @ctx: the aio context, or NULL if multiple aio contexts (for which the
* caller does not hold a lock) are involved in the polling condition.
* @cond: wait while this conditional expression is true
* @unlock: whether to unlock and then lock again @ctx. This applies
* only when waiting for another AioContext from the main loop.
* Otherwise it's ignored.
*
* Wait while a condition is true. Use this to implement synchronous
* operations that require event loop activity.
@ -75,7 +78,7 @@ extern AioWait global_aio_wait;
* wait on conditions between two IOThreads since that could lead to deadlock,
* go via the main loop instead.
*/
#define AIO_WAIT_WHILE(ctx, cond) ({ \
#define AIO_WAIT_WHILE_INTERNAL(ctx, cond, unlock) ({ \
bool waited_ = false; \
AioWait *wait_ = &global_aio_wait; \
AioContext *ctx_ = (ctx); \
@ -92,11 +95,11 @@ extern AioWait global_aio_wait;
assert(qemu_get_current_aio_context() == \
qemu_get_aio_context()); \
while ((cond)) { \
if (ctx_) { \
if (unlock && ctx_) { \
aio_context_release(ctx_); \
} \
aio_poll(qemu_get_aio_context(), true); \
if (ctx_) { \
if (unlock && ctx_) { \
aio_context_acquire(ctx_); \
} \
waited_ = true; \
@ -105,6 +108,12 @@ extern AioWait global_aio_wait;
qatomic_dec(&wait_->num_waiters); \
waited_; })
#define AIO_WAIT_WHILE(ctx, cond) \
AIO_WAIT_WHILE_INTERNAL(ctx, cond, true)
#define AIO_WAIT_WHILE_UNLOCKED(ctx, cond) \
AIO_WAIT_WHILE_INTERNAL(ctx, cond, false)
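
AIO_WAIT_WHILE_UNLOCKED() is for callers that do not hold the AioContext lock, so there is nothing to release around aio_poll(). A sketch of both variants, assuming a hypothetical ExampleState with an in_flight counter:

typedef struct { int in_flight; } ExampleState;

static void example_drain_locked_caller(AioContext *ctx, ExampleState *s)
{
    aio_context_acquire(ctx);
    /* ctx's lock is dropped and re-taken around each poll iteration */
    AIO_WAIT_WHILE(ctx, qatomic_read(&s->in_flight) > 0);
    aio_context_release(ctx);
}

static void example_drain_unlocked_caller(AioContext *ctx, ExampleState *s)
{
    /* caller holds no AioContext lock; nothing is released while polling */
    AIO_WAIT_WHILE_UNLOCKED(ctx, qatomic_read(&s->in_flight) > 0);
}
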
/**
* aio_wait_kick:
* Wake up the main thread if it is waiting on AIO_WAIT_WHILE(). During


@ -40,21 +40,38 @@ typedef struct BlockJobDriver BlockJobDriver;
* Long-running operation on a BlockDriverState.
*/
typedef struct BlockJob {
/** Data belonging to the generic Job infrastructure */
/**
* Data belonging to the generic Job infrastructure.
* Protected by job mutex.
*/
Job job;
/** Status that is published by the query-block-jobs QMP API */
/**
* Status that is published by the query-block-jobs QMP API.
* Protected by job mutex.
*/
BlockDeviceIoStatus iostatus;
/** Speed that was set with @block_job_set_speed. */
/**
* Speed that was set with @block_job_set_speed.
* Always modified and read under QEMU global mutex (GLOBAL_STATE_CODE).
*/
int64_t speed;
/** Rate limiting data structure for implementing @speed. */
/**
* Rate limiting data structure for implementing @speed.
* RateLimit API is thread-safe.
*/
RateLimit limit;
/** Block other operations when block job is running */
/**
* Block other operations when block job is running.
* Always modified and read under QEMU global mutex (GLOBAL_STATE_CODE).
*/
Error *blocker;
/** All notifiers are set once in block_job_create() and never modified. */
/** Called when a cancelled job is finalised. */
Notifier finalize_cancelled_notifier;
@ -70,7 +87,10 @@ typedef struct BlockJob {
/** Called when the job coroutine yields or terminates */
Notifier idle_notifier;
/** BlockDriverStates that are involved in this block job */
/**
* BlockDriverStates that are involved in this block job.
* Always modified and read under QEMU global mutex (GLOBAL_STATE_CODE).
*/
GSList *nodes;
} BlockJob;
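
Taken together, an access site obeys a different regime per field. A hedged sketch of reading one field from each class (example_read_fields() is hypothetical; GLOBAL_STATE_CODE() asserts the caller runs under the QEMU global mutex):

static int64_t example_read_fields(BlockJob *job)
{
    BlockDeviceIoStatus iostatus;
    int64_t speed;

    GLOBAL_STATE_CODE();            /* speed, blocker, nodes: BQL only */
    speed = job->speed;

    WITH_JOB_LOCK_GUARD() {         /* job, iostatus: job_mutex */
        iostatus = job->iostatus;
    }
    return iostatus == BLOCK_DEVICE_IO_STATUS_OK ? speed : 0;
}
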
@ -82,15 +102,16 @@ typedef struct BlockJob {
*/
/**
* block_job_next:
* block_job_next_locked:
* @job: A block job, or %NULL.
*
* Get the next element from the list of block jobs after @job, or the
* first one if @job is %NULL.
*
* Returns the requested job, or %NULL if there are no more jobs left.
* Called with job lock held.
*/
BlockJob *block_job_next(BlockJob *job);
BlockJob *block_job_next_locked(BlockJob *job);
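
Iteration therefore happens inside a single critical section, as qmp_query_block_jobs() and bdrv_get_xdbg_block_graph() do in this series. A minimal sketch (example_count_user_jobs() is hypothetical):

static int example_count_user_jobs(void)
{
    BlockJob *job;
    int n = 0;

    JOB_LOCK_GUARD();               /* the job list is stable while held */
    for (job = block_job_next_locked(NULL); job;
         job = block_job_next_locked(job)) {
        if (block_job_is_internal(job)) {
            continue;               /* skip internal jobs, as QMP does */
        }
        n++;
    }
    return n;
}
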
/**
* block_job_get:
@ -99,9 +120,13 @@ BlockJob *block_job_next(BlockJob *job);
* Get the block job identified by @id (which must not be %NULL).
*
* Returns the requested job, or %NULL if it doesn't exist.
* Called with job lock *not* held.
*/
BlockJob *block_job_get(const char *id);
/* Same as block_job_get(), but called with job lock held. */
BlockJob *block_job_get_locked(const char *id);
/**
* block_job_add_bdrv:
* @job: A block job
@ -135,32 +160,38 @@ void block_job_remove_all_bdrv(BlockJob *job);
bool block_job_has_bdrv(BlockJob *job, BlockDriverState *bs);
/**
* block_job_set_speed:
* block_job_set_speed_locked:
* @job: The job to set the speed for.
* @speed: The new value
* @errp: Error object.
*
* Set a rate-limiting parameter for the job; the actual meaning may
* vary depending on the job type.
*
* Called with job lock held, but might release it temporarily.
*/
bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp);
bool block_job_set_speed_locked(BlockJob *job, int64_t speed, Error **errp);
/**
* block_job_query:
* block_job_query_locked:
* @job: The job to get information about.
*
* Return information about a job.
*
* Called with job lock held.
*/
BlockJobInfo *block_job_query(BlockJob *job, Error **errp);
BlockJobInfo *block_job_query_locked(BlockJob *job, Error **errp);
/**
* block_job_iostatus_reset:
* block_job_iostatus_reset_locked:
* @job: The job whose I/O status should be reset.
*
* Reset I/O status on @job and on BlockDriverState objects it uses,
* other than job->blk.
*
* Called with job lock held.
*/
void block_job_iostatus_reset(BlockJob *job);
void block_job_iostatus_reset_locked(BlockJob *job);
/*
* block_job_get_aio_context:


@ -424,6 +424,6 @@ QIOChannel *coroutine_fn
nbd_co_establish_connection(NBDClientConnection *conn, NBDExportInfo *info,
bool blocking, Error **errp);
void coroutine_fn nbd_co_establish_connection_cancel(NBDClientConnection *conn);
void nbd_co_establish_connection_cancel(NBDClientConnection *conn);
#endif


@ -92,12 +92,12 @@ void coroutine_fn qemu_coroutine_yield(void);
/**
* Get the AioContext of the given coroutine
*/
AioContext *coroutine_fn qemu_coroutine_get_aio_context(Coroutine *co);
AioContext *qemu_coroutine_get_aio_context(Coroutine *co);
/**
* Get the currently executing coroutine
*/
Coroutine *coroutine_fn qemu_coroutine_self(void);
Coroutine *qemu_coroutine_self(void);
/**
* Return whether or not currently inside a coroutine


@ -40,27 +40,62 @@ typedef struct JobTxn JobTxn;
* Long-running operation.
*/
typedef struct Job {
/* Fields set at initialization (job_create), and never modified */
/** The ID of the job. May be NULL for internal jobs. */
char *id;
/** The type of this job. */
/**
* The type of this job.
* All callbacks are called with job_mutex *not* held.
*/
const JobDriver *driver;
/**
* The coroutine that executes the job. If not NULL, it is reentered when
* busy is false and the job is cancelled.
* Initialized in job_start()
*/
Coroutine *co;
/** True if this job should automatically finalize itself */
bool auto_finalize;
/** True if this job should automatically dismiss itself */
bool auto_dismiss;
/**
* The completion function that will be called when the job completes.
* Called with AioContext lock held, since many callback implementations
* use bdrv_* functions that require to hold the lock.
*/
BlockCompletionFunc *cb;
/** The opaque value that is passed to the completion function. */
void *opaque;
/* ProgressMeter API is thread-safe */
ProgressMeter progress;
/**
* AioContext to run the job coroutine in.
* The job AioContext can be read when holding *either*
* the BQL (so we are in the main loop) or the job_mutex.
* It can only be written when we hold *both* BQL
* and the job_mutex.
*/
AioContext *aio_context;
/** Protected by job_mutex */
/** Reference count of the block job */
int refcnt;
/** Current state; See @JobStatus for details. */
JobStatus status;
/** AioContext to run the job coroutine in */
AioContext *aio_context;
/**
* The coroutine that executes the job. If not NULL, it is reentered when
* busy is false and the job is cancelled.
*/
Coroutine *co;
/**
* Timer that is used by @job_sleep_ns. Accessed under job_mutex (in
* job.c).
@ -76,7 +111,7 @@ typedef struct Job {
/**
* Set to false by the job while the coroutine has yielded and may be
* re-entered by job_enter(). There may still be I/O or event loop activity
* pending. Accessed under block_job_mutex (in blockjob.c).
* pending. Accessed under job_mutex.
*
* When the job is deferred to the main loop, busy is true as long as the
* bottom half is still pending.
@ -112,14 +147,6 @@ typedef struct Job {
/** Set to true when the job has deferred work to the main loop. */
bool deferred_to_main_loop;
/** True if this job should automatically finalize itself */
bool auto_finalize;
/** True if this job should automatically dismiss itself */
bool auto_dismiss;
ProgressMeter progress;
/**
* Return code from @run and/or @prepare callback(s).
* Not final until the job has reached the CONCLUDED status.
@ -134,12 +161,6 @@ typedef struct Job {
*/
Error *err;
/** The completion function that will be called when the job completes. */
BlockCompletionFunc *cb;
/** The opaque value that is passed to the completion function. */
void *opaque;
/** Notifiers called when a cancelled job is finalised */
NotifierList on_finalize_cancelled;
@ -167,6 +188,7 @@ typedef struct Job {
/**
* Callbacks and other information about a Job driver.
* All callbacks are invoked with job_mutex *not* held.
*/
struct JobDriver {
@ -242,6 +264,9 @@ struct JobDriver {
*
* This callback will not be invoked if the job has already failed.
* If it fails, abort and then clean will be called.
*
* Called with AioContext lock held, since many callback implementations
* use bdrv_* functions that require the lock to be held.
*/
int (*prepare)(Job *job);
@ -252,6 +277,9 @@ struct JobDriver {
*
* All jobs will complete with a call to either .commit() or .abort() but
* never both.
*
* Called with AioContext lock held, since many callback implementations
* use bdrv_* functions that require the lock to be held.
*/
void (*commit)(Job *job);
@ -262,6 +290,9 @@ struct JobDriver {
*
* All jobs will complete with a call to either .commit() or .abort() but
* never both.
*
* Called with AioContext lock held, since many callback implementations
* use bdrv_* functions that require the lock to be held.
*/
void (*abort)(Job *job);
@ -270,6 +301,9 @@ struct JobDriver {
* .commit() or .abort(). Regardless of which callback is invoked after
* completion, .clean() will always be called, even if the job does not
* belong to a transaction group.
*
* Called with AioContext lock held, since many callback implementations
* use bdrv_* functions that require the lock to be held.
*/
void (*clean)(Job *job);
@ -284,11 +318,18 @@ struct JobDriver {
* READY).
* (If the callback is NULL, the job is assumed to terminate
* without I/O.)
*
* Called with AioContext lock held, since many callback implementations
* use bdrv_* functions that require the lock to be held.
*/
bool (*cancel)(Job *job, bool force);
/** Called when the job is freed */
/**
* Called when the job is freed.
* Called with AioContext lock held, since many callback implementations
* use bdrv_* functions that require the lock to be held.
*/
void (*free)(Job *job);
};
@ -303,6 +344,30 @@ typedef enum JobCreateFlags {
JOB_MANUAL_DISMISS = 0x04,
} JobCreateFlags;
extern QemuMutex job_mutex;
#define JOB_LOCK_GUARD() QEMU_LOCK_GUARD(&job_mutex)
#define WITH_JOB_LOCK_GUARD() WITH_QEMU_LOCK_GUARD(&job_mutex)
/**
* job_lock:
*
* Take the mutex protecting the list of jobs and their status.
* Most functions called by the monitor need to call job_lock
* and job_unlock manually. On the other hand, functions called
* by the block jobs themselves and by the block layer will take the
* lock for you.
*/
void job_lock(void);
/**
* job_unlock:
*
* Release the mutex protecting the list of jobs and their status.
*/
void job_unlock(void);
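
A monitor-style caller can use the manual pair above or the scoped JOB_LOCK_GUARD()/WITH_JOB_LOCK_GUARD() macros. A hedged sketch with a hypothetical status query:

static JobStatus example_job_status(const char *id)
{
    JobStatus status = JOB_STATUS_UNDEFINED;
    Job *job;

    job_lock();
    job = job_get_locked(id);       /* list lookup requires the mutex */
    if (job) {
        status = job->status;       /* status is job_mutex-protected */
    }
    job_unlock();
    return status;                  /* JOB_LOCK_GUARD() would scope this */
}
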
/**
* Allocate and return a new job transaction. Jobs can be added to the
* transaction using job_txn_add_job().
@ -319,23 +384,20 @@ JobTxn *job_txn_new(void);
/**
* Release a reference that was previously acquired with job_txn_add_job or
* job_txn_new. If it's the last reference to the object, it will be freed.
*
* Called with job lock *not* held.
*/
void job_txn_unref(JobTxn *txn);
/**
* @txn: The transaction (may be NULL)
* @job: Job to add to the transaction
*
* Add @job to the transaction. The @job must not already be in a transaction.
* The caller must call either job_txn_unref() or job_completed() to release
* the reference that is automatically grabbed here.
*
 * If @txn is NULL, the function does nothing.
 */
void job_txn_add_job(JobTxn *txn, Job *job);

/*
 * Same as job_txn_unref(), but called with job lock held.
 * Might release the lock temporarily.
 */
void job_txn_unref_locked(JobTxn *txn);
/**
* Create a new long-running job and return it.
* Called with job_mutex *not* held.
*
* @job_id: The id of the newly-created job, or %NULL for internal jobs
* @driver: The class object for the newly-created job.
@ -353,20 +415,27 @@ void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
/**
 * Add a reference to the Job refcount; it will be decreased with
 * job_unref_locked(), and then the Job will be freed if that was the last
 * reference.
*
* Called with job lock held.
*/
void job_ref(Job *job);
void job_ref_locked(Job *job);
/**
* Release a reference that was previously acquired with job_ref() or
* Release a reference that was previously acquired with job_ref_locked() or
* job_create(). If it's the last reference to the object, it will be freed.
*
* Takes AioContext lock internally to invoke a job->driver callback.
* Called with job lock held.
*/
void job_unref(Job *job);
void job_unref_locked(Job *job);
/**
* @job: The job that has made progress
* @done: How much progress the job made since the last call
*
* Updates the progress counter of the job.
*
* May be called with mutex held or not held.
*/
void job_progress_update(Job *job, uint64_t done);
@ -377,6 +446,8 @@ void job_progress_update(Job *job, uint64_t done);
*
* Sets the expected end value of the progress counter of a job so that a
* completion percentage can be calculated when the progress is updated.
*
* May be called with mutex held or not held.
*/
void job_progress_set_remaining(Job *job, uint64_t remaining);
@ -392,27 +463,27 @@ void job_progress_set_remaining(Job *job, uint64_t remaining);
* length before, and job_progress_update() afterwards.
* (So the operation acts as a parenthesis in regards to the main job
* operation running in background.)
*
* May be called with mutex held or not held.
*/
void job_progress_increase_remaining(Job *job, uint64_t delta);
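Putting the progress API together, a job's main loop might report progress roughly as in this sketch; total_bytes, offset, and the 1 MiB chunk size are invented placeholders.

static void coroutine_fn example_copy_loop(Job *job, uint64_t total_bytes)
{
    const uint64_t chunk = 1 << 20;     /* invented 1 MiB chunk size */
    uint64_t offset = 0;

    /* Announce the expected total once, then account for each chunk. */
    job_progress_set_remaining(job, total_bytes);
    while (offset < total_bytes) {
        uint64_t n = MIN(chunk, total_bytes - offset);
        /* ... perform the actual I/O for this chunk ... */
        job_progress_update(job, n);
        offset += n;
    }
}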
/** To be called when a cancelled job is finalised. */
void job_event_cancelled(Job *job);
/** To be called when a successfully completed job is finalised. */
void job_event_completed(Job *job);
/**
* Conditionally enter the job coroutine if the job is ready to run, not
* already busy and fn() returns true. fn() is called within the job_lock
* critical section.
*
* Called with job lock held, but might release it temporarily.
*/
void job_enter_cond(Job *job, bool(*fn)(Job *job));
void job_enter_cond_locked(Job *job, bool(*fn)(Job *job));
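For instance, a caller holding the job lock might wake the job only once it has become ready; the predicate below is a hypothetical example and, as described above, runs inside the job_lock critical section.

static bool example_ready_pred(Job *job)
{
    /* Invoked with job_lock held, so the _locked accessor is safe. */
    return job_is_ready_locked(job);
}

static void example_kick_when_ready(Job *job)
{
    WITH_JOB_LOCK_GUARD() {
        job_enter_cond_locked(job, example_ready_pred);
    }
}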
/**
* @job: A job that has not yet been started.
*
* Begins execution of a job.
* Takes ownership of one reference to the job object.
*
* Called with job_mutex *not* held.
*/
void job_start(Job *job);
@ -420,6 +491,7 @@ void job_start(Job *job);
* @job: The job to enter.
*
* Continue the specified job by entering the coroutine.
* Called with job_mutex *not* held.
*/
void job_enter(Job *job);
@ -428,6 +500,8 @@ void job_enter(Job *job);
*
* Pause now if job_pause() has been called. Jobs that perform lots of I/O
* must call this between requests so that the job can be paused.
*
* Called with job_mutex *not* held.
*/
void coroutine_fn job_pause_point(Job *job);
@ -435,8 +509,9 @@ void coroutine_fn job_pause_point(Job *job);
* @job: The job that calls the function.
*
* Yield the job coroutine.
* Called with job_mutex *not* held.
*/
void job_yield(Job *job);
void coroutine_fn job_yield(Job *job);
/**
* @job: The job that calls the function.
@ -445,10 +520,11 @@ void job_yield(Job *job);
* Put the job to sleep (assuming that it wasn't canceled) for @ns
* %QEMU_CLOCK_REALTIME nanoseconds. Canceling the job will immediately
* interrupt the wait.
*
* Called with job_mutex *not* held.
*/
void coroutine_fn job_sleep_ns(Job *job, int64_t ns);
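Taken together, the coroutine helpers above suggest a run loop shaped like the sketch below; example_do_chunk() and the 100 ms sleep interval are invented for illustration.

/* Invented worker: returns how much work was done, 0 when finished. */
static int example_do_chunk(Job *job);

static int coroutine_fn example_run(Job *job, Error **errp)
{
    while (!job_is_cancelled(job)) {
        /* Honour pause requests between units of work. */
        job_pause_point(job);

        if (example_do_chunk(job) == 0) {
            break;                          /* no more work */
        }

        /* Throttle; cancelling the job interrupts the sleep. */
        job_sleep_ns(job, 100 * 1000 * 1000);
    }
    return 0;
}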
/** Returns the JobType of a given Job. */
JobType job_type(const Job *job);
@ -458,88 +534,138 @@ const char *job_type_str(const Job *job);
/** Returns true if the job should not be visible to the management layer. */
bool job_is_internal(Job *job);
/** Returns whether the job is being cancelled. */
/**
* Returns whether the job is being cancelled.
* Called with job_mutex *not* held.
*/
bool job_is_cancelled(Job *job);
/* Same as job_is_cancelled(), but called with job lock held. */
bool job_is_cancelled_locked(Job *job);
/**
* Returns whether the job is scheduled for cancellation (at an
* indefinite point).
* Called with job_mutex *not* held.
*/
bool job_cancel_requested(Job *job);
/** Returns whether the job is in a completed state. */
bool job_is_completed(Job *job);
/**
* Returns whether the job is in a completed state.
* Called with job lock held.
*/
bool job_is_completed_locked(Job *job);
/** Returns whether the job is ready to be completed. */
/**
* Returns whether the job is ready to be completed.
* Called with job_mutex *not* held.
*/
bool job_is_ready(Job *job);
/* Same as job_is_ready(), but called with job lock held. */
bool job_is_ready_locked(Job *job);
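The _locked variants let a caller combine several status reads into one consistent snapshot, as in this small sketch:

static bool example_job_live(Job *job)
{
    bool live;

    WITH_JOB_LOCK_GUARD() {
        /* Both reads observe the same consistent job state. */
        live = !job_is_completed_locked(job) &&
               !job_is_cancelled_locked(job);
    }
    return live;
}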
/**
* Request @job to pause at the next pause point. Must be paired with
* job_resume(). If the job is supposed to be resumed by user action, call
* job_user_pause() instead.
* job_user_pause_locked() instead.
*
* Called with job lock *not* held.
*/
void job_pause(Job *job);
/** Resumes a @job paused with job_pause. */
/* Same as job_pause(), but called with job lock held. */
void job_pause_locked(Job *job);
/** Resumes a @job paused with job_pause. Called with job lock *not* held. */
void job_resume(Job *job);
/*
* Same as job_resume(), but called with job lock held.
* Might release the lock temporarily.
*/
void job_resume_locked(Job *job);
/**
* Asynchronously pause the specified @job.
* Do not allow a resume until a matching call to job_user_resume.
* Called with job lock held.
*/
void job_user_pause(Job *job, Error **errp);
void job_user_pause_locked(Job *job, Error **errp);
/** Returns true if the job is user-paused. */
bool job_user_paused(Job *job);
/**
* Returns true if the job is user-paused.
* Called with job lock held.
*/
bool job_user_paused_locked(Job *job);
/**
* Resume the specified @job.
* Must be paired with a preceding job_user_pause.
* Must be paired with a preceding job_user_pause_locked.
* Called with job lock held, but might release it temporarily.
*/
void job_user_resume(Job *job, Error **errp);
void job_user_resume_locked(Job *job, Error **errp);
/**
* Get the next element from the list of block jobs after @job, or the
* first one if @job is %NULL.
*
* Returns the requested job, or %NULL if there are no more jobs left.
* Called with job lock *not* held.
*/
Job *job_next(Job *job);
/* Same as job_next(), but called with job lock held. */
Job *job_next_locked(Job *job);
/**
* Get the job identified by @id (which must not be %NULL).
*
* Returns the requested job, or %NULL if it doesn't exist.
* Called with job lock held.
*/
Job *job_get(const char *id);
Job *job_get_locked(const char *id);
/**
* Check whether the verb @verb can be applied to @job in its current state.
* Returns 0 if the verb can be applied; otherwise errp is set and -EPERM
* returned.
*
* Called with job lock held.
*/
int job_apply_verb(Job *job, JobVerb verb, Error **errp);
int job_apply_verb_locked(Job *job, JobVerb verb, Error **errp);
/** The @job could not be started, free it. */
/**
* The @job could not be started; free it.
* Called with job_mutex *not* held.
*/
void job_early_fail(Job *job);
/** Moves the @job from RUNNING to READY */
/**
* Moves the @job from RUNNING to READY.
* Called with job_mutex *not* held.
*/
void job_transition_to_ready(Job *job);
/** Asynchronously complete the specified @job. */
void job_complete(Job *job, Error **errp);
/**
* Asynchronously complete the specified @job.
* Called with job lock held, but might release it temporarily.
*/
void job_complete_locked(Job *job, Error **errp);
/**
* Asynchronously cancel the specified @job. If @force is true, the job should
* be cancelled immediately without waiting for a consistent state.
* Called with job lock held.
*/
void job_cancel(Job *job, bool force);
void job_cancel_locked(Job *job, bool force);
/**
* Cancels the specified job like job_cancel(), but may refuse to do so if the
* operation isn't meaningful in the current state of the job.
* Cancels the specified job like job_cancel_locked(), but may refuse
* to do so if the operation isn't meaningful in the current state of the job.
* Called with job lock held.
*/
void job_user_cancel(Job *job, bool force, Error **errp);
void job_user_cancel_locked(Job *job, bool force, Error **errp);
/**
* Synchronously cancel the @job. The completion callback is called
@ -550,16 +676,23 @@ void job_user_cancel(Job *job, bool force, Error **errp);
* Returns the return value from the job if the job actually completed
* during the call, or -ECANCELED if it was canceled.
*
* Callers must hold the AioContext lock of job->aio_context.
* Called with job_lock *not* held.
*/
int job_cancel_sync(Job *job, bool force);
/** Synchronously force-cancels all jobs using job_cancel_sync(). */
/* Same as job_cancel_sync, but called with job lock held. */
int job_cancel_sync_locked(Job *job, bool force);
/**
* Synchronously force-cancels all jobs using job_cancel_sync_locked().
*
* Called with job_lock *not* held.
*/
void job_cancel_sync_all(void);
/**
* @job: The job to be completed.
* @errp: Error object which may be set by job_complete(); this is not
* @errp: Error object which may be set by job_complete_locked(); this is not
* necessarily set on every error, the job return value has to be
* checked as well.
*
@ -568,10 +701,9 @@ void job_cancel_sync_all(void);
* function).
*
* Returns the return value from the job.
*
* Callers must hold the AioContext lock of job->aio_context.
* Called with job_lock held.
*/
int job_complete_sync(Job *job, Error **errp);
int job_complete_sync_locked(Job *job, Error **errp);
/**
* For a @job that has finished its work and is pending awaiting explicit
@ -580,14 +712,18 @@ int job_complete_sync(Job *job, Error **errp);
* FIXME: Make the below statement universally true:
* For jobs that support the manual workflow mode, all graph changes that occur
* as a result will occur after this command and before a successful reply.
*
* Called with job lock held.
*/
void job_finalize(Job *job, Error **errp);
void job_finalize_locked(Job *job, Error **errp);
/**
* Remove the concluded @job from the query list and reset the passed pointer
* to %NULL. Returns an error if the job is not actually concluded.
*
* Called with job lock held.
*/
void job_dismiss(Job **job, Error **errp);
void job_dismiss_locked(Job **job, Error **errp);
/**
* Synchronously finishes the given @job. If @finish is given, it is called to
@ -596,8 +732,20 @@ void job_dismiss(Job **job, Error **errp);
* Returns 0 if the job is successfully completed, -ECANCELED if the job was
* cancelled before completing, and -errno in other error cases.
*
* Callers must hold the AioContext lock of job->aio_context.
* Called with job_lock held, but might release it temporarily.
*/
int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp);
int job_finish_sync_locked(Job *job, void (*finish)(Job *, Error **errp),
Error **errp);
/**
* Sets the @job->aio_context.
* Called with job_mutex *not* held.
*
* This function must run in the main thread to protect against a
* concurrent read in job_finish_sync_locked(). It takes the job_mutex
* lock to protect against the read in job_do_yield_locked(), and must
* be called when the job is quiescent.
*/
void job_set_aio_context(Job *job, AioContext *ctx);
#endif


@ -29,119 +29,117 @@
#include "qapi/error.h"
#include "trace/trace-root.h"
/* Get a job using its ID and acquire its AioContext */
static Job *find_job(const char *id, AioContext **aio_context, Error **errp)
/*
* Get a job using its ID. Called with job_mutex held.
*/
static Job *find_job_locked(const char *id, Error **errp)
{
Job *job;
*aio_context = NULL;
job = job_get(id);
job = job_get_locked(id);
if (!job) {
error_setg(errp, "Job not found");
return NULL;
}
*aio_context = job->aio_context;
aio_context_acquire(*aio_context);
return job;
}
void qmp_job_cancel(const char *id, Error **errp)
{
AioContext *aio_context;
Job *job = find_job(id, &aio_context, errp);
Job *job;
JOB_LOCK_GUARD();
job = find_job_locked(id, errp);
if (!job) {
return;
}
trace_qmp_job_cancel(job);
job_user_cancel(job, true, errp);
aio_context_release(aio_context);
job_user_cancel_locked(job, true, errp);
}
void qmp_job_pause(const char *id, Error **errp)
{
AioContext *aio_context;
Job *job = find_job(id, &aio_context, errp);
Job *job;
JOB_LOCK_GUARD();
job = find_job_locked(id, errp);
if (!job) {
return;
}
trace_qmp_job_pause(job);
job_user_pause(job, errp);
aio_context_release(aio_context);
job_user_pause_locked(job, errp);
}
void qmp_job_resume(const char *id, Error **errp)
{
AioContext *aio_context;
Job *job = find_job(id, &aio_context, errp);
Job *job;
JOB_LOCK_GUARD();
job = find_job_locked(id, errp);
if (!job) {
return;
}
trace_qmp_job_resume(job);
job_user_resume(job, errp);
aio_context_release(aio_context);
job_user_resume_locked(job, errp);
}
void qmp_job_complete(const char *id, Error **errp)
{
AioContext *aio_context;
Job *job = find_job(id, &aio_context, errp);
Job *job;
JOB_LOCK_GUARD();
job = find_job_locked(id, errp);
if (!job) {
return;
}
trace_qmp_job_complete(job);
job_complete(job, errp);
aio_context_release(aio_context);
job_complete_locked(job, errp);
}
void qmp_job_finalize(const char *id, Error **errp)
{
AioContext *aio_context;
Job *job = find_job(id, &aio_context, errp);
Job *job;
JOB_LOCK_GUARD();
job = find_job_locked(id, errp);
if (!job) {
return;
}
trace_qmp_job_finalize(job);
job_ref(job);
job_finalize(job, errp);
job_ref_locked(job);
job_finalize_locked(job, errp);
/*
* Job's context might have changed via job_finalize (and job_txn_apply
* automatically acquires the new one), so make sure we release the correct
* one.
*/
aio_context = job->aio_context;
job_unref(job);
aio_context_release(aio_context);
job_unref_locked(job);
}
void qmp_job_dismiss(const char *id, Error **errp)
{
AioContext *aio_context;
Job *job = find_job(id, &aio_context, errp);
Job *job;
JOB_LOCK_GUARD();
job = find_job_locked(id, errp);
if (!job) {
return;
}
trace_qmp_job_dismiss(job);
job_dismiss(&job, errp);
aio_context_release(aio_context);
job_dismiss_locked(&job, errp);
}
static JobInfo *job_query_single(Job *job, Error **errp)
/* Called with job_mutex held. */
static JobInfo *job_query_single_locked(Job *job, Error **errp)
{
JobInfo *info;
uint64_t progress_current;
@ -171,17 +169,15 @@ JobInfoList *qmp_query_jobs(Error **errp)
JobInfoList *head = NULL, **tail = &head;
Job *job;
for (job = job_next(NULL); job; job = job_next(job)) {
JOB_LOCK_GUARD();
for (job = job_next_locked(NULL); job; job = job_next_locked(job)) {
JobInfo *value;
AioContext *aio_context;
if (job_is_internal(job)) {
continue;
}
aio_context = job->aio_context;
aio_context_acquire(aio_context);
value = job_query_single(job, errp);
aio_context_release(aio_context);
value = job_query_single_locked(job, errp);
if (!value) {
qapi_free_JobInfoList(head);
return NULL;

[job.c: 712-line diff omitted by the viewer as too large]

@ -574,7 +574,8 @@ static void process_incoming_migration_bh(void *opaque)
migration_incoming_state_destroy();
}
static void process_incoming_migration_co(void *opaque)
static void coroutine_fn
process_incoming_migration_co(void *opaque)
{
MigrationIncomingState *mis = migration_incoming_get_current();
PostcopyState ps;


@ -135,8 +135,11 @@ void qmp_cont(Error **errp)
blk_iostatus_reset(blk);
}
for (job = block_job_next(NULL); job; job = block_job_next(job)) {
block_job_iostatus_reset(job);
WITH_JOB_LOCK_GUARD() {
for (job = block_job_next_locked(NULL); job;
job = block_job_next_locked(job)) {
block_job_iostatus_reset_locked(job);
}
}
/* Continuing after completed migration. Images have been inactivated to


@ -1541,8 +1541,8 @@
# -> { "execute": "blockdev-add",
# "arguments": { "driver": "qcow2",
# "node-name": "node1534",
# "data-file": { "driver": "file",
# "filename": "hd1.qcow2" },
# "file": { "driver": "file",
# "filename": "hd1.qcow2" },
# "backing": null } }
#
# <- { "return": {} }
@ -4378,7 +4378,7 @@
# "arguments": {
# "driver": "qcow2",
# "node-name": "test1",
# "data-file": {
# "file": {
# "driver": "file",
# "filename": "test.qcow2"
# }
@ -4395,7 +4395,7 @@
# "cache": {
# "direct": true
# },
# "data-file": {
# "file": {
# "driver": "file",
# "filename": "/tmp/test.qcow2"
# },
@ -4477,7 +4477,7 @@
# "arguments": {
# "driver": "qcow2",
# "node-name": "node0",
# "data-file": {
# "file": {
# "driver": "file",
# "filename": "test.qcow2"
# }


@ -911,10 +911,11 @@ static void run_block_job(BlockJob *job, Error **errp)
AioContext *aio_context = block_job_get_aio_context(job);
int ret = 0;
aio_context_acquire(aio_context);
job_ref(&job->job);
job_lock();
job_ref_locked(&job->job);
do {
float progress = 0.0f;
job_unlock();
aio_poll(aio_context, true);
progress_get_snapshot(&job->job.progress, &progress_current,
@ -923,15 +924,17 @@ static void run_block_job(BlockJob *job, Error **errp)
progress = (float)progress_current / progress_total * 100.f;
}
qemu_progress_print(progress, 0);
} while (!job_is_ready(&job->job) && !job_is_completed(&job->job));
job_lock();
} while (!job_is_ready_locked(&job->job) &&
!job_is_completed_locked(&job->job));
if (!job_is_completed(&job->job)) {
ret = job_complete_sync(&job->job, errp);
if (!job_is_completed_locked(&job->job)) {
ret = job_complete_sync_locked(&job->job, errp);
} else {
ret = job->job.ret;
}
job_unref(&job->job);
aio_context_release(aio_context);
job_unref_locked(&job->job);
job_unlock();
/* publish completion progress only when success */
if (!ret) {


@ -930,9 +930,9 @@ static void test_blockjob_common_drain_node(enum drain_type drain_type,
tjob->prepare_ret = -EIO;
break;
}
aio_context_release(ctx);
job_start(&job->job);
aio_context_release(ctx);
if (use_iothread) {
/* job_co_entry() is run in the I/O thread, wait for the actual job
@ -943,63 +943,85 @@ static void test_blockjob_common_drain_node(enum drain_type drain_type,
}
}
g_assert_cmpint(job->job.pause_count, ==, 0);
g_assert_false(job->job.paused);
g_assert_true(tjob->running);
g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
WITH_JOB_LOCK_GUARD() {
g_assert_cmpint(job->job.pause_count, ==, 0);
g_assert_false(job->job.paused);
g_assert_true(tjob->running);
g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
}
do_drain_begin_unlocked(drain_type, drain_bs);
if (drain_type == BDRV_DRAIN_ALL) {
/* bdrv_drain_all() drains both src and target */
g_assert_cmpint(job->job.pause_count, ==, 2);
} else {
g_assert_cmpint(job->job.pause_count, ==, 1);
WITH_JOB_LOCK_GUARD() {
if (drain_type == BDRV_DRAIN_ALL) {
/* bdrv_drain_all() drains both src and target */
g_assert_cmpint(job->job.pause_count, ==, 2);
} else {
g_assert_cmpint(job->job.pause_count, ==, 1);
}
g_assert_true(job->job.paused);
g_assert_false(job->job.busy); /* The job is paused */
}
g_assert_true(job->job.paused);
g_assert_false(job->job.busy); /* The job is paused */
do_drain_end_unlocked(drain_type, drain_bs);
if (use_iothread) {
/* paused is reset in the I/O thread, wait for it */
/*
* Here we are waiting for the paused status to change,
* so don't bother protecting the read every time.
*
* paused is reset in the I/O thread, wait for it
*/
while (job->job.paused) {
aio_poll(qemu_get_aio_context(), false);
}
}
g_assert_cmpint(job->job.pause_count, ==, 0);
g_assert_false(job->job.paused);
g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
WITH_JOB_LOCK_GUARD() {
g_assert_cmpint(job->job.pause_count, ==, 0);
g_assert_false(job->job.paused);
g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
}
do_drain_begin_unlocked(drain_type, target);
if (drain_type == BDRV_DRAIN_ALL) {
/* bdrv_drain_all() drains both src and target */
g_assert_cmpint(job->job.pause_count, ==, 2);
} else {
g_assert_cmpint(job->job.pause_count, ==, 1);
WITH_JOB_LOCK_GUARD() {
if (drain_type == BDRV_DRAIN_ALL) {
/* bdrv_drain_all() drains both src and target */
g_assert_cmpint(job->job.pause_count, ==, 2);
} else {
g_assert_cmpint(job->job.pause_count, ==, 1);
}
g_assert_true(job->job.paused);
g_assert_false(job->job.busy); /* The job is paused */
}
g_assert_true(job->job.paused);
g_assert_false(job->job.busy); /* The job is paused */
do_drain_end_unlocked(drain_type, target);
if (use_iothread) {
/* paused is reset in the I/O thread, wait for it */
/*
* Here we are waiting for the paused status to change,
* so don't bother protecting the read every time.
*
* paused is reset in the I/O thread, wait for it
*/
while (job->job.paused) {
aio_poll(qemu_get_aio_context(), false);
}
}
g_assert_cmpint(job->job.pause_count, ==, 0);
g_assert_false(job->job.paused);
g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
WITH_JOB_LOCK_GUARD() {
g_assert_cmpint(job->job.pause_count, ==, 0);
g_assert_false(job->job.paused);
g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
}
aio_context_acquire(ctx);
ret = job_complete_sync(&job->job, &error_abort);
WITH_JOB_LOCK_GUARD() {
ret = job_complete_sync_locked(&job->job, &error_abort);
}
g_assert_cmpint(ret, ==, (result == TEST_JOB_SUCCESS ? 0 : -EIO));
aio_context_acquire(ctx);
if (use_iothread) {
blk_set_aio_context(blk_src, qemu_get_aio_context(), &error_abort);
assert(blk_get_aio_context(blk_target) == qemu_get_aio_context());


@ -582,8 +582,10 @@ static void test_attach_blockjob(void)
aio_poll(qemu_get_aio_context(), false);
}
WITH_JOB_LOCK_GUARD() {
job_complete_sync_locked(&tjob->common.job, &error_abort);
}
aio_context_acquire(ctx);
job_complete_sync(&tjob->common.job, &error_abort);
blk_set_aio_context(blk, qemu_get_aio_context(), &error_abort);
aio_context_release(ctx);
@ -757,7 +759,9 @@ static void test_propagate_mirror(void)
BLOCKDEV_ON_ERROR_REPORT, BLOCKDEV_ON_ERROR_REPORT,
false, "filter_node", MIRROR_COPY_MODE_BACKGROUND,
&error_abort);
job = job_get("job0");
WITH_JOB_LOCK_GUARD() {
job = job_get_locked("job0");
}
filter = bdrv_find_node("filter_node");
/* Change the AioContext of src */


@ -116,8 +116,10 @@ static void test_single_job(int expected)
job = test_block_job_start(1, true, expected, &result, txn);
job_start(&job->job);
if (expected == -ECANCELED) {
job_cancel(&job->job, false);
WITH_JOB_LOCK_GUARD() {
if (expected == -ECANCELED) {
job_cancel_locked(&job->job, false);
}
}
while (result == -EINPROGRESS) {
@ -160,13 +162,15 @@ static void test_pair_jobs(int expected1, int expected2)
/* Release our reference now to trigger as many nice
* use-after-free bugs as possible.
*/
job_txn_unref(txn);
WITH_JOB_LOCK_GUARD() {
job_txn_unref_locked(txn);
if (expected1 == -ECANCELED) {
job_cancel(&job1->job, false);
}
if (expected2 == -ECANCELED) {
job_cancel(&job2->job, false);
if (expected1 == -ECANCELED) {
job_cancel_locked(&job1->job, false);
}
if (expected2 == -ECANCELED) {
job_cancel_locked(&job2->job, false);
}
}
while (result1 == -EINPROGRESS || result2 == -EINPROGRESS) {
@ -219,7 +223,9 @@ static void test_pair_jobs_fail_cancel_race(void)
job_start(&job1->job);
job_start(&job2->job);
job_cancel(&job1->job, false);
WITH_JOB_LOCK_GUARD() {
job_cancel_locked(&job1->job, false);
}
/* Now make job2 finish before the main loop kicks jobs. This simulates
* the race between a pending kick and another job completing.


@ -211,8 +211,11 @@ static CancelJob *create_common(Job **pjob)
bjob = mk_job(blk, "Steve", &test_cancel_driver, true,
JOB_MANUAL_FINALIZE | JOB_MANUAL_DISMISS);
job = &bjob->job;
job_ref(job);
assert(job->status == JOB_STATUS_CREATED);
WITH_JOB_LOCK_GUARD() {
job_ref_locked(job);
assert(job->status == JOB_STATUS_CREATED);
}
s = container_of(bjob, CancelJob, common);
s->blk = blk;
@ -225,21 +228,22 @@ static void cancel_common(CancelJob *s)
BlockJob *job = &s->common;
BlockBackend *blk = s->blk;
JobStatus sts = job->job.status;
AioContext *ctx;
ctx = job->job.aio_context;
aio_context_acquire(ctx);
AioContext *ctx = job->job.aio_context;
job_cancel_sync(&job->job, true);
if (sts != JOB_STATUS_CREATED && sts != JOB_STATUS_CONCLUDED) {
Job *dummy = &job->job;
job_dismiss(&dummy, &error_abort);
WITH_JOB_LOCK_GUARD() {
if (sts != JOB_STATUS_CREATED && sts != JOB_STATUS_CONCLUDED) {
Job *dummy = &job->job;
job_dismiss_locked(&dummy, &error_abort);
}
assert(job->job.status == JOB_STATUS_NULL);
job_unref_locked(&job->job);
}
assert(job->job.status == JOB_STATUS_NULL);
job_unref(&job->job);
destroy_blk(blk);
aio_context_acquire(ctx);
destroy_blk(blk);
aio_context_release(ctx);
}
static void test_cancel_created(void)
@ -251,6 +255,13 @@ static void test_cancel_created(void)
cancel_common(s);
}
static void assert_job_status_is(Job *job, int status)
{
WITH_JOB_LOCK_GUARD() {
assert(job->status == status);
}
}
static void test_cancel_running(void)
{
Job *job;
@ -259,7 +270,7 @@ static void test_cancel_running(void)
s = create_common(&job);
job_start(job);
assert(job->status == JOB_STATUS_RUNNING);
assert_job_status_is(job, JOB_STATUS_RUNNING);
cancel_common(s);
}
@ -272,11 +283,12 @@ static void test_cancel_paused(void)
s = create_common(&job);
job_start(job);
assert(job->status == JOB_STATUS_RUNNING);
job_user_pause(job, &error_abort);
WITH_JOB_LOCK_GUARD() {
assert(job->status == JOB_STATUS_RUNNING);
job_user_pause_locked(job, &error_abort);
}
job_enter(job);
assert(job->status == JOB_STATUS_PAUSED);
assert_job_status_is(job, JOB_STATUS_PAUSED);
cancel_common(s);
}
@ -289,11 +301,11 @@ static void test_cancel_ready(void)
s = create_common(&job);
job_start(job);
assert(job->status == JOB_STATUS_RUNNING);
assert_job_status_is(job, JOB_STATUS_RUNNING);
s->should_converge = true;
job_enter(job);
assert(job->status == JOB_STATUS_READY);
assert_job_status_is(job, JOB_STATUS_READY);
cancel_common(s);
}
@ -306,15 +318,16 @@ static void test_cancel_standby(void)
s = create_common(&job);
job_start(job);
assert(job->status == JOB_STATUS_RUNNING);
assert_job_status_is(job, JOB_STATUS_RUNNING);
s->should_converge = true;
job_enter(job);
assert(job->status == JOB_STATUS_READY);
job_user_pause(job, &error_abort);
WITH_JOB_LOCK_GUARD() {
assert(job->status == JOB_STATUS_READY);
job_user_pause_locked(job, &error_abort);
}
job_enter(job);
assert(job->status == JOB_STATUS_STANDBY);
assert_job_status_is(job, JOB_STATUS_STANDBY);
cancel_common(s);
}
@ -327,20 +340,21 @@ static void test_cancel_pending(void)
s = create_common(&job);
job_start(job);
assert(job->status == JOB_STATUS_RUNNING);
assert_job_status_is(job, JOB_STATUS_RUNNING);
s->should_converge = true;
job_enter(job);
assert(job->status == JOB_STATUS_READY);
job_complete(job, &error_abort);
WITH_JOB_LOCK_GUARD() {
assert(job->status == JOB_STATUS_READY);
job_complete_locked(job, &error_abort);
}
job_enter(job);
while (!job->deferred_to_main_loop) {
aio_poll(qemu_get_aio_context(), true);
}
assert(job->status == JOB_STATUS_READY);
assert_job_status_is(job, JOB_STATUS_READY);
aio_poll(qemu_get_aio_context(), true);
assert(job->status == JOB_STATUS_PENDING);
assert_job_status_is(job, JOB_STATUS_PENDING);
cancel_common(s);
}
@ -353,25 +367,26 @@ static void test_cancel_concluded(void)
s = create_common(&job);
job_start(job);
assert(job->status == JOB_STATUS_RUNNING);
assert_job_status_is(job, JOB_STATUS_RUNNING);
s->should_converge = true;
job_enter(job);
assert(job->status == JOB_STATUS_READY);
job_complete(job, &error_abort);
WITH_JOB_LOCK_GUARD() {
assert(job->status == JOB_STATUS_READY);
job_complete_locked(job, &error_abort);
}
job_enter(job);
while (!job->deferred_to_main_loop) {
aio_poll(qemu_get_aio_context(), true);
}
assert(job->status == JOB_STATUS_READY);
assert_job_status_is(job, JOB_STATUS_READY);
aio_poll(qemu_get_aio_context(), true);
assert(job->status == JOB_STATUS_PENDING);
assert_job_status_is(job, JOB_STATUS_PENDING);
aio_context_acquire(job->aio_context);
job_finalize(job, &error_abort);
aio_context_release(job->aio_context);
assert(job->status == JOB_STATUS_CONCLUDED);
WITH_JOB_LOCK_GUARD() {
job_finalize_locked(job, &error_abort);
assert(job->status == JOB_STATUS_CONCLUDED);
}
cancel_common(s);
}
@ -417,7 +432,7 @@ static const BlockJobDriver test_yielding_driver = {
};
/*
* Test that job_complete() works even on jobs that are in a paused
* Test that job_complete_locked() works even on jobs that are in a paused
* state (i.e., STANDBY).
*
* To do this, run YieldingJob in an IO thread, get it into the READY
@ -425,7 +440,7 @@ static const BlockJobDriver test_yielding_driver = {
* acquire the context so the job will not be entered and will thus
* remain on STANDBY.
*
* job_complete() should still work without error.
* job_complete_locked() should still work without error.
*
* Note that on the QMP interface, it is impossible to lock an IO
* thread before a drained section ends. In practice, the
@ -459,37 +474,44 @@ static void test_complete_in_standby(void)
bjob = mk_job(blk, "job", &test_yielding_driver, true,
JOB_MANUAL_FINALIZE | JOB_MANUAL_DISMISS);
job = &bjob->job;
assert(job->status == JOB_STATUS_CREATED);
assert_job_status_is(job, JOB_STATUS_CREATED);
/* Wait for the job to become READY */
job_start(job);
aio_context_acquire(ctx);
AIO_WAIT_WHILE(ctx, job->status != JOB_STATUS_READY);
aio_context_release(ctx);
/*
* Here we are waiting for the status to change, so don't bother
* protecting the read every time.
*/
AIO_WAIT_WHILE_UNLOCKED(ctx, job->status != JOB_STATUS_READY);
/* Begin the drained section, pausing the job */
bdrv_drain_all_begin();
assert(job->status == JOB_STATUS_STANDBY);
assert_job_status_is(job, JOB_STATUS_STANDBY);
/* Lock the IO thread to prevent the job from being run */
aio_context_acquire(ctx);
/* This will schedule the job to resume it */
bdrv_drain_all_end();
aio_context_release(ctx);
/* But the job cannot run, so it will remain on standby */
assert(job->status == JOB_STATUS_STANDBY);
WITH_JOB_LOCK_GUARD() {
/* But the job cannot run, so it will remain on standby */
assert(job->status == JOB_STATUS_STANDBY);
/* Even though the job is on standby, this should work */
job_complete(job, &error_abort);
/* Even though the job is on standby, this should work */
job_complete_locked(job, &error_abort);
/* The test is done now, clean up. */
job_finish_sync(job, NULL, &error_abort);
assert(job->status == JOB_STATUS_PENDING);
/* The test is done now, clean up. */
job_finish_sync_locked(job, NULL, &error_abort);
assert(job->status == JOB_STATUS_PENDING);
job_finalize(job, &error_abort);
assert(job->status == JOB_STATUS_CONCLUDED);
job_finalize_locked(job, &error_abort);
assert(job->status == JOB_STATUS_CONCLUDED);
job_dismiss(&job, &error_abort);
job_dismiss_locked(&job, &error_abort);
}
aio_context_acquire(ctx);
destroy_blk(blk);
aio_context_release(ctx);
iothread_join(iothread);


@ -610,7 +610,7 @@ static void perf_baseline(void)
g_test_message("Function call %u iterations: %f s", maxcycles, duration);
}
static __attribute__((noinline)) void perf_cost_func(void *opaque)
static __attribute__((noinline)) void coroutine_fn perf_cost_func(void *opaque)
{
qemu_coroutine_yield();
}


@ -135,7 +135,7 @@ typedef struct CoWaitRecord {
QSLIST_ENTRY(CoWaitRecord) next;
} CoWaitRecord;
static void push_waiter(CoMutex *mutex, CoWaitRecord *w)
static void coroutine_fn push_waiter(CoMutex *mutex, CoWaitRecord *w)
{
w->co = qemu_coroutine_self();
QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
@ -332,7 +332,7 @@ void qemu_co_rwlock_init(CoRwlock *lock)
}
/* Releases the internal CoMutex. */
static void qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
static void coroutine_fn qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
{
CoRwTicket *tkt = QSIMPLEQ_FIRST(&lock->tickets);
Coroutine *co = NULL;
@ -365,7 +365,7 @@ static void qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
}
}
void qemu_co_rwlock_rdlock(CoRwlock *lock)
void coroutine_fn qemu_co_rwlock_rdlock(CoRwlock *lock)
{
Coroutine *self = qemu_coroutine_self();
@ -390,7 +390,7 @@ void qemu_co_rwlock_rdlock(CoRwlock *lock)
self->locks_held++;
}
void qemu_co_rwlock_unlock(CoRwlock *lock)
void coroutine_fn qemu_co_rwlock_unlock(CoRwlock *lock)
{
Coroutine *self = qemu_coroutine_self();
@ -408,7 +408,7 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
qemu_co_rwlock_maybe_wake_one(lock);
}
void qemu_co_rwlock_downgrade(CoRwlock *lock)
void coroutine_fn qemu_co_rwlock_downgrade(CoRwlock *lock)
{
qemu_co_mutex_lock(&lock->mutex);
assert(lock->owners == -1);
@ -418,7 +418,7 @@ void qemu_co_rwlock_downgrade(CoRwlock *lock)
qemu_co_rwlock_maybe_wake_one(lock);
}
void qemu_co_rwlock_wrlock(CoRwlock *lock)
void coroutine_fn qemu_co_rwlock_wrlock(CoRwlock *lock)
{
Coroutine *self = qemu_coroutine_self();
@ -438,7 +438,7 @@ void qemu_co_rwlock_wrlock(CoRwlock *lock)
self->locks_held++;
}
void qemu_co_rwlock_upgrade(CoRwlock *lock)
void coroutine_fn qemu_co_rwlock_upgrade(CoRwlock *lock)
{
qemu_co_mutex_lock(&lock->mutex);
assert(lock->owners > 0);


@ -213,7 +213,7 @@ bool qemu_coroutine_entered(Coroutine *co)
return co->caller;
}
AioContext *coroutine_fn qemu_coroutine_get_aio_context(Coroutine *co)
AioContext *qemu_coroutine_get_aio_context(Coroutine *co)
{
return co->ctx;
}
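As background for the coroutine_fn changes in the hunks above: the annotation documents that a function may yield and must therefore run in coroutine context, while removing it (as for qemu_coroutine_get_aio_context()) documents that any context may call it. A minimal invented pair illustrating the distinction:

/* May yield while waiting for the mutex: coroutine context only. */
static void coroutine_fn example_locked_section(CoMutex *m)
{
    qemu_co_mutex_lock(m);
    /* ... critical section ... */
    qemu_co_mutex_unlock(m);
}

/* Never yields: callable from any context. */
static bool example_entered(const Coroutine *co)
{
    return co != NULL;
}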