summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-17 14:36:12 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-17 14:36:12 +0000
commita6ae81bd0c34ca380be4c0ad1595097ee497454a (patch)
treed9cef30e689ae1f4391a69e7604f84e68dc1cb01
parentebc2aa448dcf2825508e1a8a101844e133b88e5d (diff)
parentb499052903b568ec82bdaf476adc08e0cac401cd (diff)
downloadrabbitmq-server-a6ae81bd0c34ca380be4c0ad1595097ee497454a.tar.gz
stable to default
-rw-r--r--include/rabbit.hrl7
-rw-r--r--src/gm.erl106
-rw-r--r--src/rabbit_amqqueue.erl7
-rw-r--r--src/rabbit_basic.erl7
-rw-r--r--src/rabbit_channel.erl9
-rw-r--r--src/rabbit_error_logger_file_h.erl19
-rw-r--r--src/rabbit_mirror_queue_master.erl6
7 files changed, 118 insertions, 43 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index afb6e576..8b42cdea 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -112,4 +112,11 @@
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
+%% Trying to send a term across a cluster larger than 2^31 bytes will
+%% cause the VM to exit with "Absurdly large distribution output data
+%% buffer". So we limit the max message size to 2^31 - 10^6 bytes (1MB
+%% to allow plenty of leeway for the #basic_message{} and #content{}
+%% wrapping the message body).
+-define(MAX_MSG_SIZE, 2147383648).
+
-define(store_proc_name(N), rabbit_misc:store_proc_name(?MODULE, N)).
diff --git a/src/gm.erl b/src/gm.erl
index df1c258d..5a82950a 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -382,7 +382,7 @@
-behaviour(gen_server2).
--export([create_tables/0, start_link/4, leave/1, broadcast/2,
+-export([create_tables/0, start_link/4, leave/1, broadcast/2, broadcast/3,
confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -395,6 +395,7 @@
-export([table_definitions/0]).
-define(GROUP_TABLE, gm_group).
+-define(MAX_BUFFER_SIZE, 100000000). %% 100MB
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
-define(BROADCAST_TIMER, 25).
@@ -414,6 +415,7 @@
callback_args,
confirms,
broadcast_buffer,
+ broadcast_buffer_sz,
broadcast_timer,
txn_executor
}).
@@ -522,8 +524,10 @@ start_link(GroupName, Module, Args, TxnFun) ->
leave(Server) ->
gen_server2:cast(Server, leave).
-broadcast(Server, Msg) ->
- gen_server2:cast(Server, {broadcast, Msg}).
+broadcast(Server, Msg) -> broadcast(Server, Msg, 0).
+
+broadcast(Server, Msg, SizeHint) ->
+ gen_server2:cast(Server, {broadcast, Msg, SizeHint}).
confirmed_broadcast(Server, Msg) ->
gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).
@@ -547,19 +551,20 @@ init([GroupName, Module, Args, TxnFun]) ->
random:seed(MegaSecs, Secs, MicroSecs),
Self = make_member(GroupName),
gen_server2:cast(self(), join),
- {ok, #state { self = Self,
- left = {Self, undefined},
- right = {Self, undefined},
- group_name = GroupName,
- module = Module,
- view = undefined,
- pub_count = -1,
- members_state = undefined,
- callback_args = Args,
- confirms = queue:new(),
- broadcast_buffer = [],
- broadcast_timer = undefined,
- txn_executor = TxnFun }, hibernate,
+ {ok, #state { self = Self,
+ left = {Self, undefined},
+ right = {Self, undefined},
+ group_name = GroupName,
+ module = Module,
+ view = undefined,
+ pub_count = -1,
+ members_state = undefined,
+ callback_args = Args,
+ confirms = queue:new(),
+ broadcast_buffer = [],
+ broadcast_buffer_sz = 0,
+ broadcast_timer = undefined,
+ txn_executor = TxnFun }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -576,7 +581,7 @@ handle_call({confirmed_broadcast, Msg}, _From,
ok, State});
handle_call({confirmed_broadcast, Msg}, From, State) ->
- internal_broadcast(Msg, From, State);
+ internal_broadcast(Msg, From, 0, State);
handle_call(info, _From,
State = #state { members_state = undefined }) ->
@@ -639,10 +644,11 @@ handle_cast({?TAG, ReqVer, Msg},
if_callback_success(
Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1));
-handle_cast({broadcast, _Msg}, State = #state { members_state = undefined }) ->
+handle_cast({broadcast, _Msg, _SizeHint},
+ State = #state { members_state = undefined }) ->
noreply(State);
-handle_cast({broadcast, Msg},
+handle_cast({broadcast, Msg, _SizeHint},
State = #state { self = Self,
right = {Self, undefined},
module = Module,
@@ -650,8 +656,8 @@ handle_cast({broadcast, Msg},
handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
State});
-handle_cast({broadcast, Msg}, State) ->
- internal_broadcast(Msg, none, State);
+handle_cast({broadcast, Msg, SizeHint}, State) ->
+ internal_broadcast(Msg, none, SizeHint, State);
handle_cast(join, State = #state { self = Self,
group_name = GroupName,
@@ -883,12 +889,14 @@ ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
ensure_broadcast_timer(State) ->
State.
-internal_broadcast(Msg, From, State = #state { self = Self,
- pub_count = PubCount,
- module = Module,
- confirms = Confirms,
- callback_args = Args,
- broadcast_buffer = Buffer }) ->
+internal_broadcast(Msg, From, SizeHint,
+ State = #state { self = Self,
+ pub_count = PubCount,
+ module = Module,
+ confirms = Confirms,
+ callback_args = Args,
+ broadcast_buffer = Buffer,
+ broadcast_buffer_sz = BufferSize }) ->
PubCount1 = PubCount + 1,
Result = Module:handle_msg(Args, get_pid(Self), Msg),
Buffer1 = [{PubCount1, Msg} | Buffer],
@@ -896,13 +904,38 @@ internal_broadcast(Msg, From, State = #state { self = Self,
none -> Confirms;
_ -> queue:in({PubCount1, From}, Confirms)
end,
- State1 = State #state { pub_count = PubCount1,
- confirms = Confirms1,
- broadcast_buffer = Buffer1 },
- handle_callback_result({Result, case From of
- none -> State1;
- _ -> flush_broadcast_buffer(State1)
- end}).
+ State1 = State #state { pub_count = PubCount1,
+ confirms = Confirms1,
+ broadcast_buffer = Buffer1,
+ broadcast_buffer_sz = BufferSize + SizeHint},
+ handle_callback_result(
+ {Result, case From of
+ none -> maybe_flush_broadcast_buffer(State1);
+ _ -> flush_broadcast_buffer(State1)
+ end}).
+
+%% The Erlang distribution mechanism has an interesting quirk - it
+%% will kill the VM cold with "Absurdly large distribution output data
+%% buffer" if you attempt to send a message which serialises out to
+%% more than 2^31 bytes in size. It's therefore a very good idea to
+%% make sure that we don't exceed that size!
+%%
+%% Now, we could figure out the size of messages as they come in using
+%% size(term_to_binary(Msg)) or similar. The trouble is, that requires
+%% us to serialise the message only to throw the serialised form
+%% away. Hard to believe that's a sensible thing to do. So instead we
+%% accept a size hint from the application, via broadcast/3. This size
+%% hint can be the size of anything in the message which we expect
+%% could be large, and we just ignore the size of any small bits of
+%% the message term. Therefore MAX_BUFFER_SIZE is set somewhat
+%% conservatively at 100MB - but the buffer is only to allow us to
+%% buffer tiny messages anyway, so 100MB is plenty.
+
+maybe_flush_broadcast_buffer(State = #state{broadcast_buffer_sz = Size}) ->
+ case Size > ?MAX_BUFFER_SIZE of
+ true -> flush_broadcast_buffer(State);
+ false -> State
+ end.
flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
State;
@@ -920,8 +953,9 @@ flush_broadcast_buffer(State = #state { self = Self,
Member #member { pending_ack = PA1,
last_pub = PubCount }
end, Self, MembersState),
- State #state { members_state = MembersState1,
- broadcast_buffer = [] }.
+ State #state { members_state = MembersState1,
+ broadcast_buffer = [],
+ broadcast_buffer_sz = 0}.
%% ---------------------------------------------------------------------------
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6b1e00b7..282113a4 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -113,10 +113,9 @@
-> [rabbit_types:infos()]).
-spec(force_event_refresh/0 :: () -> 'ok').
-spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok').
--spec(consumers/1 ::
- (rabbit_types:amqqueue())
- -> [{pid(), rabbit_types:ctag(), boolean(),
- rabbit_framing:amqp_table()}]).
+-spec(consumers/1 :: (rabbit_types:amqqueue())
+ -> [{pid(), rabbit_types:ctag(), boolean(),
+ rabbit_framing:amqp_table()}]).
-spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(consumers_all/1 ::
(rabbit_types:vhost())
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 2e825536..3d70be4b 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -22,7 +22,7 @@
message/3, message/4, properties/1, prepend_table_header/3,
extract_headers/1, map_headers/2, delivery/3, header_routes/1,
parse_expiration/1]).
--export([build_content/2, from_content/1]).
+-export([build_content/2, from_content/1, msg_size/1]).
%%----------------------------------------------------------------------------
@@ -77,6 +77,9 @@
(rabbit_framing:amqp_property_record())
-> rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any())).
+-spec(msg_size/1 :: (rabbit_types:content() | rabbit_types:message()) ->
+ non_neg_integer()).
+
-endif.
%%----------------------------------------------------------------------------
@@ -274,3 +277,5 @@ parse_expiration(#'P_basic'{expiration = Expiration}) ->
{error, {leftover_string, S}}
end.
+msg_size(#content{payload_fragments_rev = PFR}) -> iolist_size(PFR);
+msg_size(#basic_message{content = Content}) -> msg_size(Content).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 469cf4f7..39c2f0b0 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -522,6 +522,14 @@ check_internal_exchange(#exchange{name = Name, internal = true}) ->
check_internal_exchange(_) ->
ok.
+check_msg_size(Content) ->
+ Size = rabbit_basic:msg_size(Content),
+ case Size > ?MAX_MSG_SIZE of
+ true -> precondition_failed("message size ~B larger than max size ~B",
+ [Size, ?MAX_MSG_SIZE]);
+ false -> ok
+ end.
+
qbin_to_resource(QueueNameBin, State) ->
name_to_resource(queue, QueueNameBin, State).
@@ -679,6 +687,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
tx = Tx,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState}) ->
+ check_msg_size(Content),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index d59641b0..9421b52e 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -79,6 +79,25 @@ init_file(File, PrevHandler) ->
%% filter out "application: foo; exited: stopped; type: temporary"
handle_event({info_report, _, {_, std_info, _}}, State) ->
{ok, State};
+%% When a node restarts quickly it is possible the rest of the cluster
+%% will not have had the chance to remove its queues from
+%% Mnesia. That's why rabbit_amqqueue:recover/0 invokes
+%% on_node_down(node()). But before we get there we can receive lots
+%% of messages intended for the old version of the node. The emulator
+%% logs an event for every one of those messages; in extremis this can
+%% bring the server to its knees just logging "Discarding..."
+%% again and again. So just log the first one, then go silent.
+handle_event(Event = {error, _, {emulator, _, ["Discarding message" ++ _]}},
+ State) ->
+ case get(discarding_message_seen) of
+ true -> {ok, State};
+ undefined -> put(discarding_message_seen, true),
+ error_logger_file_h:handle_event(Event, State)
+ end;
+%% Clear this state if we log anything else (but not a progress report).
+handle_event(Event = {info_msg, _, _}, State) ->
+ erase(discarding_message_seen),
+ error_logger_file_h:handle_event(Event, State);
handle_event(Event, State) ->
error_logger_file_h:handle_event(Event, State).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 4f50e1a5..9ce5afcb 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -212,7 +212,8 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}),
+ ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg},
+ rabbit_basic:msg_size(Msg)),
BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
@@ -222,7 +223,8 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}),
+ ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg},
+ rabbit_basic:msg_size(Msg)),
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.