summaryrefslogtreecommitdiff
path: root/src/rabbit_router.erl
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-20 10:02:48 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-20 10:02:48 +0100
commit53f116d3b4e82c36277bb365630384292ffc4daf (patch)
treed9768b66c91b2b902e1ea2fa1645fe6e71e63db2 /src/rabbit_router.erl
parent8cc9e4830fcead681caa283299e1820518860186 (diff)
parent8c21276b4c97d4be3060f2195e75d80364fa93ea (diff)
downloadrabbitmq-server-53f116d3b4e82c36277bb365630384292ffc4daf.tar.gz
merge default into bug20284
Diffstat (limited to 'src/rabbit_router.erl')
-rw-r--r--src/rabbit_router.erl64
1 files changed, 34 insertions, 30 deletions
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 6d334362..4a1a08e4 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -39,26 +39,27 @@
-ifdef(use_specs).
--export_type([routing_key/0, routing_result/0]).
+-export_type([routing_key/0, routing_result/0, match_result/0]).
-type(routing_key() :: binary()).
-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
-type(qpids() :: [pid()]).
+-type(match_result() :: [rabbit_types:binding_destination()]).
--spec(deliver/2 ::
- (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}).
--spec(match_bindings/2 :: (rabbit_exchange:name(),
+-spec(deliver/2 :: ([rabbit_amqqueue:name()], rabbit_types:delivery()) ->
+ {routing_result(), qpids()}).
+-spec(match_bindings/2 :: (rabbit_types:binding_source(),
fun ((rabbit_types:binding()) -> boolean())) ->
- qpids()).
--spec(match_routing_key/2 :: (rabbit_exchange:name(), routing_key() | '_') ->
- qpids()).
+ match_result()).
+-spec(match_routing_key/2 :: (rabbit_types:binding_source(),
+ routing_key() | '_') -> match_result()).
-endif.
%%----------------------------------------------------------------------------
-deliver(QPids, Delivery = #delivery{mandatory = false,
- immediate = false}) ->
+deliver(QNames, Delivery = #delivery{mandatory = false,
+ immediate = false}) ->
%% optimisation: when Mandatory = false and Immediate = false,
%% rabbit_amqqueue:deliver will deliver the message to the queue
%% process asynchronously, and return true, which means all the
@@ -66,40 +67,43 @@ deliver(QPids, Delivery = #delivery{mandatory = false,
%% fire-and-forget cast here and return the QPids - the semantics
%% is preserved. This scales much better than the non-immediate
%% case below.
+ QPids = lookup_qpids(QNames),
delegate:invoke_no_result(
QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
{routed, QPids};
-deliver(QPids, Delivery = #delivery{mandatory = Mandatory,
+deliver(QNames, Delivery = #delivery{mandatory = Mandatory,
immediate = Immediate}) ->
+ QPids = lookup_qpids(QNames),
{Success, _} = delegate:invoke(
QPids, fun (Pid) ->
rabbit_amqqueue:deliver(Pid, Delivery)
end),
- case check_delivery(Mandatory, Immediate,
- lists:foldl(fun fold_deliveries/2,
- {false, []}, Success)) of
+ {Routed, Handled} =
+ lists:foldl(fun fold_deliveries/2, {false, []}, Success),
+ case check_delivery(Mandatory, Immediate, {Routed, Handled}) of
{routed, Qs} -> {routed, Qs};
O -> O
end.
%% TODO: Maybe this should be handled by a cursor instead.
-%% TODO: This causes a full scan for each entry with the same exchange
-match_bindings(Name, Match) ->
- Query = qlc:q([QName || #route{binding = Binding = #binding{
- exchange_name = XName,
- queue_name = QName}} <-
- mnesia:table(rabbit_route),
- XName == Name,
- Match(Binding)]),
- lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])).
-
-match_routing_key(Name, RoutingKey) ->
- MatchHead = #route{binding = #binding{exchange_name = Name,
- queue_name = '$1',
- key = RoutingKey,
- _ = '_'}},
- lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
+%% TODO: This causes a full scan for each entry with the same source
+match_bindings(SrcName, Match) ->
+ Query = qlc:q([DestinationName ||
+ #route{binding = Binding = #binding{
+ source = SrcName1,
+ destination = DestinationName}} <-
+ mnesia:table(rabbit_route),
+ SrcName == SrcName1,
+ Match(Binding)]),
+ mnesia:async_dirty(fun qlc:e/1, [Query]).
+
+match_routing_key(SrcName, RoutingKey) ->
+ MatchHead = #route{binding = #binding{source = SrcName,
+ destination = '$1',
+ key = RoutingKey,
+ _ = '_'}},
+ mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]).
%%--------------------------------------------------------------------
@@ -117,4 +121,4 @@ lookup_qpids(QNames) ->
[#amqqueue{pid = QPid}] -> [QPid | QPids];
[] -> QPids
end
- end, [], lists:usort(QNames)).
+ end, [], QNames).