summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-09-17 17:09:30 +0100
committerKarl Nilsson <kjnilsson@gmail.com>2021-09-17 17:09:30 +0100
commiteaa216da8246a2fc79108e3e0780996ab2bc11ac (patch)
tree68dcf3a6e94431fa75c0788cf9387b3526ea0c46
parent5779059bd5b1e14807cdd9d2bcf2180b1ba5e6e4 (diff)
downloadrabbitmq-server-git-eaa216da8246a2fc79108e3e0780996ab2bc11ac.tar.gz
QQ: emit release cursors after consumer cancel
If this is not done apps that consume/cancel from empty queues in a loop will grow the raft log in an unbounded manner. This could also be the case for the garbage_collect command.
-rw-r--r--deps/rabbit/src/rabbit_fifo.erl15
-rw-r--r--deps/rabbit/test/rabbit_fifo_SUITE.erl4
2 files changed, 12 insertions, 7 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl
index 17ede7b12c..ddc517a971 100644
--- a/deps/rabbit/src/rabbit_fifo.erl
+++ b/deps/rabbit/src/rabbit_fifo.erl
@@ -340,10 +340,13 @@ apply(#{index := Index,
{State, Reply, Effects}
end
end;
-apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
- {State, Effects} = cancel_consumer(Meta, ConsumerId, State0, [],
- consumer_cancel),
- checkout(Meta, State0, State, Effects);
+apply(#{index := Idx} = Meta,
+ #checkout{spec = cancel,
+ consumer_id = ConsumerId}, State0) ->
+ {State1, Effects1} = cancel_consumer(Meta, ConsumerId, State0, [],
+ consumer_cancel),
+ {State, Reply, Effects} = checkout(Meta, State0, State1, Effects1),
+ update_smallest_raft_index(Idx, Reply, State, Effects);
apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
consumer_id = {_, Pid} = ConsumerId},
State0) ->
@@ -372,8 +375,8 @@ apply(#{index := Index}, #purge{},
{State, _, Effects} = evaluate_limit(Index, false, State0,
State1, Effects0),
update_smallest_raft_index(Index, Reply, State, Effects);
-apply(_Meta, #garbage_collection{}, State) ->
- {State, ok, [{aux, garbage_collection}]};
+apply(#{index := Idx}, #garbage_collection{}, State) ->
+ update_smallest_raft_index(Idx, ok, State, [{aux, garbage_collection}]);
apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl
index 1eed6a0d75..9cb401fc8e 100644
--- a/deps/rabbit/test/rabbit_fifo_SUITE.erl
+++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl
@@ -391,9 +391,11 @@ cancelled_checkout_empty_queue_test(_) ->
{State1, _} = check_auto(Cid, 2, test_init(test)),
% cancelled checkout should clear out service_queue also, else we'd get a
% build up of these
- {State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
+ {State2, _, Effects} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
?assertEqual(0, map_size(State2#rabbit_fifo.consumers)),
?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)),
+ ct:pal("Effs: ~p", [Effects]),
+ ?ASSERT_EFF({release_cursor, _, _}, Effects),
ok.
cancelled_checkout_out_test(_) ->