diff options
author | Philip Kuryloski <kuryloskip@vmware.com> | 2020-11-13 14:34:42 +0100 |
---|---|---|
committer | Philip Kuryloski <kuryloskip@vmware.com> | 2020-11-13 14:34:42 +0100 |
commit | a1fe3ab06111e4ab5108315f803f9a8718d7cd1b (patch) | |
tree | e807987d48d05587a2067f84a18d69f7aee514b5 /test/dead_lettering_SUITE.erl | |
parent | f4db1ad2720dea8f4adbf7f5d8b69ea44742728b (diff) | |
download | rabbitmq-server-git-a1fe3ab06111e4ab5108315f803f9a8718d7cd1b.tar.gz |
Change repo "root" to deps/rabbit
rabbit must not be the monorepo root application, as other applications depend on it
Diffstat (limited to 'test/dead_lettering_SUITE.erl')
-rw-r--r-- | test/dead_lettering_SUITE.erl | 1174 |
1 files changed, 0 insertions, 1174 deletions
diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl deleted file mode 100644 index 4ee917aa21..0000000000 --- a/test/dead_lettering_SUITE.erl +++ /dev/null @@ -1,1174 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2011-2020 VMware, Inc. or its affiliates. All rights reserved. -%% -%% For the full spec see: https://www.rabbitmq.com/dlx.html -%% --module(dead_lettering_SUITE). - --include_lib("common_test/include/ct.hrl"). --include_lib("kernel/include/file.hrl"). --include_lib("amqp_client/include/amqp_client.hrl"). --include_lib("eunit/include/eunit.hrl"). - --compile(export_all). - --import(quorum_queue_utils, [wait_for_messages/2]). - -all() -> - [ - {group, dead_letter_tests} - ]. - -groups() -> - DeadLetterTests = [dead_letter_nack, - dead_letter_multiple_nack, - dead_letter_nack_requeue, - dead_letter_nack_requeue_multiple, - dead_letter_reject, - dead_letter_reject_requeue, - dead_letter_max_length_drop_head, - dead_letter_missing_exchange, - dead_letter_routing_key, - dead_letter_routing_key_header_CC, - dead_letter_routing_key_header_BCC, - dead_letter_routing_key_cycle_max_length, - dead_letter_routing_key_cycle_with_reject, - dead_letter_policy, - dead_letter_override_policy, - dead_letter_ignore_policy, - dead_letter_headers, - dead_letter_headers_reason_maxlen, - dead_letter_headers_cycle, - dead_letter_headers_BCC, - dead_letter_headers_CC, - dead_letter_headers_CC_with_routing_key, - dead_letter_headers_first_death], - Opts = [], - [ - {dead_letter_tests, [], - [ - {classic_queue, Opts, DeadLetterTests ++ [dead_letter_ttl, - dead_letter_max_length_reject_publish_dlx, - dead_letter_routing_key_cycle_ttl, - dead_letter_headers_reason_expired, - dead_letter_headers_reason_expired_per_message]}, - {mirrored_queue, Opts, DeadLetterTests ++ [dead_letter_ttl, - dead_letter_max_length_reject_publish_dlx, - dead_letter_routing_key_cycle_ttl, - dead_letter_headers_reason_expired, - dead_letter_headers_reason_expired_per_message]}, - {quorum_queue, Opts, DeadLetterTests} - ]} - ]. - -suite() -> - [ - {timetrap, {minutes, 8}} - ]. - -%% ------------------------------------------------------------------- -%% Testsuite setup/teardown. -%% ------------------------------------------------------------------- - -init_per_suite(Config) -> - rabbit_ct_helpers:log_environment(), - rabbit_ct_helpers:run_setup_steps(Config). - -end_per_suite(Config) -> - rabbit_ct_helpers:run_teardown_steps(Config). - -init_per_group(classic_queue, Config) -> - rabbit_ct_helpers:set_config( - Config, - [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, - {queue_durable, false}]); -init_per_group(quorum_queue, Config) -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of - ok -> - rabbit_ct_helpers:set_config( - Config, - [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, - {queue_durable, true}]); - Skip -> - Skip - end; -init_per_group(mirrored_queue, Config) -> - rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, - <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), - Config1 = rabbit_ct_helpers:set_config( - Config, [{is_mirrored, true}, - {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, - {queue_durable, false}]), - rabbit_ct_helpers:run_steps(Config1, []); -init_per_group(Group, Config) -> - case lists:member({group, Group}, all()) of - true -> - ClusterSize = 3, - Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, Group}, - {rmq_nodes_count, ClusterSize} - ]), - rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()); - false -> - rabbit_ct_helpers:run_steps(Config, []) - end. - -end_per_group(Group, Config) -> - case lists:member({group, Group}, all()) of - true -> - rabbit_ct_helpers:run_steps(Config, - rabbit_ct_client_helpers:teardown_steps() ++ - rabbit_ct_broker_helpers:teardown_steps()); - false -> - Config - end. - -init_per_testcase(Testcase, Config) -> - Group = proplists:get_value(name, ?config(tc_group_properties, Config)), - Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])), - Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])), - Policy = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_policy", [Group, Testcase])), - DLXExchange = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_dlx_exchange", - [Group, Testcase])), - Config1 = rabbit_ct_helpers:set_config(Config, [{dlx_exchange, DLXExchange}, - {queue_name, Q}, - {queue_name_dlx, Q2}, - {policy, Policy}]), - rabbit_ct_helpers:testcase_started(Config1, Testcase). - -end_per_testcase(Testcase, Config) -> - {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}), - amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_dlx, Config)}), - amqp_channel:call(Ch, #'exchange.delete'{exchange = ?config(dlx_exchange, Config)}), - _ = rabbit_ct_broker_helpers:clear_policy(Config, 0, ?config(policy, Config)), - rabbit_ct_helpers:testcase_finished(Config, Testcase). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% Dead letter exchanges -%% -%% Messages are dead-lettered when: -%% 1) message is rejected with basic.reject or basic.nack with requeue=false -%% 2) message ttl expires (not implemented in quorum queues) -%% 3) queue length limit is exceeded (only drop-head implemented in quorum queues) -%% -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -%% 1) message is rejected with basic.nack, requeue=false and multiple=false -dead_letter_nack(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - declare_dead_letter_queues(Ch, Config, QName, DLXQName), - - P1 = <<"msg1">>, - P2 = <<"msg2">>, - P3 = <<"msg3">>, - - %% Publish 3 messages - publish(Ch, QName, [P1, P2, P3]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), - %% Consume them - [DTag1, DTag2, DTag3] = consume(Ch, QName, [P1, P2, P3]), - %% Nack the last one with multiple = false - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag3, - multiple = false, - requeue = false}), - wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), - %% Queue is empty - consume_empty(Ch, QName), - %% Consume the last message from the dead letter queue - consume(Ch, DLXQName, [P3]), - consume_empty(Ch, DLXQName), - %% Nack the other two - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, - multiple = false, - requeue = false}), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2, - multiple = false, - requeue = false}), - %% Queue is empty - consume_empty(Ch, QName), - %% Consume the first two messages from the dead letter queue - consume(Ch, DLXQName, [P1, P2]), - consume_empty(Ch, DLXQName). - -%% 1) message is rejected with basic.nack, requeue=false and multiple=true -dead_letter_multiple_nack(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - declare_dead_letter_queues(Ch, Config, QName, DLXQName), - - P1 = <<"msg1">>, - P2 = <<"msg2">>, - P3 = <<"msg3">>, - - %% Publish 3 messages - publish(Ch, QName, [P1, P2, P3]), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), - %% Consume them - [_, _, DTag3] = consume(Ch, QName, [P1, P2, P3]), - %% Nack the last one with multiple = true - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag3, - multiple = true, - requeue = false}), - wait_for_messages(Config, [[DLXQName, <<"3">>, <<"3">>, <<"0">>]]), - %% Consume the 3 messages from the dead letter queue - consume(Ch, DLXQName, [P1, P2, P3]), - consume_empty(Ch, DLXQName), - %% Queue is empty - consume_empty(Ch, QName). - -%% 1) message is rejected with basic.nack, requeue=true and multiple=false. Dead-lettering does not take place -dead_letter_nack_requeue(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - declare_dead_letter_queues(Ch, Config, QName, DLXQName), - - P1 = <<"msg1">>, - P2 = <<"msg2">>, - P3 = <<"msg3">>, - - %% Publish 3 messages - publish(Ch, QName, [P1, P2, P3]), - %% Consume them - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), - [_, _, DTag3] = consume(Ch, QName, [P1, P2, P3]), - %% Queue is empty - consume_empty(Ch, QName), - %% Nack the last one with multiple = false - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag3, - multiple = false, - requeue = true}), - %% Consume the last message from the queue - wait_for_messages(Config, [[QName, <<"3">>, <<"1">>, <<"2">>]]), - consume(Ch, QName, [P3]), - consume_empty(Ch, QName), - %% Dead letter queue is empty - consume_empty(Ch, DLXQName). - -%% 1) message is rejected with basic.nack, requeue=true and multiple=true. Dead-lettering does not take place -dead_letter_nack_requeue_multiple(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - declare_dead_letter_queues(Ch, Config, QName, DLXQName), - - P1 = <<"msg1">>, - P2 = <<"msg2">>, - P3 = <<"msg3">>, - - %% Publish 3 messages - publish(Ch, QName, [P1, P2, P3]), - %% Consume them - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), - [_, _, DTag3] = consume(Ch, QName, [P1, P2, P3]), - %% Queue is empty - consume_empty(Ch, QName), - %% Nack the last one with multiple = true - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag3, - multiple = true, - requeue = true}), - %% Consume the three messages from the queue - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), - consume(Ch, QName, [P1, P2, P3]), - consume_empty(Ch, QName), - %% Dead letter queue is empty - consume_empty(Ch, DLXQName). - -%% 1) message is rejected with basic.reject, requeue=false -dead_letter_reject(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - declare_dead_letter_queues(Ch, Config, QName, DLXQName), - - P1 = <<"msg1">>, - P2 = <<"msg2">>, - P3 = <<"msg3">>, - - %% Publish 3 messages - publish(Ch, QName, [P1, P2, P3]), - %% Consume the first message - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), - [DTag] = consume(Ch, QName, [P1]), - %% Reject it - amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag, - requeue = false}), - %% Consume it from the dead letter queue - wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), - _ = consume(Ch, DLXQName, [P1]), - consume_empty(Ch, DLXQName), - %% Consume the last two from the queue - _ = consume(Ch, QName, [P2, P3]), - consume_empty(Ch, QName). - -%% 1) Message is rejected with basic.reject, requeue=true. Dead-lettering does not take place. -dead_letter_reject_requeue(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - declare_dead_letter_queues(Ch, Config, QName, DLXQName), - - P1 = <<"msg1">>, - P2 = <<"msg2">>, - P3 = <<"msg3">>, - - %% Publish 3 messages - publish(Ch, QName, [P1, P2, P3]), - %% Consume the first one - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), - [DTag] = consume(Ch, QName, [P1]), - %% Reject the first one - amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag, - requeue = true}), - %% Consume the three messages from the queue - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), - _ = consume(Ch, QName, [P1, P2, P3]), - consume_empty(Ch, QName), - %% Dead letter is empty - consume_empty(Ch, DLXQName). - -%% 2) Message ttl expires -dead_letter_ttl(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-message-ttl">>, long, 1}]), - - %% Publish message - P1 = <<"msg1">>, - publish(Ch, QName, [P1]), - wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), - consume_empty(Ch, QName), - [_] = consume(Ch, DLXQName, [P1]). - -%% 3) The queue length limit is exceeded, message dropped is dead lettered. -%% Default strategy: drop-head -dead_letter_max_length_drop_head(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - - declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-max-length">>, long, 1}]), - - P1 = <<"msg1">>, - P2 = <<"msg2">>, - P3 = <<"msg3">>, - - %% Publish 3 messages - publish(Ch, QName, [P1, P2, P3]), - %% Consume the last one from the queue (max-length = 1) - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - _ = consume(Ch, QName, [P3]), - consume_empty(Ch, QName), - %% Consume the dropped ones from the dead letter queue - wait_for_messages(Config, [[DLXQName, <<"2">>, <<"2">>, <<"0">>]]), - _ = consume(Ch, DLXQName, [P1, P2]), - consume_empty(Ch, DLXQName). - -%% Another strategy: reject-publish-dlx -dead_letter_max_length_reject_publish_dlx(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - - declare_dead_letter_queues(Ch, Config, QName, DLXQName, - [{<<"x-max-length">>, long, 1}, - {<<"x-overflow">>, longstr, <<"reject-publish-dlx">>}]), - - P1 = <<"msg1">>, - P2 = <<"msg2">>, - P3 = <<"msg3">>, - - %% Publish 3 messages - publish(Ch, QName, [P1, P2, P3]), - %% Consume the first one from the queue (max-length = 1) - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - _ = consume(Ch, QName, [P1]), - consume_empty(Ch, QName), - %% Consume the dropped ones from the dead letter queue - wait_for_messages(Config, [[DLXQName, <<"2">>, <<"2">>, <<"0">>]]), - _ = consume(Ch, DLXQName, [P2, P3]), - consume_empty(Ch, DLXQName). - -%% Dead letter exchange does not have to be declared when the queue is declared, but it should -%% exist by the time messages need to be dead-lettered; if it is missing then, the messages will -%% be silently dropped. -dead_letter_missing_exchange(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - Args = ?config(queue_args, Config), - Durable = ?config(queue_durable, Config), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - DLXExchange = <<"dlx-exchange-2">>, - #'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = DLXExchange}), - - DeadLetterArgs = [{<<"x-max-length">>, long, 1}, - {<<"x-dead-letter-exchange">>, longstr, DLXExchange}, - {<<"x-dead-letter-routing-key">>, longstr, DLXQName}], - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}), - - P1 = <<"msg1">>, - P2 = <<"msg2">>, - - %% Publish one message - publish(Ch, QName, [P1]), - %% Consume it - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - [DTag] = consume(Ch, QName, [P1]), - %% Reject it - amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag, - requeue = false}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), - %% Message is not in the dead letter queue (exchange does not exist) - consume_empty(Ch, DLXQName), - - %% Declare the dead-letter exchange - #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}), - #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName, - exchange = DLXExchange, - routing_key = DLXQName}), - - %% Publish another message - publish(Ch, QName, [P2]), - %% Consume it - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - [DTag2] = consume(Ch, QName, [P2]), - %% Reject it - amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag2, - requeue = false}), - %% Consume the rejected message from the dead letter queue - wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), - {#'basic.get_ok'{}, #amqp_msg{payload = P2}} = - amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), - consume_empty(Ch, DLXQName). - -%% -%% ROUTING -%% -%% Dead-lettered messages are routed to their dead letter exchange either: -%% with the routing key specified for the queue they were on; or, -%% if this was not set, (3) with the same routing keys they were originally published with. -%% (4) This includes routing keys added by the CC and BCC headers. -%% -%% 3) All previous tests used a specific key, test the original ones now. -dead_letter_routing_key(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - Args = ?config(queue_args, Config), - Durable = ?config(queue_durable, Config), - DLXExchange = ?config(dlx_exchange, Config), - - %% Do not use a specific key - DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}], - #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}), - - P1 = <<"msg1">>, - P2 = <<"msg2">>, - - %% Publish, consume and nack the first message - publish(Ch, QName, [P1]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - [DTag1] = consume(Ch, QName, [P1]), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, - multiple = false, - requeue = false}), - %% Both queues are empty as the message could not been routed in the dlx exchange - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), - consume_empty(Ch, QName), - consume_empty(Ch, DLXQName), - %% Bind the dlx queue with the original queue routing key - #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName, - exchange = DLXExchange, - routing_key = QName}), - %% Publish, consume and nack the second message - publish(Ch, QName, [P2]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - [DTag2] = consume(Ch, QName, [P2]), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2, - multiple = false, - requeue = false}), - %% Message can now be routed using the recently binded key - wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), - consume(Ch, DLXQName, [P2]), - consume_empty(Ch, QName). - - -%% 4a) If a specific routing key was not set for the queue, use routing keys added by the -%% CC and BCC headers -dead_letter_routing_key_header_CC(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - Args = ?config(queue_args, Config), - Durable = ?config(queue_durable, Config), - DLXExchange = ?config(dlx_exchange, Config), - - %% Do not use a specific key - DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}], - #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}), - #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName, - exchange = DLXExchange, - routing_key = DLXQName}), - - P1 = <<"msg1">>, - P2 = <<"msg2">>, - CCHeader = {<<"CC">>, array, [{longstr, DLXQName}]}, - - %% Publish, consume and nack two messages, one with CC header - publish(Ch, QName, [P1]), - publish(Ch, QName, [P2], [CCHeader]), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), - [_, DTag2] = consume(Ch, QName, [P1, P2]), - %% P2 is also published to the DLX queue because of the binding to the default exchange - [_] = consume(Ch, DLXQName, [P2]), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2, - multiple = true, - requeue = false}), - %% The second message should have been routed using the CC header - wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]), - consume_empty(Ch, QName), - consume(Ch, DLXQName, [P2]), - consume_empty(Ch, DLXQName). - -%% 4b) If a specific routing key was not set for the queue, use routing keys added by the -%% CC and BCC headers -dead_letter_routing_key_header_BCC(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - Args = ?config(queue_args, Config), - Durable = ?config(queue_durable, Config), - DLXExchange = ?config(dlx_exchange, Config), - - %% Do not use a specific key - DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}], - #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}), - #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName, - exchange = DLXExchange, - routing_key = DLXQName}), - - P1 = <<"msg1">>, - P2 = <<"msg2">>, - BCCHeader = {<<"BCC">>, array, [{longstr, DLXQName}]}, - - %% Publish, consume and nack two messages, one with BCC header - publish(Ch, QName, [P1]), - publish(Ch, QName, [P2], [BCCHeader]), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), - [_, DTag2] = consume(Ch, QName, [P1, P2]), - %% P2 is also published to the DLX queue because of the binding to the default exchange - [_] = consume(Ch, DLXQName, [P2]), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2, - multiple = true, - requeue = false}), - %% The second message should have been routed using the BCC header - wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]), - consume_empty(Ch, QName), - consume(Ch, DLXQName, [P2]), - consume_empty(Ch, DLXQName). - -%% It is possible to form a cycle of message dead-lettering. For instance, -%% this can happen when a queue dead-letters messages to the default exchange without -%% specifying a dead-letter routing key (5). Messages in such cycles (i.e. messages that -%% reach the same queue twice) will be dropped if there was no rejections in the entire cycle. -%% i.e. x-message-ttl (7), x-max-length (6) -%% -%% 6) Message is dead lettered due to queue length limit, and then dropped by the broker as it is -%% republished to the same queue. -dead_letter_routing_key_cycle_max_length(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - Args = ?config(queue_args, Config), - Durable = ?config(queue_durable, Config), - QName = ?config(queue_name, Config), - - DeadLetterArgs = [{<<"x-max-length">>, long, 1}, - {<<"x-dead-letter-exchange">>, longstr, <<>>}], - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), - - P1 = <<"msg1">>, - P2 = <<"msg2">>, - - %% Publish messages, consume and acknowledge the second one (x-max-length = 1) - publish(Ch, QName, [P1, P2]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - [DTag] = consume(Ch, QName, [P2]), - consume_empty(Ch, QName), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag}), - %% Queue is empty, P1 has not been republished in a loop - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), - consume_empty(Ch, QName). - -%% 7) Message is dead lettered due to message ttl. Not yet implemented in quorum queues -dead_letter_routing_key_cycle_ttl(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - Args = ?config(queue_args, Config), - Durable = ?config(queue_durable, Config), - QName = ?config(queue_name, Config), - - DeadLetterArgs = [{<<"x-message-ttl">>, long, 1}, - {<<"x-dead-letter-exchange">>, longstr, <<>>}], - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), - - P1 = <<"msg1">>, - P2 = <<"msg2">>, - - %% Publish messages - publish(Ch, QName, [P1, P2]), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), - consume_empty(Ch, QName). - -%% 5) Messages continue to be republished as there are manual rejections -dead_letter_routing_key_cycle_with_reject(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - Args = ?config(queue_args, Config), - Durable = ?config(queue_durable, Config), - QName = ?config(queue_name, Config), - - DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, <<>>}], - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), - - P = <<"msg1">>, - - %% Publish message - publish(Ch, QName, [P]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - [DTag] = consume(Ch, QName, [P]), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag, - multiple = false, - requeue = false}), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - [DTag1] = consume(Ch, QName, [P]), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, - multiple = false, - requeue = false}), - %% Message its being republished - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - [_] = consume(Ch, QName, [P]). - -%% -%% For any given queue, a DLX can be defined by clients using the queue's arguments, -%% or in the server using policies (8). In the case where both policy and arguments specify a DLX, -%% the one specified in arguments overrules the one specified in policy (9). -%% -%% 8) Use server policies -dead_letter_policy(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - Args = ?config(queue_args, Config), - Durable = ?config(queue_durable, Config), - DLXExchange = ?config(dlx_exchange, Config), - - %% Do not use arguments - #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = Args, - durable = Durable}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, - durable = Durable}), - #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName, - exchange = DLXExchange, - routing_key = DLXQName}), - - P1 = <<"msg1">>, - P2 = <<"msg2">>, - - %% Publish 2 messages - publish(Ch, QName, [P1, P2]), - %% Consume them - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), - [DTag1, DTag2] = consume(Ch, QName, [P1, P2]), - %% Nack the first one with multiple = false - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, - multiple = false, - requeue = false}), - %% Only one message unack left in the queue - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), - consume_empty(Ch, QName), - consume_empty(Ch, DLXQName), - - %% Set a policy - ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?config(policy, Config), QName, - <<"queues">>, - [{<<"dead-letter-exchange">>, DLXExchange}, - {<<"dead-letter-routing-key">>, DLXQName}]), - timer:sleep(1000), - %% Nack the second message - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2, - multiple = false, - requeue = false}), - %% Queue is empty - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), - consume_empty(Ch, QName), - %% Consume the message from the dead letter queue - consume(Ch, DLXQName, [P2]), - consume_empty(Ch, DLXQName). - -%% 9) Argument overrides server policy -dead_letter_override_policy(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - - %% Set a policy, it creates a cycle but message will be republished with the nack. - %% Good enough for this test. - ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?config(policy, Config), QName, - <<"queues">>, - [{<<"dead-letter-exchange">>, <<>>}, - {<<"dead-letter-routing-key">>, QName}]), - - %% Declare arguments override the policy and set routing queue - declare_dead_letter_queues(Ch, Config, QName, DLXQName), - - P1 = <<"msg1">>, - - publish(Ch, QName, [P1]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - [DTag1] = consume(Ch, QName, [P1]), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, - multiple = false, - requeue = false}), - %% Queue is empty - wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), - consume_empty(Ch, QName), - [_] = consume(Ch, DLXQName, [P1]). - -%% 9) Policy is set after have declared a queue with dead letter arguments. Policy will be -%% overridden/ignored. -dead_letter_ignore_policy(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - - declare_dead_letter_queues(Ch, Config, QName, DLXQName), - - %% Set a policy - ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?config(policy, Config), QName, - <<"queues">>, - [{<<"dead-letter-exchange">>, <<>>}, - {<<"dead-letter-routing-key">>, QName}]), - - P1 = <<"msg1">>, - - publish(Ch, QName, [P1]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - [DTag1] = consume(Ch, QName, [P1]), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, - multiple = false, - requeue = false}), - %% Message is in the dead letter queue, original queue is empty - wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), - [_] = consume(Ch, DLXQName, [P1]), - consume_empty(Ch, QName). - -%% -%% HEADERS -%% -%% The dead-lettering process adds an array to the header of each dead-lettered message named -%% x-death (10). This array contains an entry for each dead lettering event containing: -%% queue, reason, time, exchange, routing-keys, count -%% original-expiration (14) (if the message was dead-letterered due to per-message TTL) -%% New entries are prepended to the beginning of the x-death array. -%% Reason is one of the following: rejected (11), expired (12), maxlen (13) -%% -%% 10) and 11) Check all x-death headers, reason rejected -dead_letter_headers(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - declare_dead_letter_queues(Ch, Config, QName, DLXQName), - - %% Publish and nack a message - P1 = <<"msg1">>, - publish(Ch, QName, [P1]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - [DTag1] = consume(Ch, QName, [P1]), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, - multiple = false, - requeue = false}), - %% Consume and check headers - wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), - {#'basic.get_ok'{}, #amqp_msg{payload = P1, - props = #'P_basic'{headers = Headers}}} = - amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), - {array, [{table, Death}]} = rabbit_misc:table_lookup(Headers, <<"x-death">>), - ?assertEqual({longstr, QName}, rabbit_misc:table_lookup(Death, <<"queue">>)), - ?assertEqual({longstr, <<"rejected">>}, rabbit_misc:table_lookup(Death, <<"reason">>)), - ?assertMatch({timestamp, _}, rabbit_misc:table_lookup(Death, <<"time">>)), - ?assertEqual({longstr, <<>>}, rabbit_misc:table_lookup(Death, <<"exchange">>)), - ?assertEqual({long, 1}, rabbit_misc:table_lookup(Death, <<"count">>)), - ?assertEqual({array, [{longstr, QName}]}, rabbit_misc:table_lookup(Death, <<"routing-keys">>)). - -%% 12) Per-queue message ttl has expired -dead_letter_headers_reason_expired(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-message-ttl">>, long, 1}]), - - %% Publish a message - P1 = <<"msg1">>, - publish(Ch, QName, [P1]), - %% Consume and check headers - wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), - {#'basic.get_ok'{}, #amqp_msg{payload = P1, - props = #'P_basic'{headers = Headers}}} = - amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), - {array, [{table, Death}]} = rabbit_misc:table_lookup(Headers, <<"x-death">>), - ?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Death, <<"reason">>)), - ?assertMatch(undefined, rabbit_misc:table_lookup(Death, <<"original-expiration">>)). - -%% 14) Per-message TTL has expired, original-expiration is added to x-death array -dead_letter_headers_reason_expired_per_message(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - declare_dead_letter_queues(Ch, Config, QName, DLXQName), - - %% Publish a message - P1 = <<"msg1">>, - amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, - #amqp_msg{payload = P1, - props = #'P_basic'{expiration = <<"1">>}}), - %% publish another message to ensure the queue performs message expirations - publish(Ch, QName, [<<"msg2">>]), - %% Consume and check headers - wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), - {#'basic.get_ok'{}, #amqp_msg{payload = P1, - props = #'P_basic'{headers = Headers}}} = - amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), - {array, [{table, Death}]} = rabbit_misc:table_lookup(Headers, <<"x-death">>), - ?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Death, <<"reason">>)), - ?assertMatch({longstr, <<"1">>}, rabbit_misc:table_lookup(Death, <<"original-expiration">>)). - -%% 13) Message expired with maxlen reason -dead_letter_headers_reason_maxlen(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-max-length">>, long, 1}]), - - P1 = <<"msg1">>, - P2 = <<"msg2">>, - publish(Ch, QName, [P1, P2]), - %% Consume and check reason header - wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), - {#'basic.get_ok'{}, #amqp_msg{payload = P1, - props = #'P_basic'{headers = Headers}}} = - amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), - {array, [{table, Death}]} = rabbit_misc:table_lookup(Headers, <<"x-death">>), - ?assertEqual({longstr, <<"maxlen">>}, rabbit_misc:table_lookup(Death, <<"reason">>)). - -%% In case x-death already contains an entry with the same queue and dead lettering reason, -%% its count field will be incremented and it will be moved to the beginning of the array -dead_letter_headers_cycle(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - Args = ?config(queue_args, Config), - Durable = ?config(queue_durable, Config), - QName = ?config(queue_name, Config), - - DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, <<>>}], - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), - - P = <<"msg1">>, - - %% Publish message - publish(Ch, QName, [P]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - [DTag] = consume(Ch, QName, [P]), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag, - multiple = false, - requeue = false}), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - {#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P, - props = #'P_basic'{headers = Headers1}}} = - amqp_channel:call(Ch, #'basic.get'{queue = QName}), - {array, [{table, Death1}]} = rabbit_misc:table_lookup(Headers1, <<"x-death">>), - ?assertEqual({long, 1}, rabbit_misc:table_lookup(Death1, <<"count">>)), - - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, - multiple = false, - requeue = false}), - %% Message its being republished - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - {#'basic.get_ok'{}, #amqp_msg{payload = P, - props = #'P_basic'{headers = Headers2}}} = - amqp_channel:call(Ch, #'basic.get'{queue = QName}), - {array, [{table, Death2}]} = rabbit_misc:table_lookup(Headers2, <<"x-death">>), - ?assertEqual({long, 2}, rabbit_misc:table_lookup(Death2, <<"count">>)). - -%% Dead-lettering a message modifies its headers: -%% the exchange name is replaced with that of the latest dead-letter exchange, -%% the routing key may be replaced with that specified in a queue performing dead lettering, -%% if the above happens, the CC header will also be removed (15) and -%% the BCC header will be removed as per Sender-selected distribution (16) -%% -%% CC header is kept if no dead lettering routing key is provided -dead_letter_headers_CC(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - Args = ?config(queue_args, Config), - Durable = ?config(queue_durable, Config), - DLXExchange = ?config(dlx_exchange, Config), - - %% Do not use a specific key for dead lettering, the CC header is passed - DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}], - #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}), - #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName, - exchange = DLXExchange, - routing_key = DLXQName}), - - P1 = <<"msg1">>, - CCHeader = {<<"CC">>, array, [{longstr, DLXQName}]}, - publish(Ch, QName, [P1], [CCHeader]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - %% Message is published to both queues because of CC header and DLX queue bound to both - %% exchanges - {#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P1, - props = #'P_basic'{headers = Headers1}}} = - amqp_channel:call(Ch, #'basic.get'{queue = QName}), - {#'basic.get_ok'{}, #amqp_msg{payload = P1, - props = #'P_basic'{headers = Headers2}}} = - amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), - %% We check the headers to ensure no dead lettering has happened - ?assertEqual(undefined, rabbit_misc:table_lookup(Headers1, <<"x-death">>)), - ?assertEqual(undefined, rabbit_misc:table_lookup(Headers2, <<"x-death">>)), - - %% Nack the message so it now gets dead lettered - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, - multiple = false, - requeue = false}), - wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]), - {#'basic.get_ok'{}, #amqp_msg{payload = P1, - props = #'P_basic'{headers = Headers3}}} = - amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), - consume_empty(Ch, QName), - ?assertEqual({array, [{longstr, DLXQName}]}, rabbit_misc:table_lookup(Headers3, <<"CC">>)), - ?assertMatch({array, _}, rabbit_misc:table_lookup(Headers3, <<"x-death">>)). - -%% 15) CC header is removed when routing key is specified -dead_letter_headers_CC_with_routing_key(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - Args = ?config(queue_args, Config), - Durable = ?config(queue_durable, Config), - DLXExchange = ?config(dlx_exchange, Config), - - %% Do not use a specific key for dead lettering, the CC header is passed - DeadLetterArgs = [{<<"x-dead-letter-routing-key">>, longstr, DLXQName}, - {<<"x-dead-letter-exchange">>, longstr, DLXExchange}], - #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}), - #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName, - exchange = DLXExchange, - routing_key = DLXQName}), - - P1 = <<"msg1">>, - CCHeader = {<<"CC">>, array, [{longstr, DLXQName}]}, - publish(Ch, QName, [P1], [CCHeader]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - %% Message is published to both queues because of CC header and DLX queue bound to both - %% exchanges - {#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P1, - props = #'P_basic'{headers = Headers1}}} = - amqp_channel:call(Ch, #'basic.get'{queue = QName}), - {#'basic.get_ok'{}, #amqp_msg{payload = P1, - props = #'P_basic'{headers = Headers2}}} = - amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), - %% We check the headers to ensure no dead lettering has happened - ?assertEqual(undefined, rabbit_misc:table_lookup(Headers1, <<"x-death">>)), - ?assertEqual(undefined, rabbit_misc:table_lookup(Headers2, <<"x-death">>)), - - %% Nack the message so it now gets dead lettered - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, - multiple = false, - requeue = false}), - wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]), - {#'basic.get_ok'{}, #amqp_msg{payload = P1, - props = #'P_basic'{headers = Headers3}}} = - amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), - consume_empty(Ch, QName), - ?assertEqual(undefined, rabbit_misc:table_lookup(Headers3, <<"CC">>)), - ?assertMatch({array, _}, rabbit_misc:table_lookup(Headers3, <<"x-death">>)). - -%% 16) the BCC header will always be removed -dead_letter_headers_BCC(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - Args = ?config(queue_args, Config), - Durable = ?config(queue_durable, Config), - DLXExchange = ?config(dlx_exchange, Config), - - %% Do not use a specific key for dead lettering - DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}], - #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}), - #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName, - exchange = DLXExchange, - routing_key = DLXQName}), - - P1 = <<"msg1">>, - BCCHeader = {<<"BCC">>, array, [{longstr, DLXQName}]}, - publish(Ch, QName, [P1], [BCCHeader]), - %% Message is published to both queues because of BCC header and DLX queue bound to both - %% exchanges - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - {#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P1, - props = #'P_basic'{headers = Headers1}}} = - amqp_channel:call(Ch, #'basic.get'{queue = QName}), - {#'basic.get_ok'{}, #amqp_msg{payload = P1, - props = #'P_basic'{headers = Headers2}}} = - amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), - %% We check the headers to ensure no dead lettering has happened - ?assertEqual(undefined, rabbit_misc:table_lookup(Headers1, <<"x-death">>)), - ?assertEqual(undefined, rabbit_misc:table_lookup(Headers2, <<"x-death">>)), - - %% Nack the message so it now gets dead lettered - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, - multiple = false, - requeue = false}), - wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]), - {#'basic.get_ok'{}, #amqp_msg{payload = P1, - props = #'P_basic'{headers = Headers3}}} = - amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), - consume_empty(Ch, QName), - ?assertEqual(undefined, rabbit_misc:table_lookup(Headers3, <<"BCC">>)), - ?assertMatch({array, _}, rabbit_misc:table_lookup(Headers3, <<"x-death">>)). - - -%% Three top-level headers are added for the very first dead-lettering event. -%% They are -%% x-first-death-reason, x-first-death-queue, x-first-death-exchange -%% They have the same values as the reason, queue, and exchange fields of the -%% original -%% dead lettering event. Once added, these headers are never modified. -dead_letter_headers_first_death(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = ?config(queue_name, Config), - DLXQName = ?config(queue_name_dlx, Config), - Args = ?config(queue_args, Config), - Durable = ?config(queue_durable, Config), - DLXExchange = ?config(dlx_exchange, Config), - - %% Let's create a small dead-lettering loop QName -> DLXQName -> QName - DeadLetterArgs = [{<<"x-dead-letter-routing-key">>, longstr, DLXQName}, - {<<"x-dead-letter-exchange">>, longstr, DLXExchange}], - DLXDeadLetterArgs = [{<<"x-dead-letter-routing-key">>, longstr, QName}, - {<<"x-dead-letter-exchange">>, longstr, <<>>}], - #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable, arguments = DLXDeadLetterArgs}), - #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName, - exchange = DLXExchange, - routing_key = DLXQName}), - - - %% Publish and nack a message - P1 = <<"msg1">>, - publish(Ch, QName, [P1]), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - [DTag1] = consume(Ch, QName, [P1]), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, - multiple = false, - requeue = false}), - %% Consume and check headers - wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), - {#'basic.get_ok'{delivery_tag = DTag2}, #amqp_msg{payload = P1, - props = #'P_basic'{headers = Headers}}} = - amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), - ?assertEqual({longstr, <<"rejected">>}, - rabbit_misc:table_lookup(Headers, <<"x-first-death-reason">>)), - ?assertEqual({longstr, QName}, - rabbit_misc:table_lookup(Headers, <<"x-first-death-queue">>)), - ?assertEqual({longstr, <<>>}, - rabbit_misc:table_lookup(Headers, <<"x-first-death-exchange">>)), - %% Nack the message again so it gets dead lettered to the initial queue. x-first-death - %% headers should not change - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2, - multiple = false, - requeue = false}), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - {#'basic.get_ok'{}, #amqp_msg{payload = P1, - props = #'P_basic'{headers = Headers2}}} = - amqp_channel:call(Ch, #'basic.get'{queue = QName}), - ?assertEqual({longstr, <<"rejected">>}, - rabbit_misc:table_lookup(Headers2, <<"x-first-death-reason">>)), - ?assertEqual({longstr, QName}, - rabbit_misc:table_lookup(Headers2, <<"x-first-death-queue">>)), - ?assertEqual({longstr, <<>>}, - rabbit_misc:table_lookup(Headers2, <<"x-first-death-exchange">>)). - -%%%%%%%%%%%%%%%%%%%%%%%% -%% Test helpers -%%%%%%%%%%%%%%%%%%%%%%%% -declare_dead_letter_queues(Ch, Config, QName, DLXQName) -> - declare_dead_letter_queues(Ch, Config, QName, DLXQName, []). - -declare_dead_letter_queues(Ch, Config, QName, DLXQName, ExtraArgs) -> - Args = ?config(queue_args, Config), - Durable = ?config(queue_durable, Config), - DLXExchange = ?config(dlx_exchange, Config), - - %% Declare DLX exchange - #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}), - - %% Declare queue - DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}, - {<<"x-dead-letter-routing-key">>, longstr, DLXQName}], - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args ++ ExtraArgs, durable = Durable}), - - %% Declare and bind DLX queue - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}), - #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName, - exchange = DLXExchange, - routing_key = DLXQName}). - -publish(Ch, QName, Payloads) -> - [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload}) - || Payload <- Payloads]. - -publish(Ch, QName, Payloads, Headers) -> - [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, - #amqp_msg{payload = Payload, - props = #'P_basic'{headers = Headers}}) - || Payload <- Payloads]. - -consume(Ch, QName, Payloads) -> - [begin - {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} = - amqp_channel:call(Ch, #'basic.get'{queue = QName}), - DTag - end || Payload <- Payloads]. - -consume_empty(Ch, QName) -> - #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). - -sync_mirrors(QName, Config) -> - case ?config(is_mirrored, Config) of - true -> - rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, [<<"sync_queue">>, QName]); - _ -> ok - end. |