diff options
author | Péter Gömöri <peter@84codes.com> | 2022-10-17 19:49:49 +0200 |
---|---|---|
committer | Mergify <37929162+mergify[bot]@users.noreply.github.com> | 2022-10-25 08:12:57 +0000 |
commit | 601e751a6fa710ec443674caec8e827c86bc8761 (patch) | |
tree | cd375f38683dcd2ce9c408a1832da6f4a06fe3e8 | |
parent | f2fb078163c1c8938dcf927e20bf4c045224125f (diff) | |
download | rabbitmq-server-git-601e751a6fa710ec443674caec8e827c86bc8761.tar.gz |
Optimise shovel pending queue handling for flow-control
The pending queue can grow up to prefetch count (which is 1000 by
default and can be more) while it is forwarded in chunks every time a
bump_credit grants more credit to the shovel process (which is 200 by
default). This change avoids trying to forward all pending entries just
to put back most of them in the pending queue.
(cherry picked from commit 68e931ac2e79cfeadcf6eeeb6c74d113cbb7fbe5)
-rw-r--r-- | deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl | 47 |
1 files changed, 33 insertions, 14 deletions
diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl index f11de46f50..8e4f3f227e 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl @@ -154,6 +154,24 @@ dest_endpoint(#{dest := Dest}) -> Keys = [dest_exchange, dest_exchange_key, dest_queue], maps:to_list(maps:filter(fun(K, _) -> proplists:is_defined(K, Keys) end, Dest)). +forward_pending(State) -> + case pop_pending(State) of + empty -> + State; + {{Tag, Props, Payload}, S} -> + S2 = do_forward(Tag, Props, Payload, S), + S3 = control_throttle(S2), + case is_blocked(S3) of + true -> + %% We are blocked by client-side flow-control and/or + %% `connection.blocked` message from the destination + %% broker. Stop forwarding pending messages. + S3; + false -> + forward_pending(S3) + end + end. + forward(IncomingTag, Props, Payload, State) -> State1 = control_throttle(State), case is_blocked(State1) of @@ -276,19 +294,15 @@ handle_dest(#'connection.blocked'{}, State) -> update_blocked_by(connection_blocked, true, State); handle_dest(#'connection.unblocked'{}, State) -> - {Pending, State1} = reset_pending(update_blocked_by(connection_blocked, false, State)), + State1 = update_blocked_by(connection_blocked, false, State), %% we are unblocked so can begin to forward - lists:foldl(fun ({Tag, Props, Payload}, S) -> - forward(Tag, Props, Payload, S) - end, State1, lists:reverse(Pending)); + forward_pending(State1); handle_dest({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), - {Pending, State1} = reset_pending(control_throttle(State)), + State1 = control_throttle(State), %% we have credit so can begin to forward - lists:foldl(fun ({Tag, Props, Payload}, S) -> - forward(Tag, Props, Payload, S) - end, State1, lists:reverse(Pending)); + forward_pending(State1); handle_dest(_Msg, _State) -> not_handled. @@ -369,12 +383,17 @@ is_blocked(_) -> false. add_pending(Elem, State = #{dest := Dest}) -> - Pending = maps:get(pending, Dest, []), - State#{dest => Dest#{pending => [Elem|Pending]}}. - -reset_pending(State = #{dest := Dest}) -> - Pending = maps:get(pending, Dest, []), - {Pending, State#{dest => Dest#{pending => []}}}. + Pending = maps:get(pending, Dest, queue:new()), + State#{dest => Dest#{pending => queue:in(Elem, Pending)}}. + +pop_pending(State = #{dest := Dest}) -> + Pending = maps:get(pending, Dest, queue:new()), + case queue:out(Pending) of + {empty, _} -> + empty; + {{value, Elem}, Pending2} -> + {Elem, State#{dest => Dest#{pending => Pending2}}} + end. make_conn_and_chan([], {VHost, Name} = _ShovelName) -> rabbit_log:error( |