block/mirror: Convert to coroutines

In order to talk to the source BDS (and maybe in the future to the
target BDS as well) directly, we need to convert our existing AIO
requests into coroutine I/O requests.

Signed-off-by: Max Reitz <mreitz@redhat.com>
Reviewed-by: Fam Zheng <famz@redhat.com>
Message-id: 20180613181823.13618-3-mreitz@redhat.com
Signed-off-by: Max Reitz <mreitz@redhat.com>
Max Reitz  2018-06-13 20:18:11 +02:00
commit 2e1990b26e
parent 4295c5fc61
1 file changed, 90 insertions(+), 62 deletions(-)

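In outline, the conversion looks like the sketch below. It is illustrative only and not part of the patch: the demo_* names and the DemoOp struct are invented for this example, while blk_aio_preadv(), blk_co_preadv(), coroutine_fn, qemu_coroutine_create() and qemu_coroutine_enter() are the actual QEMU APIs the patch switches between.

#include "qemu/osdep.h"
#include "qemu/coroutine.h"
#include "sysemu/block-backend.h"

/* Hypothetical per-request state, for this sketch only */
typedef struct DemoOp {
    BlockBackend *blk;
    int64_t offset;
    QEMUIOVector qiov;
} DemoOp;

/* Old style: submit an AIO request; the result arrives later in a
 * completion callback that runs outside coroutine context. */
static void demo_read_cb(void *opaque, int ret)
{
    DemoOp *op = opaque;
    /* error handling and cleanup would go here */
    (void)op;
}

static void demo_submit_aio(DemoOp *op)
{
    blk_aio_preadv(op->blk, op->offset, &op->qiov, 0, demo_read_cb, op);
}

/* New style: the request runs inside a coroutine; blk_co_preadv()
 * yields until the I/O finishes and hands back the result directly,
 * so the completion logic becomes a plain call in the same flow. */
static void coroutine_fn demo_co_read(void *opaque)
{
    DemoOp *op = opaque;
    int ret = blk_co_preadv(op->blk, op->offset, op->qiov.size, &op->qiov, 0);
    /* error handling and cleanup follow inline, using ret */
    (void)ret;
}

static void demo_perform(DemoOp *op)
{
    Coroutine *co = qemu_coroutine_create(demo_co_read, op);
    qemu_coroutine_enter(co);   /* runs until the coroutine first yields */
}

This is the shape of the diff that follows: mirror_do_read() and mirror_do_zero_or_discard() become the coroutine_fn functions mirror_co_read(), mirror_co_zero() and mirror_co_discard(), the completion callbacks turn into plain calls that receive the return value, and mirror_perform() spawns a coroutine instead of registering a callback.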

--- a/block/mirror.c
+++ b/block/mirror.c
@@ -78,6 +78,10 @@ typedef struct MirrorOp {
     QEMUIOVector qiov;
     int64_t offset;
     uint64_t bytes;
+
+    /* The pointee is set by mirror_co_read(), mirror_co_zero(), and
+     * mirror_co_discard() before yielding for the first time */
+    int64_t *bytes_handled;
 } MirrorOp;
 
 typedef enum MirrorMethod {
@@ -99,7 +103,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
     }
 }
 
-static void mirror_iteration_done(MirrorOp *op, int ret)
+static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret)
 {
     MirrorBlockJob *s = op->s;
     struct iovec *iov;
@@ -136,9 +140,8 @@ static void mirror_iteration_done(MirrorOp *op, int ret)
     }
 }
 
-static void mirror_write_complete(void *opaque, int ret)
+static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret)
 {
-    MirrorOp *op = opaque;
     MirrorBlockJob *s = op->s;
 
     aio_context_acquire(blk_get_aio_context(s->common.blk));
@@ -155,9 +158,8 @@ static void mirror_write_complete(void *opaque, int ret)
     aio_context_release(blk_get_aio_context(s->common.blk));
 }
 
-static void mirror_read_complete(void *opaque, int ret)
+static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret)
 {
-    MirrorOp *op = opaque;
     MirrorBlockJob *s = op->s;
 
     aio_context_acquire(blk_get_aio_context(s->common.blk));
@@ -172,8 +174,9 @@ static void mirror_read_complete(void *opaque, int ret)
 
         mirror_iteration_done(op, ret);
     } else {
-        blk_aio_pwritev(s->target, op->offset, &op->qiov,
-                        0, mirror_write_complete, op);
+        ret = blk_co_pwritev(s->target, op->offset,
+                             op->qiov.size, &op->qiov, 0);
+        mirror_write_complete(op, ret);
     }
     aio_context_release(blk_get_aio_context(s->common.blk));
 }
@@ -230,60 +233,57 @@ static inline void mirror_wait_for_io(MirrorBlockJob *s)
     s->waiting_for_io = false;
 }
 
-/* Submit async read while handling COW.
- * Returns: The number of bytes copied after and including offset,
- *          excluding any bytes copied prior to offset due to alignment.
- *          This will be @bytes if no alignment is necessary, or
- *          (new_end - offset) if tail is rounded up or down due to
- *          alignment or buffer limit.
+/* Perform a mirror copy operation.
+ *
+ * *op->bytes_handled is set to the number of bytes copied after and
+ * including offset, excluding any bytes copied prior to offset due
+ * to alignment. This will be op->bytes if no alignment is necessary,
+ * or (new_end - op->offset) if the tail is rounded up or down due to
+ * alignment or buffer limit.
  */
-static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset,
-                               uint64_t bytes)
+static void coroutine_fn mirror_co_read(void *opaque)
 {
+    MirrorOp *op = opaque;
+    MirrorBlockJob *s = op->s;
     BlockBackend *source = s->common.blk;
     int nb_chunks;
     uint64_t ret;
-    MirrorOp *op;
     uint64_t max_bytes;
 
     max_bytes = s->granularity * s->max_iov;
 
     /* We can only handle as much as buf_size at a time. */
-    bytes = MIN(s->buf_size, MIN(max_bytes, bytes));
-    assert(bytes);
-    assert(bytes < BDRV_REQUEST_MAX_BYTES);
-    ret = bytes;
+    op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes));
+    assert(op->bytes);
+    assert(op->bytes < BDRV_REQUEST_MAX_BYTES);
+    *op->bytes_handled = op->bytes;
 
     if (s->cow_bitmap) {
-        ret += mirror_cow_align(s, &offset, &bytes);
+        *op->bytes_handled += mirror_cow_align(s, &op->offset, &op->bytes);
     }
-    assert(bytes <= s->buf_size);
+    /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */
+    assert(*op->bytes_handled <= UINT_MAX);
+    assert(op->bytes <= s->buf_size);
     /* The offset is granularity-aligned because:
      * 1) Caller passes in aligned values;
      * 2) mirror_cow_align is used only when target cluster is larger. */
-    assert(QEMU_IS_ALIGNED(offset, s->granularity));
+    assert(QEMU_IS_ALIGNED(op->offset, s->granularity));
     /* The range is sector-aligned, since bdrv_getlength() rounds up. */
-    assert(QEMU_IS_ALIGNED(bytes, BDRV_SECTOR_SIZE));
-    nb_chunks = DIV_ROUND_UP(bytes, s->granularity);
+    assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE));
+    nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);
 
     while (s->buf_free_count < nb_chunks) {
-        trace_mirror_yield_in_flight(s, offset, s->in_flight);
+        trace_mirror_yield_in_flight(s, op->offset, s->in_flight);
        mirror_wait_for_io(s);
     }
 
-    /* Allocate a MirrorOp that is used as an AIO callback. */
-    op = g_new(MirrorOp, 1);
-    op->s = s;
-    op->offset = offset;
-    op->bytes = bytes;
-
     /* Now make a QEMUIOVector taking enough granularity-sized chunks
      * from s->buf_free.
      */
     qemu_iovec_init(&op->qiov, nb_chunks);
     while (nb_chunks-- > 0) {
         MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free);
-        size_t remaining = bytes - op->qiov.size;
+        size_t remaining = op->bytes - op->qiov.size;
 
         QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next);
         s->buf_free_count--;
@@ -292,53 +292,81 @@ static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset,
 
     /* Copy the dirty cluster. */
     s->in_flight++;
-    s->bytes_in_flight += bytes;
-    trace_mirror_one_iteration(s, offset, bytes);
+    s->bytes_in_flight += op->bytes;
+    trace_mirror_one_iteration(s, op->offset, op->bytes);
 
-    blk_aio_preadv(source, offset, &op->qiov, 0, mirror_read_complete, op);
-    return ret;
+    ret = blk_co_preadv(source, op->offset, op->bytes, &op->qiov, 0);
+    mirror_read_complete(op, ret);
 }
 
-static void mirror_do_zero_or_discard(MirrorBlockJob *s,
-                                      int64_t offset,
-                                      uint64_t bytes,
-                                      bool is_discard)
+static void coroutine_fn mirror_co_zero(void *opaque)
 {
-    MirrorOp *op;
+    MirrorOp *op = opaque;
+    int ret;
 
-    /* Allocate a MirrorOp that is used as an AIO callback. The qiov is zeroed
-     * so the freeing in mirror_iteration_done is nop. */
-    op = g_new0(MirrorOp, 1);
-    op->s = s;
-    op->offset = offset;
-    op->bytes = bytes;
+    op->s->in_flight++;
+    op->s->bytes_in_flight += op->bytes;
+    *op->bytes_handled = op->bytes;
 
-    s->in_flight++;
-    s->bytes_in_flight += bytes;
-    if (is_discard) {
-        blk_aio_pdiscard(s->target, offset,
-                         op->bytes, mirror_write_complete, op);
-    } else {
-        blk_aio_pwrite_zeroes(s->target, offset,
-                              op->bytes, s->unmap ? BDRV_REQ_MAY_UNMAP : 0,
-                              mirror_write_complete, op);
-    }
+    ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes,
+                               op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0);
+    mirror_write_complete(op, ret);
+}
+
+static void coroutine_fn mirror_co_discard(void *opaque)
+{
+    MirrorOp *op = opaque;
+    int ret;
+
+    op->s->in_flight++;
+    op->s->bytes_in_flight += op->bytes;
+    *op->bytes_handled = op->bytes;
+
+    ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes);
+    mirror_write_complete(op, ret);
 }
 
 static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset,
                                unsigned bytes, MirrorMethod mirror_method)
 {
+    MirrorOp *op;
+    Coroutine *co;
+    int64_t bytes_handled = -1;
+
+    op = g_new(MirrorOp, 1);
+    *op = (MirrorOp){
+        .s = s,
+        .offset = offset,
+        .bytes = bytes,
+        .bytes_handled = &bytes_handled,
+    };
+
     switch (mirror_method) {
     case MIRROR_METHOD_COPY:
-        return mirror_do_read(s, offset, bytes);
+        co = qemu_coroutine_create(mirror_co_read, op);
+        break;
     case MIRROR_METHOD_ZERO:
+        co = qemu_coroutine_create(mirror_co_zero, op);
+        break;
     case MIRROR_METHOD_DISCARD:
-        mirror_do_zero_or_discard(s, offset, bytes,
-                                  mirror_method == MIRROR_METHOD_DISCARD);
-        return bytes;
+        co = qemu_coroutine_create(mirror_co_discard, op);
+        break;
     default:
         abort();
     }
+
+    qemu_coroutine_enter(co);
+    /* At this point, ownership of op has been moved to the coroutine
+     * and the object may already be freed */
+
+    /* Assert that this value has been set */
+    assert(bytes_handled >= 0);
+
+    /* Same assertion as in mirror_co_read() (and for mirror_co_read()
+     * and mirror_co_discard(), bytes_handled == op->bytes, which
+     * is the @bytes parameter given to this function) */
+    assert(bytes_handled <= UINT_MAX);
+    return bytes_handled;
 }
 
 static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)