diff options
author | Karl Nilsson <kjnilsson@gmail.com> | 2021-09-17 17:09:30 +0100 |
---|---|---|
committer | Karl Nilsson <kjnilsson@gmail.com> | 2021-09-17 17:09:30 +0100 |
commit | eaa216da8246a2fc79108e3e0780996ab2bc11ac (patch) | |
tree | 68dcf3a6e94431fa75c0788cf9387b3526ea0c46 | |
parent | 5779059bd5b1e14807cdd9d2bcf2180b1ba5e6e4 (diff) | |
download | rabbitmq-server-git-eaa216da8246a2fc79108e3e0780996ab2bc11ac.tar.gz |
QQ: emit release cursors after consumer cancel
If this is not done apps that consume/cancel from empty queues in a loop
will grow the raft log in an unbounded manner. This could also be the
case for the garbage_collect command.
-rw-r--r-- | deps/rabbit/src/rabbit_fifo.erl | 15 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_fifo_SUITE.erl | 4 |
2 files changed, 12 insertions, 7 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 17ede7b12c..ddc517a971 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -340,10 +340,13 @@ apply(#{index := Index, {State, Reply, Effects} end end; -apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) -> - {State, Effects} = cancel_consumer(Meta, ConsumerId, State0, [], - consumer_cancel), - checkout(Meta, State0, State, Effects); +apply(#{index := Idx} = Meta, + #checkout{spec = cancel, + consumer_id = ConsumerId}, State0) -> + {State1, Effects1} = cancel_consumer(Meta, ConsumerId, State0, [], + consumer_cancel), + {State, Reply, Effects} = checkout(Meta, State0, State1, Effects1), + update_smallest_raft_index(Idx, Reply, State, Effects); apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId}, State0) -> @@ -372,8 +375,8 @@ apply(#{index := Index}, #purge{}, {State, _, Effects} = evaluate_limit(Index, false, State0, State1, Effects0), update_smallest_raft_index(Index, Reply, State, Effects); -apply(_Meta, #garbage_collection{}, State) -> - {State, ok, [{aux, garbage_collection}]}; +apply(#{index := Idx}, #garbage_collection{}, State) -> + update_smallest_raft_index(Idx, ok, State, [{aux, garbage_collection}]); apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index 1eed6a0d75..9cb401fc8e 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -391,9 +391,11 @@ cancelled_checkout_empty_queue_test(_) -> {State1, _} = check_auto(Cid, 2, test_init(test)), % cancelled checkout should clear out service_queue also, else we'd get a % build up of these - {State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1), + {State2, _, Effects} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1), ?assertEqual(0, map_size(State2#rabbit_fifo.consumers)), ?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)), + ct:pal("Effs: ~p", [Effects]), + ?ASSERT_EFF({release_cursor, _, _}, Effects), ok. cancelled_checkout_out_test(_) -> |