diff options
author | Daniil Fedotov <hairyhum@gmail.com> | 2019-04-25 13:10:04 -0400 |
---|---|---|
committer | Daniil Fedotov <hairyhum@gmail.com> | 2019-04-25 13:10:04 -0400 |
commit | c8d86008b4c6a9a911d9a8bae6421f555e457bd6 (patch) | |
tree | 80a42c075d39eb0a73ea63f957faaf6a1d72a3df | |
parent | 0d67fc1007f38a49b8e561bbd4db4ffba1666bd1 (diff) | |
download | rabbitmq-server-git-c8d86008b4c6a9a911d9a8bae6421f555e457bd6.tar.gz |
Reset overflow queue property to drop-head if policy is unset.
If `overflow` policy is unset the actual queue property should reset
to it's default value.
Fixes #1980
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
-rw-r--r-- | test/confirms_rejects_SUITE.erl | 106 |
2 files changed, 109 insertions, 3 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9149df0015..b3f89b7ef0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -474,8 +474,12 @@ init_max_bytes(MaxBytes, State) -> {_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}), State1. -init_overflow(undefined, State) -> +%% Reset overflow to default 'drop-head' value if it's undefined. +init_overflow(undefined, #q{overflow = 'drop-head'} = State) -> State; +init_overflow(undefined, State) -> + {_Dropped, State1} = maybe_drop_head(State#q{overflow = 'drop-head'}), + State1; init_overflow(Overflow, State) -> OverflowVal = binary_to_existing_atom(Overflow, utf8), case OverflowVal of diff --git a/test/confirms_rejects_SUITE.erl b/test/confirms_rejects_SUITE.erl index 3ac944f99b..722b5240fc 100644 --- a/test/confirms_rejects_SUITE.erl +++ b/test/confirms_rejects_SUITE.erl @@ -13,7 +13,8 @@ all() -> groups() -> [ {parallel_tests, [parallel], [ - confirms_rejects_conflict + confirms_rejects_conflict, + policy_resets_to_default ]} ]. @@ -40,7 +41,11 @@ end_per_group(_Group, Config) -> rabbit_ct_client_helpers:teardown_steps() ++ rabbit_ct_broker_helpers:teardown_steps()). -init_per_testcase(Testcase, Config) -> +init_per_testcase(policy_resets_to_default = Testcase, Config) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + rabbit_ct_helpers:testcase_started( + rabbit_ct_helpers:set_config(Config, [{conn, Conn}]), Testcase); +init_per_testcase(confirms_rejects_conflict = Testcase, Config) -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config), @@ -48,6 +53,16 @@ init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:set_config(Config, [{conn, Conn}, {conn1, Conn1}]), Testcase). +end_per_testcase(policy_resets_to_default = Testcase, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"policy_resets_to_default">>}), + rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), + + Conn = ?config(conn, Config), + + rabbit_ct_client_helpers:close_connection(Conn), + + rabbit_ct_helpers:testcase_finished(Config, Testcase); end_per_testcase(confirms_rejects_conflict = Testcase, Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), amqp_channel:call(Ch, #'queue.delete'{queue = <<"confirms_rejects_conflict">>}), @@ -61,6 +76,8 @@ end_per_testcase(confirms_rejects_conflict = Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). + + confirms_rejects_conflict(Config) -> Conn = ?config(conn, Config), Conn1 = ?config(conn1, Config), @@ -120,6 +137,91 @@ confirms_rejects_conflict(Config) -> {error, E} -> error(E) end. +policy_resets_to_default(Config) -> + Conn = ?config(conn, Config), + {ok, Ch} = amqp_connection:open_channel(Conn), + QueueName = <<"policy_resets_to_default">>, + + amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + + amqp_channel:call(Ch, #'queue.declare'{queue = QueueName, + durable = true + }), + MaxLength = 2, + rabbit_ct_broker_helpers:set_policy( + Config, 0, + QueueName, QueueName, <<"queues">>, + [{<<"max-length">>, MaxLength}, {<<"overflow">>, <<"reject-publish">>}]), + + timer:sleep(1000), + + [amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName}, + #amqp_msg{payload = <<"HI">>}) + || _ <- lists:seq(1, MaxLength)], + + assert_acks(MaxLength), + + #'queue.declare_ok'{message_count = MaxLength} = + amqp_channel:call(Ch, #'queue.declare'{queue = QueueName, + durable = true}), + + RejectedMessage = <<"HI-rejected">>, + amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName}, + #amqp_msg{payload = RejectedMessage}), + + assert_nack(), + + rabbit_ct_broker_helpers:set_policy( + Config, 0, + QueueName, QueueName, <<"queues">>, + [{<<"max-length">>, MaxLength}]), + + NotRejectedMessage = <<"HI-not-rejected">>, + amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName}, + #amqp_msg{payload = NotRejectedMessage}), + + assert_ack(), + + #'queue.declare_ok'{message_count = MaxLength} = + amqp_channel:call(Ch, #'queue.declare'{queue = QueueName, + durable = true}), + + Msgs = consume_all_messages(Ch, QueueName), + case {lists:member(RejectedMessage, Msgs), lists:member(NotRejectedMessage, Msgs)} of + {true, _} -> error({message_should_be_rejected, RejectedMessage}); + {_, false} -> error({message_should_be_enqueued, NotRejectedMessage}); + _ -> ok + end. + +consume_all_messages(Ch, QueueName) -> + consume_all_messages(Ch, QueueName, []). + +consume_all_messages(Ch, QueueName, Msgs) -> + case amqp_channel:call(Ch, #'basic.get'{queue = QueueName, no_ack = true}) of + {#'basic.get_ok'{}, #amqp_msg{payload = Msg}} -> + consume_all_messages(Ch, QueueName, [Msg | Msgs]); + #'basic.get_empty'{} -> Msgs + end. + +assert_ack() -> + receive {'basic.ack', _, _} -> ok + after 10000 -> error(timeout_waiting_for_ack) + end, + clean_acks_mailbox(). + +assert_nack() -> + receive {'basic.nack', _, _, _} -> ok + after 10000 -> error(timeout_waiting_for_nack) + end, + clean_acks_mailbox(). + +assert_acks(N) -> + receive {'basic.ack', N, _} -> ok + after 10000 -> error({timeout_waiting_for_ack, N}) + end, + clean_acks_mailbox(). + validate_acks_mailbox() -> Result = validate_acks_mailbox({0, ok}), clean_acks_mailbox(), |