author     Francesco Mazzoli <francesco@rabbitmq.com>  2012-09-14 13:00:51 +0100
committer  Francesco Mazzoli <francesco@rabbitmq.com>  2012-09-14 13:00:51 +0100
commit     258540e323cd2465799e18db6b882487dcfc186a (patch)
tree       c4039eb4c9bebfc994c4cfd4ee2091eb5fc48177
parent     ef341f575b7a96c4c0c958e9e3400dcf73991597 (diff)
parent     2f7f8b83f592d9b71b34cc1fc258d04e5f6563f2 (diff)
download   rabbitmq-server-258540e323cd2465799e18db6b882487dcfc186a.tar.gz
merge default
-rw-r--r--  docs/rabbitmqctl.1.xml       |  195
-rw-r--r--  ebin/rabbit_app.in           |    2
-rw-r--r--  src/rabbit.erl               |   16
-rw-r--r--  src/rabbit_control_main.erl  |   46
-rw-r--r--  src/rabbit_misc.erl          |   13
-rw-r--r--  src/rabbit_mnesia.erl        | 1304
-rw-r--r--  src/rabbit_node_monitor.erl  |  257
-rw-r--r--  src/rabbit_tests.erl         |  270
-rw-r--r--  src/rabbit_upgrade.erl       |   18
9 files changed, 1299 insertions, 822 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 6d93db4c..1af93e85 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -288,105 +288,182 @@
<title>Cluster management</title>
<variablelist>
- <varlistentry id="cluster">
- <term><cmdsynopsis><command>cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
+ <varlistentry id="join_cluster">
+        <term><cmdsynopsis><command>join_cluster</command> <arg choice="req"><replaceable>clusternode</replaceable></arg><arg choice="opt">--ram</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
<term>clusternode</term>
- <listitem><para>Subset of the nodes of the cluster to which this node should be connected.</para></listitem>
+ <listitem><para>Node to cluster with.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><arg choice="opt">--ram</arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ If provided, the node will join the cluster as a RAM node.
+ </para>
+ </listitem>
</varlistentry>
</variablelist>
<para>
- Instruct the node to become member of a cluster with the
- specified nodes. To cluster with currently offline nodes,
- use <link linkend="force_cluster"><command>force_cluster</command></link>.
+ Instruct the node to become a member of the cluster that the
+ specified node is in. Before clustering, the node is reset, so be
+ careful when using this command. For this command to succeed the
+ RabbitMQ application must have been stopped, e.g. with <link
+ linkend="stop_app"><command>stop_app</command></link>.
</para>
<para>
- Cluster nodes can be of two types: disk or ram. Disk nodes
- replicate data in ram and on disk, thus providing
- redundancy in the event of node failure and recovery from
- global events such as power failure across all nodes. Ram
- nodes replicate data in ram only and are mainly used for
- scalability. A cluster must always have at least one disk node.
+ Cluster nodes can be of two types: disk or RAM. Disk nodes
+ replicate data in RAM and on disk, thus providing redundancy in
+ the event of node failure and recovery from global events such
+ as power failure across all nodes. RAM nodes replicate data in
+ RAM only (with the exception of queue contents, which can reside
+ on disc if the queue is persistent or too big to fit in memory)
+ and are mainly used for scalability. RAM nodes are more
+ performant only when managing resources (e.g. adding/removing
+ queues, exchanges, or bindings). A cluster must always have at
+ least one disk node, and usually should have more than one.
</para>
<para>
- If the current node is to become a disk node it needs to
- appear in the cluster node list. Otherwise it becomes a
- ram node. If the node list is empty or only contains the
- current node then the node becomes a standalone,
- i.e. non-clustered, (disk) node.
+ The node will be a disk node by default. If you wish to
+ create a RAM node, provide the <command>--ram</command> flag.
</para>
<para>
          After executing the <command>join_cluster</command> command, whenever
- the RabbitMQ application is started on the current node it
- will attempt to connect to the specified nodes, thus
- becoming an active node in the cluster comprising those
- nodes (and possibly others).
+ the RabbitMQ application is started on the current node it will
+ attempt to connect to the nodes that were in the cluster when the
+ node went down.
+ </para>
+ <para>
+ To leave a cluster, <command>reset</command> the node. You can
+ also remove nodes remotely with the
+ <command>forget_cluster_node</command> command.
</para>
<para>
- The list of nodes does not have to contain all the
- cluster's nodes; a subset is sufficient. Also, clustering
- generally succeeds as long as at least one of the
- specified nodes is active. Hence adjustments to the list
- are only necessary if the cluster configuration is to be
- altered radically.
+ For more details see the <ulink
+ url="http://www.rabbitmq.com/clustering.html">clustering
+ guide</ulink>.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl join_cluster hare@elena --ram</screen>
+ <para role="example">
+ This command instructs the RabbitMQ node to join the cluster that
+          <command>hare@elena</command> is part of, as a RAM node.
</para>
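+        <para role="example">
+          Since the RabbitMQ application must be stopped before clustering, a
+          full sequence might look like:
+        </para>
+        <screen role="example">rabbitmqctl stop_app
+rabbitmqctl join_cluster hare@elena --ram
+rabbitmqctl start_app</screen>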
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>cluster_status</command></cmdsynopsis></term>
+ <listitem>
<para>
- For this command to succeed the RabbitMQ application must
- have been stopped, e.g. with <link linkend="stop_app"><command>stop_app</command></link>. Furthermore,
- turning a standalone node into a clustered node requires
- the node be <link linkend="reset"><command>reset</command></link> first,
- in order to avoid accidental destruction of data with the
- <command>cluster</command> command.
+ Displays all the nodes in the cluster grouped by node type,
+ together with the currently running nodes.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl cluster_status</screen>
+ <para role="example">
+ This command displays the nodes in the cluster.
</para>
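+        <para role="example">
+          The output will resemble the following (node names are
+          illustrative):
+        </para>
+        <screen role="example">Cluster status of node rabbit@tanto ...
+[{nodes,[{disc,[hare@elena,rabbit@tanto]}]},
+ {running_nodes,[hare@elena,rabbit@tanto]}]
+...done.</screen>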
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ <variablelist>
+ <varlistentry>
+ <term>
+ <cmdsynopsis>
+ <command>change_cluster_node_type</command>
+ <arg choice="req">
+ disk | ram
+ </arg>
+ </cmdsynopsis>
+ </term>
+ <listitem>
<para>
- For more details see the <ulink url="http://www.rabbitmq.com/clustering.html">clustering guide</ulink>.
+            Changes the type of the cluster node. The RabbitMQ application
+            must be stopped for this operation to succeed, e.g. with <link
+            linkend="stop_app"><command>stop_app</command></link>, and when
+            turning a node into a RAM node the node must not be the only
+            disk node in the cluster.
</para>
<para role="example-prefix">For example:</para>
- <screen role="example">rabbitmqctl cluster rabbit@tanto hare@elena</screen>
+ <screen role="example">rabbitmqctl change_cluster_node_type disk</screen>
<para role="example">
- This command instructs the RabbitMQ node to join the
- cluster with nodes <command>rabbit@tanto</command> and
- <command>hare@elena</command>. If the node is one of these then
- it becomes a disk node, otherwise a ram node.
+ This command will turn a RAM node into a disk node.
</para>
</listitem>
</varlistentry>
- <varlistentry id="force_cluster">
- <term><cmdsynopsis><command>force_cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
+ </variablelist>
+ <variablelist>
+ <varlistentry>
+ <term>
+ <cmdsynopsis>
+ <command>forget_cluster_node</command>
+            <arg choice="opt">--offline</arg>
+            <arg choice="req"><replaceable>clusternode</replaceable></arg>
+ </cmdsynopsis>
+ </term>
<listitem>
<variablelist>
<varlistentry>
- <term>clusternode</term>
- <listitem><para>Subset of the nodes of the cluster to which this node should be connected.</para></listitem>
+              <term>clusternode</term>
+              <listitem><para>The node to remove from the cluster.</para></listitem>
+            </varlistentry>
+            <varlistentry>
+              <term><cmdsynopsis><arg choice="opt">--offline</arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Enables node removal from an offline node. This is only
+ useful in the situation where all the nodes are offline and
+ the last node to go down cannot be brought online, thus
+ preventing the whole cluster from starting. It should not be
+ used in any other circumstances since it can lead to
+ inconsistencies.
+ </para>
+ </listitem>
</varlistentry>
</variablelist>
<para>
- Instruct the node to become member of a cluster with the
- specified nodes. This will succeed even if the specified nodes
- are offline. For a more detailed description, see
- <link linkend="cluster"><command>cluster</command>.</link>
+ Removes a cluster node remotely. The node that is being removed
+ must be offline, while the node we are removing from must be
+ online, except when using the <command>--offline</command> flag.
</para>
- <para>
- Note that this variant of the cluster command just
- ignores the current status of the specified nodes.
- Clustering may still fail for a variety of other
- reasons.
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl -n hare@mcnulty forget_cluster_node rabbit@stringer</screen>
+ <para role="example">
+ This command will remove the node
+ <command>rabbit@stringer</command> from the node
+ <command>hare@mcnulty</command>.
</para>
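+        <para role="example-prefix">
+          To remove a node while the rest of the cluster is down, first stop
+          the RabbitMQ application on the node you are removing from, then
+          (node names are illustrative):
+        </para>
+        <screen role="example">rabbitmqctl -n hare@mcnulty forget_cluster_node --offline rabbit@stringer</screen>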
</listitem>
</varlistentry>
+ </variablelist>
+ <variablelist>
<varlistentry>
- <term><cmdsynopsis><command>cluster_status</command></cmdsynopsis></term>
+ <term>
+ <cmdsynopsis>
+ <command>update_cluster_nodes</command>
+ <arg choice="req">clusternode</arg>
+ </cmdsynopsis>
+ </term>
<listitem>
+ <variablelist>
+ <varlistentry>
+ <term>clusternode</term>
+ <listitem>
+ <para>
+                The node to consult for up-to-date information.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
<para>
- Displays all the nodes in the cluster grouped by node type,
- together with the currently running nodes.
+            Instructs an already clustered node to contact
+            <command>clusternode</command> when booting in order to obtain
+            up-to-date cluster information. This is different from
+            <command>join_cluster</command> since it does not join any
+            cluster; it checks that the node is already in a cluster with
+            <command>clusternode</command>.
</para>
- <para role="example-prefix">For example:</para>
- <screen role="example">rabbitmqctl cluster_status</screen>
- <para role="example">
- This command displays the nodes in the cluster.
+ <para>
+          The need for this command is motivated by the fact that clusters
+          can change while a node is offline. Consider the situation in which
+          nodes A and B are clustered. A goes down, C clusters with B, and
+          then B leaves the cluster. When A wakes up, it will try to contact
+          B, but this will fail since B is not in the cluster anymore. The
+          command shown in the example below solves this situation.
</para>
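+        <para role="example-prefix">
+          Continuing this scenario, instructing A to obtain fresh cluster
+          membership information from C (node names are illustrative):
+        </para>
+        <screen role="example">rabbitmqctl -n A update_cluster_nodes C</screen>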
</listitem>
</varlistentry>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 087c62a9..78842281 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -33,7 +33,7 @@
{default_user_tags, [administrator]},
{default_vhost, <<"/">>},
{default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
- {cluster_nodes, []},
+ {cluster_nodes, {[], true}},
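+          %% For illustration (node names are hypothetical): the new format
+          %% pairs a list of nodes to try to cluster with on first boot with
+          %% a boolean that is true for a disc node and false for a RAM node,
+          %% e.g. {cluster_nodes, {['rabbit@tanto', 'hare@elena'], true}}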
{server_properties, []},
{collect_statistics, none},
{collect_statistics_interval, 5000},
diff --git a/src/rabbit.erl b/src/rabbit.erl
index ed258c71..7a021e37 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -176,7 +176,7 @@
-rabbit_boot_step({notify_cluster,
[{description, "notify cluster nodes"},
- {mfa, {rabbit_node_monitor, notify_cluster, []}},
+ {mfa, {rabbit_node_monitor, notify_node_up, []}},
{requires, networking}]}).
%%---------------------------------------------------------------------------
@@ -300,6 +300,8 @@ start() ->
%% We do not want to HiPE compile or upgrade
%% mnesia after just restarting the app
ok = ensure_application_loaded(),
+ ok = rabbit_node_monitor:prepare_cluster_status_files(),
+ ok = rabbit_mnesia:check_cluster_consistency(),
ok = ensure_working_log_handlers(),
ok = app_utils:start_applications(app_startup_order()),
ok = print_plugin_info(rabbit_plugins:active())
@@ -309,8 +311,13 @@ boot() ->
start_it(fun() ->
ok = ensure_application_loaded(),
maybe_hipe_compile(),
+ ok = rabbit_node_monitor:prepare_cluster_status_files(),
ok = ensure_working_log_handlers(),
ok = rabbit_upgrade:maybe_upgrade_mnesia(),
+ %% It's important that the consistency check happens after
+ %% the upgrade, since if we are a secondary node the
+ %% primary node will have forgotten us
+ ok = rabbit_mnesia:check_cluster_consistency(),
Plugins = rabbit_plugins:setup(),
ToBeLoaded = Plugins ++ ?APPS,
ok = app_utils:load_applications(ToBeLoaded),
@@ -408,7 +415,6 @@ start(normal, []) ->
end.
stop(_State) ->
- ok = rabbit_mnesia:record_running_nodes(),
terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
ok = rabbit_alarm:stop(),
ok = case rabbit_mnesia:is_clustered() of
@@ -505,12 +511,12 @@ sort_boot_steps(UnsortedSteps) ->
end.
boot_step_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
+ AllNodes = rabbit_mnesia:all_clustered_nodes(),
{Err, Nodes} =
- case rabbit_mnesia:read_previously_running_nodes() of
+ case AllNodes -- [node()] of
[] -> {"Timeout contacting cluster nodes. Since RabbitMQ was"
" shut down forcefully~nit cannot determine which nodes"
- " are timing out. Details on all nodes will~nfollow.~n",
- rabbit_mnesia:all_clustered_nodes() -- [node()]};
+ " are timing out.~n", []};
Ns -> {rabbit_misc:format(
"Timeout contacting cluster nodes: ~p.~n", [Ns]),
Ns}
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 08b96757..bd01a1b1 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -25,10 +25,14 @@
-define(QUIET_OPT, "-q").
-define(NODE_OPT, "-n").
-define(VHOST_OPT, "-p").
+-define(RAM_OPT, "--ram").
+-define(OFFLINE_OPT, "--offline").
-define(QUIET_DEF, {?QUIET_OPT, flag}).
-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
-define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}).
+-define(RAM_DEF, {?RAM_OPT, flag}).
+-define(OFFLINE_DEF, {?OFFLINE_OPT, flag}).
-define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node)]).
@@ -41,8 +45,10 @@
force_reset,
rotate_logs,
- cluster,
- force_cluster,
+ {join_cluster, [?RAM_DEF]},
+ change_cluster_node_type,
+ update_cluster_nodes,
+ {forget_cluster_node, [?OFFLINE_DEF]},
cluster_status,
add_user,
@@ -239,17 +245,31 @@ action(force_reset, Node, [], _Opts, Inform) ->
Inform("Forcefully resetting node ~p", [Node]),
call(Node, {rabbit_mnesia, force_reset, []});
-action(cluster, Node, ClusterNodeSs, _Opts, Inform) ->
- ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
- Inform("Clustering node ~p with ~p",
- [Node, ClusterNodes]),
- rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]);
-
-action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
- ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
- Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)",
- [Node, ClusterNodes]),
- rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]);
+action(join_cluster, Node, [ClusterNodeS], Opts, Inform) ->
+ ClusterNode = list_to_atom(ClusterNodeS),
+ DiscNode = not proplists:get_bool(?RAM_OPT, Opts),
+ Inform("Clustering node ~p with ~p", [Node, ClusterNode]),
+ rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, DiscNode]);
+
+action(change_cluster_node_type, Node, ["ram"], _Opts, Inform) ->
+ Inform("Turning ~p into a ram node", [Node]),
+ rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [ram]);
+action(change_cluster_node_type, Node, [Type], _Opts, Inform)
+ when Type =:= "disc" orelse Type =:= "disk" ->
+ Inform("Turning ~p into a disc node", [Node]),
+ rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [disc]);
+
+action(update_cluster_nodes, Node, [ClusterNodeS], _Opts, Inform) ->
+ ClusterNode = list_to_atom(ClusterNodeS),
+ Inform("Updating cluster nodes for ~p from ~p", [Node, ClusterNode]),
+ rpc_call(Node, rabbit_mnesia, update_cluster_nodes, [ClusterNode]);
+
+action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) ->
+ ClusterNode = list_to_atom(ClusterNodeS),
+ RemoveWhenOffline = proplists:get_bool(?OFFLINE_OPT, Opts),
+ Inform("Removing node ~p from cluster", [ClusterNode]),
+ rpc_call(Node, rabbit_mnesia, forget_cluster_node,
+ [ClusterNode, RemoveWhenOffline]);
action(wait, Node, [PidFile], _Opts, Inform) ->
Inform("Waiting for ~p", [Node]),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 20f541e5..a0536a50 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -60,6 +60,8 @@
-export([multi_call/2]).
-export([os_cmd/1]).
-export([gb_sets_difference/2]).
+-export([version/0]).
+-export([sequence_error/1]).
-export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]).
%% Horrible macro to use in guards
@@ -218,6 +220,9 @@
([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
-spec(os_cmd/1 :: (string()) -> string()).
-spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()).
+-spec(version/0 :: () -> string()).
+-spec(sequence_error/1 :: ([({'error', any()} | any())])
+ -> {'error', any()} | any()).
-spec(json_encode/1 :: (any()) -> {'ok', string()} | {'error', any()}).
-spec(json_decode/1 :: (string()) -> {'ok', any()} | 'error').
-spec(json_to_term/1 :: (any()) -> any()).
@@ -940,6 +945,14 @@ os_cmd(Command) ->
gb_sets_difference(S1, S2) ->
gb_sets:fold(fun gb_sets:delete_any/2, S1, S2).
+version() ->
+ {ok, VSN} = application:get_key(rabbit, vsn),
+ VSN.
+
+sequence_error([T]) -> T;
+sequence_error([{error, _} = Error | _]) -> Error;
+sequence_error([_ | Rest]) -> sequence_error(Rest).
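+%% For example (illustrative):
+%%   sequence_error([ok, {error, enoent}, ok]) -> {error, enoent}
+%%   sequence_error([ok, ok])                  -> ok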
+
json_encode(Term) ->
try
{ok, mochijson2:encode(Term)}
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 61b4054a..e1a68fc7 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -17,16 +17,42 @@
-module(rabbit_mnesia).
--export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
- cluster/1, force_cluster/1, reset/0, force_reset/0, init_db/3,
- is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0,
- empty_ram_only_tables/0, copy_db/1, wait_for_tables/1,
- create_cluster_nodes_config/1, read_cluster_nodes_config/0,
- record_running_nodes/0, read_previously_running_nodes/0,
- running_nodes_filename/0, is_disc_node/0, on_node_down/1,
- on_node_up/1]).
-
--export([table_names/0]).
+-export([init/0,
+ join_cluster/2,
+ reset/0,
+ force_reset/0,
+ update_cluster_nodes/1,
+ change_cluster_node_type/1,
+ forget_cluster_node/2,
+
+ status/0,
+ is_db_empty/0,
+ is_clustered/0,
+ all_clustered_nodes/0,
+ clustered_disc_nodes/0,
+ running_clustered_nodes/0,
+ is_disc_node/0,
+ dir/0,
+ table_names/0,
+ wait_for_tables/1,
+ cluster_status_from_mnesia/0,
+
+ init_db/3,
+ empty_ram_only_tables/0,
+ copy_db/1,
+ wait_for_tables/0,
+ check_cluster_consistency/0,
+ ensure_mnesia_dir/0,
+
+ on_node_up/1,
+ on_node_down/1
+ ]).
+
+%% Used internally in rpc calls
+-export([node_info/0,
+ remove_node_if_mnesia_running/1,
+ is_running_remote/0
+ ]).
%% create_tables/0 exported for helping embed RabbitMQ in or alongside
%% other mnesia-using Erlang applications, such as ejabberd
@@ -38,147 +64,144 @@
-ifdef(use_specs).
--export_type([node_type/0]).
+-export_type([node_type/0, cluster_status/0]).
--type(node_type() :: disc_only | disc | ram | unknown).
--spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
- {'running_nodes', [node()]}]).
--spec(dir/0 :: () -> file:filename()).
--spec(ensure_mnesia_dir/0 :: () -> 'ok').
+-type(node_type() :: disc | ram).
+-type(cluster_status() :: {ordsets:ordset(node()), ordsets:ordset(node()),
+ ordsets:ordset(node())}).
+
+%% Main interface
-spec(init/0 :: () -> 'ok').
--spec(init_db/3 :: ([node()], boolean(), rabbit_misc:thunk('ok')) -> 'ok').
--spec(is_db_empty/0 :: () -> boolean()).
--spec(cluster/1 :: ([node()]) -> 'ok').
--spec(force_cluster/1 :: ([node()]) -> 'ok').
--spec(cluster/2 :: ([node()], boolean()) -> 'ok').
+-spec(join_cluster/2 :: ([node()], boolean()) -> 'ok').
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
+-spec(update_cluster_nodes/1 :: (node()) -> 'ok').
+-spec(change_cluster_node_type/1 :: (node_type()) -> 'ok').
+-spec(forget_cluster_node/2 :: (node(), boolean()) -> 'ok').
+
+%% Various queries to get the status of the db
+-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
+ {'running_nodes', [node()]}]).
+-spec(is_db_empty/0 :: () -> boolean()).
-spec(is_clustered/0 :: () -> boolean()).
--spec(running_clustered_nodes/0 :: () -> [node()]).
-spec(all_clustered_nodes/0 :: () -> [node()]).
+-spec(clustered_disc_nodes/0 :: () -> [node()]).
+-spec(running_clustered_nodes/0 :: () -> [node()]).
+-spec(is_disc_node/0 :: () -> boolean()).
+-spec(dir/0 :: () -> file:filename()).
+-spec(table_names/0 :: () -> [atom()]).
+-spec(cluster_status_from_mnesia/0 :: () -> {'ok', cluster_status()} |
+ {'error', any()}).
+
+%% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit'
+-spec(init_db/3 :: ([node()], boolean(), boolean()) -> 'ok').
-spec(empty_ram_only_tables/0 :: () -> 'ok').
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
-spec(wait_for_tables/1 :: ([atom()]) -> 'ok').
--spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok').
--spec(read_cluster_nodes_config/0 :: () -> [node()]).
--spec(record_running_nodes/0 :: () -> 'ok').
--spec(read_previously_running_nodes/0 :: () -> [node()]).
--spec(running_nodes_filename/0 :: () -> file:filename()).
--spec(is_disc_node/0 :: () -> boolean()).
+-spec(check_cluster_consistency/0 :: () -> 'ok').
+-spec(ensure_mnesia_dir/0 :: () -> 'ok').
+
+%% Hooks used in `rabbit_node_monitor'
-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
--spec(table_names/0 :: () -> [atom()]).
+%% Functions used in internal rpc calls
+-spec(node_info/0 :: () -> {string(), string(),
+ ({'ok', cluster_status()} | 'error')}).
+-spec(remove_node_if_mnesia_running/1 :: (node()) -> 'ok' |
+ {'error', term()}).
-endif.
%%----------------------------------------------------------------------------
-
-status() ->
- [{nodes, case mnesia:system_info(is_running) of
- yes -> [{Key, Nodes} ||
- {Key, CopyType} <- [{disc_only, disc_only_copies},
- {disc, disc_copies},
- {ram, ram_copies}],
- begin
- Nodes = nodes_of_type(CopyType),
- Nodes =/= []
- end];
- no -> case all_clustered_nodes() of
- [] -> [];
- Nodes -> [{unknown, Nodes}]
- end;
- Reason when Reason =:= starting; Reason =:= stopping ->
- exit({rabbit_busy, try_again_later})
- end},
- {running_nodes, running_clustered_nodes()}].
+%% Main interface
+%%----------------------------------------------------------------------------
init() ->
ensure_mnesia_running(),
ensure_mnesia_dir(),
- Nodes = read_cluster_nodes_config(),
- ok = init_db(Nodes, should_be_disc_node(Nodes)),
+ case is_virgin_node() of
+ true -> init_from_config();
+ false -> normal_init(is_disc_node(), all_clustered_nodes())
+ end,
%% We intuitively expect the global name server to be synced when
%% Mnesia is up. In fact that's not guaranteed to be the case - let's
%% make it so.
ok = global:sync(),
- ok = delete_previously_running_nodes(),
ok.
-is_db_empty() ->
- lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
- table_names()).
+normal_init(DiscNode, AllNodes) ->
+ init_db_and_upgrade(AllNodes, DiscNode, DiscNode).
+
+init_from_config() ->
+ {ok, {TryNodes, DiscNode}} =
+ application:get_env(rabbit, cluster_nodes),
+ case find_good_node(TryNodes -- [node()]) of
+ {ok, Node} ->
+ rabbit_log:info("Node '~p' selected for clustering from "
+ "configuration~n", [Node]),
+ {ok, {_, DiscNodes, _}} = discover_cluster(Node),
+ init_db_and_upgrade(DiscNodes, DiscNode, false),
+ rabbit_node_monitor:notify_joined_cluster();
+ none ->
+ rabbit_log:warning("Could not find any suitable node amongst the "
+ "ones provided in the configuration: ~p~n",
+ [TryNodes]),
+ normal_init(true, [node()])
+ end.
+
+%% Make the node join a cluster. The node will be reset automatically before we
+%% actually cluster it. The nodes provided will be used to find out about the
+%% nodes in the cluster.
+%% This function will fail if:
+%%
+%% * The node is currently the only disc node of its cluster
+%% * We can't connect to any of the nodes provided
+%% * The node is already clustered with the cluster of the nodes provided
+%%
+%% Note that we make no attempt to verify that the nodes provided are all in the
+%% same cluster, we simply pick the first online node and we cluster to its
+%% cluster.
+join_cluster(DiscoveryNode, WantDiscNode) ->
+ case is_disc_and_clustered() andalso [node()] =:= clustered_disc_nodes() of
+ true -> throw({error,
+ {standalone_ram_node,
+ "You can't cluster a node if it's the only "
+ "disc node in its existing cluster. If new nodes "
+ "joined while this node was offline, use "
+ "\"update_cluster_nodes\" to add them manually"}});
+ _ -> ok
+ end,
-cluster(ClusterNodes) ->
- cluster(ClusterNodes, false).
-force_cluster(ClusterNodes) ->
- cluster(ClusterNodes, true).
-
-%% Alter which disk nodes this node is clustered with. This can be a
-%% subset of all the disk nodes in the cluster but can (and should)
-%% include the node itself if it is to be a disk rather than a ram
-%% node. If Force is false, only connections to online nodes are
-%% allowed.
-cluster(ClusterNodes, Force) ->
- rabbit_misc:local_info_msg("Clustering with ~p~s~n",
- [ClusterNodes, if Force -> " forcefully";
- true -> ""
- end]),
ensure_mnesia_not_running(),
ensure_mnesia_dir(),
- case not Force andalso is_clustered() andalso
- is_only_disc_node(node(), false) andalso
- not should_be_disc_node(ClusterNodes)
- of
- true -> log_both("last running disc node leaving cluster");
- _ -> ok
- end,
+ {ClusterNodes, _, _} = case discover_cluster(DiscoveryNode) of
+ {ok, Res} -> Res;
+ E = {error, _} -> throw(E)
+ end,
- %% Wipe mnesia if we're changing type from disc to ram
- case {is_disc_node(), should_be_disc_node(ClusterNodes)} of
- {true, false} -> rabbit_misc:with_local_io(
- fun () -> error_logger:warning_msg(
- "changing node type; wiping "
- "mnesia...~n~n")
- end),
- rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
- cannot_delete_schema);
- _ -> ok
+ case lists:member(node(), ClusterNodes) of
+ true -> throw({error, {already_clustered,
+ "You are already clustered with the nodes you "
+ "have selected"}});
+ false -> ok
end,
- %% Pre-emptively leave the cluster
- %%
- %% We're trying to handle the following two cases:
- %% 1. We have a two-node cluster, where both nodes are disc nodes.
- %% One node is re-clustered as a ram node. When it tries to
- %% re-join the cluster, but before it has time to update its
- %% tables definitions, the other node will order it to re-create
- %% its disc tables. So, we need to leave the cluster before we
- %% can join it again.
- %% 2. We have a two-node cluster, where both nodes are disc nodes.
- %% One node is forcefully reset (so, the other node thinks its
- %% still a part of the cluster). The reset node is re-clustered
- %% as a ram node. Same as above, we need to leave the cluster
- %% before we can join it. But, since we don't know if we're in a
- %% cluster or not, we just pre-emptively leave it before joining.
- ProperClusterNodes = ClusterNodes -- [node()],
- try
- ok = leave_cluster(ProperClusterNodes, ProperClusterNodes)
- catch
- {error, {no_running_cluster_nodes, _, _}} when Force ->
- ok
- end,
+    %% Reset the node. This simplifies things and it will be needed in this
+    %% case: we're joining a new cluster with new nodes which are not in sync
+    %% with the current node. It also lifts the burden of resetting the node
+    %% from the user.
+ reset(false),
+
+ rabbit_misc:local_info_msg("Clustering with ~p~n", [ClusterNodes]),
%% Join the cluster
- start_mnesia(),
- try
- ok = init_db(ClusterNodes, Force),
- ok = create_cluster_nodes_config(ClusterNodes)
- after
- stop_mnesia()
- end,
+ ok = init_db_with_mnesia(ClusterNodes, WantDiscNode, false),
+
+ rabbit_node_monitor:notify_joined_cluster(),
ok.
@@ -188,15 +211,446 @@ cluster(ClusterNodes, Force) ->
reset() -> reset(false).
force_reset() -> reset(true).
+reset(Force) ->
+ rabbit_misc:local_info_msg("Resetting Rabbit~s~n",
+ [if Force -> " forcefully";
+ true -> ""
+ end]),
+ ensure_mnesia_not_running(),
+ Node = node(),
+ case Force of
+ true ->
+ disconnect_nodes(nodes());
+ false ->
+ AllNodes = all_clustered_nodes(),
+            %% Reconnecting so that we will get an up-to-date list of nodes.
+ %% We don't need to check for consistency because we are resetting.
+ %% Force=true here so that reset still works when clustered with a
+ %% node which is down.
+ init_db_with_mnesia(AllNodes, is_disc_node(), false, true),
+ case is_disc_and_clustered() andalso
+ [node()] =:= clustered_disc_nodes()
+ of
+ true -> throw({error, {standalone_ram_node,
+ "You can't reset a node if it's the "
+ "only disc node in a cluster. Please "
+ "convert another node of the cluster "
+ "to a disc node first."}});
+ false -> ok
+ end,
+ leave_cluster(),
+ rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
+ cannot_delete_schema),
+ disconnect_nodes(all_clustered_nodes()),
+ ok
+ end,
+ %% remove persisted messages and any other garbage we find
+ ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
+ ok = rabbit_node_monitor:reset_cluster_status(),
+ ok.
+
+%% We need to make sure that we don't end up in a distributed Erlang system with
+%% nodes while not being in an Mnesia cluster with them. We don't handle that
+%% well.
+disconnect_nodes(Nodes) -> [erlang:disconnect_node(N) || N <- Nodes].
+
+change_cluster_node_type(Type) ->
+ ensure_mnesia_dir(),
+ ensure_mnesia_not_running(),
+ case is_clustered() of
+ false -> throw({error, {not_clustered,
+ "Non-clustered nodes can only be disc nodes"}});
+ true -> ok
+ end,
+ {_, _, RunningNodes} =
+ case discover_cluster(all_clustered_nodes()) of
+ {ok, Status} ->
+ Status;
+ {error, _Reason} ->
+ throw({error,
+ {cannot_connect_to_cluster,
+ "Could not connect to the cluster nodes present in "
+ "this node status file. If the cluster has changed, "
+ "you can use the \"update_cluster_nodes\" command to "
+ "point to the new cluster nodes"}})
+ end,
+ Node = case RunningNodes of
+ [] ->
+ throw({error,
+ {no_online_cluster_nodes,
+ "Could not find any online cluster nodes. If the "
+                           "cluster has changed, you can use the "
+                           "\"update_cluster_nodes\" command."}});
+ [Node0|_] ->
+ Node0
+ end,
+ ok = reset(false),
+ ok = join_cluster(Node, case Type of
+ ram -> false;
+ disc -> true
+ end).
+
+update_cluster_nodes(DiscoveryNode) ->
+ ensure_mnesia_not_running(),
+ ensure_mnesia_dir(),
+
+ Status = {AllNodes, _, _} =
+ case discover_cluster(DiscoveryNode) of
+ {ok, Status0} ->
+ Status0;
+ {error, _Reason} ->
+ throw({error,
+ {cannot_connect_to_node,
+ "Could not connect to the cluster node provided"}})
+ end,
+ case ordsets:is_element(node(), AllNodes) of
+        true -> %% As in `check_cluster_consistency/0', we can safely delete the schema
+ %% here, since it'll be replicated from the other nodes
+ mnesia:delete_schema([node()]),
+ rabbit_node_monitor:write_cluster_status(Status),
+ init_db_with_mnesia(AllNodes, is_disc_node(), false);
+ false -> throw({error,
+ {inconsistent_cluster,
+ "The nodes provided do not have this node as part of "
+ "the cluster"}})
+ end,
+
+ ok.
+
+%% We proceed like this: try to remove the node locally. If the node is offline,
+%% we remove the node if:
+%% * This node is a disc node
+%% * All other nodes are offline
+%% * This node was, to the best of our knowledge (see comment below), the
+%%   last node, or the second-to-last after the node we're removing, to go down
+forget_cluster_node(Node, RemoveWhenOffline) ->
+ case ordsets:is_element(Node, all_clustered_nodes()) of
+ true -> ok;
+ false -> throw({error, {not_a_cluster_node,
+ "The node selected is not in the cluster."}})
+ end,
+ case {mnesia:system_info(is_running), RemoveWhenOffline} of
+ {yes, true} -> throw({error, {online_node_offline_flag,
+ "You set the --offline flag, which is "
+ "used to remove nodes remotely from "
+ "offline nodes, but this node is "
+                                      "online."}});
+ _ -> ok
+ end,
+ case remove_node_if_mnesia_running(Node) of
+ ok ->
+ ok;
+ {error, mnesia_not_running} ->
+ case RemoveWhenOffline of
+ true -> remove_node_offline_node(Node);
+ false -> throw({error,
+ {offline_node_no_offline_flag,
+ "You are trying to remove a node from an "
+ "offline node. That's dangerous, but can be "
+ "done with the --offline flag. Please consult "
+ "the manual for rabbitmqctl for more "
+ "information."}})
+ end;
+ Err = {error, _} ->
+ throw(Err)
+ end.
+
+remove_node_offline_node(Node) ->
+ case {ordsets:del_element(Node,
+ running_nodes(all_clustered_nodes())),
+ is_disc_node()}
+ of
+ {[], true} ->
+            %% Note that while we check if the node was the last to go down,
+ %% apart from the node we're removing from, this is still unsafe.
+ %% Consider the situation in which A and B are clustered. A goes
+ %% down, and records B as the running node. Then B gets clustered
+ %% with C, C goes down and B goes down. In this case, C is the
+ %% second-to-last, but we don't know that and we'll remove B from A
+ %% anyway, even if that will lead to bad things.
+ case ordsets:subtract(running_clustered_nodes(),
+ ordsets:from_list([node(), Node]))
+ of
+ [] -> start_mnesia(),
+ try
+ [mnesia:force_load_table(T) ||
+ T <- rabbit_mnesia:table_names()],
+ forget_cluster_node(Node, false),
+ ensure_mnesia_running()
+ after
+ stop_mnesia()
+ end;
+ _ -> throw({error,
+ {not_last_node_to_go_down,
+ "The node you're trying to remove from was not "
+ "the last to go down (excluding the node you are "
+                         "removing). Please use the last node to go "
+ "down to remove nodes when the cluster is "
+ "offline."}})
+ end;
+ {_, _} ->
+ throw({error,
+ {removing_node_from_offline_node,
+ "To remove a node remotely from an offline node, the node "
+ "you're removing from must be a disc node and all the "
+ "other nodes must be offline."}})
+ end.
+
+
+%%----------------------------------------------------------------------------
+%% Queries
+%%----------------------------------------------------------------------------
+
+status() ->
+ IfNonEmpty = fun (_, []) -> [];
+ (Type, Nodes) -> [{Type, Nodes}]
+ end,
+ [{nodes, (IfNonEmpty(disc, clustered_disc_nodes()) ++
+ IfNonEmpty(ram, clustered_ram_nodes()))}] ++
+ case mnesia:system_info(is_running) of
+ yes -> [{running_nodes, running_clustered_nodes()}];
+ no -> []
+ end.
+
+is_db_empty() ->
+ lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
+ table_names()).
+
is_clustered() ->
- RunningNodes = running_clustered_nodes(),
- [node()] /= RunningNodes andalso [] /= RunningNodes.
+ Nodes = all_clustered_nodes(),
+ [node()] =/= Nodes andalso [] =/= Nodes.
+
+is_disc_and_clustered() ->
+ is_disc_node() andalso is_clustered().
+
+%% Functions that retrieve the nodes in the cluster will rely on the status
+%% file if mnesia is not running.
all_clustered_nodes() ->
- mnesia:system_info(db_nodes).
+ cluster_status(all).
+
+clustered_disc_nodes() ->
+ cluster_status(disc).
+
+clustered_ram_nodes() ->
+ ordsets:subtract(cluster_status(all), cluster_status(disc)).
running_clustered_nodes() ->
- mnesia:system_info(running_db_nodes).
+ cluster_status(running).
+
+running_clustered_disc_nodes() ->
+ {_, DiscNodes, RunningNodes} = cluster_status(),
+ ordsets:intersection(DiscNodes, RunningNodes).
+
+%% This function is the actual source of information, since it gets the data
+%% from mnesia. Obviously it'll work only when mnesia is running.
+mnesia_nodes() ->
+ case mnesia:system_info(is_running) of
+ no -> {error, mnesia_not_running};
+ yes -> %% If the tables are not present, it means that `init_db/3'
+ %% hasn't been run yet. In other words, either we are a virgin
+ %% node or a restarted RAM node. In both cases we're not
+ %% interested in what mnesia has to say.
+ IsDiscNode = mnesia:system_info(use_dir),
+ Tables = mnesia:system_info(tables),
+ {Table, _} = case table_definitions(case IsDiscNode of
+ true -> disc;
+ false -> ram
+ end) of [T|_] -> T end,
+ case lists:member(Table, Tables) of
+ true ->
+ AllNodes =
+ ordsets:from_list(mnesia:system_info(db_nodes)),
+ DiscCopies = ordsets:from_list(
+ mnesia:table_info(schema, disc_copies)),
+ DiscNodes =
+ case IsDiscNode of
+ true -> ordsets:add_element(node(), DiscCopies);
+ false -> DiscCopies
+ end,
+ {ok, {AllNodes, DiscNodes}};
+ false ->
+ {error, tables_not_present}
+ end
+ end.
+
+cluster_status(WhichNodes, ForceMnesia) ->
+    %% I don't want to call `running_nodes/1' unless necessary, since it can
+ %% deadlock when stopping applications.
+ Nodes = case mnesia_nodes() of
+ {ok, {AllNodes, DiscNodes}} ->
+ {ok, {AllNodes, DiscNodes,
+ fun() -> running_nodes(AllNodes) end}};
+ {error, _Reason} when not ForceMnesia ->
+ {AllNodes, DiscNodes, RunningNodes} =
+ rabbit_node_monitor:read_cluster_status(),
+ %% The cluster status file records the status when the node
+ %% is online, but we know for sure that the node is offline
+ %% now, so we can remove it from the list of running nodes.
+ {ok,
+ {AllNodes, DiscNodes,
+ fun() -> ordsets:del_element(node(), RunningNodes) end}};
+ Err = {error, _} ->
+ Err
+ end,
+ case Nodes of
+ {ok, {AllNodes1, DiscNodes1, RunningNodesThunk}} ->
+ {ok, case WhichNodes of
+ status -> {AllNodes1, DiscNodes1, RunningNodesThunk()};
+ all -> AllNodes1;
+ disc -> DiscNodes1;
+ running -> RunningNodesThunk()
+ end};
+ Err1 = {error, _} ->
+ Err1
+ end.
+
+cluster_status(WhichNodes) ->
+ {ok, Status} = cluster_status(WhichNodes, false),
+ Status.
+
+cluster_status() ->
+ cluster_status(status).
+
+cluster_status_from_mnesia() ->
+ cluster_status(status, true).
+
+node_info() ->
+ {erlang:system_info(otp_release), rabbit_misc:version(),
+ cluster_status_from_mnesia()}.
+
+is_disc_node() ->
+ DiscNodes = clustered_disc_nodes(),
+ DiscNodes =:= [] orelse ordsets:is_element(node(), DiscNodes).
+
+dir() -> mnesia:system_info(directory).
+
+table_names() ->
+ [Tab || {Tab, _} <- table_definitions()].
+
+%%----------------------------------------------------------------------------
+%% Operations on the db
+%%----------------------------------------------------------------------------
+
+%% Adds the provided nodes to the mnesia cluster, creating a new schema if
+%% needed and catching up if there are other nodes in the cluster already. It
+%% also updates the cluster status file.
+init_db(ClusterNodes, WantDiscNode, Force) ->
+ Nodes = change_extra_db_nodes(ClusterNodes, Force),
+ %% Note that we use `system_info' here and not the cluster status since when
+ %% we start rabbit for the first time the cluster status will say we are a
+ %% disc node but the tables won't be present yet.
+ WasDiscNode = mnesia:system_info(use_dir),
+ case {Nodes, WasDiscNode, WantDiscNode} of
+ {[], _, false} ->
+ %% Standalone ram node, we don't want that
+ throw({error, cannot_create_standalone_ram_node});
+ {[], false, true} ->
+ %% RAM -> disc, starting from scratch
+ ok = create_schema();
+ {[], true, true} ->
+ %% First disc node up
+ ok;
+ {[AnotherNode | _], _, _} ->
+ %% Subsequent node in cluster, catch up
+ ensure_version_ok(
+ rpc:call(AnotherNode, rabbit_version, recorded, [])),
+ ok = wait_for_replicated_tables(),
+ %% The sequence in which we delete the schema and then the other
+ %% tables is important: if we delete the schema first when moving to
+ %% RAM mnesia will loudly complain since it doesn't make much sense
+ %% to do that. But when moving to disc, we need to move the schema
+ %% first.
+ case WantDiscNode of
+ true -> create_local_table_copy(schema, disc_copies),
+ create_local_table_copies(disc);
+ false -> create_local_table_copies(ram),
+ create_local_table_copy(schema, ram_copies)
+ end
+ end,
+ ensure_schema_integrity(),
+ rabbit_node_monitor:update_cluster_status(),
+ ok.
+
+init_db_and_upgrade(ClusterNodes, WantDiscNode, Force) ->
+ ok = init_db(ClusterNodes, WantDiscNode, Force),
+ ok = case rabbit_upgrade:maybe_upgrade_local() of
+ ok -> ok;
+ starting_from_scratch -> rabbit_version:record_desired();
+ version_not_available -> schema_ok_or_move()
+ end,
+ %% `maybe_upgrade_local' restarts mnesia, so ram nodes will forget about the
+ %% cluster
+ case WantDiscNode of
+ false -> start_mnesia(),
+ change_extra_db_nodes(ClusterNodes, true),
+ wait_for_replicated_tables();
+ true -> ok
+ end,
+ ok.
+
+init_db_with_mnesia(ClusterNodes, WantDiscNode, CheckConsistency, Force) ->
+ start_mnesia(CheckConsistency),
+ try
+ init_db_and_upgrade(ClusterNodes, WantDiscNode, Force)
+ after
+ stop_mnesia()
+ end.
+
+init_db_with_mnesia(ClusterNodes, WantDiscNode, Force) ->
+ init_db_with_mnesia(ClusterNodes, WantDiscNode, true, Force).
+
+ensure_mnesia_dir() ->
+ MnesiaDir = dir() ++ "/",
+ case filelib:ensure_dir(MnesiaDir) of
+ {error, Reason} ->
+ throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}});
+ ok ->
+ ok
+ end.
+
+ensure_mnesia_running() ->
+ case mnesia:system_info(is_running) of
+ yes ->
+ ok;
+ starting ->
+ wait_for(mnesia_running),
+ ensure_mnesia_running();
+ Reason when Reason =:= no; Reason =:= stopping ->
+ throw({error, mnesia_not_running})
+ end.
+
+ensure_mnesia_not_running() ->
+ case mnesia:system_info(is_running) of
+ no ->
+ ok;
+ stopping ->
+ wait_for(mnesia_not_running),
+ ensure_mnesia_not_running();
+ Reason when Reason =:= yes; Reason =:= starting ->
+ throw({error, mnesia_unexpectedly_running})
+ end.
+
+ensure_schema_integrity() ->
+ case check_schema_integrity() of
+ ok ->
+ ok;
+ {error, Reason} ->
+ throw({error, {schema_integrity_check_failed, Reason}})
+ end.
+
+check_schema_integrity() ->
+ Tables = mnesia:system_info(tables),
+ case check_tables(fun (Tab, TabDef) ->
+ case lists:member(Tab, Tables) of
+ false -> {error, {table_missing, Tab}};
+ true -> check_table_attributes(Tab, TabDef)
+ end
+ end) of
+ ok -> ok = wait_for_tables(),
+ check_tables(fun check_table_content/2);
+ Other -> Other
+ end.
empty_ram_only_tables() ->
Node = node(),
@@ -209,13 +663,125 @@ empty_ram_only_tables() ->
end, table_names()),
ok.
+create_tables() -> create_tables(disc).
+
+create_tables(Type) ->
+ lists:foreach(fun ({Tab, TabDef}) ->
+ TabDef1 = proplists:delete(match, TabDef),
+ case mnesia:create_table(Tab, TabDef1) of
+ {atomic, ok} -> ok;
+ {aborted, Reason} ->
+ throw({error, {table_creation_failed,
+ Tab, TabDef1, Reason}})
+ end
+ end,
+ table_definitions(Type)),
+ ok.
+
+copy_db(Destination) ->
+ ok = ensure_mnesia_not_running(),
+ rabbit_file:recursive_copy(dir(), Destination).
+
+wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
+
+wait_for_tables() -> wait_for_tables(table_names()).
+
+wait_for_tables(TableNames) ->
+ case mnesia:wait_for_tables(TableNames, 30000) of
+ ok ->
+ ok;
+ {timeout, BadTabs} ->
+ throw({error, {timeout_waiting_for_tables, BadTabs}});
+ {error, Reason} ->
+ throw({error, {failed_waiting_for_tables, Reason}})
+ end.
+
+%% This does not guarantee us much, but it avoids some situations that will
+%% definitely end up badly
+check_cluster_consistency() ->
+ %% We want to find 0 or 1 consistent nodes.
+ case lists:foldl(
+ fun (Node, {error, _}) -> check_cluster_consistency(Node);
+ (_Node, {ok, Status}) -> {ok, Status}
+ end, {error, not_found},
+ ordsets:del_element(node(), all_clustered_nodes()))
+ of
+ {ok, Status = {RemoteAllNodes, _, _}} ->
+ case ordsets:is_subset(all_clustered_nodes(), RemoteAllNodes) of
+ true -> ok;
+ false -> %% We delete the schema here since we think we are
+ %% clustered with nodes that are no longer in the
+ %% cluster and there is no other way to remove them
+ %% from our schema. On the other hand, we are sure
+ %% that there is another online node that we can use
+ %% to sync the tables with. There is a race here: if
+ %% between this check and the `init_db' invocation the
+ %% cluster gets disbanded, we're left with a node with
+ %% no mnesia data that will try to connect to offline
+ %% nodes.
+ mnesia:delete_schema([node()])
+ end,
+ rabbit_node_monitor:write_cluster_status(Status);
+ {error, not_found} ->
+ ok;
+ E = {error, _} ->
+ throw(E)
+ end.
+
+check_cluster_consistency(Node) ->
+ case rpc:call(Node, rabbit_mnesia, node_info, []) of
+ {badrpc, _Reason} ->
+ {error, not_found};
+ {_OTP, _Rabbit, {error, _}} ->
+ {error, not_found};
+ {OTP, Rabbit, {ok, Status}} ->
+ case check_consistency(OTP, Rabbit, Node, Status) of
+ E = {error, _} -> E;
+ {ok, Res} -> {ok, Res}
+ end
+ end.
+
+%%--------------------------------------------------------------------
+%% Hooks for `rabbit_node_monitor'
%%--------------------------------------------------------------------
-nodes_of_type(Type) ->
- %% This function should return the nodes of a certain type (ram,
- %% disc or disc_only) in the current cluster. The type of nodes
- %% is determined when the cluster is initially configured.
- mnesia:table_info(schema, Type).
+on_node_up(Node) ->
+ case running_clustered_disc_nodes() =:= [Node] of
+ true -> rabbit_log:info("cluster contains disc nodes again~n");
+ false -> ok
+ end.
+
+on_node_down(_Node) ->
+ case running_clustered_disc_nodes() =:= [] of
+ true -> rabbit_log:info("only running disc node went down~n");
+ false -> ok
+ end.
+
+%%--------------------------------------------------------------------
+%% Internal helpers
+%%--------------------------------------------------------------------
+
+discover_cluster(Nodes) when is_list(Nodes) ->
+ lists:foldl(fun (_, {ok, Res}) -> {ok, Res};
+ (Node, {error, _}) -> discover_cluster(Node)
+ end,
+ {error, no_nodes_provided},
+ Nodes);
+discover_cluster(Node) ->
+ OfflineError =
+        {error, {cannot_discover_cluster,
+                 "The node provided is either offline or not running"}},
+ case Node =:= node() of
+ true ->
+            {error, {cannot_discover_cluster,
+             "You provided the current node as the node to cluster with"}};
+ false ->
+ case rpc:call(Node, rabbit_mnesia, cluster_status_from_mnesia, []) of
+ {badrpc, _Reason} -> OfflineError;
+ {error, mnesia_not_running} -> OfflineError;
+ {ok, Res} -> {ok, Res}
+ end
+ end.
%% The tables aren't supposed to be on disk on a ram node
table_definitions(disc) ->
@@ -336,68 +902,11 @@ queue_name_match() ->
resource_match(Kind) ->
#resource{kind = Kind, _='_'}.
-table_names() ->
- [Tab || {Tab, _} <- table_definitions()].
-
replicated_table_names() ->
[Tab || {Tab, TabDef} <- table_definitions(),
not lists:member({local_content, true}, TabDef)
].
-dir() -> mnesia:system_info(directory).
-
-ensure_mnesia_dir() ->
- MnesiaDir = dir() ++ "/",
- case filelib:ensure_dir(MnesiaDir) of
- {error, Reason} ->
- throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}});
- ok ->
- ok
- end.
-
-ensure_mnesia_running() ->
- case mnesia:system_info(is_running) of
- yes ->
- ok;
- starting ->
- wait_for(mnesia_running),
- ensure_mnesia_running();
- Reason when Reason =:= no; Reason =:= stopping ->
- throw({error, mnesia_not_running})
- end.
-
-ensure_mnesia_not_running() ->
- case mnesia:system_info(is_running) of
- no ->
- ok;
- stopping ->
- wait_for(mnesia_not_running),
- ensure_mnesia_not_running();
- Reason when Reason =:= yes; Reason =:= starting ->
- throw({error, mnesia_unexpectedly_running})
- end.
-
-ensure_schema_integrity() ->
- case check_schema_integrity() of
- ok ->
- ok;
- {error, Reason} ->
- throw({error, {schema_integrity_check_failed, Reason}})
- end.
-
-check_schema_integrity() ->
- Tables = mnesia:system_info(tables),
- case check_tables(fun (Tab, TabDef) ->
- case lists:member(Tab, Tables) of
- false -> {error, {table_missing, Tab}};
- true -> check_table_attributes(Tab, TabDef)
- end
- end) of
- ok -> ok = wait_for_tables(),
- check_tables(fun check_table_content/2);
- Other -> Other
- end.
-
check_table_attributes(Tab, TabDef) ->
{_, ExpAttrs} = proplists:lookup(attributes, TabDef),
case mnesia:table_info(Tab, attributes) of
@@ -433,153 +942,6 @@ check_tables(Fun) ->
Errors -> {error, Errors}
end.
-%% The cluster node config file contains some or all of the disk nodes
-%% that are members of the cluster this node is / should be a part of.
-%%
-%% If the file is absent, the list is empty, or only contains the
-%% current node, then the current node is a standalone (disk)
-%% node. Otherwise it is a node that is part of a cluster as either a
-%% disk node, if it appears in the cluster node config, or ram node if
-%% it doesn't.
-
-cluster_nodes_config_filename() ->
- dir() ++ "/cluster_nodes.config".
-
-create_cluster_nodes_config(ClusterNodes) ->
- FileName = cluster_nodes_config_filename(),
- case rabbit_file:write_term_file(FileName, [ClusterNodes]) of
- ok -> ok;
- {error, Reason} ->
- throw({error, {cannot_create_cluster_nodes_config,
- FileName, Reason}})
- end.
-
-read_cluster_nodes_config() ->
- FileName = cluster_nodes_config_filename(),
- case rabbit_file:read_term_file(FileName) of
- {ok, [ClusterNodes]} -> ClusterNodes;
- {error, enoent} ->
- {ok, ClusterNodes} = application:get_env(rabbit, cluster_nodes),
- ClusterNodes;
- {error, Reason} ->
- throw({error, {cannot_read_cluster_nodes_config,
- FileName, Reason}})
- end.
-
-delete_cluster_nodes_config() ->
- FileName = cluster_nodes_config_filename(),
- case file:delete(FileName) of
- ok -> ok;
- {error, enoent} -> ok;
- {error, Reason} ->
- throw({error, {cannot_delete_cluster_nodes_config,
- FileName, Reason}})
- end.
-
-running_nodes_filename() ->
- filename:join(dir(), "nodes_running_at_shutdown").
-
-record_running_nodes() ->
- FileName = running_nodes_filename(),
- Nodes = running_clustered_nodes() -- [node()],
- %% Don't check the result: we're shutting down anyway and this is
- %% a best-effort-basis.
- rabbit_file:write_term_file(FileName, [Nodes]),
- ok.
-
-read_previously_running_nodes() ->
- FileName = running_nodes_filename(),
- case rabbit_file:read_term_file(FileName) of
- {ok, [Nodes]} -> Nodes;
- {error, enoent} -> [];
- {error, Reason} -> throw({error, {cannot_read_previous_nodes_file,
- FileName, Reason}})
- end.
-
-delete_previously_running_nodes() ->
- FileName = running_nodes_filename(),
- case file:delete(FileName) of
- ok -> ok;
- {error, enoent} -> ok;
- {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file,
- FileName, Reason}})
- end.
-
-init_db(ClusterNodes, Force) ->
- init_db(
- ClusterNodes, Force,
- fun () ->
- case rabbit_upgrade:maybe_upgrade_local() of
- ok -> ok;
- %% If we're just starting up a new node we won't have a
- %% version
- starting_from_scratch -> ok = rabbit_version:record_desired()
- end
- end).
-
-%% Take a cluster node config and create the right kind of node - a
-%% standalone disk node, or disk or ram node connected to the
-%% specified cluster nodes. If Force is false, don't allow
-%% connections to offline nodes.
-init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) ->
- UClusterNodes = lists:usort(ClusterNodes),
- ProperClusterNodes = UClusterNodes -- [node()],
- case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
- {ok, []} when not Force andalso ProperClusterNodes =/= [] ->
- throw({error, {failed_to_cluster_with, ProperClusterNodes,
- "Mnesia could not connect to any disc nodes."}});
- {ok, Nodes} ->
- WasDiscNode = is_disc_node(),
- WantDiscNode = should_be_disc_node(ClusterNodes),
- %% We create a new db (on disk, or in ram) in the first
- %% two cases and attempt to upgrade the in the other two
- case {Nodes, WasDiscNode, WantDiscNode} of
- {[], _, false} ->
- %% New ram node; start from scratch
- ok = create_schema(ram);
- {[], false, true} ->
- %% Nothing there at all, start from scratch
- ok = create_schema(disc);
- {[], true, true} ->
- %% We're the first node up
- case rabbit_upgrade:maybe_upgrade_local() of
- ok -> ensure_schema_integrity();
- version_not_available -> ok = schema_ok_or_move()
- end;
- {[AnotherNode|_], _, _} ->
- %% Subsequent node in cluster, catch up
- ensure_version_ok(
- rpc:call(AnotherNode, rabbit_version, recorded, [])),
- {CopyType, CopyTypeAlt} =
- case WantDiscNode of
- true -> {disc, disc_copies};
- false -> {ram, ram_copies}
- end,
- ok = wait_for_replicated_tables(),
- ok = create_local_table_copy(schema, CopyTypeAlt),
- ok = create_local_table_copies(CopyType),
-
- ok = SecondaryPostMnesiaFun(),
- %% We've taken down mnesia, so ram nodes will need
- %% to re-sync
- case is_disc_node() of
- false -> start_mnesia(),
- mnesia:change_config(extra_db_nodes,
- ProperClusterNodes),
- wait_for_replicated_tables();
- true -> ok
- end,
-
- ensure_schema_integrity(),
- ok
- end;
- {error, Reason} ->
- %% one reason we may end up here is if we try to join
- %% nodes together that are currently running standalone or
- %% are members of a different cluster
- throw({error, {unable_to_join_cluster, ClusterNodes, Reason}})
- end.
-
schema_ok_or_move() ->
case check_schema_integrity() of
ok ->
@@ -592,7 +954,7 @@ schema_ok_or_move() ->
"and recreating schema from scratch~n",
[Reason]),
ok = move_db(),
- ok = create_schema(disc)
+ ok = create_schema()
end.
ensure_version_ok({ok, DiscVersion}) ->
@@ -604,25 +966,15 @@ ensure_version_ok({ok, DiscVersion}) ->
ensure_version_ok({error, _}) ->
ok = rabbit_version:record_desired().
-create_schema(Type) ->
+%% We only care about disc nodes since ram nodes are only supposed to catch up
+create_schema() ->
stop_mnesia(),
- case Type of
- disc -> rabbit_misc:ensure_ok(mnesia:create_schema([node()]),
- cannot_create_schema);
- ram -> %% remove the disc schema since this is a ram node
- rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
- cannot_delete_schema)
- end,
+ rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema),
start_mnesia(),
- ok = create_tables(Type),
+ ok = create_tables(disc),
ensure_schema_integrity(),
ok = rabbit_version:record_desired().
-is_disc_node() -> mnesia:system_info(use_dir).
-
-should_be_disc_node(ClusterNodes) ->
- ClusterNodes == [] orelse lists:member(node(), ClusterNodes).
-
move_db() ->
stop_mnesia(),
MnesiaDir = filename:dirname(dir() ++ "/"),
@@ -644,25 +996,6 @@ move_db() ->
start_mnesia(),
ok.
-copy_db(Destination) ->
- ok = ensure_mnesia_not_running(),
- rabbit_file:recursive_copy(dir(), Destination).
-
-create_tables() -> create_tables(disc).
-
-create_tables(Type) ->
- lists:foreach(fun ({Tab, TabDef}) ->
- TabDef1 = proplists:delete(match, TabDef),
- case mnesia:create_table(Tab, TabDef1) of
- {atomic, ok} -> ok;
- {aborted, Reason} ->
- throw({error, {table_creation_failed,
- Tab, TabDef1, Reason}})
- end
- end,
- table_definitions(Type)),
- ok.
-
copy_type_to_ram(TabDef) ->
[{disc_copies, []}, {ram_copies, [node()]}
| proplists:delete(ram_copies, proplists:delete(disc_copies, TabDef))].
@@ -684,13 +1017,6 @@ create_local_table_copies(Type) ->
HasDiscOnlyCopies -> disc_only_copies;
true -> ram_copies
end;
-%%% unused code - commented out to keep dialyzer happy
-%%% Type =:= disc_only ->
-%%% if
-%%% HasDiscCopies or HasDiscOnlyCopies ->
-%%% disc_only_copies;
-%%% true -> ram_copies
-%%% end;
Type =:= ram ->
ram_copies
end,
@@ -711,122 +1037,156 @@ create_local_table_copy(Tab, Type) ->
end,
ok.
-wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
-
-wait_for_tables() -> wait_for_tables(table_names()).
-
-wait_for_tables(TableNames) ->
- case mnesia:wait_for_tables(TableNames, 30000) of
- ok ->
- ok;
- {timeout, BadTabs} ->
- throw({error, {timeout_waiting_for_tables, BadTabs}});
- {error, Reason} ->
- throw({error, {failed_waiting_for_tables, Reason}})
+remove_node_if_mnesia_running(Node) ->
+ case mnesia:system_info(is_running) of
+        yes -> %% Deleting the schema copy of the node will result in the
+ %% node being removed from the cluster, with that change being
+ %% propagated to all nodes
+ case mnesia:del_table_copy(schema, Node) of
+ {atomic, ok} ->
+ rabbit_node_monitor:notify_left_cluster(Node),
+ ok;
+ {aborted, Reason} ->
+ {error, {failed_to_remove_node, Node, Reason}}
+ end;
+ no -> {error, mnesia_not_running}
end.
-reset(Force) ->
- rabbit_misc:local_info_msg("Resetting Rabbit~s~n",
- [if Force -> " forcefully";
- true -> ""
- end]),
- ensure_mnesia_not_running(),
- case not Force andalso is_clustered() andalso
- is_only_disc_node(node(), false)
+leave_cluster() ->
+ case {is_clustered(),
+ running_nodes(ordsets:del_element(node(), all_clustered_nodes()))}
of
- true -> log_both("no other disc nodes running");
- false -> ok
- end,
- case Force of
- true ->
- disconnect_nodes(nodes());
- false ->
- ensure_mnesia_dir(),
- start_mnesia(),
- {Nodes, RunningNodes} =
- try
- %% Force=true here so that reset still works when clustered
- %% with a node which is down
- ok = init_db(read_cluster_nodes_config(), true),
- {all_clustered_nodes() -- [node()],
- running_clustered_nodes() -- [node()]}
- after
- stop_mnesia()
- end,
- leave_cluster(Nodes, RunningNodes),
- rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
- cannot_delete_schema),
- disconnect_nodes(Nodes)
- end,
- ok = delete_cluster_nodes_config(),
- %% remove persisted messages and any other garbage we find
- ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
- ok.
-
-%% We need to make sure that we don't end up in a distributed Erlang
-%% system with nodes while not being in an Mnesia cluster with
-%% them. We don't handle that well.
-disconnect_nodes(Nodes) -> [erlang:disconnect_node(N) || N <- Nodes].
-
-leave_cluster([], _) -> ok;
-leave_cluster(Nodes, RunningNodes) ->
- %% find at least one running cluster node and instruct it to
- %% remove our schema copy which will in turn result in our node
- %% being removed as a cluster node from the schema, with that
- %% change being propagated to all nodes
- case lists:any(
- fun (Node) ->
- case rpc:call(Node, mnesia, del_table_copy,
- [schema, node()]) of
- {atomic, ok} -> true;
- {badrpc, nodedown} -> false;
- {aborted, {node_not_running, _}} -> false;
- {aborted, Reason} ->
- throw({error, {failed_to_leave_cluster,
- Nodes, RunningNodes, Reason}})
- end
- end,
- RunningNodes) of
- true -> ok;
- false -> throw({error, {no_running_cluster_nodes,
- Nodes, RunningNodes}})
+ {false, []} ->
+ ok;
+        {_, RunningNodes} ->
+ case lists:any(
+ fun (Node) ->
+ case rpc:call(Node, rabbit_mnesia,
+ remove_node_if_mnesia_running,
+ [node()])
+ of
+ ok ->
+ true;
+ {error, mnesia_not_running} ->
+ false;
+ {error, Reason} ->
+ throw({error, Reason});
+ {badrpc, nodedown} ->
+ false
+ end
+ end,
+                  RunningNodes)
+ of
+ true -> ok;
+ false -> throw({error,
+ {no_running_cluster_nodes,
+ "You cannot leave a cluster if no online "
+ "nodes are present"}})
+ end
end.
wait_for(Condition) ->
error_logger:info_msg("Waiting for ~p...~n", [Condition]),
timer:sleep(1000).
-on_node_up(Node) ->
- case is_only_disc_node(Node, true) of
- true -> rabbit_log:info("cluster contains disc nodes again~n");
- false -> ok
- end.
-
-on_node_down(Node) ->
- case is_only_disc_node(Node, true) of
- true -> rabbit_log:info("only running disc node went down~n");
+start_mnesia(CheckConsistency) ->
+ case CheckConsistency of
+ true -> check_cluster_consistency();
false -> ok
- end.
-
-is_only_disc_node(Node, _MnesiaRunning = true) ->
- RunningSet = sets:from_list(running_clustered_nodes()),
- DiscSet = sets:from_list(nodes_of_type(disc_copies)),
- [Node] =:= sets:to_list(sets:intersection(RunningSet, DiscSet));
-is_only_disc_node(Node, false) ->
- start_mnesia(),
- Res = is_only_disc_node(Node, true),
- stop_mnesia(),
- Res.
-
-log_both(Warning) ->
- io:format("Warning: ~s~n", [Warning]),
- rabbit_misc:with_local_io(
- fun () -> error_logger:warning_msg("~s~n", [Warning]) end).
-
-start_mnesia() ->
+ end,
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
ensure_mnesia_running().
+start_mnesia() ->
+ start_mnesia(true).
+
stop_mnesia() ->
stopped = mnesia:stop(),
ensure_mnesia_not_running().
+
+change_extra_db_nodes(ClusterNodes0, Force) ->
+ ClusterNodes = lists:usort(ClusterNodes0) -- [node()],
+ case mnesia:change_config(extra_db_nodes, ClusterNodes) of
+ {ok, []} when not Force andalso ClusterNodes =/= [] ->
+ throw({error, {failed_to_cluster_with, ClusterNodes,
+ "Mnesia could not connect to any nodes."}});
+ {ok, Nodes} ->
+ Nodes
+ end.
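change_extra_db_nodes/2 wraps the standard mnesia mechanism for introducing a node to a running cluster; a standalone sketch of the underlying call (node name hypothetical):

    %% Sketch only: first contact with a cluster goes through
    %% extra_db_nodes; mnesia returns the subset of nodes it reached.
    contact_cluster_example() ->
        {ok, Reached} = mnesia:change_config(extra_db_nodes,
                                             ['rabbit@other_host']),
        Reached.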
+
+%% What we really want is nodes running rabbit, not running mnesia. Using
+%% `mnesia:system_info(running_db_nodes)' will return false positives
+%% when we are actually just doing cluster operations (e.g. joining the
+%% cluster).
+running_nodes(Nodes) ->
+ {Replies, _BadNodes} =
+ rpc:multicall(Nodes, rabbit_mnesia, is_running_remote, []),
+ [Node || {Running, Node} <- Replies, Running].
+
+is_running_remote() ->
+ {proplists:is_defined(rabbit, application:which_applications(infinity)),
+ node()}.
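The distinction matters because a node that has merely connected via extra_db_nodes already shows up in mnesia's running_db_nodes without serving any traffic; checking for the rabbit application avoids that false positive. A local sketch of the same test (helper name hypothetical):

    %% Sketch: true iff the rabbit application is running on this node,
    %% regardless of the state of mnesia clustering.
    is_rabbit_running_locally() ->
        proplists:is_defined(rabbit, application:which_applications()).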
+
+check_consistency(OTP, Rabbit) ->
+ rabbit_misc:sequence_error(
+ [check_otp_consistency(OTP), check_rabbit_consistency(Rabbit)]).
+
+check_consistency(OTP, Rabbit, Node, Status) ->
+ rabbit_misc:sequence_error(
+ [check_otp_consistency(OTP),
+ check_rabbit_consistency(Rabbit),
+ check_nodes_consistency(Node, Status)]).
+
+check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) ->
+ ThisNode = node(),
+ case ordsets:is_element(ThisNode, RemoteAllNodes) of
+ true ->
+ {ok, RemoteStatus};
+ false ->
+ {error, {inconsistent_cluster,
+ rabbit_misc:format("Node ~p thinks it's clustered "
+ "with node ~p, but ~p disagrees",
+ [ThisNode, Node, Node])}}
+ end.
+
+check_version_consistency(This, Remote, _) when This =:= Remote ->
+ ok;
+check_version_consistency(This, Remote, Name) ->
+ {error, {inconsistent_cluster,
+ rabbit_misc:format("~s version mismatch: local node is ~s, "
+ "remote node ~s", [Name, This, Remote])}}.
+
+check_otp_consistency(Remote) ->
+ check_version_consistency(erlang:system_info(otp_release), Remote, "OTP").
+
+check_rabbit_consistency(Remote) ->
+ check_version_consistency(rabbit_misc:version(), Remote, "Rabbit").
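For illustration (version strings hypothetical), a mismatch detected by check_version_consistency/3 comes out as:

    %% check_version_consistency("R14B04", "R15B01", "OTP") =:=
    %%     {error, {inconsistent_cluster,
    %%              "OTP version mismatch: local node is R14B04, "
    %%              "remote node R15B01"}}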
+
+%% This is fairly tricky. We want to know if the node is in the state that a
+%% `reset' would leave it in. We cannot simply check if the mnesia tables
+%% aren't there because restarted RAM nodes won't have tables while still being
+%% non-virgin. What we do instead is to check if the mnesia directory is
+%% non-existent or empty, with the exception of the cluster status files,
+%% which will be there thanks to
+%% `rabbit_node_monitor:prepare_cluster_status_files/0'.
+is_virgin_node() ->
+ case rabbit_file:list_dir(dir()) of
+ {error, enoent} -> true;
+ {ok, []} -> true;
+        {ok, [File1, File2]} ->
+            lists:usort([dir() ++ "/" ++ File1, dir() ++ "/" ++ File2]) =:=
+                lists:usort([rabbit_node_monitor:cluster_status_file_name(),
+                             rabbit_node_monitor:running_nodes_file_name()]);
+ {ok, _} -> false
+ end.
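Concretely, the rabbit_file:list_dir/1 results that count as virgin are:

    %% {error, enoent} -- the mnesia directory was never created
    %% {ok, []}        -- the directory exists but is empty
    %% {ok, ["cluster_nodes.config", "nodes_running_at_shutdown"]}
    %%                 -- only the two status files are present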
+
+find_good_node([]) ->
+ none;
+find_good_node([Node | Nodes]) ->
+ case rpc:call(Node, rabbit_mnesia, node_info, []) of
+ {badrpc, _Reason} ->
+ find_good_node(Nodes);
+ {OTP, Rabbit, _} ->
+ case check_consistency(OTP, Rabbit) of
+ {error, _} -> find_good_node(Nodes);
+ ok -> {ok, Node}
+ end
+ end.
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 323cf0ce..b2d7bb60 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -18,11 +18,29 @@
-behaviour(gen_server).
--export([start_link/0]).
+-export([cluster_status_file_name/0,
+ running_nodes_file_name/0,
+ prepare_cluster_status_files/0,
+ write_cluster_status/1,
+ read_cluster_status/0,
+ update_cluster_status/0,
+ reset_cluster_status/0,
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
--export([notify_cluster/0, rabbit_running_on/1]).
+ joined_cluster/2,
+ notify_joined_cluster/0,
+ left_cluster/1,
+ notify_left_cluster/1,
+ node_up/2,
+ notify_node_up/0,
+
+ start_link/0,
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3
+ ]).
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
@@ -31,56 +49,202 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(rabbit_running_on/1 :: (node()) -> 'ok').
--spec(notify_cluster/0 :: () -> 'ok').
+-spec(cluster_status_file_name/0 :: () -> string()).
+-spec(prepare_cluster_status_files/0 :: () -> 'ok').
+-spec(write_cluster_status/1 :: (rabbit_mnesia:cluster_status())
+ -> 'ok').
+-spec(read_cluster_status/0 :: () -> rabbit_mnesia:cluster_status()).
+-spec(update_cluster_status/0 :: () -> 'ok').
+-spec(reset_cluster_status/0 :: () -> 'ok').
+
+-spec(joined_cluster/2 :: (node(), boolean()) -> 'ok').
+-spec(notify_joined_cluster/0 :: () -> 'ok').
+-spec(left_cluster/1 :: (node()) -> 'ok').
+-spec(notify_left_cluster/1 :: (node()) -> 'ok').
+-spec(node_up/2 :: (node(), boolean()) -> 'ok').
+-spec(notify_node_up/0 :: () -> 'ok').
-endif.
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------
+%% Cluster file operations
+%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+%% The cluster status files contain all we need to know about the cluster:
+%%
+%% * All the clustered nodes
+%% * The disc nodes
+%% * The running nodes.
+%%
+%% If the current node is a disc node it will be included in the disc nodes
+%% list.
+%%
+%% We strive to keep the files up to date, and other code relies on them
+%% being so. Obviously, while mnesia is offline the information we have will
+%% be outdated, but it cannot be otherwise.
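As an illustration (node names hypothetical), for a two-node cluster in which only the current node is a disc node, the two files would contain terms along the lines of:

    %% cluster_nodes.config:
    %%     {['rabbit@host1','rabbit@host2'],['rabbit@host1']}.
    %%
    %% nodes_running_at_shutdown:
    %%     ['rabbit@host1','rabbit@host2'].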
-rabbit_running_on(Node) ->
- gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}).
+cluster_status_file_name() ->
+ rabbit_mnesia:dir() ++ "/cluster_nodes.config".
-notify_cluster() ->
- Node = node(),
- Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
- %% notify other rabbits of this rabbit
- case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on,
- [Node], ?RABBIT_UP_RPC_TIMEOUT) of
- {_, [] } -> ok;
- {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
- end,
+running_nodes_file_name() ->
+ filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown").
+
+prepare_cluster_status_files() ->
+ rabbit_mnesia:ensure_mnesia_dir(),
+ CorruptFiles = fun () -> throw({error, corrupt_cluster_status_files}) end,
+ RunningNodes1 = case try_read_file(running_nodes_file_name()) of
+ {ok, [Nodes]} when is_list(Nodes) -> Nodes;
+                        {ok, _} -> CorruptFiles();
+                        non_existent -> []
+ end,
+ {AllNodes1, WantDiscNode} =
+ case try_read_file(cluster_status_file_name()) of
+ {ok, [{AllNodes, DiscNodes0}]} ->
+ {AllNodes, lists:member(node(), DiscNodes0)};
+ {ok, [AllNodes0]} when is_list(AllNodes0) ->
+ {legacy_cluster_nodes(AllNodes0),
+ legacy_should_be_disc_node(AllNodes0)};
+ {ok, _} ->
+ CorruptFiles();
+            non_existent ->
+ {legacy_cluster_nodes([]), true}
+ end,
+
+ ThisNode = [node()],
+
+ RunningNodes2 = lists:usort(RunningNodes1 ++ ThisNode),
+ AllNodes2 = lists:usort(AllNodes1 ++ RunningNodes2),
+ DiscNodes = case WantDiscNode of
+ true -> ThisNode;
+ false -> []
+ end,
+
+ ok = write_cluster_status({AllNodes2, DiscNodes, RunningNodes2}).
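The legacy branch above exists because older releases kept a bare node list on disc. For illustration (node names hypothetical), an old-style cluster_nodes.config held a single term such as:

    %% ['rabbit@host1','rabbit@host2'].

from which disc-ness has to be inferred: the node was a disc node iff that list was empty or mentioned the node itself (see legacy_should_be_disc_node/1 below).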
+
+write_cluster_status({All, Disc, Running}) ->
+ ClusterStatusFN = cluster_status_file_name(),
+ Res = case rabbit_file:write_term_file(ClusterStatusFN, [{All, Disc}]) of
+ ok ->
+ RunningNodesFN = running_nodes_file_name(),
+ {RunningNodesFN,
+ rabbit_file:write_term_file(RunningNodesFN, [Running])};
+ E1 = {error, _} ->
+ {ClusterStatusFN, E1}
+ end,
+ case Res of
+ {_, ok} -> ok;
+ {FN, {error, E2}} -> throw({error, {could_not_write_file, FN, E2}})
+ end.
+
+try_read_file(FileName) ->
+ case rabbit_file:read_term_file(FileName) of
+ {ok, Term} -> {ok, Term};
+        {error, enoent} -> non_existent;
+ {error, E} -> throw({error, {cannot_read_file, FileName, E}})
+ end.
+
+read_cluster_status() ->
+ case {try_read_file(cluster_status_file_name()),
+ try_read_file(running_nodes_file_name())} of
+ {{ok, [{All, Disc}]}, {ok, [Running]}} when is_list(Running) ->
+ {All, Disc, Running};
+ {_, _} ->
+ throw({error, corrupt_or_missing_cluster_files})
+ end.
+
+update_cluster_status() ->
+ {ok, Status} = rabbit_mnesia:cluster_status_from_mnesia(),
+ write_cluster_status(Status).
+
+reset_cluster_status() ->
+ write_cluster_status({[node()], [node()], [node()]}).
+
+%%----------------------------------------------------------------------------
+%% Cluster notifications
+%%----------------------------------------------------------------------------
+
+joined_cluster(Node, IsDiscNode) ->
+ gen_server:cast(?SERVER, {rabbit_join, Node, IsDiscNode}).
+
+notify_joined_cluster() ->
+ cluster_multicall(joined_cluster, [node(), rabbit_mnesia:is_disc_node()]),
+ ok.
+
+left_cluster(Node) ->
+ gen_server:cast(?SERVER, {left_cluster, Node}).
+
+notify_left_cluster(Node) ->
+ left_cluster(Node),
+ cluster_multicall(left_cluster, [Node]),
+ ok.
+
+node_up(Node, IsDiscNode) ->
+ gen_server:cast(?SERVER, {node_up, Node, IsDiscNode}).
+
+notify_node_up() ->
+ Nodes = cluster_multicall(node_up, [node(), rabbit_mnesia:is_disc_node()]),
%% register other active rabbits with this rabbit
- [ rabbit_running_on(N) || N <- Nodes ],
+ [ node_up(N, ordsets:is_element(N, rabbit_mnesia:clustered_disc_nodes())) ||
+ N <- Nodes ],
ok.
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------
+%% gen_server callbacks
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
init([]) ->
- {ok, ordsets:new()}.
+ {ok, no_state}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast({rabbit_running_on, Node}, Nodes) ->
- case ordsets:is_element(Node, Nodes) of
- true -> {noreply, Nodes};
+%% Note: when updating the status file we can't simply write out mnesia's
+%% current view, since this message can (and will) arrive before the
+%% corresponding mnesia change has propagated to this node.
+handle_cast({node_up, Node, IsDiscNode}, State) ->
+ case is_already_monitored({rabbit, Node}) of
+ true -> {noreply, State};
false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({ordsets:add_element(Node, AllNodes),
+ case IsDiscNode of
+ true -> ordsets:add_element(
+ Node, DiscNodes);
+ false -> DiscNodes
+ end,
+ ordsets:add_element(Node, RunningNodes)}),
erlang:monitor(process, {rabbit, Node}),
ok = handle_live_rabbit(Node),
- {noreply, ordsets:add_element(Node, Nodes)}
+ {noreply, State}
end;
+handle_cast({joined_cluster, Node, IsDiscNode}, State) ->
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({ordsets:add_element(Node, AllNodes),
+ case IsDiscNode of
+ true -> ordsets:add_element(Node,
+ DiscNodes);
+ false -> DiscNodes
+ end,
+ RunningNodes}),
+ {noreply, State};
+handle_cast({left_cluster, Node}, State) ->
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({ordsets:del_element(Node, AllNodes),
+ ordsets:del_element(Node, DiscNodes),
+ ordsets:del_element(Node, RunningNodes)}),
+ {noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Nodes) ->
+handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
rabbit_log:info("rabbit on node ~p down~n", [Node]),
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({AllNodes, DiscNodes,
+ ordsets:del_element(Node, RunningNodes)}),
ok = handle_dead_rabbit(Node),
- {noreply, ordsets:del_element(Node, Nodes)};
+ {noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
@@ -90,7 +254,9 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------
+%% Functions that call the module specific hooks when nodes go up/down
+%%----------------------------------------------------------------------------
%% TODO: This may turn out to be a performance hog when there are lots
%% of nodes. We really only need to execute some of these statements
@@ -104,3 +270,32 @@ handle_dead_rabbit(Node) ->
handle_live_rabbit(Node) ->
ok = rabbit_alarm:on_node_up(Node),
ok = rabbit_mnesia:on_node_up(Node).
+
+%%--------------------------------------------------------------------
+%% Internal utils
+%%--------------------------------------------------------------------
+
+cluster_multicall(Fun, Args) ->
+ Node = node(),
+ Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
+    %% notify the other members of the cluster
+ case rpc:multicall(Nodes, rabbit_node_monitor, Fun, Args,
+ ?RABBIT_UP_RPC_TIMEOUT) of
+ {_, [] } -> ok;
+ {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
+ end,
+ Nodes.
+
+is_already_monitored(Item) ->
+ {monitors, Monitors} = process_info(self(), monitors),
+ lists:any(fun ({_, Item1}) when Item =:= Item1 -> true;
+ (_) -> false
+ end, Monitors).
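This works because monitors taken with erlang:monitor(process, {RegName, Node}) are reported by process_info/2 under that same name/node form; for example (node name hypothetical):

    %% After erlang:monitor(process, {rabbit, 'hare@host'}), the result of
    %% process_info(self(), monitors) includes
    %%     {process, {rabbit, 'hare@host'}}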
+
+legacy_cluster_nodes(Nodes) ->
+ %% We get all the info that we can, including the nodes from mnesia, which
+ %% will be there if the node is a disc node (empty list otherwise)
+ lists:usort(Nodes ++ mnesia:system_info(db_nodes)).
+
+legacy_should_be_disc_node(DiscNodes) ->
+ DiscNodes == [] orelse lists:member(node(), DiscNodes).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 4a6627de..3cc0e5db 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -32,6 +32,7 @@
-define(TIMEOUT, 5000).
all_tests() ->
+ ok = setup_cluster(),
ok = supervisor2_tests:test_all(),
passed = gm_tests:all_tests(),
passed = mirrored_supervisor_tests:all_tests(),
@@ -53,34 +54,61 @@ all_tests() ->
passed = test_log_management_during_startup(),
passed = test_statistics(),
passed = test_arguments_parser(),
- passed = test_cluster_management(),
passed = test_user_management(),
passed = test_runtime_parameters(),
passed = test_server_status(),
passed = test_confirms(),
- passed = maybe_run_cluster_dependent_tests(),
+ passed =
+ do_if_secondary_node(
+ fun run_cluster_dependent_tests/1,
+ fun (SecondaryNode) ->
+ io:format("Skipping cluster dependent tests with node ~p~n",
+ [SecondaryNode]),
+ passed
+ end),
passed = test_configurable_server_properties(),
passed.
-maybe_run_cluster_dependent_tests() ->
+do_if_secondary_node(Up, Down) ->
SecondaryNode = rabbit_nodes:make("hare"),
case net_adm:ping(SecondaryNode) of
- pong -> passed = run_cluster_dependent_tests(SecondaryNode);
- pang -> io:format("Skipping cluster dependent tests with node ~p~n",
- [SecondaryNode])
- end,
- passed.
+ pong -> Up(SecondaryNode);
+ pang -> Down(SecondaryNode)
+ end.
-run_cluster_dependent_tests(SecondaryNode) ->
- SecondaryNodeS = atom_to_list(SecondaryNode),
+setup_cluster() ->
+ do_if_secondary_node(
+ fun (SecondaryNode) ->
+ cover:stop(SecondaryNode),
+ ok = control_action(stop_app, []),
+ %% 'cover' does not cope at all well with nodes disconnecting,
+ %% which happens as part of reset. So we turn it off
+              %% temporarily. That is OK even if we're not using cover in
+              %% general: it just turns the engine on / off and doesn't log
+              %% anything. Note that this way cover won't be on when joining
+              %% the cluster, but this is OK since we're testing the
+              %% clustering interface elsewhere anyway.
+ cover:stop(nodes()),
+ ok = control_action(join_cluster,
+ [atom_to_list(SecondaryNode)]),
+ cover:start(nodes()),
+ ok = control_action(start_app, []),
+ ok = control_action(start_app, SecondaryNode, [], [])
+ end,
+ fun (_) -> ok end).
- ok = control_action(stop_app, []),
- ok = safe_reset(),
- ok = control_action(cluster, [SecondaryNodeS]),
- ok = control_action(start_app, []),
- ok = control_action(start_app, SecondaryNode, [], []),
+maybe_run_cluster_dependent_tests() ->
+ do_if_secondary_node(
+ fun (SecondaryNode) ->
+ passed = run_cluster_dependent_tests(SecondaryNode)
+ end,
+ fun (SecondaryNode) ->
+ io:format("Skipping cluster dependent tests with node ~p~n",
+ [SecondaryNode])
+ end).
+run_cluster_dependent_tests(SecondaryNode) ->
io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]),
passed = test_delegates_async(SecondaryNode),
passed = test_delegates_sync(SecondaryNode),
@@ -855,218 +883,6 @@ test_arguments_parser() ->
passed.
-test_cluster_management() ->
- %% 'cluster' and 'reset' should only work if the app is stopped
- {error, _} = control_action(cluster, []),
- {error, _} = control_action(reset, []),
- {error, _} = control_action(force_reset, []),
-
- ok = control_action(stop_app, []),
-
- %% various ways of creating a standalone node
- NodeS = atom_to_list(node()),
- ClusteringSequence = [[],
- [NodeS],
- ["invalid@invalid", NodeS],
- [NodeS, "invalid@invalid"]],
-
- ok = control_action(reset, []),
- lists:foreach(fun (Arg) ->
- ok = control_action(force_cluster, Arg),
- ok
- end,
- ClusteringSequence),
- lists:foreach(fun (Arg) ->
- ok = control_action(reset, []),
- ok = control_action(force_cluster, Arg),
- ok
- end,
- ClusteringSequence),
- ok = control_action(reset, []),
- lists:foreach(fun (Arg) ->
- ok = control_action(force_cluster, Arg),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok
- end,
- ClusteringSequence),
- lists:foreach(fun (Arg) ->
- ok = control_action(reset, []),
- ok = control_action(force_cluster, Arg),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok
- end,
- ClusteringSequence),
-
- %% convert a disk node into a ram node
- ok = control_action(reset, []),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_disc_node(),
- ok = control_action(force_cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
- ok = assert_ram_node(),
-
- %% join a non-existing cluster as a ram node
- ok = control_action(reset, []),
- ok = control_action(force_cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
- ok = assert_ram_node(),
-
- ok = control_action(reset, []),
-
- SecondaryNode = rabbit_nodes:make("hare"),
- case net_adm:ping(SecondaryNode) of
- pong -> passed = test_cluster_management2(SecondaryNode);
- pang -> io:format("Skipping clustering tests with node ~p~n",
- [SecondaryNode])
- end,
-
- ok = control_action(start_app, []),
- passed.
-
-test_cluster_management2(SecondaryNode) ->
- NodeS = atom_to_list(node()),
- SecondaryNodeS = atom_to_list(SecondaryNode),
-
- %% make a disk node
- ok = control_action(cluster, [NodeS]),
- ok = assert_disc_node(),
- %% make a ram node
- ok = control_action(reset, []),
- ok = control_action(cluster, [SecondaryNodeS]),
- ok = assert_ram_node(),
-
- %% join cluster as a ram node
- ok = safe_reset(),
- ok = control_action(force_cluster, [SecondaryNodeS, "invalid1@invalid"]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_ram_node(),
-
- %% ram node will not start by itself
- ok = control_action(stop_app, []),
- ok = control_action(stop_app, SecondaryNode, [], []),
- {error, _} = control_action(start_app, []),
- ok = control_action(start_app, SecondaryNode, [], []),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
-
- %% change cluster config while remaining in same cluster
- ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
-
- %% join non-existing cluster as a ram node
- ok = control_action(force_cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
- {error, _} = control_action(start_app, []),
- ok = assert_ram_node(),
-
- %% join empty cluster as a ram node (converts to disc)
- ok = control_action(cluster, []),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_disc_node(),
-
- %% make a new ram node
- ok = control_action(reset, []),
- ok = control_action(force_cluster, [SecondaryNodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_ram_node(),
-
- %% turn ram node into disk node
- ok = control_action(cluster, [SecondaryNodeS, NodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_disc_node(),
-
- %% convert a disk node into a ram node
- ok = assert_disc_node(),
- ok = control_action(force_cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
- ok = assert_ram_node(),
-
- %% make a new disk node
- ok = control_action(force_reset, []),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_disc_node(),
-
- %% turn a disk node into a ram node
- %%
- %% can't use safe_reset here since for some reason nodes()==[] and
- %% yet w/o stopping coverage things break
- with_suspended_cover(
- [SecondaryNode], fun () -> ok = control_action(reset, []) end),
- ok = control_action(cluster, [SecondaryNodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_ram_node(),
-
- %% NB: this will log an inconsistent_database error, which is harmless
- with_suspended_cover(
- [SecondaryNode], fun () ->
- true = disconnect_node(SecondaryNode),
- pong = net_adm:ping(SecondaryNode)
- end),
-
- %% leaving a cluster as a ram node
- ok = safe_reset(),
- %% ...and as a disk node
- ok = control_action(cluster, [SecondaryNodeS, NodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = safe_reset(),
-
- %% attempt to leave cluster when no other node is alive
- ok = control_action(cluster, [SecondaryNodeS, NodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, SecondaryNode, [], []),
- ok = control_action(stop_app, []),
- {error, {no_running_cluster_nodes, _, _}} =
- control_action(reset, []),
-
- %% attempt to change type when no other node is alive
- {error, {no_running_cluster_nodes, _, _}} =
- control_action(cluster, [SecondaryNodeS]),
-
- %% leave system clustered, with the secondary node as a ram node
- with_suspended_cover(
- [SecondaryNode], fun () -> ok = control_action(force_reset, []) end),
- ok = control_action(start_app, []),
- %% Yes, this is rather ugly. But since we're a clustered Mnesia
- %% node and we're telling another clustered node to reset itself,
- %% we will get disconnected half way through causing a
- %% badrpc. This never happens in real life since rabbitmqctl is
- %% not a clustered Mnesia node and is a hidden node.
- with_suspended_cover(
- [SecondaryNode],
- fun () ->
- {badrpc, nodedown} =
- control_action(force_reset, SecondaryNode, [], []),
- pong = net_adm:ping(SecondaryNode)
- end),
- ok = control_action(cluster, SecondaryNode, [NodeS], []),
- ok = control_action(start_app, SecondaryNode, [], []),
-
- passed.
-
-%% 'cover' does not cope at all well with nodes disconnecting, which
-%% happens as part of reset. So we turn it off temporarily. That is ok
-%% even if we're not in general using cover, it just turns the engine
-%% on / off and doesn't log anything.
-safe_reset() -> with_suspended_cover(
- nodes(), fun () -> control_action(reset, []) end).
-
-with_suspended_cover(Nodes, Fun) ->
- cover:stop(Nodes),
- Res = Fun(),
- cover:start(Nodes),
- Res.
-
test_user_management() ->
     %% lots of stuff that should fail
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index e1a7bcae..3fbfeed0 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -121,10 +121,7 @@ remove_backup() ->
info("upgrades: Mnesia backup removed~n", []).
maybe_upgrade_mnesia() ->
- %% rabbit_mnesia:all_clustered_nodes/0 will return [] at this point
- %% if we are a RAM node since Mnesia has not started yet.
- AllNodes = lists:usort(rabbit_mnesia:all_clustered_nodes() ++
- rabbit_mnesia:read_cluster_nodes_config()),
+ AllNodes = rabbit_mnesia:all_clustered_nodes(),
case rabbit_version:upgrades_required(mnesia) of
{error, starting_from_scratch} ->
ok;
@@ -150,12 +147,12 @@ maybe_upgrade_mnesia() ->
upgrade_mode(AllNodes) ->
case nodes_running(AllNodes) of
[] ->
- AfterUs = rabbit_mnesia:read_previously_running_nodes(),
+ AfterUs = rabbit_mnesia:running_clustered_nodes() -- [node()],
case {is_disc_node_legacy(), AfterUs} of
{true, []} ->
primary;
{true, _} ->
- Filename = rabbit_mnesia:running_nodes_filename(),
+                    Filename = rabbit_node_monitor:running_nodes_file_name(),
die("Cluster upgrade needed but other disc nodes shut "
"down after this one.~nPlease first start the last "
"disc node to shut down.~n~nNote: if several disc "
@@ -222,15 +219,8 @@ secondary_upgrade(AllNodes) ->
IsDiscNode = is_disc_node_legacy(),
rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
cannot_delete_schema),
- %% Note that we cluster with all nodes, rather than all disc nodes
- %% (as we can't know all disc nodes at this point). This is safe as
- %% we're not writing the cluster config, just setting up Mnesia.
- ClusterNodes = case IsDiscNode of
- true -> AllNodes;
- false -> AllNodes -- [node()]
- end,
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
- ok = rabbit_mnesia:init_db(ClusterNodes, true, fun () -> ok end),
+ ok = rabbit_mnesia:init_db(AllNodes, IsDiscNode, true),
ok = rabbit_version:record_desired_for_scope(mnesia),
ok.