diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-21 11:26:46 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-21 11:26:46 +0200 |
commit | b41ed423cb4ac8205f33d268daf1e7f15c90ffae (patch) | |
tree | 6c32153d8b07929a10880723fa10dab08b9ffec5 | |
parent | db5d3f8fa03ea6ed5c40c3d69f415cd875b7a89e (diff) | |
parent | 13483d9b9e06c5908584b5524416f808900e70a4 (diff) | |
download | rabbitmq-server-git-b41ed423cb4ac8205f33d268daf1e7f15c90ffae.tar.gz |
Merge branch 'master' into stream-queue-leader-locator
Conflicts:
test/rabbit_stream_queue_SUITE.erl
-rw-r--r-- | .travis.yml | 6 | ||||
-rw-r--r-- | Makefile | 3 | ||||
-rw-r--r-- | README.md | 2 | ||||
-rw-r--r-- | docs/rabbitmqctl.8 | 12 | ||||
-rwxr-xr-x | scripts/rabbitmq-env | 1 | ||||
-rwxr-xr-x | scripts/rabbitmq-server | 19 | ||||
-rw-r--r-- | scripts/rabbitmq-server.bat | 5 | ||||
-rw-r--r-- | scripts/rabbitmq-service.bat | 5 | ||||
-rw-r--r-- | src/rabbit_channel_tracking.erl | 4 | ||||
-rw-r--r-- | src/rabbit_classic_queue.erl | 6 | ||||
-rw-r--r-- | src/rabbit_core_metrics_gc.erl | 4 | ||||
-rw-r--r-- | src/rabbit_maintenance.erl | 23 | ||||
-rw-r--r-- | src/rabbit_queue_type.erl | 17 | ||||
-rw-r--r-- | src/rabbit_quorum_queue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 6 | ||||
-rw-r--r-- | src/rabbit_stream_queue.erl | 1 | ||||
-rw-r--r-- | test/many_node_ha_SUITE.erl | 5 | ||||
-rw-r--r-- | test/per_user_connection_channel_tracking_SUITE.erl | 59 | ||||
-rw-r--r-- | test/quorum_queue_SUITE.erl | 42 | ||||
-rw-r--r-- | test/quorum_queue_utils.erl | 2 | ||||
-rw-r--r-- | test/rabbit_stream_queue_SUITE.erl | 57 | ||||
-rw-r--r-- | test/single_active_consumer_SUITE.erl | 7 |
22 files changed, 98 insertions, 190 deletions
diff --git a/.travis.yml b/.travis.yml index 0b5a791d7a..a502fe1922 100644 --- a/.travis.yml +++ b/.travis.yml @@ -25,10 +25,10 @@ env: - base_rmq_ref=master elixir: - - '1.9' + - '1.10' otp_release: - - '21.3' - - '22.2' + - '22.3' + - '23.0' install: # This project being an Erlang one (we just set language to Elixir @@ -118,7 +118,8 @@ define PROJECT_ENV {writer_gc_threshold, 1000000000}, %% interval at which connection/channel tracking executes post operations {tracking_execution_timeout, 15000}, - {stream_messages_soft_limit, 256} + {stream_messages_soft_limit, 256}, + {track_auth_attempt_source, false} ] endef @@ -1,5 +1,5 @@ [![OTP v22.3](https://img.shields.io/github/workflow/status/rabbitmq/rabbitmq-server/Test%20-%20Erlang%2022.3/master?label=Erlang%2022.3)](https://github.com/rabbitmq/rabbitmq-server/actions?query=workflow%3A%22Test+-+Erlang+22.3%22+branch%3A%22master%22) -[![OTP v23](https://img.shields.io/github/workflow/status/rabbitmq/rabbitmq-server/Test%20-%20Erlang%2023.0/master?label=Erlang%2023.0)](https://github.com/rabbitmq/rabbitmq-server/actions?query=workflow%3A%22Test+-+Erlang+23.0%22+branch%3Amaster) +[![OTP v23](https://img.shields.io/github/workflow/status/rabbitmq/rabbitmq-server/Test%20-%20Erlang%2023.1/master?label=Erlang%2023.1)](https://github.com/rabbitmq/rabbitmq-server/actions?query=workflow%3A%22Test+-+Erlang+23.1%22+branch%3Amaster) # RabbitMQ Server diff --git a/docs/rabbitmqctl.8 b/docs/rabbitmqctl.8 index 2c60ba10d3..3e041ad2c8 100644 --- a/docs/rabbitmqctl.8 +++ b/docs/rabbitmqctl.8 @@ -1429,18 +1429,6 @@ is located on the current node. .Sp .Dl rabbitmqctl list_unresponsive_queues --local name .\" ------------------------------------------------------------------ -.It Cm node_health_check -.Pp -DEPRECATED. Performs intrusive, opinionated health checks on a fully booted node. -To learn more, see the -.Lk https://www.rabbitmq.com/monitoring.html#health-checks "Health Checks documentation" -.Pp -Verifies the RabbitMQ application is running and alarms are not set, -then checks that every queue and channel on the node can emit basic stats. -.sp -Example: -.Dl rabbitmqctl node_health_check -n rabbit@hostname -.\" ------------------------------------------------------------------ .It Cm ping .Pp Checks that the node OS process is up, registered with EPMD and CLI tools can authenticate with it diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index afae3651aa..90702c43bb 100755 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -130,7 +130,6 @@ SERVER_ERL_ARGS=" +P $RABBITMQ_MAX_NUMBER_OF_PROCESSES +t $RABBITMQ_MAX_NUMBER_O [ "x" = "x$RABBITMQ_CTL_DIST_PORT_MIN" ] && RABBITMQ_CTL_DIST_PORT_MIN='35672' [ "x" = "x$RABBITMQ_CTL_DIST_PORT_MAX" ] && RABBITMQ_CTL_DIST_PORT_MAX="$(($RABBITMQ_CTL_DIST_PORT_MIN + 10))" -[ "x" = "x$RABBITMQ_IO_THREAD_POOL_SIZE" ] && RABBITMQ_IO_THREAD_POOL_SIZE=${IO_THREAD_POOL_SIZE} [ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS} [ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS} [ "x" = "x$RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS" ] && RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=${SERVER_ADDITIONAL_ERL_ARGS} diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 8f62d5ab10..82058dcb26 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -23,25 +23,9 @@ if [ "x" = "x$ERL_MAX_ETS_TABLES" ]; then ERL_MAX_ETS_TABLES=50000 fi -# Lazy initialization of threed pool size - if it wasn't set -# explicitly. This parameter is only needed when server is starting, -# so it makes no sense to do this calculations in rabbitmq-env or -# rabbitmq-defaults scripts. -ensure_thread_pool_size() { - if [ -z "${RABBITMQ_IO_THREAD_POOL_SIZE}" ]; then - RABBITMQ_IO_THREAD_POOL_SIZE=$( - erl \ - -noinput \ - -boot "${CLEAN_BOOT_FILE}" \ - -s rabbit_misc report_default_thread_pool_size - ) - fi -} - check_start_params() { check_not_empty RABBITMQ_BOOT_MODULE check_not_empty SASL_BOOT_FILE - check_not_empty RABBITMQ_IO_THREAD_POOL_SIZE } check_not_empty() { @@ -59,7 +43,6 @@ start_rabbitmq_server() { set -e _rmq_env_set_erl_libs - ensure_thread_pool_size RABBITMQ_START_RABBIT= [ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT=" -noinput" @@ -92,8 +75,6 @@ start_rabbitmq_server() { ${RABBITMQ_START_RABBIT} \ -boot "${SASL_BOOT_FILE}" \ +W w \ - +K true \ - +A ${RABBITMQ_IO_THREAD_POOL_SIZE} \ ${RABBITMQ_DEFAULT_ALLOC_ARGS} \ ${RABBITMQ_SERVER_ERL_ARGS} \ ${RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS} \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 3eb033efc2..3a386b63c4 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -47,10 +47,6 @@ if "!RABBITMQ_NODE_ONLY!"=="" ( set RABBITMQ_START_RABBIT=!RABBITMQ_START_RABBIT! -s "!RABBITMQ_BOOT_MODULE!" boot
)
-if "!RABBITMQ_IO_THREAD_POOL_SIZE!"=="" (
- set RABBITMQ_IO_THREAD_POOL_SIZE=64
-)
-
set ENV_OK=true
CALL :check_not_empty "RABBITMQ_BOOT_MODULE" !RABBITMQ_BOOT_MODULE!
@@ -68,7 +64,6 @@ if "!RABBITMQ_ALLOW_INPUT!"=="" ( !RABBITMQ_START_RABBIT! ^
-boot "!SASL_BOOT_FILE!" ^
+W w ^
-+A "!RABBITMQ_IO_THREAD_POOL_SIZE!" ^
!RABBITMQ_DEFAULT_ALLOC_ARGS! ^
!RABBITMQ_SERVER_ERL_ARGS! ^
!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index b42e0dd89a..0b7906d4bf 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -176,10 +176,6 @@ if "!RABBITMQ_NODE_ONLY!"=="" ( set RABBITMQ_START_RABBIT=-s "!RABBITMQ_BOOT_MODULE!" boot
)
-if "!RABBITMQ_IO_THREAD_POOL_SIZE!"=="" (
- set RABBITMQ_IO_THREAD_POOL_SIZE=64
-)
-
if "!RABBITMQ_SERVICE_RESTART!"=="" (
set RABBITMQ_SERVICE_RESTART=restart
)
@@ -197,7 +193,6 @@ set ERLANG_SERVICE_ARGUMENTS= ^ !RABBITMQ_START_RABBIT! ^
-boot "!SASL_BOOT_FILE!" ^
+W w ^
-+A "!RABBITMQ_IO_THREAD_POOL_SIZE!" ^
!RABBITMQ_DEFAULT_ALLOC_ARGS! ^
!RABBITMQ_SERVER_ERL_ARGS! ^
!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
diff --git a/src/rabbit_channel_tracking.erl b/src/rabbit_channel_tracking.erl index 52b84dc90b..42ab664a06 100644 --- a/src/rabbit_channel_tracking.erl +++ b/src/rabbit_channel_tracking.erl @@ -102,9 +102,9 @@ handle_cast({connection_closed, ConnDetails}) -> rabbit_log_connection:info( "Closing all channels from connection '~p' " "because it has been closed", [pget(name, ConnDetails)]), + %% Shutting down channels will take care of unregistering the + %% corresponding tracking. shutdown_tracked_items(TrackedChs, undefined), - [unregister_tracked(rabbit_tracking:id(ThisNode, Name)) || - #tracked_channel{name = Name} <- TrackedChs], ok; _DifferentNode -> ok diff --git a/src/rabbit_classic_queue.erl b/src/rabbit_classic_queue.erl index be0877c87d..45cb43dbd1 100644 --- a/src/rabbit_classic_queue.erl +++ b/src/rabbit_classic_queue.erl @@ -452,8 +452,10 @@ capabilities() -> <<"x-max-length-bytes">>, <<"x-max-in-memory-length">>, <<"x-max-in-memory-bytes">>, <<"x-max-priority">>, <<"x-overflow">>, <<"x-queue-mode">>, <<"x-single-active-consumer">>, - <<"x-queue-type">>], - consumer_arguments => [<<"x-cancel-on-ha-failover">>], + <<"x-queue-type">>, <<"x-queue-master-locator">>], + consumer_arguments => [<<"x-cancel-on-ha-failover">>, + <<"x-priority">>, <<"x-credit">> + ], server_named => true}. reject_seq_no(SeqNo, U0) -> diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl index 5c0c0b7b2a..890c127586 100644 --- a/src/rabbit_core_metrics_gc.erl +++ b/src/rabbit_core_metrics_gc.erl @@ -36,6 +36,7 @@ handle_info(start_gc, State) -> gc_exchanges(), gc_nodes(), gc_gen_server2(), + gc_auth_attempts(), {noreply, start_timer(State)}. terminate(_Reason, #state{timer = TRef}) -> @@ -193,3 +194,6 @@ gc_process_and_entities(Table, QueueGbSet, ExchangeGbSet) -> gc_entity(Q, Table, Key, QueueGbSet), gc_entity(X, Table, Key, ExchangeGbSet) end, none, Table). + +gc_auth_attempts() -> + ets:delete_all_objects(auth_attempt_detailed_metrics). diff --git a/src/rabbit_maintenance.erl b/src/rabbit_maintenance.erl index 004f8b4cea..039d125b13 100644 --- a/src/rabbit_maintenance.erl +++ b/src/rabbit_maintenance.erl @@ -89,6 +89,7 @@ do_drain() -> [length(TransferCandidates), ReadableCandidates]), transfer_leadership_of_classic_mirrored_queues(TransferCandidates), transfer_leadership_of_quorum_queues(TransferCandidates), + stop_local_quorum_queue_followers(), %% allow plugins to react rabbit_event:notify(maintenance_draining, #{ @@ -276,6 +277,28 @@ transfer_leadership_of_classic_mirrored_queues(TransferCandidates) -> end || Q <- Queues], rabbit_log:info("Leadership transfer for local classic mirrored queues is complete"). +-spec stop_local_quorum_queue_followers() -> ok. +stop_local_quorum_queue_followers() -> + Queues = rabbit_amqqueue:list_local_followers(), + rabbit_log:info("Will stop local follower replicas of ~b quorum queues on this node", + [length(Queues)]), + [begin + Name = amqqueue:get_name(Q), + rabbit_log:debug("Will stop a local follower replica of quorum queue ~s", + [rabbit_misc:rs(Name)]), + %% shut down Ra nodes so that they are not considered for leader election + {RegisteredName, _LeaderNode} = amqqueue:get_pid(Q), + RaNode = {RegisteredName, node()}, + rabbit_log:debug("Will stop Ra server ~p", [RaNode]), + case ra:stop_server(RaNode) of + ok -> + rabbit_log:debug("Successfully stopped Ra server ~p", [RaNode]); + {error, nodedown} -> + rabbit_log:error("Failed to stop Ra server ~p: target node was reported as down") + end + end || Q <- Queues], + rabbit_log:info("Stopped all local replicas of quorum queues hosted on this node"). + -spec primary_replica_transfer_candidate_nodes() -> [node()]. primary_replica_transfer_candidate_nodes() -> filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running() -- [node()]). diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl index 8424ee5d15..a357c5e334 100644 --- a/src/rabbit_queue_type.erl +++ b/src/rabbit_queue_type.erl @@ -213,9 +213,6 @@ is_enabled(Type) -> rabbit_types:channel_exit(). declare(Q, Node) -> Mod = amqqueue:get_type(Q), - Capabilities = Mod:capabilities(), - ValidQueueArgs = maps:get(queue_arguments, Capabilities, []), - check_invalid_arguments(amqqueue:get_name(Q), amqqueue:get_arguments(Q), ValidQueueArgs), Mod:declare(Q, Node). -spec delete(amqqueue:amqqueue(), boolean(), @@ -328,9 +325,6 @@ new(Q, State) when ?is_amqqueue(Q) -> consume(Q, Spec, State) -> #ctx{state = State0} = Ctx = get_ctx(Q, State), Mod = amqqueue:get_type(Q), - Capabilities = Mod:capabilities(), - ValidConsumerArgs = maps:get(consumer_arguments, Capabilities, []), - check_invalid_arguments(amqqueue:get_name(Q), maps:get(args, Spec), ValidConsumerArgs), case Mod:consume(Q, Spec, State0) of {ok, CtxState, Actions} -> return_ok(set_ctx(Q, Ctx#ctx{state = CtxState}, State), Actions); @@ -571,14 +565,3 @@ return_ok(State0, Actions0) -> {S, [Act | A0]} end, {State0, []}, Actions0), {ok, State, lists:reverse(Actions)}. - - -check_invalid_arguments(QueueName, Args, Keys) -> - [case lists:member(Arg, Keys) of - true -> ok; - false -> rabbit_misc:protocol_error( - precondition_failed, - "invalid arg '~s' for ~s", - [Arg, rabbit_misc:rs(QueueName)]) - end || {Arg, _, _} <- Args], - ok. diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index c0b1aa0965..9d42db2169 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -340,7 +340,7 @@ capabilities() -> <<"x-max-in-memory-bytes">>, <<"x-overflow">>, <<"x-single-active-consumer">>, <<"x-queue-type">>, <<"x-quorum-initial-group-size">>, <<"x-delivery-limit">>], - consumer_arguments => [<<"x-priority">>], + consumer_arguments => [<<"x-priority">>, <<"x-credit">>], server_named => false}. rpc_delete_metrics(QName) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index f9697c96e5..c91dbbc105 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -1413,15 +1413,19 @@ auth_phase(Response, auth_mechanism = {Name, AuthMechanism}, auth_state = AuthState}, sock = Sock}) -> + RemoteAddress = list_to_binary(inet:ntoa(Connection#connection.host)), case AuthMechanism:handle_response(Response, AuthState) of {refused, Username, Msg, Args} -> + rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, amqp091), auth_fail(Username, Msg, Args, Name, State); {protocol_error, Msg, Args} -> + rabbit_core_metrics:auth_attempt_failed(RemoteAddress, <<>>, amqp091), notify_auth_result(none, user_authentication_failure, [{error, rabbit_misc:format(Msg, Args)}], State), rabbit_misc:protocol_error(syntax_error, Msg, Args); {challenge, Challenge, AuthState1} -> + rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, amqp091), Secure = #'connection.secure'{challenge = Challenge}, ok = send_on_channel0(Sock, Secure, Protocol), State#v1{connection = Connection#connection{ @@ -1429,9 +1433,11 @@ auth_phase(Response, {ok, User = #user{username = Username}} -> case rabbit_access_control:check_user_loopback(Username, Sock) of ok -> + rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, Username, amqp091), notify_auth_result(Username, user_authentication_success, [], State); not_allowed -> + rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, amqp091), auth_fail(Username, "user '~s' can only connect via " "localhost", [Username], Name, State) end, diff --git a/src/rabbit_stream_queue.erl b/src/rabbit_stream_queue.erl index 3af56da9eb..2a9575c117 100644 --- a/src/rabbit_stream_queue.erl +++ b/src/rabbit_stream_queue.erl @@ -205,6 +205,7 @@ begin_stream(#stream_client{readers = Readers0} = State, osiris:register_offset_listener(LocalPid, NextOffset), %% TODO: avoid double calls to the same process StartOffset = case Offset of + first -> NextOffset; last -> NextOffset; next -> NextOffset; _ -> Offset diff --git a/test/many_node_ha_SUITE.erl b/test/many_node_ha_SUITE.erl index 6dc469f6f6..ece7dc8830 100644 --- a/test/many_node_ha_SUITE.erl +++ b/test/many_node_ha_SUITE.erl @@ -12,6 +12,11 @@ -compile(export_all). +suite() -> + [ + {timetrap, {minutes, 5}} + ]. + all() -> [ {group, cluster_size_6} diff --git a/test/per_user_connection_channel_tracking_SUITE.erl b/test/per_user_connection_channel_tracking_SUITE.erl index 923bdea541..8b4bd91d09 100644 --- a/test/per_user_connection_channel_tracking_SUITE.erl +++ b/test/per_user_connection_channel_tracking_SUITE.erl @@ -10,6 +10,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -compile(export_all). @@ -148,13 +149,13 @@ single_node_user_connection_channel_tracking(Config) -> ?assertEqual(true, is_process_alive(Conn1)), ?assertEqual(true, is_process_alive(Chan1)), close_channels([Chan1]), - ?assertEqual(0, count_channels_in(Config, Username)), - ?assertEqual(0, tracked_user_channel_count(Config, Username)), - ?assertEqual(false, is_process_alive(Chan1)), + ?awaitMatch(0, count_channels_in(Config, Username), 20000), + ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000), + ?awaitMatch(false, is_process_alive(Chan1), 20000), close_connections([Conn1]), - ?assertEqual(0, length(connections_in(Config, Username))), - ?assertEqual(0, tracked_user_connection_count(Config, Username)), - ?assertEqual(false, is_process_alive(Conn1)), + ?awaitMatch(0, length(connections_in(Config, Username)), 20000), + ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000), + ?awaitMatch(false, is_process_alive(Conn1), 20000), [Conn2] = open_connections(Config, [{0, Username2}]), Chans2 = [_|_] = open_channels(Conn2, 5), @@ -183,9 +184,9 @@ single_node_user_connection_channel_tracking(Config) -> [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans4], kill_connections([Conn4]), [#tracked_connection{username = Username}] = connections_in(Config, Username), - ?assertEqual(5, count_channels_in(Config, Username)), - ?assertEqual(1, tracked_user_connection_count(Config, Username)), - ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?awaitMatch(5, count_channels_in(Config, Username), 20000), + ?awaitMatch(1, tracked_user_connection_count(Config, Username), 20000), + ?awaitMatch(5, tracked_user_channel_count(Config, Username), 20000), ?assertEqual(false, is_process_alive(Conn4)), [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans4], @@ -201,15 +202,15 @@ single_node_user_connection_channel_tracking(Config) -> [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans5], close_channels(Chans2 ++ Chans3 ++ Chans5), - ?assertEqual(0, length(all_channels(Config))), - ?assertEqual(0, tracked_user_channel_count(Config, Username)), - ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?awaitMatch(0, length(all_channels(Config)), 20000), + ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000), + ?awaitMatch(0, tracked_user_channel_count(Config, Username2), 20000), close_connections([Conn2, Conn3, Conn5]), rabbit_ct_broker_helpers:delete_user(Config, Username2), - ?assertEqual(0, tracked_user_connection_count(Config, Username)), - ?assertEqual(0, tracked_user_connection_count(Config, Username2)), - ?assertEqual(0, length(all_connections(Config))). + ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000), + ?awaitMatch(0, tracked_user_connection_count(Config, Username2), 20000), + ?awaitMatch(0, length(all_connections(Config)), 20000). single_node_user_deletion(Config) -> set_tracking_execution_timeout(Config, 100), @@ -264,8 +265,8 @@ single_node_user_deletion(Config) -> [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], %% ensure vhost entry is cleared after 'tracking_execution_timeout' - ?assertEqual(false, exists_in_tracked_connection_per_user_table(Config, Username2)), - ?assertEqual(false, exists_in_tracked_channel_per_user_table(Config, Username2)), + ?awaitMatch(false, exists_in_tracked_connection_per_user_table(Config, Username2), 20000), + ?awaitMatch(false, exists_in_tracked_channel_per_user_table(Config, Username2), 20000), ?assertEqual(1, count_connections_in(Config, Username)), ?assertEqual(5, count_channels_in(Config, Username)), @@ -275,12 +276,12 @@ single_node_user_deletion(Config) -> [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], close_channels(Chans1), - ?assertEqual(0, count_channels_in(Config, Username)), - ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?awaitMatch(0, count_channels_in(Config, Username), 20000), + ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000), close_connections([Conn1]), - ?assertEqual(0, count_connections_in(Config, Username)), - ?assertEqual(0, tracked_user_connection_count(Config, Username)). + ?awaitMatch(0, count_connections_in(Config, Username), 20000), + ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000). single_node_vhost_deletion(Config) -> set_tracking_execution_timeout(Config, 100), @@ -456,12 +457,12 @@ cluster_user_deletion(Config) -> ?assertEqual(false, exists_in_tracked_channel_per_user_table(Config, 1, Username2)), close_channels(Chans1), - ?assertEqual(0, count_channels_in(Config, Username)), - ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?awaitMatch(0, count_channels_in(Config, Username), 20000), + ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000), close_connections([Conn1]), - ?assertEqual(0, count_connections_in(Config, Username)), - ?assertEqual(0, tracked_user_connection_count(Config, Username)). + ?awaitMatch(0, count_connections_in(Config, Username), 20000), + ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000). cluster_vhost_deletion(Config) -> set_tracking_execution_timeout(Config, 0, 100), @@ -665,12 +666,12 @@ cluster_node_removed(Config) -> [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], close_channels(Chans1), - ?assertEqual(0, count_channels_in(Config, Username)), - ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?awaitMatch(0, count_channels_in(Config, Username), 20000), + ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000), close_connections([Conn1]), - ?assertEqual(0, count_connections_in(Config, Username)), - ?assertEqual(0, tracked_user_connection_count(Config, Username)). + ?awaitMatch(0, count_connections_in(Config, Username), 20000), + ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000). %% ------------------------------------------------------------------- %% Helpers diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 97b2235411..8a2bf7f5d9 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -89,7 +89,6 @@ groups() -> all_tests() -> [ declare_args, - declare_invalid_args, declare_invalid_properties, declare_server_named, start_queue, @@ -338,47 +337,6 @@ declare_invalid_properties(Config) -> durable = false, arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]})). -declare_invalid_args(Config) -> - Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - LQ = ?config(queue_name, Config), - - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-message-ttl">>, long, 2000}])), - - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-max-priority">>, long, 2000}])), - - [?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-overflow">>, longstr, XOverflow}])) - || XOverflow <- [<<"reject-publish-dlx">>]], - - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-queue-mode">>, longstr, <<"lazy">>}])), - - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])), - - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-quorum-initial-group-size">>, long, 0}])). - declare_server_named(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl index caabd617ae..923ed0bb00 100644 --- a/test/quorum_queue_utils.erl +++ b/test/quorum_queue_utils.erl @@ -53,7 +53,7 @@ wait_for_messages(Servers, QName, Number, Fun, N) -> end. wait_for_messages(Config, Stats) -> - wait_for_messages(Config, lists:sort(Stats), 10). + wait_for_messages(Config, lists:sort(Stats), 60). wait_for_messages(Config, Stats, 0) -> ?assertEqual(Stats, diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl index d6275eae17..c2157d9447 100644 --- a/test/rabbit_stream_queue_SUITE.erl +++ b/test/rabbit_stream_queue_SUITE.erl @@ -63,7 +63,6 @@ all_tests() -> [ declare_args, declare_max_age, - declare_invalid_args, declare_invalid_properties, declare_server_named, declare_queue, @@ -230,52 +229,6 @@ declare_invalid_properties(Config) -> durable = false, arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})). -declare_invalid_args(Config) -> - Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - Q = ?config(queue_name, Config), - - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-expires">>, long, 2000}])), - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-message-ttl">>, long, 2000}])), - - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-max-priority">>, long, 2000}])), - - [?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-overflow">>, longstr, XOverflow}])) - || XOverflow <- [<<"reject-publish">>, <<"reject-publish-dlx">>]], - - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-queue-mode">>, longstr, <<"lazy">>}])), - - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])), - - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-queue-leader-locator">>, longstr, <<"hop">>}])). - declare_server_named(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), @@ -1118,6 +1071,8 @@ max_age(Config) -> [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)], amqp_channel:wait_for_confirms(Ch, 5000), + timer:sleep(5000), + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), qos(Ch1, 200, false), subscribe(Ch1, Q, false, 0), @@ -1154,7 +1109,11 @@ leader_failover(Config) -> ok = rabbit_ct_broker_helpers:start_node(Config, Server1). initial_cluster_size_one(Config) -> +<<<<<<< HEAD [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), +======= + [Server1, _Server2, _Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), +>>>>>>> master Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), Q = ?config(queue_name, Config), @@ -1168,7 +1127,11 @@ initial_cluster_size_one(Config) -> amqp_channel:call(Ch, #'queue.delete'{queue = Q})). initial_cluster_size_two(Config) -> +<<<<<<< HEAD [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), +======= + [Server1, _Server2, _Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), +>>>>>>> master Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), Q = ?config(queue_name, Config), diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl index 63b56fa009..59f2b6e83d 100644 --- a/test/single_active_consumer_SUITE.erl +++ b/test/single_active_consumer_SUITE.erl @@ -83,7 +83,8 @@ init_per_group(quorum_queue, Config) -> end_per_group(_, Config) -> Config. -init_per_testcase(Testcase, Config) -> +init_per_testcase(Testcase, Config0) -> + Config = [{queue_name, atom_to_binary(Testcase, utf8)} | Config0], rabbit_ct_helpers:testcase_started(Config, Testcase). end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). @@ -290,7 +291,9 @@ connection_and_channel(Config) -> {C, Ch}. queue_declare(Channel, Config) -> - Declare = ?config(single_active_consumer_queue_declare, Config), + QueueName = ?config(queue_name, Config), + Declare0 = ?config(single_active_consumer_queue_declare, Config), + Declare = Declare0#'queue.declare'{queue = QueueName}, #'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, Declare), Q. |