diff options
Diffstat (limited to 'src/rabbit_mnesia.erl')
-rw-r--r-- | src/rabbit_mnesia.erl | 399 |
1 files changed, 399 insertions, 0 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl new file mode 100644 index 00000000..82b80cb4 --- /dev/null +++ b/src/rabbit_mnesia.erl @@ -0,0 +1,399 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_mnesia). + +-export([ensure_mnesia_dir/0, status/0, init/0, is_db_empty/0, + cluster/1, reset/0, force_reset/0]). + +-export([table_names/0]). + +%% create_tables/0 exported for helping embed RabbitMQ in or alongside +%% other mnesia-using Erlang applications, such as ejabberd +-export([create_tables/0]). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(status/0 :: () -> [{'nodes' | 'running_nodes', [node()]}]). +-spec(ensure_mnesia_dir/0 :: () -> 'ok'). +-spec(init/0 :: () -> 'ok'). +-spec(is_db_empty/0 :: () -> bool()). +-spec(cluster/1 :: ([node()]) -> 'ok'). +-spec(reset/0 :: () -> 'ok'). +-spec(force_reset/0 :: () -> 'ok'). +-spec(create_tables/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +status() -> + [{nodes, mnesia:system_info(db_nodes)}, + {running_nodes, mnesia:system_info(running_db_nodes)}]. + +init() -> + ok = ensure_mnesia_running(), + ok = ensure_mnesia_dir(), + ok = init_db(read_cluster_nodes_config()), + ok = wait_for_tables(), + ok. + +is_db_empty() -> + lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end, + table_names()). + +%% Alter which disk nodes this node is clustered with. This can be a +%% subset of all the disk nodes in the cluster but can (and should) +%% include the node itself if it is to be a disk rather than a ram +%% node. +cluster(ClusterNodes) -> + ok = ensure_mnesia_not_running(), + ok = ensure_mnesia_dir(), + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + try + ok = init_db(ClusterNodes), + ok = wait_for_tables(), + ok = create_cluster_nodes_config(ClusterNodes) + after + mnesia:stop() + end, + ok. + +%% return node to its virgin state, where it is not member of any +%% cluster, has no cluster configuration, no local database, and no +%% persisted messages +reset() -> reset(false). +force_reset() -> reset(true). + +%%-------------------------------------------------------------------- + +table_definitions() -> + [{user, [{disc_copies, [node()]}, + {attributes, record_info(fields, user)}]}, + {user_vhost, [{type, bag}, + {disc_copies, [node()]}, + {attributes, record_info(fields, user_vhost)}, + {index, [virtual_host]}]}, + {vhost, [{disc_copies, [node()]}, + {attributes, record_info(fields, vhost)}]}, + {vhost_realm, [{type, bag}, + {disc_copies, [node()]}, + {attributes, record_info(fields, vhost_realm)}, + {index, [realm]}]}, + {realm, [{disc_copies, [node()]}, + {attributes, record_info(fields, realm)}]}, + {user_realm, [{type, bag}, + {disc_copies, [node()]}, + {attributes, record_info(fields, user_realm)}, + {index, [realm]}]}, + {exclusive_realm_visitor, + [{record_name, realm_visitor}, + {attributes, record_info(fields, realm_visitor)}, + {index, [pid]}]}, + {realm_visitor, [{type, bag}, + {attributes, record_info(fields, realm_visitor)}, + {index, [pid]}]}, + {rabbit_config, [{disc_copies, [node()]}]}, + {listener, [{type, bag}, + {attributes, record_info(fields, listener)}]}, + {binding, [{attributes, record_info(fields, binding)}]}, + {durable_exchanges, [{disc_copies, [node()]}, + {record_name, exchange}, + {attributes, record_info(fields, exchange)}]}, + {exchange, [{attributes, record_info(fields, exchange)}]}, + {durable_queues, [{disc_copies, [node()]}, + {record_name, amqqueue}, + {attributes, record_info(fields, amqqueue)}]}, + {amqqueue, [{attributes, record_info(fields, amqqueue)}, + {index, [pid]}]}]. + +table_names() -> + [Tab || {Tab, _} <- table_definitions()]. + +ensure_mnesia_dir() -> + MnesiaDir = mnesia:system_info(directory) ++ "/", + case filelib:ensure_dir(MnesiaDir) of + {error, Reason} -> + throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}}); + ok -> ok + end. + +ensure_mnesia_running() -> + case mnesia:system_info(is_running) of + yes -> ok; + no -> throw({error, mnesia_not_running}) + end. + +ensure_mnesia_not_running() -> + case mnesia:system_info(is_running) of + no -> ok; + yes -> throw({error, mnesia_unexpectedly_running}) + end. + +ensure_schema_integrity() -> + case catch lists:foreach(fun (Tab) -> + mnesia:table_info(Tab, version) + end, + table_names()) of + {'EXIT', Reason} -> throw({error, {schema_integrity_check_failed, + Reason}}); + _ -> ok + end, + %%TODO: more thorough checks + ok. + +%% The cluster node config file contains some or all of the disk nodes +%% that are members of the cluster this node is / should be a part of. +%% +%% If the file is absent, the list is empty, or only contains the +%% current node, then the current node is a standalone (disk) +%% node. Otherwise it is a node that is part of a cluster as either a +%% disk node, if it appears in the cluster node config, or ram node if +%% it doesn't. + +cluster_nodes_config_filename() -> + mnesia:system_info(directory) ++ "/cluster_nodes.config". + +create_cluster_nodes_config(ClusterNodes) -> + FileName = cluster_nodes_config_filename(), + Handle = case file:open(FileName, [write]) of + {ok, Device} -> Device; + {error, Reason} -> + throw({error, {cannot_create_cluster_nodes_config, + FileName, Reason}}) + end, + try + ok = io:write(Handle, ClusterNodes), + ok = io:put_chars(Handle, [$.]) + after + case file:close(Handle) of + ok -> ok; + {error, Reason1} -> + throw({error, {cannot_close_cluster_nodes_config, + FileName, Reason1}}) + end + end, + ok. + +read_cluster_nodes_config() -> + FileName = cluster_nodes_config_filename(), + case file:consult(FileName) of + {ok, [ClusterNodes]} -> ClusterNodes; + {error, enoent} -> + case application:get_env(cluster_config) of + undefined -> []; + {ok, DefaultFileName} -> + case file:consult(DefaultFileName) of + {ok, [ClusterNodes]} -> ClusterNodes; + {error, enoent} -> + error_logger:warning_msg( + "default cluster config file ~p does not exist~n", + [DefaultFileName]), + []; + {error, Reason} -> + throw({error, {cannot_read_cluster_nodes_config, + DefaultFileName, Reason}}) + end + end; + {error, Reason} -> + throw({error, {cannot_read_cluster_nodes_config, + FileName, Reason}}) + end. + +delete_cluster_nodes_config() -> + FileName = cluster_nodes_config_filename(), + case file:delete(FileName) of + ok -> ok; + {error, enoent} -> ok; + {error, Reason} -> + throw({error, {cannot_delete_cluster_nodes_config, + FileName, Reason}}) + end. + +%% Take a cluster node config and create the right kind of node - a +%% standalone disk node, or disk or ram node connected to the +%% specified cluster nodes. +init_db(ClusterNodes) -> + WasDiskNode = mnesia:system_info(use_dir), + IsDiskNode = ClusterNodes == [] orelse + lists:member(node(), ClusterNodes), + case mnesia:change_config(extra_db_nodes, ClusterNodes -- [node()]) of + {ok, []} -> + if WasDiskNode and IsDiskNode -> + ok; + WasDiskNode -> + throw({error, {cannot_convert_disk_node_to_ram_node, + ClusterNodes}}); + IsDiskNode -> + ok = create_schema(); + true -> + throw({error, {unable_to_contact_cluster_nodes, + ClusterNodes}}) + end; + {ok, [_|_]} -> + ok = ensure_schema_integrity(), + ok = wait_for_tables(), + ok = create_local_table_copies( + case IsDiskNode of + true -> disc; + false -> ram + end); + {error, Reason} -> + %% one reason we may end up here is if we try to join + %% nodes together that are currently running standalone or + %% are members of a different cluster + throw({error, {unable_to_join_cluster, + ClusterNodes, Reason}}) + end. + +create_schema() -> + mnesia:stop(), + rabbit_misc:ensure_ok(mnesia:create_schema([node()]), + cannot_create_schema), + rabbit_misc:ensure_ok(mnesia:start(), + cannot_start_mnesia), + create_tables(). + +create_tables() -> + lists:foreach(fun ({Tab, TabArgs}) -> + case mnesia:create_table(Tab, TabArgs) of + {atomic, ok} -> ok; + {aborted, Reason} -> + throw({error, {table_creation_failed, + Tab, TabArgs, Reason}}) + end + end, + table_definitions()), + ok. + +create_local_table_copies(Type) -> + ok = if Type /= ram -> create_local_table_copy(schema, disc_copies); + true -> ok + end, + lists:foreach( + fun({Tab, TabDef}) -> + HasDiscCopies = + lists:keymember(disc_copies, 1, TabDef), + HasDiscOnlyCopies = + lists:keymember(disc_only_copies, 1, TabDef), + StorageType = + case Type of + disc -> + if + HasDiscCopies -> disc_copies; + HasDiscOnlyCopies -> disc_only_copies; + true -> ram_copies + end; +%% unused code - commented out to keep dialyzer happy +%% disc_only -> +%% if +%% HasDiscCopies or HasDiscOnlyCopies -> +%% disc_only_copies; +%% true -> ram_copies +%% end; + ram -> + ram_copies + end, + ok = create_local_table_copy(Tab, StorageType) + end, + table_definitions()), + ok = if Type == ram -> create_local_table_copy(schema, ram_copies); + true -> ok + end, + ok. + +create_local_table_copy(Tab, Type) -> + StorageType = mnesia:table_info(Tab, storage_type), + {atomic, ok} = + if + StorageType == unknown -> + mnesia:add_table_copy(Tab, node(), Type); + StorageType /= Type -> + mnesia:change_table_copy_type(Tab, node(), Type); + true -> {atomic, ok} + end, + ok. + +wait_for_tables() -> + case mnesia:wait_for_tables(table_names(), 30000) of + ok -> ok; + {timeout, BadTabs} -> + throw({error, {timeout_waiting_for_tables, BadTabs}}); + {error, Reason} -> + throw({error, {failed_waiting_for_tables, Reason}}) + end. + +reset(Force) -> + ok = ensure_mnesia_not_running(), + Node = node(), + case Force of + true -> ok; + false -> + ok = ensure_mnesia_dir(), + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + {Nodes, RunningNodes} = + try + ok = init(), + {mnesia:system_info(db_nodes) -- [Node], + mnesia:system_info(running_db_nodes) -- [Node]} + after + mnesia:stop() + end, + leave_cluster(Nodes, RunningNodes), + rabbit_misc:ensure_ok(mnesia:delete_schema([Node]), + cannot_delete_schema) + end, + ok = delete_cluster_nodes_config(), + %% remove persistet messages and any other garbage we find + lists:foreach(fun file:delete/1, + filelib:wildcard(mnesia:system_info(directory) ++ "/*")), + ok. + +leave_cluster([], _) -> ok; +leave_cluster(Nodes, RunningNodes) -> + %% find at least one running cluster node and instruct it to + %% remove our schema copy which will in turn result in our node + %% being removed as a cluster node from the schema, with that + %% change being propagated to all nodes + case lists:any( + fun (Node) -> + case rpc:call(Node, mnesia, del_table_copy, + [schema, node()]) of + {atomic, ok} -> true; + {badrpc, nodedown} -> false; + {aborted, Reason} -> + throw({error, {failed_to_leave_cluster, + Nodes, RunningNodes, Reason}}) + end + end, + RunningNodes) of + true -> ok; + false -> throw({error, {no_running_cluster_nodes, + Nodes, RunningNodes}}) + end. |