aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJens Axboe <axboe@kernel.dk>2024-03-21 19:38:10 -0600
committerJens Axboe <axboe@kernel.dk>2024-03-21 19:40:18 -0600
commit3b0765e207447892cf2f543046935b77b1a38f3d (patch)
treed389b2a1bc90b3ccbf32974e9209fa1e8b210891
parentd04d3300211a7eff32c526a31cfbb7f9389143ad (diff)
downloadliburing-3b0765e207447892cf2f543046935b77b1a38f3d.tar.gz
examples/proxy: add zerocopy send support
This works with both -M1 (sendmsg) and -M0 (send), though at least the non-sendmsg variant seems pretty much slower than expected. Need to look into that! Very lightly tested only... Signed-off-by: Jens Axboe <axboe@kernel.dk>
-rw-r--r--examples/proxy.c91
1 files changed, 79 insertions, 12 deletions
diff --git a/examples/proxy.c b/examples/proxy.c
index 0c022909..b814d409 100644
--- a/examples/proxy.c
+++ b/examples/proxy.c
@@ -90,6 +90,7 @@ static int wait_batch = 1;
static int wait_usec = 1000000;
static int rcv_msg;
static int snd_msg;
+static int snd_zc;
static int send_ring = -1;
static int snd_bundle;
static int rcv_bundle;
@@ -407,6 +408,33 @@ static int setup_send_ring(struct io_uring *ring, struct conn *c)
return 0;
}
+static int setup_send_zc(struct io_uring *ring, struct conn *c)
+{
+ struct iovec *iovs;
+ void *buf;
+ int i, ret;
+
+ if (snd_msg)
+ return 0;
+
+ buf = c->in_br.buf;
+ iovs = calloc(nr_bufs, sizeof(struct iovec));
+ for (i = 0; i < nr_bufs; i++) {
+ iovs[i].iov_base = buf;
+ iovs[i].iov_len = buf_size;
+ buf += buf_size;
+ }
+
+ ret = io_uring_register_buffers(ring, iovs, nr_bufs);
+ if (ret) {
+ fprintf(stderr, "failed registering buffers: %d\n", ret);
+ free(iovs);
+ return ret;
+ }
+ free(iovs);
+ return 0;
+}
+
/*
* Setup an input and output buffer ring.
*/
@@ -424,6 +452,11 @@ static int setup_buffer_rings(struct io_uring *ring, struct conn *c)
return ret;
if (is_sink)
return 0;
+ if (snd_zc) {
+ ret = setup_send_zc(ring, c);
+ if (ret)
+ return ret;
+ }
if (send_ring) {
ret = setup_send_ring(ring, c);
if (ret) {
@@ -674,9 +707,12 @@ static void recv_enobufs(struct io_uring *ring, struct conn *c,
* needing a rearm for receive and send. The completing send will
* kick the recv rearm.
*/
- if (!is_sink)
- prep_next_send(ring, c, cd, fd);
- __submit_receive(ring, c, &c->cd[0], c->in_fd);
+ if (!is_sink) {
+ if (!cd->pending_send)
+ prep_next_send(ring, c, cd, fd);
+ } else {
+ __submit_receive(ring, c, &c->cd[0], c->in_fd);
+ }
}
/*
@@ -1279,11 +1315,18 @@ static void submit_send(struct io_uring *ring, struct conn *c,
if (snd_msg) {
struct io_msg *imsg = &cd->io_snd_msg;
- io_uring_prep_sendmsg(sqe, fd, &imsg->msg, MSG_WAITALL|MSG_NOSIGNAL);
+ if (snd_zc)
+ io_uring_prep_sendmsg_zc(sqe, fd, &imsg->msg, MSG_WAITALL|MSG_NOSIGNAL);
+ else
+ io_uring_prep_sendmsg(sqe, fd, &imsg->msg, MSG_WAITALL|MSG_NOSIGNAL);
} else if (send_ring) {
io_uring_prep_send(sqe, fd, NULL, 0, MSG_WAITALL|MSG_NOSIGNAL);
- } else {
+ } else if (!snd_zc) {
io_uring_prep_send(sqe, fd, data, len, MSG_WAITALL|MSG_NOSIGNAL);
+ } else {
+ io_uring_prep_send_zc(sqe, fd, data, len, MSG_WAITALL, 0);
+ sqe->ioprio |= IORING_RECVSEND_FIXED_BUF;
+ sqe->buf_index = bid;
}
encode_userdata(sqe, c, __SEND, bid, fd);
if (fixed_files)
@@ -1457,6 +1500,9 @@ static int __handle_send(struct io_uring *ring, struct conn *c,
bid = cqe_to_bid(cqe);
}
+ if (cqe->flags & IORING_CQE_F_NOTIF)
+ goto out;
+
if (cqe->res && cqe->res < buf_size)
cd->snd_shrt++;
@@ -1486,15 +1532,28 @@ static int __handle_send(struct io_uring *ring, struct conn *c,
assert(cd->out_buffers >= 0);
cd->snd++;
+out:
+ if (!(cqe->flags & IORING_CQE_F_MORE)) {
+ int do_recv_arm = 1;
- ocd = &c->cd[!cd->index];
- if (!ocd->pending_recv) {
- int fd = other_dir_fd(c, cqe_to_fd(cqe));
+ /*
+ * If we're serializing sends, finish the batch before
+ * arming a new receive.
+ */
+ if (!snd_msg) {
+ struct io_msg *imsg = &cd->io_snd_msg;
+ struct msg_vec *mvec = msg_vec(imsg);
- __submit_receive(ring, c, ocd, fd);
- }
+ if (mvec->cur_iov + 1 != mvec->iov_len)
+ do_recv_arm = 0;
+ }
+ ocd = &c->cd[!cd->index];
+ if (do_recv_arm && !ocd->pending_recv) {
+ int fd = other_dir_fd(c, cqe_to_fd(cqe));
+
+ __submit_receive(ring, c, ocd, fd);
+ }
- if (!(cqe->flags & IORING_CQE_F_MORE)) {
cd->pending_send = 0;
/*
@@ -2137,6 +2196,7 @@ static void usage(const char *name)
printf("\t-n:\t\tNumber of provided buffers (pow2) (%d)\n", nr_bufs);
printf("\t-u:\t\tUse provided buffers for send (%d)\n", send_ring);
printf("\t-C:\t\tUse bundles for send (%d)\n", snd_bundle);
+ printf("\t-z:\t\tUse zerocopy send (%d)\n", snd_zc);
printf("\t-c:\t\tUse bundles for recv (%d)\n", snd_bundle);
printf("\t-M:\t\tUse sendmsg (%d)\n", snd_msg);
printf("\t-M:\t\tUse recvmsg (%d)\n", rcv_msg);
@@ -2164,7 +2224,7 @@ int main(int argc, char *argv[])
pthread_mutex_init(&thread_lock, NULL);
- optstring = "m:d:S:s:b:f:H:r:p:n:B:N:T:w:t:M:R:u:c:C:q:a:x:6Vh?";
+ optstring = "m:d:S:s:b:f:H:r:p:n:B:N:T:w:t:M:R:u:c:C:q:a:x:z:6Vh?";
while ((opt = getopt(argc, argv, optstring)) != -1) {
switch (opt) {
case 'm':
@@ -2227,6 +2287,9 @@ int main(int argc, char *argv[])
case 'M':
snd_msg = !!atoi(optarg);
break;
+ case 'z':
+ snd_zc = !!atoi(optarg);
+ break;
case 'R':
rcv_msg = !!atoi(optarg);
break;
@@ -2269,6 +2332,10 @@ int main(int argc, char *argv[])
fprintf(stderr, "Can't use send ring sendmsg\n");
snd_msg = 0;
}
+ if (snd_zc && (send_ring || snd_bundle)) {
+ fprintf(stderr, "Can't use send zc with bundles or ring\n");
+ send_ring = snd_bundle = 0;
+ }
/*
* For recvmsg w/multishot, we waste some data at the head of the
* packet every time. Adjust the buffer size to account for that,