summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPéter Gömöri <peter@84codes.com>2022-10-17 19:49:49 +0200
committerMergify <37929162+mergify[bot]@users.noreply.github.com>2022-10-25 08:12:57 +0000
commit601e751a6fa710ec443674caec8e827c86bc8761 (patch)
treecd375f38683dcd2ce9c408a1832da6f4a06fe3e8
parentf2fb078163c1c8938dcf927e20bf4c045224125f (diff)
downloadrabbitmq-server-git-601e751a6fa710ec443674caec8e827c86bc8761.tar.gz
Optimise shovel pending queue handling for flow-control
The pending queue can grow up to prefetch count (which is 1000 by default and can be more) while it is forwarded in chunks every time a bump_credit grants more credit to the shovel process (which is 200 by default). This change avoids trying to forward all pending entries just to put back most of them in the pending queue. (cherry picked from commit 68e931ac2e79cfeadcf6eeeb6c74d113cbb7fbe5)
-rw-r--r--deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl47
1 files changed, 33 insertions, 14 deletions
diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
index f11de46f50..8e4f3f227e 100644
--- a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
+++ b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
@@ -154,6 +154,24 @@ dest_endpoint(#{dest := Dest}) ->
Keys = [dest_exchange, dest_exchange_key, dest_queue],
maps:to_list(maps:filter(fun(K, _) -> proplists:is_defined(K, Keys) end, Dest)).
+forward_pending(State) ->
+ case pop_pending(State) of
+ empty ->
+ State;
+ {{Tag, Props, Payload}, S} ->
+ S2 = do_forward(Tag, Props, Payload, S),
+ S3 = control_throttle(S2),
+ case is_blocked(S3) of
+ true ->
+ %% We are blocked by client-side flow-control and/or
+ %% `connection.blocked` message from the destination
+ %% broker. Stop forwarding pending messages.
+ S3;
+ false ->
+ forward_pending(S3)
+ end
+ end.
+
forward(IncomingTag, Props, Payload, State) ->
State1 = control_throttle(State),
case is_blocked(State1) of
@@ -276,19 +294,15 @@ handle_dest(#'connection.blocked'{}, State) ->
update_blocked_by(connection_blocked, true, State);
handle_dest(#'connection.unblocked'{}, State) ->
- {Pending, State1} = reset_pending(update_blocked_by(connection_blocked, false, State)),
+ State1 = update_blocked_by(connection_blocked, false, State),
%% we are unblocked so can begin to forward
- lists:foldl(fun ({Tag, Props, Payload}, S) ->
- forward(Tag, Props, Payload, S)
- end, State1, lists:reverse(Pending));
+ forward_pending(State1);
handle_dest({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
- {Pending, State1} = reset_pending(control_throttle(State)),
+ State1 = control_throttle(State),
%% we have credit so can begin to forward
- lists:foldl(fun ({Tag, Props, Payload}, S) ->
- forward(Tag, Props, Payload, S)
- end, State1, lists:reverse(Pending));
+ forward_pending(State1);
handle_dest(_Msg, _State) ->
not_handled.
@@ -369,12 +383,17 @@ is_blocked(_) ->
false.
add_pending(Elem, State = #{dest := Dest}) ->
- Pending = maps:get(pending, Dest, []),
- State#{dest => Dest#{pending => [Elem|Pending]}}.
-
-reset_pending(State = #{dest := Dest}) ->
- Pending = maps:get(pending, Dest, []),
- {Pending, State#{dest => Dest#{pending => []}}}.
+ Pending = maps:get(pending, Dest, queue:new()),
+ State#{dest => Dest#{pending => queue:in(Elem, Pending)}}.
+
+pop_pending(State = #{dest := Dest}) ->
+ Pending = maps:get(pending, Dest, queue:new()),
+ case queue:out(Pending) of
+ {empty, _} ->
+ empty;
+ {{value, Elem}, Pending2} ->
+ {Elem, State#{dest => Dest#{pending => Pending2}}}
+ end.
make_conn_and_chan([], {VHost, Name} = _ShovelName) ->
rabbit_log:error(