From: Trond Myklebust RPC: Remove the rpc_queue_lock global spinlock. Replace it with per-rpc_queue spinlocks Signed-off-by: Trond Myklebust Signed-off-by: Andrew Morton --- 25-akpm/include/linux/sunrpc/sched.h | 4 ++ 25-akpm/net/sunrpc/sched.c | 69 +++++++++++++++-------------------- 2 files changed, 34 insertions(+), 39 deletions(-) diff -puN include/linux/sunrpc/sched.h~linux-2.6.8.1-50-rpc_queue_lock include/linux/sunrpc/sched.h --- 25/include/linux/sunrpc/sched.h~linux-2.6.8.1-50-rpc_queue_lock 2004-08-22 20:57:41.320341224 -0700 +++ 25-akpm/include/linux/sunrpc/sched.h 2004-08-22 20:57:41.325340464 -0700 @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -185,6 +186,7 @@ typedef void (*rpc_action)(struct rpc_ * RPC synchronization objects */ struct rpc_wait_queue { + spinlock_t lock; struct list_head tasks[RPC_NR_PRIORITY]; /* task queue for each priority level */ unsigned long cookie; /* cookie of last task serviced */ unsigned char maxpriority; /* maximum priority (0 if queue is not a priority queue) */ @@ -205,6 +207,7 @@ struct rpc_wait_queue { #ifndef RPC_DEBUG # define RPC_WAITQ_INIT(var,qname) { \ + .lock = SPIN_LOCK_UNLOCKED, \ .tasks = { \ [0] = LIST_HEAD_INIT(var.tasks[0]), \ [1] = LIST_HEAD_INIT(var.tasks[1]), \ @@ -213,6 +216,7 @@ struct rpc_wait_queue { } #else # define RPC_WAITQ_INIT(var,qname) { \ + .lock = SPIN_LOCK_UNLOCKED, \ .tasks = { \ [0] = LIST_HEAD_INIT(var.tasks[0]), \ [1] = LIST_HEAD_INIT(var.tasks[1]), \ diff -puN net/sunrpc/sched.c~linux-2.6.8.1-50-rpc_queue_lock net/sunrpc/sched.c --- 25/net/sunrpc/sched.c~linux-2.6.8.1-50-rpc_queue_lock 2004-08-22 20:57:41.322340920 -0700 +++ 25-akpm/net/sunrpc/sched.c 2004-08-22 20:57:41.327340160 -0700 @@ -67,18 +67,13 @@ static unsigned int rpciod_users; static struct workqueue_struct *rpciod_workqueue; /* - * Spinlock for wait queues. Access to the latter also has to be - * interrupt-safe in order to allow timers to wake up sleeping tasks. - */ -static spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED; -/* * Spinlock for other critical sections of code. */ static spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED; /* * Disable the timer for a given RPC task. Should be called with - * rpc_queue_lock and bh_disabled in order to avoid races within + * queue->lock and bh_disabled in order to avoid races within * rpc_run_timer(). */ static inline void @@ -129,7 +124,7 @@ __rpc_add_timer(struct rpc_task *task, r /* * Delete any timer for the current task. Because we use del_timer_sync(), - * this function should never be called while holding rpc_queue_lock. + * this function should never be called while holding queue->lock. */ static inline void rpc_delete_timer(struct rpc_task *task) @@ -238,6 +233,7 @@ static void __rpc_init_priority_wait_que { int i; + spin_lock_init(&queue->lock); for (i = 0; i < ARRAY_SIZE(queue->tasks); i++) INIT_LIST_HEAD(&queue->tasks[i]); queue->maxpriority = maxprio; @@ -328,23 +324,22 @@ static void __rpc_sleep_on(struct rpc_wa __rpc_add_timer(task, timer); } -void -rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, +void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, rpc_action action, rpc_action timer) { /* * Protect the queue operations. */ - spin_lock_bh(&rpc_queue_lock); + spin_lock_bh(&q->lock); __rpc_sleep_on(q, task, action, timer); - spin_unlock_bh(&rpc_queue_lock); + spin_unlock_bh(&q->lock); } /** * __rpc_do_wake_up_task - wake up a single rpc_task * @task: task to be woken up * - * Caller must hold rpc_queue_lock, and have cleared the task queued flag. + * Caller must hold queue->lock, and have cleared the task queued flag. */ static void __rpc_do_wake_up_task(struct rpc_task *task) { @@ -402,9 +397,11 @@ void rpc_wake_up_task(struct rpc_task *t { if (rpc_start_wakeup(task)) { if (RPC_IS_QUEUED(task)) { - spin_lock_bh(&rpc_queue_lock); + struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq; + + spin_lock_bh(&queue->lock); __rpc_do_wake_up_task(task); - spin_unlock_bh(&rpc_queue_lock); + spin_unlock_bh(&queue->lock); } rpc_finish_wakeup(task); } @@ -470,14 +467,14 @@ struct rpc_task * rpc_wake_up_next(struc struct rpc_task *task = NULL; dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue)); - spin_lock_bh(&rpc_queue_lock); + spin_lock_bh(&queue->lock); if (RPC_IS_PRIORITY(queue)) task = __rpc_wake_up_next_priority(queue); else { task_for_first(task, &queue->tasks[0]) __rpc_wake_up_task(task); } - spin_unlock_bh(&rpc_queue_lock); + spin_unlock_bh(&queue->lock); return task; } @@ -486,14 +483,14 @@ struct rpc_task * rpc_wake_up_next(struc * rpc_wake_up - wake up all rpc_tasks * @queue: rpc_wait_queue on which the tasks are sleeping * - * Grabs rpc_queue_lock + * Grabs queue->lock */ void rpc_wake_up(struct rpc_wait_queue *queue) { struct rpc_task *task; struct list_head *head; - spin_lock_bh(&rpc_queue_lock); + spin_lock_bh(&queue->lock); head = &queue->tasks[queue->maxpriority]; for (;;) { while (!list_empty(head)) { @@ -504,7 +501,7 @@ void rpc_wake_up(struct rpc_wait_queue * break; head--; } - spin_unlock_bh(&rpc_queue_lock); + spin_unlock_bh(&queue->lock); } /** @@ -512,14 +509,14 @@ void rpc_wake_up(struct rpc_wait_queue * * @queue: rpc_wait_queue on which the tasks are sleeping * @status: status value to set * - * Grabs rpc_queue_lock + * Grabs queue->lock */ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status) { struct list_head *head; struct rpc_task *task; - spin_lock_bh(&rpc_queue_lock); + spin_lock_bh(&queue->lock); head = &queue->tasks[queue->maxpriority]; for (;;) { while (!list_empty(head)) { @@ -531,7 +528,7 @@ void rpc_wake_up_status(struct rpc_wait_ break; head--; } - spin_unlock_bh(&rpc_queue_lock); + spin_unlock_bh(&queue->lock); } /* @@ -832,8 +829,7 @@ cleanup: goto out; } -void -rpc_release_task(struct rpc_task *task) +void rpc_release_task(struct rpc_task *task) { dprintk("RPC: %4d release task\n", task->tk_pid); @@ -883,10 +879,9 @@ rpc_release_task(struct rpc_task *task) * queue 'childq'. If so returns a pointer to the parent. * Upon failure returns NULL. * - * Caller must hold rpc_queue_lock + * Caller must hold childq.lock */ -static inline struct rpc_task * -rpc_find_parent(struct rpc_task *child) +static inline struct rpc_task *rpc_find_parent(struct rpc_task *child) { struct rpc_task *task, *parent; struct list_head *le; @@ -899,17 +894,16 @@ rpc_find_parent(struct rpc_task *child) return NULL; } -static void -rpc_child_exit(struct rpc_task *child) +static void rpc_child_exit(struct rpc_task *child) { struct rpc_task *parent; - spin_lock_bh(&rpc_queue_lock); + spin_lock_bh(&childq.lock); if ((parent = rpc_find_parent(child)) != NULL) { parent->tk_status = child->tk_status; __rpc_wake_up_task(parent); } - spin_unlock_bh(&rpc_queue_lock); + spin_unlock_bh(&childq.lock); } /* @@ -932,22 +926,20 @@ fail: return NULL; } -void -rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func) +void rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func) { - spin_lock_bh(&rpc_queue_lock); + spin_lock_bh(&childq.lock); /* N.B. Is it possible for the child to have already finished? */ __rpc_sleep_on(&childq, task, func, NULL); rpc_schedule_run(child); - spin_unlock_bh(&rpc_queue_lock); + spin_unlock_bh(&childq.lock); } /* * Kill all tasks for the given client. * XXX: kill their descendants as well? */ -void -rpc_killall_tasks(struct rpc_clnt *clnt) +void rpc_killall_tasks(struct rpc_clnt *clnt) { struct rpc_task *rovr; struct list_head *le; @@ -969,8 +961,7 @@ rpc_killall_tasks(struct rpc_clnt *clnt) static DECLARE_MUTEX_LOCKED(rpciod_running); -static void -rpciod_killall(void) +static void rpciod_killall(void) { unsigned long flags; _