diff options
author | Karl Nilsson <kjnilsson@gmail.com> | 2021-09-13 11:23:35 +0100 |
---|---|---|
committer | Karl Nilsson <kjnilsson@gmail.com> | 2021-09-13 11:23:35 +0100 |
commit | 135575b3ffc83fa32218278a066b5a872a21ff85 (patch) | |
tree | d351898eadf0e32421675a2d4a0631fd11f58239 | |
parent | 239e136480e4f947d91316b8849f5fb8123e0801 (diff) | |
download | rabbitmq-server-git-135575b3ffc83fa32218278a066b5a872a21ff85.tar.gz |
Stream reader: close osiris logs and sockets in terminate
Instead of injecting it into varios places inside the code.
When the osiris log is closed it will decrement the global "readers"
counter which is why it is much safer to do this in terminate.
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 17 |
1 files changed, 7 insertions, 10 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 412be10650..e0b4337c87 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -173,11 +173,15 @@ callback_mode() -> [state_functions, state_enter]. -terminate(Reason, State, StatemData) -> +terminate(Reason, State, + #statem_data{transport = Transport, + connection = #stream_connection{socket = Socket}, + connection_state = ConnectionState} = StatemData) -> + close(Transport, Socket, ConnectionState), rabbit_networking:unregister_non_amqp_connection(self()), notify_connection_closed(StatemData), - rabbit_log:debug("~p terminating in state '~s' with reason '~p'", - [?MODULE, State, Reason]). + rabbit_log:debug("~s terminating in state '~s' with reason '~W'", + [?MODULE, State, Reason, 10]). start_link(KeepaliveSup, Transport, Ref, Opts) -> {ok, @@ -713,7 +717,6 @@ open(info, {OK, S, Data}, #stream_connection{connection_step = Step} = Connection1, case Step of closing -> - close(Transport, S, State), stop; close_sent -> rabbit_log_connection:debug("Transitioned to close_sent"), @@ -808,7 +811,6 @@ open(info, heartbeat_send, rabbit_log_connection:info("Heartbeat send error ~p, closing connection", [Unexpected]), _C1 = demonitor_all_streams(Connection), - close(Transport, S, State), stop end; open(info, heartbeat_timeout, @@ -817,7 +819,6 @@ open(info, heartbeat_timeout, connection_state = State}) -> rabbit_log_connection:debug("Heartbeat timeout, closing connection"), _C1 = demonitor_all_streams(Connection), - close(Transport, S, State), stop; open(info, {infos, From}, #statem_data{connection = @@ -857,7 +858,6 @@ open({call, From}, {shutdown, Explanation}, rabbit_log_connection:info("Forcing stream connection ~p closing: ~p", [self(), Explanation]), demonitor_all_streams(Connection), - close(Transport, S, State), {stop_and_reply, normal, {reply, From, ok}}; open(cast, {queue_event, _, {osiris_written, _, undefined, CorrelationList}}, @@ -1060,7 +1060,6 @@ close_sent(state_timeout, close, connection_state = State}) -> rabbit_log_connection:warning("Closing connection because of timeout in state '~s' likely due to lack of client action.", [?FUNCTION_NAME]), - close(Transport, Socket, State), stop; close_sent(info, {tcp, S, Data}, #statem_data{transport = Transport, @@ -1075,7 +1074,6 @@ close_sent(info, {tcp, S, Data}, [?FUNCTION_NAME, Step]), case Step of closing_done -> - close(Transport, S, State1), stop; _ -> Transport:setopts(S, [{active, once}]), @@ -1091,7 +1089,6 @@ close_sent(info, {tcp_error, S, Reason}, #statem_data{transport = Transport, connection_state = State}) -> rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] [~w]", [Reason, S, self()]), - close(Transport, S, State), stop; close_sent(info, {resource_alarm, IsThereAlarm}, StatemData = #statem_data{connection = Connection}) -> |