diff options
author | Michael Klishin <klishinm@vmware.com> | 2022-10-25 13:57:47 +0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-25 13:57:47 +0400 |
commit | bdbed63164ef517c5b1eec08597eafca326f96f7 (patch) | |
tree | cd375f38683dcd2ce9c408a1832da6f4a06fe3e8 | |
parent | 858ca37f4ccde003498687df0cec97eab5df4270 (diff) | |
parent | 601e751a6fa710ec443674caec8e827c86bc8761 (diff) | |
download | rabbitmq-server-git-bdbed63164ef517c5b1eec08597eafca326f96f7.tar.gz |
Merge pull request #6235 from rabbitmq/mergify/bp/v3.11.x/pr-6224
Shovel: handle `connection.(un)blocked` messages (backport #6224)
-rw-r--r-- | deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl | 50 | ||||
-rw-r--r-- | deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl | 6 | ||||
-rw-r--r-- | deps/rabbitmq_shovel/test/dynamic_SUITE.erl | 297 | ||||
-rw-r--r-- | deps/rabbitmq_shovel/test/shovel_test_utils.erl | 13 |
4 files changed, 316 insertions, 50 deletions
diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl index d836c11d38..8e4f3f227e 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl @@ -119,6 +119,7 @@ init_dest(Conf = #{ack_mode := AckMode, _ -> ok end, + amqp_connection:register_blocked_handler(Conn, self()), Conf#{dest => Dst#{unacked => #{}}}. ack(Tag, Multi, State = #{source := #{current := {_, Chan, _}}}) -> @@ -153,6 +154,24 @@ dest_endpoint(#{dest := Dest}) -> Keys = [dest_exchange, dest_exchange_key, dest_queue], maps:to_list(maps:filter(fun(K, _) -> proplists:is_defined(K, Keys) end, Dest)). +forward_pending(State) -> + case pop_pending(State) of + empty -> + State; + {{Tag, Props, Payload}, S} -> + S2 = do_forward(Tag, Props, Payload, S), + S3 = control_throttle(S2), + case is_blocked(S3) of + true -> + %% We are blocked by client-side flow-control and/or + %% `connection.blocked` message from the destination + %% broker. Stop forwarding pending messages. + S3; + false -> + forward_pending(S3) + end + end. + forward(IncomingTag, Props, Payload, State) -> State1 = control_throttle(State), case is_blocked(State1) of @@ -271,13 +290,19 @@ handle_dest({'EXIT', Conn, Reason}, #{dest := #{current := {Conn, _, _}}}) -> handle_dest({'EXIT', _Pid, {shutdown, {server_initiated_close, ?PRECONDITION_FAILED, Reason}}}, _State) -> {stop, {outbound_link_or_channel_closure, Reason}}; +handle_dest(#'connection.blocked'{}, State) -> + update_blocked_by(connection_blocked, true, State); + +handle_dest(#'connection.unblocked'{}, State) -> + State1 = update_blocked_by(connection_blocked, false, State), + %% we are unblocked so can begin to forward + forward_pending(State1); + handle_dest({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), - {Pending, State1} = reset_pending(control_throttle(State)), + State1 = control_throttle(State), %% we have credit so can begin to forward - lists:foldl(fun ({Tag, Props, Payload}, S) -> - forward(Tag, Props, Payload, S) - end, State1, lists:reverse(Pending)); + forward_pending(State1); handle_dest(_Msg, _State) -> not_handled. @@ -358,12 +383,17 @@ is_blocked(_) -> false. add_pending(Elem, State = #{dest := Dest}) -> - Pending = maps:get(pending, Dest, []), - State#{dest => Dest#{pending => [Elem|Pending]}}. - -reset_pending(State = #{dest := Dest}) -> - Pending = maps:get(pending, Dest, []), - {Pending, State#{dest => Dest#{pending => []}}}. + Pending = maps:get(pending, Dest, queue:new()), + State#{dest => Dest#{pending => queue:in(Elem, Pending)}}. + +pop_pending(State = #{dest := Dest}) -> + Pending = maps:get(pending, Dest, queue:new()), + case queue:out(Pending) of + {empty, _} -> + empty; + {{value, Elem}, Pending2} -> + {Elem, State#{dest => Dest#{pending => Pending2}}} + end. make_conn_and_chan([], {VHost, Name} = _ShovelName) -> rabbit_log:error( diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl index 57c81b9821..27461be3ce 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl @@ -13,7 +13,8 @@ code_change/3]). %% for testing purposes --export([get_connection_name/1]). +-export([get_connection_name/1, + get_internal_config/1]). -include("rabbit_shovel.hrl"). @@ -244,3 +245,6 @@ get_connection_name(_) -> close_connections(#state{config = Conf}) -> ok = rabbit_shovel_behaviour:close_source(Conf), ok = rabbit_shovel_behaviour:close_dest(Conf). + +get_internal_config(#state{config = Conf}) -> + Conf. diff --git a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl index 3881af992a..1d7a797872 100644 --- a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl @@ -12,6 +12,8 @@ -compile(export_all). +-export([spawn_suspender_proc/1]). + all() -> [ {group, core_tests}, @@ -35,7 +37,10 @@ groups() -> validation, security_validation, get_connection_name, - credit_flow + credit_flow, + dest_resource_alarm_on_confirm, + dest_resource_alarm_on_publish, + dest_resource_alarm_no_ack ]}, {quorum_queue_tests, [], [ @@ -445,60 +450,204 @@ credit_flow(Config) -> with_ch(Config, fun (Ch) -> + try + shovel_test_utils:set_param_nowait( + Config, + <<"test">>, [{<<"src-queue">>, <<"src">>}, + {<<"dest-queue">>, <<"dest">>}, + {<<"src-prefetch-count">>, 50}, + {<<"ack-mode">>, <<"on-publish">>}, + {<<"src-delete-after">>, <<"never">>}]), + shovel_test_utils:await_shovel(Config, <<"test">>), + + ShovelPid = find_shovel_pid(Config), + #{dest := + #{current := + {_DestConn, DestChan, _DestUri}}} = + get_shovel_state(ShovelPid), + WriterPid = find_writer_pid_for_channel(Config, DestChan), + + %% When the broker-side channel is blocked by flow + %% control, it stops reading from the tcp + %% socket. After all the OS, BEAM and process buffers + %% are full, gen_tcp:send/2 will block the writer + %% process. Simulate this by suspending the writer process. + true = suspend_process(Config, WriterPid), + + %% Publish 1000 messages to the src queue + amqp_channel:call(Ch, #'confirm.select'{}), + publish_count(Ch, <<>>, <<"src">>, <<"hello">>, 1000), + amqp_channel:wait_for_confirms(Ch), + + %% Wait until the shovel is blocked + shovel_test_utils:await( + fun() -> + case get_shovel_state(ShovelPid) of + #{dest := #{blocked_by := [flow]}} -> true; + Conf -> Conf + end + end, + 5000), + + %% There should be only one process with a message buildup + [{WriterPid, MQLen, _}, {_, 0, _} | _] = + rabbit_ct_broker_helpers:rpc( + Config, 0, recon, proc_count, [message_queue_len, 10]), + + %% The writer process should have only a limited + %% message queue. The shovel stops sending messages + %% when the channel and shovel process used up all + %% their initial credit (that is 20 + 20). + 2 * 20 = MQLen = proc_info(WriterPid, message_queue_len), + + %% Most messages should still be in the queue either ready or unacked + ExpDestCnt = 0, + #{messages := ExpDestCnt} = message_count(Config, <<"dest">>), + ExpSrcCnt = 1000 - MQLen, + #{messages := ExpSrcCnt, + messages_unacknowledged := 50} = message_count(Config, <<"src">>), + + %% After the writer process is resumed all messages + %% should be shoveled to the dest queue, and process + %% message queues should be empty + resume_process(Config), + + shovel_test_utils:await( + fun() -> + #{messages := Cnt} = message_count(Config, <<"src">>), + Cnt =:= 0 + end, + 5000), + #{messages := 1000} = message_count(Config, <<"dest">>), + [{_, 0, _}] = + rabbit_ct_broker_helpers:rpc( + Config, 0, recon, proc_count, [message_queue_len, 1]) + + after + resume_process(Config), + set_default_credit(Config, OrigCredit) + end + end). + +dest_resource_alarm_on_confirm(Config) -> + dest_resource_alarm(<<"on-confirm">>, Config). + +dest_resource_alarm_on_publish(Config) -> + dest_resource_alarm(<<"on-publish">>, Config). + +dest_resource_alarm_no_ack(Config) -> + dest_resource_alarm(<<"no-ack">>, Config). + +dest_resource_alarm(AckMode, Config) -> + with_ch(Config, + fun (Ch) -> amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:call(Ch, #'queue.declare'{queue = <<"src">>}), - %% Send larger payloads to fill up the socket buffers quicker - Payload = binary:copy(<<"hello">>, 1000), - publish_count(Ch, <<>>, <<"src">>, Payload, 1000), - amqp_channel:wait_for_confirms(Ch), + publish(Ch, <<>>, <<"src">>, <<"hello">>), + amqp_channel:call(Ch, #'queue.declare'{queue = <<"temp">>}), + publish_count(Ch, <<>>, <<"temp">>, <<"hello">>, 1000), + true = amqp_channel:wait_for_confirms(Ch), + + #{messages := 1} = message_count(Config, <<"src">>), + %%#{messages := 0} = message_count(Config, <<"dest">>), + %% A resource alarm will block publishing connections OrigLimit = set_vm_memory_high_watermark(Config, 0.00000001), %% Let connection block. timer:sleep(100), try - shovel_test_utils:set_param_nowait( + shovel_test_utils:set_param( Config, <<"test">>, [{<<"src-queue">>, <<"src">>}, {<<"dest-queue">>, <<"dest">>}, {<<"src-prefetch-count">>, 50}, - {<<"ack-mode">>, <<"on-publish">>}, + {<<"ack-mode">>, AckMode}, {<<"src-delete-after">>, <<"never">>}]), - shovel_test_utils:await_shovel(Config, <<"test">>), - %% There should be only one process with a message buildup - [{WriterPid, MQLen, _}, {_, 0, _}] = + %% The shovel is blocked + ShovelPid = find_shovel_pid(Config), + Conf = get_shovel_state(ShovelPid), + #{dest := #{blocked_by := [connection_blocked]}} = Conf, + + %% The shoveled message triggered a + %% connection.blocked notification, but hasn't + %% reached the dest queue because of the resource + %% alarm + InitialMsgCnt = + case AckMode of + <<"on-confirm">> -> 1; + _ -> 0 + end, + + #{messages := InitialMsgCnt, + messages_unacknowledged := InitialMsgCnt} = message_count(Config, <<"src">>), + #{messages := 0} = message_count(Config, <<"dest">>), + + %% Now publish messages to "src" queue + %% (network connections are blocked from publishing + %% so we use a temporary shovel with direct + %% connections to populate "src" queue with messages + %% from the "temp" queue) + ok = rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_runtime_parameters, set, + [ + <<"/">>, <<"shovel">>, <<"temp">>, + [{<<"src-uri">>, <<"amqp://">>}, + {<<"dest-uri">>, [<<"amqp://">>]}, + {<<"src-queue">>, <<"temp">>}, + {<<"dest-queue">>, <<"src">>}, + {<<"src-delete-after">>, <<"queue-length">>}], none]), + shovel_test_utils:await( + fun() -> + #{messages := Cnt} = message_count(Config, <<"temp">>), + Cnt =:= 0 + end, + 5000), + + %% No messages reached the dest queue + #{messages := 0} = message_count(Config, <<"dest">>), + + %% When the shovel sets a prefetch_count + %% (on-confirm/on-publish mode), all messages are in + %% the source queue, prefrech count are + %% unacknowledged and buffered in the shovel + MsgCnts = + case AckMode of + <<"on-confirm">> -> + #{messages => 1001, + messages_unacknowledged => 50}; + <<"on-publish">> -> + #{messages => 1000, + messages_unacknowledged => 50}; + <<"no-ack">> -> + %% no prefetch limit, all messages are + %% buffered in the shovel + #{messages => 0, + messages_unacknowledged => 0} + end, + + MsgCnts = message_count(Config, <<"src">>), + + %% There should be no process with a message buildup + [{_, 0, _}] = rabbit_ct_broker_helpers:rpc( - Config, 0, recon, proc_count, [message_queue_len, 2]), - - %% The writer process should have only a limited message queue, - %% but it is hard to exactly know how long. - %% (There are some `inet_reply' messages from the - %% inet driver, and some messages from the channel, - %% we estimate the later to be less than double the - %% initial credit) - {messages, Msgs} = rabbit_ct_broker_helpers:rpc( - Config, 0, erlang, process_info, [WriterPid, messages]), - CmdLen = length([Msg || Msg <- Msgs, - element(1, Msg) =:= send_command_flow]), - case {writer_msg_queue_len, CmdLen, MQLen} of - _ when CmdLen < 2 * 20 -> ok - end, - - ExpDest = 0, - #'queue.declare_ok'{message_count = ExpDest} = - amqp_channel:call(Ch, #'queue.declare'{queue = <<"dest">>, - durable = true}), - #'queue.declare_ok'{message_count = SrcCnt} = - amqp_channel:call(Ch, #'queue.declare'{queue = <<"src">>}), + Config, 0, recon, proc_count, [message_queue_len, 1]), - %% Most messages should still be in the queue either ready or unacked - case {src_queue_message_count, SrcCnt} of - _ when 0 < SrcCnt andalso SrcCnt < 1000 - MQLen -> ok - end - after + %% Clear the resource alarm, all messages should + %% arrive to the dest queue set_vm_memory_high_watermark(Config, OrigLimit), - set_default_credit(Config, OrigCredit) + + catch shovel_test_utils:await( + fun() -> + #{messages := Cnt} = message_count(Config, <<"dest">>), + Cnt =:= 1001 + end, + 5000), + #{messages := 0} = message_count(Config, <<"src">>) + after + set_vm_memory_high_watermark(Config, OrigLimit) end end). @@ -621,3 +770,75 @@ set_vm_memory_high_watermark(Config, Limit) -> rabbit_ct_broker_helpers:rpc( Config, 0, vm_memory_monitor, set_vm_memory_high_watermark, [Limit]), OrigLimit. + +message_count(Config, QueueName) -> + Resource = rabbit_misc:r(<<"/">>, queue, QueueName), + {ok, Q} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [Resource]), + maps:from_list( + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, info, + [Q, [messages, messages_unacknowledged]])). + +%% A process can be only suspended by another process on the same node +suspend_process(Config, Pid) -> + true = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, spawn_suspender_proc, [Pid]), + suspended = proc_info(Pid, status), + true. + +%% When the suspender process terminates, the suspended process is also resumed +resume_process(Config) -> + case rabbit_ct_broker_helpers:rpc(Config, 0, erlang, whereis, [suspender]) of + undefined -> + false; + SusPid -> + exit(SusPid, kill) + end. + +spawn_suspender_proc(Pid) -> + undefined = whereis(suspender), + ReqPid = self(), + SusPid = + spawn( + fun() -> + register(suspender, self()), + Res = catch (true = erlang:suspend_process(Pid)), + ReqPid ! {suspend_res, self(), Res}, + %% wait indefinitely + receive stop -> ok end + end), + receive + {suspend_res, SusPid, Res} -> Res + after + 5000 -> timeout + end. + +find_shovel_pid(Config) -> + [ShovelPid] = [P || P <- rabbit_ct_broker_helpers:rpc( + Config, 0, erlang, processes, []), + rabbit_shovel_worker == + (catch element(1, erpc:call(node(P), proc_lib, initial_call, [P])))], + ShovelPid. + +get_shovel_state(ShovelPid) -> + gen_server2:with_state(ShovelPid, fun rabbit_shovel_worker:get_internal_config/1). + +find_writer_pid_for_channel(Config, ChanPid) -> + {amqp_channel, ChanName} = process_name(ChanPid), + [WriterPid] = [P || P <- rabbit_ct_broker_helpers:rpc( + Config, 0, erlang, processes, []), + {rabbit_writer, ChanName} == process_name(P)], + WriterPid. + +process_name(Pid) -> + try proc_info(Pid, dictionary) of + Dict -> + proplists:get_value(process_name, Dict) + catch _:_ -> + undefined + end. + +proc_info(Pid) -> + erpc:call(node(Pid), erlang, process_info, [Pid]). + +proc_info(Pid, Item) -> + {Item, Value} = erpc:call(node(Pid), erlang, process_info, [Pid, Item]), + Value. diff --git a/deps/rabbitmq_shovel/test/shovel_test_utils.erl b/deps/rabbitmq_shovel/test/shovel_test_utils.erl index 5a93ade264..b57b7af406 100644 --- a/deps/rabbitmq_shovel/test/shovel_test_utils.erl +++ b/deps/rabbitmq_shovel/test/shovel_test_utils.erl @@ -9,7 +9,7 @@ -include_lib("common_test/include/ct.hrl"). -export([set_param/3, set_param_nowait/3, await_shovel/2, await_shovel1/2, - shovels_from_status/0, await/1, clear_param/2]). + shovels_from_status/0, await/1, await/2, clear_param/2]). make_uri(Config) -> Hostname = ?config(rmq_hostname, Config), @@ -46,6 +46,17 @@ await(Pred) -> await(Pred) end. +await(_Pred, Timeout) when Timeout =< 0 -> + error(await_timeout); +await(Pred, Timeout) -> + case Pred() of + true -> ok; + Other when Timeout =< 100 -> + error({await_timeout, Other}); + _ -> timer:sleep(100), + await(Pred, Timeout - 100) + end. + clear_param(Config, Name) -> rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_runtime_parameters, clear, [<<"/">>, <<"shovel">>, Name, <<"acting-user">>]). |