diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-20 10:02:48 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-20 10:02:48 +0100 |
commit | 53f116d3b4e82c36277bb365630384292ffc4daf (patch) | |
tree | d9768b66c91b2b902e1ea2fa1645fe6e71e63db2 /src/rabbit_router.erl | |
parent | 8cc9e4830fcead681caa283299e1820518860186 (diff) | |
parent | 8c21276b4c97d4be3060f2195e75d80364fa93ea (diff) | |
download | rabbitmq-server-53f116d3b4e82c36277bb365630384292ffc4daf.tar.gz |
merge default into bug20284
Diffstat (limited to 'src/rabbit_router.erl')
-rw-r--r-- | src/rabbit_router.erl | 64 |
1 files changed, 34 insertions, 30 deletions
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 6d334362..4a1a08e4 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -39,26 +39,27 @@ -ifdef(use_specs). --export_type([routing_key/0, routing_result/0]). +-export_type([routing_key/0, routing_result/0, match_result/0]). -type(routing_key() :: binary()). -type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). -type(qpids() :: [pid()]). +-type(match_result() :: [rabbit_types:binding_destination()]). --spec(deliver/2 :: - (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}). --spec(match_bindings/2 :: (rabbit_exchange:name(), +-spec(deliver/2 :: ([rabbit_amqqueue:name()], rabbit_types:delivery()) -> + {routing_result(), qpids()}). +-spec(match_bindings/2 :: (rabbit_types:binding_source(), fun ((rabbit_types:binding()) -> boolean())) -> - qpids()). --spec(match_routing_key/2 :: (rabbit_exchange:name(), routing_key() | '_') -> - qpids()). + match_result()). +-spec(match_routing_key/2 :: (rabbit_types:binding_source(), + routing_key() | '_') -> match_result()). -endif. %%---------------------------------------------------------------------------- -deliver(QPids, Delivery = #delivery{mandatory = false, - immediate = false}) -> +deliver(QNames, Delivery = #delivery{mandatory = false, + immediate = false}) -> %% optimisation: when Mandatory = false and Immediate = false, %% rabbit_amqqueue:deliver will deliver the message to the queue %% process asynchronously, and return true, which means all the @@ -66,40 +67,43 @@ deliver(QPids, Delivery = #delivery{mandatory = false, %% fire-and-forget cast here and return the QPids - the semantics %% is preserved. This scales much better than the non-immediate %% case below. + QPids = lookup_qpids(QNames), delegate:invoke_no_result( QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {routed, QPids}; -deliver(QPids, Delivery = #delivery{mandatory = Mandatory, +deliver(QNames, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate}) -> + QPids = lookup_qpids(QNames), {Success, _} = delegate:invoke( QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), - case check_delivery(Mandatory, Immediate, - lists:foldl(fun fold_deliveries/2, - {false, []}, Success)) of + {Routed, Handled} = + lists:foldl(fun fold_deliveries/2, {false, []}, Success), + case check_delivery(Mandatory, Immediate, {Routed, Handled}) of {routed, Qs} -> {routed, Qs}; O -> O end. %% TODO: Maybe this should be handled by a cursor instead. -%% TODO: This causes a full scan for each entry with the same exchange -match_bindings(Name, Match) -> - Query = qlc:q([QName || #route{binding = Binding = #binding{ - exchange_name = XName, - queue_name = QName}} <- - mnesia:table(rabbit_route), - XName == Name, - Match(Binding)]), - lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])). - -match_routing_key(Name, RoutingKey) -> - MatchHead = #route{binding = #binding{exchange_name = Name, - queue_name = '$1', - key = RoutingKey, - _ = '_'}}, - lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). +%% TODO: This causes a full scan for each entry with the same source +match_bindings(SrcName, Match) -> + Query = qlc:q([DestinationName || + #route{binding = Binding = #binding{ + source = SrcName1, + destination = DestinationName}} <- + mnesia:table(rabbit_route), + SrcName == SrcName1, + Match(Binding)]), + mnesia:async_dirty(fun qlc:e/1, [Query]). + +match_routing_key(SrcName, RoutingKey) -> + MatchHead = #route{binding = #binding{source = SrcName, + destination = '$1', + key = RoutingKey, + _ = '_'}}, + mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]). %%-------------------------------------------------------------------- @@ -117,4 +121,4 @@ lookup_qpids(QNames) -> [#amqqueue{pid = QPid}] -> [QPid | QPids]; [] -> QPids end - end, [], lists:usort(QNames)). + end, [], QNames). |