summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michaelklishin@me.com>2022-06-21 23:17:23 +0400
committerGitHub <noreply@github.com>2022-06-21 23:17:23 +0400
commitbebe9654a06e1a8f730a416e61a296defbae14f0 (patch)
tree1377e5f33ab24f6fc9bb6bd5991cf9ada278a3a6
parent2e3ba4c1d7c46f6cce0e4461b8e109de656301eb (diff)
parent8635ded7fe77435e53d69048aac6907bffa1f59c (diff)
downloadrabbitmq-server-git-bebe9654a06e1a8f730a416e61a296defbae14f0.tar.gz
Merge pull request #5085 from rabbitmq/amqp-stream-consumer-file-handle-leak-fix
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl26
1 files changed, 18 insertions, 8 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index 7f0b6a0783..04a74b460c 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -312,14 +312,20 @@ begin_stream(#stream_client{name = QName, readers = Readers0} = State0,
cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
name = QName} = State) ->
- Readers = maps:remove(ConsumerTag, Readers0),
- rabbit_core_metrics:consumer_deleted(self(), ConsumerTag, QName),
- rabbit_event:notify(consumer_deleted, [{consumer_tag, ConsumerTag},
- {channel, self()},
- {queue, QName},
- {user_who_performed_action, ActingUser}]),
- maybe_send_reply(self(), OkMsg),
- {ok, State#stream_client{readers = Readers}}.
+ case maps:take(ConsumerTag, Readers0) of
+ {#stream{log = Log}, Readers} ->
+ ok = close_log(Log),
+ rabbit_core_metrics:consumer_deleted(self(), ConsumerTag, QName),
+ rabbit_event:notify(consumer_deleted,
+ [{consumer_tag, ConsumerTag},
+ {channel, self()},
+ {queue, QName},
+ {user_who_performed_action, ActingUser}]),
+ maybe_send_reply(self(), OkMsg),
+ {ok, State#stream_client{readers = Readers}};
+ error ->
+ {ok, State}
+ end.
credit(CTag, Credit, Drain, #stream_client{readers = Readers0,
name = Name,
@@ -1019,3 +1025,7 @@ set_leader_pid(Pid, QName) ->
_ ->
ok
end.
+
+close_log(undefined) -> ok;
+close_log(Log) ->
+ osiris_log:close(Log).