summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-09-26 13:57:55 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-09-26 13:57:55 +0100
commite6c877d3cdb6ca369d4d9c94af6187794affaac0 (patch)
tree60642659cfe3a1e15ee4332479dd91eb8f1604e6
parentcaca30e74cbc7a46bb54e5cc25f094abfd8e02ec (diff)
parent750dcde2aab7e79a01b1da8959b0038f78354374 (diff)
downloadrabbitmq-server-e6c877d3cdb6ca369d4d9c94af6187794affaac0.tar.gz
merge bug25160 into default
-rw-r--r--ebin/rabbit_app.in2
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_control_main.erl9
-rw-r--r--src/rabbit_direct.erl2
-rw-r--r--src/rabbit_mirror_queue_master.erl11
-rw-r--r--src/rabbit_mnesia.erl503
-rw-r--r--src/rabbit_networking.erl2
-rw-r--r--src/rabbit_node_monitor.erl194
-rw-r--r--src/rabbit_tests.erl12
-rw-r--r--src/rabbit_upgrade.erl37
11 files changed, 350 insertions, 426 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 78842281..9b1ff8bd 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -33,7 +33,7 @@
{default_user_tags, [administrator]},
{default_vhost, <<"/">>},
{default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
- {cluster_nodes, {[], true}},
+ {cluster_nodes, {[], disc}},
{server_properties, []},
{collect_statistics, none},
{collect_statistics_interval, 5000},
diff --git a/src/rabbit.erl b/src/rabbit.erl
index e9587841..3fe27cd9 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -519,7 +519,7 @@ sort_boot_steps(UnsortedSteps) ->
end.
boot_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
- AllNodes = rabbit_mnesia:all_clustered_nodes(),
+ AllNodes = rabbit_mnesia:cluster_nodes(all),
{Err, Nodes} =
case AllNodes -- [node()] of
[] -> {"Timeout contacting cluster nodes. Since RabbitMQ was"
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e8f3aab3..0d13312b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -136,7 +136,7 @@ flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
list() ->
- rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(),
+ rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_channel, list_local, []).
list_local() ->
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index bd01a1b1..e75e1f6f 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -247,9 +247,12 @@ action(force_reset, Node, [], _Opts, Inform) ->
action(join_cluster, Node, [ClusterNodeS], Opts, Inform) ->
ClusterNode = list_to_atom(ClusterNodeS),
- DiscNode = not proplists:get_bool(?RAM_OPT, Opts),
+ NodeType = case proplists:get_bool(?RAM_OPT, Opts) of
+ true -> ram;
+ false -> disc
+ end,
Inform("Clustering node ~p with ~p", [Node, ClusterNode]),
- rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, DiscNode]);
+ rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, NodeType]);
action(change_cluster_node_type, Node, ["ram"], _Opts, Inform) ->
Inform("Turning ~p into a ram node", [Node]),
@@ -458,7 +461,7 @@ action(list_parameters, Node, [], Opts, Inform) ->
action(report, Node, _Args, _Opts, Inform) ->
Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]),
[begin ok = action(Action, N, [], [], Inform), io:nl() end ||
- N <- unsafe_rpc(Node, rabbit_mnesia, running_clustered_nodes, []),
+ N <- unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]),
Action <- [status, cluster_status, environment]],
VHosts = unsafe_rpc(Node, rabbit_vhost, list, []),
[print_report(Node, Q) || Q <- ?GLOBAL_QUERIES],
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index a3431321..689e5d83 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -60,7 +60,7 @@ list_local() ->
pg_local:get_members(rabbit_direct).
list() ->
- rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(),
+ rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_direct, list_local, []).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index c11a8ff7..41389815 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -87,12 +87,11 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
Q, undefined, sender_death_fun(), length_fun()),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
- MNodes1 =
- (case MNodes of
- all -> rabbit_mnesia:all_clustered_nodes();
- undefined -> [];
- _ -> MNodes
- end) -- [node()],
+ MNodes1 = (case MNodes of
+ all -> rabbit_mnesia:cluster_nodes(all);
+ undefined -> [];
+ _ -> MNodes
+ end) -- [node()],
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 8ce19cc6..ae36febb 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -28,19 +28,16 @@
status/0,
is_db_empty/0,
is_clustered/0,
- all_clustered_nodes/0,
- clustered_disc_nodes/0,
- running_clustered_nodes/0,
- is_disc_node/0,
+ cluster_nodes/1,
+ node_type/0,
dir/0,
table_names/0,
- wait_for_tables/1,
cluster_status_from_mnesia/0,
- init_db/3,
+ init_db_unchecked/2,
empty_ram_only_tables/0,
copy_db/1,
- wait_for_tables/0,
+ wait_for_tables/1,
check_cluster_consistency/0,
ensure_mnesia_dir/0,
@@ -67,12 +64,11 @@
-export_type([node_type/0, cluster_status/0]).
-type(node_type() :: disc | ram).
--type(cluster_status() :: {ordsets:ordset(node()), ordsets:ordset(node()),
- ordsets:ordset(node())}).
+-type(cluster_status() :: {[node()], [node()], [node()]}).
%% Main interface
-spec(init/0 :: () -> 'ok').
--spec(join_cluster/2 :: ([node()], boolean()) -> 'ok').
+-spec(join_cluster/2 :: (node(), node_type()) -> 'ok').
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
-spec(update_cluster_nodes/1 :: (node()) -> 'ok').
@@ -84,17 +80,15 @@
{'running_nodes', [node()]}]).
-spec(is_db_empty/0 :: () -> boolean()).
-spec(is_clustered/0 :: () -> boolean()).
--spec(all_clustered_nodes/0 :: () -> [node()]).
--spec(clustered_disc_nodes/0 :: () -> [node()]).
--spec(running_clustered_nodes/0 :: () -> [node()]).
--spec(is_disc_node/0 :: () -> boolean()).
+-spec(cluster_nodes/1 :: ('all' | 'disc' | 'ram' | 'running') -> [node()]).
+-spec(node_type/0 :: () -> node_type()).
-spec(dir/0 :: () -> file:filename()).
-spec(table_names/0 :: () -> [atom()]).
--spec(cluster_status_from_mnesia/0 :: () -> {'ok', cluster_status()} |
- {'error', any()}).
+-spec(cluster_status_from_mnesia/0 :: () -> rabbit_types:ok_or_error2(
+ cluster_status(), any())).
%% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit'
--spec(init_db/3 :: ([node()], boolean(), boolean()) -> 'ok').
+-spec(init_db_unchecked/2 :: ([node()], node_type()) -> 'ok').
-spec(empty_ram_only_tables/0 :: () -> 'ok').
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
@@ -106,12 +100,6 @@
-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
-%% Functions used in internal rpc calls
--spec(node_info/0 :: () -> {string(), string(),
- ({'ok', cluster_status()} | 'error')}).
--spec(remove_node_if_mnesia_running/1 :: (node()) -> 'ok' |
- {'error', term()}).
-
-endif.
%%----------------------------------------------------------------------------
@@ -123,7 +111,9 @@ init() ->
ensure_mnesia_dir(),
case is_virgin_node() of
true -> init_from_config();
- false -> init(is_disc_node(), all_clustered_nodes())
+ false -> NodeType = node_type(),
+ init_db_and_upgrade(cluster_nodes(all), NodeType,
+ NodeType =:= ram)
end,
%% We intuitively expect the global name server to be synced when
%% Mnesia is up. In fact that's not guaranteed to be the case -
@@ -131,24 +121,21 @@ init() ->
ok = global:sync(),
ok.
-init(WantDiscNode, AllNodes) ->
- init_db_and_upgrade(AllNodes, WantDiscNode, WantDiscNode).
-
init_from_config() ->
- {ok, {TryNodes, WantDiscNode}} =
+ {ok, {TryNodes, NodeType}} =
application:get_env(rabbit, cluster_nodes),
- case find_good_node(TryNodes -- [node()]) of
+ case find_good_node(nodes_excl_me(TryNodes)) of
{ok, Node} ->
rabbit_log:info("Node '~p' selected for clustering from "
"configuration~n", [Node]),
{ok, {_, DiscNodes, _}} = discover_cluster(Node),
- init_db_and_upgrade(DiscNodes, WantDiscNode, false),
+ init_db_and_upgrade(DiscNodes, NodeType, true),
rabbit_node_monitor:notify_joined_cluster();
none ->
rabbit_log:warning("Could not find any suitable node amongst the "
"ones provided in the configuration: ~p~n",
[TryNodes]),
- init(true, [node()])
+ init_db_and_upgrade([node()], disc, false)
end.
%% Make the node join a cluster. The node will be reset automatically
@@ -165,21 +152,18 @@ init_from_config() ->
%% Note that we make no attempt to verify that the nodes provided are
%% all in the same cluster, we simply pick the first online node and
%% we cluster to its cluster.
-join_cluster(DiscoveryNode, WantDiscNode) ->
- case is_disc_and_clustered() andalso [node()] =:= clustered_disc_nodes() of
- true -> e(clustering_only_disc_node);
- _ -> ok
- end,
-
+join_cluster(DiscoveryNode, NodeType) ->
ensure_mnesia_not_running(),
ensure_mnesia_dir(),
-
+ case is_only_clustered_disc_node() of
+ true -> e(clustering_only_disc_node);
+ false -> ok
+ end,
{ClusterNodes, _, _} = case discover_cluster(DiscoveryNode) of
{ok, Res} -> Res;
- E = {error, _} -> throw(E)
+ {error, _} = E -> throw(E)
end,
-
- case lists:member(node(), ClusterNodes) of
+ case me_in_nodes(ClusterNodes) of
true -> e(already_clustered);
false -> ok
end,
@@ -190,11 +174,10 @@ join_cluster(DiscoveryNode, WantDiscNode) ->
%% of reseting the node from the user.
reset(false),
- rabbit_misc:local_info_msg("Clustering with ~p~n", [ClusterNodes]),
-
%% Join the cluster
- ok = init_db_with_mnesia(ClusterNodes, WantDiscNode, false),
-
+ rabbit_misc:local_info_msg("Clustering with ~p as ~p node~n",
+ [ClusterNodes, NodeType]),
+ ok = init_db_with_mnesia(ClusterNodes, NodeType, true, true),
rabbit_node_monitor:notify_joined_cluster(),
ok.
@@ -202,87 +185,81 @@ join_cluster(DiscoveryNode, WantDiscNode) ->
%% return node to its virgin state, where it is not member of any
%% cluster, has no cluster configuration, no local database, and no
%% persisted messages
-reset() -> reset(false).
-force_reset() -> reset(true).
+reset() ->
+ rabbit_misc:local_info_msg("Resetting Rabbit~n", []),
+ reset(false).
+
+force_reset() ->
+ rabbit_misc:local_info_msg("Resetting Rabbit forcefully~n", []),
+ reset(true).
reset(Force) ->
- rabbit_misc:local_info_msg("Resetting Rabbit~s~n",
- [if Force -> " forcefully";
- true -> ""
- end]),
ensure_mnesia_not_running(),
- Node = node(),
- case Force of
- true ->
- disconnect_nodes(nodes());
- false ->
- AllNodes = all_clustered_nodes(),
- %% Reconnecting so that we will get an up to date nodes.
- %% We don't need to check for consistency because we are
- %% resetting. Force=true here so that reset still works
- %% when clustered with a node which is down.
- init_db_with_mnesia(AllNodes, is_disc_node(), false, true),
- case is_disc_and_clustered() andalso
- [node()] =:= clustered_disc_nodes()
- of
- true -> e(resetting_only_disc_node);
- false -> ok
+ Nodes = case Force of
+ true ->
+ nodes();
+ false ->
+ AllNodes = cluster_nodes(all),
+ %% Reconnecting so that we will get an up to date
+ %% nodes. We don't need to check for consistency
+ %% because we are resetting. Force=true here so
+ %% that reset still works when clustered with a
+ %% node which is down.
+ init_db_with_mnesia(AllNodes, node_type(), false, false),
+ case is_only_clustered_disc_node() of
+ true -> e(resetting_only_disc_node);
+ false -> ok
+ end,
+ leave_cluster(),
+ rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
+ cannot_delete_schema),
+ cluster_nodes(all)
end,
- leave_cluster(),
- rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
- cannot_delete_schema),
- disconnect_nodes(all_clustered_nodes()),
- ok
- end,
+ %% We need to make sure that we don't end up in a distributed
+ %% Erlang system with nodes while not being in an Mnesia cluster
+ %% with them. We don't handle that well.
+ [erlang:disconnect_node(N) || N <- Nodes],
%% remove persisted messages and any other garbage we find
ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
ok = rabbit_node_monitor:reset_cluster_status(),
ok.
-%% We need to make sure that we don't end up in a distributed Erlang
-%% system with nodes while not being in an Mnesia cluster with
-%% them. We don't handle that well.
-disconnect_nodes(Nodes) -> [erlang:disconnect_node(N) || N <- Nodes].
-
change_cluster_node_type(Type) ->
- ensure_mnesia_dir(),
ensure_mnesia_not_running(),
+ ensure_mnesia_dir(),
case is_clustered() of
false -> e(not_clustered);
true -> ok
end,
- {_, _, RunningNodes} =
- case discover_cluster(all_clustered_nodes()) of
- {ok, Status} -> Status;
- {error, _Reason} -> e(cannot_connect_to_cluster)
- end,
+ {_, _, RunningNodes} = case discover_cluster(cluster_nodes(all)) of
+ {ok, Status} -> Status;
+ {error, _Reason} -> e(cannot_connect_to_cluster)
+ end,
Node = case RunningNodes of
[] -> e(no_online_cluster_nodes);
[Node0|_] -> Node0
end,
- ok = reset(false),
- ok = join_cluster(Node, case Type of
- ram -> false;
- disc -> true
- end).
+ ok = reset(),
+ ok = join_cluster(Node, Type).
update_cluster_nodes(DiscoveryNode) ->
ensure_mnesia_not_running(),
ensure_mnesia_dir(),
-
Status = {AllNodes, _, _} =
case discover_cluster(DiscoveryNode) of
{ok, Status0} -> Status0;
{error, _Reason} -> e(cannot_connect_to_node)
end,
- case ordsets:is_element(node(), AllNodes) of
+ case me_in_nodes(AllNodes) of
true ->
%% As in `check_consistency/0', we can safely delete the
%% schema here, since it'll be replicated from the other
%% nodes
mnesia:delete_schema([node()]),
rabbit_node_monitor:write_cluster_status(Status),
- init_db_with_mnesia(AllNodes, is_disc_node(), false);
+ rabbit_misc:local_info_msg("Updating cluster nodes from ~p~n",
+ [DiscoveryNode]),
+ init_db_with_mnesia(AllNodes, node_type(), true, true);
false ->
e(inconsistent_cluster)
end,
@@ -296,29 +273,25 @@ update_cluster_nodes(DiscoveryNode) ->
%% the last or second to last after the node we're removing to go
%% down
forget_cluster_node(Node, RemoveWhenOffline) ->
- case ordsets:is_element(Node, all_clustered_nodes()) of
+ case lists:member(Node, cluster_nodes(all)) of
true -> ok;
false -> e(not_a_cluster_node)
end,
- case {mnesia:system_info(is_running), RemoveWhenOffline} of
- {yes, true} -> e(online_node_offline_flag);
- _ -> ok
- end,
- case remove_node_if_mnesia_running(Node) of
- ok ->
- ok;
- {error, mnesia_not_running} when RemoveWhenOffline ->
- remove_node_offline_node(Node);
- {error, mnesia_not_running} ->
- e(offline_node_no_offline_flag);
- Err = {error, _} ->
- throw(Err)
+ case {RemoveWhenOffline, mnesia:system_info(is_running)} of
+ {true, no} -> remove_node_offline_node(Node);
+ {true, yes} -> e(online_node_offline_flag);
+ {false, no} -> e(offline_node_no_offline_flag);
+ {false, yes} -> rabbit_misc:local_info_msg(
+ "Removing node ~p from cluster~n", [Node]),
+ case remove_node_if_mnesia_running(Node) of
+ ok -> ok;
+ {error, _} = Err -> throw(Err)
+ end
end.
remove_node_offline_node(Node) ->
- case {ordsets:del_element(Node, running_nodes(all_clustered_nodes())),
- is_disc_node()} of
- {[], true} ->
+ case {running_nodes(cluster_nodes(all)) -- [Node], node_type()} of
+ {[], disc} ->
%% Note that while we check if the nodes was the last to
%% go down, apart from the node we're removing from, this
%% is still unsafe. Consider the situation in which A and
@@ -327,12 +300,10 @@ remove_node_offline_node(Node) ->
%% and B goes down. In this case, C is the second-to-last,
%% but we don't know that and we'll remove B from A
%% anyway, even if that will lead to bad things.
- case ordsets:subtract(running_clustered_nodes(),
- ordsets:from_list([node(), Node])) of
+ case cluster_nodes(running) -- [node(), Node] of
[] -> start_mnesia(),
try
- [mnesia:force_load_table(T) ||
- T <- rabbit_mnesia:table_names()],
+ [mnesia:force_load_table(T) || T <- table_names()],
forget_cluster_node(Node, false),
ensure_mnesia_running()
after
@@ -350,13 +321,13 @@ remove_node_offline_node(Node) ->
%%----------------------------------------------------------------------------
status() ->
- IfNonEmpty = fun (_, []) -> [];
+ IfNonEmpty = fun (_, []) -> [];
(Type, Nodes) -> [{Type, Nodes}]
end,
- [{nodes, (IfNonEmpty(disc, clustered_disc_nodes()) ++
- IfNonEmpty(ram, clustered_ram_nodes()))}] ++
+ [{nodes, (IfNonEmpty(disc, cluster_nodes(disc)) ++
+ IfNonEmpty(ram, cluster_nodes(ram)))}] ++
case mnesia:system_info(is_running) of
- yes -> [{running_nodes, running_clustered_nodes()}];
+ yes -> [{running_nodes, cluster_nodes(running)}];
no -> []
end.
@@ -364,27 +335,10 @@ is_db_empty() ->
lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
table_names()).
-is_clustered() ->
- Nodes = all_clustered_nodes(),
- [node()] =/= Nodes andalso [] =/= Nodes.
-
-is_disc_and_clustered() -> is_disc_node() andalso is_clustered().
-
-%% Functions that retrieve the nodes in the cluster will rely on the
-%% status file if offline.
+is_clustered() -> AllNodes = cluster_nodes(all),
+ AllNodes =/= [] andalso AllNodes =/= [node()].
-all_clustered_nodes() -> cluster_status(all).
-
-clustered_disc_nodes() -> cluster_status(disc).
-
-clustered_ram_nodes() -> ordsets:subtract(cluster_status(all),
- cluster_status(disc)).
-
-running_clustered_nodes() -> cluster_status(running).
-
-running_clustered_disc_nodes() ->
- {_, DiscNodes, RunningNodes} = cluster_status(),
- ordsets:intersection(DiscNodes, RunningNodes).
+cluster_nodes(WhichNodes) -> cluster_status(WhichNodes).
%% This function is the actual source of information, since it gets
%% the data from mnesia. Obviously it'll work only when mnesia is
@@ -398,75 +352,64 @@ mnesia_nodes() ->
%% `init_db/3' hasn't been run yet. In other words, either
%% we are a virgin node or a restarted RAM node. In both
%% cases we're not interested in what mnesia has to say.
- IsDiscNode = mnesia:system_info(use_dir),
+ NodeType = case mnesia:system_info(use_dir) of
+ true -> disc;
+ false -> ram
+ end,
Tables = mnesia:system_info(tables),
- {Table, _} = case table_definitions(case IsDiscNode of
- true -> disc;
- false -> ram
- end) of [T|_] -> T end,
+ [{Table, _} | _] = table_definitions(NodeType),
case lists:member(Table, Tables) of
- true ->
- AllNodes =
- ordsets:from_list(mnesia:system_info(db_nodes)),
- DiscCopies = ordsets:from_list(
- mnesia:table_info(schema, disc_copies)),
- DiscNodes =
- case IsDiscNode of
- true -> ordsets:add_element(node(), DiscCopies);
- false -> DiscCopies
- end,
- {ok, {AllNodes, DiscNodes}};
- false ->
- {error, tables_not_present}
+ true -> AllNodes = mnesia:system_info(db_nodes),
+ DiscCopies = mnesia:table_info(schema, disc_copies),
+ DiscNodes = case NodeType of
+ disc -> nodes_incl_me(DiscCopies);
+ ram -> DiscCopies
+ end,
+ {ok, {AllNodes, DiscNodes}};
+ false -> {error, tables_not_present}
end
end.
-cluster_status(WhichNodes, ForceMnesia) ->
+cluster_status(WhichNodes) ->
%% I don't want to call `running_nodes/1' unless if necessary, since it's
%% pretty expensive.
- Nodes = case mnesia_nodes() of
- {ok, {AllNodes, DiscNodes}} ->
- {ok, {AllNodes, DiscNodes,
- fun() -> running_nodes(AllNodes) end}};
- {error, _Reason} when not ForceMnesia ->
- {AllNodes, DiscNodes, RunningNodes} =
- rabbit_node_monitor:read_cluster_status(),
- %% The cluster status file records the status when the node
- %% is online, but we know for sure that the node is offline
- %% now, so we can remove it from the list of running nodes.
- {ok,
- {AllNodes, DiscNodes,
- fun() -> ordsets:del_element(node(), RunningNodes) end}};
- Err = {error, _} ->
- Err
- end,
- case Nodes of
- {ok, {AllNodes1, DiscNodes1, RunningNodesThunk}} ->
- {ok, case WhichNodes of
- status -> {AllNodes1, DiscNodes1, RunningNodesThunk()};
- all -> AllNodes1;
- disc -> DiscNodes1;
- running -> RunningNodesThunk()
- end};
- Err1 = {error, _} ->
- Err1
+ {AllNodes1, DiscNodes1, RunningNodesThunk} =
+ case mnesia_nodes() of
+ {ok, {AllNodes, DiscNodes}} ->
+ {AllNodes, DiscNodes, fun() -> running_nodes(AllNodes) end};
+ {error, _Reason} ->
+ {AllNodes, DiscNodes, RunningNodes} =
+ rabbit_node_monitor:read_cluster_status(),
+ %% The cluster status file records the status when the node is
+ %% online, but we know for sure that the node is offline now, so
+ %% we can remove it from the list of running nodes.
+ {AllNodes, DiscNodes, fun() -> nodes_excl_me(RunningNodes) end}
+ end,
+ case WhichNodes of
+ status -> {AllNodes1, DiscNodes1, RunningNodesThunk()};
+ all -> AllNodes1;
+ disc -> DiscNodes1;
+ ram -> AllNodes1 -- DiscNodes1;
+ running -> RunningNodesThunk()
end.
-cluster_status(WhichNodes) ->
- {ok, Status} = cluster_status(WhichNodes, false),
- Status.
-
-cluster_status() -> cluster_status(status).
-
-cluster_status_from_mnesia() -> cluster_status(status, true).
+cluster_status_from_mnesia() ->
+ case mnesia_nodes() of
+ {ok, {AllNodes, DiscNodes}} -> {ok, {AllNodes, DiscNodes,
+ running_nodes(AllNodes)}};
+ {error, _} = Err -> Err
+ end.
node_info() ->
{erlang:system_info(otp_release), rabbit_misc:version(),
cluster_status_from_mnesia()}.
-is_disc_node() ->
- DiscNodes = clustered_disc_nodes(),
- DiscNodes =:= [] orelse ordsets:is_element(node(), DiscNodes).
+node_type() ->
+ DiscNodes = cluster_nodes(disc),
+ case DiscNodes =:= [] orelse me_in_nodes(DiscNodes) of
+ true -> disc;
+ false -> ram
+ end.
dir() -> mnesia:system_info(directory).
@@ -480,21 +423,21 @@ table_names() -> [Tab || {Tab, _} <- table_definitions()].
%% schema if there is the need to and catching up if there are other
%% nodes in the cluster already. It also updates the cluster status
%% file.
-init_db(ClusterNodes, WantDiscNode, Force) ->
- Nodes = change_extra_db_nodes(ClusterNodes, Force),
+init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
+ Nodes = change_extra_db_nodes(ClusterNodes, CheckOtherNodes),
%% Note that we use `system_info' here and not the cluster status
%% since when we start rabbit for the first time the cluster
%% status will say we are a disc node but the tables won't be
%% present yet.
WasDiscNode = mnesia:system_info(use_dir),
- case {Nodes, WasDiscNode, WantDiscNode} of
- {[], _, false} ->
+ case {Nodes, WasDiscNode, NodeType} of
+ {[], _, ram} ->
%% Standalone ram node, we don't want that
throw({error, cannot_create_standalone_ram_node});
- {[], false, true} ->
+ {[], false, disc} ->
%% RAM -> disc, starting from scratch
ok = create_schema();
- {[], true, true} ->
+ {[], true, disc} ->
%% First disc node up
ok;
{[AnotherNode | _], _, _} ->
@@ -507,19 +450,22 @@ init_db(ClusterNodes, WantDiscNode, Force) ->
%% first when moving to RAM mnesia will loudly complain
%% since it doesn't make much sense to do that. But when
%% moving to disc, we need to move the schema first.
- case WantDiscNode of
- true -> create_local_table_copy(schema, disc_copies),
- create_local_table_copies(disc);
- false -> create_local_table_copies(ram),
- create_local_table_copy(schema, ram_copies)
+ case NodeType of
+ disc -> create_local_table_copy(schema, disc_copies),
+ create_local_table_copies(disc);
+ ram -> create_local_table_copies(ram),
+ create_local_table_copy(schema, ram_copies)
end
end,
ensure_schema_integrity(),
rabbit_node_monitor:update_cluster_status(),
ok.
-init_db_and_upgrade(ClusterNodes, WantDiscNode, Force) ->
- ok = init_db(ClusterNodes, WantDiscNode, Force),
+init_db_unchecked(ClusterNodes, NodeType) ->
+ init_db(ClusterNodes, NodeType, false).
+
+init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes) ->
+ ok = init_db(ClusterNodes, NodeType, CheckOtherNodes),
ok = case rabbit_upgrade:maybe_upgrade_local() of
ok -> ok;
starting_from_scratch -> rabbit_version:record_desired();
@@ -527,25 +473,23 @@ init_db_and_upgrade(ClusterNodes, WantDiscNode, Force) ->
end,
%% `maybe_upgrade_local' restarts mnesia, so ram nodes will forget
%% about the cluster
- case WantDiscNode of
- false -> start_mnesia(),
- change_extra_db_nodes(ClusterNodes, true),
- wait_for_replicated_tables();
- true -> ok
+ case NodeType of
+ ram -> start_mnesia(),
+ change_extra_db_nodes(ClusterNodes, false),
+ wait_for_replicated_tables();
+ disc -> ok
end,
ok.
-init_db_with_mnesia(ClusterNodes, WantDiscNode, CheckConsistency, Force) ->
+init_db_with_mnesia(ClusterNodes, NodeType,
+ CheckOtherNodes, CheckConsistency) ->
start_mnesia(CheckConsistency),
try
- init_db_and_upgrade(ClusterNodes, WantDiscNode, Force)
+ init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes)
after
stop_mnesia()
end.
-init_db_with_mnesia(ClusterNodes, WantDiscNode, Force) ->
- init_db_with_mnesia(ClusterNodes, WantDiscNode, true, Force).
-
ensure_mnesia_dir() ->
MnesiaDir = dir() ++ "/",
case filelib:ensure_dir(MnesiaDir) of
@@ -593,7 +537,7 @@ check_schema_integrity() ->
true -> check_table_attributes(Tab, TabDef)
end
end) of
- ok -> ok = wait_for_tables(),
+ ok -> ok = wait_for_tables(table_names()),
check_tables(fun check_table_content/2);
Other -> Other
end.
@@ -628,9 +572,9 @@ copy_db(Destination) ->
ok = ensure_mnesia_not_running(),
rabbit_file:recursive_copy(dir(), Destination).
-wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
-
-wait_for_tables() -> wait_for_tables(table_names()).
+wait_for_replicated_tables() ->
+ wait_for_tables([Tab || {Tab, TabDef} <- table_definitions(),
+ not lists:member({local_content, true}, TabDef)]).
wait_for_tables(TableNames) ->
case mnesia:wait_for_tables(TableNames, 30000) of
@@ -649,11 +593,11 @@ check_cluster_consistency() ->
case lists:foldl(
fun (Node, {error, _}) -> check_cluster_consistency(Node);
(_Node, {ok, Status}) -> {ok, Status}
- end, {error, not_found},
- ordsets:del_element(node(), all_clustered_nodes()))
+ end, {error, not_found}, nodes_excl_me(cluster_nodes(all)))
of
{ok, Status = {RemoteAllNodes, _, _}} ->
- case ordsets:is_subset(all_clustered_nodes(), RemoteAllNodes) of
+ case ordsets:is_subset(ordsets:from_list(cluster_nodes(all)),
+ ordsets:from_list(RemoteAllNodes)) of
true ->
ok;
false ->
@@ -673,7 +617,7 @@ check_cluster_consistency() ->
rabbit_node_monitor:write_cluster_status(Status);
{error, not_found} ->
ok;
- E = {error, _} ->
+ {error, _} = E ->
throw(E)
end.
@@ -685,7 +629,7 @@ check_cluster_consistency(Node) ->
{error, not_found};
{OTP, Rabbit, {ok, Status}} ->
case check_consistency(OTP, Rabbit, Node, Status) of
- E = {error, _} -> E;
+ {error, _} = E -> E;
{ok, Res} -> {ok, Res}
end
end.
@@ -695,17 +639,22 @@ check_cluster_consistency(Node) ->
%%--------------------------------------------------------------------
on_node_up(Node) ->
- case running_clustered_disc_nodes() =:= [Node] of
- true -> rabbit_log:info("cluster contains disc nodes again~n");
- false -> ok
+ case running_disc_nodes() of
+ [Node] -> rabbit_log:info("cluster contains disc nodes again~n");
+ _ -> ok
end.
on_node_down(_Node) ->
- case running_clustered_disc_nodes() =:= [] of
- true -> rabbit_log:info("only running disc node went down~n");
- false -> ok
+ case running_disc_nodes() of
+ [] -> rabbit_log:info("only running disc node went down~n");
+ _ -> ok
end.
+running_disc_nodes() ->
+ {_AllNodes, DiscNodes, RunningNodes} = cluster_status(status),
+ ordsets:to_list(ordsets:intersection(ordsets:from_list(DiscNodes),
+ ordsets:from_list(RunningNodes))).
+
%%--------------------------------------------------------------------
%% Internal helpers
%%--------------------------------------------------------------------
@@ -717,18 +666,16 @@ discover_cluster(Nodes) when is_list(Nodes) ->
discover_cluster(Node) ->
OfflineError =
{error, {cannot_discover_cluster,
- "The nodes provided is either offline or not running"}},
- case Node =:= node() of
- true ->
- {error, {cannot_discover_cluster,
- "You provided the current node as node to cluster with"}};
- false ->
- case rpc:call(Node,
- rabbit_mnesia, cluster_status_from_mnesia, []) of
- {badrpc, _Reason} -> OfflineError;
- {error, mnesia_not_running} -> OfflineError;
- {ok, Res} -> {ok, Res}
- end
+ "The nodes provided are either offline or not running"}},
+ case node() of
+ Node -> {error, {cannot_discover_cluster,
+ "Cannot cluster node with itself"}};
+ _ -> case rpc:call(Node,
+ rabbit_mnesia, cluster_status_from_mnesia, []) of
+ {badrpc, _Reason} -> OfflineError;
+ {error, mnesia_not_running} -> OfflineError;
+ {ok, Res} -> {ok, Res}
+ end
end.
%% The tables aren't supposed to be on disk on a ram node
@@ -850,11 +797,6 @@ queue_name_match() ->
resource_match(Kind) ->
#resource{kind = Kind, _='_'}.
-replicated_table_names() ->
- [Tab || {Tab, TabDef} <- table_definitions(),
- not lists:member({local_content, true}, TabDef)
- ].
-
check_table_attributes(Tab, TabDef) ->
{_, ExpAttrs} = proplists:lookup(attributes, TabDef),
case mnesia:table_info(Tab, attributes) of
@@ -877,11 +819,7 @@ check_table_content(Tab, TabDef) ->
end.
check_tables(Fun) ->
- case [Error || {Tab, TabDef} <- table_definitions(
- case is_disc_node() of
- true -> disc;
- false -> ram
- end),
+ case [Error || {Tab, TabDef} <- table_definitions(node_type()),
case Fun(Tab, TabDef) of
ok -> Error = none, false;
{error, Error} -> true
@@ -1004,14 +942,13 @@ remove_node_if_mnesia_running(Node) ->
end.
leave_cluster() ->
- case {is_clustered(),
- running_nodes(ordsets:del_element(node(), all_clustered_nodes()))}
- of
- {false, []} -> ok;
- {_, AllNodes} -> case lists:any(fun leave_cluster/1, AllNodes) of
- true -> ok;
- false -> e(no_running_cluster_nodes)
- end
+ RunningNodes = running_nodes(nodes_excl_me(cluster_nodes(all))),
+ case not is_clustered() andalso RunningNodes =:= [] of
+ true -> ok;
+ false -> case lists:any(fun leave_cluster/1, RunningNodes) of
+ true -> ok;
+ false -> e(no_running_cluster_nodes)
+ end
end.
leave_cluster(Node) ->
@@ -1042,25 +979,25 @@ stop_mnesia() ->
stopped = mnesia:stop(),
ensure_mnesia_not_running().
-change_extra_db_nodes(ClusterNodes0, Force) ->
- ClusterNodes = lists:usort(ClusterNodes0) -- [node()],
- case mnesia:change_config(extra_db_nodes, ClusterNodes) of
- {ok, []} when not Force andalso ClusterNodes =/= [] ->
+change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) ->
+ ClusterNodes = nodes_excl_me(ClusterNodes0),
+ case {mnesia:change_config(extra_db_nodes, ClusterNodes), ClusterNodes} of
+ {{ok, []}, [_|_]} when CheckOtherNodes ->
throw({error, {failed_to_cluster_with, ClusterNodes,
"Mnesia could not connect to any nodes."}});
- {ok, Nodes} ->
+ {{ok, Nodes}, _} ->
Nodes
end.
-%% We're not using `mnesia:system_info(running_db_nodes)' directly because if
-%% the node is a RAM node it won't know about other nodes when mnesia is stopped
+%% We're not using `mnesia:system_info(running_db_nodes)' directly
+%% because if the node is a RAM node it won't know about other nodes
+%% when mnesia is stopped
running_nodes(Nodes) ->
- {Replies, _BadNodes} =
- rpc:multicall(Nodes, rabbit_mnesia, is_running_remote, []),
+ {Replies, _BadNodes} = rpc:multicall(Nodes,
+ rabbit_mnesia, is_running_remote, []),
[Node || {Running, Node} <- Replies, Running].
-is_running_remote() ->
- {mnesia:system_info(is_running) =:= yes, node()}.
+is_running_remote() -> {mnesia:system_info(is_running) =:= yes, node()}.
check_consistency(OTP, Rabbit) ->
rabbit_misc:sequence_error(
@@ -1073,15 +1010,14 @@ check_consistency(OTP, Rabbit, Node, Status) ->
check_nodes_consistency(Node, Status)]).
check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) ->
- ThisNode = node(),
- case ordsets:is_element(ThisNode, RemoteAllNodes) of
+ case me_in_nodes(RemoteAllNodes) of
true ->
{ok, RemoteStatus};
false ->
{error, {inconsistent_cluster,
rabbit_misc:format("Node ~p thinks it's clustered "
"with node ~p, but ~p disagrees",
- [ThisNode, Node, Node])}}
+ [node(), Node, Node])}}
end.
check_version_consistency(This, Remote, _) when This =:= Remote ->
@@ -1106,13 +1042,16 @@ check_rabbit_consistency(Remote) ->
%% `rabbit_node_monitor:prepare_cluster_status_file/0'.
is_virgin_node() ->
case rabbit_file:list_dir(dir()) of
- {error, enoent} -> true;
- {ok, []} -> true;
+ {error, enoent} ->
+ true;
+ {ok, []} ->
+ true;
{ok, [File1, File2]} ->
lists:usort([dir() ++ "/" ++ File1, dir() ++ "/" ++ File2]) =:=
lists:usort([rabbit_node_monitor:cluster_status_filename(),
rabbit_node_monitor:running_nodes_filename()]);
- {ok, _} -> false
+ {ok, _} ->
+ false
end.
find_good_node([]) ->
@@ -1126,6 +1065,16 @@ find_good_node([Node | Nodes]) ->
end
end.
+is_only_clustered_disc_node() ->
+ node_type() =:= disc andalso is_clustered() andalso
+ cluster_nodes(disc) =:= [node()].
+
+me_in_nodes(Nodes) -> lists:member(node(), Nodes).
+
+nodes_incl_me(Nodes) -> lists:usort([node()|Nodes]).
+
+nodes_excl_me(Nodes) -> Nodes -- [node()].
+
e(Tag) -> throw({error, {Tag, error_description(Tag)}}).
error_description(clustering_only_disc_node) ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 2d0ded12..5cf8d1ae 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -295,7 +295,7 @@ start_ssl_client(SslOpts, Sock) ->
start_client(Sock, ssl_transform_fun(SslOpts)).
connections() ->
- rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(),
+ rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_networking, connections_local, []).
connections_local() ->
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 88037953..026aa362 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -18,29 +18,16 @@
-behaviour(gen_server).
+-export([start_link/0]).
-export([running_nodes_filename/0,
- cluster_status_filename/0,
- prepare_cluster_status_files/0,
- write_cluster_status/1,
- read_cluster_status/0,
- update_cluster_status/0,
- reset_cluster_status/0,
-
- joined_cluster/2,
- notify_joined_cluster/0,
- left_cluster/1,
- notify_left_cluster/1,
- node_up/2,
- notify_node_up/0,
-
- start_link/0,
- init/1,
- handle_call/3,
- handle_cast/2,
- handle_info/2,
- terminate/2,
- code_change/3
- ]).
+ cluster_status_filename/0, prepare_cluster_status_files/0,
+ write_cluster_status/1, read_cluster_status/0,
+ update_cluster_status/0, reset_cluster_status/0]).
+-export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
@@ -49,6 +36,8 @@
-ifdef(use_specs).
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+
-spec(running_nodes_filename/0 :: () -> string()).
-spec(cluster_status_filename/0 :: () -> string()).
-spec(prepare_cluster_status_files/0 :: () -> 'ok').
@@ -57,27 +46,31 @@
-spec(update_cluster_status/0 :: () -> 'ok').
-spec(reset_cluster_status/0 :: () -> 'ok').
--spec(joined_cluster/2 :: (node(), boolean()) -> 'ok').
+-spec(notify_node_up/0 :: () -> 'ok').
-spec(notify_joined_cluster/0 :: () -> 'ok').
--spec(left_cluster/1 :: (node()) -> 'ok').
-spec(notify_left_cluster/1 :: (node()) -> 'ok').
--spec(node_up/2 :: (node(), boolean()) -> 'ok').
--spec(notify_node_up/0 :: () -> 'ok').
-endif.
%%----------------------------------------------------------------------------
+%% Start
+%%----------------------------------------------------------------------------
+
+start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+%%----------------------------------------------------------------------------
%% Cluster file operations
%%----------------------------------------------------------------------------
-%% The cluster file information is kept in two files. The "cluster status file"
-%% contains all the clustered nodes and the disc nodes. The "running nodes
-%% file" contains the currently running nodes or the running nodes at shutdown
-%% when the node is down.
+%% The cluster file information is kept in two files. The "cluster
+%% status file" contains all the clustered nodes and the disc nodes.
+%% The "running nodes file" contains the currently running nodes or
+%% the running nodes at shutdown when the node is down.
%%
-%% We strive to keep the files up to date and we rely on this assumption in
-%% various situations. Obviously when mnesia is offline the information we have
-%% will be outdated, but it can't be otherwise.
+%% We strive to keep the files up to date and we rely on this
+%% assumption in various situations. Obviously when mnesia is offline
+%% the information we have will be outdated, but it cannot be
+%% otherwise.
running_nodes_filename() ->
filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown").
@@ -93,6 +86,10 @@ prepare_cluster_status_files() ->
{ok, _ } -> CorruptFiles();
{error, enoent} -> []
end,
+ ThisNode = [node()],
+ %% The running nodes file might contain a set or a list, in case
+ %% of the legacy file
+ RunningNodes2 = lists:usort(ThisNode ++ RunningNodes1),
{AllNodes1, WantDiscNode} =
case try_read_file(cluster_status_filename()) of
{ok, [{AllNodes, DiscNodes0}]} ->
@@ -105,16 +102,11 @@ prepare_cluster_status_files() ->
{error, enoent} ->
{legacy_cluster_nodes([]), true}
end,
-
- ThisNode = [node()],
-
- RunningNodes2 = lists:usort(RunningNodes1 ++ ThisNode),
AllNodes2 = lists:usort(AllNodes1 ++ RunningNodes2),
DiscNodes = case WantDiscNode of
true -> ThisNode;
false -> []
end,
-
ok = write_cluster_status({AllNodes2, DiscNodes, RunningNodes2}).
write_cluster_status({All, Disc, Running}) ->
@@ -132,13 +124,6 @@ write_cluster_status({All, Disc, Running}) ->
{FN, {error, E2}} -> throw({error, {could_not_write_file, FN, E2}})
end.
-try_read_file(FileName) ->
- case rabbit_file:read_term_file(FileName) of
- {ok, Term} -> {ok, Term};
- {error, enoent} -> {error, enoent};
- {error, E} -> throw({error, {cannot_read_file, FileName, E}})
- end.
-
read_cluster_status() ->
case {try_read_file(cluster_status_filename()),
try_read_file(running_nodes_filename())} of
@@ -159,88 +144,78 @@ reset_cluster_status() ->
%% Cluster notifications
%%----------------------------------------------------------------------------
-joined_cluster(Node, IsDiscNode) ->
- gen_server:cast(?SERVER, {joined_cluster, Node, IsDiscNode}).
+notify_node_up() ->
+ Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
+ gen_server:abcast(Nodes, ?SERVER,
+ {node_up, node(), rabbit_mnesia:node_type()}),
+ %% register other active rabbits with this rabbit
+ DiskNodes = rabbit_mnesia:cluster_nodes(disc),
+ [gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of
+ true -> disc;
+ false -> ram
+ end}) || N <- Nodes],
+ ok.
notify_joined_cluster() ->
- cluster_multicall(joined_cluster, [node(), rabbit_mnesia:is_disc_node()]),
+ Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
+ gen_server:abcast(Nodes, ?SERVER,
+ {joined_cluster, node(), rabbit_mnesia:node_type()}),
ok.
-left_cluster(Node) ->
- gen_server:cast(?SERVER, {left_cluster, Node}).
-
notify_left_cluster(Node) ->
- left_cluster(Node),
- cluster_multicall(left_cluster, [Node]),
- ok.
-
-node_up(Node, IsDiscNode) ->
- gen_server:cast(?SERVER, {node_up, Node, IsDiscNode}).
-
-notify_node_up() ->
- Nodes = cluster_multicall(node_up, [node(), rabbit_mnesia:is_disc_node()]),
- %% register other active rabbits with this rabbit
- [ node_up(N, ordsets:is_element(N, rabbit_mnesia:clustered_disc_nodes())) ||
- N <- Nodes ],
+ Nodes = rabbit_mnesia:cluster_nodes(running),
+ gen_server:abcast(Nodes, ?SERVER, {left_cluster, Node}),
ok.
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-init([]) ->
- {ok, no_state}.
+init([]) -> {ok, pmon:new()}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-%% Note: when updating the status file, we can't simply write the mnesia
-%% information since the message can (and will) overtake the mnesia propagation.
-handle_cast({node_up, Node, IsDiscNode}, State) ->
- case is_already_monitored({rabbit, Node}) of
- true -> {noreply, State};
+%% Note: when updating the status file, we can't simply write the
+%% mnesia information since the message can (and will) overtake the
+%% mnesia propagation.
+handle_cast({node_up, Node, NodeType}, Monitors) ->
+ case pmon:is_monitored({rabbit, Node}, Monitors) of
+ true -> {noreply, Monitors};
false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
- write_cluster_status({ordsets:add_element(Node, AllNodes),
- case IsDiscNode of
- true -> ordsets:add_element(
- Node, DiscNodes);
- false -> DiscNodes
+ write_cluster_status({add_node(Node, AllNodes),
+ case NodeType of
+ disc -> add_node(Node, DiscNodes);
+ ram -> DiscNodes
end,
- ordsets:add_element(Node, RunningNodes)}),
- erlang:monitor(process, {rabbit, Node}),
+ add_node(Node, RunningNodes)}),
ok = handle_live_rabbit(Node),
- {noreply, State}
+ {noreply, pmon:monitor({rabbit, Node}, Monitors)}
end;
-handle_cast({joined_cluster, Node, IsDiscNode}, State) ->
+handle_cast({joined_cluster, Node, NodeType}, State) ->
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
- write_cluster_status({ordsets:add_element(Node, AllNodes),
- case IsDiscNode of
- true -> ordsets:add_element(Node,
- DiscNodes);
- false -> DiscNodes
+ write_cluster_status({add_node(Node, AllNodes),
+ case NodeType of
+ disc -> add_node(Node, DiscNodes);
+ ram -> DiscNodes
end,
RunningNodes}),
{noreply, State};
handle_cast({left_cluster, Node}, State) ->
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
- write_cluster_status({ordsets:del_element(Node, AllNodes),
- ordsets:del_element(Node, DiscNodes),
- ordsets:del_element(Node, RunningNodes)}),
+ write_cluster_status({del_node(Node, AllNodes), del_node(Node, DiscNodes),
+ del_node(Node, RunningNodes)}),
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
+handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Monitors) ->
rabbit_log:info("rabbit on node ~p down~n", [Node]),
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
- write_cluster_status({AllNodes, DiscNodes,
- ordsets:del_element(Node, RunningNodes)}),
+ write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}),
ok = handle_dead_rabbit(Node),
- {noreply, State};
+ {noreply, pmon:erase({rabbit, Node}, Monitors)};
handle_info(_Info, State) ->
{noreply, State}.
@@ -271,27 +246,22 @@ handle_live_rabbit(Node) ->
%% Internal utils
%%--------------------------------------------------------------------
-cluster_multicall(Fun, Args) ->
- Node = node(),
- Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
- %% notify other rabbits of this cluster
- case rpc:multicall(Nodes, rabbit_node_monitor, Fun, Args,
- ?RABBIT_UP_RPC_TIMEOUT) of
- {_, [] } -> ok;
- {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
- end,
- Nodes.
-
-is_already_monitored(Item) ->
- {monitors, Monitors} = process_info(self(), monitors),
- lists:any(fun ({_, Item1}) when Item =:= Item1 -> true;
- (_) -> false
- end, Monitors).
+try_read_file(FileName) ->
+ case rabbit_file:read_term_file(FileName) of
+ {ok, Term} -> {ok, Term};
+ {error, enoent} -> {error, enoent};
+ {error, E} -> throw({error, {cannot_read_file, FileName, E}})
+ end.
legacy_cluster_nodes(Nodes) ->
- %% We get all the info that we can, including the nodes from mnesia, which
- %% will be there if the node is a disc node (empty list otherwise)
+ %% We get all the info that we can, including the nodes from
+ %% mnesia, which will be there if the node is a disc node (empty
+ %% list otherwise)
lists:usort(Nodes ++ mnesia:system_info(db_nodes)).
legacy_should_be_disc_node(DiscNodes) ->
DiscNodes == [] orelse lists:member(node(), DiscNodes).
+
+add_node(Node, Nodes) -> lists:usort([Node | Nodes]).
+
+del_node(Node, Nodes) -> Nodes -- [Node].
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 08535e7d..df0ee721 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1511,15 +1511,15 @@ clean_logs(Files, Suffix) ->
ok.
assert_ram_node() ->
- case rabbit_mnesia:is_disc_node() of
- true -> exit('not_ram_node');
- false -> ok
+ case rabbit_mnesia:node_type() of
+ disc -> exit('not_ram_node');
+ ram -> ok
end.
assert_disc_node() ->
- case rabbit_mnesia:is_disc_node() of
- true -> ok;
- false -> exit('not_disc_node')
+ case rabbit_mnesia:node_type() of
+ disc -> ok;
+ ram -> exit('not_disc_node')
end.
delete_file(File) ->
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 3fbfeed0..d037f954 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -66,11 +66,11 @@
%% into the boot process by prelaunch before the mnesia application is
%% started. By the time Mnesia is started the upgrades have happened
%% (on the primary), or Mnesia has been reset (on the secondary) and
-%% rabbit_mnesia:init_db/3 can then make the node rejoin the cluster
+%% rabbit_mnesia:init_db_unchecked/2 can then make the node rejoin the cluster
%% in the normal way.
%%
%% The non-mnesia upgrades are then triggered by
-%% rabbit_mnesia:init_db/3. Of course, it's possible for a given
+%% rabbit_mnesia:init_db_unchecked/2. Of course, it's possible for a given
%% upgrade process to only require Mnesia upgrades, or only require
%% non-Mnesia upgrades. In the latter case no Mnesia resets and
%% reclusterings occur.
@@ -121,16 +121,16 @@ remove_backup() ->
info("upgrades: Mnesia backup removed~n", []).
maybe_upgrade_mnesia() ->
- AllNodes = rabbit_mnesia:all_clustered_nodes(),
+ AllNodes = rabbit_mnesia:cluster_nodes(all),
case rabbit_version:upgrades_required(mnesia) of
{error, starting_from_scratch} ->
ok;
{error, version_not_available} ->
case AllNodes of
- [_] -> ok;
- _ -> die("Cluster upgrade needed but upgrading from "
- "< 2.1.1.~nUnfortunately you will need to "
- "rebuild the cluster.", [])
+ [] -> die("Cluster upgrade needed but upgrading from "
+ "< 2.1.1.~nUnfortunately you will need to "
+ "rebuild the cluster.", []);
+ _ -> ok
end;
{error, _} = Err ->
throw(Err);
@@ -147,11 +147,11 @@ maybe_upgrade_mnesia() ->
upgrade_mode(AllNodes) ->
case nodes_running(AllNodes) of
[] ->
- AfterUs = rabbit_mnesia:running_clustered_nodes() -- [node()],
- case {is_disc_node_legacy(), AfterUs} of
- {true, []} ->
+ AfterUs = rabbit_mnesia:cluster_nodes(running) -- [node()],
+ case {node_type_legacy(), AfterUs} of
+ {disc, []} ->
primary;
- {true, _} ->
+ {disc, _} ->
Filename = rabbit_node_monitor:running_nodes_filename(),
die("Cluster upgrade needed but other disc nodes shut "
"down after this one.~nPlease first start the last "
@@ -160,7 +160,7 @@ upgrade_mode(AllNodes) ->
"all~nshow this message. In which case, remove "
"the lock file on one of them and~nstart that node. "
"The lock file on this node is:~n~n ~s ", [Filename]);
- {false, _} ->
+ {ram, _} ->
die("Cluster upgrade needed but this is a ram node.~n"
"Please first start the last disc node to shut down.",
[])
@@ -216,11 +216,11 @@ force_tables() ->
secondary_upgrade(AllNodes) ->
%% must do this before we wipe out schema
- IsDiscNode = is_disc_node_legacy(),
+ NodeType = node_type_legacy(),
rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
cannot_delete_schema),
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
- ok = rabbit_mnesia:init_db(AllNodes, IsDiscNode, true),
+ ok = rabbit_mnesia:init_db_unchecked(AllNodes, NodeType),
ok = rabbit_version:record_desired_for_scope(mnesia),
ok.
@@ -268,13 +268,16 @@ lock_filename() -> lock_filename(dir()).
lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME).
backup_dir() -> dir() ++ "-upgrade-backup".
-is_disc_node_legacy() ->
+node_type_legacy() ->
%% This is pretty ugly but we can't start Mnesia and ask it (will
%% hang), we can't look at the config file (may not include us
%% even if we're a disc node). We also can't use
- %% rabbit_mnesia:is_disc_node/0 because that will give false
+ %% rabbit_mnesia:node_type/0 because that will give false
%% postivies on Rabbit up to 2.5.1.
- filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")).
+ case filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")) of
+ true -> disc;
+ false -> ram
+ end.
%% NB: we cannot use rabbit_log here since it may not have been
%% started yet