diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-10-16 17:23:08 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-10-16 17:23:08 +0100 |
commit | 0b3978c4b05f6c99943efd66c243ead0c71ea3dd (patch) | |
tree | a02a1f7f55139114d260a8674e7f2ba1c61cc75d /src | |
parent | 5da4862faf958f1bfacc4b0a5f9326dac998afe0 (diff) | |
parent | 7a1127b12f62296ec1f40ecbe329041c24b5a9ea (diff) | |
download | rabbitmq-server-0b3978c4b05f6c99943efd66c243ead0c71ea3dd.tar.gz |
Merge default
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_control_main.erl | 26 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 49 | ||||
-rw-r--r-- | src/rabbit_policy.erl | 134 | ||||
-rw-r--r-- | src/rabbit_policy_validator.erl | 37 | ||||
-rw-r--r-- | src/rabbit_registry.erl | 3 | ||||
-rw-r--r-- | src/rabbit_runtime_parameters.erl | 44 | ||||
-rw-r--r-- | src/rabbit_runtime_parameters_test.erl | 30 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 21 | ||||
-rw-r--r-- | src/rabbit_vhost.erl | 2 |
9 files changed, 321 insertions, 25 deletions
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index a3cbf6e5..424b9808 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -70,6 +70,10 @@ {clear_parameter, [?VHOST_DEF]}, {list_parameters, [?VHOST_DEF]}, + {set_policy, [?VHOST_DEF]}, + {clear_policy, [?VHOST_DEF]}, + {list_policies, [?VHOST_DEF]}, + {list_queues, [?VHOST_DEF]}, {list_exchanges, [?VHOST_DEF]}, {list_bindings, [?VHOST_DEF]}, @@ -458,6 +462,28 @@ action(list_parameters, Node, [], Opts, Inform) -> rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]), rabbit_runtime_parameters:info_keys()); +action(set_policy, Node, [Key, Pattern, Defn | Prio], Opts, Inform) + when Prio == [] orelse length(Prio) == 1 -> + Msg = "Setting policy ~p for pattern ~p to ~p", + {InformMsg, Prio1} = case Prio of [] -> {Msg, undefined}; + [P] -> {Msg ++ " with priority ~s", P} + end, + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform(InformMsg, [Key, Pattern, Defn] ++ Prio), + rpc_call(Node, rabbit_policy, parse_set, + [VHostArg, list_to_binary(Key), Pattern, Defn, Prio1]); + +action(clear_policy, Node, [Key], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Clearing policy ~p", [Key]), + rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]); + +action(list_policies, Node, [], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Listing policies", []), + display_info_list(rpc_call(Node, rabbit_policy, list_formatted, [VHostArg]), + rabbit_policy:info_keys()); + action(report, Node, _Args, _Opts, Inform) -> Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]), [begin ok = action(Action, N, [], [], Inform), io:nl() end || diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 4c8406d9..1b487534 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -15,16 +15,26 @@ %% -module(rabbit_mirror_queue_misc). +-behaviour(rabbit_policy_validator). -export([remove_from_queue/3, on_node_up/0, add_mirrors/2, add_mirror/2, report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1, - is_mirrored/1, update_mirrors/2]). + is_mirrored/1, update_mirrors/2, validate_policy/1]). %% for testing only -export([suggested_queue_nodes/4]). -include("rabbit.hrl"). +-rabbit_boot_step({?MODULE, + [{description, "HA policy validation"}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-mode">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-params">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, recovery}]}). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -328,3 +338,40 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, add_mirrors(QName, NewNodes -- OldNodes), drop_mirrors(QName, OldNodes -- NewNodes), ok. + +%%---------------------------------------------------------------------------- + +validate_policy(KeyList) -> + validate_policy( + proplists:get_value(<<"ha-mode">>, KeyList), + proplists:get_value(<<"ha-params">>, KeyList)). + +validate_policy(<<"all">>, _Params) -> + ok; +validate_policy(<<"nodes">>, Params) -> + validate_params(lists:append(Params), + fun erlang:is_binary/1, + "~p has invalid node names when ha-mode=nodes", + fun (N) -> N > 0 end, + "at least one node expected when ha-mode=nodes"); +validate_policy(<<"exactly">>, Params) -> + validate_params(Params, + fun (N) -> is_integer(N) andalso N > 0 end, + "~p must be a positive integer", + fun (N) -> N == 1 end, + "ha-params must be supplied with one number " + "when ha-mode=exactly"); +validate_policy(Mode, _Params) -> + {error, "~p is not a valid ha-mode value", [Mode]}. + +validate_params(Params, FilterPred, FilterMsg, SizePred, SizeMsg) + when is_list(Params) -> + case SizePred(length(Params)) of + true -> case lists:filter(fun (P) -> not FilterPred(P) end, Params) of + [] -> ok; + X -> {error, FilterMsg, [X]} + end; + false -> {error, SizeMsg, []} + end; +validate_params(Params, _, _, _, _) -> + {error, "~p was expected to be a list", [Params]}. diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index f4c1f42b..4dcce4e0 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -22,11 +22,13 @@ -include("rabbit.hrl"). --import(rabbit_misc, [pget/2, pget/3]). +-import(rabbit_misc, [pget/2]). -export([register/0]). -export([name/1, get/2, set/1]). -export([validate/4, validate_clear/3, notify/4, notify_clear/3]). +-export([parse_set/5, set/5, delete/2, lookup/2, list/0, list/1, + list_formatted/1, info_keys/0]). -rabbit_boot_step({?MODULE, [{description, "policy parameters"}, @@ -55,7 +57,7 @@ get(Name, EntityName = #resource{virtual_host = VHost}) -> get0(Name, match(EntityName, list(VHost))). get0(_Name, undefined) -> {error, not_found}; -get0(Name, List) -> case pget(<<"policy">>, List) of +get0(Name, List) -> case pget(definition, List) of undefined -> {error, not_found}; Policy -> case pget(Name, Policy) of undefined -> {error, not_found}; @@ -65,6 +67,81 @@ get0(Name, List) -> case pget(<<"policy">>, List) of %%---------------------------------------------------------------------------- +parse_set(VHost, Key, Pattern, Definition, undefined) -> + parse_set0(VHost, Key, Pattern, Definition, 0); +parse_set(VHost, Key, Pattern, Definition, Priority) -> + try list_to_integer(Priority) of + Num -> parse_set0(VHost, Key, Pattern, Definition, Num) + catch + error:badarg -> {error, "~p priority must be a number", [Priority]} + end. + +parse_set0(VHost, Key, Pattern, Defn, Priority) -> + case rabbit_misc:json_decode(Defn) of + {ok, JSON} -> + set0(VHost, Key, + [{<<"pattern">>, list_to_binary(Pattern)}, + {<<"definition">>, rabbit_misc:json_to_term(JSON)}, + {<<"priority">>, Priority}]); + error -> + {error_string, "JSON decoding error"} + end. + +set(VHost, Key, Pattern, Definition, Priority) -> + PolicyProps = [{<<"pattern">>, Pattern}, + {<<"definition">>, Definition}, + {<<"priority">>, case Priority of + undefined -> 0; + _ -> Priority + end}], + set0(VHost, Key, PolicyProps). + +set0(VHost, Key, Term) -> + rabbit_runtime_parameters:set_any(VHost, <<"policy">>, Key, Term). + +delete(VHost, Key) -> + rabbit_runtime_parameters:clear_any(VHost, <<"policy">>, Key). + +lookup(VHost, Key) -> + case rabbit_runtime_parameters:lookup(VHost, <<"policy">>, Key) of + not_found -> not_found; + P -> p(P, fun ident/1) + end. + +list() -> + list('_'). + +list(VHost) -> + list0(VHost, fun ident/1). + +list_formatted(VHost) -> + order_policies(list0(VHost, fun format/1)). + +list0(VHost, DefnFun) -> + [p(P, DefnFun) || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)]. + +order_policies(PropList) -> + lists:sort(fun (A, B) -> pget(priority, A) < pget(priority, B) end, + PropList). + +p(Parameter, DefnFun) -> + Value = pget(value, Parameter), + [{vhost, pget(vhost, Parameter)}, + {key, pget(key, Parameter)}, + {pattern, pget(<<"pattern">>, Value)}, + {definition, DefnFun(pget(<<"definition">>, Value))}, + {priority, pget(<<"priority">>, Value)}]. + +format(Term) -> + {ok, JSON} = rabbit_misc:json_encode(rabbit_misc:term_to_json(Term)), + list_to_binary(JSON). + +ident(X) -> X. + +info_keys() -> [vhost, key, pattern, definition, priority]. + +%%---------------------------------------------------------------------------- + validate(_VHost, <<"policy">>, Name, Term) -> rabbit_parameter_validation:proplist( Name, policy_validation(), Term). @@ -80,10 +157,6 @@ notify_clear(VHost, <<"policy">>, _Name) -> %%---------------------------------------------------------------------------- -list(VHost) -> - [[{<<"name">>, pget(key, P)} | pget(value, P)] - || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)]. - update_policies(VHost) -> Policies = list(VHost), {Xs, Qs} = rabbit_misc:execute_mnesia_transaction( @@ -127,13 +200,52 @@ match(Name, Policies) -> end. matches(#resource{name = Name}, Policy) -> - match =:= re:run(Name, pget(<<"pattern">>, Policy), [{capture, none}]). + match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]). -sort_pred(A, B) -> pget(<<"priority">>, A, 0) >= pget(<<"priority">>, B, 0). +sort_pred(A, B) -> pget(priority, A) >= pget(priority, B). %%---------------------------------------------------------------------------- policy_validation() -> - [{<<"priority">>, fun rabbit_parameter_validation:number/2, optional}, - {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory}, - {<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}]. + [{<<"priority">>, fun rabbit_parameter_validation:number/2, mandatory}, + {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory}, + {<<"definition">>, fun validation/2, mandatory}]. + +validation(_Name, []) -> + {error, "no policy provided", []}; +validation(_Name, Terms) when is_list(Terms) -> + {Keys, Modules} = lists:unzip( + rabbit_registry:lookup_all(policy_validator)), + [] = dups(Keys), %% ASSERTION + Validators = lists:zipwith(fun (M, K) -> {M, a2b(K)} end, Modules, Keys), + {TermKeys, _} = lists:unzip(Terms), + case dups(TermKeys) of + [] -> validation0(Validators, Terms); + Dup -> {error, "~p duplicate keys not allowed", [Dup]} + end; +validation(_Name, Term) -> + {error, "parse error while reading policy: ~p", [Term]}. + +validation0(Validators, Terms) -> + case lists:foldl( + fun (Mod, {ok, TermsLeft}) -> + ModKeys = proplists:get_all_values(Mod, Validators), + case [T || {Key, _} = T <- TermsLeft, + lists:member(Key, ModKeys)] of + [] -> {ok, TermsLeft}; + Scope -> {Mod:validate_policy(Scope), TermsLeft -- Scope} + end; + (_, Acc) -> + Acc + end, {ok, Terms}, proplists:get_keys(Validators)) of + {ok, []} -> + ok; + {ok, Unvalidated} -> + {error, "~p are not recognised policy settings", [Unvalidated]}; + {Error, _} -> + Error + end. + +a2b(A) -> list_to_binary(atom_to_list(A)). + +dups(L) -> L -- lists:usort(L). diff --git a/src/rabbit_policy_validator.erl b/src/rabbit_policy_validator.erl new file mode 100644 index 00000000..624c3d54 --- /dev/null +++ b/src/rabbit_policy_validator.erl @@ -0,0 +1,37 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_policy_validator). + +-ifdef(use_specs). + +-type(validate_results() :: + 'ok' | {error, string(), [term()]} | [validate_results()]). + +-callback validate_policy([{binary(), term()}]) -> validate_results(). + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [ + {validate_policy, 1}, + ]; +behaviour_info(_Other) -> + undefined. + +-endif. diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index e14bbba0..32709d24 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -107,7 +107,8 @@ sanity_check_module(ClassModule, Module) -> class_module(exchange) -> rabbit_exchange_type; class_module(auth_mechanism) -> rabbit_auth_mechanism; class_module(runtime_parameter) -> rabbit_runtime_parameter; -class_module(exchange_decorator) -> rabbit_exchange_decorator. +class_module(exchange_decorator) -> rabbit_exchange_decorator; +class_module(policy_validator) -> rabbit_policy_validator. %%--------------------------------------------------------------------------- diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index b58b459a..3ee93fa1 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -18,9 +18,9 @@ -include("rabbit.hrl"). --export([parse_set/4, set/4, clear/3, - list/0, list/1, list_strict/1, list/2, list_strict/2, list_formatted/1, - lookup/3, value/3, value/4, info_keys/0]). +-export([parse_set/4, set/4, set_any/4, clear/3, clear_any/3, list/0, list/1, + list_strict/1, list/2, list_strict/2, list_formatted/1, lookup/3, + value/3, value/4, info_keys/0]). %%---------------------------------------------------------------------------- @@ -32,8 +32,12 @@ -> ok_or_error_string()). -spec(set/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> ok_or_error_string()). +-spec(set_any/4 :: (rabbit_types:vhost(), binary(), binary(), term()) + -> ok_or_error_string()). -spec(clear/3 :: (rabbit_types:vhost(), binary(), binary()) -> ok_or_error_string()). +-spec(clear_any/3 :: (rabbit_types:vhost(), binary(), binary()) + -> ok_or_error_string()). -spec(list/0 :: () -> [rabbit_types:infos()]). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(list_strict/1 :: (binary()) -> [rabbit_types:infos()] | 'not_found'). @@ -57,22 +61,29 @@ %%--------------------------------------------------------------------------- +parse_set(_, <<"policy">>, _, _) -> + {error_string, "policies may not be set using this method"}; parse_set(VHost, Component, Key, String) -> case rabbit_misc:json_decode(String) of {ok, JSON} -> set(VHost, Component, Key, rabbit_misc:json_to_term(JSON)); error -> {error_string, "JSON decoding error"} end. +set(_, <<"policy">>, _, _) -> + {error_string, "policies may not be set using this method"}; set(VHost, Component, Key, Term) -> - case set0(VHost, Component, Key, Term) of - ok -> ok; - {errors, L} -> format_error(L) - end. + set_any(VHost, Component, Key, Term). format_error(L) -> {error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}. -set0(VHost, Component, Key, Term) -> +set_any(VHost, Component, Key, Term) -> + case set_any0(VHost, Component, Key, Term) of + ok -> ok; + {errors, L} -> format_error(L) + end. + +set_any0(VHost, Component, Key, Term) -> case lookup_component(Component) of {ok, Mod} -> case flatten_errors(Mod:validate(VHost, Component, Key, Term)) of @@ -100,13 +111,18 @@ mnesia_update(VHost, Component, Key, Term) -> Res end). +clear(_, <<"policy">> , _) -> + {error_string, "policies may not be cleared using this method"}; clear(VHost, Component, Key) -> - case clear0(VHost, Component, Key) of + clear_any(VHost, Component, Key). + +clear_any(VHost, Component, Key) -> + case clear_any0(VHost, Component, Key) of ok -> ok; {errors, L} -> format_error(L) end. -clear0(VHost, Component, Key) -> +clear_any0(VHost, Component, Key) -> case lookup_component(Component) of {ok, Mod} -> case flatten_errors( Mod:validate_clear(VHost, Component, Key)) of @@ -125,7 +141,8 @@ mnesia_clear(VHost, Component, Key) -> end). list() -> - [p(P) || P <- rabbit_misc:dirty_read_all(?TABLE)]. + [p(P) || #runtime_parameters{ key = {_VHost, Comp, _Key}} = P <- + rabbit_misc:dirty_read_all(?TABLE), Comp /= <<"policy">>]. list(VHost) -> list(VHost, '_', []). list_strict(Component) -> list('_', Component, not_found). @@ -136,7 +153,10 @@ list(VHost, Component, Default) -> case component_good(Component) of true -> Match = #runtime_parameters{key = {VHost, Component, '_'}, _ = '_'}, - [p(P) || P <- mnesia:dirty_match_object(?TABLE, Match)]; + [p(P) || #runtime_parameters{ key = {_VHost, Comp, _Key}} = P <- + mnesia:dirty_match_object(?TABLE, Match), + Comp =/= <<"policy">> orelse + Component =:= <<"policy">>]; _ -> Default end. diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl index 5224ccaa..d4d7271e 100644 --- a/src/rabbit_runtime_parameters_test.erl +++ b/src/rabbit_runtime_parameters_test.erl @@ -16,9 +16,14 @@ -module(rabbit_runtime_parameters_test). -behaviour(rabbit_runtime_parameter). +-behaviour(rabbit_policy_validator). -export([validate/4, validate_clear/3, notify/4, notify_clear/3]). -export([register/0, unregister/0]). +-export([validate_policy/1]). +-export([register_policy_validator/0, unregister_policy_validator/0]). + +%---------------------------------------------------------------------------- register() -> rabbit_registry:register(runtime_parameter, <<"test">>, ?MODULE). @@ -36,3 +41,28 @@ validate_clear(_, <<"test">>, _) -> {error, "meh", []}. notify(_, _, _, _) -> ok. notify_clear(_, _, _) -> ok. + +%---------------------------------------------------------------------------- + +register_policy_validator() -> + rabbit_registry:register(policy_validator, <<"testeven">>, ?MODULE), + rabbit_registry:register(policy_validator, <<"testpos">>, ?MODULE). + +unregister_policy_validator() -> + rabbit_registry:unregister(policy_validator, <<"testeven">>), + rabbit_registry:unregister(policy_validator, <<"testpos">>). + +validate_policy([{<<"testeven">>, Terms}]) when is_list(Terms) -> + case length(Terms) rem 2 =:= 0 of + true -> ok; + false -> {error, "meh", []} + end; + +validate_policy([{<<"testpos">>, Terms}]) when is_list(Terms) -> + case lists:all(fun (N) -> is_integer(N) andalso N > 0 end, Terms) of + true -> ok; + false -> {error, "meh", []} + end; + +validate_policy(_) -> + {error, "meh", []}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index aa48f228..f085c2d5 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -57,6 +57,7 @@ all_tests() -> passed = test_dynamic_mirroring(), passed = test_user_management(), passed = test_runtime_parameters(), + passed = test_policy_validation(), passed = test_server_status(), passed = test_confirms(), passed = @@ -1039,6 +1040,26 @@ test_runtime_parameters() -> rabbit_runtime_parameters_test:unregister(), passed. +test_policy_validation() -> + rabbit_runtime_parameters_test:register_policy_validator(), + SetPol = + fun (Key, Val) -> + control_action( + set_policy, + ["name", ".*", rabbit_misc:format("{\"~s\":~p}", [Key, Val])]) + end, + + ok = SetPol("testeven", []), + ok = SetPol("testeven", [1, 2]), + ok = SetPol("testeven", [1, 2, 3, 4]), + ok = SetPol("testpos", [2, 5, 5678]), + + {error_string, _} = SetPol("testpos", [-1, 0, 1]), + {error_string, _} = SetPol("testeven", [ 1, 2, 3]), + + rabbit_runtime_parameters_test:unregister_policy_validator(), + passed. + test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 03dfbe24..297fa56f 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -97,6 +97,8 @@ internal_delete(VHostPath) -> proplists:get_value(component, Info), proplists:get_value(key, Info)) || Info <- rabbit_runtime_parameters:list(VHostPath)], + [ok = rabbit_policy:delete(VHostPath, proplists:get_value(key, Info)) + || Info <- rabbit_policy:list(VHostPath)], ok = mnesia:delete({rabbit_vhost, VHostPath}), ok. |