summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mnesia.erl15
-rw-r--r--src/rabbit_node_monitor.erl200
2 files changed, 97 insertions, 118 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index b5d35b95..31672512 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -119,7 +119,7 @@
prepare() ->
ensure_mnesia_dir(),
- rabbit_node_monitor:prepare_cluster_status_file(),
+ rabbit_node_monitor:prepare_cluster_status_files(),
check_cluster_consistency().
init() ->
@@ -250,7 +250,7 @@ reset(Force) ->
end,
%% remove persisted messages and any other garbage we find
ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
- ok = rabbit_node_monitor:reset_cluster_status_file(),
+ ok = rabbit_node_monitor:reset_cluster_status(),
ok.
%% We need to make sure that we don't end up in a distributed Erlang system with
@@ -311,7 +311,7 @@ update_cluster_nodes(DiscoveryNode) ->
true -> %% As in `check_consistency/0', we can safely delete the schema
%% here, since it'll be replicated from the other nodes
mnesia:delete_schema([node()]),
- rabbit_node_monitor:write_cluster_status_file(Status),
+ rabbit_node_monitor:write_cluster_status(Status),
init_db_with_mnesia(AllNodes, is_disc_node(), false);
false -> throw({error,
{inconsistent_cluster,
@@ -487,7 +487,7 @@ cluster_status(WhichNodes, ForceMnesia) ->
fun() -> running_nodes(AllNodes) end}};
{error, _Reason} when not ForceMnesia ->
{AllNodes, DiscNodes, RunningNodes} =
- rabbit_node_monitor:read_cluster_status_file(),
+ rabbit_node_monitor:read_cluster_status(),
%% The cluster status file records the status when the node
%% is online, but we know for sure that the node is offline
%% now, so we can remove it from the list of running nodes.
@@ -573,7 +573,7 @@ init_db(ClusterNodes, WantDiscNode, Force) ->
end
end,
ensure_schema_integrity(),
- rabbit_node_monitor:update_cluster_status_file(),
+ rabbit_node_monitor:update_cluster_status(),
ok.
init_db_and_upgrade(ClusterNodes, WantDiscNode, Force) ->
@@ -724,7 +724,7 @@ check_cluster_consistency() ->
%% nodes.
mnesia:delete_schema([node()])
end,
- rabbit_node_monitor:write_cluster_status_file(Status);
+ rabbit_node_monitor:write_cluster_status(Status);
not_found ->
ok
end.
@@ -1174,7 +1174,8 @@ is_virgin_node() ->
{error, enoent} -> true;
{ok, []} -> true;
{ok, [File]} -> (dir() ++ "/" ++ File) =:=
- rabbit_node_monitor:cluster_status_file_name();
+ [rabbit_node_monitor:cluster_status_file_name(),
+ rabbit_node_monitor:running_nodes_file_name()];
{ok, _} -> false
end.
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index d29408de..b98fa5fd 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -19,11 +19,12 @@
-behaviour(gen_server).
-export([cluster_status_file_name/0,
- prepare_cluster_status_file/0,
- write_cluster_status_file/1,
- read_cluster_status_file/0,
- update_cluster_status_file/0,
- reset_cluster_status_file/0,
+ running_nodes_file_name/0,
+ prepare_cluster_status_files/0,
+ write_cluster_status/1,
+ read_cluster_status/0,
+ update_cluster_status/0,
+ reset_cluster_status/0,
joined_cluster/2,
notify_joined_cluster/0,
@@ -49,12 +50,12 @@
-ifdef(use_specs).
-spec(cluster_status_file_name/0 :: () -> string()).
--spec(prepare_cluster_status_file/0 :: () -> 'ok').
--spec(write_cluster_status_file/1 :: (rabbit_mnesia:cluster_status())
- -> 'ok').
--spec(read_cluster_status_file/0 :: () -> rabbit_mnesia:cluster_status()).
--spec(update_cluster_status_file/0 :: () -> 'ok').
--spec(reset_cluster_status_file/0 :: () -> 'ok').
+-spec(prepare_cluster_status_files/0 :: () -> 'ok').
+-spec(write_cluster_status/1 :: (rabbit_mnesia:cluster_status())
+ -> 'ok').
+-spec(read_cluster_status/0 :: () -> rabbit_mnesia:cluster_status()).
+-spec(update_cluster_status/0 :: () -> 'ok').
+-spec(reset_cluster_status/0 :: () -> 'ok').
-spec(joined_cluster/2 :: (node(), boolean()) -> 'ok').
-spec(notify_joined_cluster/0 :: () -> 'ok').
@@ -85,70 +86,72 @@
cluster_status_file_name() ->
rabbit_mnesia:dir() ++ "/cluster_nodes.config".
-prepare_cluster_status_file() ->
- NotPresent =
- fun (AllNodes0, WantDiscNode) ->
- ThisNode = [node()],
-
- RunningNodes0 = legacy_read_previously_running_nodes(),
- legacy_delete_previously_running_nodes(),
-
- RunningNodes = lists:usort(RunningNodes0 ++ ThisNode),
- AllNodes =
- lists:usort(AllNodes0 ++ RunningNodes),
- DiscNodes = case WantDiscNode of
- true -> ThisNode;
- false -> []
- end,
+running_nodes_file_name() ->
+ filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown").
- ok = write_cluster_status_file({AllNodes, DiscNodes,
- RunningNodes})
+prepare_cluster_status_files() ->
+ RunningNodes1 = case try_read_file(running_nodes_file_name()) of
+ {ok, [Nodes]} when is_list(Nodes) -> Nodes;
+ non_existant -> []
+ end,
+ {AllNodes1, WantDiscNode} =
+ case try_read_file(cluster_status_file_name()) of
+ {ok, [{_, _}]} ->
+ ok;
+ {ok, [AllNodes0]} when is_list(AllNodes0) ->
+ {AllNodes0, legacy_should_be_disc_node(AllNodes0)};
+ non_existant ->
+ {[], true}
end,
- case try_read_cluster_status_file() of
- {ok, _} ->
- ok;
- {error, {invalid_term, _, [AllNodes]}} ->
- %% Legacy file
- NotPresent(AllNodes, legacy_should_be_disc_node(AllNodes));
- {error, {cannot_read_file, _, enoent}} ->
- NotPresent([], true)
- end.
-
-write_cluster_status_file(Status) ->
- FileName = cluster_status_file_name(),
- case rabbit_file:write_term_file(FileName, [Status]) of
- ok -> ok;
- {error, Reason} ->
- throw({error, {cannot_write_cluster_status_file,
- FileName, Reason}})
+ ThisNode = [node()],
+
+ RunningNodes2 = lists:usort(RunningNodes1 ++ ThisNode),
+ AllNodes2 = lists:usort(AllNodes1 ++ RunningNodes2),
+ DiscNodes = case WantDiscNode of
+ true -> ThisNode;
+ false -> []
+ end,
+
+ ok = write_cluster_status({AllNodes2, DiscNodes, RunningNodes2}).
+
+write_cluster_status({All, Disc, Running}) ->
+ ClusterStatusFN = cluster_status_file_name(),
+ Res = case rabbit_file:write_term_file(ClusterStatusFN, [{All, Disc}]) of
+ ok ->
+ RunningNodesFN = running_nodes_file_name(),
+ {RunningNodesFN,
+ rabbit_file:write_term_file(RunningNodesFN, [Running])};
+ E1 = {error, _} ->
+ {ClusterStatusFN, E1}
+ end,
+ case Res of
+ {_, ok} -> ok;
+ {FN, {error, E2}} -> throw({error, {could_not_write_file, FN, E2}})
end.
-try_read_cluster_status_file() ->
- FileName = cluster_status_file_name(),
+try_read_file(FileName) ->
case rabbit_file:read_term_file(FileName) of
- {ok, [{_, _, _} = Status]} ->
- {ok, Status};
- {ok, Term} ->
- {error, {invalid_term, FileName, Term}};
- {error, Reason} ->
- {error, {cannot_read_file, FileName, Reason}}
+ {ok, Term} -> {ok, Term};
+ {error, enoent} -> non_existant;
+ {error, E} -> throw({error, {cannot_read_file, FileName, E}})
end.
-read_cluster_status_file() ->
- case try_read_cluster_status_file() of
- {ok, Status} ->
- Status;
- {error, Reason} ->
- throw({error, {cannot_read_cluster_status_file, Reason}})
+read_cluster_status() ->
+ case {try_read_file(cluster_status_file_name()),
+ try_read_file(running_nodes_file_name())} of
+ {{ok, [{All, Disc}]}, {ok, [Running]}} when is_list(Running) ->
+ {All, Disc, Running};
+ {_, _} ->
+ throw({error, corrupt_or_missing_cluster_files})
end.
-update_cluster_status_file() ->
+update_cluster_status() ->
{ok, Status} = rabbit_mnesia:cluster_status_from_mnesia(),
- write_cluster_status_file(Status).
+ write_cluster_status(Status).
-reset_cluster_status_file() ->
- write_cluster_status_file({[node()], [node()], [node()]}).
+reset_cluster_status() ->
+ write_cluster_status({[node()], [node()], [node()]}).
%%----------------------------------------------------------------------------
%% Cluster notifications
@@ -198,42 +201,42 @@ handle_cast({node_up, Node, IsDiscNode}, State) ->
case is_already_monitored({rabbit, Node}) of
true -> {noreply, State};
false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
- {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(),
- write_cluster_status_file(
- {ordsets:add_element(Node, AllNodes),
- case IsDiscNode of
- true -> ordsets:add_element(Node, DiscNodes);
- false -> DiscNodes
- end,
- ordsets:add_element(Node, RunningNodes)}),
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({ordsets:add_element(Node, AllNodes),
+ case IsDiscNode of
+ true -> ordsets:add_element(
+ Node, DiscNodes);
+ false -> DiscNodes
+ end,
+ ordsets:add_element(Node, RunningNodes)}),
erlang:monitor(process, {rabbit, Node}),
ok = handle_live_rabbit(Node),
{noreply, State}
end;
handle_cast({joined_cluster, Node, IsDiscNode}, State) ->
- {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(),
- write_cluster_status_file({ordsets:add_element(Node, AllNodes),
- case IsDiscNode of
- true -> ordsets:add_element(Node,
- DiscNodes);
- false -> DiscNodes
- end,
- RunningNodes}),
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({ordsets:add_element(Node, AllNodes),
+ case IsDiscNode of
+ true -> ordsets:add_element(Node,
+ DiscNodes);
+ false -> DiscNodes
+ end,
+ RunningNodes}),
{noreply, State};
handle_cast({left_cluster, Node}, State) ->
- {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(),
- write_cluster_status_file({ordsets:del_element(Node, AllNodes),
- ordsets:del_element(Node, DiscNodes),
- ordsets:del_element(Node, RunningNodes)}),
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({ordsets:del_element(Node, AllNodes),
+ ordsets:del_element(Node, DiscNodes),
+ ordsets:del_element(Node, RunningNodes)}),
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
rabbit_log:info("rabbit on node ~p down~n", [Node]),
- {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(),
- write_cluster_status_file({AllNodes, DiscNodes,
- ordsets:del_element(Node, RunningNodes)}),
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({AllNodes, DiscNodes,
+ ordsets:del_element(Node, RunningNodes)}),
ok = handle_dead_rabbit(Node),
{noreply, State};
handle_info(_Info, State) ->
@@ -285,28 +288,3 @@ is_already_monitored(Item) ->
legacy_should_be_disc_node(DiscNodes) ->
DiscNodes == [] orelse lists:member(node(), DiscNodes).
-
-%%--------------------------------------------------------------------
-%% Legacy functions related to the "running nodes" file
-%%--------------------------------------------------------------------
-
-legacy_running_nodes_filename() ->
- filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown").
-
-legacy_read_previously_running_nodes() ->
- FileName = legacy_running_nodes_filename(),
- case rabbit_file:read_term_file(FileName) of
- {ok, [Nodes]} -> Nodes;
- {error, enoent} -> [];
- {error, Reason} -> throw({error, {cannot_read_previous_nodes_file,
- FileName, Reason}})
- end.
-
-legacy_delete_previously_running_nodes() ->
- FileName = legacy_running_nodes_filename(),
- case file:delete(FileName) of
- ok -> ok;
- {error, enoent} -> ok;
- {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file,
- FileName, Reason}})
- end.