summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-09-21 09:04:32 +0200
committerGitHub <noreply@github.com>2021-09-21 09:04:32 +0200
commit3b087f193f2b528c0a1f38b2ac7a27effd79dd78 (patch)
tree9cf4df447a9ca127964730d5c946f02b6e564df2
parentdc10074bba78e5bd81f6aea7d4576607fdd9a7e9 (diff)
parentdc43970dd665fbebffc90fbdcd534d6126f9db85 (diff)
downloadrabbitmq-server-git-3b087f193f2b528c0a1f38b2ac7a27effd79dd78.tar.gz
Merge pull request #3460 from rabbitmq/mergify/bp/v3.9.x/pr-3448
Quorum Queue consumer cancellation fixes (backport #3448)
-rw-r--r--deps/rabbit/src/rabbit_fifo.erl51
-rw-r--r--deps/rabbit/test/rabbit_fifo_SUITE.erl13
2 files changed, 41 insertions, 23 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl
index d932d20ebb..71172fce70 100644
--- a/deps/rabbit/src/rabbit_fifo.erl
+++ b/deps/rabbit/src/rabbit_fifo.erl
@@ -340,10 +340,13 @@ apply(#{index := Index,
{State, Reply, Effects}
end
end;
-apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
- {State, Effects} = cancel_consumer(Meta, ConsumerId, State0, [],
- consumer_cancel),
- checkout(Meta, State0, State, Effects);
+apply(#{index := Idx} = Meta,
+ #checkout{spec = cancel,
+ consumer_id = ConsumerId}, State0) ->
+ {State1, Effects1} = cancel_consumer(Meta, ConsumerId, State0, [],
+ consumer_cancel),
+ {State, Reply, Effects} = checkout(Meta, State0, State1, Effects1),
+ update_smallest_raft_index(Idx, Reply, State, Effects);
apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
consumer_id = {_, Pid} = ConsumerId},
State0) ->
@@ -372,8 +375,8 @@ apply(#{index := Index}, #purge{},
{State, _, Effects} = evaluate_limit(Index, false, State0,
State1, Effects0),
update_smallest_raft_index(Index, Reply, State, Effects);
-apply(_Meta, #garbage_collection{}, State) ->
- {State, ok, [{aux, garbage_collection}]};
+apply(#{index := Idx}, #garbage_collection{}, State) ->
+ update_smallest_raft_index(Idx, ok, State, [{aux, garbage_collection}]);
apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
@@ -506,13 +509,14 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
checkout(Meta, State0, State, Effects);
apply(_, {nodedown, _Node}, State) ->
{State, ok};
-apply(Meta, #purge_nodes{nodes = Nodes}, State0) ->
+apply(#{index := Idx} = Meta, #purge_nodes{nodes = Nodes}, State0) ->
{State, Effects} = lists:foldl(fun(Node, {S, E}) ->
purge_node(Meta, Node, S, E)
end, {State0, []}, Nodes),
- {State, ok, Effects};
-apply(Meta, #update_config{config = Conf}, State) ->
- checkout(Meta, State, update_config(Conf, State), []);
+ update_smallest_raft_index(Idx, ok, State, Effects);
+apply(#{index := Idx} = Meta, #update_config{config = Conf}, State0) ->
+ {State, Reply, Effects} = checkout(Meta, State0, update_config(Conf, State0), []),
+ update_smallest_raft_index(Idx, Reply, State, Effects);
apply(_Meta, {machine_version, 0, 1}, V0State) ->
State = convert_v0_to_v1(V0State),
{State, ok, []};
@@ -1750,25 +1754,26 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
messages = Messages0,
consumers = Cons0} = InitState) ->
case priority_queue:out(SQ0) of
- {{value, ConsumerId}, SQ1} ->
+ {{value, ConsumerId}, SQ1}
+ when is_map_key(ConsumerId, Cons0) ->
case take_next_msg(InitState) of
{ConsumerMsg, State0} ->
%% there are consumers waiting to be serviced
%% process consumer checkout
- case maps:find(ConsumerId, Cons0) of
- {ok, #consumer{credit = 0}} ->
+ case maps:get(ConsumerId, Cons0) of
+ #consumer{credit = 0} ->
%% no credit but was still on queue
%% can happen when draining
%% recurse without consumer on queue
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
- {ok, #consumer{status = cancelled}} ->
+ #consumer{status = cancelled} ->
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
- {ok, #consumer{status = suspected_down}} ->
+ #consumer{status = suspected_down} ->
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
- {ok, #consumer{checked_out = Checked0,
- next_msg_id = Next,
- credit = Credit,
- delivery_count = DelCnt} = Con0} ->
+ #consumer{checked_out = Checked0,
+ next_msg_id = Next,
+ credit = Credit,
+ delivery_count = DelCnt} = Con0 ->
Checked = maps:put(Next, ConsumerMsg, Checked0),
Con = Con0#consumer{checked_out = Checked,
next_msg_id = Next + 1,
@@ -1795,14 +1800,14 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
add_bytes_checkout(Header, State1)),
M}
end,
- {success, ConsumerId, Next, Msg, State};
- error ->
- %% consumer did not exist but was queued, recurse
- checkout_one(Meta, InitState#?MODULE{service_queue = SQ1})
+ {success, ConsumerId, Next, Msg, State}
end;
empty ->
{nochange, InitState}
end;
+ {{value, _ConsumerId}, SQ1} ->
+ %% consumer did not exist but was queued, recurse
+ checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
{empty, _} ->
case lqueue:len(Messages0) of
0 -> {nochange, InitState};
diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl
index 2c5b15295f..9cb401fc8e 100644
--- a/deps/rabbit/test/rabbit_fifo_SUITE.erl
+++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl
@@ -386,6 +386,18 @@ return_auto_checked_out_test(_) ->
Effects),
ok.
+cancelled_checkout_empty_queue_test(_) ->
+ Cid = {<<"cid">>, self()},
+ {State1, _} = check_auto(Cid, 2, test_init(test)),
+ % cancelled checkout should clear out service_queue also, else we'd get a
+ % build up of these
+ {State2, _, Effects} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
+ ?assertEqual(0, map_size(State2#rabbit_fifo.consumers)),
+ ?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)),
+ ct:pal("Effs: ~p", [Effects]),
+ ?ASSERT_EFF({release_cursor, _, _}, Effects),
+ ok.
+
cancelled_checkout_out_test(_) ->
Cid = {<<"cid">>, self()},
{State00, [_, _]} = enq(1, 1, first, test_init(test)),
@@ -395,6 +407,7 @@ cancelled_checkout_out_test(_) ->
{State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
?assertEqual(1, lqueue:len(State2#rabbit_fifo.messages)),
?assertEqual(0, lqueue:len(State2#rabbit_fifo.returns)),
+ ?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)),
{State3, {dequeue, empty}} =
apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, settled}, #{}), State2),