summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-09-13 11:23:35 +0100
committerKarl Nilsson <kjnilsson@gmail.com>2021-09-13 11:23:35 +0100
commit135575b3ffc83fa32218278a066b5a872a21ff85 (patch)
treed351898eadf0e32421675a2d4a0631fd11f58239
parent239e136480e4f947d91316b8849f5fb8123e0801 (diff)
downloadrabbitmq-server-git-135575b3ffc83fa32218278a066b5a872a21ff85.tar.gz
Stream reader: close osiris logs and sockets in terminate
Instead of injecting it into varios places inside the code. When the osiris log is closed it will decrement the global "readers" counter which is why it is much safer to do this in terminate.
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl17
1 files changed, 7 insertions, 10 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 412be10650..e0b4337c87 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -173,11 +173,15 @@
callback_mode() ->
[state_functions, state_enter].
-terminate(Reason, State, StatemData) ->
+terminate(Reason, State,
+ #statem_data{transport = Transport,
+ connection = #stream_connection{socket = Socket},
+ connection_state = ConnectionState} = StatemData) ->
+ close(Transport, Socket, ConnectionState),
rabbit_networking:unregister_non_amqp_connection(self()),
notify_connection_closed(StatemData),
- rabbit_log:debug("~p terminating in state '~s' with reason '~p'",
- [?MODULE, State, Reason]).
+ rabbit_log:debug("~s terminating in state '~s' with reason '~W'",
+ [?MODULE, State, Reason, 10]).
start_link(KeepaliveSup, Transport, Ref, Opts) ->
{ok,
@@ -713,7 +717,6 @@ open(info, {OK, S, Data},
#stream_connection{connection_step = Step} = Connection1,
case Step of
closing ->
- close(Transport, S, State),
stop;
close_sent ->
rabbit_log_connection:debug("Transitioned to close_sent"),
@@ -808,7 +811,6 @@ open(info, heartbeat_send,
rabbit_log_connection:info("Heartbeat send error ~p, closing connection",
[Unexpected]),
_C1 = demonitor_all_streams(Connection),
- close(Transport, S, State),
stop
end;
open(info, heartbeat_timeout,
@@ -817,7 +819,6 @@ open(info, heartbeat_timeout,
connection_state = State}) ->
rabbit_log_connection:debug("Heartbeat timeout, closing connection"),
_C1 = demonitor_all_streams(Connection),
- close(Transport, S, State),
stop;
open(info, {infos, From},
#statem_data{connection =
@@ -857,7 +858,6 @@ open({call, From}, {shutdown, Explanation},
rabbit_log_connection:info("Forcing stream connection ~p closing: ~p",
[self(), Explanation]),
demonitor_all_streams(Connection),
- close(Transport, S, State),
{stop_and_reply, normal, {reply, From, ok}};
open(cast,
{queue_event, _, {osiris_written, _, undefined, CorrelationList}},
@@ -1060,7 +1060,6 @@ close_sent(state_timeout, close,
connection_state = State}) ->
rabbit_log_connection:warning("Closing connection because of timeout in state '~s' likely due to lack of client action.",
[?FUNCTION_NAME]),
- close(Transport, Socket, State),
stop;
close_sent(info, {tcp, S, Data},
#statem_data{transport = Transport,
@@ -1075,7 +1074,6 @@ close_sent(info, {tcp, S, Data},
[?FUNCTION_NAME, Step]),
case Step of
closing_done ->
- close(Transport, S, State1),
stop;
_ ->
Transport:setopts(S, [{active, once}]),
@@ -1091,7 +1089,6 @@ close_sent(info, {tcp_error, S, Reason},
#statem_data{transport = Transport, connection_state = State}) ->
rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] [~w]",
[Reason, S, self()]),
- close(Transport, S, State),
stop;
close_sent(info, {resource_alarm, IsThereAlarm},
StatemData = #statem_data{connection = Connection}) ->