diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-10-05 18:20:10 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-10-05 18:20:10 +0100 |
commit | 4e8e0fa18061a09a074e7975fd39a4003cb9b8ca (patch) | |
tree | e9d7beb602f18457b6b1c82591544c38fa235cae | |
parent | 2754b587b3f913b23fbc5bb34880a6b41c30849d (diff) | |
download | rabbitmq-server-4e8e0fa18061a09a074e7975fd39a4003cb9b8ca.tar.gz |
Updated policy validation
-rw-r--r-- | docs/rabbitmqctl.1.xml | 79 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 9 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 46 | ||||
-rw-r--r-- | src/rabbit_policy.erl | 42 | ||||
-rw-r--r-- | src/rabbit_policy_validator.erl | 4 | ||||
-rw-r--r-- | src/rabbit_runtime_parameters.erl | 39 | ||||
-rw-r--r-- | src/rabbit_runtime_parameters_test.erl | 35 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 38 |
8 files changed, 248 insertions, 44 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 73347cea..eea42484 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -637,7 +637,7 @@ </para> <para> Deleting a virtual host deletes all its exchanges, - queues, bindings, user permissions and parameters. + queues, bindings, user permissions, parameters and policies. </para> <para role="example-prefix">For example:</para> <screen role="example">rabbitmqctl delete_vhost test</screen> @@ -895,6 +895,83 @@ </refsect2> <refsect2> + <title>Policy Management</title> + <para> + Policies are used to control and modify the behaviour of queues + and exchanges on a cluster-wide basis. Policies apply within a + given vhost, and consist of a key and a value. The key must be a + string and the value must be a JSON string. Policies can be set, + cleared and listed. + </para> + <variablelist> + <varlistentry> + <term><cmdsynopsis><command>set_policy</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term> + <listitem> + <para> + Sets a policy. + </para> + <variablelist> + <varlistentry> + <term>key</term> + <listitem><para> + The name of the policy. + </para></listitem> + </varlistentry> + <varlistentry> + <term>value</term> + <listitem><para> + The definition of the policy, as a + JSON string. In most shells you are very likely to + need to quote this. + </para></listitem> + </varlistentry> + </variablelist> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl set_policy federate-me '{"pattern":"^amq.",\ + "policy":{"federation-upstream-set":"all"}}'</screen> + <para role="example"> + This command sets the policy <command>federate-me</command> in the default virtual host so that built-in exchanges are federated. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>clear_policy</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg></cmdsynopsis></term> + <listitem> + <para> + Clears a policy. + </para> + <variablelist> + <varlistentry> + <term>key</term> + <listitem><para> + The name of the policy being cleared. + </para></listitem> + </varlistentry> + </variablelist> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl clear_policy federate-me</screen> + <para role="example"> + This command clears the <command>federate-me</command> policy in the default virtual host. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>list_policies</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg></cmdsynopsis></term> + <listitem> + <para> + Lists all policies for a virtual host. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl list_policies</screen> + <para role="example"> + This command lists all policies in the default virtual host. + </para> + </listitem> + </varlistentry> + </variablelist> + </refsect2> + + <refsect2> <title>Server Status</title> <para> The server status queries interrogate the server and return a list of diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 1efde136..15fa1fd5 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -465,15 +465,14 @@ action(list_parameters, Node, [], Opts, Inform) -> action(set_policy, Node, [Key, Value], Opts, Inform) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), Inform("Setting policy ~p to ~p", [Key, Value]), - rpc_call(Node, rabbit_runtime_parameters, parse_set, - [VHostArg, <<"policy">>, list_to_binary(Key), Value]); + rpc_call(Node, rabbit_runtime_parameters, parse_set_policy, + [VHostArg, list_to_binary(Key), Value]); action(clear_policy, Node, [Key], Opts, Inform) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), Inform("Clearing policy ~p", [Key]), - rpc_call(Node, rabbit_runtime_parameters, clear, [VHostArg, - <<"policy">>, - list_to_binary(Key)]); + rpc_call(Node, rabbit_runtime_parameters, clear_policy, + [VHostArg, list_to_binary(Key)]); action(list_policies, Node, [], Opts, Inform) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 453f2f2c..f02308a1 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -15,16 +15,26 @@ %% -module(rabbit_mirror_queue_misc). +-behaviour(rabbit_policy_validator). -export([remove_from_queue/2, on_node_up/0, add_mirrors/2, add_mirror/2, report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1, - is_mirrored/1, update_mirrors/2]). + is_mirrored/1, update_mirrors/2, validate_policy/1]). %% for testing only -export([suggested_queue_nodes/4]). -include("rabbit.hrl"). +-rabbit_boot_step({?MODULE, + [{description, "HA policy validator hook"}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-mode">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-params">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, recovery}]}). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -320,3 +330,37 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, add_mirrors(QName, NewNodes -- OldNodes), drop_mirrors(QName, OldNodes -- NewNodes), ok. + +%%---------------------------------------------------------------------------- + +validate_policy(TagList) -> + Mode = proplists:get_all_values(<<"ha-mode">>, TagList), + Params = proplists:get_all_values(<<"ha-params">>, TagList), + case Mode of + [<<"all">>] -> + ok; + [<<"nodes">>] -> + validate_params(fun erlang:is_binary/1, lists:append(Params), + "~p has invalid node names when ha-mode=nodes"); + [<<"exactly">>] -> + case Params of + [_] -> validate_params( + fun (N) -> is_integer(N) andalso N >= 0 end, + Params, "~p must be a positive integer"); + X -> {error, "ha-params must be supplied with one number " + "when ha-mode=exactly. found ~p arguments", + [length(X)]} + end; + [_, _|_] -> + {error, "ha-mode may appear once at most", []}; + [Other] -> + {error, "~p is not a valid ha-mode value", [Other]} + end. + +validate_params(FilterFun, Params, Msg) when is_list(Params) -> + case lists:filter(fun (P) -> not FilterFun(P) end, Params) of + [] -> ok; + X -> {error, Msg, [X]} + end; +validate_params(_, Params, _) -> + {error, "~p was expected to be a list", [Params]}. diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index c9af46e9..5540de83 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -139,21 +139,35 @@ policy_validation() -> {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory}, {<<"policy">>, fun validation/2, mandatory}]. +validation(_Name, []) -> + ok; validation(_Name, Terms) when is_list(Terms) -> - [validation0(T) || T <- Terms ]; -validation(Name, Term) -> - {error, "~s should be list, actually was ~p", [Name, Term]}. - -validation0({Key, Value}) when is_binary(Key) -> - case rabbit_registry:lookup_module(policy_validator, - list_to_atom(binary_to_list(Key))) of - {ok, Mod} -> - Mod:validate_policy(Key, Value); - {error, not_found} -> - {error, "~p is not a recognised policy option", [Key]}; - Error -> - Error + {Tags, Modules} = lists:unzip( + rabbit_registry:lookup_all(policy_validator)), + case lists:usort(Tags -- lists:usort(Tags)) of + [] -> ok; + Dup -> rabbit_log:warning("Duplicate policy validators: ~p~n", [Dup]) + end, + Validators = lists:zipwith(fun (M, T) -> {M, a2b(T)} end, Modules, Tags), + case lists:foldl( + fun (_, {Error, _} = Acc) when Error /= ok -> + Acc; + (Mod, {ok, TermsLeft}) -> + ModTags = proplists:get_all_values(Mod, Validators), + case [T || {Tag, _} = T <- TermsLeft, + lists:member(Tag, ModTags)] of + [] -> {ok, TermsLeft}; + Scope -> {Mod:validate_policy(Scope), TermsLeft -- Scope} + end + end, {ok, Terms}, proplists:get_keys(Validators)) of + {ok, []} -> + ok; + {ok, Unvalidated} -> + {error, "~p are not recognised policy settings", Unvalidated}; + {Error, _} -> + Error end; -validation0(Term) -> +validation(_Name, Term) -> {error, "parse error while reading policy: ~p", [Term]}. +a2b(A) -> list_to_binary(atom_to_list(A)). diff --git a/src/rabbit_policy_validator.erl b/src/rabbit_policy_validator.erl index 3cc02ecc..624c3d54 100644 --- a/src/rabbit_policy_validator.erl +++ b/src/rabbit_policy_validator.erl @@ -21,7 +21,7 @@ -type(validate_results() :: 'ok' | {error, string(), [term()]} | [validate_results()]). --callback validate_policy(binary(), term()) -> validate_results(). +-callback validate_policy([{binary(), term()}]) -> validate_results(). -else. @@ -29,7 +29,7 @@ behaviour_info(callbacks) -> [ - {validate_policy, 2}, + {validate_policy, 1}, ]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index c4608b42..70428e96 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -18,9 +18,10 @@ -include("rabbit.hrl"). --export([parse_set/4, set/4, clear/3, - list/0, list/1, list_strict/1, list/2, list_strict/2, list_formatted/1, - list_formatted_policies/1, lookup/3, value/3, value/4, info_keys/0]). +-export([parse_set/4, parse_set_policy/3, set/4, set_policy/3, clear/3, + clear_policy/2, list/0, list/1, list_strict/1, list/2, list_strict/2, + list_formatted/1, list_formatted_policies/1, lookup/3, value/3, + value/4, info_keys/0]). %%---------------------------------------------------------------------------- @@ -30,10 +31,16 @@ -spec(parse_set/4 :: (rabbit_types:vhost(), binary(), binary(), string()) -> ok_or_error_string()). +-spec(parse_set_policy/3 :: (rabbit_types:vhost(), binary(), string()) + -> ok_or_error_string()). -spec(set/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> ok_or_error_string()). +-spec(set_policy/3 :: (rabbit_types:vhost(), binary(), term()) + -> ok_or_error_string()). -spec(clear/3 :: (rabbit_types:vhost(), binary(), binary()) -> ok_or_error_string()). +-spec(clear_policy/2 :: (rabbit_types:vhost(), binary()) + -> ok_or_error_string()). -spec(list/0 :: () -> [rabbit_types:infos()]). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(list_strict/1 :: (binary()) -> [rabbit_types:infos()] | 'not_found'). @@ -59,18 +66,36 @@ %%--------------------------------------------------------------------------- +parse_set(_, <<"policy">>, _, _) -> + {error_string, "policies may not be set using this method"}; parse_set(VHost, Component, Key, String) -> case rabbit_misc:json_decode(String) of {ok, JSON} -> set(VHost, Component, Key, rabbit_misc:json_to_term(JSON)); error -> {error_string, "JSON decoding error"} end. +parse_set_policy(VHost, Key, String) -> + case rabbit_misc:json_decode(String) of + {ok, JSON} -> + set_policy(VHost, Key, rabbit_misc:json_to_term(JSON)); + error -> + {error_string, "JSON decoding error"} + end. + +set(_, <<"policy">>, _, _) -> + {error_string, "policies may not be set using this method"}; set(VHost, Component, Key, Term) -> case set0(VHost, Component, Key, Term) of ok -> ok; {errors, L} -> format_error(L) end. +set_policy(VHost, Key, Term) -> + case set0(VHost, <<"policy">>, Key, Term) of + ok -> ok; + {errors, L} -> format_error(L) + end. + format_error(L) -> {error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}. @@ -102,12 +127,20 @@ mnesia_update(VHost, Component, Key, Term) -> Res end). +clear(_, <<"policy">> , _) -> + {error_string, "policies may not be cleared using this method"}; clear(VHost, Component, Key) -> case clear0(VHost, Component, Key) of ok -> ok; {errors, L} -> format_error(L) end. +clear_policy(VHost, Key) -> + case clear0(VHost, <<"policy">>, Key) of + ok -> ok; + {errors, L} -> format_error(L) + end. + clear0(VHost, Component, Key) -> case lookup_component(Component) of {ok, Mod} -> case flatten_errors( diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl index 4ac19ff1..b39a98bc 100644 --- a/src/rabbit_runtime_parameters_test.erl +++ b/src/rabbit_runtime_parameters_test.erl @@ -20,7 +20,7 @@ -export([validate/4, validate_clear/3, notify/4, notify_clear/3]). -export([register/0, unregister/0]). --export([validate_policy/2]). +-export([validate_policy/1]). -export([register_policy_validator/0, unregister_policy_validator/0]). %---------------------------------------------------------------------------- @@ -45,16 +45,39 @@ notify_clear(_, _, _) -> ok. %---------------------------------------------------------------------------- register_policy_validator() -> - rabbit_registry:register(policy_validator, <<"testpolicy">>, ?MODULE). + rabbit_registry:register(policy_validator, <<"testeven">>, ?MODULE), + rabbit_registry:register(policy_validator, <<"testpos">>, ?MODULE). unregister_policy_validator() -> - rabbit_registry:unregister(policy_validator, <<"testpolicy">>). + rabbit_registry:unregister(policy_validator, <<"testeven">>), + rabbit_registry:unregister(policy_validator, <<"testpos">>). -validate_policy(<<"testpolicy">>, Terms) when is_list(Terms) -> - rabbit_log:info("pol val ~p~n", [Terms]), +validate_policy([{<<"testeven">>, Terms}]) when is_list(Terms) -> case length(Terms) rem 2 =:= 0 of true -> ok; false -> {error, "meh", []} end; -validate_policy(<<"testpolicy">>, _) -> + +validate_policy([{<<"testpos">>, Terms}]) when is_list(Terms) -> + case lists:all(fun (N) -> is_integer(N) andalso N > 0 end, Terms) of + true -> ok; + false -> {error, "meh", []} + end; + +validate_policy([{Tag1, Arg1}, {Tag2, Arg2}]) + when is_list(Arg1), is_list(Arg2) -> + case [Tag1, Tag2] -- [<<"testpos">>, <<"testeven">>] of + [] -> + case {lists:all(fun (N) -> + is_integer(N) andalso + N > 0 + end, Arg1 ++ Arg2), + length(Arg1) rem 2, length(Arg2) rem 2} of + {true, 0, 0} -> ok; + _ -> {error, "meh", []} + end; + _ -> {error, "meh", []} + end; + +validate_policy(_) -> {error, "meh", []}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index a7eab2d5..9550a482 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1031,18 +1031,31 @@ test_runtime_parameters() -> test_policy_validation() -> rabbit_runtime_parameters_test:register_policy_validator(), - SetPol = fun (Pol, Val) -> - control_action( - set_policy, - ["name", lists:flatten( - io_lib:format("{\"pattern\":\"pat\", \"policy\":" - "{\"~s\":~p}}", [Pol, Val]))]) - end, - ok = SetPol("testpolicy", []), - ok = SetPol("testpolicy", [1, 2]), - ok = SetPol("testpolicy", [1, 2, 3, 4]), - {error_string, _} = SetPol("testpolicy", [1, 2, 3]), - {error_string, _} = SetPol("not_registered", []), + SetPol = + fun (TagValList) -> + Frag = lists:foldl( + fun ({Pol, Val}, Acc) -> + [rabbit_misc:format("\"~s\":~p", [Pol, Val]) | + Acc] + end, "", TagValList), + control_action( + set_policy, + ["name", rabbit_misc:format("{\"pattern\":\".*\", \"policy\":" + "{~s}}", [string:join(Frag, ",")])]) + end, + + ok = SetPol([{"testeven", []}]), + ok = SetPol([{"testeven", [1, 2]}]), + ok = SetPol([{"testeven", [1, 2, 3, 4]}]), + ok = SetPol([{"testpos", [2, 3, 5, 562]}]), + + {error_string, _} = SetPol([{"testpos", [-1, 0, 1]}]), + {error_string, _} = SetPol([{"testeven", [ 1, 2, 3]}]), + + ok = SetPol([{"testpos", [2, 16]}, {"testeven", [12, 24]}]), + {error_string, _} = SetPol([{"testpos", [2, 16, 32]}, {"testeven", [12, 24]}]), + {error_string, _} = SetPol([{"testpos", [2, 16]}, {"testeven", [12, -2]}]), + {error_string, _} = SetPol([{"not_registered", []}]), rabbit_runtime_parameters_test:unregister_policy_validator(). test_server_status() -> @@ -1481,6 +1494,7 @@ test_declare_on_dead_queue(SecondaryNode) -> %%--------------------------------------------------------------------- control_action(Command, Args) -> +rabbit_log:info("control args ~p~n", [Args]), control_action(Command, node(), Args, default_options()). control_action(Command, Args, NewOpts) -> |