diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-05-04 11:59:25 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-05-04 11:59:25 +0100 |
commit | f0428b3a557ba4d9a41c7537055582e33e00fbf0 (patch) | |
tree | 7fd675a2b5b25c0fd62e4ccfb7376ad21bdf37aa | |
parent | e3321662f859d746876c80af1ea713692fccea0f (diff) | |
parent | 0960e22cb63aa8499cf6a3aa3ae78e70547c2349 (diff) | |
download | rabbitmq-server-f0428b3a557ba4d9a41c7537055582e33e00fbf0.tar.gz |
Merged bug24889 into default
-rw-r--r-- | docs/rabbitmqctl.1.xml | 93 | ||||
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/mirrored_supervisor.erl | 45 | ||||
-rw-r--r-- | src/mirrored_supervisor_tests.erl | 2 | ||||
-rw-r--r-- | src/rabbit.erl | 15 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 3 | ||||
-rw-r--r-- | src/rabbit_control.erl | 17 | ||||
-rw-r--r-- | src/rabbit_event.erl | 6 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 2 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 12 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 5 | ||||
-rw-r--r-- | src/rabbit_nodes.erl | 3 | ||||
-rw-r--r-- | src/rabbit_plugins.erl | 18 | ||||
-rw-r--r-- | src/rabbit_registry.erl | 20 | ||||
-rw-r--r-- | src/rabbit_runtime_parameter.erl | 43 | ||||
-rw-r--r-- | src/rabbit_runtime_parameters.erl | 239 | ||||
-rw-r--r-- | src/rabbit_runtime_parameters_test.erl | 38 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 33 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 8 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 10 | ||||
-rw-r--r-- | src/supervisor2.erl | 4 |
22 files changed, 558 insertions, 62 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index ded3ab48..1effd691 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -745,6 +745,99 @@ </refsect2> <refsect2> + <title>Parameter Management</title> + <para> + Certain features of RabbitMQ (such as the federation plugin) + are controlled by dynamic, + cluster-wide <emphasis>parameters</emphasis>. Each parameter + consists of a component name, a key and a value. The + component name and key are strings, and the value is an + Erlang term. Parameters can be set, cleared and listed. In + general you should refer to the documentation for the feature + in question to see how to set parameters. + </para> + <variablelist> + <varlistentry> + <term><cmdsynopsis><command>set_parameter</command> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term> + <listitem> + <para> + Sets a parameter. + </para> + <variablelist> + <varlistentry> + <term>component_name</term> + <listitem><para> + The name of the component for which the + parameter is being set. + </para></listitem> + </varlistentry> + <varlistentry> + <term>key</term> + <listitem><para> + The key for which the parameter is being set. + </para></listitem> + </varlistentry> + <varlistentry> + <term>value</term> + <listitem><para> + The value for the parameter, as an + Erlang term. In most shells you are very likely to + need to quote this. + </para></listitem> + </varlistentry> + </variablelist> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl set_parameter federation local_username '<<"guest">>'</screen> + <para role="example"> + This command sets the parameter <command>local_username</command> for the <command>federation</command> component to the Erlang term <command><<"guest">></command>. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>clear_parameter</command> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg></cmdsynopsis></term> + <listitem> + <para> + Clears a parameter. + </para> + <variablelist> + <varlistentry> + <term>component_name</term> + <listitem><para> + The name of the component for which the + parameter is being cleared. + </para></listitem> + </varlistentry> + <varlistentry> + <term>key</term> + <listitem><para> + The key for which the parameter is being cleared. + </para></listitem> + </varlistentry> + </variablelist> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl clear_parameter federation local_username</screen> + <para role="example"> + This command clears the parameter <command>local_username</command> for the <command>federation</command> component. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>list_parameters</command></cmdsynopsis></term> + <listitem> + <para> + Lists all parameters. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl list_parameters</screen> + <para role="example"> + This command lists all parameters. + </para> + </listitem> + </varlistentry> + </variablelist> + </refsect2> + + <refsect2> <title>Server Status</title> <para> The server status queries interrogate the server and return a list of diff --git a/include/rabbit.hrl b/include/rabbit.hrl index faf3059a..5c73c8b8 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -66,6 +66,8 @@ -record(listener, {node, protocol, host, ip_address, port}). +-record(runtime_parameters, {key, value}). + -record(basic_message, {exchange_name, routing_keys = [], content, id, is_persistent}). diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 221f6a87..4fe93981 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -261,24 +261,19 @@ start_internal(Group, ChildSpecs) -> %%---------------------------------------------------------------------------- -init({overall, Group, Init}) -> - case Init of - {ok, {Restart, ChildSpecs}} -> - Delegate = {delegate, {?SUPERVISOR, start_link, - [?MODULE, {delegate, Restart}]}, - temporary, 16#ffffffff, supervisor, [?SUPERVISOR]}, - Mirroring = {mirroring, {?MODULE, start_internal, - [Group, ChildSpecs]}, - permanent, 16#ffffffff, worker, [?MODULE]}, - %% Important: Delegate MUST start before Mirroring so that - %% when we shut down from above it shuts down last, so - %% Mirroring does not see it die. - %% - %% See comment in handle_info('DOWN', ...) below - {ok, {{one_for_all, 0, 1}, [Delegate, Mirroring]}}; - ignore -> - ignore - end; +init({overall, _Group, ignore}) -> ignore; +init({overall, Group, {ok, {Restart, ChildSpecs}}}) -> + %% Important: Delegate MUST start before Mirroring so that when we + %% shut down from above it shuts down last, so Mirroring does not + %% see it die. + %% + %% See comment in handle_info('DOWN', ...) below + {ok, {{one_for_all, 0, 1}, + [{delegate, {?SUPERVISOR, start_link, [?MODULE, {delegate, Restart}]}, + temporary, 16#ffffffff, supervisor, [?SUPERVISOR]}, + {mirroring, {?MODULE, start_internal, [Group, ChildSpecs]}, + permanent, 16#ffffffff, worker, [?MODULE]}]}}; + init({delegate, Restart}) -> {ok, {Restart, []}}; @@ -306,9 +301,9 @@ handle_call({init, Overall}, _From, Delegate = child(Overall, delegate), erlang:monitor(process, Delegate), State1 = State#state{overall = Overall, delegate = Delegate}, - case all_started([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of - true -> {reply, ok, State1}; - false -> {stop, shutdown, State1} + case errors([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of + [] -> {reply, ok, State1}; + Errors -> {stop, {shutdown, Errors}, State1} end; handle_call({start_child, ChildSpec}, _From, @@ -372,9 +367,9 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; _ -> [] end, - case all_started(R) of - true -> {noreply, State}; - false -> {stop, shutdown, State} + case errors(R) of + [] -> {noreply, State}; + Errors -> {stop, {shutdown, Errors}, State} end; handle_info(Info, State) -> @@ -468,7 +463,7 @@ delete_all(Group) -> [delete(Group, id(C)) || C <- mnesia:select(?TABLE, [{MatchHead, [], ['$1']}])]. -all_started(Results) -> [] =:= [R || R = {error, _} <- Results]. +errors(Results) -> [E || {error, E} <- Results]. %%---------------------------------------------------------------------------- diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl index fe56b530..e8baabe8 100644 --- a/src/mirrored_supervisor_tests.erl +++ b/src/mirrored_supervisor_tests.erl @@ -268,7 +268,7 @@ inc_group() -> get_group(Group) -> {Group, get(counter)}. -call(Id, Msg) -> call(Id, Msg, 10000, 100). +call(Id, Msg) -> call(Id, Msg, 1000, 100). call(Id, Msg, 0, _Decr) -> exit({timeout_waiting_for_server, {Id, Msg}, erlang:get_stacktrace()}); diff --git a/src/rabbit.erl b/src/rabbit.erl index b1f786a0..5e579165 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -621,15 +621,12 @@ log_location(Type) -> rotate_logs(File, Suffix, Handler) -> rotate_logs(File, Suffix, Handler, Handler). -rotate_logs(File, Suffix, OldHandler, NewHandler) -> - case File of - undefined -> ok; - tty -> ok; - _ -> gen_event:swap_handler( - error_logger, - {OldHandler, swap}, - {NewHandler, {File, Suffix}}) - end. +rotate_logs(undefined, _Suffix, _OldHandler, _NewHandler) -> ok; +rotate_logs(tty, _Suffix, _OldHandler, _NewHandler) -> ok; +rotate_logs(File, Suffix, OldHandler, NewHandler) -> + gen_event:swap_handler(error_logger, + {OldHandler, swap}, + {NewHandler, {File, Suffix}}). log_rotation_result({error, MainLogError}, {error, SaslLogError}) -> {error, {{cannot_rotate_main_logs, MainLogError}, diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 28c57bb0..dc144a0e 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -26,10 +26,8 @@ ('empty' | %% Message, IsDelivered, AckTag, Remaining_Len {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})). --type(is_durable() :: boolean()). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). --type(confirm_required() :: boolean()). -type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). -type(duration() :: ('undefined' | 'infinity' | number())). diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 17d848da..734456d3 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -224,6 +224,5 @@ header_routes(HeadersTable) -> {array, Routes} -> [Route || {longstr, Route} <- Routes]; undefined -> []; {Type, _Val} -> throw({error, {unacceptable_type_in_header, - Type, - binary_to_list(HeaderKey)}}) + binary_to_list(HeaderKey), Type}}) end || HeaderKey <- ?ROUTING_HEADERS]). diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 573ed6a3..2dea2a2f 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -356,6 +356,23 @@ action(list_permissions, Node, [], Opts, Inform) -> list_vhost_permissions, [VHost]}), rabbit_auth_backend_internal:vhost_perms_info_keys()); +action(set_parameter, Node, [Component, Key, Value], _Opts, Inform) -> + Inform("Setting runtime parameter ~p for component ~p to ~p", + [Key, Component, Value]), + rpc_call(Node, rabbit_runtime_parameters, parse_set, + [list_to_binary(Component), list_to_binary(Key), Value]); + +action(clear_parameter, Node, [Component, Key], _Opts, Inform) -> + Inform("Clearing runtime parameter ~p for component ~p", [Key, Component]), + rpc_call(Node, rabbit_runtime_parameters, clear, [list_to_binary(Component), + list_to_binary(Key)]); + +action(list_parameters, Node, Args = [], _Opts, Inform) -> + Inform("Listing runtime parameters", []), + display_info_list( + rpc_call(Node, rabbit_runtime_parameters, list_formatted, Args), + rabbit_runtime_parameters:info_keys()); + action(report, Node, _Args, _Opts, Inform) -> io:format("Reporting server status on ~p~n~n", [erlang:universaltime()]), [begin ok = action(Action, N, [], [], Inform), io:nl() end || diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 4ec141cf..3f1b20fe 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -139,6 +139,6 @@ notify_if(false, _Type, _Props) -> ok. notify(Type, Props) -> %% TODO: switch to os:timestamp() when we drop support for %% Erlang/OTP < R13B01 - gen_event:notify(rabbit_event, #event{type = Type, - props = Props, - timestamp = now()}). + gen_event:notify(?MODULE, #event{type = Type, + props = Props, + timestamp = now()}). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 9fa6213b..2b15498e 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -192,7 +192,7 @@ terminate(_, _) -> ok. code_change(_, State, _) -> - State. + {ok, State}. %%---------------------------------------------------------------------------- %% Internal plumbing diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 0aacd654..706de835 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -40,7 +40,8 @@ -export([upmap/2, map_in_order/2]). -export([table_filter/3]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). --export([format/2, format_stderr/2, with_local_io/1, local_info_msg/2]). +-export([format/2, format_many/1, format_stderr/2]). +-export([with_local_io/1, local_info_msg/2]). -export([start_applications/1, stop_applications/1]). -export([unfold/2, ceil/1, queue_fold/3]). -export([sort_field_table/1]). @@ -54,7 +55,7 @@ -export([const_ok/0, const/1]). -export([ntoa/1, ntoab/1]). -export([is_process_alive/1]). --export([pget/2, pget/3, pget_or_die/2]). +-export([pget/2, pget/3, pget_or_die/2, pset/3]). -export([format_message_queue/2]). -export([append_rpc_all_nodes/4]). -export([multi_call/2]). @@ -158,6 +159,7 @@ -> 'ok' | 'aborted'). -spec(dirty_dump_log/1 :: (file:filename()) -> ok_or_error()). -spec(format/2 :: (string(), [any()]) -> string()). +-spec(format_many/1 :: ([{string(), [any()]}]) -> string()). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(with_local_io/1 :: (fun (() -> A)) -> A). -spec(local_info_msg/2 :: (string(), [any()]) -> 'ok'). @@ -199,6 +201,7 @@ -spec(pget/2 :: (term(), [term()]) -> term()). -spec(pget/3 :: (term(), [term()], term()) -> term()). -spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()). +-spec(pset/3 :: (term(), term(), [term()]) -> term()). -spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()). -spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]). -spec(multi_call/2 :: @@ -551,6 +554,9 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) -> format(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)). +format_many(List) -> + lists:flatten([io_lib:format(F ++ "~n", A) || {F, A} <- List]). + format_stderr(Fmt, Args) -> case os:type() of {unix, _} -> @@ -848,6 +854,8 @@ pget_or_die(K, P) -> V -> V end. +pset(Key, Value, List) -> [{Key, Value} | proplists:delete(Key, List)]. + format_message_queue(_Opt, MQ) -> Len = priority_queue:len(MQ), {Len, diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index c714d3a7..7e9346f9 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -296,6 +296,11 @@ table_definitions() -> [{record_name, exchange_serial}, {attributes, record_info(fields, exchange_serial)}, {match, #exchange_serial{name = exchange_name_match(), _='_'}}]}, + {rabbit_runtime_parameters, + [{record_name, runtime_parameters}, + {attributes, record_info(fields, runtime_parameters)}, + {disc_copies, [node()]}, + {match, #runtime_parameters{_='_'}}]}, {rabbit_durable_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index b6a9e263..1c23632d 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -57,8 +57,7 @@ diagnostics(Nodes) -> "hosts, their running nodes and ports:", [Nodes]}] ++ [diagnostics_host(Host) || Host <- Hosts] ++ diagnostics0(), - lists:flatten([io_lib:format(F ++ "~n", A) || NodeDiag <- NodeDiags, - {F, A} <- [NodeDiag]]). + rabbit_misc:format_many(lists:flatten(NodeDiags)). diagnostics0() -> [{"~ncurrent node details:~n- node name: ~w", [node()]}, diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 2a93c8f2..00880fb2 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -245,7 +245,7 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) -> {true, true} -> throw({error_string, "Cannot specify -m and -v together"}) end, - OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts), + OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts), OnlyEnabledAll = proplists:get_bool(?ENABLED_ALL_OPT, Opts), AvailablePlugins = find_plugins(PluginsDir), @@ -257,14 +257,10 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) -> Plugins = [ Plugin || Plugin = #plugin{name = Name} <- AvailablePlugins, re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match, - if OnlyEnabled -> lists:member(Name, EnabledExplicitly); - true -> true - end, - if OnlyEnabledAll -> - lists:member(Name, EnabledImplicitly) or - lists:member(Name, EnabledExplicitly); - true -> - true + if OnlyEnabled -> lists:member(Name, EnabledExplicitly); + OnlyEnabledAll -> (lists:member(Name, EnabledExplicitly) or + lists:member(Name, EnabledImplicitly)); + true -> true end], Plugins1 = usort_plugins(Plugins), MaxWidth = lists:max([length(atom_to_list(Name)) || @@ -338,8 +334,8 @@ read_enabled_plugins(PluginsFile) -> case rabbit_file:read_term_file(PluginsFile) of {ok, [Plugins]} -> Plugins; {ok, []} -> []; - {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file, - PluginsFile}}); + {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file, + PluginsFile}}); {error, enoent} -> []; {error, Reason} -> throw({error, {cannot_read_enabled_plugins_file, PluginsFile, Reason}}) diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 8c0ebcbe..637835c3 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -23,7 +23,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([register/3, binary_to_type/1, lookup_module/2, lookup_all/1]). +-export([register/3, unregister/2, + binary_to_type/1, lookup_module/2, lookup_all/1]). -define(SERVER, ?MODULE). -define(ETS_NAME, ?MODULE). @@ -32,6 +33,7 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(register/3 :: (atom(), binary(), atom()) -> 'ok'). +-spec(unregister/2 :: (atom(), binary()) -> 'ok'). -spec(binary_to_type/1 :: (binary()) -> atom() | rabbit_types:error('not_found')). -spec(lookup_module/2 :: @@ -50,6 +52,9 @@ start_link() -> register(Class, TypeName, ModuleName) -> gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}, infinity). +unregister(Class, TypeName) -> + gen_server:call(?SERVER, {unregister, Class, TypeName}, infinity). + %% This is used with user-supplied arguments (e.g., on exchange %% declare), so we restrict it to existing atoms only. This means it %% can throw a badarg, indicating that the type cannot have been @@ -83,6 +88,10 @@ internal_register(Class, TypeName, ModuleName) {{Class, internal_binary_to_type(TypeName)}, ModuleName}), ok. +internal_unregister(Class, TypeName) -> + true = ets:delete(?ETS_NAME, {Class, internal_binary_to_type(TypeName)}), + ok. + sanity_check_module(ClassModule, Module) -> case catch lists:member(ClassModule, lists:flatten( @@ -95,8 +104,9 @@ sanity_check_module(ClassModule, Module) -> true -> ok end. -class_module(exchange) -> rabbit_exchange_type; -class_module(auth_mechanism) -> rabbit_auth_mechanism. +class_module(exchange) -> rabbit_exchange_type; +class_module(auth_mechanism) -> rabbit_auth_mechanism; +class_module(runtime_parameter) -> rabbit_runtime_parameter. %%--------------------------------------------------------------------------- @@ -108,6 +118,10 @@ handle_call({register, Class, TypeName, ModuleName}, _From, State) -> ok = internal_register(Class, TypeName, ModuleName), {reply, ok, State}; +handle_call({unregister, Class, TypeName}, _From, State) -> + ok = internal_unregister(Class, TypeName), + {reply, ok, State}; + handle_call(Request, _From, State) -> {stop, {unhandled_call, Request}, State}. diff --git a/src/rabbit_runtime_parameter.erl b/src/rabbit_runtime_parameter.erl new file mode 100644 index 00000000..c7d30116 --- /dev/null +++ b/src/rabbit_runtime_parameter.erl @@ -0,0 +1,43 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_runtime_parameter). + +-ifdef(use_specs). + +-type(validate_results() :: + 'ok' | {error, string(), [term()]} | [validate_results()]). + +-callback validate(binary(), binary(), term()) -> validate_results(). +-callback validate_clear(binary(), binary()) -> validate_results(). +-callback notify(binary(), binary(), term()) -> 'ok'. +-callback notify_clear(binary(), binary()) -> 'ok'. + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [ + {validate, 3}, + {validate_clear, 2}, + {notify, 3}, + {notify_clear, 2} + ]; +behaviour_info(_Other) -> + undefined. + +-endif. diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl new file mode 100644 index 00000000..172cee92 --- /dev/null +++ b/src/rabbit_runtime_parameters.erl @@ -0,0 +1,239 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_runtime_parameters). + +-include("rabbit.hrl"). + +-export([parse_set/3, set/3, clear/2, list/0, list/1, list_formatted/0, + lookup/2, value/2, value/3, info_keys/0]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(ok_or_error_string() :: 'ok' | {'error_string', string()}). + +-spec(parse_set/3 :: (binary(), binary(), string()) -> ok_or_error_string()). +-spec(set/3 :: (binary(), binary(), term()) -> ok_or_error_string()). +-spec(clear/2 :: (binary(), binary()) -> ok_or_error_string()). +-spec(list/0 :: () -> [rabbit_types:infos()]). +-spec(list/1 :: (binary()) -> [rabbit_types:infos()] | 'not_found'). +-spec(list_formatted/0 :: () -> [rabbit_types:infos()]). +-spec(lookup/2 :: (binary(), binary()) -> rabbit_types:infos()). +-spec(value/2 :: (binary(), binary()) -> term()). +-spec(value/3 :: (binary(), binary(), term()) -> term()). +-spec(info_keys/0 :: () -> rabbit_types:info_keys()). + +-endif. + +%%--------------------------------------------------------------------------- + +-import(rabbit_misc, [pget/2, pset/3]). + +-define(TABLE, rabbit_runtime_parameters). + +%%--------------------------------------------------------------------------- + +parse_set(Component, Key, String) -> + case parse(String) of + {ok, Term} -> set(Component, Key, Term); + {errors, L} -> format_error(L) + end. + +set(Component, Key, Term) -> + case set0(Component, Key, Term) of + ok -> ok; + {errors, L} -> format_error(L) + end. + +format_error(L) -> + {error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}. + +set0(Component, Key, Term) -> + case lookup_component(Component) of + {ok, Mod} -> + case flatten_errors(validate(Term)) of + ok -> + case flatten_errors(Mod:validate(Component, Key, Term)) of + ok -> + case mnesia_update(Component, Key, Term) of + {old, Term} -> ok; + _ -> Mod:notify(Component, Key, Term) + end, + ok; + E -> + E + end; + E -> + E + end; + E -> + E + end. + +mnesia_update(Component, Key, Term) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + Res = case mnesia:read(?TABLE, {Component, Key}, read) of + [] -> new; + [Params] -> {old, Params#runtime_parameters.value} + end, + ok = mnesia:write(?TABLE, c(Component, Key, Term), write), + Res + end). + +clear(Component, Key) -> + case clear0(Component, Key) of + ok -> ok; + {errors, L} -> format_error(L) + end. + +clear0(Component, Key) -> + case lookup_component(Component) of + {ok, Mod} -> case flatten_errors(Mod:validate_clear(Component, Key)) of + ok -> mnesia_clear(Component, Key), + Mod:notify_clear(Component, Key), + ok; + E -> E + end; + E -> E + end. + +mnesia_clear(Component, Key) -> + ok = rabbit_misc:execute_mnesia_transaction( + fun () -> + ok = mnesia:delete(?TABLE, {Component, Key}, write) + end). + +list() -> + [p(P) || P <- rabbit_misc:dirty_read_all(?TABLE)]. + +list(Component) -> + case lookup_component(Component) of + {ok, _} -> Match = #runtime_parameters{key = {Component, '_'}, _ = '_'}, + [p(P) || P <- mnesia:dirty_match_object(?TABLE, Match)]; + _ -> not_found + end. + +list_formatted() -> + [pset(value, format(pget(value, P)), P) || P <- list()]. + +lookup(Component, Key) -> + case lookup0(Component, Key, rabbit_misc:const(not_found)) of + not_found -> not_found; + Params -> p(Params) + end. + +value(Component, Key) -> + case lookup0(Component, Key, rabbit_misc:const(not_found)) of + not_found -> not_found; + Params -> Params#runtime_parameters.value + end. + +value(Component, Key, Default) -> + Params = lookup0(Component, Key, + fun () -> lookup_missing(Component, Key, Default) end), + Params#runtime_parameters.value. + +lookup0(Component, Key, DefaultFun) -> + case mnesia:dirty_read(?TABLE, {Component, Key}) of + [] -> DefaultFun(); + [R] -> R + end. + +lookup_missing(Component, Key, Default) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read(?TABLE, {Component, Key}, read) of + [] -> Record = c(Component, Key, Default), + mnesia:write(?TABLE, Record, write), + Record; + [R] -> R + end + end). + +c(Component, Key, Default) -> #runtime_parameters{key = {Component, Key}, + value = Default}. + +p(#runtime_parameters{key = {Component, Key}, value = Value}) -> + [{component, Component}, + {key, Key}, + {value, Value}]. + +info_keys() -> [component, key, value]. + +%%--------------------------------------------------------------------------- + +lookup_component(Component) -> + case rabbit_registry:lookup_module( + runtime_parameter, list_to_atom(binary_to_list(Component))) of + {error, not_found} -> {errors, + [{"component ~s not found", [Component]}]}; + {ok, Module} -> {ok, Module} + end. + +parse(Src0) -> + Src1 = string:strip(Src0), + Src = case lists:reverse(Src1) of + [$. |_] -> Src1; + _ -> Src1 ++ "." + end, + case erl_scan:string(Src) of + {ok, Scanned, _} -> + case erl_parse:parse_term(Scanned) of + {ok, Parsed} -> + {ok, Parsed}; + {error, E} -> + {errors, + [{"Could not parse value: ~s", [format_parse_error(E)]}]} + end; + {error, E, _} -> + {errors, [{"Could not scan value: ~s", [format_parse_error(E)]}]} + end. + +format_parse_error({_Line, Mod, Err}) -> + lists:flatten(Mod:format_error(Err)). + +format(Term) -> + list_to_binary(rabbit_misc:format("~p", [Term])). + +%%--------------------------------------------------------------------------- + +%% We will want to be able to biject these to JSON. So we have some +%% generic restrictions on what we consider acceptable. +validate(Proplist = [T | _]) when is_tuple(T) -> validate_proplist(Proplist); +validate(L) when is_list(L) -> validate_list(L); +validate(T) when is_tuple(T) -> {error, "tuple: ~p", [T]}; +validate(B) when is_boolean(B) -> ok; +validate(null) -> ok; +validate(A) when is_atom(A) -> {error, "atom: ~p", [A]}; +validate(N) when is_number(N) -> ok; +validate(B) when is_binary(B) -> ok; +validate(B) when is_bitstring(B) -> {error, "bitstring: ~p", [B]}. + +validate_list(L) -> [validate(I) || I <- L]. +validate_proplist(L) -> [vp(I) || I <- L]. + +vp({K, V}) when is_binary(K) -> validate(V); +vp({K, _V}) -> {error, "bad key: ~p", [K]}; +vp(H) -> {error, "not two tuple: ~p", [H]}. + +flatten_errors(L) -> + case [{F, A} || I <- lists:flatten([L]), {error, F, A} <- [I]] of + [] -> ok; + E -> {errors, E} + end. diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl new file mode 100644 index 00000000..f23b3227 --- /dev/null +++ b/src/rabbit_runtime_parameters_test.erl @@ -0,0 +1,38 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_runtime_parameters_test). +-behaviour(rabbit_runtime_parameter). + +-export([validate/3, validate_clear/2, notify/3, notify_clear/2]). +-export([register/0, unregister/0]). + +register() -> + rabbit_registry:register(runtime_parameter, <<"test">>, ?MODULE). + +unregister() -> + rabbit_registry:unregister(runtime_parameter, <<"test">>). + +validate(<<"test">>, <<"good">>, _Term) -> ok; +validate(<<"test">>, <<"maybe">>, <<"good">>) -> ok; +validate(<<"test">>, _, _) -> {error, "meh", []}. + +validate_clear(<<"test">>, <<"good">>) -> ok; +validate_clear(<<"test">>, <<"maybe">>) -> ok; +validate_clear(<<"test">>, _) -> {error, "meh", []}. + +notify(_, _, _) -> ok. +notify_clear(_, _) -> ok. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 04ee6ef2..96b5fa38 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -53,6 +53,7 @@ all_tests() -> passed = test_option_parser(), passed = test_cluster_management(), passed = test_user_management(), + passed = test_runtime_parameters(), passed = test_server_status(), passed = test_confirms(), passed = maybe_run_cluster_dependent_tests(), @@ -1097,6 +1098,38 @@ test_user_management() -> passed. +test_runtime_parameters() -> + rabbit_runtime_parameters_test:register(), + Good = fun(L) -> ok = control_action(set_parameter, L) end, + Bad = fun(L) -> {error_string, _} = control_action(set_parameter, L) end, + + %% Acceptable for bijection + Good(["test", "good", "<<\"ignore\">>"]), + Good(["test", "good", "123"]), + Good(["test", "good", "true"]), + Good(["test", "good", "false"]), + Good(["test", "good", "null"]), + Good(["test", "good", "[{<<\"key\">>, <<\"value\">>}]"]), + + %% Various forms of fail due to non-bijectability + Bad(["test", "good", "atom"]), + Bad(["test", "good", "{tuple, foo}"]), + Bad(["test", "good", "[{<<\"key\">>, <<\"value\">>, 1}]"]), + Bad(["test", "good", "[{key, <<\"value\">>}]"]), + + %% Test actual validation hook + Good(["test", "maybe", "<<\"good\">>"]), + Bad(["test", "maybe", "<<\"bad\">>"]), + + ok = control_action(list_parameters, []), + + ok = control_action(clear_parameter, ["test", "good"]), + ok = control_action(clear_parameter, ["test", "maybe"]), + {error_string, _} = + control_action(clear_parameter, ["test", "neverexisted"]), + rabbit_runtime_parameters_test:unregister(), + passed. + test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 9f2535bd..485ccc5f 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -36,6 +36,7 @@ -rabbit_upgrade({exchange_scratch, mnesia, [trace_exchanges]}). -rabbit_upgrade({mirrored_supervisor, mnesia, []}). -rabbit_upgrade({topic_trie_node, mnesia, []}). +-rabbit_upgrade({runtime_parameters, mnesia, []}). %% ------------------------------------------------------------------- @@ -56,6 +57,7 @@ -spec(exchange_scratch/0 :: () -> 'ok'). -spec(mirrored_supervisor/0 :: () -> 'ok'). -spec(topic_trie_node/0 :: () -> 'ok'). +-spec(runtime_parameters/0 :: () -> 'ok'). -endif. @@ -185,6 +187,12 @@ topic_trie_node() -> {attributes, [trie_node, edge_count, binding_count]}, {type, ordered_set}]). +runtime_parameters() -> + create(rabbit_runtime_parameters, + [{record_name, runtime_parameters}, + {attributes, [key, value]}, + {disc_copies, [node()]}]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 209e5252..dafb3f2e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -323,7 +323,6 @@ -type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). -type(seq_id() :: non_neg_integer()). --type(ack() :: seq_id()). -type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()}, ingress :: {timestamp(), non_neg_integer()}, @@ -335,6 +334,13 @@ count :: non_neg_integer(), end_seq_id :: non_neg_integer() }). +%% The compiler (rightfully) complains that ack() and state() are +%% unused. For this reason we duplicate a -spec from +%% rabbit_backing_queue with the only intent being to remove +%% warnings. The problem here is that we can't parameterise the BQ +%% behaviour by these two types as we would like to. We still leave +%% these here for documentation purposes. +-type(ack() :: seq_id()). -type(state() :: #vqstate { q1 :: ?QUEUE:?QUEUE(), q2 :: ?QUEUE:?QUEUE(), @@ -368,6 +374,8 @@ ack_out_counter :: non_neg_integer(), ack_in_counter :: non_neg_integer(), ack_rates :: rates() }). +%% Duplicated from rabbit_backing_queue +-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). -spec(multiple_routing_keys/0 :: () -> 'ok'). diff --git a/src/supervisor2.erl b/src/supervisor2.erl index f1b74878..3d3623d7 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -411,6 +411,8 @@ handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) -> #child{mfa = {M, F, A}} = hd(State#state.children), Args = A ++ EArgs, case do_start_child_i(M, F, Args) of + {ok, undefined} -> + {reply, {ok, undefined}, State}; {ok, Pid} -> NState = State#state{dynamics = ?DICT:store(Pid, Args, State#state.dynamics)}, @@ -743,6 +745,8 @@ restart(Strategy, Child, State, Restart) #child{mfa = {M, F, A}} = Child, Dynamics = ?DICT:erase(Child#child.pid, State#state.dynamics), case do_start_child_i(M, F, A) of + {ok, undefined} -> + {ok, State}; {ok, Pid} -> NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)}, {ok, NState}; |