Async buffer wait

DESC
lock_buffer_wq fix
EDESC
From: Suparna Bhattacharya, Hugh Dickins

Actually lock the buffer.


 fs/aio.c                    |    4 +++-
 fs/buffer.c                 |   33 ++++++++++++++++++++++++++++-----
 include/linux/buffer_head.h |   27 +++++++++++++++++++++++----
 include/linux/pagemap.h     |   35 ++++++++++++++++++++---------------
 mm/filemap.c                |    7 -------
 5 files changed, 74 insertions(+), 32 deletions(-)

diff -puN fs/buffer.c~aio-04-buffer_wq fs/buffer.c
--- 25/fs/buffer.c~aio-04-buffer_wq	2003-10-26 21:53:36.000000000 -0800
+++ 25-akpm/fs/buffer.c	2003-10-26 21:53:36.000000000 -0800
@@ -116,27 +116,50 @@ void unlock_buffer(struct buffer_head *b
 }

 /*
- * Block until a buffer comes unlocked. This doesn't stop it
+ * Wait until a buffer comes unlocked. This doesn't stop it
  * from becoming locked again - you have to lock it yourself
  * if you want to preserve its state.
+ * If the wait queue parameter specifies an async i/o callback,
+ * then instead of blocking, we just queue up the callback
+ * on the wait queue for async notification when the buffer gets
+ * unlocked.
+ * A NULL wait queue parameter defaults to synchronous behaviour
  */
-void __wait_on_buffer(struct buffer_head * bh)
+int __wait_on_buffer_wq(struct buffer_head * bh, wait_queue_t *wait)
 {
 	wait_queue_head_t *wqh = bh_waitq_head(bh);
-	DEFINE_WAIT(wait);
+	DEFINE_WAIT(local_wait);
+
+	if (!wait)
+		wait = &local_wait;

 	if (atomic_read(&bh->b_count) == 0 &&
 			(!bh->b_page || !PageLocked(bh->b_page)))
 		buffer_error();

 	do {
-		prepare_to_wait(wqh, &wait, TASK_UNINTERRUPTIBLE);
+		prepare_to_wait(wqh, wait, TASK_UNINTERRUPTIBLE);
 		if (buffer_locked(bh)) {
 			blk_run_queues();
+			if (!is_sync_wait(wait)) {
+				/*
+				 * if we've queued an async wait queue
+				 * callback do not block; just tell the
+				 * caller to return and retry later when
+				 * the callback is notified
+				 */
+				return -EIOCBRETRY;
+			}
 			io_schedule();
 		}
 	} while (buffer_locked(bh));
-	finish_wait(wqh, &wait);
+	finish_wait(wqh, wait);
+	return 0;
+}
+
+void __wait_on_buffer(struct buffer_head * bh)
+{
+	__wait_on_buffer_wq(bh, NULL);
 }

 static void
diff -puN include/linux/buffer_head.h~aio-04-buffer_wq include/linux/buffer_head.h
--- 25/include/linux/buffer_head.h~aio-04-buffer_wq	2003-10-26 21:53:36.000000000 -0800
+++ 25-akpm/include/linux/buffer_head.h	2003-10-26 21:53:36.000000000 -0800
@@ -162,6 +162,7 @@ void mark_buffer_async_write(struct buff
 void invalidate_bdev(struct block_device *, int);
 int sync_blockdev(struct block_device *bdev);
 void __wait_on_buffer(struct buffer_head *);
+int __wait_on_buffer_wq(struct buffer_head *, wait_queue_t *wait);
 wait_queue_head_t *bh_waitq_head(struct buffer_head *bh);
 void wake_up_buffer(struct buffer_head *bh);
 int fsync_bdev(struct block_device *);
@@ -277,16 +278,34 @@ map_bh(struct buffer_head *bh, struct su
  * __wait_on_buffer() just to trip a debug check. Because debug code in inline
  * functions is bloaty.
  */
-static inline void wait_on_buffer(struct buffer_head *bh)
+
+static inline int wait_on_buffer_wq(struct buffer_head *bh, wait_queue_t *wait)
 {
 	if (buffer_locked(bh) || atomic_read(&bh->b_count) == 0)
-		__wait_on_buffer(bh);
+		return __wait_on_buffer_wq(bh, wait);
+
+	return 0;
+}
+
+static inline void wait_on_buffer(struct buffer_head *bh)
+{
+	wait_on_buffer_wq(bh, NULL);
+}
+
+static inline int lock_buffer_wq(struct buffer_head *bh, wait_queue_t *wait)
+{
+	while (test_set_buffer_locked(bh)) {
+		int ret = __wait_on_buffer_wq(bh, wait);
+		if (ret)
+			return ret;
+	}
+
+	return 0;
 }

 static inline void lock_buffer(struct buffer_head *bh)
 {
-	while (test_set_buffer_locked(bh))
-		__wait_on_buffer(bh);
+	lock_buffer_wq(bh, NULL);
 }

 #endif /* _LINUX_BUFFER_HEAD_H */
diff -puN fs/aio.c~aio-04-buffer_wq fs/aio.c
--- 25/fs/aio.c~aio-04-buffer_wq	2003-10-26 21:53:36.000000000 -0800
+++ 25-akpm/fs/aio.c	2003-10-26 21:53:36.000000000 -0800
@@ -50,6 +50,7 @@ static kmem_cache_t	*kiocb_cachep;
 static kmem_cache_t	*kioctx_cachep;

 static struct workqueue_struct *aio_wq;
+static struct workqueue_struct *aio_fput_wq;	/* Used for rare fput completion. */

 static void aio_fput_routine(void *);
@@ -77,6 +78,7 @@ static int __init aio_setup(void)
 		panic("unable to create kioctx cache");

 	aio_wq = create_workqueue("aio");
+	aio_fput_wq = create_workqueue("aio_fput");

 	pr_debug("aio_setup: sizeof(struct page) = %d\n", (int)sizeof(struct page));
@@ -504,7 +506,7 @@ static int __aio_put_req(struct kioctx *
 		spin_lock(&fput_lock);
 		list_add(&req->ki_list, &fput_head);
 		spin_unlock(&fput_lock);
-		queue_work(aio_wq, &fput_work);
+		queue_work(aio_fput_wq, &fput_work);
 	} else
 		really_put_req(ctx, req);
 	return 1;
diff -puN include/linux/pagemap.h~aio-04-buffer_wq include/linux/pagemap.h
--- 25/include/linux/pagemap.h~aio-04-buffer_wq	2003-10-26 21:53:36.000000000 -0800
+++ 25-akpm/include/linux/pagemap.h	2003-10-26 21:53:36.000000000 -0800
@@ -153,11 +153,6 @@ static inline void ___add_to_page_cache(
 extern void FASTCALL(__lock_page(struct page *page));
 extern void FASTCALL(unlock_page(struct page *page));

-static inline void lock_page(struct page *page)
-{
-	if (TestSetPageLocked(page))
-		__lock_page(page);
-}
 extern int FASTCALL(__lock_page_wq(struct page *page, wait_queue_t *wait));
 static inline int lock_page_wq(struct page *page, wait_queue_t *wait)
@@ -168,12 +163,17 @@ static inline int lock_page_wq(struct pa
 	return 0;
 }

+static inline void lock_page(struct page *page)
+{
+	lock_page_wq(page, NULL);
+}
 /*
  * This is exported only for wait_on_page_locked/wait_on_page_writeback.
  * Never use this directly!
  */
-extern void FASTCALL(wait_on_page_bit(struct page *page, int bit_nr));
+extern int FASTCALL(wait_on_page_bit_wq(struct page *page, int bit_nr,
+	wait_queue_t *wait));

 /*
  * Wait for a page to be unlocked.
@@ -182,28 +182,33 @@ extern void FASTCALL(wait_on_page_bit(st
  * ie with increased "page->count" so that the page won't
  * go away during the wait..
  */
-static inline void wait_on_page_locked(struct page *page)
+static inline int wait_on_page_locked_wq(struct page *page, wait_queue_t *wait)
 {
 	if (PageLocked(page))
-		wait_on_page_bit(page, PG_locked);
+		return wait_on_page_bit_wq(page, PG_locked, wait);
+	return 0;
 }

-extern int FASTCALL(wait_on_page_bit_wq(struct page *page, int bit_nr,
-	wait_queue_t *wait));
-static inline int wait_on_page_locked_wq(struct page *page, wait_queue_t *wait)
+static inline int wait_on_page_writeback_wq(struct page *page,
+						wait_queue_t *wait)
 {
-	if (PageLocked(page))
-		return wait_on_page_bit_wq(page, PG_locked, wait);
+	if (PageWriteback(page))
+		return wait_on_page_bit_wq(page, PG_writeback, wait);
 	return 0;
 }

+static inline void wait_on_page_locked(struct page *page)
+{
+	wait_on_page_locked_wq(page, NULL);
+}
+
 /*
  * Wait for a page to complete writeback
  */
+
 static inline void wait_on_page_writeback(struct page *page)
 {
-	if (PageWriteback(page))
-		wait_on_page_bit(page, PG_writeback);
+	wait_on_page_writeback_wq(page, NULL);
 }

 extern void end_page_writeback(struct page *page);
diff -puN mm/filemap.c~aio-04-buffer_wq mm/filemap.c
--- 25/mm/filemap.c~aio-04-buffer_wq	2003-10-26 21:53:36.000000000 -0800
+++ 25-akpm/mm/filemap.c	2003-10-26 21:53:36.000000000 -0800
@@ -344,13 +344,6 @@ int wait_on_page_bit_wq(struct page *pag
 }
 EXPORT_SYMBOL(wait_on_page_bit_wq);

-void wait_on_page_bit(struct page *page, int bit_nr)
-{
-	wait_on_page_bit_wq(page, bit_nr, NULL);
-}
-
-EXPORT_SYMBOL(wait_on_page_bit);
-
 /**
  * unlock_page() - unlock a locked page
  *
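
For reference, a minimal usage sketch of the new primitive follows; it is not part
of the patch.  example_write_buffer() and its wait argument are hypothetical names
invented for illustration: in the full AIO retry series, the wait queue entry would
be the one carried by the iocb, whose callback kicks the retry machinery.  Passing
NULL degenerates to the old blocking lock_buffer() behaviour, which is exactly how
the synchronous wrappers above are implemented.

/*
 * Illustrative sketch only, not part of this patch: a retry-based
 * caller using lock_buffer_wq().  With an async wait queue entry the
 * lock attempt never blocks; the entry's callback is queued on the
 * buffer's wait queue and -EIOCBRETRY is propagated so the operation
 * can be re-driven once the buffer comes unlocked.
 */
static int example_write_buffer(struct buffer_head *bh, wait_queue_t *wait)
{
	int ret;

	ret = lock_buffer_wq(bh, wait);
	if (ret)
		return ret;	/* -EIOCBRETRY: retry when the callback fires */

	/* ... modify the buffer while holding the lock ... */

	unlock_buffer(bh);
	return 0;
}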