diff options
author | David Howells <dhowells@redhat.com> | 2024-04-11 08:20:14 +0100 |
---|---|---|
committer | David Howells <dhowells@redhat.com> | 2024-04-16 14:52:15 +0100 |
commit | 0a6ac49e304f1644db056801640e1976a83a484f (patch) | |
tree | 23a787f5c55b895c0a589a2364fe3c25e415f0e8 | |
parent | 8fd3f9be1ee78a21c4a4d5b4f748dc8862246ca0 (diff) | |
download | linux-fs-rxrpc-iothread.tar.gz |
rxrpc: Use the new rxrpc_txqueue struct to more efficiently process ACKs (branch: rxrpc-iothread)
With the change in the structure of the transmission buffer to store
buffers in bunches of 32 or 64 (BITS_PER_LONG) we can place sets of
per-buffer flags into the rxrpc_txqueue struct rather than storing them in
rxrpc_txbuf, thereby vastly increasing efficiency when assessing the SACK
table in an ACK packet.
Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: "David S. Miller" <davem@davemloft.net>
cc: Eric Dumazet <edumazet@google.com>
cc: Jakub Kicinski <kuba@kernel.org>
cc: Paolo Abeni <pabeni@redhat.com>
cc: linux-afs@lists.infradead.org
cc: netdev@vger.kernel.org
-rw-r--r-- | include/trace/events/rxrpc.h | 146 | ||||
-rw-r--r-- | net/rxrpc/ar-internal.h | 25 | ||||
-rw-r--r-- | net/rxrpc/call_event.c | 241 | ||||
-rw-r--r-- | net/rxrpc/call_object.c | 1 | ||||
-rw-r--r-- | net/rxrpc/input.c | 248 | ||||
-rw-r--r-- | net/rxrpc/output.c | 35 | ||||
-rw-r--r-- | net/rxrpc/sendmsg.c | 3 |
7 files changed, 472 insertions, 227 deletions
diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h index 2d085a19414071..423ba575191c73 100644 --- a/include/trace/events/rxrpc.h +++ b/include/trace/events/rxrpc.h @@ -131,7 +131,6 @@ EM(rxrpc_skb_get_call_rx, "GET call-rx ") \ EM(rxrpc_skb_get_conn_secured, "GET conn-secd") \ EM(rxrpc_skb_get_conn_work, "GET conn-work") \ - EM(rxrpc_skb_get_last_nack, "GET last-nack") \ EM(rxrpc_skb_get_local_work, "GET locl-work") \ EM(rxrpc_skb_get_reject_work, "GET rej-work ") \ EM(rxrpc_skb_get_to_recvmsg, "GET to-recv ") \ @@ -146,7 +145,6 @@ EM(rxrpc_skb_put_error_report, "PUT error-rep") \ EM(rxrpc_skb_put_input, "PUT input ") \ EM(rxrpc_skb_put_jumbo_subpacket, "PUT jumbo-sub") \ - EM(rxrpc_skb_put_last_nack, "PUT last-nack") \ EM(rxrpc_skb_put_purge, "PUT purge ") \ EM(rxrpc_skb_put_rotate, "PUT rotate ") \ EM(rxrpc_skb_put_unknown, "PUT unknown ") \ @@ -496,6 +494,11 @@ EM(rxrpc_pmtud_reduce_icmp, "Icmp ") \ E_(rxrpc_pmtud_reduce_route, "Route") +#define rxrpc_rotate_traces \ + EM(rxrpc_rotate_trace_hack, "hard-ack") \ + EM(rxrpc_rotate_trace_sack, "soft-ack") \ + E_(rxrpc_rotate_trace_snak, "soft-nack") + /* * Generate enums for tracing information. 
*/ @@ -522,6 +525,7 @@ enum rxrpc_propose_ack_trace { rxrpc_propose_ack_traces } __mode(byte); enum rxrpc_receive_trace { rxrpc_receive_traces } __mode(byte); enum rxrpc_recvmsg_trace { rxrpc_recvmsg_traces } __mode(byte); enum rxrpc_req_ack_trace { rxrpc_req_ack_traces } __mode(byte); +enum rxrpc_rotate_trace { rxrpc_rotate_traces } __mode(byte); enum rxrpc_rtt_rx_trace { rxrpc_rtt_rx_traces } __mode(byte); enum rxrpc_rtt_tx_trace { rxrpc_rtt_tx_traces } __mode(byte); enum rxrpc_sack_trace { rxrpc_sack_traces } __mode(byte); @@ -559,6 +563,7 @@ rxrpc_propose_ack_traces; rxrpc_receive_traces; rxrpc_recvmsg_traces; rxrpc_req_ack_traces; +rxrpc_rotate_traces; rxrpc_rtt_rx_traces; rxrpc_rtt_tx_traces; rxrpc_sack_traces; @@ -1610,13 +1615,14 @@ TRACE_EVENT(rxrpc_drop_ack, ); TRACE_EVENT(rxrpc_retransmit, - TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t seq, + TP_PROTO(struct rxrpc_call *call, struct rxrpc_txqueue *tq, rxrpc_seq_t seq, rxrpc_serial_t serial, ktime_t expiry), - TP_ARGS(call, seq, serial, expiry), + TP_ARGS(call, tq, seq, serial, expiry), TP_STRUCT__entry( __field(unsigned int, call) + __field(unsigned int, qbase) __field(rxrpc_seq_t, seq) __field(rxrpc_serial_t, serial) __field(ktime_t, expiry) @@ -1624,13 +1630,15 @@ TRACE_EVENT(rxrpc_retransmit, TP_fast_assign( __entry->call = call->debug_id; + __entry->qbase = tq->qbase; __entry->seq = seq; __entry->serial = serial; __entry->expiry = expiry; ), - TP_printk("c=%08x q=%x r=%x xp=%lld", + TP_printk("c=%08x tq=%x q=%x r=%x xp=%lld", __entry->call, + __entry->qbase, __entry->seq, __entry->serial, ktime_to_us(__entry->expiry)) @@ -1662,7 +1670,7 @@ TRACE_EVENT(rxrpc_congest, memcpy(&__entry->sum, summary, sizeof(__entry->sum)); ), - TP_printk("c=%08x r=%08x %s q=%08x %s cw=%u ss=%u nA=%u,%u+%u,%u b=%u u=%u d=%u l=%x%s%s%s", + TP_printk("c=%08x r=%08x %s q=%08x %s cw=%u ss=%u A=%u+%u/%u+%u r=%u b=%u u=%u d=%u l=%x%s%s%s", __entry->call, __entry->ack_serial, __print_symbolic(__entry->sum.ack_reason, 
rxrpc_ack_names), @@ -1670,9 +1678,9 @@ TRACE_EVENT(rxrpc_congest, __print_symbolic(__entry->sum.mode, rxrpc_congest_modes), __entry->sum.cwnd, __entry->sum.ssthresh, - __entry->sum.nr_acks, __entry->sum.nr_retained_nacks, - __entry->sum.nr_new_acks, - __entry->sum.nr_new_nacks, + __entry->sum.nr_acks, __entry->sum.nr_new_acks, + __entry->sum.nr_nacks, __entry->sum.nr_new_nacks, + __entry->sum.nr_rot_new_acks, __entry->top - __entry->hard_ack, __entry->sum.cumulative_acks, __entry->sum.dup_acks, @@ -1788,10 +1796,36 @@ TRACE_EVENT(rxrpc_connect_call, &__entry->srx.transport) ); +TRACE_EVENT(rxrpc_apply_acks, + TP_PROTO(struct rxrpc_call *call, struct rxrpc_txqueue *tq), + + TP_ARGS(call, tq), + + TP_STRUCT__entry( + __field(unsigned int, call) + __field(unsigned int, nr_rep) + __field(rxrpc_seq_t, qbase) + __field(unsigned long, acks) + ), + + TP_fast_assign( + __entry->call = call->debug_id; + __entry->qbase = tq->qbase; + __entry->acks = tq->segment_acked; + __entry->nr_rep = tq->nr_reported_acks; + ), + + TP_printk("c=%08x tq=%x acks=%016lx rep=%u", + __entry->call, + __entry->qbase, + __entry->acks, + __entry->nr_rep) + ); + TRACE_EVENT(rxrpc_resend, - TP_PROTO(struct rxrpc_call *call, struct sk_buff *ack), + TP_PROTO(struct rxrpc_call *call, rxrpc_serial_t ack_serial), - TP_ARGS(call, ack), + TP_ARGS(call, ack_serial), TP_STRUCT__entry( __field(unsigned int, call) @@ -1801,11 +1835,10 @@ TRACE_EVENT(rxrpc_resend, ), TP_fast_assign( - struct rxrpc_skb_priv *sp = ack ? rxrpc_skb(ack) : NULL; __entry->call = call->debug_id; __entry->seq = call->acks_hard_ack; __entry->transmitted = call->tx_transmitted; - __entry->ack_serial = sp ? 
sp->hdr.serial : 0; + __entry->ack_serial = ack_serial; ), TP_printk("c=%08x r=%x q=%x tq=%x", @@ -1815,6 +1848,93 @@ TRACE_EVENT(rxrpc_resend, __entry->transmitted) ); +TRACE_EVENT(rxrpc_resend_naks, + TP_PROTO(struct rxrpc_call *call, struct rxrpc_txqueue *tq, unsigned long naks), + + TP_ARGS(call, tq, naks), + + TP_STRUCT__entry( + __field(unsigned int, call) + __field(rxrpc_seq_t, qbase) + __field(u8, nr_rep) + __field(unsigned long, naks) + ), + + TP_fast_assign( + __entry->call = call->debug_id; + __entry->qbase = tq->qbase; + __entry->nr_rep = tq->nr_reported_acks; + __entry->naks = naks; + ), + + TP_printk("c=%08x tq=%x naks=%016lx nr=%u", + __entry->call, + __entry->qbase, + __entry->naks, + __entry->nr_rep) + ); + +TRACE_EVENT(rxrpc_resend_unrep, + TP_PROTO(struct rxrpc_call *call, struct rxrpc_txqueue *tq, + rxrpc_seq_t start, rxrpc_seq_t stop), + + TP_ARGS(call, tq, start, stop), + + TP_STRUCT__entry( + __field(unsigned int, call) + __field(rxrpc_seq_t, qbase) + __field(rxrpc_seq_t, prev) + __field(rxrpc_seq_t, start) + __field(rxrpc_seq_t, stop) + ), + + TP_fast_assign( + __entry->call = call->debug_id; + __entry->qbase = tq->qbase; + __entry->prev = call->acks_prev_seq; + __entry->start = start; + __entry->stop = stop; + ), + + TP_printk("c=%08x tq=%x pq=%x q=%x-%x", + __entry->call, + __entry->qbase, + __entry->prev, + __entry->start, + __entry->stop) + ); + +TRACE_EVENT(rxrpc_rotate, + TP_PROTO(struct rxrpc_call *call, struct rxrpc_txqueue *tq, + struct rxrpc_ack_summary *summary, rxrpc_seq_t seq, + enum rxrpc_rotate_trace trace), + + TP_ARGS(call, tq, summary, seq, trace), + + TP_STRUCT__entry( + __field(unsigned int, call) + __field(rxrpc_seq_t, qbase) + __field(rxrpc_seq_t, seq) + __field(unsigned int, nr_rep) + __field(enum rxrpc_rotate_trace, trace) + ), + + TP_fast_assign( + __entry->call = call->debug_id; + __entry->qbase = tq->qbase; + __entry->seq = seq; + __entry->nr_rep = tq->nr_reported_acks; + __entry->trace = trace; + ), + + 
TP_printk("c=%08x tq=%x q=%x nr=%x %s", + __entry->call, + __entry->qbase, + __entry->seq, + __entry->nr_rep, + __print_symbolic(__entry->trace, rxrpc_rotate_traces)) + ); + TRACE_EVENT(rxrpc_rx_icmp, TP_PROTO(struct rxrpc_peer *peer, struct sock_extended_err *ee, struct sockaddr_rxrpc *srx), diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h index 5574b27d0a1ca0..c16c048b2f223a 100644 --- a/net/rxrpc/ar-internal.h +++ b/net/rxrpc/ar-internal.h @@ -213,9 +213,8 @@ struct rxrpc_skb_priv { rxrpc_seq_t first_ack; /* First packet in acks table */ rxrpc_seq_t prev_ack; /* Highest seq seen */ rxrpc_serial_t acked_serial; /* Packet in response to (or 0) */ + u16 nr_acks; /* Number of acks+nacks */ u8 reason; /* Reason for ack */ - u8 nr_acks; /* Number of acks+nacks */ - u8 nr_nacks; /* Number of nacks */ } ack; }; struct rxrpc_host_header hdr; /* RxRPC packet header from this packet */ @@ -731,7 +730,6 @@ struct rxrpc_call { u8 cong_dup_acks; /* Count of ACKs showing missing packets */ u8 cong_cumul_acks; /* Cumulative ACK count */ ktime_t cong_tstamp; /* Last time cwnd was changed */ - struct sk_buff *cong_last_nack; /* Last ACK with nacks received */ /* Receive-phase ACK management (ACKs we send). 
*/ u8 ackr_reason; /* reason to ACK */ @@ -761,6 +759,8 @@ struct rxrpc_call { rxrpc_seq_t acks_hard_ack; /* Latest hard-ack point */ rxrpc_seq_t acks_lowest_nak; /* Lowest NACK in the buffer (or ==tx_hard_ack) */ rxrpc_serial_t acks_highest_serial; /* Highest serial number ACK'd */ + unsigned short acks_nr_acks; /* Number of soft acks recorded */ + unsigned short acks_nr_nacks; /* Number of soft nacks recorded */ }; /* @@ -768,13 +768,14 @@ struct rxrpc_call { */ struct rxrpc_ack_summary { u16 nr_acks; /* Number of ACKs in packet */ + u16 nr_nacks; /* Number of nACKs in packet */ u16 nr_new_acks; /* Number of new ACKs in packet */ u16 nr_new_nacks; /* Number of new nacks in packet */ - u16 nr_retained_nacks; /* Number of nacks retained between ACKs */ + u16 nr_rot_new_acks; /* Number of rotated new ACKs */ u8 ack_reason; - bool saw_nacks; /* Saw NACKs in packet */ bool new_low_nack; /* T if new low NACK found */ bool retrans_timeo; /* T if reTx due to timeout happened */ + bool need_retransmit; /* T if we need transmission */ u8 flight_size; /* Number of unreceived transmissions */ /* Place to stash values for tracing */ enum rxrpc_congest_mode mode:8; @@ -859,9 +860,13 @@ struct rxrpc_txqueue { * in usec from the base. 
*/ unsigned int segment_xmit_ts[RXRPC_NR_TXQUEUE]; + unsigned long segment_acked; /* Bit-per-buf: Set if ACK'd */ + unsigned long segment_lost; /* Bit-per-buf: Set if declared lost */ + unsigned long segment_retransmitted; /* Bit-per-buf: Set if retransmitted */ ktime_t xmit_ts_base; rxrpc_seq_t qbase; + u8 nr_reported_acks; /* Number of segments explicitly acked/nacked */ struct rxrpc_txqueue *next; }; @@ -922,7 +927,7 @@ void rxrpc_propose_ping(struct rxrpc_call *call, u32 serial, enum rxrpc_propose_ack_trace why); void rxrpc_propose_delay_ACK(struct rxrpc_call *, rxrpc_serial_t, enum rxrpc_propose_ack_trace); -void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb); +void rxrpc_resend(struct rxrpc_call *call, rxrpc_serial_t ack_serial, bool ping_response); bool rxrpc_input_call_event(struct rxrpc_call *call); @@ -1371,6 +1376,14 @@ static inline bool after_eq(u32 seq1, u32 seq2) { return (s32)(seq1 - seq2) >= 0; } +static inline u32 earliest(u32 seq1, u32 seq2) +{ + return before(seq1, seq2) ? seq1 : seq2; +} +static inline u32 latest(u32 seq1, u32 seq2) +{ + return after(seq1, seq2) ? seq1 : seq2; +} static inline void rxrpc_queue_rx_call_packet(struct rxrpc_call *call, struct sk_buff *skb) { diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c index 0e26758b9042bb..cd4c1c043523b1 100644 --- a/net/rxrpc/call_event.c +++ b/net/rxrpc/call_event.c @@ -62,134 +62,161 @@ static void rxrpc_congestion_timeout(struct rxrpc_call *call) set_bit(RXRPC_CALL_RETRANS_TIMEOUT, &call->flags); } +struct rxrpc_resend_state { + ktime_t now; + ktime_t rto; + struct rxrpc_txqueue *tq; + rxrpc_seq_t seq; + int n; + bool did_send; +}; + +/* + * Retransmit one or more packets. 
+ */ +static bool rxrpc_retransmit_data(struct rxrpc_call *call, + struct rxrpc_resend_state *state, + bool skip_too_young) +{ + struct rxrpc_txqueue *tq = state->tq; + unsigned int ix = state->seq & RXRPC_TXQ_MASK; + struct rxrpc_txbuf *txb = tq->bufs[ix]; + ktime_t xmit_ts, resend_at; + + _enter("%x,%x,%x,%px", tq->qbase, state->seq, ix, txb); + + xmit_ts = ktime_add_us(tq->xmit_ts_base, tq->segment_xmit_ts[ix]); + resend_at = ktime_add(xmit_ts, state->rto); + + trace_rxrpc_retransmit(call, tq, state->seq, txb->serial, + ktime_sub(resend_at, state->now)); + if (skip_too_young && ktime_after(resend_at, state->now)) + return false; + + __set_bit(ix, &tq->segment_retransmitted); + txb->flags |= RXRPC_TXBUF_RESENT; + rxrpc_transmit_data(call, tq, state->seq, state->n); + rxrpc_inc_stat(call->rxnet, stat_tx_data_retrans); + + state->tq = NULL; + state->n = 0; + state->did_send = true; + state->now = ktime_get_real(); + return true; +} + /* * Perform retransmission of NAK'd and unack'd packets. 
*/ -void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb) +void rxrpc_resend(struct rxrpc_call *call, rxrpc_serial_t ack_serial, bool ping_response) { - struct rxrpc_ackpacket *ack = NULL; - struct rxrpc_skb_priv *sp; - struct rxrpc_txqueue *tq; - struct rxrpc_txbuf *txb; - rxrpc_seq_t transmitted = call->tx_transmitted, qbase, seq; - ktime_t next_resend = KTIME_MAX, rto = rxrpc_get_rto_backoff(call->peer, false); - ktime_t resend_at = KTIME_MAX, now, delay; - bool unacked = false, did_send = false; - unsigned int qix; + struct rxrpc_resend_state state = { + .rto = rxrpc_get_rto_backoff(call->peer, false), + .now = ktime_get_real(), + }; + struct rxrpc_txqueue *tq = call->tx_queue; + ktime_t lowest_xmit_ts = KTIME_MAX; + bool unacked = false; _enter("{%d,%d}", call->acks_hard_ack, call->tx_top); - now = ktime_get_real(); - if (call->tx_bottom == call->tx_top) - goto no_resend; + if (call->tx_bottom == call->tx_top) { + call->resend_at = KTIME_MAX; + trace_rxrpc_timer_can(call, rxrpc_timer_trace_resend); + return; + } - trace_rxrpc_resend(call, ack_skb); - qbase = call->tx_qbase; - tq = call->tx_queue; - seq = call->tx_bottom; + trace_rxrpc_resend(call, ack_serial); - /* Scan the soft ACK table and resend any explicitly NAK'd packets. */ - if (ack_skb) { - sp = rxrpc_skb(ack_skb); - ack = (void *)ack_skb->data + sizeof(struct rxrpc_wire_header); + /* Scan the transmission queue, looking for explicitly NAK'd packets. 
*/ + do { + unsigned long naks = ~tq->segment_acked; + rxrpc_seq_t tq_top = tq->qbase + RXRPC_NR_TXQUEUE - 1; - for (int i = 0; i < sp->ack.nr_acks; i++) { - rxrpc_seq_t aseq; + if (tq->nr_reported_acks < RXRPC_NR_TXQUEUE) + naks &= (1UL << tq->nr_reported_acks) - 1; - if (ack->acks[i] & 1) - continue; - aseq = sp->ack.first_ack + i; - while (after_eq(aseq, qbase + RXRPC_NR_TXQUEUE)) { - qbase += RXRPC_NR_TXQUEUE; - tq = tq->next; - } - seq = aseq; - qix = seq - qbase; - txb = tq->bufs[qix]; - if (after(seq, transmitted)) - goto no_further_resend; - - resend_at = ktime_add_us(tq->xmit_ts_base, tq->segment_xmit_ts[qix]); - resend_at = ktime_add(resend_at, rto); - if (after(txb->serial, call->acks_highest_serial)) { - if (ktime_after(resend_at, now) && - ktime_before(resend_at, next_resend)) - next_resend = resend_at; - continue; /* Ack point not yet reached */ - } + _debug("retr %16lx %u c=%08x [%x]", + tq->segment_acked, tq->nr_reported_acks, call->debug_id, tq->qbase); + _debug("nack %16lx", naks); - rxrpc_see_txbuf(txb, rxrpc_txbuf_see_unacked); + trace_rxrpc_resend_naks(call, tq, naks); + while (naks) { + unsigned int ix = __ffs(naks); + struct rxrpc_txbuf *txb = tq->bufs[ix]; - trace_rxrpc_retransmit(call, seq, txb->serial, - ktime_sub(resend_at, now)); + __clear_bit(ix, &naks); + if (after(txb->serial, call->acks_highest_serial)) + continue; /* Ack point not yet reached */ - txb->flags |= RXRPC_TXBUF_RESENT; - rxrpc_transmit_data(call, tq, seq, 1); - did_send = true; - now = ktime_get_real(); + rxrpc_see_txbuf(txb, rxrpc_txbuf_see_unacked); - if (after_eq(seq, call->tx_top)) - goto no_further_resend; + state.tq = tq; + state.seq = tq->qbase + ix; + state.n = 1; + rxrpc_retransmit_data(call, &state, false); } - } - - /* Fast-forward through the Tx queue to the point the peer says it has - * seen. 
Anything between the soft-ACK table and that point will get - * ACK'd or NACK'd in due course, so don't worry about it here; here we - * need to consider retransmitting anything beyond that point. - */ - seq = call->acks_prev_seq; - if (after_eq(seq, call->tx_transmitted)) - goto no_further_resend; - seq++; - - while (after_eq(seq, qbase + RXRPC_NR_TXQUEUE)) { - qbase += RXRPC_NR_TXQUEUE; - tq = tq->next; - } - while (before_eq(seq, call->tx_transmitted)) { - qix = seq - qbase; - if (qix >= RXRPC_NR_TXQUEUE) { - qbase += RXRPC_NR_TXQUEUE; - tq = tq->next; - continue; + /* Anything after the soft-ACK table up to and including + * ack.previousPacket will get ACK'd or NACK'd in due course, + * so don't worry about those here. We do, however, need to + * consider retransmitting anything beyond that point. + */ + if (after_eq(call->tx_transmitted, tq->qbase) && + tq->nr_reported_acks < RXRPC_NR_TXQUEUE && + after(tq_top, call->acks_prev_seq)) { + rxrpc_seq_t start = latest(call->acks_prev_seq, + tq->qbase + tq->nr_reported_acks); + rxrpc_seq_t stop = earliest(tq_top, call->tx_transmitted); + + trace_rxrpc_resend_unrep(call, tq, start, stop); + _debug("unrep %x-%x", start, stop); + for (rxrpc_seq_t seq = start; before(seq, stop); seq++) { + struct rxrpc_txbuf *txb = tq->bufs[seq & RXRPC_TXQ_MASK]; + + if (ping_response && + before(txb->serial, call->acks_highest_serial)) + break; /* Wasn't accounted for by a more recent ping. */ + state.tq = tq; + state.seq = seq; + state.n = 1; + if (rxrpc_retransmit_data(call, &state, true)) + unacked = true; + } } - txb = tq->bufs[qix]; - resend_at = ktime_add_us(tq->xmit_ts_base, tq->segment_xmit_ts[qix]); - resend_at = ktime_add(resend_at, rto); - if (ack && ack->reason == RXRPC_ACK_PING_RESPONSE && - before(txb->serial, ntohl(ack->serial))) - goto do_resend; /* Wasn't accounted for by a more recent ping. 
*/ - - if (ktime_after(resend_at, now)) { - if (ktime_before(resend_at, next_resend)) - next_resend = resend_at; - seq++; - continue; + /* Work out the next retransmission timeout. */ + if (after_eq(call->tx_transmitted, tq->qbase) && + ktime_before(tq->xmit_ts_base, lowest_xmit_ts)) { + unsigned int lowest_us = UINT_MAX; + + for (int i = 0; i < RXRPC_NR_TXQUEUE; i++) + if (!test_bit(i, &tq->segment_acked) && + tq->segment_xmit_ts[i] < lowest_us) + lowest_us = tq->segment_xmit_ts[i]; + _debug("lowest[%x] %llx %u", tq->qbase, tq->xmit_ts_base, lowest_us); + + if (lowest_us != UINT_MAX) { + ktime_t lowest_ns = ktime_add_us(tq->xmit_ts_base, lowest_us); + if (ktime_before(lowest_ns, lowest_xmit_ts)) + lowest_xmit_ts = lowest_ns; + } } + } while ((tq = tq->next)); - do_resend: - unacked = true; - - txb->flags |= RXRPC_TXBUF_RESENT; - rxrpc_transmit_data(call, tq, seq, 1); - did_send = true; - rxrpc_inc_stat(call->rxnet, stat_tx_data_retrans); - now = ktime_get_real(); - seq++; - } + if (lowest_xmit_ts < KTIME_MAX) { + ktime_t delay = rxrpc_get_rto_backoff(call->peer, state.did_send); + ktime_t resend_at = ktime_add(lowest_xmit_ts, delay); -no_further_resend: -no_resend: - if (resend_at < KTIME_MAX) { - delay = rxrpc_get_rto_backoff(call->peer, did_send); - resend_at = ktime_add(resend_at, delay); - trace_rxrpc_timer_set(call, resend_at - now, rxrpc_timer_trace_resend_reset); + _debug("delay %llu %lld", delay, ktime_sub(resend_at, state.now)); + call->resend_at = resend_at; + trace_rxrpc_timer_set(call, resend_at - state.now, + rxrpc_timer_trace_resend_reset); + } else { + call->resend_at = KTIME_MAX; + trace_rxrpc_timer_can(call, rxrpc_timer_trace_resend); } - call->resend_at = resend_at; if (unacked) rxrpc_congestion_timeout(call); @@ -198,11 +225,11 @@ no_resend: * that an ACK got lost somewhere. Send a ping to find out instead of * retransmitting data. 
*/ - if (!did_send) { + if (!state.did_send) { ktime_t next_ping = ktime_add_us(call->acks_latest_ts, call->peer->srtt_us >> 3); - if (ktime_sub(next_ping, now) <= 0) + if (ktime_sub(next_ping, state.now) <= 0) rxrpc_send_ACK(call, RXRPC_ACK_PING, 0, rxrpc_propose_ack_ping_for_0_retrans); } @@ -468,7 +495,7 @@ bool rxrpc_input_call_event(struct rxrpc_call *call) if (resend && __rxrpc_call_state(call) != RXRPC_CALL_CLIENT_RECV_REPLY && !test_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags)) - rxrpc_resend(call, NULL); + rxrpc_resend(call, 0, false); if (test_and_clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags)) rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0, diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c index 6c0612810fd479..519e6329bb1191 100644 --- a/net/rxrpc/call_object.c +++ b/net/rxrpc/call_object.c @@ -693,7 +693,6 @@ static void rxrpc_destroy_call(struct work_struct *work) del_timer_sync(&call->timer); - rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack); rxrpc_cleanup_tx_buffers(call); rxrpc_cleanup_rx_buffers(call); rxrpc_put_txbuf(call->tx_pending, rxrpc_txbuf_put_cleaned); diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c index e93225c6061d8f..00e2faa4963d32 100644 --- a/net/rxrpc/input.c +++ b/net/rxrpc/input.c @@ -37,7 +37,6 @@ static void rxrpc_congestion_management(struct rxrpc_call *call, enum rxrpc_congest_change change = rxrpc_cong_no_change; unsigned int cumulative_acks = call->cong_cumul_acks; unsigned int cwnd = call->cong_cwnd; - bool resend = false; summary->flight_size = (call->tx_top - call->acks_hard_ack) - summary->nr_acks; @@ -56,6 +55,7 @@ static void rxrpc_congestion_management(struct rxrpc_call *call, } cumulative_acks += summary->nr_new_acks; + cumulative_acks += summary->nr_rot_new_acks; if (cumulative_acks > 255) cumulative_acks = 255; @@ -66,7 +66,7 @@ static void rxrpc_congestion_management(struct rxrpc_call *call, switch (call->cong_mode) { case RXRPC_CALL_SLOW_START: - if (summary->saw_nacks) + if 
(summary->nr_nacks > 0) goto packet_loss_detected; if (summary->cumulative_acks > 0) cwnd += 1; @@ -77,7 +77,7 @@ static void rxrpc_congestion_management(struct rxrpc_call *call, goto out; case RXRPC_CALL_CONGEST_AVOIDANCE: - if (summary->saw_nacks) + if (summary->nr_nacks > 0) goto packet_loss_detected; /* We analyse the number of packets that get ACK'd per RTT @@ -96,7 +96,7 @@ static void rxrpc_congestion_management(struct rxrpc_call *call, goto out; case RXRPC_CALL_PACKET_LOSS: - if (!summary->saw_nacks) + if (summary->nr_nacks == 0) goto resume_normality; if (summary->new_low_nack) { @@ -118,7 +118,7 @@ static void rxrpc_congestion_management(struct rxrpc_call *call, cwnd = call->cong_ssthresh + 3; call->cong_extra = 0; call->cong_dup_acks = 0; - resend = true; + summary->need_retransmit = true; goto out; case RXRPC_CALL_FAST_RETRANSMIT: @@ -129,12 +129,12 @@ static void rxrpc_congestion_management(struct rxrpc_call *call, if (call->cong_dup_acks == 2) { change = rxrpc_cong_retransmit_again; call->cong_dup_acks = 0; - resend = true; + summary->need_retransmit = true; } } else { change = rxrpc_cong_progress; cwnd = call->cong_ssthresh; - if (!summary->saw_nacks) + if (summary->nr_nacks == 0) goto resume_normality; } goto out; @@ -162,8 +162,6 @@ out_no_clear_ca: call->cong_cumul_acks = cumulative_acks; summary->mode = call->cong_mode; trace_rxrpc_congest(call, summary, acked_serial, change); - if (resend) - rxrpc_resend(call, skb); return; packet_loss_detected: @@ -226,6 +224,13 @@ static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to, trace_rxrpc_tx_rotate(call, seq, to); trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate); + if (call->acks_lowest_nak == call->acks_hard_ack) { + call->acks_lowest_nak = to; + } else if (after(to, call->acks_lowest_nak)) { + summary->new_low_nack = true; + call->acks_lowest_nak = to; + } + /* We may have a left over fully-consumed buffer at the front that we * couldn't drop before (rotate_and_keep below). 
*/ @@ -245,6 +250,23 @@ static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to, set_bit(RXRPC_CALL_TX_LAST, &call->flags); rot_last = true; } + + if (ix == tq->nr_reported_acks) { + __set_bit(ix, &tq->segment_acked); + tq->nr_reported_acks++; + summary->nr_rot_new_acks++; + trace_rxrpc_rotate(call, tq, summary, seq, rxrpc_rotate_trace_hack); + } else { + if (__test_and_set_bit(ix, &tq->segment_acked)) { + summary->nr_acks--; + trace_rxrpc_rotate(call, tq, summary, seq, rxrpc_rotate_trace_sack); + } else { + summary->nr_nacks--; + summary->nr_rot_new_acks++; + trace_rxrpc_rotate(call, tq, summary, seq, rxrpc_rotate_trace_snak); + } + } + rxrpc_put_txbuf(tq->bufs[ix], rxrpc_txbuf_put_rotated); tq->bufs[ix] = NULL; @@ -282,13 +304,6 @@ static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to, _debug("%x,%x,%x,%d", to, call->acks_hard_ack, call->tx_top, rot_last); - if (call->acks_lowest_nak == call->acks_hard_ack) { - call->acks_lowest_nak = to; - } else if (after(to, call->acks_lowest_nak)) { - summary->new_low_nack = true; - call->acks_lowest_nak = to; - } - wake_up(&call->waitq); return rot_last; } @@ -307,11 +322,6 @@ static void rxrpc_end_tx_phase(struct rxrpc_call *call, bool reply_begun, call->resend_at = KTIME_MAX; trace_rxrpc_timer_can(call, rxrpc_timer_trace_resend); - if (unlikely(call->cong_last_nack)) { - rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack); - call->cong_last_nack = NULL; - } - switch (__rxrpc_call_state(call)) { case RXRPC_CALL_CLIENT_SEND_REQUEST: case RXRPC_CALL_CLIENT_AWAIT_REPLY: @@ -787,40 +797,91 @@ static void rxrpc_input_ack_trailer(struct rxrpc_call *call, struct sk_buff *skb wake_up(&call->waitq); } +#ifdef CONFIG_X86 +#define shiftr_adv_rotr(shift_from, rotate_into) ({ \ + asm(" shr%z1 %1\n" \ + " inc %0\n" \ + " rcr%z2 %2\n" \ + : "+d"(shift_from), "+m"(*shift_from), "+rm"(rotate_into) \ + ); \ + }) +#else +#define shiftr_adv_rotr(shift_from, rotate_into) ({ \ + 
typeof(rotate_into) __bit0 = *shift_from & 1; \ + *shift_from >>= 1; \ + shift_from++; \ + rotate_into >>= 1; \ + rotate_into |= __bit0 << sizeof(rotate_into) * 8 - 1; \ + }) +#endif + /* - * Determine how many nacks from the previous ACK have now been satisfied. + * Process a batch of soft ACKs specific to a transmission queue segment. */ -static rxrpc_seq_t rxrpc_input_check_prev_ack(struct rxrpc_call *call, - struct rxrpc_ack_summary *summary, - rxrpc_seq_t seq) +static void rxrpc_input_soft_ack_tq(struct rxrpc_call *call, + struct rxrpc_ack_summary *summary, + struct rxrpc_txqueue *tq, + unsigned long extracted_acks, + int nr_reported, + rxrpc_seq_t seq, + rxrpc_seq_t *lowest_nak) { - struct sk_buff *skb = call->cong_last_nack; - struct rxrpc_skb_priv *sp = rxrpc_skb(skb); - unsigned int i, new_acks = 0, retained_nacks = 0; - rxrpc_seq_t old_seq = sp->ack.first_ack; - u8 *acks = skb->data + sizeof(struct rxrpc_wire_header) + sizeof(struct rxrpc_ackpacket); + unsigned long old_reported, flipped, new_acks, a_to_n, n_to_a; + int new, a, n; + + old_reported = ~0UL >> (RXRPC_NR_TXQUEUE - tq->nr_reported_acks); + _enter("{%x,%lx,%d},%lx,%d,%x", + tq->qbase, tq->segment_acked, tq->nr_reported_acks, + extracted_acks, nr_reported, seq); + + _debug("[%x]", tq->qbase); + _debug("tq %16lx %u", tq->segment_acked, tq->nr_reported_acks); + _debug("sack %16lx %u", extracted_acks, nr_reported); + + /* See how many previously logged ACKs/NAKs have flipped. 
*/ + flipped = (tq->segment_acked ^ extracted_acks) & old_reported; + if (flipped) { + n_to_a = ~tq->segment_acked & flipped; /* Old NAK -> ACK */ + a_to_n = tq->segment_acked & flipped; /* Old ACK -> NAK */ + a = hweight_long(n_to_a); + n = hweight_long(a_to_n); + _debug("flip %16lx", flipped); + _debug("ntoa %16lx %d", n_to_a, a); + _debug("aton %16lx %d", a_to_n, n); + summary->nr_acks += a - n; + summary->nr_nacks += n - a; + summary->nr_new_acks += a; + summary->nr_new_nacks += n; + } - if (after_eq(seq, old_seq + sp->ack.nr_acks)) { - summary->nr_new_acks += sp->ack.nr_nacks; - summary->nr_new_acks += seq - (old_seq + sp->ack.nr_acks); - summary->nr_retained_nacks = 0; - } else if (seq == old_seq) { - summary->nr_retained_nacks = sp->ack.nr_nacks; - } else { - for (i = 0; i < sp->ack.nr_acks; i++) { - if (acks[i] == RXRPC_ACK_TYPE_NACK) { - if (before(old_seq + i, seq)) - new_acks++; - else - retained_nacks++; - } + /* See how many new ACKs/NAKs have been acquired. */ + new = nr_reported - tq->nr_reported_acks; + if (new > 0) { + new_acks = extracted_acks & ~old_reported; + if (new_acks) { + a = hweight_long(new_acks); + n = new - a; + _debug("new_a %16lx new=%d a=%d n=%d", new_acks, new, a, n); + summary->nr_acks += a; + summary->nr_nacks += n; + summary->nr_new_acks += a; + summary->nr_new_nacks += n; + } else { + summary->nr_nacks += new; + summary->nr_new_nacks += new; } - - summary->nr_new_acks += new_acks; - summary->nr_retained_nacks = retained_nacks; } - return old_seq + sp->ack.nr_acks; + tq->nr_reported_acks = nr_reported; + tq->segment_acked = extracted_acks; + trace_rxrpc_apply_acks(call, tq); + + if (extracted_acks != ~0UL) { + rxrpc_seq_t lowest = seq + ffz(extracted_acks); + + if (before(lowest, *lowest_nak)) + *lowest_nak = lowest; + } } /* @@ -835,38 +896,48 @@ static rxrpc_seq_t rxrpc_input_check_prev_ack(struct rxrpc_call *call, static void rxrpc_input_soft_acks(struct rxrpc_call *call, struct rxrpc_ack_summary *summary, struct sk_buff 
*skb, - rxrpc_seq_t seq, - rxrpc_seq_t since) + rxrpc_seq_t first) { struct rxrpc_skb_priv *sp = rxrpc_skb(skb); - unsigned int i, old_nacks = 0; - rxrpc_seq_t lowest_nak = seq + sp->ack.nr_acks; + struct rxrpc_txqueue *tq = call->tx_queue; + unsigned long extracted = ~0UL; + unsigned int nr = 0; + rxrpc_seq_t seq = first, lowest_nak = first + sp->ack.nr_acks; u8 *acks = skb->data + sizeof(struct rxrpc_wire_header) + sizeof(struct rxrpc_ackpacket); - for (i = 0; i < sp->ack.nr_acks; i++) { - if (acks[i] == RXRPC_ACK_TYPE_ACK) { - summary->nr_acks++; - if (after_eq(seq, since)) - summary->nr_new_acks++; - } else { - summary->saw_nacks = true; - if (before(seq, since)) { - /* Overlap with previous ACK */ - old_nacks++; - } else { - summary->nr_new_nacks++; - sp->ack.nr_nacks++; - } + _enter("%x,%x,%u", tq->qbase, first, sp->ack.nr_acks); + + while (after(seq, tq->qbase + RXRPC_NR_TXQUEUE - 1)) + tq = tq->next; - if (before(seq, lowest_nak)) - lowest_nak = seq; + for (unsigned int i = 0; i < sp->ack.nr_acks; i++) { + /* Decant ACKs until we hit a txqueue boundary. 
*/ + shiftr_adv_rotr(acks, extracted); + if (i == 256) { + acks -= i; + i = 0; } seq++; + nr++; + if ((seq & RXRPC_TXQ_MASK) != 0) + continue; + + _debug("bound %16lx %u", extracted, nr); + + rxrpc_input_soft_ack_tq(call, summary, tq, extracted, RXRPC_NR_TXQUEUE, + seq - RXRPC_NR_TXQUEUE, &lowest_nak); + extracted = ~0UL; + nr = 0; + tq = tq->next; } - if (lowest_nak != call->acks_lowest_nak) { - call->acks_lowest_nak = lowest_nak; - summary->new_low_nack = true; + if (nr) { + unsigned int nr_reported = seq & RXRPC_TXQ_MASK; + + extracted >>= RXRPC_NR_TXQUEUE - nr_reported; + _debug("tail %16lx %u", extracted, nr_reported); + rxrpc_input_soft_ack_tq(call, summary, tq, extracted, nr_reported, + seq & ~RXRPC_TXQ_MASK, &lowest_nak); } /* We *can* have more nacks than we did - the peer is permitted to drop @@ -874,9 +945,14 @@ static void rxrpc_input_soft_acks(struct rxrpc_call *call, * possible for the nack distribution to change whilst the number of * nacks stays the same or goes down. */ - if (old_nacks < summary->nr_retained_nacks) - summary->nr_new_acks += summary->nr_retained_nacks - old_nacks; - summary->nr_retained_nacks = old_nacks; + if (lowest_nak != call->acks_lowest_nak) { + call->acks_lowest_nak = lowest_nak; + summary->new_low_nack = true; + } + + _debug("summary A=%d+%d N=%d+%d", + summary->nr_acks, summary->nr_new_acks, + summary->nr_nacks, summary->nr_new_nacks); } /* @@ -919,7 +995,7 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb) struct rxrpc_skb_priv *sp = rxrpc_skb(skb); struct rxrpc_acktrailer trailer; rxrpc_serial_t ack_serial, acked_serial; - rxrpc_seq_t first_soft_ack, hard_ack, prev_pkt, since; + rxrpc_seq_t first_soft_ack, hard_ack, prev_pkt; int nr_acks, offset, ioffset; _enter(""); @@ -986,6 +1062,7 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb) /* Discard any out-of-order or duplicate ACKs (outside lock). 
*/ if (!rxrpc_is_ack_valid(call, first_soft_ack, prev_pkt)) { + kdebug("discard"); trace_rxrpc_rx_discard_ack(call->debug_id, ack_serial, first_soft_ack, call->acks_first_seq, prev_pkt, call->acks_prev_seq); @@ -1001,16 +1078,6 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb) if (nr_acks > 0) skb_condense(skb); - if (call->cong_last_nack) { - since = rxrpc_input_check_prev_ack(call, &summary, first_soft_ack); - rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack); - call->cong_last_nack = NULL; - } else { - summary.nr_new_acks = first_soft_ack - call->acks_first_seq; - call->acks_lowest_nak = first_soft_ack + nr_acks; - since = first_soft_ack; - } - call->acks_latest_ts = skb->tstamp; call->acks_first_seq = first_soft_ack; call->acks_prev_seq = prev_pkt; @@ -1048,6 +1115,9 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb) if (nr_acks > call->tx_top - hard_ack) return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_sack_overflow); + summary.nr_acks = call->acks_nr_acks; + summary.nr_nacks = call->acks_nr_nacks; + if (after(hard_ack, call->acks_hard_ack)) { if (rxrpc_rotate_tx_window(call, hard_ack, &summary)) { rxrpc_end_tx_phase(call, false, rxrpc_eproto_unexpected_ack); @@ -1058,9 +1128,7 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb) if (nr_acks > 0) { if (offset > (int)skb->len - nr_acks) return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_short_sack); - rxrpc_input_soft_acks(call, &summary, skb, first_soft_ack, since); - rxrpc_get_skb(skb, rxrpc_skb_get_last_nack); - call->cong_last_nack = skb; + rxrpc_input_soft_acks(call, &summary, skb, first_soft_ack); } if (test_bit(RXRPC_CALL_TX_LAST, &call->flags) && @@ -1069,7 +1137,11 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb) rxrpc_propose_ping(call, ack_serial, rxrpc_propose_ack_ping_for_lost_reply); + call->acks_nr_acks = summary.nr_acks; + call->acks_nr_nacks = summary.nr_nacks; 
rxrpc_congestion_management(call, skb, &summary, acked_serial); + if (summary.need_retransmit) + rxrpc_resend(call, ack_serial, summary.ack_reason == RXRPC_ACK_PING_RESPONSE); send_response: if (summary.ack_reason == RXRPC_ACK_PING) diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c index fa3a316fe76cff..1fa3d6817dd3c3 100644 --- a/net/rxrpc/output.c +++ b/net/rxrpc/output.c @@ -464,7 +464,7 @@ dont_set_request_ack: len -= sizeof(*whdr) - sizeof(*jumbo); } - trace_rxrpc_tx_data(call, txb->seq, txb->serial, flags, false); + trace_rxrpc_tx_data(call, txb->seq, txb->serial, txb->flags | flags, false); kv->iov_len = len; return len; } @@ -522,12 +522,22 @@ static void rxrpc_tstamp_data_packets(struct rxrpc_call *call, struct rxrpc_txqu for (i = 0; i < n; i++) { int ix = seq & RXRPC_TXQ_MASK; + tq->segment_xmit_ts[ix] = xmit_ts; if (tq->bufs[ix]->flags & RXRPC_REQUEST_ACK) serial = tq->bufs[ix]->serial; seq++; - if (!(seq & RXRPC_TXQ_MASK)) + if (!(seq & RXRPC_TXQ_MASK)) { tq = tq->next; + if (!tq) + break; + if (tq->xmit_ts_base == KTIME_MIN) { + tq->xmit_ts_base = now; + xmit_ts = 0; + } else { + xmit_ts = ktime_to_us(ktime_sub(now, tq->xmit_ts_base)); + } + } } if (serial) { @@ -573,16 +583,6 @@ static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txqueue len = rxrpc_prepare_data_packet(call, tq, seq, n); txb = tq->bufs[seq & RXRPC_TXQ_MASK]; - if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) { - static int lose; - if ((lose++ & 7) == 7) { - ret = 0; - trace_rxrpc_tx_data(call, txb->seq, txb->serial, - txb->flags, true); - goto done; - } - } - iov_iter_kvec(&msg.msg_iter, WRITE, call->local->kvec, n, len); msg.msg_name = &call->peer->srx.transport; @@ -608,6 +608,17 @@ static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txqueue frag = rxrpc_tx_point_call_data_nofrag; } + if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) { + static int lose; + if ((lose++ & 7) == 7) { + ret = 0; + trace_rxrpc_tx_data(call, txb->seq, txb->serial, + 
txb->flags, true); + conn->peer->last_tx_at = ktime_get_seconds(); + goto done; + } + } + retry: /* send the packet by UDP * - returns -EMSGSIZE if UDP would have to fragment the packet diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c index 4c35b705b43fea..0a31868e2a2f4f 100644 --- a/net/rxrpc/sendmsg.c +++ b/net/rxrpc/sendmsg.c @@ -297,6 +297,9 @@ static int rxrpc_alloc_txqueue(struct sock *sk, struct rxrpc_call *call) kfree(tq); return -ENOMEM; } else { + /* We start at seq 1, so pretend seq 0 is hard-acked. */ + tq->nr_reported_acks = 1; + tq->segment_acked = 1UL; tq->qbase = 0; call->tx_qbase = 0; call->send_queue = tq; |