summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-05-04 11:59:25 +0100
committerEmile Joubert <emile@rabbitmq.com>2012-05-04 11:59:25 +0100
commitf0428b3a557ba4d9a41c7537055582e33e00fbf0 (patch)
tree7fd675a2b5b25c0fd62e4ccfb7376ad21bdf37aa
parente3321662f859d746876c80af1ea713692fccea0f (diff)
parent0960e22cb63aa8499cf6a3aa3ae78e70547c2349 (diff)
downloadrabbitmq-server-f0428b3a557ba4d9a41c7537055582e33e00fbf0.tar.gz
Merged bug24889 into default
-rw-r--r--docs/rabbitmqctl.1.xml93
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/mirrored_supervisor.erl45
-rw-r--r--src/mirrored_supervisor_tests.erl2
-rw-r--r--src/rabbit.erl15
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_basic.erl3
-rw-r--r--src/rabbit_control.erl17
-rw-r--r--src/rabbit_event.erl6
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_misc.erl12
-rw-r--r--src/rabbit_mnesia.erl5
-rw-r--r--src/rabbit_nodes.erl3
-rw-r--r--src/rabbit_plugins.erl18
-rw-r--r--src/rabbit_registry.erl20
-rw-r--r--src/rabbit_runtime_parameter.erl43
-rw-r--r--src/rabbit_runtime_parameters.erl239
-rw-r--r--src/rabbit_runtime_parameters_test.erl38
-rw-r--r--src/rabbit_tests.erl33
-rw-r--r--src/rabbit_upgrade_functions.erl8
-rw-r--r--src/rabbit_variable_queue.erl10
-rw-r--r--src/supervisor2.erl4
22 files changed, 558 insertions, 62 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index ded3ab48..1effd691 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -745,6 +745,99 @@
</refsect2>
<refsect2>
+ <title>Parameter Management</title>
+ <para>
+ Certain features of RabbitMQ (such as the federation plugin)
+ are controlled by dynamic,
+ cluster-wide <emphasis>parameters</emphasis>. Each parameter
+ consists of a component name, a key and a value. The
+ component name and key are strings, and the value is an
+ Erlang term. Parameters can be set, cleared and listed. In
+ general you should refer to the documentation for the feature
+ in question to see how to set parameters.
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term><cmdsynopsis><command>set_parameter</command> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Sets a parameter.
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term>component_name</term>
+ <listitem><para>
+ The name of the component for which the
+ parameter is being set.
+ </para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>key</term>
+ <listitem><para>
+ The key for which the parameter is being set.
+ </para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>value</term>
+ <listitem><para>
+ The value for the parameter, as an
+ Erlang term. In most shells you are very likely to
+ need to quote this.
+ </para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl set_parameter federation local_username '&lt;&lt;"guest">>'</screen>
+ <para role="example">
+ This command sets the parameter <command>local_username</command> for the <command>federation</command> component to the Erlang term <command>&lt;&lt;"guest">></command>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>clear_parameter</command> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Clears a parameter.
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term>component_name</term>
+ <listitem><para>
+ The name of the component for which the
+ parameter is being cleared.
+ </para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>key</term>
+ <listitem><para>
+ The key for which the parameter is being cleared.
+ </para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl clear_parameter federation local_username</screen>
+ <para role="example">
+ This command clears the parameter <command>local_username</command> for the <command>federation</command> component.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>list_parameters</command></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Lists all parameters.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl list_parameters</screen>
+ <para role="example">
+ This command lists all parameters.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </refsect2>
+
+ <refsect2>
<title>Server Status</title>
<para>
The server status queries interrogate the server and return a list of
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index faf3059a..5c73c8b8 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -66,6 +66,8 @@
-record(listener, {node, protocol, host, ip_address, port}).
+-record(runtime_parameters, {key, value}).
+
-record(basic_message, {exchange_name, routing_keys = [], content, id,
is_persistent}).
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 221f6a87..4fe93981 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -261,24 +261,19 @@ start_internal(Group, ChildSpecs) ->
%%----------------------------------------------------------------------------
-init({overall, Group, Init}) ->
- case Init of
- {ok, {Restart, ChildSpecs}} ->
- Delegate = {delegate, {?SUPERVISOR, start_link,
- [?MODULE, {delegate, Restart}]},
- temporary, 16#ffffffff, supervisor, [?SUPERVISOR]},
- Mirroring = {mirroring, {?MODULE, start_internal,
- [Group, ChildSpecs]},
- permanent, 16#ffffffff, worker, [?MODULE]},
- %% Important: Delegate MUST start before Mirroring so that
- %% when we shut down from above it shuts down last, so
- %% Mirroring does not see it die.
- %%
- %% See comment in handle_info('DOWN', ...) below
- {ok, {{one_for_all, 0, 1}, [Delegate, Mirroring]}};
- ignore ->
- ignore
- end;
+init({overall, _Group, ignore}) -> ignore;
+init({overall, Group, {ok, {Restart, ChildSpecs}}}) ->
+ %% Important: Delegate MUST start before Mirroring so that when we
+ %% shut down from above it shuts down last, so Mirroring does not
+ %% see it die.
+ %%
+ %% See comment in handle_info('DOWN', ...) below
+ {ok, {{one_for_all, 0, 1},
+ [{delegate, {?SUPERVISOR, start_link, [?MODULE, {delegate, Restart}]},
+ temporary, 16#ffffffff, supervisor, [?SUPERVISOR]},
+ {mirroring, {?MODULE, start_internal, [Group, ChildSpecs]},
+ permanent, 16#ffffffff, worker, [?MODULE]}]}};
+
init({delegate, Restart}) ->
{ok, {Restart, []}};
@@ -306,9 +301,9 @@ handle_call({init, Overall}, _From,
Delegate = child(Overall, delegate),
erlang:monitor(process, Delegate),
State1 = State#state{overall = Overall, delegate = Delegate},
- case all_started([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of
- true -> {reply, ok, State1};
- false -> {stop, shutdown, State1}
+ case errors([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of
+ [] -> {reply, ok, State1};
+ Errors -> {stop, {shutdown, Errors}, State1}
end;
handle_call({start_child, ChildSpec}, _From,
@@ -372,9 +367,9 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason},
[start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
_ -> []
end,
- case all_started(R) of
- true -> {noreply, State};
- false -> {stop, shutdown, State}
+ case errors(R) of
+ [] -> {noreply, State};
+ Errors -> {stop, {shutdown, Errors}, State}
end;
handle_info(Info, State) ->
@@ -468,7 +463,7 @@ delete_all(Group) ->
[delete(Group, id(C)) ||
C <- mnesia:select(?TABLE, [{MatchHead, [], ['$1']}])].
-all_started(Results) -> [] =:= [R || R = {error, _} <- Results].
+errors(Results) -> [E || {error, E} <- Results].
%%----------------------------------------------------------------------------
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index fe56b530..e8baabe8 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -268,7 +268,7 @@ inc_group() ->
get_group(Group) ->
{Group, get(counter)}.
-call(Id, Msg) -> call(Id, Msg, 10000, 100).
+call(Id, Msg) -> call(Id, Msg, 1000, 100).
call(Id, Msg, 0, _Decr) ->
exit({timeout_waiting_for_server, {Id, Msg}, erlang:get_stacktrace()});
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b1f786a0..5e579165 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -621,15 +621,12 @@ log_location(Type) ->
rotate_logs(File, Suffix, Handler) ->
rotate_logs(File, Suffix, Handler, Handler).
-rotate_logs(File, Suffix, OldHandler, NewHandler) ->
- case File of
- undefined -> ok;
- tty -> ok;
- _ -> gen_event:swap_handler(
- error_logger,
- {OldHandler, swap},
- {NewHandler, {File, Suffix}})
- end.
+rotate_logs(undefined, _Suffix, _OldHandler, _NewHandler) -> ok;
+rotate_logs(tty, _Suffix, _OldHandler, _NewHandler) -> ok;
+rotate_logs(File, Suffix, OldHandler, NewHandler) ->
+ gen_event:swap_handler(error_logger,
+ {OldHandler, swap},
+ {NewHandler, {File, Suffix}}).
log_rotation_result({error, MainLogError}, {error, SaslLogError}) ->
{error, {{cannot_rotate_main_logs, MainLogError},
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 28c57bb0..dc144a0e 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -26,10 +26,8 @@
('empty' |
%% Message, IsDelivered, AckTag, Remaining_Len
{rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
--type(is_durable() :: boolean()).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
--type(confirm_required() :: boolean()).
-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(duration() :: ('undefined' | 'infinity' | number())).
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 17d848da..734456d3 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -224,6 +224,5 @@ header_routes(HeadersTable) ->
{array, Routes} -> [Route || {longstr, Route} <- Routes];
undefined -> [];
{Type, _Val} -> throw({error, {unacceptable_type_in_header,
- Type,
- binary_to_list(HeaderKey)}})
+ binary_to_list(HeaderKey), Type}})
end || HeaderKey <- ?ROUTING_HEADERS]).
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 573ed6a3..2dea2a2f 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -356,6 +356,23 @@ action(list_permissions, Node, [], Opts, Inform) ->
list_vhost_permissions, [VHost]}),
rabbit_auth_backend_internal:vhost_perms_info_keys());
+action(set_parameter, Node, [Component, Key, Value], _Opts, Inform) ->
+ Inform("Setting runtime parameter ~p for component ~p to ~p",
+ [Key, Component, Value]),
+ rpc_call(Node, rabbit_runtime_parameters, parse_set,
+ [list_to_binary(Component), list_to_binary(Key), Value]);
+
+action(clear_parameter, Node, [Component, Key], _Opts, Inform) ->
+ Inform("Clearing runtime parameter ~p for component ~p", [Key, Component]),
+ rpc_call(Node, rabbit_runtime_parameters, clear, [list_to_binary(Component),
+ list_to_binary(Key)]);
+
+action(list_parameters, Node, Args = [], _Opts, Inform) ->
+ Inform("Listing runtime parameters", []),
+ display_info_list(
+ rpc_call(Node, rabbit_runtime_parameters, list_formatted, Args),
+ rabbit_runtime_parameters:info_keys());
+
action(report, Node, _Args, _Opts, Inform) ->
io:format("Reporting server status on ~p~n~n", [erlang:universaltime()]),
[begin ok = action(Action, N, [], [], Inform), io:nl() end ||
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 4ec141cf..3f1b20fe 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -139,6 +139,6 @@ notify_if(false, _Type, _Props) -> ok.
notify(Type, Props) ->
%% TODO: switch to os:timestamp() when we drop support for
%% Erlang/OTP < R13B01
- gen_event:notify(rabbit_event, #event{type = Type,
- props = Props,
- timestamp = now()}).
+ gen_event:notify(?MODULE, #event{type = Type,
+ props = Props,
+ timestamp = now()}).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 9fa6213b..2b15498e 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -192,7 +192,7 @@ terminate(_, _) ->
ok.
code_change(_, State, _) ->
- State.
+ {ok, State}.
%%----------------------------------------------------------------------------
%% Internal plumbing
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 0aacd654..706de835 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -40,7 +40,8 @@
-export([upmap/2, map_in_order/2]).
-export([table_filter/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
--export([format/2, format_stderr/2, with_local_io/1, local_info_msg/2]).
+-export([format/2, format_many/1, format_stderr/2]).
+-export([with_local_io/1, local_info_msg/2]).
-export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
@@ -54,7 +55,7 @@
-export([const_ok/0, const/1]).
-export([ntoa/1, ntoab/1]).
-export([is_process_alive/1]).
--export([pget/2, pget/3, pget_or_die/2]).
+-export([pget/2, pget/3, pget_or_die/2, pset/3]).
-export([format_message_queue/2]).
-export([append_rpc_all_nodes/4]).
-export([multi_call/2]).
@@ -158,6 +159,7 @@
-> 'ok' | 'aborted').
-spec(dirty_dump_log/1 :: (file:filename()) -> ok_or_error()).
-spec(format/2 :: (string(), [any()]) -> string()).
+-spec(format_many/1 :: ([{string(), [any()]}]) -> string()).
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(with_local_io/1 :: (fun (() -> A)) -> A).
-spec(local_info_msg/2 :: (string(), [any()]) -> 'ok').
@@ -199,6 +201,7 @@
-spec(pget/2 :: (term(), [term()]) -> term()).
-spec(pget/3 :: (term(), [term()], term()) -> term()).
-spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()).
+-spec(pset/3 :: (term(), term(), [term()]) -> term()).
-spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()).
-spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]).
-spec(multi_call/2 ::
@@ -551,6 +554,9 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) ->
format(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)).
+format_many(List) ->
+ lists:flatten([io_lib:format(F ++ "~n", A) || {F, A} <- List]).
+
format_stderr(Fmt, Args) ->
case os:type() of
{unix, _} ->
@@ -848,6 +854,8 @@ pget_or_die(K, P) ->
V -> V
end.
+pset(Key, Value, List) -> [{Key, Value} | proplists:delete(Key, List)].
+
format_message_queue(_Opt, MQ) ->
Len = priority_queue:len(MQ),
{Len,
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index c714d3a7..7e9346f9 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -296,6 +296,11 @@ table_definitions() ->
[{record_name, exchange_serial},
{attributes, record_info(fields, exchange_serial)},
{match, #exchange_serial{name = exchange_name_match(), _='_'}}]},
+ {rabbit_runtime_parameters,
+ [{record_name, runtime_parameters},
+ {attributes, record_info(fields, runtime_parameters)},
+ {disc_copies, [node()]},
+ {match, #runtime_parameters{_='_'}}]},
{rabbit_durable_queue,
[{record_name, amqqueue},
{attributes, record_info(fields, amqqueue)},
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index b6a9e263..1c23632d 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -57,8 +57,7 @@ diagnostics(Nodes) ->
"hosts, their running nodes and ports:", [Nodes]}] ++
[diagnostics_host(Host) || Host <- Hosts] ++
diagnostics0(),
- lists:flatten([io_lib:format(F ++ "~n", A) || NodeDiag <- NodeDiags,
- {F, A} <- [NodeDiag]]).
+ rabbit_misc:format_many(lists:flatten(NodeDiags)).
diagnostics0() ->
[{"~ncurrent node details:~n- node name: ~w", [node()]},
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 2a93c8f2..00880fb2 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -245,7 +245,7 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
{true, true} -> throw({error_string,
"Cannot specify -m and -v together"})
end,
- OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts),
+ OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts),
OnlyEnabledAll = proplists:get_bool(?ENABLED_ALL_OPT, Opts),
AvailablePlugins = find_plugins(PluginsDir),
@@ -257,14 +257,10 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
Plugins = [ Plugin ||
Plugin = #plugin{name = Name} <- AvailablePlugins,
re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match,
- if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
- true -> true
- end,
- if OnlyEnabledAll ->
- lists:member(Name, EnabledImplicitly) or
- lists:member(Name, EnabledExplicitly);
- true ->
- true
+ if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
+ OnlyEnabledAll -> (lists:member(Name, EnabledExplicitly) or
+ lists:member(Name, EnabledImplicitly));
+ true -> true
end],
Plugins1 = usort_plugins(Plugins),
MaxWidth = lists:max([length(atom_to_list(Name)) ||
@@ -338,8 +334,8 @@ read_enabled_plugins(PluginsFile) ->
case rabbit_file:read_term_file(PluginsFile) of
{ok, [Plugins]} -> Plugins;
{ok, []} -> [];
- {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file,
- PluginsFile}});
+ {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file,
+ PluginsFile}});
{error, enoent} -> [];
{error, Reason} -> throw({error, {cannot_read_enabled_plugins_file,
PluginsFile, Reason}})
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 8c0ebcbe..637835c3 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -23,7 +23,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([register/3, binary_to_type/1, lookup_module/2, lookup_all/1]).
+-export([register/3, unregister/2,
+ binary_to_type/1, lookup_module/2, lookup_all/1]).
-define(SERVER, ?MODULE).
-define(ETS_NAME, ?MODULE).
@@ -32,6 +33,7 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(register/3 :: (atom(), binary(), atom()) -> 'ok').
+-spec(unregister/2 :: (atom(), binary()) -> 'ok').
-spec(binary_to_type/1 ::
(binary()) -> atom() | rabbit_types:error('not_found')).
-spec(lookup_module/2 ::
@@ -50,6 +52,9 @@ start_link() ->
register(Class, TypeName, ModuleName) ->
gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}, infinity).
+unregister(Class, TypeName) ->
+ gen_server:call(?SERVER, {unregister, Class, TypeName}, infinity).
+
%% This is used with user-supplied arguments (e.g., on exchange
%% declare), so we restrict it to existing atoms only. This means it
%% can throw a badarg, indicating that the type cannot have been
@@ -83,6 +88,10 @@ internal_register(Class, TypeName, ModuleName)
{{Class, internal_binary_to_type(TypeName)}, ModuleName}),
ok.
+internal_unregister(Class, TypeName) ->
+ true = ets:delete(?ETS_NAME, {Class, internal_binary_to_type(TypeName)}),
+ ok.
+
sanity_check_module(ClassModule, Module) ->
case catch lists:member(ClassModule,
lists:flatten(
@@ -95,8 +104,9 @@ sanity_check_module(ClassModule, Module) ->
true -> ok
end.
-class_module(exchange) -> rabbit_exchange_type;
-class_module(auth_mechanism) -> rabbit_auth_mechanism.
+class_module(exchange) -> rabbit_exchange_type;
+class_module(auth_mechanism) -> rabbit_auth_mechanism;
+class_module(runtime_parameter) -> rabbit_runtime_parameter.
%%---------------------------------------------------------------------------
@@ -108,6 +118,10 @@ handle_call({register, Class, TypeName, ModuleName}, _From, State) ->
ok = internal_register(Class, TypeName, ModuleName),
{reply, ok, State};
+handle_call({unregister, Class, TypeName}, _From, State) ->
+ ok = internal_unregister(Class, TypeName),
+ {reply, ok, State};
+
handle_call(Request, _From, State) ->
{stop, {unhandled_call, Request}, State}.
diff --git a/src/rabbit_runtime_parameter.erl b/src/rabbit_runtime_parameter.erl
new file mode 100644
index 00000000..c7d30116
--- /dev/null
+++ b/src/rabbit_runtime_parameter.erl
@@ -0,0 +1,43 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_runtime_parameter).
+
+-ifdef(use_specs).
+
+-type(validate_results() ::
+ 'ok' | {error, string(), [term()]} | [validate_results()]).
+
+-callback validate(binary(), binary(), term()) -> validate_results().
+-callback validate_clear(binary(), binary()) -> validate_results().
+-callback notify(binary(), binary(), term()) -> 'ok'.
+-callback notify_clear(binary(), binary()) -> 'ok'.
+
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [
+ {validate, 3},
+ {validate_clear, 2},
+ {notify, 3},
+ {notify_clear, 2}
+ ];
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
new file mode 100644
index 00000000..172cee92
--- /dev/null
+++ b/src/rabbit_runtime_parameters.erl
@@ -0,0 +1,239 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_runtime_parameters).
+
+-include("rabbit.hrl").
+
+-export([parse_set/3, set/3, clear/2, list/0, list/1, list_formatted/0,
+ lookup/2, value/2, value/3, info_keys/0]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(ok_or_error_string() :: 'ok' | {'error_string', string()}).
+
+-spec(parse_set/3 :: (binary(), binary(), string()) -> ok_or_error_string()).
+-spec(set/3 :: (binary(), binary(), term()) -> ok_or_error_string()).
+-spec(clear/2 :: (binary(), binary()) -> ok_or_error_string()).
+-spec(list/0 :: () -> [rabbit_types:infos()]).
+-spec(list/1 :: (binary()) -> [rabbit_types:infos()] | 'not_found').
+-spec(list_formatted/0 :: () -> [rabbit_types:infos()]).
+-spec(lookup/2 :: (binary(), binary()) -> rabbit_types:infos()).
+-spec(value/2 :: (binary(), binary()) -> term()).
+-spec(value/3 :: (binary(), binary(), term()) -> term()).
+-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
+
+-endif.
+
+%%---------------------------------------------------------------------------
+
+-import(rabbit_misc, [pget/2, pset/3]).
+
+-define(TABLE, rabbit_runtime_parameters).
+
+%%---------------------------------------------------------------------------
+
+parse_set(Component, Key, String) ->
+ case parse(String) of
+ {ok, Term} -> set(Component, Key, Term);
+ {errors, L} -> format_error(L)
+ end.
+
+set(Component, Key, Term) ->
+ case set0(Component, Key, Term) of
+ ok -> ok;
+ {errors, L} -> format_error(L)
+ end.
+
+format_error(L) ->
+ {error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}.
+
+set0(Component, Key, Term) ->
+ case lookup_component(Component) of
+ {ok, Mod} ->
+ case flatten_errors(validate(Term)) of
+ ok ->
+ case flatten_errors(Mod:validate(Component, Key, Term)) of
+ ok ->
+ case mnesia_update(Component, Key, Term) of
+ {old, Term} -> ok;
+ _ -> Mod:notify(Component, Key, Term)
+ end,
+ ok;
+ E ->
+ E
+ end;
+ E ->
+ E
+ end;
+ E ->
+ E
+ end.
+
+mnesia_update(Component, Key, Term) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ Res = case mnesia:read(?TABLE, {Component, Key}, read) of
+ [] -> new;
+ [Params] -> {old, Params#runtime_parameters.value}
+ end,
+ ok = mnesia:write(?TABLE, c(Component, Key, Term), write),
+ Res
+ end).
+
+clear(Component, Key) ->
+ case clear0(Component, Key) of
+ ok -> ok;
+ {errors, L} -> format_error(L)
+ end.
+
+clear0(Component, Key) ->
+ case lookup_component(Component) of
+ {ok, Mod} -> case flatten_errors(Mod:validate_clear(Component, Key)) of
+ ok -> mnesia_clear(Component, Key),
+ Mod:notify_clear(Component, Key),
+ ok;
+ E -> E
+ end;
+ E -> E
+ end.
+
+mnesia_clear(Component, Key) ->
+ ok = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ ok = mnesia:delete(?TABLE, {Component, Key}, write)
+ end).
+
+list() ->
+ [p(P) || P <- rabbit_misc:dirty_read_all(?TABLE)].
+
+list(Component) ->
+ case lookup_component(Component) of
+ {ok, _} -> Match = #runtime_parameters{key = {Component, '_'}, _ = '_'},
+ [p(P) || P <- mnesia:dirty_match_object(?TABLE, Match)];
+ _ -> not_found
+ end.
+
+list_formatted() ->
+ [pset(value, format(pget(value, P)), P) || P <- list()].
+
+lookup(Component, Key) ->
+ case lookup0(Component, Key, rabbit_misc:const(not_found)) of
+ not_found -> not_found;
+ Params -> p(Params)
+ end.
+
+value(Component, Key) ->
+ case lookup0(Component, Key, rabbit_misc:const(not_found)) of
+ not_found -> not_found;
+ Params -> Params#runtime_parameters.value
+ end.
+
+value(Component, Key, Default) ->
+ Params = lookup0(Component, Key,
+ fun () -> lookup_missing(Component, Key, Default) end),
+ Params#runtime_parameters.value.
+
+lookup0(Component, Key, DefaultFun) ->
+ case mnesia:dirty_read(?TABLE, {Component, Key}) of
+ [] -> DefaultFun();
+ [R] -> R
+ end.
+
+lookup_missing(Component, Key, Default) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read(?TABLE, {Component, Key}, read) of
+ [] -> Record = c(Component, Key, Default),
+ mnesia:write(?TABLE, Record, write),
+ Record;
+ [R] -> R
+ end
+ end).
+
+c(Component, Key, Default) -> #runtime_parameters{key = {Component, Key},
+ value = Default}.
+
+p(#runtime_parameters{key = {Component, Key}, value = Value}) ->
+ [{component, Component},
+ {key, Key},
+ {value, Value}].
+
+info_keys() -> [component, key, value].
+
+%%---------------------------------------------------------------------------
+
+lookup_component(Component) ->
+ case rabbit_registry:lookup_module(
+ runtime_parameter, list_to_atom(binary_to_list(Component))) of
+ {error, not_found} -> {errors,
+ [{"component ~s not found", [Component]}]};
+ {ok, Module} -> {ok, Module}
+ end.
+
+parse(Src0) ->
+ Src1 = string:strip(Src0),
+ Src = case lists:reverse(Src1) of
+ [$. |_] -> Src1;
+ _ -> Src1 ++ "."
+ end,
+ case erl_scan:string(Src) of
+ {ok, Scanned, _} ->
+ case erl_parse:parse_term(Scanned) of
+ {ok, Parsed} ->
+ {ok, Parsed};
+ {error, E} ->
+ {errors,
+ [{"Could not parse value: ~s", [format_parse_error(E)]}]}
+ end;
+ {error, E, _} ->
+ {errors, [{"Could not scan value: ~s", [format_parse_error(E)]}]}
+ end.
+
+format_parse_error({_Line, Mod, Err}) ->
+ lists:flatten(Mod:format_error(Err)).
+
+format(Term) ->
+ list_to_binary(rabbit_misc:format("~p", [Term])).
+
+%%---------------------------------------------------------------------------
+
+%% We will want to be able to biject these to JSON. So we have some
+%% generic restrictions on what we consider acceptable.
+validate(Proplist = [T | _]) when is_tuple(T) -> validate_proplist(Proplist);
+validate(L) when is_list(L) -> validate_list(L);
+validate(T) when is_tuple(T) -> {error, "tuple: ~p", [T]};
+validate(B) when is_boolean(B) -> ok;
+validate(null) -> ok;
+validate(A) when is_atom(A) -> {error, "atom: ~p", [A]};
+validate(N) when is_number(N) -> ok;
+validate(B) when is_binary(B) -> ok;
+validate(B) when is_bitstring(B) -> {error, "bitstring: ~p", [B]}.
+
+validate_list(L) -> [validate(I) || I <- L].
+validate_proplist(L) -> [vp(I) || I <- L].
+
+vp({K, V}) when is_binary(K) -> validate(V);
+vp({K, _V}) -> {error, "bad key: ~p", [K]};
+vp(H) -> {error, "not two tuple: ~p", [H]}.
+
+flatten_errors(L) ->
+ case [{F, A} || I <- lists:flatten([L]), {error, F, A} <- [I]] of
+ [] -> ok;
+ E -> {errors, E}
+ end.
diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl
new file mode 100644
index 00000000..f23b3227
--- /dev/null
+++ b/src/rabbit_runtime_parameters_test.erl
@@ -0,0 +1,38 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_runtime_parameters_test).
+-behaviour(rabbit_runtime_parameter).
+
+-export([validate/3, validate_clear/2, notify/3, notify_clear/2]).
+-export([register/0, unregister/0]).
+
+register() ->
+ rabbit_registry:register(runtime_parameter, <<"test">>, ?MODULE).
+
+unregister() ->
+ rabbit_registry:unregister(runtime_parameter, <<"test">>).
+
+validate(<<"test">>, <<"good">>, _Term) -> ok;
+validate(<<"test">>, <<"maybe">>, <<"good">>) -> ok;
+validate(<<"test">>, _, _) -> {error, "meh", []}.
+
+validate_clear(<<"test">>, <<"good">>) -> ok;
+validate_clear(<<"test">>, <<"maybe">>) -> ok;
+validate_clear(<<"test">>, _) -> {error, "meh", []}.
+
+notify(_, _, _) -> ok.
+notify_clear(_, _) -> ok.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 04ee6ef2..96b5fa38 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -53,6 +53,7 @@ all_tests() ->
passed = test_option_parser(),
passed = test_cluster_management(),
passed = test_user_management(),
+ passed = test_runtime_parameters(),
passed = test_server_status(),
passed = test_confirms(),
passed = maybe_run_cluster_dependent_tests(),
@@ -1097,6 +1098,38 @@ test_user_management() ->
passed.
+test_runtime_parameters() ->
+ rabbit_runtime_parameters_test:register(),
+ Good = fun(L) -> ok = control_action(set_parameter, L) end,
+ Bad = fun(L) -> {error_string, _} = control_action(set_parameter, L) end,
+
+ %% Acceptable for bijection
+ Good(["test", "good", "<<\"ignore\">>"]),
+ Good(["test", "good", "123"]),
+ Good(["test", "good", "true"]),
+ Good(["test", "good", "false"]),
+ Good(["test", "good", "null"]),
+ Good(["test", "good", "[{<<\"key\">>, <<\"value\">>}]"]),
+
+ %% Various forms of fail due to non-bijectability
+ Bad(["test", "good", "atom"]),
+ Bad(["test", "good", "{tuple, foo}"]),
+ Bad(["test", "good", "[{<<\"key\">>, <<\"value\">>, 1}]"]),
+ Bad(["test", "good", "[{key, <<\"value\">>}]"]),
+
+ %% Test actual validation hook
+ Good(["test", "maybe", "<<\"good\">>"]),
+ Bad(["test", "maybe", "<<\"bad\">>"]),
+
+ ok = control_action(list_parameters, []),
+
+ ok = control_action(clear_parameter, ["test", "good"]),
+ ok = control_action(clear_parameter, ["test", "maybe"]),
+ {error_string, _} =
+ control_action(clear_parameter, ["test", "neverexisted"]),
+ rabbit_runtime_parameters_test:unregister(),
+ passed.
+
test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 9f2535bd..485ccc5f 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -36,6 +36,7 @@
-rabbit_upgrade({exchange_scratch, mnesia, [trace_exchanges]}).
-rabbit_upgrade({mirrored_supervisor, mnesia, []}).
-rabbit_upgrade({topic_trie_node, mnesia, []}).
+-rabbit_upgrade({runtime_parameters, mnesia, []}).
%% -------------------------------------------------------------------
@@ -56,6 +57,7 @@
-spec(exchange_scratch/0 :: () -> 'ok').
-spec(mirrored_supervisor/0 :: () -> 'ok').
-spec(topic_trie_node/0 :: () -> 'ok').
+-spec(runtime_parameters/0 :: () -> 'ok').
-endif.
@@ -185,6 +187,12 @@ topic_trie_node() ->
{attributes, [trie_node, edge_count, binding_count]},
{type, ordered_set}]).
+runtime_parameters() ->
+ create(rabbit_runtime_parameters,
+ [{record_name, runtime_parameters},
+ {attributes, [key, value]},
+ {disc_copies, [node()]}]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 209e5252..dafb3f2e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -323,7 +323,6 @@
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
-type(seq_id() :: non_neg_integer()).
--type(ack() :: seq_id()).
-type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()},
ingress :: {timestamp(), non_neg_integer()},
@@ -335,6 +334,13 @@
count :: non_neg_integer(),
end_seq_id :: non_neg_integer() }).
+%% The compiler (rightfully) complains that ack() and state() are
+%% unused. For this reason we duplicate a -spec from
+%% rabbit_backing_queue with the only intent being to remove
+%% warnings. The problem here is that we can't parameterise the BQ
+%% behaviour by these two types as we would like to. We still leave
+%% these here for documentation purposes.
+-type(ack() :: seq_id()).
-type(state() :: #vqstate {
q1 :: ?QUEUE:?QUEUE(),
q2 :: ?QUEUE:?QUEUE(),
@@ -368,6 +374,8 @@
ack_out_counter :: non_neg_integer(),
ack_in_counter :: non_neg_integer(),
ack_rates :: rates() }).
+%% Duplicated from rabbit_backing_queue
+-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
-spec(multiple_routing_keys/0 :: () -> 'ok').
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index f1b74878..3d3623d7 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -411,6 +411,8 @@ handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) ->
#child{mfa = {M, F, A}} = hd(State#state.children),
Args = A ++ EArgs,
case do_start_child_i(M, F, Args) of
+ {ok, undefined} ->
+ {reply, {ok, undefined}, State};
{ok, Pid} ->
NState = State#state{dynamics =
?DICT:store(Pid, Args, State#state.dynamics)},
@@ -743,6 +745,8 @@ restart(Strategy, Child, State, Restart)
#child{mfa = {M, F, A}} = Child,
Dynamics = ?DICT:erase(Child#child.pid, State#state.dynamics),
case do_start_child_i(M, F, A) of
+ {ok, undefined} ->
+ {ok, State};
{ok, Pid} ->
NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)},
{ok, NState};