aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJens Axboe <axboe@kernel.dk>2024-03-21 18:41:13 -0600
committerJens Axboe <axboe@kernel.dk>2024-03-21 18:41:13 -0600
commitd04d3300211a7eff32c526a31cfbb7f9389143ad (patch)
treea6800202fff7f782e34fc905c68611051fa47099
parent27a21b0fe3ce761c3634ff24f32f231a91466bb9 (diff)
downloadliburing-d04d3300211a7eff32c526a31cfbb7f9389143ad.tar.gz
examples/proxy: handle each connection in a seperate thread
This means the main thread is just responsible for house keeping, like accepting new connections and printing stats. It also means that each thread will get its own ring, which is closer to how you would do this in the real world. Signed-off-by: Jens Axboe <axboe@kernel.dk>
-rw-r--r--examples/proxy.c751
-rw-r--r--examples/proxy.h38
2 files changed, 507 insertions, 282 deletions
diff --git a/examples/proxy.c b/examples/proxy.c
index 5181d62e..0c022909 100644
--- a/examples/proxy.c
+++ b/examples/proxy.c
@@ -36,6 +36,7 @@
#include <fcntl.h>
#include <stdint.h>
#include <netinet/in.h>
+#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
@@ -43,7 +44,11 @@
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>
+#include <sys/mman.h>
+#include <linux/mman.h>
+#include <locale.h>
#include <assert.h>
+#include <pthread.h>
#include <liburing.h>
#include "proxy.h"
@@ -73,7 +78,7 @@ static int sqpoll;
static int defer_tw = 1;
static int is_sink;
static int fixed_files = 1;
-static char *host = "192.168.2.6";
+static char *host = "192.168.3.2";
static int send_port = 4445;
static int receive_port = 4444;
static int buf_size = 32;
@@ -97,6 +102,9 @@ static int br_mask;
static int ring_size = 128;
+static pthread_mutex_t thread_lock;
+static struct timeval last_housekeeping;
+
/*
* For sendmsg/recvmsg. recvmsg just has a single vec, sendmsg will have
* two vecs - one that is currently submitted and being sent, and one that
@@ -139,9 +147,6 @@ struct conn_dir {
int rcv, rcv_shrt, rcv_enobufs, rcv_mshot;
int snd, snd_shrt, snd_enobufs, snd_busy, snd_mshot;
- int rcv_need_rearm;
- int rcv_rearm;
-
int snd_next_bid;
int rcv_next_bid;
@@ -158,11 +163,13 @@ struct conn_dir {
};
enum {
- CONN_F_DISCONNECTING = 1,
- CONN_F_DISCONNECTED = 2,
- CONN_F_PENDING_SHUTDOWN = 4,
- CONN_F_STATS_SHOWN = 8,
- CONN_F_END_TIME = 16,
+ CONN_F_STARTED = 1,
+ CONN_F_DISCONNECTING = 2,
+ CONN_F_DISCONNECTED = 4,
+ CONN_F_PENDING_SHUTDOWN = 8,
+ CONN_F_STATS_SHOWN = 16,
+ CONN_F_END_TIME = 32,
+ CONN_F_REAPED = 64,
};
/*
@@ -175,6 +182,8 @@ struct conn_buf_ring {
};
struct conn {
+ struct io_uring ring;
+
/* receive side buffer ring, new data arrives here */
struct conn_buf_ring in_br;
/* if send_ring is used, outgoing data to send */
@@ -193,18 +202,22 @@ struct conn {
struct sockaddr_in addr;
struct sockaddr_in6 addr6;
};
+
+ pthread_t thread;
+ pthread_barrier_t startup_barrier;
};
#define MAX_CONNS 1024
static struct conn conns[MAX_CONNS];
-#define vlog(str, ...) do { \
+#define vlog(str, ...) do { \
if (verbose) \
printf(str, ##__VA_ARGS__); \
} while (0)
static void prep_next_send(struct io_uring *ring, struct conn *c,
struct conn_dir *cd, int fd);
+static void *thread_main(void *data);
static struct conn *cqe_to_conn(struct io_uring_cqe *cqe)
{
@@ -255,6 +268,9 @@ enum {
__SHUTDOWN = 8,
__CANCEL = 9,
__CLOSE = 10,
+ __FD_PASS = 11,
+ __NOP = 12,
+ __STOP = 13,
};
struct error_handler {
@@ -307,7 +323,10 @@ static void free_buffer_ring(struct io_uring *ring, struct conn_buf_ring *cbr)
io_uring_free_buf_ring(ring, cbr->br, nr_bufs, cbr->bgid);
cbr->br = NULL;
- free(cbr->buf);
+ if (use_huge)
+ munmap(cbr->buf, buf_size * nr_bufs);
+ else
+ free(cbr->buf);
}
static void free_buffer_rings(struct io_uring *ring, struct conn *c)
@@ -332,15 +351,24 @@ static int setup_recv_ring(struct io_uring *ring, struct conn *c)
{
struct conn_buf_ring *cbr = &c->in_br;
int ret, i;
+ size_t len;
void *ptr;
- cbr->buf = NULL;
-
- if (posix_memalign(&cbr->buf, page_size, buf_size * nr_bufs)) {
- perror("posix memalign");
- return 1;
+ len = buf_size * nr_bufs;
+ if (use_huge) {
+ cbr->buf = mmap(NULL, len, PROT_READ|PROT_WRITE,
+ MAP_PRIVATE|MAP_HUGETLB|MAP_HUGE_2MB|MAP_ANONYMOUS,
+ -1, 0);
+ if (cbr->buf == MAP_FAILED) {
+ perror("mmap");
+ return 1;
+ }
+ } else {
+ if (posix_memalign(&cbr->buf, page_size, len)) {
+ perror("posix memalign");
+ return 1;
+ }
}
-
cbr->br = io_uring_setup_buf_ring(ring, nr_bufs, cbr->bgid, 0, &ret);
if (!cbr->br) {
fprintf(stderr, "Buffer ring register failed %d\n", ret);
@@ -380,12 +408,13 @@ static int setup_send_ring(struct io_uring *ring, struct conn *c)
}
/*
- * Setup an input and output buffer ring
+ * Setup an input and output buffer ring.
*/
static int setup_buffer_rings(struct io_uring *ring, struct conn *c)
{
int ret;
+ /* no locking needed on cur_bgid, parent serializes setup */
c->in_br.bgid = cur_bgid++;
c->out_br.bgid = cur_bgid++;
c->out_br.br = NULL;
@@ -393,13 +422,14 @@ static int setup_buffer_rings(struct io_uring *ring, struct conn *c)
ret = setup_recv_ring(ring, c);
if (ret)
return ret;
- if (is_sink || !send_ring)
+ if (is_sink)
return 0;
-
- ret = setup_send_ring(ring, c);
- if (ret) {
- free_buffer_ring(ring, &c->in_br);
- return ret;
+ if (send_ring) {
+ ret = setup_send_ring(ring, c);
+ if (ret) {
+ free_buffer_ring(ring, &c->in_br);
+ return ret;
+ }
}
return 0;
@@ -424,10 +454,13 @@ static void show_buckets(struct conn_dir *cd)
static void __show_stats(struct conn *c)
{
unsigned long msec, qps;
+ unsigned long bytes, bw;
struct conn_dir *cd;
int i;
- if (c->flags & CONN_F_STATS_SHOWN)
+ if (c->flags & (CONN_F_STATS_SHOWN | CONN_F_REAPED))
+ return;
+ if (!(c->flags & CONN_F_STARTED))
return;
if (!(c->flags & CONN_F_END_TIME))
@@ -449,12 +482,16 @@ static void __show_stats(struct conn *c)
printf("Conn %d/(in_fd=%d, out_fd=%d): qps=%lu, msec=%lu\n", c->tid,
c->in_fd, c->out_fd, qps, msec);
+ bytes = 0;
for (i = 0; i < 2; i++) {
cd = &c->cd[i];
if (!cd->in_bytes && !cd->out_bytes && !cd->snd && !cd->rcv)
continue;
+ bytes += cd->in_bytes;
+ bytes += cd->out_bytes;
+
printf("\t%3d: rcv=%u (short=%u, enobufs=%d), snd=%u (short=%u,"
" busy=%u, enobufs=%d)\n", i, cd->rcv, cd->rcv_shrt,
cd->rcv_enobufs, cd->snd, cd->snd_shrt, cd->snd_busy,
@@ -467,6 +504,12 @@ static void __show_stats(struct conn *c)
show_buckets(cd);
}
+ if (msec) {
+ bytes *= 8UL;
+ bw = bytes / 1000;
+ bw /= msec;
+ printf("\tBW=%'luMbit\n", bw);
+ }
c->flags |= CONN_F_STATS_SHOWN;
}
@@ -631,12 +674,9 @@ static void recv_enobufs(struct io_uring *ring, struct conn *c,
* needing a rearm for receive and send. The completing send will
* kick the recv rearm.
*/
- if (!is_sink) {
- cd->rcv_need_rearm = 1;
+ if (!is_sink)
prep_next_send(ring, c, cd, fd);
- } else {
- cd->rcv_rearm = 1;
- }
+ __submit_receive(ring, c, &c->cd[0], c->in_fd);
}
/*
@@ -724,11 +764,21 @@ static bool should_shutdown(struct conn *c)
return true;
}
+/*
+ * Close this connection - send a ring message to the connection with intent
+ * to stop. When the client gets the message, it will initiate the stop.
+ */
static void __close_conn(struct io_uring *ring, struct conn *c)
{
- printf("Client %d: queueing shutdown\n", c->tid);
+ struct io_uring_sqe *sqe;
+ uint64_t user_data;
+
+ printf("Client %d: queueing stop\n", c->tid);
- queue_cancel(ring, c);
+ user_data = __raw_encode(c->tid, __STOP, 0, 0);
+ sqe = io_uring_get_sqe(ring);
+ io_uring_prep_msg_ring(sqe, c->ring.ring_fd, 0, user_data, 0);
+ encode_userdata(sqe, c, __NOP, 0, 0);
io_uring_submit(ring);
}
@@ -819,20 +869,19 @@ static void free_msgs(struct conn_dir *cd)
*/
static int handle_accept(struct io_uring *ring, struct io_uring_cqe *cqe)
{
- struct io_uring_sqe *sqe;
struct conn *c;
- int domain, i;
+ int i;
if (nr_conns == MAX_CONNS) {
fprintf(stderr, "max clients reached %d\n", nr_conns);
return 1;
}
+ /* main thread handles this, which is obviously serialized */
c = &conns[nr_conns];
c->tid = nr_conns++;
- c->in_fd = cqe->res;
+ c->in_fd = -1;
c->out_fd = -1;
- gettimeofday(&c->start_time, NULL);
for (i = 0; i < 2; i++) {
struct conn_dir *cd = &c->cd[i];
@@ -848,55 +897,33 @@ static int handle_accept(struct io_uring *ring, struct io_uring_cqe *cqe)
}
printf("New client: id=%d, in=%d\n", c->tid, c->in_fd);
+ gettimeofday(&c->start_time, NULL);
- if (setup_buffer_rings(ring, c))
- return 1;
-
- if (is_sink) {
- submit_receive(ring, c);
- return 0;
- }
-
- if (ipv6)
- domain = AF_INET6;
- else
- domain = AF_INET;
+ pthread_barrier_init(&c->startup_barrier, NULL, 2);
+ pthread_create(&c->thread, NULL, thread_main, c);
/*
- * If fixed_files is set, proxy will use fixed files for any
- * new file descriptors it instantiates. Fixd files, or fixed
- * descriptors, are io_uring private file descriptors. They
- * cannot be accessed outside of io_uring. io_uring holds a
- * fixed reference to them, which means that we do not need to
- * grab per-request references to them. Particularly for
- * threaded applications, grabbing and dropping file references
- * for each operation can be costly as the file table is shared.
- * This generally shows up as fget/fput related overhead in
- * any workload profiles.
- *
- * Fixed descriptors are passed in via the 'fd' field just
- * like regular descriptors, and then marked as such by
- * setting the IOSQE_FIXED_FILE flag in the sqe->flags field.
- * Some helpers do that automatically, like the below, others
- * will need it set manually if they don't have a *direct*()
- * helper.
- *
- * For operations that instantiate them, like the opening of
- * a direct socket, the application may either ask the kernel
- * to find a free one (as is done below), or the application
- * may manage the space itself and pass in an index for a
- * currently free slot in the table. If the kernel is asked
- * to allocate a free direct descriptor, note that io_uring
- * does not abide by the POSIX mandated "lowest free must be
- * returned". It may return any free descriptor of its
- * choosing.
+ * Wait for thread to have its ring setup, then either assign the fd
+ * if it's non-fixed, or pass the fixed one
*/
- sqe = get_sqe(ring);
- if (fixed_files)
- io_uring_prep_socket_direct_alloc(sqe, domain, SOCK_STREAM, 0, 0);
- else
- io_uring_prep_socket(sqe, domain, SOCK_STREAM, 0, 0);
- encode_userdata(sqe, c, __SOCK, 0, 0);
+ pthread_barrier_wait(&c->startup_barrier);
+ if (!fixed_files) {
+ c->in_fd = cqe->res;
+ } else {
+ struct io_uring_sqe *sqe;
+ uint64_t user_data;
+
+ /*
+ * Ring has just been setup, we'll use index 0 as the descriptor
+ * value.
+ */
+ user_data = __raw_encode(c->tid, __FD_PASS, 0, 0);
+ sqe = io_uring_get_sqe(ring);
+ io_uring_prep_msg_ring_fd(sqe, c->ring.ring_fd, cqe->res, 0,
+ user_data, 0);
+ encode_userdata(sqe, c, __NOP, 0, cqe->res);
+ }
+
return 0;
}
@@ -957,7 +984,9 @@ static int handle_connect(struct io_uring *ring, struct io_uring_cqe *cqe)
{
struct conn *c = cqe_to_conn(cqe);
+ pthread_mutex_lock(&thread_lock);
open_conns++;
+ pthread_mutex_unlock(&thread_lock);
if (bidi)
submit_bidi_receive(ring, c);
@@ -1176,8 +1205,10 @@ start_close:
if (cd->rcv_bucket)
cd->rcv_bucket[nr_packets]++;
- ocd->out_buffers += nr_packets;
- assert(ocd->out_buffers <= nr_bufs);
+ if (!is_sink) {
+ ocd->out_buffers += nr_packets;
+ assert(ocd->out_buffers <= nr_bufs);
+ }
cd->rcv++;
cd->rcv_next_bid = bid;
@@ -1191,10 +1222,8 @@ start_close:
cd->pending_recv = 0;
if (recv_done_res(cqe->res))
goto start_close;
- if (!is_sink)
- cd->rcv_need_rearm = 1;
- else
- cd->rcv_rearm = 1;
+ if (is_sink)
+ __submit_receive(ring, c, &c->cd[0], c->in_fd);
}
/*
@@ -1203,8 +1232,8 @@ start_close:
* every buffer. We assume this is interactive mode, and hence don't
* delay anything.
*/
- if ((!ocd->pending_send && (bidi || (ocd->out_buffers >= nr_bufs / 2))) ||
- !(cqe->flags & IORING_CQE_F_MORE))
+ if (((!ocd->pending_send && (bidi || (ocd->out_buffers >= nr_bufs / 2))) ||
+ !(cqe->flags & IORING_CQE_F_MORE)) && !is_sink)
prep_next_send(ring, c, ocd, other_dir_fd(c, cqe_to_fd(cqe)));
if (!recv_done_res(cqe->res))
@@ -1496,14 +1525,12 @@ static int send_error(struct error_handler *err, struct io_uring *ring,
{
struct conn *c = cqe_to_conn(cqe);
struct conn_dir *cd = cqe_to_conn_dir(c, cqe);
- struct conn_dir *ocd = &c->cd[!cd->index];
cd->pending_send = 0;
if (cqe->res != -ENOBUFS)
return default_error(err, ring, cqe);
- ocd->rcv_rearm = 1;
cd->snd_enobufs++;
return 0;
}
@@ -1554,8 +1581,10 @@ static int handle_close(struct io_uring *ring, struct io_uring_cqe *cqe)
c->out_fd = -1;
if (c->in_fd == -1 && c->out_fd == -1) {
+ pthread_mutex_lock(&thread_lock);
__show_stats(c);
open_conns--;
+ pthread_mutex_unlock(&thread_lock);
free_buffer_rings(ring, c);
free_msgs(&c->cd[0]);
free_msgs(&c->cd[1]);
@@ -1585,6 +1614,78 @@ static int handle_cancel(struct io_uring *ring, struct io_uring_cqe *cqe)
return 0;
}
+static void open_socket(struct conn *c)
+{
+ if (is_sink) {
+ submit_receive(&c->ring, c);
+ } else {
+ struct io_uring_sqe *sqe;
+ int domain;
+
+ if (ipv6)
+ domain = AF_INET6;
+ else
+ domain = AF_INET;
+
+ /*
+ * If fixed_files is set, proxy will use fixed files for any new
+ * file descriptors it instantiates. Fixd files, or fixed
+ * descriptors, are io_uring private file descriptors. They
+ * cannot be accessed outside of io_uring. io_uring holds a
+ * fixed reference to them, which means that we do not need to
+ * grab per-request references to them. Particularly for
+ * threaded applications, grabbing and dropping file references
+ * for each operation can be costly as the file table is shared.
+ * This generally shows up as fget/fput related overhead in any
+ * workload profiles.
+ *
+ * Fixed descriptors are passed in via the 'fd' field just like
+ * regular descriptors, and then marked as such by setting the
+ * IOSQE_FIXED_FILE flag in the sqe->flags field. Some helpers
+ * do that automatically, like the below, others will need it
+ * set manually if they don't have a *direct*() helper.
+ *
+ * For operations that instantiate them, like the opening of a
+ * direct socket, the application may either ask the kernel to
+ * find a free one (as is done below), or the application may
+ * manage the space itself and pass in an index for a currently
+ * free slot in the table. If the kernel is asked to allocate a
+ * free direct descriptor, note that io_uring does not abide by
+ * the POSIX mandated "lowest free must be returned". It may
+ * return any free descriptor of its choosing.
+ */
+ sqe = get_sqe(&c->ring);
+ if (fixed_files)
+ io_uring_prep_socket_direct_alloc(sqe, domain, SOCK_STREAM, 0, 0);
+ else
+ io_uring_prep_socket(sqe, domain, SOCK_STREAM, 0, 0);
+ encode_userdata(sqe, c, __SOCK, 0, 0);
+ }
+}
+
+/*
+ * Start of connection, we got our in descriptor.
+ */
+static int handle_fd_pass(struct io_uring *ring, struct io_uring_cqe *cqe)
+{
+ struct conn *c = cqe_to_conn(cqe);
+ int fd = cqe_to_fd(cqe);
+
+ vlog("%d: got fd pass %d\n", c->tid, fd);
+ c->in_fd = fd;
+ open_socket(c);
+ return 0;
+}
+
+static int handle_stop(struct io_uring *ring, struct io_uring_cqe *cqe)
+{
+ struct conn *c = cqe_to_conn(cqe);
+
+ printf("Client %d: queueing shutdown\n", c->tid);
+ queue_cancel(&c->ring, c);
+ return 0;
+}
+
/*
* Called for each CQE that we receive. Decode the request type that it
* came from, and call the appropriate handler.
@@ -1633,6 +1734,15 @@ static int handle_cqe(struct io_uring *ring, struct io_uring_cqe *cqe)
case __CLOSE:
ret = handle_close(ring, cqe);
break;
+ case __FD_PASS:
+ ret = handle_fd_pass(ring, cqe);
+ break;
+ case __STOP:
+ ret = handle_stop(ring, cqe);
+ break;
+ case __NOP:
+ ret = 0;
+ break;
default:
fprintf(stderr, "bad user data %lx\n", (long) cqe->user_data);
return 1;
@@ -1643,35 +1753,32 @@ static int handle_cqe(struct io_uring *ring, struct io_uring_cqe *cqe)
static void house_keeping(struct io_uring *ring)
{
- struct conn_dir *cd;
+ static unsigned long last_bytes;
+ unsigned long bytes, elapsed;
struct conn *c;
int i, j;
vlog("House keeping entered\n");
+ bytes = 0;
for (i = 0; i < nr_conns; i++) {
c = &conns[i];
if (c->flags & CONN_F_DISCONNECTED) {
vlog("%d: disconnected\n", i);
+
+ if (!(c->flags & CONN_F_REAPED)) {
+ void *ret;
+
+ pthread_join(c->thread, &ret);
+ c->flags |= CONN_F_REAPED;
+ }
continue;
}
-
for (j = 0; j < 2; j++) {
- int in_fd;
-
- cd = &c->cd[j];
- if (!j)
- in_fd = c->in_fd;
- else
- in_fd = c->out_fd;
-
- if (cd->rcv_rearm) {
- vlog("%d: rcv rearm on %d\n", i, j);
- cd->rcv_rearm = 0;
- if (!cd->pending_recv)
- __submit_receive(ring, c, cd, in_fd);
- }
+ struct conn_dir *cd = &c->cd[j];
+
+ bytes += cd->in_bytes + cd->out_bytes;
}
if (c->flags & CONN_F_DISCONNECTING)
@@ -1682,37 +1789,33 @@ static void house_keeping(struct io_uring *ring)
c->flags |= CONN_F_DISCONNECTING;
}
}
+
+ elapsed = mtime_since_now(&last_housekeeping);
+ if (bytes && elapsed >= 900) {
+ unsigned long bw;
+
+ bw = (8 * (bytes - last_bytes) / 1000UL) / elapsed;
+ if (bw) {
+ if (open_conns)
+ printf("Bandwidth (threads=%d): %'luMbit\n", open_conns, bw);
+ gettimeofday(&last_housekeeping, NULL);
+ last_bytes = bytes;
+ }
+ }
}
/*
- * Main event loop, Submit our multishot accept request, and then just loop
- * around handling incoming events.
+ * Event loop shared between the parent, and the connections. Could be
+ * split in two, as they don't handle the same types of events. For the per
+ * connection loop, 'c' is valid. For the main loop, it's NULL.
*/
-static int event_loop(struct io_uring *ring, int fd)
+static int __event_loop(struct io_uring *ring, struct conn *c)
{
- struct __kernel_timespec active_ts, idle_ts = { .tv_sec = 1, };
- struct io_uring_sqe *sqe;
+ struct __kernel_timespec active_ts, idle_ts;
int flags;
- /*
- * proxy provides a way to use either multishot receive or not, but
- * for accept, we always use multishot. A multishot accept request
- * needs only be armed once, and then it'll trigger a completion and
- * post a CQE whenever a new connection is accepted. No need to do
- * anything else, unless the multishot accept terminates. This happens
- * if it encounters an error. Applications should check for
- * IORING_CQE_F_MORE in cqe->flags - this tells you if more completions
- * are expected from this request or not. Non-multishot never have
- * this set, where multishot will always have this set unless an error
- * occurs.
- */
- sqe = get_sqe(ring);
- if (fixed_files)
- io_uring_prep_multishot_accept_direct(sqe, fd, NULL, NULL, 0);
- else
- io_uring_prep_multishot_accept(sqe, fd, NULL, NULL, 0);
- __encode_userdata(sqe, 0, __ACCEPT, 0, fd);
-
+ idle_ts.tv_sec = 0;
+ idle_ts.tv_nsec = 100000000LL;
active_ts = idle_ts;
if (wait_usec > 1000000) {
active_ts.tv_sec = wait_usec / 1000000;
@@ -1720,6 +1823,8 @@ static int event_loop(struct io_uring *ring, int fd)
}
active_ts.tv_nsec = wait_usec * 1000;
+ gettimeofday(&last_housekeeping, NULL);
+
flags = 0;
while (1) {
struct __kernel_timespec *ts = &idle_ts;
@@ -1746,7 +1851,7 @@ static int event_loop(struct io_uring *ring, int fd)
to_wait = 1;
if (open_conns && !flags) {
ts = &active_ts;
- to_wait = open_conns * wait_batch;
+ to_wait = wait_batch;
}
vlog("Submit and wait for %d\n", to_wait);
@@ -1781,13 +1886,234 @@ static int event_loop(struct io_uring *ring, int fd)
events += i;
}
- house_keeping(ring);
event_loops++;
+ if (c) {
+ if (c->flags & CONN_F_DISCONNECTED)
+ break;
+ } else {
+ house_keeping(ring);
+ }
+ }
+
+ return 0;
+}
+
+/*
+ * Main event loop, Submit our multishot accept request, and then just loop
+ * around handling incoming connections.
+ */
+static int parent_loop(struct io_uring *ring, int fd)
+{
+ struct io_uring_sqe *sqe;
+
+ /*
+ * proxy provides a way to use either multishot receive or not, but
+ * for accept, we always use multishot. A multishot accept request
+ * needs only be armed once, and then it'll trigger a completion and
+ * post a CQE whenever a new connection is accepted. No need to do
+ * anything else, unless the multishot accept terminates. This happens
+ * if it encounters an error. Applications should check for
+ * IORING_CQE_F_MORE in cqe->flags - this tells you if more completions
+ * are expected from this request or not. Non-multishot never have
+ * this set, where multishot will always have this set unless an error
+ * occurs.
+ */
+ sqe = get_sqe(ring);
+ if (fixed_files)
+ io_uring_prep_multishot_accept_direct(sqe, fd, NULL, NULL, 0);
+ else
+ io_uring_prep_multishot_accept(sqe, fd, NULL, NULL, 0);
+ __encode_userdata(sqe, 0, __ACCEPT, 0, fd);
+
+ return __event_loop(ring, NULL);
+}
+
+static int init_ring(struct io_uring *ring, int nr_files)
+{
+ struct io_uring_params params;
+ int ret;
+
+ /*
+ * By default, set us up with a big CQ ring. Not strictly needed
+ * here, but it's very important to never overflow the CQ ring.
+ * Events will not be dropped if this happens, but it does slow
+ * the application down in dealing with overflown events.
+ *
+ * Set SINGLE_ISSUER, which tells the kernel that only one thread
+ * is doing IO submissions. This enables certain optimizations in
+ * the kernel.
+ */
+ memset(&params, 0, sizeof(params));
+ params.flags |= IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_CLAMP;
+ params.flags |= IORING_SETUP_CQSIZE;
+ params.cq_entries = 1024;
+
+ /*
+ * If use_huge is set, setup the ring with IORING_SETUP_NO_MMAP. This
+ * means that the application allocates the memory for the ring, and
+ * the kernel maps it. The alternative is having the kernel allocate
+ * the memory, and then liburing will mmap it. But we can't really
+ * support huge pages that way. If this fails, then ensure that the
+ * system has huge pages set aside upfront.
+ */
+ if (use_huge)
+ params.flags |= IORING_SETUP_NO_MMAP;
+
+ /*
+ * DEFER_TASKRUN decouples async event reaping and retrying from
+ * regular system calls. If this isn't set, then io_uring uses
+ * normal task_work for this. task_work is always being run on any
+ * exit to userspace. Real applications do more than just call IO
+ * related system calls, and hence we can be running this work way
+ * too often. Using DEFER_TASKRUN defers any task_work running to
+ * when the application enters the kernel anyway to wait on new
+ * events. It's generally the preferred and recommended way to setup
+ * a ring.
+ */
+ if (defer_tw) {
+ params.flags |= IORING_SETUP_DEFER_TASKRUN;
+ sqpoll = 0;
+ }
+
+ /*
+ * SQPOLL offloads any request submission and retry operations to a
+ * dedicated thread. This enables an application to do IO without
+ * ever having to enter the kernel itself. The SQPOLL thread will
+ * stay busy as long as there's work to do, and go to sleep if
+ * sq_thread_idle msecs have passed. If it's running, submitting new
+ * IO just needs to make them visible to the SQPOLL thread, it needs
+ * not enter the kernel. For submission, the application will only
+ * enter the kernel if the SQPOLL has been idle long enough that it
+ * has gone to sleep.
+ *
+ * Waiting on events still need to enter the kernel, if none are
+ * available. The application may also use io_uring_peek_cqe() to
+ * check for new events without entering the kernel, as completions
+ * will be continually produced to the CQ ring by the SQPOLL thread
+ * as they occur.
+ */
+ if (sqpoll) {
+ params.flags |= IORING_SETUP_SQPOLL;
+ params.sq_thread_idle = 1000;
+ defer_tw = 0;
+ }
+
+ /*
+ * If neither DEFER_TASKRUN or SQPOLL is used, set COOP_TASKRUN. This
+ * avoids heavy signal based notifications, which can force an
+ * application to enter the kernel and process it as soon as they
+ * occur.
+ */
+ if (!sqpoll && !defer_tw)
+ params.flags |= IORING_SETUP_COOP_TASKRUN;
+
+ /*
+ * The SQ ring size need not be larger than any batch of requests
+ * that need to be prepared before submit. Normally in a loop we'd
+ * only need a few, if any, particularly if multishot is used.
+ */
+ ret = io_uring_queue_init_params(ring_size, ring, &params);
+ if (ret) {
+ fprintf(stderr, "%s\n", strerror(-ret));
+ return 1;
+ }
+
+ /*
+ * If send serialization is available and no option was given to use
+ * it or not, default it to on. If it was turned on and the kernel
+ * doesn't support it, turn it off.
+ */
+ if (params.features & IORING_FEAT_SEND_BUF_SELECT) {
+ if (send_ring == -1)
+ send_ring = 1;
+ } else {
+ if (send_ring == 1) {
+ fprintf(stderr, "Kernel doesn't support ring provided "
+ "buffers for sends, disabled\n");
+ }
+ send_ring = 0;
+ }
+
+ if (!send_ring && snd_bundle) {
+ fprintf(stderr, "Can't use send bundle without send_ring\n");
+ snd_bundle = 0;
+ }
+
+ if (fixed_files) {
+ /*
+ * If fixed files are used, we need to allocate a fixed file
+ * table upfront where new direct descriptors can be managed.
+ */
+ ret = io_uring_register_files_sparse(ring, nr_files);
+ if (ret) {
+ fprintf(stderr, "file register: %d\n", ret);
+ return 1;
+ }
+
+ /*
+ * If fixed files are used, we also register the ring fd. See
+ * comment near io_uring_prep_socket_direct_alloc() further
+ * down. This avoids the fget/fput overhead associated with
+ * the io_uring_enter(2) system call itself, which is used to
+ * submit and wait on events.
+ */
+ ret = io_uring_register_ring_fd(ring);
+ if (ret != 1) {
+ fprintf(stderr, "ring register: %d\n", ret);
+ return 1;
+ }
+ }
+
+ if (napi) {
+ struct io_uring_napi n = {
+ .prefer_busy_poll = napi > 1 ? 1 : 0,
+ .busy_poll_to = napi_timeout,
+ };
+
+ ret = io_uring_register_napi(ring, &n);
+ if (ret) {
+ fprintf(stderr, "io_uring_register_napi: %d\n", ret);
+ if (ret != -EINVAL)
+ return 1;
+ fprintf(stderr, "NAPI not available, turned off\n");
+ }
}
return 0;
}
+static void *thread_main(void *data)
+{
+ struct conn *c = data;
+ int ret;
+
+ c->flags |= CONN_F_STARTED;
+
+ /* we need a max of 4 descriptors for each client */
+ ret = init_ring(&c->ring, 4);
+ if (ret)
+ goto done;
+
+ if (setup_buffer_rings(&c->ring, c))
+ goto done;
+
+ /*
+ * If we're using fixed files, then we need to wait for the parent
+ * to install the c->in_fd into our direct descriptor table. When
+ * that happens, we'll set things up. If we're not using fixed files,
+ * we can set up the receive or connect now.
+ */
+ if (!fixed_files)
+ open_socket(c);
+
+ /* we're ready */
+ pthread_barrier_wait(&c->startup_barrier);
+
+ __event_loop(&c->ring, c);
+done:
+ return NULL;
+}
+
static void usage(const char *name)
{
printf("%s:\n", name);
@@ -1818,24 +2144,27 @@ static void usage(const char *name)
printf("\t-V:\t\tIncrease verbosity (%d)\n", verbose);
}
-
/*
* Options parsing the ring / net setup
*/
int main(int argc, char *argv[])
{
struct io_uring ring;
- struct io_uring_params params;
struct sigaction sa = { };
- const char *optstring = "m:d:S:s:b:f:H:r:p:n:B:N:T:w:t:M:R:u:c:C:q:a:x:6Vh?";
+ const char *optstring;
int opt, ret, fd;
+ setlocale(LC_NUMERIC, "en_US");
+
page_size = sysconf(_SC_PAGESIZE);
if (page_size < 0) {
perror("sysconf(_SC_PAGESIZE)");
return 1;
}
+ pthread_mutex_init(&thread_lock, NULL);
+
+ optstring = "m:d:S:s:b:f:H:r:p:n:B:N:T:w:t:M:R:u:c:C:q:a:x:6Vh?";
while ((opt = getopt(argc, argv, optstring)) != -1) {
switch (opt) {
case 'm':
@@ -1964,151 +2293,9 @@ int main(int argc, char *argv[])
sa.sa_flags = SA_RESTART;
sigaction(SIGINT, &sa, NULL);
- /*
- * By default, set us up with a big CQ ring. Not strictly needed
- * here, but it's very important to never overflow the CQ ring.
- * Events will not be dropped if this happens, but it does slow
- * the application down in dealing with overflown events.
- *
- * Set SINGLE_ISSUER, which tells the kernel that only one thread
- * is doing IO submissions. This enables certain optimizations in
- * the kernel.
- */
- memset(&params, 0, sizeof(params));
- params.flags |= IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_CLAMP;
- params.flags |= IORING_SETUP_CQSIZE;
- params.cq_entries = 1024;
-
- /*
- * If use_huge is set, setup the ring with IORING_SETUP_NO_MMAP. This
- * means that the application allocates the memory for the ring, and
- * the kernel maps it. The alternative is having the kernel allocate
- * the memory, and then liburing will mmap it. But we can't really
- * support huge pages that way. If this fails, then ensure that the
- * system has huge pages set aside upfront.
- */
- if (use_huge)
- params.flags |= IORING_SETUP_NO_MMAP;
-
- /*
- * DEFER_TASKRUN decouples async event reaping and retrying from
- * regular system calls. If this isn't set, then io_uring uses
- * normal task_work for this. task_work is always being run on any
- * exit to userspace. Real applications do more than just call IO
- * related system calls, and hence we can be running this work way
- * too often. Using DEFER_TASKRUN defers any task_work running to
- * when the application enters the kernel anyway to wait on new
- * events. It's generally the preferred and recommended way to setup
- * a ring.
- */
- if (defer_tw) {
- params.flags |= IORING_SETUP_DEFER_TASKRUN;
- sqpoll = 0;
- }
-
- /*
- * SQPOLL offloads any request submission and retry operations to a
- * dedicated thread. This enables an application to do IO without
- * ever having to enter the kernel itself. The SQPOLL thread will
- * stay busy as long as there's work to do, and go to sleep if
- * sq_thread_idle msecs have passed. If it's running, submitting new
- * IO just needs to make them visible to the SQPOLL thread, it needs
- * not enter the kernel. For submission, the application will only
- * enter the kernel if the SQPOLL has been idle long enough that it
- * has gone to sleep.
- *
- * Waiting on events still need to enter the kernel, if none are
- * available. The application may also use io_uring_peek_cqe() to
- * check for new events without entering the kernel, as completions
- * will be continually produced to the CQ ring by the SQPOLL thread
- * as they occur.
- */
- if (sqpoll) {
- params.flags |= IORING_SETUP_SQPOLL;
- params.sq_thread_idle = 1000;
- defer_tw = 0;
- }
-
- /*
- * If neither DEFER_TASKRUN or SQPOLL is used, set COOP_TASKRUN. This
- * avoids heavy signal based notifications, which can force an
- * application to enter the kernel and process it as soon as they
- * occur.
- */
- if (!sqpoll && !defer_tw)
- params.flags |= IORING_SETUP_COOP_TASKRUN;
-
- /*
- * The SQ ring size need not be larger than any batch of requests
- * that need to be prepared before submit. Normally in a loop we'd
- * only need a few, if any, particularly if multishot is used.
- */
- ret = io_uring_queue_init_params(ring_size, &ring, &params);
- if (ret) {
- fprintf(stderr, "%s\n", strerror(-ret));
- return 1;
- }
-
- /*
- * If send serialization is available and no option was given to use
- * it or not, default it to on. If it was turned on and the kernel
- * doesn't support it, turn it off.
- */
- if (params.features & IORING_FEAT_SEND_BUF_SELECT) {
- if (send_ring == -1)
- send_ring = 1;
- } else {
- if (send_ring == 1) {
- fprintf(stderr, "Kernel doesn't support ring provided "
- "buffers for sends, disabled\n");
- }
- send_ring = 0;
- }
-
- if (!send_ring && snd_bundle) {
- fprintf(stderr, "Can't use send bundle without send_ring\n");
- snd_bundle = 0;
- }
-
- if (fixed_files) {
- /*
- * If fixed files are used, we need to allocate a fixed file
- * table upfront where new direct descriptors can be managed.
- */
- ret = io_uring_register_files_sparse(&ring, 4096);
- if (ret) {
- fprintf(stderr, "file register: %d\n", ret);
- return 1;
- }
-
- /*
- * If fixed files are used, we also register the ring fd. See
- * comment near io_uring_prep_socket_direct_alloc() further
- * down. This avoids the fget/fput overhead associated with
- * the io_uring_enter(2) system call itself, which is used to
- * submit and wait on events.
- */
- ret = io_uring_register_ring_fd(&ring);
- if (ret != 1) {
- fprintf(stderr, "ring register: %d\n", ret);
- return 1;
- }
- }
-
- if (napi) {
- struct io_uring_napi n = {
- .prefer_busy_poll = napi > 1 ? 1 : 0,
- .busy_poll_to = napi_timeout,
- };
-
- ret = io_uring_register_napi(&ring, &n);
- if (ret) {
- fprintf(stderr, "io_uring_register_napi: %d\n", ret);
- if (ret != -EINVAL)
- return 1;
- fprintf(stderr, "NAPI not available, turned off\n");
- }
- }
+ ret = init_ring(&ring, MAX_CONNS * 3);
+ if (ret)
+ return ret;
printf("Backend: sqpoll=%d, defer_tw=%d, fixed_files=%d "
"is_sink=%d, buf_size=%d, nr_bufs=%d, host=%s, send_port=%d "
@@ -2121,5 +2308,5 @@ int main(int argc, char *argv[])
printf(" send options: sendmsg=%d, send_ring=%d, send_bundle=%d\n",
snd_msg, send_ring, snd_bundle);
- return event_loop(&ring, fd);
+ return parent_loop(&ring, fd);
}
diff --git a/examples/proxy.h b/examples/proxy.h
index 1778b396..3fa187bf 100644
--- a/examples/proxy.h
+++ b/examples/proxy.h
@@ -2,6 +2,8 @@
#ifndef LIBURING_PROXY_H
#define LIBURING_PROXY_H
+#include <sys/time.h>
+
/*
* Generic opcode agnostic encoding to sqe/cqe->user_data
*/
@@ -40,6 +42,17 @@ static inline void __encode_userdata(struct io_uring_sqe *sqe, int tid, int op,
io_uring_sqe_set_data64(sqe, ud.val);
}
+static inline uint64_t __raw_encode(int tid, int op, int bid, int fd)
+{
+ struct userdata ud = {
+ .op_tid = (op << OP_SHIFT) | tid,
+ .bid = bid,
+ .fd = fd
+ };
+
+ return ud.val;
+}
+
static inline int cqe_to_op(struct io_uring_cqe *cqe)
{
struct userdata ud = { .val = cqe->user_data };
@@ -61,4 +74,29 @@ static inline int cqe_to_fd(struct io_uring_cqe *cqe)
return ud.fd;
}
+static unsigned long long mtime_since(const struct timeval *s,
+ const struct timeval *e)
+{
+ long long sec, usec;
+
+ sec = e->tv_sec - s->tv_sec;
+ usec = (e->tv_usec - s->tv_usec);
+ if (sec > 0 && usec < 0) {
+ sec--;
+ usec += 1000000;
+ }
+
+ sec *= 1000;
+ usec /= 1000;
+ return sec + usec;
+}
+
+static unsigned long long mtime_since_now(struct timeval *tv)
+{
+ struct timeval end;
+
+ gettimeofday(&end, NULL);
+ return mtime_since(tv, &end);
+}
+
#endif