diff options
-rw-r--r-- | src/rabbit_mnesia.erl | 15 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 200 |
2 files changed, 97 insertions, 118 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index b5d35b95..31672512 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -119,7 +119,7 @@ prepare() -> ensure_mnesia_dir(), - rabbit_node_monitor:prepare_cluster_status_file(), + rabbit_node_monitor:prepare_cluster_status_files(), check_cluster_consistency(). init() -> @@ -250,7 +250,7 @@ reset(Force) -> end, %% remove persisted messages and any other garbage we find ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")), - ok = rabbit_node_monitor:reset_cluster_status_file(), + ok = rabbit_node_monitor:reset_cluster_status(), ok. %% We need to make sure that we don't end up in a distributed Erlang system with @@ -311,7 +311,7 @@ update_cluster_nodes(DiscoveryNode) -> true -> %% As in `check_consistency/0', we can safely delete the schema %% here, since it'll be replicated from the other nodes mnesia:delete_schema([node()]), - rabbit_node_monitor:write_cluster_status_file(Status), + rabbit_node_monitor:write_cluster_status(Status), init_db_with_mnesia(AllNodes, is_disc_node(), false); false -> throw({error, {inconsistent_cluster, @@ -487,7 +487,7 @@ cluster_status(WhichNodes, ForceMnesia) -> fun() -> running_nodes(AllNodes) end}}; {error, _Reason} when not ForceMnesia -> {AllNodes, DiscNodes, RunningNodes} = - rabbit_node_monitor:read_cluster_status_file(), + rabbit_node_monitor:read_cluster_status(), %% The cluster status file records the status when the node %% is online, but we know for sure that the node is offline %% now, so we can remove it from the list of running nodes. @@ -573,7 +573,7 @@ init_db(ClusterNodes, WantDiscNode, Force) -> end end, ensure_schema_integrity(), - rabbit_node_monitor:update_cluster_status_file(), + rabbit_node_monitor:update_cluster_status(), ok. init_db_and_upgrade(ClusterNodes, WantDiscNode, Force) -> @@ -724,7 +724,7 @@ check_cluster_consistency() -> %% nodes. mnesia:delete_schema([node()]) end, - rabbit_node_monitor:write_cluster_status_file(Status); + rabbit_node_monitor:write_cluster_status(Status); not_found -> ok end. @@ -1174,7 +1174,8 @@ is_virgin_node() -> {error, enoent} -> true; {ok, []} -> true; {ok, [File]} -> (dir() ++ "/" ++ File) =:= - rabbit_node_monitor:cluster_status_file_name(); + [rabbit_node_monitor:cluster_status_file_name(), + rabbit_node_monitor:running_nodes_file_name()]; {ok, _} -> false end. diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index d29408de..b98fa5fd 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -19,11 +19,12 @@ -behaviour(gen_server). -export([cluster_status_file_name/0, - prepare_cluster_status_file/0, - write_cluster_status_file/1, - read_cluster_status_file/0, - update_cluster_status_file/0, - reset_cluster_status_file/0, + running_nodes_file_name/0, + prepare_cluster_status_files/0, + write_cluster_status/1, + read_cluster_status/0, + update_cluster_status/0, + reset_cluster_status/0, joined_cluster/2, notify_joined_cluster/0, @@ -49,12 +50,12 @@ -ifdef(use_specs). -spec(cluster_status_file_name/0 :: () -> string()). --spec(prepare_cluster_status_file/0 :: () -> 'ok'). --spec(write_cluster_status_file/1 :: (rabbit_mnesia:cluster_status()) - -> 'ok'). --spec(read_cluster_status_file/0 :: () -> rabbit_mnesia:cluster_status()). --spec(update_cluster_status_file/0 :: () -> 'ok'). --spec(reset_cluster_status_file/0 :: () -> 'ok'). +-spec(prepare_cluster_status_files/0 :: () -> 'ok'). +-spec(write_cluster_status/1 :: (rabbit_mnesia:cluster_status()) + -> 'ok'). +-spec(read_cluster_status/0 :: () -> rabbit_mnesia:cluster_status()). +-spec(update_cluster_status/0 :: () -> 'ok'). +-spec(reset_cluster_status/0 :: () -> 'ok'). -spec(joined_cluster/2 :: (node(), boolean()) -> 'ok'). -spec(notify_joined_cluster/0 :: () -> 'ok'). @@ -85,70 +86,72 @@ cluster_status_file_name() -> rabbit_mnesia:dir() ++ "/cluster_nodes.config". -prepare_cluster_status_file() -> - NotPresent = - fun (AllNodes0, WantDiscNode) -> - ThisNode = [node()], - - RunningNodes0 = legacy_read_previously_running_nodes(), - legacy_delete_previously_running_nodes(), - - RunningNodes = lists:usort(RunningNodes0 ++ ThisNode), - AllNodes = - lists:usort(AllNodes0 ++ RunningNodes), - DiscNodes = case WantDiscNode of - true -> ThisNode; - false -> [] - end, +running_nodes_file_name() -> + filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown"). - ok = write_cluster_status_file({AllNodes, DiscNodes, - RunningNodes}) +prepare_cluster_status_files() -> + RunningNodes1 = case try_read_file(running_nodes_file_name()) of + {ok, [Nodes]} when is_list(Nodes) -> Nodes; + non_existant -> [] + end, + {AllNodes1, WantDiscNode} = + case try_read_file(cluster_status_file_name()) of + {ok, [{_, _}]} -> + ok; + {ok, [AllNodes0]} when is_list(AllNodes0) -> + {AllNodes0, legacy_should_be_disc_node(AllNodes0)}; + non_existant -> + {[], true} end, - case try_read_cluster_status_file() of - {ok, _} -> - ok; - {error, {invalid_term, _, [AllNodes]}} -> - %% Legacy file - NotPresent(AllNodes, legacy_should_be_disc_node(AllNodes)); - {error, {cannot_read_file, _, enoent}} -> - NotPresent([], true) - end. - -write_cluster_status_file(Status) -> - FileName = cluster_status_file_name(), - case rabbit_file:write_term_file(FileName, [Status]) of - ok -> ok; - {error, Reason} -> - throw({error, {cannot_write_cluster_status_file, - FileName, Reason}}) + ThisNode = [node()], + + RunningNodes2 = lists:usort(RunningNodes1 ++ ThisNode), + AllNodes2 = lists:usort(AllNodes1 ++ RunningNodes2), + DiscNodes = case WantDiscNode of + true -> ThisNode; + false -> [] + end, + + ok = write_cluster_status({AllNodes2, DiscNodes, RunningNodes2}). + +write_cluster_status({All, Disc, Running}) -> + ClusterStatusFN = cluster_status_file_name(), + Res = case rabbit_file:write_term_file(ClusterStatusFN, [{All, Disc}]) of + ok -> + RunningNodesFN = running_nodes_file_name(), + {RunningNodesFN, + rabbit_file:write_term_file(RunningNodesFN, [Running])}; + E1 = {error, _} -> + {ClusterStatusFN, E1} + end, + case Res of + {_, ok} -> ok; + {FN, {error, E2}} -> throw({error, {could_not_write_file, FN, E2}}) end. -try_read_cluster_status_file() -> - FileName = cluster_status_file_name(), +try_read_file(FileName) -> case rabbit_file:read_term_file(FileName) of - {ok, [{_, _, _} = Status]} -> - {ok, Status}; - {ok, Term} -> - {error, {invalid_term, FileName, Term}}; - {error, Reason} -> - {error, {cannot_read_file, FileName, Reason}} + {ok, Term} -> {ok, Term}; + {error, enoent} -> non_existant; + {error, E} -> throw({error, {cannot_read_file, FileName, E}}) end. -read_cluster_status_file() -> - case try_read_cluster_status_file() of - {ok, Status} -> - Status; - {error, Reason} -> - throw({error, {cannot_read_cluster_status_file, Reason}}) +read_cluster_status() -> + case {try_read_file(cluster_status_file_name()), + try_read_file(running_nodes_file_name())} of + {{ok, [{All, Disc}]}, {ok, [Running]}} when is_list(Running) -> + {All, Disc, Running}; + {_, _} -> + throw({error, corrupt_or_missing_cluster_files}) end. -update_cluster_status_file() -> +update_cluster_status() -> {ok, Status} = rabbit_mnesia:cluster_status_from_mnesia(), - write_cluster_status_file(Status). + write_cluster_status(Status). -reset_cluster_status_file() -> - write_cluster_status_file({[node()], [node()], [node()]}). +reset_cluster_status() -> + write_cluster_status({[node()], [node()], [node()]}). %%---------------------------------------------------------------------------- %% Cluster notifications @@ -198,42 +201,42 @@ handle_cast({node_up, Node, IsDiscNode}, State) -> case is_already_monitored({rabbit, Node}) of true -> {noreply, State}; false -> rabbit_log:info("rabbit on node ~p up~n", [Node]), - {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(), - write_cluster_status_file( - {ordsets:add_element(Node, AllNodes), - case IsDiscNode of - true -> ordsets:add_element(Node, DiscNodes); - false -> DiscNodes - end, - ordsets:add_element(Node, RunningNodes)}), + {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), + write_cluster_status({ordsets:add_element(Node, AllNodes), + case IsDiscNode of + true -> ordsets:add_element( + Node, DiscNodes); + false -> DiscNodes + end, + ordsets:add_element(Node, RunningNodes)}), erlang:monitor(process, {rabbit, Node}), ok = handle_live_rabbit(Node), {noreply, State} end; handle_cast({joined_cluster, Node, IsDiscNode}, State) -> - {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(), - write_cluster_status_file({ordsets:add_element(Node, AllNodes), - case IsDiscNode of - true -> ordsets:add_element(Node, - DiscNodes); - false -> DiscNodes - end, - RunningNodes}), + {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), + write_cluster_status({ordsets:add_element(Node, AllNodes), + case IsDiscNode of + true -> ordsets:add_element(Node, + DiscNodes); + false -> DiscNodes + end, + RunningNodes}), {noreply, State}; handle_cast({left_cluster, Node}, State) -> - {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(), - write_cluster_status_file({ordsets:del_element(Node, AllNodes), - ordsets:del_element(Node, DiscNodes), - ordsets:del_element(Node, RunningNodes)}), + {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), + write_cluster_status({ordsets:del_element(Node, AllNodes), + ordsets:del_element(Node, DiscNodes), + ordsets:del_element(Node, RunningNodes)}), {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) -> rabbit_log:info("rabbit on node ~p down~n", [Node]), - {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(), - write_cluster_status_file({AllNodes, DiscNodes, - ordsets:del_element(Node, RunningNodes)}), + {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), + write_cluster_status({AllNodes, DiscNodes, + ordsets:del_element(Node, RunningNodes)}), ok = handle_dead_rabbit(Node), {noreply, State}; handle_info(_Info, State) -> @@ -285,28 +288,3 @@ is_already_monitored(Item) -> legacy_should_be_disc_node(DiscNodes) -> DiscNodes == [] orelse lists:member(node(), DiscNodes). - -%%-------------------------------------------------------------------- -%% Legacy functions related to the "running nodes" file -%%-------------------------------------------------------------------- - -legacy_running_nodes_filename() -> - filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown"). - -legacy_read_previously_running_nodes() -> - FileName = legacy_running_nodes_filename(), - case rabbit_file:read_term_file(FileName) of - {ok, [Nodes]} -> Nodes; - {error, enoent} -> []; - {error, Reason} -> throw({error, {cannot_read_previous_nodes_file, - FileName, Reason}}) - end. - -legacy_delete_previously_running_nodes() -> - FileName = legacy_running_nodes_filename(), - case file:delete(FileName) of - ok -> ok; - {error, enoent} -> ok; - {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file, - FileName, Reason}}) - end. |