diff options
Diffstat (limited to 'net/rxrpc/recvmsg.c')
-rw-r--r-- | net/rxrpc/recvmsg.c | 308 |
1 files changed, 96 insertions, 212 deletions
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c index 7e39c262fd79..36b25d003cf0 100644 --- a/net/rxrpc/recvmsg.c +++ b/net/rxrpc/recvmsg.c @@ -36,16 +36,16 @@ void rxrpc_notify_socket(struct rxrpc_call *call) sk = &rx->sk; if (rx && sk->sk_state < RXRPC_CLOSE) { if (call->notify_rx) { - spin_lock_bh(&call->notify_lock); + spin_lock(&call->notify_lock); call->notify_rx(sk, call, call->user_call_ID); - spin_unlock_bh(&call->notify_lock); + spin_unlock(&call->notify_lock); } else { - write_lock_bh(&rx->recvmsg_lock); + write_lock(&rx->recvmsg_lock); if (list_empty(&call->recvmsg_link)) { - rxrpc_get_call(call, rxrpc_call_got); + rxrpc_get_call(call, rxrpc_call_get_notify_socket); list_add_tail(&call->recvmsg_link, &rx->recvmsg_q); } - write_unlock_bh(&rx->recvmsg_lock); + write_unlock(&rx->recvmsg_lock); if (!sock_flag(sk, SOCK_DEAD)) { _debug("call %ps", sk->sk_data_ready); @@ -87,9 +87,9 @@ bool rxrpc_set_call_completion(struct rxrpc_call *call, bool ret = false; if (call->state < RXRPC_CALL_COMPLETE) { - write_lock_bh(&call->state_lock); + write_lock(&call->state_lock); ret = __rxrpc_set_call_completion(call, compl, abort_code, error); - write_unlock_bh(&call->state_lock); + write_unlock(&call->state_lock); } return ret; } @@ -107,9 +107,9 @@ bool rxrpc_call_completed(struct rxrpc_call *call) bool ret = false; if (call->state < RXRPC_CALL_COMPLETE) { - write_lock_bh(&call->state_lock); + write_lock(&call->state_lock); ret = __rxrpc_call_completed(call); - write_unlock_bh(&call->state_lock); + write_unlock(&call->state_lock); } return ret; } @@ -131,9 +131,9 @@ bool rxrpc_abort_call(const char *why, struct rxrpc_call *call, { bool ret; - write_lock_bh(&call->state_lock); + write_lock(&call->state_lock); ret = __rxrpc_abort_call(why, call, seq, abort_code, error); - write_unlock_bh(&call->state_lock); + write_unlock(&call->state_lock); return ret; } @@ -173,8 +173,9 @@ static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg) break; } - trace_rxrpc_recvmsg(call, rxrpc_recvmsg_terminal, call->rx_hard_ack, - call->rx_pkt_offset, call->rx_pkt_len, ret); + trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal, + lower_32_bits(atomic64_read(&call->ackr_window)) - 1, + call->rx_pkt_offset, call->rx_pkt_len, ret); return ret; } @@ -183,35 +184,32 @@ static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg) */ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial) { + rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq); + _enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]); - trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top); - ASSERTCMP(call->rx_hard_ack, ==, call->rx_top); + trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh); - if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) { - rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial, false, true, - rxrpc_propose_ack_terminal_ack); - //rxrpc_send_ack_packet(call, false, NULL); - } + if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) + rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack); - write_lock_bh(&call->state_lock); + write_lock(&call->state_lock); switch (call->state) { case RXRPC_CALL_CLIENT_RECV_REPLY: __rxrpc_call_completed(call); - write_unlock_bh(&call->state_lock); + write_unlock(&call->state_lock); break; case RXRPC_CALL_SERVER_RECV_REQUEST: - call->tx_phase = true; call->state = RXRPC_CALL_SERVER_ACK_REQUEST; call->expect_req_by = jiffies + MAX_JIFFY_OFFSET; - write_unlock_bh(&call->state_lock); - rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial, false, true, - rxrpc_propose_ack_processing_op); + write_unlock(&call->state_lock); + rxrpc_propose_delay_ACK(call, serial, + rxrpc_propose_ack_processing_op); break; default: - write_unlock_bh(&call->state_lock); + write_unlock(&call->state_lock); break; } } @@ -224,126 +222,51 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call) struct rxrpc_skb_priv *sp; struct sk_buff *skb; rxrpc_serial_t serial; - rxrpc_seq_t hard_ack, top; - bool last = false; - u8 subpacket; - int ix; + rxrpc_seq_t old_consumed = call->rx_consumed, tseq; + bool last; + int acked; _enter("%d", call->debug_id); - hard_ack = call->rx_hard_ack; - top = smp_load_acquire(&call->rx_top); - ASSERT(before(hard_ack, top)); + skb = skb_dequeue(&call->recvmsg_queue); + rxrpc_see_skb(skb, rxrpc_skb_see_rotate); - hard_ack++; - ix = hard_ack & RXRPC_RXTX_BUFF_MASK; - skb = call->rxtx_buffer[ix]; - rxrpc_see_skb(skb, rxrpc_skb_rotated); sp = rxrpc_skb(skb); + tseq = sp->hdr.seq; + serial = sp->hdr.serial; + last = sp->hdr.flags & RXRPC_LAST_PACKET; - subpacket = call->rxtx_annotations[ix] & RXRPC_RX_ANNO_SUBPACKET; - serial = sp->hdr.serial + subpacket; - - if (subpacket == sp->nr_subpackets - 1 && - sp->rx_flags & RXRPC_SKB_INCL_LAST) - last = true; - - call->rxtx_buffer[ix] = NULL; - call->rxtx_annotations[ix] = 0; /* Barrier against rxrpc_input_data(). */ - smp_store_release(&call->rx_hard_ack, hard_ack); + if (after(tseq, call->rx_consumed)) + smp_store_release(&call->rx_consumed, tseq); - rxrpc_free_skb(skb, rxrpc_skb_freed); + rxrpc_free_skb(skb, rxrpc_skb_put_rotate); - trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack); + trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate, + serial, call->rx_consumed); if (last) { rxrpc_end_rx_phase(call, serial); - } else { - /* Check to see if there's an ACK that needs sending. */ - if (atomic_inc_return(&call->ackr_nr_consumed) > 2) - rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial, - true, false, - rxrpc_propose_ack_rotate_rx); - if (call->ackr_reason && call->ackr_reason != RXRPC_ACK_DELAY) - rxrpc_send_ack_packet(call, false, NULL); - } -} - -/* - * Decrypt and verify a (sub)packet. The packet's length may be changed due to - * padding, but if this is the case, the packet length will be resident in the - * socket buffer. Note that we can't modify the master skb info as the skb may - * be the home to multiple subpackets. - */ -static int rxrpc_verify_packet(struct rxrpc_call *call, struct sk_buff *skb, - u8 annotation, - unsigned int offset, unsigned int len) -{ - struct rxrpc_skb_priv *sp = rxrpc_skb(skb); - rxrpc_seq_t seq = sp->hdr.seq; - u16 cksum = sp->hdr.cksum; - u8 subpacket = annotation & RXRPC_RX_ANNO_SUBPACKET; - - _enter(""); - - /* For all but the head jumbo subpacket, the security checksum is in a - * jumbo header immediately prior to the data. - */ - if (subpacket > 0) { - __be16 tmp; - if (skb_copy_bits(skb, offset - 2, &tmp, 2) < 0) - BUG(); - cksum = ntohs(tmp); - seq += subpacket; + return; } - return call->security->verify_packet(call, skb, offset, len, - seq, cksum); + /* Check to see if there's an ACK that needs sending. */ + acked = atomic_add_return(call->rx_consumed - old_consumed, + &call->ackr_nr_consumed); + if (acked > 2 && + !test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags)) + rxrpc_poke_call(call, rxrpc_call_poke_idle); } /* - * Locate the data within a packet. This is complicated by: - * - * (1) An skb may contain a jumbo packet - so we have to find the appropriate - * subpacket. - * - * (2) The (sub)packets may be encrypted and, if so, the encrypted portion - * contains an extra header which includes the true length of the data, - * excluding any encrypted padding. + * Decrypt and verify a DATA packet. */ -static int rxrpc_locate_data(struct rxrpc_call *call, struct sk_buff *skb, - u8 *_annotation, - unsigned int *_offset, unsigned int *_len, - bool *_last) +static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb) { struct rxrpc_skb_priv *sp = rxrpc_skb(skb); - unsigned int offset = sizeof(struct rxrpc_wire_header); - unsigned int len; - bool last = false; - int ret; - u8 annotation = *_annotation; - u8 subpacket = annotation & RXRPC_RX_ANNO_SUBPACKET; - - /* Locate the subpacket */ - offset += subpacket * RXRPC_JUMBO_SUBPKTLEN; - len = skb->len - offset; - if (subpacket < sp->nr_subpackets - 1) - len = RXRPC_JUMBO_DATALEN; - else if (sp->rx_flags & RXRPC_SKB_INCL_LAST) - last = true; - - if (!(annotation & RXRPC_RX_ANNO_VERIFIED)) { - ret = rxrpc_verify_packet(call, skb, annotation, offset, len); - if (ret < 0) - return ret; - *_annotation |= RXRPC_RX_ANNO_VERIFIED; - } - *_offset = offset; - *_len = len; - *_last = last; - call->security->locate_data(call, skb, _offset, _len); - return 0; + if (sp->flags & RXRPC_RX_VERIFIED) + return 0; + return call->security->verify_packet(call, skb); } /* @@ -357,69 +280,49 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call, { struct rxrpc_skb_priv *sp; struct sk_buff *skb; - rxrpc_serial_t serial; - rxrpc_seq_t hard_ack, top, seq; + rxrpc_seq_t seq = 0; size_t remain; - bool rx_pkt_last; unsigned int rx_pkt_offset, rx_pkt_len; - int ix, copy, ret = -EAGAIN, ret2; - - if (test_and_clear_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags) && - call->ackr_reason) - rxrpc_send_ack_packet(call, false, NULL); + int copy, ret = -EAGAIN, ret2; rx_pkt_offset = call->rx_pkt_offset; rx_pkt_len = call->rx_pkt_len; - rx_pkt_last = call->rx_pkt_last; if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) { - seq = call->rx_hard_ack; + seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1; ret = 1; goto done; } - /* Barriers against rxrpc_input_data(). */ - hard_ack = call->rx_hard_ack; - seq = hard_ack + 1; - - while (top = smp_load_acquire(&call->rx_top), - before_eq(seq, top) - ) { - ix = seq & RXRPC_RXTX_BUFF_MASK; - skb = call->rxtx_buffer[ix]; - if (!skb) { - trace_rxrpc_recvmsg(call, rxrpc_recvmsg_hole, seq, - rx_pkt_offset, rx_pkt_len, 0); - break; - } - smp_rmb(); - rxrpc_see_skb(skb, rxrpc_skb_seen); + /* No one else can be removing stuff from the queue, so we shouldn't + * need the Rx lock to walk it. + */ + skb = skb_peek(&call->recvmsg_queue); + while (skb) { + rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg); sp = rxrpc_skb(skb); + seq = sp->hdr.seq; - if (!(flags & MSG_PEEK)) { - serial = sp->hdr.serial; - serial += call->rxtx_annotations[ix] & RXRPC_RX_ANNO_SUBPACKET; + if (!(flags & MSG_PEEK)) trace_rxrpc_receive(call, rxrpc_receive_front, - serial, seq); - } + sp->hdr.serial, seq); if (msg) sock_recv_timestamp(msg, sock->sk, skb); if (rx_pkt_offset == 0) { - ret2 = rxrpc_locate_data(call, skb, - &call->rxtx_annotations[ix], - &rx_pkt_offset, &rx_pkt_len, - &rx_pkt_last); - trace_rxrpc_recvmsg(call, rxrpc_recvmsg_next, seq, - rx_pkt_offset, rx_pkt_len, ret2); + ret2 = rxrpc_verify_data(call, skb); + rx_pkt_offset = sp->offset; + rx_pkt_len = sp->len; + trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq, + rx_pkt_offset, rx_pkt_len, ret2); if (ret2 < 0) { ret = ret2; goto out; } } else { - trace_rxrpc_recvmsg(call, rxrpc_recvmsg_cont, seq, - rx_pkt_offset, rx_pkt_len, 0); + trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq, + rx_pkt_offset, rx_pkt_len, 0); } /* We have to handle short, empty and used-up DATA packets. */ @@ -442,39 +345,35 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call, } if (rx_pkt_len > 0) { - trace_rxrpc_recvmsg(call, rxrpc_recvmsg_full, seq, - rx_pkt_offset, rx_pkt_len, 0); + trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq, + rx_pkt_offset, rx_pkt_len, 0); ASSERTCMP(*_offset, ==, len); ret = 0; break; } /* The whole packet has been transferred. */ - if (!(flags & MSG_PEEK)) - rxrpc_rotate_rx_window(call); + if (sp->hdr.flags & RXRPC_LAST_PACKET) + ret = 1; rx_pkt_offset = 0; rx_pkt_len = 0; - if (rx_pkt_last) { - ASSERTCMP(seq, ==, READ_ONCE(call->rx_top)); - ret = 1; - goto out; - } + skb = skb_peek_next(skb, &call->recvmsg_queue); - seq++; + if (!(flags & MSG_PEEK)) + rxrpc_rotate_rx_window(call); } out: if (!(flags & MSG_PEEK)) { call->rx_pkt_offset = rx_pkt_offset; call->rx_pkt_len = rx_pkt_len; - call->rx_pkt_last = rx_pkt_last; } done: - trace_rxrpc_recvmsg(call, rxrpc_recvmsg_data_return, seq, - rx_pkt_offset, rx_pkt_len, ret); + trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq, + rx_pkt_offset, rx_pkt_len, ret); if (ret == -EAGAIN) - set_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags); + set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags); return ret; } @@ -495,7 +394,7 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len, DEFINE_WAIT(wait); - trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_enter, 0, 0, 0, 0); + trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_enter, 0); if (flags & (MSG_OOB | MSG_TRUNC)) return -EOPNOTSUPP; @@ -532,8 +431,7 @@ try_again: if (list_empty(&rx->recvmsg_q)) { if (signal_pending(current)) goto wait_interrupted; - trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_wait, - 0, 0, 0, 0); + trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_wait, 0); timeo = schedule_timeout(timeo); } finish_wait(sk_sleep(&rx->sk), &wait); @@ -543,16 +441,16 @@ try_again: /* Find the next call and dequeue it if we're not just peeking. If we * do dequeue it, that comes with a ref that we will need to release. */ - write_lock_bh(&rx->recvmsg_lock); + write_lock(&rx->recvmsg_lock); l = rx->recvmsg_q.next; call = list_entry(l, struct rxrpc_call, recvmsg_link); if (!(flags & MSG_PEEK)) list_del_init(&call->recvmsg_link); else - rxrpc_get_call(call, rxrpc_call_got); - write_unlock_bh(&rx->recvmsg_lock); + rxrpc_get_call(call, rxrpc_call_get_recvmsg); + write_unlock(&rx->recvmsg_lock); - trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0, 0, 0, 0); + trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0); /* We're going to drop the socket lock, so we need to lock the call * against interference by sendmsg. @@ -588,11 +486,9 @@ try_again: } if (msg->msg_name && call->peer) { - struct sockaddr_rxrpc *srx = msg->msg_name; - size_t len = sizeof(call->peer->srx); + size_t len = sizeof(call->dest_srx); - memcpy(msg->msg_name, &call->peer->srx, len); - srx->srx_service = call->service_id; + memcpy(msg->msg_name, &call->dest_srx, len); msg->msg_namelen = len; } @@ -605,8 +501,7 @@ try_again: if (ret == -EAGAIN) ret = 0; - if (after(call->rx_top, call->rx_hard_ack) && - call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK]) + if (!skb_queue_empty(&call->recvmsg_queue)) rxrpc_notify_socket(call); break; default: @@ -635,23 +530,23 @@ try_again: error_unlock_call: mutex_unlock(&call->user_mutex); - rxrpc_put_call(call, rxrpc_call_put); - trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret); + rxrpc_put_call(call, rxrpc_call_put_recvmsg); + trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, ret); return ret; error_requeue_call: if (!(flags & MSG_PEEK)) { - write_lock_bh(&rx->recvmsg_lock); + write_lock(&rx->recvmsg_lock); list_add(&call->recvmsg_link, &rx->recvmsg_q); - write_unlock_bh(&rx->recvmsg_lock); - trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0, 0, 0, 0); + write_unlock(&rx->recvmsg_lock); + trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0); } else { - rxrpc_put_call(call, rxrpc_call_put); + rxrpc_put_call(call, rxrpc_call_put_recvmsg); } error_no_call: release_sock(&rx->sk); error_trace: - trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret); + trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, ret); return ret; wait_interrupted: @@ -735,19 +630,8 @@ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call, read_phase_complete: ret = 1; out: - switch (call->ackr_reason) { - case RXRPC_ACK_IDLE: - break; - case RXRPC_ACK_DELAY: - if (ret != -EAGAIN) - break; - fallthrough; - default: - rxrpc_send_ack_packet(call, false, NULL); - } - if (_service) - *_service = call->service_id; + *_service = call->dest_srx.srx_service; mutex_unlock(&call->user_mutex); _leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort); return ret; |