From: Nick Piggin

Move wakeups out from under the rwsem's wait_lock spinlock.  This reduces
that lock's contention by a factor of around 10 on the NUMAQ running
volanomark; however, cacheline contention on the rwsem's "activity" drowns
out these small improvements when using the i386 "optimised" rwsem:

unpatched:
 55802519 total                                  32.3097
 23325323 default_idle                           364458.1719
 22084349 .text.lock.futex                       82404.2873
  2369107 queue_me                               24678.1979
  1875296 unqueue_me                             9767.1667
  1202258 .text.lock.rwsem                       46240.6923
   941801 finish_task_switch                     7357.8203
   787101 __wake_up                              12298.4531
   645252 drop_key_refs                          13442.7500
   362789 futex_wait                             839.7894
   333294 futex_wake                             1487.9196
   146797 rwsem_down_read_failed                 436.8958
    82788 .text.lock.dev                         221.3583
    81221 try_to_wake_up                         133.5872

+rwsem-scale:
 58120260 total                                  33.6458
 25482132 default_idle                           398158.3125
 22774675 .text.lock.futex                       84980.1306
  2517797 queue_me                               26227.0521
  1953424 unqueue_me                             10174.0833
  1063068 finish_task_switch                     8305.2188
   834793 __wake_up                              13043.6406
   674570 drop_key_refs                          14053.5417
   371811 futex_wait                             860.6736
   343398 futex_wake                             1533.0268
   155419 try_to_wake_up                         255.6234
   114704 .text.lock.rwsem                       4411.6923

The rwsem-spinlock implementation, however, improves significantly more,
and gets volanomark performance similar to the optimised rwsem's.
Although most users of the generic implementation probably aren't highly
parallel systems, it appears to be the only implementation capable of
protecting a writer from more than 32768 readers, so it might become more
relevant.

unpatched:
 30850964 total                                  18.1787
 18986006 default_idle                           296656.3438
  3989183 .text.lock.rwsem_spinlock              40294.7778
  2990161 .text.lock.futex                       32501.7500
   549707 finish_task_switch                     4294.5859
   535327 __down_read                            3717.5486
   452721 queue_me                               4715.8438
   439725 __up_read                              9160.9375
   396273 __wake_up                              6191.7656
   326595 unqueue_me                             1701.0156

+rwsem-scale:
 25378268 total                                  14.9537
 13325514 default_idle                           208211.1562
  3675634 .text.lock.futex                       39952.5435
  2908629 .text.lock.rwsem_spinlock              28239.1165
   628115 __down_read                            4361.9097
   607417 finish_task_switch                     4745.4453
   588031 queue_me                               6125.3229
   571169 __up_read                              11899.3542
   436795 __wake_up                              6824.9219
   416788 unqueue_me                             2170.7708

(A distilled sketch of the deferred-wakeup pattern follows the patch.)

---

 25-akpm/lib/rwsem-spinlock.c |   35 +++++++++++++++++++++--------------
 25-akpm/lib/rwsem.c          |   40 +++++++++++++++++++---------------------
 2 files changed, 40 insertions(+), 35 deletions(-)

diff -puN lib/rwsem.c~scale-rwsem-take-2 lib/rwsem.c
--- 25/lib/rwsem.c~scale-rwsem-take-2	2004-04-14 19:39:34.188290888 -0700
+++ 25-akpm/lib/rwsem.c	2004-04-14 19:39:34.194289976 -0700
@@ -42,14 +42,16 @@ void rwsemtrace(struct rw_semaphore *sem
  * - woken process blocks are discarded from the list after having flags
  *   zeroised
  * - writers are only woken if wakewrite is non-zero
+ *
+ * The spinlock will be dropped by this function.
  */
 static inline struct rw_semaphore *
 __rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)
 {
+	LIST_HEAD(wake_list);
 	struct rwsem_waiter *waiter;
-	struct list_head *next;
 	signed long oldcount;
-	int woken, loop;
+	int woken;
 
 	rwsemtrace(sem, "Entering __rwsem_do_wake");
 
@@ -73,8 +75,7 @@ try_again:
 	if (!(waiter->flags & RWSEM_WAITING_FOR_WRITE))
 		goto readers_only;
 
-	list_del(&waiter->list);
-	complete(&waiter->granted);
+	list_move_tail(&waiter->list, &wake_list);
 	goto out;
 
 	/* don't want to wake any writers */
@@ -91,32 +92,29 @@ dont_wake_writers:
 readers_only:
 	woken = 0;
 	do {
+		list_move_tail(&waiter->list, &wake_list);
 		woken++;
-		if (waiter->list.next == &sem->wait_list)
+		if (list_empty(&sem->wait_list))
 			break;
-		waiter = list_entry(waiter->list.next,
-				struct rwsem_waiter, list);
+		waiter = list_entry(sem->wait_list.next,
+				struct rwsem_waiter, list);
 	} while (waiter->flags & RWSEM_WAITING_FOR_READ);
 
-	loop = woken;
 	woken *= RWSEM_ACTIVE_BIAS - RWSEM_WAITING_BIAS;
 	woken -= RWSEM_ACTIVE_BIAS;
 	rwsem_atomic_add(woken, sem);
 
-	next = sem->wait_list.next;
-	for (; loop > 0; loop--) {
-		waiter = list_entry(next, struct rwsem_waiter, list);
-		next = waiter->list.next;
+out:
+	spin_unlock(&sem->wait_lock);
+	while (!list_empty(&wake_list)) {
+		waiter = list_entry(wake_list.next, struct rwsem_waiter, list);
+		list_del(&waiter->list);
 		complete(&waiter->granted);
 	}
-	sem->wait_list.next = next;
-	next->prev = &sem->wait_list;
-
-out:
 	rwsemtrace(sem, "Leaving __rwsem_do_wake");
 
 	return sem;
@@ -138,10 +136,10 @@ rwsem_down_failed_common(struct rw_semap
 	signed long count;
 
 	/* set up my own style of waitqueue */
-	spin_lock(&sem->wait_lock);
 	waiter->task = tsk;
 	init_completion(&waiter->granted);
 
+	spin_lock(&sem->wait_lock);
 	list_add_tail(&waiter->list, &sem->wait_list);
 
 	/* note that we're now waiting on the lock, but no longer actively
@@ -154,8 +152,8 @@ rwsem_down_failed_common(struct rw_semap
 	 */
 	if (!(count & RWSEM_ACTIVE_MASK))
 		sem = __rwsem_do_wake(sem, 1);
-
-	spin_unlock(&sem->wait_lock);
+	else
+		spin_unlock(&sem->wait_lock);
 
 	/* wait to be given the lock */
 	wait_for_completion(&waiter->granted);
@@ -211,8 +209,8 @@ struct rw_semaphore fastcall *rwsem_wake
 	/* do nothing if list empty */
 	if (!list_empty(&sem->wait_list))
 		sem = __rwsem_do_wake(sem, 1);
-
-	spin_unlock(&sem->wait_lock);
+	else
+		spin_unlock(&sem->wait_lock);
 
 	rwsemtrace(sem, "Leaving rwsem_wake");
 
diff -puN lib/rwsem-spinlock.c~scale-rwsem-take-2 lib/rwsem-spinlock.c
--- 25/lib/rwsem-spinlock.c~scale-rwsem-take-2	2004-04-14 19:39:34.189290736 -0700
+++ 25-akpm/lib/rwsem-spinlock.c	2004-04-14 19:39:34.195289824 -0700
@@ -55,6 +55,7 @@ void fastcall init_rwsem(struct rw_semap
 static inline struct rw_semaphore *
 __rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)
 {
+	LIST_HEAD(wake_list);
 	struct rwsem_waiter *waiter;
 	int woken;
 
@@ -74,8 +75,7 @@ __rwsem_do_wake(struct rw_semaphore *sem
 	 */
 	if (waiter->flags & RWSEM_WAITING_FOR_WRITE) {
 		sem->activity = -1;
-		list_del(&waiter->list);
-		complete(&waiter->granted);
+		list_move_tail(&waiter->list, &wake_list);
 		goto out;
 	}
 
@@ -84,25 +84,31 @@ __rwsem_do_wake(struct rw_semaphore *sem
 dont_wake_writers:
 	woken = 0;
 	while (waiter->flags & RWSEM_WAITING_FOR_READ) {
-		struct list_head *next = waiter->list.next;
-
-		list_del(&waiter->list);
-		complete(&waiter->granted);
+		list_move_tail(&waiter->list, &wake_list);
 		woken++;
 		if (list_empty(&sem->wait_list))
 			break;
-		waiter = list_entry(next, struct rwsem_waiter, list);
+		waiter = list_entry(sem->wait_list.next,
+					struct rwsem_waiter, list);
 	}
 
 	sem->activity += woken;
 
 out:
+	spin_unlock(&sem->wait_lock);
+	while (!list_empty(&wake_list)) {
+		waiter = list_entry(wake_list.next, struct rwsem_waiter, list);
+		list_del(&waiter->list);
+		complete(&waiter->granted);
+	}
+
 	rwsemtrace(sem, "Leaving __rwsem_do_wake");
 	return sem;
 }
 
 /*
- * wake a single writer
+ * wake a single writer.
+ * called with wait_lock locked and unlocks it in the process.
  */
 static inline struct rw_semaphore *
 __rwsem_wake_one_writer(struct rw_semaphore *sem)
@@ -113,6 +119,7 @@ __rwsem_wake_one_writer(struct rw_semaph
 
 	waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
 	list_del(&waiter->list);
+	spin_unlock(&sem->wait_lock);
 	complete(&waiter->granted);
 
 	return sem;
@@ -242,8 +249,8 @@ void fastcall __up_read(struct rw_semaph
 
 	if (--sem->activity == 0 && !list_empty(&sem->wait_list))
 		sem = __rwsem_wake_one_writer(sem);
-
-	spin_unlock(&sem->wait_lock);
+	else
+		spin_unlock(&sem->wait_lock);
 
 	rwsemtrace(sem, "Leaving __up_read");
 }
@@ -260,8 +267,8 @@ void fastcall __up_write(struct rw_semap
 	sem->activity = 0;
 	if (!list_empty(&sem->wait_list))
 		sem = __rwsem_do_wake(sem, 1);
-
-	spin_unlock(&sem->wait_lock);
+	else
+		spin_unlock(&sem->wait_lock);
 
 	rwsemtrace(sem, "Leaving __up_write");
 }
@@ -279,8 +286,8 @@ void fastcall __downgrade_write(struct r
 	sem->activity = 1;
 	if (!list_empty(&sem->wait_list))
 		sem = __rwsem_do_wake(sem, 0);
-
-	spin_unlock(&sem->wait_lock);
+	else
+		spin_unlock(&sem->wait_lock);
 
 	rwsemtrace(sem, "Leaving __downgrade_write");
 }
_
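
For reference, here is the pattern the patch applies, distilled into a
minimal sketch.  This is illustrative only: struct my_waiter and
wake_all() are hypothetical stand-ins for rwsem_waiter and
__rwsem_do_wake; only LIST_HEAD(), list_move_tail(), list_del() and
complete() are the real kernel primitives the patch uses.  The point is
that complete() can immediately reschedule the woken task, so it should
not run while the contended wait_lock is still held:

#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/completion.h>

/* hypothetical waiter, analogous to struct rwsem_waiter */
struct my_waiter {
	struct list_head list;
	struct completion granted;
};

/*
 * Wake everything queued on @wait_list, which is protected by @lock.
 *
 * Splice the waiters onto a stack-local list while holding the lock,
 * then drop the lock before issuing any wakeups.  The private list
 * needs no locking: it is reachable only from this stack, and each
 * waiter stays blocked in wait_for_completion() until complete() is
 * called on it.
 */
static void wake_all(spinlock_t *lock, struct list_head *wait_list)
{
	LIST_HEAD(wake_list);
	struct my_waiter *waiter;

	spin_lock(lock);
	while (!list_empty(wait_list)) {
		waiter = list_entry(wait_list->next, struct my_waiter, list);
		list_move_tail(&waiter->list, &wake_list);
	}
	spin_unlock(lock);

	/* wakeups (and any resched they trigger) now happen lock-free */
	while (!list_empty(&wake_list)) {
		waiter = list_entry(wake_list.next, struct my_waiter, list);
		/* unlink before complete(): the woken task's on-stack
		 * waiter may vanish as soon as complete() runs */
		list_del(&waiter->list);
		complete(&waiter->granted);
	}
}

The ordering in the second loop matters: nothing may touch the waiter
after complete(), which is why the patched __rwsem_do_wake also does
list_del() first.  It is also why the lock-dropping convention changes:
functions that call __rwsem_do_wake or __rwsem_wake_one_writer no longer
unlock wait_lock themselves; the wake function drops it before the
wakeups, and callers unlock only on the path where no wakeup happens.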