summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-12 11:50:27 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-12 11:50:27 +0100
commit6572564dcb2952ee9766c61bf6c31fa0c65ce169 (patch)
treeb18cb2c4f0d53aabf479380c20277007807d1521
parenta6a531f2147831521f5a1e2f4811de263a6a78fe (diff)
parentdaeeade587de25ee5401975d0f56a50a097f5a0f (diff)
downloadrabbitmq-server-6572564dcb2952ee9766c61bf6c31fa0c65ce169.tar.gz
Merging bug 23095 into bug 15930
-rw-r--r--packaging/windows/Makefile2
-rw-r--r--src/rabbit_mnesia.erl128
-rw-r--r--src/rabbit_plugin_activator.erl4
-rw-r--r--src/supervisor2.erl63
4 files changed, 128 insertions, 69 deletions
diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile
index b5e3a153..f47b5340 100644
--- a/packaging/windows/Makefile
+++ b/packaging/windows/Makefile
@@ -20,6 +20,8 @@ dist:
mv $(SOURCE_DIR) $(TARGET_DIR)
mkdir -p $(TARGET_DIR)
+ mkdir -p $(TARGET_DIR)/plugins
+ echo Put your .ez plugin files in this directory > $(TARGET_DIR)/plugins/README
xmlto -o . xhtml-nochunks ../../docs/rabbitmq-service.xml
elinks -dump -no-references -no-numbering rabbitmq-service.html \
> $(TARGET_DIR)/readme-service.txt
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 505dc28f..4a5adfae 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -91,7 +91,6 @@ init() ->
ok = ensure_mnesia_running(),
ok = ensure_mnesia_dir(),
ok = init_db(read_cluster_nodes_config(), true),
- ok = wait_for_tables(),
ok.
is_db_empty() ->
@@ -114,7 +113,6 @@ cluster(ClusterNodes, Force) ->
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
try
ok = init_db(ClusterNodes, Force),
- ok = wait_for_tables(),
ok = create_cluster_nodes_config(ClusterNodes)
after
mnesia:stop()
@@ -157,57 +155,87 @@ table_definitions() ->
[{rabbit_user,
[{record_name, user},
{attributes, record_info(fields, user)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #user{_='_'}}]},
{rabbit_user_permission,
[{record_name, user_permission},
{attributes, record_info(fields, user_permission)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #user_permission{user_vhost = #user_vhost{_='_'},
+ permission = #permission{_='_'},
+ _='_'}}]},
{rabbit_vhost,
[{record_name, vhost},
{attributes, record_info(fields, vhost)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #vhost{_='_'}}]},
{rabbit_config,
[{attributes, [key, val]}, % same mnesia's default
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, {rabbit_config, '_', '_'}}]},
{rabbit_listener,
[{record_name, listener},
{attributes, record_info(fields, listener)},
- {type, bag}]},
+ {type, bag},
+ {match, #listener{_='_'}}]},
{rabbit_durable_route,
[{record_name, route},
{attributes, record_info(fields, route)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #route{binding = binding_match(), _='_'}}]},
{rabbit_route,
[{record_name, route},
{attributes, record_info(fields, route)},
- {type, ordered_set}]},
+ {type, ordered_set},
+ {match, #route{binding = binding_match(), _='_'}}]},
{rabbit_reverse_route,
[{record_name, reverse_route},
{attributes, record_info(fields, reverse_route)},
- {type, ordered_set}]},
+ {type, ordered_set},
+ {match, #reverse_route{reverse_binding = reverse_binding_match(),
+ _='_'}}]},
%% Consider the implications to nodes_of_type/1 before altering
%% the next entry.
{rabbit_durable_exchange,
[{record_name, exchange},
{attributes, record_info(fields, exchange)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #exchange{name = exchange_name_match(), _='_'}}]},
{rabbit_exchange,
[{record_name, exchange},
- {attributes, record_info(fields, exchange)}]},
+ {attributes, record_info(fields, exchange)},
+ {match, #exchange{name = exchange_name_match(), _='_'}}]},
{rabbit_durable_queue,
[{record_name, amqqueue},
{attributes, record_info(fields, amqqueue)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #amqqueue{name = queue_name_match(), _='_'}}]},
{rabbit_queue,
[{record_name, amqqueue},
- {attributes, record_info(fields, amqqueue)}]}].
+ {attributes, record_info(fields, amqqueue)},
+ {match, #amqqueue{name = queue_name_match(), _='_'}}]}].
+
+binding_match() ->
+ #binding{queue_name = queue_name_match(),
+ exchange_name = exchange_name_match(),
+ _='_'}.
+reverse_binding_match() ->
+ #reverse_binding{queue_name = queue_name_match(),
+ exchange_name = exchange_name_match(),
+ _='_'}.
+exchange_name_match() ->
+ resource_match(exchange).
+queue_name_match() ->
+ resource_match(queue).
+resource_match(Kind) ->
+ #resource{kind = Kind, _='_'}.
table_names() ->
[Tab || {Tab, _} <- table_definitions()].
replicated_table_names() ->
- [Tab || {Tab, Attrs} <- table_definitions(),
- not lists:member({local_content, true}, Attrs)
+ [Tab || {Tab, TabDef} <- table_definitions(),
+ not lists:member({local_content, true}, TabDef)
].
dir() -> mnesia:system_info(directory).
@@ -232,26 +260,55 @@ ensure_mnesia_not_running() ->
yes -> throw({error, mnesia_unexpectedly_running})
end.
+ensure_schema_integrity() ->
+ case check_schema_integrity() of
+ ok ->
+ ok;
+ {error, Reason} ->
+ throw({error, {schema_integrity_check_failed, Reason}})
+ end.
+
check_schema_integrity() ->
- TabDefs = table_definitions(),
Tables = mnesia:system_info(tables),
- case [Error || Tab <- table_names(),
+ case [Error || {Tab, TabDef} <- table_definitions(),
case lists:member(Tab, Tables) of
false ->
Error = {table_missing, Tab},
true;
true ->
- {_, TabDef} = proplists:lookup(Tab, TabDefs),
{_, ExpAttrs} = proplists:lookup(attributes, TabDef),
Attrs = mnesia:table_info(Tab, attributes),
Error = {table_attributes_mismatch, Tab,
ExpAttrs, Attrs},
Attrs /= ExpAttrs
end] of
- [] -> ok;
+ [] -> check_table_integrity();
Errors -> {error, Errors}
end.
+check_table_integrity() ->
+ ok = wait_for_tables(),
+ case lists:all(fun ({Tab, TabDef}) ->
+ {_, Match} = proplists:lookup(match, TabDef),
+ read_test_table(Tab, Match)
+ end, table_definitions()) of
+ true -> ok;
+ false -> {error, invalid_table_content}
+ end.
+
+read_test_table(Tab, Match) ->
+ case mnesia:dirty_first(Tab) of
+ '$end_of_table' ->
+ true;
+ Key ->
+ ObjList = mnesia:dirty_read(Tab, Key),
+ MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]),
+ case ets:match_spec_run(ObjList, MatchComp) of
+ ObjList -> true;
+ _ -> false
+ end
+ end.
+
%% The cluster node config file contains some or all of the disk nodes
%% that are members of the cluster this node is / should be a part of.
%%
@@ -347,8 +404,9 @@ init_db(ClusterNodes, Force) ->
ok = create_local_table_copies(case IsDiskNode of
true -> disc;
false -> ram
- end)
- end;
+ end),
+ ok = ensure_schema_integrity()
+ end;
{error, Reason} ->
%% one reason we may end up here is if we try to join
%% nodes together that are currently running standalone or
@@ -363,7 +421,9 @@ create_schema() ->
cannot_create_schema),
rabbit_misc:ensure_ok(mnesia:start(),
cannot_start_mnesia),
- create_tables().
+ ok = create_tables(),
+ ok = ensure_schema_integrity(),
+ ok = wait_for_tables().
move_db() ->
mnesia:stop(),
@@ -388,12 +448,13 @@ move_db() ->
ok.
create_tables() ->
- lists:foreach(fun ({Tab, TabArgs}) ->
- case mnesia:create_table(Tab, TabArgs) of
+ lists:foreach(fun ({Tab, TabDef}) ->
+ TabDef1 = proplists:delete(match, TabDef),
+ case mnesia:create_table(Tab, TabDef1) of
{atomic, ok} -> ok;
{aborted, Reason} ->
throw({error, {table_creation_failed,
- Tab, TabArgs, Reason}})
+ Tab, TabDef1, Reason}})
end
end,
table_definitions()),
@@ -448,17 +509,12 @@ wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
wait_for_tables() -> wait_for_tables(table_names()).
wait_for_tables(TableNames) ->
- case check_schema_integrity() of
- ok ->
- case mnesia:wait_for_tables(TableNames, 30000) of
- ok -> ok;
- {timeout, BadTabs} ->
- throw({error, {timeout_waiting_for_tables, BadTabs}});
- {error, Reason} ->
- throw({error, {failed_waiting_for_tables, Reason}})
- end;
+ case mnesia:wait_for_tables(TableNames, 30000) of
+ ok -> ok;
+ {timeout, BadTabs} ->
+ throw({error, {timeout_waiting_for_tables, BadTabs}});
{error, Reason} ->
- throw({error, {schema_integrity_check_failed, Reason}})
+ throw({error, {failed_waiting_for_tables, Reason}})
end.
reset(Force) ->
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index a170fb1d..c9f75be0 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -51,6 +51,7 @@
%%----------------------------------------------------------------------------
start() ->
+ io:format("Activating RabbitMQ plugins ..."),
%% Ensure Rabbit is loaded so we can access it's environment
application:load(rabbit),
@@ -129,8 +130,9 @@ start() ->
ok -> ok;
error -> error("failed to compile boot script file ~s", [ScriptFile])
end,
- io:format("~n~w plugins activated.~n~n", [length(PluginApps)]),
+ io:format("~n~w plugins activated:~n", [length(PluginApps)]),
[io:format("* ~w~n", [App]) || App <- PluginApps],
+ io:nl(),
halt(),
ok.
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 117adde2..4a1c5832 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -540,11 +540,11 @@ do_restart({RestartType, Delay}, Reason, Child, State) ->
[self(), {{RestartType, Delay}, Reason, Child}]),
{ok, NState}
end;
-do_restart(intrinsic, normal, Child, State) ->
- {shutdown, state_del_child(Child, State)};
do_restart(permanent, Reason, Child, State) ->
report_error(child_terminated, Reason, Child, State#state.name),
restart(Child, State);
+do_restart(intrinsic, normal, Child, State) ->
+ {shutdown, state_del_child(Child, State)};
do_restart(_, normal, Child, State) ->
NState = state_del_child(Child, State),
{ok, NState};
@@ -653,14 +653,22 @@ terminate_simple_children(Child, Dynamics, SupName) ->
ok.
do_terminate(Child, SupName) when Child#child.pid =/= undefined ->
- case shutdown(Child#child.pid,
- Child#child.shutdown) of
- ok ->
- Child#child{pid = undefined};
- {error, OtherReason} ->
- report_error(shutdown_error, OtherReason, Child, SupName),
- Child#child{pid = undefined}
- end;
+ ReportError = fun (Reason) ->
+ report_error(shutdown_error, Reason, Child, SupName)
+ end,
+ case shutdown(Child#child.pid, Child#child.shutdown) of
+ ok ->
+ ok;
+ {error, normal} ->
+ case Child#child.restart_type of
+ permanent -> ReportError(normal);
+ {permanent, _Delay} -> ReportError(normal);
+ _ -> ok
+ end;
+ {error, OtherReason} ->
+ ReportError(OtherReason)
+ end,
+ Child#child{pid = undefined};
do_terminate(Child, _SupName) ->
Child.
@@ -680,13 +688,10 @@ shutdown(Pid, brutal_kill) ->
ok ->
exit(Pid, kill),
receive
+ {'DOWN', _MRef, process, Pid, killed} ->
+ ok;
{'DOWN', _MRef, process, Pid, OtherReason} ->
- case OtherReason of
- killed -> ok;
- normal -> ok;
- noproc -> ok;
- _ -> {error, OtherReason}
- end
+ {error, OtherReason}
end;
{error, Reason} ->
{error, Reason}
@@ -698,13 +703,10 @@ shutdown(Pid, Time) ->
ok ->
exit(Pid, shutdown), %% Try to shutdown gracefully
receive
+ {'DOWN', _MRef, process, Pid, shutdown} ->
+ ok;
{'DOWN', _MRef, process, Pid, OtherReason} ->
- case OtherReason of
- shutdown -> ok;
- normal -> ok;
- noproc -> ok;
- _ -> {error, OtherReason}
- end
+ {error, OtherReason}
after Time ->
exit(Pid, kill), %% Force termination.
receive
@@ -730,13 +732,10 @@ monitor_child(Pid) ->
%% If the child dies before the unlik we must empty
%% the mail-box of the 'EXIT'-message and the 'DOWN'-message.
{'EXIT', Pid, Reason} ->
- case Reason of
- normal -> ok;
- _ -> receive
- {'DOWN', _, process, Pid, _} ->
- {error, Reason}
- end
- end
+ receive
+ {'DOWN', _, process, Pid, _} ->
+ {error, Reason}
+ end
after 0 ->
%% If a naughty child did unlink and the child dies before
%% monitor the result will be that shutdown/2 receives a
@@ -854,8 +853,8 @@ supname(N,_) -> N.
%%% {Name, Func, RestartType, Shutdown, ChildType, Modules}
%%% where Name is an atom
%%% Func is {Mod, Fun, Args} == {atom, atom, list}
-%%% RestartType is intrinsic | permanent | temporary |
-%%% transient | {permanent, Delay} |
+%%% RestartType is permanent | temporary | transient |
+%%% intrinsic | {permanent, Delay} |
%%% {transient, Delay} where Delay >= 0
%%% Shutdown = integer() | infinity | brutal_kill
%%% ChildType = supervisor | worker
@@ -902,10 +901,10 @@ validFunc({M, F, A}) when is_atom(M),
is_list(A) -> true;
validFunc(Func) -> throw({invalid_mfa, Func}).
-validRestartType(intrinsic) -> true;
validRestartType(permanent) -> true;
validRestartType(temporary) -> true;
validRestartType(transient) -> true;
+validRestartType(intrinsic) -> true;
validRestartType({permanent, Delay}) -> validDelay(Delay);
validRestartType({transient, Delay}) -> validDelay(Delay);
validRestartType(RestartType) -> throw({invalid_restart_type,