diff options
Diffstat (limited to 'src/rabbit_router.erl')
-rw-r--r-- | src/rabbit_router.erl | 58 |
1 files changed, 31 insertions, 27 deletions
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 39eac072..00df1ce1 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -39,26 +39,27 @@ -ifdef(use_specs). --export_type([routing_key/0, routing_result/0]). +-export_type([routing_key/0, routing_result/0, match_result/0]). -type(routing_key() :: binary()). -type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). -type(qpids() :: [pid()]). +-type(match_result() :: [rabbit_types:binding_destination()]). --spec(deliver/2 :: - (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}). --spec(match_bindings/2 :: (rabbit_exchange:name(), +-spec(deliver/2 :: ([rabbit_amqqueue:name()], rabbit_types:delivery()) -> + {routing_result(), qpids()}). +-spec(match_bindings/2 :: (rabbit_types:binding_source(), fun ((rabbit_types:binding()) -> boolean())) -> - qpids()). --spec(match_routing_key/2 :: (rabbit_exchange:name(), routing_key() | '_') -> - qpids()). + match_result()). +-spec(match_routing_key/2 :: (rabbit_types:binding_source(), + routing_key() | '_') -> match_result()). -endif. %%---------------------------------------------------------------------------- -deliver(QPids, Delivery = #delivery{mandatory = false, - immediate = false}) -> +deliver(QNames, Delivery = #delivery{mandatory = false, + immediate = false}) -> %% optimisation: when Mandatory = false and Immediate = false, %% rabbit_amqqueue:deliver will deliver the message to the queue %% process asynchronously, and return true, which means all the @@ -66,11 +67,13 @@ deliver(QPids, Delivery = #delivery{mandatory = false, %% fire-and-forget cast here and return the QPids - the semantics %% is preserved. This scales much better than the non-immediate %% case below. + QPids = lookup_qpids(QNames), delegate:invoke_no_result( QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {routed, QPids}; -deliver(QPids, Delivery) -> +deliver(QNames, Delivery) -> + QPids = lookup_qpids(QNames), {Success, _} = delegate:invoke(QPids, fun (Pid) -> @@ -82,22 +85,23 @@ deliver(QPids, Delivery) -> {Routed, Handled}). %% TODO: Maybe this should be handled by a cursor instead. -%% TODO: This causes a full scan for each entry with the same exchange -match_bindings(Name, Match) -> - Query = qlc:q([QName || #route{binding = Binding = #binding{ - exchange_name = XName, - queue_name = QName}} <- - mnesia:table(rabbit_route), - XName == Name, - Match(Binding)]), - lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])). - -match_routing_key(Name, RoutingKey) -> - MatchHead = #route{binding = #binding{exchange_name = Name, - queue_name = '$1', - key = RoutingKey, - _ = '_'}}, - lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). +%% TODO: This causes a full scan for each entry with the same source +match_bindings(SrcName, Match) -> + Query = qlc:q([DestinationName || + #route{binding = Binding = #binding{ + source = SrcName1, + destination = DestinationName}} <- + mnesia:table(rabbit_route), + SrcName == SrcName1, + Match(Binding)]), + mnesia:async_dirty(fun qlc:e/1, [Query]). + +match_routing_key(SrcName, RoutingKey) -> + MatchHead = #route{binding = #binding{source = SrcName, + destination = '$1', + key = RoutingKey, + _ = '_'}}, + mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]). %%-------------------------------------------------------------------- @@ -115,4 +119,4 @@ lookup_qpids(QNames) -> [#amqqueue{pid = QPid}] -> [QPid | QPids]; [] -> QPids end - end, [], lists:usort(QNames)). + end, [], QNames). |