author     Francesco Mazzoli <francesco@rabbitmq.com>    2012-08-13 15:22:19 +0100
committer  Francesco Mazzoli <francesco@rabbitmq.com>    2012-08-13 15:22:19 +0100
commit     c8330e555e6ff84c03d8f70bd219d7a73c09065b
tree       8ce27d6580e437b34427bd2755ac6119d9f09f03
parent     7f8e354dd49846dd1573c8e6b4d33831a49de8eb
parent     7e0906f782b4b0a94b0e3e10dc1cd5457696ba6d
download   rabbitmq-server-c8330e555e6ff84c03d8f70bd219d7a73c09065b.tar.gz
merge default
-rw-r--r--  docs/rabbitmqctl.1.xml       |  192
-rw-r--r--  ebin/rabbit_app.in           |    2
-rw-r--r--  src/rabbit.erl               |   11
-rw-r--r--  src/rabbit_control_main.erl  |   46
-rw-r--r--  src/rabbit_misc.erl          |   13
-rw-r--r--  src/rabbit_mnesia.erl        | 1235
-rw-r--r--  src/rabbit_node_monitor.erl  |  267
-rw-r--r--  src/rabbit_tests.erl         |  270
-rw-r--r--  src/rabbit_upgrade.erl       |   26
9 files changed, 1246 insertions, 816 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 2d25edee..065de14c 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -288,105 +288,185 @@ <title>Cluster management</title> <variablelist> - <varlistentry id="cluster"> - <term><cmdsynopsis><command>cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term> + <varlistentry id="join_cluster"> + <term><cmdsynopsis><command>join_cluster</command> <arg choice="req"><replaceable>clusternode</replaceable></arg><arg choice="opt"><replaceable>--ram</replaceable></arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> <term>clusternode</term> - <listitem><para>Subset of the nodes of the cluster to which this node should be connected.</para></listitem> + <listitem><para>Node to cluster with.</para></listitem> + </varlistentry> + <varlistentry> + <term><cmdsynopsis><arg choice="opt">--ram</arg></cmdsynopsis></term> + <listitem> + <para> + If provided, the node will join the cluster as a ram node. + </para> + </listitem> </varlistentry> </variablelist> <para> - Instruct the node to become member of a cluster with the - specified nodes. To cluster with currently offline nodes, - use <link linkend="force_cluster"><command>force_cluster</command></link>. + Instruct the node to become member of the cluster that the + specified node is in. Before clustering, the node is reset, so be + careful when using this command. For this command to succeed the + RabbitMQ application must have been stopped, e.g. with <link + linkend="stop_app"><command>stop_app</command></link>. </para> <para> Cluster nodes can be of two types: disk or ram. Disk nodes - replicate data in ram and on disk, thus providing - redundancy in the event of node failure and recovery from - global events such as power failure across all nodes. Ram - nodes replicate data in ram only and are mainly used for - scalability. A cluster must always have at least one disk node. + replicate data in ram and on disk, thus providing redundancy in + the event of node failure and recovery from global events such as + power failure across all nodes. Ram nodes replicate data in ram + only and are mainly used for scalability. A cluster must always + have at least one disk node. </para> <para> - If the current node is to become a disk node it needs to - appear in the cluster node list. Otherwise it becomes a - ram node. If the node list is empty or only contains the - current node then the node becomes a standalone, - i.e. non-clustered, (disk) node. + The node will be a disk node by default. If you wish to wish to + create a ram node, provide the <command>--ram</command> flag. </para> <para> After executing the <command>cluster</command> command, whenever - the RabbitMQ application is started on the current node it - will attempt to connect to the specified nodes, thus - becoming an active node in the cluster comprising those - nodes (and possibly others). + the RabbitMQ application is started on the current node it will + attempt to connect to the nodes that were in the cluster when the + node went down. </para> <para> - The list of nodes does not have to contain all the - cluster's nodes; a subset is sufficient. Also, clustering - generally succeeds as long as at least one of the - specified nodes is active. Hence adjustments to the list - are only necessary if the cluster configuration is to be - altered radically. + To leave a cluster, you can simply <command>reset</command> the + node. 
You can also remove nodes remotely with the + <command>remove_cluster_node</command> command. </para> <para> - For this command to succeed the RabbitMQ application must - have been stopped, e.g. with <link linkend="stop_app"><command>stop_app</command></link>. Furthermore, - turning a standalone node into a clustered node requires - the node be <link linkend="reset"><command>reset</command></link> first, - in order to avoid accidental destruction of data with the - <command>cluster</command> command. + For more details see the <ulink + url="http://www.rabbitmq.com/clustering.html">clustering + guide</ulink>. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl cluster hare@elena --ram</screen> + <para role="example"> + This command instructs the RabbitMQ node to join the cluster that + <command>hare@elena</command> is part of, as a ram node. </para> + </listitem> + </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>cluster_status</command></cmdsynopsis></term> + <listitem> <para> - For more details see the <ulink url="http://www.rabbitmq.com/clustering.html">clustering guide</ulink>. + Displays all the nodes in the cluster grouped by node type, + together with the currently running nodes. </para> <para role="example-prefix">For example:</para> - <screen role="example">rabbitmqctl cluster rabbit@tanto hare@elena</screen> + <screen role="example">rabbitmqctl cluster_status</screen> <para role="example"> - This command instructs the RabbitMQ node to join the - cluster with nodes <command>rabbit@tanto</command> and - <command>hare@elena</command>. If the node is one of these then - it becomes a disk node, otherwise a ram node. + This command displays the nodes in the cluster. </para> </listitem> </varlistentry> - <varlistentry id="force_cluster"> - <term><cmdsynopsis><command>force_cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term> + </variablelist> + <variablelist> + <varlistentry> + <term> + <cmdsynopsis> + <command>change_cluster_node_type</command> + <arg choice="req"> + disk | ram + </arg> + </cmdsynopsis> + </term> + <listitem> + <para> + Changes the type of the cluster node. The node must be stopped for + this operation to succeed, and when turning a node into a ram node + the node must not be the only disk node in the cluster. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl change_cluster_node_type disk</screen> + <para role="example"> + This command displays will turn a ram node into a disk node + (provided that other disk nodes exist in the cluster). + </para> + </listitem> + </varlistentry> + </variablelist> + <variablelist> + <varlistentry> + <term> + <cmdsynopsis> + <command>remove_cluster_node</command> + <arg choice="opt">--offline</arg> + </cmdsynopsis> + </term> <listitem> <variablelist> <varlistentry> - <term>clusternode</term> - <listitem><para>Subset of the nodes of the cluster to which this node should be connected.</para></listitem> + <term><cmdsynopsis><arg choice="opt">--offline</arg></cmdsynopsis></term> + <listitem> + <para> + Enables node removal from an offline node. This is only + useful in the situation where all the nodes are offline and + the last node to go down cannot be brought online, thus + preventing the whole cluster to start. It should not be used + in any other circumstances since it can lead to + inconsistencies. 
+ </para> + </listitem> </varlistentry> </variablelist> <para> - Instruct the node to become member of a cluster with the - specified nodes. This will succeed even if the specified nodes - are offline. For a more detailed description, see - <link linkend="cluster"><command>cluster</command>.</link> + Removes a cluster node remotely. The node that is being removed + must be online, while the node we are removing from must be + online, except when using the <command>--offline</command> flag. </para> - <para> - Note that this variant of the cluster command just - ignores the current status of the specified nodes. - Clustering may still fail for a variety of other - reasons. + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl -n hare@mcnulty remove_cluster_node rabbit@stringer</screen> + <para role="example"> + This command will remove the node + <command>rabbit@stringer</command> from the node + <command>hare@mcnulty</command>. </para> </listitem> </varlistentry> + </variablelist> + <variablelist> <varlistentry> - <term><cmdsynopsis><command>cluster_status</command></cmdsynopsis></term> + <term> + <cmdsynopsis> + <command>recluster</command> + <arg choice="req">clusternode</arg> + </cmdsynopsis> + </term> <listitem> + <variablelist> + <varlistentry> + <term>clusternode</term> + <listitem> + <para> + The node to recluster with. + </para> + </listitem> + </varlistentry> + </variablelist> <para> - Displays all the nodes in the cluster grouped by node type, - together with the currently running nodes. + Instructs an already clustered node to contact + <command>clusternode</command> to cluster when waking up. This is + different from <command>join_cluster</command> since it does not + join any cluster - it checks that the node is already in a cluster + with <command>clusternode</command>. + </para> + <para> + The need for this command is motivated by the fact that clusters + can change while a node is offline. Consider the situation in + which node A and B are clustered. A goes down, C clusters with C, + and then B leaves the cluster. When A wakes up, it'll try to + contact B, but this will fail since B is not in the cluster + anymore. <command>recluster -n A C</command> will solve this + situation. </para> <para role="example-prefix">For example:</para> - <screen role="example">rabbitmqctl cluster_status</screen> + <screen role="example">rabbitmqctl change_cluster_node_type disk</screen> <para role="example"> - This command displays the nodes in the cluster. + This command displays will turn a ram node into a disk node + (provided that other disk nodes exist in the cluster). </para> </listitem> </varlistentry> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 087c62a9..78842281 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -33,7 +33,7 @@ {default_user_tags, [administrator]}, {default_vhost, <<"/">>}, {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, - {cluster_nodes, []}, + {cluster_nodes, {[], true}}, {server_properties, []}, {collect_statistics, none}, {collect_statistics_interval, 5000}, diff --git a/src/rabbit.erl b/src/rabbit.erl index ed258c71..f7953c55 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -176,7 +176,7 @@ -rabbit_boot_step({notify_cluster, [{description, "notify cluster nodes"}, - {mfa, {rabbit_node_monitor, notify_cluster, []}}, + {mfa, {rabbit_node_monitor, notify_node_up, []}}, {requires, networking}]}). 
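The cluster_nodes application parameter above changes from a bare node list to a pair. As a purely illustrative sketch (node names invented), an operator-facing config entry under the new shape might look like the fragment below; the pair is read back by rabbit_node_monitor:prepare_cluster_status_file/0 further down in this changeset:

    %% Hypothetical rabbitmq.config fragment, assuming the {Nodes, WantDiscNode}
    %% shape introduced here: the nodes to record initially, and whether this
    %% node should be a disc node.
    [{rabbit, [{cluster_nodes, {['rabbit@node1', 'rabbit@node2'], true}}]}].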
%%--------------------------------------------------------------------------- @@ -300,6 +300,7 @@ start() -> %% We do not want to HiPE compile or upgrade %% mnesia after just restarting the app ok = ensure_application_loaded(), + ok = rabbit_mnesia:prepare(), ok = ensure_working_log_handlers(), ok = app_utils:start_applications(app_startup_order()), ok = print_plugin_info(rabbit_plugins:active()) @@ -309,6 +310,7 @@ boot() -> start_it(fun() -> ok = ensure_application_loaded(), maybe_hipe_compile(), + ok = rabbit_mnesia:prepare(), ok = ensure_working_log_handlers(), ok = rabbit_upgrade:maybe_upgrade_mnesia(), Plugins = rabbit_plugins:setup(), @@ -408,7 +410,6 @@ start(normal, []) -> end. stop(_State) -> - ok = rabbit_mnesia:record_running_nodes(), terminated_ok = error_logger:delete_report_handler(rabbit_error_logger), ok = rabbit_alarm:stop(), ok = case rabbit_mnesia:is_clustered() of @@ -505,12 +506,12 @@ sort_boot_steps(UnsortedSteps) -> end. boot_step_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) -> + AllNodes = rabbit_mnesia:all_clustered_nodes(), {Err, Nodes} = - case rabbit_mnesia:read_previously_running_nodes() of + case AllNodes -- [node()] of [] -> {"Timeout contacting cluster nodes. Since RabbitMQ was" " shut down forcefully~nit cannot determine which nodes" - " are timing out. Details on all nodes will~nfollow.~n", - rabbit_mnesia:all_clustered_nodes() -- [node()]}; + " are timing out.~n", []}; Ns -> {rabbit_misc:format( "Timeout contacting cluster nodes: ~p.~n", [Ns]), Ns} diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 27a35142..b92493e3 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -25,10 +25,14 @@ -define(QUIET_OPT, "-q"). -define(NODE_OPT, "-n"). -define(VHOST_OPT, "-p"). +-define(RAM_OPT, "--ram"). +-define(OFFLINE_OPT, "--offline"). -define(QUIET_DEF, {?QUIET_OPT, flag}). -define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}). -define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}). +-define(RAM_DEF, {?RAM_OPT, flag}). +-define(OFFLINE_DEF, {?OFFLINE_OPT, flag}). -define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node)]). 
@@ -41,8 +45,10 @@ force_reset, rotate_logs, - cluster, - force_cluster, + {join_cluster, [?RAM_DEF]}, + change_cluster_node_type, + recluster, + {remove_cluster_node, [?OFFLINE_DEF]}, cluster_status, add_user, @@ -239,17 +245,31 @@ action(force_reset, Node, [], _Opts, Inform) -> Inform("Forcefully resetting node ~p", [Node]), call(Node, {rabbit_mnesia, force_reset, []}); -action(cluster, Node, ClusterNodeSs, _Opts, Inform) -> - ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), - Inform("Clustering node ~p with ~p", - [Node, ClusterNodes]), - rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]); - -action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) -> - ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), - Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)", - [Node, ClusterNodes]), - rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]); +action(join_cluster, Node, [ClusterNodeS], Opts, Inform) -> + ClusterNode = list_to_atom(ClusterNodeS), + DiscNode = not proplists:get_bool(?RAM_OPT, Opts), + Inform("Clustering node ~p with ~p", [Node, ClusterNode]), + rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, DiscNode]); + +action(change_cluster_node_type, Node, ["ram"], _Opts, Inform) -> + Inform("Turning ~p into a ram node", [Node]), + rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [ram]); +action(change_cluster_node_type, Node, [Type], _Opts, Inform) + when Type =:= "disc" orelse Type =:= "disk" -> + Inform("Turning ~p into a disc node", [Node]), + rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [disc]); + +action(recluster, Node, [ClusterNodeS], _Opts, Inform) -> + ClusterNode = list_to_atom(ClusterNodeS), + Inform("Re-clustering ~p with ~p", [Node, ClusterNode]), + rpc_call(Node, rabbit_mnesia, recluster, [ClusterNode]); + +action(remove_cluster_node, Node, [ClusterNodeS], Opts, Inform) -> + ClusterNode = list_to_atom(ClusterNodeS), + RemoveWhenOffline = proplists:get_bool(?OFFLINE_OPT, Opts), + Inform("Removing node ~p from cluster", [ClusterNode]), + rpc_call(Node, rabbit_mnesia, remove_cluster_node, + [ClusterNode, RemoveWhenOffline]); action(wait, Node, [PidFile], _Opts, Inform) -> Inform("Waiting for ~p", [Node]), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 8f6a9bcf..5c91fe53 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -60,6 +60,8 @@ -export([multi_call/2]). -export([os_cmd/1]). -export([gb_sets_difference/2]). +-export([version/0]). +-export([sequence_error/1]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -217,6 +219,9 @@ ([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}). -spec(os_cmd/1 :: (string()) -> string()). -spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()). +-spec(version/0 :: () -> string()). +-spec(sequence_error/1 :: ([({'error', any()} | any())]) + -> {'error', any()} | any()). -endif. @@ -934,3 +939,11 @@ os_cmd(Command) -> gb_sets_difference(S1, S2) -> gb_sets:fold(fun gb_sets:delete_any/2, S1, S2). + +version() -> + {ok, VSN} = application:get_key(rabbit, vsn), + VSN. + +sequence_error([T]) -> T; +sequence_error([{error, _} = Error | _]) -> Error; +sequence_error([_ | Rest]) -> sequence_error(Rest). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 61b4054a..6aade1da 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -17,16 +17,41 @@ -module(rabbit_mnesia). 
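As a side note on the rabbit_misc additions above, here is a small illustrative sketch of how sequence_error/1 behaves, with invented values, e.g. in an Erlang shell:

    %% The list is walked left to right: the first {error, _} encountered is
    %% returned; if there is none, the last element is returned.
    ok            = rabbit_misc:sequence_error([ok, ok]).
    {error, oops} = rabbit_misc:sequence_error([ok, {error, oops}, ok]).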
--export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, - cluster/1, force_cluster/1, reset/0, force_reset/0, init_db/3, - is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, - empty_ram_only_tables/0, copy_db/1, wait_for_tables/1, - create_cluster_nodes_config/1, read_cluster_nodes_config/0, - record_running_nodes/0, read_previously_running_nodes/0, - running_nodes_filename/0, is_disc_node/0, on_node_down/1, - on_node_up/1]). - --export([table_names/0]). +-export([prepare/0, + init/0, + join_cluster/2, + reset/0, + force_reset/0, + recluster/1, + change_cluster_node_type/1, + remove_cluster_node/2, + + status/0, + is_db_empty/0, + is_clustered/0, + all_clustered_nodes/0, + clustered_disc_nodes/0, + running_clustered_nodes/0, + is_disc_node/0, + dir/0, + table_names/0, + wait_for_tables/1, + cluster_status_from_mnesia/0, + + init_db/3, + empty_ram_only_tables/0, + copy_db/1, + wait_for_tables/0, + + on_node_up/1, + on_node_down/1 + ]). + +%% Used internally in rpc calls +-export([node_info/0, + remove_node_if_mnesia_running/1, + is_running_remote/0 + ]). %% create_tables/0 exported for helping embed RabbitMQ in or alongside %% other mnesia-using Erlang applications, such as ejabberd @@ -38,147 +63,127 @@ -ifdef(use_specs). --export_type([node_type/0]). +-export_type([node_type/0, cluster_status/0]). --type(node_type() :: disc_only | disc | ram | unknown). --spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} | - {'running_nodes', [node()]}]). --spec(dir/0 :: () -> file:filename()). --spec(ensure_mnesia_dir/0 :: () -> 'ok'). +-type(node_type() :: disc | ram). +-type(cluster_status() :: {ordsets:ordset(node()), ordsets:ordset(node()), + ordsets:ordset(node())}). + +%% Main interface +-spec(prepare/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). --spec(init_db/3 :: ([node()], boolean(), rabbit_misc:thunk('ok')) -> 'ok'). --spec(is_db_empty/0 :: () -> boolean()). --spec(cluster/1 :: ([node()]) -> 'ok'). --spec(force_cluster/1 :: ([node()]) -> 'ok'). --spec(cluster/2 :: ([node()], boolean()) -> 'ok'). +-spec(join_cluster/2 :: ([node()], boolean()) -> 'ok'). -spec(reset/0 :: () -> 'ok'). -spec(force_reset/0 :: () -> 'ok'). +-spec(recluster/1 :: (node()) -> 'ok'). +-spec(change_cluster_node_type/1 :: (node_type()) -> 'ok'). +-spec(remove_cluster_node/2 :: (node(), boolean()) -> 'ok'). + +%% Various queries to get the status of the db +-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} | + {'running_nodes', [node()]}]). +-spec(is_db_empty/0 :: () -> boolean()). -spec(is_clustered/0 :: () -> boolean()). --spec(running_clustered_nodes/0 :: () -> [node()]). -spec(all_clustered_nodes/0 :: () -> [node()]). +-spec(clustered_disc_nodes/0 :: () -> [node()]). +-spec(running_clustered_nodes/0 :: () -> [node()]). +-spec(is_disc_node/0 :: () -> boolean()). +-spec(dir/0 :: () -> file:filename()). +-spec(table_names/0 :: () -> [atom()]). +-spec(cluster_status_from_mnesia/0 :: () -> {'ok', cluster_status()} | + {'error', any()}). + +%% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit' +-spec(init_db/3 :: ([node()], boolean(), boolean()) -> 'ok'). -spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). -spec(wait_for_tables/1 :: ([atom()]) -> 'ok'). --spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok'). --spec(read_cluster_nodes_config/0 :: () -> [node()]). --spec(record_running_nodes/0 :: () -> 'ok'). 
--spec(read_previously_running_nodes/0 :: () -> [node()]). --spec(running_nodes_filename/0 :: () -> file:filename()). --spec(is_disc_node/0 :: () -> boolean()). +-spec(check_cluster_consistency/0 :: () -> 'ok'). + +%% Hooks used in `rabbit_node_monitor' -spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). --spec(table_names/0 :: () -> [atom()]). +%% Functions used in internal rpc calls +-spec(node_info/0 :: () -> {string(), string(), + ({'ok', cluster_status()} | 'error')}). +-spec(remove_node_if_mnesia_running/1 :: (node()) -> 'ok' | + {'error', term()}). -endif. %%---------------------------------------------------------------------------- +%% Main interface +%%---------------------------------------------------------------------------- -status() -> - [{nodes, case mnesia:system_info(is_running) of - yes -> [{Key, Nodes} || - {Key, CopyType} <- [{disc_only, disc_only_copies}, - {disc, disc_copies}, - {ram, ram_copies}], - begin - Nodes = nodes_of_type(CopyType), - Nodes =/= [] - end]; - no -> case all_clustered_nodes() of - [] -> []; - Nodes -> [{unknown, Nodes}] - end; - Reason when Reason =:= starting; Reason =:= stopping -> - exit({rabbit_busy, try_again_later}) - end}, - {running_nodes, running_clustered_nodes()}]. +prepare() -> + ensure_mnesia_dir(), + rabbit_node_monitor:prepare_cluster_status_file(), + check_cluster_consistency(). init() -> ensure_mnesia_running(), ensure_mnesia_dir(), - Nodes = read_cluster_nodes_config(), - ok = init_db(Nodes, should_be_disc_node(Nodes)), + DiscNode = is_disc_node(), + init_db_and_upgrade(all_clustered_nodes(), DiscNode, DiscNode), %% We intuitively expect the global name server to be synced when %% Mnesia is up. In fact that's not guaranteed to be the case - let's %% make it so. ok = global:sync(), - ok = delete_previously_running_nodes(), ok. -is_db_empty() -> - lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end, - table_names()). +%% Make the node join a cluster. The node will be reset automatically before we +%% actually cluster it. The nodes provided will be used to find out about the +%% nodes in the cluster. +%% This function will fail if: +%% +%% * The node is currently the only disc node of its cluster +%% * We can't connect to any of the nodes provided +%% * The node is currently already clustered with the cluster of the nodes +%% provided +%% +%% Note that we make no attempt to verify that the nodes provided are all in the +%% same cluster, we simply pick the first online node and we cluster to its +%% cluster. +join_cluster(DiscoveryNode, WantDiscNode) -> + case is_disc_and_clustered() andalso [node()] =:= clustered_disc_nodes() of + true -> throw({error, + {standalone_ram_node, + "You can't cluster a node if it's the only " + "disc node in its existing cluster. If new nodes " + "joined while this node was offline, use \"recluster\" " + "to add them manually"}}); + _ -> ok + end, -cluster(ClusterNodes) -> - cluster(ClusterNodes, false). -force_cluster(ClusterNodes) -> - cluster(ClusterNodes, true). - -%% Alter which disk nodes this node is clustered with. This can be a -%% subset of all the disk nodes in the cluster but can (and should) -%% include the node itself if it is to be a disk rather than a ram -%% node. If Force is false, only connections to online nodes are -%% allowed. 
-cluster(ClusterNodes, Force) -> - rabbit_misc:local_info_msg("Clustering with ~p~s~n", - [ClusterNodes, if Force -> " forcefully"; - true -> "" - end]), ensure_mnesia_not_running(), ensure_mnesia_dir(), - case not Force andalso is_clustered() andalso - is_only_disc_node(node(), false) andalso - not should_be_disc_node(ClusterNodes) - of - true -> log_both("last running disc node leaving cluster"); - _ -> ok - end, + {ClusterNodes, DiscNodes, _} = case discover_cluster(DiscoveryNode) of + {ok, Res} -> Res; + {error, Reason} -> throw({error, Reason}) + end, - %% Wipe mnesia if we're changing type from disc to ram - case {is_disc_node(), should_be_disc_node(ClusterNodes)} of - {true, false} -> rabbit_misc:with_local_io( - fun () -> error_logger:warning_msg( - "changing node type; wiping " - "mnesia...~n~n") - end), - rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), - cannot_delete_schema); - _ -> ok + case lists:member(node(), ClusterNodes) of + true -> throw({error, {already_clustered, + "You are already clustered with the nodes you " + "have selected"}}); + false -> ok end, - %% Pre-emptively leave the cluster - %% - %% We're trying to handle the following two cases: - %% 1. We have a two-node cluster, where both nodes are disc nodes. - %% One node is re-clustered as a ram node. When it tries to - %% re-join the cluster, but before it has time to update its - %% tables definitions, the other node will order it to re-create - %% its disc tables. So, we need to leave the cluster before we - %% can join it again. - %% 2. We have a two-node cluster, where both nodes are disc nodes. - %% One node is forcefully reset (so, the other node thinks its - %% still a part of the cluster). The reset node is re-clustered - %% as a ram node. Same as above, we need to leave the cluster - %% before we can join it. But, since we don't know if we're in a - %% cluster or not, we just pre-emptively leave it before joining. - ProperClusterNodes = ClusterNodes -- [node()], - try - ok = leave_cluster(ProperClusterNodes, ProperClusterNodes) - catch - {error, {no_running_cluster_nodes, _, _}} when Force -> - ok - end, + %% reset the node. this simplifies things and it will be needed in this case + %% - we're joining a new cluster with new nodes which are not in synch with + %% the current node. I also lifts the burden of reseting the node from the + %% user. + reset(false), + + rabbit_misc:local_info_msg("Clustering with ~p~n", [ClusterNodes]), %% Join the cluster - start_mnesia(), - try - ok = init_db(ClusterNodes, Force), - ok = create_cluster_nodes_config(ClusterNodes) - after - stop_mnesia() - end, + ok = init_db_with_mnesia(DiscNodes, WantDiscNode, false), + + rabbit_node_monitor:notify_joined_cluster(), ok. @@ -188,15 +193,445 @@ cluster(ClusterNodes, Force) -> reset() -> reset(false). force_reset() -> reset(true). +reset(Force) -> + rabbit_misc:local_info_msg("Resetting Rabbit~s~n", + [if Force -> " forcefully"; + true -> "" + end]), + ensure_mnesia_not_running(), + Node = node(), + case Force of + true -> + disconnect_nodes(nodes()); + false -> + AllNodes = all_clustered_nodes(), + %% Reconnecting so that we will get an up to date nodes. + %% We don't need to check for consistency because we are resetting. + %% Force=true here so that reset still works when clustered with a + %% node which is down. 
+ init_db_with_mnesia(AllNodes, is_disc_node(), false, true), + case is_disc_and_clustered() andalso + [node()] =:= clustered_disc_nodes() + of + true -> throw({error, {standalone_ram_node, + "You can't reset a node if it's the " + "only disc node in a cluster. Please " + "convert another node of the cluster " + "to a disc node first."}}); + false -> ok + end, + leave_cluster(), + rabbit_misc:ensure_ok(mnesia:delete_schema([Node]), + cannot_delete_schema), + disconnect_nodes(all_clustered_nodes()), + ok + end, + %% remove persisted messages and any other garbage we find + ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")), + ok = rabbit_node_monitor:reset_cluster_status_file(), + ok. + +%% We need to make sure that we don't end up in a distributed Erlang system with +%% nodes while not being in an Mnesia cluster with them. We don't handle that +%% well. +disconnect_nodes(Nodes) -> [erlang:disconnect_node(N) || N <- Nodes]. + +change_cluster_node_type(Type) -> + ensure_mnesia_dir(), + ensure_mnesia_not_running(), + case is_clustered() of + false -> throw({error, {not_clustered, + "Non-clustered nodes can only be disc nodes"}}); + true -> ok + end, + DiscoveryNodes = all_clustered_nodes(), + {AllNodes, DiscNodes, _} = + case discover_cluster(DiscoveryNodes) of + {ok, Status} -> + Status; + {error, _Reason} -> + throw({error, + {cannot_connect_to_cluster, + "Could not connect to the cluster nodes present in " + "this node status file. If the cluster has changed, " + "you can use the \"recluster\" command to point to the " + "new cluster nodes"}}) + end, + WantDiscNode = case Type of + ram -> false; + disc -> true + end, + case not WantDiscNode andalso [node()] =:= DiscNodes of + true -> throw({error, + {standalone_ram_node, + "You can't change the node type to ram if the node is " + "the only disc node in its cluster. Please add more " + "disc nodes to the cluster first."}}); + false -> ok + end, + ok = init_db_with_mnesia(AllNodes, WantDiscNode, false). + +recluster(DiscoveryNode) -> + ensure_mnesia_not_running(), + ensure_mnesia_dir(), + + Status = {AllNodes, _, _} = + case discover_cluster(DiscoveryNode) of + {ok, Status0} -> + Status0; + {error, _Reason} -> + throw({error, + {cannot_connect_to_node, + "Could not connect to the cluster node provided"}}) + end, + case ordsets:is_element(node(), AllNodes) of + true -> %% As in `check_consistency/0', we can safely delete the schema + %% here, since it'll be replicated from the other nodes + mnesia:delete_schema([node()]), + rabbit_node_monitor:write_cluster_status_file(Status), + init_db_with_mnesia(AllNodes, is_disc_node(), false); + false -> throw({error, + {inconsistent_cluster, + "The nodes provided do not have this node as part of " + "the cluster"}}) + end, + + ok. + +%% We proceed like this: try to remove the node locally. 
If the node if offline, +%% we remove the node if: +%% * This node is a disc node +%% * All other nodes are offline +%% * This node was, at the best of our knowledge (see comment below) the last +%% or second to last after the node we're removing to go down +remove_cluster_node(Node, RemoveWhenOffline) -> + case ordsets:is_element(Node, all_clustered_nodes()) of + true -> ok; + false -> throw({error, {not_a_cluster_node, + "The node selected is not in the cluster."}}) + end, + case {mnesia:system_info(is_running), RemoveWhenOffline} of + {yes, true} -> throw({error, {online_node_offline_flag, + "You set the --offline flag, which is " + "used to remove nodes remotely from " + "offline nodes, but this node is " + "online. "}}); + _ -> ok + end, + case remove_node_if_mnesia_running(Node) of + ok -> + ok; + {error, mnesia_not_running} -> + case RemoveWhenOffline of + true -> remove_node_offline_node(Node); + false -> throw({error, + {offline_node_no_offline_flag, + "You are trying to remove a node from an " + "offline node. That's dangerous, but can be " + "done with the --offline flag. Please consult " + "the manual for rabbitmqctl for more " + "informations."}}) + end; + Err = {error, _} -> + throw(Err) + end. + +remove_node_offline_node(Node) -> + case {ordsets:del_element(Node, + running_nodes(all_clustered_nodes())), + is_disc_node()} + of + {[], true} -> + %% Note that while we check if the nodes was the last to go down, + %% apart from the node we're removing from, this is still unsafe. + %% Consider the situation in which A and B are clustered. A goes + %% down, and records B as the running node. Then B gets clustered + %% with C, C goes down and B goes down. In this case, C is the + %% second-to-last, but we don't know that and we'll remove B from A + %% anyway, even if that will lead to bad things. + case ordsets:subtract(running_clustered_nodes(), + ordsets:from_list([node(), Node])) + of + [] -> start_mnesia(), + try + [mnesia:force_load_table(T) || + T <- rabbit_mnesia:table_names()], + remove_cluster_node(Node, false), + ensure_mnesia_running() + after + stop_mnesia() + end; + _ -> throw({error, + {not_last_node_to_go_down, + "The node you're trying to remove from was not " + "the last to go down (excluding the node you are " + "removing). Please use the the last node to go " + "down to remove nodes when the cluster is " + "offline."}}) + end; + {_, _} -> + throw({error, + {removing_node_from_offline_node, + "To remove a node remotely from an offline node, the node " + "you're removing from must be a disc node and all the " + "other nodes must be offline."}}) + end. + + +%%---------------------------------------------------------------------------- +%% Queries +%%---------------------------------------------------------------------------- + +status() -> + IfNonEmpty = fun (_, []) -> []; + (Type, Nodes) -> [{Type, Nodes}] + end, + [{nodes, (IfNonEmpty(disc, clustered_disc_nodes()) ++ + IfNonEmpty(ram, clustered_ram_nodes()))}] ++ + case mnesia:system_info(is_running) of + yes -> [{running_nodes, running_clustered_nodes()}]; + no -> [] + end. + +is_db_empty() -> + lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end, + table_names()). + is_clustered() -> - RunningNodes = running_clustered_nodes(), - [node()] /= RunningNodes andalso [] /= RunningNodes. + Nodes = all_clustered_nodes(), + [node()] =/= Nodes andalso [] =/= Nodes. + +is_disc_and_clustered() -> + is_disc_node() andalso is_clustered(). 
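For orientation, the remove_cluster_node action added to rabbit_control_main above boils down to an rpc into the node we are removing from; roughly as follows (node names taken from the manual page example, and ignoring rabbitmqctl's rpc timeout handling):

    %% Run against the surviving node; the second argument carries the --offline flag.
    rpc:call('hare@mcnulty', rabbit_mnesia, remove_cluster_node,
             ['rabbit@stringer', false]).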
+ +%% Functions that retrieve the nodes in the cluster will rely on the status file +%% if offline. all_clustered_nodes() -> - mnesia:system_info(db_nodes). + cluster_status(all). + +clustered_disc_nodes() -> + cluster_status(disc). + +clustered_ram_nodes() -> + ordsets:subtract(cluster_status(all), cluster_status(disc)). running_clustered_nodes() -> - mnesia:system_info(running_db_nodes). + cluster_status(running). + +running_clustered_disc_nodes() -> + {_, DiscNodes, RunningNodes} = cluster_status(), + ordsets:intersection(DiscNodes, RunningNodes). + +%% This function is the actual source of information, since it gets the data +%% from mnesia. Obviously it'll work only when mnesia is running. +mnesia_nodes() -> + case mnesia:system_info(is_running) of + no -> {error, mnesia_not_running}; + yes -> %% If the tables are not present, it means that `init_db/3' hasn't + %% been run yet. In other words, either we are a virgin node or a + %% restarted RAM node. In both cases we're not interested in what + %% mnesia has to say. + IsDiscNode = mnesia:system_info(use_dir), + Tables = mnesia:system_info(tables), + {Table, _} = case table_definitions(case IsDiscNode of + true -> disc; + false -> ram + end) of [T|_] -> T end, + case lists:member(Table, Tables) of + true -> + AllNodes = + ordsets:from_list(mnesia:system_info(db_nodes)), + DiscCopies = ordsets:from_list( + mnesia:table_info(schema, disc_copies)), + DiscNodes = + case IsDiscNode of + true -> ordsets:add_element(node(), DiscCopies); + false -> DiscCopies + end, + {ok, {AllNodes, DiscNodes}}; + false -> + {error, tables_not_present} + end + end. + +cluster_status(WhichNodes, ForceMnesia) -> + %% I don't want to call `running_nodes/1' unless if necessary, since it can + %% deadlock when stopping applications. + Nodes = case mnesia_nodes() of + {ok, {AllNodes, DiscNodes}} -> + {ok, {AllNodes, DiscNodes, + fun() -> running_nodes(AllNodes) end}}; + {error, _Reason} when not ForceMnesia -> + {AllNodes, DiscNodes, RunningNodes} = + rabbit_node_monitor:read_cluster_status_file(), + %% The cluster status file records the status when the node + %% is online, but we know for sure that the node is offline + %% now, so we can remove it from the list of running nodes. + {ok, + {AllNodes, DiscNodes, + fun() -> ordsets:del_element(node(), RunningNodes) end}}; + Err = {error, _} -> + Err + end, + case Nodes of + {ok, {AllNodes1, DiscNodes1, RunningNodesThunk}} -> + {ok, case WhichNodes of + status -> {AllNodes1, DiscNodes1, RunningNodesThunk()}; + all -> AllNodes1; + disc -> DiscNodes1; + running -> RunningNodesThunk() + end}; + Err1 = {error, _} -> + Err1 + end. + +cluster_status(WhichNodes) -> + {ok, Status} = cluster_status(WhichNodes, false), + Status. + +cluster_status() -> + cluster_status(status). + +cluster_status_from_mnesia() -> + cluster_status(status, true). + +node_info() -> + {erlang:system_info(otp_release), rabbit_misc:version(), + cluster_status_from_mnesia()}. + +is_disc_node() -> + DiscNodes = clustered_disc_nodes(), + DiscNodes =:= [] orelse ordsets:is_element(node(), DiscNodes). + +dir() -> mnesia:system_info(directory). + +table_names() -> + [Tab || {Tab, _} <- table_definitions()]. 
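To make the query functions above concrete, a sketch of the shapes they return, with invented node names and versions:

    %% rabbit_mnesia:cluster_status()       %% => {['hare@a','rabbit@a'], ['rabbit@a'], ['hare@a','rabbit@a']}
    %%                                      %%    i.e. {AllNodes, DiscNodes, RunningNodes}, each an ordset
    %% rabbit_mnesia:clustered_ram_nodes()  %% => ['hare@a']
    %% rabbit_mnesia:node_info()            %% => {"R15B01", "2.8.5", {ok, ClusterStatus}}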
+ +%%---------------------------------------------------------------------------- +%% Operations on the db +%%---------------------------------------------------------------------------- + +%% Adds the provided nodes to the mnesia cluster, creating a new schema if there +%% is the need to and catching up if there are other nodes in the cluster +%% already. It also updates the cluster status file. +init_db(ClusterNodes, WantDiscNode, Force) -> + Nodes = change_extra_db_nodes(ClusterNodes, Force), + %% Note that we use `system_info' here and not the cluster status since when + %% we start rabbit for the first time the cluster status will say we are a + %% disc node but the tables won't be present yet. + WasDiscNode = mnesia:system_info(use_dir), + case {Nodes, WasDiscNode, WantDiscNode} of + {[], _, false} -> + %% Standalone ram node, we don't want that + throw({error, cannot_create_standalone_ram_node}); + {[], false, true} -> + %% RAM -> disc, starting from scratch + ok = create_schema(); + {[], true, true} -> + %% First disc node up + ok; + {[AnotherNode | _], _, _} -> + %% Subsequent node in cluster, catch up + ensure_version_ok( + rpc:call(AnotherNode, rabbit_version, recorded, [])), + ok = wait_for_replicated_tables(), + %% The sequence in which we delete the schema and then the other + %% tables is important: if we delete the schema first when moving to + %% RAM mnesia will loudly complain since it doesn't make much sense + %% to do that. But when moving to disc, we need to move the schema + %% first. + case WantDiscNode of + true -> create_local_table_copy(schema, disc_copies), + create_local_table_copies(disc); + false -> create_local_table_copies(ram), + create_local_table_copy(schema, ram_copies) + end + end, + ensure_schema_integrity(), + rabbit_node_monitor:update_cluster_status_file(), + ok. + +init_db_and_upgrade(ClusterNodes, WantDiscNode, Force) -> + ok = init_db(ClusterNodes, WantDiscNode, Force), + ok = case rabbit_upgrade:maybe_upgrade_local() of + ok -> ok; + starting_from_scratch -> rabbit_version:record_desired(); + version_not_available -> schema_ok_or_move() + end, + %% `maybe_upgrade_local' restarts mnesia, so ram nodes will forget about the + %% cluster + case WantDiscNode of + false -> start_mnesia(), + change_extra_db_nodes(ClusterNodes, true), + wait_for_replicated_tables(); + true -> ok + end, + ok. + +init_db_with_mnesia(ClusterNodes, WantDiscNode, CheckConsistency, Force) -> + start_mnesia(CheckConsistency), + try + init_db_and_upgrade(ClusterNodes, WantDiscNode, Force) + after + stop_mnesia() + end. + +init_db_with_mnesia(ClusterNodes, WantDiscNode, Force) -> + init_db_with_mnesia(ClusterNodes, WantDiscNode, true, Force). + +ensure_mnesia_dir() -> + MnesiaDir = dir() ++ "/", + case filelib:ensure_dir(MnesiaDir) of + {error, Reason} -> + throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}}); + ok -> + ok + end. + +ensure_mnesia_running() -> + case mnesia:system_info(is_running) of + yes -> + ok; + starting -> + wait_for(mnesia_running), + ensure_mnesia_running(); + Reason when Reason =:= no; Reason =:= stopping -> + throw({error, mnesia_not_running}) + end. + +ensure_mnesia_not_running() -> + case mnesia:system_info(is_running) of + no -> + ok; + stopping -> + wait_for(mnesia_not_running), + ensure_mnesia_not_running(); + Reason when Reason =:= yes; Reason =:= starting -> + throw({error, mnesia_unexpectedly_running}) + end. 
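A purely descriptive summary of the case analysis in init_db/3 above, which may help when reading the clauses:

    %% {Nodes, WasDiscNode, WantDiscNode}:
    %%   {[],    _,     false} -> refused: a standalone ram node is never allowed
    %%   {[],    false, true } -> fresh start, create a disc schema
    %%   {[],    true,  true } -> first disc node up, schema already on disk
    %%   {[_|_], _,     _    } -> catch up from another node, then add or drop the
    %%                            local disc copies depending on WantDiscNode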
+ +ensure_schema_integrity() -> + case check_schema_integrity() of + ok -> + ok; + {error, Reason} -> + throw({error, {schema_integrity_check_failed, Reason}}) + end. + +check_schema_integrity() -> + Tables = mnesia:system_info(tables), + case check_tables(fun (Tab, TabDef) -> + case lists:member(Tab, Tables) of + false -> {error, {table_missing, Tab}}; + true -> check_table_attributes(Tab, TabDef) + end + end) of + ok -> ok = wait_for_tables(), + check_tables(fun check_table_content/2); + Other -> Other + end. empty_ram_only_tables() -> Node = node(), @@ -209,13 +644,129 @@ empty_ram_only_tables() -> end, table_names()), ok. +create_tables() -> create_tables(disc). + +create_tables(Type) -> + lists:foreach(fun ({Tab, TabDef}) -> + TabDef1 = proplists:delete(match, TabDef), + case mnesia:create_table(Tab, TabDef1) of + {atomic, ok} -> ok; + {aborted, Reason} -> + throw({error, {table_creation_failed, + Tab, TabDef1, Reason}}) + end + end, + table_definitions(Type)), + ok. + +copy_db(Destination) -> + ok = ensure_mnesia_not_running(), + rabbit_file:recursive_copy(dir(), Destination). + +wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). + +wait_for_tables() -> wait_for_tables(table_names()). + +wait_for_tables(TableNames) -> + case mnesia:wait_for_tables(TableNames, 30000) of + ok -> + ok; + {timeout, BadTabs} -> + throw({error, {timeout_waiting_for_tables, BadTabs}}); + {error, Reason} -> + throw({error, {failed_waiting_for_tables, Reason}}) + end. + +%% This does not guarantee us much, but it avoids some situations that will +%% definitely end up badly +check_cluster_consistency() -> + AllNodes = ordsets:del_element(node(), all_clustered_nodes()), + %% We want to find 0 or 1 consistent nodes. + case + lists:foldl( + fun(Node, {error, Error}) -> + case rpc:call(Node, rabbit_mnesia, node_info, []) of + {badrpc, _Reason} -> + {error, Error}; + {OTP, Rabbit, Res} -> + rabbit_misc:sequence_error( + [check_version_consistency( + erlang:system_info(otp_release), OTP, "OTP"), + check_version_consistency( + rabbit_misc:version(), Rabbit, "Rabbit"), + case Res of + {ok, Status} -> + check_nodes_consistency(Node, Status); + {error, _Reason} -> + {error, Error} + end]) + end; + (_Node, {ok, Status}) -> + {ok, Status} + end, {error, no_nodes}, AllNodes) + of + {ok, Status = {RemoteAllNodes, _, _}} -> + case ordsets:is_subset(all_clustered_nodes(), RemoteAllNodes) of + true -> ok; + false -> %% We delete the schema here since we have more nodes + %% than the actually clustered ones, and there is no + %% way to remove those nodes from our schema + %% otherwise. On the other hand, we are sure that there + %% is another online node that we can use to sync the + %% tables with. There is a race here: if between this + %% check and the `init_db' invocation the cluster gets + %% disbanded, we're left with a node with no mnesia + %% data that will try to connect to offline nodes. + mnesia:delete_schema([node()]) + end, + rabbit_node_monitor:write_cluster_status_file(Status); + {error, no_nodes} -> + ok; + {error, Error} -> + throw({error, Error}) + end. + +%%-------------------------------------------------------------------- +%% Hooks for `rabbit_node_monitor' %%-------------------------------------------------------------------- -nodes_of_type(Type) -> - %% This function should return the nodes of a certain type (ram, - %% disc or disc_only) in the current cluster. The type of nodes - %% is determined when the cluster is initially configured. - mnesia:table_info(schema, Type). 
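check_cluster_consistency/0 above delegates to check_nodes_consistency/2, defined near the end of this module; the failure it guards against looks roughly like this (node names invented):

    %% Returned when the remote node's cluster status does not include us:
    %% {error, {inconsistent_cluster,
    %%          "Node rabbit@a thinks it's clustered with node rabbit@b, "
    %%          "but rabbit@b disagrees"}}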
+on_node_up(Node) -> + case running_clustered_disc_nodes() =:= [Node] of + true -> rabbit_log:info("cluster contains disc nodes again~n"); + false -> ok + end. + +on_node_down(_Node) -> + case running_clustered_disc_nodes() =:= [] of + true -> rabbit_log:info("only running disc node went down~n"); + false -> ok + end. + +%%-------------------------------------------------------------------- +%% Internal helpers +%%-------------------------------------------------------------------- + +discover_cluster(Nodes) when is_list(Nodes) -> + lists:foldl(fun (_, {ok, Res}) -> {ok, Res}; + (Node, {error, _}) -> discover_cluster(Node) + end, + {error, no_nodes_provided}, + Nodes); +discover_cluster(Node) -> + OfflineError = + {error, {cannot_discover_cluster, + "The nodes provided is either offline or not running"}}, + case Node =:= node() of + true -> + {error, {cannot_discover_cluster, + "You provided the current node as node to cluster with"}}; + false -> + case rpc:call(Node, rabbit_mnesia, cluster_status_from_mnesia, []) of + {badrpc, _Reason} -> OfflineError; + {error, mnesia_not_running} -> OfflineError; + {ok, Res} -> {ok, Res} + end + end. %% The tables aren't supposed to be on disk on a ram node table_definitions(disc) -> @@ -336,68 +887,11 @@ queue_name_match() -> resource_match(Kind) -> #resource{kind = Kind, _='_'}. -table_names() -> - [Tab || {Tab, _} <- table_definitions()]. - replicated_table_names() -> [Tab || {Tab, TabDef} <- table_definitions(), not lists:member({local_content, true}, TabDef) ]. -dir() -> mnesia:system_info(directory). - -ensure_mnesia_dir() -> - MnesiaDir = dir() ++ "/", - case filelib:ensure_dir(MnesiaDir) of - {error, Reason} -> - throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}}); - ok -> - ok - end. - -ensure_mnesia_running() -> - case mnesia:system_info(is_running) of - yes -> - ok; - starting -> - wait_for(mnesia_running), - ensure_mnesia_running(); - Reason when Reason =:= no; Reason =:= stopping -> - throw({error, mnesia_not_running}) - end. - -ensure_mnesia_not_running() -> - case mnesia:system_info(is_running) of - no -> - ok; - stopping -> - wait_for(mnesia_not_running), - ensure_mnesia_not_running(); - Reason when Reason =:= yes; Reason =:= starting -> - throw({error, mnesia_unexpectedly_running}) - end. - -ensure_schema_integrity() -> - case check_schema_integrity() of - ok -> - ok; - {error, Reason} -> - throw({error, {schema_integrity_check_failed, Reason}}) - end. - -check_schema_integrity() -> - Tables = mnesia:system_info(tables), - case check_tables(fun (Tab, TabDef) -> - case lists:member(Tab, Tables) of - false -> {error, {table_missing, Tab}}; - true -> check_table_attributes(Tab, TabDef) - end - end) of - ok -> ok = wait_for_tables(), - check_tables(fun check_table_content/2); - Other -> Other - end. - check_table_attributes(Tab, TabDef) -> {_, ExpAttrs} = proplists:lookup(attributes, TabDef), case mnesia:table_info(Tab, attributes) of @@ -433,153 +927,6 @@ check_tables(Fun) -> Errors -> {error, Errors} end. -%% The cluster node config file contains some or all of the disk nodes -%% that are members of the cluster this node is / should be a part of. -%% -%% If the file is absent, the list is empty, or only contains the -%% current node, then the current node is a standalone (disk) -%% node. Otherwise it is a node that is part of a cluster as either a -%% disk node, if it appears in the cluster node config, or ram node if -%% it doesn't. - -cluster_nodes_config_filename() -> - dir() ++ "/cluster_nodes.config". 
- -create_cluster_nodes_config(ClusterNodes) -> - FileName = cluster_nodes_config_filename(), - case rabbit_file:write_term_file(FileName, [ClusterNodes]) of - ok -> ok; - {error, Reason} -> - throw({error, {cannot_create_cluster_nodes_config, - FileName, Reason}}) - end. - -read_cluster_nodes_config() -> - FileName = cluster_nodes_config_filename(), - case rabbit_file:read_term_file(FileName) of - {ok, [ClusterNodes]} -> ClusterNodes; - {error, enoent} -> - {ok, ClusterNodes} = application:get_env(rabbit, cluster_nodes), - ClusterNodes; - {error, Reason} -> - throw({error, {cannot_read_cluster_nodes_config, - FileName, Reason}}) - end. - -delete_cluster_nodes_config() -> - FileName = cluster_nodes_config_filename(), - case file:delete(FileName) of - ok -> ok; - {error, enoent} -> ok; - {error, Reason} -> - throw({error, {cannot_delete_cluster_nodes_config, - FileName, Reason}}) - end. - -running_nodes_filename() -> - filename:join(dir(), "nodes_running_at_shutdown"). - -record_running_nodes() -> - FileName = running_nodes_filename(), - Nodes = running_clustered_nodes() -- [node()], - %% Don't check the result: we're shutting down anyway and this is - %% a best-effort-basis. - rabbit_file:write_term_file(FileName, [Nodes]), - ok. - -read_previously_running_nodes() -> - FileName = running_nodes_filename(), - case rabbit_file:read_term_file(FileName) of - {ok, [Nodes]} -> Nodes; - {error, enoent} -> []; - {error, Reason} -> throw({error, {cannot_read_previous_nodes_file, - FileName, Reason}}) - end. - -delete_previously_running_nodes() -> - FileName = running_nodes_filename(), - case file:delete(FileName) of - ok -> ok; - {error, enoent} -> ok; - {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file, - FileName, Reason}}) - end. - -init_db(ClusterNodes, Force) -> - init_db( - ClusterNodes, Force, - fun () -> - case rabbit_upgrade:maybe_upgrade_local() of - ok -> ok; - %% If we're just starting up a new node we won't have a - %% version - starting_from_scratch -> ok = rabbit_version:record_desired() - end - end). - -%% Take a cluster node config and create the right kind of node - a -%% standalone disk node, or disk or ram node connected to the -%% specified cluster nodes. If Force is false, don't allow -%% connections to offline nodes. 
-init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) -> - UClusterNodes = lists:usort(ClusterNodes), - ProperClusterNodes = UClusterNodes -- [node()], - case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of - {ok, []} when not Force andalso ProperClusterNodes =/= [] -> - throw({error, {failed_to_cluster_with, ProperClusterNodes, - "Mnesia could not connect to any disc nodes."}}); - {ok, Nodes} -> - WasDiscNode = is_disc_node(), - WantDiscNode = should_be_disc_node(ClusterNodes), - %% We create a new db (on disk, or in ram) in the first - %% two cases and attempt to upgrade the in the other two - case {Nodes, WasDiscNode, WantDiscNode} of - {[], _, false} -> - %% New ram node; start from scratch - ok = create_schema(ram); - {[], false, true} -> - %% Nothing there at all, start from scratch - ok = create_schema(disc); - {[], true, true} -> - %% We're the first node up - case rabbit_upgrade:maybe_upgrade_local() of - ok -> ensure_schema_integrity(); - version_not_available -> ok = schema_ok_or_move() - end; - {[AnotherNode|_], _, _} -> - %% Subsequent node in cluster, catch up - ensure_version_ok( - rpc:call(AnotherNode, rabbit_version, recorded, [])), - {CopyType, CopyTypeAlt} = - case WantDiscNode of - true -> {disc, disc_copies}; - false -> {ram, ram_copies} - end, - ok = wait_for_replicated_tables(), - ok = create_local_table_copy(schema, CopyTypeAlt), - ok = create_local_table_copies(CopyType), - - ok = SecondaryPostMnesiaFun(), - %% We've taken down mnesia, so ram nodes will need - %% to re-sync - case is_disc_node() of - false -> start_mnesia(), - mnesia:change_config(extra_db_nodes, - ProperClusterNodes), - wait_for_replicated_tables(); - true -> ok - end, - - ensure_schema_integrity(), - ok - end; - {error, Reason} -> - %% one reason we may end up here is if we try to join - %% nodes together that are currently running standalone or - %% are members of a different cluster - throw({error, {unable_to_join_cluster, ClusterNodes, Reason}}) - end. - schema_ok_or_move() -> case check_schema_integrity() of ok -> @@ -592,7 +939,7 @@ schema_ok_or_move() -> "and recreating schema from scratch~n", [Reason]), ok = move_db(), - ok = create_schema(disc) + ok = create_schema() end. ensure_version_ok({ok, DiscVersion}) -> @@ -604,25 +951,15 @@ ensure_version_ok({ok, DiscVersion}) -> ensure_version_ok({error, _}) -> ok = rabbit_version:record_desired(). -create_schema(Type) -> +%% We only care about disc nodes since ram nodes are supposed to catch up only +create_schema() -> stop_mnesia(), - case Type of - disc -> rabbit_misc:ensure_ok(mnesia:create_schema([node()]), - cannot_create_schema); - ram -> %% remove the disc schema since this is a ram node - rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), - cannot_delete_schema) - end, + rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema), start_mnesia(), - ok = create_tables(Type), + ok = create_tables(disc), ensure_schema_integrity(), ok = rabbit_version:record_desired(). -is_disc_node() -> mnesia:system_info(use_dir). - -should_be_disc_node(ClusterNodes) -> - ClusterNodes == [] orelse lists:member(node(), ClusterNodes). - move_db() -> stop_mnesia(), MnesiaDir = filename:dirname(dir() ++ "/"), @@ -644,25 +981,6 @@ move_db() -> start_mnesia(), ok. -copy_db(Destination) -> - ok = ensure_mnesia_not_running(), - rabbit_file:recursive_copy(dir(), Destination). - -create_tables() -> create_tables(disc). 
- -create_tables(Type) -> - lists:foreach(fun ({Tab, TabDef}) -> - TabDef1 = proplists:delete(match, TabDef), - case mnesia:create_table(Tab, TabDef1) of - {atomic, ok} -> ok; - {aborted, Reason} -> - throw({error, {table_creation_failed, - Tab, TabDef1, Reason}}) - end - end, - table_definitions(Type)), - ok. - copy_type_to_ram(TabDef) -> [{disc_copies, []}, {ram_copies, [node()]} | proplists:delete(ram_copies, proplists:delete(disc_copies, TabDef))]. @@ -711,122 +1029,111 @@ create_local_table_copy(Tab, Type) -> end, ok. -wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). - -wait_for_tables() -> wait_for_tables(table_names()). - -wait_for_tables(TableNames) -> - case mnesia:wait_for_tables(TableNames, 30000) of - ok -> - ok; - {timeout, BadTabs} -> - throw({error, {timeout_waiting_for_tables, BadTabs}}); - {error, Reason} -> - throw({error, {failed_waiting_for_tables, Reason}}) +remove_node_if_mnesia_running(Node) -> + case mnesia:system_info(is_running) of + yes -> %% Deleting the the schema copy of the node will result in the + %% node being removed from the cluster, with that change being + %% propagated to all nodes + case mnesia:del_table_copy(schema, Node) of + {atomic, ok} -> + rabbit_node_monitor:notify_left_cluster(Node), + ok; + {aborted, Reason} -> + {error, {failed_to_remove_node, Node, Reason}} + end; + no -> {error, mnesia_not_running} end. -reset(Force) -> - rabbit_misc:local_info_msg("Resetting Rabbit~s~n", - [if Force -> " forcefully"; - true -> "" - end]), - ensure_mnesia_not_running(), - case not Force andalso is_clustered() andalso - is_only_disc_node(node(), false) +leave_cluster() -> + case {is_clustered(), + running_nodes(ordsets:del_element(node(), all_clustered_nodes()))} of - true -> log_both("no other disc nodes running"); - false -> ok - end, - case Force of - true -> - disconnect_nodes(nodes()); - false -> - ensure_mnesia_dir(), - start_mnesia(), - {Nodes, RunningNodes} = - try - %% Force=true here so that reset still works when clustered - %% with a node which is down - ok = init_db(read_cluster_nodes_config(), true), - {all_clustered_nodes() -- [node()], - running_clustered_nodes() -- [node()]} - after - stop_mnesia() - end, - leave_cluster(Nodes, RunningNodes), - rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), - cannot_delete_schema), - disconnect_nodes(Nodes) - end, - ok = delete_cluster_nodes_config(), - %% remove persisted messages and any other garbage we find - ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")), - ok. - -%% We need to make sure that we don't end up in a distributed Erlang -%% system with nodes while not being in an Mnesia cluster with -%% them. We don't handle that well. -disconnect_nodes(Nodes) -> [erlang:disconnect_node(N) || N <- Nodes]. 
- -leave_cluster([], _) -> ok; -leave_cluster(Nodes, RunningNodes) -> - %% find at least one running cluster node and instruct it to - %% remove our schema copy which will in turn result in our node - %% being removed as a cluster node from the schema, with that - %% change being propagated to all nodes - case lists:any( - fun (Node) -> - case rpc:call(Node, mnesia, del_table_copy, - [schema, node()]) of - {atomic, ok} -> true; - {badrpc, nodedown} -> false; - {aborted, {node_not_running, _}} -> false; - {aborted, Reason} -> - throw({error, {failed_to_leave_cluster, - Nodes, RunningNodes, Reason}}) - end - end, - RunningNodes) of - true -> ok; - false -> throw({error, {no_running_cluster_nodes, - Nodes, RunningNodes}}) + {false, []} -> + ok; + {_, AllNodes} -> + case lists:any( + fun (Node) -> + case rpc:call(Node, rabbit_mnesia, + remove_node_if_mnesia_running, + [node()]) + of + ok -> + true; + {error, mnesia_not_running} -> + false; + {error, Reason} -> + throw({error, Reason}); + {badrpc, nodedown} -> + false + end + end, + AllNodes) + of + true -> ok; + false -> throw({error, + {no_running_cluster_nodes, + "You cannot leave a cluster if no online " + "nodes are present"}}) + end end. wait_for(Condition) -> error_logger:info_msg("Waiting for ~p...~n", [Condition]), timer:sleep(1000). -on_node_up(Node) -> - case is_only_disc_node(Node, true) of - true -> rabbit_log:info("cluster contains disc nodes again~n"); +start_mnesia(CheckConsistency) -> + case CheckConsistency of + true -> check_cluster_consistency(); false -> ok - end. - -on_node_down(Node) -> - case is_only_disc_node(Node, true) of - true -> rabbit_log:info("only running disc node went down~n"); - false -> ok - end. - -is_only_disc_node(Node, _MnesiaRunning = true) -> - RunningSet = sets:from_list(running_clustered_nodes()), - DiscSet = sets:from_list(nodes_of_type(disc_copies)), - [Node] =:= sets:to_list(sets:intersection(RunningSet, DiscSet)); -is_only_disc_node(Node, false) -> - start_mnesia(), - Res = is_only_disc_node(Node, true), - stop_mnesia(), - Res. - -log_both(Warning) -> - io:format("Warning: ~s~n", [Warning]), - rabbit_misc:with_local_io( - fun () -> error_logger:warning_msg("~s~n", [Warning]) end). - -start_mnesia() -> + end, rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), ensure_mnesia_running(). +start_mnesia() -> + start_mnesia(true). + stop_mnesia() -> stopped = mnesia:stop(), ensure_mnesia_not_running(). + +change_extra_db_nodes(ClusterNodes0, Force) -> + ClusterNodes = lists:usort(ClusterNodes0) -- [node()], + case mnesia:change_config(extra_db_nodes, ClusterNodes) of + {ok, []} when not Force andalso ClusterNodes =/= [] -> + throw({error, {failed_to_cluster_with, ClusterNodes, + "Mnesia could not connect to any disc nodes."}}); + {ok, Nodes} -> + Nodes + end. + +%% What we really want is nodes running rabbit, not running mnesia. Using +%% `rabbit_mnesia:system_info(running_db_nodes)' will return false positives +%% when we are actually just doing cluster operations (e.g. joining the +%% cluster). +running_nodes(Nodes) -> + {Replies, _BadNodes} = + rpc:multicall(Nodes, rabbit_mnesia, is_running_remote, []), + [Node || {Running, Node} <- Replies, Running]. + +is_running_remote() -> + {proplists:is_defined(rabbit, application:which_applications(infinity)), + node()}. 
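A small sketch of what the running-node detection above yields (node name invented):

    %% is_running_remote/0, evaluated on a node where the rabbit application is up:
    %% {true, 'rabbit@a'}
    %% running_nodes/1 multicalls it and keeps only the nodes that reported true.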
+ +check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) -> + ThisNode = node(), + case ordsets:is_element(ThisNode, RemoteAllNodes) of + true -> + {ok, RemoteStatus}; + false -> + {error, {inconsistent_cluster, + rabbit_misc:format("Node ~p thinks it's clustered " + "with node ~p, but ~p disagrees", + [ThisNode, Node, Node])}} + end. + +check_version_consistency(This, Remote, _) when This =:= Remote -> + ok; +check_version_consistency(This, Remote, Name) -> + {error, {inconsistent_cluster, + rabbit_misc:format("~s version mismatch: local node is ~s, " + "remote node ~s", [Name, This, Remote])}}. diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 323cf0ce..849f5d31 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -18,11 +18,27 @@ -behaviour(gen_server). --export([start_link/0]). +-export([prepare_cluster_status_file/0, + write_cluster_status_file/1, + read_cluster_status_file/0, + update_cluster_status_file/0, + reset_cluster_status_file/0, --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). --export([notify_cluster/0, rabbit_running_on/1]). + joined_cluster/2, + notify_joined_cluster/0, + left_cluster/1, + notify_left_cluster/1, + node_up/2, + notify_node_up/0, + + start_link/0, + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 + ]). -define(SERVER, ?MODULE). -define(RABBIT_UP_RPC_TIMEOUT, 2000). @@ -31,56 +47,194 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(rabbit_running_on/1 :: (node()) -> 'ok'). --spec(notify_cluster/0 :: () -> 'ok'). +-spec(prepare_cluster_status_file/0 :: () -> 'ok'). +-spec(write_cluster_status_file/1 :: (rabbit_mnesia:cluster_status()) + -> 'ok'). +-spec(read_cluster_status_file/0 :: () -> rabbit_mnesia:cluster_status()). +-spec(update_cluster_status_file/0 :: () -> 'ok'). +-spec(reset_cluster_status_file/0 :: () -> 'ok'). + +-spec(joined_cluster/2 :: (node(), boolean()) -> 'ok'). +-spec(notify_joined_cluster/0 :: () -> 'ok'). +-spec(left_cluster/1 :: (node()) -> 'ok'). +-spec(notify_left_cluster/1 :: (node()) -> 'ok'). +-spec(node_up/2 :: (node(), boolean()) -> 'ok'). +-spec(notify_node_up/0 :: () -> 'ok'). -endif. -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------- +%% Cluster file operations +%%---------------------------------------------------------------------------- -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +%% The cluster node status file contains all we need to know about the cluster: +%% +%% * All the clustered nodes +%% * The disc nodes +%% * The running nodes. +%% +%% If the current node is a disc node it will be included in the disc nodes +%% list. +%% +%% We strive to keep the file up to date and we rely on this assumption in +%% various situations. Obviously when mnesia is offline the information we have +%% will be outdated, but it can't be otherwise. -rabbit_running_on(Node) -> - gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}). +cluster_status_file_filename() -> + rabbit_mnesia:dir() ++ "/cluster_nodes.config". 
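
The two consistency checks at the start of this hunk compare a remote node's view of the cluster with the local one: cluster membership first, then versions. A small standalone sketch of the membership check, assuming the three-element status described in the comment just above; the real error text in the patch is richer:

-module(membership_check_sketch).
-export([check_membership/2]).

%% Sketch: before trusting RemoteNode's cluster status, make sure it
%% actually lists the local node as a cluster member.
check_membership(RemoteNode, {RemoteAllNodes, _Disc, _Running} = Status) ->
    case ordsets:is_element(node(), RemoteAllNodes) of
        true  -> {ok, Status};
        false -> {error, {inconsistent_cluster, RemoteNode}}
    end.
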
-notify_cluster() -> - Node = node(), - Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node], - %% notify other rabbits of this rabbit - case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on, - [Node], ?RABBIT_UP_RPC_TIMEOUT) of - {_, [] } -> ok; - {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad]) - end, +prepare_cluster_status_file() -> + NotPresent = + fun (AllNodes0, WantDiscNode) -> + ThisNode = [node()], + + RunningNodes0 = legacy_read_previously_running_nodes(), + legacy_delete_previously_running_nodes(), + + RunningNodes = lists:usort(RunningNodes0 ++ ThisNode), + AllNodes = + lists:usort(AllNodes0 ++ RunningNodes), + DiscNodes = case WantDiscNode of + true -> ThisNode; + false -> [] + end, + + ok = write_cluster_status_file({AllNodes, DiscNodes, RunningNodes}) + end, + case try_read_cluster_status_file() of + {ok, _} -> + ok; + {error, {invalid_term, _, [AllNodes]}} -> + %% Legacy file + NotPresent(AllNodes, legacy_should_be_disc_node(AllNodes)); + {error, {cannot_read_file, _, enoent}} -> + {ok, {AllNodes, WantDiscNode}} = + application:get_env(rabbit, cluster_nodes), + NotPresent(AllNodes, WantDiscNode) + end. + + +write_cluster_status_file(Status) -> + FileName = cluster_status_file_filename(), + case rabbit_file:write_term_file(FileName, [Status]) of + ok -> ok; + {error, Reason} -> + throw({error, {cannot_write_cluster_status_file, + FileName, Reason}}) + end. + +try_read_cluster_status_file() -> + FileName = cluster_status_file_filename(), + case rabbit_file:read_term_file(FileName) of + {ok, [{_, _, _} = Status]} -> + {ok, Status}; + {ok, Term} -> + {error, {invalid_term, FileName, Term}}; + {error, Reason} -> + {error, {cannot_read_file, FileName, Reason}} + end. + +read_cluster_status_file() -> + case try_read_cluster_status_file() of + {ok, Status} -> + Status; + {error, Reason} -> + throw({error, {cannot_read_cluster_status_file, Reason}}) + end. + +update_cluster_status_file() -> + {ok, Status} = rabbit_mnesia:cluster_status_from_mnesia(), + write_cluster_status_file(Status). + +reset_cluster_status_file() -> + write_cluster_status_file({[node()], [node()], [node()]}). + +%%---------------------------------------------------------------------------- +%% Cluster notifications +%%---------------------------------------------------------------------------- + +joined_cluster(Node, IsDiscNode) -> + gen_server:cast(?SERVER, {rabbit_join, Node, IsDiscNode}). + +notify_joined_cluster() -> + cluster_multicall(joined_cluster, [node(), rabbit_mnesia:is_disc_node()]), + ok. + +left_cluster(Node) -> + gen_server:cast(?SERVER, {left_cluster, Node}). + +notify_left_cluster(Node) -> + left_cluster(Node), + cluster_multicall(left_cluster, [Node]), + ok. + +node_up(Node, IsDiscNode) -> + gen_server:cast(?SERVER, {node_up, Node, IsDiscNode}). + +notify_node_up() -> + Nodes = cluster_multicall(node_up, [node(), rabbit_mnesia:is_disc_node()]), %% register other active rabbits with this rabbit - [ rabbit_running_on(N) || N <- Nodes ], + [ node_up(N, ordsets:is_element(N, rabbit_mnesia:clustered_disc_nodes())) || + N <- Nodes ], ok. -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------- +%% gen_server callbacks +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). init([]) -> - {ok, ordsets:new()}. + {ok, no_state}. 
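
The status file handled above holds a single `{AllNodes, DiscNodes, RunningNodes}` term. A minimal sketch of writing and re-reading such a term file with plain OTP calls; the patch itself goes through the `rabbit_file` helpers and keeps the file in the Mnesia directory:

-module(status_file_sketch).
-export([write/2, read/1]).

%% Sketch: persist the three-element cluster status as one Erlang term
%% and read it back, reporting unexpected contents explicitly.
write(FileName, {_All, _Disc, _Running} = Status) ->
    file:write_file(FileName, io_lib:format("~p.~n", [Status])).

read(FileName) ->
    case file:consult(FileName) of
        {ok, [{_, _, _} = Status]} -> {ok, Status};
        {ok, Other}                -> {error, {invalid_term, FileName, Other}};
        {error, Reason}            -> {error, {cannot_read_file, FileName, Reason}}
    end.
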
handle_call(_Request, _From, State) -> {noreply, State}. -handle_cast({rabbit_running_on, Node}, Nodes) -> - case ordsets:is_element(Node, Nodes) of - true -> {noreply, Nodes}; +%% Note: when updating the status file, we can't simply write the mnesia +%% information since the message can (and will) overtake the mnesia propagation. +handle_cast({node_up, Node, IsDiscNode}, State) -> + case is_already_monitored({rabbit, Node}) of + true -> {noreply, State}; false -> rabbit_log:info("rabbit on node ~p up~n", [Node]), + {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(), + write_cluster_status_file( + {ordsets:add_element(Node, AllNodes), + case IsDiscNode of + true -> ordsets:add_element(Node, DiscNodes); + false -> DiscNodes + end, + ordsets:add_element(Node, RunningNodes)}), erlang:monitor(process, {rabbit, Node}), ok = handle_live_rabbit(Node), - {noreply, ordsets:add_element(Node, Nodes)} + {noreply, State} end; +handle_cast({joined_cluster, Node, IsDiscNode}, State) -> + {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(), + write_cluster_status_file({ordsets:add_element(Node, AllNodes), + case IsDiscNode of + true -> ordsets:add_element(Node, + DiscNodes); + false -> DiscNodes + end, + RunningNodes}), + {noreply, State}; +handle_cast({left_cluster, Node}, State) -> + {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(), + write_cluster_status_file({ordsets:del_element(Node, AllNodes), + ordsets:del_element(Node, DiscNodes), + ordsets:del_element(Node, RunningNodes)}), + {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Nodes) -> +handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) -> rabbit_log:info("rabbit on node ~p down~n", [Node]), + {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(), + write_cluster_status_file({AllNodes, DiscNodes, + ordsets:del_element(Node, RunningNodes)}), ok = handle_dead_rabbit(Node), - {noreply, ordsets:del_element(Node, Nodes)}; + {noreply, State}; handle_info(_Info, State) -> {noreply, State}. @@ -90,7 +244,9 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------- +%% Functions that call the module specific hooks when nodes go up/down +%%---------------------------------------------------------------------------- %% TODO: This may turn out to be a performance hog when there are lots %% of nodes. We really only need to execute some of these statements @@ -104,3 +260,52 @@ handle_dead_rabbit(Node) -> handle_live_rabbit(Node) -> ok = rabbit_alarm:on_node_up(Node), ok = rabbit_mnesia:on_node_up(Node). + +%%-------------------------------------------------------------------- +%% Internal utils +%%-------------------------------------------------------------------- + +cluster_multicall(Fun, Args) -> + Node = node(), + Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node], + %% notify other rabbits of this cluster + case rpc:multicall(Nodes, rabbit_node_monitor, Fun, Args, + ?RABBIT_UP_RPC_TIMEOUT) of + {_, [] } -> ok; + {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad]) + end, + Nodes. + +is_already_monitored(Item) -> + {monitors, Monitors} = process_info(self(), monitors), + lists:any(fun ({_, Item1}) when Item =:= Item1 -> true; + (_) -> false + end, Monitors). 
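
`is_already_monitored/1` above lets the server stay stateless: rather than tracking monitored nodes in its gen_server state, it inspects its own monitor list. A short standalone sketch of the same check, with illustrative return atoms:

-module(monitor_check_sketch).
-export([ensure_monitored/1]).

%% Sketch: monitor the registered process {rabbit, Node} only if the
%% calling process does not monitor it already.
ensure_monitored(Node) ->
    Item = {rabbit, Node},
    {monitors, Monitors} = process_info(self(), monitors),
    case lists:member({process, Item}, Monitors) of
        true  -> already_monitored;
        false -> _Ref = erlang:monitor(process, Item),
                 newly_monitored
    end.
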
+ +legacy_should_be_disc_node(DiscNodes) -> + DiscNodes == [] orelse lists:member(node(), DiscNodes). + +%%-------------------------------------------------------------------- +%% Legacy functions related to the "running nodes" file +%%-------------------------------------------------------------------- + +legacy_running_nodes_filename() -> + filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown"). + +legacy_read_previously_running_nodes() -> + FileName = legacy_running_nodes_filename(), + case rabbit_file:read_term_file(FileName) of + {ok, [Nodes]} -> Nodes; + {error, enoent} -> []; + {error, Reason} -> throw({error, {cannot_read_previous_nodes_file, + FileName, Reason}}) + end. + +legacy_delete_previously_running_nodes() -> + FileName = legacy_running_nodes_filename(), + case file:delete(FileName) of + ok -> ok; + {error, enoent} -> ok; + {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file, + FileName, Reason}}) + end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d3ca4264..7fbe6bab 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -32,6 +32,7 @@ -define(TIMEOUT, 5000). all_tests() -> + ok = setup_cluster(), passed = gm_tests:all_tests(), passed = mirrored_supervisor_tests:all_tests(), application:set_env(rabbit, file_handles_high_watermark, 10, infinity), @@ -52,34 +53,61 @@ all_tests() -> passed = test_log_management_during_startup(), passed = test_statistics(), passed = test_arguments_parser(), - passed = test_cluster_management(), passed = test_user_management(), passed = test_runtime_parameters(), passed = test_server_status(), passed = test_confirms(), - passed = maybe_run_cluster_dependent_tests(), + passed = + do_if_secondary_node( + fun run_cluster_dependent_tests/1, + fun (SecondaryNode) -> + io:format("Skipping cluster dependent tests with node ~p~n", + [SecondaryNode]), + passed + end), passed = test_configurable_server_properties(), passed. -maybe_run_cluster_dependent_tests() -> +do_if_secondary_node(Up, Down) -> SecondaryNode = rabbit_nodes:make("hare"), case net_adm:ping(SecondaryNode) of - pong -> passed = run_cluster_dependent_tests(SecondaryNode); - pang -> io:format("Skipping cluster dependent tests with node ~p~n", - [SecondaryNode]) - end, - passed. + pong -> Up(SecondaryNode); + pang -> Down(SecondaryNode) + end. -run_cluster_dependent_tests(SecondaryNode) -> - SecondaryNodeS = atom_to_list(SecondaryNode), +setup_cluster() -> + do_if_secondary_node( + fun (SecondaryNode) -> + cover:stop(SecondaryNode), + ok = control_action(stop_app, []), + %% 'cover' does not cope at all well with nodes disconnecting, + %% which happens as part of reset. So we turn it off + %% temporarily. That is ok even if we're not in general using + %% cover, it just turns the engine on / off and doesn't log + %% anything. Note that this way cover won't be on when joining + %% the cluster, but this is OK since we're testing the clustering + %% interface elsewere anyway. + cover:stop(nodes()), + ok = control_action(join_cluster, + [atom_to_list(SecondaryNode)]), + cover:start(nodes()), + ok = control_action(start_app, []), + ok = control_action(start_app, SecondaryNode, [], []) + end, + fun (_) -> ok end). 
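
`do_if_secondary_node/2` above replaces the old copy-pasted ping checks with a single dispatch: ping the secondary node and run one of two continuations. A tiny sketch of that pattern, assuming a hypothetical secondary node name in place of `rabbit_nodes:make("hare")`:

-module(secondary_dispatch_sketch).
-export([do_if_secondary_node/2]).

%% Sketch: run Up(Node) if the (hypothetical) secondary node answers a
%% ping, otherwise run Down(Node).
do_if_secondary_node(Up, Down) ->
    SecondaryNode = 'hare@localhost',   %% illustrative node name
    case net_adm:ping(SecondaryNode) of
        pong -> Up(SecondaryNode);
        pang -> Down(SecondaryNode)
    end.
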
- ok = control_action(stop_app, []), - ok = safe_reset(), - ok = control_action(cluster, [SecondaryNodeS]), - ok = control_action(start_app, []), - ok = control_action(start_app, SecondaryNode, [], []), +maybe_run_cluster_dependent_tests() -> + do_if_secondary_node( + fun (SecondaryNode) -> + passed = run_cluster_dependent_tests(SecondaryNode) + end, + fun (SecondaryNode) -> + io:format("Skipping cluster dependent tests with node ~p~n", + [SecondaryNode]) + end). +run_cluster_dependent_tests(SecondaryNode) -> io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]), passed = test_delegates_async(SecondaryNode), passed = test_delegates_sync(SecondaryNode), @@ -854,218 +882,6 @@ test_arguments_parser() -> passed. -test_cluster_management() -> - %% 'cluster' and 'reset' should only work if the app is stopped - {error, _} = control_action(cluster, []), - {error, _} = control_action(reset, []), - {error, _} = control_action(force_reset, []), - - ok = control_action(stop_app, []), - - %% various ways of creating a standalone node - NodeS = atom_to_list(node()), - ClusteringSequence = [[], - [NodeS], - ["invalid@invalid", NodeS], - [NodeS, "invalid@invalid"]], - - ok = control_action(reset, []), - lists:foreach(fun (Arg) -> - ok = control_action(force_cluster, Arg), - ok - end, - ClusteringSequence), - lists:foreach(fun (Arg) -> - ok = control_action(reset, []), - ok = control_action(force_cluster, Arg), - ok - end, - ClusteringSequence), - ok = control_action(reset, []), - lists:foreach(fun (Arg) -> - ok = control_action(force_cluster, Arg), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok - end, - ClusteringSequence), - lists:foreach(fun (Arg) -> - ok = control_action(reset, []), - ok = control_action(force_cluster, Arg), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok - end, - ClusteringSequence), - - %% convert a disk node into a ram node - ok = control_action(reset, []), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_disc_node(), - ok = control_action(force_cluster, ["invalid1@invalid", - "invalid2@invalid"]), - ok = assert_ram_node(), - - %% join a non-existing cluster as a ram node - ok = control_action(reset, []), - ok = control_action(force_cluster, ["invalid1@invalid", - "invalid2@invalid"]), - ok = assert_ram_node(), - - ok = control_action(reset, []), - - SecondaryNode = rabbit_nodes:make("hare"), - case net_adm:ping(SecondaryNode) of - pong -> passed = test_cluster_management2(SecondaryNode); - pang -> io:format("Skipping clustering tests with node ~p~n", - [SecondaryNode]) - end, - - ok = control_action(start_app, []), - passed. 
- -test_cluster_management2(SecondaryNode) -> - NodeS = atom_to_list(node()), - SecondaryNodeS = atom_to_list(SecondaryNode), - - %% make a disk node - ok = control_action(cluster, [NodeS]), - ok = assert_disc_node(), - %% make a ram node - ok = control_action(reset, []), - ok = control_action(cluster, [SecondaryNodeS]), - ok = assert_ram_node(), - - %% join cluster as a ram node - ok = safe_reset(), - ok = control_action(force_cluster, [SecondaryNodeS, "invalid1@invalid"]), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_ram_node(), - - %% ram node will not start by itself - ok = control_action(stop_app, []), - ok = control_action(stop_app, SecondaryNode, [], []), - {error, _} = control_action(start_app, []), - ok = control_action(start_app, SecondaryNode, [], []), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - - %% change cluster config while remaining in same cluster - ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - - %% join non-existing cluster as a ram node - ok = control_action(force_cluster, ["invalid1@invalid", - "invalid2@invalid"]), - {error, _} = control_action(start_app, []), - ok = assert_ram_node(), - - %% join empty cluster as a ram node (converts to disc) - ok = control_action(cluster, []), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_disc_node(), - - %% make a new ram node - ok = control_action(reset, []), - ok = control_action(force_cluster, [SecondaryNodeS]), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_ram_node(), - - %% turn ram node into disk node - ok = control_action(cluster, [SecondaryNodeS, NodeS]), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_disc_node(), - - %% convert a disk node into a ram node - ok = assert_disc_node(), - ok = control_action(force_cluster, ["invalid1@invalid", - "invalid2@invalid"]), - ok = assert_ram_node(), - - %% make a new disk node - ok = control_action(force_reset, []), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_disc_node(), - - %% turn a disk node into a ram node - %% - %% can't use safe_reset here since for some reason nodes()==[] and - %% yet w/o stopping coverage things break - with_suspended_cover( - [SecondaryNode], fun () -> ok = control_action(reset, []) end), - ok = control_action(cluster, [SecondaryNodeS]), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_ram_node(), - - %% NB: this will log an inconsistent_database error, which is harmless - with_suspended_cover( - [SecondaryNode], fun () -> - true = disconnect_node(SecondaryNode), - pong = net_adm:ping(SecondaryNode) - end), - - %% leaving a cluster as a ram node - ok = safe_reset(), - %% ...and as a disk node - ok = control_action(cluster, [SecondaryNodeS, NodeS]), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = safe_reset(), - - %% attempt to leave cluster when no other node is alive - ok = control_action(cluster, [SecondaryNodeS, NodeS]), - ok = control_action(start_app, []), - ok = control_action(stop_app, SecondaryNode, [], []), - ok = control_action(stop_app, []), - {error, {no_running_cluster_nodes, _, _}} = - control_action(reset, []), - - %% attempt to change type when no other node is alive - {error, {no_running_cluster_nodes, _, _}} = - 
control_action(cluster, [SecondaryNodeS]), - - %% leave system clustered, with the secondary node as a ram node - with_suspended_cover( - [SecondaryNode], fun () -> ok = control_action(force_reset, []) end), - ok = control_action(start_app, []), - %% Yes, this is rather ugly. But since we're a clustered Mnesia - %% node and we're telling another clustered node to reset itself, - %% we will get disconnected half way through causing a - %% badrpc. This never happens in real life since rabbitmqctl is - %% not a clustered Mnesia node and is a hidden node. - with_suspended_cover( - [SecondaryNode], - fun () -> - {badrpc, nodedown} = - control_action(force_reset, SecondaryNode, [], []), - pong = net_adm:ping(SecondaryNode) - end), - ok = control_action(cluster, SecondaryNode, [NodeS], []), - ok = control_action(start_app, SecondaryNode, [], []), - - passed. - -%% 'cover' does not cope at all well with nodes disconnecting, which -%% happens as part of reset. So we turn it off temporarily. That is ok -%% even if we're not in general using cover, it just turns the engine -%% on / off and doesn't log anything. -safe_reset() -> with_suspended_cover( - nodes(), fun () -> control_action(reset, []) end). - -with_suspended_cover(Nodes, Fun) -> - cover:stop(Nodes), - Res = Fun(), - cover:start(Nodes), - Res. - test_user_management() -> %% lots if stuff that should fail diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index e1a7bcae..c43ce236 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -121,10 +121,7 @@ remove_backup() -> info("upgrades: Mnesia backup removed~n", []). maybe_upgrade_mnesia() -> - %% rabbit_mnesia:all_clustered_nodes/0 will return [] at this point - %% if we are a RAM node since Mnesia has not started yet. - AllNodes = lists:usort(rabbit_mnesia:all_clustered_nodes() ++ - rabbit_mnesia:read_cluster_nodes_config()), + AllNodes = rabbit_mnesia:all_clustered_nodes(), case rabbit_version:upgrades_required(mnesia) of {error, starting_from_scratch} -> ok; @@ -150,19 +147,17 @@ maybe_upgrade_mnesia() -> upgrade_mode(AllNodes) -> case nodes_running(AllNodes) of [] -> - AfterUs = rabbit_mnesia:read_previously_running_nodes(), + AfterUs = rabbit_mnesia:running_clustered_nodes() -- [node()], case {is_disc_node_legacy(), AfterUs} of {true, []} -> primary; {true, _} -> - Filename = rabbit_mnesia:running_nodes_filename(), + %% TODO: Here I'm assuming that the various cluster status + %% files are consistent with each other, I think I can + %% provide a solution if they're not... die("Cluster upgrade needed but other disc nodes shut " "down after this one.~nPlease first start the last " - "disc node to shut down.~n~nNote: if several disc " - "nodes were shut down simultaneously they may " - "all~nshow this message. In which case, remove " - "the lock file on one of them and~nstart that node. " - "The lock file on this node is:~n~n ~s ", [Filename]); + "disc node to shut down.~n", []); {false, _} -> die("Cluster upgrade needed but this is a ram node.~n" "Please first start the last disc node to shut down.", @@ -222,15 +217,8 @@ secondary_upgrade(AllNodes) -> IsDiscNode = is_disc_node_legacy(), rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), cannot_delete_schema), - %% Note that we cluster with all nodes, rather than all disc nodes - %% (as we can't know all disc nodes at this point). This is safe as - %% we're not writing the cluster config, just setting up Mnesia. 
- ClusterNodes = case IsDiscNode of - true -> AllNodes; - false -> AllNodes -- [node()] - end, rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - ok = rabbit_mnesia:init_db(ClusterNodes, true, fun () -> ok end), + ok = rabbit_mnesia:init_db(AllNodes, IsDiscNode, true), ok = rabbit_version:record_desired_for_scope(mnesia), ok. |
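
The `upgrade_mode/1` change above derives the "who upgrades first" decision from the cluster status alone: a disc node that was the last to shut down may act as primary, and every other node must wait for it. A simplified decision sketch; the atoms and error reasons here are illustrative, not the patch's, and the real function also handles the case where other cluster nodes are already running:

-module(upgrade_mode_sketch).
-export([upgrade_mode/2]).

%% Sketch: IsDiscNode says whether this node keeps data on disc;
%% AfterUs is the list of other nodes still running when it shut down.
upgrade_mode(IsDiscNode, AfterUs) ->
    case {IsDiscNode, AfterUs} of
        {true,  []} -> primary;   %% last disc node to stop leads the upgrade
        {true,  _}  -> {wait_for, AfterUs};
        {false, _}  -> {error, ram_node_cannot_lead_upgrade}
    end.
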