summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <hairyhum@gmail.com>2019-04-25 13:10:04 -0400
committerDaniil Fedotov <hairyhum@gmail.com>2019-04-25 13:10:04 -0400
commitc8d86008b4c6a9a911d9a8bae6421f555e457bd6 (patch)
tree80a42c075d39eb0a73ea63f957faaf6a1d72a3df
parent0d67fc1007f38a49b8e561bbd4db4ffba1666bd1 (diff)
downloadrabbitmq-server-git-c8d86008b4c6a9a911d9a8bae6421f555e457bd6.tar.gz
Reset overflow queue property to drop-head if policy is unset.
If `overflow` policy is unset the actual queue property should reset to it's default value. Fixes #1980
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--test/confirms_rejects_SUITE.erl106
2 files changed, 109 insertions, 3 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9149df0015..b3f89b7ef0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -474,8 +474,12 @@ init_max_bytes(MaxBytes, State) ->
{_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}),
State1.
-init_overflow(undefined, State) ->
+%% Reset overflow to default 'drop-head' value if it's undefined.
+init_overflow(undefined, #q{overflow = 'drop-head'} = State) ->
State;
+init_overflow(undefined, State) ->
+ {_Dropped, State1} = maybe_drop_head(State#q{overflow = 'drop-head'}),
+ State1;
init_overflow(Overflow, State) ->
OverflowVal = binary_to_existing_atom(Overflow, utf8),
case OverflowVal of
diff --git a/test/confirms_rejects_SUITE.erl b/test/confirms_rejects_SUITE.erl
index 3ac944f99b..722b5240fc 100644
--- a/test/confirms_rejects_SUITE.erl
+++ b/test/confirms_rejects_SUITE.erl
@@ -13,7 +13,8 @@ all() ->
groups() ->
[
{parallel_tests, [parallel], [
- confirms_rejects_conflict
+ confirms_rejects_conflict,
+ policy_resets_to_default
]}
].
@@ -40,7 +41,11 @@ end_per_group(_Group, Config) ->
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
-init_per_testcase(Testcase, Config) ->
+init_per_testcase(policy_resets_to_default = Testcase, Config) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ rabbit_ct_helpers:testcase_started(
+ rabbit_ct_helpers:set_config(Config, [{conn, Conn}]), Testcase);
+init_per_testcase(confirms_rejects_conflict = Testcase, Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
@@ -48,6 +53,16 @@ init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:set_config(Config, [{conn, Conn}, {conn1, Conn1}]),
Testcase).
+end_per_testcase(policy_resets_to_default = Testcase, Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.delete'{queue = <<"policy_resets_to_default">>}),
+ rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
+
+ Conn = ?config(conn, Config),
+
+ rabbit_ct_client_helpers:close_connection(Conn),
+
+ rabbit_ct_helpers:testcase_finished(Config, Testcase);
end_per_testcase(confirms_rejects_conflict = Testcase, Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
amqp_channel:call(Ch, #'queue.delete'{queue = <<"confirms_rejects_conflict">>}),
@@ -61,6 +76,8 @@ end_per_testcase(confirms_rejects_conflict = Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+
confirms_rejects_conflict(Config) ->
Conn = ?config(conn, Config),
Conn1 = ?config(conn1, Config),
@@ -120,6 +137,91 @@ confirms_rejects_conflict(Config) ->
{error, E} -> error(E)
end.
+policy_resets_to_default(Config) ->
+ Conn = ?config(conn, Config),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ QueueName = <<"policy_resets_to_default">>,
+
+ amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+
+ amqp_channel:call(Ch, #'queue.declare'{queue = QueueName,
+ durable = true
+ }),
+ MaxLength = 2,
+ rabbit_ct_broker_helpers:set_policy(
+ Config, 0,
+ QueueName, QueueName, <<"queues">>,
+ [{<<"max-length">>, MaxLength}, {<<"overflow">>, <<"reject-publish">>}]),
+
+ timer:sleep(1000),
+
+ [amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName},
+ #amqp_msg{payload = <<"HI">>})
+ || _ <- lists:seq(1, MaxLength)],
+
+ assert_acks(MaxLength),
+
+ #'queue.declare_ok'{message_count = MaxLength} =
+ amqp_channel:call(Ch, #'queue.declare'{queue = QueueName,
+ durable = true}),
+
+ RejectedMessage = <<"HI-rejected">>,
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName},
+ #amqp_msg{payload = RejectedMessage}),
+
+ assert_nack(),
+
+ rabbit_ct_broker_helpers:set_policy(
+ Config, 0,
+ QueueName, QueueName, <<"queues">>,
+ [{<<"max-length">>, MaxLength}]),
+
+ NotRejectedMessage = <<"HI-not-rejected">>,
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName},
+ #amqp_msg{payload = NotRejectedMessage}),
+
+ assert_ack(),
+
+ #'queue.declare_ok'{message_count = MaxLength} =
+ amqp_channel:call(Ch, #'queue.declare'{queue = QueueName,
+ durable = true}),
+
+ Msgs = consume_all_messages(Ch, QueueName),
+ case {lists:member(RejectedMessage, Msgs), lists:member(NotRejectedMessage, Msgs)} of
+ {true, _} -> error({message_should_be_rejected, RejectedMessage});
+ {_, false} -> error({message_should_be_enqueued, NotRejectedMessage});
+ _ -> ok
+ end.
+
+consume_all_messages(Ch, QueueName) ->
+ consume_all_messages(Ch, QueueName, []).
+
+consume_all_messages(Ch, QueueName, Msgs) ->
+ case amqp_channel:call(Ch, #'basic.get'{queue = QueueName, no_ack = true}) of
+ {#'basic.get_ok'{}, #amqp_msg{payload = Msg}} ->
+ consume_all_messages(Ch, QueueName, [Msg | Msgs]);
+ #'basic.get_empty'{} -> Msgs
+ end.
+
+assert_ack() ->
+ receive {'basic.ack', _, _} -> ok
+ after 10000 -> error(timeout_waiting_for_ack)
+ end,
+ clean_acks_mailbox().
+
+assert_nack() ->
+ receive {'basic.nack', _, _, _} -> ok
+ after 10000 -> error(timeout_waiting_for_nack)
+ end,
+ clean_acks_mailbox().
+
+assert_acks(N) ->
+ receive {'basic.ack', N, _} -> ok
+ after 10000 -> error({timeout_waiting_for_ack, N})
+ end,
+ clean_acks_mailbox().
+
validate_acks_mailbox() ->
Result = validate_acks_mailbox({0, ok}),
clean_acks_mailbox(),