diff options
author | ANycz <aleksander.nycz@comarch.com> | 2019-12-11 23:11:15 +0100 |
---|---|---|
committer | ANycz <aleksander.nycz@comarch.com> | 2019-12-11 23:11:15 +0100 |
commit | 3daa6be55cb81d86da0bddad39a06378c3b5aeba (patch) | |
tree | c567c90fb6fe26ba0adefcab266bdb294714a7a3 | |
parent | 178757932e65704abd95a1bf64c460956a9391fa (diff) | |
download | rabbitmq-server-git-3daa6be55cb81d86da0bddad39a06378c3b5aeba.tar.gz |
make garbage collector threshold configurable
-rw-r--r-- | Makefile | 4 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 8 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 22 |
3 files changed, 24 insertions, 10 deletions
@@ -129,7 +129,9 @@ define PROJECT_ENV %% interval at which the channel can perform periodic actions {channel_tick_interval, 60000}, %% Default max message size is 128 MB - {max_message_size, 134217728} + {max_message_size, 134217728}, + %% Default is ~ 1MB + {gc_threshold, 1000000} ] endef diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 42b76218df..09a80eb8b5 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -22,7 +22,8 @@ message/3, message/4, properties/1, prepend_table_header/3, extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4, header_routes/1, parse_expiration/1, header/2, header/3]). --export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]). +-export([build_content/2, from_content/1, msg_size/1, + maybe_gc_large_msg/1, maybe_gc_large_msg/2]). -export([add_header/4]). %%---------------------------------------------------------------------------- @@ -311,6 +312,11 @@ parse_expiration(#'P_basic'{expiration = Expiration}) -> maybe_gc_large_msg(Content) -> rabbit_writer:maybe_gc_large_msg(Content). +maybe_gc_large_msg(Content, undefined) -> + rabbit_writer:msg_size(Content); +maybe_gc_large_msg(Content, GCThreshold) -> + rabbit_writer:maybe_gc_large_msg(Content, GCThreshold). + msg_size(Content) -> rabbit_writer:msg_size(Content). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5abafcbde9..b1f22d48c2 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -169,7 +169,9 @@ delivery_flow, interceptor_state, queue_states, - tick_timer + tick_timer, + %% defines how ofter gc will be executed + gc_threshold }). -define(QUEUE, lqueue). @@ -508,6 +510,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, MaxMessageSize = get_max_message_size(), ConsumerTimeout = get_consumer_timeout(), OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams), + {ok, GCThreshold} = application:get_env(rabbit, gc_threshold), State = #ch{cfg = #conf{state = starting, protocol = Protocol, channel = Channel, @@ -543,7 +546,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, reply_consumer = none, delivery_flow = Flow, interceptor_state = undefined, - queue_states = #{} + queue_states = #{}, + gc_threshold = GCThreshold }, State1 = State#ch{ interceptor_state = rabbit_channel_interceptor:init(State)}, @@ -1110,8 +1114,8 @@ extract_variable_map_from_amqp_params([Value]) -> extract_variable_map_from_amqp_params(_) -> #{}. -check_msg_size(Content, MaxMessageSize) -> - Size = rabbit_basic:maybe_gc_large_msg(Content), +check_msg_size(Content, MaxMessageSize, GCThreshold) -> + Size = rabbit_basic:maybe_gc_large_msg(Content, GCThreshold), case Size of S when S > MaxMessageSize -> ErrorMessage = case MaxMessageSize of @@ -1309,9 +1313,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, }, tx = Tx, confirm_enabled = ConfirmEnabled, - delivery_flow = Flow + delivery_flow = Flow, + gc_threshold = GCThreshold }) -> - check_msg_size(Content, MaxMessageSize), + check_msg_size(Content, MaxMessageSize, GCThreshold), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, User, AuthzContext), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -2689,7 +2694,8 @@ handle_deliver(ConsumerTag, AckRequired, routing_keys = [RoutingKey | _CcRoutes], content = Content}}, State = #ch{cfg = #conf{writer_pid = WriterPid}, - next_tag = DeliveryTag}) -> + next_tag = DeliveryTag, + gc_threshold = GCThreshold}) -> Deliver = #'basic.deliver'{consumer_tag = ConsumerTag, delivery_tag = DeliveryTag, redelivered = Redelivered, @@ -2702,7 +2708,7 @@ handle_deliver(ConsumerTag, AckRequired, false -> ok = rabbit_writer:send_command(WriterPid, Deliver, Content) end, - rabbit_basic:maybe_gc_large_msg(Content), + rabbit_basic:maybe_gc_large_msg(Content, GCThreshold), record_sent(deliver, ConsumerTag, AckRequired, Msg, State). handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount, |