summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-09-17 14:53:33 +0100
committerKarl Nilsson <kjnilsson@gmail.com>2021-09-17 14:53:33 +0100
commit5779059bd5b1e14807cdd9d2bcf2180b1ba5e6e4 (patch)
treefe92a5d94ec9053ed15074c7626331f43163a4f4
parentbc05489ab9037654923c2e4168868fe447a59770 (diff)
downloadrabbitmq-server-git-5779059bd5b1e14807cdd9d2bcf2180b1ba5e6e4.tar.gz
QQ: fix memory leak when cancelling consumer
If the queue is empty when a consumer is cancelled it would leave the consumer id inside the service queue. If an application subscribes/unsubscibes in a loop from an empty queue this would cause the service queue to never be cleared up. NB: whenever we make a change to how the quorum queue state machien is calculated we need to consider how this effects determinism as during an upgrade different members may calculate a different service queue state. In this case it should be ok as they will eventually converge on the same state once all "dead" consumer ids have been removed from the queue. In any case it should not affect how messages are assigned to consumers.
-rw-r--r--deps/rabbit/src/rabbit_fifo.erl27
-rw-r--r--deps/rabbit/test/rabbit_fifo_SUITE.erl11
2 files changed, 25 insertions, 13 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl
index d932d20ebb..17ede7b12c 100644
--- a/deps/rabbit/src/rabbit_fifo.erl
+++ b/deps/rabbit/src/rabbit_fifo.erl
@@ -1750,25 +1750,26 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
messages = Messages0,
consumers = Cons0} = InitState) ->
case priority_queue:out(SQ0) of
- {{value, ConsumerId}, SQ1} ->
+ {{value, ConsumerId}, SQ1}
+ when is_map_key(ConsumerId, Cons0) ->
case take_next_msg(InitState) of
{ConsumerMsg, State0} ->
%% there are consumers waiting to be serviced
%% process consumer checkout
- case maps:find(ConsumerId, Cons0) of
- {ok, #consumer{credit = 0}} ->
+ case maps:get(ConsumerId, Cons0) of
+ #consumer{credit = 0} ->
%% no credit but was still on queue
%% can happen when draining
%% recurse without consumer on queue
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
- {ok, #consumer{status = cancelled}} ->
+ #consumer{status = cancelled} ->
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
- {ok, #consumer{status = suspected_down}} ->
+ #consumer{status = suspected_down} ->
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
- {ok, #consumer{checked_out = Checked0,
- next_msg_id = Next,
- credit = Credit,
- delivery_count = DelCnt} = Con0} ->
+ #consumer{checked_out = Checked0,
+ next_msg_id = Next,
+ credit = Credit,
+ delivery_count = DelCnt} = Con0 ->
Checked = maps:put(Next, ConsumerMsg, Checked0),
Con = Con0#consumer{checked_out = Checked,
next_msg_id = Next + 1,
@@ -1795,14 +1796,14 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
add_bytes_checkout(Header, State1)),
M}
end,
- {success, ConsumerId, Next, Msg, State};
- error ->
- %% consumer did not exist but was queued, recurse
- checkout_one(Meta, InitState#?MODULE{service_queue = SQ1})
+ {success, ConsumerId, Next, Msg, State}
end;
empty ->
{nochange, InitState}
end;
+ {{value, _ConsumerId}, SQ1} ->
+ %% consumer did not exist but was queued, recurse
+ checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
{empty, _} ->
case lqueue:len(Messages0) of
0 -> {nochange, InitState};
diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl
index 2c5b15295f..1eed6a0d75 100644
--- a/deps/rabbit/test/rabbit_fifo_SUITE.erl
+++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl
@@ -386,6 +386,16 @@ return_auto_checked_out_test(_) ->
Effects),
ok.
+cancelled_checkout_empty_queue_test(_) ->
+ Cid = {<<"cid">>, self()},
+ {State1, _} = check_auto(Cid, 2, test_init(test)),
+ % cancelled checkout should clear out service_queue also, else we'd get a
+ % build up of these
+ {State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
+ ?assertEqual(0, map_size(State2#rabbit_fifo.consumers)),
+ ?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)),
+ ok.
+
cancelled_checkout_out_test(_) ->
Cid = {<<"cid">>, self()},
{State00, [_, _]} = enq(1, 1, first, test_init(test)),
@@ -395,6 +405,7 @@ cancelled_checkout_out_test(_) ->
{State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
?assertEqual(1, lqueue:len(State2#rabbit_fifo.messages)),
?assertEqual(0, lqueue:len(State2#rabbit_fifo.returns)),
+ ?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)),
{State3, {dequeue, empty}} =
apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, settled}, #{}), State2),