From: Trond Myklebust Signed-off-by: Trond Myklebust Signed-off-by: Andrew Morton --- 25-akpm/fs/nfs/unlink.c | 3 25-akpm/include/linux/sunrpc/sched.h | 61 +++- 25-akpm/net/sunrpc/sched.c | 467 ++++++++++------------------------- 25-akpm/net/sunrpc/xprt.c | 2 4 files changed, 183 insertions(+), 350 deletions(-) diff -puN fs/nfs/unlink.c~linux-2.6.8.1-49-rpc_workqueue fs/nfs/unlink.c --- 25/fs/nfs/unlink.c~linux-2.6.8.1-49-rpc_workqueue 2004-08-23 21:59:32.133415400 -0700 +++ 25-akpm/fs/nfs/unlink.c 2004-08-23 21:59:32.141414184 -0700 @@ -215,7 +215,6 @@ nfs_complete_unlink(struct dentry *dentr spin_lock(&dentry->d_lock); dentry->d_flags &= ~DCACHE_NFSFS_RENAMED; spin_unlock(&dentry->d_lock); - if (data->task.tk_rpcwait == &nfs_delete_queue) - rpc_wake_up_task(&data->task); + rpc_wake_up_task(&data->task); nfs_put_unlinkdata(data); } diff -puN include/linux/sunrpc/sched.h~linux-2.6.8.1-49-rpc_workqueue include/linux/sunrpc/sched.h --- 25/include/linux/sunrpc/sched.h~linux-2.6.8.1-49-rpc_workqueue 2004-08-23 21:59:32.135415096 -0700 +++ 25-akpm/include/linux/sunrpc/sched.h 2004-08-23 21:59:32.142414032 -0700 @@ -12,6 +12,7 @@ #include #include #include +#include #include /* @@ -25,11 +26,18 @@ struct rpc_message { struct rpc_cred * rpc_cred; /* Credentials */ }; +struct rpc_wait_queue; +struct rpc_wait { + struct list_head list; /* wait queue links */ + struct list_head links; /* Links to related tasks */ + wait_queue_head_t waitq; /* sync: sleep on this q */ + struct rpc_wait_queue * rpc_waitq; /* RPC wait queue we're on */ +}; + /* * This is the RPC task struct */ struct rpc_task { - struct list_head tk_list; /* wait queue links */ #ifdef RPC_DEBUG unsigned long tk_magic; /* 0xf00baa */ #endif @@ -37,7 +45,6 @@ struct rpc_task { struct rpc_clnt * tk_client; /* RPC client */ struct rpc_rqst * tk_rqstp; /* RPC request */ int tk_status; /* result of last operation */ - struct rpc_wait_queue * tk_rpcwait; /* RPC wait queue we're on */ /* * RPC call state @@ -70,13 +77,18 @@ struct rpc_task { * you have a pathological interest in kernel oopses. */ struct timer_list tk_timer; /* kernel timer */ - wait_queue_head_t tk_wait; /* sync: sleep on this q */ unsigned long tk_timeout; /* timeout for rpc_sleep() */ unsigned short tk_flags; /* misc flags */ unsigned char tk_active : 1;/* Task has been activated */ unsigned char tk_priority : 2;/* Task priority */ unsigned long tk_runstate; /* Task run status */ - struct list_head tk_links; /* links to related tasks */ + struct workqueue_struct *tk_workqueue; /* Normally rpciod, but could + * be any workqueue + */ + union { + struct work_struct tk_work; /* Async task work queue */ + struct rpc_wait tk_wait; /* RPC wait */ + } u; #ifdef RPC_DEBUG unsigned short tk_pid; /* debugging aid */ #endif @@ -87,11 +99,11 @@ struct rpc_task { /* support walking a list of tasks on a wait queue */ #define task_for_each(task, pos, head) \ list_for_each(pos, head) \ - if ((task=list_entry(pos, struct rpc_task, tk_list)),1) + if ((task=list_entry(pos, struct rpc_task, u.tk_wait.list)),1) #define task_for_first(task, head) \ if (!list_empty(head) && \ - ((task=list_entry((head)->next, struct rpc_task, tk_list)),1)) + ((task=list_entry((head)->next, struct rpc_task, u.tk_wait.list)),1)) /* .. and walking list of all tasks */ #define alltask_for_each(task, pos, head) \ @@ -126,22 +138,38 @@ typedef void (*rpc_action)(struct rpc_ #define RPC_IS_SOFT(t) ((t)->tk_flags & RPC_TASK_SOFT) #define RPC_TASK_UNINTERRUPTIBLE(t) ((t)->tk_flags & RPC_TASK_NOINTR) -#define RPC_TASK_SLEEPING 0 -#define RPC_TASK_RUNNING 1 -#define RPC_IS_SLEEPING(t) (test_bit(RPC_TASK_SLEEPING, &(t)->tk_runstate)) -#define RPC_IS_RUNNING(t) (test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)) +#define RPC_TASK_RUNNING 0 +#define RPC_TASK_QUEUED 1 +#define RPC_TASK_WAKEUP 2 +#define RPC_IS_RUNNING(t) (test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)) #define rpc_set_running(t) (set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)) -#define rpc_clear_running(t) (clear_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)) +#define rpc_test_and_set_running(t) \ + (test_and_set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)) +#define rpc_clear_running(t) \ + do { \ + smp_mb__before_clear_bit(); \ + clear_bit(RPC_TASK_RUNNING, &(t)->tk_runstate); \ + smp_mb__after_clear_bit(); \ + } while (0) -#define rpc_set_sleeping(t) (set_bit(RPC_TASK_SLEEPING, &(t)->tk_runstate)) +#define RPC_IS_QUEUED(t) (test_bit(RPC_TASK_QUEUED, &(t)->tk_runstate)) +#define rpc_set_queued(t) (set_bit(RPC_TASK_QUEUED, &(t)->tk_runstate)) +#define rpc_clear_queued(t) \ + do { \ + smp_mb__before_clear_bit(); \ + clear_bit(RPC_TASK_QUEUED, &(t)->tk_runstate); \ + smp_mb__after_clear_bit(); \ + } while (0) -#define rpc_clear_sleeping(t) \ +#define rpc_start_wakeup(t) \ + (test_and_set_bit(RPC_TASK_WAKEUP, &(t)->tk_runstate) == 0) +#define rpc_finish_wakeup(t) \ do { \ smp_mb__before_clear_bit(); \ - clear_bit(RPC_TASK_SLEEPING, &(t)->tk_runstate); \ + clear_bit(RPC_TASK_WAKEUP, &(t)->tk_runstate); \ smp_mb__after_clear_bit(); \ - } while(0) + } while (0) /* * Task priorities. @@ -209,13 +237,10 @@ void rpc_killall_tasks(struct rpc_clnt int rpc_execute(struct rpc_task *); void rpc_run_child(struct rpc_task *parent, struct rpc_task *child, rpc_action action); -int rpc_add_wait_queue(struct rpc_wait_queue *, struct rpc_task *); -void rpc_remove_wait_queue(struct rpc_task *); void rpc_init_priority_wait_queue(struct rpc_wait_queue *, const char *); void rpc_init_wait_queue(struct rpc_wait_queue *, const char *); void rpc_sleep_on(struct rpc_wait_queue *, struct rpc_task *, rpc_action action, rpc_action timer); -void rpc_add_timer(struct rpc_task *, rpc_action); void rpc_wake_up_task(struct rpc_task *); void rpc_wake_up(struct rpc_wait_queue *); struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *); diff -puN net/sunrpc/sched.c~linux-2.6.8.1-49-rpc_workqueue net/sunrpc/sched.c --- 25/net/sunrpc/sched.c~linux-2.6.8.1-49-rpc_workqueue 2004-08-23 21:59:32.137414792 -0700 +++ 25-akpm/net/sunrpc/sched.c 2004-08-23 21:59:32.147413272 -0700 @@ -41,13 +41,7 @@ static mempool_t *rpc_buffer_mempool; static void __rpc_default_timer(struct rpc_task *task); static void rpciod_killall(void); - -/* - * When an asynchronous RPC task is activated within a bottom half - * handler, or while executing another RPC task, it is put on - * schedq, and rpciod is woken up. - */ -static RPC_WAITQ(schedq, "schedq"); +static void rpc_async_schedule(void *); /* * RPC tasks that create another task (e.g. for contacting the portmapper) @@ -68,12 +62,9 @@ static LIST_HEAD(all_tasks); /* * rpciod-related stuff */ -static DECLARE_WAIT_QUEUE_HEAD(rpciod_idle); -static DECLARE_COMPLETION(rpciod_killer); static DECLARE_MUTEX(rpciod_sema); static unsigned int rpciod_users; -static pid_t rpciod_pid; -static int rpc_inhibit; +static struct workqueue_struct *rpciod_workqueue; /* * Spinlock for wait queues. Access to the latter also has to be @@ -105,16 +96,13 @@ __rpc_disable_timer(struct rpc_task *tas * without calling del_timer_sync(). The latter could cause a * deadlock if called while we're holding spinlocks... */ -static void -rpc_run_timer(struct rpc_task *task) +static void rpc_run_timer(struct rpc_task *task) { void (*callback)(struct rpc_task *); - spin_lock_bh(&rpc_queue_lock); callback = task->tk_timeout_fn; task->tk_timeout_fn = NULL; - spin_unlock_bh(&rpc_queue_lock); - if (callback) { + if (callback && RPC_IS_QUEUED(task)) { dprintk("RPC: %4d running timer\n", task->tk_pid); callback(task); } @@ -140,17 +128,6 @@ __rpc_add_timer(struct rpc_task *task, r } /* - * Set up a timer for an already sleeping task. - */ -void rpc_add_timer(struct rpc_task *task, rpc_action timer) -{ - spin_lock_bh(&rpc_queue_lock); - if (!RPC_IS_RUNNING(task)) - __rpc_add_timer(task, timer); - spin_unlock_bh(&rpc_queue_lock); -} - -/* * Delete any timer for the current task. Because we use del_timer_sync(), * this function should never be called while holding rpc_queue_lock. */ @@ -169,16 +146,17 @@ static void __rpc_add_wait_queue_priorit struct list_head *q; struct rpc_task *t; + INIT_LIST_HEAD(&task->u.tk_wait.links); q = &queue->tasks[task->tk_priority]; if (unlikely(task->tk_priority > queue->maxpriority)) q = &queue->tasks[queue->maxpriority]; - list_for_each_entry(t, q, tk_list) { + list_for_each_entry(t, q, u.tk_wait.list) { if (t->tk_cookie == task->tk_cookie) { - list_add_tail(&task->tk_list, &t->tk_links); + list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links); return; } } - list_add_tail(&task->tk_list, q); + list_add_tail(&task->u.tk_wait.list, q); } /* @@ -189,37 +167,21 @@ static void __rpc_add_wait_queue_priorit * improve overall performance. * Everyone else gets appended to the queue to ensure proper FIFO behavior. */ -static int __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task) +static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task) { - if (task->tk_rpcwait == queue) - return 0; + BUG_ON (RPC_IS_QUEUED(task)); - if (task->tk_rpcwait) { - printk(KERN_WARNING "RPC: doubly enqueued task!\n"); - return -EWOULDBLOCK; - } if (RPC_IS_PRIORITY(queue)) __rpc_add_wait_queue_priority(queue, task); else if (RPC_IS_SWAPPER(task)) - list_add(&task->tk_list, &queue->tasks[0]); + list_add(&task->u.tk_wait.list, &queue->tasks[0]); else - list_add_tail(&task->tk_list, &queue->tasks[0]); - task->tk_rpcwait = queue; + list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]); + task->u.tk_wait.rpc_waitq = queue; + rpc_set_queued(task); dprintk("RPC: %4d added to queue %p \"%s\"\n", task->tk_pid, queue, rpc_qname(queue)); - - return 0; -} - -int rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task) -{ - int result; - - spin_lock_bh(&rpc_queue_lock); - result = __rpc_add_wait_queue(q, task); - spin_unlock_bh(&rpc_queue_lock); - return result; } /* @@ -229,12 +191,12 @@ static void __rpc_remove_wait_queue_prio { struct rpc_task *t; - if (!list_empty(&task->tk_links)) { - t = list_entry(task->tk_links.next, struct rpc_task, tk_list); - list_move(&t->tk_list, &task->tk_list); - list_splice_init(&task->tk_links, &t->tk_links); + if (!list_empty(&task->u.tk_wait.links)) { + t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list); + list_move(&t->u.tk_wait.list, &task->u.tk_wait.list); + list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links); } - list_del(&task->tk_list); + list_del(&task->u.tk_wait.list); } /* @@ -243,31 +205,17 @@ static void __rpc_remove_wait_queue_prio */ static void __rpc_remove_wait_queue(struct rpc_task *task) { - struct rpc_wait_queue *queue = task->tk_rpcwait; - - if (!queue) - return; + struct rpc_wait_queue *queue; + queue = task->u.tk_wait.rpc_waitq; if (RPC_IS_PRIORITY(queue)) __rpc_remove_wait_queue_priority(task); else - list_del(&task->tk_list); - task->tk_rpcwait = NULL; - + list_del(&task->u.tk_wait.list); dprintk("RPC: %4d removed from queue %p \"%s\"\n", task->tk_pid, queue, rpc_qname(queue)); } -void -rpc_remove_wait_queue(struct rpc_task *task) -{ - if (!task->tk_rpcwait) - return; - spin_lock_bh(&rpc_queue_lock); - __rpc_remove_wait_queue(task); - spin_unlock_bh(&rpc_queue_lock); -} - static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority) { queue->priority = priority; @@ -316,34 +264,31 @@ EXPORT_SYMBOL(rpc_init_wait_queue); * Note: If the task is ASYNC, this must be called with * the spinlock held to protect the wait queue operation. */ -static inline void -rpc_make_runnable(struct rpc_task *task) +static void rpc_make_runnable(struct rpc_task *task) { - if (task->tk_timeout_fn) { - printk(KERN_ERR "RPC: task w/ running timer in rpc_make_runnable!!\n"); + int do_ret; + + BUG_ON(task->tk_timeout_fn); + do_ret = rpc_test_and_set_running(task); + rpc_clear_queued(task); + if (do_ret) return; - } - rpc_set_running(task); if (RPC_IS_ASYNC(task)) { - if (RPC_IS_SLEEPING(task)) { - int status; - status = __rpc_add_wait_queue(&schedq, task); - if (status < 0) { - printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status); - task->tk_status = status; - return; - } - rpc_clear_sleeping(task); - wake_up(&rpciod_idle); + int status; + + INIT_WORK(&task->u.tk_work, rpc_async_schedule, (void *)task); + status = queue_work(task->tk_workqueue, &task->u.tk_work); + if (status < 0) { + printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status); + task->tk_status = status; + return; } - } else { - rpc_clear_sleeping(task); - wake_up(&task->tk_wait); - } + } else + wake_up(&task->u.tk_wait.waitq); } /* - * Place a newly initialized task on the schedq. + * Place a newly initialized task on the workqueue. */ static inline void rpc_schedule_run(struct rpc_task *task) @@ -352,33 +297,18 @@ rpc_schedule_run(struct rpc_task *task) if (RPC_IS_ACTIVATED(task)) return; task->tk_active = 1; - rpc_set_sleeping(task); rpc_make_runnable(task); } /* - * For other people who may need to wake the I/O daemon - * but should (for now) know nothing about its innards - */ -void rpciod_wake_up(void) -{ - if(rpciod_pid==0) - printk(KERN_ERR "rpciod: wot no daemon?\n"); - wake_up(&rpciod_idle); -} - -/* * Prepare for sleeping on a wait queue. * By always appending tasks to the list we ensure FIFO behavior. * NB: An RPC task will only receive interrupt-driven events as long * as it's on a wait queue. */ -static void -__rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, +static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, rpc_action action, rpc_action timer) { - int status; - dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid, rpc_qname(q), jiffies); @@ -388,24 +318,14 @@ __rpc_sleep_on(struct rpc_wait_queue *q, } /* Mark the task as being activated if so needed */ - if (!RPC_IS_ACTIVATED(task)) { + if (!RPC_IS_ACTIVATED(task)) task->tk_active = 1; - rpc_set_sleeping(task); - } - status = __rpc_add_wait_queue(q, task); - if (status) { - printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status); - task->tk_status = status; - } else { - rpc_clear_running(task); - if (task->tk_callback) { - dprintk(KERN_ERR "RPC: %4d overwrites an active callback\n", task->tk_pid); - BUG(); - } - task->tk_callback = action; - __rpc_add_timer(task, timer); - } + __rpc_add_wait_queue(q, task); + + BUG_ON(task->tk_callback != NULL); + task->tk_callback = action; + __rpc_add_timer(task, timer); } void @@ -421,16 +341,14 @@ rpc_sleep_on(struct rpc_wait_queue *q, s } /** - * __rpc_wake_up_task - wake up a single rpc_task + * __rpc_do_wake_up_task - wake up a single rpc_task * @task: task to be woken up * - * Caller must hold rpc_queue_lock + * Caller must hold rpc_queue_lock, and have cleared the task queued flag. */ -static void -__rpc_wake_up_task(struct rpc_task *task) +static void __rpc_do_wake_up_task(struct rpc_task *task) { - dprintk("RPC: %4d __rpc_wake_up_task (now %ld inh %d)\n", - task->tk_pid, jiffies, rpc_inhibit); + dprintk("RPC: %4d __rpc_wake_up_task (now %ld)\n", task->tk_pid, jiffies); #ifdef RPC_DEBUG if (task->tk_magic != 0xf00baa) { @@ -445,12 +363,9 @@ __rpc_wake_up_task(struct rpc_task *task printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task); return; } - if (RPC_IS_RUNNING(task)) - return; __rpc_disable_timer(task); - if (task->tk_rpcwait != &schedq) - __rpc_remove_wait_queue(task); + __rpc_remove_wait_queue(task); rpc_make_runnable(task); @@ -458,6 +373,18 @@ __rpc_wake_up_task(struct rpc_task *task } /* + * Wake up the specified task + */ +static void __rpc_wake_up_task(struct rpc_task *task) +{ + if (rpc_start_wakeup(task)) { + if (RPC_IS_QUEUED(task)) + __rpc_do_wake_up_task(task); + rpc_finish_wakeup(task); + } +} + +/* * Default timeout handler if none specified by user */ static void @@ -471,14 +398,16 @@ __rpc_default_timer(struct rpc_task *tas /* * Wake up the specified task */ -void -rpc_wake_up_task(struct rpc_task *task) +void rpc_wake_up_task(struct rpc_task *task) { - if (RPC_IS_RUNNING(task)) - return; - spin_lock_bh(&rpc_queue_lock); - __rpc_wake_up_task(task); - spin_unlock_bh(&rpc_queue_lock); + if (rpc_start_wakeup(task)) { + if (RPC_IS_QUEUED(task)) { + spin_lock_bh(&rpc_queue_lock); + __rpc_do_wake_up_task(task); + spin_unlock_bh(&rpc_queue_lock); + } + rpc_finish_wakeup(task); + } } /* @@ -494,11 +423,11 @@ static struct rpc_task * __rpc_wake_up_n */ q = &queue->tasks[queue->priority]; if (!list_empty(q)) { - task = list_entry(q->next, struct rpc_task, tk_list); + task = list_entry(q->next, struct rpc_task, u.tk_wait.list); if (queue->cookie == task->tk_cookie) { if (--queue->nr) goto out; - list_move_tail(&task->tk_list, q); + list_move_tail(&task->u.tk_wait.list, q); } /* * Check if we need to switch queues. @@ -516,7 +445,7 @@ static struct rpc_task * __rpc_wake_up_n else q = q - 1; if (!list_empty(q)) { - task = list_entry(q->next, struct rpc_task, tk_list); + task = list_entry(q->next, struct rpc_task, u.tk_wait.list); goto new_queue; } } while (q != &queue->tasks[queue->priority]); @@ -568,7 +497,7 @@ void rpc_wake_up(struct rpc_wait_queue * head = &queue->tasks[queue->maxpriority]; for (;;) { while (!list_empty(head)) { - task = list_entry(head->next, struct rpc_task, tk_list); + task = list_entry(head->next, struct rpc_task, u.tk_wait.list); __rpc_wake_up_task(task); } if (head == &queue->tasks[0]) @@ -594,7 +523,7 @@ void rpc_wake_up_status(struct rpc_wait_ head = &queue->tasks[queue->maxpriority]; for (;;) { while (!list_empty(head)) { - task = list_entry(head->next, struct rpc_task, tk_list); + task = list_entry(head->next, struct rpc_task, u.tk_wait.list); task->tk_status = status; __rpc_wake_up_task(task); } @@ -626,22 +555,23 @@ __rpc_atrun(struct rpc_task *task) /* * This is the RPC `scheduler' (or rather, the finite state machine). */ -static int -__rpc_execute(struct rpc_task *task) +static int __rpc_execute(struct rpc_task *task) { int status = 0; dprintk("RPC: %4d rpc_execute flgs %x\n", task->tk_pid, task->tk_flags); - if (!RPC_IS_RUNNING(task)) { - printk(KERN_WARNING "RPC: rpc_execute called for sleeping task!!\n"); - return 0; - } + BUG_ON(RPC_IS_QUEUED(task)); restarted: while (1) { /* + * Garbage collection of pending timers... + */ + rpc_delete_timer(task); + + /* * Execute any pending callback. */ if (RPC_DO_CALLBACK(task)) { @@ -657,7 +587,9 @@ __rpc_execute(struct rpc_task *task) */ save_callback=task->tk_callback; task->tk_callback=NULL; + lock_kernel(); save_callback(task); + unlock_kernel(); } /* @@ -665,43 +597,35 @@ __rpc_execute(struct rpc_task *task) * tk_action may be NULL when the task has been killed * by someone else. */ - if (RPC_IS_RUNNING(task)) { - /* - * Garbage collection of pending timers... - */ - rpc_delete_timer(task); + if (!RPC_IS_QUEUED(task)) { if (!task->tk_action) break; + lock_kernel(); task->tk_action(task); - /* micro-optimization to avoid spinlock */ - if (RPC_IS_RUNNING(task)) - continue; + unlock_kernel(); } /* - * Check whether task is sleeping. + * Lockless check for whether task is sleeping or not. */ - spin_lock_bh(&rpc_queue_lock); - if (!RPC_IS_RUNNING(task)) { - rpc_set_sleeping(task); - if (RPC_IS_ASYNC(task)) { - spin_unlock_bh(&rpc_queue_lock); + if (!RPC_IS_QUEUED(task)) + continue; + rpc_clear_running(task); + if (RPC_IS_ASYNC(task)) { + /* Careful! we may have raced... */ + if (RPC_IS_QUEUED(task)) return 0; - } + if (rpc_test_and_set_running(task)) + return 0; + continue; } - spin_unlock_bh(&rpc_queue_lock); - if (!RPC_IS_SLEEPING(task)) - continue; /* sync task: sleep here */ dprintk("RPC: %4d sync task going to sleep\n", task->tk_pid); - if (current->pid == rpciod_pid) - printk(KERN_ERR "RPC: rpciod waiting on sync task!\n"); - if (RPC_TASK_UNINTERRUPTIBLE(task)) { - __wait_event(task->tk_wait, !RPC_IS_SLEEPING(task)); + __wait_event(task->u.tk_wait.waitq, !RPC_IS_QUEUED(task)); } else { - __wait_event_interruptible(task->tk_wait, !RPC_IS_SLEEPING(task), status); + __wait_event_interruptible(task->u.tk_wait.waitq, !RPC_IS_QUEUED(task), status); /* * When a sync task receives a signal, it exits with * -ERESTARTSYS. In order to catch any callbacks that @@ -715,11 +639,14 @@ __rpc_execute(struct rpc_task *task) rpc_wake_up_task(task); } } + rpc_set_running(task); dprintk("RPC: %4d sync task resuming\n", task->tk_pid); } if (task->tk_exit) { + lock_kernel(); task->tk_exit(task); + unlock_kernel(); /* If tk_action is non-null, the user wants us to restart */ if (task->tk_action) { if (!RPC_ASSASSINATED(task)) { @@ -738,7 +665,6 @@ __rpc_execute(struct rpc_task *task) /* Release all resources associated with the task */ rpc_release_task(task); - return status; } @@ -754,57 +680,16 @@ __rpc_execute(struct rpc_task *task) int rpc_execute(struct rpc_task *task) { - int status = -EIO; - if (rpc_inhibit) { - printk(KERN_INFO "RPC: execution inhibited!\n"); - goto out_release; - } - - status = -EWOULDBLOCK; - if (task->tk_active) { - printk(KERN_ERR "RPC: active task was run twice!\n"); - goto out_err; - } + BUG_ON(task->tk_active); task->tk_active = 1; rpc_set_running(task); return __rpc_execute(task); - out_release: - rpc_release_task(task); - out_err: - return status; } -/* - * This is our own little scheduler for async RPC tasks. - */ -static void -__rpc_schedule(void) +static void rpc_async_schedule(void *arg) { - struct rpc_task *task; - int count = 0; - - dprintk("RPC: rpc_schedule enter\n"); - while (1) { - - task_for_first(task, &schedq.tasks[0]) { - __rpc_remove_wait_queue(task); - spin_unlock_bh(&rpc_queue_lock); - - __rpc_execute(task); - spin_lock_bh(&rpc_queue_lock); - } else { - break; - } - - if (++count >= 200 || need_resched()) { - count = 0; - spin_unlock_bh(&rpc_queue_lock); - schedule(); - spin_lock_bh(&rpc_queue_lock); - } - } - dprintk("RPC: rpc_schedule leave\n"); + __rpc_execute((struct rpc_task *)arg); } /* @@ -862,7 +747,6 @@ void rpc_init_task(struct rpc_task *task task->tk_client = clnt; task->tk_flags = flags; task->tk_exit = callback; - init_waitqueue_head(&task->tk_wait); if (current->uid != current->fsuid || current->gid != current->fsgid) task->tk_flags |= RPC_TASK_SETUID; @@ -873,7 +757,11 @@ void rpc_init_task(struct rpc_task *task task->tk_priority = RPC_PRIORITY_NORMAL; task->tk_cookie = (unsigned long)current; - INIT_LIST_HEAD(&task->tk_links); + + /* Initialize workqueue for async tasks */ + task->tk_workqueue = rpciod_workqueue; + if (!RPC_IS_ASYNC(task)) + init_waitqueue_head(&task->u.tk_wait.waitq); /* Add to global list of all tasks */ spin_lock(&rpc_sched_lock); @@ -963,19 +851,9 @@ rpc_release_task(struct rpc_task *task) list_del(&task->tk_task); spin_unlock(&rpc_sched_lock); - /* Protect the execution below. */ - spin_lock_bh(&rpc_queue_lock); - - /* Disable timer to prevent zombie wakeup */ - __rpc_disable_timer(task); - - /* Remove from any wait queue we're still on */ - __rpc_remove_wait_queue(task); - + BUG_ON (RPC_IS_QUEUED(task)); task->tk_active = 0; - spin_unlock_bh(&rpc_queue_lock); - /* Synchronously delete any running timer */ rpc_delete_timer(task); @@ -1091,82 +969,6 @@ rpc_killall_tasks(struct rpc_clnt *clnt) static DECLARE_MUTEX_LOCKED(rpciod_running); -static inline int -rpciod_task_pending(void) -{ - return !list_empty(&schedq.tasks[0]); -} - - -/* - * This is the rpciod kernel thread - */ -static int -rpciod(void *ptr) -{ - int rounds = 0; - - lock_kernel(); - /* - * Let our maker know we're running ... - */ - rpciod_pid = current->pid; - up(&rpciod_running); - - daemonize("rpciod"); - allow_signal(SIGKILL); - - dprintk("RPC: rpciod starting (pid %d)\n", rpciod_pid); - spin_lock_bh(&rpc_queue_lock); - while (rpciod_users) { - DEFINE_WAIT(wait); - if (signalled()) { - spin_unlock_bh(&rpc_queue_lock); - rpciod_killall(); - flush_signals(current); - spin_lock_bh(&rpc_queue_lock); - } - __rpc_schedule(); - if (current->flags & PF_FREEZE) { - spin_unlock_bh(&rpc_queue_lock); - refrigerator(PF_FREEZE); - spin_lock_bh(&rpc_queue_lock); - } - - if (++rounds >= 64) { /* safeguard */ - spin_unlock_bh(&rpc_queue_lock); - schedule(); - rounds = 0; - spin_lock_bh(&rpc_queue_lock); - } - - dprintk("RPC: rpciod back to sleep\n"); - prepare_to_wait(&rpciod_idle, &wait, TASK_INTERRUPTIBLE); - if (!rpciod_task_pending() && !signalled()) { - spin_unlock_bh(&rpc_queue_lock); - schedule(); - rounds = 0; - spin_lock_bh(&rpc_queue_lock); - } - finish_wait(&rpciod_idle, &wait); - dprintk("RPC: switch to rpciod\n"); - } - spin_unlock_bh(&rpc_queue_lock); - - dprintk("RPC: rpciod shutdown commences\n"); - if (!list_empty(&all_tasks)) { - printk(KERN_ERR "rpciod: active tasks at shutdown?!\n"); - rpciod_killall(); - } - - dprintk("RPC: rpciod exiting\n"); - unlock_kernel(); - - rpciod_pid = 0; - complete_and_exit(&rpciod_killer, 0); - return 0; -} - static void rpciod_killall(void) { @@ -1175,9 +977,7 @@ rpciod_killall(void) while (!list_empty(&all_tasks)) { clear_thread_flag(TIF_SIGPENDING); rpc_killall_tasks(NULL); - spin_lock_bh(&rpc_queue_lock); - __rpc_schedule(); - spin_unlock_bh(&rpc_queue_lock); + flush_workqueue(rpciod_workqueue); if (!list_empty(&all_tasks)) { dprintk("rpciod_killall: waiting for tasks to exit\n"); yield(); @@ -1195,28 +995,30 @@ rpciod_killall(void) int rpciod_up(void) { + struct workqueue_struct *wq; int error = 0; down(&rpciod_sema); - dprintk("rpciod_up: pid %d, users %d\n", rpciod_pid, rpciod_users); + dprintk("rpciod_up: users %d\n", rpciod_users); rpciod_users++; - if (rpciod_pid) + if (rpciod_workqueue) goto out; /* * If there's no pid, we should be the first user. */ if (rpciod_users > 1) - printk(KERN_WARNING "rpciod_up: no pid, %d users??\n", rpciod_users); + printk(KERN_WARNING "rpciod_up: no workqueue, %d users??\n", rpciod_users); /* * Create the rpciod thread and wait for it to start. */ - error = kernel_thread(rpciod, NULL, 0); - if (error < 0) { - printk(KERN_WARNING "rpciod_up: create thread failed, error=%d\n", error); + error = -ENOMEM; + wq = create_workqueue("rpciod"); + if (wq == NULL) { + printk(KERN_WARNING "rpciod_up: create workqueue failed, error=%d\n", error); rpciod_users--; goto out; } - down(&rpciod_running); + rpciod_workqueue = wq; error = 0; out: up(&rpciod_sema); @@ -1227,20 +1029,21 @@ void rpciod_down(void) { down(&rpciod_sema); - dprintk("rpciod_down pid %d sema %d\n", rpciod_pid, rpciod_users); + dprintk("rpciod_down sema %d\n", rpciod_users); if (rpciod_users) { if (--rpciod_users) goto out; } else - printk(KERN_WARNING "rpciod_down: pid=%d, no users??\n", rpciod_pid); + printk(KERN_WARNING "rpciod_down: no users??\n"); - if (!rpciod_pid) { + if (!rpciod_workqueue) { dprintk("rpciod_down: Nothing to do!\n"); goto out; } + rpciod_killall(); - kill_proc(rpciod_pid, SIGKILL, 1); - wait_for_completion(&rpciod_killer); + destroy_workqueue(rpciod_workqueue); + rpciod_workqueue = NULL; out: up(&rpciod_sema); } @@ -1258,7 +1061,12 @@ void rpc_show_tasks(void) } printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout " "-rpcwait -action- --exit--\n"); - alltask_for_each(t, le, &all_tasks) + alltask_for_each(t, le, &all_tasks) { + const char *rpc_waitq = "none"; + + if (RPC_IS_QUEUED(t)) + rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq); + printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n", t->tk_pid, (t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1), @@ -1266,8 +1074,9 @@ void rpc_show_tasks(void) t->tk_client, (t->tk_client ? t->tk_client->cl_prog : 0), t->tk_rqstp, t->tk_timeout, - rpc_qname(t->tk_rpcwait), + rpc_waitq, t->tk_action, t->tk_exit); + } spin_unlock(&rpc_sched_lock); } #endif diff -puN net/sunrpc/xprt.c~linux-2.6.8.1-49-rpc_workqueue net/sunrpc/xprt.c --- 25/net/sunrpc/xprt.c~linux-2.6.8.1-49-rpc_workqueue 2004-08-23 21:59:32.138414640 -0700 +++ 25-akpm/net/sunrpc/xprt.c 2004-08-23 21:59:32.154412208 -0700 @@ -1101,7 +1101,7 @@ xprt_write_space(struct sock *sk) goto out; spin_lock_bh(&xprt->sock_lock); - if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending) + if (xprt->snd_task) rpc_wake_up_task(xprt->snd_task); spin_unlock_bh(&xprt->sock_lock); out: _