summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-05-30 12:07:13 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-05-30 12:07:13 +0100
commit386a9a7920e5b6f1967b38219b76f44ff22dbde3 (patch)
treec63538c4334f66be7993aaae54da0a8849921ee5 /src
parent6db187ded843dd14bf7ce51d046ee8ecc2c4dc72 (diff)
parent9c2c1ed3703860eb55836eac2ee3d28881b570b5 (diff)
downloadrabbitmq-server-386a9a7920e5b6f1967b38219b76f44ff22dbde3.tar.gz
merge default
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl10
-rw-r--r--src/rabbit_control_main.erl41
-rw-r--r--src/rabbit_misc.erl6
-rw-r--r--src/rabbit_mnesia.erl1142
-rw-r--r--src/rabbit_tests.erl244
-rw-r--r--src/rabbit_upgrade.erl26
6 files changed, 773 insertions, 696 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index f69c8d1b..3f00f000 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -295,6 +295,8 @@ prepare() ->
ok -> ok;
{error, {already_loaded, rabbit}} -> ok
end,
+ ok = rabbit_mnesia:ensure_mnesia_dir(),
+ ok = rabbit_mnesia:prepare(),
ok = ensure_working_log_handlers(),
ok = rabbit_upgrade:maybe_upgrade_mnesia().
@@ -405,7 +407,7 @@ start(normal, []) ->
end.
stop(_State) ->
- ok = rabbit_mnesia:record_running_nodes(),
+ ok = rabbit_mnesia:update_cluster_nodes_status(),
terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
ok = rabbit_alarm:stop(),
ok = case rabbit_mnesia:is_clustered() of
@@ -502,12 +504,12 @@ sort_boot_steps(UnsortedSteps) ->
end.
boot_step_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
+ AllNodes = rabbit_mnesia:all_clustered_nodes(),
{Err, Nodes} =
- case rabbit_mnesia:read_previously_running_nodes() of
+ case AllNodes -- [node()] of
[] -> {"Timeout contacting cluster nodes. Since RabbitMQ was"
" shut down forcefully~nit cannot determine which nodes"
- " are timing out. Details on all nodes will~nfollow.~n",
- rabbit_mnesia:all_clustered_nodes() -- [node()]};
+ " are timing out.~n", []};
Ns -> {rabbit_misc:format(
"Timeout contacting cluster nodes: ~p.~n", [Ns]),
Ns}
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index f8b8c345..422458a9 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -25,10 +25,12 @@
-define(QUIET_OPT, "-q").
-define(NODE_OPT, "-n").
-define(VHOST_OPT, "-p").
+-define(RAM_OPT, "--ram").
-define(QUIET_DEF, {?QUIET_OPT, flag}).
-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
-define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}).
+-define(RAM_DEF, {?RAM_OPT, flag}).
-define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node)]).
@@ -41,8 +43,10 @@
force_reset,
rotate_logs,
- cluster,
- force_cluster,
+ {join_cluster, [?RAM_DEF]},
+ change_node_type,
+ recluster,
+ remove_node,
cluster_status,
add_user,
@@ -234,17 +238,28 @@ action(force_reset, Node, [], _Opts, Inform) ->
Inform("Forcefully resetting node ~p", [Node]),
call(Node, {rabbit_mnesia, force_reset, []});
-action(cluster, Node, ClusterNodeSs, _Opts, Inform) ->
- ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
- Inform("Clustering node ~p with ~p",
- [Node, ClusterNodes]),
- rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]);
-
-action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
- ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
- Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)",
- [Node, ClusterNodes]),
- rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]);
+action(join_cluster, Node, [ClusterNodeS], Opts, Inform) ->
+ ClusterNode = list_to_atom(ClusterNodeS),
+ DiscNode = not proplists:get_bool(?RAM_OPT, Opts),
+ Inform("Clustering node ~p with ~p", [Node, ClusterNode]),
+ rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, DiscNode]);
+
+action(change_node_type, Node, ["ram"], _Opts, Inform) ->
+ Inform("Turning ~p into a ram node", [Node]),
+ rpc_call(Node, rabbit_mnesia, change_node_type, [ram]);
+action(change_node_type, Node, ["disc"], _Opts, Inform) ->
+ Inform("Turning ~p into a disc node", [Node]),
+ rpc_call(Node, rabbit_mnesia, change_node_type, [disc]);
+
+action(recluster, Node, [ClusterNodeS], _Opts, Inform) ->
+ ClusterNode = list_to_atom(ClusterNodeS),
+ Inform("Re-clustering ~p with ~p", [Node, ClusterNode]),
+ rpc_call(Node, rabbit_mnesia, recluster, [ClusterNode]);
+
+action(remove_node, Node, [ClusterNodeS], _Opts, Inform) ->
+ ClusterNode = list_to_atom(ClusterNodeS),
+ Inform("Removing node ~p from cluster", [ClusterNode]),
+ rpc_call(Node, rabbit_mnesia, remove_node, [ClusterNode]);
action(wait, Node, [PidFile], _Opts, Inform) ->
Inform("Waiting for ~p", [Node]),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index d41aa09b..5bfb7de0 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -60,6 +60,7 @@
-export([multi_call/2]).
-export([os_cmd/1]).
-export([gb_sets_difference/2]).
+-export([rabbit_version/0]).
%%----------------------------------------------------------------------------
@@ -212,6 +213,7 @@
([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
-spec(os_cmd/1 :: (string()) -> string()).
-spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()).
+-spec(rabbit_version/0 :: () -> string()).
-endif.
@@ -939,3 +941,7 @@ os_cmd(Command) ->
gb_sets_difference(S1, S2) ->
gb_sets:fold(fun gb_sets:delete_any/2, S1, S2).
+
+rabbit_version() ->
+ {ok, VSN} = application:get_key(rabbit, vsn),
+ VSN.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 7e9346f9..2e34b65e 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -18,16 +18,19 @@
-module(rabbit_mnesia).
-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
- cluster/1, force_cluster/1, reset/0, force_reset/0, init_db/3,
- is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0,
- empty_ram_only_tables/0, copy_db/1, wait_for_tables/1,
- create_cluster_nodes_config/1, read_cluster_nodes_config/0,
- record_running_nodes/0, read_previously_running_nodes/0,
- running_nodes_filename/0, is_disc_node/0, on_node_down/1,
- on_node_up/1]).
+ join_cluster/2, check_cluster_consistency/0, reset/0, force_reset/0,
+ init_db/4, is_clustered/0, running_clustered_nodes/0,
+ all_clustered_nodes/0, all_clustered_disc_nodes/0,
+ empty_ram_only_tables/0, copy_db/1, wait_for_tables/1, is_disc_node/0,
+ on_node_down/1, on_node_up/1, should_be_disc_node/1,
+ change_node_type/1, recluster/1, remove_node/1, prepare/0,
+ update_cluster_nodes_status/0]).
-export([table_names/0]).
+%% Used internally in rpc calls, see `discover_nodes/1'
+-export([cluster_status_if_running/0, node_info/0]).
+
%% create_tables/0 exported for helping embed RabbitMQ in or alongside
%% other mnesia-using Erlang applications, such as ejabberd
-export([create_tables/0]).
@@ -38,147 +41,156 @@
-ifdef(use_specs).
--export_type([node_type/0]).
+-export_type([node_type/0, node_status/0]).
--type(node_type() :: disc_only | disc | ram | unknown).
--spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
- {'running_nodes', [node()]}]).
--spec(dir/0 :: () -> file:filename()).
--spec(ensure_mnesia_dir/0 :: () -> 'ok').
+-type(node_type() :: disc | ram).
+-type(node_status() :: {[node()], [node()], [node()]}).
+
+%% Main interface
+-spec(prepare/0 :: () -> 'ok').
-spec(init/0 :: () -> 'ok').
--spec(init_db/3 :: ([node()], boolean(), rabbit_misc:thunk('ok')) -> 'ok').
--spec(is_db_empty/0 :: () -> boolean()).
--spec(cluster/1 :: ([node()]) -> 'ok').
--spec(force_cluster/1 :: ([node()]) -> 'ok').
--spec(cluster/2 :: ([node()], boolean()) -> 'ok').
+-spec(join_cluster/2 :: ([node()], boolean()) -> 'ok').
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
+-spec(recluster/1 :: (node()) -> 'ok').
+-spec(change_node_type/1 :: (node_type()) -> 'ok').
+-spec(remove_node/1 :: (node()) -> 'ok').
+
+%% Various queries to get the status of the db
+-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
+ {'running_nodes', [node()]}]).
+-spec(is_db_empty/0 :: () -> boolean()).
-spec(is_clustered/0 :: () -> boolean()).
-spec(running_clustered_nodes/0 :: () -> [node()]).
-spec(all_clustered_nodes/0 :: () -> [node()]).
+-spec(is_disc_node/0 :: () -> boolean()).
+-spec(dir/0 :: () -> file:filename()).
+-spec(table_names/0 :: () -> [atom()]).
+-spec(cluster_status_if_running/0 :: () -> 'error' | node_status()).
+
+%% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit'
+-spec(init_db/4 :: ([node()], boolean(), boolean(), boolean()) -> 'ok').
+-spec(ensure_mnesia_dir/0 :: () -> 'ok').
-spec(empty_ram_only_tables/0 :: () -> 'ok').
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
-spec(wait_for_tables/1 :: ([atom()]) -> 'ok').
--spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok').
--spec(read_cluster_nodes_config/0 :: () -> [node()]).
--spec(record_running_nodes/0 :: () -> 'ok').
--spec(read_previously_running_nodes/0 :: () -> [node()]).
--spec(running_nodes_filename/0 :: () -> file:filename()).
--spec(is_disc_node/0 :: () -> boolean()).
+-spec(should_be_disc_node/1 :: ([node()]) -> boolean()).
+-spec(check_cluster_consistency/0 :: () -> 'ok' | no_return()).
+
+%% Functions to handle the cluster status file
+-spec(write_cluster_nodes_status/1 :: (node_status()) -> 'ok').
+-spec(read_cluster_nodes_status/0 :: () -> node_status()).
+-spec(update_cluster_nodes_status/0 :: () -> 'ok').
+
+%% Hooks used in `rabbit_node_monitor'
-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
--spec(table_names/0 :: () -> [atom()]).
-
-endif.
%%----------------------------------------------------------------------------
+%% Main interface
+%%----------------------------------------------------------------------------
-status() ->
- [{nodes, case mnesia:system_info(is_running) of
- yes -> [{Key, Nodes} ||
- {Key, CopyType} <- [{disc_only, disc_only_copies},
- {disc, disc_copies},
- {ram, ram_copies}],
- begin
- Nodes = nodes_of_type(CopyType),
- Nodes =/= []
- end];
- no -> case all_clustered_nodes() of
- [] -> [];
- Nodes -> [{unknown, Nodes}]
- end;
- Reason when Reason =:= starting; Reason =:= stopping ->
- exit({rabbit_busy, try_again_later})
- end},
- {running_nodes, running_clustered_nodes()}].
+%% Sets up the cluster status file when needed, taking care of the legacy
+%% files
+prepare() ->
+ NotPresent =
+ fun (AllNodes0, WantDiscNode) ->
+ ThisNode = [node()],
+
+ RunningNodes0 = legacy_read_previously_running_nodes(),
+ legacy_delete_previously_running_nodes(),
+
+ RunningNodes = lists:usort(RunningNodes0 ++ ThisNode),
+ AllNodes =
+ lists:usort(AllNodes0 ++ RunningNodes),
+ DiscNodes = case WantDiscNode of
+ true -> ThisNode;
+ false -> []
+ end,
+
+ ok = write_cluster_nodes_status({AllNodes, DiscNodes, RunningNodes})
+ end,
+ case try_read_cluster_nodes_status() of
+ {ok, _} ->
+ %% We check the consistency only when the cluster status exists,
+ %% since when it doesn't exist it means that we just started a fresh
+ %% node, and when we have a legacy node with an old
+ %% "cluster_nodes.config" we can't check the consistency anyway
+ check_cluster_consistency(),
+ ok;
+ {error, {invalid_term, _, [AllNodes]}} ->
+ %% Legacy file
+ NotPresent(AllNodes, should_be_disc_node(AllNodes));
+ {error, {cannot_read_file, _, enoent}} ->
+ {ok, {AllNodes, WantDiscNode}} =
+ application:get_env(rabbit, cluster_nodes),
+ NotPresent(AllNodes, WantDiscNode)
+ end.
init() ->
ensure_mnesia_running(),
ensure_mnesia_dir(),
- Nodes = read_cluster_nodes_config(),
- ok = init_db(Nodes, should_be_disc_node(Nodes)),
+ {AllNodes, _, DiscNodes} = read_cluster_nodes_status(),
+ WantDiscNode = should_be_disc_node(DiscNodes),
+ ok = init_db(AllNodes, WantDiscNode, WantDiscNode),
%% We intuitively expect the global name server to be synced when
%% Mnesia is up. In fact that's not guaranteed to be the case - let's
%% make it so.
ok = global:sync(),
- ok = delete_previously_running_nodes(),
ok.
-is_db_empty() ->
- lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
- table_names()).
+%% Make the node join a cluster. The node will be reset automatically before we
+%% actually cluster it. The nodes provided will be used to find out about the
+%% nodes in the cluster.
+%% This function will fail if:
+%%
+%% * The node is currently the only disc node of its cluster
+%% * We can't connect to any of the nodes provided
+%% * The node is currently already clustered with the cluster of the nodes
+%% provided
+%%
+%% Note that we make no attempt to verify that the nodes provided are all in the
+%% same cluster, we simply pick the first online node and we cluster to its
+%% cluster.
+join_cluster(DiscoveryNode, WantDiscNode) ->
+ case is_disc_and_clustered() andalso is_only_disc_node(node()) of
+ true -> throw({error,
+ {standalone_ram_node,
+ "You can't cluster a node if it's the only "
+ "disc node in its existing cluster. If new nodes "
+ "joined while this node was offline, use \"recluster\" "
+ "to add them manually"}});
+ _ -> ok
+ end,
-cluster(ClusterNodes) ->
- cluster(ClusterNodes, false).
-force_cluster(ClusterNodes) ->
- cluster(ClusterNodes, true).
-
-%% Alter which disk nodes this node is clustered with. This can be a
-%% subset of all the disk nodes in the cluster but can (and should)
-%% include the node itself if it is to be a disk rather than a ram
-%% node. If Force is false, only connections to online nodes are
-%% allowed.
-cluster(ClusterNodes, Force) ->
- rabbit_misc:local_info_msg("Clustering with ~p~s~n",
- [ClusterNodes, if Force -> " forcefully";
- true -> ""
- end]),
ensure_mnesia_not_running(),
ensure_mnesia_dir(),
- case not Force andalso is_clustered() andalso
- is_only_disc_node(node(), false) andalso
- not should_be_disc_node(ClusterNodes)
- of
- true -> log_both("last running disc node leaving cluster");
- _ -> ok
- end,
+ {ClusterNodes, DiscNodes, _} = case discover_cluster(DiscoveryNode) of
+ {ok, Res} -> Res;
+ {error, Reason} -> throw({error, Reason})
+ end,
- %% Wipe mnesia if we're changing type from disc to ram
- case {is_disc_node(), should_be_disc_node(ClusterNodes)} of
- {true, false} -> rabbit_misc:with_local_io(
- fun () -> error_logger:warning_msg(
- "changing node type; wiping "
- "mnesia...~n~n")
- end),
- rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
- cannot_delete_schema);
- _ -> ok
+ case lists:member(node(), ClusterNodes) of
+ true -> throw({error, {already_clustered,
+ "You are already clustered with the nodes you "
+ "have selected"}});
+ false -> ok
end,
- %% Pre-emptively leave the cluster
- %%
- %% We're trying to handle the following two cases:
- %% 1. We have a two-node cluster, where both nodes are disc nodes.
- %% One node is re-clustered as a ram node. When it tries to
- %% re-join the cluster, but before it has time to update its
- %% tables definitions, the other node will order it to re-create
- %% its disc tables. So, we need to leave the cluster before we
- %% can join it again.
- %% 2. We have a two-node cluster, where both nodes are disc nodes.
- %% One node is forcefully reset (so, the other node thinks its
- %% still a part of the cluster). The reset node is re-clustered
- %% as a ram node. Same as above, we need to leave the cluster
- %% before we can join it. But, since we don't know if we're in a
- %% cluster or not, we just pre-emptively leave it before joining.
- ProperClusterNodes = ClusterNodes -- [node()],
- try
- ok = leave_cluster(ProperClusterNodes, ProperClusterNodes)
- catch
- {error, {no_running_cluster_nodes, _, _}} when Force ->
- ok
- end,
+ %% reset the node. this simplifies things and it will be needed in this case
+ %% - we're joining a new cluster with new nodes which are not in synch with
+ %% the current node. I also lifts the burden of reseting the node from the
+ %% user.
+ reset(false),
+
+ rabbit_misc:local_info_msg("Clustering with ~p~s~n", [ClusterNodes]),
%% Join the cluster
- start_mnesia(),
- try
- ok = init_db(ClusterNodes, Force),
- ok = create_cluster_nodes_config(ClusterNodes)
- after
- stop_mnesia()
- end,
+ ok = start_and_init_db(DiscNodes, WantDiscNode, false),
ok.
@@ -188,15 +200,339 @@ cluster(ClusterNodes, Force) ->
reset() -> reset(false).
force_reset() -> reset(true).
+reset(Force) ->
+ rabbit_misc:local_info_msg("Resetting Rabbit~s~n",
+ [if Force -> " forcefully";
+ true -> ""
+ end]),
+ ensure_mnesia_not_running(),
+ case not Force andalso is_disc_and_clustered() andalso
+ is_only_disc_node(node())
+ of
+ true -> throw({error, {standalone_ram_node,
+ "You can't reset a node if it's the only disc "
+ "node in a cluster. Please convert another node"
+ " of the cluster to a disc node first."}});
+ false -> ok
+ end,
+ Node = node(),
+ Nodes = all_clustered_nodes(),
+ case Force of
+ true ->
+ ok;
+ false ->
+ ensure_mnesia_dir(),
+ start_mnesia(),
+ %% Reconnecting so that we will get an up to date RunningNodes
+ RunningNodes =
+ try
+ %% Force=true here so that reset still works when clustered
+ %% with a node which is down
+ {_, DiscNodes, _} = read_cluster_nodes_status(),
+ ok = init_db(DiscNodes, should_be_disc_node(DiscNodes),
+ true),
+ running_clustered_nodes()
+ after
+ stop_mnesia()
+ end,
+ leave_cluster(Nodes, RunningNodes),
+ rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
+ cannot_delete_schema)
+ end,
+ %% We need to make sure that we don't end up in a distributed Erlang system
+ %% with nodes while not being in an Mnesia cluster with them. We don't
+ %% handle that well.
+ [erlang:disconnect_node(N) || N <- Nodes],
+ %% remove persisted messages and any other garbage we find
+ ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
+ ok = write_cluster_nodes_status(initial_cluster_status()),
+ ok.
+
+change_node_type(Type) ->
+ ensure_mnesia_dir(),
+ ensure_mnesia_not_running(),
+ case is_clustered() of
+ false -> throw({error, {not_clustered,
+ "Non-clustered nodes can only be disc nodes"}});
+ true -> ok
+ end,
+
+ check_cluster_consistency(),
+ DiscoveryNodes = all_clustered_nodes(),
+ ClusterNodes =
+ case discover_cluster(DiscoveryNodes) of
+ {ok, {ClusterNodes0, _, _}} ->
+ ClusterNodes0;
+ {error, _Reason} ->
+ throw({error,
+ {cannot_connect_to_cluster,
+ "Could not connect to the cluster nodes present in "
+ "this node status file. If the cluster has changed, "
+ "you can use the \"recluster\" command to point to the "
+ "new cluster nodes"}})
+ end,
+
+ WantDiscNode = case Type of
+ ram -> false;
+ disc -> true
+ end,
+
+ ok = start_and_init_db(ClusterNodes, WantDiscNode, false),
+
+ ok.
+
+recluster(DiscoveryNode) ->
+ ensure_mnesia_not_running(),
+ ensure_mnesia_dir(),
+
+ ClusterNodes =
+ case discover_cluster(DiscoveryNode) of
+ {ok, {ClusterNodes0, _, _}} ->
+ ClusterNodes0;
+ {error, _Reason} ->
+ throw({error,
+ {cannot_connect_to_node,
+ "Could not connect to the cluster node provided"}})
+ end,
+
+ case lists:member(node(), ClusterNodes) of
+ true -> start_and_init_db(ClusterNodes, is_disc_node(), false);
+ false -> throw({error,
+ {inconsistent_cluster,
+ "The nodes provided do not have this node as part of "
+ "the cluster"}})
+ end,
+
+ ok.
+
+remove_node(Node) ->
+ case mnesia:system_info(is_running) of
+ yes -> {atomic, ok} = mnesia:del_table_copy(schema, Node),
+ update_cluster_nodes_status(),
+ {_, []} = rpc:multicall(running_clustered_nodes(), rabbit_mnesia,
+ update_cluster_nodes_status, []);
+ no -> start_mnesia(),
+ try
+ [mnesia:force_load_table(T) || T <- rabbit_mnesia:table_names()],
+ remove_node(Node)
+ after
+ stop_mnesia()
+ end
+ end,
+ ok.
+
+%%----------------------------------------------------------------------------
+%% Queries
+%%----------------------------------------------------------------------------
+
+status() ->
+ IfNonEmpty = fun (_, []) -> [];
+ (Type, Nodes) -> [{Type, Nodes}]
+ end,
+ [{nodes, (IfNonEmpty(disc, all_clustered_disc_nodes()) ++
+ IfNonEmpty(ram, all_clustered_ram_nodes()))},
+ {running_nodes, running_clustered_nodes()}].
+
+is_db_empty() ->
+ lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
+ table_names()).
+
is_clustered() ->
- RunningNodes = running_clustered_nodes(),
- [node()] /= RunningNodes andalso [] /= RunningNodes.
+ Nodes = all_clustered_nodes(),
+ [node()] /= Nodes andalso [] /= Nodes.
+
+is_disc_and_clustered() ->
+ is_disc_node() andalso is_clustered().
+
+%% Functions that retrieve the nodes in the cluster completely rely on the
+%% cluster status file. For obvious reason, if rabbit is down, they might return
+%% out of date information.
all_clustered_nodes() ->
- mnesia:system_info(db_nodes).
+ {AllNodes, _, _} = read_cluster_nodes_status(),
+ AllNodes.
+
+all_clustered_disc_nodes() ->
+ {_, DiscNodes, _} = read_cluster_nodes_status(),
+ DiscNodes.
+
+all_clustered_ram_nodes() ->
+ {AllNodes, DiscNodes, _} = read_cluster_nodes_status(),
+ sets:to_list(sets:subtract(sets:from_list(AllNodes),
+ sets:from_list(DiscNodes))).
running_clustered_nodes() ->
- mnesia:system_info(running_db_nodes).
+ {_, _, RunningNodes} = read_cluster_nodes_status(),
+ RunningNodes.
+
+running_clustered_disc_nodes() ->
+ {_, DiscNodes, RunningNodes} = read_cluster_nodes_status(),
+ sets:to_list(sets:intersection(sets:from_list(DiscNodes),
+ sets:from_list(RunningNodes))).
+
+%% This function is a bit different, we want it to return correctly only when
+%% the node is actually online. This is because when we discover the nodes we
+%% want online, "working" nodes only.
+cluster_status_if_running() ->
+ case mnesia:system_info(is_running) of
+ no -> error;
+ yes -> {ok, {mnesia:system_info(db_nodes),
+ mnesia:table_info(schema, disc_copies),
+ mnesia:system_info(running_db_nodes)}}
+ end.
+
+node_info() ->
+ {erlang:system_info(otp_release), rabbit_misc:rabbit_version(),
+ cluster_status_if_running()}.
+
+is_disc_node() -> mnesia:system_info(use_dir).
+
+dir() -> mnesia:system_info(directory).
+
+table_names() ->
+ [Tab || {Tab, _} <- table_definitions()].
+
+%%----------------------------------------------------------------------------
+%% Operations on the db
+%%----------------------------------------------------------------------------
+
+init_db(ClusterNodes, WantDiscNode, Force) ->
+ init_db(ClusterNodes, WantDiscNode, Force, true).
+
+%% Take a cluster node config and create the right kind of node - a
+%% standalone disk node, or disk or ram node connected to the
+%% specified cluster nodes. If Force is false, don't allow
+%% connections to offline nodes.
+init_db(ClusterNodes, WantDiscNode, Force, Upgrade) ->
+ UClusterNodes = lists:usort(ClusterNodes),
+ ProperClusterNodes = UClusterNodes -- [node()],
+ case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
+ {ok, []} when not Force andalso ProperClusterNodes =/= [] ->
+ throw({error, {failed_to_cluster_with, ProperClusterNodes,
+ "Mnesia could not connect to any disc nodes."}});
+ {ok, Nodes} ->
+ WasDiscNode = is_disc_node(),
+ %% We create a new db (on disk, or in ram) in the first
+ %% two cases and attempt to upgrade the in the other two
+ case {Nodes, WasDiscNode, WantDiscNode} of
+ {[], _, false} ->
+ %% New ram node; start from scratch
+ ok = create_schema(ram);
+ {[], false, true} ->
+ %% Nothing there at all, start from scratch
+ ok = create_schema(disc);
+ {[], true, true} ->
+ %% We're the first node up
+ case rabbit_upgrade:maybe_upgrade_local() of
+ ok -> ensure_schema_integrity();
+ version_not_available -> ok = schema_ok_or_move()
+ end;
+ {[AnotherNode|_], _, _} ->
+ %% Subsequent node in cluster, catch up
+ ensure_version_ok(
+ rpc:call(AnotherNode, rabbit_version, recorded, [])),
+ ok = wait_for_replicated_tables(),
+
+ %% The sequence in which we delete the schema and then the
+ %% other tables is important: if we delete the schema first
+ %% when moving to RAM mnesia will loudly complain since it
+ %% doesn't make much sense to do that. But when moving to
+ %% disc, we need to move the schema first.
+ case WantDiscNode of
+ true -> create_local_table_copy(schema, disc_copies),
+ create_local_table_copies(disc);
+ false -> create_local_table_copies(ram),
+ create_local_table_copy(schema, ram_copies)
+ end,
+
+ %% Write the status now that mnesia is running and clustered
+ update_cluster_nodes_status(),
+
+ ok = case Upgrade of
+ true ->
+ case rabbit_upgrade:maybe_upgrade_local() of
+ ok ->
+ ok;
+ %% If we're just starting up a new node we
+ %% won't have a version
+ starting_from_scratch ->
+ rabbit_version:record_desired()
+ end;
+ false ->
+ ok
+ end,
+
+ %% We've taken down mnesia, so ram nodes will need
+ %% to re-sync
+ case is_disc_node() of
+ false -> start_mnesia(),
+ mnesia:change_config(extra_db_nodes,
+ ProperClusterNodes),
+ wait_for_replicated_tables();
+ true -> ok
+ end,
+
+ ensure_schema_integrity(),
+ ok
+ end;
+ {error, Reason} ->
+ %% one reason we may end up here is if we try to join
+ %% nodes together that are currently running standalone or
+ %% are members of a different cluster
+ throw({error, {unable_to_join_cluster, ClusterNodes, Reason}})
+ end.
+
+ensure_mnesia_dir() ->
+ MnesiaDir = dir() ++ "/",
+ case filelib:ensure_dir(MnesiaDir) of
+ {error, Reason} ->
+ throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}});
+ ok ->
+ ok
+ end.
+
+ensure_mnesia_running() ->
+ case mnesia:system_info(is_running) of
+ yes ->
+ ok;
+ starting ->
+ wait_for(mnesia_running),
+ ensure_mnesia_running();
+ Reason when Reason =:= no; Reason =:= stopping ->
+ throw({error, mnesia_not_running})
+ end.
+
+ensure_mnesia_not_running() ->
+ case mnesia:system_info(is_running) of
+ no ->
+ ok;
+ stopping ->
+ wait_for(mnesia_not_running),
+ ensure_mnesia_not_running();
+ Reason when Reason =:= yes; Reason =:= starting ->
+ throw({error, mnesia_unexpectedly_running})
+ end.
+
+ensure_schema_integrity() ->
+ case check_schema_integrity() of
+ ok ->
+ ok;
+ {error, Reason} ->
+ throw({error, {schema_integrity_check_failed, Reason}})
+ end.
+
+check_schema_integrity() ->
+ Tables = mnesia:system_info(tables),
+ case check_tables(fun (Tab, TabDef) ->
+ case lists:member(Tab, Tables) of
+ false -> {error, {table_missing, Tab}};
+ true -> check_table_attributes(Tab, TabDef)
+ end
+ end) of
+ ok -> ok = wait_for_tables(),
+ check_tables(fun check_table_content/2);
+ Other -> Other
+ end.
empty_ram_only_tables() ->
Node = node(),
@@ -209,13 +545,188 @@ empty_ram_only_tables() ->
end, table_names()),
ok.
+create_tables() -> create_tables(disc).
+
+create_tables(Type) ->
+ lists:foreach(fun ({Tab, TabDef}) ->
+ TabDef1 = proplists:delete(match, TabDef),
+ case mnesia:create_table(Tab, TabDef1) of
+ {atomic, ok} -> ok;
+ {aborted, Reason} ->
+ throw({error, {table_creation_failed,
+ Tab, TabDef1, Reason}})
+ end
+ end,
+ table_definitions(Type)),
+ ok.
+
+copy_db(Destination) ->
+ ok = ensure_mnesia_not_running(),
+ rabbit_file:recursive_copy(dir(), Destination).
+
+wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
+
+wait_for_tables() -> wait_for_tables(table_names()).
+
+wait_for_tables(TableNames) ->
+ case mnesia:wait_for_tables(TableNames, 30000) of
+ ok ->
+ ok;
+ {timeout, BadTabs} ->
+ throw({error, {timeout_waiting_for_tables, BadTabs}});
+ {error, Reason} ->
+ throw({error, {failed_waiting_for_tables, Reason}})
+ end.
+
+should_be_disc_node(DiscNodes) ->
+ DiscNodes == [] orelse lists:member(node(), DiscNodes).
+
+%% This does not guarantee us much, but it avoids some situations that will
+%% definitely end up badly
+check_cluster_consistency() ->
+ CheckVsn = fun (This, This, _) ->
+ ok;
+ (This, Remote, Name) ->
+ throw({error,
+ {inconsistent_cluster,
+ rabbit_misc:format(
+ "~s version mismatch: local node is ~s, "
+ "remote node ~s", [Name, This, Remote])}})
+ end,
+ CheckOTP =
+ fun (OTP) -> CheckVsn(erlang:system_info(otp_release), OTP, "OTP") end,
+ CheckRabbit =
+ fun (Rabbit) ->
+ CheckVsn(rabbit_misc:rabbit_version(), Rabbit, "Rabbit")
+ end,
+
+ CheckNodes = fun (Node, AllNodes) ->
+ ThisNode = node(),
+ case lists:member(ThisNode, AllNodes) of
+ true ->
+ ok;
+ false ->
+ throw({error,
+ {inconsistent_cluster,
+ rabbit_misc:format(
+ "Node ~p thinks it's clustered "
+ "with node ~p, but ~p disagrees",
+ [ThisNode, Node, Node])}})
+ end
+ end,
+
+ lists:foreach(
+ fun(Node) ->
+ case rpc:call(Node, rabbit_mnesia, node_info, []) of
+ {badrpc, _Reason} ->
+ ok;
+ {OTP, Rabbit, error} ->
+ CheckOTP(OTP),
+ CheckRabbit(Rabbit);
+ {OTP, Rabbit, {ok, {AllNodes, _, _}}} ->
+ CheckOTP(OTP),
+ CheckRabbit(Rabbit),
+ CheckNodes(Node, AllNodes)
+ end
+ end, all_clustered_nodes()).
+
+%%----------------------------------------------------------------------------
+%% Cluster status file functions
+%%----------------------------------------------------------------------------
+
+%% The cluster node status file contains all we need to know about the cluster:
+%%
+%% * All the clustered nodes
+%% * The disc nodes
+%% * The running nodes.
+%%
+%% If the current node is a disc node it will be included in the disc nodes
+%% list.
+%%
+%% We strive to keep the file up to date and we rely on this assumption in
+%% various situations. Obviously when mnesia is offline the information we have
+%% will be outdated, but it can't be otherwise.
+
+cluster_nodes_status_filename() ->
+ dir() ++ "/cluster_nodes.config".
+
+initial_cluster_status() ->
+ {[node()], [node()], [node()]}.
+
+write_cluster_nodes_status(Status) ->
+ FileName = cluster_nodes_status_filename(),
+ case rabbit_file:write_term_file(FileName, [Status]) of
+ ok -> ok;
+ {error, Reason} ->
+ throw({error, {cannot_write_cluster_nodes_status,
+ FileName, Reason}})
+ end.
+
+try_read_cluster_nodes_status() ->
+ FileName = cluster_nodes_status_filename(),
+ case rabbit_file:read_term_file(FileName) of
+ {ok, [{_, _, _} = Status]} ->
+ {ok, Status};
+ {ok, Term} ->
+ {error, {invalid_term, FileName, Term}};
+ {error, Reason} ->
+ {error, {cannot_read_file, FileName, Reason}}
+ end.
+
+read_cluster_nodes_status() ->
+ case try_read_cluster_nodes_status() of
+ {ok, Status} ->
+ Status;
+ {error, Reason} ->
+ throw({error, {cannot_read_cluster_nodes_status, Reason}})
+ end.
+
+%% To update the cluster status when mnesia is running.
+update_cluster_nodes_status() ->
+ {ok, Status} = cluster_status_if_running(),
+ write_cluster_nodes_status(Status).
+
+%%--------------------------------------------------------------------
+%% Hooks for `rabbit_node_monitor'
+%%--------------------------------------------------------------------
+
+on_node_up(Node) ->
+ update_cluster_nodes_status(),
+ case is_only_disc_node(Node) of
+ true -> rabbit_log:info("cluster contains disc nodes again~n");
+ false -> ok
+ end.
+
+on_node_down(Node) ->
+ update_cluster_nodes_status(),
+ case is_only_disc_node(Node) of
+ true -> rabbit_log:info("only running disc node went down~n");
+ false -> ok
+ end.
+
+%%--------------------------------------------------------------------
+%% Internal helpers
%%--------------------------------------------------------------------
-nodes_of_type(Type) ->
- %% This function should return the nodes of a certain type (ram,
- %% disc or disc_only) in the current cluster. The type of nodes
- %% is determined when the cluster is initially configured.
- mnesia:table_info(schema, Type).
+discover_cluster(Nodes) when is_list(Nodes) ->
+ lists:foldl(fun (_, {ok, Res}) -> {ok, Res};
+ (Node, {error, _}) -> discover_cluster(Node)
+ end,
+ {error, {cannot_discover_cluster,
+ "The nodes provided is either offline or not running"}},
+ Nodes);
+discover_cluster(Node) ->
+ case Node =:= node() of
+ true ->
+ {error, {cannot_discover_cluster,
+ "You provided the current node as node to cluster with"}};
+ false ->
+ case rpc:call(Node, rabbit_mnesia, cluster_status_if_running, []) of
+ {badrpc, _Reason} -> discover_cluster([]);
+ error -> discover_cluster([]);
+ {ok, Res} -> {ok, Res}
+ end
+ end.
%% The tables aren't supposed to be on disk on a ram node
table_definitions(disc) ->
@@ -336,68 +847,11 @@ queue_name_match() ->
resource_match(Kind) ->
#resource{kind = Kind, _='_'}.
-table_names() ->
- [Tab || {Tab, _} <- table_definitions()].
-
replicated_table_names() ->
[Tab || {Tab, TabDef} <- table_definitions(),
not lists:member({local_content, true}, TabDef)
].
-dir() -> mnesia:system_info(directory).
-
-ensure_mnesia_dir() ->
- MnesiaDir = dir() ++ "/",
- case filelib:ensure_dir(MnesiaDir) of
- {error, Reason} ->
- throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}});
- ok ->
- ok
- end.
-
-ensure_mnesia_running() ->
- case mnesia:system_info(is_running) of
- yes ->
- ok;
- starting ->
- wait_for(mnesia_running),
- ensure_mnesia_running();
- Reason when Reason =:= no; Reason =:= stopping ->
- throw({error, mnesia_not_running})
- end.
-
-ensure_mnesia_not_running() ->
- case mnesia:system_info(is_running) of
- no ->
- ok;
- stopping ->
- wait_for(mnesia_not_running),
- ensure_mnesia_not_running();
- Reason when Reason =:= yes; Reason =:= starting ->
- throw({error, mnesia_unexpectedly_running})
- end.
-
-ensure_schema_integrity() ->
- case check_schema_integrity() of
- ok ->
- ok;
- {error, Reason} ->
- throw({error, {schema_integrity_check_failed, Reason}})
- end.
-
-check_schema_integrity() ->
- Tables = mnesia:system_info(tables),
- case check_tables(fun (Tab, TabDef) ->
- case lists:member(Tab, Tables) of
- false -> {error, {table_missing, Tab}};
- true -> check_table_attributes(Tab, TabDef)
- end
- end) of
- ok -> ok = wait_for_tables(),
- check_tables(fun check_table_content/2);
- Other -> Other
- end.
-
check_table_attributes(Tab, TabDef) ->
{_, ExpAttrs} = proplists:lookup(attributes, TabDef),
case mnesia:table_info(Tab, attributes) of
@@ -433,153 +887,6 @@ check_tables(Fun) ->
Errors -> {error, Errors}
end.
-%% The cluster node config file contains some or all of the disk nodes
-%% that are members of the cluster this node is / should be a part of.
-%%
-%% If the file is absent, the list is empty, or only contains the
-%% current node, then the current node is a standalone (disk)
-%% node. Otherwise it is a node that is part of a cluster as either a
-%% disk node, if it appears in the cluster node config, or ram node if
-%% it doesn't.
-
-cluster_nodes_config_filename() ->
- dir() ++ "/cluster_nodes.config".
-
-create_cluster_nodes_config(ClusterNodes) ->
- FileName = cluster_nodes_config_filename(),
- case rabbit_file:write_term_file(FileName, [ClusterNodes]) of
- ok -> ok;
- {error, Reason} ->
- throw({error, {cannot_create_cluster_nodes_config,
- FileName, Reason}})
- end.
-
-read_cluster_nodes_config() ->
- FileName = cluster_nodes_config_filename(),
- case rabbit_file:read_term_file(FileName) of
- {ok, [ClusterNodes]} -> ClusterNodes;
- {error, enoent} ->
- {ok, ClusterNodes} = application:get_env(rabbit, cluster_nodes),
- ClusterNodes;
- {error, Reason} ->
- throw({error, {cannot_read_cluster_nodes_config,
- FileName, Reason}})
- end.
-
-delete_cluster_nodes_config() ->
- FileName = cluster_nodes_config_filename(),
- case file:delete(FileName) of
- ok -> ok;
- {error, enoent} -> ok;
- {error, Reason} ->
- throw({error, {cannot_delete_cluster_nodes_config,
- FileName, Reason}})
- end.
-
-running_nodes_filename() ->
- filename:join(dir(), "nodes_running_at_shutdown").
-
-record_running_nodes() ->
- FileName = running_nodes_filename(),
- Nodes = running_clustered_nodes() -- [node()],
- %% Don't check the result: we're shutting down anyway and this is
- %% a best-effort-basis.
- rabbit_file:write_term_file(FileName, [Nodes]),
- ok.
-
-read_previously_running_nodes() ->
- FileName = running_nodes_filename(),
- case rabbit_file:read_term_file(FileName) of
- {ok, [Nodes]} -> Nodes;
- {error, enoent} -> [];
- {error, Reason} -> throw({error, {cannot_read_previous_nodes_file,
- FileName, Reason}})
- end.
-
-delete_previously_running_nodes() ->
- FileName = running_nodes_filename(),
- case file:delete(FileName) of
- ok -> ok;
- {error, enoent} -> ok;
- {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file,
- FileName, Reason}})
- end.
-
-init_db(ClusterNodes, Force) ->
- init_db(
- ClusterNodes, Force,
- fun () ->
- case rabbit_upgrade:maybe_upgrade_local() of
- ok -> ok;
- %% If we're just starting up a new node we won't have a
- %% version
- starting_from_scratch -> ok = rabbit_version:record_desired()
- end
- end).
-
-%% Take a cluster node config and create the right kind of node - a
-%% standalone disk node, or disk or ram node connected to the
-%% specified cluster nodes. If Force is false, don't allow
-%% connections to offline nodes.
-init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) ->
- UClusterNodes = lists:usort(ClusterNodes),
- ProperClusterNodes = UClusterNodes -- [node()],
- case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
- {ok, []} when not Force andalso ProperClusterNodes =/= [] ->
- throw({error, {failed_to_cluster_with, ProperClusterNodes,
- "Mnesia could not connect to any disc nodes."}});
- {ok, Nodes} ->
- WasDiscNode = is_disc_node(),
- WantDiscNode = should_be_disc_node(ClusterNodes),
- %% We create a new db (on disk, or in ram) in the first
- %% two cases and attempt to upgrade the in the other two
- case {Nodes, WasDiscNode, WantDiscNode} of
- {[], _, false} ->
- %% New ram node; start from scratch
- ok = create_schema(ram);
- {[], false, true} ->
- %% Nothing there at all, start from scratch
- ok = create_schema(disc);
- {[], true, true} ->
- %% We're the first node up
- case rabbit_upgrade:maybe_upgrade_local() of
- ok -> ensure_schema_integrity();
- version_not_available -> ok = schema_ok_or_move()
- end;
- {[AnotherNode|_], _, _} ->
- %% Subsequent node in cluster, catch up
- ensure_version_ok(
- rpc:call(AnotherNode, rabbit_version, recorded, [])),
- {CopyType, CopyTypeAlt} =
- case WantDiscNode of
- true -> {disc, disc_copies};
- false -> {ram, ram_copies}
- end,
- ok = wait_for_replicated_tables(),
- ok = create_local_table_copy(schema, CopyTypeAlt),
- ok = create_local_table_copies(CopyType),
-
- ok = SecondaryPostMnesiaFun(),
- %% We've taken down mnesia, so ram nodes will need
- %% to re-sync
- case is_disc_node() of
- false -> start_mnesia(),
- mnesia:change_config(extra_db_nodes,
- ProperClusterNodes),
- wait_for_replicated_tables();
- true -> ok
- end,
-
- ensure_schema_integrity(),
- ok
- end;
- {error, Reason} ->
- %% one reason we may end up here is if we try to join
- %% nodes together that are currently running standalone or
- %% are members of a different cluster
- throw({error, {unable_to_join_cluster, ClusterNodes, Reason}})
- end.
-
schema_ok_or_move() ->
case check_schema_integrity() of
ok ->
@@ -618,11 +925,6 @@ create_schema(Type) ->
ensure_schema_integrity(),
ok = rabbit_version:record_desired().
-is_disc_node() -> mnesia:system_info(use_dir).
-
-should_be_disc_node(ClusterNodes) ->
- ClusterNodes == [] orelse lists:member(node(), ClusterNodes).
-
move_db() ->
stop_mnesia(),
MnesiaDir = filename:dirname(dir() ++ "/"),
@@ -644,25 +946,6 @@ move_db() ->
start_mnesia(),
ok.
-copy_db(Destination) ->
- ok = ensure_mnesia_not_running(),
- rabbit_file:recursive_copy(dir(), Destination).
-
-create_tables() -> create_tables(disc).
-
-create_tables(Type) ->
- lists:foreach(fun ({Tab, TabDef}) ->
- TabDef1 = proplists:delete(match, TabDef),
- case mnesia:create_table(Tab, TabDef1) of
- {atomic, ok} -> ok;
- {aborted, Reason} ->
- throw({error, {table_creation_failed,
- Tab, TabDef1, Reason}})
- end
- end,
- table_definitions(Type)),
- ok.
-
copy_type_to_ram(TabDef) ->
[{disc_copies, []}, {ram_copies, [node()]}
| proplists:delete(ram_copies, proplists:delete(disc_copies, TabDef))].
@@ -711,114 +994,41 @@ create_local_table_copy(Tab, Type) ->
end,
ok.
-wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
-
-wait_for_tables() -> wait_for_tables(table_names()).
-
-wait_for_tables(TableNames) ->
- case mnesia:wait_for_tables(TableNames, 30000) of
- ok ->
- ok;
- {timeout, BadTabs} ->
- throw({error, {timeout_waiting_for_tables, BadTabs}});
- {error, Reason} ->
- throw({error, {failed_waiting_for_tables, Reason}})
- end.
-
-reset(Force) ->
- rabbit_misc:local_info_msg("Resetting Rabbit~s~n", [if Force -> " forcefully";
- true -> ""
- end]),
- ensure_mnesia_not_running(),
- case not Force andalso is_clustered() andalso
- is_only_disc_node(node(), false)
- of
- true -> log_both("no other disc nodes running");
- false -> ok
- end,
- Node = node(),
- Nodes = all_clustered_nodes() -- [Node],
- case Force of
- true -> ok;
- false ->
- ensure_mnesia_dir(),
- start_mnesia(),
- RunningNodes =
- try
- %% Force=true here so that reset still works when clustered
- %% with a node which is down
- ok = init_db(read_cluster_nodes_config(), true),
- running_clustered_nodes() -- [Node]
- after
- stop_mnesia()
- end,
- leave_cluster(Nodes, RunningNodes),
- rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
- cannot_delete_schema)
- end,
- %% We need to make sure that we don't end up in a distributed
- %% Erlang system with nodes while not being in an Mnesia cluster
- %% with them. We don't handle that well.
- [erlang:disconnect_node(N) || N <- Nodes],
- ok = delete_cluster_nodes_config(),
- %% remove persisted messages and any other garbage we find
- ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
- ok.
-
leave_cluster([], _) -> ok;
-leave_cluster(Nodes, RunningNodes) ->
- %% find at least one running cluster node and instruct it to
- %% remove our schema copy which will in turn result in our node
- %% being removed as a cluster node from the schema, with that
- %% change being propagated to all nodes
- case lists:any(
- fun (Node) ->
- case rpc:call(Node, mnesia, del_table_copy,
- [schema, node()]) of
- {atomic, ok} -> true;
- {badrpc, nodedown} -> false;
- {aborted, {node_not_running, _}} -> false;
- {aborted, Reason} ->
- throw({error, {failed_to_leave_cluster,
- Nodes, RunningNodes, Reason}})
- end
- end,
- RunningNodes) of
- true -> ok;
- false -> throw({error, {no_running_cluster_nodes,
- Nodes, RunningNodes}})
+leave_cluster(Nodes, RunningNodes0) ->
+ case RunningNodes0 -- [node()] of
+ [] -> ok;
+ RunningNodes ->
+ %% find at least one running cluster node and instruct it to remove
+ %% our schema copy which will in turn result in our node being
+ %% removed as a cluster node from the schema, with that change being
+ %% propagated to all nodes
+ case lists:any(
+ fun (Node) ->
+ case rpc:call(Node, mnesia, del_table_copy,
+ [schema, node()]) of
+ {atomic, ok} -> true;
+ {badrpc, nodedown} -> false;
+ {aborted, {node_not_running, _}} -> false;
+ {aborted, Reason} ->
+ throw({error, {failed_to_leave_cluster,
+ Nodes, RunningNodes, Reason}})
+ end
+ end,
+ RunningNodes) of
+ true -> ok;
+ false -> throw({error, {no_running_cluster_nodes,
+ Nodes, RunningNodes}})
+ end
end.
wait_for(Condition) ->
error_logger:info_msg("Waiting for ~p...~n", [Condition]),
timer:sleep(1000).
-on_node_up(Node) ->
- case is_only_disc_node(Node, true) of
- true -> rabbit_log:info("cluster contains disc nodes again~n");
- false -> ok
- end.
-
-on_node_down(Node) ->
- case is_only_disc_node(Node, true) of
- true -> rabbit_log:info("only running disc node went down~n");
- false -> ok
- end.
-
-is_only_disc_node(Node, _MnesiaRunning = true) ->
- RunningSet = sets:from_list(running_clustered_nodes()),
- DiscSet = sets:from_list(nodes_of_type(disc_copies)),
- [Node] =:= sets:to_list(sets:intersection(RunningSet, DiscSet));
-is_only_disc_node(Node, false) ->
- start_mnesia(),
- Res = is_only_disc_node(Node, true),
- stop_mnesia(),
- Res.
-
-log_both(Warning) ->
- io:format("Warning: ~s~n", [Warning]),
- rabbit_misc:with_local_io(
- fun () -> error_logger:warning_msg("~s~n", [Warning]) end).
+is_only_disc_node(Node) ->
+ Nodes = running_clustered_disc_nodes(),
+ [Node] =:= Nodes.
start_mnesia() ->
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
@@ -827,3 +1037,37 @@ start_mnesia() ->
stop_mnesia() ->
stopped = mnesia:stop(),
ensure_mnesia_not_running().
+
+start_and_init_db(ClusterNodes, WantDiscNode, Force) ->
+ mnesia:start(),
+ try
+ ok = init_db(ClusterNodes, WantDiscNode, Force)
+ after
+ mnesia:stop()
+ end,
+ ok.
+
+%%--------------------------------------------------------------------
+%% Legacy functions related to the "running nodes" file
+%%--------------------------------------------------------------------
+
+legacy_running_nodes_filename() ->
+ filename:join(dir(), "nodes_running_at_shutdown").
+
+legacy_read_previously_running_nodes() ->
+ FileName = legacy_running_nodes_filename(),
+ case rabbit_file:read_term_file(FileName) of
+ {ok, [Nodes]} -> Nodes;
+ {error, enoent} -> [];
+ {error, Reason} -> throw({error, {cannot_read_previous_nodes_file,
+ FileName, Reason}})
+ end.
+
+legacy_delete_previously_running_nodes() ->
+ FileName = legacy_running_nodes_filename(),
+ case file:delete(FileName) of
+ ok -> ok;
+ {error, enoent} -> ok;
+ {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file,
+ FileName, Reason}})
+ end.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 2760ef2d..dc5e6ef7 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -31,6 +31,7 @@
-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
all_tests() ->
+ ok = setup_cluster(),
passed = gm_tests:all_tests(),
passed = mirrored_supervisor_tests:all_tests(),
application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
@@ -51,36 +52,51 @@ all_tests() ->
passed = test_log_management_during_startup(),
passed = test_statistics(),
passed = test_arguments_parser(),
- passed = test_cluster_management(),
passed = test_user_management(),
passed = test_runtime_parameters(),
passed = test_server_status(),
passed = test_confirms(),
- passed = maybe_run_cluster_dependent_tests(),
+ passed =
+ do_if_secondary_node(
+ fun run_cluster_dependent_tests/1,
+ fun (SecondaryNode) ->
+ io:format("Skipping cluster dependent tests with node ~p~n",
+ [SecondaryNode]),
+ passed
+ end),
passed = test_configurable_server_properties(),
passed.
-maybe_run_cluster_dependent_tests() ->
+do_if_secondary_node(Up, Down) ->
SecondaryNode = rabbit_nodes:make("hare"),
case net_adm:ping(SecondaryNode) of
- pong -> passed = run_cluster_dependent_tests(SecondaryNode);
- pang -> io:format("Skipping cluster dependent tests with node ~p~n",
- [SecondaryNode])
- end,
- passed.
+ pong -> Up(SecondaryNode);
+ pang -> Down(SecondaryNode)
+ end.
-run_cluster_dependent_tests(SecondaryNode) ->
- SecondaryNodeS = atom_to_list(SecondaryNode),
+setup_cluster() ->
+ do_if_secondary_node(
+ fun (SecondaryNode) ->
+ ok = control_action(stop_app, []),
+ ok = control_action(join_cluster,
+ [atom_to_list(SecondaryNode)]),
+ ok = control_action(start_app, []),
+ ok = control_action(start_app, SecondaryNode, [], [])
+ end,
+ fun (_) -> ok end).
- cover:stop(SecondaryNode),
- ok = control_action(stop_app, []),
- ok = control_action(reset, []),
- ok = control_action(cluster, [SecondaryNodeS]),
- ok = control_action(start_app, []),
- cover:start(SecondaryNode),
- ok = control_action(start_app, SecondaryNode, [], []),
+maybe_run_cluster_dependent_tests() ->
+ do_if_secondary_node(
+ fun (SecondaryNode) ->
+ passed = run_cluster_dependent_tests(SecondaryNode)
+ end,
+ fun (SecondaryNode) ->
+ io:format("Skipping cluster dependent tests with node ~p~n",
+ [SecondaryNode])
+ end).
+run_cluster_dependent_tests(SecondaryNode) ->
io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]),
passed = test_delegates_async(SecondaryNode),
passed = test_delegates_sync(SecondaryNode),
@@ -855,200 +871,6 @@ test_arguments_parser() ->
passed.
-test_cluster_management() ->
- %% 'cluster' and 'reset' should only work if the app is stopped
- {error, _} = control_action(cluster, []),
- {error, _} = control_action(reset, []),
- {error, _} = control_action(force_reset, []),
-
- ok = control_action(stop_app, []),
-
- %% various ways of creating a standalone node
- NodeS = atom_to_list(node()),
- ClusteringSequence = [[],
- [NodeS],
- ["invalid@invalid", NodeS],
- [NodeS, "invalid@invalid"]],
-
- ok = control_action(reset, []),
- lists:foreach(fun (Arg) ->
- ok = control_action(force_cluster, Arg),
- ok
- end,
- ClusteringSequence),
- lists:foreach(fun (Arg) ->
- ok = control_action(reset, []),
- ok = control_action(force_cluster, Arg),
- ok
- end,
- ClusteringSequence),
- ok = control_action(reset, []),
- lists:foreach(fun (Arg) ->
- ok = control_action(force_cluster, Arg),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok
- end,
- ClusteringSequence),
- lists:foreach(fun (Arg) ->
- ok = control_action(reset, []),
- ok = control_action(force_cluster, Arg),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok
- end,
- ClusteringSequence),
-
- %% convert a disk node into a ram node
- ok = control_action(reset, []),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_disc_node(),
- ok = control_action(force_cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
- ok = assert_ram_node(),
-
- %% join a non-existing cluster as a ram node
- ok = control_action(reset, []),
- ok = control_action(force_cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
- ok = assert_ram_node(),
-
- ok = control_action(reset, []),
-
- SecondaryNode = rabbit_nodes:make("hare"),
- case net_adm:ping(SecondaryNode) of
- pong -> passed = test_cluster_management2(SecondaryNode);
- pang -> io:format("Skipping clustering tests with node ~p~n",
- [SecondaryNode])
- end,
-
- ok = control_action(start_app, []),
- passed.
-
-test_cluster_management2(SecondaryNode) ->
- NodeS = atom_to_list(node()),
- SecondaryNodeS = atom_to_list(SecondaryNode),
-
- %% make a disk node
- ok = control_action(cluster, [NodeS]),
- ok = assert_disc_node(),
- %% make a ram node
- ok = control_action(reset, []),
- ok = control_action(cluster, [SecondaryNodeS]),
- ok = assert_ram_node(),
-
- %% join cluster as a ram node
- ok = control_action(reset, []),
- ok = control_action(force_cluster, [SecondaryNodeS, "invalid1@invalid"]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_ram_node(),
-
- %% ram node will not start by itself
- ok = control_action(stop_app, []),
- ok = control_action(stop_app, SecondaryNode, [], []),
- {error, _} = control_action(start_app, []),
- ok = control_action(start_app, SecondaryNode, [], []),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
-
- %% change cluster config while remaining in same cluster
- ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
-
- %% join non-existing cluster as a ram node
- ok = control_action(force_cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
- {error, _} = control_action(start_app, []),
- ok = assert_ram_node(),
-
- %% join empty cluster as a ram node (converts to disc)
- ok = control_action(cluster, []),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_disc_node(),
-
- %% make a new ram node
- ok = control_action(reset, []),
- ok = control_action(force_cluster, [SecondaryNodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_ram_node(),
-
- %% turn ram node into disk node
- ok = control_action(cluster, [SecondaryNodeS, NodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_disc_node(),
-
- %% convert a disk node into a ram node
- ok = assert_disc_node(),
- ok = control_action(force_cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
- ok = assert_ram_node(),
-
- %% make a new disk node
- ok = control_action(force_reset, []),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_disc_node(),
-
- %% turn a disk node into a ram node
- ok = control_action(reset, []),
- ok = control_action(cluster, [SecondaryNodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_ram_node(),
-
- %% NB: this will log an inconsistent_database error, which is harmless
- %% Turning cover on / off is OK even if we're not in general using cover,
- %% it just turns the engine on / off, doesn't actually log anything.
- cover:stop([SecondaryNode]),
- true = disconnect_node(SecondaryNode),
- pong = net_adm:ping(SecondaryNode),
- cover:start([SecondaryNode]),
-
- %% leaving a cluster as a ram node
- ok = control_action(reset, []),
- %% ...and as a disk node
- ok = control_action(cluster, [SecondaryNodeS, NodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- cover:stop(SecondaryNode),
- ok = control_action(reset, []),
- cover:start(SecondaryNode),
-
- %% attempt to leave cluster when no other node is alive
- ok = control_action(cluster, [SecondaryNodeS, NodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, SecondaryNode, [], []),
- ok = control_action(stop_app, []),
- {error, {no_running_cluster_nodes, _, _}} =
- control_action(reset, []),
-
- %% attempt to change type when no other node is alive
- {error, {no_running_cluster_nodes, _, _}} =
- control_action(cluster, [SecondaryNodeS]),
-
- %% leave system clustered, with the secondary node as a ram node
- ok = control_action(force_reset, []),
- ok = control_action(start_app, []),
- %% Yes, this is rather ugly. But since we're a clustered Mnesia
- %% node and we're telling another clustered node to reset itself,
- %% we will get disconnected half way through causing a
- %% badrpc. This never happens in real life since rabbitmqctl is
- %% not a clustered Mnesia node.
- cover:stop(SecondaryNode),
- {badrpc, nodedown} = control_action(force_reset, SecondaryNode, [], []),
- pong = net_adm:ping(SecondaryNode),
- cover:start(SecondaryNode),
- ok = control_action(cluster, SecondaryNode, [NodeS], []),
- ok = control_action(start_app, SecondaryNode, [], []),
-
- passed.
-
test_user_management() ->
%% lots if stuff that should fail
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index e1a7bcae..85bcff25 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -121,10 +121,7 @@ remove_backup() ->
info("upgrades: Mnesia backup removed~n", []).
maybe_upgrade_mnesia() ->
- %% rabbit_mnesia:all_clustered_nodes/0 will return [] at this point
- %% if we are a RAM node since Mnesia has not started yet.
- AllNodes = lists:usort(rabbit_mnesia:all_clustered_nodes() ++
- rabbit_mnesia:read_cluster_nodes_config()),
+ AllNodes = rabbit_mnesia:all_clustered_nodes(),
case rabbit_version:upgrades_required(mnesia) of
{error, starting_from_scratch} ->
ok;
@@ -150,19 +147,17 @@ maybe_upgrade_mnesia() ->
upgrade_mode(AllNodes) ->
case nodes_running(AllNodes) of
[] ->
- AfterUs = rabbit_mnesia:read_previously_running_nodes(),
+ AfterUs = rabbit_mnesia:running_clustered_nodes() -- [node()],
case {is_disc_node_legacy(), AfterUs} of
{true, []} ->
primary;
{true, _} ->
- Filename = rabbit_mnesia:running_nodes_filename(),
+ %% TODO: Here I'm assuming that the various cluster status
+ %% files are consistent with each other, I think I can
+ %% provide a solution if they're not...
die("Cluster upgrade needed but other disc nodes shut "
"down after this one.~nPlease first start the last "
- "disc node to shut down.~n~nNote: if several disc "
- "nodes were shut down simultaneously they may "
- "all~nshow this message. In which case, remove "
- "the lock file on one of them and~nstart that node. "
- "The lock file on this node is:~n~n ~s ", [Filename]);
+ "disc node to shut down.~n", []);
{false, _} ->
die("Cluster upgrade needed but this is a ram node.~n"
"Please first start the last disc node to shut down.",
@@ -222,15 +217,8 @@ secondary_upgrade(AllNodes) ->
IsDiscNode = is_disc_node_legacy(),
rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
cannot_delete_schema),
- %% Note that we cluster with all nodes, rather than all disc nodes
- %% (as we can't know all disc nodes at this point). This is safe as
- %% we're not writing the cluster config, just setting up Mnesia.
- ClusterNodes = case IsDiscNode of
- true -> AllNodes;
- false -> AllNodes -- [node()]
- end,
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
- ok = rabbit_mnesia:init_db(ClusterNodes, true, fun () -> ok end),
+ ok = rabbit_mnesia:init_db(AllNodes, IsDiscNode, true, false),
ok = rabbit_version:record_desired_for_scope(mnesia),
ok.