diff options
author | Karl Nilsson <kjnilsson@gmail.com> | 2021-09-10 16:03:04 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-09-10 16:03:04 +0100 |
commit | 239e136480e4f947d91316b8849f5fb8123e0801 (patch) | |
tree | 216c0c2f368ace96fc67aa0e45336f410d3f55e0 | |
parent | 0dafe700055ac36a3c68cf392f6b8d56e4dc9fb7 (diff) | |
parent | 3b1714cbe3fd3ec5176116063d300020c00ec903 (diff) | |
download | rabbitmq-server-git-239e136480e4f947d91316b8849f5fb8123e0801.tar.gz |
Merge pull request #3397 from rabbitmq/handle-connection-closures-in-stream-reader
Handle closed connections in stream reader
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 125 |
1 files changed, 64 insertions, 61 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 8bad2e53af..412be10650 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -1006,6 +1006,10 @@ open(cast, Consumer, SendFileOct) of + {error, closed} -> + rabbit_log_connection:info("Stream protocol connection has been closed by peer", + []), + throw({stop, normal}); {error, Reason} -> rabbit_log_connection:info("Error while sending chunks: ~p", [Reason]), @@ -1819,52 +1823,64 @@ handle_frame_post_auth(Transport, rabbit_log:debug("Distributing existing messages to subscription ~p", [SubscriptionId]), - {{segment, Segment1}, {credit, Credit1}} = - send_chunks(Transport, ConsumerState, - SendFileOct), - ConsumerState1 = - ConsumerState#consumer{segment = Segment1, - credit = Credit1}, - Consumers1 = - Consumers#{SubscriptionId => ConsumerState1}, - - StreamSubscriptions1 = - case StreamSubscriptions of - #{Stream := SubscriptionIds} -> - StreamSubscriptions#{Stream => - [SubscriptionId] - ++ SubscriptionIds}; - _ -> - StreamSubscriptions#{Stream => - [SubscriptionId]} - end, - #consumer{counters = ConsumerCounters1} = - ConsumerState1, - - ConsumerOffset = osiris_log:next_offset(Segment1), - ConsumerOffsetLag = - consumer_i(offset_lag, ConsumerState1), - - rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) " - "distributed after subscription", - [SubscriptionId, ConsumerOffset, - messages_consumed(ConsumerCounters1)]), - - rabbit_stream_metrics:consumer_created(self(), - stream_r(Stream, - Connection1), - SubscriptionId, - Credit1, - messages_consumed(ConsumerCounters1), - ConsumerOffset, - ConsumerOffsetLag, - Properties), - {Connection1#stream_connection{stream_subscriptions - = - StreamSubscriptions1}, - State#stream_connection_state{consumers = - Consumers1}} + case send_chunks(Transport, ConsumerState, + SendFileOct) + of + {error, closed} -> + rabbit_log_connection:info("Stream protocol connection has been closed by peer", + []), + throw({stop, normal}); + {{segment, Segment1}, {credit, Credit1}} -> + ConsumerState1 = + ConsumerState#consumer{segment = + Segment1, + credit = + Credit1}, + Consumers1 = + Consumers#{SubscriptionId => + ConsumerState1}, + + StreamSubscriptions1 = + case StreamSubscriptions of + #{Stream := SubscriptionIds} -> + StreamSubscriptions#{Stream => + [SubscriptionId] + ++ SubscriptionIds}; + _ -> + StreamSubscriptions#{Stream => + [SubscriptionId]} + end, + + #consumer{counters = ConsumerCounters1} = + ConsumerState1, + + ConsumerOffset = + osiris_log:next_offset(Segment1), + ConsumerOffsetLag = + consumer_i(offset_lag, ConsumerState1), + + rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) " + "distributed after subscription", + [SubscriptionId, + ConsumerOffset, + messages_consumed(ConsumerCounters1)]), + + rabbit_stream_metrics:consumer_created(self(), + stream_r(Stream, + Connection1), + SubscriptionId, + Credit1, + messages_consumed(ConsumerCounters1), + ConsumerOffset, + ConsumerOffsetLag, + Properties), + {Connection1#stream_connection{stream_subscriptions + = + StreamSubscriptions1}, + State#stream_connection_state{consumers = + Consumers1}} + end end end; error -> @@ -1893,22 +1909,9 @@ handle_frame_post_auth(Transport, SendFileOct) of {error, closed} -> - rabbit_log:warning("Stream protocol connection for subscription ~p has been closed, removing " - "subscription", - [SubscriptionId]), - {Connection1, State1} = - remove_subscription(SubscriptionId, Connection, State), - - Code = ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST, - Frame = - rabbit_stream_core:frame({response, 1, - {credit, Code, - SubscriptionId}}), - send(Transport, S, Frame), - rabbit_global_counters:increase_protocol_counter(stream, - ?SUBSCRIPTION_ID_DOES_NOT_EXIST, - 1), - {Connection1, State1}; + rabbit_log_connection:info("Stream protocol connection has been closed by peer", + []), + throw({stop, normal}); {{segment, Segment1}, {credit, Credit1}} -> Consumer1 = Consumer#consumer{segment = Segment1, credit = Credit1}, |