diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-09-21 09:04:32 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-09-21 09:04:32 +0200 |
commit | 3b087f193f2b528c0a1f38b2ac7a27effd79dd78 (patch) | |
tree | 9cf4df447a9ca127964730d5c946f02b6e564df2 | |
parent | dc10074bba78e5bd81f6aea7d4576607fdd9a7e9 (diff) | |
parent | dc43970dd665fbebffc90fbdcd534d6126f9db85 (diff) | |
download | rabbitmq-server-git-3b087f193f2b528c0a1f38b2ac7a27effd79dd78.tar.gz |
Merge pull request #3460 from rabbitmq/mergify/bp/v3.9.x/pr-3448
Quorum Queue consumer cancellation fixes (backport #3448)
-rw-r--r-- | deps/rabbit/src/rabbit_fifo.erl | 51 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_fifo_SUITE.erl | 13 |
2 files changed, 41 insertions, 23 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index d932d20ebb..71172fce70 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -340,10 +340,13 @@ apply(#{index := Index, {State, Reply, Effects} end end; -apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) -> - {State, Effects} = cancel_consumer(Meta, ConsumerId, State0, [], - consumer_cancel), - checkout(Meta, State0, State, Effects); +apply(#{index := Idx} = Meta, + #checkout{spec = cancel, + consumer_id = ConsumerId}, State0) -> + {State1, Effects1} = cancel_consumer(Meta, ConsumerId, State0, [], + consumer_cancel), + {State, Reply, Effects} = checkout(Meta, State0, State1, Effects1), + update_smallest_raft_index(Idx, Reply, State, Effects); apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId}, State0) -> @@ -372,8 +375,8 @@ apply(#{index := Index}, #purge{}, {State, _, Effects} = evaluate_limit(Index, false, State0, State1, Effects0), update_smallest_raft_index(Index, Reply, State, Effects); -apply(_Meta, #garbage_collection{}, State) -> - {State, ok, [{aux, garbage_collection}]}; +apply(#{index := Idx}, #garbage_collection{}, State) -> + update_smallest_raft_index(Idx, ok, State, [{aux, garbage_collection}]); apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, @@ -506,13 +509,14 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, checkout(Meta, State0, State, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; -apply(Meta, #purge_nodes{nodes = Nodes}, State0) -> +apply(#{index := Idx} = Meta, #purge_nodes{nodes = Nodes}, State0) -> {State, Effects} = lists:foldl(fun(Node, {S, E}) -> purge_node(Meta, Node, S, E) end, {State0, []}, Nodes), - {State, ok, Effects}; -apply(Meta, #update_config{config = Conf}, State) -> - checkout(Meta, State, update_config(Conf, State), []); + update_smallest_raft_index(Idx, ok, State, Effects); +apply(#{index := Idx} = Meta, #update_config{config = Conf}, State0) -> + {State, Reply, Effects} = checkout(Meta, State0, update_config(Conf, State0), []), + update_smallest_raft_index(Idx, Reply, State, Effects); apply(_Meta, {machine_version, 0, 1}, V0State) -> State = convert_v0_to_v1(V0State), {State, ok, []}; @@ -1750,25 +1754,26 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0, messages = Messages0, consumers = Cons0} = InitState) -> case priority_queue:out(SQ0) of - {{value, ConsumerId}, SQ1} -> + {{value, ConsumerId}, SQ1} + when is_map_key(ConsumerId, Cons0) -> case take_next_msg(InitState) of {ConsumerMsg, State0} -> %% there are consumers waiting to be serviced %% process consumer checkout - case maps:find(ConsumerId, Cons0) of - {ok, #consumer{credit = 0}} -> + case maps:get(ConsumerId, Cons0) of + #consumer{credit = 0} -> %% no credit but was still on queue %% can happen when draining %% recurse without consumer on queue checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); - {ok, #consumer{status = cancelled}} -> + #consumer{status = cancelled} -> checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); - {ok, #consumer{status = suspected_down}} -> + #consumer{status = suspected_down} -> checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); - {ok, #consumer{checked_out = Checked0, - next_msg_id = Next, - credit = Credit, - delivery_count = DelCnt} = Con0} -> + #consumer{checked_out = Checked0, + next_msg_id = Next, + credit = Credit, + delivery_count = DelCnt} = Con0 -> Checked = maps:put(Next, ConsumerMsg, Checked0), Con = Con0#consumer{checked_out = Checked, next_msg_id = Next + 1, @@ -1795,14 +1800,14 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0, add_bytes_checkout(Header, State1)), M} end, - {success, ConsumerId, Next, Msg, State}; - error -> - %% consumer did not exist but was queued, recurse - checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}) + {success, ConsumerId, Next, Msg, State} end; empty -> {nochange, InitState} end; + {{value, _ConsumerId}, SQ1} -> + %% consumer did not exist but was queued, recurse + checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); {empty, _} -> case lqueue:len(Messages0) of 0 -> {nochange, InitState}; diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index 2c5b15295f..9cb401fc8e 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -386,6 +386,18 @@ return_auto_checked_out_test(_) -> Effects), ok. +cancelled_checkout_empty_queue_test(_) -> + Cid = {<<"cid">>, self()}, + {State1, _} = check_auto(Cid, 2, test_init(test)), + % cancelled checkout should clear out service_queue also, else we'd get a + % build up of these + {State2, _, Effects} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1), + ?assertEqual(0, map_size(State2#rabbit_fifo.consumers)), + ?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)), + ct:pal("Effs: ~p", [Effects]), + ?ASSERT_EFF({release_cursor, _, _}, Effects), + ok. + cancelled_checkout_out_test(_) -> Cid = {<<"cid">>, self()}, {State00, [_, _]} = enq(1, 1, first, test_init(test)), @@ -395,6 +407,7 @@ cancelled_checkout_out_test(_) -> {State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1), ?assertEqual(1, lqueue:len(State2#rabbit_fifo.messages)), ?assertEqual(0, lqueue:len(State2#rabbit_fifo.returns)), + ?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)), {State3, {dequeue, empty}} = apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, settled}, #{}), State2), |