summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-03-21 10:02:31 +0000
committerEmile Joubert <emile@rabbitmq.com>2013-03-21 10:02:31 +0000
commitecc36c55bfd41a5eaf252057c87f5edc986bc35a (patch)
tree994f768b942a464356630a348950ad57a56e915a
parentf593d0719d3bfcbf61c607a40d8f5a9767bc3836 (diff)
downloadrabbitmq-server-ecc36c55bfd41a5eaf252057c87f5edc986bc35a.tar.gz
Exchange decorator routing improvements
-rw-r--r--src/rabbit_exchange.erl6
-rw-r--r--src/rabbit_exchange_decorator.erl7
-rw-r--r--src/rabbit_registry.erl21
3 files changed, 24 insertions, 10 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index a4a88661..a3b32c7b 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -335,7 +335,7 @@ route1(Delivery, Decorators,
{[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) ->
ExchangeDests = (type_to_module(Type)):route(X, Delivery),
AlternateDests = process_alternate(X, ExchangeDests),
- DecorateDests = process_decorators(Delivery, Decorators, X),
+ DecorateDests = process_decorators(X, Decorators, Delivery),
route1(Delivery, Decorators,
lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames},
AlternateDests ++ DecorateDests ++ ExchangeDests)).
@@ -350,9 +350,9 @@ process_alternate(#exchange{name = XName, arguments = Args}, []) ->
process_alternate(_X, _Results) ->
[].
-process_decorators(_Delivery, [], _X) ->
+process_decorators(_, [], _) ->
[];
-process_decorators(Delivery, Decorators, X) ->
+process_decorators(X, Decorators, Delivery) ->
lists:append([Decorator:route(X, Delivery) || Decorator <- Decorators]).
process_route(#resource{kind = exchange} = XName,
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index bf4add73..d8600835 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -57,6 +57,13 @@
-callback remove_bindings(serial(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok'.
+%% %% called after exchange routing
+%% %% return value is a list of queues to be added to the list of
+%% %% destination queues. decorators must register separately for
+%% %% this callback using exchange_decorator_route.
+%% -callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
+%% [rabbit_amqqueue:name()].
+
-else.
-export([behaviour_info/1]).
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 41b82ba5..22700a7c 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -95,20 +95,27 @@ internal_unregister(Class, TypeName) ->
true = ets:delete(?ETS_NAME, UnregArg),
ok.
-conditional_register({{exchange_decorator, _Type}, ModuleName} = RegArg) ->
+
+%% (un)register exchange decorator route callback only when implemented
+%% to avoid decorators being called unnecessarily on the fast publishing path
+conditional_register({{exchange_decorator, _Type}, ModuleName}) ->
case erlang:function_exported(ModuleName, route, 2) of
- true -> true = ets:insert(?ETS_NAME, RegArg);
+ true -> true = ets:insert(?ETS_NAME,
+ {{exchange_decorator_route, _Type},
+ ModuleName});
false -> ok
end;
conditional_register(_) ->
ok.
-conditional_unregister({exchange_decorator, Type} = UnregArg) ->
+conditional_unregister({exchange_decorator, Type}) ->
case lookup_module(exchange_decorator, Type) of
- {ok, ModName} -> case erlang:function_exported(ModName, route, 2) of
- true -> true = ets:delete(?ETS_NAME, UnregArg);
- false -> ok
- end;
+ {ok, ModName} ->
+ case erlang:function_exported(ModName, route, 2) of
+ true -> true = ets:delete(?ETS_NAME,
+ {exchange_decorator_route, Type});
+ false -> ok
+ end;
{error, not_found} ->
ok
end;