diff options
author | Jens Axboe <axboe@kernel.dk> | 2024-03-21 19:38:10 -0600 |
---|---|---|
committer | Jens Axboe <axboe@kernel.dk> | 2024-03-21 19:40:18 -0600 |
commit | 3b0765e207447892cf2f543046935b77b1a38f3d (patch) | |
tree | d389b2a1bc90b3ccbf32974e9209fa1e8b210891 | |
parent | d04d3300211a7eff32c526a31cfbb7f9389143ad (diff) | |
download | liburing-3b0765e207447892cf2f543046935b77b1a38f3d.tar.gz |
examples/proxy: add zerocopy send support
This works with both -M1 (sendmsg) and -M0 (send), though the
non-sendmsg variant, at least, seems considerably slower than expected.
Need to look into that!
Only very lightly tested...
Signed-off-by: Jens Axboe <axboe@kernel.dk>
-rw-r--r-- | examples/proxy.c | 91 |
1 files changed, 79 insertions, 12 deletions
diff --git a/examples/proxy.c b/examples/proxy.c index 0c022909..b814d409 100644 --- a/examples/proxy.c +++ b/examples/proxy.c @@ -90,6 +90,7 @@ static int wait_batch = 1; static int wait_usec = 1000000; static int rcv_msg; static int snd_msg; +static int snd_zc; static int send_ring = -1; static int snd_bundle; static int rcv_bundle; @@ -407,6 +408,33 @@ static int setup_send_ring(struct io_uring *ring, struct conn *c) return 0; } +static int setup_send_zc(struct io_uring *ring, struct conn *c) +{ + struct iovec *iovs; + void *buf; + int i, ret; + + if (snd_msg) + return 0; + + buf = c->in_br.buf; + iovs = calloc(nr_bufs, sizeof(struct iovec)); + for (i = 0; i < nr_bufs; i++) { + iovs[i].iov_base = buf; + iovs[i].iov_len = buf_size; + buf += buf_size; + } + + ret = io_uring_register_buffers(ring, iovs, nr_bufs); + if (ret) { + fprintf(stderr, "failed registering buffers: %d\n", ret); + free(iovs); + return ret; + } + free(iovs); + return 0; +} + /* * Setup an input and output buffer ring. */ @@ -424,6 +452,11 @@ static int setup_buffer_rings(struct io_uring *ring, struct conn *c) return ret; if (is_sink) return 0; + if (snd_zc) { + ret = setup_send_zc(ring, c); + if (ret) + return ret; + } if (send_ring) { ret = setup_send_ring(ring, c); if (ret) { @@ -674,9 +707,12 @@ static void recv_enobufs(struct io_uring *ring, struct conn *c, * needing a rearm for receive and send. The completing send will * kick the recv rearm. 
*/ - if (!is_sink) - prep_next_send(ring, c, cd, fd); - __submit_receive(ring, c, &c->cd[0], c->in_fd); + if (!is_sink) { + if (!cd->pending_send) + prep_next_send(ring, c, cd, fd); + } else { + __submit_receive(ring, c, &c->cd[0], c->in_fd); + } } /* @@ -1279,11 +1315,18 @@ static void submit_send(struct io_uring *ring, struct conn *c, if (snd_msg) { struct io_msg *imsg = &cd->io_snd_msg; - io_uring_prep_sendmsg(sqe, fd, &imsg->msg, MSG_WAITALL|MSG_NOSIGNAL); + if (snd_zc) + io_uring_prep_sendmsg_zc(sqe, fd, &imsg->msg, MSG_WAITALL|MSG_NOSIGNAL); + else + io_uring_prep_sendmsg(sqe, fd, &imsg->msg, MSG_WAITALL|MSG_NOSIGNAL); } else if (send_ring) { io_uring_prep_send(sqe, fd, NULL, 0, MSG_WAITALL|MSG_NOSIGNAL); - } else { + } else if (!snd_zc) { io_uring_prep_send(sqe, fd, data, len, MSG_WAITALL|MSG_NOSIGNAL); + } else { + io_uring_prep_send_zc(sqe, fd, data, len, MSG_WAITALL, 0); + sqe->ioprio |= IORING_RECVSEND_FIXED_BUF; + sqe->buf_index = bid; } encode_userdata(sqe, c, __SEND, bid, fd); if (fixed_files) @@ -1457,6 +1500,9 @@ static int __handle_send(struct io_uring *ring, struct conn *c, bid = cqe_to_bid(cqe); } + if (cqe->flags & IORING_CQE_F_NOTIF) + goto out; + if (cqe->res && cqe->res < buf_size) cd->snd_shrt++; @@ -1486,15 +1532,28 @@ static int __handle_send(struct io_uring *ring, struct conn *c, assert(cd->out_buffers >= 0); cd->snd++; +out: + if (!(cqe->flags & IORING_CQE_F_MORE)) { + int do_recv_arm = 1; - ocd = &c->cd[!cd->index]; - if (!ocd->pending_recv) { - int fd = other_dir_fd(c, cqe_to_fd(cqe)); + /* + * If we're serializing sends, finish the batch before + * arming a new receive. 
+ */ + if (!snd_msg) { + struct io_msg *imsg = &cd->io_snd_msg; + struct msg_vec *mvec = msg_vec(imsg); - __submit_receive(ring, c, ocd, fd); - } + if (mvec->cur_iov + 1 != mvec->iov_len) + do_recv_arm = 0; + } + ocd = &c->cd[!cd->index]; + if (do_recv_arm && !ocd->pending_recv) { + int fd = other_dir_fd(c, cqe_to_fd(cqe)); + + __submit_receive(ring, c, ocd, fd); + } - if (!(cqe->flags & IORING_CQE_F_MORE)) { cd->pending_send = 0; /* @@ -2137,6 +2196,7 @@ static void usage(const char *name) printf("\t-n:\t\tNumber of provided buffers (pow2) (%d)\n", nr_bufs); printf("\t-u:\t\tUse provided buffers for send (%d)\n", send_ring); printf("\t-C:\t\tUse bundles for send (%d)\n", snd_bundle); + printf("\t-z:\t\tUse zerocopy send (%d)\n", snd_zc); printf("\t-c:\t\tUse bundles for recv (%d)\n", snd_bundle); printf("\t-M:\t\tUse sendmsg (%d)\n", snd_msg); printf("\t-M:\t\tUse recvmsg (%d)\n", rcv_msg); @@ -2164,7 +2224,7 @@ int main(int argc, char *argv[]) pthread_mutex_init(&thread_lock, NULL); - optstring = "m:d:S:s:b:f:H:r:p:n:B:N:T:w:t:M:R:u:c:C:q:a:x:6Vh?"; + optstring = "m:d:S:s:b:f:H:r:p:n:B:N:T:w:t:M:R:u:c:C:q:a:x:z:6Vh?"; while ((opt = getopt(argc, argv, optstring)) != -1) { switch (opt) { case 'm': @@ -2227,6 +2287,9 @@ int main(int argc, char *argv[]) case 'M': snd_msg = !!atoi(optarg); break; + case 'z': + snd_zc = !!atoi(optarg); + break; case 'R': rcv_msg = !!atoi(optarg); break; @@ -2269,6 +2332,10 @@ int main(int argc, char *argv[]) fprintf(stderr, "Can't use send ring sendmsg\n"); snd_msg = 0; } + if (snd_zc && (send_ring || snd_bundle)) { + fprintf(stderr, "Can't use send zc with bundles or ring\n"); + send_ring = snd_bundle = 0; + } /* * For recvmsg w/multishot, we waste some data at the head of the * packet every time. Adjust the buffer size to account for that, |