summaryrefslogtreecommitdiff
path: root/src/rabbit_policy.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_policy.erl')
-rw-r--r--src/rabbit_policy.erl347
1 files changed, 0 insertions, 347 deletions
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
deleted file mode 100644
index f5d03360..00000000
--- a/src/rabbit_policy.erl
+++ /dev/null
@@ -1,347 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
-%%
-
--module(rabbit_policy).
-
-%% TODO specs
-
--behaviour(rabbit_runtime_parameter).
-
--include("rabbit.hrl").
-
--import(rabbit_misc, [pget/2]).
-
--export([register/0]).
--export([invalidate/0, recover/0]).
--export([name/1, get/2, get_arg/3, set/1]).
--export([validate/5, notify/4, notify_clear/3]).
--export([parse_set/6, set/6, delete/2, lookup/2, list/0, list/1,
- list_formatted/1, info_keys/0]).
-
--rabbit_boot_step({?MODULE,
- [{description, "policy parameters"},
- {mfa, {rabbit_policy, register, []}},
- {requires, rabbit_registry},
- {enables, recovery}]}).
-
-register() ->
- rabbit_registry:register(runtime_parameter, <<"policy">>, ?MODULE).
-
-name(#amqqueue{policy = Policy}) -> name0(Policy);
-name(#exchange{policy = Policy}) -> name0(Policy).
-
-name0(undefined) -> none;
-name0(Policy) -> pget(name, Policy).
-
-set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
-set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}.
-
-set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)).
-
-get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy);
-get(Name, #exchange{policy = Policy}) -> get0(Name, Policy);
-%% Caution - SLOW.
-get(Name, EntityName = #resource{virtual_host = VHost}) ->
- get0(Name, match(EntityName, list(VHost))).
-
-get0(_Name, undefined) -> undefined;
-get0(Name, List) -> case pget(definition, List) of
- undefined -> undefined;
- Policy -> pget(Name, Policy)
- end.
-
-%% Many heads for optimisation
-get_arg(_AName, _PName, #exchange{arguments = [], policy = undefined}) ->
- undefined;
-get_arg(_AName, PName, X = #exchange{arguments = []}) ->
- get(PName, X);
-get_arg(AName, PName, X = #exchange{arguments = Args}) ->
- case rabbit_misc:table_lookup(Args, AName) of
- undefined -> get(PName, X);
- {_Type, Arg} -> Arg
- end.
-
-%%----------------------------------------------------------------------------
-
-%% Gets called during upgrades - therefore must not assume anything about the
-%% state of Mnesia
-invalidate() ->
- rabbit_file:write_file(invalid_file(), <<"">>).
-
-recover() ->
- case rabbit_file:is_file(invalid_file()) of
- true -> recover0(),
- rabbit_file:delete(invalid_file());
- false -> ok
- end.
-
-%% To get here we have to have just completed an Mnesia upgrade - i.e. we are
-%% the first node starting. So we can rewrite the whole database. Note that
-%% recovery has not yet happened; we must work with the rabbit_durable_<thing>
-%% variants.
-recover0() ->
- Xs = mnesia:dirty_match_object(rabbit_durable_exchange, #exchange{_ = '_'}),
- Qs = mnesia:dirty_match_object(rabbit_durable_queue, #amqqueue{_ = '_'}),
- Policies = list(),
- [rabbit_misc:execute_mnesia_transaction(
- fun () ->
- mnesia:write(
- rabbit_durable_exchange,
- rabbit_exchange_decorator:set(
- X#exchange{policy = match(Name, Policies)}), write)
- end) || X = #exchange{name = Name} <- Xs],
- [rabbit_misc:execute_mnesia_transaction(
- fun () ->
- mnesia:write(
- rabbit_durable_queue,
- rabbit_queue_decorator:set(
- Q#amqqueue{policy = match(Name, Policies)}), write)
- end) || Q = #amqqueue{name = Name} <- Qs],
- ok.
-
-invalid_file() ->
- filename:join(rabbit_mnesia:dir(), "policies_are_invalid").
-
-%%----------------------------------------------------------------------------
-
-parse_set(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
- try list_to_integer(Priority) of
- Num -> parse_set0(VHost, Name, Pattern, Definition, Num, ApplyTo)
- catch
- error:badarg -> {error, "~p priority must be a number", [Priority]}
- end.
-
-parse_set0(VHost, Name, Pattern, Defn, Priority, ApplyTo) ->
- case rabbit_misc:json_decode(Defn) of
- {ok, JSON} ->
- set0(VHost, Name,
- [{<<"pattern">>, list_to_binary(Pattern)},
- {<<"definition">>, rabbit_misc:json_to_term(JSON)},
- {<<"priority">>, Priority},
- {<<"apply-to">>, ApplyTo}]);
- error ->
- {error_string, "JSON decoding error"}
- end.
-
-set(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
- PolicyProps = [{<<"pattern">>, Pattern},
- {<<"definition">>, Definition},
- {<<"priority">>, case Priority of
- undefined -> 0;
- _ -> Priority
- end},
- {<<"apply-to">>, case ApplyTo of
- undefined -> <<"all">>;
- _ -> ApplyTo
- end}],
- set0(VHost, Name, PolicyProps).
-
-set0(VHost, Name, Term) ->
- rabbit_runtime_parameters:set_any(VHost, <<"policy">>, Name, Term, none).
-
-delete(VHost, Name) ->
- rabbit_runtime_parameters:clear_any(VHost, <<"policy">>, Name).
-
-lookup(VHost, Name) ->
- case rabbit_runtime_parameters:lookup(VHost, <<"policy">>, Name) of
- not_found -> not_found;
- P -> p(P, fun ident/1)
- end.
-
-list() ->
- list('_').
-
-list(VHost) ->
- list0(VHost, fun ident/1).
-
-list_formatted(VHost) ->
- order_policies(list0(VHost, fun format/1)).
-
-list0(VHost, DefnFun) ->
- [p(P, DefnFun) || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)].
-
-order_policies(PropList) ->
- lists:sort(fun (A, B) -> pget(priority, A) < pget(priority, B) end,
- PropList).
-
-p(Parameter, DefnFun) ->
- Value = pget(value, Parameter),
- [{vhost, pget(vhost, Parameter)},
- {name, pget(name, Parameter)},
- {pattern, pget(<<"pattern">>, Value)},
- {'apply-to', pget(<<"apply-to">>, Value)},
- {definition, DefnFun(pget(<<"definition">>, Value))},
- {priority, pget(<<"priority">>, Value)}].
-
-format(Term) ->
- {ok, JSON} = rabbit_misc:json_encode(rabbit_misc:term_to_json(Term)),
- list_to_binary(JSON).
-
-ident(X) -> X.
-
-info_keys() -> [vhost, name, 'apply-to', pattern, definition, priority].
-
-%%----------------------------------------------------------------------------
-
-validate(_VHost, <<"policy">>, Name, Term, _User) ->
- rabbit_parameter_validation:proplist(
- Name, policy_validation(), Term).
-
-notify(VHost, <<"policy">>, Name, Term) ->
- rabbit_event:notify(policy_set, [{name, Name} | Term]),
- update_policies(VHost).
-
-notify_clear(VHost, <<"policy">>, Name) ->
- rabbit_event:notify(policy_cleared, [{name, Name}]),
- update_policies(VHost).
-
-%%----------------------------------------------------------------------------
-
-%% [1] We need to prevent this from becoming O(n^2) in a similar
-%% manner to rabbit_binding:remove_for_{source,destination}. So see
-%% the comment in rabbit_binding:lock_route_tables/0 for more rationale.
-%% [2] We could be here in a post-tx fun after the vhost has been
-%% deleted; in which case it's fine to do nothing.
-update_policies(VHost) ->
- Tabs = [rabbit_queue, rabbit_durable_queue,
- rabbit_exchange, rabbit_durable_exchange],
- {Xs, Qs} = rabbit_misc:execute_mnesia_transaction(
- fun() ->
- [mnesia:lock({table, T}, write) || T <- Tabs], %% [1]
- case catch list(VHost) of
- {error, {no_such_vhost, _}} ->
- ok; %% [2]
- Policies ->
- {[update_exchange(X, Policies) ||
- X <- rabbit_exchange:list(VHost)],
- [update_queue(Q, Policies) ||
- Q <- rabbit_amqqueue:list(VHost)]}
- end
- end),
- [catch notify(X) || X <- Xs],
- [catch notify(Q) || Q <- Qs],
- ok.
-
-update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) ->
- case match(XName, Policies) of
- OldPolicy -> no_change;
- NewPolicy -> case rabbit_exchange:update(
- XName, fun (X0) ->
- rabbit_exchange_decorator:set(
- X0 #exchange{policy = NewPolicy})
- end) of
- #exchange{} = X1 -> {X, X1};
- not_found -> {X, X }
- end
- end.
-
-update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
- case match(QName, Policies) of
- OldPolicy -> no_change;
- NewPolicy -> case rabbit_amqqueue:update(
- QName, fun(Q1) ->
- rabbit_queue_decorator:set(
- Q1#amqqueue{policy = NewPolicy})
- end) of
- #amqqueue{} = Q1 -> {Q, Q1};
- not_found -> {Q, Q }
- end
- end.
-
-notify(no_change)->
- ok;
-notify({X1 = #exchange{}, X2 = #exchange{}}) ->
- rabbit_exchange:policy_changed(X1, X2);
-notify({Q1 = #amqqueue{}, Q2 = #amqqueue{}}) ->
- rabbit_amqqueue:policy_changed(Q1, Q2).
-
-match(Name, Policies) ->
- case lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]) of
- [] -> undefined;
- [Policy | _Rest] -> Policy
- end.
-
-matches(#resource{name = Name, kind = Kind, virtual_host = VHost}, Policy) ->
- matches_type(Kind, pget('apply-to', Policy)) andalso
- match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]) andalso
- VHost =:= pget(vhost, Policy).
-
-matches_type(exchange, <<"exchanges">>) -> true;
-matches_type(queue, <<"queues">>) -> true;
-matches_type(exchange, <<"all">>) -> true;
-matches_type(queue, <<"all">>) -> true;
-matches_type(_, _) -> false.
-
-sort_pred(A, B) -> pget(priority, A) >= pget(priority, B).
-
-%%----------------------------------------------------------------------------
-
-policy_validation() ->
- [{<<"priority">>, fun rabbit_parameter_validation:number/2, mandatory},
- {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory},
- {<<"apply-to">>, fun apply_to_validation/2, optional},
- {<<"definition">>, fun validation/2, mandatory}].
-
-validation(_Name, []) ->
- {error, "no policy provided", []};
-validation(_Name, Terms) when is_list(Terms) ->
- {Keys, Modules} = lists:unzip(
- rabbit_registry:lookup_all(policy_validator)),
- [] = dups(Keys), %% ASSERTION
- Validators = lists:zipwith(fun (M, K) -> {M, a2b(K)} end, Modules, Keys),
- case is_proplist(Terms) of
- true -> {TermKeys, _} = lists:unzip(Terms),
- case dups(TermKeys) of
- [] -> validation0(Validators, Terms);
- Dup -> {error, "~p duplicate keys not allowed", [Dup]}
- end;
- false -> {error, "definition must be a dictionary: ~p", [Terms]}
- end;
-validation(_Name, Term) ->
- {error, "parse error while reading policy: ~p", [Term]}.
-
-validation0(Validators, Terms) ->
- case lists:foldl(
- fun (Mod, {ok, TermsLeft}) ->
- ModKeys = proplists:get_all_values(Mod, Validators),
- case [T || {Key, _} = T <- TermsLeft,
- lists:member(Key, ModKeys)] of
- [] -> {ok, TermsLeft};
- Scope -> {Mod:validate_policy(Scope), TermsLeft -- Scope}
- end;
- (_, Acc) ->
- Acc
- end, {ok, Terms}, proplists:get_keys(Validators)) of
- {ok, []} ->
- ok;
- {ok, Unvalidated} ->
- {error, "~p are not recognised policy settings", [Unvalidated]};
- {Error, _} ->
- Error
- end.
-
-a2b(A) -> list_to_binary(atom_to_list(A)).
-
-dups(L) -> L -- lists:usort(L).
-
-is_proplist(L) -> length(L) =:= length([I || I = {_, _} <- L]).
-
-apply_to_validation(_Name, <<"all">>) -> ok;
-apply_to_validation(_Name, <<"exchanges">>) -> ok;
-apply_to_validation(_Name, <<"queues">>) -> ok;
-apply_to_validation(_Name, Term) ->
- {error, "apply-to '~s' unrecognised; should be 'queues', 'exchanges' "
- "or 'all'", [Term]}.