diff --git a/block/rbd.c b/block/rbd.c index f453f04757..121fae221e 100644 --- a/block/rbd.c +++ b/block/rbd.c @@ -95,18 +95,13 @@ typedef struct RADOSCB { #define RBD_FD_WRITE 1 typedef struct BDRVRBDState { - int fds[2]; rados_t cluster; rados_ioctx_t io_ctx; rbd_image_t image; char name[RBD_MAX_IMAGE_NAME_SIZE]; char *snap; - int event_reader_pos; - RADOSCB *event_rcb; } BDRVRBDState; -static void rbd_aio_bh_cb(void *opaque); - static int qemu_rbd_next_tok(char *dst, int dst_len, char *src, char delim, const char *name, @@ -369,9 +364,8 @@ static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options, } /* - * This aio completion is being called from qemu_rbd_aio_event_reader() - * and runs in qemu context. It schedules a bh, but just in case the aio - * was not cancelled before. + * This aio completion is being called from rbd_finish_bh() and runs in qemu + * BH context. */ static void qemu_rbd_complete_aio(RADOSCB *rcb) { @@ -401,36 +395,19 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb) acb->ret = r; } } - /* Note that acb->bh can be NULL in case where the aio was cancelled */ - acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); - qemu_bh_schedule(acb->bh); + g_free(rcb); -} -/* - * aio fd read handler. It runs in the qemu context and calls the - * completion handling of completed rados aio operations. - */ -static void qemu_rbd_aio_event_reader(void *opaque) -{ - BDRVRBDState *s = opaque; + if (acb->cmd == RBD_AIO_READ) { + qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size); + } + qemu_vfree(acb->bounce); + acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); + acb->status = 0; - ssize_t ret; - - do { - char *p = (char *)&s->event_rcb; - - /* now read the rcb pointer that was sent from a non qemu thread */ - ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos, - sizeof(s->event_rcb) - s->event_reader_pos); - if (ret > 0) { - s->event_reader_pos += ret; - if (s->event_reader_pos == sizeof(s->event_rcb)) { - s->event_reader_pos = 0; - qemu_rbd_complete_aio(s->event_rcb); - } - } - } while (ret < 0 && errno == EINTR); + if (!acb->cancelled) { + qemu_aio_release(acb); + } } /* TODO Convert to fine grained options */ @@ -538,23 +515,9 @@ static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags, bs->read_only = (s->snap != NULL); - s->event_reader_pos = 0; - r = qemu_pipe(s->fds); - if (r < 0) { - error_report("error opening eventfd"); - goto failed; - } - fcntl(s->fds[0], F_SETFL, O_NONBLOCK); - fcntl(s->fds[1], F_SETFL, O_NONBLOCK); - qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader, - NULL, s); - - qemu_opts_del(opts); return 0; -failed: - rbd_close(s->image); failed_open: rados_ioctx_destroy(s->io_ctx); failed_shutdown: @@ -569,10 +532,6 @@ static void qemu_rbd_close(BlockDriverState *bs) { BDRVRBDState *s = bs->opaque; - close(s->fds[0]); - close(s->fds[1]); - qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL, NULL, NULL); - rbd_close(s->image); rados_ioctx_destroy(s->io_ctx); g_free(s->snap); @@ -600,34 +559,11 @@ static const AIOCBInfo rbd_aiocb_info = { .cancel = qemu_rbd_aio_cancel, }; -static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb) +static void rbd_finish_bh(void *opaque) { - int ret = 0; - while (1) { - fd_set wfd; - int fd = s->fds[RBD_FD_WRITE]; - - /* send the op pointer to the qemu thread that is responsible - for the aio/op completion. Must do it in a qemu thread context */ - ret = write(fd, (void *)&rcb, sizeof(rcb)); - if (ret >= 0) { - break; - } - if (errno == EINTR) { - continue; - } - if (errno != EAGAIN) { - break; - } - - FD_ZERO(&wfd); - FD_SET(fd, &wfd); - do { - ret = select(fd + 1, NULL, &wfd, NULL, NULL); - } while (ret < 0 && errno == EINTR); - } - - return ret; + RADOSCB *rcb = opaque; + qemu_bh_delete(rcb->acb->bh); + qemu_rbd_complete_aio(rcb); } /* @@ -635,40 +571,18 @@ static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb) * * Note: this function is being called from a non qemu thread so * we need to be careful about what we do here. Generally we only - * write to the block notification pipe, and do the rest of the - * io completion handling from qemu_rbd_aio_event_reader() which - * runs in a qemu context. + * schedule a BH, and do the rest of the io completion handling + * from rbd_finish_bh() which runs in a qemu context. */ static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb) { - int ret; + RBDAIOCB *acb = rcb->acb; + rcb->ret = rbd_aio_get_return_value(c); rbd_aio_release(c); - ret = qemu_rbd_send_pipe(rcb->s, rcb); - if (ret < 0) { - error_report("failed writing to acb->s->fds"); - g_free(rcb); - } -} -/* Callback when all queued rbd_aio requests are complete */ - -static void rbd_aio_bh_cb(void *opaque) -{ - RBDAIOCB *acb = opaque; - - if (acb->cmd == RBD_AIO_READ) { - qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size); - } - qemu_vfree(acb->bounce); - acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); - qemu_bh_delete(acb->bh); - acb->bh = NULL; - acb->status = 0; - - if (!acb->cancelled) { - qemu_aio_release(acb); - } + acb->bh = qemu_bh_new(rbd_finish_bh, rcb); + qemu_bh_schedule(acb->bh); } static int rbd_aio_discard_wrapper(rbd_image_t image,