summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-17 11:50:57 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-17 11:50:57 +0000
commitd67e7fe58411585ec825e8153d5d70c193101d26 (patch)
tree9f055b48e1a403e6e72338b4e24b2b95b0b6fff7
parentb81c80f77d45d95029489e3cb7f3c9c872aa9e1c (diff)
downloadrabbitmq-server-d67e7fe58411585ec825e8153d5d70c193101d26.tar.gz
Limit maximum message size.
-rw-r--r--include/rabbit.hrl7
-rw-r--r--src/rabbit_basic.erl6
-rw-r--r--src/rabbit_channel.erl9
-rw-r--r--src/rabbit_mirror_queue_master.erl8
4 files changed, 24 insertions, 6 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index bd7a0eed..0f1b7a50 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -111,3 +111,10 @@
-define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>).
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
+
+%% Trying to send a term across a cluster larger than 2^31 bytes will
+%% cause the VM to exit with "Absurdly large distribution output data
+%% buffer". So we limit the max message size to 2^31 - 10^6 bytes (1MB
+%% to allow plenty of leeway for the #basic_message{} and #content{}
+%% wrapping the message body).
+-define(MAX_MSG_SIZE, 2147383648).
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 2e825536..1f42322c 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -22,7 +22,7 @@
message/3, message/4, properties/1, prepend_table_header/3,
extract_headers/1, map_headers/2, delivery/3, header_routes/1,
parse_expiration/1]).
--export([build_content/2, from_content/1]).
+-export([build_content/2, from_content/1, msg_size/1]).
%%----------------------------------------------------------------------------
@@ -77,6 +77,8 @@
(rabbit_framing:amqp_property_record())
-> rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any())).
+-spec(msg_size/1 :: (rabbit_types:content()) -> non_neg_integer()).
+
-endif.
%%----------------------------------------------------------------------------
@@ -274,3 +276,5 @@ parse_expiration(#'P_basic'{expiration = Expiration}) ->
{error, {leftover_string, S}}
end.
+msg_size(#basic_message{content = #content{payload_fragments_rev = PFR}}) ->
+ iolist_size(PFR).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 62e6994e..98c0c1f1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -518,6 +518,14 @@ check_internal_exchange(#exchange{name = Name, internal = true}) ->
check_internal_exchange(_) ->
ok.
+check_msg_size(Content) ->
+ Size = rabbit_basic:msg_size(Content),
+ case Size > ?MAX_MSG_SIZE of
+ true -> precondition_failed("message size ~s larger than max size ~s",
+ [Size, ?MAX_MSG_SIZE]);
+ false -> ok
+ end.
+
expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
rabbit_misc:protocol_error(
not_found, "no previously declared queue", []);
@@ -648,6 +656,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
tx = Tx,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState}) ->
+ check_msg_size(Content),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 20283a15..58400f39 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -212,7 +212,8 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}, msg_size(Msg)),
+ ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg},
+ rabbit_basic:msg_size(Msg)),
BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
@@ -223,7 +224,7 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg},
- msg_size(Msg)),
+ rabbit_basic:msg_size(Msg)),
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.
@@ -480,6 +481,3 @@ ensure_monitoring(ChPid, State = #state { coordinator = CPid,
CPid, [ChPid]),
State #state { known_senders = sets:add_element(ChPid, KS) }
end.
-
-msg_size(#basic_message{content = #content{payload_fragments_rev = PFR}}) ->
- iolist_size(PFR).