diff options
author | Karl Nilsson <kjnilsson@gmail.com> | 2021-09-17 14:53:33 +0100 |
---|---|---|
committer | Karl Nilsson <kjnilsson@gmail.com> | 2021-09-17 14:53:33 +0100 |
commit | 5779059bd5b1e14807cdd9d2bcf2180b1ba5e6e4 (patch) | |
tree | fe92a5d94ec9053ed15074c7626331f43163a4f4 | |
parent | bc05489ab9037654923c2e4168868fe447a59770 (diff) | |
download | rabbitmq-server-git-5779059bd5b1e14807cdd9d2bcf2180b1ba5e6e4.tar.gz |
QQ: fix memory leak when cancelling consumer
If the queue is empty when a consumer is cancelled it would leave the
consumer id inside the service queue. If an application subscribes/unsubscibes
in a loop from an empty queue this would cause the service queue to never be
cleared up.
NB: whenever we make a change to how the quorum queue state machien is
calculated we need to consider how this effects determinism as during an
upgrade different members may calculate a different service queue state.
In this case it should be ok as they will eventually converge on the same
state once all "dead" consumer ids have been removed from the queue.
In any case it should not affect how messages are assigned to consumers.
-rw-r--r-- | deps/rabbit/src/rabbit_fifo.erl | 27 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_fifo_SUITE.erl | 11 |
2 files changed, 25 insertions, 13 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index d932d20ebb..17ede7b12c 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -1750,25 +1750,26 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0, messages = Messages0, consumers = Cons0} = InitState) -> case priority_queue:out(SQ0) of - {{value, ConsumerId}, SQ1} -> + {{value, ConsumerId}, SQ1} + when is_map_key(ConsumerId, Cons0) -> case take_next_msg(InitState) of {ConsumerMsg, State0} -> %% there are consumers waiting to be serviced %% process consumer checkout - case maps:find(ConsumerId, Cons0) of - {ok, #consumer{credit = 0}} -> + case maps:get(ConsumerId, Cons0) of + #consumer{credit = 0} -> %% no credit but was still on queue %% can happen when draining %% recurse without consumer on queue checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); - {ok, #consumer{status = cancelled}} -> + #consumer{status = cancelled} -> checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); - {ok, #consumer{status = suspected_down}} -> + #consumer{status = suspected_down} -> checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); - {ok, #consumer{checked_out = Checked0, - next_msg_id = Next, - credit = Credit, - delivery_count = DelCnt} = Con0} -> + #consumer{checked_out = Checked0, + next_msg_id = Next, + credit = Credit, + delivery_count = DelCnt} = Con0 -> Checked = maps:put(Next, ConsumerMsg, Checked0), Con = Con0#consumer{checked_out = Checked, next_msg_id = Next + 1, @@ -1795,14 +1796,14 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0, add_bytes_checkout(Header, State1)), M} end, - {success, ConsumerId, Next, Msg, State}; - error -> - %% consumer did not exist but was queued, recurse - checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}) + {success, ConsumerId, Next, Msg, State} end; empty -> {nochange, InitState} end; + {{value, _ConsumerId}, SQ1} -> + %% consumer did not exist but was queued, recurse + checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); {empty, _} -> case lqueue:len(Messages0) of 0 -> {nochange, InitState}; diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index 2c5b15295f..1eed6a0d75 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -386,6 +386,16 @@ return_auto_checked_out_test(_) -> Effects), ok. +cancelled_checkout_empty_queue_test(_) -> + Cid = {<<"cid">>, self()}, + {State1, _} = check_auto(Cid, 2, test_init(test)), + % cancelled checkout should clear out service_queue also, else we'd get a + % build up of these + {State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1), + ?assertEqual(0, map_size(State2#rabbit_fifo.consumers)), + ?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)), + ok. + cancelled_checkout_out_test(_) -> Cid = {<<"cid">>, self()}, {State00, [_, _]} = enq(1, 1, first, test_init(test)), @@ -395,6 +405,7 @@ cancelled_checkout_out_test(_) -> {State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1), ?assertEqual(1, lqueue:len(State2#rabbit_fifo.messages)), ?assertEqual(0, lqueue:len(State2#rabbit_fifo.returns)), + ?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)), {State3, {dequeue, empty}} = apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, settled}, #{}), State2), |