diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-17 14:36:12 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-17 14:36:12 +0000 |
commit | a6ae81bd0c34ca380be4c0ad1595097ee497454a (patch) | |
tree | d9cef30e689ae1f4391a69e7604f84e68dc1cb01 | |
parent | ebc2aa448dcf2825508e1a8a101844e133b88e5d (diff) | |
parent | b499052903b568ec82bdaf476adc08e0cac401cd (diff) | |
download | rabbitmq-server-a6ae81bd0c34ca380be4c0ad1595097ee497454a.tar.gz |
stable to default
-rw-r--r-- | include/rabbit.hrl | 7 | ||||
-rw-r--r-- | src/gm.erl | 106 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 7 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 9 | ||||
-rw-r--r-- | src/rabbit_error_logger_file_h.erl | 19 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 6 |
7 files changed, 118 insertions, 43 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index afb6e576..8b42cdea 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -112,4 +112,11 @@ -define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]). -define(DELETED_HEADER, <<"BCC">>). +%% Trying to send a term across a cluster larger than 2^31 bytes will +%% cause the VM to exit with "Absurdly large distribution output data +%% buffer". So we limit the max message size to 2^31 - 10^6 bytes (1MB +%% to allow plenty of leeway for the #basic_message{} and #content{} +%% wrapping the message body). +-define(MAX_MSG_SIZE, 2147383648). + -define(store_proc_name(N), rabbit_misc:store_proc_name(?MODULE, N)). @@ -382,7 +382,7 @@ -behaviour(gen_server2). --export([create_tables/0, start_link/4, leave/1, broadcast/2, +-export([create_tables/0, start_link/4, leave/1, broadcast/2, broadcast/3, confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -395,6 +395,7 @@ -export([table_definitions/0]). -define(GROUP_TABLE, gm_group). +-define(MAX_BUFFER_SIZE, 100000000). %% 100MB -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). -define(BROADCAST_TIMER, 25). @@ -414,6 +415,7 @@ callback_args, confirms, broadcast_buffer, + broadcast_buffer_sz, broadcast_timer, txn_executor }). @@ -522,8 +524,10 @@ start_link(GroupName, Module, Args, TxnFun) -> leave(Server) -> gen_server2:cast(Server, leave). -broadcast(Server, Msg) -> - gen_server2:cast(Server, {broadcast, Msg}). +broadcast(Server, Msg) -> broadcast(Server, Msg, 0). + +broadcast(Server, Msg, SizeHint) -> + gen_server2:cast(Server, {broadcast, Msg, SizeHint}). confirmed_broadcast(Server, Msg) -> gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity). @@ -547,19 +551,20 @@ init([GroupName, Module, Args, TxnFun]) -> random:seed(MegaSecs, Secs, MicroSecs), Self = make_member(GroupName), gen_server2:cast(self(), join), - {ok, #state { self = Self, - left = {Self, undefined}, - right = {Self, undefined}, - group_name = GroupName, - module = Module, - view = undefined, - pub_count = -1, - members_state = undefined, - callback_args = Args, - confirms = queue:new(), - broadcast_buffer = [], - broadcast_timer = undefined, - txn_executor = TxnFun }, hibernate, + {ok, #state { self = Self, + left = {Self, undefined}, + right = {Self, undefined}, + group_name = GroupName, + module = Module, + view = undefined, + pub_count = -1, + members_state = undefined, + callback_args = Args, + confirms = queue:new(), + broadcast_buffer = [], + broadcast_buffer_sz = 0, + broadcast_timer = undefined, + txn_executor = TxnFun }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -576,7 +581,7 @@ handle_call({confirmed_broadcast, Msg}, _From, ok, State}); handle_call({confirmed_broadcast, Msg}, From, State) -> - internal_broadcast(Msg, From, State); + internal_broadcast(Msg, From, 0, State); handle_call(info, _From, State = #state { members_state = undefined }) -> @@ -639,10 +644,11 @@ handle_cast({?TAG, ReqVer, Msg}, if_callback_success( Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1)); -handle_cast({broadcast, _Msg}, State = #state { members_state = undefined }) -> +handle_cast({broadcast, _Msg, _SizeHint}, + State = #state { members_state = undefined }) -> noreply(State); -handle_cast({broadcast, Msg}, +handle_cast({broadcast, Msg, _SizeHint}, State = #state { self = Self, right = {Self, undefined}, module = Module, @@ -650,8 +656,8 @@ handle_cast({broadcast, Msg}, handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg), State}); -handle_cast({broadcast, Msg}, State) -> - internal_broadcast(Msg, none, State); +handle_cast({broadcast, Msg, SizeHint}, State) -> + internal_broadcast(Msg, none, SizeHint, State); handle_cast(join, State = #state { self = Self, group_name = GroupName, @@ -883,12 +889,14 @@ ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) -> ensure_broadcast_timer(State) -> State. -internal_broadcast(Msg, From, State = #state { self = Self, - pub_count = PubCount, - module = Module, - confirms = Confirms, - callback_args = Args, - broadcast_buffer = Buffer }) -> +internal_broadcast(Msg, From, SizeHint, + State = #state { self = Self, + pub_count = PubCount, + module = Module, + confirms = Confirms, + callback_args = Args, + broadcast_buffer = Buffer, + broadcast_buffer_sz = BufferSize }) -> PubCount1 = PubCount + 1, Result = Module:handle_msg(Args, get_pid(Self), Msg), Buffer1 = [{PubCount1, Msg} | Buffer], @@ -896,13 +904,38 @@ internal_broadcast(Msg, From, State = #state { self = Self, none -> Confirms; _ -> queue:in({PubCount1, From}, Confirms) end, - State1 = State #state { pub_count = PubCount1, - confirms = Confirms1, - broadcast_buffer = Buffer1 }, - handle_callback_result({Result, case From of - none -> State1; - _ -> flush_broadcast_buffer(State1) - end}). + State1 = State #state { pub_count = PubCount1, + confirms = Confirms1, + broadcast_buffer = Buffer1, + broadcast_buffer_sz = BufferSize + SizeHint}, + handle_callback_result( + {Result, case From of + none -> maybe_flush_broadcast_buffer(State1); + _ -> flush_broadcast_buffer(State1) + end}). + +%% The Erlang distribution mechanism has an interesting quirk - it +%% will kill the VM cold with "Absurdly large distribution output data +%% buffer" if you attempt to send a message which serialises out to +%% more than 2^31 bytes in size. It's therefore a very good idea to +%% make sure that we don't exceed that size! +%% +%% Now, we could figure out the size of messages as they come in using +%% size(term_to_binary(Msg)) or similar. The trouble is, that requires +%% us to serialise the message only to throw the serialised form +%% away. Hard to believe that's a sensible thing to do. So instead we +%% accept a size hint from the application, via broadcast/3. This size +%% hint can be the size of anything in the message which we expect +%% could be large, and we just ignore the size of any small bits of +%% the message term. Therefore MAX_BUFFER_SIZE is set somewhat +%% conservatively at 100MB - but the buffer is only to allow us to +%% buffer tiny messages anyway, so 100MB is plenty. + +maybe_flush_broadcast_buffer(State = #state{broadcast_buffer_sz = Size}) -> + case Size > ?MAX_BUFFER_SIZE of + true -> flush_broadcast_buffer(State); + false -> State + end. flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) -> State; @@ -920,8 +953,9 @@ flush_broadcast_buffer(State = #state { self = Self, Member #member { pending_ack = PA1, last_pub = PubCount } end, Self, MembersState), - State #state { members_state = MembersState1, - broadcast_buffer = [] }. + State #state { members_state = MembersState1, + broadcast_buffer = [], + broadcast_buffer_sz = 0}. %% --------------------------------------------------------------------------- diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 6b1e00b7..282113a4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -113,10 +113,9 @@ -> [rabbit_types:infos()]). -spec(force_event_refresh/0 :: () -> 'ok'). -spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok'). --spec(consumers/1 :: - (rabbit_types:amqqueue()) - -> [{pid(), rabbit_types:ctag(), boolean(), - rabbit_framing:amqp_table()}]). +-spec(consumers/1 :: (rabbit_types:amqqueue()) + -> [{pid(), rabbit_types:ctag(), boolean(), + rabbit_framing:amqp_table()}]). -spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(consumers_all/1 :: (rabbit_types:vhost()) diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 2e825536..3d70be4b 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -22,7 +22,7 @@ message/3, message/4, properties/1, prepend_table_header/3, extract_headers/1, map_headers/2, delivery/3, header_routes/1, parse_expiration/1]). --export([build_content/2, from_content/1]). +-export([build_content/2, from_content/1, msg_size/1]). %%---------------------------------------------------------------------------- @@ -77,6 +77,9 @@ (rabbit_framing:amqp_property_record()) -> rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any())). +-spec(msg_size/1 :: (rabbit_types:content() | rabbit_types:message()) -> + non_neg_integer()). + -endif. %%---------------------------------------------------------------------------- @@ -274,3 +277,5 @@ parse_expiration(#'P_basic'{expiration = Expiration}) -> {error, {leftover_string, S}} end. +msg_size(#content{payload_fragments_rev = PFR}) -> iolist_size(PFR); +msg_size(#basic_message{content = Content}) -> msg_size(Content). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 469cf4f7..39c2f0b0 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -522,6 +522,14 @@ check_internal_exchange(#exchange{name = Name, internal = true}) -> check_internal_exchange(_) -> ok. +check_msg_size(Content) -> + Size = rabbit_basic:msg_size(Content), + case Size > ?MAX_MSG_SIZE of + true -> precondition_failed("message size ~B larger than max size ~B", + [Size, ?MAX_MSG_SIZE]); + false -> ok + end. + qbin_to_resource(QueueNameBin, State) -> name_to_resource(queue, QueueNameBin, State). @@ -679,6 +687,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, tx = Tx, confirm_enabled = ConfirmEnabled, trace_state = TraceState}) -> + check_msg_size(Content), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index d59641b0..9421b52e 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -79,6 +79,25 @@ init_file(File, PrevHandler) -> %% filter out "application: foo; exited: stopped; type: temporary" handle_event({info_report, _, {_, std_info, _}}, State) -> {ok, State}; +%% When a node restarts quickly it is possible the rest of the cluster +%% will not have had the chance to remove its queues from +%% Mnesia. That's why rabbit_amqqueue:recover/0 invokes +%% on_node_down(node()). But before we get there we can receive lots +%% of messages intended for the old version of the node. The emulator +%% logs an event for every one of those messages; in extremis this can +%% bring the server to its knees just logging "Discarding..." +%% again and again. So just log the first one, then go silent. +handle_event(Event = {error, _, {emulator, _, ["Discarding message" ++ _]}}, + State) -> + case get(discarding_message_seen) of + true -> {ok, State}; + undefined -> put(discarding_message_seen, true), + error_logger_file_h:handle_event(Event, State) + end; +%% Clear this state if we log anything else (but not a progress report). +handle_event(Event = {info_msg, _, _}, State) -> + erase(discarding_message_seen), + error_logger_file_h:handle_event(Event, State); handle_event(Event, State) -> error_logger_file_h:handle_event(Event, State). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 4f50e1a5..9ce5afcb 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -212,7 +212,8 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}), + ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}, + rabbit_basic:msg_size(Msg)), BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS), ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). @@ -222,7 +223,8 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}), + ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}, + rabbit_basic:msg_size(Msg)), {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), State1 = State #state { backing_queue_state = BQS1 }, {AckTag, ensure_monitoring(ChPid, State1)}. |