summaryrefslogtreecommitdiff
path: root/src/rabbit_router.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@lshift.net>2010-03-18 11:05:59 +0000
committerSimon MacMullen <simon@lshift.net>2010-03-18 11:05:59 +0000
commit087c3b638825322414cad5954ccd6a6fc86ccbac (patch)
treedff5bbcdbd1b49fdf0db1b4c468b166e34ca0751 /src/rabbit_router.erl
parent97d4d5e8c4907af4f902a2200877c9611d69895e (diff)
downloadrabbitmq-server-087c3b638825322414cad5954ccd6a6fc86ccbac.tar.gz
Reimplement rabbit_router in terms of delegate.
Diffstat (limited to 'src/rabbit_router.erl')
-rw-r--r--src/rabbit_router.erl131
1 files changed, 16 insertions, 115 deletions
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 884ea4ab..a8cffbdf 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -33,104 +33,40 @@
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
--behaviour(gen_server2).
-
--export([start_link/0,
- deliver/2,
+-export([deliver/2,
match_bindings/2,
match_routing_key/2]).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--define(SERVER, ?MODULE).
-
-%% cross-node routing optimisation is disabled because of bug 19758.
--define(BUG19758, true).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(deliver/2 :: ([pid()], delivery()) -> {routing_result(), [pid()]}).
-endif.
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
-
--ifdef(BUG19758).
-
-deliver(QPids, Delivery) ->
- check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
- run_bindings(QPids, Delivery)).
-
--else.
-
-deliver(QPids, Delivery) ->
- %% we reduce inter-node traffic by grouping the qpids by node and
- %% only delivering one copy of the message to each node involved,
- %% which then in turn delivers it to its queues.
- deliver_per_node(
- dict:to_list(
- lists:foldl(
- fun (QPid, D) ->
- dict:update(node(QPid),
- fun (QPids1) -> [QPid | QPids1] end,
- [QPid], D)
- end,
- dict:new(), QPids)),
- Delivery).
-
-deliver_per_node([{Node, QPids}], Delivery) when Node == node() ->
- %% optimisation
- check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
- run_bindings(QPids, Delivery));
-deliver_per_node(NodeQPids, Delivery = #delivery{mandatory = false,
- immediate = false}) ->
+deliver(QPids, Delivery = #delivery{mandatory = false,
+ immediate = false}) ->
%% optimisation: when Mandatory = false and Immediate = false,
- %% rabbit_amqqueue:deliver in run_bindings below will deliver the
+ %% rabbit_amqqueue:deliver will deliver the
%% message to the queue process asynchronously, and return true,
%% which means all the QPids will always be returned. It is
%% therefore safe to use a fire-and-forget cast here and return
%% the QPids - the semantics is preserved. This scales much better
%% than the non-immediate case below.
- {routed,
- lists:flatmap(
- fun ({Node, QPids}) ->
- gen_server2:cast({?SERVER, Node}, {deliver, QPids, Delivery}),
- QPids
- end,
- NodeQPids)};
-deliver_per_node(NodeQPids, Delivery) ->
- R = rabbit_misc:upmap(
- fun ({Node, QPids}) ->
- try gen_server2:call({?SERVER, Node},
- {deliver, QPids, Delivery},
- infinity)
- catch
- _Class:_Reason ->
- %% TODO: figure out what to log (and do!) here
- {false, []}
- end
- end,
- NodeQPids),
+ delegate:delegate_async(QPids,
+ fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
+ {routed, QPids};
+
+deliver(QPids, Delivery) ->
+ Res = delegate:delegate_sync(QPids,
+ fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
{Routed, Handled} =
- lists:foldl(fun ({Routed, Handled}, {RoutedAcc, HandledAcc}) ->
- {Routed or RoutedAcc,
- %% we do the concatenation below, which
- %% should be faster
- [Handled | HandledAcc]}
- end,
- {false, []},
- R),
+ lists:foldl(fun fold_deliveries/2, {false, []}, Res),
check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
- {Routed, lists:append(Handled)}).
-
--endif.
+ {Routed, Handled}).
%% TODO: Maybe this should be handled by a cursor instead.
%% TODO: This causes a full scan for each entry with the same exchange
@@ -174,44 +110,9 @@ lookup_qpids(Queues) ->
%%--------------------------------------------------------------------
-init([]) ->
- {ok, no_state}.
-
-handle_call({deliver, QPids, Delivery}, From, State) ->
- spawn(
- fun () ->
- R = run_bindings(QPids, Delivery),
- gen_server2:reply(From, R)
- end),
- {noreply, State}.
-
-handle_cast({deliver, QPids, Delivery}, State) ->
- %% in order to preserve message ordering we must not spawn here
- run_bindings(QPids, Delivery),
- {noreply, State}.
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%--------------------------------------------------------------------
-
-run_bindings(QPids, Delivery) ->
- lists:foldl(
- fun (QPid, {Routed, Handled}) ->
- case catch rabbit_amqqueue:deliver(QPid, Delivery) of
- true -> {true, [QPid | Handled]};
- false -> {true, Handled};
- {'EXIT', _Reason} -> {Routed, Handled}
- end
- end,
- {false, []},
- QPids).
+fold_deliveries({ok, true, Pid}, {_, Handled}) -> {true, [Pid | Handled]};
+fold_deliveries({ok, false, _}, {_, Handled}) -> {true, Handled};
+fold_deliveries({error, _, _}, {Routed, Handled}) -> {Routed, Handled}.
%% check_delivery(Mandatory, Immediate, {WasRouted, QPids})
check_delivery(true, _ , {false, []}) -> {unroutable, []};