summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-06-17 12:12:16 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-06-17 12:12:16 +0100
commita851731d708bb6c8b107e6866dba537bcead6afe (patch)
tree57bbda181f9a1bf8d5774882228ffda6dd3299f2
parent91247a346135bc3b7b5cf21e0c285f73ddda28b5 (diff)
parentc00bd9de9db3f6781571bfb8fe416406ec06b9bf (diff)
downloadrabbitmq-server-a851731d708bb6c8b107e6866dba537bcead6afe.tar.gz
stable to default
-rw-r--r--docs/rabbitmq-plugins.1.xml53
-rw-r--r--docs/rabbitmq.config.example21
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--include/rabbit.hrl26
-rw-r--r--packaging/common/README2
-rwxr-xr-xquickcheck3
-rwxr-xr-xscripts/rabbitmq-plugins2
-rwxr-xr-xscripts/rabbitmq-plugins.bat6
-rw-r--r--src/app_utils.erl7
-rw-r--r--src/rabbit.erl102
-rw-r--r--src/rabbit_amqqueue.erl54
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_channel.erl9
-rw-r--r--src/rabbit_channel_interceptor.erl25
-rw-r--r--src/rabbit_dead_letter.erl3
-rw-r--r--src/rabbit_event.erl21
-rw-r--r--src/rabbit_exchange.erl89
-rw-r--r--src/rabbit_exchange_decorator.erl24
-rw-r--r--src/rabbit_mirror_queue_master.erl20
-rw-r--r--src/rabbit_mirror_queue_misc.erl49
-rw-r--r--src/rabbit_mirror_queue_slave.erl5
-rw-r--r--src/rabbit_misc.erl56
-rw-r--r--src/rabbit_plugins.erl119
-rw-r--r--src/rabbit_plugins_main.erl209
-rw-r--r--src/rabbit_policies.erl4
-rw-r--r--src/rabbit_policy.erl24
-rw-r--r--src/rabbit_queue_decorator.erl23
-rw-r--r--src/rabbit_reader.erl2
-rw-r--r--src/rabbit_table.erl3
-rw-r--r--src/rabbit_version.erl6
30 files changed, 670 insertions, 304 deletions
diff --git a/docs/rabbitmq-plugins.1.xml b/docs/rabbitmq-plugins.1.xml
index 8ecb4fc8..e891969f 100644
--- a/docs/rabbitmq-plugins.1.xml
+++ b/docs/rabbitmq-plugins.1.xml
@@ -40,6 +40,7 @@
<refsynopsisdiv>
<cmdsynopsis>
<command>rabbitmq-plugins</command>
+ <arg choice="opt">-n <replaceable>node</replaceable></arg>
<arg choice="req"><replaceable>command</replaceable></arg>
<arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg>
</cmdsynopsis>
@@ -97,12 +98,14 @@
</variablelist>
<para>
Lists all plugins, their versions, dependencies and
- descriptions. Each plugin is prefixed with a status
- indicator - [ ] to indicate that the plugin is not
- enabled, [E] to indicate that it is explicitly enabled,
- [e] to indicate that it is implicitly enabled, and [!] to
- indicate that it is enabled but missing and thus not
- operational.
+ descriptions. Each plugin is prefixed with two status
+ indicator characters inside [ ]. The first indicator can
+ be " " to indicate that the plugin is not enabled, "E" to
+ indicate that it is explicitly enabled, "e" to indicate
+ that it is implicitly enabled, or "!" to indicate that it
+ is enabled but missing and thus not operational. The
+ second indicator can be " " to show that the plugin is not
+ running, or "*" to show that it is.
</para>
<para>
If the optional pattern is given, only plugins whose
@@ -130,17 +133,31 @@
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>enable</command> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>enable</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
+ <term>--offline</term>
+ <listitem><para>Just modify the enabled plugins file.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>--online</term>
+ <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>plugin</term>
<listitem><para>One or more plugins to enable.</para></listitem>
</varlistentry>
</variablelist>
<para>
Enables the specified plugins and all their
- dependencies.
+ dependencies. This will update the enabled plugins file
+ and then attempt to connect to the broker and ensure it is
+ running all enabled plugins. By default if it is not
+ possible to connect to the running broker (for example if
+ it is stopped) then a warning is displayed. Specify
+ <command>--online</command> or
+ <command>--offline</command> to change this.
</para>
<para role="example-prefix">For example:</para>
@@ -154,17 +171,31 @@
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>disable</command> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>disable</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
+ <term>--offline</term>
+ <listitem><para>Just modify the enabled plugins file.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>--online</term>
+ <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>plugin</term>
<listitem><para>One or more plugins to disable.</para></listitem>
</varlistentry>
</variablelist>
<para>
- Disables the specified plugins and all plugins that
- depend on them.
+ Disables the specified plugins and all their
+ dependencies. This will update the enabled plugins file
+ and then attempt to connect to the broker and ensure it is
+ running all enabled plugins. By default if it is not
+ possible to connect to the running broker (for example if
+ it is stopped) then a warning is displayed. Specify
+ <command>--online</command> or
+ <command>--offline</command> to change this.
</para>
<para role="example-prefix">For example:</para>
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index b0e13b1b..4fad1542 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -213,7 +213,12 @@
%% Explicitly enable/disable hipe compilation.
%%
- %% {hipe_compile, true}
+ %% {hipe_compile, true},
+
+ %% Timeout used when waiting for Mnesia tables in a cluster to
+ %% become available.
+ %%
+ %% {mnesia_table_loading_timeout, 30000}
]},
@@ -257,9 +262,13 @@
%% {certfile, "/path/to/cert.pem"},
%% {keyfile, "/path/to/key.pem"}]}]},
+ %% One of 'basic', 'detailed' or 'none'. See
+ %% http://www.rabbitmq.com/management.html#fine-stats for more details.
+ %% {rates_mode, basic},
+
%% Configure how long aggregated data (such as message rates and queue
%% lengths) is retained. Please read the plugin's documentation in
- %% https://www.rabbitmq.com/management.html#configuration for more
+ %% http://www.rabbitmq.com/management.html#configuration for more
%% details.
%%
%% {sample_retention_policies,
@@ -268,14 +277,6 @@
%% {detailed, [{10, 5}]}]}
]},
- {rabbitmq_management_agent,
- [%% Misc/Advanced Options
- %%
- %% NB: Change these only if you understand what you are doing!
- %%
- %% {force_fine_statistics, true}
- ]},
-
%% ----------------------------------------------------------------------------
%% RabbitMQ Shovel Plugin
%%
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 7360208a..3647c04a 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -39,6 +39,7 @@
{server_properties, []},
{collect_statistics, none},
{collect_statistics_interval, 5000},
+ {mnesia_table_loading_timeout, 30000},
{auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
{auth_backends, [rabbit_auth_backend_internal]},
{delegate_count, 16},
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 5ac3197e..c1386803 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -39,13 +39,24 @@
-record(resource, {virtual_host, kind, name}).
--record(exchange, {name, type, durable, auto_delete, internal, arguments,
- scratches, policy, decorators}).
--record(exchange_serial, {name, next}).
+%% fields described as 'transient' here are cleared when writing to
+%% rabbit_durable_<thing>
+-record(exchange, {
+ name, type, durable, auto_delete, internal, arguments, %% immutable
+ scratches, %% durable, explicitly updated via update_scratch/3
+ policy, %% durable, implicitly updated when policy changes
+ decorators}). %% transient, recalculated in store/1 (i.e. recovery)
+
+-record(amqqueue, {
+ name, durable, auto_delete, exclusive_owner = none, %% immutable
+ arguments, %% immutable
+ pid, %% durable (just so we know home node)
+ slave_pids, sync_slave_pids, %% transient
+ policy, %% durable, implicit update as above
+ gm_pids, %% transient
+ decorators}). %% transient, recalculated as above
--record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
- arguments, pid, slave_pids, sync_slave_pids, policy,
- gm_pids, decorators}).
+-record(exchange_serial, {name, next}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
@@ -105,9 +116,6 @@
-define(DESIRED_HIBERNATE, 10000).
-define(CREDIT_DISC_BOUND, {2000, 500}).
-%% This is dictated by `erlang:send_after' on which we depend to implement TTL.
--define(MAX_EXPIRY_TIMER, 4294967295).
-
-define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>).
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
diff --git a/packaging/common/README b/packaging/common/README
index 0a29ee27..35a1523a 100644
--- a/packaging/common/README
+++ b/packaging/common/README
@@ -17,4 +17,4 @@ run as the superuser.
An example configuration file is provided in the same directory as
this README. Copy it to /etc/rabbitmq/rabbitmq.config to use it. The
RabbitMQ server must be restarted after changing the configuration
-file or enabling or disabling plugins.
+file.
diff --git a/quickcheck b/quickcheck
index b5382d75..40f13091 100755
--- a/quickcheck
+++ b/quickcheck
@@ -17,7 +17,8 @@ main([NodeStr, ModStr, TrialsStr]) ->
case rpc:call(Node, proper, module,
[Mod] ++ [[{numtests, Trials}, {constraint_tries, 200}]]) of
[] -> ok;
- _ -> quit(1)
+ R -> io:format("~p.~n", [R]),
+ quit(1)
end;
{badrpc, Reason} ->
io:format("Could not contact node ~p: ~p.~n", [Node, Reason]),
diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins
index bd7d0b6a..36910eff 100755
--- a/scripts/rabbitmq-plugins
+++ b/scripts/rabbitmq-plugins
@@ -21,6 +21,7 @@
##--- Set environment vars RABBITMQ_<var_name> to defaults if not set
+[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR}
@@ -35,4 +36,5 @@ exec ${ERL_DIR}erl \
-s rabbit_plugins_main \
-enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \
-plugins_dist_dir "$RABBITMQ_PLUGINS_DIR" \
+ -nodename $RABBITMQ_NODENAME \
-extra "$@"
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index a535ebad..61e39e38 100755
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -31,6 +31,10 @@ if "!RABBITMQ_BASE!"=="" (
set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
+if "!RABBITMQ_NODENAME!"=="" (
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
+)
+
if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
@@ -51,7 +55,7 @@ if "!RABBITMQ_PLUGINS_DIR!"=="" (
set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/src/app_utils.erl b/src/app_utils.erl
index 0479ce66..e321888d 100644
--- a/src/app_utils.erl
+++ b/src/app_utils.erl
@@ -17,7 +17,7 @@
-export([load_applications/1, start_applications/1, start_applications/2,
stop_applications/1, stop_applications/2, app_dependency_order/2,
- wait_for_applications/1]).
+ wait_for_applications/1, app_dependencies/1]).
-ifdef(use_specs).
@@ -30,6 +30,7 @@
-spec stop_applications([atom()], error_handler()) -> 'ok'.
-spec wait_for_applications([atom()]) -> 'ok'.
-spec app_dependency_order([atom()], boolean()) -> [digraph:vertex()].
+-spec app_dependencies(atom()) -> [atom()].
-endif.
@@ -74,8 +75,8 @@ wait_for_applications(Apps) ->
app_dependency_order(RootApps, StripUnreachable) ->
{ok, G} = rabbit_misc:build_acyclic_graph(
- fun (App, _Deps) -> [{App, App}] end,
- fun (App, Deps) -> [{Dep, App} || Dep <- Deps] end,
+ fun ({App, _Deps}) -> [{App, App}] end,
+ fun ({App, Deps}) -> [{Dep, App} || Dep <- Deps] end,
[{App, app_dependencies(App)} ||
{App, _Desc, _Vsn} <- application:loaded_applications()]),
try
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 29e38c1f..4901ea17 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -22,9 +22,8 @@
stop_and_halt/0, await_startup/0, status/0, is_running/0,
is_running/1, environment/0, rotate_logs/1, force_event_refresh/1,
start_fhc/0]).
-
-export([start/2, stop/1]).
-
+-export([start_apps/1, stop_apps/1]).
-export([log_location/1]). %% for testing
%%---------------------------------------------------------------------------
@@ -211,6 +210,7 @@
%% this really should be an abstract type
-type(log_location() :: 'tty' | 'undefined' | file:filename()).
-type(param() :: atom()).
+-type(app_name() :: atom()).
-spec(start/0 :: () -> 'ok').
-spec(boot/0 :: () -> 'ok').
@@ -242,6 +242,8 @@
-spec(maybe_insert_default_data/0 :: () -> 'ok').
-spec(boot_delegate/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
+-spec(start_apps/1 :: ([app_name()]) -> 'ok').
+-spec(stop_apps/1 :: ([app_name()]) -> 'ok').
-endif.
@@ -312,9 +314,7 @@ start() ->
ok = ensure_working_log_handlers(),
rabbit_node_monitor:prepare_cluster_status_files(),
rabbit_mnesia:check_cluster_consistency(),
- ok = app_utils:start_applications(
- app_startup_order(), fun handle_app_error/2),
- ok = log_broker_started(rabbit_plugins:active())
+ broker_start()
end).
boot() ->
@@ -329,21 +329,14 @@ boot() ->
%% the upgrade, since if we are a secondary node the
%% primary node will have forgotten us
rabbit_mnesia:check_cluster_consistency(),
- Plugins = rabbit_plugins:setup(),
- ToBeLoaded = Plugins ++ ?APPS,
- ok = app_utils:load_applications(ToBeLoaded),
- StartupApps = app_utils:app_dependency_order(ToBeLoaded,
- false),
- ok = app_utils:start_applications(
- StartupApps, fun handle_app_error/2),
- ok = log_broker_started(Plugins)
+ broker_start()
end).
-handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) ->
- throw({could_not_start, App, Reason});
-
-handle_app_error(App, Reason) ->
- throw({could_not_start, App, Reason}).
+broker_start() ->
+ Plugins = rabbit_plugins:setup(),
+ ToBeLoaded = Plugins ++ ?APPS,
+ start_apps(ToBeLoaded),
+ ok = log_broker_started(rabbit_plugins:active()).
start_it(StartFun) ->
Marker = spawn_link(fun() -> receive stop -> ok end end),
@@ -374,7 +367,7 @@ stop() ->
_ -> await_startup()
end,
rabbit_log:info("Stopping RabbitMQ~n"),
- ok = app_utils:stop_applications(app_shutdown_order()).
+ stop_apps(app_shutdown_order()).
stop_and_halt() ->
try
@@ -385,6 +378,36 @@ stop_and_halt() ->
end,
ok.
+start_apps(Apps) ->
+ app_utils:load_applications(Apps),
+ OrderedApps = app_utils:app_dependency_order(Apps, false),
+ case lists:member(rabbit, Apps) of
+ false -> run_boot_steps(Apps); %% plugin activation
+ true -> ok %% will run during start of rabbit app
+ end,
+ ok = app_utils:start_applications(OrderedApps,
+ handle_app_error(could_not_start)).
+
+stop_apps(Apps) ->
+ ok = app_utils:stop_applications(
+ Apps, handle_app_error(error_during_shutdown)),
+ case lists:member(rabbit, Apps) of
+ false -> run_cleanup_steps(Apps); %% plugin deactivation
+ true -> ok %% it's all going anyway
+ end,
+ ok.
+
+handle_app_error(Term) ->
+ fun(App, {bad_return, {_MFA, {'EXIT', {ExitReason, _}}}}) ->
+ throw({Term, App, ExitReason});
+ (App, Reason) ->
+ throw({Term, App, Reason})
+ end.
+
+run_cleanup_steps(Apps) ->
+ [run_step(Name, Attrs, cleanup) || {_, Name, Attrs} <- find_steps(Apps)],
+ ok.
+
await_startup() ->
app_utils:wait_for_applications(app_startup_order()).
@@ -468,7 +491,7 @@ start(normal, []) ->
true = register(rabbit, self()),
print_banner(),
log_banner(),
- [ok = run_boot_step(Step) || Step <- boot_steps()],
+ run_boot_steps(),
{ok, SupPid};
Error ->
Error
@@ -496,29 +519,40 @@ app_shutdown_order() ->
%%---------------------------------------------------------------------------
%% boot step logic
-run_boot_step({_StepName, Attributes}) ->
- case [MFA || {mfa, MFA} <- Attributes] of
+run_boot_steps() ->
+ run_boot_steps([App || {App, _, _} <- application:loaded_applications()]).
+
+run_boot_steps(Apps) ->
+ [ok = run_step(Step, Attrs, mfa) || {_, Step, Attrs} <- find_steps(Apps)],
+ ok.
+
+find_steps(Apps) ->
+ All = sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)),
+ [Step || {App, _, _} = Step <- All, lists:member(App, Apps)].
+
+run_step(StepName, Attributes, AttributeName) ->
+ case [MFA || {Key, MFA} <- Attributes,
+ Key =:= AttributeName] of
[] ->
ok;
MFAs ->
[try
apply(M,F,A)
of
- ok -> ok;
- {error, Reason} -> boot_error(Reason, not_available)
+ ok -> ok;
+ {error, Reason} -> boot_error({boot_step, StepName, Reason},
+ not_available)
catch
- _:Reason -> boot_error(Reason, erlang:get_stacktrace())
+ _:Reason -> boot_error({boot_step, StepName, Reason},
+ erlang:get_stacktrace())
end || {M,F,A} <- MFAs],
ok
end.
-boot_steps() ->
- sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)).
-
-vertices(_Module, Steps) ->
- [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps].
+vertices({AppName, _Module, Steps}) ->
+ [{StepName, {AppName, StepName, Atts}} || {StepName, Atts} <- Steps].
-edges(_Module, Steps) ->
+edges({_AppName, _Module, Steps}) ->
[case Key of
requires -> {StepName, OtherStep};
enables -> {OtherStep, StepName}
@@ -527,7 +561,7 @@ edges(_Module, Steps) ->
Key =:= requires orelse Key =:= enables].
sort_boot_steps(UnsortedSteps) ->
- case rabbit_misc:build_acyclic_graph(fun vertices/2, fun edges/2,
+ case rabbit_misc:build_acyclic_graph(fun vertices/1, fun edges/1,
UnsortedSteps) of
{ok, G} ->
%% Use topological sort to find a consistent ordering (if
@@ -541,8 +575,8 @@ sort_boot_steps(UnsortedSteps) ->
digraph:delete(G),
%% Check that all mentioned {M,F,A} triples are exported.
case [{StepName, {M,F,A}} ||
- {StepName, Attributes} <- SortedSteps,
- {mfa, {M,F,A}} <- Attributes,
+ {_App, StepName, Attributes} <- SortedSteps,
+ {mfa, {M,F,A}} <- Attributes,
not erlang:function_exported(M, F, length(A))] of
[] -> SortedSteps;
MissingFunctions -> basic_boot_error(
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1aba7ecb..8a1d162a 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -18,7 +18,7 @@
-export([recover/0, stop/0, start/1, declare/5, declare/6,
delete_immediately/1, delete/3, purge/1, forget_all_durable/1]).
--export([pseudo_queue/2]).
+-export([pseudo_queue/2, immutable/1]).
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
@@ -30,7 +30,7 @@
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
--export([update/2, store_queue/1, policy_changed/2]).
+-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]).
-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1,
cancel_sync_mirrors/1]).
@@ -176,7 +176,9 @@
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()).
+-spec(immutable/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()).
-spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(update_decorators/1 :: (name()) -> 'ok').
-spec(policy_changed/2 ::
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-spec(start_mirroring/1 :: (pid()) -> 'ok').
@@ -254,15 +256,16 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
%% effect) this might not be possible to satisfy.
declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
ok = check_declare_arguments(QueueName, Args),
- Q = rabbit_policy:set(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
- exclusive_owner = Owner,
- pid = none,
- slave_pids = [],
- sync_slave_pids = [],
- gm_pids = []}),
+ Q = rabbit_queue_decorator:set(
+ rabbit_policy:set(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
+ exclusive_owner = Owner,
+ pid = none,
+ slave_pids = [],
+ sync_slave_pids = [],
+ gm_pids = []})),
Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity).
@@ -308,12 +311,24 @@ store_queue(Q = #amqqueue{durable = true}) ->
ok = mnesia:write(rabbit_durable_queue,
Q#amqqueue{slave_pids = [],
sync_slave_pids = [],
- gm_pids = []}, write),
- ok = mnesia:write(rabbit_queue, Q, write),
- ok;
+ gm_pids = [],
+ decorators = undefined}, write),
+ store_queue_ram(Q);
store_queue(Q = #amqqueue{durable = false}) ->
- ok = mnesia:write(rabbit_queue, Q, write),
- ok.
+ store_queue_ram(Q).
+
+store_queue_ram(Q) ->
+ ok = mnesia:write(rabbit_queue, rabbit_queue_decorator:set(Q), write).
+
+update_decorators(Name) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ case mnesia:wread({rabbit_queue, Name}) of
+ [Q] -> store_queue_ram(Q),
+ ok;
+ [] -> ok
+ end
+ end).
policy_changed(Q1 = #amqqueue{decorators = Decorators1},
Q2 = #amqqueue{decorators = Decorators2}) ->
@@ -709,6 +724,13 @@ pseudo_queue(QueueName, Pid) ->
pid = Pid,
slave_pids = []}.
+immutable(Q) -> Q#amqqueue{pid = none,
+ slave_pids = none,
+ sync_slave_pids = none,
+ gm_pids = none,
+ policy = none,
+ decorators = none}.
+
deliver([], _Delivery, _Flow) ->
%% /dev/null optimisation
[];
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9b785303..97206df3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -385,12 +385,12 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined,
V when V > 0 -> V + 999; %% always fire later
_ -> 0
end) div 1000,
- TRef = erlang:send_after(After, self(), {drop_expired, Version}),
+ TRef = rabbit_misc:send_after(After, self(), {drop_expired, Version}),
State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
ttl_timer_expiry = TExpiry})
when Expiry + 1000 < TExpiry ->
- case erlang:cancel_timer(TRef) of
+ case rabbit_misc:cancel_timer(TRef) of
false -> State;
_ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined})
end;
@@ -1165,7 +1165,7 @@ handle_cast({force_event_refresh, Ref},
emit_consumer_created(
Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref)
end,
- noreply(State);
+ noreply(rabbit_event:init_stats_timer(State, #q.stats_timer));
handle_cast(notify_decorators, State) ->
notify_decorators(State),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 74f9cacf..bb626554 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -341,7 +341,7 @@ handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) ->
handle_cast({force_event_refresh, Ref}, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State),
Ref),
- noreply(State);
+ noreply(rabbit_event:init_stats_timer(State, #ch.stats_timer));
handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) ->
%% NB: don't call noreply/1 since we don't want to send confirms.
@@ -992,7 +992,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
QueueName,
fun (Q) -> ok = rabbit_amqqueue:assert_equivalence(
Q, Durable, AutoDelete, Args, Owner),
- rabbit_amqqueue:stat(Q)
+ maybe_stat(NoWait, Q)
end) of
{ok, MessageCount, ConsumerCount} ->
return_queue_declare_ok(QueueName, NoWait, MessageCount,
@@ -1048,7 +1048,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
{{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
rabbit_amqqueue:with_or_die(
- QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end),
+ QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end),
ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
State);
@@ -1204,6 +1204,9 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
E
end.
+maybe_stat(false, Q) -> rabbit_amqqueue:stat(Q);
+maybe_stat(true, _Q) -> {ok, 0, 0}.
+
consumer_monitor(ConsumerTag,
State = #ch{consumer_mapping = ConsumerMapping,
queue_monitors = QMons,
diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl
index 81c17fbf..db9349ac 100644
--- a/src/rabbit_channel_interceptor.erl
+++ b/src/rabbit_channel_interceptor.erl
@@ -33,7 +33,7 @@
-callback description() -> [proplists:property()].
-callback intercept(original_method(), rabbit_types:vhost()) ->
- rabbit_types:ok_or_error2(processed_method(), any()).
+ processed_method() | rabbit_misc:channel_or_connection_exit().
%% Whether the interceptor wishes to intercept the amqp method
-callback applies_to(intercept_method()) -> boolean().
@@ -62,20 +62,15 @@ intercept_method(M, VHost) ->
intercept_method(M, _VHost, []) ->
M;
intercept_method(M, VHost, [I]) ->
- case I:intercept(M, VHost) of
- {ok, M2} ->
- case validate_method(M, M2) of
- true ->
- M2;
- _ ->
- internal_error("Interceptor: ~p expected "
- "to return method: ~p but returned: ~p",
- [I, rabbit_misc:method_record_type(M),
- rabbit_misc:method_record_type(M2)])
- end;
- {error, Reason} ->
- internal_error("Interceptor: ~p failed with reason: ~p",
- [I, Reason])
+ M2 = I:intercept(M, VHost),
+ case validate_method(M, M2) of
+ true ->
+ M2;
+ _ ->
+ internal_error("Interceptor: ~p expected "
+ "to return method: ~p but returned: ~p",
+ [I, rabbit_misc:method_record_type(M),
+ rabbit_misc:method_record_type(M2)])
end;
intercept_method(M, _VHost, Is) ->
internal_error("More than one interceptor for method: ~p -- ~p",
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
index ec32e687..728bc431 100644
--- a/src/rabbit_dead_letter.erl
+++ b/src/rabbit_dead_letter.erl
@@ -129,6 +129,9 @@ is_cycle(Queue, Deaths) ->
{longstr, <<"rejected">>} =/=
rabbit_misc:table_lookup(D, <<"reason">>);
(_) ->
+ %% There was something we didn't expect, therefore
+ %% a client must have put it there, therefore the
+ %% cycle was not "fully automatic".
false
end, Cycle ++ [H])
end.
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index b867223b..a33103fd 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -23,6 +23,7 @@
ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]).
-export([stats_level/2, if_enabled/3]).
-export([notify/2, notify/3, notify_if/3]).
+-export([sync_notify/2, sync_notify/3]).
%%----------------------------------------------------------------------------
@@ -61,6 +62,9 @@
-spec(notify/2 :: (event_type(), event_props()) -> 'ok').
-spec(notify/3 :: (event_type(), event_props(), reference() | 'none') -> 'ok').
-spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok').
+-spec(sync_notify/2 :: (event_type(), event_props()) -> 'ok').
+-spec(sync_notify/3 :: (event_type(), event_props(),
+ reference() | 'none') -> 'ok').
-endif.
@@ -145,7 +149,16 @@ notify_if(false, _Type, _Props) -> ok.
notify(Type, Props) -> notify(Type, Props, none).
notify(Type, Props, Ref) ->
- gen_event:notify(?MODULE, #event{type = Type,
- props = Props,
- reference = Ref,
- timestamp = os:timestamp()}).
+ gen_event:notify(?MODULE, event_cons(Type, Props, Ref)).
+
+sync_notify(Type, Props) -> sync_notify(Type, Props, none).
+
+sync_notify(Type, Props, Ref) ->
+ gen_event:sync_notify(?MODULE, event_cons(Type, Props, Ref)).
+
+event_cons(Type, Props, Ref) ->
+ #event{type = Type,
+ props = Props,
+ reference = Ref,
+ timestamp = os:timestamp()}.
+
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 4d4a2a58..a1772f0a 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -20,7 +20,8 @@
-export([recover/0, policy_changed/2, callback/4, declare/6,
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
- lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3,
+ lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2,
+ update_scratch/3, update_decorators/1, immutable/1,
info_keys/0, info/1, info/2, info_all/1, info_all/2,
route/2, delete/2, validate_binding/2]).
%% these must be run inside a mnesia tx
@@ -61,6 +62,7 @@
-spec(lookup_or_die/1 ::
(name()) -> rabbit_types:exchange() |
rabbit_types:channel_exit()).
+-spec(list/0 :: () -> [rabbit_types:exchange()]).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]).
-spec(lookup_scratch/2 :: (name(), atom()) ->
rabbit_types:ok(term()) |
@@ -70,6 +72,8 @@
(name(),
fun((rabbit_types:exchange()) -> rabbit_types:exchange()))
-> not_found | rabbit_types:exchange()).
+-spec(update_decorators/1 :: (name()) -> 'ok').
+-spec(immutable/1 :: (rabbit_types:exchange()) -> rabbit_types:exchange()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()).
-spec(info/2 ::
@@ -106,24 +110,15 @@ recover() ->
mnesia:read({rabbit_exchange, XName}) =:= []
end,
fun (X, Tx) ->
- case Tx of
- true -> store(X);
- false -> ok
- end,
- callback(X, create, map_create_tx(Tx), [X])
+ X1 = case Tx of
+ true -> store_ram(X);
+ false -> rabbit_exchange_decorator:set(X)
+ end,
+ callback(X1, create, map_create_tx(Tx), [X1])
end,
rabbit_durable_exchange),
- report_missing_decorators(Xs),
[XName || #exchange{name = XName} <- Xs].
-report_missing_decorators(Xs) ->
- Mods = lists:usort(lists:append([rabbit_exchange_decorator:select(raw, D) ||
- #exchange{decorators = D} <- Xs])),
- case [M || M <- Mods, code:which(M) =:= non_existing] of
- [] -> ok;
- M -> rabbit_log:warning("Missing exchange decorators: ~p~n", [M])
- end.
-
callback(X = #exchange{type = XType,
decorators = Decorators}, Fun, Serial0, Args) ->
Serial = if is_function(Serial0) -> Serial0;
@@ -158,12 +153,13 @@ serial(#exchange{name = XName} = X) ->
end.
declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
- X = rabbit_policy:set(#exchange{name = XName,
- type = Type,
- durable = Durable,
- auto_delete = AutoDelete,
- internal = Internal,
- arguments = Args}),
+ X = rabbit_exchange_decorator:set(
+ rabbit_policy:set(#exchange{name = XName,
+ type = Type,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ internal = Internal,
+ arguments = Args})),
XT = type_to_module(Type),
%% We want to upset things if it isn't ok
ok = XT:validate(X),
@@ -171,13 +167,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
fun () ->
case mnesia:wread({rabbit_exchange, XName}) of
[] ->
- store(X),
- ok = case Durable of
- true -> mnesia:write(rabbit_durable_exchange,
- X, write);
- false -> ok
- end,
- {new, X};
+ {new, store(X)};
[ExistingX] ->
{existing, ExistingX}
end
@@ -195,7 +185,19 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
map_create_tx(true) -> transaction;
map_create_tx(false) -> none.
-store(X) -> ok = mnesia:write(rabbit_exchange, X, write).
+
+store(X = #exchange{durable = true}) ->
+ mnesia:write(rabbit_durable_exchange, X#exchange{decorators = undefined},
+ write),
+ store_ram(X);
+store(X = #exchange{durable = false}) ->
+ store_ram(X).
+
+store_ram(X) ->
+ X1 = rabbit_exchange_decorator:set(X),
+ ok = mnesia:write(rabbit_exchange, rabbit_exchange_decorator:set(X1),
+ write),
+ X1.
%% Used with binaries sent over the wire; the type may not exist.
check_type(TypeBin) ->
@@ -243,6 +245,8 @@ lookup_or_die(Name) ->
{error, not_found} -> rabbit_misc:not_found(Name)
end.
+list() -> mnesia:dirty_match_object(rabbit_exchange, #exchange{_ = '_'}).
+
%% Not dirty_match_object since that would not be transactional when used in a
%% tx context
list(VHostPath) ->
@@ -287,20 +291,27 @@ update_scratch(Name, App, Fun) ->
ok
end).
+update_decorators(Name) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ case mnesia:wread({rabbit_exchange, Name}) of
+ [X] -> store_ram(X),
+ ok;
+ [] -> ok
+ end
+ end).
+
update(Name, Fun) ->
case mnesia:wread({rabbit_exchange, Name}) of
- [X = #exchange{durable = Durable}] ->
- X1 = Fun(X),
- ok = mnesia:write(rabbit_exchange, X1, write),
- case Durable of
- true -> ok = mnesia:write(rabbit_durable_exchange, X1, write);
- _ -> ok
- end,
- X1;
- [] ->
- not_found
+ [X] -> X1 = Fun(X),
+ store(X1);
+ [] -> not_found
end.
+immutable(X) -> X#exchange{scratches = none,
+ policy = none,
+ decorators = none}.
+
info_keys() -> ?INFO_KEYS.
map(VHostPath, F) ->
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index 2f056b1b..900f9c32 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([select/2, set/1]).
+-export([select/2, set/1, register/2, unregister/1]).
%% This is like an exchange type except that:
%%
@@ -104,3 +104,25 @@ list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
cons_if_eq(Select, Select, Item, List) -> [Item | List];
cons_if_eq(_Select, _Other, _Item, List) -> List.
+
+register(TypeName, ModuleName) ->
+ rabbit_registry:register(exchange_decorator, TypeName, ModuleName),
+ [maybe_recover(X) || X <- rabbit_exchange:list()],
+ ok.
+
+unregister(TypeName) ->
+ rabbit_registry:unregister(exchange_decorator, TypeName),
+ [maybe_recover(X) || X <- rabbit_exchange:list()],
+ ok.
+
+maybe_recover(X = #exchange{name = Name,
+ decorators = Decs}) ->
+ #exchange{decorators = Decs1} = set(X),
+ Old = lists:sort(select(all, Decs)),
+ New = lists:sort(select(all, Decs1)),
+ case New of
+ Old -> ok;
+ _ -> %% TODO create a tx here for non-federation decorators
+ [M:create(none, X) || M <- New -- Old],
+ rabbit_exchange:update_decorators(Name)
+ end.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 2b16b911..24b22d4c 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -170,10 +170,24 @@ terminate({shutdown, dropped} = Reason,
State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)};
terminate(Reason,
- State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ State = #state { name = QName,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
%% shouldn't be deleted. Most likely safe shutdown of this
- %% node. Thus just let some other slave take over.
+ %% node.
+ {ok, Q = #amqqueue{sync_slave_pids = SSPids}} =
+ rabbit_amqqueue:lookup(QName),
+ case SSPids =:= [] andalso
+ rabbit_policy:get(<<"ha-promote-on-shutdown">>, Q) =/= <<"always">> of
+ true -> %% Remove the whole queue to avoid data loss
+ rabbit_mirror_queue_misc:log_warning(
+ QName, "Stopping all nodes on master shutdown since no "
+ "synchronised slave is available~n", []),
+ stop_all_slaves(Reason, State);
+ false -> %% Just let some other slave take over.
+ ok
+ end,
State #state { backing_queue_state = BQ:terminate(Reason, BQS) }.
delete_and_terminate(Reason, State = #state { backing_queue = BQ,
@@ -181,7 +195,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ,
stop_all_slaves(Reason, State),
State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}.
-stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
+stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
{ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
MRefs = [erlang:monitor(process, Pid) || Pid <- [GM | SPids]],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index b0f092a9..7aec1ac8 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -29,16 +29,19 @@
-include("rabbit.hrl").
--rabbit_boot_step({?MODULE,
- [{description, "HA policy validation"},
- {mfa, {rabbit_registry, register,
- [policy_validator, <<"ha-mode">>, ?MODULE]}},
- {mfa, {rabbit_registry, register,
- [policy_validator, <<"ha-params">>, ?MODULE]}},
- {mfa, {rabbit_registry, register,
- [policy_validator, <<"ha-sync-mode">>, ?MODULE]}},
- {requires, rabbit_registry},
- {enables, recovery}]}).
+-rabbit_boot_step(
+ {?MODULE,
+ [{description, "HA policy validation"},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-mode">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-params">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-sync-mode">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
%%----------------------------------------------------------------------------
@@ -374,16 +377,21 @@ validate_policy(KeyList) ->
Mode = proplists:get_value(<<"ha-mode">>, KeyList, none),
Params = proplists:get_value(<<"ha-params">>, KeyList, none),
SyncMode = proplists:get_value(<<"ha-sync-mode">>, KeyList, none),
- case {Mode, Params, SyncMode} of
- {none, none, none} ->
+ PromoteOnShutdown = proplists:get_value(
+ <<"ha-promote-on-shutdown">>, KeyList, none),
+ case {Mode, Params, SyncMode, PromoteOnShutdown} of
+ {none, none, none, none} ->
ok;
- {none, _, _} ->
- {error, "ha-mode must be specified to specify ha-params or "
- "ha-sync-mode", []};
+ {none, _, _, _} ->
+ {error, "ha-mode must be specified to specify ha-params, "
+ "ha-sync-mode or ha-promote-on-shutdown", []};
_ ->
case module(Mode) of
{ok, M} -> case M:validate_policy(Params) of
- ok -> validate_sync_mode(SyncMode);
+ ok -> case validate_sync_mode(SyncMode) of
+ ok -> validate_pos(PromoteOnShutdown);
+ E -> E
+ end;
E -> E
end;
_ -> {error, "~p is not a valid ha-mode value", [Mode]}
@@ -398,3 +406,12 @@ validate_sync_mode(SyncMode) ->
Mode -> {error, "ha-sync-mode must be \"manual\" "
"or \"automatic\", got ~p", [Mode]}
end.
+
+validate_pos(PromoteOnShutdown) ->
+ case PromoteOnShutdown of
+ <<"always">> -> ok;
+ <<"when-synced">> -> ok;
+ none -> ok;
+ Mode -> {error, "ha-promote-on-shutdown must be "
+ "\"always\" or \"when-synced\", got ~p", [Mode]}
+ end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 11d6a79c..cc06ae44 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -653,8 +653,9 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
timed -> {ensure_sync_timer(State1), 0 }
end.
-backing_queue_timeout(State = #state { backing_queue = BQ }) ->
- run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State).
+backing_queue_timeout(State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ State#state{backing_queue_state = BQ:timeout(BQS)}.
ensure_sync_timer(State) ->
rabbit_misc:ensure_timer(State, #state.sync_timer_ref,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 58e93a3f..6f353da5 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -67,7 +67,7 @@
-export([check_expiry/1]).
-export([base64url/1]).
-export([interval_operation/4]).
--export([ensure_timer/4, stop_timer/2]).
+-export([ensure_timer/4, stop_timer/2, send_after/3, cancel_timer/1]).
-export([get_parent/0]).
-export([store_proc_name/1, store_proc_name/2]).
-export([moving_average/4]).
@@ -81,7 +81,7 @@
-ifdef(use_specs).
--export_type([resource_name/0, thunk/1]).
+-export_type([resource_name/0, thunk/1, channel_or_connection_exit/0]).
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
@@ -94,6 +94,7 @@
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])).
-type(graph_edge_fun() ::
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])).
+-type(tref() :: {'erlang', reference()} | {timer, timer:tref()}).
-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
-> rabbit_framing:amqp_method_name()).
@@ -209,7 +210,8 @@
[string()])
-> {'ok', {atom(), [{string(), string()}], [string()]}} |
'no_command').
--spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
+-spec(all_module_attributes/1 ::
+ (atom()) -> [{atom(), atom(), [term()]}]).
-spec(build_acyclic_graph/3 ::
(graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}])
-> rabbit_types:ok_or_error2(digraph(),
@@ -245,6 +247,8 @@
-> {any(), non_neg_integer()}).
-spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A).
-spec(stop_timer/2 :: (A, non_neg_integer()) -> A).
+-spec(send_after/3 :: (non_neg_integer(), pid(), any()) -> tref()).
+-spec(cancel_timer/1 :: (tref()) -> 'ok').
-spec(get_parent/0 :: () -> pid()).
-spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok).
-spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok).
@@ -849,20 +853,20 @@ module_attributes(Module) ->
end.
all_module_attributes(Name) ->
- Modules =
+ Targets =
lists:usort(
lists:append(
- [Modules || {App, _, _} <- application:loaded_applications(),
- {ok, Modules} <- [application:get_key(App, modules)]])),
+ [[{App, Module} || Module <- Modules] ||
+ {App, _, _} <- application:loaded_applications(),
+ {ok, Modules} <- [application:get_key(App, modules)]])),
lists:foldl(
- fun (Module, Acc) ->
+ fun ({App, Module}, Acc) ->
case lists:append([Atts || {N, Atts} <- module_attributes(Module),
N =:= Name]) of
[] -> Acc;
- Atts -> [{Module, Atts} | Acc]
+ Atts -> [{App, Module, Atts} | Acc]
end
- end, [], Modules).
-
+ end, [], Targets).
build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
G = digraph:new([acyclic]),
@@ -870,13 +874,13 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
[case digraph:vertex(G, Vertex) of
false -> digraph:add_vertex(G, Vertex, Label);
_ -> ok = throw({graph_error, {vertex, duplicate, Vertex}})
- end || {Module, Atts} <- Graph,
- {Vertex, Label} <- VertexFun(Module, Atts)],
+ end || GraphElem <- Graph,
+ {Vertex, Label} <- VertexFun(GraphElem)],
[case digraph:add_edge(G, From, To) of
{error, E} -> throw({graph_error, {edge, E, From, To}});
_ -> ok
- end || {Module, Atts} <- Graph,
- {From, To} <- EdgeFun(Module, Atts)],
+ end || GraphElem <- Graph,
+ {From, To} <- EdgeFun(GraphElem)],
{ok, G}
catch {graph_error, Reason} ->
true = digraph:delete(G),
@@ -1012,7 +1016,6 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
V =:= true orelse V =:= false ->
V.
-check_expiry(N) when N > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, N}};
check_expiry(N) when N < 0 -> {error, {value_negative, N}};
check_expiry(_N) -> ok.
@@ -1040,7 +1043,7 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) ->
ensure_timer(State, Idx, After, Msg) ->
case element(Idx, State) of
- undefined -> TRef = erlang:send_after(After, self(), Msg),
+ undefined -> TRef = send_after(After, self(), Msg),
setelement(Idx, State, TRef);
_ -> State
end.
@@ -1048,12 +1051,25 @@ ensure_timer(State, Idx, After, Msg) ->
stop_timer(State, Idx) ->
case element(Idx, State) of
undefined -> State;
- TRef -> case erlang:cancel_timer(TRef) of
- false -> State;
- _ -> setelement(Idx, State, undefined)
- end
+ TRef -> cancel_timer(TRef),
+ setelement(Idx, State, undefined)
end.
+%% timer:send_after/3 goes through a single timer process but allows
+%% long delays. erlang:send_after/3 does not have a bottleneck but
+%% only allows max 2^32-1 millis.
+-define(MAX_ERLANG_SEND_AFTER, 4294967295).
+send_after(Millis, Pid, Msg) when Millis > ?MAX_ERLANG_SEND_AFTER ->
+ {ok, Ref} = timer:send_after(Millis, Pid, Msg),
+ {timer, Ref};
+send_after(Millis, Pid, Msg) ->
+ {erlang, erlang:send_after(Millis, Pid, Msg)}.
+
+cancel_timer({erlang, Ref}) -> erlang:cancel_timer(Ref),
+ ok;
+cancel_timer({timer, Ref}) -> {ok, cancel} = timer:cancel(Ref),
+ ok.
+
store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}).
store_proc_name(TypeProcName) -> put(process_name, TypeProcName).
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index c0fb05e2..7817626c 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -18,6 +18,7 @@
-include("rabbit.hrl").
-export([setup/0, active/0, read_enabled/1, list/1, dependencies/3]).
+-export([ensure/1]).
%%----------------------------------------------------------------------------
@@ -31,22 +32,54 @@
-spec(read_enabled/1 :: (file:filename()) -> [plugin_name()]).
-spec(dependencies/3 :: (boolean(), [plugin_name()], [#plugin{}]) ->
[plugin_name()]).
-
+-spec(ensure/1 :: (string()) -> {'ok', [atom()], [atom()]} | {error, any()}).
-endif.
%%----------------------------------------------------------------------------
+ensure(FileJustChanged) ->
+ {ok, OurFile} = application:get_env(rabbit, enabled_plugins_file),
+ case OurFile of
+ FileJustChanged ->
+ {ok, Dir} = application:get_env(rabbit, plugins_dir),
+ Enabled = read_enabled(OurFile),
+ Wanted = dependencies(false, Enabled, list(Dir)),
+ prepare_plugins(Enabled),
+ Current = active(),
+ Start = Wanted -- Current,
+ Stop = Current -- Wanted,
+ rabbit:start_apps(Start),
+ %% We need sync_notify here since mgmt will attempt to look at all
+ %% the modules for the disabled plugins - if they are unloaded
+ %% that won't work.
+ ok = rabbit_event:sync_notify(plugins_changed, [{enabled, Start},
+ {disabled, Stop}]),
+ rabbit:stop_apps(Stop),
+ clean_plugins(Stop),
+ {ok, Start, Stop};
+ _ ->
+ {error, {enabled_plugins_mismatch, FileJustChanged, OurFile}}
+ end.
+
%% @doc Prepares the file system and installs all enabled plugins.
setup() ->
- {ok, PluginDir} = application:get_env(rabbit, plugins_dir),
{ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+
+ %% Eliminate the contents of the destination directory
+ case delete_recursively(ExpandDir) of
+ ok -> ok;
+ {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir,
+ [ExpandDir, E1]}})
+ end,
+
{ok, EnabledFile} = application:get_env(rabbit, enabled_plugins_file),
- prepare_plugins(EnabledFile, PluginDir, ExpandDir).
+ Enabled = read_enabled(EnabledFile),
+ prepare_plugins(Enabled).
%% @doc Lists the plugins which are currently running.
active() ->
{ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
- InstalledPlugins = [ P#plugin.name || P <- list(ExpandDir) ],
+ InstalledPlugins = plugin_names(list(ExpandDir)),
[App || {App, _, _} <- rabbit_misc:which_applications(),
lists:member(App, InstalledPlugins)].
@@ -67,7 +100,7 @@ list(PluginsDir) ->
_ -> error_logger:warning_msg(
"Problem reading some plugins: ~p~n", [Problems])
end,
- Plugins.
+ ensure_dependencies(Plugins).
%% @doc Read the list of enabled plugins from the supplied term file.
read_enabled(PluginsFile) ->
@@ -86,15 +119,10 @@ read_enabled(PluginsFile) ->
%% the resulting list, otherwise they're skipped.
dependencies(Reverse, Sources, AllPlugins) ->
{ok, G} = rabbit_misc:build_acyclic_graph(
- fun (App, _Deps) -> [{App, App}] end,
- fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end,
- lists:ukeysort(
- 1, [{Name, Deps} ||
- #plugin{name = Name,
- dependencies = Deps} <- AllPlugins] ++
- [{Dep, []} ||
- #plugin{dependencies = Deps} <- AllPlugins,
- Dep <- Deps])),
+ fun ({App, _Deps}) -> [{App, App}] end,
+ fun ({App, Deps}) -> [{App, Dep} || Dep <- Deps] end,
+ [{Name, Deps} || #plugin{name = Name,
+ dependencies = Deps} <- AllPlugins]),
Dests = case Reverse of
false -> digraph_utils:reachable(Sources, G);
true -> digraph_utils:reaching(Sources, G)
@@ -102,11 +130,35 @@ dependencies(Reverse, Sources, AllPlugins) ->
true = digraph:delete(G),
Dests.
+%% Make sure we don't list OTP apps in here, and also that we create
+%% fake plugins for missing dependencies.
+ensure_dependencies(Plugins) ->
+ Names = plugin_names(Plugins),
+ NotThere = [Dep || #plugin{dependencies = Deps} <- Plugins,
+ Dep <- Deps,
+ not lists:member(Dep, Names)],
+ {OTP, Missing} = lists:partition(fun is_loadable/1, NotThere),
+ Plugins1 = [P#plugin{dependencies = Deps -- OTP}
+ || P = #plugin{dependencies = Deps} <- Plugins],
+ Fake = [#plugin{name = Name,
+ dependencies = []}|| Name <- Missing],
+ Plugins1 ++ Fake.
+
+is_loadable(App) ->
+ case application:load(App) of
+ {error, {already_loaded, _}} -> true;
+ ok -> application:unload(App),
+ true;
+ _ -> false
+ end.
+
%%----------------------------------------------------------------------------
-prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) ->
+prepare_plugins(Enabled) ->
+ {ok, PluginsDistDir} = application:get_env(rabbit, plugins_dir),
+ {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+
AllPlugins = list(PluginsDistDir),
- Enabled = read_enabled(EnabledFile),
ToUnpack = dependencies(false, Enabled, AllPlugins),
ToUnpackPlugins = lookup_plugins(ToUnpack, AllPlugins),
@@ -117,12 +169,6 @@ prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) ->
[Missing])
end,
- %% Eliminate the contents of the destination directory
- case delete_recursively(ExpandDir) of
- ok -> ok;
- {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir,
- [ExpandDir, E1]}})
- end,
case filelib:ensure_dir(ExpandDir ++ "/") of
ok -> ok;
{error, E2} -> throw({error, {cannot_create_plugins_expand_dir,
@@ -134,6 +180,20 @@ prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) ->
[prepare_dir_plugin(PluginAppDescPath) ||
PluginAppDescPath <- filelib:wildcard(ExpandDir ++ "/*/ebin/*.app")].
+clean_plugins(Plugins) ->
+ {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+ [clean_plugin(Plugin, ExpandDir) || Plugin <- Plugins].
+
+clean_plugin(Plugin, ExpandDir) ->
+ {ok, Mods} = application:get_key(Plugin, modules),
+ application:unload(Plugin),
+ [begin
+ code:soft_purge(Mod),
+ code:delete(Mod),
+ false = code:is_loaded(Mod)
+ end || Mod <- Mods],
+ delete_recursively(rabbit_misc:format("~s/~s", [ExpandDir, Plugin])).
+
prepare_dir_plugin(PluginAppDescPath) ->
code:add_path(filename:dirname(PluginAppDescPath)),
list_to_atom(filename:basename(PluginAppDescPath, ".app")).
@@ -172,8 +232,7 @@ plugin_info(Base, {app, App0}) ->
mkplugin(Name, Props, Type, Location) ->
Version = proplists:get_value(vsn, Props, "0"),
Description = proplists:get_value(description, Props, ""),
- Dependencies =
- filter_applications(proplists:get_value(applications, Props, [])),
+ Dependencies = proplists:get_value(applications, Props, []),
#plugin{name = Name, version = Version, description = Description,
dependencies = Dependencies, location = Location, type = Type}.
@@ -206,18 +265,6 @@ parse_binary(Bin) ->
Err -> {error, {invalid_app, Err}}
end.
-filter_applications(Applications) ->
- [Application || Application <- Applications,
- not is_available_app(Application)].
-
-is_available_app(Application) ->
- case application:load(Application) of
- {error, {already_loaded, _}} -> true;
- ok -> application:unload(Application),
- true;
- _ -> false
- end.
-
plugin_names(Plugins) ->
[Name || #plugin{name = Name} <- Plugins].
diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl
index 89e16f14..98418d8c 100644
--- a/src/rabbit_plugins_main.erl
+++ b/src/rabbit_plugins_main.erl
@@ -18,23 +18,33 @@
-include("rabbit.hrl").
-export([start/0, stop/0]).
+-export([action/6]).
+-define(NODE_OPT, "-n").
-define(VERBOSE_OPT, "-v").
-define(MINIMAL_OPT, "-m").
-define(ENABLED_OPT, "-E").
-define(ENABLED_ALL_OPT, "-e").
+-define(OFFLINE_OPT, "--offline").
+-define(ONLINE_OPT, "--online").
+-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
-define(VERBOSE_DEF, {?VERBOSE_OPT, flag}).
-define(MINIMAL_DEF, {?MINIMAL_OPT, flag}).
-define(ENABLED_DEF, {?ENABLED_OPT, flag}).
-define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}).
+-define(OFFLINE_DEF, {?OFFLINE_OPT, flag}).
+-define(ONLINE_DEF, {?ONLINE_OPT, flag}).
--define(GLOBAL_DEFS, []).
+-define(RPC_TIMEOUT, infinity).
+
+-define(GLOBAL_DEFS(Node), [?NODE_DEF(Node)]).
-define(COMMANDS,
[{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]},
- enable,
- disable]).
+ {enable, [?OFFLINE_DEF, ?ONLINE_DEF]},
+ {disable, [?OFFLINE_DEF, ?ONLINE_DEF]},
+ {sync, []}]).
%%----------------------------------------------------------------------------
@@ -51,11 +61,10 @@
start() ->
{ok, [[PluginsFile|_]|_]} =
init:get_argument(enabled_plugins_file),
+ {ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
{ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir),
{Command, Opts, Args} =
- case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS,
- init:get_plain_arguments())
- of
+ case parse_arguments(init:get_plain_arguments(), NodeStr) of
{ok, Res} -> Res;
no_command -> print_error("could not recognise command", []),
usage()
@@ -67,7 +76,8 @@ start() ->
[string:join([atom_to_list(Command) | Args], " ")])
end,
- case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of
+ Node = proplists:get_value(?NODE_OPT, Opts),
+ case catch action(Command, Node, Args, Opts, PluginsFile, PluginsDir) of
ok ->
rabbit_misc:quit(0);
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
@@ -82,6 +92,13 @@ start() ->
{error_string, Reason} ->
print_error("~s", [Reason]),
rabbit_misc:quit(2);
+ {badrpc, {'EXIT', Reason}} ->
+ print_error("~p", [Reason]),
+ rabbit_misc:quit(2);
+ {badrpc, Reason} ->
+ print_error("unable to connect to node ~w: ~w", [Node, Reason]),
+ print_badrpc_diagnostics([Node]),
+ rabbit_misc:quit(2);
Other ->
print_error("~p", [Other]),
rabbit_misc:quit(2)
@@ -92,20 +109,32 @@ stop() ->
%%----------------------------------------------------------------------------
-action(list, [], Opts, PluginsFile, PluginsDir) ->
- action(list, [".*"], Opts, PluginsFile, PluginsDir);
-action(list, [Pat], Opts, PluginsFile, PluginsDir) ->
- format_plugins(Pat, Opts, PluginsFile, PluginsDir);
+parse_arguments(CmdLine, NodeStr) ->
+ case rabbit_misc:parse_arguments(
+ ?COMMANDS, ?GLOBAL_DEFS(NodeStr), CmdLine) of
+ {ok, {Cmd, Opts0, Args}} ->
+ Opts = [case K of
+ ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)};
+ _ -> {K, V}
+ end || {K, V} <- Opts0],
+ {ok, {Cmd, Opts, Args}};
+ E ->
+ E
+ end.
+
+action(list, Node, [], Opts, PluginsFile, PluginsDir) ->
+ action(list, Node, [".*"], Opts, PluginsFile, PluginsDir);
+action(list, Node, [Pat], Opts, PluginsFile, PluginsDir) ->
+ format_plugins(Node, Pat, Opts, PluginsFile, PluginsDir);
-action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
+action(enable, Node, ToEnable0, Opts, PluginsFile, PluginsDir) ->
case ToEnable0 of
[] -> throw({error_string, "Not enough arguments for 'enable'"});
_ -> ok
end,
AllPlugins = rabbit_plugins:list(PluginsDir),
Enabled = rabbit_plugins:read_enabled(PluginsFile),
- ImplicitlyEnabled = rabbit_plugins:dependencies(false,
- Enabled, AllPlugins),
+ ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins),
ToEnable = [list_to_atom(Name) || Name <- ToEnable0],
Missing = ToEnable -- plugin_names(AllPlugins),
NewEnabled = lists:usort(Enabled ++ ToEnable),
@@ -124,18 +153,20 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
case NewEnabled -- ImplicitlyEnabled of
[] -> io:format("Plugin configuration unchanged.~n");
_ -> print_list("The following plugins have been enabled:",
- NewImplicitlyEnabled -- ImplicitlyEnabled),
- report_change()
- end;
+ NewImplicitlyEnabled -- ImplicitlyEnabled)
+ end,
+ action_change(
+ Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile);
-action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
+action(disable, Node, ToDisable0, Opts, PluginsFile, PluginsDir) ->
case ToDisable0 of
[] -> throw({error_string, "Not enough arguments for 'disable'"});
_ -> ok
end,
- ToDisable = [list_to_atom(Name) || Name <- ToDisable0],
- Enabled = rabbit_plugins:read_enabled(PluginsFile),
AllPlugins = rabbit_plugins:list(PluginsDir),
+ Enabled = rabbit_plugins:read_enabled(PluginsFile),
+ ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins),
+ ToDisable = [list_to_atom(Name) || Name <- ToDisable0],
Missing = ToDisable -- plugin_names(AllPlugins),
case Missing of
[] -> ok;
@@ -144,30 +175,35 @@ action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
end,
ToDisableDeps = rabbit_plugins:dependencies(true, ToDisable, AllPlugins),
NewEnabled = Enabled -- ToDisableDeps,
+ NewImplicitlyEnabled = rabbit_plugins:dependencies(false,
+ NewEnabled, AllPlugins),
case length(Enabled) =:= length(NewEnabled) of
true -> io:format("Plugin configuration unchanged.~n");
- false -> ImplicitlyEnabled =
- rabbit_plugins:dependencies(false, Enabled, AllPlugins),
- NewImplicitlyEnabled =
- rabbit_plugins:dependencies(false,
- NewEnabled, AllPlugins),
- print_list("The following plugins have been disabled:",
+ false -> print_list("The following plugins have been disabled:",
ImplicitlyEnabled -- NewImplicitlyEnabled),
- write_enabled_plugins(PluginsFile, NewEnabled),
- report_change()
- end.
+ write_enabled_plugins(PluginsFile, NewEnabled)
+ end,
+ action_change(
+ Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile);
+
+action(sync, Node, [], _Opts, PluginsFile, _PluginsDir) ->
+ sync(Node, true, PluginsFile).
%%----------------------------------------------------------------------------
-print_error(Format, Args) ->
- rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args).
+fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args).
+
+print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args).
+
+print_badrpc_diagnostics(Nodes) ->
+ fmt_stderr(rabbit_nodes:diagnostics(Nodes), []).
usage() ->
io:format("~s", [rabbit_plugins_usage:usage()]),
rabbit_misc:quit(1).
%% Pretty print a list of plugins.
-format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
+format_plugins(Node, Pattern, Opts, PluginsFile, PluginsDir) ->
Verbose = proplists:get_bool(?VERBOSE_OPT, Opts),
Minimal = proplists:get_bool(?MINIMAL_OPT, Opts),
Format = case {Verbose, Minimal} of
@@ -182,9 +218,14 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
AvailablePlugins = rabbit_plugins:list(PluginsDir),
EnabledExplicitly = rabbit_plugins:read_enabled(PluginsFile),
- EnabledImplicitly =
- rabbit_plugins:dependencies(false, EnabledExplicitly,
- AvailablePlugins) -- EnabledExplicitly,
+ AllEnabled = rabbit_plugins:dependencies(false, EnabledExplicitly,
+ AvailablePlugins),
+ EnabledImplicitly = AllEnabled -- EnabledExplicitly,
+ {StatusMsg, Running} =
+ case rpc:call(Node, rabbit_plugins, active, [], ?RPC_TIMEOUT) of
+ {badrpc, _} -> {"[failed to contact ~s - status not shown]", []};
+ Active -> {"* = running on ~s", Active}
+ end,
Missing = [#plugin{name = Name, dependencies = []} ||
Name <- ((EnabledExplicitly ++ EnabledImplicitly) --
plugin_names(AvailablePlugins))],
@@ -192,31 +233,42 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
Plugins = [ Plugin ||
Plugin = #plugin{name = Name} <- AvailablePlugins ++ Missing,
re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match,
- if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
- OnlyEnabledAll -> (lists:member(Name,
- EnabledExplicitly) or
- lists:member(Name, EnabledImplicitly));
+ if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
+ OnlyEnabledAll -> lists:member(Name, EnabledExplicitly) or
+ lists:member(Name,EnabledImplicitly);
true -> true
end],
Plugins1 = usort_plugins(Plugins),
MaxWidth = lists:max([length(atom_to_list(Name)) ||
#plugin{name = Name} <- Plugins1] ++ [0]),
- [format_plugin(P, EnabledExplicitly, EnabledImplicitly,
+ case Format of
+ minimal -> ok;
+ _ -> io:format(" Configured: E = explicitly enabled; "
+ "e = implicitly enabled; ! = missing~n"
+ " | Status: ~s~n"
+ " |/~n", [rabbit_misc:format(StatusMsg, [Node])])
+ end,
+ [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Running,
plugin_names(Missing), Format, MaxWidth) || P <- Plugins1],
ok.
format_plugin(#plugin{name = Name, version = Version,
description = Description, dependencies = Deps},
- EnabledExplicitly, EnabledImplicitly, Missing,
- Format, MaxWidth) ->
- Glyph = case {lists:member(Name, EnabledExplicitly),
- lists:member(Name, EnabledImplicitly),
- lists:member(Name, Missing)} of
- {true, false, false} -> "[E]";
- {false, true, false} -> "[e]";
- {_, _, true} -> "[!]";
- _ -> "[ ]"
- end,
+ EnabledExplicitly, EnabledImplicitly, Running,
+ Missing, Format, MaxWidth) ->
+ EnabledGlyph = case {lists:member(Name, EnabledExplicitly),
+ lists:member(Name, EnabledImplicitly),
+ lists:member(Name, Missing)} of
+ {true, false, false} -> "E";
+ {false, true, false} -> "e";
+ {_, _, true} -> "!";
+ _ -> " "
+ end,
+ RunningGlyph = case lists:member(Name, Running) of
+ true -> "*";
+ false -> " "
+ end,
+ Glyph = rabbit_misc:format("[~s~s]", [EnabledGlyph, RunningGlyph]),
Opt = fun (_F, A, A) -> ok;
( F, A, _) -> io:format(F, [A])
end,
@@ -227,9 +279,9 @@ format_plugin(#plugin{name = Name, version = Version,
Opt("~s", Version, undefined),
io:format("~n");
verbose -> io:format("~s ~w~n", [Glyph, Name]),
- Opt(" Version: \t~s~n", Version, undefined),
- Opt(" Dependencies:\t~p~n", Deps, []),
- Opt(" Description: \t~s~n", Description, undefined),
+ Opt(" Version: \t~s~n", Version, undefined),
+ Opt(" Dependencies:\t~p~n", Deps, []),
+ Opt(" Description: \t~s~n", Description, undefined),
io:format("~n")
end.
@@ -262,6 +314,51 @@ write_enabled_plugins(PluginsFile, Plugins) ->
PluginsFile, Reason}})
end.
-report_change() ->
- io:format("Plugin configuration has changed. "
- "Restart RabbitMQ for changes to take effect.~n").
+action_change(Opts, Node, Old, New, PluginsFile) ->
+ action_change0(proplists:get_bool(?OFFLINE_OPT, Opts),
+ proplists:get_bool(?ONLINE_OPT, Opts),
+ Node, Old, New, PluginsFile).
+
+action_change0(true, _Online, _Node, Same, Same, _PluginsFile) ->
+ %% Definitely nothing to do
+ ok;
+action_change0(true, _Online, _Node, _Old, _New, _PluginsFile) ->
+ io:format("Offline change; changes will take effect at broker restart.~n");
+action_change0(false, Online, Node, _Old, _New, PluginsFile) ->
+ sync(Node, Online, PluginsFile).
+
+sync(Node, ForceOnline, PluginsFile) ->
+ rpc_call(Node, ForceOnline, rabbit_plugins, ensure, [PluginsFile]).
+
+rpc_call(Node, Online, Mod, Fun, Args) ->
+ io:format("Applying plugin configuration to ~s...", [Node]),
+ case rpc:call(Node, Mod, Fun, Args) of
+ {ok, [], []} ->
+ io:format(" nothing to do.~n", []);
+ {ok, Start, []} ->
+ io:format(" started ~b plugin~s.~n", [length(Start), plur(Start)]);
+ {ok, [], Stop} ->
+ io:format(" stopped ~b plugin~s.~n", [length(Stop), plur(Stop)]);
+ {ok, Start, Stop} ->
+ io:format(" stopped ~b plugin~s and started ~b plugin~s.~n",
+ [length(Stop), plur(Stop), length(Start), plur(Start)]);
+ {badrpc, nodedown} = Error ->
+ io:format(" failed.~n", []),
+ case Online of
+ true -> Error;
+ false -> io:format(
+ " * Could not contact node ~s.~n"
+ " * Changes will take effect at broker restart.~n"
+ " * Specify --online for diagnostics and to treat "
+ "this as a failure.~n"
+ " * Specify --offline to disable changes to running "
+ "broker.~n",
+ [Node])
+ end;
+ Error ->
+ io:format(" failed.~n", []),
+ Error
+ end.
+
+plur([_]) -> "";
+plur(_) -> "s".
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index fe2b766f..3558cf98 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -61,13 +61,13 @@ validate_policy0(<<"dead-letter-routing-key">>, Value) ->
{error, "~p is not a valid dead letter routing key", [Value]};
validate_policy0(<<"message-ttl">>, Value)
- when is_integer(Value), Value >= 0, Value =< ?MAX_EXPIRY_TIMER ->
+ when is_integer(Value), Value >= 0 ->
ok;
validate_policy0(<<"message-ttl">>, Value) ->
{error, "~p is not a valid message TTL", [Value]};
validate_policy0(<<"expires">>, Value)
- when is_integer(Value), Value >= 1, Value =< ?MAX_EXPIRY_TIMER ->
+ when is_integer(Value), Value >= 1 ->
ok;
validate_policy0(<<"expires">>, Value) ->
{error, "~p is not a valid queue expiry", [Value]};
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 0a69fb32..f5d03360 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -46,17 +46,11 @@ name(#exchange{policy = Policy}) -> name0(Policy).
name0(undefined) -> none;
name0(Policy) -> pget(name, Policy).
-set(Q = #amqqueue{name = Name}) -> rabbit_queue_decorator:set(
- Q#amqqueue{policy = set0(Name)});
-set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set(
- X#exchange{policy = set0(Name)}).
+set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
+set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}.
set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)).
-set(Q = #amqqueue{name = Name}, Ps) -> Q#amqqueue{policy = match(Name, Ps)};
-set(X = #exchange{name = Name}, Ps) -> rabbit_exchange_decorator:set(
- X#exchange{policy = match(Name, Ps)}).
-
get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy);
get(Name, #exchange{policy = Policy}) -> get0(Name, Policy);
%% Caution - SLOW.
@@ -104,12 +98,18 @@ recover0() ->
Policies = list(),
[rabbit_misc:execute_mnesia_transaction(
fun () ->
- mnesia:write(rabbit_durable_exchange, set(X, Policies), write)
- end) || X <- Xs],
+ mnesia:write(
+ rabbit_durable_exchange,
+ rabbit_exchange_decorator:set(
+ X#exchange{policy = match(Name, Policies)}), write)
+ end) || X = #exchange{name = Name} <- Xs],
[rabbit_misc:execute_mnesia_transaction(
fun () ->
- mnesia:write(rabbit_durable_queue, set(Q, Policies), write)
- end) || Q <- Qs],
+ mnesia:write(
+ rabbit_durable_queue,
+ rabbit_queue_decorator:set(
+ Q#amqqueue{policy = match(Name, Policies)}), write)
+ end) || Q = #amqqueue{name = Name} <- Qs],
ok.
invalid_file() ->
diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl
index 6205e2dc..adfe0c7f 100644
--- a/src/rabbit_queue_decorator.erl
+++ b/src/rabbit_queue_decorator.erl
@@ -2,7 +2,7 @@
-include("rabbit.hrl").
--export([select/1, set/1]).
+-export([select/1, set/1, register/2, unregister/1]).
%%----------------------------------------------------------------------------
@@ -41,3 +41,24 @@ select(Modules) ->
set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}.
list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)].
+
+register(TypeName, ModuleName) ->
+ rabbit_registry:register(queue_decorator, TypeName, ModuleName),
+ [maybe_recover(Q) || Q <- rabbit_amqqueue:list()],
+ ok.
+
+unregister(TypeName) ->
+ rabbit_registry:unregister(queue_decorator, TypeName),
+ [maybe_recover(Q) || Q <- rabbit_amqqueue:list()],
+ ok.
+
+maybe_recover(Q = #amqqueue{name = Name,
+ decorators = Decs}) ->
+ #amqqueue{decorators = Decs1} = set(Q),
+ Old = lists:sort(select(Decs)),
+ New = lists:sort(select(Decs1)),
+ case New of
+ Old -> ok;
+ _ -> [M:startup(Q) || M <- New -- Old],
+ rabbit_amqqueue:update_decorators(Name)
+ end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index ddaf205e..906c4b6e 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -410,7 +410,7 @@ handle_other({'$gen_cast', {force_event_refresh, Ref}}, State)
rabbit_event:notify(
connection_created,
[{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref),
- State;
+ rabbit_event:init_stats_timer(State, #v1.stats_timer);
handle_other({'$gen_cast', force_event_refresh}, State) ->
%% Ignore, we will emit a created event once we start running.
State;
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index da75932d..47c77cd0 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -70,7 +70,8 @@ wait_for_replicated() ->
not lists:member({local_content, true}, TabDef)]).
wait(TableNames) ->
- case mnesia:wait_for_tables(TableNames, 30000) of
+ {ok, Timeout} = application:get_env(rabbit, mnesia_table_loading_timeout),
+ case mnesia:wait_for_tables(TableNames, Timeout) of
ok ->
ok;
{timeout, BadTabs} ->
diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl
index d943b599..3a041508 100644
--- a/src/rabbit_version.erl
+++ b/src/rabbit_version.erl
@@ -114,8 +114,8 @@ upgrades_required(Scope) ->
with_upgrade_graph(Fun, Scope) ->
case rabbit_misc:build_acyclic_graph(
- fun (Module, Steps) -> vertices(Module, Steps, Scope) end,
- fun (Module, Steps) -> edges(Module, Steps, Scope) end,
+ fun ({_App, Module, Steps}) -> vertices(Module, Steps, Scope) end,
+ fun ({_App, Module, Steps}) -> edges(Module, Steps, Scope) end,
rabbit_misc:all_module_attributes(rabbit_upgrade)) of
{ok, G} -> try
Fun(G)
@@ -161,7 +161,7 @@ heads(G) ->
categorise_by_scope(Version) when is_list(Version) ->
Categorised =
- [{Scope, Name} || {_Module, Attributes} <-
+ [{Scope, Name} || {_App, _Module, Attributes} <-
rabbit_misc:all_module_attributes(rabbit_upgrade),
{Name, Scope, _Requires} <- Attributes,
lists:member(Name, Version)],