nbd: use a QemuMutex to synchronize yanking, reconnection and coroutines

The condition for waiting on the s->free_sema queue depends on
both s->in_flight and s->state.  The latter is currently using
atomics, but this is quite dubious and probably wrong.

Because s->state is written in the main thread too, for example by
the yank callback, it cannot be protected by a CoMutex.  Introduce a
separate lock that can be used by nbd_co_send_request(); later on this
lock will also be used for s->state.  There will not be any contention
on the lock unless there is a yank or reconnect, so this is not
performance sensitive.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
Message-Id: <20220414175756.671165-6-pbonzini@redhat.com>
Reviewed-by: Eric Blake <eblake@redhat.com>
Reviewed-by: Lukas Straub <lukasstraub2@web.de>
Signed-off-by: Eric Blake <eblake@redhat.com>
master
Paolo Bonzini 2022-04-14 19:57:52 +02:00 committed by Eric Blake
parent 8610b4491f
commit ee19d953ec
1 changed files with 27 additions and 19 deletions

View File

@ -71,17 +71,22 @@ typedef struct BDRVNBDState {
QIOChannel *ioc; /* The current I/O channel */
NBDExportInfo info;
CoMutex send_mutex;
/*
* Protects free_sema, in_flight, requests[].coroutine,
* reconnect_delay_timer.
*/
QemuMutex requests_lock;
CoQueue free_sema;
CoMutex receive_mutex;
int in_flight;
NBDClientRequest requests[MAX_NBD_REQUESTS];
QEMUTimer *reconnect_delay_timer;
CoMutex send_mutex;
CoMutex receive_mutex;
NBDClientState state;
QEMUTimer *reconnect_delay_timer;
QEMUTimer *open_timer;
NBDClientRequest requests[MAX_NBD_REQUESTS];
NBDReply reply;
BlockDriverState *bs;
@ -350,7 +355,7 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
return 0;
}
/* called under s->send_mutex */
/* Called with s->requests_lock taken. */
static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
{
bool blocking = nbd_client_connecting_wait(s);
@ -382,9 +387,9 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
s->ioc = NULL;
}
qemu_co_mutex_unlock(&s->send_mutex);
qemu_mutex_unlock(&s->requests_lock);
nbd_co_do_establish_connection(s->bs, blocking, NULL);
qemu_co_mutex_lock(&s->send_mutex);
qemu_mutex_lock(&s->requests_lock);
/*
* The reconnect attempt is done (maybe successfully, maybe not), so
@ -466,11 +471,10 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
int rc, i = -1;
qemu_co_mutex_lock(&s->send_mutex);
qemu_mutex_lock(&s->requests_lock);
while (s->in_flight == MAX_NBD_REQUESTS ||
(!nbd_client_connected(s) && s->in_flight > 0)) {
qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
qemu_co_queue_wait(&s->free_sema, &s->requests_lock);
}
s->in_flight++;
@ -491,13 +495,13 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
}
}
g_assert(qemu_in_coroutine());
assert(i < MAX_NBD_REQUESTS);
s->requests[i].coroutine = qemu_coroutine_self();
s->requests[i].offset = request->from;
s->requests[i].receiving = false;
qemu_mutex_unlock(&s->requests_lock);
qemu_co_mutex_lock(&s->send_mutex);
request->handle = INDEX_TO_HANDLE(s, i);
assert(s->ioc);
@ -517,17 +521,19 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
} else {
rc = nbd_send_request(s->ioc, request);
}
qemu_co_mutex_unlock(&s->send_mutex);
err:
if (rc < 0) {
qemu_mutex_lock(&s->requests_lock);
err:
nbd_channel_error(s, rc);
if (i != -1) {
s->requests[i].coroutine = NULL;
}
s->in_flight--;
qemu_co_queue_next(&s->free_sema);
qemu_mutex_unlock(&s->requests_lock);
}
qemu_co_mutex_unlock(&s->send_mutex);
return rc;
}
@ -1017,12 +1023,11 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
return true;
break_loop:
qemu_mutex_lock(&s->requests_lock);
s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;
qemu_co_mutex_lock(&s->send_mutex);
s->in_flight--;
qemu_co_queue_next(&s->free_sema);
qemu_co_mutex_unlock(&s->send_mutex);
qemu_mutex_unlock(&s->requests_lock);
return false;
}
@ -1855,8 +1860,9 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
s->bs = bs;
qemu_co_mutex_init(&s->send_mutex);
qemu_mutex_init(&s->requests_lock);
qemu_co_queue_init(&s->free_sema);
qemu_co_mutex_init(&s->send_mutex);
qemu_co_mutex_init(&s->receive_mutex);
if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
@ -2043,9 +2049,11 @@ static void nbd_cancel_in_flight(BlockDriverState *bs)
reconnect_delay_timer_del(s);
qemu_mutex_lock(&s->requests_lock);
if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
s->state = NBD_CLIENT_CONNECTING_NOWAIT;
}
qemu_mutex_unlock(&s->requests_lock);
nbd_co_establish_connection_cancel(s->conn);
}