summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-08-11 16:30:16 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-08-11 16:30:16 +0100
commit8ef9564e3693fc06321e1aaa29e7a55574896723 (patch)
tree77589ca8363eec16948eeeb8f4acea238a73dcc4
parentd3755e8322318256d75b7695a12e258615328bea (diff)
parent7b52fd3115167150618a410ff681edb8b1832236 (diff)
downloadrabbitmq-server-8ef9564e3693fc06321e1aaa29e7a55574896723.tar.gz
merge default into bug23104
-rw-r--r--src/rabbit_mnesia.erl117
1 files changed, 88 insertions, 29 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index d9a70797..0ce4eb91 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -91,7 +91,6 @@ init() ->
ok = ensure_mnesia_running(),
ok = ensure_mnesia_dir(),
ok = init_db(read_cluster_nodes_config(), true),
- ok = wait_for_tables(),
ok.
is_db_empty() ->
@@ -114,7 +113,6 @@ cluster(ClusterNodes, Force) ->
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
try
ok = init_db(ClusterNodes, Force),
- ok = wait_for_tables(),
ok = create_cluster_nodes_config(ClusterNodes)
after
mnesia:stop()
@@ -157,50 +155,80 @@ table_definitions() ->
[{rabbit_user,
[{record_name, user},
{attributes, record_info(fields, user)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #user{_='_'}}]},
{rabbit_user_permission,
[{record_name, user_permission},
{attributes, record_info(fields, user_permission)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #user_permission{user_vhost = #user_vhost{_='_'},
+ permission = #permission{_='_'},
+ _='_'}}]},
{rabbit_vhost,
[{record_name, vhost},
{attributes, record_info(fields, vhost)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #vhost{_='_'}}]},
{rabbit_config,
[{attributes, [key, val]}, % same mnesia's default
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, {rabbit_config, '_', '_'}}]},
{rabbit_listener,
[{record_name, listener},
{attributes, record_info(fields, listener)},
- {type, bag}]},
+ {type, bag},
+ {match, #listener{_='_'}}]},
{rabbit_durable_route,
[{record_name, route},
{attributes, record_info(fields, route)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #route{binding = binding_match(), _='_'}}]},
{rabbit_route,
[{record_name, route},
{attributes, record_info(fields, route)},
- {type, ordered_set}]},
+ {type, ordered_set},
+ {match, #route{binding = binding_match(), _='_'}}]},
{rabbit_reverse_route,
[{record_name, reverse_route},
{attributes, record_info(fields, reverse_route)},
- {type, ordered_set}]},
+ {type, ordered_set},
+ {match, #reverse_route{reverse_binding = reverse_binding_match(),
+ _='_'}}]},
%% Consider the implications to nodes_of_type/1 before altering
%% the next entry.
{rabbit_durable_exchange,
[{record_name, exchange},
{attributes, record_info(fields, exchange)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #exchange{name = exchange_name_match(), _='_'}}]},
{rabbit_exchange,
[{record_name, exchange},
- {attributes, record_info(fields, exchange)}]},
+ {attributes, record_info(fields, exchange)},
+ {match, #exchange{name = exchange_name_match(), _='_'}}]},
{rabbit_durable_queue,
[{record_name, amqqueue},
{attributes, record_info(fields, amqqueue)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #amqqueue{name = queue_name_match(), _='_'}}]},
{rabbit_queue,
[{record_name, amqqueue},
- {attributes, record_info(fields, amqqueue)}]}].
+ {attributes, record_info(fields, amqqueue)},
+ {match, #amqqueue{name = queue_name_match(), _='_'}}]}].
+
+binding_match() ->
+ #binding{queue_name = queue_name_match(),
+ exchange_name = exchange_name_match(),
+ _='_'}.
+reverse_binding_match() ->
+ #reverse_binding{queue_name = queue_name_match(),
+ exchange_name = exchange_name_match(),
+ _='_'}.
+exchange_name_match() ->
+ resource_match(exchange).
+queue_name_match() ->
+ resource_match(queue).
+resource_match(Kind) ->
+ #resource{kind = Kind, _='_'}.
table_names() ->
[Tab || {Tab, _} <- table_definitions()].
@@ -232,6 +260,14 @@ ensure_mnesia_not_running() ->
yes -> throw({error, mnesia_unexpectedly_running})
end.
+ensure_schema_integrity() ->
+ case check_schema_integrity() of
+ ok ->
+ ok;
+ {error, Reason} ->
+ throw({error, {schema_integrity_check_failed, Reason}})
+ end.
+
check_schema_integrity() ->
TabDefs = table_definitions(),
Tables = mnesia:system_info(tables),
@@ -248,10 +284,33 @@ check_schema_integrity() ->
ExpAttrs, Attrs},
Attrs /= ExpAttrs
end] of
- [] -> ok;
+ [] -> check_table_integrity();
Errors -> {error, Errors}
end.
+check_table_integrity() ->
+ ok = wait_for_tables(),
+ case lists:all(fun ({Tab, TabDef}) ->
+ {_, Match} = proplists:lookup(match, TabDef),
+ read_test_table(Tab, Match)
+ end, table_definitions()) of
+ true -> ok;
+ false -> {error, invalid_table_content}
+ end.
+
+read_test_table(Tab, Match) ->
+ case mnesia:dirty_first(Tab) of
+ '$end_of_table' ->
+ true;
+ Key ->
+ ObjList = mnesia:dirty_read(Tab, Key),
+ MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]),
+ case ets:match_spec_run(ObjList, MatchComp) of
+ ObjList -> true;
+ _ -> false
+ end
+ end.
+
%% The cluster node config file contains some or all of the disk nodes
%% that are members of the cluster this node is / should be a part of.
%%
@@ -347,7 +406,9 @@ init_db(ClusterNodes, Force) ->
ok = create_local_table_copies(case IsDiskNode of
true -> disc;
false -> ram
- end)
+ end),
+ ok = ensure_schema_integrity(),
+ ok = wait_for_tables()
end;
{error, Reason} ->
%% one reason we may end up here is if we try to join
@@ -363,7 +424,9 @@ create_schema() ->
cannot_create_schema),
rabbit_misc:ensure_ok(mnesia:start(),
cannot_start_mnesia),
- create_tables().
+ ok = create_tables(),
+ ok = ensure_schema_integrity(),
+ ok = wait_for_tables().
move_db() ->
mnesia:stop(),
@@ -389,11 +452,12 @@ move_db() ->
create_tables() ->
lists:foreach(fun ({Tab, TabDef}) ->
- case mnesia:create_table(Tab, TabDef) of
+ TabDef1 = proplists:delete(match, TabDef),
+ case mnesia:create_table(Tab, TabDef1) of
{atomic, ok} -> ok;
{aborted, Reason} ->
throw({error, {table_creation_failed,
- Tab, TabDef, Reason}})
+ Tab, TabDef1, Reason}})
end
end,
table_definitions()),
@@ -448,17 +512,12 @@ wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
wait_for_tables() -> wait_for_tables(table_names()).
wait_for_tables(TableNames) ->
- case check_schema_integrity() of
- ok ->
- case mnesia:wait_for_tables(TableNames, 30000) of
- ok -> ok;
- {timeout, BadTabs} ->
- throw({error, {timeout_waiting_for_tables, BadTabs}});
- {error, Reason} ->
- throw({error, {failed_waiting_for_tables, Reason}})
- end;
+ case mnesia:wait_for_tables(TableNames, 30000) of
+ ok -> ok;
+ {timeout, BadTabs} ->
+ throw({error, {timeout_waiting_for_tables, BadTabs}});
{error, Reason} ->
- throw({error, {schema_integrity_check_failed, Reason}})
+ throw({error, {failed_waiting_for_tables, Reason}})
end.
reset(Force) ->