summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-10-06 16:53:04 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-10-06 16:53:04 +0100
commit9b1cf9ba35cdc21c6f9a0dd175e9d9ca526c798d (patch)
tree3d9d91c1c5eced84851d391a460bba3e0a87ee51
parentcce2494b91bdf4509e977665620faf759c47582f (diff)
downloadrabbitmq-server-9b1cf9ba35cdc21c6f9a0dd175e9d9ca526c798d.tar.gz
Prevent the channel from holding a lot of binary garbage when accepting huge messages.
-rw-r--r--src/rabbit_basic.erl24
-rw-r--r--src/rabbit_channel.erl2
2 files changed, 24 insertions, 2 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 85f9d56e..ae415180 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -22,7 +22,7 @@
message/3, message/4, properties/1, prepend_table_header/3,
extract_headers/1, map_headers/2, delivery/4, header_routes/1,
parse_expiration/1]).
--export([build_content/2, from_content/1, msg_size/1]).
+-export([build_content/2, from_content/1, msg_size/1, msg_size_and_gc/1]).
%%----------------------------------------------------------------------------
@@ -79,6 +79,9 @@
-spec(msg_size/1 :: (rabbit_types:content() | rabbit_types:message()) ->
non_neg_integer()).
+-spec(msg_size_and_gc/1 :: (rabbit_types:content() | rabbit_types:message()) ->
+ non_neg_integer()).
+
-endif.
%%----------------------------------------------------------------------------
@@ -276,5 +279,24 @@ parse_expiration(#'P_basic'{expiration = Expiration}) ->
{error, {leftover_string, S}}
end.
+%% Some processes (channel, writer) can get huge amounts of binary
+%% garbage when processing huge messages at high speed (since we only
+%% do enough reductions to GC every few hundred messages, and if each
+%% message is 1MB then that's ugly). So count how many bytes of
+%% message we have processed, and force a GC every so often.
+msg_size_and_gc(Content) ->
+ Size = msg_size(Content),
+ Current = case get(msg_size_for_gc) of
+ undefined -> 0;
+ C -> C
+ end,
+ New = Current + Size,
+ put(msg_size_for_gc, case New > 1000000 of
+ true -> erlang:garbage_collect(),
+ 0;
+ false -> New
+ end),
+ Size.
+
msg_size(#content{payload_fragments_rev = PFR}) -> iolist_size(PFR);
msg_size(#basic_message{content = Content}) -> msg_size(Content).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index fc433898..7f10c19f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -607,7 +607,7 @@ check_internal_exchange(_) ->
ok.
check_msg_size(Content) ->
- Size = rabbit_basic:msg_size(Content),
+ Size = rabbit_basic:msg_size_and_gc(Content),
case Size > ?MAX_MSG_SIZE of
true -> precondition_failed("message size ~B larger than max size ~B",
[Size, ?MAX_MSG_SIZE]);