summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-04-12 17:19:33 +0100
committerEmile Joubert <emile@rabbitmq.com>2013-04-12 17:19:33 +0100
commit6ed406a98be9ed57bdc6930f226aaa9032c77b80 (patch)
tree0d6a2e8898ea83efc4da57e61fc086347d7be58f
parent13d5ac8c71cea4835be5e6d42e66d5c6cf36b20f (diff)
parentd28654b6d985242ea3d6fc80c426f1d89dd59b1d (diff)
downloadrabbitmq-server-6ed406a98be9ed57bdc6930f226aaa9032c77b80.tar.gz
Merge default into bug25494
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_exchange.erl40
-rw-r--r--src/rabbit_exchange_decorator.erl38
-rw-r--r--src/rabbit_policy.erl16
-rw-r--r--src/rabbit_tests.erl5
-rw-r--r--src/rabbit_upgrade_functions.erl16
6 files changed, 81 insertions, 36 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index eeee799e..4282755d 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -40,7 +40,7 @@
-record(resource, {virtual_host, kind, name}).
-record(exchange, {name, type, durable, auto_delete, internal, arguments,
- scratches, policy}).
+ scratches, policy, decorators}).
-record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 9e98448d..c924f53a 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -115,23 +115,27 @@ recover() ->
rabbit_durable_exchange),
[XName || #exchange{name = XName} <- Xs].
-callback(X = #exchange{type = XType}, Fun, Serial0, Args) ->
+callback(X = #exchange{type = XType,
+ decorators = Decorators}, Fun, Serial0, Args) ->
Serial = if is_function(Serial0) -> Serial0;
is_atom(Serial0) -> fun (_Bool) -> Serial0 end
end,
[ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) ||
- M <- registry_lookup(exchange_decorator)],
+ M <- rabbit_exchange_decorator:select(all, Decorators)],
Module = type_to_module(XType),
apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
-policy_changed(X = #exchange{type = XType}, X1) ->
+policy_changed(X = #exchange{type = XType,
+ decorators = Decorators}, X1) ->
[ok = M:policy_changed(X, X1) ||
- M <- [type_to_module(XType) | registry_lookup(exchange_decorator)]],
+ M <- [type_to_module(XType) |
+ rabbit_exchange_decorator:select(all, Decorators)]],
ok.
-serialise_events(X = #exchange{type = Type}) ->
- lists:any(fun (M) -> M:serialise_events(X) end,
- registry_lookup(exchange_decorator))
+serialise_events(X = #exchange{type = Type, decorators = Decorators}) ->
+ lists:any(fun (M) ->
+ M:serialise_events(X)
+ end, rabbit_exchange_decorator:select(all, Decorators))
orelse (type_to_module(Type)):serialise_events().
serial(#exchange{name = XName} = X) ->
@@ -143,16 +147,6 @@ serial(#exchange{name = XName} = X) ->
(false) -> none
end.
-registry_lookup(exchange_decorator_route = Class) ->
- case get(exchange_decorator_route_modules) of
- undefined -> Mods = [M || {_, M} <- rabbit_registry:lookup_all(Class)],
- put(exchange_decorator_route_modules, Mods),
- Mods;
- Mods -> Mods
- end;
-registry_lookup(Class) ->
- [M || {_, M} <- rabbit_registry:lookup_all(Class)].
-
declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
X = rabbit_policy:set(#exchange{name = XName,
type = Type,
@@ -318,15 +312,15 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
-route(#exchange{name = #resource{virtual_host = VHost,
- name = RName} = XName} = X,
+route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName,
+ decorators = Decorators} = X,
#delivery{message = #basic_message{routing_keys = RKs}} = Delivery) ->
- case {registry_lookup(exchange_decorator_route), RName == <<"">>} of
- {[], true} ->
+ case {RName, rabbit_exchange_decorator:select(route, Decorators)} of
+ {<<"">>, []} ->
%% Optimisation
[rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
- {Decorators, _} ->
- lists:usort(route1(Delivery, Decorators, {[X], XName, []}))
+ {_, SelectedDecorators} ->
+ lists:usort(route1(Delivery, SelectedDecorators, {[X], XName, []}))
end.
route1(_, _, {[], _, QNames}) ->
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index 040b55db..3bc9de1e 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -16,6 +16,10 @@
-module(rabbit_exchange_decorator).
+-include("rabbit.hrl").
+
+-export([select/2, set/1]).
+
%% This is like an exchange type except that:
%%
%% 1) It applies to all exchanges as soon as it is installed, therefore
@@ -57,10 +61,13 @@
-callback remove_bindings(serial(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok'.
-%% Decorators can optionally implement route/2 which allows additional
-%% destinations to be added to the routing decision.
-%% -callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
-%% [rabbit_amqqueue:name() | rabbit_exchange:name()].
+%% Allows additional destinations to be added to the routing decision.
+-callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
+ [rabbit_amqqueue:name() | rabbit_exchange:name()] | ok.
+
+%% Whether the decorator wishes to receive callbacks for the exchange
+%% none:no callbacks, noroute:all callbacks except route, all:all callbacks
+-callback active_for(rabbit_types:exchange()) -> 'none' | 'noroute' | 'all'.
-else.
@@ -68,8 +75,29 @@
behaviour_info(callbacks) ->
[{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3},
- {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}];
+ {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3},
+ {route, 2}, {active_for, 1}];
behaviour_info(_Other) ->
undefined.
-endif.
+
+%%----------------------------------------------------------------------------
+
+list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
+
+%% select a subset of active decorators
+select(all, {Route, NoRoute}) -> Route ++ NoRoute;
+select(route, {Route, _NoRoute}) -> Route.
+
+set(X) ->
+ X#exchange{
+ decorators =
+ lists:foldl(fun (D, {Route, NoRoute}) ->
+ Callbacks = D:active_for(X),
+ {cons_if_eq(all, Callbacks, D, Route),
+ cons_if_eq(noroute, Callbacks, D, NoRoute)}
+ end, {[], []}, list())}.
+
+cons_if_eq(Select, Select, Item, List) -> [Item | List];
+cons_if_eq(_Select, _Other, _Item, List) -> List.
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 7398cd2d..d276c2fb 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -46,7 +46,8 @@ name0(undefined) -> none;
name0(Policy) -> pget(name, Policy).
set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
-set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}.
+set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set(
+ X#exchange{policy = set0(Name)}).
set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)).
@@ -169,10 +170,15 @@ update_policies(VHost) ->
update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) ->
case match(XName, Policies) of
- OldPolicy -> no_change;
- NewPolicy -> rabbit_exchange:update(
- XName, fun(X1) -> X1#exchange{policy = NewPolicy} end),
- {X, X#exchange{policy = NewPolicy}}
+ OldPolicy ->
+ no_change;
+ NewPolicy ->
+ rabbit_exchange:update(
+ XName, fun(X1) ->
+ rabbit_exchange_decorator:set(
+ X1#exchange{policy = NewPolicy})
+ end),
+ {X, X#exchange{policy = NewPolicy}}
end.
update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index e7b69879..27b588d1 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -563,8 +563,9 @@ test_topic_matching() ->
XName = #resource{virtual_host = <<"/">>,
kind = exchange,
name = <<"test_exchange">>},
- X = #exchange{name = XName, type = topic, durable = false,
- auto_delete = false, arguments = []},
+ X0 = #exchange{name = XName, type = topic, durable = false,
+ auto_delete = false, arguments = []},
+ X = rabbit_exchange_decorator:set(X0),
%% create
rabbit_exchange_type_topic:validate(X),
exchange_op_callback(X, create, []),
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 457b1567..b7b1635b 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -43,6 +43,7 @@
-rabbit_upgrade({sync_slave_pids, mnesia, [policy]}).
-rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}).
-rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}).
+-rabbit_upgrade({exchange_decorators, mnesia, [policy]}).
%% -------------------------------------------------------------------
@@ -68,6 +69,7 @@
-spec(sync_slave_pids/0 :: () -> 'ok').
-spec(no_mirror_nodes/0 :: () -> 'ok').
-spec(gm_pids/0 :: () -> 'ok').
+-spec(exchange_decorators/0 :: () -> 'ok').
-endif.
@@ -282,6 +284,20 @@ gm_pids() ->
|| T <- Tables],
ok.
+exchange_decorators() ->
+ ok = exchange_decorators(rabbit_exchange),
+ ok = exchange_decorators(rabbit_durable_exchange).
+
+exchange_decorators(Table) ->
+ transform(
+ Table,
+ fun ({exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches,
+ Policy}) ->
+ {exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches, Policy,
+ {[], []}}
+ end,
+ [name, type, durable, auto_delete, internal, arguments, scratches, policy,
+ decorators]).
%%--------------------------------------------------------------------