diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-02-28 15:33:10 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-28 15:33:10 +0100 |
commit | 7aa77b2f0b832fd1fd4fa08f8e0ef86a7009e1fe (patch) | |
tree | f3035422a09194a91ba49512fed85c15463fae66 | |
parent | eb061384fe508a8acaa39b75d32afd7c5a8ecee2 (diff) | |
parent | 877c1df3098af717f043e2fc8db32aae05257873 (diff) | |
download | rabbitmq-server-git-7aa77b2f0b832fd1fd4fa08f8e0ef86a7009e1fe.tar.gz |
Merge pull request #2262 from rabbitmq/qq-dead-letter-bug
Fix QQ dead letter crash
-rw-r--r-- | src/rabbit_fifo.erl | 44 | ||||
-rw-r--r-- | test/quorum_queue_SUITE.erl | 16 |
2 files changed, 47 insertions, 13 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 72109cfbf4..d5fab879c2 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -1162,12 +1162,26 @@ dead_letter_effects(_Reason, _Discarded, dead_letter_effects(Reason, Discarded, #?MODULE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}}, Effects) -> - DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}}, Acc) -> - [{Reason, Msg} | Acc]; - (_, _, Acc) -> - Acc - end, [], Discarded), - [{mod_call, Mod, Fun, Args ++ [DeadLetters]} | Effects]. + RaftIdxs = maps:fold( + fun (_, {_, {RaftIdx, {_Header, 'empty'}}}, Acc) -> + [RaftIdx | Acc]; + (_, _, Acc) -> + Acc + end, [], Discarded), + [{log, RaftIdxs, + fun (Log) -> + Lookup = maps:from_list(lists:zip(RaftIdxs, Log)), + DeadLetters = maps:fold( + fun (_, {_, {RaftIdx, {_Header, 'empty'}}}, Acc) -> + {enqueue, _, _, Msg} = maps:get(RaftIdx, Lookup), + [{Reason, Msg} | Acc]; + (_, {_, {_, {_Header, Msg}}}, Acc) -> + [{Reason, Msg} | Acc]; + (_, _, Acc) -> + Acc + end, [], Discarded), + [{mod_call, Mod, Fun, Args ++ [DeadLetters]}] + end} | Effects]. cancel_consumer_effects(ConsumerId, #?MODULE{cfg = #cfg{resource = QName}}, Effects) -> @@ -1235,7 +1249,8 @@ return_one(MsgId, 0, {Tag, Header0}, %% this should not affect the release cursor in any way Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)}, {Msg, State1} = case Tag of - '$empty_msg' -> {Msg0, State0}; + '$empty_msg' -> + {Msg0, State0}; _ -> case evaluate_memory_limit(Header, State0) of true -> {{'$empty_msg', Header}, State0}; @@ -1310,14 +1325,15 @@ checkout(#{index := Index}, State0, Effects0) -> {State, ok, Effects} end. -checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, Effects, - {SendAcc, LogAcc0}) -> +checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, + Effects, {SendAcc, LogAcc0}) -> DelMsg = {RaftIdx, {MsgId, Header}}, LogAcc = maps:update_with(ConsumerId, fun (M) -> [DelMsg | M] end, [DelMsg], LogAcc0), checkout0(checkout_one(State), Effects, {SendAcc, LogAcc}); -checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, {SendAcc0, LogAcc}) -> +checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, + {SendAcc0, LogAcc}) -> DelMsg = {MsgId, Msg}, SendAcc = maps:update_with(ConsumerId, fun (M) -> [DelMsg | M] end, @@ -1326,10 +1342,12 @@ checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, {SendAcc0, LogAcc}) checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) -> Effects1 = case Activity of nochange -> - append_send_msg_effects(append_log_effects(Effects0, LogAcc), SendAcc); + append_send_msg_effects( + append_log_effects(Effects0, LogAcc), SendAcc); inactive -> [{aux, inactive} - | append_send_msg_effects(append_log_effects(Effects0, LogAcc), SendAcc)] + | append_send_msg_effects( + append_log_effects(Effects0, LogAcc), SendAcc)] end, {State0, ok, lists:reverse(Effects1)}. @@ -1441,7 +1459,7 @@ send_log_effect({CTag, CPid}, IdxMsgs) -> {RaftIdxs, Data} = lists:unzip(IdxMsgs), {log, RaftIdxs, fun(Log) -> - Msgs = lists:zipwith(fun({enqueue, _, _, Msg}, {MsgId, Header}) -> + Msgs = lists:zipwith(fun ({enqueue, _, _, Msg}, {MsgId, Header}) -> {MsgId, {Header, Msg}} end, Log, Data), [{send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}] diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 64a2dc4205..0fd76dd0fb 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -107,6 +107,7 @@ all_tests() -> publish_and_restart, subscribe_should_fail_when_global_qos_true, dead_letter_to_classic_queue, + dead_letter_with_memory_limit, dead_letter_to_quorum_queue, dead_letter_from_classic_to_quorum_queue, dead_letter_policy, @@ -774,6 +775,21 @@ dead_letter_to_classic_queue(Config) -> ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])), test_dead_lettering(true, Config, Ch, Servers, ra_name(QQ), QQ, CQ). +dead_letter_with_memory_limit(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + CQ = <<"classic-dead_letter_with_memory_limit">>, + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-length">>, long, 0}, + {<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, CQ} + ])), + ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])), + test_dead_lettering(true, Config, Ch, Servers, ra_name(QQ), QQ, CQ). + test_dead_lettering(PolicySet, Config, Ch, Servers, RaName, Source, Destination) -> publish(Ch, Source), wait_for_messages_ready(Servers, RaName, 1), |