summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-09-13 11:38:41 +0100
committerKarl Nilsson <kjnilsson@gmail.com>2021-09-13 11:38:41 +0100
commit9e4506041d3305dfa32bd2d60c7d9491bee38551 (patch)
tree5e1427fed90c1fa47890c815c91dea5b8d0872fa
parent135575b3ffc83fa32218278a066b5a872a21ff85 (diff)
downloadrabbitmq-server-git-9e4506041d3305dfa32bd2d60c7d9491bee38551.tar.gz
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl21
1 files changed, 5 insertions, 16 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index e0b4337c87..fdfe74dcf9 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -1,6 +1,4 @@
%% The contents of this file are subject to the Mozilla Public License
-%% Version 2.0 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
%% at https://www.mozilla.org/en-US/MPL/2.0/
%%
%% Software distributed under the License is distributed on an "AS IS"
@@ -801,8 +799,7 @@ open(info, {'DOWN', MonitorRef, process, _OsirisPid, _Reason},
connection_state = State1}};
open(info, heartbeat_send,
#statem_data{transport = Transport,
- connection = #stream_connection{socket = S} = Connection,
- connection_state = State}) ->
+ connection = #stream_connection{socket = S} = Connection}) ->
Frame = rabbit_stream_core:frame(heartbeat),
case catch send(Transport, S, Frame) of
ok ->
@@ -814,9 +811,7 @@ open(info, heartbeat_send,
stop
end;
open(info, heartbeat_timeout,
- #statem_data{transport = Transport,
- connection = #stream_connection{socket = S} = Connection,
- connection_state = State}) ->
+ #statem_data{connection = #stream_connection{} = Connection}) ->
rabbit_log_connection:debug("Heartbeat timeout, closing connection"),
_C1 = demonitor_all_streams(Connection),
stop;
@@ -851,9 +846,7 @@ open({call, From}, {publishers_info, Items},
{keep_state_and_data,
{reply, From, publishers_infos(Items, Connection)}};
open({call, From}, {shutdown, Explanation},
- #statem_data{transport = Transport,
- connection = #stream_connection{socket = S} = Connection,
- connection_state = State}) ->
+ #statem_data{connection = Connection}) ->
% likely closing call from the management plugin
rabbit_log_connection:info("Forcing stream connection ~p closing: ~p",
[self(), Explanation]),
@@ -1054,10 +1047,7 @@ close_sent(enter, _OldState,
#configuration{connection_negotiation_step_timeout =
StateTimeout}}) ->
{keep_state_and_data, {state_timeout, StateTimeout, close}};
-close_sent(state_timeout, close,
- #statem_data{transport = Transport,
- connection = #stream_connection{socket = Socket},
- connection_state = State}) ->
+close_sent(state_timeout, close, #statem_data{}) ->
rabbit_log_connection:warning("Closing connection because of timeout in state '~s' likely due to lack of client action.",
[?FUNCTION_NAME]),
stop;
@@ -1085,8 +1075,7 @@ close_sent(info, {tcp_closed, S}, _StatemData) ->
rabbit_log_connection:debug("Stream protocol connection socket ~w closed [~w]",
[S, self()]),
stop;
-close_sent(info, {tcp_error, S, Reason},
- #statem_data{transport = Transport, connection_state = State}) ->
+close_sent(info, {tcp_error, S, Reason}, #statem_data{}) ->
rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] [~w]",
[Reason, S, self()]),
stop;