diff options
authorEmile Joubert <>2013-07-31 13:06:16 +0100
committerEmile Joubert <>2013-07-31 13:06:16 +0100
commit09b934a686384267afdb28fe3b4bba4d9bcca78c (patch)
parentac666e08c5405aa0b4e27a7edaaaf05d60e1e55e (diff)
parentd99108bf76d3ddb972683217ae3e3e62583d036c (diff)
Refresh branch from stable
21 files changed, 316 insertions, 173 deletions
diff --git a/Makefile b/Makefile
index 9a6ad7a6..56d4b3c0 100644
--- a/Makefile
+++ b/Makefile
@@ -56,7 +56,8 @@ endif
#other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests
ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc)
PLUGINS_SRC_DIR?=$(shell [ -d "plugins-src" ] && echo "plugins-src" || echo )
@@ -262,6 +263,8 @@ srcdist: distclean
cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/
cp Makefile generate_app generate_deps calculate-relative $(TARGET_SRC_DIR)
cp -r scripts $(TARGET_SRC_DIR)
chmod 0755 $(TARGET_SRC_DIR)/scripts/*
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 0f3c0faf..1d641144 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -405,6 +405,13 @@
must be offline, while the node we are removing from must be
online, except when using the <command>--offline</command> flag.
+ <para>
+ When using the <command>--offline</command> flag the node you
+ connect to will become the canonical source for cluster metadata
+ (e.g. which queues exist), even if it was not before. Therefore
+ you should use this command on the latest node to shut down if
+ at all possible.
+ </para>
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl -n hare@mcnulty forget_cluster_node rabbit@stringer</screen>
<para role="example">
diff --git a/ebin/ b/ebin/
index 339fa69e..a4582e2d 100644
--- a/ebin/
+++ b/ebin/
@@ -14,8 +14,7 @@
%% we also depend on crypto, public_key and ssl but they shouldn't be
%% in here as we don't actually want to start it
{mod, {rabbit, []}},
- {env, [{hipe_compile, false},
- {tcp_listeners, [5672]},
+ {env, [{tcp_listeners, [5672]},
{ssl_listeners, []},
{ssl_options, []},
{vm_memory_high_watermark, 0.4},
@@ -51,5 +50,24 @@
{backlog, 128},
{nodelay, true},
{linger, {true, 0}},
- {exit_on_close, false}]}
- ]}]}.
+ {exit_on_close, false}]},
+ {hipe_compile, false},
+ %% see bug 24513 for how this list was created
+ {hipe_modules,
+ [rabbit_reader, rabbit_channel, gen_server2, rabbit_exchange,
+ rabbit_command_assembler, rabbit_framing_amqp_0_9_1, rabbit_basic,
+ rabbit_event, lists, queue, priority_queue, rabbit_router,
+ rabbit_trace, rabbit_misc, rabbit_binary_parser,
+ rabbit_exchange_type_direct, rabbit_guid, rabbit_net,
+ rabbit_amqqueue_process, rabbit_variable_queue,
+ rabbit_binary_generator, rabbit_writer, delegate, gb_sets, lqueue,
+ sets, orddict, rabbit_amqqueue, rabbit_limiter, gb_trees,
+ rabbit_queue_index, rabbit_exchange_decorator, gen, dict, ordsets,
+ file_handle_cache, rabbit_msg_store, array,
+ rabbit_msg_store_ets_index, rabbit_msg_file,
+ rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
+ mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow,
+ pmon, ssl_connection, tls_connection, ssl_record, tls_record,
+ gen_fsm, ssl]},
+ {ssl_apps, [asn1, crypto, public_key, ssl]}
+ ]}]}.
diff --git a/packaging/standalone/src/rabbit_release.erl b/packaging/standalone/src/rabbit_release.erl
index dd68ee7e..f5e1ecf8 100644
--- a/packaging/standalone/src/rabbit_release.erl
+++ b/packaging/standalone/src/rabbit_release.erl
@@ -54,7 +54,9 @@ start() ->
%% we need a list of ERTS apps we need to ship with rabbit
- BaseApps = AllApps -- PluginAppNames,
+ {ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps),
+ BaseApps = SslAppsConfig ++ AllApps -- PluginAppNames,
AppVersions = [determine_version(App) || App <- BaseApps],
RabbitVersion = proplists:get_value(rabbit, AppVersions),
diff --git a/src/delegate.erl b/src/delegate.erl
index 4e1dcd2e..7a06c1e4 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,15 +18,22 @@
--export([start_link/1, invoke_no_result/2, invoke/2, call/2, cast/2]).
+-export([start_link/1, invoke_no_result/2, invoke/2, monitor/2,
+ demonitor/1, demonitor/2, call/2, cast/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+-record(state, {node, monitors, name}).
+-type(monitor_ref() :: reference() | {atom(), pid()}).
-spec(start_link/1 ::
(non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}).
-spec(invoke/2 ::
@@ -35,6 +42,10 @@
[{pid(), term()}]}).
-spec(invoke_no_result/2 ::
(pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
+-spec(monitor/2 :: ('process', pid()) -> monitor_ref()).
+-spec(demonitor/1 :: (monitor_ref()) -> 'true').
+-spec(demonitor/2 :: (monitor_ref(), ['flush']) -> 'true').
-spec(call/2 ::
( pid(), any()) -> any();
([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}).
@@ -50,7 +61,8 @@
start_link(Num) ->
- gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []).
+ Name = delegate_name(Num),
+ gen_server2:start_link({local, Name}, ?MODULE, [Name], []).
invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
@@ -78,7 +90,7 @@ invoke(Pids, Fun) when is_list(Pids) ->
case orddict:fetch_keys(Grouped) of
[] -> {[], []};
RemoteNodes -> gen_server2:multi_call(
- RemoteNodes, delegate(RemoteNodes),
+ RemoteNodes, delegate(self(), RemoteNodes),
{invoke, Fun, Grouped}, infinity)
BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
@@ -106,12 +118,27 @@ invoke_no_result(Pids, Fun) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
case orddict:fetch_keys(Grouped) of
[] -> ok;
- RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes),
- {invoke, Fun, Grouped})
+ RemoteNodes -> gen_server2:abcast(
+ RemoteNodes, delegate(self(), RemoteNodes),
+ {invoke, Fun, Grouped})
safe_invoke(LocalPids, Fun), %% must not die
+monitor(Type, Pid) when node(Pid) =:= node() ->
+ erlang:monitor(Type, Pid);
+monitor(Type, Pid) ->
+ Name = delegate(Pid, [node(Pid)]),
+ gen_server2:cast(Name, {monitor, Type, self(), Pid}),
+ {Name, Pid}.
+demonitor(Ref) -> ?MODULE:demonitor(Ref, []).
+demonitor(Ref, Options) when is_reference(Ref) ->
+ erlang:demonitor(Ref, Options);
+demonitor({Name, Pid}, Options) ->
+ gen_server2:cast(Name, {demonitor, Pid, Options}).
call(PidOrPids, Msg) ->
invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end).
@@ -134,10 +161,10 @@ group_pids_by_node(Pids) ->
delegate_name(Hash) ->
list_to_atom("delegate_" ++ integer_to_list(Hash)).
-delegate(RemoteNodes) ->
+delegate(Pid, RemoteNodes) ->
case get(delegate) of
undefined -> Name = delegate_name(
- erlang:phash2(self(),
+ erlang:phash2(Pid,
put(delegate, Name),
@@ -155,22 +182,48 @@ safe_invoke(Pid, Fun) when is_pid(Pid) ->
-init([]) ->
- {ok, node(), hibernate,
+init([Name]) ->
+ {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate,
-handle_call({invoke, Fun, Grouped}, _From, Node) ->
- {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}.
-handle_cast({invoke, Fun, Grouped}, Node) ->
+handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) ->
+ {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate}.
+handle_cast({monitor, Type, WantsMonitor, Pid},
+ State = #state{monitors = Monitors}) ->
+ Ref = erlang:monitor(Type, Pid),
+ Monitors1 = dict:store(Pid, {WantsMonitor, Ref}, Monitors),
+ {noreply, State#state{monitors = Monitors1}, hibernate};
+handle_cast({demonitor, Pid, Options},
+ State = #state{monitors = Monitors}) ->
+ {noreply, case dict:find(Pid, Monitors) of
+ {ok, {_WantsMonitor, Ref}} ->
+ erlang:demonitor(Ref, Options),
+ State#state{monitors = dict:erase(Pid, Monitors)};
+ error ->
+ State
+ end, hibernate};
+handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) ->
safe_invoke(orddict:fetch(Node, Grouped), Fun),
- {noreply, Node, hibernate}.
+ {noreply, State, hibernate}.
+handle_info({'DOWN', Ref, process, Pid, Info},
+ State = #state{monitors = Monitors, name = Name}) ->
+ {noreply, case dict:find(Pid, Monitors) of
+ {ok, {WantsMonitor, Ref}} ->
+ WantsMonitor ! {'DOWN', {Name, Pid}, process, Pid, Info},
+ State#state{monitors = dict:erase(Pid, Monitors)};
+ error ->
+ State
+ end, hibernate};
-handle_info(_Info, Node) ->
- {noreply, Node, hibernate}.
+handle_info(_Info, State) ->
+ {noreply, State, hibernate}.
terminate(_Reason, _State) ->
-code_change(_OldVsn, Node, _Extra) ->
- {ok, Node}.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/gm.erl b/src/gm.erl
index a6735ef8..78099499 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -81,6 +81,12 @@
%% Provide the Pid. Returns a proplist with various facts, including
%% the group name and the current group members.
+%% validate_members/2
+%% Check whether a given member list agrees with the chosen member's
+%% view. Any differences will be communicated via the members_changed
+%% callback. If there are no differences then there will be no reply.
+%% Note that members will not necessarily share the same view.
%% forget_group/1
%% Provide the group name. Removes its mnesia record. Makes no attempt
%% to ensure the group is empty.
@@ -377,7 +383,7 @@
-export([create_tables/0, start_link/4, leave/1, broadcast/2,
- confirmed_broadcast/2, info/1, forget_group/1]).
+ confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_info/3]).
@@ -438,6 +444,7 @@
-spec(broadcast/2 :: (pid(), any()) -> 'ok').
-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
+-spec(validate_members/2 :: (pid(), [pid()]) -> 'ok').
-spec(forget_group/1 :: (group_name()) -> 'ok').
%% The joined, members_changed and handle_msg callbacks can all return
@@ -524,6 +531,9 @@ confirmed_broadcast(Server, Msg) ->
info(Server) ->
gen_server2:call(Server, info, infinity).
+validate_members(Server, Members) ->
+ gen_server2:cast(Server, {validate_members, Members}).
forget_group(GroupName) ->
{atomic, ok} = mnesia:sync_transaction(
fun () ->
@@ -659,6 +669,19 @@ handle_cast(join, State = #state { self = Self,
{Module:joined(Args, get_pids(all_known_members(View))), State1});
+handle_cast({validate_members, OldMembers},
+ State = #state { view = View,
+ module = Module,
+ callback_args = Args }) ->
+ NewMembers = get_pids(all_known_members(View)),
+ Births = NewMembers -- OldMembers,
+ Deaths = OldMembers -- NewMembers,
+ case {Births, Deaths} of
+ {[], []} -> noreply(State);
+ _ -> Result = Module:members_changed(Args, Births, Deaths),
+ handle_callback_result({Result, State})
+ end;
handle_cast(leave, State) ->
{stop, normal, State}.
@@ -1053,7 +1076,7 @@ prune_or_create_group(Self, GroupName, TxnFun) ->
fun () ->
GroupNew = #gm_group { name = GroupName,
members = [Self],
- version = ?VERSION_START },
+ version = get_version(Self) },
case mnesia:read({?GROUP_TABLE, GroupName}) of
[] ->
@@ -1294,6 +1317,8 @@ remove_erased_members(MembersState, View) ->
end, blank_member_state(), all_known_members(View)).
+get_version({Version, _Pid}) -> Version.
get_pid({_Version, Pid}) -> Pid.
get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
diff --git a/src/pmon.erl b/src/pmon.erl
index b9db66fb..86308167 100644
--- a/src/pmon.erl
+++ b/src/pmon.erl
@@ -16,22 +16,26 @@
--export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
- monitored/1, is_empty/1]).
+-export([new/0, new/1, monitor/2, monitor_all/2, demonitor/2,
+ is_monitored/2, erase/2, monitored/1, is_empty/1]).
-compile({no_auto_import, [monitor/2]}).
+-record(state, {dict, module}).
--opaque(?MODULE() :: dict()).
+-opaque(?MODULE() :: #state{dict :: dict(),
+ module :: atom()}).
-type(item() :: pid() | {atom(), node()}).
-spec(new/0 :: () -> ?MODULE()).
+-spec(new/1 :: ('erlang' | 'delegate') -> ?MODULE()).
-spec(monitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
-spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()).
-spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
@@ -42,29 +46,33 @@
-new() -> dict:new().
+new() -> new(erlang).
+new(Module) -> #state{dict = dict:new(),
+ module = Module}.
-monitor(Item, M) ->
+monitor(Item, S = #state{dict = M, module = Module}) ->
case dict:is_key(Item, M) of
- true -> M;
- false -> dict:store(Item, erlang:monitor(process, Item), M)
+ true -> S;
+ false -> S#state{dict = dict:store(
+ Item, Module:monitor(process, Item), M)}
-monitor_all([], M) -> M; %% optimisation
-monitor_all([Item], M) -> monitor(Item, M); %% optimisation
-monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items).
+monitor_all([], S) -> S; %% optimisation
+monitor_all([Item], S) -> monitor(Item, S); %% optimisation
+monitor_all(Items, S) -> lists:foldl(fun monitor/2, S, Items).
-demonitor(Item, M) ->
+demonitor(Item, S = #state{dict = M, module = Module}) ->
case dict:find(Item, M) of
- {ok, MRef} -> erlang:demonitor(MRef),
- dict:erase(Item, M);
+ {ok, MRef} -> Module:demonitor(MRef),
+ S#state{dict = dict:erase(Item, M)};
error -> M
-is_monitored(Item, M) -> dict:is_key(Item, M).
+is_monitored(Item, #state{dict = M}) -> dict:is_key(Item, M).
-erase(Item, M) -> dict:erase(Item, M).
+erase(Item, S = #state{dict = M}) -> S#state{dict = dict:erase(Item, M)}.
-monitored(M) -> dict:fetch_keys(M).
+monitored(#state{dict = M}) -> dict:fetch_keys(M).
-is_empty(M) -> dict:size(M) == 0.
+is_empty(#state{dict = M}) -> dict:size(M) == 0.
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 3d9e7c6a..6995c3be 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -51,7 +51,7 @@
-type(q() :: pqueue()).
-type(priority() :: integer() | 'infinity').
--type(squeue() :: {queue, [any()], [any()]}).
+-type(squeue() :: {queue, [any()], [any()], non_neg_integer()}).
-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}).
-spec(new/0 :: () -> pqueue()).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 46e3d0e4..eae3b802 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -192,22 +192,6 @@
-define(APPS, [os_mon, mnesia, rabbit]).
-%% see bug 24513 for how this list was created
- [rabbit_reader, rabbit_channel, gen_server2,
- rabbit_exchange, rabbit_command_assembler, rabbit_framing_amqp_0_9_1,
- rabbit_basic, rabbit_event, lists, queue, priority_queue,
- rabbit_router, rabbit_trace, rabbit_misc, rabbit_binary_parser,
- rabbit_exchange_type_direct, rabbit_guid, rabbit_net,
- rabbit_amqqueue_process, rabbit_variable_queue,
- rabbit_binary_generator, rabbit_writer, delegate, gb_sets, lqueue,
- sets, orddict, rabbit_amqqueue, rabbit_limiter, gb_trees,
- rabbit_queue_index, gen, dict, ordsets, file_handle_cache,
- rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file,
- rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
- mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon,
- ssl_connection, ssl_record, gen_fsm, ssl]).
%% HiPE compilation uses multiple cores anyway, but some bits are
%% IO-bound so we can go faster if we parallelise a bit more. In
%% practice 2 processes seems just as fast as any other number > 1,
@@ -281,7 +265,9 @@ warn_if_hipe_compilation_failed(false) ->
%% long time, so make an exception to our no-stdout policy and display
%% progress via stdout.
hipe_compile() ->
- Count = length(?HIPE_WORTHY),
+ {ok, HipeModulesAll} = application:get_env(rabbit, hipe_modules),
+ HipeModules = [HM || HM <- HipeModulesAll, code:which(HM) =/= non_existing],
+ Count = length(HipeModules),
io:format("~nHiPE compiling: |~s|~n |",
[string:copies("-", Count)]),
T1 = erlang:now(),
@@ -290,7 +276,7 @@ hipe_compile() ->
end || M <- Ms]
end) ||
+ Ms <- split(HipeModules, ?HIPE_PROCESSES)],
{'DOWN', MRef, process, _, normal} -> ok;
{'DOWN', MRef, process, _, Reason} -> exit(Reason)
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ebbe4bab..6e0eb9bf 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -146,7 +146,7 @@ init_state(Q) ->
exclusive_consumer = none,
has_had_consumers = false,
active_consumers = queue:new(),
- senders = pmon:new(),
+ senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
status = running},
rabbit_event:init_stats_timer(State, #q.stats_timer).
@@ -549,10 +549,8 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
{{Message, Delivered, undefined},
true, discard(Delivery, State1)}
end, false, State#q{backing_queue_state = BQS1});
- {published, BQS1} ->
- {true, State#q{backing_queue_state = BQS1}};
- {discarded, BQS1} ->
- {true, discard(Delivery, State#q{backing_queue_state = BQS1})}
+ {true, BQS1} ->
+ {true, State#q{backing_queue_state = BQS1}}
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl
index 5739c7f3..a5b91867 100644
--- a/src/rabbit_autoheal.erl
+++ b/src/rabbit_autoheal.erl
@@ -181,8 +181,7 @@ partition_value(Partition) ->
all_partitions(PartitionedWith) ->
Nodes = rabbit_mnesia:cluster_nodes(all),
Partitions = [{node(), PartitionedWith} |
- [rpc:call(Node, rabbit_node_monitor, partitions, [])
- || Node <- Nodes -- [node()]]],
+ rabbit_node_monitor:partitions(Nodes -- [node()])],
all_partitions(Partitions, [Nodes]).
all_partitions([], Partitions) ->
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index f05e46e9..61b504bc 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -90,10 +90,7 @@
-> {ack(), state()}.
%% Called to inform the BQ about messages which have reached the
-%% queue, but are not going to be further passed to BQ for some
-%% reason. Note that this may be invoked for messages for which
-%% BQ:is_duplicate/2 has already returned {'published' | 'discarded',
-%% BQS}.
+%% queue, but are not going to be further passed to BQ.
-callback discard(rabbit_types:msg_id(), pid(), state()) -> state().
%% Return ids of messages which have been confirmed since the last
@@ -216,11 +213,10 @@
-callback invoke(atom(), fun ((atom(), A) -> A), state()) -> state().
%% Called prior to a publish or publish_delivered call. Allows the BQ
-%% to signal that it's already seen this message (and in what capacity
-%% - i.e. was it published previously or discarded previously) and
-%% thus the message should be dropped.
+%% to signal that it's already seen this message, (e.g. it was published
+%% or discarded previously) and thus the message should be dropped.
-callback is_duplicate(rabbit_types:basic_message(), state())
- -> {'false'|'published'|'discarded', state()}.
+ -> {boolean(), state()}.
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index 94891629..fee377e7 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -42,11 +42,20 @@ start_link() ->
{collector, {rabbit_queue_collector, start_link, []},
intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
+ %% We need to get channels in the hierarchy here so they close
+ %% before the reader. But for 1.0 readers we can't start the real
+ %% ch_sup_sup (because we don't know if we will be 0-9-1 or 1.0) -
+ %% so we add another supervisor into the hierarchy.
+ {ok, ChannelSup3Pid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel_sup3, {rabbit_intermediate_sup, start_link, []},
+ intrinsic, infinity, supervisor, [rabbit_intermediate_sup]}),
{ok, ReaderPid} =
{reader, {rabbit_reader, start_link,
- [SupPid, Collector,
+ [ChannelSup3Pid, Collector,
intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
{ok, SupPid, ReaderPid}.
diff --git a/src/rabbit_intermediate_sup.erl b/src/rabbit_intermediate_sup.erl
new file mode 100644
index 00000000..1919d9d6
--- /dev/null
+++ b/src/rabbit_intermediate_sup.erl
@@ -0,0 +1,39 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%% The Original Code is RabbitMQ.
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+start_link() ->
+ supervisor2:start_link(?MODULE, []).
+init([]) ->
+ {ok, {{one_for_one, 10, 10}, []}}.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 6791389e..3abd81f5 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -225,21 +225,10 @@ discard(MsgId, ChPid, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
seen_status = SS }) ->
- %% It's a massive error if we get told to discard something that's
- %% already been published or published-and-confirmed. To do that
- %% would require non FIFO access. Hence we should not find
- %% 'published' or 'confirmed' in this dict:find.
- case dict:find(MsgId, SS) of
- error ->
- ok = gm:broadcast(GM, {discard, ChPid, MsgId}),
- BQS1 = BQ:discard(MsgId, ChPid, BQS),
- ensure_monitoring(
- ChPid, State #state {
- backing_queue_state = BQS1,
- seen_status = dict:erase(MsgId, SS) });
- {ok, discarded} ->
- State
- end.
+ false = dict:is_key(MsgId, SS), %% ASSERTION
+ ok = gm:broadcast(GM, {discard, ChPid, MsgId}),
+ ensure_monitoring(ChPid, State #state { backing_queue_state =
+ BQ:discard(MsgId, ChPid, BQS) }).
dropwhile(Pred, State = #state{backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -393,8 +382,9 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% immediately after calling is_duplicate). The msg is
%% invalid. We will not see this again, nor will we be
%% further involved in confirming this message, so erase.
- {published, State #state { seen_status = dict:erase(MsgId, SS) }};
- {ok, confirmed} ->
+ {true, State #state { seen_status = dict:erase(MsgId, SS) }};
+ {ok, Disposition}
+ when Disposition =:= confirmed
%% It got published when we were a slave via gm, and
%% confirmed some time after that (maybe even after
%% promotion), but before we received the publish from the
@@ -403,12 +393,12 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% need to confirm now. As above, amqqueue_process will
%% have the entry for the msg_id_to_channel mapping added
%% immediately after calling is_duplicate/2.
- {published, State #state { seen_status = dict:erase(MsgId, SS),
- confirmed = [MsgId | Confirmed] }};
- {ok, discarded} ->
- %% Don't erase from SS here because discard/2 is about to
- %% be called and we need to be able to detect this case
- {discarded, State}
+ orelse Disposition =:= discarded ->
+ %% Message was discarded while we were a slave. Confirm now.
+ %% As above, amqqueue_process will have the entry for the
+ %% msg_id_to_channel mapping.
+ {true, State #state { seen_status = dict:erase(MsgId, SS),
+ confirmed = [MsgId | Confirmed] }}
%% ---------------------------------------------------------------------------
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 38e0da3f..1996fd0a 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -100,7 +100,7 @@ init(Q = #amqqueue { name = QName }) ->
Node = node(),
case rabbit_misc:execute_mnesia_transaction(
fun() -> init_it(Self, GM, Node, QName) end) of
- {new, QPid} ->
+ {new, QPid, GMPids} ->
erlang:monitor(process, QPid),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [Self]),
@@ -120,13 +120,14 @@ init(Q = #amqqueue { name = QName }) ->
msg_id_ack = dict:new(),
msg_id_status = dict:new(),
- known_senders = pmon:new(),
+ known_senders = pmon:new(delegate),
depth_delta = undefined
infos(?CREATION_EVENT_KEYS, State)),
ok = gm:broadcast(GM, request_depth),
+ ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]),
{ok, State, hibernate,
@@ -144,7 +145,7 @@ init_it(Self, GM, Node, QName) ->
mnesia:read({rabbit_queue, QName}),
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
[] -> add_slave(Q, Self, GM),
- {new, QPid};
+ {new, QPid, GMPids};
[QPid] -> case rabbit_misc:is_process_alive(QPid) of
true -> duplicate_live_master;
false -> {stale, QPid}
@@ -156,7 +157,7 @@ init_it(Self, GM, Node, QName) ->
gm_pids = [T || T = {_, S} <- GMPids,
S =/= SPid] },
add_slave(Q1, Self, GM),
- {new, QPid}
+ {new, QPid, GMPids}
@@ -273,7 +274,8 @@ handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
- noreply(local_sender_death(ChPid, State));
+ local_sender_death(ChPid, State),
+ noreply(State);
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
@@ -604,7 +606,7 @@ stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref).
ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
State #state { known_senders = pmon:monitor(ChPid, KS) }.
-local_sender_death(ChPid, State = #state { known_senders = KS }) ->
+local_sender_death(ChPid, #state { known_senders = KS }) ->
%% The channel will be monitored iff we have received a delivery
%% from it but not heard about its death from the master. So if it
%% is monitored we need to point the death out to the master (see
@@ -612,8 +614,7 @@ local_sender_death(ChPid, State = #state { known_senders = KS }) ->
ok = case pmon:is_monitored(ChPid, KS) of
false -> ok;
true -> confirm_sender_death(ChPid)
- end,
- State.
+ end.
confirm_sender_death(Pid) ->
%% We have to deal with the possibility that we'll be promoted to
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index ea9bc7d7..5fa29b7e 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -56,7 +56,8 @@
%% Main interface
-spec(init/0 :: () -> 'ok').
--spec(join_cluster/2 :: (node(), node_type()) -> 'ok').
+-spec(join_cluster/2 :: (node(), node_type())
+ -> 'ok' | {'ok', 'already_member'}).
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
-spec(update_cluster_nodes/1 :: (node()) -> 'ok').
@@ -164,23 +165,24 @@ join_cluster(DiscoveryNode, NodeType) ->
{error, _} = E -> throw(E)
case me_in_nodes(ClusterNodes) of
- true -> e(already_clustered);
- false -> ok
- end,
- %% reset the node. this simplifies things and it will be needed in
- %% this case - we're joining a new cluster with new nodes which
- %% are not in synch with the current node. I also lifts the burden
- %% of reseting the node from the user.
- reset_gracefully(),
- %% Join the cluster
- rabbit_misc:local_info_msg("Clustering with ~p as ~p node~n",
- [ClusterNodes, NodeType]),
- ok = init_db_with_mnesia(ClusterNodes, NodeType, true, true),
- rabbit_node_monitor:notify_joined_cluster(),
- ok.
+ false ->
+ %% reset the node. this simplifies things and it will be needed in
+ %% this case - we're joining a new cluster with new nodes which
+ %% are not in synch with the current node. I also lifts the burden
+ %% of reseting the node from the user.
+ reset_gracefully(),
+ %% Join the cluster
+ rabbit_misc:local_info_msg("Clustering with ~p as ~p node~n",
+ [ClusterNodes, NodeType]),
+ ok = init_db_with_mnesia(ClusterNodes, NodeType, true, true),
+ rabbit_node_monitor:notify_joined_cluster(),
+ ok;
+ true ->
+ rabbit_misc:local_info_msg("Already member of cluster: ~p~n",
+ [ClusterNodes]),
+ {ok, already_member}
+ end.
%% return node to its virgin state, where it is not member of any
%% cluster, has no cluster configuration, no local database, and no
@@ -294,27 +296,18 @@ remove_node_offline_node(Node) ->
%% this operation from disc nodes.
case {mnesia:system_info(running_db_nodes) -- [Node], node_type()} of
{[], disc} ->
- %% Note that while we check if the nodes was the last to go down,
- %% apart from the node we're removing from, this is still unsafe.
- %% Consider the situation in which A and B are clustered. A goes
- %% down, and records B as the running node. Then B gets clustered
- %% with C, C goes down and B goes down. In this case, C is the
- %% second-to-last, but we don't know that and we'll remove B from A
- %% anyway, even if that will lead to bad things.
- case cluster_nodes(running) -- [node(), Node] of
- [] -> start_mnesia(),
- try
- %% What we want to do here is replace the last node to
- %% go down with the current node. The way we do this
- %% is by force loading the table, and making sure that
- %% they are loaded.
- rabbit_table:force_load(),
- rabbit_table:wait_for_replicated(),
- forget_cluster_node(Node, false)
- after
- stop_mnesia()
- end;
- _ -> e(not_last_node_to_go_down)
+ start_mnesia(),
+ try
+ %% What we want to do here is replace the last node to
+ %% go down with the current node. The way we do this
+ %% is by force loading the table, and making sure that
+ %% they are loaded.
+ rabbit_table:force_load(),
+ rabbit_table:wait_for_replicated(),
+ forget_cluster_node(Node, false),
+ force_load_next_boot()
+ after
+ stop_mnesia()
{_, _} ->
@@ -339,8 +332,7 @@ status() ->
mnesia_partitions(Nodes) ->
- {Replies, _BadNodes} = rpc:multicall(
- Nodes, rabbit_node_monitor, partitions, []),
+ Replies = rabbit_node_monitor:partitions(Nodes),
[Reply || Reply = {_, R} <- Replies, R =/= []].
is_running() -> mnesia:system_info(is_running) =:= yes.
@@ -439,11 +431,13 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
ok = create_schema();
{[], true, disc} ->
%% First disc node up
+ maybe_force_load(),
{[AnotherNode | _], _, _} ->
%% Subsequent node in cluster, catch up
rpc:call(AnotherNode, rabbit_version, recorded, [])),
+ maybe_force_load(),
ok = rabbit_table:wait_for_replicated(),
ok = rabbit_table:create_local_copy(NodeType)
@@ -523,6 +517,19 @@ copy_db(Destination) ->
ok = ensure_mnesia_not_running(),
rabbit_file:recursive_copy(dir(), Destination).
+force_load_filename() ->
+ filename:join(rabbit_mnesia:dir(), "force_load").
+force_load_next_boot() ->
+ rabbit_file:write_file(force_load_filename(), <<"">>).
+maybe_force_load() ->
+ case rabbit_file:is_file(force_load_filename()) of
+ true -> rabbit_table:force_load(),
+ rabbit_file:delete(force_load_filename());
+ false -> ok
+ end.
%% This does not guarantee us much, but it avoids some situations that
%% will definitely end up badly
check_cluster_consistency() ->
@@ -853,10 +860,6 @@ error_description(clustering_only_disc_node) ->
error_description(resetting_only_disc_node) ->
"You cannot reset a node when it is the only disc node in a cluster. "
"Please convert another node of the cluster to a disc node first.";
-error_description(already_clustered) ->
- "You are already clustered with the nodes you have selected. If the "
- "node you are trying to cluster with is not present in the current "
- "node status, use 'update_cluster_nodes'.";
error_description(not_clustered) ->
"Non-clustered nodes can only be disc nodes.";
error_description(cannot_connect_to_cluster) ->
@@ -879,10 +882,6 @@ error_description(offline_node_no_offline_flag) ->
"You are trying to remove a node from an offline node. That is dangerous, "
"but can be done with the --offline flag. Please consult the manual "
"for rabbitmqctl for more information.";
-error_description(not_last_node_to_go_down) ->
- "The node you are trying to remove from was not the last to go down "
- "(excluding the node you are removing). Please use the the last node "
- "to go down to remove nodes when the cluster is offline.";
error_description(removing_node_from_offline_node) ->
"To remove a node remotely from an offline node, the node you are removing "
"from must be a disc node and all the other nodes must be offline.";
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 6ed6239c..46cfabe3 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -145,7 +145,8 @@ start() -> rabbit_sup:start_supervisor_child(
ensure_ssl() ->
- ok = app_utils:start_applications([asn1, crypto, public_key, ssl]),
+ {ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps),
+ ok = app_utils:start_applications(SslAppsConfig),
{ok, SslOptsConfig} = application:get_env(rabbit, ssl_options),
% unknown_ca errors are silently ignored prior to R14B unless we
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index c1de914f..805f1b2b 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -24,7 +24,7 @@
write_cluster_status/1, read_cluster_status/0,
update_cluster_status/0, reset_cluster_status/0]).
-export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]).
--export([partitions/0, subscribe/1]).
+-export([partitions/0, partitions/1, subscribe/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -57,7 +57,8 @@
-spec(notify_joined_cluster/0 :: () -> 'ok').
-spec(notify_left_cluster/1 :: (node()) -> 'ok').
--spec(partitions/0 :: () -> {node(), [node()]}).
+-spec(partitions/0 :: () -> [node()]).
+-spec(partitions/1 :: ([node()]) -> [{node(), [node()]}]).
-spec(subscribe/1 :: (pid()) -> 'ok').
-spec(all_rabbit_nodes_up/0 :: () -> boolean()).
@@ -187,6 +188,10 @@ notify_left_cluster(Node) ->
partitions() ->
gen_server:call(?SERVER, partitions, infinity).
+partitions(Nodes) ->
+ {Replies, _} = gen_server:multi_call(Nodes, ?SERVER, partitions, infinity),
+ Replies.
subscribe(Pid) ->
gen_server:cast(?SERVER, {subscribe, Pid}).
@@ -208,7 +213,7 @@ init([]) ->
autoheal = rabbit_autoheal:init()}}.
handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
- {reply, {node(), Partitions}, State};
+ {reply, Partitions, State};
handle_call(_Request, _From, State) ->
{noreply, State}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 5e633f23..9b6039d1 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -37,7 +37,7 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
- conn_sup_pid, channel_sup_sup_pid, start_heartbeat_fun,
+ ch_sup3_pid, channel_sup_sup_pid, start_heartbeat_fun,
buf, buf_len, throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
@@ -103,19 +103,19 @@
-start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) ->
- {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid,
+start_link(ChannelSup3Pid, Collector, StartHeartbeatFun) ->
+ {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSup3Pid,
Collector, StartHeartbeatFun])}.
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent, ConnSupPid, Collector, StartHeartbeatFun) ->
+init(Parent, ChSup3Pid, Collector, StartHeartbeatFun) ->
Deb = sys:debug_options([]),
{go, Sock, SockTransform} ->
- Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, Sock,
+ Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, Sock,
@@ -201,7 +201,7 @@ socket_op(Sock, Fun) ->
-start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb,
+start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb,
Sock, SockTransform) ->
process_flag(trap_exit, true),
Name = case rabbit_net:connection_string(Sock, inbound) of
@@ -240,7 +240,7 @@ start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb,
connection_state = pre_init,
queue_collector = Collector,
heartbeater = none,
- conn_sup_pid = ConnSupPid,
+ ch_sup3_pid = ChSup3Pid,
channel_sup_sup_pid = none,
start_heartbeat_fun = StartHeartbeatFun,
buf = [],
@@ -756,6 +756,9 @@ refuse_connection(Sock, Exception, {A, B, C, D}) ->
ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",A,B,C,D>>) end),
+-spec(refuse_connection/2 :: (rabbit_net:socket(), any()) -> no_return()).
refuse_connection(Sock, Exception) ->
refuse_connection(Sock, Exception, {0, 0, 9, 1}).
@@ -837,7 +840,7 @@ handle_method0(#''{virtual_host = VHostPath},
connection = Connection = #connection{
user = User,
protocol = Protocol},
- conn_sup_pid = ConnSupPid,
+ ch_sup3_pid = ChSup3Pid,
sock = Sock,
throttle = Throttle}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
@@ -847,7 +850,7 @@ handle_method0(#''{virtual_host = VHostPath},
Throttle1 = Throttle#throttle{conserve_resources = Conserve},
{ok, ChannelSupSupPid} =
- ConnSupPid,
+ ChSup3Pid,
{channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
State1 = control_throttle(
@@ -1048,9 +1051,9 @@ pack_for_1_0(#v1{parent = Parent,
recv_len = RecvLen,
pending_recv = PendingRecv,
queue_collector = QueueCollector,
- conn_sup_pid = ConnSupPid,
+ ch_sup3_pid = ChSup3Pid,
start_heartbeat_fun = SHF,
buf = Buf,
buf_len = BufLen}) ->
- {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ConnSupPid, SHF,
+ {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ChSup3Pid, SHF,
Buf, BufLen}.
diff --git a/ b/
new file mode 100644
index 00000000..5683af4a
--- /dev/null
+++ b/
@@ -0,0 +1 @@