diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-12 11:50:27 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-12 11:50:27 +0100 |
commit | 6572564dcb2952ee9766c61bf6c31fa0c65ce169 (patch) | |
tree | b18cb2c4f0d53aabf479380c20277007807d1521 | |
parent | a6a531f2147831521f5a1e2f4811de263a6a78fe (diff) | |
parent | daeeade587de25ee5401975d0f56a50a097f5a0f (diff) | |
download | rabbitmq-server-6572564dcb2952ee9766c61bf6c31fa0c65ce169.tar.gz |
Merging bug 23095 into bug 15930
-rw-r--r-- | packaging/windows/Makefile | 2 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 128 | ||||
-rw-r--r-- | src/rabbit_plugin_activator.erl | 4 | ||||
-rw-r--r-- | src/supervisor2.erl | 63 |
4 files changed, 128 insertions, 69 deletions
diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile index b5e3a153..f47b5340 100644 --- a/packaging/windows/Makefile +++ b/packaging/windows/Makefile @@ -20,6 +20,8 @@ dist: mv $(SOURCE_DIR) $(TARGET_DIR) mkdir -p $(TARGET_DIR) + mkdir -p $(TARGET_DIR)/plugins + echo Put your .ez plugin files in this directory > $(TARGET_DIR)/plugins/README xmlto -o . xhtml-nochunks ../../docs/rabbitmq-service.xml elinks -dump -no-references -no-numbering rabbitmq-service.html \ > $(TARGET_DIR)/readme-service.txt diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 505dc28f..4a5adfae 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -91,7 +91,6 @@ init() -> ok = ensure_mnesia_running(), ok = ensure_mnesia_dir(), ok = init_db(read_cluster_nodes_config(), true), - ok = wait_for_tables(), ok. is_db_empty() -> @@ -114,7 +113,6 @@ cluster(ClusterNodes, Force) -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), try ok = init_db(ClusterNodes, Force), - ok = wait_for_tables(), ok = create_cluster_nodes_config(ClusterNodes) after mnesia:stop() @@ -157,57 +155,87 @@ table_definitions() -> [{rabbit_user, [{record_name, user}, {attributes, record_info(fields, user)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #user{_='_'}}]}, {rabbit_user_permission, [{record_name, user_permission}, {attributes, record_info(fields, user_permission)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #user_permission{user_vhost = #user_vhost{_='_'}, + permission = #permission{_='_'}, + _='_'}}]}, {rabbit_vhost, [{record_name, vhost}, {attributes, record_info(fields, vhost)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #vhost{_='_'}}]}, {rabbit_config, [{attributes, [key, val]}, % same mnesia's default - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, {rabbit_config, '_', '_'}}]}, {rabbit_listener, [{record_name, listener}, {attributes, record_info(fields, listener)}, - {type, bag}]}, + {type, bag}, + {match, #listener{_='_'}}]}, {rabbit_durable_route, [{record_name, route}, {attributes, record_info(fields, route)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #route{binding = binding_match(), _='_'}}]}, {rabbit_route, [{record_name, route}, {attributes, record_info(fields, route)}, - {type, ordered_set}]}, + {type, ordered_set}, + {match, #route{binding = binding_match(), _='_'}}]}, {rabbit_reverse_route, [{record_name, reverse_route}, {attributes, record_info(fields, reverse_route)}, - {type, ordered_set}]}, + {type, ordered_set}, + {match, #reverse_route{reverse_binding = reverse_binding_match(), + _='_'}}]}, %% Consider the implications to nodes_of_type/1 before altering %% the next entry. {rabbit_durable_exchange, [{record_name, exchange}, {attributes, record_info(fields, exchange)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #exchange{name = exchange_name_match(), _='_'}}]}, {rabbit_exchange, [{record_name, exchange}, - {attributes, record_info(fields, exchange)}]}, + {attributes, record_info(fields, exchange)}, + {match, #exchange{name = exchange_name_match(), _='_'}}]}, {rabbit_durable_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #amqqueue{name = queue_name_match(), _='_'}}]}, {rabbit_queue, [{record_name, amqqueue}, - {attributes, record_info(fields, amqqueue)}]}]. + {attributes, record_info(fields, amqqueue)}, + {match, #amqqueue{name = queue_name_match(), _='_'}}]}]. + +binding_match() -> + #binding{queue_name = queue_name_match(), + exchange_name = exchange_name_match(), + _='_'}. +reverse_binding_match() -> + #reverse_binding{queue_name = queue_name_match(), + exchange_name = exchange_name_match(), + _='_'}. +exchange_name_match() -> + resource_match(exchange). +queue_name_match() -> + resource_match(queue). +resource_match(Kind) -> + #resource{kind = Kind, _='_'}. table_names() -> [Tab || {Tab, _} <- table_definitions()]. replicated_table_names() -> - [Tab || {Tab, Attrs} <- table_definitions(), - not lists:member({local_content, true}, Attrs) + [Tab || {Tab, TabDef} <- table_definitions(), + not lists:member({local_content, true}, TabDef) ]. dir() -> mnesia:system_info(directory). @@ -232,26 +260,55 @@ ensure_mnesia_not_running() -> yes -> throw({error, mnesia_unexpectedly_running}) end. +ensure_schema_integrity() -> + case check_schema_integrity() of + ok -> + ok; + {error, Reason} -> + throw({error, {schema_integrity_check_failed, Reason}}) + end. + check_schema_integrity() -> - TabDefs = table_definitions(), Tables = mnesia:system_info(tables), - case [Error || Tab <- table_names(), + case [Error || {Tab, TabDef} <- table_definitions(), case lists:member(Tab, Tables) of false -> Error = {table_missing, Tab}, true; true -> - {_, TabDef} = proplists:lookup(Tab, TabDefs), {_, ExpAttrs} = proplists:lookup(attributes, TabDef), Attrs = mnesia:table_info(Tab, attributes), Error = {table_attributes_mismatch, Tab, ExpAttrs, Attrs}, Attrs /= ExpAttrs end] of - [] -> ok; + [] -> check_table_integrity(); Errors -> {error, Errors} end. +check_table_integrity() -> + ok = wait_for_tables(), + case lists:all(fun ({Tab, TabDef}) -> + {_, Match} = proplists:lookup(match, TabDef), + read_test_table(Tab, Match) + end, table_definitions()) of + true -> ok; + false -> {error, invalid_table_content} + end. + +read_test_table(Tab, Match) -> + case mnesia:dirty_first(Tab) of + '$end_of_table' -> + true; + Key -> + ObjList = mnesia:dirty_read(Tab, Key), + MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]), + case ets:match_spec_run(ObjList, MatchComp) of + ObjList -> true; + _ -> false + end + end. + %% The cluster node config file contains some or all of the disk nodes %% that are members of the cluster this node is / should be a part of. %% @@ -347,8 +404,9 @@ init_db(ClusterNodes, Force) -> ok = create_local_table_copies(case IsDiskNode of true -> disc; false -> ram - end) - end; + end), + ok = ensure_schema_integrity() + end; {error, Reason} -> %% one reason we may end up here is if we try to join %% nodes together that are currently running standalone or @@ -363,7 +421,9 @@ create_schema() -> cannot_create_schema), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - create_tables(). + ok = create_tables(), + ok = ensure_schema_integrity(), + ok = wait_for_tables(). move_db() -> mnesia:stop(), @@ -388,12 +448,13 @@ move_db() -> ok. create_tables() -> - lists:foreach(fun ({Tab, TabArgs}) -> - case mnesia:create_table(Tab, TabArgs) of + lists:foreach(fun ({Tab, TabDef}) -> + TabDef1 = proplists:delete(match, TabDef), + case mnesia:create_table(Tab, TabDef1) of {atomic, ok} -> ok; {aborted, Reason} -> throw({error, {table_creation_failed, - Tab, TabArgs, Reason}}) + Tab, TabDef1, Reason}}) end end, table_definitions()), @@ -448,17 +509,12 @@ wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). wait_for_tables() -> wait_for_tables(table_names()). wait_for_tables(TableNames) -> - case check_schema_integrity() of - ok -> - case mnesia:wait_for_tables(TableNames, 30000) of - ok -> ok; - {timeout, BadTabs} -> - throw({error, {timeout_waiting_for_tables, BadTabs}}); - {error, Reason} -> - throw({error, {failed_waiting_for_tables, Reason}}) - end; + case mnesia:wait_for_tables(TableNames, 30000) of + ok -> ok; + {timeout, BadTabs} -> + throw({error, {timeout_waiting_for_tables, BadTabs}}); {error, Reason} -> - throw({error, {schema_integrity_check_failed, Reason}}) + throw({error, {failed_waiting_for_tables, Reason}}) end. reset(Force) -> diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index a170fb1d..c9f75be0 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -51,6 +51,7 @@ %%---------------------------------------------------------------------------- start() -> + io:format("Activating RabbitMQ plugins ..."), %% Ensure Rabbit is loaded so we can access it's environment application:load(rabbit), @@ -129,8 +130,9 @@ start() -> ok -> ok; error -> error("failed to compile boot script file ~s", [ScriptFile]) end, - io:format("~n~w plugins activated.~n~n", [length(PluginApps)]), + io:format("~n~w plugins activated:~n", [length(PluginApps)]), [io:format("* ~w~n", [App]) || App <- PluginApps], + io:nl(), halt(), ok. diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 117adde2..4a1c5832 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -540,11 +540,11 @@ do_restart({RestartType, Delay}, Reason, Child, State) -> [self(), {{RestartType, Delay}, Reason, Child}]), {ok, NState} end; -do_restart(intrinsic, normal, Child, State) -> - {shutdown, state_del_child(Child, State)}; do_restart(permanent, Reason, Child, State) -> report_error(child_terminated, Reason, Child, State#state.name), restart(Child, State); +do_restart(intrinsic, normal, Child, State) -> + {shutdown, state_del_child(Child, State)}; do_restart(_, normal, Child, State) -> NState = state_del_child(Child, State), {ok, NState}; @@ -653,14 +653,22 @@ terminate_simple_children(Child, Dynamics, SupName) -> ok. do_terminate(Child, SupName) when Child#child.pid =/= undefined -> - case shutdown(Child#child.pid, - Child#child.shutdown) of - ok -> - Child#child{pid = undefined}; - {error, OtherReason} -> - report_error(shutdown_error, OtherReason, Child, SupName), - Child#child{pid = undefined} - end; + ReportError = fun (Reason) -> + report_error(shutdown_error, Reason, Child, SupName) + end, + case shutdown(Child#child.pid, Child#child.shutdown) of + ok -> + ok; + {error, normal} -> + case Child#child.restart_type of + permanent -> ReportError(normal); + {permanent, _Delay} -> ReportError(normal); + _ -> ok + end; + {error, OtherReason} -> + ReportError(OtherReason) + end, + Child#child{pid = undefined}; do_terminate(Child, _SupName) -> Child. @@ -680,13 +688,10 @@ shutdown(Pid, brutal_kill) -> ok -> exit(Pid, kill), receive + {'DOWN', _MRef, process, Pid, killed} -> + ok; {'DOWN', _MRef, process, Pid, OtherReason} -> - case OtherReason of - killed -> ok; - normal -> ok; - noproc -> ok; - _ -> {error, OtherReason} - end + {error, OtherReason} end; {error, Reason} -> {error, Reason} @@ -698,13 +703,10 @@ shutdown(Pid, Time) -> ok -> exit(Pid, shutdown), %% Try to shutdown gracefully receive + {'DOWN', _MRef, process, Pid, shutdown} -> + ok; {'DOWN', _MRef, process, Pid, OtherReason} -> - case OtherReason of - shutdown -> ok; - normal -> ok; - noproc -> ok; - _ -> {error, OtherReason} - end + {error, OtherReason} after Time -> exit(Pid, kill), %% Force termination. receive @@ -730,13 +732,10 @@ monitor_child(Pid) -> %% If the child dies before the unlik we must empty %% the mail-box of the 'EXIT'-message and the 'DOWN'-message. {'EXIT', Pid, Reason} -> - case Reason of - normal -> ok; - _ -> receive - {'DOWN', _, process, Pid, _} -> - {error, Reason} - end - end + receive + {'DOWN', _, process, Pid, _} -> + {error, Reason} + end after 0 -> %% If a naughty child did unlink and the child dies before %% monitor the result will be that shutdown/2 receives a @@ -854,8 +853,8 @@ supname(N,_) -> N. %%% {Name, Func, RestartType, Shutdown, ChildType, Modules} %%% where Name is an atom %%% Func is {Mod, Fun, Args} == {atom, atom, list} -%%% RestartType is intrinsic | permanent | temporary | -%%% transient | {permanent, Delay} | +%%% RestartType is permanent | temporary | transient | +%%% intrinsic | {permanent, Delay} | %%% {transient, Delay} where Delay >= 0 %%% Shutdown = integer() | infinity | brutal_kill %%% ChildType = supervisor | worker @@ -902,10 +901,10 @@ validFunc({M, F, A}) when is_atom(M), is_list(A) -> true; validFunc(Func) -> throw({invalid_mfa, Func}). -validRestartType(intrinsic) -> true; validRestartType(permanent) -> true; validRestartType(temporary) -> true; validRestartType(transient) -> true; +validRestartType(intrinsic) -> true; validRestartType({permanent, Delay}) -> validDelay(Delay); validRestartType({transient, Delay}) -> validDelay(Delay); validRestartType(RestartType) -> throw({invalid_restart_type, |