The important bits here are the first part of RCU.

v1->v2 changes are the new qemu-thread patch to fix Mac OS X,
 and cleaning up warnings.
 
 v2->v3 removed the patch to enable modules by default.
 -----BEGIN PGP SIGNATURE-----
 Version: GnuPG v2.0.22 (GNU/Linux)
 
 iQEcBAABAgAGBQJUz8/DAAoJEL/70l94x66DY68IAJHOKtBunMsX8fnMuqAXI6rO
 rN61r580tq5T3SQpAcVxjIRct1ujVLA9mzBhXSdyZj++ikR5aWXsSywU3hbNPyqk
 D6fDi5yOsR7eOCp+WFchd0usd1ZgYVgIcPvlI8iErOew63ImuzeExiDAgPmwIeki
 D687uHG75qE3l65i2/mUv2+NXuKbuVnqPRu0B4eOj7SaaGJ3g+8bpA8AbgHR8/xW
 Z6pI5sViciQRCRAXh8j6YvAQm7lfel/azjX2qxtkLV74QugcbbKWwPx5NZGlgaNc
 xJ1EVmy3F0R5MrrICL1+KrZnpbZqeWX4K/97oBN5tgA59FdOeFe2xTGfciWqSZw=
 =yzvB
 -----END PGP SIGNATURE-----

Merge remote-tracking branch 'remotes/bonzini/tags/for-upstream' into staging

The important bits here are the first part of RCU.

v1->v2 changes are the new qemu-thread patch to fix Mac OS X,
and cleaning up warnings.

v2->v3 removed the patch to enable modules by default.

# gpg: Signature made Mon 02 Feb 2015 19:28:03 GMT using RSA key ID 78C7AE83
# gpg: Good signature from "Paolo Bonzini <bonzini@gnu.org>"
# gpg:                 aka "Paolo Bonzini <pbonzini@redhat.com>"
# gpg: WARNING: This key is not certified with sufficiently trusted signatures!
# gpg:          It is not certain that the signature belongs to the owner.
# Primary key fingerprint: 46F5 9FBD 57D6 12E7 BFD4  E2F7 7E15 100C CD36 69B1
#      Subkey fingerprint: F133 3857 4B66 2389 866C  7682 BFFB D25F 78C7 AE83

* remotes/bonzini/tags/for-upstream:
  scsi: Fix scsi_req_cancel_async for no aiocb req
  cpu-exec: simplify init_delay_params
  cpu-exec: simplify align_clocks
  memory: avoid ref/unref in memory_region_find
  memory: protect current_map by RCU
  memory: remove assertion on memory_region_destroy
  rcu: add call_rcu
  rcu: allow nesting of rcu_read_lock/rcu_read_unlock
  rcu: add rcutorture
  rcu: add rcu library
  qemu-thread: fix qemu_event without futexes

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
master
Peter Maydell 2015-02-02 19:36:02 +00:00
commit d5fbb4c9ed
17 changed files with 1398 additions and 65 deletions

View File

@ -61,8 +61,7 @@ static void align_clocks(SyncClocks *sc, const CPUState *cpu)
sleep_delay.tv_sec = sc->diff_clk / 1000000000LL;
sleep_delay.tv_nsec = sc->diff_clk % 1000000000LL;
if (nanosleep(&sleep_delay, &rem_delay) < 0) {
sc->diff_clk -= (sleep_delay.tv_sec - rem_delay.tv_sec) * 1000000000LL;
sc->diff_clk -= sleep_delay.tv_nsec - rem_delay.tv_nsec;
sc->diff_clk = rem_delay.tv_sec * 1000000000LL + rem_delay.tv_nsec;
} else {
sc->diff_clk = 0;
}
@ -101,10 +100,8 @@ static void init_delay_params(SyncClocks *sc,
if (!icount_align_option) {
return;
}
sc->realtime_clock = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
sc->diff_clk = qemu_clock_get_ns(QEMU_CLOCK_VIRTUAL) -
sc->realtime_clock +
cpu_get_clock_offset();
sc->realtime_clock = qemu_clock_get_ns(QEMU_CLOCK_VIRTUAL_RT);
sc->diff_clk = qemu_clock_get_ns(QEMU_CLOCK_VIRTUAL) - sc->realtime_clock;
sc->last_cpu_icount = cpu->icount_extra + cpu->icount_decr.u16.low;
if (sc->diff_clk < max_delay) {
max_delay = sc->diff_clk;

17
cpus.c
View File

@ -229,23 +229,6 @@ int64_t cpu_get_clock(void)
return ti;
}
/* return the offset between the host clock and virtual CPU clock */
int64_t cpu_get_clock_offset(void)
{
int64_t ti;
unsigned start;
do {
start = seqlock_read_begin(&timers_state.vm_clock_seqlock);
ti = timers_state.cpu_clock_offset;
if (!timers_state.cpu_ticks_enabled) {
ti -= get_clock();
}
} while (seqlock_read_retry(&timers_state.vm_clock_seqlock, start));
return -ti;
}
/* enable cpu_get_ticks()
* Caller must hold BQL which server as mutex for vm_clock_seqlock.
*/

387
docs/rcu.txt Normal file
View File

@ -0,0 +1,387 @@
Using RCU (Read-Copy-Update) for synchronization
================================================
Read-copy update (RCU) is a synchronization mechanism that is used to
protect read-mostly data structures. RCU is very efficient and scalable
on the read side (it is wait-free), and thus can make the read paths
extremely fast.
RCU supports concurrency between a single writer and multiple readers,
thus it is not used alone. Typically, the write-side will use a lock to
serialize multiple updates, but other approaches are possible (e.g.,
restricting updates to a single task). In QEMU, when a lock is used,
this will often be the "iothread mutex", also known as the "big QEMU
lock" (BQL). Also, restricting updates to a single task is done in
QEMU using the "bottom half" API.
RCU is fundamentally a "wait-to-finish" mechanism. The read side marks
sections of code with "critical sections", and the update side will wait
for the execution of all *currently running* critical sections before
proceeding, or before asynchronously executing a callback.
The key point here is that only the currently running critical sections
are waited for; critical sections that are started _after_ the beginning
of the wait do not extend the wait, despite running concurrently with
the updater. This is the reason why RCU is more scalable than,
for example, reader-writer locks. It is so much more scalable that
the system will have a single instance of the RCU mechanism; a single
mechanism can be used for an arbitrary number of "things", without
having to worry about things such as contention or deadlocks.
How is this possible? The basic idea is to split updates in two phases,
"removal" and "reclamation". During removal, we ensure that subsequent
readers will not be able to get a reference to the old data. After
removal has completed, a critical section will not be able to access
the old data. Therefore, critical sections that begin after removal
do not matter; as soon as all previous critical sections have finished,
there cannot be any readers who hold references to the data structure,
and these can now be safely reclaimed (e.g., freed or unref'ed).
Here is a picutre:
thread 1 thread 2 thread 3
------------------- ------------------------ -------------------
enter RCU crit.sec.
| finish removal phase
| begin wait
| | enter RCU crit.sec.
exit RCU crit.sec | |
complete wait |
begin reclamation phase |
exit RCU crit.sec.
Note how thread 3 is still executing its critical section when thread 2
starts reclaiming data. This is possible, because the old version of the
data structure was not accessible at the time thread 3 began executing
that critical section.
RCU API
=======
The core RCU API is small:
void rcu_read_lock(void);
Used by a reader to inform the reclaimer that the reader is
entering an RCU read-side critical section.
void rcu_read_unlock(void);
Used by a reader to inform the reclaimer that the reader is
exiting an RCU read-side critical section. Note that RCU
read-side critical sections may be nested and/or overlapping.
void synchronize_rcu(void);
Blocks until all pre-existing RCU read-side critical sections
on all threads have completed. This marks the end of the removal
phase and the beginning of reclamation phase.
Note that it would be valid for another update to come while
synchronize_rcu is running. Because of this, it is better that
the updater releases any locks it may hold before calling
synchronize_rcu. If this is not possible (for example, because
the updater is protected by the BQL), you can use call_rcu.
void call_rcu1(struct rcu_head * head,
void (*func)(struct rcu_head *head));
This function invokes func(head) after all pre-existing RCU
read-side critical sections on all threads have completed. This
marks the end of the removal phase, with func taking care
asynchronously of the reclamation phase.
The foo struct needs to have an rcu_head structure added,
perhaps as follows:
struct foo {
struct rcu_head rcu;
int a;
char b;
long c;
};
so that the reclaimer function can fetch the struct foo address
and free it:
call_rcu1(&foo.rcu, foo_reclaim);
void foo_reclaim(struct rcu_head *rp)
{
struct foo *fp = container_of(rp, struct foo, rcu);
g_free(fp);
}
For the common case where the rcu_head member is the first of the
struct, you can use the following macro.
void call_rcu(T *p,
void (*func)(T *p),
field-name);
call_rcu1 is typically used through this macro, in the common case
where the "struct rcu_head" is the first field in the struct. In
the above case, one could have written simply:
call_rcu(foo_reclaim, g_free, rcu);
typeof(*p) atomic_rcu_read(p);
atomic_rcu_read() is similar to atomic_mb_read(), but it makes
some assumptions on the code that calls it. This allows a more
optimized implementation.
atomic_rcu_read assumes that whenever a single RCU critical
section reads multiple shared data, these reads are either
data-dependent or need no ordering. This is almost always the
case when using RCU, because read-side critical sections typically
navigate one or more pointers (the pointers that are changed on
every update) until reaching a data structure of interest,
and then read from there.
RCU read-side critical sections must use atomic_rcu_read() to
read data, unless concurrent writes are presented by another
synchronization mechanism.
Furthermore, RCU read-side critical sections should traverse the
data structure in a single direction, opposite to the direction
in which the updater initializes it.
void atomic_rcu_set(p, typeof(*p) v);
atomic_rcu_set() is also similar to atomic_mb_set(), and it also
makes assumptions on the code that calls it in order to allow a more
optimized implementation.
In particular, atomic_rcu_set() suffices for synchronization
with readers, if the updater never mutates a field within a
data item that is already accessible to readers. This is the
case when initializing a new copy of the RCU-protected data
structure; just ensure that initialization of *p is carried out
before atomic_rcu_set() makes the data item visible to readers.
If this rule is observed, writes will happen in the opposite
order as reads in the RCU read-side critical sections (or if
there is just one update), and there will be no need for other
synchronization mechanism to coordinate the accesses.
The following APIs must be used before RCU is used in a thread:
void rcu_register_thread(void);
Mark a thread as taking part in the RCU mechanism. Such a thread
will have to report quiescent points regularly, either manually
or through the QemuCond/QemuSemaphore/QemuEvent APIs.
void rcu_unregister_thread(void);
Mark a thread as not taking part anymore in the RCU mechanism.
It is not a problem if such a thread reports quiescent points,
either manually or by using the QemuCond/QemuSemaphore/QemuEvent
APIs.
Note that these APIs are relatively heavyweight, and should _not_ be
nested.
DIFFERENCES WITH LINUX
======================
- Waiting on a mutex is possible, though discouraged, within an RCU critical
section. This is because spinlocks are rarely (if ever) used in userspace
programming; not allowing this would prevent upgrading an RCU read-side
critical section to become an updater.
- atomic_rcu_read and atomic_rcu_set replace rcu_dereference and
rcu_assign_pointer. They take a _pointer_ to the variable being accessed.
- call_rcu is a macro that has an extra argument (the name of the first
field in the struct, which must be a struct rcu_head), and expects the
type of the callback's argument to be the type of the first argument.
call_rcu1 is the same as Linux's call_rcu.
RCU PATTERNS
============
Many patterns using read-writer locks translate directly to RCU, with
the advantages of higher scalability and deadlock immunity.
In general, RCU can be used whenever it is possible to create a new
"version" of a data structure every time the updater runs. This may
sound like a very strict restriction, however:
- the updater does not mean "everything that writes to a data structure",
but rather "everything that involves a reclamation step". See the
array example below
- in some cases, creating a new version of a data structure may actually
be very cheap. For example, modifying the "next" pointer of a singly
linked list is effectively creating a new version of the list.
Here are some frequently-used RCU idioms that are worth noting.
RCU list processing
-------------------
TBD (not yet used in QEMU)
RCU reference counting
----------------------
Because grace periods are not allowed to complete while there is an RCU
read-side critical section in progress, the RCU read-side primitives
may be used as a restricted reference-counting mechanism. For example,
consider the following code fragment:
rcu_read_lock();
p = atomic_rcu_read(&foo);
/* do something with p. */
rcu_read_unlock();
The RCU read-side critical section ensures that the value of "p" remains
valid until after the rcu_read_unlock(). In some sense, it is acquiring
a reference to p that is later released when the critical section ends.
The write side looks simply like this (with appropriate locking):
qemu_mutex_lock(&foo_mutex);
old = foo;
atomic_rcu_set(&foo, new);
qemu_mutex_unlock(&foo_mutex);
synchronize_rcu();
free(old);
If the processing cannot be done purely within the critical section, it
is possible to combine this idiom with a "real" reference count:
rcu_read_lock();
p = atomic_rcu_read(&foo);
foo_ref(p);
rcu_read_unlock();
/* do something with p. */
foo_unref(p);
The write side can be like this:
qemu_mutex_lock(&foo_mutex);
old = foo;
atomic_rcu_set(&foo, new);
qemu_mutex_unlock(&foo_mutex);
synchronize_rcu();
foo_unref(old);
or with call_rcu:
qemu_mutex_lock(&foo_mutex);
old = foo;
atomic_rcu_set(&foo, new);
qemu_mutex_unlock(&foo_mutex);
call_rcu(foo_unref, old, rcu);
In both cases, the write side only performs removal. Reclamation
happens when the last reference to a "foo" object is dropped.
Using synchronize_rcu() is undesirably expensive, because the
last reference may be dropped on the read side. Hence you can
use call_rcu() instead:
foo_unref(struct foo *p) {
if (atomic_fetch_dec(&p->refcount) == 1) {
call_rcu(foo_destroy, p, rcu);
}
}
Note that the same idioms would be possible with reader/writer
locks:
read_lock(&foo_rwlock); write_mutex_lock(&foo_rwlock);
p = foo; p = foo;
/* do something with p. */ foo = new;
read_unlock(&foo_rwlock); free(p);
write_mutex_unlock(&foo_rwlock);
free(p);
------------------------------------------------------------------
read_lock(&foo_rwlock); write_mutex_lock(&foo_rwlock);
p = foo; old = foo;
foo_ref(p); foo = new;
read_unlock(&foo_rwlock); foo_unref(old);
/* do something with p. */ write_mutex_unlock(&foo_rwlock);
read_lock(&foo_rwlock);
foo_unref(p);
read_unlock(&foo_rwlock);
foo_unref could use a mechanism such as bottom halves to move deallocation
out of the write-side critical section.
RCU resizable arrays
--------------------
Resizable arrays can be used with RCU. The expensive RCU synchronization
(or call_rcu) only needs to take place when the array is resized.
The two items to take care of are:
- ensuring that the old version of the array is available between removal
and reclamation;
- avoiding mismatches in the read side between the array data and the
array size.
The first problem is avoided simply by not using realloc. Instead,
each resize will allocate a new array and copy the old data into it.
The second problem would arise if the size and the data pointers were
two members of a larger struct:
struct mystuff {
...
int data_size;
int data_alloc;
T *data;
...
};
Instead, we store the size of the array with the array itself:
struct arr {
int size;
int alloc;
T data[];
};
struct arr *global_array;
read side:
rcu_read_lock();
struct arr *array = atomic_rcu_read(&global_array);
x = i < array->size ? array->data[i] : -1;
rcu_read_unlock();
return x;
write side (running under a lock):
if (global_array->size == global_array->alloc) {
/* Creating a new version. */
new_array = g_malloc(sizeof(struct arr) +
global_array->alloc * 2 * sizeof(T));
new_array->size = global_array->size;
new_array->alloc = global_array->alloc * 2;
memcpy(new_array->data, global_array->data,
global_array->alloc * sizeof(T));
/* Removal phase. */
old_array = global_array;
atomic_rcu_set(&new_array->data, new_array);
synchronize_rcu();
/* Reclamation phase. */
free(old_array);
}
SOURCES
=======
* Documentation/RCU/ from the Linux kernel

View File

@ -17,6 +17,7 @@
#include "virtio-9p-xattr.h"
#include "fsdev/qemu-fsdev.h"
#include "virtio-9p-synth.h"
#include "qemu/rcu.h"
#include <sys/stat.h>

View File

@ -1756,6 +1756,8 @@ void scsi_req_cancel_async(SCSIRequest *req, Notifier *notifier)
req->io_canceled = true;
if (req->aiocb) {
blk_aio_cancel_async(req->aiocb);
} else {
scsi_req_cancel_complete(req);
}
}

View File

@ -33,6 +33,7 @@
#include "qemu/notify.h"
#include "qapi/error.h"
#include "qom/object.h"
#include "qemu/rcu.h"
#define MAX_PHYS_ADDR_SPACE_BITS 62
#define MAX_PHYS_ADDR (((hwaddr)1 << MAX_PHYS_ADDR_SPACE_BITS) - 1)
@ -207,9 +208,13 @@ struct MemoryListener {
*/
struct AddressSpace {
/* All fields are private. */
struct rcu_head rcu;
char *name;
MemoryRegion *root;
/* Accessed via RCU. */
struct FlatView *current_map;
int ioeventfd_nb;
struct MemoryRegionIoeventfd *ioeventfds;
struct AddressSpaceDispatch *dispatch;

View File

@ -129,6 +129,67 @@
#define atomic_set(ptr, i) ((*(__typeof__(*ptr) volatile*) (ptr)) = (i))
#endif
/**
* atomic_rcu_read - reads a RCU-protected pointer to a local variable
* into a RCU read-side critical section. The pointer can later be safely
* dereferenced within the critical section.
*
* This ensures that the pointer copy is invariant thorough the whole critical
* section.
*
* Inserts memory barriers on architectures that require them (currently only
* Alpha) and documents which pointers are protected by RCU.
*
* Unless the __ATOMIC_CONSUME memory order is available, atomic_rcu_read also
* includes a compiler barrier to ensure that value-speculative optimizations
* (e.g. VSS: Value Speculation Scheduling) does not perform the data read
* before the pointer read by speculating the value of the pointer. On new
* enough compilers, atomic_load takes care of such concern about
* dependency-breaking optimizations.
*
* Should match atomic_rcu_set(), atomic_xchg(), atomic_cmpxchg().
*/
#ifndef atomic_rcu_read
#ifdef __ATOMIC_CONSUME
#define atomic_rcu_read(ptr) ({ \
typeof(*ptr) _val; \
__atomic_load(ptr, &_val, __ATOMIC_CONSUME); \
_val; \
})
#else
#define atomic_rcu_read(ptr) ({ \
typeof(*ptr) _val = atomic_read(ptr); \
smp_read_barrier_depends(); \
_val; \
})
#endif
#endif
/**
* atomic_rcu_set - assigns (publicizes) a pointer to a new data structure
* meant to be read by RCU read-side critical sections.
*
* Documents which pointers will be dereferenced by RCU read-side critical
* sections and adds the required memory barriers on architectures requiring
* them. It also makes sure the compiler does not reorder code initializing the
* data structure before its publication.
*
* Should match atomic_rcu_read().
*/
#ifndef atomic_rcu_set
#ifdef __ATOMIC_RELEASE
#define atomic_rcu_set(ptr, i) do { \
typeof(*ptr) _val = (i); \
__atomic_store(ptr, &_val, __ATOMIC_RELEASE); \
} while(0)
#else
#define atomic_rcu_set(ptr, i) do { \
smp_wmb(); \
atomic_set(ptr, i); \
} while (0)
#endif
#endif
/* These have the same semantics as Java volatile variables.
* See http://gee.cs.oswego.edu/dl/jmm/cookbook.html:
* "1. Issue a StoreStore barrier (wmb) before each volatile store."

View File

@ -104,6 +104,19 @@ struct { \
(head)->lh_first = NULL; \
} while (/*CONSTCOND*/0)
#define QLIST_SWAP(dstlist, srclist, field) do { \
void *tmplist; \
tmplist = (srclist)->lh_first; \
(srclist)->lh_first = (dstlist)->lh_first; \
if ((srclist)->lh_first != NULL) { \
(srclist)->lh_first->field.le_prev = &(srclist)->lh_first; \
} \
(dstlist)->lh_first = tmplist; \
if ((dstlist)->lh_first != NULL) { \
(dstlist)->lh_first->field.le_prev = &(dstlist)->lh_first; \
} \
} while (/*CONSTCOND*/0)
#define QLIST_INSERT_AFTER(listelm, elm, field) do { \
if (((elm)->field.le_next = (listelm)->field.le_next) != NULL) \
(listelm)->field.le_next->field.le_prev = \

147
include/qemu/rcu.h Normal file
View File

@ -0,0 +1,147 @@
#ifndef QEMU_RCU_H
#define QEMU_RCU_H
/*
* urcu-mb.h
*
* Userspace RCU header with explicit memory barrier.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* IBM's contributions to this file may be relicensed under LGPLv2 or later.
*/
#include <stdlib.h>
#include <assert.h>
#include <limits.h>
#include <unistd.h>
#include <stdint.h>
#include <stdbool.h>
#include <glib.h>
#include "qemu/compiler.h"
#include "qemu/thread.h"
#include "qemu/queue.h"
#include "qemu/atomic.h"
#ifdef __cplusplus
extern "C" {
#endif
/*
* Important !
*
* Each thread containing read-side critical sections must be registered
* with rcu_register_thread() before calling rcu_read_lock().
* rcu_unregister_thread() should be called before the thread exits.
*/
#ifdef DEBUG_RCU
#define rcu_assert(args...) assert(args)
#else
#define rcu_assert(args...)
#endif
/*
* Global quiescent period counter with low-order bits unused.
* Using a int rather than a char to eliminate false register dependencies
* causing stalls on some architectures.
*/
extern unsigned long rcu_gp_ctr;
extern QemuEvent rcu_gp_event;
struct rcu_reader_data {
/* Data used by both reader and synchronize_rcu() */
unsigned long ctr;
bool waiting;
/* Data used by reader only */
unsigned depth;
/* Data used for registry, protected by rcu_gp_lock */
QLIST_ENTRY(rcu_reader_data) node;
};
extern __thread struct rcu_reader_data rcu_reader;
static inline void rcu_read_lock(void)
{
struct rcu_reader_data *p_rcu_reader = &rcu_reader;
unsigned ctr;
if (p_rcu_reader->depth++ > 0) {
return;
}
ctr = atomic_read(&rcu_gp_ctr);
atomic_xchg(&p_rcu_reader->ctr, ctr);
if (atomic_read(&p_rcu_reader->waiting)) {
atomic_set(&p_rcu_reader->waiting, false);
qemu_event_set(&rcu_gp_event);
}
}
static inline void rcu_read_unlock(void)
{
struct rcu_reader_data *p_rcu_reader = &rcu_reader;
assert(p_rcu_reader->depth != 0);
if (--p_rcu_reader->depth > 0) {
return;
}
atomic_xchg(&p_rcu_reader->ctr, 0);
if (atomic_read(&p_rcu_reader->waiting)) {
atomic_set(&p_rcu_reader->waiting, false);
qemu_event_set(&rcu_gp_event);
}
}
extern void synchronize_rcu(void);
/*
* Reader thread registration.
*/
extern void rcu_register_thread(void);
extern void rcu_unregister_thread(void);
struct rcu_head;
typedef void RCUCBFunc(struct rcu_head *head);
struct rcu_head {
struct rcu_head *next;
RCUCBFunc *func;
};
extern void call_rcu1(struct rcu_head *head, RCUCBFunc *func);
/* The operands of the minus operator must have the same type,
* which must be the one that we specify in the cast.
*/
#define call_rcu(head, func, field) \
call_rcu1(({ \
char __attribute__((unused)) \
offset_must_be_zero[-offsetof(typeof(*(head)), field)], \
func_type_invalid = (func) - (void (*)(typeof(head)))(func); \
&(head)->field; \
}), \
(RCUCBFunc *)(func))
#ifdef __cplusplus
}
#endif
#endif /* QEMU_RCU_H */

View File

@ -25,9 +25,6 @@ void qemu_mutex_lock(QemuMutex *mutex);
int qemu_mutex_trylock(QemuMutex *mutex);
void qemu_mutex_unlock(QemuMutex *mutex);
#define rcu_read_lock() do { } while (0)
#define rcu_read_unlock() do { } while (0)
void qemu_cond_init(QemuCond *cond);
void qemu_cond_destroy(QemuCond *cond);

View File

@ -838,7 +838,6 @@ static inline int64_t get_clock(void)
int64_t cpu_get_icount_raw(void);
int64_t cpu_get_icount(void);
int64_t cpu_get_clock(void);
int64_t cpu_get_clock_offset(void);
int64_t cpu_icount_to_ns(int64_t icount);
/*******************************************/

View File

@ -33,26 +33,12 @@ static bool memory_region_update_pending;
static bool ioeventfd_update_pending;
static bool global_dirty_log = false;
/* flat_view_mutex is taken around reading as->current_map; the critical
* section is extremely short, so I'm using a single mutex for every AS.
* We could also RCU for the read-side.
*
* The BQL is taken around transaction commits, hence both locks are taken
* while writing to as->current_map (with the BQL taken outside).
*/
static QemuMutex flat_view_mutex;
static QTAILQ_HEAD(memory_listeners, MemoryListener) memory_listeners
= QTAILQ_HEAD_INITIALIZER(memory_listeners);
static QTAILQ_HEAD(, AddressSpace) address_spaces
= QTAILQ_HEAD_INITIALIZER(address_spaces);
static void memory_init(void)
{
qemu_mutex_init(&flat_view_mutex);
}
typedef struct AddrRange AddrRange;
/*
@ -242,6 +228,7 @@ struct FlatRange {
* order.
*/
struct FlatView {
struct rcu_head rcu;
unsigned ref;
FlatRange *ranges;
unsigned nr;
@ -654,10 +641,10 @@ static FlatView *address_space_get_flatview(AddressSpace *as)
{
FlatView *view;
qemu_mutex_lock(&flat_view_mutex);
view = as->current_map;
rcu_read_lock();
view = atomic_rcu_read(&as->current_map);
flatview_ref(view);
qemu_mutex_unlock(&flat_view_mutex);
rcu_read_unlock();
return view;
}
@ -766,10 +753,9 @@ static void address_space_update_topology(AddressSpace *as)
address_space_update_topology_pass(as, old_view, new_view, false);
address_space_update_topology_pass(as, old_view, new_view, true);
qemu_mutex_lock(&flat_view_mutex);
flatview_unref(as->current_map);
as->current_map = new_view;
qemu_mutex_unlock(&flat_view_mutex);
/* Writes are protected by the BQL. */
atomic_rcu_set(&as->current_map, new_view);
call_rcu(old_view, flatview_unref, rcu);
/* Note that all the old MemoryRegions are still alive up to this
* point. This relieves most MemoryListeners from the need to
@ -1263,7 +1249,6 @@ static void memory_region_finalize(Object *obj)
MemoryRegion *mr = MEMORY_REGION(obj);
assert(QTAILQ_EMPTY(&mr->subregions));
assert(memory_region_transaction_depth == 0);
mr->destructor(mr);
memory_region_clear_coalescing(mr);
g_free((char *)mr->name);
@ -1843,11 +1828,11 @@ MemoryRegionSection memory_region_find(MemoryRegion *mr,
}
range = addrrange_make(int128_make64(addr), int128_make64(size));
view = address_space_get_flatview(as);
rcu_read_lock();
view = atomic_rcu_read(&as->current_map);
fr = flatview_lookup(view, range);
if (!fr) {
flatview_unref(view);
return ret;
goto out;
}
while (fr > view->ranges && addrrange_intersects(fr[-1].addr, range)) {
@ -1864,8 +1849,8 @@ MemoryRegionSection memory_region_find(MemoryRegion *mr,
ret.offset_within_address_space = int128_get64(range.start);
ret.readonly = fr->readonly;
memory_region_ref(ret.mr);
flatview_unref(view);
out:
rcu_read_unlock();
return ret;
}
@ -1958,10 +1943,6 @@ void memory_listener_unregister(MemoryListener *listener)
void address_space_init(AddressSpace *as, MemoryRegion *root, const char *name)
{
if (QTAILQ_EMPTY(&address_spaces)) {
memory_init();
}
memory_region_transaction_begin();
as->root = root;
as->current_map = g_new(FlatView, 1);
@ -1975,15 +1956,10 @@ void address_space_init(AddressSpace *as, MemoryRegion *root, const char *name)
memory_region_transaction_commit();
}
void address_space_destroy(AddressSpace *as)
static void do_address_space_destroy(AddressSpace *as)
{
MemoryListener *listener;
/* Flush out anything from MemoryListeners listening in on this */
memory_region_transaction_begin();
as->root = NULL;
memory_region_transaction_commit();
QTAILQ_REMOVE(&address_spaces, as, address_spaces_link);
address_space_destroy_dispatch(as);
QTAILQ_FOREACH(listener, &memory_listeners, link) {
@ -1995,6 +1971,21 @@ void address_space_destroy(AddressSpace *as)
g_free(as->ioeventfds);
}
void address_space_destroy(AddressSpace *as)
{
/* Flush out anything from MemoryListeners listening in on this */
memory_region_transaction_begin();
as->root = NULL;
memory_region_transaction_commit();
QTAILQ_REMOVE(&address_spaces, as, address_spaces_link);
/* At this point, as->dispatch and as->current_map are dummy
* entries that the guest should never use. Wait for the old
* values to expire before freeing the data.
*/
call_rcu(as, do_address_space_destroy, rcu);
}
bool io_mem_read(MemoryRegion *mr, hwaddr addr, uint64_t *pval, unsigned size)
{
return memory_region_dispatch_read(mr, addr, pval, size);

View File

@ -60,6 +60,8 @@ gcov-files-test-mul64-y = util/host-utils.c
check-unit-y += tests/test-int128$(EXESUF)
# all code tested by test-int128 is inside int128.h
gcov-files-test-int128-y =
check-unit-y += tests/rcutorture$(EXESUF)
gcov-files-rcutorture-y = util/rcu.c
check-unit-y += tests/test-bitops$(EXESUF)
check-unit-$(CONFIG_HAS_GLIB_SUBPROCESS_TESTS) += tests/test-qdev-global-props$(EXESUF)
check-unit-y += tests/check-qom-interface$(EXESUF)
@ -223,7 +225,8 @@ test-obj-y = tests/check-qint.o tests/check-qstring.o tests/check-qdict.o \
tests/test-qmp-input-visitor.o tests/test-qmp-input-strict.o \
tests/test-qmp-commands.o tests/test-visitor-serialization.o \
tests/test-x86-cpuid.o tests/test-mul64.o tests/test-int128.o \
tests/test-opts-visitor.o tests/test-qmp-event.o
tests/test-opts-visitor.o tests/test-qmp-event.o \
tests/rcutorture.o
test-qapi-obj-y = tests/test-qapi-visit.o tests/test-qapi-types.o \
tests/test-qapi-event.o
@ -252,6 +255,8 @@ tests/test-x86-cpuid$(EXESUF): tests/test-x86-cpuid.o
tests/test-xbzrle$(EXESUF): tests/test-xbzrle.o migration/xbzrle.o page_cache.o libqemuutil.a
tests/test-cutils$(EXESUF): tests/test-cutils.o util/cutils.o
tests/test-int128$(EXESUF): tests/test-int128.o
tests/rcutorture$(EXESUF): tests/rcutorture.o libqemuutil.a
tests/test-qdev-global-props$(EXESUF): tests/test-qdev-global-props.o \
hw/core/qdev.o hw/core/qdev-properties.o hw/core/hotplug.o\
hw/core/irq.o \

451
tests/rcutorture.c Normal file
View File

@ -0,0 +1,451 @@
/*
* rcutorture.c: simple user-level performance/stress test of RCU.
*
* Usage:
* ./rcu <nreaders> rperf [ <seconds> ]
* Run a read-side performance test with the specified
* number of readers for <seconds> seconds.
* ./rcu <nupdaters> uperf [ <seconds> ]
* Run an update-side performance test with the specified
* number of updaters and specified duration.
* ./rcu <nreaders> perf [ <seconds> ]
* Run a combined read/update performance test with the specified
* number of readers and one updater and specified duration.
*
* The above tests produce output as follows:
*
* n_reads: 46008000 n_updates: 146026 nreaders: 2 nupdaters: 1 duration: 1
* ns/read: 43.4707 ns/update: 6848.1
*
* The first line lists the total number of RCU reads and updates executed
* during the test, the number of reader threads, the number of updater
* threads, and the duration of the test in seconds. The second line
* lists the average duration of each type of operation in nanoseconds,
* or "nan" if the corresponding type of operation was not performed.
*
* ./rcu <nreaders> stress [ <seconds> ]
* Run a stress test with the specified number of readers and
* one updater.
*
* This test produces output as follows:
*
* n_reads: 114633217 n_updates: 3903415 n_mberror: 0
* rcu_stress_count: 114618391 14826 0 0 0 0 0 0 0 0 0
*
* The first line lists the number of RCU read and update operations
* executed, followed by the number of memory-ordering violations
* (which will be zero in a correct RCU implementation). The second
* line lists the number of readers observing progressively more stale
* data. A correct RCU implementation will have all but the first two
* numbers non-zero.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* Copyright (c) 2008 Paul E. McKenney, IBM Corporation.
*/
/*
* Test variables.
*/
#include <glib.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "qemu/atomic.h"
#include "qemu/rcu.h"
#include "qemu/compiler.h"
#include "qemu/thread.h"
long long n_reads = 0LL;
long n_updates = 0L;
int nthreadsrunning;
#define GOFLAG_INIT 0
#define GOFLAG_RUN 1
#define GOFLAG_STOP 2
static volatile int goflag = GOFLAG_INIT;
#define RCU_READ_RUN 1000
#define NR_THREADS 100
static QemuThread threads[NR_THREADS];
static struct rcu_reader_data *data[NR_THREADS];
static int n_threads;
static void create_thread(void *(*func)(void *))
{
if (n_threads >= NR_THREADS) {
fprintf(stderr, "Thread limit of %d exceeded!\n", NR_THREADS);
exit(-1);
}
qemu_thread_create(&threads[n_threads], "test", func, &data[n_threads],
QEMU_THREAD_JOINABLE);
n_threads++;
}
static void wait_all_threads(void)
{
int i;
for (i = 0; i < n_threads; i++) {
qemu_thread_join(&threads[i]);
}
n_threads = 0;
}
/*
* Performance test.
*/
static void *rcu_read_perf_test(void *arg)
{
int i;
long long n_reads_local = 0;
rcu_register_thread();
*(struct rcu_reader_data **)arg = &rcu_reader;
atomic_inc(&nthreadsrunning);
while (goflag == GOFLAG_INIT) {
g_usleep(1000);
}
while (goflag == GOFLAG_RUN) {
for (i = 0; i < RCU_READ_RUN; i++) {
rcu_read_lock();
rcu_read_unlock();
}
n_reads_local += RCU_READ_RUN;
}
atomic_add(&n_reads, n_reads_local);
rcu_unregister_thread();
return NULL;
}
static void *rcu_update_perf_test(void *arg)
{
long long n_updates_local = 0;
rcu_register_thread();
*(struct rcu_reader_data **)arg = &rcu_reader;
atomic_inc(&nthreadsrunning);
while (goflag == GOFLAG_INIT) {
g_usleep(1000);
}
while (goflag == GOFLAG_RUN) {
synchronize_rcu();
n_updates_local++;
}
atomic_add(&n_updates, n_updates_local);
rcu_unregister_thread();
return NULL;
}
static void perftestinit(void)
{
nthreadsrunning = 0;
}
static void perftestrun(int nthreads, int duration, int nreaders, int nupdaters)
{
while (atomic_read(&nthreadsrunning) < nthreads) {
g_usleep(1000);
}
goflag = GOFLAG_RUN;
g_usleep(duration * G_USEC_PER_SEC);
goflag = GOFLAG_STOP;
wait_all_threads();
printf("n_reads: %lld n_updates: %ld nreaders: %d nupdaters: %d duration: %d\n",
n_reads, n_updates, nreaders, nupdaters, duration);
printf("ns/read: %g ns/update: %g\n",
((duration * 1000*1000*1000.*(double)nreaders) /
(double)n_reads),
((duration * 1000*1000*1000.*(double)nupdaters) /
(double)n_updates));
exit(0);
}
static void perftest(int nreaders, int duration)
{
int i;
perftestinit();
for (i = 0; i < nreaders; i++) {
create_thread(rcu_read_perf_test);
}
create_thread(rcu_update_perf_test);
perftestrun(i + 1, duration, nreaders, 1);
}
static void rperftest(int nreaders, int duration)
{
int i;
perftestinit();
for (i = 0; i < nreaders; i++) {
create_thread(rcu_read_perf_test);
}
perftestrun(i, duration, nreaders, 0);
}
static void uperftest(int nupdaters, int duration)
{
int i;
perftestinit();
for (i = 0; i < nupdaters; i++) {
create_thread(rcu_update_perf_test);
}
perftestrun(i, duration, 0, nupdaters);
}
/*
* Stress test.
*/
#define RCU_STRESS_PIPE_LEN 10
struct rcu_stress {
int pipe_count;
int mbtest;
};
struct rcu_stress rcu_stress_array[RCU_STRESS_PIPE_LEN] = { { 0 } };
struct rcu_stress *rcu_stress_current;
int rcu_stress_idx;
int n_mberror;
long long rcu_stress_count[RCU_STRESS_PIPE_LEN + 1];
static void *rcu_read_stress_test(void *arg)
{
int i;
int itercnt = 0;
struct rcu_stress *p;
int pc;
long long n_reads_local = 0;
volatile int garbage = 0;
rcu_register_thread();
*(struct rcu_reader_data **)arg = &rcu_reader;
while (goflag == GOFLAG_INIT) {
g_usleep(1000);
}
while (goflag == GOFLAG_RUN) {
rcu_read_lock();
p = atomic_rcu_read(&rcu_stress_current);
if (p->mbtest == 0) {
n_mberror++;
}
rcu_read_lock();
for (i = 0; i < 100; i++) {
garbage++;
}
rcu_read_unlock();
pc = p->pipe_count;
rcu_read_unlock();
if ((pc > RCU_STRESS_PIPE_LEN) || (pc < 0)) {
pc = RCU_STRESS_PIPE_LEN;
}
atomic_inc(&rcu_stress_count[pc]);
n_reads_local++;
if ((++itercnt % 0x1000) == 0) {
synchronize_rcu();
}
}
atomic_add(&n_reads, n_reads_local);
rcu_unregister_thread();
return NULL;
}
static void *rcu_update_stress_test(void *arg)
{
int i;
struct rcu_stress *p;
rcu_register_thread();
*(struct rcu_reader_data **)arg = &rcu_reader;
while (goflag == GOFLAG_INIT) {
g_usleep(1000);
}
while (goflag == GOFLAG_RUN) {
i = rcu_stress_idx + 1;
if (i >= RCU_STRESS_PIPE_LEN) {
i = 0;
}
p = &rcu_stress_array[i];
p->mbtest = 0;
smp_mb();
p->pipe_count = 0;
p->mbtest = 1;
atomic_rcu_set(&rcu_stress_current, p);
rcu_stress_idx = i;
for (i = 0; i < RCU_STRESS_PIPE_LEN; i++) {
if (i != rcu_stress_idx) {
rcu_stress_array[i].pipe_count++;
}
}
synchronize_rcu();
n_updates++;
}
rcu_unregister_thread();
return NULL;
}
static void *rcu_fake_update_stress_test(void *arg)
{
rcu_register_thread();
*(struct rcu_reader_data **)arg = &rcu_reader;
while (goflag == GOFLAG_INIT) {
g_usleep(1000);
}
while (goflag == GOFLAG_RUN) {
synchronize_rcu();
g_usleep(1000);
}
rcu_unregister_thread();
return NULL;
}
static void stresstest(int nreaders, int duration)
{
int i;
rcu_stress_current = &rcu_stress_array[0];
rcu_stress_current->pipe_count = 0;
rcu_stress_current->mbtest = 1;
for (i = 0; i < nreaders; i++) {
create_thread(rcu_read_stress_test);
}
create_thread(rcu_update_stress_test);
for (i = 0; i < 5; i++) {
create_thread(rcu_fake_update_stress_test);
}
goflag = GOFLAG_RUN;
g_usleep(duration * G_USEC_PER_SEC);
goflag = GOFLAG_STOP;
wait_all_threads();
printf("n_reads: %lld n_updates: %ld n_mberror: %d\n",
n_reads, n_updates, n_mberror);
printf("rcu_stress_count:");
for (i = 0; i <= RCU_STRESS_PIPE_LEN; i++) {
printf(" %lld", rcu_stress_count[i]);
}
printf("\n");
exit(0);
}
/* GTest interface */
static void gtest_stress(int nreaders, int duration)
{
int i;
rcu_stress_current = &rcu_stress_array[0];
rcu_stress_current->pipe_count = 0;
rcu_stress_current->mbtest = 1;
for (i = 0; i < nreaders; i++) {
create_thread(rcu_read_stress_test);
}
create_thread(rcu_update_stress_test);
for (i = 0; i < 5; i++) {
create_thread(rcu_fake_update_stress_test);
}
goflag = GOFLAG_RUN;
g_usleep(duration * G_USEC_PER_SEC);
goflag = GOFLAG_STOP;
wait_all_threads();
g_assert_cmpint(n_mberror, ==, 0);
for (i = 2; i <= RCU_STRESS_PIPE_LEN; i++) {
g_assert_cmpint(rcu_stress_count[i], ==, 0);
}
}
static void gtest_stress_1_1(void)
{
gtest_stress(1, 1);
}
static void gtest_stress_10_1(void)
{
gtest_stress(10, 1);
}
static void gtest_stress_1_5(void)
{
gtest_stress(1, 5);
}
static void gtest_stress_10_5(void)
{
gtest_stress(10, 5);
}
/*
* Mainprogram.
*/
static void usage(int argc, char *argv[])
{
fprintf(stderr, "Usage: %s [nreaders [ perf | stress ] ]\n", argv[0]);
exit(-1);
}
int main(int argc, char *argv[])
{
int nreaders = 1;
int duration = 1;
if (argc >= 2 && argv[1][0] == '-') {
g_test_init(&argc, &argv, NULL);
if (g_test_quick()) {
g_test_add_func("/rcu/torture/1reader", gtest_stress_1_1);
g_test_add_func("/rcu/torture/10readers", gtest_stress_10_1);
} else {
g_test_add_func("/rcu/torture/1reader", gtest_stress_1_5);
g_test_add_func("/rcu/torture/10readers", gtest_stress_10_5);
}
return g_test_run();
}
if (argc >= 2) {
nreaders = strtoul(argv[1], NULL, 0);
}
if (argc > 3) {
duration = strtoul(argv[3], NULL, 0);
}
if (argc < 3 || strcmp(argv[2], "stress") == 0) {
stresstest(nreaders, duration);
} else if (strcmp(argv[2], "rperf") == 0) {
rperftest(nreaders, duration);
} else if (strcmp(argv[2], "uperf") == 0) {
uperftest(nreaders, duration);
} else if (strcmp(argv[2], "perf") == 0) {
perftest(nreaders, duration);
}
usage(argc, argv);
return 0;
}

View File

@ -17,3 +17,4 @@ util-obj-y += throttle.o
util-obj-y += getauxval.o
util-obj-y += readline.o
util-obj-y += rfifolock.o
util-obj-y += rcu.o

View File

@ -307,11 +307,13 @@ static inline void futex_wait(QemuEvent *ev, unsigned val)
#else
static inline void futex_wake(QemuEvent *ev, int n)
{
pthread_mutex_lock(&ev->lock);
if (n == 1) {
pthread_cond_signal(&ev->cond);
} else {
pthread_cond_broadcast(&ev->cond);
}
pthread_mutex_unlock(&ev->lock);
}
static inline void futex_wait(QemuEvent *ev, unsigned val)

291
util/rcu.c Normal file
View File

@ -0,0 +1,291 @@
/*
* urcu-mb.c
*
* Userspace RCU library with explicit memory barriers
*
* Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
* Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
* Copyright 2015 Red Hat, Inc.
*
* Ported to QEMU by Paolo Bonzini <pbonzini@redhat.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* IBM's contributions to this file may be relicensed under LGPLv2 or later.
*/
#include "qemu-common.h"
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include <stdint.h>
#include <errno.h>
#include "qemu/rcu.h"
#include "qemu/atomic.h"
#include "qemu/thread.h"
/*
* Global grace period counter. Bit 0 is always one in rcu_gp_ctr.
* Bits 1 and above are defined in synchronize_rcu.
*/
#define RCU_GP_LOCKED (1UL << 0)
#define RCU_GP_CTR (1UL << 1)
unsigned long rcu_gp_ctr = RCU_GP_LOCKED;
QemuEvent rcu_gp_event;
static QemuMutex rcu_gp_lock;
/*
* Check whether a quiescent state was crossed between the beginning of
* update_counter_and_wait and now.
*/
static inline int rcu_gp_ongoing(unsigned long *ctr)
{
unsigned long v;
v = atomic_read(ctr);
return v && (v != rcu_gp_ctr);
}
/* Written to only by each individual reader. Read by both the reader and the
* writers.
*/
__thread struct rcu_reader_data rcu_reader;
/* Protected by rcu_gp_lock. */
typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
static ThreadList registry = QLIST_HEAD_INITIALIZER(registry);
/* Wait for previous parity/grace period to be empty of readers. */
static void wait_for_readers(void)
{
ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders);
struct rcu_reader_data *index, *tmp;
for (;;) {
/* We want to be notified of changes made to rcu_gp_ongoing
* while we walk the list.
*/
qemu_event_reset(&rcu_gp_event);
/* Instead of using atomic_mb_set for index->waiting, and
* atomic_mb_read for index->ctr, memory barriers are placed
* manually since writes to different threads are independent.
* atomic_mb_set has a smp_wmb before...
*/
smp_wmb();
QLIST_FOREACH(index, &registry, node) {
atomic_set(&index->waiting, true);
}
/* ... and a smp_mb after. */
smp_mb();
QLIST_FOREACH_SAFE(index, &registry, node, tmp) {
if (!rcu_gp_ongoing(&index->ctr)) {
QLIST_REMOVE(index, node);
QLIST_INSERT_HEAD(&qsreaders, index, node);
/* No need for mb_set here, worst of all we
* get some extra futex wakeups.
*/
atomic_set(&index->waiting, false);
}
}
/* atomic_mb_read has smp_rmb after. */
smp_rmb();
if (QLIST_EMPTY(&registry)) {
break;
}
/* Wait for one thread to report a quiescent state and
* try again.
*/
qemu_event_wait(&rcu_gp_event);
}
/* put back the reader list in the registry */
QLIST_SWAP(&registry, &qsreaders, node);
}
void synchronize_rcu(void)
{
qemu_mutex_lock(&rcu_gp_lock);
if (!QLIST_EMPTY(&registry)) {
/* In either case, the atomic_mb_set below blocks stores that free
* old RCU-protected pointers.
*/
if (sizeof(rcu_gp_ctr) < 8) {
/* For architectures with 32-bit longs, a two-subphases algorithm
* ensures we do not encounter overflow bugs.
*
* Switch parity: 0 -> 1, 1 -> 0.
*/
atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
wait_for_readers();
atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
} else {
/* Increment current grace period. */
atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
}
wait_for_readers();
}
qemu_mutex_unlock(&rcu_gp_lock);
}
#define RCU_CALL_MIN_SIZE 30
/* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h
* from liburcu. Note that head is only used by the consumer.
*/
static struct rcu_head dummy;
static struct rcu_head *head = &dummy, **tail = &dummy.next;
static int rcu_call_count;
static QemuEvent rcu_call_ready_event;
static void enqueue(struct rcu_head *node)
{
struct rcu_head **old_tail;
node->next = NULL;
old_tail = atomic_xchg(&tail, &node->next);
atomic_mb_set(old_tail, node);
}
static struct rcu_head *try_dequeue(void)
{
struct rcu_head *node, *next;
retry:
/* Test for an empty list, which we do not expect. Note that for
* the consumer head and tail are always consistent. The head
* is consistent because only the consumer reads/writes it.
* The tail, because it is the first step in the enqueuing.
* It is only the next pointers that might be inconsistent.
*/
if (head == &dummy && atomic_mb_read(&tail) == &dummy.next) {
abort();
}
/* If the head node has NULL in its next pointer, the value is
* wrong and we need to wait until its enqueuer finishes the update.
*/
node = head;
next = atomic_mb_read(&head->next);
if (!next) {
return NULL;
}
/* Since we are the sole consumer, and we excluded the empty case
* above, the queue will always have at least two nodes: the
* dummy node, and the one being removed. So we do not need to update
* the tail pointer.
*/
head = next;
/* If we dequeued the dummy node, add it back at the end and retry. */
if (node == &dummy) {
enqueue(node);
goto retry;
}
return node;
}
static void *call_rcu_thread(void *opaque)
{
struct rcu_head *node;
for (;;) {
int tries = 0;
int n = atomic_read(&rcu_call_count);
/* Heuristically wait for a decent number of callbacks to pile up.
* Fetch rcu_call_count now, we only must process elements that were
* added before synchronize_rcu() starts.
*/
while (n < RCU_CALL_MIN_SIZE && ++tries <= 5) {
g_usleep(100000);
qemu_event_reset(&rcu_call_ready_event);
n = atomic_read(&rcu_call_count);
if (n < RCU_CALL_MIN_SIZE) {
qemu_event_wait(&rcu_call_ready_event);
n = atomic_read(&rcu_call_count);
}
}
atomic_sub(&rcu_call_count, n);
synchronize_rcu();
while (n > 0) {
node = try_dequeue();
while (!node) {
qemu_event_reset(&rcu_call_ready_event);
node = try_dequeue();
if (!node) {
qemu_event_wait(&rcu_call_ready_event);
node = try_dequeue();
}
}
n--;
node->func(node);
}
}
abort();
}
void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
{
node->func = func;
enqueue(node);
atomic_inc(&rcu_call_count);
qemu_event_set(&rcu_call_ready_event);
}
void rcu_register_thread(void)
{
assert(rcu_reader.ctr == 0);
qemu_mutex_lock(&rcu_gp_lock);
QLIST_INSERT_HEAD(&registry, &rcu_reader, node);
qemu_mutex_unlock(&rcu_gp_lock);
}
void rcu_unregister_thread(void)
{
qemu_mutex_lock(&rcu_gp_lock);
QLIST_REMOVE(&rcu_reader, node);
qemu_mutex_unlock(&rcu_gp_lock);
}
static void __attribute__((__constructor__)) rcu_init(void)
{
QemuThread thread;
qemu_mutex_init(&rcu_gp_lock);
qemu_event_init(&rcu_gp_event, true);
qemu_event_init(&rcu_call_ready_event, false);
qemu_thread_create(&thread, "call_rcu", call_rcu_thread,
NULL, QEMU_THREAD_DETACHED);
rcu_register_thread();
}