aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJens Axboe <axboe@kernel.dk>2024-02-17 15:53:10 -0700
committerJens Axboe <axboe@kernel.dk>2024-02-17 15:53:10 -0700
commitaf3642a605c23b6cec751d8a5452d334e840779a (patch)
treeb93ac9c0d9e774e69e8b5ea88206171ba56ce3cb
parentc6e5f378e37207853a2d1c87870c0f8ec4497fd1 (diff)
downloadliburing-af3642a605c23b6cec751d8a5452d334e840779a.tar.gz
examples/proxy: thorough cleaning and bug fixes
Generally not a huge fan of big changes like this, but the initial commit was mostly done early so that someone else could have easy access to it. This makes it work for a variety of more cases, and serializes sends just in case we get them reordered on the kernel side. It still works the same as before, it just works better now. Signed-off-by: Jens Axboe <axboe@kernel.dk>
-rw-r--r--examples/list.h61
-rw-r--r--examples/proxy.c589
2 files changed, 547 insertions, 103 deletions
diff --git a/examples/list.h b/examples/list.h
new file mode 100644
index 00000000..08c52882
--- /dev/null
+++ b/examples/list.h
@@ -0,0 +1,61 @@
+#ifndef LIBURING_EX_LIST_H
+#define LIBURING_EX_LIST_H
+
+struct list_head {
+ struct list_head *prev, *next;
+};
+
+#ifndef offsetof
+#define offsetof(TYPE, FIELD) ((size_t) &((TYPE *)0)->FIELD)
+#endif
+
+#ifndef container_of
+#define container_of(PTR, TYPE, FIELD) ({ \
+ __typeof__(((TYPE *)0)->FIELD) *__FIELD_PTR = (PTR); \
+ (TYPE *)((char *) __FIELD_PTR - offsetof(TYPE, FIELD)); \
+})
+#endif
+
+static inline void init_list_head(struct list_head *list)
+{
+ list->next = list;
+ list->prev = list;
+}
+
+static inline void __list_add(struct list_head *new, struct list_head *prev,
+ struct list_head *next)
+{
+ next->prev = new;
+ new->next = next;
+ new->prev = prev;
+ prev->next = new;
+}
+
+static inline void list_add_tail(struct list_head *new, struct list_head *head)
+{
+ __list_add(new, head->prev, head);
+}
+
+static inline void __list_del(struct list_head *prev, struct list_head *next)
+{
+ next->prev = prev;
+ prev->next = next;
+}
+
+static inline void list_del(struct list_head *entry)
+{
+ __list_del(entry->prev, entry->next);
+}
+
+static inline bool list_empty(struct list_head *list)
+{
+ return list->next == list;
+}
+
+#define list_entry(ptr, type, member) \
+ container_of(ptr, type, member)
+
+#define list_first_entry(ptr, type, member) \
+ list_entry((ptr)->next, type, member)
+
+#endif
diff --git a/examples/proxy.c b/examples/proxy.c
index 17514a64..93f9916f 100644
--- a/examples/proxy.c
+++ b/examples/proxy.c
@@ -1,3 +1,4 @@
+/* SPDX-License-Identifier: MIT */
/*
* Sample program that can act either as a packet sink, where it just receives
* packets and doesn't do anything with them, or it can act as a proxy where it
@@ -23,7 +24,7 @@
*
* Run with -h to see a list of options, and their defaults.
*
- * (C) Jens Axboe <axboe@kernel.dk> 2024
+ * (C) 2024 Jens Axboe <axboe@kernel.dk>
*
*/
#include <fcntl.h>
@@ -38,34 +39,39 @@
#include <unistd.h>
#include <liburing.h>
+#include "list.h"
+
/*
- * Upper 8 bits is the command type, next 16 bits is the bid, next 16 bits is
- * the bgid, bottom 8 bits is the connection id
+ * Goes from accept new connection -> create socket, connect to end
+ * point, prepare recv, on receive do send (unless sink). If either ends
+ * disconnects, we transition to shutdown.
*/
-#define OP_SHIFT (56ULL)
-#define OP_MASK ((1ULL << OP_SHIFT) - 1)
-#define BID_SHIFT (40ULL)
-#define BID_MASK ((1ULL << 16) - 1)
-#define BGID_SHIFT (24ULL)
-#define BGID_MASK ((1ULL << 16) - 1)
-
-#define __ACCEPT 1ULL
-#define __SOCK 2ULL
-#define __CONNECT 3ULL
-#define __RECV 4ULL
-#define __RECV_OUT 5ULL
-#define __SEND 6ULL
+enum {
+ __ACCEPT = 0,
+ __SOCK = 1,
+ __CONNECT = 2,
+ __RECV = 3,
+ __SEND = 4,
+ __SHUTDOWN = 5,
+};
/*
- * Goes from accept new connection -> create socket, connect to end
- * point, prepare recv, on receive do send.
+ * Generic opcode agnostic encoding to sqe/cqe->user_data
*/
-#define ACCEPT_DATA (__ACCEPT << OP_SHIFT)
-#define SOCK_DATA (__SOCK << OP_SHIFT)
-#define CONNECT_DATA (__CONNECT << OP_SHIFT)
-#define RECV_DATA (__RECV << OP_SHIFT)
-#define RECV_OUT_DATA (__RECV_OUT << OP_SHIFT)
-#define SEND_DATA (__SEND << OP_SHIFT)
+struct userdata {
+ union {
+ struct {
+ uint16_t op_tid; /* 3 bits op, 13 bits tid */
+ uint16_t bgid;
+ uint16_t bid;
+ uint16_t fd;
+ };
+ uint64_t val;
+ };
+};
+
+#define OP_SHIFT (13)
+#define TID_MASK ((1U << 13) - 1)
static int start_bgid = 1;
@@ -76,7 +82,6 @@ static int mshot = 1;
static int sqpoll;
static int defer_tw = 1;
static int is_sink;
-static int stats_shown;
static int fixed_files;
static char *host = "192.168.2.6";
static int send_port = 4445;
@@ -96,22 +101,57 @@ struct conn_buf_ring {
int bgid;
};
+struct pending_send {
+ struct list_head list;
+
+ int fd, bgid, bid, len;
+ void *data;
+};
+
+/*
+ * Per socket stats per connection. For bi-directional, we'll have both
+ * sends and receives on each socket, this helps track them separately.
+ * For sink or one directional, each of the two stats will be only sends
+ * or receives, not both.
+ */
+struct conn_dir {
+ int pending_shutdown;
+ int pending_sends;
+ struct list_head send_list;
+
+ int rcv, rcv_shrt;
+ int snd, snd_shrt;
+ int snd_busy;
+
+ unsigned long in_bytes, out_bytes;
+
+ int bgid_switch;
+ int mshot_resubmit;
+};
+
+enum {
+ CONN_F_DISCONNECTING = 1,
+ CONN_F_DISCONNECTED = 2,
+};
+
struct conn {
struct conn_buf_ring brs[NR_BUF_RINGS];
+ struct conn_buf_ring *cur_br;
int tid;
+ int in_fd, out_fd;
int start_bgid;
int cur_br_index;
- struct conn_buf_ring *cur_br;
- int in_fd, out_fd;
+ unsigned long rps;
- struct sockaddr_in addr;
+ struct conn_dir cd[2];
- int rcv, snd, shrt, bgid_switch, mshot_resubmit;
+ int flags;
- unsigned long rps;
- unsigned long bytes;
+ int stats_shown;
+
+ struct sockaddr_in addr;
};
static struct conn conns[MAX_CONNS];
@@ -119,7 +159,7 @@ static struct conn conns[MAX_CONNS];
static int setup_listening_socket(int port)
{
struct sockaddr_in srv_addr;
- int fd, enable;
+ int fd, enable, ret;
fd = socket(PF_INET, SOCK_STREAM, 0);
if (fd == -1) {
@@ -128,7 +168,8 @@ static int setup_listening_socket(int port)
}
enable = 1;
- if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) {
+ ret = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
+ if (ret < 0) {
perror("setsockopt(SO_REUSEADDR)");
return -1;
}
@@ -138,7 +179,8 @@ static int setup_listening_socket(int port)
srv_addr.sin_port = htons(port);
srv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
- if (bind(fd, (const struct sockaddr *)&srv_addr, sizeof(srv_addr)) < 0) {
+ ret = bind(fd, (const struct sockaddr *)&srv_addr, sizeof(srv_addr));
+ if (ret < 0) {
perror("bind()");
return -1;
}
@@ -151,6 +193,12 @@ static int setup_listening_socket(int port)
return fd;
}
+/*
+ * Setup 2 ring provided buffer rings for each connection. If we get -ENOBUFS
+ * on receive, we'll switch to the other ring and re-arm. If this happens
+ * frequently (see switch= stat), then the ring sizes are likely too small.
+ * Use -nXX to make them bigger.
+ */
static int setup_buffer_ring(struct io_uring *ring, struct conn *c, int index)
{
struct conn_buf_ring *cbr = &c->brs[index];
@@ -201,14 +249,53 @@ static int setup_buffer_rings(struct io_uring *ring, struct conn *c)
return 0;
}
-static void show_stats(void)
+static void free_buffer_rings(struct io_uring *ring, struct conn *c)
{
int i;
- if (stats_shown)
+ for (i = 0; i < NR_BUF_RINGS; i++) {
+ struct conn_buf_ring *cbr = &c->brs[i];
+
+ io_uring_free_buf_ring(ring, cbr->br, nr_bufs, cbr->bgid);
+ free(cbr->buf);
+ }
+
+ c->cur_br = NULL;
+}
+
+static void __show_stats(struct conn *c)
+{
+ struct conn_dir *cd;
+ int i;
+
+ if (c->stats_shown)
return;
- stats_shown = 1;
+ printf("Conn %d/(in_fd=%d, out_fd=%d): rps=%lu\n", c->tid, c->in_fd,
+ c->out_fd, c->rps);
+
+ for (i = 0; i < 2; i++) {
+ cd = &c->cd[i];
+
+ if (!cd->in_bytes && !cd->out_bytes)
+ continue;
+
+ printf("\t%3d: rcv=%u (short=%u), snd=%u (short=%u, busy=%u)\n",
+ i, cd->rcv, cd->rcv_shrt, cd->snd, cd->snd_shrt,
+ cd->snd_busy);
+ printf("\t : switch=%u, mshot_resubmit=%d\n",
+ cd->bgid_switch, cd->mshot_resubmit);
+ printf("\t : in_bytes=%lu (Kb %lu), out_bytes=%lu (Kb %lu)\n",
+ cd->in_bytes, cd->in_bytes >> 10,
+ cd->out_bytes, cd->out_bytes >> 10);
+ }
+
+ c->stats_shown = 1;
+}
+
+static void show_stats(void)
+{
+ int i;
for (i = 0; i < MAX_CONNS; i++) {
struct conn *c = &conns[i];
@@ -216,7 +303,7 @@ static void show_stats(void)
if (!c->rps)
continue;
- printf("Conn %d/(in_fd=%d, out_fd=%d): rps=%lu (rcv=%u, snd=%u, switch=%u, mshot_resubmit=%d, short=%d), kb=%lu\n", c->tid, c->in_fd, c->out_fd, c->rps, c->rcv, c->snd, c->bgid_switch, c->mshot_resubmit, c->shrt, c->bytes >> 10);
+ __show_stats(c);
}
}
@@ -250,17 +337,82 @@ static struct io_uring_sqe *get_sqe(struct io_uring *ring)
return sqe;
}
-static void __submit_receive(struct io_uring *ring, struct conn *c, int fd,
- uint64_t type)
+static void __encode_userdata(struct io_uring_sqe *sqe, int tid, int op,
+ int bgid, int bid, int fd)
+{
+ struct userdata ud = {
+ .op_tid = (op << OP_SHIFT) | tid,
+ .bgid = bgid,
+ .bid = bid,
+ .fd = fd
+ };
+
+ io_uring_sqe_set_data64(sqe, ud.val);
+}
+
+static void encode_userdata(struct io_uring_sqe *sqe, struct conn *c, int op,
+ int bgid, int bid, int fd)
+{
+ __encode_userdata(sqe, c->tid, op, bgid, bid, fd);
+}
+
+static int cqe_to_op(struct io_uring_cqe *cqe)
+{
+ struct userdata ud = {
+ .val = cqe->user_data
+ };
+
+ return ud.op_tid >> OP_SHIFT;
+}
+
+static struct conn *cqe_to_conn(struct io_uring_cqe *cqe)
+{
+ struct userdata ud = {
+ .val = cqe->user_data
+ };
+
+ return &conns[ud.op_tid & TID_MASK];
+}
+
+static int cqe_to_bgid(struct io_uring_cqe *cqe)
+{
+ struct userdata ud = {
+ .val = cqe->user_data
+ };
+
+ return ud.bgid;
+}
+
+static int cqe_to_bid(struct io_uring_cqe *cqe)
+{
+ struct userdata ud = {
+ .val = cqe->user_data
+ };
+
+ return ud.bid;
+}
+
+static int cqe_to_fd(struct io_uring_cqe *cqe)
+{
+ struct userdata ud = {
+ .val = cqe->user_data
+ };
+
+ return ud.fd;
+}
+
+static struct conn_dir *fd_to_conn_dir(struct conn *c, int fd)
+{
+ return &c->cd[fd == c->in_fd];
+}
+
+static void __submit_receive(struct io_uring *ring, struct conn *c, int fd)
{
struct conn_buf_ring *cbr = c->cur_br;
struct io_uring_sqe *sqe;
- uint64_t user_data;
- if (verbose) {
- printf("%d: submit receive fd=%d, type=%lx\n", c->tid, fd,
- (unsigned long) type);
- }
+ if (verbose)
+ printf("%d: submit receive fd=%d\n", c->tid, fd);
sqe = get_sqe(ring);
if (mshot)
@@ -268,24 +420,28 @@ static void __submit_receive(struct io_uring *ring, struct conn *c, int fd,
else
io_uring_prep_recv(sqe, fd, NULL, 0, 0);
- user_data = type | c->tid;
- user_data |= ((uint64_t) cbr->bgid << BGID_SHIFT);
- io_uring_sqe_set_data64(sqe, user_data);
+ encode_userdata(sqe, c, __RECV, cbr->bgid, 0, fd);
sqe->buf_group = cbr->bgid;
sqe->flags |= IOSQE_BUFFER_SELECT;
if (fixed_files)
sqe->flags |= IOSQE_FIXED_FILE;
}
+/*
+ * One directional just arms receive on our in_fd
+ */
static void submit_receive(struct io_uring *ring, struct conn *c)
{
- __submit_receive(ring, c, c->in_fd, RECV_DATA);
+ __submit_receive(ring, c, c->in_fd);
}
+/*
+ * Bi-directional arms receive on both in and out fd
+ */
static void submit_bidi_receive(struct io_uring *ring, struct conn *c)
{
- __submit_receive(ring, c, c->in_fd, RECV_DATA);
- __submit_receive(ring, c, c->out_fd, RECV_OUT_DATA);
+ __submit_receive(ring, c, c->in_fd);
+ __submit_receive(ring, c, c->out_fd);
}
/*
@@ -295,32 +451,178 @@ static void submit_bidi_receive(struct io_uring *ring, struct conn *c)
* group and continue from there, previous sends should come in and replenish the
* previous one by the time we potentially hit -ENOBUFS again.
*/
-static void handle_enobufs(struct io_uring *ring, struct conn *c)
+static void handle_enobufs(struct io_uring *ring, struct conn *c,
+ struct conn_dir *cd, int fd)
{
- c->bgid_switch++;
+ cd->bgid_switch++;
c->cur_br_index ^= 1;
c->cur_br = &c->brs[c->cur_br_index];
+ if (verbose) {
+ printf("%d: enobufs: switch to bgid %d\n", c->tid,
+ c->cur_br->bgid);
+ }
+
+ __submit_receive(ring, c, fd);
+}
+
+/*
+ * Kill this socket - submit a shutdown and link a close to it. We don't
+ * care about shutdown status, so mark it as not needing to post a CQE unless
+ * it fails.
+ */
+static void queue_shutdown_close(struct io_uring *ring, struct conn *c, int fd)
+{
+ struct io_uring_sqe *sqe;
+
+ sqe = get_sqe(ring);
+ io_uring_prep_shutdown(sqe, fd, SHUT_RDWR);
+ if (fixed_files)
+ sqe->flags |= IOSQE_FIXED_FILE;
+ sqe->flags |= IOSQE_IO_LINK | IOSQE_CQE_SKIP_SUCCESS;
+ sqe = get_sqe(ring);
+ if (fixed_files)
+ io_uring_prep_close_direct(sqe, fd);
+ else
+ io_uring_prep_close(sqe, fd);
+ encode_userdata(sqe, c, __SHUTDOWN, 0, 0, fd);
+}
+
+static int pending_shutdown(struct conn *c)
+{
+ return c->cd[0].pending_shutdown + c->cd[1].pending_shutdown;
+}
+
+static void __close_conn(struct io_uring *ring, struct conn *c)
+{
+ printf("Client %d: queueing shutdown\n", c->tid);
+
+ queue_shutdown_close(ring, c, c->in_fd);
+ queue_shutdown_close(ring, c, c->out_fd);
+ io_uring_submit(ring);
+}
+
+static void close_conn(struct conn *c, struct conn_dir *cd)
+{
+ if (cd->pending_sends)
+ return;
+
+ cd->pending_shutdown = 1;
+}
+
+static void __queue_send(struct io_uring *ring, struct conn *c, int fd,
+ void *data, int len, int bgid, int bid)
+{
+ struct conn_dir *cd = fd_to_conn_dir(c, fd);
+ struct io_uring_sqe *sqe;
+
+ if (verbose) {
+ printf("%d: send %d to fd %d (%p, bgid %d, bid %d)\n", c->tid,
+ len, fd, data, bgid, bid);
+ }
+
+ sqe = get_sqe(ring);
+ io_uring_prep_send(sqe, fd, data, len, MSG_WAITALL);
+ encode_userdata(sqe, c, __SEND, bgid, bid, fd);
+ if (fixed_files)
+ sqe->flags |= IOSQE_FIXED_FILE;
+ cd->pending_sends++;
+}
+
+/*
+ * Submit any deferred sends (see comment for defer_send()).
+ */
+static bool submit_deferred_send(struct io_uring *ring, struct conn *c,
+ struct conn_dir *cd)
+{
+ struct pending_send *ps;
+
+ if (list_empty(&cd->send_list)) {
+ if (verbose)
+ printf("%d: defer send %p empty\n", c->tid, cd);
+ return false;
+ }
+
if (verbose)
- printf("%d: enobufs: switch to bgid %d\n", c->tid, c->cur_br->bgid);
+ printf("%d: queueing deferred send %p\n", c->tid, cd);
- submit_receive(ring, c);
+ ps = list_first_entry(&cd->send_list, struct pending_send, list);
+ list_del(&ps->list);
+ __queue_send(ring, c, ps->fd, ps->data, ps->len, ps->bgid, ps->bid);
+ free(ps);
+ return true;
+}
+
+/*
+ * We have pending sends on this socket. Normally this is not an issue, but
+ * if we don't serialize sends, then we can get into a situation where the
+ * following can happen:
+ *
+ * 1) Submit sendA for socket1
+ * 2) socket1 buffer is full, poll is armed for sendA
+ * 3) socket1 space frees up
+ * 4) Poll triggers retry for sendA
+ * 5) Submit sendB for socket1
+ * 6) sendB completes
+ * 7) sendA is retried
+ *
+ * Regardless of the outcome of what happens with sendA in step 7 (it completes
+ * or it gets deferred because the socket1 buffer is now full again after sendB
+ * has been filled), we've now reordered the received data.
+ *
+ * This isn't a common occurrence, but more likely with big buffers. If we never
+ * run into out-of-space in the socket, we could easily support having more than
+ * one send in-flight at the same time.
+ *
+ * Something to think about on the kernel side...
+ */
+static void defer_send(struct conn *c, struct conn_dir *cd, void *data,
+ int len, int bgid, int bid, int out_fd)
+{
+ struct pending_send *ps = malloc(sizeof(*ps));
+
+ if (verbose) {
+ printf("%d: defer send %d to fd %d (%p, bgid %d, bid %d)\n",
+ c->tid, len, out_fd, data, bgid, bid);
+ printf("%d: pending %d, %p\n", c->tid, cd->pending_sends, cd);
+ }
+
+ cd->snd_busy++;
+ ps->fd = out_fd;
+ ps->bgid = bgid;
+ ps->bid = bid;
+ ps->len = len;
+ ps->data = data;
+ list_add_tail(&ps->list, &cd->send_list);
+}
+
+static bool queue_send(struct io_uring *ring, struct conn *c, void *data,
+ int len, int bgid, int bid, int out_fd)
+{
+ struct conn_dir *cd = fd_to_conn_dir(c, out_fd);
+
+ if (cd->pending_sends) {
+ defer_send(c, cd, data, len, bgid, bid, out_fd);
+ return false;
+ }
+
+ __queue_send(ring, c, out_fd, data, len, bgid, bid);
+ return true;
}
static int handle_receive(struct io_uring *ring, struct conn *c,
struct io_uring_cqe *cqe, int *need_submit,
- int in_fd, int out_fd, uint64_t type)
+ int in_fd, int out_fd)
{
- uint64_t user_data = io_uring_cqe_get_data64(cqe);
+ struct conn_dir *cd = fd_to_conn_dir(c, in_fd);
struct conn_buf_ring *cbr;
- struct io_uring_sqe *sqe;
int bid, bgid, do_recv = !mshot;
int res = cqe->res;
void *ptr;
if (res < 0) {
if (res == -ENOBUFS) {
- handle_enobufs(ring, c);
+ handle_enobufs(ring, c, cd, in_fd);
*need_submit = 1;
return 0;
} else {
@@ -329,26 +631,35 @@ static int handle_receive(struct io_uring *ring, struct conn *c,
}
}
+ if (res != buf_size)
+ cd->rcv_shrt++;
+
if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
+ if (!res) {
+ close_conn(c, cd);
+ return 0;
+ }
fprintf(stderr, "no buffer assigned, res=%d\n", res);
return 1;
}
- c->rcv++;
+ cd->rcv++;
/*
* If multishot terminates, just submit a new one.
*/
if (mshot && !(cqe->flags & IORING_CQE_F_MORE)) {
- c->mshot_resubmit++;
+ cd->mshot_resubmit++;
do_recv = 1;
}
bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
- bgid = (user_data >> BGID_SHIFT) & BGID_MASK;
+ bgid = cqe_to_bgid(cqe);
- if (verbose)
- printf("%d: recv: bid=%d, bgid=%d, res=%d\n", c->tid, bid, bgid, res);
+ if (verbose) {
+ printf("%d: recv: bid=%d, bgid=%d, res=%d\n", c->tid, bid, bgid,
+ res);
+ }
cbr = &c->brs[bgid - c->start_bgid];
ptr = cbr->buf + bid * buf_size;
@@ -364,17 +675,11 @@ static int handle_receive(struct io_uring *ring, struct conn *c,
io_uring_buf_ring_advance(cbr->br, 1);
*need_submit = 0;
} else {
- sqe = get_sqe(ring);
- io_uring_prep_send(sqe, out_fd, ptr, res, 0);
- user_data = SEND_DATA | ((uint64_t) bid << BID_SHIFT) | c->tid;
- user_data |= ((uint64_t) bgid) << BGID_SHIFT;
- io_uring_sqe_set_data64(sqe, user_data);
- if (fixed_files)
- sqe->flags |= IOSQE_FIXED_FILE;
+ *need_submit = queue_send(ring, c, ptr, res, bgid, bid, out_fd);
}
c->rps++;
- c->bytes += res;
+ cd->in_bytes += res;
/*
* If we're not doing multishot receive, or if multishot receive
@@ -382,7 +687,7 @@ static int handle_receive(struct io_uring *ring, struct conn *c,
* has completed. Multishot will stay armed.
*/
if (do_recv) {
- __submit_receive(ring, c, in_fd, type);
+ __submit_receive(ring, c, in_fd);
*need_submit = 1;
}
@@ -391,15 +696,14 @@ static int handle_receive(struct io_uring *ring, struct conn *c,
static int handle_cqe(struct io_uring *ring, struct io_uring_cqe *cqe)
{
- uint64_t user_data = io_uring_cqe_get_data64(cqe);
- int conn_id = cqe->user_data & 0xff;
- struct conn *c = &conns[conn_id];
struct io_uring_sqe *sqe;
int res = cqe->res;
- int ret, need_submit = 1;
+ int ret = 0, need_submit = 1;
- switch (user_data >> OP_SHIFT) {
+ switch (cqe_to_op(cqe)) {
case __ACCEPT: {
+ struct conn *c;
+
if (res < 0) {
fprintf(stderr, "accept error %s\n", strerror(-res));
return 1;
@@ -414,10 +718,12 @@ static int handle_cqe(struct io_uring *ring, struct io_uring_cqe *cqe)
c->tid = nr_conns;
c->in_fd = res;
- printf("New client: %d/%d\n", nr_conns, c->in_fd);
+ printf("New client: id=%d, in=%d\n", nr_conns, c->in_fd);
nr_conns++;
setup_buffer_rings(ring, c);
+ init_list_head(&c->cd[0].send_list);
+ init_list_head(&c->cd[1].send_list);
if (is_sink) {
submit_receive(ring, c);
@@ -425,14 +731,18 @@ static int handle_cqe(struct io_uring *ring, struct io_uring_cqe *cqe)
}
sqe = get_sqe(ring);
- if (fixed_files)
- io_uring_prep_socket_direct_alloc(sqe, AF_INET, SOCK_STREAM, 0, 0);
- else
+ if (fixed_files) {
+ io_uring_prep_socket_direct_alloc(sqe, AF_INET,
+ SOCK_STREAM, 0, 0);
+ } else {
io_uring_prep_socket(sqe, AF_INET, SOCK_STREAM, 0, 0);
- io_uring_sqe_set_data64(sqe, SOCK_DATA | c->tid);
+ }
+ encode_userdata(sqe, c, __SOCK, 0, 0, 0);
break;
}
case __SOCK: {
+ struct conn *c = cqe_to_conn(cqe);
+
if (res < 0) {
fprintf(stderr, "socket error %s\n", strerror(-res));
return 1;
@@ -445,7 +755,8 @@ static int handle_cqe(struct io_uring *ring, struct io_uring_cqe *cqe)
memset(&c->addr, 0, sizeof(c->addr));
c->addr.sin_family = AF_INET;
c->addr.sin_port = htons(send_port);
- ret = inet_pton(AF_INET, host, (struct sockaddr *) &c->addr.sin_addr);
+ ret = inet_pton(AF_INET, host,
+ (struct sockaddr *) &c->addr.sin_addr);
if (ret <= 0) {
if (!ret)
fprintf(stderr, "host not in right format\n");
@@ -454,13 +765,18 @@ static int handle_cqe(struct io_uring *ring, struct io_uring_cqe *cqe)
return 1;
}
sqe = get_sqe(ring);
- io_uring_prep_connect(sqe, c->out_fd, (struct sockaddr *) &c->addr, sizeof(c->addr));
- io_uring_sqe_set_data64(sqe, CONNECT_DATA | c->tid);
+ io_uring_prep_connect(sqe, c->out_fd,
+ (struct sockaddr *) &c->addr,
+ sizeof(c->addr));
+ encode_userdata(sqe, c, __CONNECT, 0, 0, c->out_fd);
if (fixed_files)
sqe->flags |= IOSQE_FIXED_FILE;
+ ret = 0;
break;
}
case __CONNECT: {
+ struct conn *c = cqe_to_conn(cqe);
+
if (res < 0) {
fprintf(stderr, "connect error %s\n", strerror(-res));
return 1;
@@ -473,51 +789,96 @@ static int handle_cqe(struct io_uring *ring, struct io_uring_cqe *cqe)
break;
}
case __RECV: {
- handle_receive(ring, c, cqe, &need_submit, c->in_fd, c->out_fd, RECV_DATA);
- break;
+ struct conn *c = cqe_to_conn(cqe);
+ int fd = cqe_to_fd(cqe);
+
+ if (fd == c->in_fd) {
+ ret = handle_receive(ring, c, cqe, &need_submit,
+ c->in_fd, c->out_fd);
+ } else {
+ ret = handle_receive(ring, c, cqe, &need_submit,
+ c->out_fd, c->in_fd);
}
- case __RECV_OUT: {
- handle_receive(ring, c, cqe, &need_submit, c->out_fd, c->in_fd, RECV_OUT_DATA);
break;
}
case __SEND: {
+ struct conn *c = cqe_to_conn(cqe);
struct conn_buf_ring *cbr;
+ int fd = cqe_to_fd(cqe);
+ struct conn_dir *cd = fd_to_conn_dir(c, fd);
int bid, bgid;
void *ptr;
- c->snd++;
-
if (res < 0) {
fprintf(stderr, "send error %s\n", strerror(-res));
return 1;
}
- if (cqe->res != buf_size)
- c->shrt++;
+ cd->snd++;
+ cd->out_bytes += res;
- bid = (user_data >> BID_SHIFT) & BID_MASK;
- bgid = (user_data >> BGID_SHIFT) & BGID_MASK;
+ if (res != buf_size) {
+ cd->snd_shrt++;
+ }
+
+ bid = cqe_to_bid(cqe);
+ bgid = cqe_to_bgid(cqe);
if (verbose)
- printf("%d: send: bid=%d, bgid=%d, res=%d\n", c->tid, bid, bgid, res);
+ printf("%d: send: bid=%d, bgid=%d, res=%d\n", c->tid,
+ bid, bgid, res);
bgid -= c->start_bgid;
cbr = &c->brs[bgid];
ptr = cbr->buf + bid * buf_size;
+
io_uring_buf_ring_add(cbr->br, ptr, buf_size, bid, br_mask, 0);
io_uring_buf_ring_advance(cbr->br, 1);
+
need_submit = 0;
+
+ cd->pending_sends--;
+
+ if (verbose) {
+ printf("%d: pending sends %d\n", c->tid,
+ cd->pending_sends);
+ }
+
+ if (!cd->pending_sends) {
+ if (!res)
+ close_conn(c, cd);
+ else
+ need_submit = submit_deferred_send(ring, c, cd);
+ }
+ break;
+ }
+ case __SHUTDOWN: {
+ struct conn *c = cqe_to_conn(cqe);
+ int fd = cqe_to_fd(cqe);
+
+ c->flags |= CONN_F_DISCONNECTED;
+
+ printf("Closed client: id=%d, in=%d\n", nr_conns, fd);
+ if (fd == c->in_fd)
+ c->in_fd = -1;
+ else if (fd == c->out_fd)
+ c->out_fd = -1;
+ if (c->in_fd == -1 && c->out_fd == -1) {
+ __show_stats(c);
+ free_buffer_rings(ring, c);
+ }
break;
}
default:
- fprintf(stderr, "bad user data %lx\n", (long) user_data);
+ fprintf(stderr, "bad user data %lx\n", (long) cqe->user_data);
+ ret = 1;
break;
}
if (need_submit)
io_uring_submit(ring);
- return 0;
+ return ret;
}
static void usage(const char *name)
@@ -527,16 +888,32 @@ static void usage(const char *name)
printf("\t-d:\t\tUse DEFER_TASKRUN (%d)\n", defer_tw);
printf("\t-S:\t\tUse SQPOLL (%d)\n", sqpoll);
printf("\t-b:\t\tSend/receive buf size (%d)\n", buf_size);
- printf("\t-n:\t\tNumber of provided buffers (%d)\n", nr_bufs);
+ printf("\t-n:\t\tNumber of provided buffers (pow2) (%d)\n", nr_bufs);
printf("\t-s:\t\tAct only as a sink (%d)\n", is_sink);
printf("\t-f:\t\tUse only fixed files (%d)\n", fixed_files);
- printf("\t-B:\t\tUse bi-directiona mode (%d)\n", bidi);
+ printf("\t-B:\t\tUse bi-directional mode (%d)\n", bidi);
printf("\t-h:\t\tHost to connect to (%s)\n", host);
printf("\t-r:\t\tPort to receive on (%d)\n", receive_port);
printf("\t-p:\t\tPort to connect to (%d)\n", send_port);
printf("\t-V:\t\tIncrease verbosity (%d)\n", verbose);
}
+static void check_for_close(struct io_uring *ring)
+{
+ int i;
+
+ for (i = 0; i < nr_conns; i++) {
+ struct conn *c = &conns[i];
+
+ if (c->flags & (CONN_F_DISCONNECTING | CONN_F_DISCONNECTED))
+ continue;
+ if (pending_shutdown(c)) {
+ __close_conn(ring, c);
+ c->flags |= CONN_F_DISCONNECTING;
+ }
+ }
+}
+
int main(int argc, char *argv[])
{
struct io_uring_sqe *sqe;
@@ -659,10 +1036,14 @@ int main(int argc, char *argv[])
io_uring_prep_multishot_accept_direct(sqe, fd, NULL, NULL, 0);
else
io_uring_prep_multishot_accept(sqe, fd, NULL, NULL, 0);
- io_uring_sqe_set_data64(sqe, ACCEPT_DATA);
+ __encode_userdata(sqe, 0, 0, 0, 0, fd);
io_uring_submit(&ring);
while (1) {
+ struct __kernel_timespec ts = {
+ .tv_sec = 0,
+ .tv_nsec = 100000000ULL,
+ };
struct io_uring_cqe *cqe;
unsigned int head;
unsigned int i = 0;
@@ -673,7 +1054,7 @@ int main(int argc, char *argv[])
to_wait = nr_conns;
to_wait = 1;
- io_uring_wait_cqes(&ring, &cqe, to_wait, NULL, NULL);
+ io_uring_wait_cqes(&ring, &cqe, to_wait, &ts, NULL);
io_uring_for_each_cqe(&ring, head, cqe) {
if (handle_cqe(&ring, cqe))
@@ -683,6 +1064,8 @@ int main(int argc, char *argv[])
if (i)
io_uring_cq_advance(&ring, i);
+ else
+ check_for_close(&ring);
}
return 0;