diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-06-17 12:12:16 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-06-17 12:12:16 +0100 |
commit | a851731d708bb6c8b107e6866dba537bcead6afe (patch) | |
tree | 57bbda181f9a1bf8d5774882228ffda6dd3299f2 | |
parent | 91247a346135bc3b7b5cf21e0c285f73ddda28b5 (diff) | |
parent | c00bd9de9db3f6781571bfb8fe416406ec06b9bf (diff) | |
download | rabbitmq-server-a851731d708bb6c8b107e6866dba537bcead6afe.tar.gz |
stable to default
30 files changed, 670 insertions, 304 deletions
diff --git a/docs/rabbitmq-plugins.1.xml b/docs/rabbitmq-plugins.1.xml index 8ecb4fc8..e891969f 100644 --- a/docs/rabbitmq-plugins.1.xml +++ b/docs/rabbitmq-plugins.1.xml @@ -40,6 +40,7 @@ <refsynopsisdiv> <cmdsynopsis> <command>rabbitmq-plugins</command> + <arg choice="opt">-n <replaceable>node</replaceable></arg> <arg choice="req"><replaceable>command</replaceable></arg> <arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg> </cmdsynopsis> @@ -97,12 +98,14 @@ </variablelist> <para> Lists all plugins, their versions, dependencies and - descriptions. Each plugin is prefixed with a status - indicator - [ ] to indicate that the plugin is not - enabled, [E] to indicate that it is explicitly enabled, - [e] to indicate that it is implicitly enabled, and [!] to - indicate that it is enabled but missing and thus not - operational. + descriptions. Each plugin is prefixed with two status + indicator characters inside [ ]. The first indicator can + be " " to indicate that the plugin is not enabled, "E" to + indicate that it is explicitly enabled, "e" to indicate + that it is implicitly enabled, or "!" to indicate that it + is enabled but missing and thus not operational. The + second indicator can be " " to show that the plugin is not + running, or "*" to show that it is. </para> <para> If the optional pattern is given, only plugins whose @@ -130,17 +133,31 @@ </varlistentry> <varlistentry> - <term><cmdsynopsis><command>enable</command> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term> + <term><cmdsynopsis><command>enable</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> + <term>--offline</term> + <listitem><para>Just modify the enabled plugins file.</para></listitem> + </varlistentry> + <varlistentry> + <term>--online</term> + <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem> + </varlistentry> + <varlistentry> <term>plugin</term> <listitem><para>One or more plugins to enable.</para></listitem> </varlistentry> </variablelist> <para> Enables the specified plugins and all their - dependencies. + dependencies. This will update the enabled plugins file + and then attempt to connect to the broker and ensure it is + running all enabled plugins. By default if it is not + possible to connect to the running broker (for example if + it is stopped) then a warning is displayed. Specify + <command>--online</command> or + <command>--offline</command> to change this. </para> <para role="example-prefix">For example:</para> @@ -154,17 +171,31 @@ </varlistentry> <varlistentry> - <term><cmdsynopsis><command>disable</command> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term> + <term><cmdsynopsis><command>disable</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> + <term>--offline</term> + <listitem><para>Just modify the enabled plugins file.</para></listitem> + </varlistentry> + <varlistentry> + <term>--online</term> + <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem> + </varlistentry> + <varlistentry> <term>plugin</term> <listitem><para>One or more plugins to disable.</para></listitem> </varlistentry> </variablelist> <para> - Disables the specified plugins and all plugins that - depend on them. + Disables the specified plugins and all their + dependencies. This will update the enabled plugins file + and then attempt to connect to the broker and ensure it is + running all enabled plugins. By default if it is not + possible to connect to the running broker (for example if + it is stopped) then a warning is displayed. Specify + <command>--online</command> or + <command>--offline</command> to change this. </para> <para role="example-prefix">For example:</para> diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index b0e13b1b..4fad1542 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -213,7 +213,12 @@ %% Explicitly enable/disable hipe compilation. %% - %% {hipe_compile, true} + %% {hipe_compile, true}, + + %% Timeout used when waiting for Mnesia tables in a cluster to + %% become available. + %% + %% {mnesia_table_loading_timeout, 30000} ]}, @@ -257,9 +262,13 @@ %% {certfile, "/path/to/cert.pem"}, %% {keyfile, "/path/to/key.pem"}]}]}, + %% One of 'basic', 'detailed' or 'none'. See + %% http://www.rabbitmq.com/management.html#fine-stats for more details. + %% {rates_mode, basic}, + %% Configure how long aggregated data (such as message rates and queue %% lengths) is retained. Please read the plugin's documentation in - %% https://www.rabbitmq.com/management.html#configuration for more + %% http://www.rabbitmq.com/management.html#configuration for more %% details. %% %% {sample_retention_policies, @@ -268,14 +277,6 @@ %% {detailed, [{10, 5}]}]} ]}, - {rabbitmq_management_agent, - [%% Misc/Advanced Options - %% - %% NB: Change these only if you understand what you are doing! - %% - %% {force_fine_statistics, true} - ]}, - %% ---------------------------------------------------------------------------- %% RabbitMQ Shovel Plugin %% diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 7360208a..3647c04a 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -39,6 +39,7 @@ {server_properties, []}, {collect_statistics, none}, {collect_statistics_interval, 5000}, + {mnesia_table_loading_timeout, 30000}, {auth_mechanisms, ['PLAIN', 'AMQPLAIN']}, {auth_backends, [rabbit_auth_backend_internal]}, {delegate_count, 16}, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 5ac3197e..c1386803 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -39,13 +39,24 @@ -record(resource, {virtual_host, kind, name}). --record(exchange, {name, type, durable, auto_delete, internal, arguments, - scratches, policy, decorators}). --record(exchange_serial, {name, next}). +%% fields described as 'transient' here are cleared when writing to +%% rabbit_durable_<thing> +-record(exchange, { + name, type, durable, auto_delete, internal, arguments, %% immutable + scratches, %% durable, explicitly updated via update_scratch/3 + policy, %% durable, implicitly updated when policy changes + decorators}). %% transient, recalculated in store/1 (i.e. recovery) + +-record(amqqueue, { + name, durable, auto_delete, exclusive_owner = none, %% immutable + arguments, %% immutable + pid, %% durable (just so we know home node) + slave_pids, sync_slave_pids, %% transient + policy, %% durable, implicit update as above + gm_pids, %% transient + decorators}). %% transient, recalculated as above --record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid, slave_pids, sync_slave_pids, policy, - gm_pids, decorators}). +-record(exchange_serial, {name, next}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). @@ -105,9 +116,6 @@ -define(DESIRED_HIBERNATE, 10000). -define(CREDIT_DISC_BOUND, {2000, 500}). -%% This is dictated by `erlang:send_after' on which we depend to implement TTL. --define(MAX_EXPIRY_TIMER, 4294967295). - -define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>). -define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]). -define(DELETED_HEADER, <<"BCC">>). diff --git a/packaging/common/README b/packaging/common/README index 0a29ee27..35a1523a 100644 --- a/packaging/common/README +++ b/packaging/common/README @@ -17,4 +17,4 @@ run as the superuser. An example configuration file is provided in the same directory as this README. Copy it to /etc/rabbitmq/rabbitmq.config to use it. The RabbitMQ server must be restarted after changing the configuration -file or enabling or disabling plugins. +file. @@ -17,7 +17,8 @@ main([NodeStr, ModStr, TrialsStr]) -> case rpc:call(Node, proper, module, [Mod] ++ [[{numtests, Trials}, {constraint_tries, 200}]]) of [] -> ok; - _ -> quit(1) + R -> io:format("~p.~n", [R]), + quit(1) end; {badrpc, Reason} -> io:format("Could not contact node ~p: ~p.~n", [Node, Reason]), diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins index bd7d0b6a..36910eff 100755 --- a/scripts/rabbitmq-plugins +++ b/scripts/rabbitmq-plugins @@ -21,6 +21,7 @@ ##--- Set environment vars RABBITMQ_<var_name> to defaults if not set +[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} [ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE} [ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR} @@ -35,4 +36,5 @@ exec ${ERL_DIR}erl \ -s rabbit_plugins_main \ -enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \ -plugins_dist_dir "$RABBITMQ_PLUGINS_DIR" \ + -nodename $RABBITMQ_NODENAME \ -extra "$@" diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat index a535ebad..61e39e38 100755 --- a/scripts/rabbitmq-plugins.bat +++ b/scripts/rabbitmq-plugins.bat @@ -31,6 +31,10 @@ if "!RABBITMQ_BASE!"=="" ( set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
+if "!RABBITMQ_NODENAME!"=="" (
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
+)
+
if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
@@ -51,7 +55,7 @@ if "!RABBITMQ_PLUGINS_DIR!"=="" ( set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/src/app_utils.erl b/src/app_utils.erl index 0479ce66..e321888d 100644 --- a/src/app_utils.erl +++ b/src/app_utils.erl @@ -17,7 +17,7 @@ -export([load_applications/1, start_applications/1, start_applications/2, stop_applications/1, stop_applications/2, app_dependency_order/2, - wait_for_applications/1]). + wait_for_applications/1, app_dependencies/1]). -ifdef(use_specs). @@ -30,6 +30,7 @@ -spec stop_applications([atom()], error_handler()) -> 'ok'. -spec wait_for_applications([atom()]) -> 'ok'. -spec app_dependency_order([atom()], boolean()) -> [digraph:vertex()]. +-spec app_dependencies(atom()) -> [atom()]. -endif. @@ -74,8 +75,8 @@ wait_for_applications(Apps) -> app_dependency_order(RootApps, StripUnreachable) -> {ok, G} = rabbit_misc:build_acyclic_graph( - fun (App, _Deps) -> [{App, App}] end, - fun (App, Deps) -> [{Dep, App} || Dep <- Deps] end, + fun ({App, _Deps}) -> [{App, App}] end, + fun ({App, Deps}) -> [{Dep, App} || Dep <- Deps] end, [{App, app_dependencies(App)} || {App, _Desc, _Vsn} <- application:loaded_applications()]), try diff --git a/src/rabbit.erl b/src/rabbit.erl index 29e38c1f..4901ea17 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -22,9 +22,8 @@ stop_and_halt/0, await_startup/0, status/0, is_running/0, is_running/1, environment/0, rotate_logs/1, force_event_refresh/1, start_fhc/0]). - -export([start/2, stop/1]). - +-export([start_apps/1, stop_apps/1]). -export([log_location/1]). %% for testing %%--------------------------------------------------------------------------- @@ -211,6 +210,7 @@ %% this really should be an abstract type -type(log_location() :: 'tty' | 'undefined' | file:filename()). -type(param() :: atom()). +-type(app_name() :: atom()). -spec(start/0 :: () -> 'ok'). -spec(boot/0 :: () -> 'ok'). @@ -242,6 +242,8 @@ -spec(maybe_insert_default_data/0 :: () -> 'ok'). -spec(boot_delegate/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). +-spec(start_apps/1 :: ([app_name()]) -> 'ok'). +-spec(stop_apps/1 :: ([app_name()]) -> 'ok'). -endif. @@ -312,9 +314,7 @@ start() -> ok = ensure_working_log_handlers(), rabbit_node_monitor:prepare_cluster_status_files(), rabbit_mnesia:check_cluster_consistency(), - ok = app_utils:start_applications( - app_startup_order(), fun handle_app_error/2), - ok = log_broker_started(rabbit_plugins:active()) + broker_start() end). boot() -> @@ -329,21 +329,14 @@ boot() -> %% the upgrade, since if we are a secondary node the %% primary node will have forgotten us rabbit_mnesia:check_cluster_consistency(), - Plugins = rabbit_plugins:setup(), - ToBeLoaded = Plugins ++ ?APPS, - ok = app_utils:load_applications(ToBeLoaded), - StartupApps = app_utils:app_dependency_order(ToBeLoaded, - false), - ok = app_utils:start_applications( - StartupApps, fun handle_app_error/2), - ok = log_broker_started(Plugins) + broker_start() end). -handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) -> - throw({could_not_start, App, Reason}); - -handle_app_error(App, Reason) -> - throw({could_not_start, App, Reason}). +broker_start() -> + Plugins = rabbit_plugins:setup(), + ToBeLoaded = Plugins ++ ?APPS, + start_apps(ToBeLoaded), + ok = log_broker_started(rabbit_plugins:active()). start_it(StartFun) -> Marker = spawn_link(fun() -> receive stop -> ok end end), @@ -374,7 +367,7 @@ stop() -> _ -> await_startup() end, rabbit_log:info("Stopping RabbitMQ~n"), - ok = app_utils:stop_applications(app_shutdown_order()). + stop_apps(app_shutdown_order()). stop_and_halt() -> try @@ -385,6 +378,36 @@ stop_and_halt() -> end, ok. +start_apps(Apps) -> + app_utils:load_applications(Apps), + OrderedApps = app_utils:app_dependency_order(Apps, false), + case lists:member(rabbit, Apps) of + false -> run_boot_steps(Apps); %% plugin activation + true -> ok %% will run during start of rabbit app + end, + ok = app_utils:start_applications(OrderedApps, + handle_app_error(could_not_start)). + +stop_apps(Apps) -> + ok = app_utils:stop_applications( + Apps, handle_app_error(error_during_shutdown)), + case lists:member(rabbit, Apps) of + false -> run_cleanup_steps(Apps); %% plugin deactivation + true -> ok %% it's all going anyway + end, + ok. + +handle_app_error(Term) -> + fun(App, {bad_return, {_MFA, {'EXIT', {ExitReason, _}}}}) -> + throw({Term, App, ExitReason}); + (App, Reason) -> + throw({Term, App, Reason}) + end. + +run_cleanup_steps(Apps) -> + [run_step(Name, Attrs, cleanup) || {_, Name, Attrs} <- find_steps(Apps)], + ok. + await_startup() -> app_utils:wait_for_applications(app_startup_order()). @@ -468,7 +491,7 @@ start(normal, []) -> true = register(rabbit, self()), print_banner(), log_banner(), - [ok = run_boot_step(Step) || Step <- boot_steps()], + run_boot_steps(), {ok, SupPid}; Error -> Error @@ -496,29 +519,40 @@ app_shutdown_order() -> %%--------------------------------------------------------------------------- %% boot step logic -run_boot_step({_StepName, Attributes}) -> - case [MFA || {mfa, MFA} <- Attributes] of +run_boot_steps() -> + run_boot_steps([App || {App, _, _} <- application:loaded_applications()]). + +run_boot_steps(Apps) -> + [ok = run_step(Step, Attrs, mfa) || {_, Step, Attrs} <- find_steps(Apps)], + ok. + +find_steps(Apps) -> + All = sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)), + [Step || {App, _, _} = Step <- All, lists:member(App, Apps)]. + +run_step(StepName, Attributes, AttributeName) -> + case [MFA || {Key, MFA} <- Attributes, + Key =:= AttributeName] of [] -> ok; MFAs -> [try apply(M,F,A) of - ok -> ok; - {error, Reason} -> boot_error(Reason, not_available) + ok -> ok; + {error, Reason} -> boot_error({boot_step, StepName, Reason}, + not_available) catch - _:Reason -> boot_error(Reason, erlang:get_stacktrace()) + _:Reason -> boot_error({boot_step, StepName, Reason}, + erlang:get_stacktrace()) end || {M,F,A} <- MFAs], ok end. -boot_steps() -> - sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)). - -vertices(_Module, Steps) -> - [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps]. +vertices({AppName, _Module, Steps}) -> + [{StepName, {AppName, StepName, Atts}} || {StepName, Atts} <- Steps]. -edges(_Module, Steps) -> +edges({_AppName, _Module, Steps}) -> [case Key of requires -> {StepName, OtherStep}; enables -> {OtherStep, StepName} @@ -527,7 +561,7 @@ edges(_Module, Steps) -> Key =:= requires orelse Key =:= enables]. sort_boot_steps(UnsortedSteps) -> - case rabbit_misc:build_acyclic_graph(fun vertices/2, fun edges/2, + case rabbit_misc:build_acyclic_graph(fun vertices/1, fun edges/1, UnsortedSteps) of {ok, G} -> %% Use topological sort to find a consistent ordering (if @@ -541,8 +575,8 @@ sort_boot_steps(UnsortedSteps) -> digraph:delete(G), %% Check that all mentioned {M,F,A} triples are exported. case [{StepName, {M,F,A}} || - {StepName, Attributes} <- SortedSteps, - {mfa, {M,F,A}} <- Attributes, + {_App, StepName, Attributes} <- SortedSteps, + {mfa, {M,F,A}} <- Attributes, not erlang:function_exported(M, F, length(A))] of [] -> SortedSteps; MissingFunctions -> basic_boot_error( diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1aba7ecb..8a1d162a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -18,7 +18,7 @@ -export([recover/0, stop/0, start/1, declare/5, declare/6, delete_immediately/1, delete/3, purge/1, forget_all_durable/1]). --export([pseudo_queue/2]). +-export([pseudo_queue/2, immutable/1]). -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, @@ -30,7 +30,7 @@ -export([notify_sent/2, notify_sent_queue_down/1, resume/2]). -export([notify_down_all/2, activate_limit_all/2, credit/5]). -export([on_node_down/1]). --export([update/2, store_queue/1, policy_changed/2]). +-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]). @@ -176,7 +176,9 @@ -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()). +-spec(immutable/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()). -spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(update_decorators/1 :: (name()) -> 'ok'). -spec(policy_changed/2 :: (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -spec(start_mirroring/1 :: (pid()) -> 'ok'). @@ -254,15 +256,16 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> %% effect) this might not be possible to satisfy. declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> ok = check_declare_arguments(QueueName, Args), - Q = rabbit_policy:set(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none, - slave_pids = [], - sync_slave_pids = [], - gm_pids = []}), + Q = rabbit_queue_decorator:set( + rabbit_policy:set(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = [], + sync_slave_pids = [], + gm_pids = []})), Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity). @@ -308,12 +311,24 @@ store_queue(Q = #amqqueue{durable = true}) -> ok = mnesia:write(rabbit_durable_queue, Q#amqqueue{slave_pids = [], sync_slave_pids = [], - gm_pids = []}, write), - ok = mnesia:write(rabbit_queue, Q, write), - ok; + gm_pids = [], + decorators = undefined}, write), + store_queue_ram(Q); store_queue(Q = #amqqueue{durable = false}) -> - ok = mnesia:write(rabbit_queue, Q, write), - ok. + store_queue_ram(Q). + +store_queue_ram(Q) -> + ok = mnesia:write(rabbit_queue, rabbit_queue_decorator:set(Q), write). + +update_decorators(Name) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + case mnesia:wread({rabbit_queue, Name}) of + [Q] -> store_queue_ram(Q), + ok; + [] -> ok + end + end). policy_changed(Q1 = #amqqueue{decorators = Decorators1}, Q2 = #amqqueue{decorators = Decorators2}) -> @@ -709,6 +724,13 @@ pseudo_queue(QueueName, Pid) -> pid = Pid, slave_pids = []}. +immutable(Q) -> Q#amqqueue{pid = none, + slave_pids = none, + sync_slave_pids = none, + gm_pids = none, + policy = none, + decorators = none}. + deliver([], _Delivery, _Flow) -> %% /dev/null optimisation []; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9b785303..97206df3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -385,12 +385,12 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined, V when V > 0 -> V + 999; %% always fire later _ -> 0 end) div 1000, - TRef = erlang:send_after(After, self(), {drop_expired, Version}), + TRef = rabbit_misc:send_after(After, self(), {drop_expired, Version}), State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry}; ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef, ttl_timer_expiry = TExpiry}) when Expiry + 1000 < TExpiry -> - case erlang:cancel_timer(TRef) of + case rabbit_misc:cancel_timer(TRef) of false -> State; _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined}) end; @@ -1165,7 +1165,7 @@ handle_cast({force_event_refresh, Ref}, emit_consumer_created( Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref) end, - noreply(State); + noreply(rabbit_event:init_stats_timer(State, #q.stats_timer)); handle_cast(notify_decorators, State) -> notify_decorators(State), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 74f9cacf..bb626554 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -341,7 +341,7 @@ handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) -> handle_cast({force_event_refresh, Ref}, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State), Ref), - noreply(State); + noreply(rabbit_event:init_stats_timer(State, #ch.stats_timer)); handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) -> %% NB: don't call noreply/1 since we don't want to send confirms. @@ -992,7 +992,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, QueueName, fun (Q) -> ok = rabbit_amqqueue:assert_equivalence( Q, Durable, AutoDelete, Args, Owner), - rabbit_amqqueue:stat(Q) + maybe_stat(NoWait, Q) end) of {ok, MessageCount, ConsumerCount} -> return_queue_declare_ok(QueueName, NoWait, MessageCount, @@ -1048,7 +1048,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} = rabbit_amqqueue:with_or_die( - QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end), + QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end), ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid), return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); @@ -1204,6 +1204,9 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, E end. +maybe_stat(false, Q) -> rabbit_amqqueue:stat(Q); +maybe_stat(true, _Q) -> {ok, 0, 0}. + consumer_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, queue_monitors = QMons, diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl index 81c17fbf..db9349ac 100644 --- a/src/rabbit_channel_interceptor.erl +++ b/src/rabbit_channel_interceptor.erl @@ -33,7 +33,7 @@ -callback description() -> [proplists:property()]. -callback intercept(original_method(), rabbit_types:vhost()) -> - rabbit_types:ok_or_error2(processed_method(), any()). + processed_method() | rabbit_misc:channel_or_connection_exit(). %% Whether the interceptor wishes to intercept the amqp method -callback applies_to(intercept_method()) -> boolean(). @@ -62,20 +62,15 @@ intercept_method(M, VHost) -> intercept_method(M, _VHost, []) -> M; intercept_method(M, VHost, [I]) -> - case I:intercept(M, VHost) of - {ok, M2} -> - case validate_method(M, M2) of - true -> - M2; - _ -> - internal_error("Interceptor: ~p expected " - "to return method: ~p but returned: ~p", - [I, rabbit_misc:method_record_type(M), - rabbit_misc:method_record_type(M2)]) - end; - {error, Reason} -> - internal_error("Interceptor: ~p failed with reason: ~p", - [I, Reason]) + M2 = I:intercept(M, VHost), + case validate_method(M, M2) of + true -> + M2; + _ -> + internal_error("Interceptor: ~p expected " + "to return method: ~p but returned: ~p", + [I, rabbit_misc:method_record_type(M), + rabbit_misc:method_record_type(M2)]) end; intercept_method(M, _VHost, Is) -> internal_error("More than one interceptor for method: ~p -- ~p", diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index ec32e687..728bc431 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -129,6 +129,9 @@ is_cycle(Queue, Deaths) -> {longstr, <<"rejected">>} =/= rabbit_misc:table_lookup(D, <<"reason">>); (_) -> + %% There was something we didn't expect, therefore + %% a client must have put it there, therefore the + %% cycle was not "fully automatic". false end, Cycle ++ [H]) end. diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index b867223b..a33103fd 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -23,6 +23,7 @@ ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]). -export([stats_level/2, if_enabled/3]). -export([notify/2, notify/3, notify_if/3]). +-export([sync_notify/2, sync_notify/3]). %%---------------------------------------------------------------------------- @@ -61,6 +62,9 @@ -spec(notify/2 :: (event_type(), event_props()) -> 'ok'). -spec(notify/3 :: (event_type(), event_props(), reference() | 'none') -> 'ok'). -spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok'). +-spec(sync_notify/2 :: (event_type(), event_props()) -> 'ok'). +-spec(sync_notify/3 :: (event_type(), event_props(), + reference() | 'none') -> 'ok'). -endif. @@ -145,7 +149,16 @@ notify_if(false, _Type, _Props) -> ok. notify(Type, Props) -> notify(Type, Props, none). notify(Type, Props, Ref) -> - gen_event:notify(?MODULE, #event{type = Type, - props = Props, - reference = Ref, - timestamp = os:timestamp()}). + gen_event:notify(?MODULE, event_cons(Type, Props, Ref)). + +sync_notify(Type, Props) -> sync_notify(Type, Props, none). + +sync_notify(Type, Props, Ref) -> + gen_event:sync_notify(?MODULE, event_cons(Type, Props, Ref)). + +event_cons(Type, Props, Ref) -> + #event{type = Type, + props = Props, + reference = Ref, + timestamp = os:timestamp()}. + diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 4d4a2a58..a1772f0a 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -20,7 +20,8 @@ -export([recover/0, policy_changed/2, callback/4, declare/6, assert_equivalence/6, assert_args_equivalence/2, check_type/1, - lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3, + lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2, + update_scratch/3, update_decorators/1, immutable/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, route/2, delete/2, validate_binding/2]). %% these must be run inside a mnesia tx @@ -61,6 +62,7 @@ -spec(lookup_or_die/1 :: (name()) -> rabbit_types:exchange() | rabbit_types:channel_exit()). +-spec(list/0 :: () -> [rabbit_types:exchange()]). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]). -spec(lookup_scratch/2 :: (name(), atom()) -> rabbit_types:ok(term()) | @@ -70,6 +72,8 @@ (name(), fun((rabbit_types:exchange()) -> rabbit_types:exchange())) -> not_found | rabbit_types:exchange()). +-spec(update_decorators/1 :: (name()) -> 'ok'). +-spec(immutable/1 :: (rabbit_types:exchange()) -> rabbit_types:exchange()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()). -spec(info/2 :: @@ -106,24 +110,15 @@ recover() -> mnesia:read({rabbit_exchange, XName}) =:= [] end, fun (X, Tx) -> - case Tx of - true -> store(X); - false -> ok - end, - callback(X, create, map_create_tx(Tx), [X]) + X1 = case Tx of + true -> store_ram(X); + false -> rabbit_exchange_decorator:set(X) + end, + callback(X1, create, map_create_tx(Tx), [X1]) end, rabbit_durable_exchange), - report_missing_decorators(Xs), [XName || #exchange{name = XName} <- Xs]. -report_missing_decorators(Xs) -> - Mods = lists:usort(lists:append([rabbit_exchange_decorator:select(raw, D) || - #exchange{decorators = D} <- Xs])), - case [M || M <- Mods, code:which(M) =:= non_existing] of - [] -> ok; - M -> rabbit_log:warning("Missing exchange decorators: ~p~n", [M]) - end. - callback(X = #exchange{type = XType, decorators = Decorators}, Fun, Serial0, Args) -> Serial = if is_function(Serial0) -> Serial0; @@ -158,12 +153,13 @@ serial(#exchange{name = XName} = X) -> end. declare(XName, Type, Durable, AutoDelete, Internal, Args) -> - X = rabbit_policy:set(#exchange{name = XName, - type = Type, - durable = Durable, - auto_delete = AutoDelete, - internal = Internal, - arguments = Args}), + X = rabbit_exchange_decorator:set( + rabbit_policy:set(#exchange{name = XName, + type = Type, + durable = Durable, + auto_delete = AutoDelete, + internal = Internal, + arguments = Args})), XT = type_to_module(Type), %% We want to upset things if it isn't ok ok = XT:validate(X), @@ -171,13 +167,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> fun () -> case mnesia:wread({rabbit_exchange, XName}) of [] -> - store(X), - ok = case Durable of - true -> mnesia:write(rabbit_durable_exchange, - X, write); - false -> ok - end, - {new, X}; + {new, store(X)}; [ExistingX] -> {existing, ExistingX} end @@ -195,7 +185,19 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> map_create_tx(true) -> transaction; map_create_tx(false) -> none. -store(X) -> ok = mnesia:write(rabbit_exchange, X, write). + +store(X = #exchange{durable = true}) -> + mnesia:write(rabbit_durable_exchange, X#exchange{decorators = undefined}, + write), + store_ram(X); +store(X = #exchange{durable = false}) -> + store_ram(X). + +store_ram(X) -> + X1 = rabbit_exchange_decorator:set(X), + ok = mnesia:write(rabbit_exchange, rabbit_exchange_decorator:set(X1), + write), + X1. %% Used with binaries sent over the wire; the type may not exist. check_type(TypeBin) -> @@ -243,6 +245,8 @@ lookup_or_die(Name) -> {error, not_found} -> rabbit_misc:not_found(Name) end. +list() -> mnesia:dirty_match_object(rabbit_exchange, #exchange{_ = '_'}). + %% Not dirty_match_object since that would not be transactional when used in a %% tx context list(VHostPath) -> @@ -287,20 +291,27 @@ update_scratch(Name, App, Fun) -> ok end). +update_decorators(Name) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + case mnesia:wread({rabbit_exchange, Name}) of + [X] -> store_ram(X), + ok; + [] -> ok + end + end). + update(Name, Fun) -> case mnesia:wread({rabbit_exchange, Name}) of - [X = #exchange{durable = Durable}] -> - X1 = Fun(X), - ok = mnesia:write(rabbit_exchange, X1, write), - case Durable of - true -> ok = mnesia:write(rabbit_durable_exchange, X1, write); - _ -> ok - end, - X1; - [] -> - not_found + [X] -> X1 = Fun(X), + store(X1); + [] -> not_found end. +immutable(X) -> X#exchange{scratches = none, + policy = none, + decorators = none}. + info_keys() -> ?INFO_KEYS. map(VHostPath, F) -> diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl index 2f056b1b..900f9c32 100644 --- a/src/rabbit_exchange_decorator.erl +++ b/src/rabbit_exchange_decorator.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([select/2, set/1]). +-export([select/2, set/1, register/2, unregister/1]). %% This is like an exchange type except that: %% @@ -104,3 +104,25 @@ list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)]. cons_if_eq(Select, Select, Item, List) -> [Item | List]; cons_if_eq(_Select, _Other, _Item, List) -> List. + +register(TypeName, ModuleName) -> + rabbit_registry:register(exchange_decorator, TypeName, ModuleName), + [maybe_recover(X) || X <- rabbit_exchange:list()], + ok. + +unregister(TypeName) -> + rabbit_registry:unregister(exchange_decorator, TypeName), + [maybe_recover(X) || X <- rabbit_exchange:list()], + ok. + +maybe_recover(X = #exchange{name = Name, + decorators = Decs}) -> + #exchange{decorators = Decs1} = set(X), + Old = lists:sort(select(all, Decs)), + New = lists:sort(select(all, Decs1)), + case New of + Old -> ok; + _ -> %% TODO create a tx here for non-federation decorators + [M:create(none, X) || M <- New -- Old], + rabbit_exchange:update_decorators(Name) + end. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 2b16b911..24b22d4c 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -170,10 +170,24 @@ terminate({shutdown, dropped} = Reason, State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}; terminate(Reason, - State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + State = #state { name = QName, + backing_queue = BQ, + backing_queue_state = BQS }) -> %% Backing queue termination. The queue is going down but %% shouldn't be deleted. Most likely safe shutdown of this - %% node. Thus just let some other slave take over. + %% node. + {ok, Q = #amqqueue{sync_slave_pids = SSPids}} = + rabbit_amqqueue:lookup(QName), + case SSPids =:= [] andalso + rabbit_policy:get(<<"ha-promote-on-shutdown">>, Q) =/= <<"always">> of + true -> %% Remove the whole queue to avoid data loss + rabbit_mirror_queue_misc:log_warning( + QName, "Stopping all nodes on master shutdown since no " + "synchronised slave is available~n", []), + stop_all_slaves(Reason, State); + false -> %% Just let some other slave take over. + ok + end, State #state { backing_queue_state = BQ:terminate(Reason, BQS) }. delete_and_terminate(Reason, State = #state { backing_queue = BQ, @@ -181,7 +195,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ, stop_all_slaves(Reason, State), State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}. -stop_all_slaves(Reason, #state{name = QName, gm = GM}) -> +stop_all_slaves(Reason, #state{name = QName, gm = GM}) -> {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), MRefs = [erlang:monitor(process, Pid) || Pid <- [GM | SPids]], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index b0f092a9..7aec1ac8 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -29,16 +29,19 @@ -include("rabbit.hrl"). --rabbit_boot_step({?MODULE, - [{description, "HA policy validation"}, - {mfa, {rabbit_registry, register, - [policy_validator, <<"ha-mode">>, ?MODULE]}}, - {mfa, {rabbit_registry, register, - [policy_validator, <<"ha-params">>, ?MODULE]}}, - {mfa, {rabbit_registry, register, - [policy_validator, <<"ha-sync-mode">>, ?MODULE]}}, - {requires, rabbit_registry}, - {enables, recovery}]}). +-rabbit_boot_step( + {?MODULE, + [{description, "HA policy validation"}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-mode">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-params">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-sync-mode">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, recovery}]}). %%---------------------------------------------------------------------------- @@ -374,16 +377,21 @@ validate_policy(KeyList) -> Mode = proplists:get_value(<<"ha-mode">>, KeyList, none), Params = proplists:get_value(<<"ha-params">>, KeyList, none), SyncMode = proplists:get_value(<<"ha-sync-mode">>, KeyList, none), - case {Mode, Params, SyncMode} of - {none, none, none} -> + PromoteOnShutdown = proplists:get_value( + <<"ha-promote-on-shutdown">>, KeyList, none), + case {Mode, Params, SyncMode, PromoteOnShutdown} of + {none, none, none, none} -> ok; - {none, _, _} -> - {error, "ha-mode must be specified to specify ha-params or " - "ha-sync-mode", []}; + {none, _, _, _} -> + {error, "ha-mode must be specified to specify ha-params, " + "ha-sync-mode or ha-promote-on-shutdown", []}; _ -> case module(Mode) of {ok, M} -> case M:validate_policy(Params) of - ok -> validate_sync_mode(SyncMode); + ok -> case validate_sync_mode(SyncMode) of + ok -> validate_pos(PromoteOnShutdown); + E -> E + end; E -> E end; _ -> {error, "~p is not a valid ha-mode value", [Mode]} @@ -398,3 +406,12 @@ validate_sync_mode(SyncMode) -> Mode -> {error, "ha-sync-mode must be \"manual\" " "or \"automatic\", got ~p", [Mode]} end. + +validate_pos(PromoteOnShutdown) -> + case PromoteOnShutdown of + <<"always">> -> ok; + <<"when-synced">> -> ok; + none -> ok; + Mode -> {error, "ha-promote-on-shutdown must be " + "\"always\" or \"when-synced\", got ~p", [Mode]} + end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 11d6a79c..cc06ae44 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -653,8 +653,9 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) -> timed -> {ensure_sync_timer(State1), 0 } end. -backing_queue_timeout(State = #state { backing_queue = BQ }) -> - run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State). +backing_queue_timeout(State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + State#state{backing_queue_state = BQ:timeout(BQS)}. ensure_sync_timer(State) -> rabbit_misc:ensure_timer(State, #state.sync_timer_ref, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 58e93a3f..6f353da5 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -67,7 +67,7 @@ -export([check_expiry/1]). -export([base64url/1]). -export([interval_operation/4]). --export([ensure_timer/4, stop_timer/2]). +-export([ensure_timer/4, stop_timer/2, send_after/3, cancel_timer/1]). -export([get_parent/0]). -export([store_proc_name/1, store_proc_name/2]). -export([moving_average/4]). @@ -81,7 +81,7 @@ -ifdef(use_specs). --export_type([resource_name/0, thunk/1]). +-export_type([resource_name/0, thunk/1, channel_or_connection_exit/0]). -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). @@ -94,6 +94,7 @@ fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])). -type(graph_edge_fun() :: fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])). +-type(tref() :: {'erlang', reference()} | {timer, timer:tref()}). -spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) -> rabbit_framing:amqp_method_name()). @@ -209,7 +210,8 @@ [string()]) -> {'ok', {atom(), [{string(), string()}], [string()]}} | 'no_command'). --spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]). +-spec(all_module_attributes/1 :: + (atom()) -> [{atom(), atom(), [term()]}]). -spec(build_acyclic_graph/3 :: (graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}]) -> rabbit_types:ok_or_error2(digraph(), @@ -245,6 +247,8 @@ -> {any(), non_neg_integer()}). -spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A). -spec(stop_timer/2 :: (A, non_neg_integer()) -> A). +-spec(send_after/3 :: (non_neg_integer(), pid(), any()) -> tref()). +-spec(cancel_timer/1 :: (tref()) -> 'ok'). -spec(get_parent/0 :: () -> pid()). -spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok). -spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok). @@ -849,20 +853,20 @@ module_attributes(Module) -> end. all_module_attributes(Name) -> - Modules = + Targets = lists:usort( lists:append( - [Modules || {App, _, _} <- application:loaded_applications(), - {ok, Modules} <- [application:get_key(App, modules)]])), + [[{App, Module} || Module <- Modules] || + {App, _, _} <- application:loaded_applications(), + {ok, Modules} <- [application:get_key(App, modules)]])), lists:foldl( - fun (Module, Acc) -> + fun ({App, Module}, Acc) -> case lists:append([Atts || {N, Atts} <- module_attributes(Module), N =:= Name]) of [] -> Acc; - Atts -> [{Module, Atts} | Acc] + Atts -> [{App, Module, Atts} | Acc] end - end, [], Modules). - + end, [], Targets). build_acyclic_graph(VertexFun, EdgeFun, Graph) -> G = digraph:new([acyclic]), @@ -870,13 +874,13 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) -> [case digraph:vertex(G, Vertex) of false -> digraph:add_vertex(G, Vertex, Label); _ -> ok = throw({graph_error, {vertex, duplicate, Vertex}}) - end || {Module, Atts} <- Graph, - {Vertex, Label} <- VertexFun(Module, Atts)], + end || GraphElem <- Graph, + {Vertex, Label} <- VertexFun(GraphElem)], [case digraph:add_edge(G, From, To) of {error, E} -> throw({graph_error, {edge, E, From, To}}); _ -> ok - end || {Module, Atts} <- Graph, - {From, To} <- EdgeFun(Module, Atts)], + end || GraphElem <- Graph, + {From, To} <- EdgeFun(GraphElem)], {ok, G} catch {graph_error, Reason} -> true = digraph:delete(G), @@ -1012,7 +1016,6 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse V =:= true orelse V =:= false -> V. -check_expiry(N) when N > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, N}}; check_expiry(N) when N < 0 -> {error, {value_negative, N}}; check_expiry(_N) -> ok. @@ -1040,7 +1043,7 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) -> ensure_timer(State, Idx, After, Msg) -> case element(Idx, State) of - undefined -> TRef = erlang:send_after(After, self(), Msg), + undefined -> TRef = send_after(After, self(), Msg), setelement(Idx, State, TRef); _ -> State end. @@ -1048,12 +1051,25 @@ ensure_timer(State, Idx, After, Msg) -> stop_timer(State, Idx) -> case element(Idx, State) of undefined -> State; - TRef -> case erlang:cancel_timer(TRef) of - false -> State; - _ -> setelement(Idx, State, undefined) - end + TRef -> cancel_timer(TRef), + setelement(Idx, State, undefined) end. +%% timer:send_after/3 goes through a single timer process but allows +%% long delays. erlang:send_after/3 does not have a bottleneck but +%% only allows max 2^32-1 millis. +-define(MAX_ERLANG_SEND_AFTER, 4294967295). +send_after(Millis, Pid, Msg) when Millis > ?MAX_ERLANG_SEND_AFTER -> + {ok, Ref} = timer:send_after(Millis, Pid, Msg), + {timer, Ref}; +send_after(Millis, Pid, Msg) -> + {erlang, erlang:send_after(Millis, Pid, Msg)}. + +cancel_timer({erlang, Ref}) -> erlang:cancel_timer(Ref), + ok; +cancel_timer({timer, Ref}) -> {ok, cancel} = timer:cancel(Ref), + ok. + store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}). store_proc_name(TypeProcName) -> put(process_name, TypeProcName). diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index c0fb05e2..7817626c 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -18,6 +18,7 @@ -include("rabbit.hrl"). -export([setup/0, active/0, read_enabled/1, list/1, dependencies/3]). +-export([ensure/1]). %%---------------------------------------------------------------------------- @@ -31,22 +32,54 @@ -spec(read_enabled/1 :: (file:filename()) -> [plugin_name()]). -spec(dependencies/3 :: (boolean(), [plugin_name()], [#plugin{}]) -> [plugin_name()]). - +-spec(ensure/1 :: (string()) -> {'ok', [atom()], [atom()]} | {error, any()}). -endif. %%---------------------------------------------------------------------------- +ensure(FileJustChanged) -> + {ok, OurFile} = application:get_env(rabbit, enabled_plugins_file), + case OurFile of + FileJustChanged -> + {ok, Dir} = application:get_env(rabbit, plugins_dir), + Enabled = read_enabled(OurFile), + Wanted = dependencies(false, Enabled, list(Dir)), + prepare_plugins(Enabled), + Current = active(), + Start = Wanted -- Current, + Stop = Current -- Wanted, + rabbit:start_apps(Start), + %% We need sync_notify here since mgmt will attempt to look at all + %% the modules for the disabled plugins - if they are unloaded + %% that won't work. + ok = rabbit_event:sync_notify(plugins_changed, [{enabled, Start}, + {disabled, Stop}]), + rabbit:stop_apps(Stop), + clean_plugins(Stop), + {ok, Start, Stop}; + _ -> + {error, {enabled_plugins_mismatch, FileJustChanged, OurFile}} + end. + %% @doc Prepares the file system and installs all enabled plugins. setup() -> - {ok, PluginDir} = application:get_env(rabbit, plugins_dir), {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), + + %% Eliminate the contents of the destination directory + case delete_recursively(ExpandDir) of + ok -> ok; + {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir, + [ExpandDir, E1]}}) + end, + {ok, EnabledFile} = application:get_env(rabbit, enabled_plugins_file), - prepare_plugins(EnabledFile, PluginDir, ExpandDir). + Enabled = read_enabled(EnabledFile), + prepare_plugins(Enabled). %% @doc Lists the plugins which are currently running. active() -> {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), - InstalledPlugins = [ P#plugin.name || P <- list(ExpandDir) ], + InstalledPlugins = plugin_names(list(ExpandDir)), [App || {App, _, _} <- rabbit_misc:which_applications(), lists:member(App, InstalledPlugins)]. @@ -67,7 +100,7 @@ list(PluginsDir) -> _ -> error_logger:warning_msg( "Problem reading some plugins: ~p~n", [Problems]) end, - Plugins. + ensure_dependencies(Plugins). %% @doc Read the list of enabled plugins from the supplied term file. read_enabled(PluginsFile) -> @@ -86,15 +119,10 @@ read_enabled(PluginsFile) -> %% the resulting list, otherwise they're skipped. dependencies(Reverse, Sources, AllPlugins) -> {ok, G} = rabbit_misc:build_acyclic_graph( - fun (App, _Deps) -> [{App, App}] end, - fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end, - lists:ukeysort( - 1, [{Name, Deps} || - #plugin{name = Name, - dependencies = Deps} <- AllPlugins] ++ - [{Dep, []} || - #plugin{dependencies = Deps} <- AllPlugins, - Dep <- Deps])), + fun ({App, _Deps}) -> [{App, App}] end, + fun ({App, Deps}) -> [{App, Dep} || Dep <- Deps] end, + [{Name, Deps} || #plugin{name = Name, + dependencies = Deps} <- AllPlugins]), Dests = case Reverse of false -> digraph_utils:reachable(Sources, G); true -> digraph_utils:reaching(Sources, G) @@ -102,11 +130,35 @@ dependencies(Reverse, Sources, AllPlugins) -> true = digraph:delete(G), Dests. +%% Make sure we don't list OTP apps in here, and also that we create +%% fake plugins for missing dependencies. +ensure_dependencies(Plugins) -> + Names = plugin_names(Plugins), + NotThere = [Dep || #plugin{dependencies = Deps} <- Plugins, + Dep <- Deps, + not lists:member(Dep, Names)], + {OTP, Missing} = lists:partition(fun is_loadable/1, NotThere), + Plugins1 = [P#plugin{dependencies = Deps -- OTP} + || P = #plugin{dependencies = Deps} <- Plugins], + Fake = [#plugin{name = Name, + dependencies = []}|| Name <- Missing], + Plugins1 ++ Fake. + +is_loadable(App) -> + case application:load(App) of + {error, {already_loaded, _}} -> true; + ok -> application:unload(App), + true; + _ -> false + end. + %%---------------------------------------------------------------------------- -prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) -> +prepare_plugins(Enabled) -> + {ok, PluginsDistDir} = application:get_env(rabbit, plugins_dir), + {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), + AllPlugins = list(PluginsDistDir), - Enabled = read_enabled(EnabledFile), ToUnpack = dependencies(false, Enabled, AllPlugins), ToUnpackPlugins = lookup_plugins(ToUnpack, AllPlugins), @@ -117,12 +169,6 @@ prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) -> [Missing]) end, - %% Eliminate the contents of the destination directory - case delete_recursively(ExpandDir) of - ok -> ok; - {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir, - [ExpandDir, E1]}}) - end, case filelib:ensure_dir(ExpandDir ++ "/") of ok -> ok; {error, E2} -> throw({error, {cannot_create_plugins_expand_dir, @@ -134,6 +180,20 @@ prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) -> [prepare_dir_plugin(PluginAppDescPath) || PluginAppDescPath <- filelib:wildcard(ExpandDir ++ "/*/ebin/*.app")]. +clean_plugins(Plugins) -> + {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), + [clean_plugin(Plugin, ExpandDir) || Plugin <- Plugins]. + +clean_plugin(Plugin, ExpandDir) -> + {ok, Mods} = application:get_key(Plugin, modules), + application:unload(Plugin), + [begin + code:soft_purge(Mod), + code:delete(Mod), + false = code:is_loaded(Mod) + end || Mod <- Mods], + delete_recursively(rabbit_misc:format("~s/~s", [ExpandDir, Plugin])). + prepare_dir_plugin(PluginAppDescPath) -> code:add_path(filename:dirname(PluginAppDescPath)), list_to_atom(filename:basename(PluginAppDescPath, ".app")). @@ -172,8 +232,7 @@ plugin_info(Base, {app, App0}) -> mkplugin(Name, Props, Type, Location) -> Version = proplists:get_value(vsn, Props, "0"), Description = proplists:get_value(description, Props, ""), - Dependencies = - filter_applications(proplists:get_value(applications, Props, [])), + Dependencies = proplists:get_value(applications, Props, []), #plugin{name = Name, version = Version, description = Description, dependencies = Dependencies, location = Location, type = Type}. @@ -206,18 +265,6 @@ parse_binary(Bin) -> Err -> {error, {invalid_app, Err}} end. -filter_applications(Applications) -> - [Application || Application <- Applications, - not is_available_app(Application)]. - -is_available_app(Application) -> - case application:load(Application) of - {error, {already_loaded, _}} -> true; - ok -> application:unload(Application), - true; - _ -> false - end. - plugin_names(Plugins) -> [Name || #plugin{name = Name} <- Plugins]. diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl index 89e16f14..98418d8c 100644 --- a/src/rabbit_plugins_main.erl +++ b/src/rabbit_plugins_main.erl @@ -18,23 +18,33 @@ -include("rabbit.hrl"). -export([start/0, stop/0]). +-export([action/6]). +-define(NODE_OPT, "-n"). -define(VERBOSE_OPT, "-v"). -define(MINIMAL_OPT, "-m"). -define(ENABLED_OPT, "-E"). -define(ENABLED_ALL_OPT, "-e"). +-define(OFFLINE_OPT, "--offline"). +-define(ONLINE_OPT, "--online"). +-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}). -define(VERBOSE_DEF, {?VERBOSE_OPT, flag}). -define(MINIMAL_DEF, {?MINIMAL_OPT, flag}). -define(ENABLED_DEF, {?ENABLED_OPT, flag}). -define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}). +-define(OFFLINE_DEF, {?OFFLINE_OPT, flag}). +-define(ONLINE_DEF, {?ONLINE_OPT, flag}). --define(GLOBAL_DEFS, []). +-define(RPC_TIMEOUT, infinity). + +-define(GLOBAL_DEFS(Node), [?NODE_DEF(Node)]). -define(COMMANDS, [{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]}, - enable, - disable]). + {enable, [?OFFLINE_DEF, ?ONLINE_DEF]}, + {disable, [?OFFLINE_DEF, ?ONLINE_DEF]}, + {sync, []}]). %%---------------------------------------------------------------------------- @@ -51,11 +61,10 @@ start() -> {ok, [[PluginsFile|_]|_]} = init:get_argument(enabled_plugins_file), + {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), {ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir), {Command, Opts, Args} = - case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS, - init:get_plain_arguments()) - of + case parse_arguments(init:get_plain_arguments(), NodeStr) of {ok, Res} -> Res; no_command -> print_error("could not recognise command", []), usage() @@ -67,7 +76,8 @@ start() -> [string:join([atom_to_list(Command) | Args], " ")]) end, - case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of + Node = proplists:get_value(?NODE_OPT, Opts), + case catch action(Command, Node, Args, Opts, PluginsFile, PluginsDir) of ok -> rabbit_misc:quit(0); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> @@ -82,6 +92,13 @@ start() -> {error_string, Reason} -> print_error("~s", [Reason]), rabbit_misc:quit(2); + {badrpc, {'EXIT', Reason}} -> + print_error("~p", [Reason]), + rabbit_misc:quit(2); + {badrpc, Reason} -> + print_error("unable to connect to node ~w: ~w", [Node, Reason]), + print_badrpc_diagnostics([Node]), + rabbit_misc:quit(2); Other -> print_error("~p", [Other]), rabbit_misc:quit(2) @@ -92,20 +109,32 @@ stop() -> %%---------------------------------------------------------------------------- -action(list, [], Opts, PluginsFile, PluginsDir) -> - action(list, [".*"], Opts, PluginsFile, PluginsDir); -action(list, [Pat], Opts, PluginsFile, PluginsDir) -> - format_plugins(Pat, Opts, PluginsFile, PluginsDir); +parse_arguments(CmdLine, NodeStr) -> + case rabbit_misc:parse_arguments( + ?COMMANDS, ?GLOBAL_DEFS(NodeStr), CmdLine) of + {ok, {Cmd, Opts0, Args}} -> + Opts = [case K of + ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)}; + _ -> {K, V} + end || {K, V} <- Opts0], + {ok, {Cmd, Opts, Args}}; + E -> + E + end. + +action(list, Node, [], Opts, PluginsFile, PluginsDir) -> + action(list, Node, [".*"], Opts, PluginsFile, PluginsDir); +action(list, Node, [Pat], Opts, PluginsFile, PluginsDir) -> + format_plugins(Node, Pat, Opts, PluginsFile, PluginsDir); -action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) -> +action(enable, Node, ToEnable0, Opts, PluginsFile, PluginsDir) -> case ToEnable0 of [] -> throw({error_string, "Not enough arguments for 'enable'"}); _ -> ok end, AllPlugins = rabbit_plugins:list(PluginsDir), Enabled = rabbit_plugins:read_enabled(PluginsFile), - ImplicitlyEnabled = rabbit_plugins:dependencies(false, - Enabled, AllPlugins), + ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins), ToEnable = [list_to_atom(Name) || Name <- ToEnable0], Missing = ToEnable -- plugin_names(AllPlugins), NewEnabled = lists:usort(Enabled ++ ToEnable), @@ -124,18 +153,20 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) -> case NewEnabled -- ImplicitlyEnabled of [] -> io:format("Plugin configuration unchanged.~n"); _ -> print_list("The following plugins have been enabled:", - NewImplicitlyEnabled -- ImplicitlyEnabled), - report_change() - end; + NewImplicitlyEnabled -- ImplicitlyEnabled) + end, + action_change( + Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile); -action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) -> +action(disable, Node, ToDisable0, Opts, PluginsFile, PluginsDir) -> case ToDisable0 of [] -> throw({error_string, "Not enough arguments for 'disable'"}); _ -> ok end, - ToDisable = [list_to_atom(Name) || Name <- ToDisable0], - Enabled = rabbit_plugins:read_enabled(PluginsFile), AllPlugins = rabbit_plugins:list(PluginsDir), + Enabled = rabbit_plugins:read_enabled(PluginsFile), + ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins), + ToDisable = [list_to_atom(Name) || Name <- ToDisable0], Missing = ToDisable -- plugin_names(AllPlugins), case Missing of [] -> ok; @@ -144,30 +175,35 @@ action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) -> end, ToDisableDeps = rabbit_plugins:dependencies(true, ToDisable, AllPlugins), NewEnabled = Enabled -- ToDisableDeps, + NewImplicitlyEnabled = rabbit_plugins:dependencies(false, + NewEnabled, AllPlugins), case length(Enabled) =:= length(NewEnabled) of true -> io:format("Plugin configuration unchanged.~n"); - false -> ImplicitlyEnabled = - rabbit_plugins:dependencies(false, Enabled, AllPlugins), - NewImplicitlyEnabled = - rabbit_plugins:dependencies(false, - NewEnabled, AllPlugins), - print_list("The following plugins have been disabled:", + false -> print_list("The following plugins have been disabled:", ImplicitlyEnabled -- NewImplicitlyEnabled), - write_enabled_plugins(PluginsFile, NewEnabled), - report_change() - end. + write_enabled_plugins(PluginsFile, NewEnabled) + end, + action_change( + Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile); + +action(sync, Node, [], _Opts, PluginsFile, _PluginsDir) -> + sync(Node, true, PluginsFile). %%---------------------------------------------------------------------------- -print_error(Format, Args) -> - rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args). +fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args). + +print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args). + +print_badrpc_diagnostics(Nodes) -> + fmt_stderr(rabbit_nodes:diagnostics(Nodes), []). usage() -> io:format("~s", [rabbit_plugins_usage:usage()]), rabbit_misc:quit(1). %% Pretty print a list of plugins. -format_plugins(Pattern, Opts, PluginsFile, PluginsDir) -> +format_plugins(Node, Pattern, Opts, PluginsFile, PluginsDir) -> Verbose = proplists:get_bool(?VERBOSE_OPT, Opts), Minimal = proplists:get_bool(?MINIMAL_OPT, Opts), Format = case {Verbose, Minimal} of @@ -182,9 +218,14 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) -> AvailablePlugins = rabbit_plugins:list(PluginsDir), EnabledExplicitly = rabbit_plugins:read_enabled(PluginsFile), - EnabledImplicitly = - rabbit_plugins:dependencies(false, EnabledExplicitly, - AvailablePlugins) -- EnabledExplicitly, + AllEnabled = rabbit_plugins:dependencies(false, EnabledExplicitly, + AvailablePlugins), + EnabledImplicitly = AllEnabled -- EnabledExplicitly, + {StatusMsg, Running} = + case rpc:call(Node, rabbit_plugins, active, [], ?RPC_TIMEOUT) of + {badrpc, _} -> {"[failed to contact ~s - status not shown]", []}; + Active -> {"* = running on ~s", Active} + end, Missing = [#plugin{name = Name, dependencies = []} || Name <- ((EnabledExplicitly ++ EnabledImplicitly) -- plugin_names(AvailablePlugins))], @@ -192,31 +233,42 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) -> Plugins = [ Plugin || Plugin = #plugin{name = Name} <- AvailablePlugins ++ Missing, re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match, - if OnlyEnabled -> lists:member(Name, EnabledExplicitly); - OnlyEnabledAll -> (lists:member(Name, - EnabledExplicitly) or - lists:member(Name, EnabledImplicitly)); + if OnlyEnabled -> lists:member(Name, EnabledExplicitly); + OnlyEnabledAll -> lists:member(Name, EnabledExplicitly) or + lists:member(Name,EnabledImplicitly); true -> true end], Plugins1 = usort_plugins(Plugins), MaxWidth = lists:max([length(atom_to_list(Name)) || #plugin{name = Name} <- Plugins1] ++ [0]), - [format_plugin(P, EnabledExplicitly, EnabledImplicitly, + case Format of + minimal -> ok; + _ -> io:format(" Configured: E = explicitly enabled; " + "e = implicitly enabled; ! = missing~n" + " | Status: ~s~n" + " |/~n", [rabbit_misc:format(StatusMsg, [Node])]) + end, + [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Running, plugin_names(Missing), Format, MaxWidth) || P <- Plugins1], ok. format_plugin(#plugin{name = Name, version = Version, description = Description, dependencies = Deps}, - EnabledExplicitly, EnabledImplicitly, Missing, - Format, MaxWidth) -> - Glyph = case {lists:member(Name, EnabledExplicitly), - lists:member(Name, EnabledImplicitly), - lists:member(Name, Missing)} of - {true, false, false} -> "[E]"; - {false, true, false} -> "[e]"; - {_, _, true} -> "[!]"; - _ -> "[ ]" - end, + EnabledExplicitly, EnabledImplicitly, Running, + Missing, Format, MaxWidth) -> + EnabledGlyph = case {lists:member(Name, EnabledExplicitly), + lists:member(Name, EnabledImplicitly), + lists:member(Name, Missing)} of + {true, false, false} -> "E"; + {false, true, false} -> "e"; + {_, _, true} -> "!"; + _ -> " " + end, + RunningGlyph = case lists:member(Name, Running) of + true -> "*"; + false -> " " + end, + Glyph = rabbit_misc:format("[~s~s]", [EnabledGlyph, RunningGlyph]), Opt = fun (_F, A, A) -> ok; ( F, A, _) -> io:format(F, [A]) end, @@ -227,9 +279,9 @@ format_plugin(#plugin{name = Name, version = Version, Opt("~s", Version, undefined), io:format("~n"); verbose -> io:format("~s ~w~n", [Glyph, Name]), - Opt(" Version: \t~s~n", Version, undefined), - Opt(" Dependencies:\t~p~n", Deps, []), - Opt(" Description: \t~s~n", Description, undefined), + Opt(" Version: \t~s~n", Version, undefined), + Opt(" Dependencies:\t~p~n", Deps, []), + Opt(" Description: \t~s~n", Description, undefined), io:format("~n") end. @@ -262,6 +314,51 @@ write_enabled_plugins(PluginsFile, Plugins) -> PluginsFile, Reason}}) end. -report_change() -> - io:format("Plugin configuration has changed. " - "Restart RabbitMQ for changes to take effect.~n"). +action_change(Opts, Node, Old, New, PluginsFile) -> + action_change0(proplists:get_bool(?OFFLINE_OPT, Opts), + proplists:get_bool(?ONLINE_OPT, Opts), + Node, Old, New, PluginsFile). + +action_change0(true, _Online, _Node, Same, Same, _PluginsFile) -> + %% Definitely nothing to do + ok; +action_change0(true, _Online, _Node, _Old, _New, _PluginsFile) -> + io:format("Offline change; changes will take effect at broker restart.~n"); +action_change0(false, Online, Node, _Old, _New, PluginsFile) -> + sync(Node, Online, PluginsFile). + +sync(Node, ForceOnline, PluginsFile) -> + rpc_call(Node, ForceOnline, rabbit_plugins, ensure, [PluginsFile]). + +rpc_call(Node, Online, Mod, Fun, Args) -> + io:format("Applying plugin configuration to ~s...", [Node]), + case rpc:call(Node, Mod, Fun, Args) of + {ok, [], []} -> + io:format(" nothing to do.~n", []); + {ok, Start, []} -> + io:format(" started ~b plugin~s.~n", [length(Start), plur(Start)]); + {ok, [], Stop} -> + io:format(" stopped ~b plugin~s.~n", [length(Stop), plur(Stop)]); + {ok, Start, Stop} -> + io:format(" stopped ~b plugin~s and started ~b plugin~s.~n", + [length(Stop), plur(Stop), length(Start), plur(Start)]); + {badrpc, nodedown} = Error -> + io:format(" failed.~n", []), + case Online of + true -> Error; + false -> io:format( + " * Could not contact node ~s.~n" + " * Changes will take effect at broker restart.~n" + " * Specify --online for diagnostics and to treat " + "this as a failure.~n" + " * Specify --offline to disable changes to running " + "broker.~n", + [Node]) + end; + Error -> + io:format(" failed.~n", []), + Error + end. + +plur([_]) -> ""; +plur(_) -> "s". diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index fe2b766f..3558cf98 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -61,13 +61,13 @@ validate_policy0(<<"dead-letter-routing-key">>, Value) -> {error, "~p is not a valid dead letter routing key", [Value]}; validate_policy0(<<"message-ttl">>, Value) - when is_integer(Value), Value >= 0, Value =< ?MAX_EXPIRY_TIMER -> + when is_integer(Value), Value >= 0 -> ok; validate_policy0(<<"message-ttl">>, Value) -> {error, "~p is not a valid message TTL", [Value]}; validate_policy0(<<"expires">>, Value) - when is_integer(Value), Value >= 1, Value =< ?MAX_EXPIRY_TIMER -> + when is_integer(Value), Value >= 1 -> ok; validate_policy0(<<"expires">>, Value) -> {error, "~p is not a valid queue expiry", [Value]}; diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 0a69fb32..f5d03360 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -46,17 +46,11 @@ name(#exchange{policy = Policy}) -> name0(Policy). name0(undefined) -> none; name0(Policy) -> pget(name, Policy). -set(Q = #amqqueue{name = Name}) -> rabbit_queue_decorator:set( - Q#amqqueue{policy = set0(Name)}); -set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set( - X#exchange{policy = set0(Name)}). +set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)}; +set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}. set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)). -set(Q = #amqqueue{name = Name}, Ps) -> Q#amqqueue{policy = match(Name, Ps)}; -set(X = #exchange{name = Name}, Ps) -> rabbit_exchange_decorator:set( - X#exchange{policy = match(Name, Ps)}). - get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy); get(Name, #exchange{policy = Policy}) -> get0(Name, Policy); %% Caution - SLOW. @@ -104,12 +98,18 @@ recover0() -> Policies = list(), [rabbit_misc:execute_mnesia_transaction( fun () -> - mnesia:write(rabbit_durable_exchange, set(X, Policies), write) - end) || X <- Xs], + mnesia:write( + rabbit_durable_exchange, + rabbit_exchange_decorator:set( + X#exchange{policy = match(Name, Policies)}), write) + end) || X = #exchange{name = Name} <- Xs], [rabbit_misc:execute_mnesia_transaction( fun () -> - mnesia:write(rabbit_durable_queue, set(Q, Policies), write) - end) || Q <- Qs], + mnesia:write( + rabbit_durable_queue, + rabbit_queue_decorator:set( + Q#amqqueue{policy = match(Name, Policies)}), write) + end) || Q = #amqqueue{name = Name} <- Qs], ok. invalid_file() -> diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl index 6205e2dc..adfe0c7f 100644 --- a/src/rabbit_queue_decorator.erl +++ b/src/rabbit_queue_decorator.erl @@ -2,7 +2,7 @@ -include("rabbit.hrl"). --export([select/1, set/1]). +-export([select/1, set/1, register/2, unregister/1]). %%---------------------------------------------------------------------------- @@ -41,3 +41,24 @@ select(Modules) -> set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}. list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)]. + +register(TypeName, ModuleName) -> + rabbit_registry:register(queue_decorator, TypeName, ModuleName), + [maybe_recover(Q) || Q <- rabbit_amqqueue:list()], + ok. + +unregister(TypeName) -> + rabbit_registry:unregister(queue_decorator, TypeName), + [maybe_recover(Q) || Q <- rabbit_amqqueue:list()], + ok. + +maybe_recover(Q = #amqqueue{name = Name, + decorators = Decs}) -> + #amqqueue{decorators = Decs1} = set(Q), + Old = lists:sort(select(Decs)), + New = lists:sort(select(Decs1)), + case New of + Old -> ok; + _ -> [M:startup(Q) || M <- New -- Old], + rabbit_amqqueue:update_decorators(Name) + end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index ddaf205e..906c4b6e 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -410,7 +410,7 @@ handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) rabbit_event:notify( connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref), - State; + rabbit_event:init_stats_timer(State, #v1.stats_timer); handle_other({'$gen_cast', force_event_refresh}, State) -> %% Ignore, we will emit a created event once we start running. State; diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl index da75932d..47c77cd0 100644 --- a/src/rabbit_table.erl +++ b/src/rabbit_table.erl @@ -70,7 +70,8 @@ wait_for_replicated() -> not lists:member({local_content, true}, TabDef)]). wait(TableNames) -> - case mnesia:wait_for_tables(TableNames, 30000) of + {ok, Timeout} = application:get_env(rabbit, mnesia_table_loading_timeout), + case mnesia:wait_for_tables(TableNames, Timeout) of ok -> ok; {timeout, BadTabs} -> diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index d943b599..3a041508 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -114,8 +114,8 @@ upgrades_required(Scope) -> with_upgrade_graph(Fun, Scope) -> case rabbit_misc:build_acyclic_graph( - fun (Module, Steps) -> vertices(Module, Steps, Scope) end, - fun (Module, Steps) -> edges(Module, Steps, Scope) end, + fun ({_App, Module, Steps}) -> vertices(Module, Steps, Scope) end, + fun ({_App, Module, Steps}) -> edges(Module, Steps, Scope) end, rabbit_misc:all_module_attributes(rabbit_upgrade)) of {ok, G} -> try Fun(G) @@ -161,7 +161,7 @@ heads(G) -> categorise_by_scope(Version) when is_list(Version) -> Categorised = - [{Scope, Name} || {_Module, Attributes} <- + [{Scope, Name} || {_App, _Module, Attributes} <- rabbit_misc:all_module_attributes(rabbit_upgrade), {Name, Scope, _Requires} <- Attributes, lists:member(Name, Version)], |