author     Francesco Mazzoli <francesco@rabbitmq.com>  2012-05-30 12:07:13 +0100
committer  Francesco Mazzoli <francesco@rabbitmq.com>  2012-05-30 12:07:13 +0100
commit     386a9a7920e5b6f1967b38219b76f44ff22dbde3 (patch)
tree       c63538c4334f66be7993aaae54da0a8849921ee5 /src
parent     6db187ded843dd14bf7ce51d046ee8ecc2c4dc72 (diff)
parent     9c2c1ed3703860eb55836eac2ee3d28881b570b5 (diff)
download   rabbitmq-server-386a9a7920e5b6f1967b38219b76f44ff22dbde3.tar.gz
merge default
Diffstat (limited to 'src')
-rw-r--r--  src/rabbit.erl              |   10
-rw-r--r--  src/rabbit_control_main.erl |   41
-rw-r--r--  src/rabbit_misc.erl         |    6
-rw-r--r--  src/rabbit_mnesia.erl       | 1142
-rw-r--r--  src/rabbit_tests.erl        |  244
-rw-r--r--  src/rabbit_upgrade.erl      |   26
6 files changed, 773 insertions, 696 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index f69c8d1b..3f00f000 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -295,6 +295,8 @@ prepare() ->
              ok                                -> ok;
              {error, {already_loaded, rabbit}} -> ok
          end,
+    ok = rabbit_mnesia:ensure_mnesia_dir(),
+    ok = rabbit_mnesia:prepare(),
     ok = ensure_working_log_handlers(),
     ok = rabbit_upgrade:maybe_upgrade_mnesia().
@@ -405,7 +407,7 @@ start(normal, []) ->
     end.

 stop(_State) ->
-    ok = rabbit_mnesia:record_running_nodes(),
+    ok = rabbit_mnesia:update_cluster_nodes_status(),
     terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
     ok = rabbit_alarm:stop(),
     ok = case rabbit_mnesia:is_clustered() of
@@ -502,12 +504,12 @@ sort_boot_steps(UnsortedSteps) ->
     end.

 boot_step_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
+    AllNodes = rabbit_mnesia:all_clustered_nodes(),
     {Err, Nodes} =
-        case rabbit_mnesia:read_previously_running_nodes() of
+        case AllNodes -- [node()] of
            [] -> {"Timeout contacting cluster nodes. Since RabbitMQ was"
                   " shut down forcefully~nit cannot determine which nodes"
-                  " are timing out. Details on all nodes will~nfollow.~n",
-                  rabbit_mnesia:all_clustered_nodes() -- [node()]};
+                  " are timing out.~n", []};
            Ns -> {rabbit_misc:format(
                     "Timeout contacting cluster nodes: ~p.~n", [Ns]), Ns}
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index f8b8c345..422458a9 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -25,10 +25,12 @@
 -define(QUIET_OPT, "-q").
 -define(NODE_OPT, "-n").
 -define(VHOST_OPT, "-p").
+-define(RAM_OPT, "--ram").

 -define(QUIET_DEF, {?QUIET_OPT, flag}).
 -define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
 -define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}).
+-define(RAM_DEF, {?RAM_OPT, flag}).

 -define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node)]).
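The ?RAM_DEF macro above declares --ram as a boolean flag; the join_cluster clause in the next hunk reads it back with proplists:get_bool/2. A minimal sketch of that convention, with an invented Opts value (only proplists:get_bool/2 is standard OTP API here):

    %% Opts as the option parser would produce it when --ram was given
    %% (hypothetical value, for illustration only).
    Opts = [{"--ram", true}],
    DiscNode = not proplists:get_bool("--ram", Opts),
    %% DiscNode is now false, so the node will join as a ram node.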
@@ -41,8 +43,10 @@
          force_reset,
          rotate_logs,
-         cluster,
-         force_cluster,
+         {join_cluster, [?RAM_DEF]},
+         change_node_type,
+         recluster,
+         remove_node,
          cluster_status,
          add_user,
@@ -234,17 +238,28 @@ action(force_reset, Node, [], _Opts, Inform) ->
     Inform("Forcefully resetting node ~p", [Node]),
     call(Node, {rabbit_mnesia, force_reset, []});

-action(cluster, Node, ClusterNodeSs, _Opts, Inform) ->
-    ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
-    Inform("Clustering node ~p with ~p",
-           [Node, ClusterNodes]),
-    rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]);
-
-action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
-    ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
-    Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)",
-           [Node, ClusterNodes]),
-    rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]);
+action(join_cluster, Node, [ClusterNodeS], Opts, Inform) ->
+    ClusterNode = list_to_atom(ClusterNodeS),
+    DiscNode = not proplists:get_bool(?RAM_OPT, Opts),
+    Inform("Clustering node ~p with ~p", [Node, ClusterNode]),
+    rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, DiscNode]);
+
+action(change_node_type, Node, ["ram"], _Opts, Inform) ->
+    Inform("Turning ~p into a ram node", [Node]),
+    rpc_call(Node, rabbit_mnesia, change_node_type, [ram]);
+action(change_node_type, Node, ["disc"], _Opts, Inform) ->
+    Inform("Turning ~p into a disc node", [Node]),
+    rpc_call(Node, rabbit_mnesia, change_node_type, [disc]);
+
+action(recluster, Node, [ClusterNodeS], _Opts, Inform) ->
+    ClusterNode = list_to_atom(ClusterNodeS),
+    Inform("Re-clustering ~p with ~p", [Node, ClusterNode]),
+    rpc_call(Node, rabbit_mnesia, recluster, [ClusterNode]);
+
+action(remove_node, Node, [ClusterNodeS], _Opts, Inform) ->
+    ClusterNode = list_to_atom(ClusterNodeS),
+    Inform("Removing node ~p from cluster", [ClusterNode]),
+    rpc_call(Node, rabbit_mnesia, remove_node, [ClusterNode]);

 action(wait, Node, [PidFile], _Opts, Inform) ->
     Inform("Waiting for ~p", [Node]),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index d41aa09b..5bfb7de0 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -60,6 +60,7 @@
 -export([multi_call/2]).
 -export([os_cmd/1]).
 -export([gb_sets_difference/2]).
+-export([rabbit_version/0]).

 %%----------------------------------------------------------------------------

@@ -212,6 +213,7 @@
         ([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
 -spec(os_cmd/1 :: (string()) -> string()).
 -spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()).
+-spec(rabbit_version/0 :: () -> string()).

 -endif.

@@ -939,3 +941,7 @@ os_cmd(Command) ->

 gb_sets_difference(S1, S2) ->
     gb_sets:fold(fun gb_sets:delete_any/2, S1, S2).
+
+rabbit_version() ->
+    {ok, VSN} = application:get_key(rabbit, vsn),
+    VSN.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 7e9346f9..2e34b65e 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -18,16 +18,19 @@
 -module(rabbit_mnesia).

 -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
-         cluster/1, force_cluster/1, reset/0, force_reset/0, init_db/3,
-         is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0,
-         empty_ram_only_tables/0, copy_db/1, wait_for_tables/1,
-         create_cluster_nodes_config/1, read_cluster_nodes_config/0,
-         record_running_nodes/0, read_previously_running_nodes/0,
-         running_nodes_filename/0, is_disc_node/0, on_node_down/1,
-         on_node_up/1]).
+         join_cluster/2, check_cluster_consistency/0, reset/0, force_reset/0,
+         init_db/4, is_clustered/0, running_clustered_nodes/0,
+         all_clustered_nodes/0, all_clustered_disc_nodes/0,
+         empty_ram_only_tables/0, copy_db/1, wait_for_tables/1, is_disc_node/0,
+         on_node_down/1, on_node_up/1, should_be_disc_node/1,
+         change_node_type/1, recluster/1, remove_node/1, prepare/0,
+         update_cluster_nodes_status/0]).

 -export([table_names/0]).

+%% Used internally in rpc calls, see `discover_nodes/1'
+-export([cluster_status_if_running/0, node_info/0]).
+
 %% create_tables/0 exported for helping embed RabbitMQ in or alongside
 %% other mnesia-using Erlang applications, such as ejabberd
 -export([create_tables/0]).
@@ -38,147 +41,156 @@

 -ifdef(use_specs).

--export_type([node_type/0]).
+-export_type([node_type/0, node_status/0]).

--type(node_type() :: disc_only | disc | ram | unknown).
--spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
-                         {'running_nodes', [node()]}]).
--spec(dir/0 :: () -> file:filename()).
--spec(ensure_mnesia_dir/0 :: () -> 'ok').
+-type(node_type() :: disc | ram).
+-type(node_status() :: {[node()], [node()], [node()]}).
+
+%% Main interface
+-spec(prepare/0 :: () -> 'ok').
 -spec(init/0 :: () -> 'ok').
--spec(init_db/3 :: ([node()], boolean(), rabbit_misc:thunk('ok')) -> 'ok').
--spec(is_db_empty/0 :: () -> boolean()).
--spec(cluster/1 :: ([node()]) -> 'ok').
--spec(force_cluster/1 :: ([node()]) -> 'ok').
--spec(cluster/2 :: ([node()], boolean()) -> 'ok').
+-spec(join_cluster/2 :: ([node()], boolean()) -> 'ok').
 -spec(reset/0 :: () -> 'ok').
 -spec(force_reset/0 :: () -> 'ok').
+-spec(recluster/1 :: (node()) -> 'ok').
+-spec(change_node_type/1 :: (node_type()) -> 'ok').
+-spec(remove_node/1 :: (node()) -> 'ok').
+
+%% Various queries to get the status of the db
+-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
+                         {'running_nodes', [node()]}]).
+-spec(is_db_empty/0 :: () -> boolean()).
 -spec(is_clustered/0 :: () -> boolean()).
 -spec(running_clustered_nodes/0 :: () -> [node()]).
 -spec(all_clustered_nodes/0 :: () -> [node()]).
+-spec(is_disc_node/0 :: () -> boolean()).
+-spec(dir/0 :: () -> file:filename()).
+-spec(table_names/0 :: () -> [atom()]).
+-spec(cluster_status_if_running/0 :: () -> 'error' | node_status()).
+
+%% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit'
+-spec(init_db/4 :: ([node()], boolean(), boolean(), boolean()) -> 'ok').
+-spec(ensure_mnesia_dir/0 :: () -> 'ok').
 -spec(empty_ram_only_tables/0 :: () -> 'ok').
 -spec(create_tables/0 :: () -> 'ok').
 -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
 -spec(wait_for_tables/1 :: ([atom()]) -> 'ok').
--spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok').
--spec(read_cluster_nodes_config/0 :: () -> [node()]).
--spec(record_running_nodes/0 :: () -> 'ok').
--spec(read_previously_running_nodes/0 :: () -> [node()]).
--spec(running_nodes_filename/0 :: () -> file:filename()).
--spec(is_disc_node/0 :: () -> boolean()).
+-spec(should_be_disc_node/1 :: ([node()]) -> boolean()).
+-spec(check_cluster_consistency/0 :: () -> 'ok' | no_return()).
+
+%% Functions to handle the cluster status file
+-spec(write_cluster_nodes_status/1 :: (node_status()) -> 'ok').
+-spec(read_cluster_nodes_status/0 :: () -> node_status()).
+-spec(update_cluster_nodes_status/0 :: () -> 'ok').
+
+%% Hooks used in `rabbit_node_monitor'
 -spec(on_node_up/1 :: (node()) -> 'ok').
 -spec(on_node_down/1 :: (node()) -> 'ok').
--spec(table_names/0 :: () -> [atom()]).
-
 -endif.
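The node_status() type introduced above is the triple {AllNodes, DiscNodes, RunningNodes}. A small sketch, not part of the patch, of how the derived node sets defined later in this diff (all_clustered_ram_nodes/0, running_clustered_disc_nodes/0) fall out of that triple:

    %% Sketch only: equivalent logic to the set-based versions below.
    ram_nodes({AllNodes, DiscNodes, _Running}) ->
        AllNodes -- DiscNodes.

    running_disc_nodes({_All, DiscNodes, Running}) ->
        [N || N <- DiscNodes, lists:member(N, Running)].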
 %%----------------------------------------------------------------------------
+%% Main interface
+%%----------------------------------------------------------------------------

-status() ->
-    [{nodes, case mnesia:system_info(is_running) of
-                 yes -> [{Key, Nodes} ||
-                            {Key, CopyType} <- [{disc_only, disc_only_copies},
-                                                {disc, disc_copies},
-                                                {ram, ram_copies}],
-                            begin
-                                Nodes = nodes_of_type(CopyType),
-                                Nodes =/= []
-                            end];
-                 no -> case all_clustered_nodes() of
-                           []    -> [];
-                           Nodes -> [{unknown, Nodes}]
-                       end;
-                 Reason when Reason =:= starting; Reason =:= stopping ->
-                     exit({rabbit_busy, try_again_later})
-             end},
-     {running_nodes, running_clustered_nodes()}].
+%% Sets up the cluster status file when needed, taking care of the legacy
+%% files
+prepare() ->
+    NotPresent =
+        fun (AllNodes0, WantDiscNode) ->
+                ThisNode = [node()],
+
+                RunningNodes0 = legacy_read_previously_running_nodes(),
+                legacy_delete_previously_running_nodes(),
+
+                RunningNodes = lists:usort(RunningNodes0 ++ ThisNode),
+                AllNodes = lists:usort(AllNodes0 ++ RunningNodes),
+                DiscNodes = case WantDiscNode of
+                                true  -> ThisNode;
+                                false -> []
+                            end,
+
+                ok = write_cluster_nodes_status({AllNodes, DiscNodes,
+                                                 RunningNodes})
+        end,
+    case try_read_cluster_nodes_status() of
+        {ok, _} ->
+            %% We check the consistency only when the cluster status exists,
+            %% since when it doesn't exist it means that we just started a
+            %% fresh node, and when we have a legacy node with an old
+            %% "cluster_nodes.config" we can't check the consistency anyway
+            check_cluster_consistency(),
+            ok;
+        {error, {invalid_term, _, [AllNodes]}} ->
+            %% Legacy file
+            NotPresent(AllNodes, should_be_disc_node(AllNodes));
+        {error, {cannot_read_file, _, enoent}} ->
+            {ok, {AllNodes, WantDiscNode}} =
+                application:get_env(rabbit, cluster_nodes),
+            NotPresent(AllNodes, WantDiscNode)
+    end.

 init() ->
     ensure_mnesia_running(),
     ensure_mnesia_dir(),
-    Nodes = read_cluster_nodes_config(),
-    ok = init_db(Nodes, should_be_disc_node(Nodes)),
+    {AllNodes, _, DiscNodes} = read_cluster_nodes_status(),
+    WantDiscNode = should_be_disc_node(DiscNodes),
+    ok = init_db(AllNodes, WantDiscNode, WantDiscNode),
     %% We intuitively expect the global name server to be synced when
     %% Mnesia is up. In fact that's not guaranteed to be the case - let's
     %% make it so.
     ok = global:sync(),
-    ok = delete_previously_running_nodes(),
     ok.

-is_db_empty() ->
-    lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
-              table_names()).
+%% Make the node join a cluster. The node will be reset automatically
+%% before we actually cluster it. The nodes provided will be used to find
+%% out about the nodes in the cluster.
+%%
+%% This function will fail if:
+%%
+%%   * The node is currently the only disc node of its cluster
+%%   * We can't connect to any of the nodes provided
+%%   * The node is currently already clustered with the cluster of the nodes
+%%     provided
+%%
+%% Note that we make no attempt to verify that the nodes provided are all
+%% in the same cluster; we simply pick the first online node and cluster
+%% with its cluster.
+join_cluster(DiscoveryNode, WantDiscNode) ->
+    case is_disc_and_clustered() andalso is_only_disc_node(node()) of
+        true -> throw({error,
+                       {standalone_ram_node,
+                        "You can't cluster a node if it's the only "
+                        "disc node in its existing cluster. If new nodes "
+                        "joined while this node was offline, use \"recluster\" "
+                        "to add them manually"}});
+        _    -> ok
+    end,

-cluster(ClusterNodes) ->
-    cluster(ClusterNodes, false).
-force_cluster(ClusterNodes) ->
-    cluster(ClusterNodes, true).
-
-%% Alter which disk nodes this node is clustered with. This can be a
-%% subset of all the disk nodes in the cluster but can (and should)
-%% include the node itself if it is to be a disk rather than a ram
-%% node. If Force is false, only connections to online nodes are
-%% allowed.
-cluster(ClusterNodes, Force) ->
-    rabbit_misc:local_info_msg("Clustering with ~p~s~n",
-                               [ClusterNodes, if Force -> " forcefully";
-                                                 true  -> ""
-                                              end]),
     ensure_mnesia_not_running(),
     ensure_mnesia_dir(),
-    case not Force andalso is_clustered() andalso
-         is_only_disc_node(node(), false) andalso
-         not should_be_disc_node(ClusterNodes)
-    of
-        true -> log_both("last running disc node leaving cluster");
-        _    -> ok
-    end,
+    {ClusterNodes, DiscNodes, _} =
+        case discover_cluster(DiscoveryNode) of
+            {ok, Res}       -> Res;
+            {error, Reason} -> throw({error, Reason})
+        end,

-    %% Wipe mnesia if we're changing type from disc to ram
-    case {is_disc_node(), should_be_disc_node(ClusterNodes)} of
-        {true, false} -> rabbit_misc:with_local_io(
-                           fun () -> error_logger:warning_msg(
-                                       "changing node type; wiping "
-                                       "mnesia...~n~n")
-                           end),
-                         rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
-                                               cannot_delete_schema);
-        _             -> ok
+    case lists:member(node(), ClusterNodes) of
+        true  -> throw({error, {already_clustered,
+                                "You are already clustered with the nodes "
+                                "you have selected"}});
+        false -> ok
     end,

-    %% Pre-emptively leave the cluster
-    %%
-    %% We're trying to handle the following two cases:
-    %% 1. We have a two-node cluster, where both nodes are disc nodes.
-    %%    One node is re-clustered as a ram node. When it tries to
-    %%    re-join the cluster, but before it has time to update its
-    %%    tables definitions, the other node will order it to re-create
-    %%    its disc tables. So, we need to leave the cluster before we
-    %%    can join it again.
-    %% 2. We have a two-node cluster, where both nodes are disc nodes.
-    %%    One node is forcefully reset (so, the other node thinks its
-    %%    still a part of the cluster). The reset node is re-clustered
-    %%    as a ram node. Same as above, we need to leave the cluster
-    %%    before we can join it. But, since we don't know if we're in a
-    %%    cluster or not, we just pre-emptively leave it before joining.
-    ProperClusterNodes = ClusterNodes -- [node()],
-    try
-        ok = leave_cluster(ProperClusterNodes, ProperClusterNodes)
-    catch
-        {error, {no_running_cluster_nodes, _, _}} when Force ->
-            ok
-    end,
+    %% Reset the node. This simplifies things and it will be needed in this
+    %% case - we're joining a new cluster with new nodes which are not in
+    %% sync with the current node. It also lifts the burden of resetting
+    %% the node from the user.
+    reset(false),
+
+    rabbit_misc:local_info_msg("Clustering with ~p~n", [ClusterNodes]),

     %% Join the cluster
-    start_mnesia(),
-    try
-        ok = init_db(ClusterNodes, Force),
-        ok = create_cluster_nodes_config(ClusterNodes)
-    after
-        stop_mnesia()
-    end,
+    ok = start_and_init_db(DiscNodes, WantDiscNode, false),

     ok.

@@ -188,15 +200,339 @@ cluster(ClusterNodes, Force) ->
 reset()       -> reset(false).
 force_reset() -> reset(true).

+reset(Force) ->
+    rabbit_misc:local_info_msg("Resetting Rabbit~s~n",
+                               [if Force -> " forcefully";
+                                   true  -> ""
+                                end]),
+    ensure_mnesia_not_running(),
+    case not Force andalso is_disc_and_clustered() andalso
+         is_only_disc_node(node())
+    of
+        true -> throw({error, {standalone_ram_node,
+                               "You can't reset a node if it's the only disc "
+                               "node in a cluster. Please convert another node"
+                               " of the cluster to a disc node first."}});
+        false -> ok
+    end,
+    Node = node(),
+    Nodes = all_clustered_nodes(),
+    case Force of
+        true ->
+            ok;
+        false ->
+            ensure_mnesia_dir(),
+            start_mnesia(),
+            %% Reconnecting so that we will get an up to date RunningNodes
+            RunningNodes =
+                try
+                    %% Force=true here so that reset still works when
+                    %% clustered with a node which is down
+                    {_, DiscNodes, _} = read_cluster_nodes_status(),
+                    ok = init_db(DiscNodes, should_be_disc_node(DiscNodes),
+                                 true),
+                    running_clustered_nodes()
+                after
+                    stop_mnesia()
+                end,
+            leave_cluster(Nodes, RunningNodes),
+            rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
+                                  cannot_delete_schema)
+    end,
+    %% We need to make sure that we don't end up in a distributed Erlang
+    %% system with nodes while not being in an Mnesia cluster with them. We
+    %% don't handle that well.
+    [erlang:disconnect_node(N) || N <- Nodes],
+    %% remove persisted messages and any other garbage we find
+    ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
+    ok = write_cluster_nodes_status(initial_cluster_status()),
+    ok.
+
+change_node_type(Type) ->
+    ensure_mnesia_dir(),
+    ensure_mnesia_not_running(),
+    case is_clustered() of
+        false -> throw({error, {not_clustered,
+                                "Non-clustered nodes can only be disc nodes"}});
+        true  -> ok
+    end,
+
+    check_cluster_consistency(),
+    DiscoveryNodes = all_clustered_nodes(),
+    ClusterNodes =
+        case discover_cluster(DiscoveryNodes) of
+            {ok, {ClusterNodes0, _, _}} ->
+                ClusterNodes0;
+            {error, _Reason} ->
+                throw({error,
+                       {cannot_connect_to_cluster,
+                        "Could not connect to the cluster nodes present in "
+                        "this node's status file. If the cluster has changed, "
+                        "you can use the \"recluster\" command to point to the "
+                        "new cluster nodes"}})
+        end,
+
+    WantDiscNode = case Type of
+                       ram  -> false;
+                       disc -> true
+                   end,
+
+    ok = start_and_init_db(ClusterNodes, WantDiscNode, false),
+
+    ok.
+
+recluster(DiscoveryNode) ->
+    ensure_mnesia_not_running(),
+    ensure_mnesia_dir(),
+
+    ClusterNodes =
+        case discover_cluster(DiscoveryNode) of
+            {ok, {ClusterNodes0, _, _}} ->
+                ClusterNodes0;
+            {error, _Reason} ->
+                throw({error,
+                       {cannot_connect_to_node,
+                        "Could not connect to the cluster node provided"}})
+        end,
+
+    case lists:member(node(), ClusterNodes) of
+        true  -> start_and_init_db(ClusterNodes, is_disc_node(), false);
+        false -> throw({error,
+                        {inconsistent_cluster,
+                         "The nodes provided do not have this node as part "
+                         "of the cluster"}})
+    end,
+
+    ok.
+
+remove_node(Node) ->
+    case mnesia:system_info(is_running) of
+        yes -> {atomic, ok} = mnesia:del_table_copy(schema, Node),
+               update_cluster_nodes_status(),
+               {_, []} = rpc:multicall(running_clustered_nodes(),
+                                       rabbit_mnesia,
+                                       update_cluster_nodes_status, []);
+        no  -> start_mnesia(),
+               try
+                   [mnesia:force_load_table(T) ||
+                       T <- rabbit_mnesia:table_names()],
+                   remove_node(Node)
+               after
+                   stop_mnesia()
+               end
+    end,
+    ok.
+
+%%----------------------------------------------------------------------------
+%% Queries
+%%----------------------------------------------------------------------------
+
+status() ->
+    IfNonEmpty = fun (_,    [])    -> [];
+                     (Type, Nodes) -> [{Type, Nodes}]
+                 end,
+    [{nodes, (IfNonEmpty(disc, all_clustered_disc_nodes()) ++
+              IfNonEmpty(ram, all_clustered_ram_nodes()))},
+     {running_nodes, running_clustered_nodes()}].
+
+is_db_empty() ->
+    lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
+              table_names()).
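For illustration, the reworked status/0 above reports node types from the status file instead of querying live mnesia; on a hypothetical three-node cluster it would return a term shaped like this (node names invented):

    [{nodes, [{disc, [rabbit@a, rabbit@b]},
              {ram,  [rabbit@c]}]},
     {running_nodes, [rabbit@a, rabbit@c]}]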
+
 is_clustered() ->
-    RunningNodes = running_clustered_nodes(),
-    [node()] /= RunningNodes andalso [] /= RunningNodes.
+    Nodes = all_clustered_nodes(),
+    [node()] /= Nodes andalso [] /= Nodes.
+
+is_disc_and_clustered() ->
+    is_disc_node() andalso is_clustered().
+
+%% Functions that retrieve the nodes in the cluster completely rely on the
+%% cluster status file. For obvious reasons, if rabbit is down, they might
+%% return out of date information.

 all_clustered_nodes() ->
-    mnesia:system_info(db_nodes).
+    {AllNodes, _, _} = read_cluster_nodes_status(),
+    AllNodes.
+
+all_clustered_disc_nodes() ->
+    {_, DiscNodes, _} = read_cluster_nodes_status(),
+    DiscNodes.
+
+all_clustered_ram_nodes() ->
+    {AllNodes, DiscNodes, _} = read_cluster_nodes_status(),
+    sets:to_list(sets:subtract(sets:from_list(AllNodes),
+                               sets:from_list(DiscNodes))).

 running_clustered_nodes() ->
-    mnesia:system_info(running_db_nodes).
+    {_, _, RunningNodes} = read_cluster_nodes_status(),
+    RunningNodes.
+
+running_clustered_disc_nodes() ->
+    {_, DiscNodes, RunningNodes} = read_cluster_nodes_status(),
+    sets:to_list(sets:intersection(sets:from_list(DiscNodes),
+                                   sets:from_list(RunningNodes))).
+
+%% This function is a bit different: we want it to return correctly only
+%% when the node is actually online, because node discovery should only
+%% pick up online, "working" nodes.
+cluster_status_if_running() ->
+    case mnesia:system_info(is_running) of
+        no  -> error;
+        yes -> {ok, {mnesia:system_info(db_nodes),
+                     mnesia:table_info(schema, disc_copies),
+                     mnesia:system_info(running_db_nodes)}}
+    end.
+
+node_info() ->
+    {erlang:system_info(otp_release), rabbit_misc:rabbit_version(),
+     cluster_status_if_running()}.
+
+is_disc_node() -> mnesia:system_info(use_dir).
+
+dir() -> mnesia:system_info(directory).
+
+table_names() ->
+    [Tab || {Tab, _} <- table_definitions()].
+
+%%----------------------------------------------------------------------------
+%% Operations on the db
+%%----------------------------------------------------------------------------
+
+init_db(ClusterNodes, WantDiscNode, Force) ->
+    init_db(ClusterNodes, WantDiscNode, Force, true).
+
+%% Take a cluster node config and create the right kind of node - a
+%% standalone disk node, or disk or ram node connected to the
+%% specified cluster nodes. If Force is false, don't allow
+%% connections to offline nodes.
+init_db(ClusterNodes, WantDiscNode, Force, Upgrade) ->
+    UClusterNodes = lists:usort(ClusterNodes),
+    ProperClusterNodes = UClusterNodes -- [node()],
+    case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
+        {ok, []} when not Force andalso ProperClusterNodes =/= [] ->
+            throw({error, {failed_to_cluster_with, ProperClusterNodes,
+                           "Mnesia could not connect to any disc nodes."}});
+        {ok, Nodes} ->
+            WasDiscNode = is_disc_node(),
+            %% We create a new db (on disk, or in ram) in the first
+            %% two cases and attempt to upgrade it in the other two
+            case {Nodes, WasDiscNode, WantDiscNode} of
+                {[], _, false} ->
+                    %% New ram node; start from scratch
+                    ok = create_schema(ram);
+                {[], false, true} ->
+                    %% Nothing there at all, start from scratch
+                    ok = create_schema(disc);
+                {[], true, true} ->
+                    %% We're the first node up
+                    case rabbit_upgrade:maybe_upgrade_local() of
+                        ok                    -> ensure_schema_integrity();
+                        version_not_available -> ok = schema_ok_or_move()
+                    end;
+                {[AnotherNode|_], _, _} ->
+                    %% Subsequent node in cluster, catch up
+                    ensure_version_ok(
+                      rpc:call(AnotherNode, rabbit_version, recorded, [])),
+                    ok = wait_for_replicated_tables(),
+
+                    %% The sequence in which we delete the schema and then
+                    %% the other tables is important: if we delete the schema
+                    %% first when moving to RAM, mnesia will loudly complain,
+                    %% since it doesn't make much sense to do that. But when
+                    %% moving to disc, we need to move the schema first.
+                    case WantDiscNode of
+                        true  -> create_local_table_copy(schema, disc_copies),
+                                 create_local_table_copies(disc);
+                        false -> create_local_table_copies(ram),
+                                 create_local_table_copy(schema, ram_copies)
+                    end,
+
+                    %% Write the status now that mnesia is running and
+                    %% clustered
+                    update_cluster_nodes_status(),
+
+                    ok = case Upgrade of
+                             true ->
+                                 case rabbit_upgrade:maybe_upgrade_local() of
+                                     ok ->
+                                         ok;
+                                     %% If we're just starting up a new node
+                                     %% we won't have a version
+                                     starting_from_scratch ->
+                                         rabbit_version:record_desired()
+                                 end;
+                             false ->
+                                 ok
+                         end,
+
+                    %% We've taken down mnesia, so ram nodes will need
+                    %% to re-sync
+                    case is_disc_node() of
+                        false -> start_mnesia(),
+                                 mnesia:change_config(extra_db_nodes,
+                                                      ProperClusterNodes),
+                                 wait_for_replicated_tables();
+                        true  -> ok
+                    end,
+
+                    ensure_schema_integrity(),
+                    ok
+            end;
+        {error, Reason} ->
+            %% one reason we may end up here is if we try to join
+            %% nodes together that are currently running standalone or
+            %% are members of a different cluster
+            throw({error, {unable_to_join_cluster, ClusterNodes, Reason}})
+    end.
+
+ensure_mnesia_dir() ->
+    MnesiaDir = dir() ++ "/",
+    case filelib:ensure_dir(MnesiaDir) of
+        {error, Reason} ->
+            throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}});
+        ok ->
+            ok
+    end.
+
+ensure_mnesia_running() ->
+    case mnesia:system_info(is_running) of
+        yes ->
+            ok;
+        starting ->
+            wait_for(mnesia_running),
+            ensure_mnesia_running();
+        Reason when Reason =:= no; Reason =:= stopping ->
+            throw({error, mnesia_not_running})
+    end.
+
+ensure_mnesia_not_running() ->
+    case mnesia:system_info(is_running) of
+        no ->
+            ok;
+        stopping ->
+            wait_for(mnesia_not_running),
+            ensure_mnesia_not_running();
+        Reason when Reason =:= yes; Reason =:= starting ->
+            throw({error, mnesia_unexpectedly_running})
+    end.
+
+ensure_schema_integrity() ->
+    case check_schema_integrity() of
+        ok ->
+            ok;
+        {error, Reason} ->
+            throw({error, {schema_integrity_check_failed, Reason}})
+    end.
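The ordering constraint described inside init_db/4 (schema to disc first when becoming a disc node, schema off disc last when becoming a ram node) can be reproduced with the bare mnesia API. A minimal sketch using only standard OTP calls, not the patch's create_local_table_copy helpers:

    %% Becoming a disc node: the schema must be disc-resident before any
    %% other table can be.
    {atomic, ok} = mnesia:change_table_copy_type(schema, node(), disc_copies),
    %% ... now individual tables may be moved to disc_copies ...

    %% Becoming a ram node: move the other tables first; converting the
    %% schema first would make mnesia complain about disc tables hanging
    %% off a ram-only schema.
    {atomic, ok} = mnesia:change_table_copy_type(schema, node(), ram_copies)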
+
+check_schema_integrity() ->
+    Tables = mnesia:system_info(tables),
+    case check_tables(fun (Tab, TabDef) ->
+                              case lists:member(Tab, Tables) of
+                                  false -> {error, {table_missing, Tab}};
+                                  true  -> check_table_attributes(Tab, TabDef)
+                              end
+                      end) of
+        ok    -> ok = wait_for_tables(),
+                 check_tables(fun check_table_content/2);
+        Other -> Other
+    end.

 empty_ram_only_tables() ->
     Node = node(),
@@ -209,13 +545,188 @@ empty_ram_only_tables() ->
               end, table_names()),
     ok.

+create_tables() -> create_tables(disc).
+
+create_tables(Type) ->
+    lists:foreach(fun ({Tab, TabDef}) ->
+                          TabDef1 = proplists:delete(match, TabDef),
+                          case mnesia:create_table(Tab, TabDef1) of
+                              {atomic, ok} -> ok;
+                              {aborted, Reason} ->
+                                  throw({error, {table_creation_failed,
+                                                 Tab, TabDef1, Reason}})
+                          end
+                  end,
+                  table_definitions(Type)),
+    ok.
+
+copy_db(Destination) ->
+    ok = ensure_mnesia_not_running(),
+    rabbit_file:recursive_copy(dir(), Destination).
+
+wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
+
+wait_for_tables() -> wait_for_tables(table_names()).
+
+wait_for_tables(TableNames) ->
+    case mnesia:wait_for_tables(TableNames, 30000) of
+        ok ->
+            ok;
+        {timeout, BadTabs} ->
+            throw({error, {timeout_waiting_for_tables, BadTabs}});
+        {error, Reason} ->
+            throw({error, {failed_waiting_for_tables, Reason}})
+    end.
+
+should_be_disc_node(DiscNodes) ->
+    DiscNodes == [] orelse lists:member(node(), DiscNodes).
+
+%% This does not guarantee us much, but it avoids some situations that will
+%% definitely end up badly
+check_cluster_consistency() ->
+    CheckVsn = fun (This, This, _) ->
+                       ok;
+                   (This, Remote, Name) ->
+                       throw({error,
+                              {inconsistent_cluster,
+                               rabbit_misc:format(
+                                 "~s version mismatch: local node is ~s, "
+                                 "remote node ~s", [Name, This, Remote])}})
+               end,
+    CheckOTP =
+        fun (OTP) -> CheckVsn(erlang:system_info(otp_release), OTP, "OTP") end,
+    CheckRabbit =
+        fun (Rabbit) ->
+                CheckVsn(rabbit_misc:rabbit_version(), Rabbit, "Rabbit")
+        end,
+
+    CheckNodes = fun (Node, AllNodes) ->
+                         ThisNode = node(),
+                         case lists:member(ThisNode, AllNodes) of
+                             true ->
+                                 ok;
+                             false ->
+                                 throw({error,
+                                        {inconsistent_cluster,
+                                         rabbit_misc:format(
+                                           "Node ~p thinks it's clustered "
+                                           "with node ~p, but ~p disagrees",
+                                           [ThisNode, Node, Node])}})
+                         end
+                 end,
+
+    lists:foreach(
+      fun(Node) ->
+              case rpc:call(Node, rabbit_mnesia, node_info, []) of
+                  {badrpc, _Reason} ->
+                      ok;
+                  {OTP, Rabbit, error} ->
+                      CheckOTP(OTP),
+                      CheckRabbit(Rabbit);
+                  {OTP, Rabbit, {ok, {AllNodes, _, _}}} ->
+                      CheckOTP(OTP),
+                      CheckRabbit(Rabbit),
+                      CheckNodes(Node, AllNodes)
+              end
+      end, all_clustered_nodes()).
+
+%%----------------------------------------------------------------------------
+%% Cluster status file functions
+%%----------------------------------------------------------------------------
+
+%% The cluster node status file contains all we need to know about the
+%% cluster:
+%%
+%%   * All the clustered nodes
+%%   * The disc nodes
+%%   * The running nodes.
+%%
+%% If the current node is a disc node it will be included in the disc nodes
+%% list.
+%%
+%% We strive to keep the file up to date and we rely on this assumption in
+%% various situations. Obviously when mnesia is offline the information we
+%% have will be outdated, but it can't be otherwise.

+cluster_nodes_status_filename() ->
+    dir() ++ "/cluster_nodes.config".
+
+initial_cluster_status() ->
+    {[node()], [node()], [node()]}.
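Because write_cluster_nodes_status/1 below stores a single Erlang term (rabbit_file:write_term_file/2 terminates each term with a full stop), the status file can be inspected from any Erlang shell. A hedged sketch; the file name matches cluster_nodes_status_filename/0 above, and MnesiaDir stands for whatever dir() returns on the node:

    inspect_status(MnesiaDir) ->
        File = filename:join(MnesiaDir, "cluster_nodes.config"),
        {ok, [{AllNodes, DiscNodes, RunningNodes}]} = file:consult(File),
        io:format("all: ~p~ndisc: ~p~nrunning: ~p~n",
                  [AllNodes, DiscNodes, RunningNodes]).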
+
+write_cluster_nodes_status(Status) ->
+    FileName = cluster_nodes_status_filename(),
+    case rabbit_file:write_term_file(FileName, [Status]) of
+        ok -> ok;
+        {error, Reason} ->
+            throw({error, {cannot_write_cluster_nodes_status,
+                           FileName, Reason}})
+    end.
+
+try_read_cluster_nodes_status() ->
+    FileName = cluster_nodes_status_filename(),
+    case rabbit_file:read_term_file(FileName) of
+        {ok, [{_, _, _} = Status]} ->
+            {ok, Status};
+        {ok, Term} ->
+            {error, {invalid_term, FileName, Term}};
+        {error, Reason} ->
+            {error, {cannot_read_file, FileName, Reason}}
+    end.
+
+read_cluster_nodes_status() ->
+    case try_read_cluster_nodes_status() of
+        {ok, Status} ->
+            Status;
+        {error, Reason} ->
+            throw({error, {cannot_read_cluster_nodes_status, Reason}})
+    end.
+
+%% To update the cluster status when mnesia is running.
+update_cluster_nodes_status() ->
+    {ok, Status} = cluster_status_if_running(),
+    write_cluster_nodes_status(Status).
+
+%%--------------------------------------------------------------------
+%% Hooks for `rabbit_node_monitor'
+%%--------------------------------------------------------------------
+
+on_node_up(Node) ->
+    update_cluster_nodes_status(),
+    case is_only_disc_node(Node) of
+        true  -> rabbit_log:info("cluster contains disc nodes again~n");
+        false -> ok
+    end.
+
+on_node_down(Node) ->
+    update_cluster_nodes_status(),
+    case is_only_disc_node(Node) of
+        true  -> rabbit_log:info("only running disc node went down~n");
+        false -> ok
+    end.
+
+%%--------------------------------------------------------------------
+%% Internal helpers
 %%--------------------------------------------------------------------

-nodes_of_type(Type) ->
-    %% This function should return the nodes of a certain type (ram,
-    %% disc or disc_only) in the current cluster. The type of nodes
-    %% is determined when the cluster is initially configured.
-    mnesia:table_info(schema, Type).
+discover_cluster(Nodes) when is_list(Nodes) ->
+    lists:foldl(fun (_,    {ok, Res})  -> {ok, Res};
+                    (Node, {error, _}) -> discover_cluster(Node)
+                end,
+                {error, {cannot_discover_cluster,
+                         "The nodes provided are either offline or not "
+                         "running"}},
+                Nodes);
+discover_cluster(Node) ->
+    case Node =:= node() of
+        true ->
+            {error, {cannot_discover_cluster,
+                     "You provided the current node as node to cluster with"}};
+        false ->
+            case rpc:call(Node, rabbit_mnesia, cluster_status_if_running, []) of
+                {badrpc, _Reason} -> discover_cluster([]);
+                error             -> discover_cluster([]);
+                {ok, Res}         -> {ok, Res}
+            end
+    end.

 %% The tables aren't supposed to be on disk on a ram node
 table_definitions(disc) ->
@@ -336,68 +847,11 @@ queue_name_match() ->
 resource_match(Kind) ->
     #resource{kind = Kind, _='_'}.

-table_names() ->
-    [Tab || {Tab, _} <- table_definitions()].
-
 replicated_table_names() ->
     [Tab || {Tab, TabDef} <- table_definitions(),
             not lists:member({local_content, true}, TabDef)
    ].

-dir() -> mnesia:system_info(directory).
-
-ensure_mnesia_dir() ->
-    MnesiaDir = dir() ++ "/",
-    case filelib:ensure_dir(MnesiaDir) of
-        {error, Reason} ->
-            throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}});
-        ok ->
-            ok
-    end.
-
-ensure_mnesia_running() ->
-    case mnesia:system_info(is_running) of
-        yes ->
-            ok;
-        starting ->
-            wait_for(mnesia_running),
-            ensure_mnesia_running();
-        Reason when Reason =:= no; Reason =:= stopping ->
-            throw({error, mnesia_not_running})
-    end.
-
-ensure_mnesia_not_running() ->
-    case mnesia:system_info(is_running) of
-        no ->
-            ok;
-        stopping ->
-            wait_for(mnesia_not_running),
-            ensure_mnesia_not_running();
-        Reason when Reason =:= yes; Reason =:= starting ->
-            throw({error, mnesia_unexpectedly_running})
-    end.
-
-ensure_schema_integrity() ->
-    case check_schema_integrity() of
-        ok ->
-            ok;
-        {error, Reason} ->
-            throw({error, {schema_integrity_check_failed, Reason}})
-    end.
-
-check_schema_integrity() ->
-    Tables = mnesia:system_info(tables),
-    case check_tables(fun (Tab, TabDef) ->
-                              case lists:member(Tab, Tables) of
-                                  false -> {error, {table_missing, Tab}};
-                                  true  -> check_table_attributes(Tab, TabDef)
-                              end
-                      end) of
-        ok    -> ok = wait_for_tables(),
-                 check_tables(fun check_table_content/2);
-        Other -> Other
-    end.
-
 check_table_attributes(Tab, TabDef) ->
     {_, ExpAttrs} = proplists:lookup(attributes, TabDef),
     case mnesia:table_info(Tab, attributes) of
@@ -433,153 +887,6 @@ check_tables(Fun) ->
         Errors -> {error, Errors}
     end.

-%% The cluster node config file contains some or all of the disk nodes
-%% that are members of the cluster this node is / should be a part of.
-%%
-%% If the file is absent, the list is empty, or only contains the
-%% current node, then the current node is a standalone (disk)
-%% node. Otherwise it is a node that is part of a cluster as either a
-%% disk node, if it appears in the cluster node config, or ram node if
-%% it doesn't.
-
-cluster_nodes_config_filename() ->
-    dir() ++ "/cluster_nodes.config".
-
-create_cluster_nodes_config(ClusterNodes) ->
-    FileName = cluster_nodes_config_filename(),
-    case rabbit_file:write_term_file(FileName, [ClusterNodes]) of
-        ok -> ok;
-        {error, Reason} ->
-            throw({error, {cannot_create_cluster_nodes_config,
-                           FileName, Reason}})
-    end.
-
-read_cluster_nodes_config() ->
-    FileName = cluster_nodes_config_filename(),
-    case rabbit_file:read_term_file(FileName) of
-        {ok, [ClusterNodes]} -> ClusterNodes;
-        {error, enoent} ->
-            {ok, ClusterNodes} = application:get_env(rabbit, cluster_nodes),
-            ClusterNodes;
-        {error, Reason} ->
-            throw({error, {cannot_read_cluster_nodes_config,
-                           FileName, Reason}})
-    end.
-
-delete_cluster_nodes_config() ->
-    FileName = cluster_nodes_config_filename(),
-    case file:delete(FileName) of
-        ok -> ok;
-        {error, enoent} -> ok;
-        {error, Reason} ->
-            throw({error, {cannot_delete_cluster_nodes_config,
-                           FileName, Reason}})
-    end.
-
-running_nodes_filename() ->
-    filename:join(dir(), "nodes_running_at_shutdown").
-
-record_running_nodes() ->
-    FileName = running_nodes_filename(),
-    Nodes = running_clustered_nodes() -- [node()],
-    %% Don't check the result: we're shutting down anyway and this is
-    %% a best-effort-basis.
-    rabbit_file:write_term_file(FileName, [Nodes]),
-    ok.
-
-read_previously_running_nodes() ->
-    FileName = running_nodes_filename(),
-    case rabbit_file:read_term_file(FileName) of
-        {ok, [Nodes]}   -> Nodes;
-        {error, enoent} -> [];
-        {error, Reason} -> throw({error, {cannot_read_previous_nodes_file,
-                                          FileName, Reason}})
-    end.
-
-delete_previously_running_nodes() ->
-    FileName = running_nodes_filename(),
-    case file:delete(FileName) of
-        ok              -> ok;
-        {error, enoent} -> ok;
-        {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file,
-                                          FileName, Reason}})
-    end.
-
-init_db(ClusterNodes, Force) ->
-    init_db(
-      ClusterNodes, Force,
-      fun () ->
-              case rabbit_upgrade:maybe_upgrade_local() of
-                  ok -> ok;
-                  %% If we're just starting up a new node we won't have a
-                  %% version
-                  starting_from_scratch -> ok = rabbit_version:record_desired()
-              end
-      end).
-
-%% Take a cluster node config and create the right kind of node - a
-%% standalone disk node, or disk or ram node connected to the
-%% specified cluster nodes. If Force is false, don't allow
-%% connections to offline nodes.
-init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) ->
-    UClusterNodes = lists:usort(ClusterNodes),
-    ProperClusterNodes = UClusterNodes -- [node()],
-    case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
-        {ok, []} when not Force andalso ProperClusterNodes =/= [] ->
-            throw({error, {failed_to_cluster_with, ProperClusterNodes,
-                           "Mnesia could not connect to any disc nodes."}});
-        {ok, Nodes} ->
-            WasDiscNode = is_disc_node(),
-            WantDiscNode = should_be_disc_node(ClusterNodes),
-            %% We create a new db (on disk, or in ram) in the first
-            %% two cases and attempt to upgrade the in the other two
-            case {Nodes, WasDiscNode, WantDiscNode} of
-                {[], _, false} ->
-                    %% New ram node; start from scratch
-                    ok = create_schema(ram);
-                {[], false, true} ->
-                    %% Nothing there at all, start from scratch
-                    ok = create_schema(disc);
-                {[], true, true} ->
-                    %% We're the first node up
-                    case rabbit_upgrade:maybe_upgrade_local() of
-                        ok                    -> ensure_schema_integrity();
-                        version_not_available -> ok = schema_ok_or_move()
-                    end;
-                {[AnotherNode|_], _, _} ->
-                    %% Subsequent node in cluster, catch up
-                    ensure_version_ok(
-                      rpc:call(AnotherNode, rabbit_version, recorded, [])),
-                    {CopyType, CopyTypeAlt} =
-                        case WantDiscNode of
-                            true  -> {disc, disc_copies};
-                            false -> {ram, ram_copies}
-                        end,
-                    ok = wait_for_replicated_tables(),
-                    ok = create_local_table_copy(schema, CopyTypeAlt),
-                    ok = create_local_table_copies(CopyType),
-
-                    ok = SecondaryPostMnesiaFun(),
-                    %% We've taken down mnesia, so ram nodes will need
-                    %% to re-sync
-                    case is_disc_node() of
-                        false -> start_mnesia(),
-                                 mnesia:change_config(extra_db_nodes,
-                                                      ProperClusterNodes),
-                                 wait_for_replicated_tables();
-                        true  -> ok
-                    end,
-
-                    ensure_schema_integrity(),
-                    ok
-            end;
-        {error, Reason} ->
-            %% one reason we may end up here is if we try to join
-            %% nodes together that are currently running standalone or
-            %% are members of a different cluster
-            throw({error, {unable_to_join_cluster, ClusterNodes, Reason}})
-    end.
-
 schema_ok_or_move() ->
     case check_schema_integrity() of
         ok ->
@@ -618,11 +925,6 @@ create_schema(Type) ->
     ensure_schema_integrity(),
     ok = rabbit_version:record_desired().

-is_disc_node() -> mnesia:system_info(use_dir).
-
-should_be_disc_node(ClusterNodes) ->
-    ClusterNodes == [] orelse lists:member(node(), ClusterNodes).
-
 move_db() ->
     stop_mnesia(),
     MnesiaDir = filename:dirname(dir() ++ "/"),
@@ -644,25 +946,6 @@ move_db() ->
     start_mnesia(),
     ok.

-copy_db(Destination) ->
-    ok = ensure_mnesia_not_running(),
-    rabbit_file:recursive_copy(dir(), Destination).
-
-create_tables() -> create_tables(disc).
-
-create_tables(Type) ->
-    lists:foreach(fun ({Tab, TabDef}) ->
-                          TabDef1 = proplists:delete(match, TabDef),
-                          case mnesia:create_table(Tab, TabDef1) of
-                              {atomic, ok} -> ok;
-                              {aborted, Reason} ->
-                                  throw({error, {table_creation_failed,
-                                                 Tab, TabDef1, Reason}})
-                          end
-                  end,
-                  table_definitions(Type)),
-    ok.
-
 copy_type_to_ram(TabDef) ->
     [{disc_copies, []}, {ram_copies, [node()]}
      | proplists:delete(ram_copies, proplists:delete(disc_copies, TabDef))].
@@ -711,114 +994,41 @@ create_local_table_copy(Tab, Type) ->
     end,
     ok.

-wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
-
-wait_for_tables() -> wait_for_tables(table_names()).
-
-wait_for_tables(TableNames) ->
-    case mnesia:wait_for_tables(TableNames, 30000) of
-        ok ->
-            ok;
-        {timeout, BadTabs} ->
-            throw({error, {timeout_waiting_for_tables, BadTabs}});
-        {error, Reason} ->
-            throw({error, {failed_waiting_for_tables, Reason}})
-    end.
-
-reset(Force) ->
-    rabbit_misc:local_info_msg("Resetting Rabbit~s~n",
-                               [if Force -> " forcefully";
-                                   true  -> ""
-                                end]),
-    ensure_mnesia_not_running(),
-    case not Force andalso is_clustered() andalso
-         is_only_disc_node(node(), false)
-    of
-        true  -> log_both("no other disc nodes running");
-        false -> ok
-    end,
-    Node = node(),
-    Nodes = all_clustered_nodes() -- [Node],
-    case Force of
-        true  -> ok;
-        false ->
-            ensure_mnesia_dir(),
-            start_mnesia(),
-            RunningNodes =
-                try
-                    %% Force=true here so that reset still works when clustered
-                    %% with a node which is down
-                    ok = init_db(read_cluster_nodes_config(), true),
-                    running_clustered_nodes() -- [Node]
-                after
-                    stop_mnesia()
-                end,
-            leave_cluster(Nodes, RunningNodes),
-            rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
-                                  cannot_delete_schema)
-    end,
-    %% We need to make sure that we don't end up in a distributed
-    %% Erlang system with nodes while not being in an Mnesia cluster
-    %% with them. We don't handle that well.
-    [erlang:disconnect_node(N) || N <- Nodes],
-    ok = delete_cluster_nodes_config(),
-    %% remove persisted messages and any other garbage we find
-    ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
-    ok.
-
 leave_cluster([], _) -> ok;
-leave_cluster(Nodes, RunningNodes) ->
-    %% find at least one running cluster node and instruct it to
-    %% remove our schema copy which will in turn result in our node
-    %% being removed as a cluster node from the schema, with that
-    %% change being propagated to all nodes
-    case lists:any(
-           fun (Node) ->
-                   case rpc:call(Node, mnesia, del_table_copy,
-                                 [schema, node()]) of
-                       {atomic, ok} -> true;
-                       {badrpc, nodedown} -> false;
-                       {aborted, {node_not_running, _}} -> false;
-                       {aborted, Reason} ->
-                           throw({error, {failed_to_leave_cluster,
-                                          Nodes, RunningNodes, Reason}})
-                   end
-           end,
-           RunningNodes) of
-        true  -> ok;
-        false -> throw({error, {no_running_cluster_nodes,
-                                Nodes, RunningNodes}})
+leave_cluster(Nodes, RunningNodes0) ->
+    case RunningNodes0 -- [node()] of
+        [] -> ok;
+        RunningNodes ->
+            %% find at least one running cluster node and instruct it to
+            %% remove our schema copy which will in turn result in our node
+            %% being removed as a cluster node from the schema, with that
+            %% change being propagated to all nodes
+            case lists:any(
+                   fun (Node) ->
+                           case rpc:call(Node, mnesia, del_table_copy,
+                                         [schema, node()]) of
+                               {atomic, ok} -> true;
+                               {badrpc, nodedown} -> false;
+                               {aborted, {node_not_running, _}} -> false;
+                               {aborted, Reason} ->
+                                   throw({error, {failed_to_leave_cluster,
+                                                  Nodes, RunningNodes,
+                                                  Reason}})
+                           end
+                   end,
+                   RunningNodes) of
+                true  -> ok;
+                false -> throw({error, {no_running_cluster_nodes,
+                                        Nodes, RunningNodes}})
+            end
     end.

 wait_for(Condition) ->
     error_logger:info_msg("Waiting for ~p...~n", [Condition]),
     timer:sleep(1000).
-on_node_up(Node) ->
-    case is_only_disc_node(Node, true) of
-        true  -> rabbit_log:info("cluster contains disc nodes again~n");
-        false -> ok
-    end.
-
-on_node_down(Node) ->
-    case is_only_disc_node(Node, true) of
-        true  -> rabbit_log:info("only running disc node went down~n");
-        false -> ok
-    end.
-
-is_only_disc_node(Node, _MnesiaRunning = true) ->
-    RunningSet = sets:from_list(running_clustered_nodes()),
-    DiscSet = sets:from_list(nodes_of_type(disc_copies)),
-    [Node] =:= sets:to_list(sets:intersection(RunningSet, DiscSet));
-is_only_disc_node(Node, false) ->
-    start_mnesia(),
-    Res = is_only_disc_node(Node, true),
-    stop_mnesia(),
-    Res.
-
-log_both(Warning) ->
-    io:format("Warning: ~s~n", [Warning]),
-    rabbit_misc:with_local_io(
-      fun () -> error_logger:warning_msg("~s~n", [Warning]) end).
+is_only_disc_node(Node) ->
+    Nodes = running_clustered_disc_nodes(),
+    [Node] =:= Nodes.

 start_mnesia() ->
     rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
@@ -827,3 +1037,37 @@ start_mnesia() ->
 stop_mnesia() ->
     stopped = mnesia:stop(),
     ensure_mnesia_not_running().
+
+start_and_init_db(ClusterNodes, WantDiscNode, Force) ->
+    mnesia:start(),
+    try
+        ok = init_db(ClusterNodes, WantDiscNode, Force)
+    after
+        mnesia:stop()
+    end,
+    ok.
+
+%%--------------------------------------------------------------------
+%% Legacy functions related to the "running nodes" file
+%%--------------------------------------------------------------------
+
+legacy_running_nodes_filename() ->
+    filename:join(dir(), "nodes_running_at_shutdown").
+
+legacy_read_previously_running_nodes() ->
+    FileName = legacy_running_nodes_filename(),
+    case rabbit_file:read_term_file(FileName) of
+        {ok, [Nodes]}   -> Nodes;
+        {error, enoent} -> [];
+        {error, Reason} -> throw({error, {cannot_read_previous_nodes_file,
+                                          FileName, Reason}})
+    end.
+
+legacy_delete_previously_running_nodes() ->
+    FileName = legacy_running_nodes_filename(),
+    case file:delete(FileName) of
+        ok              -> ok;
+        {error, enoent} -> ok;
+        {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file,
+                                          FileName, Reason}})
+    end.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 2760ef2d..dc5e6ef7 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -31,6 +31,7 @@
 -define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).

 all_tests() ->
+    ok = setup_cluster(),
     passed = gm_tests:all_tests(),
     passed = mirrored_supervisor_tests:all_tests(),
     application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
@@ -51,36 +52,51 @@ all_tests() ->
     passed = test_log_management_during_startup(),
     passed = test_statistics(),
     passed = test_arguments_parser(),
-    passed = test_cluster_management(),
     passed = test_user_management(),
     passed = test_runtime_parameters(),
     passed = test_server_status(),
     passed = test_confirms(),
-    passed = maybe_run_cluster_dependent_tests(),
+    passed =
+        do_if_secondary_node(
+          fun run_cluster_dependent_tests/1,
+          fun (SecondaryNode) ->
+                  io:format("Skipping cluster dependent tests with node ~p~n",
+                            [SecondaryNode]),
+                  passed
+          end),
     passed = test_configurable_server_properties(),
     passed.

-maybe_run_cluster_dependent_tests() ->
+do_if_secondary_node(Up, Down) ->
     SecondaryNode = rabbit_nodes:make("hare"),

     case net_adm:ping(SecondaryNode) of
-        pong -> passed = run_cluster_dependent_tests(SecondaryNode);
-        pang -> io:format("Skipping cluster dependent tests with node ~p~n",
-                          [SecondaryNode])
-    end,
-    passed.
+        pong -> Up(SecondaryNode);
+        pang -> Down(SecondaryNode)
+    end.
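The do_if_secondary_node/2 helper above passes the secondary node to whichever callback applies, so callers no longer repeat the net_adm:ping/1 dance themselves. A usage sketch with invented callbacks:

    %% Sketch: report whether the "hare" test node is reachable.
    check_hare() ->
        do_if_secondary_node(
          fun (Node) -> io:format("~p is up~n", [Node]) end,
          fun (Node) -> io:format("~p is down~n", [Node]) end).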
-run_cluster_dependent_tests(SecondaryNode) ->
-    SecondaryNodeS = atom_to_list(SecondaryNode),
+setup_cluster() ->
+    do_if_secondary_node(
+      fun (SecondaryNode) ->
+              ok = control_action(stop_app, []),
+              ok = control_action(join_cluster,
+                                  [atom_to_list(SecondaryNode)]),
+              ok = control_action(start_app, []),
+              ok = control_action(start_app, SecondaryNode, [], [])
+      end,
+      fun (_) -> ok end).

-    cover:stop(SecondaryNode),
-    ok = control_action(stop_app, []),
-    ok = control_action(reset, []),
-    ok = control_action(cluster, [SecondaryNodeS]),
-    ok = control_action(start_app, []),
-    cover:start(SecondaryNode),
-    ok = control_action(start_app, SecondaryNode, [], []),
+maybe_run_cluster_dependent_tests() ->
+    do_if_secondary_node(
+      fun (SecondaryNode) ->
+              passed = run_cluster_dependent_tests(SecondaryNode)
+      end,
+      fun (SecondaryNode) ->
+              io:format("Skipping cluster dependent tests with node ~p~n",
+                        [SecondaryNode])
+      end).

+run_cluster_dependent_tests(SecondaryNode) ->
     io:format("Running cluster dependent tests with node ~p~n",
               [SecondaryNode]),
     passed = test_delegates_async(SecondaryNode),
     passed = test_delegates_sync(SecondaryNode),
@@ -855,200 +871,6 @@ test_arguments_parser() ->

     passed.

-test_cluster_management() ->
-    %% 'cluster' and 'reset' should only work if the app is stopped
-    {error, _} = control_action(cluster, []),
-    {error, _} = control_action(reset, []),
-    {error, _} = control_action(force_reset, []),
-
-    ok = control_action(stop_app, []),
-
-    %% various ways of creating a standalone node
-    NodeS = atom_to_list(node()),
-    ClusteringSequence = [[],
-                          [NodeS],
-                          ["invalid@invalid", NodeS],
-                          [NodeS, "invalid@invalid"]],
-
-    ok = control_action(reset, []),
-    lists:foreach(fun (Arg) ->
-                          ok = control_action(force_cluster, Arg),
-                          ok
-                  end,
-                  ClusteringSequence),
-    lists:foreach(fun (Arg) ->
-                          ok = control_action(reset, []),
-                          ok = control_action(force_cluster, Arg),
-                          ok
-                  end,
-                  ClusteringSequence),
-    ok = control_action(reset, []),
-    lists:foreach(fun (Arg) ->
-                          ok = control_action(force_cluster, Arg),
-                          ok = control_action(start_app, []),
-                          ok = control_action(stop_app, []),
-                          ok
-                  end,
-                  ClusteringSequence),
-    lists:foreach(fun (Arg) ->
-                          ok = control_action(reset, []),
-                          ok = control_action(force_cluster, Arg),
-                          ok = control_action(start_app, []),
-                          ok = control_action(stop_app, []),
-                          ok
-                  end,
-                  ClusteringSequence),
-
-    %% convert a disk node into a ram node
-    ok = control_action(reset, []),
-    ok = control_action(start_app, []),
-    ok = control_action(stop_app, []),
-    ok = assert_disc_node(),
-    ok = control_action(force_cluster, ["invalid1@invalid",
-                                        "invalid2@invalid"]),
-    ok = assert_ram_node(),
-
-    %% join a non-existing cluster as a ram node
-    ok = control_action(reset, []),
-    ok = control_action(force_cluster, ["invalid1@invalid",
-                                        "invalid2@invalid"]),
-    ok = assert_ram_node(),
-
-    ok = control_action(reset, []),
-
-    SecondaryNode = rabbit_nodes:make("hare"),
-    case net_adm:ping(SecondaryNode) of
-        pong -> passed = test_cluster_management2(SecondaryNode);
-        pang -> io:format("Skipping clustering tests with node ~p~n",
-                          [SecondaryNode])
-    end,
-
-    ok = control_action(start_app, []),
-    passed.
-
-test_cluster_management2(SecondaryNode) ->
-    NodeS = atom_to_list(node()),
-    SecondaryNodeS = atom_to_list(SecondaryNode),
-
-    %% make a disk node
-    ok = control_action(cluster, [NodeS]),
-    ok = assert_disc_node(),
-    %% make a ram node
-    ok = control_action(reset, []),
-    ok = control_action(cluster, [SecondaryNodeS]),
-    ok = assert_ram_node(),
-
-    %% join cluster as a ram node
-    ok = control_action(reset, []),
-    ok = control_action(force_cluster, [SecondaryNodeS, "invalid1@invalid"]),
-    ok = control_action(start_app, []),
-    ok = control_action(stop_app, []),
-    ok = assert_ram_node(),
-
-    %% ram node will not start by itself
-    ok = control_action(stop_app, []),
-    ok = control_action(stop_app, SecondaryNode, [], []),
-    {error, _} = control_action(start_app, []),
-    ok = control_action(start_app, SecondaryNode, [], []),
-    ok = control_action(start_app, []),
-    ok = control_action(stop_app, []),
-
-    %% change cluster config while remaining in same cluster
-    ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]),
-    ok = control_action(start_app, []),
-    ok = control_action(stop_app, []),
-
-    %% join non-existing cluster as a ram node
-    ok = control_action(force_cluster, ["invalid1@invalid",
-                                        "invalid2@invalid"]),
-    {error, _} = control_action(start_app, []),
-    ok = assert_ram_node(),
-
-    %% join empty cluster as a ram node (converts to disc)
-    ok = control_action(cluster, []),
-    ok = control_action(start_app, []),
-    ok = control_action(stop_app, []),
-    ok = assert_disc_node(),
-
-    %% make a new ram node
-    ok = control_action(reset, []),
-    ok = control_action(force_cluster, [SecondaryNodeS]),
-    ok = control_action(start_app, []),
-    ok = control_action(stop_app, []),
-    ok = assert_ram_node(),
-
-    %% turn ram node into disk node
-    ok = control_action(cluster, [SecondaryNodeS, NodeS]),
-    ok = control_action(start_app, []),
-    ok = control_action(stop_app, []),
-    ok = assert_disc_node(),
-
-    %% convert a disk node into a ram node
-    ok = assert_disc_node(),
-    ok = control_action(force_cluster, ["invalid1@invalid",
-                                        "invalid2@invalid"]),
-    ok = assert_ram_node(),
-
-    %% make a new disk node
-    ok = control_action(force_reset, []),
-    ok = control_action(start_app, []),
-    ok = control_action(stop_app, []),
-    ok = assert_disc_node(),
-
-    %% turn a disk node into a ram node
-    ok = control_action(reset, []),
-    ok = control_action(cluster, [SecondaryNodeS]),
-    ok = control_action(start_app, []),
-    ok = control_action(stop_app, []),
-    ok = assert_ram_node(),
-
-    %% NB: this will log an inconsistent_database error, which is harmless
-    %% Turning cover on / off is OK even if we're not in general using cover,
-    %% it just turns the engine on / off, doesn't actually log anything.
-    cover:stop([SecondaryNode]),
-    true = disconnect_node(SecondaryNode),
-    pong = net_adm:ping(SecondaryNode),
-    cover:start([SecondaryNode]),
-
-    %% leaving a cluster as a ram node
-    ok = control_action(reset, []),
-    %% ...and as a disk node
-    ok = control_action(cluster, [SecondaryNodeS, NodeS]),
-    ok = control_action(start_app, []),
-    ok = control_action(stop_app, []),
-    cover:stop(SecondaryNode),
-    ok = control_action(reset, []),
-    cover:start(SecondaryNode),
-
-    %% attempt to leave cluster when no other node is alive
-    ok = control_action(cluster, [SecondaryNodeS, NodeS]),
-    ok = control_action(start_app, []),
-    ok = control_action(stop_app, SecondaryNode, [], []),
-    ok = control_action(stop_app, []),
-    {error, {no_running_cluster_nodes, _, _}} =
-        control_action(reset, []),
-
-    %% attempt to change type when no other node is alive
-    {error, {no_running_cluster_nodes, _, _}} =
-        control_action(cluster, [SecondaryNodeS]),
-
-    %% leave system clustered, with the secondary node as a ram node
-    ok = control_action(force_reset, []),
-    ok = control_action(start_app, []),
-    %% Yes, this is rather ugly. But since we're a clustered Mnesia
-    %% node and we're telling another clustered node to reset itself,
-    %% we will get disconnected half way through causing a
-    %% badrpc. This never happens in real life since rabbitmqctl is
-    %% not a clustered Mnesia node.
-    cover:stop(SecondaryNode),
-    {badrpc, nodedown} = control_action(force_reset, SecondaryNode, [], []),
-    pong = net_adm:ping(SecondaryNode),
-    cover:start(SecondaryNode),
-    ok = control_action(cluster, SecondaryNode, [NodeS], []),
-    ok = control_action(start_app, SecondaryNode, [], []),
-
-    passed.
-
 test_user_management() ->

     %% lots if stuff that should fail
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index e1a7bcae..85bcff25 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -121,10 +121,7 @@ remove_backup() ->
     info("upgrades: Mnesia backup removed~n", []).

 maybe_upgrade_mnesia() ->
-    %% rabbit_mnesia:all_clustered_nodes/0 will return [] at this point
-    %% if we are a RAM node since Mnesia has not started yet.
-    AllNodes = lists:usort(rabbit_mnesia:all_clustered_nodes() ++
-                           rabbit_mnesia:read_cluster_nodes_config()),
+    AllNodes = rabbit_mnesia:all_clustered_nodes(),
     case rabbit_version:upgrades_required(mnesia) of
         {error, starting_from_scratch} ->
             ok;
@@ -150,19 +147,17 @@ maybe_upgrade_mnesia() ->
 upgrade_mode(AllNodes) ->
     case nodes_running(AllNodes) of
         [] ->
-            AfterUs = rabbit_mnesia:read_previously_running_nodes(),
+            AfterUs = rabbit_mnesia:running_clustered_nodes() -- [node()],
             case {is_disc_node_legacy(), AfterUs} of
                 {true, []} ->
                     primary;
                 {true, _} ->
-                    Filename = rabbit_mnesia:running_nodes_filename(),
+                    %% TODO: Here I'm assuming that the various cluster
+                    %% status files are consistent with each other; I think
+                    %% I can provide a solution if they're not...
                     die("Cluster upgrade needed but other disc nodes shut "
                         "down after this one.~nPlease first start the last "
-                        "disc node to shut down.~n~nNote: if several disc "
-                        "nodes were shut down simultaneously they may "
-                        "all~nshow this message. In which case, remove "
-                        "the lock file on one of them and~nstart that node. "
" - "The lock file on this node is:~n~n ~s ", [Filename]); + "disc node to shut down.~n", []); {false, _} -> die("Cluster upgrade needed but this is a ram node.~n" "Please first start the last disc node to shut down.", @@ -222,15 +217,8 @@ secondary_upgrade(AllNodes) -> IsDiscNode = is_disc_node_legacy(), rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), cannot_delete_schema), - %% Note that we cluster with all nodes, rather than all disc nodes - %% (as we can't know all disc nodes at this point). This is safe as - %% we're not writing the cluster config, just setting up Mnesia. - ClusterNodes = case IsDiscNode of - true -> AllNodes; - false -> AllNodes -- [node()] - end, rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - ok = rabbit_mnesia:init_db(ClusterNodes, true, fun () -> ok end), + ok = rabbit_mnesia:init_db(AllNodes, IsDiscNode, true, false), ok = rabbit_version:record_desired_for_scope(mnesia), ok. |