summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-10-05 18:20:10 +0100
committerEmile Joubert <emile@rabbitmq.com>2012-10-05 18:20:10 +0100
commit4e8e0fa18061a09a074e7975fd39a4003cb9b8ca (patch)
treee9d7beb602f18457b6b1c82591544c38fa235cae
parent2754b587b3f913b23fbc5bb34880a6b41c30849d (diff)
downloadrabbitmq-server-4e8e0fa18061a09a074e7975fd39a4003cb9b8ca.tar.gz
Updated policy validation
-rw-r--r--docs/rabbitmqctl.1.xml79
-rw-r--r--src/rabbit_control_main.erl9
-rw-r--r--src/rabbit_mirror_queue_misc.erl46
-rw-r--r--src/rabbit_policy.erl42
-rw-r--r--src/rabbit_policy_validator.erl4
-rw-r--r--src/rabbit_runtime_parameters.erl39
-rw-r--r--src/rabbit_runtime_parameters_test.erl35
-rw-r--r--src/rabbit_tests.erl38
8 files changed, 248 insertions, 44 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 73347cea..eea42484 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -637,7 +637,7 @@
</para>
<para>
Deleting a virtual host deletes all its exchanges,
- queues, bindings, user permissions and parameters.
+ queues, bindings, user permissions, parameters and policies.
</para>
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl delete_vhost test</screen>
@@ -895,6 +895,83 @@
</refsect2>
<refsect2>
+ <title>Policy Management</title>
+ <para>
+ Policies are used to control and modify the behaviour of queues
+ and exchanges on a cluster-wide basis. Policies apply within a
+ given vhost, and consist of a key and a value. The key must be a
+ string and the value must be a JSON string. Policies can be set,
+ cleared and listed.
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term><cmdsynopsis><command>set_policy</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Sets a policy.
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term>key</term>
+ <listitem><para>
+ The name of the policy.
+ </para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>value</term>
+ <listitem><para>
+ The definition of the policy, as a
+ JSON string. In most shells you are very likely to
+ need to quote this.
+ </para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl set_policy federate-me '{"pattern":"^amq.",\
+ "policy":{"federation-upstream-set":"all"}}'</screen>
+ <para role="example">
+ This command sets the policy <command>federate-me</command> in the default virtual host so that built-in exchanges are federated.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>clear_policy</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Clears a policy.
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term>key</term>
+ <listitem><para>
+ The name of the policy being cleared.
+ </para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl clear_policy federate-me</screen>
+ <para role="example">
+ This command clears the <command>federate-me</command> policy in the default virtual host.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>list_policies</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Lists all policies for a virtual host.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl list_policies</screen>
+ <para role="example">
+ This command lists all policies in the default virtual host.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </refsect2>
+
+ <refsect2>
<title>Server Status</title>
<para>
The server status queries interrogate the server and return a list of
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 1efde136..15fa1fd5 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -465,15 +465,14 @@ action(list_parameters, Node, [], Opts, Inform) ->
action(set_policy, Node, [Key, Value], Opts, Inform) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Setting policy ~p to ~p", [Key, Value]),
- rpc_call(Node, rabbit_runtime_parameters, parse_set,
- [VHostArg, <<"policy">>, list_to_binary(Key), Value]);
+ rpc_call(Node, rabbit_runtime_parameters, parse_set_policy,
+ [VHostArg, list_to_binary(Key), Value]);
action(clear_policy, Node, [Key], Opts, Inform) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Clearing policy ~p", [Key]),
- rpc_call(Node, rabbit_runtime_parameters, clear, [VHostArg,
- <<"policy">>,
- list_to_binary(Key)]);
+ rpc_call(Node, rabbit_runtime_parameters, clear_policy,
+ [VHostArg, list_to_binary(Key)]);
action(list_policies, Node, [], Opts, Inform) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 453f2f2c..f02308a1 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -15,16 +15,26 @@
%%
-module(rabbit_mirror_queue_misc).
+-behaviour(rabbit_policy_validator).
-export([remove_from_queue/2, on_node_up/0, add_mirrors/2, add_mirror/2,
report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1,
- is_mirrored/1, update_mirrors/2]).
+ is_mirrored/1, update_mirrors/2, validate_policy/1]).
%% for testing only
-export([suggested_queue_nodes/4]).
-include("rabbit.hrl").
+-rabbit_boot_step({?MODULE,
+ [{description, "HA policy validator hook"},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-mode">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-params">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -320,3 +330,37 @@ update_mirrors0(OldQ = #amqqueue{name = QName},
add_mirrors(QName, NewNodes -- OldNodes),
drop_mirrors(QName, OldNodes -- NewNodes),
ok.
+
+%%----------------------------------------------------------------------------
+
+validate_policy(TagList) ->
+ Mode = proplists:get_all_values(<<"ha-mode">>, TagList),
+ Params = proplists:get_all_values(<<"ha-params">>, TagList),
+ case Mode of
+ [<<"all">>] ->
+ ok;
+ [<<"nodes">>] ->
+ validate_params(fun erlang:is_binary/1, lists:append(Params),
+ "~p has invalid node names when ha-mode=nodes");
+ [<<"exactly">>] ->
+ case Params of
+ [_] -> validate_params(
+ fun (N) -> is_integer(N) andalso N >= 0 end,
+ Params, "~p must be a positive integer");
+ X -> {error, "ha-params must be supplied with one number "
+ "when ha-mode=exactly. found ~p arguments",
+ [length(X)]}
+ end;
+ [_, _|_] ->
+ {error, "ha-mode may appear once at most", []};
+ [Other] ->
+ {error, "~p is not a valid ha-mode value", [Other]}
+ end.
+
+validate_params(FilterFun, Params, Msg) when is_list(Params) ->
+ case lists:filter(fun (P) -> not FilterFun(P) end, Params) of
+ [] -> ok;
+ X -> {error, Msg, [X]}
+ end;
+validate_params(_, Params, _) ->
+ {error, "~p was expected to be a list", [Params]}.
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index c9af46e9..5540de83 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -139,21 +139,35 @@ policy_validation() ->
{<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory},
{<<"policy">>, fun validation/2, mandatory}].
+validation(_Name, []) ->
+ ok;
validation(_Name, Terms) when is_list(Terms) ->
- [validation0(T) || T <- Terms ];
-validation(Name, Term) ->
- {error, "~s should be list, actually was ~p", [Name, Term]}.
-
-validation0({Key, Value}) when is_binary(Key) ->
- case rabbit_registry:lookup_module(policy_validator,
- list_to_atom(binary_to_list(Key))) of
- {ok, Mod} ->
- Mod:validate_policy(Key, Value);
- {error, not_found} ->
- {error, "~p is not a recognised policy option", [Key]};
- Error ->
- Error
+ {Tags, Modules} = lists:unzip(
+ rabbit_registry:lookup_all(policy_validator)),
+ case lists:usort(Tags -- lists:usort(Tags)) of
+ [] -> ok;
+ Dup -> rabbit_log:warning("Duplicate policy validators: ~p~n", [Dup])
+ end,
+ Validators = lists:zipwith(fun (M, T) -> {M, a2b(T)} end, Modules, Tags),
+ case lists:foldl(
+ fun (_, {Error, _} = Acc) when Error /= ok ->
+ Acc;
+ (Mod, {ok, TermsLeft}) ->
+ ModTags = proplists:get_all_values(Mod, Validators),
+ case [T || {Tag, _} = T <- TermsLeft,
+ lists:member(Tag, ModTags)] of
+ [] -> {ok, TermsLeft};
+ Scope -> {Mod:validate_policy(Scope), TermsLeft -- Scope}
+ end
+ end, {ok, Terms}, proplists:get_keys(Validators)) of
+ {ok, []} ->
+ ok;
+ {ok, Unvalidated} ->
+ {error, "~p are not recognised policy settings", Unvalidated};
+ {Error, _} ->
+ Error
end;
-validation0(Term) ->
+validation(_Name, Term) ->
{error, "parse error while reading policy: ~p", [Term]}.
+a2b(A) -> list_to_binary(atom_to_list(A)).
diff --git a/src/rabbit_policy_validator.erl b/src/rabbit_policy_validator.erl
index 3cc02ecc..624c3d54 100644
--- a/src/rabbit_policy_validator.erl
+++ b/src/rabbit_policy_validator.erl
@@ -21,7 +21,7 @@
-type(validate_results() ::
'ok' | {error, string(), [term()]} | [validate_results()]).
--callback validate_policy(binary(), term()) -> validate_results().
+-callback validate_policy([{binary(), term()}]) -> validate_results().
-else.
@@ -29,7 +29,7 @@
behaviour_info(callbacks) ->
[
- {validate_policy, 2},
+ {validate_policy, 1},
];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index c4608b42..70428e96 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -18,9 +18,10 @@
-include("rabbit.hrl").
--export([parse_set/4, set/4, clear/3,
- list/0, list/1, list_strict/1, list/2, list_strict/2, list_formatted/1,
- list_formatted_policies/1, lookup/3, value/3, value/4, info_keys/0]).
+-export([parse_set/4, parse_set_policy/3, set/4, set_policy/3, clear/3,
+ clear_policy/2, list/0, list/1, list_strict/1, list/2, list_strict/2,
+ list_formatted/1, list_formatted_policies/1, lookup/3, value/3,
+ value/4, info_keys/0]).
%%----------------------------------------------------------------------------
@@ -30,10 +31,16 @@
-spec(parse_set/4 :: (rabbit_types:vhost(), binary(), binary(), string())
-> ok_or_error_string()).
+-spec(parse_set_policy/3 :: (rabbit_types:vhost(), binary(), string())
+ -> ok_or_error_string()).
-spec(set/4 :: (rabbit_types:vhost(), binary(), binary(), term())
-> ok_or_error_string()).
+-spec(set_policy/3 :: (rabbit_types:vhost(), binary(), term())
+ -> ok_or_error_string()).
-spec(clear/3 :: (rabbit_types:vhost(), binary(), binary())
-> ok_or_error_string()).
+-spec(clear_policy/2 :: (rabbit_types:vhost(), binary())
+ -> ok_or_error_string()).
-spec(list/0 :: () -> [rabbit_types:infos()]).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(list_strict/1 :: (binary()) -> [rabbit_types:infos()] | 'not_found').
@@ -59,18 +66,36 @@
%%---------------------------------------------------------------------------
+parse_set(_, <<"policy">>, _, _) ->
+ {error_string, "policies may not be set using this method"};
parse_set(VHost, Component, Key, String) ->
case rabbit_misc:json_decode(String) of
{ok, JSON} -> set(VHost, Component, Key, rabbit_misc:json_to_term(JSON));
error -> {error_string, "JSON decoding error"}
end.
+parse_set_policy(VHost, Key, String) ->
+ case rabbit_misc:json_decode(String) of
+ {ok, JSON} ->
+ set_policy(VHost, Key, rabbit_misc:json_to_term(JSON));
+ error ->
+ {error_string, "JSON decoding error"}
+ end.
+
+set(_, <<"policy">>, _, _) ->
+ {error_string, "policies may not be set using this method"};
set(VHost, Component, Key, Term) ->
case set0(VHost, Component, Key, Term) of
ok -> ok;
{errors, L} -> format_error(L)
end.
+set_policy(VHost, Key, Term) ->
+ case set0(VHost, <<"policy">>, Key, Term) of
+ ok -> ok;
+ {errors, L} -> format_error(L)
+ end.
+
format_error(L) ->
{error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}.
@@ -102,12 +127,20 @@ mnesia_update(VHost, Component, Key, Term) ->
Res
end).
+clear(_, <<"policy">> , _) ->
+ {error_string, "policies may not be cleared using this method"};
clear(VHost, Component, Key) ->
case clear0(VHost, Component, Key) of
ok -> ok;
{errors, L} -> format_error(L)
end.
+clear_policy(VHost, Key) ->
+ case clear0(VHost, <<"policy">>, Key) of
+ ok -> ok;
+ {errors, L} -> format_error(L)
+ end.
+
clear0(VHost, Component, Key) ->
case lookup_component(Component) of
{ok, Mod} -> case flatten_errors(
diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl
index 4ac19ff1..b39a98bc 100644
--- a/src/rabbit_runtime_parameters_test.erl
+++ b/src/rabbit_runtime_parameters_test.erl
@@ -20,7 +20,7 @@
-export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
-export([register/0, unregister/0]).
--export([validate_policy/2]).
+-export([validate_policy/1]).
-export([register_policy_validator/0, unregister_policy_validator/0]).
%----------------------------------------------------------------------------
@@ -45,16 +45,39 @@ notify_clear(_, _, _) -> ok.
%----------------------------------------------------------------------------
register_policy_validator() ->
- rabbit_registry:register(policy_validator, <<"testpolicy">>, ?MODULE).
+ rabbit_registry:register(policy_validator, <<"testeven">>, ?MODULE),
+ rabbit_registry:register(policy_validator, <<"testpos">>, ?MODULE).
unregister_policy_validator() ->
- rabbit_registry:unregister(policy_validator, <<"testpolicy">>).
+ rabbit_registry:unregister(policy_validator, <<"testeven">>),
+ rabbit_registry:unregister(policy_validator, <<"testpos">>).
-validate_policy(<<"testpolicy">>, Terms) when is_list(Terms) ->
- rabbit_log:info("pol val ~p~n", [Terms]),
+validate_policy([{<<"testeven">>, Terms}]) when is_list(Terms) ->
case length(Terms) rem 2 =:= 0 of
true -> ok;
false -> {error, "meh", []}
end;
-validate_policy(<<"testpolicy">>, _) ->
+
+validate_policy([{<<"testpos">>, Terms}]) when is_list(Terms) ->
+ case lists:all(fun (N) -> is_integer(N) andalso N > 0 end, Terms) of
+ true -> ok;
+ false -> {error, "meh", []}
+ end;
+
+validate_policy([{Tag1, Arg1}, {Tag2, Arg2}])
+ when is_list(Arg1), is_list(Arg2) ->
+ case [Tag1, Tag2] -- [<<"testpos">>, <<"testeven">>] of
+ [] ->
+ case {lists:all(fun (N) ->
+ is_integer(N) andalso
+ N > 0
+ end, Arg1 ++ Arg2),
+ length(Arg1) rem 2, length(Arg2) rem 2} of
+ {true, 0, 0} -> ok;
+ _ -> {error, "meh", []}
+ end;
+ _ -> {error, "meh", []}
+ end;
+
+validate_policy(_) ->
{error, "meh", []}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index a7eab2d5..9550a482 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1031,18 +1031,31 @@ test_runtime_parameters() ->
test_policy_validation() ->
rabbit_runtime_parameters_test:register_policy_validator(),
- SetPol = fun (Pol, Val) ->
- control_action(
- set_policy,
- ["name", lists:flatten(
- io_lib:format("{\"pattern\":\"pat\", \"policy\":"
- "{\"~s\":~p}}", [Pol, Val]))])
- end,
- ok = SetPol("testpolicy", []),
- ok = SetPol("testpolicy", [1, 2]),
- ok = SetPol("testpolicy", [1, 2, 3, 4]),
- {error_string, _} = SetPol("testpolicy", [1, 2, 3]),
- {error_string, _} = SetPol("not_registered", []),
+ SetPol =
+ fun (TagValList) ->
+ Frag = lists:foldl(
+ fun ({Pol, Val}, Acc) ->
+ [rabbit_misc:format("\"~s\":~p", [Pol, Val]) |
+ Acc]
+ end, "", TagValList),
+ control_action(
+ set_policy,
+ ["name", rabbit_misc:format("{\"pattern\":\".*\", \"policy\":"
+ "{~s}}", [string:join(Frag, ",")])])
+ end,
+
+ ok = SetPol([{"testeven", []}]),
+ ok = SetPol([{"testeven", [1, 2]}]),
+ ok = SetPol([{"testeven", [1, 2, 3, 4]}]),
+ ok = SetPol([{"testpos", [2, 3, 5, 562]}]),
+
+ {error_string, _} = SetPol([{"testpos", [-1, 0, 1]}]),
+ {error_string, _} = SetPol([{"testeven", [ 1, 2, 3]}]),
+
+ ok = SetPol([{"testpos", [2, 16]}, {"testeven", [12, 24]}]),
+ {error_string, _} = SetPol([{"testpos", [2, 16, 32]}, {"testeven", [12, 24]}]),
+ {error_string, _} = SetPol([{"testpos", [2, 16]}, {"testeven", [12, -2]}]),
+ {error_string, _} = SetPol([{"not_registered", []}]),
rabbit_runtime_parameters_test:unregister_policy_validator().
test_server_status() ->
@@ -1481,6 +1494,7 @@ test_declare_on_dead_queue(SecondaryNode) ->
%%---------------------------------------------------------------------
control_action(Command, Args) ->
+rabbit_log:info("control args ~p~n", [Args]),
control_action(Command, node(), Args, default_options()).
control_action(Command, Args, NewOpts) ->