From: Dipankar Sarma This patch makes RCU callbacks friendly to scheduler. It helps low latency by limiting the number of callbacks invoked per tasklet handler. Since we cannot schedule during a single softirq handler, this reduces size of non-preemptible section significantly, specially under heavy RCU updates. The limiting is done through a kernel parameter rcupdate.maxbatch which is the maximum number of RCU callbacks to invoke during a single tasklet handler. Signed-off-by: Dipankar Sarma Signed-off-by: Andrew Morton --- 25-akpm/include/linux/rcupdate.h | 7 +++++++ 25-akpm/kernel/rcupdate.c | 27 +++++++++++++++++++-------- 2 files changed, 26 insertions(+), 8 deletions(-) diff -puN include/linux/rcupdate.h~rcu-low-latency-rcu include/linux/rcupdate.h --- 25/include/linux/rcupdate.h~rcu-low-latency-rcu 2004-07-31 17:25:24.073334184 -0700 +++ 25-akpm/include/linux/rcupdate.h 2004-07-31 17:25:24.079333272 -0700 @@ -99,6 +99,8 @@ struct rcu_data { struct rcu_head **nxttail; struct rcu_head *curlist; struct rcu_head **curtail; + struct rcu_head *donelist; + struct rcu_head **donetail; }; DECLARE_PER_CPU(struct rcu_data, rcu_data); @@ -113,6 +115,8 @@ extern struct rcu_ctrlblk rcu_ctrlblk; #define RCU_curlist(cpu) (per_cpu(rcu_data, (cpu)).curlist) #define RCU_nxttail(cpu) (per_cpu(rcu_data, (cpu)).nxttail) #define RCU_curtail(cpu) (per_cpu(rcu_data, (cpu)).curtail) +#define RCU_donelist(cpu) (per_cpu(rcu_data, (cpu)).donelist) +#define RCU_donetail(cpu) (per_cpu(rcu_data, (cpu)).donetail) static inline int rcu_pending(int cpu) { @@ -127,6 +131,9 @@ static inline int rcu_pending(int cpu) if (!RCU_curlist(cpu) && RCU_nxtlist(cpu)) return 1; + if (RCU_donelist(cpu)) + return 1; + /* The rcu core waits for a quiescent state from the cpu */ if (RCU_quiescbatch(cpu) != rcu_ctrlblk.cur || RCU_qs_pending(cpu)) return 1; diff -puN kernel/rcupdate.c~rcu-low-latency-rcu kernel/rcupdate.c --- 25/kernel/rcupdate.c~rcu-low-latency-rcu 2004-07-31 17:25:24.074334032 -0700 +++ 25-akpm/kernel/rcupdate.c 2004-07-31 17:25:24.078333424 -0700 @@ -40,6 +40,7 @@ #include #include #include +#include #include #include #include @@ -63,6 +64,7 @@ DEFINE_PER_CPU(struct rcu_data, rcu_data /* Fake initialization required by compiler */ static DEFINE_PER_CPU(struct tasklet_struct, rcu_tasklet) = {NULL}; #define RCU_tasklet(cpu) (per_cpu(rcu_tasklet, cpu)) +static int maxbatch = 10; /** * call_rcu - Queue an RCU update request. @@ -93,15 +95,23 @@ void fastcall call_rcu(struct rcu_head * * Invoke the completed RCU callbacks. They are expected to be in * a per-cpu list. */ -static void rcu_do_batch(struct rcu_head *list) +static void rcu_do_batch(int cpu) { - struct rcu_head *next; + struct rcu_head *next, *list; + int count = 0; + list = RCU_donelist(cpu); while (list) { - next = list->next; + next = RCU_donelist(cpu) = list->next; list->func(list); list = next; + if (++count >= maxbatch) + break; } + if (!RCU_donelist(cpu)) + RCU_donetail(cpu) = &RCU_donelist(cpu); + else + tasklet_schedule(&RCU_tasklet(cpu)); } /* @@ -261,11 +271,11 @@ void rcu_restart_cpu(int cpu) static void rcu_process_callbacks(unsigned long unused) { int cpu = smp_processor_id(); - struct rcu_head *rcu_list = NULL; if (RCU_curlist(cpu) && !rcu_batch_before(rcu_ctrlblk.completed, RCU_batch(cpu))) { - rcu_list = RCU_curlist(cpu); + *RCU_donetail(cpu) = RCU_curlist(cpu); + RCU_donetail(cpu) = RCU_curtail(cpu); RCU_curlist(cpu) = NULL; RCU_curtail(cpu) = &RCU_curlist(cpu); } @@ -300,8 +310,8 @@ static void rcu_process_callbacks(unsign local_irq_enable(); } rcu_check_quiescent_state(); - if (rcu_list) - rcu_do_batch(rcu_list); + if (RCU_donelist(cpu)) + rcu_do_batch(cpu); } void rcu_check_callbacks(int cpu, int user) @@ -319,6 +329,7 @@ static void __devinit rcu_online_cpu(int tasklet_init(&RCU_tasklet(cpu), rcu_process_callbacks, 0UL); RCU_curtail(cpu) = &RCU_curlist(cpu); RCU_nxttail(cpu) = &RCU_nxtlist(cpu); + RCU_donetail(cpu) = &RCU_donelist(cpu); RCU_quiescbatch(cpu) = rcu_ctrlblk.completed; RCU_qs_pending(cpu) = 0; } @@ -388,6 +399,6 @@ void synchronize_kernel(void) wait_for_completion(&rcu.completion); } - +module_param(maxbatch, int, 0); EXPORT_SYMBOL(call_rcu); EXPORT_SYMBOL(synchronize_kernel); _