diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-09-26 13:57:55 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-09-26 13:57:55 +0100 |
commit | e6c877d3cdb6ca369d4d9c94af6187794affaac0 (patch) | |
tree | 60642659cfe3a1e15ee4332479dd91eb8f1604e6 | |
parent | caca30e74cbc7a46bb54e5cc25f094abfd8e02ec (diff) | |
parent | 750dcde2aab7e79a01b1da8959b0038f78354374 (diff) | |
download | rabbitmq-server-e6c877d3cdb6ca369d4d9c94af6187794affaac0.tar.gz |
merge bug25160 into default
-rw-r--r-- | ebin/rabbit_app.in | 2 | ||||
-rw-r--r-- | src/rabbit.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 9 | ||||
-rw-r--r-- | src/rabbit_direct.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 11 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 503 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 2 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 194 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 12 | ||||
-rw-r--r-- | src/rabbit_upgrade.erl | 37 |
11 files changed, 350 insertions, 426 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 78842281..9b1ff8bd 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -33,7 +33,7 @@ {default_user_tags, [administrator]}, {default_vhost, <<"/">>}, {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, - {cluster_nodes, {[], true}}, + {cluster_nodes, {[], disc}}, {server_properties, []}, {collect_statistics, none}, {collect_statistics_interval, 5000}, diff --git a/src/rabbit.erl b/src/rabbit.erl index e9587841..3fe27cd9 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -519,7 +519,7 @@ sort_boot_steps(UnsortedSteps) -> end. boot_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) -> - AllNodes = rabbit_mnesia:all_clustered_nodes(), + AllNodes = rabbit_mnesia:cluster_nodes(all), {Err, Nodes} = case AllNodes -- [node()] of [] -> {"Timeout contacting cluster nodes. Since RabbitMQ was" diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e8f3aab3..0d13312b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -136,7 +136,7 @@ flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). list() -> - rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(), + rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), rabbit_channel, list_local, []). 
list_local() -> diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index bd01a1b1..e75e1f6f 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -247,9 +247,12 @@ action(force_reset, Node, [], _Opts, Inform) -> action(join_cluster, Node, [ClusterNodeS], Opts, Inform) -> ClusterNode = list_to_atom(ClusterNodeS), - DiscNode = not proplists:get_bool(?RAM_OPT, Opts), + NodeType = case proplists:get_bool(?RAM_OPT, Opts) of + true -> ram; + false -> disc + end, Inform("Clustering node ~p with ~p", [Node, ClusterNode]), - rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, DiscNode]); + rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, NodeType]); action(change_cluster_node_type, Node, ["ram"], _Opts, Inform) -> Inform("Turning ~p into a ram node", [Node]), @@ -458,7 +461,7 @@ action(list_parameters, Node, [], Opts, Inform) -> action(report, Node, _Args, _Opts, Inform) -> Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]), [begin ok = action(Action, N, [], [], Inform), io:nl() end || - N <- unsafe_rpc(Node, rabbit_mnesia, running_clustered_nodes, []), + N <- unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]), Action <- [status, cluster_status, environment]], VHosts = unsafe_rpc(Node, rabbit_vhost, list, []), [print_report(Node, Q) || Q <- ?GLOBAL_QUERIES], diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index a3431321..689e5d83 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -60,7 +60,7 @@ list_local() -> pg_local:get_members(rabbit_direct). list() -> - rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(), + rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), rabbit_direct, list_local, []). 
%%---------------------------------------------------------------------------- diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index c11a8ff7..41389815 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -87,12 +87,11 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( Q, undefined, sender_death_fun(), length_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), - MNodes1 = - (case MNodes of - all -> rabbit_mnesia:all_clustered_nodes(); - undefined -> []; - _ -> MNodes - end) -- [node()], + MNodes1 = (case MNodes of + all -> rabbit_mnesia:cluster_nodes(all); + undefined -> []; + _ -> MNodes + end) -- [node()], [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback), diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 8ce19cc6..ae36febb 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -28,19 +28,16 @@ status/0, is_db_empty/0, is_clustered/0, - all_clustered_nodes/0, - clustered_disc_nodes/0, - running_clustered_nodes/0, - is_disc_node/0, + cluster_nodes/1, + node_type/0, dir/0, table_names/0, - wait_for_tables/1, cluster_status_from_mnesia/0, - init_db/3, + init_db_unchecked/2, empty_ram_only_tables/0, copy_db/1, - wait_for_tables/0, + wait_for_tables/1, check_cluster_consistency/0, ensure_mnesia_dir/0, @@ -67,12 +64,11 @@ -export_type([node_type/0, cluster_status/0]). -type(node_type() :: disc | ram). --type(cluster_status() :: {ordsets:ordset(node()), ordsets:ordset(node()), - ordsets:ordset(node())}). +-type(cluster_status() :: {[node()], [node()], [node()]}). %% Main interface -spec(init/0 :: () -> 'ok'). --spec(join_cluster/2 :: ([node()], boolean()) -> 'ok'). +-spec(join_cluster/2 :: (node(), node_type()) -> 'ok'). -spec(reset/0 :: () -> 'ok'). 
-spec(force_reset/0 :: () -> 'ok'). -spec(update_cluster_nodes/1 :: (node()) -> 'ok'). @@ -84,17 +80,15 @@ {'running_nodes', [node()]}]). -spec(is_db_empty/0 :: () -> boolean()). -spec(is_clustered/0 :: () -> boolean()). --spec(all_clustered_nodes/0 :: () -> [node()]). --spec(clustered_disc_nodes/0 :: () -> [node()]). --spec(running_clustered_nodes/0 :: () -> [node()]). --spec(is_disc_node/0 :: () -> boolean()). +-spec(cluster_nodes/1 :: ('all' | 'disc' | 'ram' | 'running') -> [node()]). +-spec(node_type/0 :: () -> node_type()). -spec(dir/0 :: () -> file:filename()). -spec(table_names/0 :: () -> [atom()]). --spec(cluster_status_from_mnesia/0 :: () -> {'ok', cluster_status()} | - {'error', any()}). +-spec(cluster_status_from_mnesia/0 :: () -> rabbit_types:ok_or_error2( + cluster_status(), any())). %% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit' --spec(init_db/3 :: ([node()], boolean(), boolean()) -> 'ok'). +-spec(init_db_unchecked/2 :: ([node()], node_type()) -> 'ok'). -spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). @@ -106,12 +100,6 @@ -spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -%% Functions used in internal rpc calls --spec(node_info/0 :: () -> {string(), string(), - ({'ok', cluster_status()} | 'error')}). --spec(remove_node_if_mnesia_running/1 :: (node()) -> 'ok' | - {'error', term()}). - -endif. %%---------------------------------------------------------------------------- @@ -123,7 +111,9 @@ init() -> ensure_mnesia_dir(), case is_virgin_node() of true -> init_from_config(); - false -> init(is_disc_node(), all_clustered_nodes()) + false -> NodeType = node_type(), + init_db_and_upgrade(cluster_nodes(all), NodeType, + NodeType =:= ram) end, %% We intuitively expect the global name server to be synced when %% Mnesia is up. 
In fact that's not guaranteed to be the case - @@ -131,24 +121,21 @@ init() -> ok = global:sync(), ok. -init(WantDiscNode, AllNodes) -> - init_db_and_upgrade(AllNodes, WantDiscNode, WantDiscNode). - init_from_config() -> - {ok, {TryNodes, WantDiscNode}} = + {ok, {TryNodes, NodeType}} = application:get_env(rabbit, cluster_nodes), - case find_good_node(TryNodes -- [node()]) of + case find_good_node(nodes_excl_me(TryNodes)) of {ok, Node} -> rabbit_log:info("Node '~p' selected for clustering from " "configuration~n", [Node]), {ok, {_, DiscNodes, _}} = discover_cluster(Node), - init_db_and_upgrade(DiscNodes, WantDiscNode, false), + init_db_and_upgrade(DiscNodes, NodeType, true), rabbit_node_monitor:notify_joined_cluster(); none -> rabbit_log:warning("Could not find any suitable node amongst the " "ones provided in the configuration: ~p~n", [TryNodes]), - init(true, [node()]) + init_db_and_upgrade([node()], disc, false) end. %% Make the node join a cluster. The node will be reset automatically @@ -165,21 +152,18 @@ init_from_config() -> %% Note that we make no attempt to verify that the nodes provided are %% all in the same cluster, we simply pick the first online node and %% we cluster to its cluster. -join_cluster(DiscoveryNode, WantDiscNode) -> - case is_disc_and_clustered() andalso [node()] =:= clustered_disc_nodes() of - true -> e(clustering_only_disc_node); - _ -> ok - end, - +join_cluster(DiscoveryNode, NodeType) -> ensure_mnesia_not_running(), ensure_mnesia_dir(), - + case is_only_clustered_disc_node() of + true -> e(clustering_only_disc_node); + false -> ok + end, {ClusterNodes, _, _} = case discover_cluster(DiscoveryNode) of {ok, Res} -> Res; - E = {error, _} -> throw(E) + {error, _} = E -> throw(E) end, - - case lists:member(node(), ClusterNodes) of + case me_in_nodes(ClusterNodes) of true -> e(already_clustered); false -> ok end, @@ -190,11 +174,10 @@ join_cluster(DiscoveryNode, WantDiscNode) -> %% of reseting the node from the user. 
reset(false), - rabbit_misc:local_info_msg("Clustering with ~p~n", [ClusterNodes]), - %% Join the cluster - ok = init_db_with_mnesia(ClusterNodes, WantDiscNode, false), - + rabbit_misc:local_info_msg("Clustering with ~p as ~p node~n", + [ClusterNodes, NodeType]), + ok = init_db_with_mnesia(ClusterNodes, NodeType, true, true), rabbit_node_monitor:notify_joined_cluster(), ok. @@ -202,87 +185,81 @@ join_cluster(DiscoveryNode, WantDiscNode) -> %% return node to its virgin state, where it is not member of any %% cluster, has no cluster configuration, no local database, and no %% persisted messages -reset() -> reset(false). -force_reset() -> reset(true). +reset() -> + rabbit_misc:local_info_msg("Resetting Rabbit~n", []), + reset(false). + +force_reset() -> + rabbit_misc:local_info_msg("Resetting Rabbit forcefully~n", []), + reset(true). reset(Force) -> - rabbit_misc:local_info_msg("Resetting Rabbit~s~n", - [if Force -> " forcefully"; - true -> "" - end]), ensure_mnesia_not_running(), - Node = node(), - case Force of - true -> - disconnect_nodes(nodes()); - false -> - AllNodes = all_clustered_nodes(), - %% Reconnecting so that we will get an up to date nodes. - %% We don't need to check for consistency because we are - %% resetting. Force=true here so that reset still works - %% when clustered with a node which is down. - init_db_with_mnesia(AllNodes, is_disc_node(), false, true), - case is_disc_and_clustered() andalso - [node()] =:= clustered_disc_nodes() - of - true -> e(resetting_only_disc_node); - false -> ok + Nodes = case Force of + true -> + nodes(); + false -> + AllNodes = cluster_nodes(all), + %% Reconnecting so that we will get an up to date + %% nodes. We don't need to check for consistency + %% because we are resetting. Force=true here so + %% that reset still works when clustered with a + %% node which is down. 
+ init_db_with_mnesia(AllNodes, node_type(), false, false), + case is_only_clustered_disc_node() of + true -> e(resetting_only_disc_node); + false -> ok + end, + leave_cluster(), + rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), + cannot_delete_schema), + cluster_nodes(all) end, - leave_cluster(), - rabbit_misc:ensure_ok(mnesia:delete_schema([Node]), - cannot_delete_schema), - disconnect_nodes(all_clustered_nodes()), - ok - end, + %% We need to make sure that we don't end up in a distributed + %% Erlang system with nodes while not being in an Mnesia cluster + %% with them. We don't handle that well. + [erlang:disconnect_node(N) || N <- Nodes], %% remove persisted messages and any other garbage we find ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")), ok = rabbit_node_monitor:reset_cluster_status(), ok. -%% We need to make sure that we don't end up in a distributed Erlang -%% system with nodes while not being in an Mnesia cluster with -%% them. We don't handle that well. -disconnect_nodes(Nodes) -> [erlang:disconnect_node(N) || N <- Nodes]. - change_cluster_node_type(Type) -> - ensure_mnesia_dir(), ensure_mnesia_not_running(), + ensure_mnesia_dir(), case is_clustered() of false -> e(not_clustered); true -> ok end, - {_, _, RunningNodes} = - case discover_cluster(all_clustered_nodes()) of - {ok, Status} -> Status; - {error, _Reason} -> e(cannot_connect_to_cluster) - end, + {_, _, RunningNodes} = case discover_cluster(cluster_nodes(all)) of + {ok, Status} -> Status; + {error, _Reason} -> e(cannot_connect_to_cluster) + end, Node = case RunningNodes of [] -> e(no_online_cluster_nodes); [Node0|_] -> Node0 end, - ok = reset(false), - ok = join_cluster(Node, case Type of - ram -> false; - disc -> true - end). + ok = reset(), + ok = join_cluster(Node, Type). 
update_cluster_nodes(DiscoveryNode) -> ensure_mnesia_not_running(), ensure_mnesia_dir(), - Status = {AllNodes, _, _} = case discover_cluster(DiscoveryNode) of {ok, Status0} -> Status0; {error, _Reason} -> e(cannot_connect_to_node) end, - case ordsets:is_element(node(), AllNodes) of + case me_in_nodes(AllNodes) of true -> %% As in `check_consistency/0', we can safely delete the %% schema here, since it'll be replicated from the other %% nodes mnesia:delete_schema([node()]), rabbit_node_monitor:write_cluster_status(Status), - init_db_with_mnesia(AllNodes, is_disc_node(), false); + rabbit_misc:local_info_msg("Updating cluster nodes from ~p~n", + [DiscoveryNode]), + init_db_with_mnesia(AllNodes, node_type(), true, true); false -> e(inconsistent_cluster) end, @@ -296,29 +273,25 @@ update_cluster_nodes(DiscoveryNode) -> %% the last or second to last after the node we're removing to go %% down forget_cluster_node(Node, RemoveWhenOffline) -> - case ordsets:is_element(Node, all_clustered_nodes()) of + case lists:member(Node, cluster_nodes(all)) of true -> ok; false -> e(not_a_cluster_node) end, - case {mnesia:system_info(is_running), RemoveWhenOffline} of - {yes, true} -> e(online_node_offline_flag); - _ -> ok - end, - case remove_node_if_mnesia_running(Node) of - ok -> - ok; - {error, mnesia_not_running} when RemoveWhenOffline -> - remove_node_offline_node(Node); - {error, mnesia_not_running} -> - e(offline_node_no_offline_flag); - Err = {error, _} -> - throw(Err) + case {RemoveWhenOffline, mnesia:system_info(is_running)} of + {true, no} -> remove_node_offline_node(Node); + {true, yes} -> e(online_node_offline_flag); + {false, no} -> e(offline_node_no_offline_flag); + {false, yes} -> rabbit_misc:local_info_msg( + "Removing node ~p from cluster~n", [Node]), + case remove_node_if_mnesia_running(Node) of + ok -> ok; + {error, _} = Err -> throw(Err) + end end. 
remove_node_offline_node(Node) -> - case {ordsets:del_element(Node, running_nodes(all_clustered_nodes())), - is_disc_node()} of - {[], true} -> + case {running_nodes(cluster_nodes(all)) -- [Node], node_type()} of + {[], disc} -> %% Note that while we check if the nodes was the last to %% go down, apart from the node we're removing from, this %% is still unsafe. Consider the situation in which A and @@ -327,12 +300,10 @@ remove_node_offline_node(Node) -> %% and B goes down. In this case, C is the second-to-last, %% but we don't know that and we'll remove B from A %% anyway, even if that will lead to bad things. - case ordsets:subtract(running_clustered_nodes(), - ordsets:from_list([node(), Node])) of + case cluster_nodes(running) -- [node(), Node] of [] -> start_mnesia(), try - [mnesia:force_load_table(T) || - T <- rabbit_mnesia:table_names()], + [mnesia:force_load_table(T) || T <- table_names()], forget_cluster_node(Node, false), ensure_mnesia_running() after @@ -350,13 +321,13 @@ remove_node_offline_node(Node) -> %%---------------------------------------------------------------------------- status() -> - IfNonEmpty = fun (_, []) -> []; + IfNonEmpty = fun (_, []) -> []; (Type, Nodes) -> [{Type, Nodes}] end, - [{nodes, (IfNonEmpty(disc, clustered_disc_nodes()) ++ - IfNonEmpty(ram, clustered_ram_nodes()))}] ++ + [{nodes, (IfNonEmpty(disc, cluster_nodes(disc)) ++ + IfNonEmpty(ram, cluster_nodes(ram)))}] ++ case mnesia:system_info(is_running) of - yes -> [{running_nodes, running_clustered_nodes()}]; + yes -> [{running_nodes, cluster_nodes(running)}]; no -> [] end. @@ -364,27 +335,10 @@ is_db_empty() -> lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end, table_names()). -is_clustered() -> - Nodes = all_clustered_nodes(), - [node()] =/= Nodes andalso [] =/= Nodes. - -is_disc_and_clustered() -> is_disc_node() andalso is_clustered(). - -%% Functions that retrieve the nodes in the cluster will rely on the -%% status file if offline. 
+is_clustered() -> AllNodes = cluster_nodes(all), + AllNodes =/= [] andalso AllNodes =/= [node()]. -all_clustered_nodes() -> cluster_status(all). - -clustered_disc_nodes() -> cluster_status(disc). - -clustered_ram_nodes() -> ordsets:subtract(cluster_status(all), - cluster_status(disc)). - -running_clustered_nodes() -> cluster_status(running). - -running_clustered_disc_nodes() -> - {_, DiscNodes, RunningNodes} = cluster_status(), - ordsets:intersection(DiscNodes, RunningNodes). +cluster_nodes(WhichNodes) -> cluster_status(WhichNodes). %% This function is the actual source of information, since it gets %% the data from mnesia. Obviously it'll work only when mnesia is @@ -398,75 +352,64 @@ mnesia_nodes() -> %% `init_db/3' hasn't been run yet. In other words, either %% we are a virgin node or a restarted RAM node. In both %% cases we're not interested in what mnesia has to say. - IsDiscNode = mnesia:system_info(use_dir), + NodeType = case mnesia:system_info(use_dir) of + true -> disc; + false -> ram + end, Tables = mnesia:system_info(tables), - {Table, _} = case table_definitions(case IsDiscNode of - true -> disc; - false -> ram - end) of [T|_] -> T end, + [{Table, _} | _] = table_definitions(NodeType), case lists:member(Table, Tables) of - true -> - AllNodes = - ordsets:from_list(mnesia:system_info(db_nodes)), - DiscCopies = ordsets:from_list( - mnesia:table_info(schema, disc_copies)), - DiscNodes = - case IsDiscNode of - true -> ordsets:add_element(node(), DiscCopies); - false -> DiscCopies - end, - {ok, {AllNodes, DiscNodes}}; - false -> - {error, tables_not_present} + true -> AllNodes = mnesia:system_info(db_nodes), + DiscCopies = mnesia:table_info(schema, disc_copies), + DiscNodes = case NodeType of + disc -> nodes_incl_me(DiscCopies); + ram -> DiscCopies + end, + {ok, {AllNodes, DiscNodes}}; + false -> {error, tables_not_present} end end. 
-cluster_status(WhichNodes, ForceMnesia) -> +cluster_status(WhichNodes) -> %% I don't want to call `running_nodes/1' unless if necessary, since it's %% pretty expensive. - Nodes = case mnesia_nodes() of - {ok, {AllNodes, DiscNodes}} -> - {ok, {AllNodes, DiscNodes, - fun() -> running_nodes(AllNodes) end}}; - {error, _Reason} when not ForceMnesia -> - {AllNodes, DiscNodes, RunningNodes} = - rabbit_node_monitor:read_cluster_status(), - %% The cluster status file records the status when the node - %% is online, but we know for sure that the node is offline - %% now, so we can remove it from the list of running nodes. - {ok, - {AllNodes, DiscNodes, - fun() -> ordsets:del_element(node(), RunningNodes) end}}; - Err = {error, _} -> - Err - end, - case Nodes of - {ok, {AllNodes1, DiscNodes1, RunningNodesThunk}} -> - {ok, case WhichNodes of - status -> {AllNodes1, DiscNodes1, RunningNodesThunk()}; - all -> AllNodes1; - disc -> DiscNodes1; - running -> RunningNodesThunk() - end}; - Err1 = {error, _} -> - Err1 + {AllNodes1, DiscNodes1, RunningNodesThunk} = + case mnesia_nodes() of + {ok, {AllNodes, DiscNodes}} -> + {AllNodes, DiscNodes, fun() -> running_nodes(AllNodes) end}; + {error, _Reason} -> + {AllNodes, DiscNodes, RunningNodes} = + rabbit_node_monitor:read_cluster_status(), + %% The cluster status file records the status when the node is + %% online, but we know for sure that the node is offline now, so + %% we can remove it from the list of running nodes. + {AllNodes, DiscNodes, fun() -> nodes_excl_me(RunningNodes) end} + end, + case WhichNodes of + status -> {AllNodes1, DiscNodes1, RunningNodesThunk()}; + all -> AllNodes1; + disc -> DiscNodes1; + ram -> AllNodes1 -- DiscNodes1; + running -> RunningNodesThunk() end. -cluster_status(WhichNodes) -> - {ok, Status} = cluster_status(WhichNodes, false), - Status. - -cluster_status() -> cluster_status(status). - -cluster_status_from_mnesia() -> cluster_status(status, true). 
+cluster_status_from_mnesia() -> + case mnesia_nodes() of + {ok, {AllNodes, DiscNodes}} -> {ok, {AllNodes, DiscNodes, + running_nodes(AllNodes)}}; + {error, _} = Err -> Err + end. node_info() -> {erlang:system_info(otp_release), rabbit_misc:version(), cluster_status_from_mnesia()}. -is_disc_node() -> - DiscNodes = clustered_disc_nodes(), - DiscNodes =:= [] orelse ordsets:is_element(node(), DiscNodes). +node_type() -> + DiscNodes = cluster_nodes(disc), + case DiscNodes =:= [] orelse me_in_nodes(DiscNodes) of + true -> disc; + false -> ram + end. dir() -> mnesia:system_info(directory). @@ -480,21 +423,21 @@ table_names() -> [Tab || {Tab, _} <- table_definitions()]. %% schema if there is the need to and catching up if there are other %% nodes in the cluster already. It also updates the cluster status %% file. -init_db(ClusterNodes, WantDiscNode, Force) -> - Nodes = change_extra_db_nodes(ClusterNodes, Force), +init_db(ClusterNodes, NodeType, CheckOtherNodes) -> + Nodes = change_extra_db_nodes(ClusterNodes, CheckOtherNodes), %% Note that we use `system_info' here and not the cluster status %% since when we start rabbit for the first time the cluster %% status will say we are a disc node but the tables won't be %% present yet. WasDiscNode = mnesia:system_info(use_dir), - case {Nodes, WasDiscNode, WantDiscNode} of - {[], _, false} -> + case {Nodes, WasDiscNode, NodeType} of + {[], _, ram} -> %% Standalone ram node, we don't want that throw({error, cannot_create_standalone_ram_node}); - {[], false, true} -> + {[], false, disc} -> %% RAM -> disc, starting from scratch ok = create_schema(); - {[], true, true} -> + {[], true, disc} -> %% First disc node up ok; {[AnotherNode | _], _, _} -> @@ -507,19 +450,22 @@ init_db(ClusterNodes, WantDiscNode, Force) -> %% first when moving to RAM mnesia will loudly complain %% since it doesn't make much sense to do that. But when %% moving to disc, we need to move the schema first. 
- case WantDiscNode of - true -> create_local_table_copy(schema, disc_copies), - create_local_table_copies(disc); - false -> create_local_table_copies(ram), - create_local_table_copy(schema, ram_copies) + case NodeType of + disc -> create_local_table_copy(schema, disc_copies), + create_local_table_copies(disc); + ram -> create_local_table_copies(ram), + create_local_table_copy(schema, ram_copies) end end, ensure_schema_integrity(), rabbit_node_monitor:update_cluster_status(), ok. -init_db_and_upgrade(ClusterNodes, WantDiscNode, Force) -> - ok = init_db(ClusterNodes, WantDiscNode, Force), +init_db_unchecked(ClusterNodes, NodeType) -> + init_db(ClusterNodes, NodeType, false). + +init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes) -> + ok = init_db(ClusterNodes, NodeType, CheckOtherNodes), ok = case rabbit_upgrade:maybe_upgrade_local() of ok -> ok; starting_from_scratch -> rabbit_version:record_desired(); @@ -527,25 +473,23 @@ init_db_and_upgrade(ClusterNodes, WantDiscNode, Force) -> end, %% `maybe_upgrade_local' restarts mnesia, so ram nodes will forget %% about the cluster - case WantDiscNode of - false -> start_mnesia(), - change_extra_db_nodes(ClusterNodes, true), - wait_for_replicated_tables(); - true -> ok + case NodeType of + ram -> start_mnesia(), + change_extra_db_nodes(ClusterNodes, false), + wait_for_replicated_tables(); + disc -> ok end, ok. -init_db_with_mnesia(ClusterNodes, WantDiscNode, CheckConsistency, Force) -> +init_db_with_mnesia(ClusterNodes, NodeType, + CheckOtherNodes, CheckConsistency) -> start_mnesia(CheckConsistency), try - init_db_and_upgrade(ClusterNodes, WantDiscNode, Force) + init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes) after stop_mnesia() end. -init_db_with_mnesia(ClusterNodes, WantDiscNode, Force) -> - init_db_with_mnesia(ClusterNodes, WantDiscNode, true, Force). 
- ensure_mnesia_dir() -> MnesiaDir = dir() ++ "/", case filelib:ensure_dir(MnesiaDir) of @@ -593,7 +537,7 @@ check_schema_integrity() -> true -> check_table_attributes(Tab, TabDef) end end) of - ok -> ok = wait_for_tables(), + ok -> ok = wait_for_tables(table_names()), check_tables(fun check_table_content/2); Other -> Other end. @@ -628,9 +572,9 @@ copy_db(Destination) -> ok = ensure_mnesia_not_running(), rabbit_file:recursive_copy(dir(), Destination). -wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). - -wait_for_tables() -> wait_for_tables(table_names()). +wait_for_replicated_tables() -> + wait_for_tables([Tab || {Tab, TabDef} <- table_definitions(), + not lists:member({local_content, true}, TabDef)]). wait_for_tables(TableNames) -> case mnesia:wait_for_tables(TableNames, 30000) of @@ -649,11 +593,11 @@ check_cluster_consistency() -> case lists:foldl( fun (Node, {error, _}) -> check_cluster_consistency(Node); (_Node, {ok, Status}) -> {ok, Status} - end, {error, not_found}, - ordsets:del_element(node(), all_clustered_nodes())) + end, {error, not_found}, nodes_excl_me(cluster_nodes(all))) of {ok, Status = {RemoteAllNodes, _, _}} -> - case ordsets:is_subset(all_clustered_nodes(), RemoteAllNodes) of + case ordsets:is_subset(ordsets:from_list(cluster_nodes(all)), + ordsets:from_list(RemoteAllNodes)) of true -> ok; false -> @@ -673,7 +617,7 @@ check_cluster_consistency() -> rabbit_node_monitor:write_cluster_status(Status); {error, not_found} -> ok; - E = {error, _} -> + {error, _} = E -> throw(E) end. @@ -685,7 +629,7 @@ check_cluster_consistency(Node) -> {error, not_found}; {OTP, Rabbit, {ok, Status}} -> case check_consistency(OTP, Rabbit, Node, Status) of - E = {error, _} -> E; + {error, _} = E -> E; {ok, Res} -> {ok, Res} end end. 
@@ -695,17 +639,22 @@ check_cluster_consistency(Node) -> %%-------------------------------------------------------------------- on_node_up(Node) -> - case running_clustered_disc_nodes() =:= [Node] of - true -> rabbit_log:info("cluster contains disc nodes again~n"); - false -> ok + case running_disc_nodes() of + [Node] -> rabbit_log:info("cluster contains disc nodes again~n"); + _ -> ok end. on_node_down(_Node) -> - case running_clustered_disc_nodes() =:= [] of - true -> rabbit_log:info("only running disc node went down~n"); - false -> ok + case running_disc_nodes() of + [] -> rabbit_log:info("only running disc node went down~n"); + _ -> ok end. +running_disc_nodes() -> + {_AllNodes, DiscNodes, RunningNodes} = cluster_status(status), + ordsets:to_list(ordsets:intersection(ordsets:from_list(DiscNodes), + ordsets:from_list(RunningNodes))). + %%-------------------------------------------------------------------- %% Internal helpers %%-------------------------------------------------------------------- @@ -717,18 +666,16 @@ discover_cluster(Nodes) when is_list(Nodes) -> discover_cluster(Node) -> OfflineError = {error, {cannot_discover_cluster, - "The nodes provided is either offline or not running"}}, - case Node =:= node() of - true -> - {error, {cannot_discover_cluster, - "You provided the current node as node to cluster with"}}; - false -> - case rpc:call(Node, - rabbit_mnesia, cluster_status_from_mnesia, []) of - {badrpc, _Reason} -> OfflineError; - {error, mnesia_not_running} -> OfflineError; - {ok, Res} -> {ok, Res} - end + "The nodes provided are either offline or not running"}}, + case node() of + Node -> {error, {cannot_discover_cluster, + "Cannot cluster node with itself"}}; + _ -> case rpc:call(Node, + rabbit_mnesia, cluster_status_from_mnesia, []) of + {badrpc, _Reason} -> OfflineError; + {error, mnesia_not_running} -> OfflineError; + {ok, Res} -> {ok, Res} + end end. 
%% The tables aren't supposed to be on disk on a ram node @@ -850,11 +797,6 @@ queue_name_match() -> resource_match(Kind) -> #resource{kind = Kind, _='_'}. -replicated_table_names() -> - [Tab || {Tab, TabDef} <- table_definitions(), - not lists:member({local_content, true}, TabDef) - ]. - check_table_attributes(Tab, TabDef) -> {_, ExpAttrs} = proplists:lookup(attributes, TabDef), case mnesia:table_info(Tab, attributes) of @@ -877,11 +819,7 @@ check_table_content(Tab, TabDef) -> end. check_tables(Fun) -> - case [Error || {Tab, TabDef} <- table_definitions( - case is_disc_node() of - true -> disc; - false -> ram - end), + case [Error || {Tab, TabDef} <- table_definitions(node_type()), case Fun(Tab, TabDef) of ok -> Error = none, false; {error, Error} -> true @@ -1004,14 +942,13 @@ remove_node_if_mnesia_running(Node) -> end. leave_cluster() -> - case {is_clustered(), - running_nodes(ordsets:del_element(node(), all_clustered_nodes()))} - of - {false, []} -> ok; - {_, AllNodes} -> case lists:any(fun leave_cluster/1, AllNodes) of - true -> ok; - false -> e(no_running_cluster_nodes) - end + RunningNodes = running_nodes(nodes_excl_me(cluster_nodes(all))), + case not is_clustered() andalso RunningNodes =:= [] of + true -> ok; + false -> case lists:any(fun leave_cluster/1, RunningNodes) of + true -> ok; + false -> e(no_running_cluster_nodes) + end end. leave_cluster(Node) -> @@ -1042,25 +979,25 @@ stop_mnesia() -> stopped = mnesia:stop(), ensure_mnesia_not_running(). 
-change_extra_db_nodes(ClusterNodes0, Force) -> - ClusterNodes = lists:usort(ClusterNodes0) -- [node()], - case mnesia:change_config(extra_db_nodes, ClusterNodes) of - {ok, []} when not Force andalso ClusterNodes =/= [] -> +change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) -> + ClusterNodes = nodes_excl_me(ClusterNodes0), + case {mnesia:change_config(extra_db_nodes, ClusterNodes), ClusterNodes} of + {{ok, []}, [_|_]} when CheckOtherNodes -> throw({error, {failed_to_cluster_with, ClusterNodes, "Mnesia could not connect to any nodes."}}); - {ok, Nodes} -> + {{ok, Nodes}, _} -> Nodes end. -%% We're not using `mnesia:system_info(running_db_nodes)' directly because if -%% the node is a RAM node it won't know about other nodes when mnesia is stopped +%% We're not using `mnesia:system_info(running_db_nodes)' directly +%% because if the node is a RAM node it won't know about other nodes +%% when mnesia is stopped running_nodes(Nodes) -> - {Replies, _BadNodes} = - rpc:multicall(Nodes, rabbit_mnesia, is_running_remote, []), + {Replies, _BadNodes} = rpc:multicall(Nodes, + rabbit_mnesia, is_running_remote, []), [Node || {Running, Node} <- Replies, Running]. -is_running_remote() -> - {mnesia:system_info(is_running) =:= yes, node()}. +is_running_remote() -> {mnesia:system_info(is_running) =:= yes, node()}. check_consistency(OTP, Rabbit) -> rabbit_misc:sequence_error( @@ -1073,15 +1010,14 @@ check_consistency(OTP, Rabbit, Node, Status) -> check_nodes_consistency(Node, Status)]). check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) -> - ThisNode = node(), - case ordsets:is_element(ThisNode, RemoteAllNodes) of + case me_in_nodes(RemoteAllNodes) of true -> {ok, RemoteStatus}; false -> {error, {inconsistent_cluster, rabbit_misc:format("Node ~p thinks it's clustered " "with node ~p, but ~p disagrees", - [ThisNode, Node, Node])}} + [node(), Node, Node])}} end. 
check_version_consistency(This, Remote, _) when This =:= Remote -> @@ -1106,13 +1042,16 @@ check_rabbit_consistency(Remote) -> %% `rabbit_node_monitor:prepare_cluster_status_file/0'. is_virgin_node() -> case rabbit_file:list_dir(dir()) of - {error, enoent} -> true; - {ok, []} -> true; + {error, enoent} -> + true; + {ok, []} -> + true; {ok, [File1, File2]} -> lists:usort([dir() ++ "/" ++ File1, dir() ++ "/" ++ File2]) =:= lists:usort([rabbit_node_monitor:cluster_status_filename(), rabbit_node_monitor:running_nodes_filename()]); - {ok, _} -> false + {ok, _} -> + false end. find_good_node([]) -> @@ -1126,6 +1065,16 @@ find_good_node([Node | Nodes]) -> end end. +is_only_clustered_disc_node() -> + node_type() =:= disc andalso is_clustered() andalso + cluster_nodes(disc) =:= [node()]. + +me_in_nodes(Nodes) -> lists:member(node(), Nodes). + +nodes_incl_me(Nodes) -> lists:usort([node()|Nodes]). + +nodes_excl_me(Nodes) -> Nodes -- [node()]. + e(Tag) -> throw({error, {Tag, error_description(Tag)}}). error_description(clustering_only_disc_node) -> diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 2d0ded12..5cf8d1ae 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -295,7 +295,7 @@ start_ssl_client(SslOpts, Sock) -> start_client(Sock, ssl_transform_fun(SslOpts)). connections() -> - rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(), + rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), rabbit_networking, connections_local, []). connections_local() -> diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 88037953..026aa362 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -18,29 +18,16 @@ -behaviour(gen_server). +-export([start_link/0]). 
-export([running_nodes_filename/0, - cluster_status_filename/0, - prepare_cluster_status_files/0, - write_cluster_status/1, - read_cluster_status/0, - update_cluster_status/0, - reset_cluster_status/0, - - joined_cluster/2, - notify_joined_cluster/0, - left_cluster/1, - notify_left_cluster/1, - node_up/2, - notify_node_up/0, - - start_link/0, - init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3 - ]). + cluster_status_filename/0, prepare_cluster_status_files/0, + write_cluster_status/1, read_cluster_status/0, + update_cluster_status/0, reset_cluster_status/0]). +-export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). -define(SERVER, ?MODULE). -define(RABBIT_UP_RPC_TIMEOUT, 2000). @@ -49,6 +36,8 @@ -ifdef(use_specs). +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). + -spec(running_nodes_filename/0 :: () -> string()). -spec(cluster_status_filename/0 :: () -> string()). -spec(prepare_cluster_status_files/0 :: () -> 'ok'). @@ -57,27 +46,31 @@ -spec(update_cluster_status/0 :: () -> 'ok'). -spec(reset_cluster_status/0 :: () -> 'ok'). --spec(joined_cluster/2 :: (node(), boolean()) -> 'ok'). +-spec(notify_node_up/0 :: () -> 'ok'). -spec(notify_joined_cluster/0 :: () -> 'ok'). --spec(left_cluster/1 :: (node()) -> 'ok'). -spec(notify_left_cluster/1 :: (node()) -> 'ok'). --spec(node_up/2 :: (node(), boolean()) -> 'ok'). --spec(notify_node_up/0 :: () -> 'ok'). -endif. %%---------------------------------------------------------------------------- +%% Start +%%---------------------------------------------------------------------------- + +start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). 
+ +%%---------------------------------------------------------------------------- %% Cluster file operations %%---------------------------------------------------------------------------- -%% The cluster file information is kept in two files. The "cluster status file" -%% contains all the clustered nodes and the disc nodes. The "running nodes -%% file" contains the currently running nodes or the running nodes at shutdown -%% when the node is down. +%% The cluster file information is kept in two files. The "cluster +%% status file" contains all the clustered nodes and the disc nodes. +%% The "running nodes file" contains the currently running nodes or +%% the running nodes at shutdown when the node is down. %% -%% We strive to keep the files up to date and we rely on this assumption in -%% various situations. Obviously when mnesia is offline the information we have -%% will be outdated, but it can't be otherwise. +%% We strive to keep the files up to date and we rely on this +%% assumption in various situations. Obviously when mnesia is offline +%% the information we have will be outdated, but it cannot be +%% otherwise. running_nodes_filename() -> filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown"). 
@@ -93,6 +86,10 @@ prepare_cluster_status_files() -> {ok, _ } -> CorruptFiles(); {error, enoent} -> [] end, + ThisNode = [node()], + %% The running nodes file might contain a set or a list, in case + %% of the legacy file + RunningNodes2 = lists:usort(ThisNode ++ RunningNodes1), {AllNodes1, WantDiscNode} = case try_read_file(cluster_status_filename()) of {ok, [{AllNodes, DiscNodes0}]} -> @@ -105,16 +102,11 @@ prepare_cluster_status_files() -> {error, enoent} -> {legacy_cluster_nodes([]), true} end, - - ThisNode = [node()], - - RunningNodes2 = lists:usort(RunningNodes1 ++ ThisNode), AllNodes2 = lists:usort(AllNodes1 ++ RunningNodes2), DiscNodes = case WantDiscNode of true -> ThisNode; false -> [] end, - ok = write_cluster_status({AllNodes2, DiscNodes, RunningNodes2}). write_cluster_status({All, Disc, Running}) -> @@ -132,13 +124,6 @@ write_cluster_status({All, Disc, Running}) -> {FN, {error, E2}} -> throw({error, {could_not_write_file, FN, E2}}) end. -try_read_file(FileName) -> - case rabbit_file:read_term_file(FileName) of - {ok, Term} -> {ok, Term}; - {error, enoent} -> {error, enoent}; - {error, E} -> throw({error, {cannot_read_file, FileName, E}}) - end. - read_cluster_status() -> case {try_read_file(cluster_status_filename()), try_read_file(running_nodes_filename())} of @@ -159,88 +144,78 @@ reset_cluster_status() -> %% Cluster notifications %%---------------------------------------------------------------------------- -joined_cluster(Node, IsDiscNode) -> - gen_server:cast(?SERVER, {joined_cluster, Node, IsDiscNode}). +notify_node_up() -> + Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()], + gen_server:abcast(Nodes, ?SERVER, + {node_up, node(), rabbit_mnesia:node_type()}), + %% register other active rabbits with this rabbit + DiskNodes = rabbit_mnesia:cluster_nodes(disc), + [gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of + true -> disc; + false -> ram + end}) || N <- Nodes], + ok. 
notify_joined_cluster() -> - cluster_multicall(joined_cluster, [node(), rabbit_mnesia:is_disc_node()]), + Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()], + gen_server:abcast(Nodes, ?SERVER, + {joined_cluster, node(), rabbit_mnesia:node_type()}), ok. -left_cluster(Node) -> - gen_server:cast(?SERVER, {left_cluster, Node}). - notify_left_cluster(Node) -> - left_cluster(Node), - cluster_multicall(left_cluster, [Node]), - ok. - -node_up(Node, IsDiscNode) -> - gen_server:cast(?SERVER, {node_up, Node, IsDiscNode}). - -notify_node_up() -> - Nodes = cluster_multicall(node_up, [node(), rabbit_mnesia:is_disc_node()]), - %% register other active rabbits with this rabbit - [ node_up(N, ordsets:is_element(N, rabbit_mnesia:clustered_disc_nodes())) || - N <- Nodes ], + Nodes = rabbit_mnesia:cluster_nodes(running), + gen_server:abcast(Nodes, ?SERVER, {left_cluster, Node}), ok. %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -init([]) -> - {ok, no_state}. +init([]) -> {ok, pmon:new()}. handle_call(_Request, _From, State) -> {noreply, State}. -%% Note: when updating the status file, we can't simply write the mnesia -%% information since the message can (and will) overtake the mnesia propagation. -handle_cast({node_up, Node, IsDiscNode}, State) -> - case is_already_monitored({rabbit, Node}) of - true -> {noreply, State}; +%% Note: when updating the status file, we can't simply write the +%% mnesia information since the message can (and will) overtake the +%% mnesia propagation. 
+handle_cast({node_up, Node, NodeType}, Monitors) -> + case pmon:is_monitored({rabbit, Node}, Monitors) of + true -> {noreply, Monitors}; false -> rabbit_log:info("rabbit on node ~p up~n", [Node]), {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), - write_cluster_status({ordsets:add_element(Node, AllNodes), - case IsDiscNode of - true -> ordsets:add_element( - Node, DiscNodes); - false -> DiscNodes + write_cluster_status({add_node(Node, AllNodes), + case NodeType of + disc -> add_node(Node, DiscNodes); + ram -> DiscNodes end, - ordsets:add_element(Node, RunningNodes)}), - erlang:monitor(process, {rabbit, Node}), + add_node(Node, RunningNodes)}), ok = handle_live_rabbit(Node), - {noreply, State} + {noreply, pmon:monitor({rabbit, Node}, Monitors)} end; -handle_cast({joined_cluster, Node, IsDiscNode}, State) -> +handle_cast({joined_cluster, Node, NodeType}, State) -> {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), - write_cluster_status({ordsets:add_element(Node, AllNodes), - case IsDiscNode of - true -> ordsets:add_element(Node, - DiscNodes); - false -> DiscNodes + write_cluster_status({add_node(Node, AllNodes), + case NodeType of + disc -> add_node(Node, DiscNodes); + ram -> DiscNodes end, RunningNodes}), {noreply, State}; handle_cast({left_cluster, Node}, State) -> {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), - write_cluster_status({ordsets:del_element(Node, AllNodes), - ordsets:del_element(Node, DiscNodes), - ordsets:del_element(Node, RunningNodes)}), + write_cluster_status({del_node(Node, AllNodes), del_node(Node, DiscNodes), + del_node(Node, RunningNodes)}), {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. 
-handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) -> +handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Monitors) -> rabbit_log:info("rabbit on node ~p down~n", [Node]), {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), - write_cluster_status({AllNodes, DiscNodes, - ordsets:del_element(Node, RunningNodes)}), + write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}), ok = handle_dead_rabbit(Node), - {noreply, State}; + {noreply, pmon:erase({rabbit, Node}, Monitors)}; handle_info(_Info, State) -> {noreply, State}. @@ -271,27 +246,22 @@ handle_live_rabbit(Node) -> %% Internal utils %%-------------------------------------------------------------------- -cluster_multicall(Fun, Args) -> - Node = node(), - Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node], - %% notify other rabbits of this cluster - case rpc:multicall(Nodes, rabbit_node_monitor, Fun, Args, - ?RABBIT_UP_RPC_TIMEOUT) of - {_, [] } -> ok; - {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad]) - end, - Nodes. - -is_already_monitored(Item) -> - {monitors, Monitors} = process_info(self(), monitors), - lists:any(fun ({_, Item1}) when Item =:= Item1 -> true; - (_) -> false - end, Monitors). +try_read_file(FileName) -> + case rabbit_file:read_term_file(FileName) of + {ok, Term} -> {ok, Term}; + {error, enoent} -> {error, enoent}; + {error, E} -> throw({error, {cannot_read_file, FileName, E}}) + end. legacy_cluster_nodes(Nodes) -> - %% We get all the info that we can, including the nodes from mnesia, which - %% will be there if the node is a disc node (empty list otherwise) + %% We get all the info that we can, including the nodes from + %% mnesia, which will be there if the node is a disc node (empty + %% list otherwise) lists:usort(Nodes ++ mnesia:system_info(db_nodes)). legacy_should_be_disc_node(DiscNodes) -> DiscNodes == [] orelse lists:member(node(), DiscNodes). + +add_node(Node, Nodes) -> lists:usort([Node | Nodes]). 
+ +del_node(Node, Nodes) -> Nodes -- [Node]. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 08535e7d..df0ee721 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1511,15 +1511,15 @@ clean_logs(Files, Suffix) -> ok. assert_ram_node() -> - case rabbit_mnesia:is_disc_node() of - true -> exit('not_ram_node'); - false -> ok + case rabbit_mnesia:node_type() of + disc -> exit('not_ram_node'); + ram -> ok end. assert_disc_node() -> - case rabbit_mnesia:is_disc_node() of - true -> ok; - false -> exit('not_disc_node') + case rabbit_mnesia:node_type() of + disc -> ok; + ram -> exit('not_disc_node') end. delete_file(File) -> diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 3fbfeed0..d037f954 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -66,11 +66,11 @@ %% into the boot process by prelaunch before the mnesia application is %% started. By the time Mnesia is started the upgrades have happened %% (on the primary), or Mnesia has been reset (on the secondary) and -%% rabbit_mnesia:init_db/3 can then make the node rejoin the cluster +%% rabbit_mnesia:init_db_unchecked/2 can then make the node rejoin the cluster %% in the normal way. %% %% The non-mnesia upgrades are then triggered by -%% rabbit_mnesia:init_db/3. Of course, it's possible for a given +%% rabbit_mnesia:init_db_unchecked/2. Of course, it's possible for a given %% upgrade process to only require Mnesia upgrades, or only require %% non-Mnesia upgrades. In the latter case no Mnesia resets and %% reclusterings occur. @@ -121,16 +121,16 @@ remove_backup() -> info("upgrades: Mnesia backup removed~n", []). 
maybe_upgrade_mnesia() -> - AllNodes = rabbit_mnesia:all_clustered_nodes(), + AllNodes = rabbit_mnesia:cluster_nodes(all), case rabbit_version:upgrades_required(mnesia) of {error, starting_from_scratch} -> ok; {error, version_not_available} -> case AllNodes of - [_] -> ok; - _ -> die("Cluster upgrade needed but upgrading from " - "< 2.1.1.~nUnfortunately you will need to " - "rebuild the cluster.", []) + [] -> die("Cluster upgrade needed but upgrading from " + "< 2.1.1.~nUnfortunately you will need to " + "rebuild the cluster.", []); + _ -> ok end; {error, _} = Err -> throw(Err); @@ -147,11 +147,11 @@ maybe_upgrade_mnesia() -> upgrade_mode(AllNodes) -> case nodes_running(AllNodes) of [] -> - AfterUs = rabbit_mnesia:running_clustered_nodes() -- [node()], - case {is_disc_node_legacy(), AfterUs} of - {true, []} -> + AfterUs = rabbit_mnesia:cluster_nodes(running) -- [node()], + case {node_type_legacy(), AfterUs} of + {disc, []} -> primary; - {true, _} -> + {disc, _} -> Filename = rabbit_node_monitor:running_nodes_filename(), die("Cluster upgrade needed but other disc nodes shut " "down after this one.~nPlease first start the last " @@ -160,7 +160,7 @@ upgrade_mode(AllNodes) -> "all~nshow this message. In which case, remove " "the lock file on one of them and~nstart that node. " "The lock file on this node is:~n~n ~s ", [Filename]); - {false, _} -> + {ram, _} -> die("Cluster upgrade needed but this is a ram node.~n" "Please first start the last disc node to shut down.", []) @@ -216,11 +216,11 @@ force_tables() -> secondary_upgrade(AllNodes) -> %% must do this before we wipe out schema - IsDiscNode = is_disc_node_legacy(), + NodeType = node_type_legacy(), rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), cannot_delete_schema), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - ok = rabbit_mnesia:init_db(AllNodes, IsDiscNode, true), + ok = rabbit_mnesia:init_db_unchecked(AllNodes, NodeType), ok = rabbit_version:record_desired_for_scope(mnesia), ok. 
@@ -268,13 +268,16 @@ lock_filename() -> lock_filename(dir()). lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME). backup_dir() -> dir() ++ "-upgrade-backup". -is_disc_node_legacy() -> +node_type_legacy() -> %% This is pretty ugly but we can't start Mnesia and ask it (will %% hang), we can't look at the config file (may not include us %% even if we're a disc node). We also can't use - %% rabbit_mnesia:is_disc_node/0 because that will give false + %% rabbit_mnesia:node_type/0 because that will give false %% positives on Rabbit up to 2.5.1. - filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")). + case filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")) of + true -> disc; + false -> ram + end. %% NB: we cannot use rabbit_log here since it may not have been %% started yet |