summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-09-10 16:03:04 +0100
committerGitHub <noreply@github.com>2021-09-10 16:03:04 +0100
commit239e136480e4f947d91316b8849f5fb8123e0801 (patch)
tree216c0c2f368ace96fc67aa0e45336f410d3f55e0
parent0dafe700055ac36a3c68cf392f6b8d56e4dc9fb7 (diff)
parent3b1714cbe3fd3ec5176116063d300020c00ec903 (diff)
downloadrabbitmq-server-git-239e136480e4f947d91316b8849f5fb8123e0801.tar.gz
Merge pull request #3397 from rabbitmq/handle-connection-closures-in-stream-reader
Handle closed connections in stream reader
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl125
1 files changed, 64 insertions, 61 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 8bad2e53af..412be10650 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -1006,6 +1006,10 @@ open(cast,
Consumer,
SendFileOct)
of
+ {error, closed} ->
+ rabbit_log_connection:info("Stream protocol connection has been closed by peer",
+ []),
+ throw({stop, normal});
{error, Reason} ->
rabbit_log_connection:info("Error while sending chunks: ~p",
[Reason]),
@@ -1819,52 +1823,64 @@ handle_frame_post_auth(Transport,
rabbit_log:debug("Distributing existing messages to subscription ~p",
[SubscriptionId]),
- {{segment, Segment1}, {credit, Credit1}} =
- send_chunks(Transport, ConsumerState,
- SendFileOct),
- ConsumerState1 =
- ConsumerState#consumer{segment = Segment1,
- credit = Credit1},
- Consumers1 =
- Consumers#{SubscriptionId => ConsumerState1},
-
- StreamSubscriptions1 =
- case StreamSubscriptions of
- #{Stream := SubscriptionIds} ->
- StreamSubscriptions#{Stream =>
- [SubscriptionId]
- ++ SubscriptionIds};
- _ ->
- StreamSubscriptions#{Stream =>
- [SubscriptionId]}
- end,
- #consumer{counters = ConsumerCounters1} =
- ConsumerState1,
-
- ConsumerOffset = osiris_log:next_offset(Segment1),
- ConsumerOffsetLag =
- consumer_i(offset_lag, ConsumerState1),
-
- rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) "
- "distributed after subscription",
- [SubscriptionId, ConsumerOffset,
- messages_consumed(ConsumerCounters1)]),
-
- rabbit_stream_metrics:consumer_created(self(),
- stream_r(Stream,
- Connection1),
- SubscriptionId,
- Credit1,
- messages_consumed(ConsumerCounters1),
- ConsumerOffset,
- ConsumerOffsetLag,
- Properties),
- {Connection1#stream_connection{stream_subscriptions
- =
- StreamSubscriptions1},
- State#stream_connection_state{consumers =
- Consumers1}}
+ case send_chunks(Transport, ConsumerState,
+ SendFileOct)
+ of
+ {error, closed} ->
+ rabbit_log_connection:info("Stream protocol connection has been closed by peer",
+ []),
+ throw({stop, normal});
+ {{segment, Segment1}, {credit, Credit1}} ->
+ ConsumerState1 =
+ ConsumerState#consumer{segment =
+ Segment1,
+ credit =
+ Credit1},
+ Consumers1 =
+ Consumers#{SubscriptionId =>
+ ConsumerState1},
+
+ StreamSubscriptions1 =
+ case StreamSubscriptions of
+ #{Stream := SubscriptionIds} ->
+ StreamSubscriptions#{Stream =>
+ [SubscriptionId]
+ ++ SubscriptionIds};
+ _ ->
+ StreamSubscriptions#{Stream =>
+ [SubscriptionId]}
+ end,
+
+ #consumer{counters = ConsumerCounters1} =
+ ConsumerState1,
+
+ ConsumerOffset =
+ osiris_log:next_offset(Segment1),
+ ConsumerOffsetLag =
+ consumer_i(offset_lag, ConsumerState1),
+
+ rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) "
+ "distributed after subscription",
+ [SubscriptionId,
+ ConsumerOffset,
+ messages_consumed(ConsumerCounters1)]),
+
+ rabbit_stream_metrics:consumer_created(self(),
+ stream_r(Stream,
+ Connection1),
+ SubscriptionId,
+ Credit1,
+ messages_consumed(ConsumerCounters1),
+ ConsumerOffset,
+ ConsumerOffsetLag,
+ Properties),
+ {Connection1#stream_connection{stream_subscriptions
+ =
+ StreamSubscriptions1},
+ State#stream_connection_state{consumers =
+ Consumers1}}
+ end
end
end;
error ->
@@ -1893,22 +1909,9 @@ handle_frame_post_auth(Transport,
SendFileOct)
of
{error, closed} ->
- rabbit_log:warning("Stream protocol connection for subscription ~p has been closed, removing "
- "subscription",
- [SubscriptionId]),
- {Connection1, State1} =
- remove_subscription(SubscriptionId, Connection, State),
-
- Code = ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST,
- Frame =
- rabbit_stream_core:frame({response, 1,
- {credit, Code,
- SubscriptionId}}),
- send(Transport, S, Frame),
- rabbit_global_counters:increase_protocol_counter(stream,
- ?SUBSCRIPTION_ID_DOES_NOT_EXIST,
- 1),
- {Connection1, State1};
+ rabbit_log_connection:info("Stream protocol connection has been closed by peer",
+ []),
+ throw({stop, normal});
{{segment, Segment1}, {credit, Credit1}} ->
Consumer1 =
Consumer#consumer{segment = Segment1, credit = Credit1},