diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-11 16:30:16 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-11 16:30:16 +0100 |
commit | 8ef9564e3693fc06321e1aaa29e7a55574896723 (patch) | |
tree | 77589ca8363eec16948eeeb8f4acea238a73dcc4 /src/rabbit_mnesia.erl | |
parent | d3755e8322318256d75b7695a12e258615328bea (diff) | |
parent | 7b52fd3115167150618a410ff681edb8b1832236 (diff) | |
download | rabbitmq-server-8ef9564e3693fc06321e1aaa29e7a55574896723.tar.gz |
merge default into bug23104
Diffstat (limited to 'src/rabbit_mnesia.erl')
-rw-r--r-- | src/rabbit_mnesia.erl | 117 |
1 files changed, 88 insertions, 29 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index d9a70797..0ce4eb91 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -91,7 +91,6 @@ init() -> ok = ensure_mnesia_running(), ok = ensure_mnesia_dir(), ok = init_db(read_cluster_nodes_config(), true), - ok = wait_for_tables(), ok. is_db_empty() -> @@ -114,7 +113,6 @@ cluster(ClusterNodes, Force) -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), try ok = init_db(ClusterNodes, Force), - ok = wait_for_tables(), ok = create_cluster_nodes_config(ClusterNodes) after mnesia:stop() @@ -157,50 +155,80 @@ table_definitions() -> [{rabbit_user, [{record_name, user}, {attributes, record_info(fields, user)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #user{_='_'}}]}, {rabbit_user_permission, [{record_name, user_permission}, {attributes, record_info(fields, user_permission)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #user_permission{user_vhost = #user_vhost{_='_'}, + permission = #permission{_='_'}, + _='_'}}]}, {rabbit_vhost, [{record_name, vhost}, {attributes, record_info(fields, vhost)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #vhost{_='_'}}]}, {rabbit_config, [{attributes, [key, val]}, % same mnesia's default - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, {rabbit_config, '_', '_'}}]}, {rabbit_listener, [{record_name, listener}, {attributes, record_info(fields, listener)}, - {type, bag}]}, + {type, bag}, + {match, #listener{_='_'}}]}, {rabbit_durable_route, [{record_name, route}, {attributes, record_info(fields, route)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #route{binding = binding_match(), _='_'}}]}, {rabbit_route, [{record_name, route}, {attributes, record_info(fields, route)}, - {type, ordered_set}]}, + {type, ordered_set}, + {match, #route{binding = binding_match(), _='_'}}]}, {rabbit_reverse_route, [{record_name, reverse_route}, {attributes, record_info(fields, reverse_route)}, - {type, ordered_set}]}, + {type, ordered_set}, + {match, #reverse_route{reverse_binding = reverse_binding_match(), + _='_'}}]}, %% Consider the implications to nodes_of_type/1 before altering %% the next entry. {rabbit_durable_exchange, [{record_name, exchange}, {attributes, record_info(fields, exchange)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #exchange{name = exchange_name_match(), _='_'}}]}, {rabbit_exchange, [{record_name, exchange}, - {attributes, record_info(fields, exchange)}]}, + {attributes, record_info(fields, exchange)}, + {match, #exchange{name = exchange_name_match(), _='_'}}]}, {rabbit_durable_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #amqqueue{name = queue_name_match(), _='_'}}]}, {rabbit_queue, [{record_name, amqqueue}, - {attributes, record_info(fields, amqqueue)}]}]. + {attributes, record_info(fields, amqqueue)}, + {match, #amqqueue{name = queue_name_match(), _='_'}}]}]. + +binding_match() -> + #binding{queue_name = queue_name_match(), + exchange_name = exchange_name_match(), + _='_'}. +reverse_binding_match() -> + #reverse_binding{queue_name = queue_name_match(), + exchange_name = exchange_name_match(), + _='_'}. +exchange_name_match() -> + resource_match(exchange). +queue_name_match() -> + resource_match(queue). +resource_match(Kind) -> + #resource{kind = Kind, _='_'}. table_names() -> [Tab || {Tab, _} <- table_definitions()]. @@ -232,6 +260,14 @@ ensure_mnesia_not_running() -> yes -> throw({error, mnesia_unexpectedly_running}) end. +ensure_schema_integrity() -> + case check_schema_integrity() of + ok -> + ok; + {error, Reason} -> + throw({error, {schema_integrity_check_failed, Reason}}) + end. + check_schema_integrity() -> TabDefs = table_definitions(), Tables = mnesia:system_info(tables), @@ -248,10 +284,33 @@ check_schema_integrity() -> ExpAttrs, Attrs}, Attrs /= ExpAttrs end] of - [] -> ok; + [] -> check_table_integrity(); Errors -> {error, Errors} end. +check_table_integrity() -> + ok = wait_for_tables(), + case lists:all(fun ({Tab, TabDef}) -> + {_, Match} = proplists:lookup(match, TabDef), + read_test_table(Tab, Match) + end, table_definitions()) of + true -> ok; + false -> {error, invalid_table_content} + end. + +read_test_table(Tab, Match) -> + case mnesia:dirty_first(Tab) of + '$end_of_table' -> + true; + Key -> + ObjList = mnesia:dirty_read(Tab, Key), + MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]), + case ets:match_spec_run(ObjList, MatchComp) of + ObjList -> true; + _ -> false + end + end. + %% The cluster node config file contains some or all of the disk nodes %% that are members of the cluster this node is / should be a part of. %% @@ -347,7 +406,9 @@ init_db(ClusterNodes, Force) -> ok = create_local_table_copies(case IsDiskNode of true -> disc; false -> ram - end) + end), + ok = ensure_schema_integrity(), + ok = wait_for_tables() end; {error, Reason} -> %% one reason we may end up here is if we try to join @@ -363,7 +424,9 @@ create_schema() -> cannot_create_schema), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - create_tables(). + ok = create_tables(), + ok = ensure_schema_integrity(), + ok = wait_for_tables(). move_db() -> mnesia:stop(), @@ -389,11 +452,12 @@ move_db() -> create_tables() -> lists:foreach(fun ({Tab, TabDef}) -> - case mnesia:create_table(Tab, TabDef) of + TabDef1 = proplists:delete(match, TabDef), + case mnesia:create_table(Tab, TabDef1) of {atomic, ok} -> ok; {aborted, Reason} -> throw({error, {table_creation_failed, - Tab, TabDef, Reason}}) + Tab, TabDef1, Reason}}) end end, table_definitions()), @@ -448,17 +512,12 @@ wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). wait_for_tables() -> wait_for_tables(table_names()). wait_for_tables(TableNames) -> - case check_schema_integrity() of - ok -> - case mnesia:wait_for_tables(TableNames, 30000) of - ok -> ok; - {timeout, BadTabs} -> - throw({error, {timeout_waiting_for_tables, BadTabs}}); - {error, Reason} -> - throw({error, {failed_waiting_for_tables, Reason}}) - end; + case mnesia:wait_for_tables(TableNames, 30000) of + ok -> ok; + {timeout, BadTabs} -> + throw({error, {timeout_waiting_for_tables, BadTabs}}); {error, Reason} -> - throw({error, {schema_integrity_check_failed, Reason}}) + throw({error, {failed_waiting_for_tables, Reason}}) end. reset(Force) -> |