summaryrefslogtreecommitdiff
path: root/src/rabbit_router.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-01-16 11:03:38 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-01-16 11:03:38 +0000
commitb7aca5bbc2d9b7db8200f1939e641a9a7c258a80 (patch)
tree38e1ba99a5ef435317f4c9b69b3b5e022b1cfa61 /src/rabbit_router.erl
parent3dc1d7a21e47e708dc207f202333c9bf895d0828 (diff)
downloadrabbitmq-server-b7aca5bbc2d9b7db8200f1939e641a9a7c258a80.tar.gz
merge rabbit_router:deliver into rabbit_amqqueue:deliverbug24681
Diffstat (limited to 'src/rabbit_router.erl')
-rw-r--r--src/rabbit_router.erl60
1 files changed, 2 insertions, 58 deletions
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 31f5ad14..219833b7 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -18,21 +18,17 @@
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
--export([deliver/2, match_bindings/2, match_routing_key/2]).
+-export([match_bindings/2, match_routing_key/2]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--export_type([routing_key/0, routing_result/0, match_result/0]).
+-export_type([routing_key/0, match_result/0]).
-type(routing_key() :: binary()).
--type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
--type(qpids() :: [pid()]).
-type(match_result() :: [rabbit_types:binding_destination()]).
--spec(deliver/2 :: ([rabbit_amqqueue:name()], rabbit_types:delivery()) ->
- {routing_result(), qpids()}).
-spec(match_bindings/2 :: (rabbit_types:binding_source(),
fun ((rabbit_types:binding()) -> boolean())) ->
match_result()).
@@ -44,38 +40,6 @@
%%----------------------------------------------------------------------------
-deliver([], #delivery{mandatory = false,
- immediate = false}) ->
- %% /dev/null optimisation
- {routed, []};
-
-deliver(QNames, Delivery = #delivery{mandatory = false,
- immediate = false}) ->
- %% optimisation: when Mandatory = false and Immediate = false,
- %% rabbit_amqqueue:deliver will deliver the message to the queue
- %% process asynchronously, and return true, which means all the
- %% QPids will always be returned. It is therefore safe to use a
- %% fire-and-forget cast here and return the QPids - the semantics
- %% is preserved. This scales much better than the non-immediate
- %% case below.
- QPids = lookup_qpids(QNames),
- delegate:invoke_no_result(
- QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
- {routed, QPids};
-
-deliver(QNames, Delivery = #delivery{mandatory = Mandatory,
- immediate = Immediate}) ->
- QPids = lookup_qpids(QNames),
- {Success, _} =
- delegate:invoke(QPids,
- fun (Pid) ->
- rabbit_amqqueue:deliver(Pid, Delivery)
- end),
- {Routed, Handled} =
- lists:foldl(fun fold_deliveries/2, {false, []}, Success),
- check_delivery(Mandatory, Immediate, {Routed, Handled}).
-
-
%% TODO: Maybe this should be handled by a cursor instead.
%% TODO: This causes a full scan for each entry with the same source
match_bindings(SrcName, Match) ->
@@ -104,26 +68,6 @@ match_routing_key(SrcName, [_|_] = RoutingKeys) ->
%%--------------------------------------------------------------------
-fold_deliveries({Pid, true},{_, Handled}) -> {true, [Pid|Handled]};
-fold_deliveries({_, false},{_, Handled}) -> {true, Handled}.
-
-%% check_delivery(Mandatory, Immediate, {WasRouted, QPids})
-check_delivery(true, _ , {false, []}) -> {unroutable, []};
-check_delivery(_ , true, {_ , []}) -> {not_delivered, []};
-check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}.
-
-%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
-%% expensive for the reasons explained in rabbit_misc:dirty_read/1.
-lookup_qpids(QNames) ->
- lists:foldl(fun (QName, QPids) ->
- case ets:lookup(rabbit_queue, QName) of
- [#amqqueue{pid = QPid, slave_pids = SPids}] ->
- [QPid | SPids ++ QPids];
- [] ->
- QPids
- end
- end, [], QNames).
-
%% Normally we'd call mnesia:dirty_select/2 here, but that is quite
%% expensive for the same reasons as above, and, additionally, due to
%% mnesia 'fixing' the table with ets:safe_fixtable/2, which is wholly