The workqueue code currently has a notion of a per-cpu queue being "busy".
flush_scheduled_work()'s responsibility is to wait for a queue to be not
busy.  Problem is, flush_scheduled_work() can easily hang up.

- The workqueue is deemed "busy" when there are pending delayed
  (timer-based) works.  But if someone repeatedly schedules new delayed work
  in the callback, the queue will never fall idle, and
  flush_scheduled_work() will not terminate.

- If someone reschedules work (not delayed work) in the work function, that
  too will cause the queue to never go idle, and flush_scheduled_work() will
  not terminate.

So what this patch does is:

- Create a new "cancel_delayed_work()" which will try to kill off any
  timer-based delayed works.

- Change flush_scheduled_work() so that it is immune to people re-adding
  work in the work callout handler.  We can do this by recognising that the
  caller does *not* want to wait until the workqueue is "empty".  The caller
  merely wants to wait until all works which were pending at the time
  flush_scheduled_work() was called have completed.  The patch uses a couple
  of sequence numbers for that (see the sketch below).
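Here is a minimal user-space model of the sequence-counter scheme, using
POSIX threads.  It is an illustration only, not kernel code:
model_queue_work(), model_worker() and model_flush() are invented names, and
the condition variables stand in for the kernel's waitqueues.  The key
property is that model_flush() samples insert_sequence exactly once, so
items queued after it starts cannot extend its wait:

  #include <pthread.h>
  #include <stdio.h>

  static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
  static pthread_cond_t more_work = PTHREAD_COND_INITIALIZER;
  static pthread_cond_t work_done = PTHREAD_COND_INITIALIZER;
  static long insert_sequence;          /* next to add */
  static long remove_sequence;          /* next to run */

  static void model_queue_work(void)
  {
          pthread_mutex_lock(&lock);
          insert_sequence++;
          pthread_cond_signal(&more_work);
          pthread_mutex_unlock(&lock);
  }

  static void *model_worker(void *unused)
  {
          (void)unused;
          for (;;) {
                  pthread_mutex_lock(&lock);
                  while (remove_sequence >= insert_sequence)
                          pthread_cond_wait(&more_work, &lock);
                  remove_sequence++;              /* "run" one work item */
                  pthread_cond_broadcast(&work_done);
                  pthread_mutex_unlock(&lock);
          }
          return NULL;
  }

  static void model_flush(void)
  {
          long sequence_needed;

          pthread_mutex_lock(&lock);
          sequence_needed = insert_sequence;      /* sampled once, on entry */
          while (sequence_needed - remove_sequence > 0)
                  pthread_cond_wait(&work_done, &lock);
          pthread_mutex_unlock(&lock);
  }

  int main(void)
  {
          pthread_t thr;
          int i;

          pthread_create(&thr, NULL, model_worker, NULL);
          for (i = 0; i < 10; i++)
                  model_queue_work();
          model_flush();          /* cannot be livelocked by later queueing */
          printf("all ten items flushed\n");
          return 0;
  }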
So now, if someone wants to reliably remove delayed work they should do:

  /*
   * Make sure that my work-callback will no longer schedule new work
   */
  my_driver_is_shutting_down = 1;

  /*
   * Kill off any pending delayed work
   */
  cancel_delayed_work(&my_work);

  /*
   * OK, there will be no new works scheduled.  But there may be one
   * currently queued or in progress.  So wait for that to complete.
   */
  flush_scheduled_work();
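For instance, a driver whose work handler rearms itself could package the
recipe like this.  This is only a sketch: my_work, my_work_handler(),
my_driver_is_shutting_down and my_driver_exit() are invented names, not part
of the patch:

  #include <linux/workqueue.h>

  static int my_driver_is_shutting_down;
  static void my_work_handler(void *data);
  static DECLARE_WORK(my_work, my_work_handler, NULL);

  static void my_work_handler(void *data)
  {
          /* ... do the real work ... */

          /* Rearm ourselves, unless the driver is going away */
          if (!my_driver_is_shutting_down)
                  schedule_delayed_work(&my_work, HZ);
  }

  static void my_driver_exit(void)
  {
          /* The handler will no longer schedule new work */
          my_driver_is_shutting_down = 1;

          /* Kill off a timer-based submission which hasn't fired yet */
          cancel_delayed_work(&my_work);

          /* Wait out an instance which is already queued or running */
          flush_scheduled_work();
  }

Without the flag, the handler could requeue itself between
cancel_delayed_work() and flush_scheduled_work(), which is exactly the
situation the old code could not cope with.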
The patch also changes the flush_workqueue() sleep to be uninterruptible.
We cannot legally bail out if a signal is delivered anyway.


 include/linux/workqueue.h |   10 +++++
 kernel/workqueue.c        |   83 +++++++++++++++++++++++-----------------------
 2 files changed, 53 insertions(+), 40 deletions(-)

diff -puN kernel/workqueue.c~flush_workqueue-hang-fix kernel/workqueue.c
--- 25/kernel/workqueue.c~flush_workqueue-hang-fix	2003-04-12 16:20:33.000000000 -0700
+++ 25-akpm/kernel/workqueue.c	2003-04-12 16:20:33.000000000 -0700
@@ -27,13 +27,21 @@
 #include
 
 /*
- * The per-CPU workqueue:
+ * The per-CPU workqueue.
+ *
+ * The sequence counters are for flush_scheduled_work().  It wants to wait
+ * until all currently-scheduled works are completed, but it doesn't
+ * want to be livelocked by new, incoming ones.  So it waits until
+ * remove_sequence is >= the insert_sequence which pertained when
+ * flush_scheduled_work() was called.
  */
 
 struct cpu_workqueue_struct {
         spinlock_t lock;
 
-        atomic_t nr_queued;
+        long remove_sequence;   /* Least-recently added (next to run) */
+        long insert_sequence;   /* Next to add */
+
         struct list_head worklist;
         wait_queue_head_t more_work;
         wait_queue_head_t work_done;
@@ -71,10 +79,9 @@ int queue_work(struct workqueue_struct *
 
                 spin_lock_irqsave(&cwq->lock, flags);
                 list_add_tail(&work->entry, &cwq->worklist);
-                atomic_inc(&cwq->nr_queued);
-                spin_unlock_irqrestore(&cwq->lock, flags);
-
+                cwq->insert_sequence++;
                 wake_up(&cwq->more_work);
+                spin_unlock_irqrestore(&cwq->lock, flags);
                 ret = 1;
         }
         put_cpu();
@@ -93,11 +100,13 @@ static void delayed_work_timer_fn(unsign
          */
         spin_lock_irqsave(&cwq->lock, flags);
         list_add_tail(&work->entry, &cwq->worklist);
+        cwq->insert_sequence++;
         wake_up(&cwq->more_work);
         spin_unlock_irqrestore(&cwq->lock, flags);
 }
 
-int queue_delayed_work(struct workqueue_struct *wq, struct work_struct *work, unsigned long delay)
+int queue_delayed_work(struct workqueue_struct *wq,
+                        struct work_struct *work, unsigned long delay)
 {
         int ret = 0, cpu = get_cpu();
         struct timer_list *timer = &work->timer;
@@ -107,18 +116,11 @@ int queue_delayed_work(struct workqueue_
                 BUG_ON(timer_pending(timer));
                 BUG_ON(!list_empty(&work->entry));
 
-                /*
-                 * Increase nr_queued so that the flush function
-                 * knows that there's something pending.
-                 */
-                atomic_inc(&cwq->nr_queued);
                 work->wq_data = cwq;
-
                 timer->expires = jiffies + delay;
                 timer->data = (unsigned long)work;
                 timer->function = delayed_work_timer_fn;
                 add_timer(timer);
-
                 ret = 1;
         }
         put_cpu();
@@ -135,7 +137,8 @@ static inline void run_workqueue(struct
          */
         spin_lock_irqsave(&cwq->lock, flags);
         while (!list_empty(&cwq->worklist)) {
-                struct work_struct *work = list_entry(cwq->worklist.next, struct work_struct, entry);
+                struct work_struct *work = list_entry(cwq->worklist.next,
+                                                struct work_struct, entry);
                 void (*f) (void *) = work->func;
                 void *data = work->data;
 
@@ -146,14 +149,9 @@ static inline void run_workqueue(struct
                 clear_bit(0, &work->pending);
                 f(data);
 
-                /*
-                 * We only wake up 'work done' waiters (flush) when
-                 * the last function has been fully processed.
-                 */
-                if (atomic_dec_and_test(&cwq->nr_queued))
-                        wake_up(&cwq->work_done);
-
                 spin_lock_irqsave(&cwq->lock, flags);
+                cwq->remove_sequence++;
+                wake_up(&cwq->work_done);
         }
         spin_unlock_irqrestore(&cwq->lock, flags);
 }
@@ -223,37 +221,41 @@ static int worker_thread(void *__startup
  * Forces execution of the workqueue and blocks until its completion.
  * This is typically used in driver shutdown handlers.
  *
- * NOTE: if work is being added to the queue constantly by some other
- * context then this function might block indefinitely.
+ * This function will sample each workqueue's current insert_sequence number and
+ * will sleep until the head sequence is greater than or equal to that.  This
+ * means that we sleep until all works which were queued on entry have been
+ * handled, but we are not livelocked by new incoming ones.
+ *
+ * This function used to run the workqueues itself.  Now we just wait for the
+ * helper threads to do it.
  */
 void flush_workqueue(struct workqueue_struct *wq)
 {
         struct cpu_workqueue_struct *cwq;
         int cpu;
 
+        might_sleep();
+
         for (cpu = 0; cpu < NR_CPUS; cpu++) {
+                DEFINE_WAIT(wait);
+                long sequence_needed;
+
                 if (!cpu_online(cpu))
                         continue;
                 cwq = wq->cpu_wq + cpu;
 
-                if (atomic_read(&cwq->nr_queued)) {
-                        DECLARE_WAITQUEUE(wait, current);
+                spin_lock_irq(&cwq->lock);
+                sequence_needed = cwq->insert_sequence;
 
-                        if (!list_empty(&cwq->worklist))
-                                run_workqueue(cwq);
-
-                        /*
-                         * Wait for helper thread(s) to finish up
-                         * the queue:
-                         */
-                        set_task_state(current, TASK_INTERRUPTIBLE);
-                        add_wait_queue(&cwq->work_done, &wait);
-                        if (atomic_read(&cwq->nr_queued))
-                                schedule();
-                        else
-                                set_task_state(current, TASK_RUNNING);
-                        remove_wait_queue(&cwq->work_done, &wait);
+                while (sequence_needed - cwq->remove_sequence > 0) {
+                        prepare_to_wait(&cwq->work_done, &wait,
+                                        TASK_UNINTERRUPTIBLE);
+                        spin_unlock_irq(&cwq->lock);
+                        schedule();
+                        spin_lock_irq(&cwq->lock);
                 }
+                finish_wait(&cwq->work_done, &wait);
+                spin_unlock_irq(&cwq->lock);
         }
 }
 
@@ -279,7 +281,8 @@ struct workqueue_struct *create_workqueu
                 spin_lock_init(&cwq->lock);
                 cwq->wq = wq;
                 cwq->thread = NULL;
-                atomic_set(&cwq->nr_queued, 0);
+                cwq->insert_sequence = 0;
+                cwq->remove_sequence = 0;
                 INIT_LIST_HEAD(&cwq->worklist);
                 init_waitqueue_head(&cwq->more_work);
                 init_waitqueue_head(&cwq->work_done);
diff -puN include/linux/workqueue.h~flush_workqueue-hang-fix include/linux/workqueue.h
--- 25/include/linux/workqueue.h~flush_workqueue-hang-fix	2003-04-12 16:20:33.000000000 -0700
+++ 25-akpm/include/linux/workqueue.h	2003-04-12 16:20:33.000000000 -0700
@@ -63,5 +63,15 @@ extern int current_is_keventd(void);
 extern void init_workqueues(void);
 
+/*
+ * Kill off a pending schedule_delayed_work().  Note that the work callback
+ * function may still be running on return from cancel_delayed_work().  Run
+ * flush_scheduled_work() to wait on it.
+ */
+static inline int cancel_delayed_work(struct work_struct *work)
+{
+        return del_timer_sync(&work->timer);
+}
+
 #endif

_