diff options
author | Andrew Morton <akpm@osdl.org> | 2004-06-23 18:49:33 -0700 |
---|---|---|
committer | Linus Torvalds <torvalds@ppc970.osdl.org> | 2004-06-23 18:49:33 -0700 |
commit | 5c60169a01af712b0b1aa1f5db3fcb8776b22d9f (patch) | |
tree | a6c9a380b2decc4f07dd4847d40087637c4e0805 /kernel | |
parent | b884e83821944633fb02295fd0470398090ac782 (diff) | |
download | history-5c60169a01af712b0b1aa1f5db3fcb8776b22d9f.tar.gz |
[PATCH] rcu lock update: Add per-cpu batch counter
From: Manfred Spraul <manfred@colorfullife.com>
Below is the one of my patches from my rcu lock update. Jack Steiner tested
the first one on a 512p and it resolved the rcu cache line trashing. All were
tested on osdl with STP.
Step one for reducing cacheline trashing within rcupdate.c:
The current code uses the rcu_cpu_mask bitmap both for keeping track of the
cpus that haven't gone through a quiescent state and for checking if a cpu
should look for quiescent states. The bitmap is frequently changed and the
check is done by polling - together this causes cache line trashing.
If it's cheaper to access a (mostly) read-only cacheline than a cacheline that
is frequently dirtied, then it's possible to reduce the trashing by splitting
the rcu_cpu_mask bitmap into two cachelines:
The patch adds a generation counter and moves it into a separate cacheline.
This allows to removes all accesses to rcu_cpumask (in the read-write
cacheline) from rcu_pending and at least 50% of the accesses from
rcu_check_quiescent_state. rcu_pending and all but one call per cpu to
rcu_check_quiescent_state access the read-only cacheline. Probably not enough
for 512p, but it's a start, just for 128 byte more memory use, without slowing
down rcu grace periods. Obviously the read-only cacheline is not really
read-only: it's written once per grace period to indicate that a new grace
period is running.
Tests on an 8-way Pentium III with reaim showed some improvement:
oprofile hits:
Reference: http://khack.osdl.org/stp/293075/
Hits %
23741 0.0994 rcu_pending
19057 0.0798 rcu_check_quiescent_state
6530 0.0273 rcu_check_callbacks
Patched: http://khack.osdl.org/stp/293076/
8291 0.0579 rcu_pending
5475 0.0382 rcu_check_quiescent_state
3604 0.0252 rcu_check_callbacks
The total runtime differs between both runs, thus the % number must
be compared: Around 50% faster. I've uninlined rcu_pending for the
test.
Tested with reaim and kernbench.
Description:
- per-cpu quiescbatch and qs_pending fields introduced: quiescbatch contains
the number of the last quiescent period that the cpu has seen and qs_pending
is set if the cpu has not yet reported the quiescent state for the current
period. With these two fields a cpu can test if it should report a
quiescent state without having to look at the frequently written
rcu_cpu_mask bitmap.
- curbatch split into two fields: rcu_ctrlblk.batch.completed and
rcu_ctrlblk.batch.cur. This makes it possible to figure out if a grace
period is running (completed != cur) without accessing the rcu_cpu_mask
bitmap.
- rcu_ctrlblk.maxbatch removed and replaced with a true/false next_pending
flag: next_pending=1 means that another grace period should be started
immediately after the end of the current period. Previously, this was
achieved by maxbatch: curbatch==maxbatch means don't start, curbatch!=
maxbatch means start. A flag improves the readability: The only possible
values for maxbatch were curbatch and curbatch+1.
- rcu_ctrlblk split into two cachelines for better performance.
- common code from rcu_offline_cpu and rcu_check_quiescent_state merged into
cpu_quiet.
- rcu_offline_cpu: replace spin_lock_irq with spin_lock_bh, there are no
accesses from irq context (and there are accesses to the spinlock with
enabled interrupts from tasklet context).
- rcu_restart_cpu introduced, s390 should call it after changing nohz:
Theoretically the global batch counter could wrap around and end up at
RCU_quiescbatch(cpu). Then the cpu would not look for a quiescent state and
rcu would lock up.
Signed-off-by: Manfred Spraul <manfred@colorfullife.com>
Signed-off-by: Andrew Morton <akpm@osdl.org>
Signed-off-by: Linus Torvalds <torvalds@osdl.org>
Diffstat (limited to 'kernel')
-rw-r--r-- | kernel/rcupdate.c | 142 |
1 files changed, 88 insertions, 54 deletions
diff --git a/kernel/rcupdate.c b/kernel/rcupdate.c index 13a1b5a5825f69..d665d001e030f0 100644 --- a/kernel/rcupdate.c +++ b/kernel/rcupdate.c @@ -47,8 +47,8 @@ /* Definition for rcupdate control block. */ struct rcu_ctrlblk rcu_ctrlblk = - { .mutex = SPIN_LOCK_UNLOCKED, .curbatch = 1, - .maxbatch = 1, .rcu_cpu_mask = CPU_MASK_NONE }; + { .batch = { .cur = -300, .completed = -300 }, + .state = {.mutex = SPIN_LOCK_UNLOCKED, .rcu_cpu_mask = CPU_MASK_NONE } }; DEFINE_PER_CPU(struct rcu_data, rcu_data) = { 0L }; /* Fake initialization required by compiler */ @@ -97,25 +97,59 @@ static void rcu_do_batch(struct list_head *list) } /* + * Grace period handling: + * The grace period handling consists out of two steps: + * - A new grace period is started. + * This is done by rcu_start_batch. The start is not broadcasted to + * all cpus, they must pick this up by comparing rcu_ctrlblk.batch.cur with + * RCU_quiescbatch(cpu). All cpus are recorded in the + * rcu_ctrlblk.state.rcu_cpu_mask bitmap. + * - All cpus must go through a quiescent state. + * Since the start of the grace period is not broadcasted, at least two + * calls to rcu_check_quiescent_state are required: + * The first call just notices that a new grace period is running. The + * following calls check if there was a quiescent state since the beginning + * of the grace period. If so, it updates rcu_ctrlblk.state.rcu_cpu_mask. If + * the bitmap is empty, then the grace period is completed. + * rcu_check_quiescent_state calls rcu_start_batch(0) to start the next grace + * period (if necessary). + */ +/* * Register a new batch of callbacks, and start it up if there is currently no * active batch and the batch to be registered has not already occurred. - * Caller must hold the rcu_ctrlblk lock. + * Caller must hold the rcu_ctrlblk.state lock. */ -static void rcu_start_batch(long newbatch) +static void rcu_start_batch(int next_pending) { cpumask_t active; - if (rcu_batch_before(rcu_ctrlblk.maxbatch, newbatch)) { - rcu_ctrlblk.maxbatch = newbatch; + if (next_pending) + rcu_ctrlblk.state.next_pending = 1; + + if (rcu_ctrlblk.state.next_pending && + rcu_ctrlblk.batch.completed == rcu_ctrlblk.batch.cur) { + rcu_ctrlblk.state.next_pending = 0; + /* Can't change, since spin lock held. */ + active = nohz_cpu_mask; + cpus_complement(active); + cpus_and(rcu_ctrlblk.state.rcu_cpu_mask, cpu_online_map, active); + rcu_ctrlblk.batch.cur++; } - if (rcu_batch_before(rcu_ctrlblk.maxbatch, rcu_ctrlblk.curbatch) || - !cpus_empty(rcu_ctrlblk.rcu_cpu_mask)) { - return; +} + +/* + * cpu went through a quiescent state since the beginning of the grace period. + * Clear it from the cpu mask and complete the grace period if it was the last + * cpu. Start another grace period if someone has further entries pending + */ +static void cpu_quiet(int cpu) +{ + cpu_clear(cpu, rcu_ctrlblk.state.rcu_cpu_mask); + if (cpus_empty(rcu_ctrlblk.state.rcu_cpu_mask)) { + /* batch completed ! */ + rcu_ctrlblk.batch.completed = rcu_ctrlblk.batch.cur; + rcu_start_batch(0); } - /* Can't change, since spin lock held. */ - active = nohz_cpu_mask; - cpus_complement(active); - cpus_and(rcu_ctrlblk.rcu_cpu_mask, cpu_online_map, active); } /* @@ -127,7 +161,19 @@ static void rcu_check_quiescent_state(void) { int cpu = smp_processor_id(); - if (!cpu_isset(cpu, rcu_ctrlblk.rcu_cpu_mask)) + if (RCU_quiescbatch(cpu) != rcu_ctrlblk.batch.cur) { + /* new grace period: record qsctr value. */ + RCU_qs_pending(cpu) = 1; + RCU_last_qsctr(cpu) = RCU_qsctr(cpu); + RCU_quiescbatch(cpu) = rcu_ctrlblk.batch.cur; + return; + } + + /* Grace period already completed for this cpu? + * qs_pending is checked instead of the actual bitmap to avoid + * cacheline trashing. + */ + if (!RCU_qs_pending(cpu)) return; /* @@ -135,27 +181,19 @@ static void rcu_check_quiescent_state(void) * we may miss one quiescent state of that CPU. That is * tolerable. So no need to disable interrupts. */ - if (RCU_last_qsctr(cpu) == RCU_QSCTR_INVALID) { - RCU_last_qsctr(cpu) = RCU_qsctr(cpu); - return; - } if (RCU_qsctr(cpu) == RCU_last_qsctr(cpu)) return; + RCU_qs_pending(cpu) = 0; - spin_lock(&rcu_ctrlblk.mutex); - if (!cpu_isset(cpu, rcu_ctrlblk.rcu_cpu_mask)) - goto out_unlock; - - cpu_clear(cpu, rcu_ctrlblk.rcu_cpu_mask); - RCU_last_qsctr(cpu) = RCU_QSCTR_INVALID; - if (!cpus_empty(rcu_ctrlblk.rcu_cpu_mask)) - goto out_unlock; - - rcu_ctrlblk.curbatch++; - rcu_start_batch(rcu_ctrlblk.maxbatch); + spin_lock(&rcu_ctrlblk.state.mutex); + /* + * RCU_quiescbatch/batch.cur and the cpu bitmap can come out of sync + * during cpu startup. Ignore the quiescent state. + */ + if (likely(RCU_quiescbatch(cpu) == rcu_ctrlblk.batch.cur)) + cpu_quiet(cpu); -out_unlock: - spin_unlock(&rcu_ctrlblk.mutex); + spin_unlock(&rcu_ctrlblk.state.mutex); } @@ -185,25 +223,11 @@ static void rcu_offline_cpu(int cpu) * we can block indefinitely waiting for it, so flush * it here */ - spin_lock_irq(&rcu_ctrlblk.mutex); - if (cpus_empty(rcu_ctrlblk.rcu_cpu_mask)) - goto unlock; - - cpu_clear(cpu, rcu_ctrlblk.rcu_cpu_mask); - if (cpus_empty(rcu_ctrlblk.rcu_cpu_mask)) { - rcu_ctrlblk.curbatch++; - /* We may avoid calling start batch if - * we are starting the batch only - * because of the DEAD CPU (the current - * CPU will start a new batch anyway for - * the callbacks we will move to current CPU). - * However, we will avoid this optimisation - * for now. - */ - rcu_start_batch(rcu_ctrlblk.maxbatch); - } + spin_lock_bh(&rcu_ctrlblk.state.mutex); + if (rcu_ctrlblk.batch.cur != rcu_ctrlblk.batch.completed) + cpu_quiet(cpu); unlock: - spin_unlock_irq(&rcu_ctrlblk.mutex); + spin_unlock_bh(&rcu_ctrlblk.state.mutex); rcu_move_batch(&RCU_curlist(cpu)); rcu_move_batch(&RCU_nxtlist(cpu)); @@ -213,6 +237,14 @@ unlock: #endif +void rcu_restart_cpu(int cpu) +{ + spin_lock_bh(&rcu_ctrlblk.state.mutex); + RCU_quiescbatch(cpu) = rcu_ctrlblk.batch.completed; + RCU_qs_pending(cpu) = 0; + spin_unlock_bh(&rcu_ctrlblk.state.mutex); +} + /* * This does the RCU processing work from tasklet context. */ @@ -222,7 +254,7 @@ static void rcu_process_callbacks(unsigned long unused) LIST_HEAD(list); if (!list_empty(&RCU_curlist(cpu)) && - rcu_batch_after(rcu_ctrlblk.curbatch, RCU_batch(cpu))) { + !rcu_batch_before(rcu_ctrlblk.batch.completed,RCU_batch(cpu))) { __list_splice(&RCU_curlist(cpu), &list); INIT_LIST_HEAD(&RCU_curlist(cpu)); } @@ -236,10 +268,10 @@ static void rcu_process_callbacks(unsigned long unused) /* * start the next batch of callbacks */ - spin_lock(&rcu_ctrlblk.mutex); - RCU_batch(cpu) = rcu_ctrlblk.curbatch + 1; - rcu_start_batch(RCU_batch(cpu)); - spin_unlock(&rcu_ctrlblk.mutex); + spin_lock(&rcu_ctrlblk.state.mutex); + RCU_batch(cpu) = rcu_ctrlblk.batch.cur + 1; + rcu_start_batch(1); + spin_unlock(&rcu_ctrlblk.state.mutex); } else { local_irq_enable(); } @@ -263,6 +295,8 @@ static void __devinit rcu_online_cpu(int cpu) tasklet_init(&RCU_tasklet(cpu), rcu_process_callbacks, 0UL); INIT_LIST_HEAD(&RCU_nxtlist(cpu)); INIT_LIST_HEAD(&RCU_curlist(cpu)); + RCU_quiescbatch(cpu) = rcu_ctrlblk.batch.completed; + RCU_qs_pending(cpu) = 0; } static int __devinit rcu_cpu_notify(struct notifier_block *self, |