summary refs log tree commit diff
diff options
context:
space:
mode:
author Péter Gömöri <peter@84codes.com> 2022-09-16 00:49:15 +0200
committer Mergify <37929162+mergify[bot]@users.noreply.github.com> 2022-10-25 08:12:56 +0000
commit f2fb078163c1c8938dcf927e20bf4c045224125f (patch)
tree 3a7bec6353cdab79c63ecc789fa2f4cdc1e1e8a3
parent 858ca37f4ccde003498687df0cec97eab5df4270 (diff)
download rabbitmq-server-git-f2fb078163c1c8938dcf927e20bf4c045224125f.tar.gz
Shovel: handle `connection.(un)blocked` messages from dest broker
Also rework shovel credit_flow testcase to be more deterministic. (cherry picked from commit 836dfc4700386a6ea4b008c174b57734f2f0e586)
-rw-r--r-- deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl 11
-rw-r--r-- deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl 6
-rw-r--r-- deps/rabbitmq_shovel/test/dynamic_SUITE.erl 297
-rw-r--r-- deps/rabbitmq_shovel/test/shovel_test_utils.erl 13
4 files changed, 287 insertions, 40 deletions
diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
index d836c11d38..f11de46f50 100644
--- a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
+++ b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
@@ -119,6 +119,7 @@ init_dest(Conf = #{ack_mode := AckMode,
_ ->
ok
end,
+ amqp_connection:register_blocked_handler(Conn, self()),
Conf#{dest => Dst#{unacked => #{}}}.
ack(Tag, Multi, State = #{source := #{current := {_, Chan, _}}}) ->
@@ -271,6 +272,16 @@ handle_dest({'EXIT', Conn, Reason}, #{dest := #{current := {Conn, _, _}}}) ->
handle_dest({'EXIT', _Pid, {shutdown, {server_initiated_close, ?PRECONDITION_FAILED, Reason}}}, _State) ->
{stop, {outbound_link_or_channel_closure, Reason}};
+handle_dest(#'connection.blocked'{}, State) ->
+ update_blocked_by(connection_blocked, true, State);
+
+handle_dest(#'connection.unblocked'{}, State) ->
+ {Pending, State1} = reset_pending(update_blocked_by(connection_blocked, false, State)),
+ %% we are unblocked so can begin to forward
+ lists:foldl(fun ({Tag, Props, Payload}, S) ->
+ forward(Tag, Props, Payload, S)
+ end, State1, lists:reverse(Pending));
+
handle_dest({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
{Pending, State1} = reset_pending(control_throttle(State)),
diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl
index 57c81b9821..27461be3ce 100644
--- a/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl
+++ b/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl
@@ -13,7 +13,8 @@
code_change/3]).
%% for testing purposes
--export([get_connection_name/1]).
+-export([get_connection_name/1,
+ get_internal_config/1]).
-include("rabbit_shovel.hrl").
@@ -244,3 +245,6 @@ get_connection_name(_) ->
close_connections(#state{config = Conf}) ->
ok = rabbit_shovel_behaviour:close_source(Conf),
ok = rabbit_shovel_behaviour:close_dest(Conf).
+
+get_internal_config(#state{config = Conf}) ->
+ Conf.
diff --git a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl
index 3881af992a..1d7a797872 100644
--- a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl
+++ b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl
@@ -12,6 +12,8 @@
-compile(export_all).
+-export([spawn_suspender_proc/1]).
+
all() ->
[
{group, core_tests},
@@ -35,7 +37,10 @@ groups() ->
validation,
security_validation,
get_connection_name,
- credit_flow
+ credit_flow,
+ dest_resource_alarm_on_confirm,
+ dest_resource_alarm_on_publish,
+ dest_resource_alarm_no_ack
]},
{quorum_queue_tests, [], [
@@ -445,60 +450,204 @@ credit_flow(Config) ->
with_ch(Config,
fun (Ch) ->
+ try
+ shovel_test_utils:set_param_nowait(
+ Config,
+ <<"test">>, [{<<"src-queue">>, <<"src">>},
+ {<<"dest-queue">>, <<"dest">>},
+ {<<"src-prefetch-count">>, 50},
+ {<<"ack-mode">>, <<"on-publish">>},
+ {<<"src-delete-after">>, <<"never">>}]),
+ shovel_test_utils:await_shovel(Config, <<"test">>),
+
+ ShovelPid = find_shovel_pid(Config),
+ #{dest :=
+ #{current :=
+ {_DestConn, DestChan, _DestUri}}} =
+ get_shovel_state(ShovelPid),
+ WriterPid = find_writer_pid_for_channel(Config, DestChan),
+
+ %% When the broker-side channel is blocked by flow
+ %% control, it stops reading from the tcp
+ %% socket. After all the OS, BEAM and process buffers
+ %% are full, gen_tcp:send/2 will block the writer
+ %% process. Simulate this by suspending the writer process.
+ true = suspend_process(Config, WriterPid),
+
+ %% Publish 1000 messages to the src queue
+ amqp_channel:call(Ch, #'confirm.select'{}),
+ publish_count(Ch, <<>>, <<"src">>, <<"hello">>, 1000),
+ amqp_channel:wait_for_confirms(Ch),
+
+ %% Wait until the shovel is blocked
+ shovel_test_utils:await(
+ fun() ->
+ case get_shovel_state(ShovelPid) of
+ #{dest := #{blocked_by := [flow]}} -> true;
+ Conf -> Conf
+ end
+ end,
+ 5000),
+
+ %% There should be only one process with a message buildup
+ [{WriterPid, MQLen, _}, {_, 0, _} | _] =
+ rabbit_ct_broker_helpers:rpc(
+ Config, 0, recon, proc_count, [message_queue_len, 10]),
+
+ %% The writer process should have only a limited
+ %% message queue. The shovel stops sending messages
+ %% when the channel and shovel process used up all
+ %% their initial credit (that is 20 + 20).
+ 2 * 20 = MQLen = proc_info(WriterPid, message_queue_len),
+
+ %% Most messages should still be in the queue either ready or unacked
+ ExpDestCnt = 0,
+ #{messages := ExpDestCnt} = message_count(Config, <<"dest">>),
+ ExpSrcCnt = 1000 - MQLen,
+ #{messages := ExpSrcCnt,
+ messages_unacknowledged := 50} = message_count(Config, <<"src">>),
+
+ %% After the writer process is resumed all messages
+ %% should be shoveled to the dest queue, and process
+ %% message queues should be empty
+ resume_process(Config),
+
+ shovel_test_utils:await(
+ fun() ->
+ #{messages := Cnt} = message_count(Config, <<"src">>),
+ Cnt =:= 0
+ end,
+ 5000),
+ #{messages := 1000} = message_count(Config, <<"dest">>),
+ [{_, 0, _}] =
+ rabbit_ct_broker_helpers:rpc(
+ Config, 0, recon, proc_count, [message_queue_len, 1])
+
+ after
+ resume_process(Config),
+ set_default_credit(Config, OrigCredit)
+ end
+ end).
+
+dest_resource_alarm_on_confirm(Config) ->
+ dest_resource_alarm(<<"on-confirm">>, Config).
+
+dest_resource_alarm_on_publish(Config) ->
+ dest_resource_alarm(<<"on-publish">>, Config).
+
+dest_resource_alarm_no_ack(Config) ->
+ dest_resource_alarm(<<"no-ack">>, Config).
+
+dest_resource_alarm(AckMode, Config) ->
+ with_ch(Config,
+ fun (Ch) ->
amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:call(Ch, #'queue.declare'{queue = <<"src">>}),
- %% Send larger payloads to fill up the socket buffers quicker
- Payload = binary:copy(<<"hello">>, 1000),
- publish_count(Ch, <<>>, <<"src">>, Payload, 1000),
- amqp_channel:wait_for_confirms(Ch),
+ publish(Ch, <<>>, <<"src">>, <<"hello">>),
+ amqp_channel:call(Ch, #'queue.declare'{queue = <<"temp">>}),
+ publish_count(Ch, <<>>, <<"temp">>, <<"hello">>, 1000),
+ true = amqp_channel:wait_for_confirms(Ch),
+
+ #{messages := 1} = message_count(Config, <<"src">>),
+ %%#{messages := 0} = message_count(Config, <<"dest">>),
+ %% A resource alarm will block publishing connections
OrigLimit = set_vm_memory_high_watermark(Config, 0.00000001),
%% Let connection block.
timer:sleep(100),
try
- shovel_test_utils:set_param_nowait(
+ shovel_test_utils:set_param(
Config,
<<"test">>, [{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>},
{<<"src-prefetch-count">>, 50},
- {<<"ack-mode">>, <<"on-publish">>},
+ {<<"ack-mode">>, AckMode},
{<<"src-delete-after">>, <<"never">>}]),
- shovel_test_utils:await_shovel(Config, <<"test">>),
- %% There should be only one process with a message buildup
- [{WriterPid, MQLen, _}, {_, 0, _}] =
+ %% The shovel is blocked
+ ShovelPid = find_shovel_pid(Config),
+ Conf = get_shovel_state(ShovelPid),
+ #{dest := #{blocked_by := [connection_blocked]}} = Conf,
+
+ %% The shoveled message triggered a
+ %% connection.blocked notification, but hasn't
+ %% reached the dest queue because of the resource
+ %% alarm
+ InitialMsgCnt =
+ case AckMode of
+ <<"on-confirm">> -> 1;
+ _ -> 0
+ end,
+
+ #{messages := InitialMsgCnt,
+ messages_unacknowledged := InitialMsgCnt} = message_count(Config, <<"src">>),
+ #{messages := 0} = message_count(Config, <<"dest">>),
+
+ %% Now publish messages to "src" queue
+ %% (network connections are blocked from publishing
+ %% so we use a temporary shovel with direct
+ %% connections to populate "src" queue with messages
+ %% from the "temp" queue)
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config, 0,
+ rabbit_runtime_parameters, set,
+ [
+ <<"/">>, <<"shovel">>, <<"temp">>,
+ [{<<"src-uri">>, <<"amqp://">>},
+ {<<"dest-uri">>, [<<"amqp://">>]},
+ {<<"src-queue">>, <<"temp">>},
+ {<<"dest-queue">>, <<"src">>},
+ {<<"src-delete-after">>, <<"queue-length">>}], none]),
+ shovel_test_utils:await(
+ fun() ->
+ #{messages := Cnt} = message_count(Config, <<"temp">>),
+ Cnt =:= 0
+ end,
+ 5000),
+
+ %% No messages reached the dest queue
+ #{messages := 0} = message_count(Config, <<"dest">>),
+
+ %% When the shovel sets a prefetch_count
+ %% (on-confirm/on-publish mode), all messages are in
+ %% the source queue, and prefetch-count of them are
+ %% unacknowledged and buffered in the shovel
+ MsgCnts =
+ case AckMode of
+ <<"on-confirm">> ->
+ #{messages => 1001,
+ messages_unacknowledged => 50};
+ <<"on-publish">> ->
+ #{messages => 1000,
+ messages_unacknowledged => 50};
+ <<"no-ack">> ->
+ %% no prefetch limit, all messages are
+ %% buffered in the shovel
+ #{messages => 0,
+ messages_unacknowledged => 0}
+ end,
+
+ MsgCnts = message_count(Config, <<"src">>),
+
+ %% There should be no process with a message buildup
+ [{_, 0, _}] =
rabbit_ct_broker_helpers:rpc(
- Config, 0, recon, proc_count, [message_queue_len, 2]),
-
- %% The writer process should have only a limited message queue,
- %% but it is hard to exactly know how long.
- %% (There are some `inet_reply' messages from the
- %% inet driver, and some messages from the channel,
- %% we estimate the later to be less than double the
- %% initial credit)
- {messages, Msgs} = rabbit_ct_broker_helpers:rpc(
- Config, 0, erlang, process_info, [WriterPid, messages]),
- CmdLen = length([Msg || Msg <- Msgs,
- element(1, Msg) =:= send_command_flow]),
- case {writer_msg_queue_len, CmdLen, MQLen} of
- _ when CmdLen < 2 * 20 -> ok
- end,
-
- ExpDest = 0,
- #'queue.declare_ok'{message_count = ExpDest} =
- amqp_channel:call(Ch, #'queue.declare'{queue = <<"dest">>,
- durable = true}),
- #'queue.declare_ok'{message_count = SrcCnt} =
- amqp_channel:call(Ch, #'queue.declare'{queue = <<"src">>}),
+ Config, 0, recon, proc_count, [message_queue_len, 1]),
- %% Most messages should still be in the queue either ready or unacked
- case {src_queue_message_count, SrcCnt} of
- _ when 0 < SrcCnt andalso SrcCnt < 1000 - MQLen -> ok
- end
- after
+ %% Clear the resource alarm; all messages should then
+ %% arrive at the dest queue
set_vm_memory_high_watermark(Config, OrigLimit),
- set_default_credit(Config, OrigCredit)
+
+ catch shovel_test_utils:await(
+ fun() ->
+ #{messages := Cnt} = message_count(Config, <<"dest">>),
+ Cnt =:= 1001
+ end,
+ 5000),
+ #{messages := 0} = message_count(Config, <<"src">>)
+ after
+ set_vm_memory_high_watermark(Config, OrigLimit)
end
end).
@@ -621,3 +770,75 @@ set_vm_memory_high_watermark(Config, Limit) ->
rabbit_ct_broker_helpers:rpc(
Config, 0, vm_memory_monitor, set_vm_memory_high_watermark, [Limit]),
OrigLimit.
+
+message_count(Config, QueueName) ->
+ Resource = rabbit_misc:r(<<"/">>, queue, QueueName),
+ {ok, Q} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [Resource]),
+ maps:from_list(
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, info,
+ [Q, [messages, messages_unacknowledged]])).
+
+%% A process can only be suspended by another process on the same node
+suspend_process(Config, Pid) ->
+ true = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, spawn_suspender_proc, [Pid]),
+ suspended = proc_info(Pid, status),
+ true.
+
+%% When the suspender process terminates, the suspended process is also resumed
+resume_process(Config) ->
+ case rabbit_ct_broker_helpers:rpc(Config, 0, erlang, whereis, [suspender]) of
+ undefined ->
+ false;
+ SusPid ->
+ exit(SusPid, kill)
+ end.
+
+spawn_suspender_proc(Pid) ->
+ undefined = whereis(suspender),
+ ReqPid = self(),
+ SusPid =
+ spawn(
+ fun() ->
+ register(suspender, self()),
+ Res = catch (true = erlang:suspend_process(Pid)),
+ ReqPid ! {suspend_res, self(), Res},
+ %% wait indefinitely
+ receive stop -> ok end
+ end),
+ receive
+ {suspend_res, SusPid, Res} -> Res
+ after
+ 5000 -> timeout
+ end.
+
+find_shovel_pid(Config) ->
+ [ShovelPid] = [P || P <- rabbit_ct_broker_helpers:rpc(
+ Config, 0, erlang, processes, []),
+ rabbit_shovel_worker ==
+ (catch element(1, erpc:call(node(P), proc_lib, initial_call, [P])))],
+ ShovelPid.
+
+get_shovel_state(ShovelPid) ->
+ gen_server2:with_state(ShovelPid, fun rabbit_shovel_worker:get_internal_config/1).
+
+find_writer_pid_for_channel(Config, ChanPid) ->
+ {amqp_channel, ChanName} = process_name(ChanPid),
+ [WriterPid] = [P || P <- rabbit_ct_broker_helpers:rpc(
+ Config, 0, erlang, processes, []),
+ {rabbit_writer, ChanName} == process_name(P)],
+ WriterPid.
+
+process_name(Pid) ->
+ try proc_info(Pid, dictionary) of
+ Dict ->
+ proplists:get_value(process_name, Dict)
+ catch _:_ ->
+ undefined
+ end.
+
+proc_info(Pid) ->
+ erpc:call(node(Pid), erlang, process_info, [Pid]).
+
+proc_info(Pid, Item) ->
+ {Item, Value} = erpc:call(node(Pid), erlang, process_info, [Pid, Item]),
+ Value.
diff --git a/deps/rabbitmq_shovel/test/shovel_test_utils.erl b/deps/rabbitmq_shovel/test/shovel_test_utils.erl
index 5a93ade264..b57b7af406 100644
--- a/deps/rabbitmq_shovel/test/shovel_test_utils.erl
+++ b/deps/rabbitmq_shovel/test/shovel_test_utils.erl
@@ -9,7 +9,7 @@
-include_lib("common_test/include/ct.hrl").
-export([set_param/3, set_param_nowait/3, await_shovel/2, await_shovel1/2,
- shovels_from_status/0, await/1, clear_param/2]).
+ shovels_from_status/0, await/1, await/2, clear_param/2]).
make_uri(Config) ->
Hostname = ?config(rmq_hostname, Config),
@@ -46,6 +46,17 @@ await(Pred) ->
await(Pred)
end.
+await(_Pred, Timeout) when Timeout =< 0 ->
+ error(await_timeout);
+await(Pred, Timeout) ->
+ case Pred() of
+ true -> ok;
+ Other when Timeout =< 100 ->
+ error({await_timeout, Other});
+ _ -> timer:sleep(100),
+ await(Pred, Timeout - 100)
+ end.
+
clear_param(Config, Name) ->
rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_runtime_parameters, clear, [<<"/">>, <<"shovel">>, Name, <<"acting-user">>]).