summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-02-28 15:33:10 +0100
committerGitHub <noreply@github.com>2020-02-28 15:33:10 +0100
commit7aa77b2f0b832fd1fd4fa08f8e0ef86a7009e1fe (patch)
treef3035422a09194a91ba49512fed85c15463fae66
parenteb061384fe508a8acaa39b75d32afd7c5a8ecee2 (diff)
parent877c1df3098af717f043e2fc8db32aae05257873 (diff)
downloadrabbitmq-server-git-7aa77b2f0b832fd1fd4fa08f8e0ef86a7009e1fe.tar.gz
Merge pull request #2262 from rabbitmq/qq-dead-letter-bug
Fix QQ dead letter crash
-rw-r--r--src/rabbit_fifo.erl44
-rw-r--r--test/quorum_queue_SUITE.erl16
2 files changed, 47 insertions, 13 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 72109cfbf4..d5fab879c2 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -1162,12 +1162,26 @@ dead_letter_effects(_Reason, _Discarded,
dead_letter_effects(Reason, Discarded,
#?MODULE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}},
Effects) ->
- DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}}, Acc) ->
- [{Reason, Msg} | Acc];
- (_, _, Acc) ->
- Acc
- end, [], Discarded),
- [{mod_call, Mod, Fun, Args ++ [DeadLetters]} | Effects].
+ RaftIdxs = maps:fold(
+ fun (_, {_, {RaftIdx, {_Header, 'empty'}}}, Acc) ->
+ [RaftIdx | Acc];
+ (_, _, Acc) ->
+ Acc
+ end, [], Discarded),
+ [{log, RaftIdxs,
+ fun (Log) ->
+ Lookup = maps:from_list(lists:zip(RaftIdxs, Log)),
+ DeadLetters = maps:fold(
+ fun (_, {_, {RaftIdx, {_Header, 'empty'}}}, Acc) ->
+ {enqueue, _, _, Msg} = maps:get(RaftIdx, Lookup),
+ [{Reason, Msg} | Acc];
+ (_, {_, {_, {_Header, Msg}}}, Acc) ->
+ [{Reason, Msg} | Acc];
+ (_, _, Acc) ->
+ Acc
+ end, [], Discarded),
+ [{mod_call, Mod, Fun, Args ++ [DeadLetters]}]
+ end} | Effects].
cancel_consumer_effects(ConsumerId,
#?MODULE{cfg = #cfg{resource = QName}}, Effects) ->
@@ -1235,7 +1249,8 @@ return_one(MsgId, 0, {Tag, Header0},
%% this should not affect the release cursor in any way
Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
{Msg, State1} = case Tag of
- '$empty_msg' -> {Msg0, State0};
+ '$empty_msg' ->
+ {Msg0, State0};
_ -> case evaluate_memory_limit(Header, State0) of
true ->
{{'$empty_msg', Header}, State0};
@@ -1310,14 +1325,15 @@ checkout(#{index := Index}, State0, Effects0) ->
{State, ok, Effects}
end.
-checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, Effects,
- {SendAcc, LogAcc0}) ->
+checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State},
+ Effects, {SendAcc, LogAcc0}) ->
DelMsg = {RaftIdx, {MsgId, Header}},
LogAcc = maps:update_with(ConsumerId,
fun (M) -> [DelMsg | M] end,
[DelMsg], LogAcc0),
checkout0(checkout_one(State), Effects, {SendAcc, LogAcc});
-checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, {SendAcc0, LogAcc}) ->
+checkout0({success, ConsumerId, MsgId, Msg, State}, Effects,
+ {SendAcc0, LogAcc}) ->
DelMsg = {MsgId, Msg},
SendAcc = maps:update_with(ConsumerId,
fun (M) -> [DelMsg | M] end,
@@ -1326,10 +1342,12 @@ checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, {SendAcc0, LogAcc})
checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) ->
Effects1 = case Activity of
nochange ->
- append_send_msg_effects(append_log_effects(Effects0, LogAcc), SendAcc);
+ append_send_msg_effects(
+ append_log_effects(Effects0, LogAcc), SendAcc);
inactive ->
[{aux, inactive}
- | append_send_msg_effects(append_log_effects(Effects0, LogAcc), SendAcc)]
+ | append_send_msg_effects(
+ append_log_effects(Effects0, LogAcc), SendAcc)]
end,
{State0, ok, lists:reverse(Effects1)}.
@@ -1441,7 +1459,7 @@ send_log_effect({CTag, CPid}, IdxMsgs) ->
{RaftIdxs, Data} = lists:unzip(IdxMsgs),
{log, RaftIdxs,
fun(Log) ->
- Msgs = lists:zipwith(fun({enqueue, _, _, Msg}, {MsgId, Header}) ->
+ Msgs = lists:zipwith(fun ({enqueue, _, _, Msg}, {MsgId, Header}) ->
{MsgId, {Header, Msg}}
end, Log, Data),
[{send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}]
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 64a2dc4205..0fd76dd0fb 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -107,6 +107,7 @@ all_tests() ->
publish_and_restart,
subscribe_should_fail_when_global_qos_true,
dead_letter_to_classic_queue,
+ dead_letter_with_memory_limit,
dead_letter_to_quorum_queue,
dead_letter_from_classic_to_quorum_queue,
dead_letter_policy,
@@ -774,6 +775,21 @@ dead_letter_to_classic_queue(Config) ->
?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])),
test_dead_lettering(true, Config, Ch, Servers, ra_name(QQ), QQ, CQ).
+dead_letter_with_memory_limit(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ CQ = <<"classic-dead_letter_with_memory_limit">>,
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-length">>, long, 0},
+ {<<"x-dead-letter-exchange">>, longstr, <<>>},
+ {<<"x-dead-letter-routing-key">>, longstr, CQ}
+ ])),
+ ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])),
+ test_dead_lettering(true, Config, Ch, Servers, ra_name(QQ), QQ, CQ).
+
test_dead_lettering(PolicySet, Config, Ch, Servers, RaName, Source, Destination) ->
publish(Ch, Source),
wait_for_messages_ready(Servers, RaName, 1),