From: Suparna Bhattacharya , Daniel McNeil This patch ensures that when the DIO code falls back to buffered i/o after having submitted part of the i/o, then buffered i/o is issued only for the remaining part of the request (i.e. the part not already covered by DIO), rather than redo the entire i/o. Now, instead of returning written == -ENOTBLK, generic_file_direct_IO returns the number of bytes already handled by DIO, so that the caller knows how much of the I/O is left to be handled via fallback to buffered write. We need to careful not to access dio fields if its possible that the dio could already have been freed asynchronously during i/o completion. A tricky part of this involves plugging the window between the decrement of bio_count and accessing dio->waiter during i/o completion where the dio could get freed by the submission path. This potential "bio_count race" was tackled (by Daniel) by changing bio_list_lock into bio_lock and using that for all the bio fields. Now bio_count and bios_in_flight have been converted from atomics into int and are both protected by the bio_lock. The race in finished_one_bio() could thus be fixed by leaving the bio_count at 1 until after the dio_complete() and then doing the bio_count decrement and wakeup holding the bio_lock. It appears that shifting to the spin_lock instead of atomic_inc/decs is ok performance wise as well. Update: An AIO O_DIRECT request was extending the file so it was done synchronously. However, the request got an EFAULT and direct_io_worker() was calling aio_complete() on the iocb and returning the EFAULT. When io_submit_one() got the EFAULT return, it assume it had to call aio_complete() since the i/o never got queued. The fix is for direct_io_worker() to only call aio_complete() when the upper layer is going to return -EIOCBQUEUED and not when getting errors that are being return to the submit path. --- 25-akpm/fs/direct-io.c | 113 +++++++++++++++++++++++++++++++++---------------- 25-akpm/mm/filemap.c | 8 ++- 2 files changed, 83 insertions(+), 38 deletions(-) diff -puN fs/direct-io.c~aio-fallback-bio_count-race-fix-2 fs/direct-io.c --- 25/fs/direct-io.c~aio-fallback-bio_count-race-fix-2 2004-04-03 03:00:10.315663336 -0800 +++ 25-akpm/fs/direct-io.c 2004-04-03 03:00:10.322662272 -0800 @@ -74,6 +74,7 @@ struct dio { been performed at the start of a write */ int pages_in_io; /* approximate total IO pages */ + size_t size; /* total request size (doesn't change)*/ sector_t block_in_file; /* Current offset into the underlying file in dio_block units. */ unsigned blocks_available; /* At block_in_file. changes */ @@ -115,9 +116,9 @@ struct dio { int page_errors; /* errno from get_user_pages() */ /* BIO completion state */ - atomic_t bio_count; /* nr bios to be completed */ - atomic_t bios_in_flight; /* nr bios in flight */ - spinlock_t bio_list_lock; /* protects bio_list */ + spinlock_t bio_lock; /* protects BIO fields below */ + int bio_count; /* nr bios to be completed */ + int bios_in_flight; /* nr bios in flight */ struct bio *bio_list; /* singly linked via bi_private */ struct task_struct *waiter; /* waiting task (NULL if none) */ @@ -221,20 +222,38 @@ static void dio_complete(struct dio *dio */ static void finished_one_bio(struct dio *dio) { - if (atomic_dec_and_test(&dio->bio_count)) { + unsigned long flags; + + spin_lock_irqsave(&dio->bio_lock, flags); + if (dio->bio_count == 1) { if (dio->is_async) { + /* + * Last reference to the dio is going away. + * Drop spinlock and complete the DIO. + */ + spin_unlock_irqrestore(&dio->bio_lock, flags); dio_complete(dio, dio->block_in_file << dio->blkbits, dio->result); /* Complete AIO later if falling back to buffered i/o */ - if (dio->result != -ENOTBLK) { + if (dio->result == dio->size || dio->rw == READ) { aio_complete(dio->iocb, dio->result, 0); kfree(dio); + return; } else { + /* + * Falling back to buffered + */ + spin_lock_irqsave(&dio->bio_lock, flags); + dio->bio_count--; if (dio->waiter) wake_up_process(dio->waiter); + spin_unlock_irqrestore(&dio->bio_lock, flags); + return; } } } + dio->bio_count--; + spin_unlock_irqrestore(&dio->bio_lock, flags); } static int dio_bio_complete(struct dio *dio, struct bio *bio); @@ -268,13 +287,13 @@ static int dio_bio_end_io(struct bio *bi if (bio->bi_size) return 1; - spin_lock_irqsave(&dio->bio_list_lock, flags); + spin_lock_irqsave(&dio->bio_lock, flags); bio->bi_private = dio->bio_list; dio->bio_list = bio; - atomic_dec(&dio->bios_in_flight); - if (dio->waiter && atomic_read(&dio->bios_in_flight) == 0) + dio->bios_in_flight--; + if (dio->waiter && dio->bios_in_flight == 0) wake_up_process(dio->waiter); - spin_unlock_irqrestore(&dio->bio_list_lock, flags); + spin_unlock_irqrestore(&dio->bio_lock, flags); return 0; } @@ -307,10 +326,13 @@ dio_bio_alloc(struct dio *dio, struct bl static void dio_bio_submit(struct dio *dio) { struct bio *bio = dio->bio; + unsigned long flags; bio->bi_private = dio; - atomic_inc(&dio->bio_count); - atomic_inc(&dio->bios_in_flight); + spin_lock_irqsave(&dio->bio_lock, flags); + dio->bio_count++; + dio->bios_in_flight++; + spin_unlock_irqrestore(&dio->bio_lock, flags); if (dio->is_async && dio->rw == READ) bio_set_pages_dirty(bio); submit_bio(dio->rw, bio); @@ -336,22 +358,22 @@ static struct bio *dio_await_one(struct unsigned long flags; struct bio *bio; - spin_lock_irqsave(&dio->bio_list_lock, flags); + spin_lock_irqsave(&dio->bio_lock, flags); while (dio->bio_list == NULL) { set_current_state(TASK_UNINTERRUPTIBLE); if (dio->bio_list == NULL) { dio->waiter = current; - spin_unlock_irqrestore(&dio->bio_list_lock, flags); + spin_unlock_irqrestore(&dio->bio_lock, flags); blk_run_queues(); io_schedule(); - spin_lock_irqsave(&dio->bio_list_lock, flags); + spin_lock_irqsave(&dio->bio_lock, flags); dio->waiter = NULL; } set_current_state(TASK_RUNNING); } bio = dio->bio_list; dio->bio_list = bio->bi_private; - spin_unlock_irqrestore(&dio->bio_list_lock, flags); + spin_unlock_irqrestore(&dio->bio_lock, flags); return bio; } @@ -393,7 +415,12 @@ static int dio_await_completion(struct d if (dio->bio) dio_bio_submit(dio); - while (atomic_read(&dio->bio_count)) { + /* + * The bio_lock is not held for the read of bio_count. + * This is ok since it is the dio_bio_complete() that changes + * bio_count. + */ + while (dio->bio_count) { struct bio *bio = dio_await_one(dio); int ret2; @@ -420,10 +447,10 @@ static int dio_bio_reap(struct dio *dio) unsigned long flags; struct bio *bio; - spin_lock_irqsave(&dio->bio_list_lock, flags); + spin_lock_irqsave(&dio->bio_lock, flags); bio = dio->bio_list; dio->bio_list = bio->bi_private; - spin_unlock_irqrestore(&dio->bio_list_lock, flags); + spin_unlock_irqrestore(&dio->bio_lock, flags); ret = dio_bio_complete(dio, bio); } dio->reap_counter = 0; @@ -889,6 +916,7 @@ direct_io_worker(int rw, struct kiocb *i dio->blkbits = blkbits; dio->blkfactor = inode->i_blkbits - blkbits; dio->start_zero_done = 0; + dio->size = 0; dio->block_in_file = offset >> blkbits; dio->blocks_available = 0; dio->cur_page = NULL; @@ -913,9 +941,9 @@ direct_io_worker(int rw, struct kiocb *i * (or synchronous) device could take the count to zero while we're * still submitting BIOs. */ - atomic_set(&dio->bio_count, 1); - atomic_set(&dio->bios_in_flight, 0); - spin_lock_init(&dio->bio_list_lock); + dio->bio_count = 1; + dio->bios_in_flight = 0; + spin_lock_init(&dio->bio_lock); dio->bio_list = NULL; dio->waiter = NULL; @@ -925,7 +953,7 @@ direct_io_worker(int rw, struct kiocb *i for (seg = 0; seg < nr_segs; seg++) { user_addr = (unsigned long)iov[seg].iov_base; - bytes = iov[seg].iov_len; + dio->size += bytes = iov[seg].iov_len; /* Index into the first page of the first block */ dio->first_block_in_page = (user_addr & ~PAGE_MASK) >> blkbits; @@ -956,6 +984,13 @@ direct_io_worker(int rw, struct kiocb *i } } /* end iovec loop */ + if (ret == -ENOTBLK && rw == WRITE) { + /* + * The remaining part of the request will be + * be handled by buffered I/O when we return + */ + ret = 0; + } /* * There may be some unwritten disk at the end of a part-written * fs-block-sized block. Go zero that now. @@ -991,32 +1026,35 @@ direct_io_worker(int rw, struct kiocb *i * reflect the number of to-be-processed BIOs. */ if (dio->is_async) { - if (ret == 0) - ret = dio->result; /* Bytes written */ - if (ret == -ENOTBLK) { - /* - * The request will be reissued via buffered I/O - * when we return; Any I/O already issued - * effectively becomes redundant. - */ - dio->result = ret; + int should_wait = 0; + + if (dio->result < dio->size && rw == WRITE) { dio->waiter = current; + should_wait = 1; } + if (ret == 0) + ret = dio->result; finished_one_bio(dio); /* This can free the dio */ blk_run_queues(); - if (ret == -ENOTBLK) { + if (should_wait) { + unsigned long flags; /* * Wait for already issued I/O to drain out and * release its references to user-space pages * before returning to fallback on buffered I/O */ + + spin_lock_irqsave(&dio->bio_lock, flags); set_current_state(TASK_UNINTERRUPTIBLE); - while (atomic_read(&dio->bio_count)) { + while (dio->bio_count) { + spin_unlock_irqrestore(&dio->bio_lock, flags); io_schedule(); + spin_lock_irqsave(&dio->bio_lock, flags); set_current_state(TASK_UNINTERRUPTIBLE); } + spin_unlock_irqrestore(&dio->bio_lock, flags); set_current_state(TASK_RUNNING); - dio->waiter = NULL; + kfree(dio); } } else { finished_one_bio(dio); @@ -1038,7 +1076,12 @@ direct_io_worker(int rw, struct kiocb *i } dio_complete(dio, offset, ret); /* We could have also come here on an AIO file extend */ - if (!is_sync_kiocb(iocb) && (ret != -ENOTBLK)) + if (!is_sync_kiocb(iocb) && rw == WRITE && + ret >= 0 && dio->result == dio->size) + /* + * For AIO writes where we have completed the + * i/o, we have to mark the the aio complete. + */ aio_complete(iocb, ret, 0); kfree(dio); } diff -puN mm/filemap.c~aio-fallback-bio_count-race-fix-2 mm/filemap.c --- 25/mm/filemap.c~aio-fallback-bio_count-race-fix-2 2004-04-03 03:00:10.317663032 -0800 +++ 25-akpm/mm/filemap.c 2004-04-03 03:00:10.323662120 -0800 @@ -1827,14 +1827,16 @@ generic_file_aio_write_nolock(struct kio */ if (written >= 0 && file->f_flags & O_SYNC) status = generic_osync_inode(inode, mapping, OSYNC_METADATA); - if (written >= 0 && !is_sync_kiocb(iocb)) + if (written == count && !is_sync_kiocb(iocb)) written = -EIOCBQUEUED; - if (written != -ENOTBLK) + if (written < 0 || written == count) goto out_status; /* * direct-io write to a hole: fall through to buffered I/O + * for completing the rest of the request. */ - written = 0; + pos += written; + count -= written; } buf = iov->iov_base; _