diff -urNp --exclude CVS --exclude BitKeeper x-ref/fs/aio.c x/fs/aio.c --- x-ref/fs/aio.c 2003-08-31 03:54:47.000000000 +0200 +++ x/fs/aio.c 2003-08-31 03:56:22.000000000 +0200 @@ -902,6 +902,19 @@ asmlinkage long sys_io_destroy(aio_conte return -EINVAL; } +ssize_t generic_aio_poll(struct file *file, struct kiocb *req, struct iocb *iocb) +{ + unsigned events = iocb->aio_buf; + + /* Did the user set any bits they weren't supposed to? (The + * above is actually a cast. + */ + if (unlikely(events != iocb->aio_buf)) + return -EINVAL; + + return async_poll(req, events); +} + static inline int io_submit_one(struct kioctx *ctx, struct iocb *user_iocb, struct iocb *iocb) { @@ -978,6 +991,9 @@ static inline int io_submit_one(struct k case IOCB_CMD_FSYNC: op = file->f_op->aio_fsync; break; + case IOCB_CMD_POLL: + op = generic_aio_poll; + break; default: dprintk("EINVAL: io_submit: no operation %d provided by aio\n", iocb->aio_lio_opcode); @@ -1137,6 +1153,11 @@ static void generic_aio_complete_write(v generic_aio_complete_rw(WRITE, _iocb, vec, res); } +ssize_t generic_sock_aio_read(struct file *file, struct kiocb *req, struct iocb *iocb) +{ + return generic_aio_rw(READ, file, req, iocb, 1); +} + ssize_t generic_aio_rw(int rw, struct file *file, struct kiocb *req, struct iocb *iocb, size_t min_size) { int (*kvec_op)(struct file *, kvec_cb_t, size_t, loff_t); diff -urNp --exclude CVS --exclude BitKeeper x-ref/fs/pipe.c x/fs/pipe.c --- x-ref/fs/pipe.c 2003-08-31 03:54:43.000000000 +0200 +++ x/fs/pipe.c 2003-08-31 03:55:44.000000000 +0200 @@ -134,31 +134,240 @@ out_nolock: return ret; } +static int pipe_kvec_read(struct file *filp, kvec_cb_t cb, size_t size, loff_t pos) +{ + return 0; +} + +static int pipe_aio_read_cancel(struct kiocb *iocb, struct io_event *res) +{ + struct inode *inode = iocb->filp->f_dentry->d_inode; + struct pipe_inode_info *pipe = inode->i_pipe; + struct list_head *pos; + int found = 0; + int ret = -EAGAIN; + + pr_debug("cancelling aio pipe read(%p)\n", iocb); + + /* To cancel an aio, we must first prevent writers from + * removing it from the list. We must block here as the + * cancellation may be from the process exit path. 
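+ * The semaphore is taken before pipe_aio_lock here, the same
+ * ordering used in pipe_write() and pipe_aio_read().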
+ */ + down(PIPE_SEM(*inode)); + + pr_debug("got semaphore\n"); + spin_lock(&pipe->pipe_aio_lock); + + list_for_each(pos, &pipe->read_iocb_list) { + if (pos == &iocb->u.list) { + list_del(pos); + found = 1; + break; + } + } + + spin_unlock(&pipe->pipe_aio_lock); + up(PIPE_SEM(*inode)); + aio_put_req(iocb); + + if (found) { + if (iocb->data) { + unmap_kvec(iocb->data, 1); + free_kvec(iocb->data); + } + + if (unlikely(iocb->nr_transferred > INT_MAX)) { + aio_complete(iocb, iocb->nr_transferred, 0); + } else { + ret = iocb->nr_transferred; + aio_put_req(iocb); + } + } + + return ret; +} + +static ssize_t pipe_aio_read (struct file *file, struct kiocb *iocb, struct iocb *uiocb) +{ + struct inode *inode = file->f_dentry->d_inode; + int queued = 0, failed_sem = 0; + + iocb->data = NULL; + iocb->cancel = pipe_aio_read_cancel; + iocb->this_size = iocb->size; + if (iocb->this_size > aio_max_size) + iocb->this_size = aio_max_size; + + /* 0 length reads are always successful */ + if (unlikely(!iocb->size)) { + aio_complete(iocb, 0, 0); + return 0; + } + + iocb->data = map_user_kvec(READ, iocb->buf, iocb->this_size); + if (unlikely(IS_ERR(iocb->data))) { + pr_debug("pipe_aio_read: map_user_kvec=%ld\n", PTR_ERR(iocb->data)); + return PTR_ERR(iocb->data); + } + + /* down_trylock == 0 if we obtained the semaphore -> if the + * semaphore was not acquired, we queue the read request. + */ + failed_sem = down_trylock(PIPE_SEM(*inode)); + + spin_lock(&inode->i_pipe->pipe_aio_lock); + if (failed_sem || !list_empty(&inode->i_pipe->read_iocb_list)) { + pr_debug("queueing aio pipe read(%p)\n", iocb); + list_add_tail(&iocb->u.list, &inode->i_pipe->read_iocb_list); + queued = 1; + } + spin_unlock(&inode->i_pipe->pipe_aio_lock); + + if (queued) { + if (!failed_sem) + up(PIPE_SEM(*inode)); + return 0; + } + + /* Okay, we're the first read request. Try reading data, otherwise + * fall back and queue. + */ + if (PIPE_EMPTY(*inode)) { + /* No writers? EOF. */ + if (!PIPE_WRITERS(*inode)) { + aio_complete(iocb, 0, 0); + goto out; + } + + /* No data. Oh well, queue it at the head. */ + spin_lock(&inode->i_pipe->pipe_aio_lock); + list_add(&iocb->u.list, &inode->i_pipe->read_iocb_list); + spin_unlock(&inode->i_pipe->pipe_aio_lock); + up(PIPE_SEM(*inode)); + return 0; + } + + spin_lock(&inode->i_pipe->pipe_aio_lock); + list_add(&iocb->u.list, &inode->i_pipe->read_iocb_list); + spin_unlock(&inode->i_pipe->pipe_aio_lock); + up(PIPE_SEM(*inode)); + return 0; + + //pfull = PIPE_FULL(*inode); + +out: + up(PIPE_SEM(*inode)); + /* + * FIXME: writes may have been queued. The current code + * requires the caller to resubmit to get the event. + */ + + unmap_kvec(iocb->data, 1); + free_kvec(iocb->data); + iocb->data = NULL; + + return 0; +} + +/* do_pipe_write_aio: + * Performs a pipe write when there exists an outstanding aio + * read operation. Returns the number of bytes written or -EFAULT. 
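+ * Data is copied from the writer's buffer straight into the kvec
+ * mapped by the queued read iocb; an iocb that has been filled (or
+ * whose reader is O_NONBLOCK) is removed from read_iocb_list and
+ * finished with aio_complete().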
+ */ +static inline ssize_t do_pipe_write_aio(struct pipe_inode_info *pipe, + const char *buf, size_t count, struct kiocb *iocb) +{ + ssize_t written = 0; + pr_debug("do_pipe_aio_write\n"); + + while (count > 0) { + size_t len; + len = min(iocb->this_size, count); + if (unlikely(copy_user_to_kvec(iocb->data, iocb->nr_transferred, buf, len))) { + pr_debug("EFAULT?\n"); + break; + } + iocb->nr_transferred += len; + written += len; + buf += len; + count -= len; + + if ((iocb->nr_transferred == iocb->this_size) || + (iocb->filp->f_flags & O_NONBLOCK)) { + struct list_head *first = NULL; + + pr_debug("done this iocb\n"); + + /* Mark the pages as dirty and complete the request. + */ + unmap_kvec(iocb->data, 1); + free_kvec(iocb->data); + + spin_lock(&pipe->pipe_aio_lock); + list_del(&iocb->u.list); + first = list_first(&pipe->read_iocb_list); + spin_unlock(&pipe->pipe_aio_lock); + + aio_complete(iocb, iocb->nr_transferred, 0); + + iocb = NULL; + + /* No more aio reads? */ + if (!first) + break; + + pr_debug("processing another iocb\n"); + iocb = list_entry(first, struct kiocb, u.list); + } + } + + pr_debug("returning: %ld\n", written); + + return written ? written : -EFAULT; +} + static ssize_t pipe_write(struct file *filp, const char *buf, size_t count, loff_t *ppos) { struct inode *inode = filp->f_dentry->d_inode; + struct list_head *iocb; ssize_t free, written, ret; /* Seeks are not allowed on pipes. */ ret = -ESPIPE; written = 0; - if (ppos != &filp->f_pos) + if (unlikely(ppos != &filp->f_pos)) goto out_nolock; /* Null write succeeds. */ ret = 0; - if (count == 0) + if (unlikely(count == 0)) goto out_nolock; ret = -ERESTARTSYS; - if (down_interruptible(PIPE_SEM(*inode))) + if (unlikely(down_interruptible(PIPE_SEM(*inode)))) goto out_nolock; /* No readers yields SIGPIPE. */ - if (!PIPE_READERS(*inode)) + if (unlikely(!PIPE_READERS(*inode))) goto sigpipe; + spin_lock(&inode->i_pipe->pipe_aio_lock); + iocb = list_first(&inode->i_pipe->read_iocb_list); + spin_unlock(&inode->i_pipe->pipe_aio_lock); + + if (iocb) { + written = do_pipe_write_aio(inode->i_pipe, buf, count, + list_entry(iocb, struct kiocb, u.list)); + if (unlikely(written < 0)) + goto out; + + count -= written; + buf += written; + + if (!count) + goto out; + } + /* If count <= PIPE_BUF, we have to make it atomic. */ free = (count <= PIPE_BUF ? count : 1); @@ -339,6 +548,7 @@ pipe_rdwr_release(struct inode *inode, s static int pipe_read_open(struct inode *inode, struct file *filp) { + filp->private_data = inode->i_pipe; /* We could have perhaps used atomic_t, but this and friends below are the only places. So it doesn't seem worthwhile. 
*/ down(PIPE_SEM(*inode)); @@ -351,6 +561,7 @@ pipe_read_open(struct inode *inode, stru static int pipe_write_open(struct inode *inode, struct file *filp) { + filp->private_data = inode->i_pipe; down(PIPE_SEM(*inode)); PIPE_WRITERS(*inode)++; up(PIPE_SEM(*inode)); @@ -361,6 +572,7 @@ pipe_write_open(struct inode *inode, str static int pipe_rdwr_open(struct inode *inode, struct file *filp) { + filp->private_data = inode->i_pipe; down(PIPE_SEM(*inode)); if (filp->f_mode & FMODE_READ) PIPE_READERS(*inode)++; @@ -378,6 +590,7 @@ pipe_rdwr_open(struct inode *inode, stru struct file_operations read_fifo_fops = { llseek: no_llseek, read: pipe_read, + aio_read: pipe_aio_read, write: bad_pipe_w, poll: fifo_poll, ioctl: pipe_ioctl, @@ -398,6 +611,7 @@ struct file_operations write_fifo_fops = struct file_operations rdwr_fifo_fops = { llseek: no_llseek, read: pipe_read, + aio_read: pipe_aio_read, write: pipe_write, poll: fifo_poll, ioctl: pipe_ioctl, @@ -408,6 +622,7 @@ struct file_operations rdwr_fifo_fops = struct file_operations read_pipe_fops = { llseek: no_llseek, read: pipe_read, + aio_read: pipe_aio_read, write: bad_pipe_w, poll: pipe_poll, ioctl: pipe_ioctl, @@ -428,6 +643,7 @@ struct file_operations write_pipe_fops = struct file_operations rdwr_pipe_fops = { llseek: no_llseek, read: pipe_read, + aio_read: pipe_aio_read, write: pipe_write, poll: pipe_poll, ioctl: pipe_ioctl, @@ -453,6 +669,9 @@ struct inode* pipe_new(struct inode* ino PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 0; PIPE_WAITING_READERS(*inode) = PIPE_WAITING_WRITERS(*inode) = 0; PIPE_RCOUNTER(*inode) = PIPE_WCOUNTER(*inode) = 1; + spin_lock_init(&inode->i_pipe->pipe_aio_lock); + INIT_LIST_HEAD(&inode->i_pipe->read_iocb_list); + INIT_LIST_HEAD(&inode->i_pipe->write_iocb_list); return inode; fail_page: diff -urNp --exclude CVS --exclude BitKeeper x-ref/fs/select.c x/fs/select.c --- x-ref/fs/select.c 2003-08-31 03:54:49.000000000 +0200 +++ x/fs/select.c 2003-08-31 03:55:44.000000000 +0200 @@ -12,6 +12,12 @@ * 24 January 2000 * Changed sys_poll()/do_poll() to use PAGE_SIZE chunk-based allocation * of fds to overcome nfds < 16390 descriptors limit (Tigran Aivazian). + * June 2001 + * Added async_poll implementation. -bcrl + * Nov 2001 + * Async poll improvments from Suparna Bhattacharya + * April 2002 + * smp safe async poll plus cancellation. 
-bcrl */ #include @@ -20,6 +26,8 @@ #include /* for STICKY_TIMEOUTS */ #include #include +#include +#include #include @@ -27,19 +35,36 @@ #define DEFAULT_POLLMASK (POLLIN | POLLOUT | POLLRDNORM | POLLWRNORM) struct poll_table_entry { - struct file * filp; wait_queue_t wait; wait_queue_head_t * wait_address; + struct file * filp; + struct poll_wqueues * p; }; struct poll_table_page { + unsigned long size; struct poll_table_page * next; struct poll_table_entry * entry; struct poll_table_entry entries[0]; }; #define POLL_TABLE_FULL(table) \ - ((unsigned long)((table)->entry+1) > PAGE_SIZE + (unsigned long)(table)) + ((unsigned long)((table)->entry+1) > \ + (table)->size + (unsigned long)(table)) + +/* async poll uses only one entry per poll table as it is linked to an iocb */ +typedef struct async_poll_table_struct { + struct poll_wqueues pwq; + struct worktodo wtd; + int events; /* event mask for async poll */ + int wake; + long sync; + struct poll_table_page pt_page; /* one poll table page hdr */ + struct poll_table_entry entries[1]; /* space for a single entry */ +} async_poll_table; + + +static kmem_cache_t *async_poll_table_cache; /* * Ok, Peter made a complicated, but straightforward multiple_wait() function. @@ -60,9 +85,10 @@ void poll_initwait(struct poll_wqueues * init_poll_funcptr(&pwq->pt, __pollwait); pwq->error = 0; pwq->table = NULL; + pwq->iocb = NULL; } -void poll_freewait(struct poll_wqueues *pwq) +void __poll_freewait(struct poll_wqueues *pwq, wait_queue_t *wait) { struct poll_table_page * p = pwq->table; while (p) { @@ -70,15 +96,142 @@ void poll_freewait(struct poll_wqueues * struct poll_table_page *old; entry = p->entry; + if (entry == p->entries) /* may happen with async poll */ + break; do { entry--; - remove_wait_queue(entry->wait_address,&entry->wait); - fput(entry->filp); + if (wait != &entry->wait) + remove_wait_queue(entry->wait_address,&entry->wait); + else + __remove_wait_queue(entry->wait_address,&entry->wait); + fput(entry->filp); } while (entry > p->entries); old = p; p = p->next; - free_page((unsigned long) old); + if (old->size == PAGE_SIZE) + free_page((unsigned long) old); } + if (pwq->iocb) + kmem_cache_free(async_poll_table_cache, pwq); +} + +void poll_freewait(struct poll_wqueues* pwq) +{ + __poll_freewait(pwq, NULL); +} + +void async_poll_complete(void *data) +{ + async_poll_table *pasync = data; + struct poll_wqueues *p = data; + struct kiocb *iocb = p->iocb; + unsigned int mask; + poll_table *wait = &p->pt; + + pasync->wake = 0; + wmb(); + do { + mask = iocb->filp->f_op->poll(iocb->filp, wait); + mask &= pasync->events | POLLERR | POLLHUP; + if (mask) { + struct poll_wqueues *p2 = xchg(&iocb->data, NULL); + if (p2) { + poll_freewait(p2); + aio_complete(iocb, mask, 0); + } + return; + } + pasync->sync = 0; + wmb(); + } while (pasync->wake); +} + +static void async_poll_waiter(wait_queue_t *wait, + int thrway1, int thrway2) +{ + struct poll_table_entry *entry = (struct poll_table_entry *)wait; + async_poll_table *pasync = (async_poll_table *)(entry->p); + struct kiocb *iocb = pasync->pwq.iocb; + unsigned int mask; + + mask = iocb->filp->f_op->poll(iocb->filp, NULL); + mask &= pasync->events | POLLERR | POLLHUP; + if (mask) { + struct poll_wqueues *p2 = xchg(&iocb->data, NULL); + if (p2) { + __poll_freewait(p2, wait); + aio_complete(iocb, mask, 0); + } + return; + } + } + +int async_poll_cancel(struct kiocb *iocb, struct io_event *res) +{ + struct poll_wqueues *p; + + p = xchg(&iocb->data, NULL); + aio_put_req(iocb); + if (p) { + poll_freewait(p); + /* 
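+ * Every teardown path (submit-time completion in async_poll(),
+ * async_poll_complete(), async_poll_waiter() and the cancel here)
+ * must first win the xchg() on iocb->data, so the poll table is
+ * freed and the iocb completed at most once.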
+ * Since poll_freewait() locks the wait queue, we know that + * async_poll_waiter() is either not going to be run or has + * finished all its work. + */ + aio_put_req(iocb); + return 0; + } + return -EAGAIN; +} + +int async_poll(struct kiocb *iocb, int events) +{ + unsigned int mask; + async_poll_table *pasync; + struct poll_wqueues *p; + poll_table *wait; + + /* Fast path */ + if (iocb->filp->f_op && iocb->filp->f_op->poll) { + mask = iocb->filp->f_op->poll(iocb->filp, NULL); + mask &= events | POLLERR | POLLHUP; + if (mask & events) + return mask; + } + + pasync = kmem_cache_alloc(async_poll_table_cache, SLAB_KERNEL); + if (!pasync) + return -ENOMEM; + + p = (struct poll_wqueues *)pasync; + poll_initwait(p); + wtd_set_action(&pasync->wtd, async_poll_complete, pasync); + p->iocb = iocb; + pasync->wake = 0; + pasync->sync = 0; + pasync->events = events; + pasync->pt_page.entry = pasync->pt_page.entries; + pasync->pt_page.size = sizeof(pasync->pt_page); + p->table = &pasync->pt_page; + + iocb->data = p; + iocb->users ++; + wmb(); + + mask = DEFAULT_POLLMASK; + wait = &p->pt; + if (iocb->filp->f_op && iocb->filp->f_op->poll) + mask = iocb->filp->f_op->poll(iocb->filp, wait); + mask &= events | POLLERR | POLLHUP; + if (mask && xchg(&iocb->data, NULL)) { + poll_freewait(p); + aio_complete(iocb, mask, 0); + } + + iocb->cancel = async_poll_cancel; + aio_put_req(iocb); + return 0; } void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *_p) @@ -95,6 +248,7 @@ void __pollwait(struct file *filp, wait_ __set_current_state(TASK_RUNNING); return; } + new_table->size = PAGE_SIZE; new_table->entry = new_table->entries; new_table->next = table; p->table = new_table; @@ -108,7 +262,11 @@ void __pollwait(struct file *filp, wait_ get_file(filp); entry->filp = filp; entry->wait_address = wait_address; - init_waitqueue_entry(&entry->wait, current); + entry->p = p; + if (p->iocb) /* async poll */ + init_waitqueue_func_entry(&entry->wait, async_poll_waiter); + else + init_waitqueue_entry(&entry->wait, current); add_wait_queue(wait_address,&entry->wait); smp_mb(); } @@ -507,3 +665,14 @@ out: poll_freewait(&table); return err; } + +static int __init async_poll_init(void) +{ + async_poll_table_cache = kmem_cache_create("async poll table", + sizeof(async_poll_table), 0, 0, NULL, NULL); + if (!async_poll_table_cache) + panic("unable to alloc poll_table_cache"); + return 0; +} + +module_init(async_poll_init); diff -urNp --exclude CVS --exclude BitKeeper x-ref/include/linux/aio.h x/include/linux/aio.h --- x-ref/include/linux/aio.h 2003-08-31 03:54:47.000000000 +0200 +++ x/include/linux/aio.h 2003-08-31 03:55:44.000000000 +0200 @@ -120,6 +120,8 @@ extern ssize_t generic_aio_read(struct f extern ssize_t generic_aio_write(struct file *file, struct kiocb *req, struct iocb *iocb, size_t min_size); extern ssize_t generic_file_aio_read(struct file *file, struct kiocb *req, struct iocb *iocb); extern ssize_t generic_file_aio_write(struct file *file, struct kiocb *req, struct iocb *iocb); +extern ssize_t generic_sock_aio_read(struct file *file, struct kiocb *req, struct iocb *iocb); + /* for sysctl: */ extern unsigned aio_nr, aio_max_nr, aio_max_size, aio_max_pinned; diff -urNp --exclude CVS --exclude BitKeeper x-ref/include/linux/aio_abi.h x/include/linux/aio_abi.h --- x-ref/include/linux/aio_abi.h 2003-08-31 03:54:47.000000000 +0200 +++ x/include/linux/aio_abi.h 2003-08-31 03:55:44.000000000 +0200 @@ -36,10 +36,11 @@ enum { IOCB_CMD_PWRITE = 1, IOCB_CMD_FSYNC = 2, IOCB_CMD_FDSYNC = 3, - /* These two 
are experimental. + /* + * Experimental: * IOCB_CMD_PREADX = 4, - * IOCB_CMD_POLL = 5, */ + IOCB_CMD_POLL = 5, IOCB_CMD_NOOP = 6, }; diff -urNp --exclude CVS --exclude BitKeeper x-ref/include/linux/fs.h x/include/linux/fs.h --- x-ref/include/linux/fs.h 2003-08-31 03:54:49.000000000 +0200 +++ x/include/linux/fs.h 2003-08-31 03:55:44.000000000 +0200 @@ -950,6 +950,10 @@ struct block_device_operations { * read, write, poll, fsync, readv, writev can be called * without the big kernel lock held in all filesystems. */ + +#define F_ATOMIC 0x0001 +#define F_OFFSETOK 0x0002 + struct file_operations { struct module *owner; loff_t (*llseek) (struct file *, loff_t, int); diff -urNp --exclude CVS --exclude BitKeeper x-ref/include/linux/net.h x/include/linux/net.h --- x-ref/include/linux/net.h 2003-08-26 00:13:17.000000000 +0200 +++ x/include/linux/net.h 2003-08-31 03:55:44.000000000 +0200 @@ -21,6 +21,7 @@ #include #include #include +#include struct poll_table_struct; @@ -110,6 +111,8 @@ struct proto_ops { int (*recvmsg) (struct socket *sock, struct msghdr *m, int total_len, int flags, struct scm_cookie *scm); int (*mmap) (struct file *file, struct socket *sock, struct vm_area_struct * vma); ssize_t (*sendpage) (struct socket *sock, struct page *page, int offset, size_t size, int flags); + int (*kvec_read) (struct sock *, kvec_cb_t cb, int len); + int (*kvec_write) (struct sock *, kvec_cb_t cb, int len); }; struct net_proto_family diff -urNp --exclude CVS --exclude BitKeeper x-ref/include/linux/pipe_fs_i.h x/include/linux/pipe_fs_i.h --- x-ref/include/linux/pipe_fs_i.h 2003-03-15 03:25:14.000000000 +0100 +++ x/include/linux/pipe_fs_i.h 2003-08-31 03:55:44.000000000 +0200 @@ -13,6 +13,9 @@ struct pipe_inode_info { unsigned int waiting_writers; unsigned int r_counter; unsigned int w_counter; + spinlock_t pipe_aio_lock; + struct list_head read_iocb_list; + struct list_head write_iocb_list; }; /* Differs from PIPE_BUF in that PIPE_SIZE is the length of the actual diff -urNp --exclude CVS --exclude BitKeeper x-ref/include/linux/poll.h x/include/linux/poll.h --- x-ref/include/linux/poll.h 2003-08-31 03:54:49.000000000 +0200 +++ x/include/linux/poll.h 2003-08-31 03:55:44.000000000 +0200 @@ -9,8 +9,10 @@ #include #include #include +#include struct poll_table_struct; +struct kiocb; /* * structures and helpers for f_op->poll implementations @@ -39,10 +41,12 @@ struct poll_wqueues { poll_table pt; struct poll_table_page * table; int error; + struct kiocb *iocb; /* iocb for async poll */ }; extern void poll_initwait(struct poll_wqueues *pwq); extern void poll_freewait(struct poll_wqueues *pwq); +extern int async_poll(struct kiocb *iocb, int events); /* * Scaleable version of the fd_set. diff -urNp --exclude CVS --exclude BitKeeper x-ref/include/net/sock.h x/include/net/sock.h --- x-ref/include/net/sock.h 2003-08-31 03:54:45.000000000 +0200 +++ x/include/net/sock.h 2003-08-31 03:55:44.000000000 +0200 @@ -105,8 +105,15 @@ struct atm_vcc; #include #include +#include +struct sock_iocb { + struct list_head list; + kvec_cb_t cb; + struct kvec_dst dst; +}; + /* The AF_UNIX specific socket options */ struct unix_opt { struct unix_address *addr; @@ -570,6 +577,9 @@ struct sock { struct sk_buff *tail; } backlog; + struct list_head kvec_read_list; + struct list_head kvec_write_list; + rwlock_t callback_lock; /* Error queue, rarely used. 
*/ @@ -737,6 +747,8 @@ struct proto { int (*recvmsg)(struct sock *sk, struct msghdr *msg, int len, int noblock, int flags, int *addr_len); + int (*kvec_read)(struct sock *, kvec_cb_t cb, int len); + int (*kvec_write)(struct sock *, kvec_cb_t cb, int len); int (*bind)(struct sock *sk, struct sockaddr *uaddr, int addr_len); @@ -811,7 +823,7 @@ do { spin_lock_bh(&((__sk)->lock.slock)) if ((__sk)->backlog.tail != NULL) \ __release_sock(__sk); \ (__sk)->lock.users = 0; \ - if (waitqueue_active(&((__sk)->lock.wq))) wake_up(&((__sk)->lock.wq)); \ + wake_up(&((__sk)->lock.wq)); \ spin_unlock_bh(&((__sk)->lock.slock)); \ } while(0) diff -urNp --exclude CVS --exclude BitKeeper x-ref/include/net/tcp.h x/include/net/tcp.h --- x-ref/include/net/tcp.h 2003-08-31 03:54:45.000000000 +0200 +++ x/include/net/tcp.h 2003-08-31 03:55:44.000000000 +0200 @@ -735,6 +735,8 @@ extern int tcp_recvmsg(struct sock *sk struct msghdr *msg, int len, int nonblock, int flags, int *addr_len); +extern int tcp_kvec_read(struct sock *sk, kvec_cb_t cb, int len); +extern int tcp_kvec_write(struct sock *sk, kvec_cb_t cb, int len); extern int tcp_listen_start(struct sock *sk); diff -urNp --exclude CVS --exclude BitKeeper x-ref/net/core/datagram.c x/net/core/datagram.c --- x-ref/net/core/datagram.c 2003-03-15 03:25:19.000000000 +0100 +++ x/net/core/datagram.c 2003-08-31 03:55:44.000000000 +0200 @@ -8,6 +8,8 @@ * * Authors: Alan Cox . (datagram_poll() from old udp.c code) * + * Portions Copyright 2001 Red Hat, Inc. + * * Fixes: * Alan Cox : NULL return from skb_peek_copy() understood * Alan Cox : Rewrote skb_read_datagram to avoid the skb_peek_copy stuff. @@ -21,6 +23,7 @@ * Darryl Miles : Fixed non-blocking SOCK_STREAM. * Alan Cox : POSIXisms * Pete Wyckoff : Unconnected accept() fix. + * Benjamin LaHaise: added kvec operations * */ @@ -37,6 +40,7 @@ #include #include #include +#include #include #include @@ -446,3 +450,321 @@ unsigned int datagram_poll(struct file * return mask; } + +/* + */ +static inline void skb_copy_datagram_kvec_dst(const struct sk_buff *skb, + int offset, struct kvec_dst *dst, int len) +{ + int i, copy; + int start = skb->len - skb->data_len; + + /* Copy header. */ + if ((copy = start-offset) > 0) { + if (copy > len) + copy = len; + memcpy_to_kvec_dst(dst, skb->data + offset, copy); + if ((len -= copy) == 0) + return; + offset += copy; + } + + /* Copy paged appendix. Hmm... why does this look so complicated? 
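+ * (It walks skb_shinfo(skb)->frags with kmap_atomic() and then
+ * recurses into the frag_list for chained skbs; 'start' tracks where
+ * each region begins within the skb so 'offset' can be located.)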
*/ + for (i=0; inr_frags; i++) { + int end; + + BUG_TRAP(start <= offset+len); + + end = start + skb_shinfo(skb)->frags[i].size; + if ((copy = end-offset) > 0) { + u8 *vaddr; + skb_frag_t *frag = &skb_shinfo(skb)->frags[i]; + struct page *page = frag->page; + + if (copy > len) + copy = len; + vaddr = kmap_atomic(page, KM_USER1); + memcpy_to_kvec_dst(dst, vaddr + frag->page_offset + + offset-start, copy); + kunmap_atomic(vaddr, KM_USER1); + if (!(len -= copy)) + return; + offset += copy; + } + start = end; + } + + if (skb_shinfo(skb)->frag_list) { + struct sk_buff *list; + + for (list = skb_shinfo(skb)->frag_list; list; list=list->next) { + int end; + + BUG_TRAP(start <= offset+len); + + end = start + list->len; + if ((copy = end-offset) > 0) { + if (copy > len) + copy = len; + skb_copy_datagram_kvec_dst(list, offset-start, dst, copy); + if ((len -= copy) == 0) + return; + offset += copy; + } + start = end; + } + } +} + +void skb_copy_datagram_kvec(const struct sk_buff *skb, int offset, + struct kvec *vec, int len) +{ + struct kvec_dst dst; + kvec_dst_init(&dst); + kvec_dst_set(&dst, vec->veclet); + kvec_dst_map(&dst); + skb_copy_datagram_kvec_dst(skb, offset, &dst, len); + kvec_dst_unmap(&dst); +} + +/* C++ would be better for this. Please don't torture me with this code + * ever again. + */ +static inline unsigned int csum_and_copy_to_dst(struct kvec_dst *dst, + const char *from, int len, unsigned int csum) +{ + do { + int cnt = len; + if (dst->space < cnt) + cnt = dst->space; + + memcpy(dst->dst, from, cnt); + csum = csum_partial_copy_nocheck(from, dst->dst, cnt, csum); + from += cnt; + dst->space -= cnt; + dst->dst += cnt; + len -= cnt; + if (!dst->space && len) { + kvec_dst_unmap(dst); + dst->let++; + dst->offset = 0; + kvec_dst_map(dst); + if (!dst->space) + BUG(); + } + } while (len); + return csum; +} + +static inline void skb_copy_and_csum_datagram_kvec_dst(const struct sk_buff *skb, int offset, struct kvec_dst *dst, int len, unsigned int *csump) +{ + int i, copy; + int start = skb->len - skb->data_len; + int pos = 0; + + /* Copy header. 
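+ * 'pos' counts the bytes checksummed so far, so the per-fragment
+ * checksums below can be folded in at the right offset with
+ * csum_block_add().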
*/ + if ((copy = start-offset) > 0) { + if (copy > len) + copy = len; + *csump = csum_and_copy_to_dst(dst, skb->data+offset, copy, *csump); + if ((len -= copy) == 0) + return; + offset += copy; + pos = copy; + } + + for (i=0; inr_frags; i++) { + int end; + + BUG_TRAP(start <= offset+len); + + end = start + skb_shinfo(skb)->frags[i].size; + if ((copy = end-offset) > 0) { + unsigned int csum2; + u8 *vaddr; + skb_frag_t *frag = &skb_shinfo(skb)->frags[i]; + struct page *page = frag->page; + + if (copy > len) + copy = len; + vaddr = kmap_atomic(page, KM_USER1); + csum2 = csum_and_copy_to_dst(dst, + vaddr + frag->page_offset + offset-start, + copy, 0); + kunmap_atomic(vaddr, KM_USER1); + *csump = csum_block_add(*csump, csum2, pos); + if (!(len -= copy)) + return; + offset += copy; + pos += copy; + } + start = end; + } + + if (skb_shinfo(skb)->frag_list) { + struct sk_buff *list; + + for (list = skb_shinfo(skb)->frag_list; list; list=list->next) { + int end; + + BUG_TRAP(start <= offset+len); + + end = start + list->len; + if ((copy = end-offset) > 0) { + unsigned int csum2 = 0; + if (copy > len) + copy = len; + skb_copy_and_csum_datagram_kvec_dst(list, offset-start, dst, copy, &csum2); + *csump = csum_block_add(*csump, csum2, pos); + if ((len -= copy) == 0) + return; + offset += copy; + pos += copy; + } + start = end; + } + } +} + +int skb_copy_and_csum_datagram_kvec(const struct sk_buff *skb, int offset, + struct kvec *vec, int len) +{ + unsigned int csum; + struct kvec_dst dst; + + csum = csum_partial(skb->data, offset, skb->csum); + + kvec_dst_init(&dst); + kvec_dst_set(&dst, vec->veclet); + kvec_dst_map(&dst); + skb_copy_and_csum_datagram_kvec_dst(skb, offset, &dst, len, &csum); + kvec_dst_unmap(&dst); + + if ((unsigned short)csum_fold(csum)) + return -EINVAL; + return 0; +} + +struct skb_async_info { + struct worktodo wtd; + struct sock *sk; + int len; + void (*finish)(struct sock *sk, kvec_cb_t cb, int len, struct sk_buff *skb); + kvec_cb_t cb; +}; +static void skb_async_read_worker(void *_data); + +int skb_kvec_recv_datagram(struct sock * sk, kvec_cb_t cb, int len, + void (*finish)(struct sock *sk, kvec_cb_t cb, int len, struct sk_buff *skb)) +{ + struct skb_async_info *info = kmalloc(sizeof(struct skb_async_info), GFP_KERNEL); + if (info) { + wtd_set_action(&info->wtd, skb_async_read_worker, info); + info->sk = sk; + info->len = len; + info->finish = finish; + info->cb = cb; + skb_async_read_worker(info); + return 0; + } + return -EAGAIN; +} + +static void skb_async_read_waiter(wait_queue_t *wait) +{ + struct skb_async_info *info = (void *)wait; + __remove_wait_queue(info->sk->sleep, &info->wtd.wait); + wtd_queue(&info->wtd); +} + +static void skb_async_read_worker(void *_data) +{ + struct skb_async_info *info = _data; + struct sock *sk = info->sk; + struct sk_buff *skb; + int error; + + /* Caller is allowed not to check sk->err before skb_recv_datagram() */ + error = sock_error(sk); + if (error) + goto no_packet; + + + init_waitqueue_func_entry(&info->wtd.wait, skb_async_read_waiter); + + /* Attempted to dequeue and process any skbs that already arrived. + * Note that add_wait_queue_cond is used to check against a race + * where an skb is added to the queue after we checked but before + * the callback is added to the wait queue. 
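+ * A non-zero return from add_wait_queue_cond() means the condition
+ * went false before we could be queued (an skb or an error arrived),
+ * so we loop and retry the dequeue; once we are queued,
+ * skb_async_read_waiter() hands the work off to the wtd queue.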
+ */ + do { + skb = skb_dequeue(&sk->receive_queue); + if (skb) { + info->finish(sk, info->cb, info->len, skb); + kfree(info); + return; + } + } while ( add_wait_queue_cond( sk->sleep, &info->wtd.wait, + (!(error = sock_error(sk)) && + skb_queue_empty(&sk->receive_queue)) ) + && !error); + + if (!error) + return; + +no_packet: + info->cb.fn(info->cb.data, info->cb.vec, error); + kfree(info); + return; +} + +#if 0 +static void skb_async_read_worker(void *_data) +{ + struct skb_async_info *info = _data; + int error; + + /* Socket errors? */ + error = sock_error(sk); + if (error) + goto out_err; + + if (!skb_queue_empty(&sk->receive_queue)) + goto ready; + + /* Socket shut down? */ + if (sk->shutdown & RCV_SHUTDOWN) + goto out_noerr; + + /* Sequenced packets can come disconnected. If so we report the problem */ + error = -ENOTCONN; + if(connection_based(sk) && !(sk->state==TCP_ESTABLISHED || sk->state==TCP_LISTEN)) + goto out_err; + + /* handle signals */ + if (signal_pending(current)) + goto interrupted; + + /* here: queue sleep */ + *timeo_p = schedule_timeout(*timeo_p); + return; + +ready: + current->state = TASK_RUNNING; + remove_wait_queue(sk->sleep, &wait); + return 0; + +interrupted: + error = sock_intr_errno(*timeo_p); +out_err: + *err = error; +out: + current->state = TASK_RUNNING; + remove_wait_queue(sk->sleep, &wait); + return error; +out_noerr: + *err = 0; + error = 1; + goto out; +} +#endif diff -urNp --exclude CVS --exclude BitKeeper x-ref/net/core/sock.c x/net/core/sock.c --- x-ref/net/core/sock.c 2003-08-31 03:54:40.000000000 +0200 +++ x/net/core/sock.c 2003-08-31 03:56:03.000000000 +0200 @@ -587,6 +587,8 @@ struct sock *sk_alloc(int family, int pr if(sk && zero_it) { memset(sk, 0, sizeof(struct sock)); sk->family = family; + INIT_LIST_HEAD(&sk->kvec_read_list); + INIT_LIST_HEAD(&sk->kvec_write_list); sock_lock_init(sk); } @@ -1120,7 +1122,7 @@ ssize_t sock_no_sendpage(struct socket * void sock_def_wakeup(struct sock *sk) { read_lock(&sk->callback_lock); - if (sk->sleep && waitqueue_active(sk->sleep)) + if (sk->sleep) wake_up_interruptible_all(sk->sleep); read_unlock(&sk->callback_lock); } @@ -1128,7 +1130,7 @@ void sock_def_wakeup(struct sock *sk) void sock_def_error_report(struct sock *sk) { read_lock(&sk->callback_lock); - if (sk->sleep && waitqueue_active(sk->sleep)) + if (sk->sleep) wake_up_interruptible(sk->sleep); sk_wake_async(sk,0,POLL_ERR); read_unlock(&sk->callback_lock); @@ -1137,7 +1139,7 @@ void sock_def_error_report(struct sock * void sock_def_readable(struct sock *sk, int len) { read_lock(&sk->callback_lock); - if (sk->sleep && waitqueue_active(sk->sleep)) + if (sk->sleep) wake_up_interruptible(sk->sleep); sk_wake_async(sk,1,POLL_IN); read_unlock(&sk->callback_lock); @@ -1151,7 +1153,7 @@ void sock_def_write_space(struct sock *s * progress. 
--DaveM */ if((atomic_read(&sk->wmem_alloc) << 1) <= sk->sndbuf) { - if (sk->sleep && waitqueue_active(sk->sleep)) + if (sk->sleep) wake_up_interruptible(sk->sleep); /* Should agree with poll, otherwise some programs break */ diff -urNp --exclude CVS --exclude BitKeeper x-ref/net/ipv4/af_inet.c x/net/ipv4/af_inet.c --- x-ref/net/ipv4/af_inet.c 2003-08-31 03:54:38.000000000 +0200 +++ x/net/ipv4/af_inet.c 2003-08-31 03:55:44.000000000 +0200 @@ -727,6 +727,19 @@ static int inet_getname(struct socket *s } +int inet_kvec_read(struct socket *sock, kvec_cb_t cb, size_t len) +{ + struct sock *sk = sock->sk; + + return sk->prot->kvec_read(sk, cb, len); +} + +int inet_kvec_write(struct socket *sock, kvec_cb_t cb, size_t len) +{ + struct sock *sk = sock->sk; + + return sk->prot->kvec_write(sk, cb, len); +} int inet_recvmsg(struct socket *sock, struct msghdr *msg, int size, int flags, struct scm_cookie *scm) @@ -958,7 +971,9 @@ struct proto_ops inet_stream_ops = { sendmsg: inet_sendmsg, recvmsg: inet_recvmsg, mmap: sock_no_mmap, - sendpage: tcp_sendpage + sendpage: tcp_sendpage, + kvec_read: inet_kvec_read, + kvec_write: inet_kvec_write, }; struct proto_ops inet_dgram_ops = { @@ -980,6 +995,8 @@ struct proto_ops inet_dgram_ops = { recvmsg: inet_recvmsg, mmap: sock_no_mmap, sendpage: sock_no_sendpage, + kvec_read: inet_kvec_read, + kvec_write: inet_kvec_write, }; struct net_proto_family inet_family_ops = { diff -urNp --exclude CVS --exclude BitKeeper x-ref/net/ipv4/tcp.c x/net/ipv4/tcp.c --- x-ref/net/ipv4/tcp.c 2003-08-31 03:54:45.000000000 +0200 +++ x/net/ipv4/tcp.c 2003-08-31 03:55:44.000000000 +0200 @@ -252,6 +252,7 @@ #include #include #include +#include #include #include @@ -680,11 +681,267 @@ static int wait_for_tcp_connect(struct s return 0; } +struct tcp_write_async_info { + struct worktodo wtd; + struct sock *sk; + int len; + int done; + int offset; + struct kveclet *cur_let; + kvec_cb_t cb; + spinlock_t lock; +}; + +#define dprintk(x...) 
do { ; } while (0) +static void async_lock_sock_wait(wait_queue_t *wait) +{ + struct tcp_write_async_info *info = (void *)wait; + dprintk("async_lock_sock_wait(%p)\n", info); + if (!info->sk->lock.users) { + dprintk("async_lock_sock_wait: queuing\n"); + __remove_wait_queue(info->sk->sleep, &info->wtd.wait); + wtd_queue(&info->wtd); + } +} + +static void async_lock_sock(void *data) +{ + struct tcp_write_async_info *info = data; + struct sock *sk; + dprintk(KERN_DEBUG "async_lock_sock(%p)\n", info); + sk = info->sk; + spin_lock_bh(&sk->lock.slock); + if (sk->lock.users) { + dprintk(KERN_DEBUG "async_lock_sock: waiting\n"); + wtd_push(&info->wtd, async_lock_sock, info); + init_waitqueue_func_entry(&info->wtd.wait, async_lock_sock_wait); + if (!add_wait_queue_cond(sk->sleep, &info->wtd.wait, !sk->lock.users)) { + spin_unlock_bh(&sk->lock.slock); + return; + } + wtd_pop(&info->wtd); + } + dprintk(KERN_DEBUG "async_lock_sock: locking\n"); + sk->lock.users = 1; + spin_unlock_bh(&sk->lock.slock); + wtd_queue(&info->wtd); +} + +static void async_wait_for_tcp_connect(void *data); +int tcp_kvec_write(struct sock *sk, kvec_cb_t cb, int len) +{ + struct tcp_write_async_info *info; + info = kmalloc(sizeof(*info), GFP_KERNEL); + dprintk(KERN_DEBUG "tcp_kvec_write: %p\n", info); + if (!info) + return -ENOMEM; + wtd_init(&info->wtd, async_wait_for_tcp_connect); + info->sk = sk; + info->len = len; + info->done = 0; + info->offset = 0; + info->cur_let = cb.vec->veclet; + info->cb = cb; + spin_lock_init(&info->lock); + async_lock_sock(info); + return 0; +} + +static void async_cn_wait_task(void *data) +{ + struct tcp_write_async_info *info = (void *)data; + async_lock_sock(info); +} + +static void async_cn_wait(wait_queue_t *wait) +{ + struct tcp_write_async_info *info = (void *)wait; + __remove_wait_queue(info->sk->sleep, &info->wtd.wait); + wtd_set_action(&info->wtd, async_cn_wait_task, info); + wtd_queue(&info->wtd); +} + +/* sock_get_iocb + * Attempts to allocate a local socket iocb, which allows high + * performance for the common cases of a small number of ios + * outstanding per socket. + */ +struct sock_iocb *sock_get_iocb(struct sock *sk) +{ + struct sock_iocb *iocb; + + iocb = kmalloc(sizeof(*iocb), GFP_KERNEL); + return iocb; +} + +void sock_put_iocb(struct sock_iocb *iocb) +{ + kfree(iocb); +} + +/* tcp_kvec_read_kick + * Attempts to process an async read request. Must be called with + * the socket lock held. + */ +void tcp_kvec_read_kick(struct sock *sk, struct sock_iocb *iocb) +{ + TCP_CHECK_TIMER(sk); +#if 0 + if (unlikely(TCP_LISTEN == sk->state)) + goto out; +#endif + return; +} + +/* tcp_kvec_read + * Queues an async read request on a socket. If there were + & no outstanding read requests, kicks the backlog processing. + */ +int tcp_kvec_read(struct sock *sk, kvec_cb_t cb, int size) +{ + struct sock_iocb *iocb; + dprintk("tcp_kvec_read(%p, %d): blah", sk, size); + + iocb = sock_get_iocb(sk); + if (unlikely(NULL == iocb)) + return -ENOMEM; + + iocb->cb = cb; + kvec_dst_init(&iocb->dst); + + spin_lock_bh(&sk->lock.slock); + if (sk->lock.users != 0 || !list_empty(&sk->kvec_read_list)) { + list_add_tail(&iocb->list, &sk->kvec_read_list); + spin_unlock_bh(&sk->lock.slock); + return 0; + } + spin_unlock_bh(&sk->lock.slock); + + /* We're the head read request and now own the socket lock; + * attempt to kick off processing. 
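+ * (Note: tcp_kvec_read_kick() is currently a stub that only runs
+ * TCP_CHECK_TIMER().)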
+ */ + tcp_kvec_read_kick(sk, iocb); + release_sock(sk); + return 0; +} + +static void tcp_kvec_write_worker(struct tcp_write_async_info *info); +static void async_wait_for_tcp_connect(void *data) +{ + struct tcp_write_async_info *info = data; + struct sock *sk = info->sk; + int err; + /* At this point the socket is locked for us. */ + while((1 << sk->state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT)) { + if (sk->err) { + err = sock_error(sk); + goto error; + } + if ((1 << sk->state) & + ~(TCPF_SYN_SENT | TCPF_SYN_RECV)) { + err = -EPIPE; + goto error; + } + + sk->tp_pinfo.af_tcp.write_pending++; + init_waitqueue_func_entry(&info->wtd.wait, async_cn_wait); + + /* Add our worker to the socket queue, but make sure the socket + * state isn't changed from when we checked while we do so. + */ + if (!add_wait_queue_cond(sk->sleep, &info->wtd.wait, + ((1 << sk->state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT)) + )) { + release_sock(sk); + return; + } + } + /* sk is now locked *and* the connection is established, let's + * proceed to the data transfer stage. + */ + tcp_kvec_write_worker(info); + return; + +error: + release_sock(sk); + info->cb.fn(info->cb.data, info->cb.vec, err); + kfree(info); +} + static inline int tcp_memory_free(struct sock *sk) { return sk->wmem_queued < sk->sndbuf; } +static void async_wait_for_tcp_memory(struct tcp_write_async_info *info); +static void async_wait_for_tcp_memory_done(void *data) +{ + struct tcp_write_async_info *info = data; + info->sk->tp_pinfo.af_tcp.write_pending--; + if (tcp_memory_free(info->sk)) + tcp_kvec_write_worker(info); + else + async_wait_for_tcp_memory(info); +} + +static void async_wait_for_tcp_memory_waiting(void *data) +{ + struct tcp_write_async_info *info = data; + wtd_set_action(&info->wtd, async_wait_for_tcp_memory_done, info); + async_lock_sock(info); +} + +static void async_wait_for_tcp_memory_wake(wait_queue_t *wait) +{ + struct tcp_write_async_info *info = (void *)wait; + __remove_wait_queue(info->sk->sleep, &info->wtd.wait); + wtd_set_action(&info->wtd, async_wait_for_tcp_memory_waiting, info); + wtd_queue(&info->wtd); +} + +static void async_wait_for_tcp_memory(struct tcp_write_async_info *info) +{ + struct sock *sk = info->sk; + ssize_t res; + kvec_cb_t cb; + int raced = 0; + + dprintk("async_wait_for_tcp_memory(%p)\n", info); + res = -EPIPE; + if (sk->err || (sk->shutdown & SEND_SHUTDOWN)) + goto err; + + if (tcp_memory_free(sk)) + dprintk("async_wait_for_tcp_memory: spinning?\n"); + + init_waitqueue_func_entry(&info->wtd.wait, async_wait_for_tcp_memory_wake); + clear_bit(SOCK_ASYNC_NOSPACE, &sk->socket->flags); + set_bit(SOCK_NOSPACE, &sk->socket->flags); + raced = add_wait_queue_cond( sk->sleep, &info->wtd.wait, + !(sk->err || (sk->shutdown & SEND_SHUTDOWN) || tcp_memory_free(sk)) ); + + sk->tp_pinfo.af_tcp.write_pending++; + if (raced) { + /* Requeue to be run here: this allows other tasks to + * get rescheduled in case of bugs + */ + wtd_set_action(&info->wtd, async_wait_for_tcp_memory_done, info); + wtd_queue(&info->wtd); + return; + } + + release_sock(sk); + return; + +err: + dprintk("async_wait_for_tcp_memory: err %ld\n", (long)res); + if (info->done) + res = info->done; + cb = info->cb; + kfree(info); + cb.fn(cb.data, cb.vec, res); +} + /* * Wait for more memory for a socket */ @@ -695,9 +952,17 @@ static int wait_for_tcp_memory(struct so long current_timeo = *timeo; DECLARE_WAITQUEUE(wait, current); + if (sk->err || (sk->shutdown & SEND_SHUTDOWN)) + return -EPIPE; + if (tcp_memory_free(sk)) current_timeo = vm_wait = 
(net_random()%(HZ/5))+2; + if (!*timeo) { + set_bit(SOCK_ASYNC_NOSPACE, &sk->socket->flags); + return -EAGAIN; + } + add_wait_queue(sk->sleep, &wait); for (;;) { set_bit(SOCK_ASYNC_NOSPACE, &sk->socket->flags); @@ -748,7 +1013,7 @@ do_interrupted: goto out; } -ssize_t do_tcp_sendpages(struct sock *sk, struct page **pages, int poffset, size_t psize, int flags); +ssize_t do_tcp_sendpages(struct sock *sk, struct kveclet *let, int poffset, size_t psize, int flags); static inline int can_coalesce(struct sk_buff *skb, int i, struct page *page, int off) @@ -827,7 +1092,7 @@ static int tcp_error(struct sock *sk, in return err; } -ssize_t do_tcp_sendpages(struct sock *sk, struct page **pages, int poffset, size_t psize, int flags) +ssize_t do_tcp_sendpages(struct sock *sk, struct kveclet *let, int poffset, size_t psize, int flags) { struct tcp_opt *tp = &(sk->tp_pinfo.af_tcp); int mss_now; @@ -854,9 +1119,14 @@ ssize_t do_tcp_sendpages(struct sock *sk int offset, size, copy, i; struct page *page; - page = pages[poffset/PAGE_SIZE]; - offset = poffset % PAGE_SIZE; - size = min_t(size_t, psize, PAGE_SIZE-offset); + while (poffset >= let->length) { + poffset -= let->length; + let++; + } + + page = let->page; + offset = let->offset + poffset; + size = min_t(unsigned int, psize, let->length); if (tp->send_head==NULL || (copy = mss_now - skb->len) <= 0) { new_segment: @@ -896,6 +1166,10 @@ new_segment: copied += copy; poffset += copy; + if (poffset >= let->length) { + poffset = 0; + let++; + } if (!(psize -= copy)) goto out; @@ -935,6 +1209,7 @@ out_err: ssize_t tcp_sendpage(struct socket *sock, struct page *page, int offset, size_t size, int flags) { + struct kveclet let = { page, offset, size }; ssize_t res; struct sock *sk = sock->sk; @@ -944,16 +1219,53 @@ ssize_t tcp_sendpage(struct socket *sock !(sk->route_caps & TCP_ZC_CSUM_FLAGS)) return sock_no_sendpage(sock, page, offset, size, flags); -#undef TCP_ZC_CSUM_FLAGS - lock_sock(sk); TCP_CHECK_TIMER(sk); - res = do_tcp_sendpages(sk, &page, offset, size, flags); + res = do_tcp_sendpages(sk, &let, 0, size, flags); TCP_CHECK_TIMER(sk); release_sock(sk); return res; } +static void tcp_kvec_write_worker(struct tcp_write_async_info *info) +{ + struct sock *sk = info->sk; + int res; + if (!(sk->route_caps & NETIF_F_SG) || + !(sk->route_caps & TCP_ZC_CSUM_FLAGS)) + BUG(); + + res = do_tcp_sendpages(sk, info->cur_let, info->offset, info->len - info->done, MSG_DONTWAIT); + if (res > 0) + info->done += res; + + if (res == -EAGAIN) { + dprintk("tcp_kvec_write_worker: -EAGAIN: queuing\n"); + goto requeue; + } + + while (res > info->cur_let->length) { + res -= info->cur_let->length; + info->cur_let++; + } + + if (res <= 0 || (info->done >= info->len)) { + kvec_cb_t cb = info->cb; + dprintk("tcp_kvec_write_worker: error(%d)\n", res); + if (info->done) + res = info->done; + release_sock(sk); + kfree(info); + cb.fn(cb.data, cb.vec, res); + return; + } + +requeue: + async_wait_for_tcp_memory(info); +} + +#undef TCP_ZC_CSUM_FLAGS + #define TCP_PAGE(sk) (sk->tp_pinfo.af_tcp.sndmsg_page) #define TCP_OFF(sk) (sk->tp_pinfo.af_tcp.sndmsg_off) diff -urNp --exclude CVS --exclude BitKeeper x-ref/net/ipv4/tcp_ipv4.c x/net/ipv4/tcp_ipv4.c --- x-ref/net/ipv4/tcp_ipv4.c 2003-08-31 03:54:38.000000000 +0200 +++ x/net/ipv4/tcp_ipv4.c 2003-08-31 03:55:44.000000000 +0200 @@ -2201,6 +2201,8 @@ struct proto tcp_prot = { hash: tcp_v4_hash, unhash: tcp_unhash, get_port: tcp_v4_get_port, + kvec_read: tcp_kvec_read, + kvec_write: tcp_kvec_write, }; diff -urNp --exclude CVS --exclude BitKeeper 
x-ref/net/ipv4/udp.c x/net/ipv4/udp.c --- x-ref/net/ipv4/udp.c 2003-08-31 03:54:38.000000000 +0200 +++ x/net/ipv4/udp.c 2003-08-31 03:55:44.000000000 +0200 @@ -634,6 +634,78 @@ static __inline__ int udp_checksum_compl __udp_checksum_complete(skb); } +void udp_kvec_read_finish(struct sock *sk, kvec_cb_t cb, int len, struct sk_buff *skb) +{ + struct sockaddr_in *sin = NULL; + int msg_flags = 0; + int copied, err; + + if (!skb) + BUG(); + + copied = skb->len - sizeof(struct udphdr); + if (copied > len) { + copied = len; + msg_flags |= MSG_TRUNC; + } + + err = 0; + + if (skb->ip_summed==CHECKSUM_UNNECESSARY) { + skb_copy_datagram_kvec(skb, sizeof(struct udphdr), + cb.vec, copied); + } else if (msg_flags&MSG_TRUNC) { + err = -EAGAIN; + if (unlikely(__udp_checksum_complete(skb))) { + UDP_INC_STATS_BH(UdpInErrors); + goto out_free; + } + err = 0; + skb_copy_datagram_kvec(skb, sizeof(struct udphdr), + cb.vec, copied); + } else { + err = skb_copy_and_csum_datagram_kvec(skb, + sizeof(struct udphdr), cb.vec, copied); + } + + if (err) + goto out_free; + +#if 0 + sock_recv_timestamp(msg, sk, skb); +#endif + + /* Copy the address. */ + if (sin) + { + sin->sin_family = AF_INET; + sin->sin_port = skb->h.uh->source; + sin->sin_addr.s_addr = skb->nh.iph->saddr; + memset(sin->sin_zero, 0, sizeof(sin->sin_zero)); + } +#if 0 + if (sk->protinfo.af_inet.cmsg_flags) + ip_cmsg_recv(msg, skb); +#endif + err = copied; + +out_free: + skb_free_datagram(sk, skb); + cb.fn(cb.data, cb.vec, err); + return; +} + +static int udp_kvec_read(struct sock *sk, kvec_cb_t cb, int len) +{ + return skb_kvec_recv_datagram(sk, cb, len, udp_kvec_read_finish); +} + +static int udp_kvec_write(struct sock *sk, kvec_cb_t cb, int len) +{ + return -EINVAL; /* TODO: someone please write ;-) */ +} + + /* * This should be easy, if there is something there we * return it, otherwise we block. @@ -1051,6 +1123,8 @@ struct proto udp_prot = { getsockopt: ip_getsockopt, sendmsg: udp_sendmsg, recvmsg: udp_recvmsg, + kvec_read: udp_kvec_read, + kvec_write: udp_kvec_write, backlog_rcv: udp_queue_rcv_skb, hash: udp_v4_hash, unhash: udp_v4_unhash, diff -urNp --exclude CVS --exclude BitKeeper x-ref/net/socket.c x/net/socket.c --- x-ref/net/socket.c 2003-08-31 03:54:45.000000000 +0200 +++ x/net/socket.c 2003-08-31 03:55:44.000000000 +0200 @@ -44,6 +44,7 @@ * Tigran Aivazian : sys_send(args) calls sys_sendto(args, NULL, 0) * Tigran Aivazian : Made listen(2) backlog sanity checks * protocol-independent + * Benjamin LaHaise: real aio support. 
* * * This program is free software; you can redistribute it and/or @@ -105,6 +106,8 @@ static ssize_t sock_writev(struct file * unsigned long count, loff_t *ppos); static ssize_t sock_sendpage(struct file *file, struct page *page, int offset, size_t size, loff_t *ppos, int more); +static int sock_kvec_read(struct file *file, kvec_cb_t cb, size_t size, loff_t pos); +static int sock_kvec_write(struct file *file, kvec_cb_t cb, size_t size, loff_t pos); /* @@ -124,7 +127,11 @@ static struct file_operations socket_fil fasync: sock_fasync, readv: sock_readv, writev: sock_writev, - sendpage: sock_sendpage + sendpage: sock_sendpage, + aio_read: generic_sock_aio_read, + aio_write: generic_file_aio_write, + kvec_read: sock_kvec_read, + kvec_write: sock_kvec_write, }; /* @@ -544,13 +551,14 @@ int sock_recvmsg(struct socket *sock, st static ssize_t sock_read(struct file *file, char *ubuf, size_t size, loff_t *ppos) { + int read_flags = 0; struct socket *sock; struct iovec iov; struct msghdr msg; int flags; - if (ppos != &file->f_pos) - return -ESPIPE; + if (read_flags & ~F_ATOMIC) + return -EINVAL; if (size==0) /* Match SYS5 behaviour */ return 0; @@ -565,6 +573,8 @@ static ssize_t sock_read(struct file *fi iov.iov_base=ubuf; iov.iov_len=size; flags = !(file->f_flags & O_NONBLOCK) ? 0 : MSG_DONTWAIT; + if (read_flags & F_ATOMIC) + flags |= MSG_DONTWAIT; return sock_recvmsg(sock, &msg, size, flags); } @@ -578,12 +588,13 @@ static ssize_t sock_read(struct file *fi static ssize_t sock_write(struct file *file, const char *ubuf, size_t size, loff_t *ppos) { + int flags = 0; struct socket *sock; struct msghdr msg; struct iovec iov; - - if (ppos != &file->f_pos) - return -ESPIPE; + + if (flags & ~F_ATOMIC) + return -EINVAL; if(size==0) /* Match SYS5 behaviour */ return 0; @@ -596,6 +607,8 @@ static ssize_t sock_write(struct file *f msg.msg_control=NULL; msg.msg_controllen=0; msg.msg_flags=!(file->f_flags & O_NONBLOCK) ? 0 : MSG_DONTWAIT; + if (flags & F_ATOMIC) + msg.msg_flags = MSG_DONTWAIT; if (sock->type == SOCK_SEQPACKET) msg.msg_flags |= MSG_EOR; iov.iov_base=(void *)ubuf; @@ -622,6 +635,29 @@ ssize_t sock_sendpage(struct file *file, return sock->ops->sendpage(sock, page, offset, size, flags); } +static int sock_kvec_read(struct file *file, kvec_cb_t cb, size_t size, loff_t pos) +{ + struct socket *sock; + sock = socki_lookup(file->f_dentry->d_inode); + if ((int)size < 0 || (size_t)(int)size != size) + return -EINVAL; + if (sock->ops->kvec_read) + return sock->ops->kvec_read(sock, cb, size); + return -EOPNOTSUPP; +} + +static int sock_kvec_write(struct file *file, kvec_cb_t cb, size_t size, loff_t pos) +{ + struct socket *sock; + sock = socki_lookup(file->f_dentry->d_inode); + if ((int)size < 0 || (size_t)(int)size != size) + return -EINVAL; + if (sock->ops->kvec_write) + return sock->ops->kvec_write(sock, cb, size); + return -EOPNOTSUPP; +} + + int sock_readv_writev(int type, struct inode * inode, struct file * file, const struct iovec * iov, long count, long size) {