summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-21 11:26:46 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-21 11:26:46 +0200
commitb41ed423cb4ac8205f33d268daf1e7f15c90ffae (patch)
tree6c32153d8b07929a10880723fa10dab08b9ffec5
parentdb5d3f8fa03ea6ed5c40c3d69f415cd875b7a89e (diff)
parent13483d9b9e06c5908584b5524416f808900e70a4 (diff)
downloadrabbitmq-server-git-b41ed423cb4ac8205f33d268daf1e7f15c90ffae.tar.gz
Merge branch 'master' into stream-queue-leader-locator
Conflicts: test/rabbit_stream_queue_SUITE.erl
-rw-r--r--.travis.yml6
-rw-r--r--Makefile3
-rw-r--r--README.md2
-rw-r--r--docs/rabbitmqctl.812
-rwxr-xr-xscripts/rabbitmq-env1
-rwxr-xr-xscripts/rabbitmq-server19
-rw-r--r--scripts/rabbitmq-server.bat5
-rw-r--r--scripts/rabbitmq-service.bat5
-rw-r--r--src/rabbit_channel_tracking.erl4
-rw-r--r--src/rabbit_classic_queue.erl6
-rw-r--r--src/rabbit_core_metrics_gc.erl4
-rw-r--r--src/rabbit_maintenance.erl23
-rw-r--r--src/rabbit_queue_type.erl17
-rw-r--r--src/rabbit_quorum_queue.erl2
-rw-r--r--src/rabbit_reader.erl6
-rw-r--r--src/rabbit_stream_queue.erl1
-rw-r--r--test/many_node_ha_SUITE.erl5
-rw-r--r--test/per_user_connection_channel_tracking_SUITE.erl59
-rw-r--r--test/quorum_queue_SUITE.erl42
-rw-r--r--test/quorum_queue_utils.erl2
-rw-r--r--test/rabbit_stream_queue_SUITE.erl57
-rw-r--r--test/single_active_consumer_SUITE.erl7
22 files changed, 98 insertions, 190 deletions
diff --git a/.travis.yml b/.travis.yml
index 0b5a791d7a..a502fe1922 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -25,10 +25,10 @@ env:
- base_rmq_ref=master
elixir:
- - '1.9'
+ - '1.10'
otp_release:
- - '21.3'
- - '22.2'
+ - '22.3'
+ - '23.0'
install:
# This project being an Erlang one (we just set language to Elixir
diff --git a/Makefile b/Makefile
index 3b31513c69..fc1267c13e 100644
--- a/Makefile
+++ b/Makefile
@@ -118,7 +118,8 @@ define PROJECT_ENV
{writer_gc_threshold, 1000000000},
%% interval at which connection/channel tracking executes post operations
{tracking_execution_timeout, 15000},
- {stream_messages_soft_limit, 256}
+ {stream_messages_soft_limit, 256},
+ {track_auth_attempt_source, false}
]
endef
diff --git a/README.md b/README.md
index 2a153b6f61..28bb2699fd 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,5 @@
[![OTP v22.3](https://img.shields.io/github/workflow/status/rabbitmq/rabbitmq-server/Test%20-%20Erlang%2022.3/master?label=Erlang%2022.3)](https://github.com/rabbitmq/rabbitmq-server/actions?query=workflow%3A%22Test+-+Erlang+22.3%22+branch%3A%22master%22)
-[![OTP v23](https://img.shields.io/github/workflow/status/rabbitmq/rabbitmq-server/Test%20-%20Erlang%2023.0/master?label=Erlang%2023.0)](https://github.com/rabbitmq/rabbitmq-server/actions?query=workflow%3A%22Test+-+Erlang+23.0%22+branch%3Amaster)
+[![OTP v23](https://img.shields.io/github/workflow/status/rabbitmq/rabbitmq-server/Test%20-%20Erlang%2023.1/master?label=Erlang%2023.1)](https://github.com/rabbitmq/rabbitmq-server/actions?query=workflow%3A%22Test+-+Erlang+23.1%22+branch%3Amaster)
# RabbitMQ Server
diff --git a/docs/rabbitmqctl.8 b/docs/rabbitmqctl.8
index 2c60ba10d3..3e041ad2c8 100644
--- a/docs/rabbitmqctl.8
+++ b/docs/rabbitmqctl.8
@@ -1429,18 +1429,6 @@ is located on the current node.
.Sp
.Dl rabbitmqctl list_unresponsive_queues --local name
.\" ------------------------------------------------------------------
-.It Cm node_health_check
-.Pp
-DEPRECATED. Performs intrusive, opinionated health checks on a fully booted node.
-To learn more, see the
-.Lk https://www.rabbitmq.com/monitoring.html#health-checks "Health Checks documentation"
-.Pp
-Verifies the RabbitMQ application is running and alarms are not set,
-then checks that every queue and channel on the node can emit basic stats.
-.sp
-Example:
-.Dl rabbitmqctl node_health_check -n rabbit@hostname
-.\" ------------------------------------------------------------------
.It Cm ping
.Pp
Checks that the node OS process is up, registered with EPMD and CLI tools can authenticate with it
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index afae3651aa..90702c43bb 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -130,7 +130,6 @@ SERVER_ERL_ARGS=" +P $RABBITMQ_MAX_NUMBER_OF_PROCESSES +t $RABBITMQ_MAX_NUMBER_O
[ "x" = "x$RABBITMQ_CTL_DIST_PORT_MIN" ] && RABBITMQ_CTL_DIST_PORT_MIN='35672'
[ "x" = "x$RABBITMQ_CTL_DIST_PORT_MAX" ] && RABBITMQ_CTL_DIST_PORT_MAX="$(($RABBITMQ_CTL_DIST_PORT_MIN + 10))"
-[ "x" = "x$RABBITMQ_IO_THREAD_POOL_SIZE" ] && RABBITMQ_IO_THREAD_POOL_SIZE=${IO_THREAD_POOL_SIZE}
[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS}
[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS}
[ "x" = "x$RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS" ] && RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=${SERVER_ADDITIONAL_ERL_ARGS}
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 8f62d5ab10..82058dcb26 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -23,25 +23,9 @@ if [ "x" = "x$ERL_MAX_ETS_TABLES" ]; then
ERL_MAX_ETS_TABLES=50000
fi
-# Lazy initialization of threed pool size - if it wasn't set
-# explicitly. This parameter is only needed when server is starting,
-# so it makes no sense to do this calculations in rabbitmq-env or
-# rabbitmq-defaults scripts.
-ensure_thread_pool_size() {
- if [ -z "${RABBITMQ_IO_THREAD_POOL_SIZE}" ]; then
- RABBITMQ_IO_THREAD_POOL_SIZE=$(
- erl \
- -noinput \
- -boot "${CLEAN_BOOT_FILE}" \
- -s rabbit_misc report_default_thread_pool_size
- )
- fi
-}
-
check_start_params() {
check_not_empty RABBITMQ_BOOT_MODULE
check_not_empty SASL_BOOT_FILE
- check_not_empty RABBITMQ_IO_THREAD_POOL_SIZE
}
check_not_empty() {
@@ -59,7 +43,6 @@ start_rabbitmq_server() {
set -e
_rmq_env_set_erl_libs
- ensure_thread_pool_size
RABBITMQ_START_RABBIT=
[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT=" -noinput"
@@ -92,8 +75,6 @@ start_rabbitmq_server() {
${RABBITMQ_START_RABBIT} \
-boot "${SASL_BOOT_FILE}" \
+W w \
- +K true \
- +A ${RABBITMQ_IO_THREAD_POOL_SIZE} \
${RABBITMQ_DEFAULT_ALLOC_ARGS} \
${RABBITMQ_SERVER_ERL_ARGS} \
${RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS} \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 3eb033efc2..3a386b63c4 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -47,10 +47,6 @@ if "!RABBITMQ_NODE_ONLY!"=="" (
set RABBITMQ_START_RABBIT=!RABBITMQ_START_RABBIT! -s "!RABBITMQ_BOOT_MODULE!" boot
)
-if "!RABBITMQ_IO_THREAD_POOL_SIZE!"=="" (
- set RABBITMQ_IO_THREAD_POOL_SIZE=64
-)
-
set ENV_OK=true
CALL :check_not_empty "RABBITMQ_BOOT_MODULE" !RABBITMQ_BOOT_MODULE!
@@ -68,7 +64,6 @@ if "!RABBITMQ_ALLOW_INPUT!"=="" (
!RABBITMQ_START_RABBIT! ^
-boot "!SASL_BOOT_FILE!" ^
+W w ^
-+A "!RABBITMQ_IO_THREAD_POOL_SIZE!" ^
!RABBITMQ_DEFAULT_ALLOC_ARGS! ^
!RABBITMQ_SERVER_ERL_ARGS! ^
!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index b42e0dd89a..0b7906d4bf 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -176,10 +176,6 @@ if "!RABBITMQ_NODE_ONLY!"=="" (
set RABBITMQ_START_RABBIT=-s "!RABBITMQ_BOOT_MODULE!" boot
)
-if "!RABBITMQ_IO_THREAD_POOL_SIZE!"=="" (
- set RABBITMQ_IO_THREAD_POOL_SIZE=64
-)
-
if "!RABBITMQ_SERVICE_RESTART!"=="" (
set RABBITMQ_SERVICE_RESTART=restart
)
@@ -197,7 +193,6 @@ set ERLANG_SERVICE_ARGUMENTS= ^
!RABBITMQ_START_RABBIT! ^
-boot "!SASL_BOOT_FILE!" ^
+W w ^
-+A "!RABBITMQ_IO_THREAD_POOL_SIZE!" ^
!RABBITMQ_DEFAULT_ALLOC_ARGS! ^
!RABBITMQ_SERVER_ERL_ARGS! ^
!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
diff --git a/src/rabbit_channel_tracking.erl b/src/rabbit_channel_tracking.erl
index 52b84dc90b..42ab664a06 100644
--- a/src/rabbit_channel_tracking.erl
+++ b/src/rabbit_channel_tracking.erl
@@ -102,9 +102,9 @@ handle_cast({connection_closed, ConnDetails}) ->
rabbit_log_connection:info(
"Closing all channels from connection '~p' "
"because it has been closed", [pget(name, ConnDetails)]),
+ %% Shutting down channels will take care of unregistering the
+ %% corresponding tracking.
shutdown_tracked_items(TrackedChs, undefined),
- [unregister_tracked(rabbit_tracking:id(ThisNode, Name)) ||
- #tracked_channel{name = Name} <- TrackedChs],
ok;
_DifferentNode ->
ok
diff --git a/src/rabbit_classic_queue.erl b/src/rabbit_classic_queue.erl
index be0877c87d..45cb43dbd1 100644
--- a/src/rabbit_classic_queue.erl
+++ b/src/rabbit_classic_queue.erl
@@ -452,8 +452,10 @@ capabilities() ->
<<"x-max-length-bytes">>, <<"x-max-in-memory-length">>,
<<"x-max-in-memory-bytes">>, <<"x-max-priority">>,
<<"x-overflow">>, <<"x-queue-mode">>, <<"x-single-active-consumer">>,
- <<"x-queue-type">>],
- consumer_arguments => [<<"x-cancel-on-ha-failover">>],
+ <<"x-queue-type">>, <<"x-queue-master-locator">>],
+ consumer_arguments => [<<"x-cancel-on-ha-failover">>,
+ <<"x-priority">>, <<"x-credit">>
+ ],
server_named => true}.
reject_seq_no(SeqNo, U0) ->
diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl
index 5c0c0b7b2a..890c127586 100644
--- a/src/rabbit_core_metrics_gc.erl
+++ b/src/rabbit_core_metrics_gc.erl
@@ -36,6 +36,7 @@ handle_info(start_gc, State) ->
gc_exchanges(),
gc_nodes(),
gc_gen_server2(),
+ gc_auth_attempts(),
{noreply, start_timer(State)}.
terminate(_Reason, #state{timer = TRef}) ->
@@ -193,3 +194,6 @@ gc_process_and_entities(Table, QueueGbSet, ExchangeGbSet) ->
gc_entity(Q, Table, Key, QueueGbSet),
gc_entity(X, Table, Key, ExchangeGbSet)
end, none, Table).
+
+gc_auth_attempts() ->
+ ets:delete_all_objects(auth_attempt_detailed_metrics).
diff --git a/src/rabbit_maintenance.erl b/src/rabbit_maintenance.erl
index 004f8b4cea..039d125b13 100644
--- a/src/rabbit_maintenance.erl
+++ b/src/rabbit_maintenance.erl
@@ -89,6 +89,7 @@ do_drain() ->
[length(TransferCandidates), ReadableCandidates]),
transfer_leadership_of_classic_mirrored_queues(TransferCandidates),
transfer_leadership_of_quorum_queues(TransferCandidates),
+ stop_local_quorum_queue_followers(),
%% allow plugins to react
rabbit_event:notify(maintenance_draining, #{
@@ -276,6 +277,28 @@ transfer_leadership_of_classic_mirrored_queues(TransferCandidates) ->
end || Q <- Queues],
rabbit_log:info("Leadership transfer for local classic mirrored queues is complete").
+-spec stop_local_quorum_queue_followers() -> ok.
+stop_local_quorum_queue_followers() ->
+ Queues = rabbit_amqqueue:list_local_followers(),
+ rabbit_log:info("Will stop local follower replicas of ~b quorum queues on this node",
+ [length(Queues)]),
+ [begin
+ Name = amqqueue:get_name(Q),
+ rabbit_log:debug("Will stop a local follower replica of quorum queue ~s",
+ [rabbit_misc:rs(Name)]),
+ %% shut down Ra nodes so that they are not considered for leader election
+ {RegisteredName, _LeaderNode} = amqqueue:get_pid(Q),
+ RaNode = {RegisteredName, node()},
+ rabbit_log:debug("Will stop Ra server ~p", [RaNode]),
+ case ra:stop_server(RaNode) of
+ ok ->
+ rabbit_log:debug("Successfully stopped Ra server ~p", [RaNode]);
+ {error, nodedown} ->
+ rabbit_log:error("Failed to stop Ra server ~p: target node was reported as down")
+ end
+ end || Q <- Queues],
+ rabbit_log:info("Stopped all local replicas of quorum queues hosted on this node").
+
-spec primary_replica_transfer_candidate_nodes() -> [node()].
primary_replica_transfer_candidate_nodes() ->
filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running() -- [node()]).
diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl
index 8424ee5d15..a357c5e334 100644
--- a/src/rabbit_queue_type.erl
+++ b/src/rabbit_queue_type.erl
@@ -213,9 +213,6 @@ is_enabled(Type) ->
rabbit_types:channel_exit().
declare(Q, Node) ->
Mod = amqqueue:get_type(Q),
- Capabilities = Mod:capabilities(),
- ValidQueueArgs = maps:get(queue_arguments, Capabilities, []),
- check_invalid_arguments(amqqueue:get_name(Q), amqqueue:get_arguments(Q), ValidQueueArgs),
Mod:declare(Q, Node).
-spec delete(amqqueue:amqqueue(), boolean(),
@@ -328,9 +325,6 @@ new(Q, State) when ?is_amqqueue(Q) ->
consume(Q, Spec, State) ->
#ctx{state = State0} = Ctx = get_ctx(Q, State),
Mod = amqqueue:get_type(Q),
- Capabilities = Mod:capabilities(),
- ValidConsumerArgs = maps:get(consumer_arguments, Capabilities, []),
- check_invalid_arguments(amqqueue:get_name(Q), maps:get(args, Spec), ValidConsumerArgs),
case Mod:consume(Q, Spec, State0) of
{ok, CtxState, Actions} ->
return_ok(set_ctx(Q, Ctx#ctx{state = CtxState}, State), Actions);
@@ -571,14 +565,3 @@ return_ok(State0, Actions0) ->
{S, [Act | A0]}
end, {State0, []}, Actions0),
{ok, State, lists:reverse(Actions)}.
-
-
-check_invalid_arguments(QueueName, Args, Keys) ->
- [case lists:member(Arg, Keys) of
- true -> ok;
- false -> rabbit_misc:protocol_error(
- precondition_failed,
- "invalid arg '~s' for ~s",
- [Arg, rabbit_misc:rs(QueueName)])
- end || {Arg, _, _} <- Args],
- ok.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index c0b1aa0965..9d42db2169 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -340,7 +340,7 @@ capabilities() ->
<<"x-max-in-memory-bytes">>, <<"x-overflow">>,
<<"x-single-active-consumer">>, <<"x-queue-type">>,
<<"x-quorum-initial-group-size">>, <<"x-delivery-limit">>],
- consumer_arguments => [<<"x-priority">>],
+ consumer_arguments => [<<"x-priority">>, <<"x-credit">>],
server_named => false}.
rpc_delete_metrics(QName) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index f9697c96e5..c91dbbc105 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -1413,15 +1413,19 @@ auth_phase(Response,
auth_mechanism = {Name, AuthMechanism},
auth_state = AuthState},
sock = Sock}) ->
+ RemoteAddress = list_to_binary(inet:ntoa(Connection#connection.host)),
case AuthMechanism:handle_response(Response, AuthState) of
{refused, Username, Msg, Args} ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, amqp091),
auth_fail(Username, Msg, Args, Name, State);
{protocol_error, Msg, Args} ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, <<>>, amqp091),
notify_auth_result(none, user_authentication_failure,
[{error, rabbit_misc:format(Msg, Args)}],
State),
rabbit_misc:protocol_error(syntax_error, Msg, Args);
{challenge, Challenge, AuthState1} ->
+ rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, amqp091),
Secure = #'connection.secure'{challenge = Challenge},
ok = send_on_channel0(Sock, Secure, Protocol),
State#v1{connection = Connection#connection{
@@ -1429,9 +1433,11 @@ auth_phase(Response,
{ok, User = #user{username = Username}} ->
case rabbit_access_control:check_user_loopback(Username, Sock) of
ok ->
+ rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, Username, amqp091),
notify_auth_result(Username, user_authentication_success,
[], State);
not_allowed ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, amqp091),
auth_fail(Username, "user '~s' can only connect via "
"localhost", [Username], Name, State)
end,
diff --git a/src/rabbit_stream_queue.erl b/src/rabbit_stream_queue.erl
index 3af56da9eb..2a9575c117 100644
--- a/src/rabbit_stream_queue.erl
+++ b/src/rabbit_stream_queue.erl
@@ -205,6 +205,7 @@ begin_stream(#stream_client{readers = Readers0} = State,
osiris:register_offset_listener(LocalPid, NextOffset),
%% TODO: avoid double calls to the same process
StartOffset = case Offset of
+ first -> NextOffset;
last -> NextOffset;
next -> NextOffset;
_ -> Offset
diff --git a/test/many_node_ha_SUITE.erl b/test/many_node_ha_SUITE.erl
index 6dc469f6f6..ece7dc8830 100644
--- a/test/many_node_ha_SUITE.erl
+++ b/test/many_node_ha_SUITE.erl
@@ -12,6 +12,11 @@
-compile(export_all).
+suite() ->
+ [
+ {timetrap, {minutes, 5}}
+ ].
+
all() ->
[
{group, cluster_size_6}
diff --git a/test/per_user_connection_channel_tracking_SUITE.erl b/test/per_user_connection_channel_tracking_SUITE.erl
index 923bdea541..8b4bd91d09 100644
--- a/test/per_user_connection_channel_tracking_SUITE.erl
+++ b/test/per_user_connection_channel_tracking_SUITE.erl
@@ -10,6 +10,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("eunit/include/eunit.hrl").
+-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-compile(export_all).
@@ -148,13 +149,13 @@ single_node_user_connection_channel_tracking(Config) ->
?assertEqual(true, is_process_alive(Conn1)),
?assertEqual(true, is_process_alive(Chan1)),
close_channels([Chan1]),
- ?assertEqual(0, count_channels_in(Config, Username)),
- ?assertEqual(0, tracked_user_channel_count(Config, Username)),
- ?assertEqual(false, is_process_alive(Chan1)),
+ ?awaitMatch(0, count_channels_in(Config, Username), 20000),
+ ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000),
+ ?awaitMatch(false, is_process_alive(Chan1), 20000),
close_connections([Conn1]),
- ?assertEqual(0, length(connections_in(Config, Username))),
- ?assertEqual(0, tracked_user_connection_count(Config, Username)),
- ?assertEqual(false, is_process_alive(Conn1)),
+ ?awaitMatch(0, length(connections_in(Config, Username)), 20000),
+ ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000),
+ ?awaitMatch(false, is_process_alive(Conn1), 20000),
[Conn2] = open_connections(Config, [{0, Username2}]),
Chans2 = [_|_] = open_channels(Conn2, 5),
@@ -183,9 +184,9 @@ single_node_user_connection_channel_tracking(Config) ->
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans4],
kill_connections([Conn4]),
[#tracked_connection{username = Username}] = connections_in(Config, Username),
- ?assertEqual(5, count_channels_in(Config, Username)),
- ?assertEqual(1, tracked_user_connection_count(Config, Username)),
- ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?awaitMatch(5, count_channels_in(Config, Username), 20000),
+ ?awaitMatch(1, tracked_user_connection_count(Config, Username), 20000),
+ ?awaitMatch(5, tracked_user_channel_count(Config, Username), 20000),
?assertEqual(false, is_process_alive(Conn4)),
[?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans4],
@@ -201,15 +202,15 @@ single_node_user_connection_channel_tracking(Config) ->
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans5],
close_channels(Chans2 ++ Chans3 ++ Chans5),
- ?assertEqual(0, length(all_channels(Config))),
- ?assertEqual(0, tracked_user_channel_count(Config, Username)),
- ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+ ?awaitMatch(0, length(all_channels(Config)), 20000),
+ ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000),
+ ?awaitMatch(0, tracked_user_channel_count(Config, Username2), 20000),
close_connections([Conn2, Conn3, Conn5]),
rabbit_ct_broker_helpers:delete_user(Config, Username2),
- ?assertEqual(0, tracked_user_connection_count(Config, Username)),
- ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
- ?assertEqual(0, length(all_connections(Config))).
+ ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000),
+ ?awaitMatch(0, tracked_user_connection_count(Config, Username2), 20000),
+ ?awaitMatch(0, length(all_connections(Config)), 20000).
single_node_user_deletion(Config) ->
set_tracking_execution_timeout(Config, 100),
@@ -264,8 +265,8 @@ single_node_user_deletion(Config) ->
[?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
%% ensure vhost entry is cleared after 'tracking_execution_timeout'
- ?assertEqual(false, exists_in_tracked_connection_per_user_table(Config, Username2)),
- ?assertEqual(false, exists_in_tracked_channel_per_user_table(Config, Username2)),
+ ?awaitMatch(false, exists_in_tracked_connection_per_user_table(Config, Username2), 20000),
+ ?awaitMatch(false, exists_in_tracked_channel_per_user_table(Config, Username2), 20000),
?assertEqual(1, count_connections_in(Config, Username)),
?assertEqual(5, count_channels_in(Config, Username)),
@@ -275,12 +276,12 @@ single_node_user_deletion(Config) ->
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
close_channels(Chans1),
- ?assertEqual(0, count_channels_in(Config, Username)),
- ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?awaitMatch(0, count_channels_in(Config, Username), 20000),
+ ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000),
close_connections([Conn1]),
- ?assertEqual(0, count_connections_in(Config, Username)),
- ?assertEqual(0, tracked_user_connection_count(Config, Username)).
+ ?awaitMatch(0, count_connections_in(Config, Username), 20000),
+ ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000).
single_node_vhost_deletion(Config) ->
set_tracking_execution_timeout(Config, 100),
@@ -456,12 +457,12 @@ cluster_user_deletion(Config) ->
?assertEqual(false, exists_in_tracked_channel_per_user_table(Config, 1, Username2)),
close_channels(Chans1),
- ?assertEqual(0, count_channels_in(Config, Username)),
- ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?awaitMatch(0, count_channels_in(Config, Username), 20000),
+ ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000),
close_connections([Conn1]),
- ?assertEqual(0, count_connections_in(Config, Username)),
- ?assertEqual(0, tracked_user_connection_count(Config, Username)).
+ ?awaitMatch(0, count_connections_in(Config, Username), 20000),
+ ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000).
cluster_vhost_deletion(Config) ->
set_tracking_execution_timeout(Config, 0, 100),
@@ -665,12 +666,12 @@ cluster_node_removed(Config) ->
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
close_channels(Chans1),
- ?assertEqual(0, count_channels_in(Config, Username)),
- ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?awaitMatch(0, count_channels_in(Config, Username), 20000),
+ ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000),
close_connections([Conn1]),
- ?assertEqual(0, count_connections_in(Config, Username)),
- ?assertEqual(0, tracked_user_connection_count(Config, Username)).
+ ?awaitMatch(0, count_connections_in(Config, Username), 20000),
+ ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000).
%% -------------------------------------------------------------------
%% Helpers
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 97b2235411..8a2bf7f5d9 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -89,7 +89,6 @@ groups() ->
all_tests() ->
[
declare_args,
- declare_invalid_args,
declare_invalid_properties,
declare_server_named,
start_queue,
@@ -338,47 +337,6 @@ declare_invalid_properties(Config) ->
durable = false,
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]})).
-declare_invalid_args(Config) ->
- Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
- LQ = ?config(queue_name, Config),
-
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-message-ttl">>, long, 2000}])),
-
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-max-priority">>, long, 2000}])),
-
- [?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-overflow">>, longstr, XOverflow}]))
- || XOverflow <- [<<"reject-publish-dlx">>]],
-
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-queue-mode">>, longstr, <<"lazy">>}])),
-
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])),
-
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-quorum-initial-group-size">>, long, 0}])).
-
declare_server_named(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl
index caabd617ae..923ed0bb00 100644
--- a/test/quorum_queue_utils.erl
+++ b/test/quorum_queue_utils.erl
@@ -53,7 +53,7 @@ wait_for_messages(Servers, QName, Number, Fun, N) ->
end.
wait_for_messages(Config, Stats) ->
- wait_for_messages(Config, lists:sort(Stats), 10).
+ wait_for_messages(Config, lists:sort(Stats), 60).
wait_for_messages(Config, Stats, 0) ->
?assertEqual(Stats,
diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl
index d6275eae17..c2157d9447 100644
--- a/test/rabbit_stream_queue_SUITE.erl
+++ b/test/rabbit_stream_queue_SUITE.erl
@@ -63,7 +63,6 @@ all_tests() ->
[
declare_args,
declare_max_age,
- declare_invalid_args,
declare_invalid_properties,
declare_server_named,
declare_queue,
@@ -230,52 +229,6 @@ declare_invalid_properties(Config) ->
durable = false,
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})).
-declare_invalid_args(Config) ->
- Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
- Q = ?config(queue_name, Config),
-
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
- {<<"x-expires">>, long, 2000}])),
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
- {<<"x-message-ttl">>, long, 2000}])),
-
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
- {<<"x-max-priority">>, long, 2000}])),
-
- [?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
- {<<"x-overflow">>, longstr, XOverflow}]))
- || XOverflow <- [<<"reject-publish">>, <<"reject-publish-dlx">>]],
-
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
- {<<"x-queue-mode">>, longstr, <<"lazy">>}])),
-
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
- {<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])),
-
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
- {<<"x-queue-leader-locator">>, longstr, <<"hop">>}])).
-
declare_server_named(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
@@ -1118,6 +1071,8 @@ max_age(Config) ->
[publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)],
amqp_channel:wait_for_confirms(Ch, 5000),
+ timer:sleep(5000),
+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 200, false),
subscribe(Ch1, Q, false, 0),
@@ -1154,7 +1109,11 @@ leader_failover(Config) ->
ok = rabbit_ct_broker_helpers:start_node(Config, Server1).
initial_cluster_size_one(Config) ->
+<<<<<<< HEAD
[Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+=======
+ [Server1, _Server2, _Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+>>>>>>> master
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
Q = ?config(queue_name, Config),
@@ -1168,7 +1127,11 @@ initial_cluster_size_one(Config) ->
amqp_channel:call(Ch, #'queue.delete'{queue = Q})).
initial_cluster_size_two(Config) ->
+<<<<<<< HEAD
[Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+=======
+ [Server1, _Server2, _Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+>>>>>>> master
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
Q = ?config(queue_name, Config),
diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl
index 63b56fa009..59f2b6e83d 100644
--- a/test/single_active_consumer_SUITE.erl
+++ b/test/single_active_consumer_SUITE.erl
@@ -83,7 +83,8 @@ init_per_group(quorum_queue, Config) ->
end_per_group(_, Config) ->
Config.
-init_per_testcase(Testcase, Config) ->
+init_per_testcase(Testcase, Config0) ->
+ Config = [{queue_name, atom_to_binary(Testcase, utf8)} | Config0],
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
@@ -290,7 +291,9 @@ connection_and_channel(Config) ->
{C, Ch}.
queue_declare(Channel, Config) ->
- Declare = ?config(single_active_consumer_queue_declare, Config),
+ QueueName = ?config(queue_name, Config),
+ Declare0 = ?config(single_active_consumer_queue_declare, Config),
+ Declare = Declare0#'queue.declare'{queue = QueueName},
#'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, Declare),
Q.