summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorANycz <aleksander.nycz@comarch.com>2019-12-11 23:11:15 +0100
committerANycz <aleksander.nycz@comarch.com>2019-12-11 23:11:15 +0100
commit3daa6be55cb81d86da0bddad39a06378c3b5aeba (patch)
treec567c90fb6fe26ba0adefcab266bdb294714a7a3
parent178757932e65704abd95a1bf64c460956a9391fa (diff)
downloadrabbitmq-server-git-3daa6be55cb81d86da0bddad39a06378c3b5aeba.tar.gz
make garbage collector threshold configurable
-rw-r--r--Makefile4
-rw-r--r--src/rabbit_basic.erl8
-rw-r--r--src/rabbit_channel.erl22
3 files changed, 24 insertions, 10 deletions
diff --git a/Makefile b/Makefile
index 30a2066e38..9a8a936c69 100644
--- a/Makefile
+++ b/Makefile
@@ -129,7 +129,9 @@ define PROJECT_ENV
%% interval at which the channel can perform periodic actions
{channel_tick_interval, 60000},
%% Default max message size is 128 MB
- {max_message_size, 134217728}
+ {max_message_size, 134217728},
+ %% Default is ~ 1MB
+ {gc_threshold, 1000000}
]
endef
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 42b76218df..09a80eb8b5 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -22,7 +22,8 @@
message/3, message/4, properties/1, prepend_table_header/3,
extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4,
header_routes/1, parse_expiration/1, header/2, header/3]).
--export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]).
+-export([build_content/2, from_content/1, msg_size/1,
+ maybe_gc_large_msg/1, maybe_gc_large_msg/2]).
-export([add_header/4]).
%%----------------------------------------------------------------------------
@@ -311,6 +312,11 @@ parse_expiration(#'P_basic'{expiration = Expiration}) ->
maybe_gc_large_msg(Content) ->
rabbit_writer:maybe_gc_large_msg(Content).
+maybe_gc_large_msg(Content, undefined) ->
+ rabbit_writer:msg_size(Content);
+maybe_gc_large_msg(Content, GCThreshold) ->
+ rabbit_writer:maybe_gc_large_msg(Content, GCThreshold).
+
msg_size(Content) ->
rabbit_writer:msg_size(Content).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5abafcbde9..b1f22d48c2 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -169,7 +169,9 @@
delivery_flow,
interceptor_state,
queue_states,
- tick_timer
+ tick_timer,
+ %% defines how ofter gc will be executed
+ gc_threshold
}).
-define(QUEUE, lqueue).
@@ -508,6 +510,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
MaxMessageSize = get_max_message_size(),
ConsumerTimeout = get_consumer_timeout(),
OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
+ {ok, GCThreshold} = application:get_env(rabbit, gc_threshold),
State = #ch{cfg = #conf{state = starting,
protocol = Protocol,
channel = Channel,
@@ -543,7 +546,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
reply_consumer = none,
delivery_flow = Flow,
interceptor_state = undefined,
- queue_states = #{}
+ queue_states = #{},
+ gc_threshold = GCThreshold
},
State1 = State#ch{
interceptor_state = rabbit_channel_interceptor:init(State)},
@@ -1110,8 +1114,8 @@ extract_variable_map_from_amqp_params([Value]) ->
extract_variable_map_from_amqp_params(_) ->
#{}.
-check_msg_size(Content, MaxMessageSize) ->
- Size = rabbit_basic:maybe_gc_large_msg(Content),
+check_msg_size(Content, MaxMessageSize, GCThreshold) ->
+ Size = rabbit_basic:maybe_gc_large_msg(Content, GCThreshold),
case Size of
S when S > MaxMessageSize ->
ErrorMessage = case MaxMessageSize of
@@ -1309,9 +1313,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
},
tx = Tx,
confirm_enabled = ConfirmEnabled,
- delivery_flow = Flow
+ delivery_flow = Flow,
+ gc_threshold = GCThreshold
}) ->
- check_msg_size(Content, MaxMessageSize),
+ check_msg_size(Content, MaxMessageSize, GCThreshold),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, User, AuthzContext),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -2689,7 +2694,8 @@ handle_deliver(ConsumerTag, AckRequired,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}},
State = #ch{cfg = #conf{writer_pid = WriterPid},
- next_tag = DeliveryTag}) ->
+ next_tag = DeliveryTag,
+ gc_threshold = GCThreshold}) ->
Deliver = #'basic.deliver'{consumer_tag = ConsumerTag,
delivery_tag = DeliveryTag,
redelivered = Redelivered,
@@ -2702,7 +2708,7 @@ handle_deliver(ConsumerTag, AckRequired,
false ->
ok = rabbit_writer:send_command(WriterPid, Deliver, Content)
end,
- rabbit_basic:maybe_gc_large_msg(Content),
+ rabbit_basic:maybe_gc_large_msg(Content, GCThreshold),
record_sent(deliver, ConsumerTag, AckRequired, Msg, State).
handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,