diff options
-rw-r--r-- | ebin/rabbit_app.in | 2 | ||||
-rw-r--r-- | src/rabbit.erl | 9 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 41 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 6 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 1194 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 244 | ||||
-rw-r--r-- | src/rabbit_upgrade.erl | 26 |
7 files changed, 815 insertions, 707 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index ffe112a0..9c0ed448 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -32,7 +32,7 @@ {default_user_tags, [administrator]}, {default_vhost, <<"/">>}, {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, - {cluster_nodes, []}, + {cluster_nodes, {[], true}}, {server_properties, []}, {collect_statistics, none}, {collect_statistics_interval, 5000}, diff --git a/src/rabbit.erl b/src/rabbit.erl index fda489fe..0d2e27b9 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -300,6 +300,7 @@ start() -> %% We do not want to HiPE compile or upgrade %% mnesia after just restarting the app ok = ensure_application_loaded(), + ok = rabbit_mnesia:prepare(), ok = ensure_working_log_handlers(), ok = app_utils:start_applications(app_startup_order()), ok = print_plugin_info(rabbit_plugins:active()) @@ -308,6 +309,7 @@ start() -> boot() -> start_it(fun() -> ok = ensure_application_loaded(), + ok = rabbit_mnesia:prepare(), maybe_hipe_compile(), ok = ensure_working_log_handlers(), ok = rabbit_upgrade:maybe_upgrade_mnesia(), @@ -408,7 +410,6 @@ start(normal, []) -> end. stop(_State) -> - ok = rabbit_mnesia:record_running_nodes(), terminated_ok = error_logger:delete_report_handler(rabbit_error_logger), ok = rabbit_alarm:stop(), ok = case rabbit_mnesia:is_clustered() of @@ -505,12 +506,12 @@ sort_boot_steps(UnsortedSteps) -> end. boot_step_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) -> + AllNodes = rabbit_mnesia:all_clustered_nodes(), {Err, Nodes} = - case rabbit_mnesia:read_previously_running_nodes() of + case AllNodes -- [node()] of [] -> {"Timeout contacting cluster nodes. Since RabbitMQ was" " shut down forcefully~nit cannot determine which nodes" - " are timing out. 
Details on all nodes will~nfollow.~n", - rabbit_mnesia:all_clustered_nodes() -- [node()]}; + " are timing out.~n", []}; Ns -> {rabbit_misc:format( "Timeout contacting cluster nodes: ~p.~n", [Ns]), Ns} diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 2e163cfb..d927206b 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -25,10 +25,12 @@ -define(QUIET_OPT, "-q"). -define(NODE_OPT, "-n"). -define(VHOST_OPT, "-p"). +-define(RAM_OPT, "--ram"). -define(QUIET_DEF, {?QUIET_OPT, flag}). -define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}). -define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}). +-define(RAM_DEF, {?RAM_OPT, flag}). -define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node)]). @@ -41,8 +43,10 @@ force_reset, rotate_logs, - cluster, - force_cluster, + {join_cluster, [?RAM_DEF]}, + change_node_type, + recluster, + remove_node, cluster_status, add_user, @@ -234,17 +238,28 @@ action(force_reset, Node, [], _Opts, Inform) -> Inform("Forcefully resetting node ~p", [Node]), call(Node, {rabbit_mnesia, force_reset, []}); -action(cluster, Node, ClusterNodeSs, _Opts, Inform) -> - ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), - Inform("Clustering node ~p with ~p", - [Node, ClusterNodes]), - rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]); - -action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) -> - ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), - Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)", - [Node, ClusterNodes]), - rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]); +action(join_cluster, Node, [ClusterNodeS], Opts, Inform) -> + ClusterNode = list_to_atom(ClusterNodeS), + DiscNode = not proplists:get_bool(?RAM_OPT, Opts), + Inform("Clustering node ~p with ~p", [Node, ClusterNode]), + rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, DiscNode]); + +action(change_node_type, Node, ["ram"], _Opts, Inform) -> + Inform("Turning ~p into a ram node", 
[Node]), + rpc_call(Node, rabbit_mnesia, change_node_type, [ram]); +action(change_node_type, Node, ["disc"], _Opts, Inform) -> + Inform("Turning ~p into a disc node", [Node]), + rpc_call(Node, rabbit_mnesia, change_node_type, [disc]); + +action(recluster, Node, [ClusterNodeS], _Opts, Inform) -> + ClusterNode = list_to_atom(ClusterNodeS), + Inform("Re-clustering ~p with ~p", [Node, ClusterNode]), + rpc_call(Node, rabbit_mnesia, recluster, [ClusterNode]); + +action(remove_node, Node, [ClusterNodeS], _Opts, Inform) -> + ClusterNode = list_to_atom(ClusterNodeS), + Inform("Removing node ~p from cluster", [ClusterNode]), + rpc_call(Node, rabbit_mnesia, remove_node, [ClusterNode]); action(wait, Node, [PidFile], _Opts, Inform) -> Inform("Waiting for ~p", [Node]), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index d41aa09b..5bfb7de0 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -60,6 +60,7 @@ -export([multi_call/2]). -export([os_cmd/1]). -export([gb_sets_difference/2]). +-export([rabbit_version/0]). %%---------------------------------------------------------------------------- @@ -212,6 +213,7 @@ ([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}). -spec(os_cmd/1 :: (string()) -> string()). -spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()). +-spec(rabbit_version/0 :: () -> string()). -endif. @@ -939,3 +941,7 @@ os_cmd(Command) -> gb_sets_difference(S1, S2) -> gb_sets:fold(fun gb_sets:delete_any/2, S1, S2). + +rabbit_version() -> + {ok, VSN} = application:get_key(rabbit, vsn), + VSN. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 7e9346f9..7aac7f3f 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -17,16 +17,40 @@ -module(rabbit_mnesia). 
--export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, - cluster/1, force_cluster/1, reset/0, force_reset/0, init_db/3, - is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, - empty_ram_only_tables/0, copy_db/1, wait_for_tables/1, - create_cluster_nodes_config/1, read_cluster_nodes_config/0, - record_running_nodes/0, read_previously_running_nodes/0, - running_nodes_filename/0, is_disc_node/0, on_node_down/1, - on_node_up/1]). - --export([table_names/0]). +-export([prepare/0, + init/0, + join_cluster/2, + reset/0, + force_reset/0, + recluster/1, + change_node_type/1, + remove_node/1, + + status/0, + is_db_empty/0, + is_clustered/0, + all_clustered_nodes/0, + all_clustered_disc_nodes/0, + running_clustered_nodes/0, + is_disc_node/0, + dir/0, + table_names/0, + wait_for_tables/1, + + init_db/3, + empty_ram_only_tables/0, + copy_db/1, + wait_for_tables/0, + + on_node_up/1, + on_node_down/1 + ]). + +%% Used internally in rpc calls +-export([cluster_status_if_running/0, + node_info/0, + remove_node_if_mnesia_running/1 + ]). %% create_tables/0 exported for helping embed RabbitMQ in or alongside %% other mnesia-using Erlang applications, such as ejabberd @@ -40,145 +64,153 @@ -export_type([node_type/0]). --type(node_type() :: disc_only | disc | ram | unknown). --spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} | - {'running_nodes', [node()]}]). --spec(dir/0 :: () -> file:filename()). --spec(ensure_mnesia_dir/0 :: () -> 'ok'). +-type(node_type() :: disc | ram). +-type(node_status() :: {[node()], [node()], [node()]}). + +%% Main interface +-spec(prepare/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). --spec(init_db/3 :: ([node()], boolean(), rabbit_misc:thunk('ok')) -> 'ok'). --spec(is_db_empty/0 :: () -> boolean()). --spec(cluster/1 :: ([node()]) -> 'ok'). --spec(force_cluster/1 :: ([node()]) -> 'ok'). --spec(cluster/2 :: ([node()], boolean()) -> 'ok'). +-spec(join_cluster/2 :: ([node()], boolean()) -> 'ok'). 
-spec(reset/0 :: () -> 'ok'). -spec(force_reset/0 :: () -> 'ok'). +-spec(recluster/1 :: (node()) -> 'ok'). +-spec(change_node_type/1 :: (node_type()) -> 'ok'). +-spec(remove_node/1 :: (node()) -> 'ok'). + +%% Various queries to get the status of the db +-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} | + {'running_nodes', [node()]}]). +-spec(is_db_empty/0 :: () -> boolean()). -spec(is_clustered/0 :: () -> boolean()). --spec(running_clustered_nodes/0 :: () -> [node()]). -spec(all_clustered_nodes/0 :: () -> [node()]). +-spec(all_clustered_disc_nodes/0 :: () -> [node()]). +-spec(running_clustered_nodes/0 :: () -> [node()]). +-spec(is_disc_node/0 :: () -> boolean()). +-spec(dir/0 :: () -> file:filename()). +-spec(table_names/0 :: () -> [atom()]). + +%% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit' +-spec(init_db/3 :: ([node()], boolean(), boolean()) -> 'ok'). -spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). -spec(wait_for_tables/1 :: ([atom()]) -> 'ok'). --spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok'). --spec(read_cluster_nodes_config/0 :: () -> [node()]). --spec(record_running_nodes/0 :: () -> 'ok'). --spec(read_previously_running_nodes/0 :: () -> [node()]). --spec(running_nodes_filename/0 :: () -> file:filename()). --spec(is_disc_node/0 :: () -> boolean()). + +%% Hooks used in `rabbit_node_monitor' -spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). --spec(table_names/0 :: () -> [atom()]). +%% Functions used in internal rpc calls +-spec(cluster_status_if_running/0 :: () -> {'ok', node_status()} | 'error'). +-spec(node_info/0 :: () -> {string(), string(), + ({'ok', node_status()} | 'error')}). +-spec(remove_node_if_mnesia_running/1 :: (node()) -> 'ok' | + {'error', term()}). -endif. 
%%---------------------------------------------------------------------------- +%% Main interface +%%---------------------------------------------------------------------------- -status() -> - [{nodes, case mnesia:system_info(is_running) of - yes -> [{Key, Nodes} || - {Key, CopyType} <- [{disc_only, disc_only_copies}, - {disc, disc_copies}, - {ram, ram_copies}], - begin - Nodes = nodes_of_type(CopyType), - Nodes =/= [] - end]; - no -> case all_clustered_nodes() of - [] -> []; - Nodes -> [{unknown, Nodes}] - end; - Reason when Reason =:= starting; Reason =:= stopping -> - exit({rabbit_busy, try_again_later}) - end}, - {running_nodes, running_clustered_nodes()}]. +%% Sets up the cluster status file when needed, taking care of the legacy +%% files +prepare() -> + ensure_mnesia_dir(), + NotPresent = + fun (AllNodes0, WantDiscNode) -> + ThisNode = [node()], + + RunningNodes0 = legacy_read_previously_running_nodes(), + legacy_delete_previously_running_nodes(), + + RunningNodes = lists:usort(RunningNodes0 ++ ThisNode), + AllNodes = + lists:usort(AllNodes0 ++ RunningNodes), + DiscNodes = case WantDiscNode of + true -> ThisNode; + false -> [] + end, + + ok = write_cluster_nodes_status({AllNodes, DiscNodes, RunningNodes}) + end, + case try_read_cluster_nodes_status() of + {ok, _} -> + %% We check the consistency only when the cluster status exists, + %% since when it doesn't exist it means that we just started a fresh + %% node, and when we have a legacy node with an old + %% "cluster_nodes.config" we can't check the consistency anyway + check_cluster_consistency(), + ok; + {error, {invalid_term, _, [AllNodes]}} -> + %% Legacy file + NotPresent(AllNodes, should_be_disc_node(AllNodes)); + {error, {cannot_read_file, _, enoent}} -> + {ok, {AllNodes, WantDiscNode}} = + application:get_env(rabbit, cluster_nodes), + NotPresent(AllNodes, WantDiscNode) + end. 
init() -> ensure_mnesia_running(), ensure_mnesia_dir(), - Nodes = read_cluster_nodes_config(), - ok = init_db(Nodes, should_be_disc_node(Nodes)), + ok = reinit_db(should_be_disc_node(all_clustered_disc_nodes())), %% We intuitively expect the global name server to be synced when %% Mnesia is up. In fact that's not guaranteed to be the case - let's %% make it so. ok = global:sync(), - ok = delete_previously_running_nodes(), ok. -is_db_empty() -> - lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end, - table_names()). +%% Make the node join a cluster. The node will be reset automatically before we +%% actually cluster it. The nodes provided will be used to find out about the +%% nodes in the cluster. +%% This function will fail if: +%% +%% * The node is currently the only disc node of its cluster +%% * We can't connect to any of the nodes provided +%% * The node is currently already clustered with the cluster of the nodes +%% provided +%% +%% Note that we make no attempt to verify that the nodes provided are all in the +%% same cluster, we simply pick the first online node and we cluster to its +%% cluster. +join_cluster(DiscoveryNode, WantDiscNode) -> + case is_disc_and_clustered() andalso is_only_disc_node(node()) of + true -> throw({error, + {standalone_ram_node, + "You can't cluster a node if it's the only " + "disc node in its existing cluster. If new nodes " + "joined while this node was offline, use \"recluster\" " + "to add them manually"}}); + _ -> ok + end, -cluster(ClusterNodes) -> - cluster(ClusterNodes, false). -force_cluster(ClusterNodes) -> - cluster(ClusterNodes, true). - -%% Alter which disk nodes this node is clustered with. This can be a -%% subset of all the disk nodes in the cluster but can (and should) -%% include the node itself if it is to be a disk rather than a ram -%% node. If Force is false, only connections to online nodes are -%% allowed. 
-cluster(ClusterNodes, Force) -> - rabbit_misc:local_info_msg("Clustering with ~p~s~n", - [ClusterNodes, if Force -> " forcefully"; - true -> "" - end]), ensure_mnesia_not_running(), ensure_mnesia_dir(), - case not Force andalso is_clustered() andalso - is_only_disc_node(node(), false) andalso - not should_be_disc_node(ClusterNodes) - of - true -> log_both("last running disc node leaving cluster"); - _ -> ok - end, + {ClusterNodes, DiscNodes, _} = case discover_cluster(DiscoveryNode) of + {ok, Res} -> Res; + {error, Reason} -> throw({error, Reason}) + end, - %% Wipe mnesia if we're changing type from disc to ram - case {is_disc_node(), should_be_disc_node(ClusterNodes)} of - {true, false} -> rabbit_misc:with_local_io( - fun () -> error_logger:warning_msg( - "changing node type; wiping " - "mnesia...~n~n") - end), - rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), - cannot_delete_schema); - _ -> ok + case lists:member(node(), ClusterNodes) of + true -> throw({error, {already_clustered, + "You are already clustered with the nodes you " + "have selected"}}); + false -> ok end, - %% Pre-emptively leave the cluster - %% - %% We're trying to handle the following two cases: - %% 1. We have a two-node cluster, where both nodes are disc nodes. - %% One node is re-clustered as a ram node. When it tries to - %% re-join the cluster, but before it has time to update its - %% tables definitions, the other node will order it to re-create - %% its disc tables. So, we need to leave the cluster before we - %% can join it again. - %% 2. We have a two-node cluster, where both nodes are disc nodes. - %% One node is forcefully reset (so, the other node thinks its - %% still a part of the cluster). The reset node is re-clustered - %% as a ram node. Same as above, we need to leave the cluster - %% before we can join it. But, since we don't know if we're in a - %% cluster or not, we just pre-emptively leave it before joining. 
- ProperClusterNodes = ClusterNodes -- [node()], - try - ok = leave_cluster(ProperClusterNodes, ProperClusterNodes) - catch - {error, {no_running_cluster_nodes, _, _}} when Force -> - ok - end, + %% Reset the node. This simplifies things and it will be needed in this case + %% - we're joining a new cluster with new nodes which are not in sync with + %% the current node. It also lifts the burden of resetting the node from the + %% user. + reset(false), + + rabbit_misc:local_info_msg("Clustering with ~p~s~n", [ClusterNodes]), %% Join the cluster - start_mnesia(), - try - ok = init_db(ClusterNodes, Force), - ok = create_cluster_nodes_config(ClusterNodes) + ok = init_db_and_upgrade(DiscNodes, WantDiscNode, false), + stop_mnesia(), ok. @@ -188,15 +220,332 @@ cluster(ClusterNodes, Force) -> reset() -> reset(false). force_reset() -> reset(true). +reset(Force) -> + rabbit_misc:local_info_msg("Resetting Rabbit~s~n", + [if Force -> " forcefully"; + true -> "" + end]), + ensure_mnesia_not_running(), + case not Force andalso is_disc_and_clustered() andalso + is_only_disc_node(node()) + of + true -> throw({error, {standalone_ram_node, + "You can't reset a node if it's the only disc " + "node in a cluster. Please convert another node" + " of the cluster to a disc node first."}}); + false -> ok + end, + Node = node(), + case Force of + true -> + ok; + false -> + %% Reconnecting so that we will get an up-to-date list of nodes + try + %% Force=true here so that reset still works when + %% clustered with a node which is down + ok = reinit_db(true) + after + stop_mnesia() + end, + leave_cluster(), + rabbit_misc:ensure_ok(mnesia:delete_schema([Node]), + cannot_delete_schema) + end, + %% We need to make sure that we don't end up in a distributed Erlang system + %% with nodes while not being in an Mnesia cluster with them. We don't + %% handle that well.
+ [erlang:disconnect_node(N) || N <- all_clustered_nodes()], + %% remove persisted messages and any other garbage we find + ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")), + ok = write_cluster_nodes_status(initial_cluster_status()), + ok. + +change_node_type(Type) -> + ensure_mnesia_dir(), + ensure_mnesia_not_running(), + case is_clustered() of + false -> throw({error, {not_clustered, + "Non-clustered nodes can only be disc nodes"}}); + true -> ok + end, + + DiscoveryNodes = all_clustered_nodes(), + ClusterNodes = + case discover_cluster(DiscoveryNodes) of + {ok, {ClusterNodes0, _, _}} -> + ClusterNodes0; + {error, _Reason} -> + throw({error, + {cannot_connect_to_cluster, + "Could not connect to the cluster nodes present in " + "this node status file. If the cluster has changed, " + "you can use the \"recluster\" command to point to the " + "new cluster nodes"}}) + end, + + WantDiscNode = case Type of + ram -> false; + disc -> true + end, + + ok = init_db_and_upgrade(ClusterNodes, WantDiscNode, false), + stop_mnesia(), + + ok. + +recluster(DiscoveryNode) -> + ensure_mnesia_not_running(), + ensure_mnesia_dir(), + + ClusterNodes = + case discover_cluster(DiscoveryNode) of + {ok, {ClusterNodes0, _, _}} -> + ClusterNodes0; + {error, _Reason} -> + throw({error, + {cannot_connect_to_node, + "Could not connect to the cluster node provided"}}) + end, + + case lists:member(node(), ClusterNodes) of + true -> init_db_and_upgrade(ClusterNodes, is_disc_node(), false); + false -> throw({error, + {inconsistent_cluster, + "The nodes provided do not have this node as part of " + "the cluster"}}) + end, + stop_mnesia(), + + ok. + +%% We proceed like this: try to remove the node locally. If mnesia is offline +%% then we try to remove it remotely on some other node. If there are no other +%% nodes running, then *if the current node is a disk node* we force-load mnesia +%% and remove the node. 
+remove_node(Node) -> + case remove_node_if_mnesia_running(Node) of + ok -> + ok; + {error, mnesia_not_running} -> + case remove_node_remotely(Node) of + ok -> + ok; + {error, no_running_cluster_nodes} -> + case is_disc_node() of + false -> + throw({error, + {removing_node_from_ram_node, + "There are no nodes running and this is a " + "RAM node"}}); + true -> + start_mnesia(), + try + [mnesia:force_load_table(T) || + T <- rabbit_mnesia:table_names()], + remove_node(Node), + ensure_mnesia_running() + after + stop_mnesia() + end + end + end; + {error, Reason} -> + throw({error, Reason}) + end. + +%%---------------------------------------------------------------------------- +%% Queries +%%---------------------------------------------------------------------------- + +status() -> + IfNonEmpty = fun (_, []) -> []; + (Type, Nodes) -> [{Type, Nodes}] + end, + [{nodes, (IfNonEmpty(disc, all_clustered_disc_nodes()) ++ + IfNonEmpty(ram, all_clustered_ram_nodes()))}, + {running_nodes, running_clustered_nodes()}]. + +is_db_empty() -> + lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end, + table_names()). + is_clustered() -> - RunningNodes = running_clustered_nodes(), - [node()] /= RunningNodes andalso [] /= RunningNodes. + Nodes = all_clustered_nodes(), + [node()] /= Nodes andalso [] /= Nodes. + +is_disc_and_clustered() -> + is_disc_node() andalso is_clustered(). + +%% Functions that retrieve the nodes in the cluster completely rely on the +%% cluster status file. For obvious reason, if rabbit is down, they might return +%% out of date information. all_clustered_nodes() -> - mnesia:system_info(db_nodes). + {AllNodes, _, _} = read_cluster_nodes_status(), + AllNodes. + +all_clustered_disc_nodes() -> + {_, DiscNodes, _} = read_cluster_nodes_status(), + DiscNodes. + +all_clustered_ram_nodes() -> + {AllNodes, DiscNodes, _} = read_cluster_nodes_status(), + sets:to_list(sets:subtract(sets:from_list(AllNodes), + sets:from_list(DiscNodes))). 
running_clustered_nodes() -> - mnesia:system_info(running_db_nodes). + {_, _, RunningNodes} = read_cluster_nodes_status(), + RunningNodes. + +running_clustered_disc_nodes() -> + {_, DiscNodes, RunningNodes} = read_cluster_nodes_status(), + sets:to_list(sets:intersection(sets:from_list(DiscNodes), + sets:from_list(RunningNodes))). + +%% This function is a bit different, we want it to return correctly only when +%% the node is actually online. This is because when we discover the nodes we +%% want online, "working" nodes only. +cluster_status_if_running() -> + case mnesia:system_info(is_running) of + no -> error; + yes -> {ok, {mnesia:system_info(db_nodes), + mnesia:table_info(schema, disc_copies), + mnesia:system_info(running_db_nodes)}} + end. + +node_info() -> + {erlang:system_info(otp_release), rabbit_misc:rabbit_version(), + cluster_status_if_running()}. + +is_disc_node() -> mnesia:system_info(use_dir). + +dir() -> mnesia:system_info(directory). + +table_names() -> + [Tab || {Tab, _} <- table_definitions()]. + +%%---------------------------------------------------------------------------- +%% Operations on the db +%%---------------------------------------------------------------------------- + +%% Starts mnesia if necessary, adds the provided nodes to the mnesia cluster, +%% creating a new schema if there is the need to and catching up if there are +%% other nodes in the cluster already. It also updates the cluster status file. 
+init_db(ClusterNodes, WantDiscNode, Force) -> + case mnesia:system_info(is_running) of + yes -> ok; + no -> start_mnesia() + end, + case change_extra_db_nodes(ClusterNodes, Force) of + {error, Reason} -> + throw({error, Reason}); + {ok, Nodes} -> + WasDiscNode = is_disc_node(), + case {Nodes, WasDiscNode, WantDiscNode} of + {[], _, false} -> + %% Standalone ram node, we don't want that + throw({error, cannot_create_standalone_ram_node}); + {[], false, true} -> + %% RAM -> disc, starting from scratch + ok = create_schema(); + {[], true, true} -> + %% First disc node up + ok; + {[AnotherNode | _], _, _} -> + %% Subsequent node in cluster, catch up + ensure_version_ok( + rpc:call(AnotherNode, rabbit_version, recorded, [])), + ok = wait_for_replicated_tables(), + %% The sequence in which we delete the schema and then the + %% other tables is important: if we delete the schema first + %% when moving to RAM mnesia will loudly complain since it + %% doesn't make much sense to do that. But when moving to + %% disc, we need to move the schema first. + case WantDiscNode of + true -> create_local_table_copy(schema, disc_copies), + create_local_table_copies(disc); + false -> create_local_table_copies(ram), + create_local_table_copy(schema, ram_copies) + end + end, + ensure_schema_integrity(), + update_cluster_nodes_status(), + ok + end. + +init_db_and_upgrade(ClusterNodes, WantDiscNode, Force) -> + ok = init_db(ClusterNodes, WantDiscNode, Force), + ok = case rabbit_upgrade:maybe_upgrade_local() of + ok -> ok; + starting_from_scratch -> rabbit_version:record_desired(); + version_not_available -> schema_ok_or_move() + end, + %% `maybe_upgrade_local' restarts mnesia, so ram nodes will forget about the + %% cluster + case WantDiscNode of + false -> start_mnesia(), + {ok, _} = change_extra_db_nodes(ClusterNodes, true), + wait_for_replicated_tables(); + true -> ok + end, + ok. 
+ +reinit_db(Force) -> + {AllNodes, DiscNodes, _} = read_cluster_nodes_status(), + init_db_and_upgrade(AllNodes, should_be_disc_node(DiscNodes), Force). + +ensure_mnesia_dir() -> + MnesiaDir = dir() ++ "/", + case filelib:ensure_dir(MnesiaDir) of + {error, Reason} -> + throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}}); + ok -> + ok + end. + +ensure_mnesia_running() -> + case mnesia:system_info(is_running) of + yes -> + ok; + starting -> + wait_for(mnesia_running), + ensure_mnesia_running(); + Reason when Reason =:= no; Reason =:= stopping -> + throw({error, mnesia_not_running}) + end. + +ensure_mnesia_not_running() -> + case mnesia:system_info(is_running) of + no -> + ok; + stopping -> + wait_for(mnesia_not_running), + ensure_mnesia_not_running(); + Reason when Reason =:= yes; Reason =:= starting -> + throw({error, mnesia_unexpectedly_running}) + end. + +ensure_schema_integrity() -> + case check_schema_integrity() of + ok -> + ok; + {error, Reason} -> + throw({error, {schema_integrity_check_failed, Reason}}) + end. + +check_schema_integrity() -> + Tables = mnesia:system_info(tables), + case check_tables(fun (Tab, TabDef) -> + case lists:member(Tab, Tables) of + false -> {error, {table_missing, Tab}}; + true -> check_table_attributes(Tab, TabDef) + end + end) of + ok -> ok = wait_for_tables(), + check_tables(fun check_table_content/2); + Other -> Other + end. empty_ram_only_tables() -> Node = node(), @@ -209,13 +558,188 @@ empty_ram_only_tables() -> end, table_names()), ok. +create_tables() -> create_tables(disc). + +create_tables(Type) -> + lists:foreach(fun ({Tab, TabDef}) -> + TabDef1 = proplists:delete(match, TabDef), + case mnesia:create_table(Tab, TabDef1) of + {atomic, ok} -> ok; + {aborted, Reason} -> + throw({error, {table_creation_failed, + Tab, TabDef1, Reason}}) + end + end, + table_definitions(Type)), + ok. + +copy_db(Destination) -> + ok = ensure_mnesia_not_running(), + rabbit_file:recursive_copy(dir(), Destination). 
+ +wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). + +wait_for_tables() -> wait_for_tables(table_names()). + +wait_for_tables(TableNames) -> + case mnesia:wait_for_tables(TableNames, 30000) of + ok -> + ok; + {timeout, BadTabs} -> + throw({error, {timeout_waiting_for_tables, BadTabs}}); + {error, Reason} -> + throw({error, {failed_waiting_for_tables, Reason}}) + end. + +should_be_disc_node(DiscNodes) -> + DiscNodes == [] orelse lists:member(node(), DiscNodes). + +%% This does not guarantee us much, but it avoids some situations that will +%% definitely end up badly +check_cluster_consistency() -> + CheckVsn = fun (This, This, _) -> + ok; + (This, Remote, Name) -> + throw({error, + {inconsistent_cluster, + rabbit_misc:format( + "~s version mismatch: local node is ~s, " + "remote node ~s", [Name, This, Remote])}}) + end, + CheckOTP = + fun (OTP) -> CheckVsn(erlang:system_info(otp_release), OTP, "OTP") end, + CheckRabbit = + fun (Rabbit) -> + CheckVsn(rabbit_misc:rabbit_version(), Rabbit, "Rabbit") + end, + + CheckNodes = fun (Node, AllNodes) -> + ThisNode = node(), + case lists:member(ThisNode, AllNodes) of + true -> + ok; + false -> + throw({error, + {inconsistent_cluster, + rabbit_misc:format( + "Node ~p thinks it's clustered " + "with node ~p, but ~p disagrees", + [ThisNode, Node, Node])}}) + end + end, + + lists:foreach( + fun(Node) -> + case rpc:call(Node, rabbit_mnesia, node_info, []) of + {badrpc, _Reason} -> + ok; + {OTP, Rabbit, error} -> + CheckOTP(OTP), + CheckRabbit(Rabbit); + {OTP, Rabbit, {ok, {AllNodes, _, _}}} -> + CheckOTP(OTP), + CheckRabbit(Rabbit), + CheckNodes(Node, AllNodes) + end + end, all_clustered_nodes()). 
+ +%%---------------------------------------------------------------------------- +%% Cluster status file functions +%%---------------------------------------------------------------------------- + +%% The cluster node status file contains all we need to know about the cluster: +%% +%% * All the clustered nodes +%% * The disc nodes +%% * The running nodes. +%% +%% If the current node is a disc node it will be included in the disc nodes +%% list. +%% +%% We strive to keep the file up to date and we rely on this assumption in +%% various situations. Obviously when mnesia is offline the information we have +%% will be outdated, but it can't be otherwise. + +cluster_nodes_status_filename() -> + dir() ++ "/cluster_nodes.config". + +initial_cluster_status() -> + {[node()], [node()], [node()]}. + +write_cluster_nodes_status(Status) -> + FileName = cluster_nodes_status_filename(), + case rabbit_file:write_term_file(FileName, [Status]) of + ok -> ok; + {error, Reason} -> + throw({error, {cannot_write_cluster_nodes_status, + FileName, Reason}}) + end. + +try_read_cluster_nodes_status() -> + FileName = cluster_nodes_status_filename(), + case rabbit_file:read_term_file(FileName) of + {ok, [{_, _, _} = Status]} -> + {ok, Status}; + {ok, Term} -> + {error, {invalid_term, FileName, Term}}; + {error, Reason} -> + {error, {cannot_read_file, FileName, Reason}} + end. + +read_cluster_nodes_status() -> + case try_read_cluster_nodes_status() of + {ok, Status} -> + Status; + {error, Reason} -> + throw({error, {cannot_read_cluster_nodes_status, Reason}}) + end. + +%% To update the cluster status when mnesia is running. +update_cluster_nodes_status() -> + {ok, Status} = cluster_status_if_running(), + write_cluster_nodes_status(Status). 
+ +%%-------------------------------------------------------------------- +%% Hooks for `rabbit_node_monitor' %%-------------------------------------------------------------------- -nodes_of_type(Type) -> - %% This function should return the nodes of a certain type (ram, - %% disc or disc_only) in the current cluster. The type of nodes - %% is determined when the cluster is initially configured. - mnesia:table_info(schema, Type). +on_node_up(Node) -> + update_cluster_nodes_status(), + case is_only_disc_node(Node) of + true -> rabbit_log:info("cluster contains disc nodes again~n"); + false -> ok + end. + +on_node_down(Node) -> + case is_only_disc_node(Node) of + true -> rabbit_log:info("only running disc node went down~n"); + false -> ok + end, + update_cluster_nodes_status(). + +%%-------------------------------------------------------------------- +%% Internal helpers +%%-------------------------------------------------------------------- + +discover_cluster(Nodes) when is_list(Nodes) -> + lists:foldl(fun (_, {ok, Res}) -> {ok, Res}; + (Node, {error, _}) -> discover_cluster(Node) + end, + {error, {cannot_discover_cluster, + "The nodes provided is either offline or not running"}}, + Nodes); +discover_cluster(Node) -> + case Node =:= node() of + true -> + {error, {cannot_discover_cluster, + "You provided the current node as node to cluster with"}}; + false -> + case rpc:call(Node, rabbit_mnesia, cluster_status_if_running, []) of + {badrpc, _Reason} -> discover_cluster([]); + error -> discover_cluster([]); + {ok, Res} -> {ok, Res} + end + end. %% The tables aren't supposed to be on disk on a ram node table_definitions(disc) -> @@ -336,68 +860,11 @@ queue_name_match() -> resource_match(Kind) -> #resource{kind = Kind, _='_'}. -table_names() -> - [Tab || {Tab, _} <- table_definitions()]. - replicated_table_names() -> [Tab || {Tab, TabDef} <- table_definitions(), not lists:member({local_content, true}, TabDef) ]. -dir() -> mnesia:system_info(directory). 
- -ensure_mnesia_dir() -> - MnesiaDir = dir() ++ "/", - case filelib:ensure_dir(MnesiaDir) of - {error, Reason} -> - throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}}); - ok -> - ok - end. - -ensure_mnesia_running() -> - case mnesia:system_info(is_running) of - yes -> - ok; - starting -> - wait_for(mnesia_running), - ensure_mnesia_running(); - Reason when Reason =:= no; Reason =:= stopping -> - throw({error, mnesia_not_running}) - end. - -ensure_mnesia_not_running() -> - case mnesia:system_info(is_running) of - no -> - ok; - stopping -> - wait_for(mnesia_not_running), - ensure_mnesia_not_running(); - Reason when Reason =:= yes; Reason =:= starting -> - throw({error, mnesia_unexpectedly_running}) - end. - -ensure_schema_integrity() -> - case check_schema_integrity() of - ok -> - ok; - {error, Reason} -> - throw({error, {schema_integrity_check_failed, Reason}}) - end. - -check_schema_integrity() -> - Tables = mnesia:system_info(tables), - case check_tables(fun (Tab, TabDef) -> - case lists:member(Tab, Tables) of - false -> {error, {table_missing, Tab}}; - true -> check_table_attributes(Tab, TabDef) - end - end) of - ok -> ok = wait_for_tables(), - check_tables(fun check_table_content/2); - Other -> Other - end. - check_table_attributes(Tab, TabDef) -> {_, ExpAttrs} = proplists:lookup(attributes, TabDef), case mnesia:table_info(Tab, attributes) of @@ -433,153 +900,6 @@ check_tables(Fun) -> Errors -> {error, Errors} end. -%% The cluster node config file contains some or all of the disk nodes -%% that are members of the cluster this node is / should be a part of. -%% -%% If the file is absent, the list is empty, or only contains the -%% current node, then the current node is a standalone (disk) -%% node. Otherwise it is a node that is part of a cluster as either a -%% disk node, if it appears in the cluster node config, or ram node if -%% it doesn't. - -cluster_nodes_config_filename() -> - dir() ++ "/cluster_nodes.config". 
- -create_cluster_nodes_config(ClusterNodes) -> - FileName = cluster_nodes_config_filename(), - case rabbit_file:write_term_file(FileName, [ClusterNodes]) of - ok -> ok; - {error, Reason} -> - throw({error, {cannot_create_cluster_nodes_config, - FileName, Reason}}) - end. - -read_cluster_nodes_config() -> - FileName = cluster_nodes_config_filename(), - case rabbit_file:read_term_file(FileName) of - {ok, [ClusterNodes]} -> ClusterNodes; - {error, enoent} -> - {ok, ClusterNodes} = application:get_env(rabbit, cluster_nodes), - ClusterNodes; - {error, Reason} -> - throw({error, {cannot_read_cluster_nodes_config, - FileName, Reason}}) - end. - -delete_cluster_nodes_config() -> - FileName = cluster_nodes_config_filename(), - case file:delete(FileName) of - ok -> ok; - {error, enoent} -> ok; - {error, Reason} -> - throw({error, {cannot_delete_cluster_nodes_config, - FileName, Reason}}) - end. - -running_nodes_filename() -> - filename:join(dir(), "nodes_running_at_shutdown"). - -record_running_nodes() -> - FileName = running_nodes_filename(), - Nodes = running_clustered_nodes() -- [node()], - %% Don't check the result: we're shutting down anyway and this is - %% a best-effort-basis. - rabbit_file:write_term_file(FileName, [Nodes]), - ok. - -read_previously_running_nodes() -> - FileName = running_nodes_filename(), - case rabbit_file:read_term_file(FileName) of - {ok, [Nodes]} -> Nodes; - {error, enoent} -> []; - {error, Reason} -> throw({error, {cannot_read_previous_nodes_file, - FileName, Reason}}) - end. - -delete_previously_running_nodes() -> - FileName = running_nodes_filename(), - case file:delete(FileName) of - ok -> ok; - {error, enoent} -> ok; - {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file, - FileName, Reason}}) - end. 
- -init_db(ClusterNodes, Force) -> - init_db( - ClusterNodes, Force, - fun () -> - case rabbit_upgrade:maybe_upgrade_local() of - ok -> ok; - %% If we're just starting up a new node we won't have a - %% version - starting_from_scratch -> ok = rabbit_version:record_desired() - end - end). - -%% Take a cluster node config and create the right kind of node - a -%% standalone disk node, or disk or ram node connected to the -%% specified cluster nodes. If Force is false, don't allow -%% connections to offline nodes. -init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) -> - UClusterNodes = lists:usort(ClusterNodes), - ProperClusterNodes = UClusterNodes -- [node()], - case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of - {ok, []} when not Force andalso ProperClusterNodes =/= [] -> - throw({error, {failed_to_cluster_with, ProperClusterNodes, - "Mnesia could not connect to any disc nodes."}}); - {ok, Nodes} -> - WasDiscNode = is_disc_node(), - WantDiscNode = should_be_disc_node(ClusterNodes), - %% We create a new db (on disk, or in ram) in the first - %% two cases and attempt to upgrade the in the other two - case {Nodes, WasDiscNode, WantDiscNode} of - {[], _, false} -> - %% New ram node; start from scratch - ok = create_schema(ram); - {[], false, true} -> - %% Nothing there at all, start from scratch - ok = create_schema(disc); - {[], true, true} -> - %% We're the first node up - case rabbit_upgrade:maybe_upgrade_local() of - ok -> ensure_schema_integrity(); - version_not_available -> ok = schema_ok_or_move() - end; - {[AnotherNode|_], _, _} -> - %% Subsequent node in cluster, catch up - ensure_version_ok( - rpc:call(AnotherNode, rabbit_version, recorded, [])), - {CopyType, CopyTypeAlt} = - case WantDiscNode of - true -> {disc, disc_copies}; - false -> {ram, ram_copies} - end, - ok = wait_for_replicated_tables(), - ok = create_local_table_copy(schema, CopyTypeAlt), - ok = create_local_table_copies(CopyType), - - ok = SecondaryPostMnesiaFun(), - %% We've 
taken down mnesia, so ram nodes will need - %% to re-sync - case is_disc_node() of - false -> start_mnesia(), - mnesia:change_config(extra_db_nodes, - ProperClusterNodes), - wait_for_replicated_tables(); - true -> ok - end, - - ensure_schema_integrity(), - ok - end; - {error, Reason} -> - %% one reason we may end up here is if we try to join - %% nodes together that are currently running standalone or - %% are members of a different cluster - throw({error, {unable_to_join_cluster, ClusterNodes, Reason}}) - end. - schema_ok_or_move() -> case check_schema_integrity() of ok -> @@ -592,7 +912,7 @@ schema_ok_or_move() -> "and recreating schema from scratch~n", [Reason]), ok = move_db(), - ok = create_schema(disc) + ok = create_schema() end. ensure_version_ok({ok, DiscVersion}) -> @@ -604,25 +924,15 @@ ensure_version_ok({ok, DiscVersion}) -> ensure_version_ok({error, _}) -> ok = rabbit_version:record_desired(). -create_schema(Type) -> +%% We only care about disc nodes since ram nodes are supposed to catch up only +create_schema() -> stop_mnesia(), - case Type of - disc -> rabbit_misc:ensure_ok(mnesia:create_schema([node()]), - cannot_create_schema); - ram -> %% remove the disc schema since this is a ram node - rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), - cannot_delete_schema) - end, + rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema), start_mnesia(), - ok = create_tables(Type), + ok = create_tables(disc), ensure_schema_integrity(), ok = rabbit_version:record_desired(). -is_disc_node() -> mnesia:system_info(use_dir). - -should_be_disc_node(ClusterNodes) -> - ClusterNodes == [] orelse lists:member(node(), ClusterNodes). - move_db() -> stop_mnesia(), MnesiaDir = filename:dirname(dir() ++ "/"), @@ -644,25 +954,6 @@ move_db() -> start_mnesia(), ok. -copy_db(Destination) -> - ok = ensure_mnesia_not_running(), - rabbit_file:recursive_copy(dir(), Destination). - -create_tables() -> create_tables(disc). 
- -create_tables(Type) -> - lists:foreach(fun ({Tab, TabDef}) -> - TabDef1 = proplists:delete(match, TabDef), - case mnesia:create_table(Tab, TabDef1) of - {atomic, ok} -> ok; - {aborted, Reason} -> - throw({error, {table_creation_failed, - Tab, TabDef1, Reason}}) - end - end, - table_definitions(Type)), - ok. - copy_type_to_ram(TabDef) -> [{disc_copies, []}, {ram_copies, [node()]} | proplists:delete(ram_copies, proplists:delete(disc_copies, TabDef))]. @@ -711,119 +1002,104 @@ create_local_table_copy(Tab, Type) -> end, ok. -wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). - -wait_for_tables() -> wait_for_tables(table_names()). - -wait_for_tables(TableNames) -> - case mnesia:wait_for_tables(TableNames, 30000) of - ok -> - ok; - {timeout, BadTabs} -> - throw({error, {timeout_waiting_for_tables, BadTabs}}); - {error, Reason} -> - throw({error, {failed_waiting_for_tables, Reason}}) +remove_node_if_mnesia_running(Node) -> + case mnesia:system_info(is_running) of + yes -> %% Deleting the the schema copy of the node will result in the + %% node being removed from the cluster, with that change being + %% propagated to all nodes + case mnesia:del_table_copy(schema, Node) of + {atomic, ok} -> + update_cluster_nodes_status(), + io:format("nodes: ~p~n", [running_clustered_disc_nodes()]), + {_, []} = rpc:multicall(running_clustered_nodes(), + rabbit_mnesia, + update_cluster_nodes_status, []), + ok; + {aborted, Reason} -> + {error, {failed_to_remove_node, Node, Reason}} + end; + no -> {error, mnesia_not_running} end. 
-reset(Force) -> - rabbit_misc:local_info_msg("Resetting Rabbit~s~n", [if Force -> " forcefully"; - true -> "" - end]), - ensure_mnesia_not_running(), - case not Force andalso is_clustered() andalso - is_only_disc_node(node(), false) - of - true -> log_both("no other disc nodes running"); - false -> ok - end, - Node = node(), - Nodes = all_clustered_nodes() -- [Node], - case Force of - true -> ok; - false -> - ensure_mnesia_dir(), - start_mnesia(), - RunningNodes = - try - %% Force=true here so that reset still works when clustered - %% with a node which is down - ok = init_db(read_cluster_nodes_config(), true), - running_clustered_nodes() -- [Node] - after - stop_mnesia() - end, - leave_cluster(Nodes, RunningNodes), - rabbit_misc:ensure_ok(mnesia:delete_schema([Node]), - cannot_delete_schema) - end, - %% We need to make sure that we don't end up in a distributed - %% Erlang system with nodes while not being in an Mnesia cluster - %% with them. We don't handle that well. - [erlang:disconnect_node(N) || N <- Nodes], - ok = delete_cluster_nodes_config(), - %% remove persisted messages and any other garbage we find - ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")), - ok. +leave_cluster() -> + remove_node_remotely(node()). 
-leave_cluster([], _) -> ok; -leave_cluster(Nodes, RunningNodes) -> - %% find at least one running cluster node and instruct it to - %% remove our schema copy which will in turn result in our node - %% being removed as a cluster node from the schema, with that - %% change being propagated to all nodes - case lists:any( - fun (Node) -> - case rpc:call(Node, mnesia, del_table_copy, - [schema, node()]) of - {atomic, ok} -> true; - {badrpc, nodedown} -> false; - {aborted, {node_not_running, _}} -> false; - {aborted, Reason} -> - throw({error, {failed_to_leave_cluster, - Nodes, RunningNodes, Reason}}) - end - end, - RunningNodes) of - true -> ok; - false -> throw({error, {no_running_cluster_nodes, - Nodes, RunningNodes}}) +remove_node_remotely(Removee) -> + case running_clustered_nodes() -- [Removee] of + [] -> + ok; + RunningNodes -> + case lists:any( + fun (Node) -> + case rpc:call(Node, rabbit_mnesia, + remove_node_if_mnesia_running, + [Removee]) + of + ok -> + true; + {error, mnesia_not_running} -> + false; + {error, Reason} -> + throw({error, Reason}); + {badrpc, nodedown} -> + false + end + end, + RunningNodes) + of + true -> ok; + false -> {error, no_running_cluster_nodes} + end end. wait_for(Condition) -> error_logger:info_msg("Waiting for ~p...~n", [Condition]), timer:sleep(1000). -on_node_up(Node) -> - case is_only_disc_node(Node, true) of - true -> rabbit_log:info("cluster contains disc nodes again~n"); - false -> ok - end. - -on_node_down(Node) -> - case is_only_disc_node(Node, true) of - true -> rabbit_log:info("only running disc node went down~n"); - false -> ok - end. - -is_only_disc_node(Node, _MnesiaRunning = true) -> - RunningSet = sets:from_list(running_clustered_nodes()), - DiscSet = sets:from_list(nodes_of_type(disc_copies)), - [Node] =:= sets:to_list(sets:intersection(RunningSet, DiscSet)); -is_only_disc_node(Node, false) -> - start_mnesia(), - Res = is_only_disc_node(Node, true), - stop_mnesia(), - Res. 
- -log_both(Warning) -> - io:format("Warning: ~s~n", [Warning]), - rabbit_misc:with_local_io( - fun () -> error_logger:warning_msg("~s~n", [Warning]) end). +is_only_disc_node(Node) -> + Nodes = running_clustered_disc_nodes(), + [Node] =:= Nodes. start_mnesia() -> + check_cluster_consistency(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), ensure_mnesia_running(). stop_mnesia() -> stopped = mnesia:stop(), ensure_mnesia_not_running(). + +change_extra_db_nodes(ClusterNodes0, Force) -> + ClusterNodes = lists:usort(ClusterNodes0) -- [node()], + case mnesia:change_config(extra_db_nodes, ClusterNodes) of + {ok, []} when not Force andalso ClusterNodes =/= [] -> + {error, {failed_to_cluster_with, ClusterNodes, + "Mnesia could not connect to any disc nodes."}}; + {ok, Nodes} -> + {ok, Nodes} + end. + +%%-------------------------------------------------------------------- +%% Legacy functions related to the "running nodes" file +%%-------------------------------------------------------------------- + +legacy_running_nodes_filename() -> + filename:join(dir(), "nodes_running_at_shutdown"). + +legacy_read_previously_running_nodes() -> + FileName = legacy_running_nodes_filename(), + case rabbit_file:read_term_file(FileName) of + {ok, [Nodes]} -> Nodes; + {error, enoent} -> []; + {error, Reason} -> throw({error, {cannot_read_previous_nodes_file, + FileName, Reason}}) + end. + +legacy_delete_previously_running_nodes() -> + FileName = legacy_running_nodes_filename(), + case file:delete(FileName) of + ok -> ok; + {error, enoent} -> ok; + {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file, + FileName, Reason}}) + end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 5545cccf..f4f74921 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -32,6 +32,7 @@ -define(TIMEOUT, 5000). 
all_tests() -> + ok = setup_cluster(), passed = gm_tests:all_tests(), passed = mirrored_supervisor_tests:all_tests(), application:set_env(rabbit, file_handles_high_watermark, 10, infinity), @@ -52,36 +53,51 @@ all_tests() -> passed = test_log_management_during_startup(), passed = test_statistics(), passed = test_arguments_parser(), - passed = test_cluster_management(), passed = test_user_management(), passed = test_runtime_parameters(), passed = test_server_status(), passed = test_confirms(), - passed = maybe_run_cluster_dependent_tests(), + passed = + do_if_secondary_node( + fun run_cluster_dependent_tests/1, + fun (SecondaryNode) -> + io:format("Skipping cluster dependent tests with node ~p~n", + [SecondaryNode]), + passed + end), passed = test_configurable_server_properties(), passed. -maybe_run_cluster_dependent_tests() -> +do_if_secondary_node(Up, Down) -> SecondaryNode = rabbit_nodes:make("hare"), case net_adm:ping(SecondaryNode) of - pong -> passed = run_cluster_dependent_tests(SecondaryNode); - pang -> io:format("Skipping cluster dependent tests with node ~p~n", - [SecondaryNode]) - end, - passed. + pong -> Up(SecondaryNode); + pang -> Down(SecondaryNode) + end. -run_cluster_dependent_tests(SecondaryNode) -> - SecondaryNodeS = atom_to_list(SecondaryNode), +setup_cluster() -> + do_if_secondary_node( + fun (SecondaryNode) -> + ok = control_action(stop_app, []), + ok = control_action(join_cluster, + [atom_to_list(SecondaryNode)]), + ok = control_action(start_app, []), + ok = control_action(start_app, SecondaryNode, [], []) + end, + fun (_) -> ok end). 
- cover:stop(SecondaryNode), - ok = control_action(stop_app, []), - ok = control_action(reset, []), - ok = control_action(cluster, [SecondaryNodeS]), - ok = control_action(start_app, []), - cover:start(SecondaryNode), - ok = control_action(start_app, SecondaryNode, [], []), +maybe_run_cluster_dependent_tests() -> + do_if_secondary_node( + fun (SecondaryNode) -> + passed = run_cluster_dependent_tests(SecondaryNode) + end, + fun (SecondaryNode) -> + io:format("Skipping cluster dependent tests with node ~p~n", + [SecondaryNode]) + end). +run_cluster_dependent_tests(SecondaryNode) -> io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]), passed = test_delegates_async(SecondaryNode), passed = test_delegates_sync(SecondaryNode), @@ -856,200 +872,6 @@ test_arguments_parser() -> passed. -test_cluster_management() -> - %% 'cluster' and 'reset' should only work if the app is stopped - {error, _} = control_action(cluster, []), - {error, _} = control_action(reset, []), - {error, _} = control_action(force_reset, []), - - ok = control_action(stop_app, []), - - %% various ways of creating a standalone node - NodeS = atom_to_list(node()), - ClusteringSequence = [[], - [NodeS], - ["invalid@invalid", NodeS], - [NodeS, "invalid@invalid"]], - - ok = control_action(reset, []), - lists:foreach(fun (Arg) -> - ok = control_action(force_cluster, Arg), - ok - end, - ClusteringSequence), - lists:foreach(fun (Arg) -> - ok = control_action(reset, []), - ok = control_action(force_cluster, Arg), - ok - end, - ClusteringSequence), - ok = control_action(reset, []), - lists:foreach(fun (Arg) -> - ok = control_action(force_cluster, Arg), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok - end, - ClusteringSequence), - lists:foreach(fun (Arg) -> - ok = control_action(reset, []), - ok = control_action(force_cluster, Arg), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok - end, - ClusteringSequence), - - %% convert a 
disk node into a ram node - ok = control_action(reset, []), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_disc_node(), - ok = control_action(force_cluster, ["invalid1@invalid", - "invalid2@invalid"]), - ok = assert_ram_node(), - - %% join a non-existing cluster as a ram node - ok = control_action(reset, []), - ok = control_action(force_cluster, ["invalid1@invalid", - "invalid2@invalid"]), - ok = assert_ram_node(), - - ok = control_action(reset, []), - - SecondaryNode = rabbit_nodes:make("hare"), - case net_adm:ping(SecondaryNode) of - pong -> passed = test_cluster_management2(SecondaryNode); - pang -> io:format("Skipping clustering tests with node ~p~n", - [SecondaryNode]) - end, - - ok = control_action(start_app, []), - passed. - -test_cluster_management2(SecondaryNode) -> - NodeS = atom_to_list(node()), - SecondaryNodeS = atom_to_list(SecondaryNode), - - %% make a disk node - ok = control_action(cluster, [NodeS]), - ok = assert_disc_node(), - %% make a ram node - ok = control_action(reset, []), - ok = control_action(cluster, [SecondaryNodeS]), - ok = assert_ram_node(), - - %% join cluster as a ram node - ok = control_action(reset, []), - ok = control_action(force_cluster, [SecondaryNodeS, "invalid1@invalid"]), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_ram_node(), - - %% ram node will not start by itself - ok = control_action(stop_app, []), - ok = control_action(stop_app, SecondaryNode, [], []), - {error, _} = control_action(start_app, []), - ok = control_action(start_app, SecondaryNode, [], []), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - - %% change cluster config while remaining in same cluster - ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - - %% join non-existing cluster as a ram node - ok = control_action(force_cluster, 
["invalid1@invalid", - "invalid2@invalid"]), - {error, _} = control_action(start_app, []), - ok = assert_ram_node(), - - %% join empty cluster as a ram node (converts to disc) - ok = control_action(cluster, []), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_disc_node(), - - %% make a new ram node - ok = control_action(reset, []), - ok = control_action(force_cluster, [SecondaryNodeS]), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_ram_node(), - - %% turn ram node into disk node - ok = control_action(cluster, [SecondaryNodeS, NodeS]), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_disc_node(), - - %% convert a disk node into a ram node - ok = assert_disc_node(), - ok = control_action(force_cluster, ["invalid1@invalid", - "invalid2@invalid"]), - ok = assert_ram_node(), - - %% make a new disk node - ok = control_action(force_reset, []), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_disc_node(), - - %% turn a disk node into a ram node - ok = control_action(reset, []), - ok = control_action(cluster, [SecondaryNodeS]), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_ram_node(), - - %% NB: this will log an inconsistent_database error, which is harmless - %% Turning cover on / off is OK even if we're not in general using cover, - %% it just turns the engine on / off, doesn't actually log anything. 
- cover:stop([SecondaryNode]), - true = disconnect_node(SecondaryNode), - pong = net_adm:ping(SecondaryNode), - cover:start([SecondaryNode]), - - %% leaving a cluster as a ram node - ok = control_action(reset, []), - %% ...and as a disk node - ok = control_action(cluster, [SecondaryNodeS, NodeS]), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - cover:stop(SecondaryNode), - ok = control_action(reset, []), - cover:start(SecondaryNode), - - %% attempt to leave cluster when no other node is alive - ok = control_action(cluster, [SecondaryNodeS, NodeS]), - ok = control_action(start_app, []), - ok = control_action(stop_app, SecondaryNode, [], []), - ok = control_action(stop_app, []), - {error, {no_running_cluster_nodes, _, _}} = - control_action(reset, []), - - %% attempt to change type when no other node is alive - {error, {no_running_cluster_nodes, _, _}} = - control_action(cluster, [SecondaryNodeS]), - - %% leave system clustered, with the secondary node as a ram node - ok = control_action(force_reset, []), - ok = control_action(start_app, []), - %% Yes, this is rather ugly. But since we're a clustered Mnesia - %% node and we're telling another clustered node to reset itself, - %% we will get disconnected half way through causing a - %% badrpc. This never happens in real life since rabbitmqctl is - %% not a clustered Mnesia node. - cover:stop(SecondaryNode), - {badrpc, nodedown} = control_action(force_reset, SecondaryNode, [], []), - pong = net_adm:ping(SecondaryNode), - cover:start(SecondaryNode), - ok = control_action(cluster, SecondaryNode, [NodeS], []), - ok = control_action(start_app, SecondaryNode, [], []), - - passed. - test_user_management() -> %% lots if stuff that should fail diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index e1a7bcae..c43ce236 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -121,10 +121,7 @@ remove_backup() -> info("upgrades: Mnesia backup removed~n", []). 
maybe_upgrade_mnesia() -> - %% rabbit_mnesia:all_clustered_nodes/0 will return [] at this point - %% if we are a RAM node since Mnesia has not started yet. - AllNodes = lists:usort(rabbit_mnesia:all_clustered_nodes() ++ - rabbit_mnesia:read_cluster_nodes_config()), + AllNodes = rabbit_mnesia:all_clustered_nodes(), case rabbit_version:upgrades_required(mnesia) of {error, starting_from_scratch} -> ok; @@ -150,19 +147,17 @@ maybe_upgrade_mnesia() -> upgrade_mode(AllNodes) -> case nodes_running(AllNodes) of [] -> - AfterUs = rabbit_mnesia:read_previously_running_nodes(), + AfterUs = rabbit_mnesia:running_clustered_nodes() -- [node()], case {is_disc_node_legacy(), AfterUs} of {true, []} -> primary; {true, _} -> - Filename = rabbit_mnesia:running_nodes_filename(), + %% TODO: Here I'm assuming that the various cluster status + %% files are consistent with each other, I think I can + %% provide a solution if they're not... die("Cluster upgrade needed but other disc nodes shut " "down after this one.~nPlease first start the last " - "disc node to shut down.~n~nNote: if several disc " - "nodes were shut down simultaneously they may " - "all~nshow this message. In which case, remove " - "the lock file on one of them and~nstart that node. " - "The lock file on this node is:~n~n ~s ", [Filename]); + "disc node to shut down.~n", []); {false, _} -> die("Cluster upgrade needed but this is a ram node.~n" "Please first start the last disc node to shut down.", @@ -222,15 +217,8 @@ secondary_upgrade(AllNodes) -> IsDiscNode = is_disc_node_legacy(), rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), cannot_delete_schema), - %% Note that we cluster with all nodes, rather than all disc nodes - %% (as we can't know all disc nodes at this point). This is safe as - %% we're not writing the cluster config, just setting up Mnesia. 
- ClusterNodes = case IsDiscNode of - true -> AllNodes; - false -> AllNodes -- [node()] - end, rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - ok = rabbit_mnesia:init_db(ClusterNodes, true, fun () -> ok end), + ok = rabbit_mnesia:init_db(AllNodes, IsDiscNode, true), ok = rabbit_version:record_desired_for_scope(mnesia), ok. |