From: Manfred Spraul

Step two for reducing cacheline trashing within rcupdate.c:

rcu_process_callbacks always acquires rcu_ctrlblk.state.mutex and calls
rcu_start_batch, even if the batch is already running or already scheduled
to run.  This can be avoided with a sequence lock: a sequence lock allows
the current batch number and next_pending to be read atomically.  If
next_pending is already set, then there is no need to acquire the global
mutex.

This means that for each grace period, there will be

- one write access to the rcu_ctrlblk.batch cacheline.

- lots of read accesses to rcu_ctrlblk.batch (3-10*cpus_online()).  The
  behavior is similar to the jiffies cacheline, so it shouldn't be a
  problem.

- cpus_online()+1 write accesses to rcu_ctrlblk.state, all of them starting
  with spin_lock(&rcu_ctrlblk.state.mutex).  For large enough cpus_online()
  this will be a problem, but all except two of the spin_lock calls only
  protect the rcu_cpu_mask bitmap, so a hierarchical bitmap would allow the
  write accesses to be split across multiple cachelines.

Tested on an 8-way with reaim.  Unfortunately it probably won't help with
Jack Steiner's 'ls' test, since in that test only one cpu generates rcu
entries.

Signed-off-by: Manfred Spraul
Signed-off-by: Andrew Morton
---

 25-akpm/include/linux/rcupdate.h |    6 +++++-
 25-akpm/kernel/rcupdate.c        |   29 +++++++++++++++++++++--------
 2 files changed, 26 insertions(+), 9 deletions(-)

diff -puN include/linux/rcupdate.h~rcu-lock-update-use-a-sequence-lock-for-starting-batches include/linux/rcupdate.h
--- 25/include/linux/rcupdate.h~rcu-lock-update-use-a-sequence-lock-for-starting-batches	Wed May 26 15:56:11 2004
+++ 25-akpm/include/linux/rcupdate.h	Wed May 26 15:56:11 2004
@@ -41,6 +41,7 @@
 #include <linux/threads.h>
 #include <linux/percpu.h>
 #include <linux/cpumask.h>
+#include <linux/seqlock.h>
 
 /**
  * struct rcu_head - callback structure for use with RCU
@@ -69,11 +70,14 @@ struct rcu_ctrlblk {
 	struct {
 		long	cur;		/* Current batch number.	      */
 		long	completed;	/* Number of the last completed batch */
+		int	next_pending;	/* Is the next batch already waiting? */
+		seqcount_t lock;	/* for atomically reading cur and     */
+					/* next_pending. Spinlock not used,   */
+					/* protected by state.mutex	      */
 	} batch ____cacheline_maxaligned_in_smp;
 	/* remaining members: bookkeeping of the progress of the grace period */
 	struct {
 		spinlock_t	mutex;	/* Guard this struct		      */
-		int	next_pending;	/* Is the next batch already waiting? */
 		cpumask_t	rcu_cpu_mask;	/* CPUs that need to switch   */
 					/* in order for current batch to proceed. */
 	} state ____cacheline_maxaligned_in_smp;
diff -puN kernel/rcupdate.c~rcu-lock-update-use-a-sequence-lock-for-starting-batches kernel/rcupdate.c
--- 25/kernel/rcupdate.c~rcu-lock-update-use-a-sequence-lock-for-starting-batches	Wed May 26 15:56:11 2004
+++ 25-akpm/kernel/rcupdate.c	Wed May 26 15:56:11 2004
@@ -47,7 +47,7 @@
 
 /* Definition for rcupdate control block. */
 struct rcu_ctrlblk rcu_ctrlblk =
-	{ .batch = { .cur = -300, .completed = -300 },
+	{ .batch = { .cur = -300, .completed = -300 , .lock = SEQCNT_ZERO },
 	  .state = {.mutex = SPIN_LOCK_UNLOCKED, .rcu_cpu_mask = CPU_MASK_NONE } };
 DEFINE_PER_CPU(struct rcu_data, rcu_data) = { 0L };
 
@@ -124,16 +124,18 @@ static void rcu_start_batch(int next_pen
 	cpumask_t active;
 
 	if (next_pending)
-		rcu_ctrlblk.state.next_pending = 1;
+		rcu_ctrlblk.batch.next_pending = 1;
 
-	if (rcu_ctrlblk.state.next_pending &&
+	if (rcu_ctrlblk.batch.next_pending &&
 			rcu_ctrlblk.batch.completed == rcu_ctrlblk.batch.cur) {
-		rcu_ctrlblk.state.next_pending = 0;
 		/* Can't change, since spin lock held. */
 		active = nohz_cpu_mask;
 		cpus_complement(active);
 		cpus_and(rcu_ctrlblk.state.rcu_cpu_mask, cpu_online_map, active);
+		write_seqcount_begin(&rcu_ctrlblk.batch.lock);
+		rcu_ctrlblk.batch.next_pending = 0;
 		rcu_ctrlblk.batch.cur++;
+		write_seqcount_end(&rcu_ctrlblk.batch.lock);
 	}
 }
 
@@ -261,6 +263,8 @@ static void rcu_process_callbacks(unsign
 	local_irq_disable();
 	if (!list_empty(&RCU_nxtlist(cpu)) &&
 	    list_empty(&RCU_curlist(cpu))) {
+		int next_pending, seq;
+
 		__list_splice(&RCU_nxtlist(cpu), &RCU_curlist(cpu));
 		INIT_LIST_HEAD(&RCU_nxtlist(cpu));
 		local_irq_enable();
@@ -268,10 +272,19 @@ static void rcu_process_callbacks(unsign
 		/*
 		 * start the next batch of callbacks
 		 */
-		spin_lock(&rcu_ctrlblk.state.mutex);
-		RCU_batch(cpu) = rcu_ctrlblk.batch.cur + 1;
-		rcu_start_batch(1);
-		spin_unlock(&rcu_ctrlblk.state.mutex);
+		do {
+			seq = read_seqcount_begin(&rcu_ctrlblk.batch.lock);
+			/* determine batch number */
+			RCU_batch(cpu) = rcu_ctrlblk.batch.cur + 1;
+			next_pending = rcu_ctrlblk.batch.next_pending;
+		} while (read_seqcount_retry(&rcu_ctrlblk.batch.lock, seq));
+
+		if (!next_pending) {
+			/* and start it/schedule start if it's a new batch */
+			spin_lock(&rcu_ctrlblk.state.mutex);
+			rcu_start_batch(1);
+			spin_unlock(&rcu_ctrlblk.state.mutex);
+		}
 	} else {
 		local_irq_enable();
 	}
_
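
For reference, the read side added to rcu_process_callbacks() above is the
standard sequence-counter retry idiom.  Below is a minimal, self-contained
userspace sketch of that idiom, using C11 atomics in place of the kernel's
seqlock.h; the names batch_state, read_batch and start_new_batch are
illustrative stand-ins only, and the memory ordering is simplified relative
to a real seqcount_t.

/* seqcount-style read/retry sketch -- illustrative only, not kernel code */
#include <stdatomic.h>
#include <stdio.h>

struct batch_state {
	atomic_uint seq;	/* even: no writer active, odd: write in progress */
	long cur;		/* stand-in for rcu_ctrlblk.batch.cur */
	int next_pending;	/* stand-in for rcu_ctrlblk.batch.next_pending */
};

static struct batch_state batch = { .cur = -300 };

/* Writer: in the patch this runs under state.mutex, so writers are serial. */
static void start_new_batch(void)
{
	atomic_fetch_add_explicit(&batch.seq, 1, memory_order_acquire); /* -> odd */
	batch.next_pending = 0;
	batch.cur++;
	atomic_fetch_add_explicit(&batch.seq, 1, memory_order_release); /* -> even */
}

/* Reader: retry if a writer was active or completed while we were reading. */
static void read_batch(long *cur, int *next_pending)
{
	unsigned int start;

	do {
		start = atomic_load_explicit(&batch.seq, memory_order_acquire);
		*cur = batch.cur;
		*next_pending = batch.next_pending;
	} while ((start & 1) ||
		 atomic_load_explicit(&batch.seq, memory_order_acquire) != start);
}

int main(void)
{
	long cur;
	int pending;

	batch.next_pending = 1;		/* someone already queued the next batch */
	read_batch(&cur, &pending);
	printf("cur=%ld next_pending=%d\n", cur, pending);

	start_new_batch();		/* new grace period: cur++ */
	read_batch(&cur, &pending);
	printf("cur=%ld next_pending=%d\n", cur, pending);
	return 0;
}

As in the patch, a reader obtains a consistent snapshot of cur and
next_pending without taking any lock; only a reader that observes
next_pending == 0 needs to fall back to the mutex-protected slow path
(rcu_start_batch).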