summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-16 15:51:26 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-16 15:51:26 +0000
commitefd407517c1cc77495d187ec1e88fc7d89c6e06b (patch)
tree3b5060256d08831a2dca99067a99c89ab6f16f37
parent6a380541b983575c44b5e5c022855d94c92ebda8 (diff)
downloadrabbitmq-server-efd407517c1cc77495d187ec1e88fc7d89c6e06b.tar.gz
Avoid "Absurdly large distribution output data buffer" death.
-rw-r--r--src/gm.erl106
-rw-r--r--src/rabbit_mirror_queue_master.erl11
2 files changed, 79 insertions, 38 deletions
diff --git a/src/gm.erl b/src/gm.erl
index cddb2a3b..cb1f70ae 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -382,7 +382,7 @@
-behaviour(gen_server2).
--export([create_tables/0, start_link/4, leave/1, broadcast/2,
+-export([create_tables/0, start_link/4, leave/1, broadcast/2, broadcast/3,
confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -395,6 +395,7 @@
-export([table_definitions/0]).
-define(GROUP_TABLE, gm_group).
+-define(MAX_BUFFER_SIZE, 100000000). %% 100MB
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
-define(BROADCAST_TIMER, 25).
@@ -414,6 +415,7 @@
callback_args,
confirms,
broadcast_buffer,
+ broadcast_buffer_sz,
broadcast_timer,
txn_executor
}).
@@ -522,8 +524,10 @@ start_link(GroupName, Module, Args, TxnFun) ->
leave(Server) ->
gen_server2:cast(Server, leave).
-broadcast(Server, Msg) ->
- gen_server2:cast(Server, {broadcast, Msg}).
+broadcast(Server, Msg) -> broadcast(Server, Msg, 0).
+
+broadcast(Server, Msg, SizeHint) ->
+ gen_server2:cast(Server, {broadcast, Msg, SizeHint}).
confirmed_broadcast(Server, Msg) ->
gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).
@@ -546,19 +550,20 @@ init([GroupName, Module, Args, TxnFun]) ->
random:seed(MegaSecs, Secs, MicroSecs),
Self = make_member(GroupName),
gen_server2:cast(self(), join),
- {ok, #state { self = Self,
- left = {Self, undefined},
- right = {Self, undefined},
- group_name = GroupName,
- module = Module,
- view = undefined,
- pub_count = -1,
- members_state = undefined,
- callback_args = Args,
- confirms = queue:new(),
- broadcast_buffer = [],
- broadcast_timer = undefined,
- txn_executor = TxnFun }, hibernate,
+ {ok, #state { self = Self,
+ left = {Self, undefined},
+ right = {Self, undefined},
+ group_name = GroupName,
+ module = Module,
+ view = undefined,
+ pub_count = -1,
+ members_state = undefined,
+ callback_args = Args,
+ confirms = queue:new(),
+ broadcast_buffer = [],
+ broadcast_buffer_sz = 0,
+ broadcast_timer = undefined,
+ txn_executor = TxnFun }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -575,7 +580,7 @@ handle_call({confirmed_broadcast, Msg}, _From,
ok, State});
handle_call({confirmed_broadcast, Msg}, From, State) ->
- internal_broadcast(Msg, From, State);
+ internal_broadcast(Msg, From, 0, State);
handle_call(info, _From,
State = #state { members_state = undefined }) ->
@@ -638,10 +643,11 @@ handle_cast({?TAG, ReqVer, Msg},
if_callback_success(
Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1));
-handle_cast({broadcast, _Msg}, State = #state { members_state = undefined }) ->
+handle_cast({broadcast, _Msg, _SizeHint},
+ State = #state { members_state = undefined }) ->
noreply(State);
-handle_cast({broadcast, Msg},
+handle_cast({broadcast, Msg, _SizeHint},
State = #state { self = Self,
right = {Self, undefined},
module = Module,
@@ -649,8 +655,8 @@ handle_cast({broadcast, Msg},
handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
State});
-handle_cast({broadcast, Msg}, State) ->
- internal_broadcast(Msg, none, State);
+handle_cast({broadcast, Msg, SizeHint}, State) ->
+ internal_broadcast(Msg, none, SizeHint, State);
handle_cast(join, State = #state { self = Self,
group_name = GroupName,
@@ -882,12 +888,14 @@ ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
ensure_broadcast_timer(State) ->
State.
-internal_broadcast(Msg, From, State = #state { self = Self,
- pub_count = PubCount,
- module = Module,
- confirms = Confirms,
- callback_args = Args,
- broadcast_buffer = Buffer }) ->
+internal_broadcast(Msg, From, SizeHint,
+ State = #state { self = Self,
+ pub_count = PubCount,
+ module = Module,
+ confirms = Confirms,
+ callback_args = Args,
+ broadcast_buffer = Buffer,
+ broadcast_buffer_sz = BufferSize }) ->
PubCount1 = PubCount + 1,
Result = Module:handle_msg(Args, get_pid(Self), Msg),
Buffer1 = [{PubCount1, Msg} | Buffer],
@@ -895,13 +903,38 @@ internal_broadcast(Msg, From, State = #state { self = Self,
none -> Confirms;
_ -> queue:in({PubCount1, From}, Confirms)
end,
- State1 = State #state { pub_count = PubCount1,
- confirms = Confirms1,
- broadcast_buffer = Buffer1 },
- handle_callback_result({Result, case From of
- none -> State1;
- _ -> flush_broadcast_buffer(State1)
- end}).
+ State1 = State #state { pub_count = PubCount1,
+ confirms = Confirms1,
+ broadcast_buffer = Buffer1,
+ broadcast_buffer_sz = BufferSize + SizeHint},
+ handle_callback_result(
+ {Result, case From of
+ none -> maybe_flush_broadcast_buffer(State1);
+ _ -> flush_broadcast_buffer(State1)
+ end}).
+
+%% The Erlang distribution mechanism has an interesting quirk - it
+%% will kill the VM cold with "Absurdly large distribution output data
+%% buffer" if you attempt to send a message which serialises out to
+%% more than 2^31 bytes in size. It's therefore a very good idea to
+%% make sure that we don't exceed that size!
+%%
+%% Now, we could figure out the size of messages as they come in using
+%% size(term_to_binary(Msg)) or similar. The trouble is, that requires
+%% us to serialise the message only to throw the serialised form
+%% away. Hard to believe that's a sensible thing to do. So instead we
+%% accept a size hint from the application, via broadcast/3. This size
+%% hint can be the size of anything in the message which we expect
+%% could be large, and we just ignore the size of any small bits of
+%% the message term. Therefore MAX_BUFFER_SIZE is set somewhat
+%% conservatively at 100MB - but the buffer is only to allow us to
+%% buffer tiny messages anyway, so 100MB is plenty.
+
+maybe_flush_broadcast_buffer(State = #state{broadcast_buffer_sz = Size}) ->
+ case Size > ?MAX_BUFFER_SIZE of
+ true -> flush_broadcast_buffer(State);
+ false -> State
+ end.
flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
State;
@@ -919,8 +952,9 @@ flush_broadcast_buffer(State = #state { self = Self,
Member #member { pending_ack = PA1,
last_pub = PubCount }
end, Self, MembersState),
- State #state { members_state = MembersState1,
- broadcast_buffer = [] }.
+ State #state { members_state = MembersState1,
+ broadcast_buffer = [],
+ broadcast_buffer_sz = 0}.
%% ---------------------------------------------------------------------------
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index d9cef642..57bf5b33 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -212,7 +212,7 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}),
+ ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}, msg_size(Msg)),
BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
@@ -222,7 +222,8 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}),
+ ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg},
+ msg_size(Msg)),
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.
@@ -479,3 +480,9 @@ ensure_monitoring(ChPid, State = #state { coordinator = CPid,
CPid, [ChPid]),
State #state { known_senders = sets:add_element(ChPid, KS) }
end.
+
+msg_size(#basic_message{content = #content{payload_fragments_rev = PFR}}) ->
+ msg_size(PFR, 0).
+
+msg_size([], Size) -> Size;
+msg_size([H|T], Size) -> msg_size(T, Size + size(H)).