%% The contents of this file are subject to the Mozilla Public License %% Version 1.1 (the "License"); you may not use this file except in %% compliance with the License. You may obtain a copy of the License %% at http://www.mozilla.org/MPL/ %% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and %% limitations under the License. %% %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. %% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_table). -export([create/0, create_local_copy/1, wait_for_replicated/0, wait/1, force_load/0, is_present/0, is_empty/0, check_schema_integrity/0, clear_ram_only_tables/0]). -include("rabbit.hrl"). %%---------------------------------------------------------------------------- -ifdef(use_specs). -spec(create/0 :: () -> 'ok'). -spec(create_local_copy/1 :: ('disc' | 'ram') -> 'ok'). -spec(wait_for_replicated/0 :: () -> 'ok'). -spec(wait/1 :: ([atom()]) -> 'ok'). -spec(force_load/0 :: () -> 'ok'). -spec(is_present/0 :: () -> boolean()). -spec(is_empty/0 :: () -> boolean()). -spec(check_schema_integrity/0 :: () -> rabbit_types:ok_or_error(any())). -spec(clear_ram_only_tables/0 :: () -> 'ok'). -endif. %%---------------------------------------------------------------------------- %% Main interface %%---------------------------------------------------------------------------- create() -> lists:foreach(fun ({Tab, TabDef}) -> TabDef1 = proplists:delete(match, TabDef), case mnesia:create_table(Tab, TabDef1) of {atomic, ok} -> ok; {aborted, Reason} -> throw({error, {table_creation_failed, Tab, TabDef1, Reason}}) end end, definitions()), ok. %% The sequence in which we delete the schema and then the other %% tables is important: if we delete the schema first when moving to %% RAM mnesia will loudly complain since it doesn't make much sense to %% do that. But when moving to disc, we need to move the schema first. create_local_copy(disc) -> create_local_copy(schema, disc_copies), create_local_copies(disc); create_local_copy(ram) -> create_local_copies(ram), create_local_copy(schema, ram_copies). wait_for_replicated() -> wait([Tab || {Tab, TabDef} <- definitions(), not lists:member({local_content, true}, TabDef)]). wait(TableNames) -> case mnesia:wait_for_tables(TableNames, 30000) of ok -> ok; {timeout, BadTabs} -> throw({error, {timeout_waiting_for_tables, BadTabs}}); {error, Reason} -> throw({error, {failed_waiting_for_tables, Reason}}) end. force_load() -> [mnesia:force_load_table(T) || T <- names()], ok. is_present() -> names() -- mnesia:system_info(tables) =:= []. is_empty() -> lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end, names()). check_schema_integrity() -> Tables = mnesia:system_info(tables), case check(fun (Tab, TabDef) -> case lists:member(Tab, Tables) of false -> {error, {table_missing, Tab}}; true -> check_attributes(Tab, TabDef) end end) of ok -> ok = wait(names()), check(fun check_content/2); Other -> Other end. clear_ram_only_tables() -> Node = node(), lists:foreach( fun (TabName) -> case lists:member(Node, mnesia:table_info(TabName, ram_copies)) of true -> {atomic, ok} = mnesia:clear_table(TabName); false -> ok end end, names()), ok. %%-------------------------------------------------------------------- %% Internal helpers %%-------------------------------------------------------------------- create_local_copies(Type) -> lists:foreach( fun ({Tab, TabDef}) -> HasDiscCopies = has_copy_type(TabDef, disc_copies), HasDiscOnlyCopies = has_copy_type(TabDef, disc_only_copies), LocalTab = proplists:get_bool(local_content, TabDef), StorageType = if Type =:= disc orelse LocalTab -> if HasDiscCopies -> disc_copies; HasDiscOnlyCopies -> disc_only_copies; true -> ram_copies end; Type =:= ram -> ram_copies end, ok = create_local_copy(Tab, StorageType) end, definitions(Type)), ok. create_local_copy(Tab, Type) -> StorageType = mnesia:table_info(Tab, storage_type), {atomic, ok} = if StorageType == unknown -> mnesia:add_table_copy(Tab, node(), Type); StorageType /= Type -> mnesia:change_table_copy_type(Tab, node(), Type); true -> {atomic, ok} end, ok. has_copy_type(TabDef, DiscType) -> lists:member(node(), proplists:get_value(DiscType, TabDef, [])). check_attributes(Tab, TabDef) -> {_, ExpAttrs} = proplists:lookup(attributes, TabDef), case mnesia:table_info(Tab, attributes) of ExpAttrs -> ok; Attrs -> {error, {table_attributes_mismatch, Tab, ExpAttrs, Attrs}} end. check_content(Tab, TabDef) -> {_, Match} = proplists:lookup(match, TabDef), case mnesia:dirty_first(Tab) of '$end_of_table' -> ok; Key -> ObjList = mnesia:dirty_read(Tab, Key), MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]), case ets:match_spec_run(ObjList, MatchComp) of ObjList -> ok; _ -> {error, {table_content_invalid, Tab, Match, ObjList}} end end. check(Fun) -> case [Error || {Tab, TabDef} <- definitions(), case Fun(Tab, TabDef) of ok -> Error = none, false; {error, Error} -> true end] of [] -> ok; Errors -> {error, Errors} end. %%-------------------------------------------------------------------- %% Table definitions %%-------------------------------------------------------------------- names() -> [Tab || {Tab, _} <- definitions()]. %% The tables aren't supposed to be on disk on a ram node definitions(disc) -> definitions(); definitions(ram) -> [{Tab, [{disc_copies, []}, {ram_copies, [node()]} | proplists:delete( ram_copies, proplists:delete(disc_copies, TabDef))]} || {Tab, TabDef} <- definitions()]. definitions() -> [{rabbit_user, [{record_name, internal_user}, {attributes, record_info(fields, internal_user)}, {disc_copies, [node()]}, {match, #internal_user{_='_'}}]}, {rabbit_user_permission, [{record_name, user_permission}, {attributes, record_info(fields, user_permission)}, {disc_copies, [node()]}, {match, #user_permission{user_vhost = #user_vhost{_='_'}, permission = #permission{_='_'}, _='_'}}]}, {rabbit_vhost, [{record_name, vhost}, {attributes, record_info(fields, vhost)}, {disc_copies, [node()]}, {match, #vhost{_='_'}}]}, {rabbit_listener, [{record_name, listener}, {attributes, record_info(fields, listener)}, {type, bag}, {match, #listener{_='_'}}]}, {rabbit_durable_route, [{record_name, route}, {attributes, record_info(fields, route)}, {disc_copies, [node()]}, {match, #route{binding = binding_match(), _='_'}}]}, {rabbit_semi_durable_route, [{record_name, route}, {attributes, record_info(fields, route)}, {type, ordered_set}, {match, #route{binding = binding_match(), _='_'}}]}, {rabbit_route, [{record_name, route}, {attributes, record_info(fields, route)}, {type, ordered_set}, {match, #route{binding = binding_match(), _='_'}}]}, {rabbit_reverse_route, [{record_name, reverse_route}, {attributes, record_info(fields, reverse_route)}, {type, ordered_set}, {match, #reverse_route{reverse_binding = reverse_binding_match(), _='_'}}]}, {rabbit_topic_trie_node, [{record_name, topic_trie_node}, {attributes, record_info(fields, topic_trie_node)}, {type, ordered_set}, {match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]}, {rabbit_topic_trie_edge, [{record_name, topic_trie_edge}, {attributes, record_info(fields, topic_trie_edge)}, {type, ordered_set}, {match, #topic_trie_edge{trie_edge = trie_edge_match(), _='_'}}]}, {rabbit_topic_trie_binding, [{record_name, topic_trie_binding}, {attributes, record_info(fields, topic_trie_binding)}, {type, ordered_set}, {match, #topic_trie_binding{trie_binding = trie_binding_match(), _='_'}}]}, {rabbit_durable_exchange, [{record_name, exchange}, {attributes, record_info(fields, exchange)}, {disc_copies, [node()]}, {match, #exchange{name = exchange_name_match(), _='_'}}]}, {rabbit_exchange, [{record_name, exchange}, {attributes, record_info(fields, exchange)}, {match, #exchange{name = exchange_name_match(), _='_'}}]}, {rabbit_exchange_serial, [{record_name, exchange_serial}, {attributes, record_info(fields, exchange_serial)}, {match, #exchange_serial{name = exchange_name_match(), _='_'}}]}, {rabbit_runtime_parameters, [{record_name, runtime_parameters}, {attributes, record_info(fields, runtime_parameters)}, {disc_copies, [node()]}, {match, #runtime_parameters{_='_'}}]}, {rabbit_durable_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, {disc_copies, [node()]}, {match, #amqqueue{name = queue_name_match(), _='_'}}]}, {rabbit_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, {match, #amqqueue{name = queue_name_match(), _='_'}}]}] ++ gm:table_definitions() ++ mirrored_supervisor:table_definitions(). binding_match() -> #binding{source = exchange_name_match(), destination = binding_destination_match(), _='_'}. reverse_binding_match() -> #reverse_binding{destination = binding_destination_match(), source = exchange_name_match(), _='_'}. binding_destination_match() -> resource_match('_'). trie_node_match() -> #trie_node{ exchange_name = exchange_name_match(), _='_'}. trie_edge_match() -> #trie_edge{ exchange_name = exchange_name_match(), _='_'}. trie_binding_match() -> #trie_binding{exchange_name = exchange_name_match(), _='_'}. exchange_name_match() -> resource_match(exchange). queue_name_match() -> resource_match(queue). resource_match(Kind) -> #resource{kind = Kind, _='_'}.