diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-17 11:50:57 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-17 11:50:57 +0000 |
commit | d67e7fe58411585ec825e8153d5d70c193101d26 (patch) | |
tree | 9f055b48e1a403e6e72338b4e24b2b95b0b6fff7 | |
parent | b81c80f77d45d95029489e3cb7f3c9c872aa9e1c (diff) | |
download | rabbitmq-server-d67e7fe58411585ec825e8153d5d70c193101d26.tar.gz |
Limit maximum message size.
-rw-r--r-- | include/rabbit.hrl | 7 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 6 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 9 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 8 |
4 files changed, 24 insertions, 6 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index bd7a0eed..0f1b7a50 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -111,3 +111,10 @@ -define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>). -define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]). -define(DELETED_HEADER, <<"BCC">>). + +%% Trying to send a term across a cluster larger than 2^31 bytes will +%% cause the VM to exit with "Absurdly large distribution output data +%% buffer". So we limit the max message size to 2^31 - 10^6 bytes (1MB +%% to allow plenty of leeway for the #basic_message{} and #content{} +%% wrapping the message body). +-define(MAX_MSG_SIZE, 2147383648). diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 2e825536..1f42322c 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -22,7 +22,7 @@ message/3, message/4, properties/1, prepend_table_header/3, extract_headers/1, map_headers/2, delivery/3, header_routes/1, parse_expiration/1]). --export([build_content/2, from_content/1]). +-export([build_content/2, from_content/1, msg_size/1]). %%---------------------------------------------------------------------------- @@ -77,6 +77,8 @@ (rabbit_framing:amqp_property_record()) -> rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any())). +-spec(msg_size/1 :: (rabbit_types:content()) -> non_neg_integer()). + -endif. %%---------------------------------------------------------------------------- @@ -274,3 +276,5 @@ parse_expiration(#'P_basic'{expiration = Expiration}) -> {error, {leftover_string, S}} end. +msg_size(#basic_message{content = #content{payload_fragments_rev = PFR}}) -> + iolist_size(PFR). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 62e6994e..98c0c1f1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -518,6 +518,14 @@ check_internal_exchange(#exchange{name = Name, internal = true}) -> check_internal_exchange(_) -> ok. +check_msg_size(Content) -> + Size = rabbit_basic:msg_size(Content), + case Size > ?MAX_MSG_SIZE of + true -> precondition_failed("message size ~s larger than max size ~s", + [Size, ?MAX_MSG_SIZE]); + false -> ok + end. + expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> rabbit_misc:protocol_error( not_found, "no previously declared queue", []); @@ -648,6 +656,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, tx = Tx, confirm_enabled = ConfirmEnabled, trace_state = TraceState}) -> + check_msg_size(Content), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 20283a15..58400f39 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -212,7 +212,8 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}, msg_size(Msg)), + ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}, + rabbit_basic:msg_size(Msg)), BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS), ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). @@ -223,7 +224,7 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}, - msg_size(Msg)), + rabbit_basic:msg_size(Msg)), {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), State1 = State #state { backing_queue_state = BQS1 }, {AckTag, ensure_monitoring(ChPid, State1)}. @@ -480,6 +481,3 @@ ensure_monitoring(ChPid, State = #state { coordinator = CPid, CPid, [ChPid]), State #state { known_senders = sets:add_element(ChPid, KS) } end. - -msg_size(#basic_message{content = #content{payload_fragments_rev = PFR}}) -> - iolist_size(PFR). |